Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private static class State {
String version;
String telemetryProxyEndpoint;
Set<String> peerTags = emptySet();
String orgPropagationMarker;
long lastTimeDiscovered;
}

Expand Down Expand Up @@ -316,6 +317,8 @@ private boolean processInfoResponse(State newState, String response) {
? unmodifiableSet(new HashSet<>((List<String>) peer_tags))
: emptySet();
}
Object opm = map.get("org_prop_marker");
newState.orgPropagationMarker = (opm instanceof String) ? (String) opm : null;
try {
newState.state = Strings.sha256(response);
} catch (Throwable ex) {
Expand Down Expand Up @@ -403,6 +406,10 @@ public Set<String> peerTags() {
return discoveryState.peerTags;
}

public String getOrgPropagationMarker() {
return discoveryState.orgPropagationMarker;
}

public String getMetricsEndpoint() {
return discoveryState.metricsEndpoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
static final String INFO_WITH_LONG_RUNNING_SPANS = loadJsonFile("agent-info-with-long-running-spans.json")
static final String INFO_WITH_TELEMETRY_PROXY_RESPONSE = loadJsonFile("agent-info-with-telemetry-proxy.json")
static final String INFO_WITH_OLD_EVP_PROXY = loadJsonFile("agent-info-with-old-evp-proxy.json")
static final String INFO_WITH_OPM = loadJsonFile("agent-info-with-opm.json")
static final String PROBE_STATE = "probestate"

def "test parse /info response"() {
Expand Down Expand Up @@ -209,6 +210,34 @@ class DDAgentFeaturesDiscoveryTest extends DDSpecification {
0 * _
}

def "test parse /info response with org propagation marker"() {
setup:
OkHttpClient client = Mock(OkHttpClient)
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true, false)

when: "/info available"
features.discover()

then:
1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_WITH_OPM) }
features.getOrgPropagationMarker() == "abc123def0"
0 * _
}

def "test parse /info response without org propagation marker"() {
setup:
OkHttpClient client = Mock(OkHttpClient)
DDAgentFeaturesDiscovery features = new DDAgentFeaturesDiscovery(client, monitoring, agentUrl, V0_5, true, false)

when: "/info available"
features.discover()

then:
1 * client.newCall(_) >> { Request request -> infoResponse(request, INFO_RESPONSE) }
features.getOrgPropagationMarker() == null
0 * _
}

def "test fallback when /info empty"() {
setup:
OkHttpClient client = Mock(OkHttpClient)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"version": "7.67.0",
"git_commit": "bdf863ccc9",
"endpoints": [
"/v0.3/traces",
"/v0.4/traces",
"/v0.5/traces",
"/v0.6/stats",
"/v0.1/pipeline_stats",
"/telemetry/proxy/",
"/evp_proxy/v4/",
"/debugger/v1/input"
],
"client_drop_p0s": true,
"long_running_spans": true,
"config": {
"statsd_port": 8125
},
"org_prop_marker": "abc123def0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,9 @@ public final class TracerConfig {
"trace.cloud.payload.tagging.max-tags";
public static final String TRACE_SERVICE_DISCOVERY_ENABLED = "trace.service.discovery.enabled";

public static final String TRACE_ORG_GUARD_ENABLED = "trace.org.guard.enabled";
public static final String TRACE_ORG_GUARD_STRICT = "trace.org.guard.strict";
public static final String TRACE_ORG_GUARD_TRUSTED_OPMS = "trace.org.guard.trusted.opms";

private TracerConfig() {}
}
16 changes: 14 additions & 2 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import datadog.trace.core.propagation.ExtractedContext;
import datadog.trace.core.propagation.HttpCodec;
import datadog.trace.core.propagation.InferredProxyPropagator;
import datadog.trace.core.propagation.OpmStampingInjector;
import datadog.trace.core.propagation.OrgGuardEnforcer;
import datadog.trace.core.propagation.PropagationTags;
import datadog.trace.core.propagation.TracingPropagator;
import datadog.trace.core.propagation.XRayPropagator;
Expand Down Expand Up @@ -827,11 +829,22 @@ private CoreTracer(

sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);

propagationTagsFactory = PropagationTags.factory(config);

// Register context propagators
HttpCodec.Extractor tracingExtractor =
extractor == null ? HttpCodec.createExtractor(config, this::captureTraceConfig) : extractor;
HttpCodec.Injector tracingInjector =
new OpmStampingInjector(injector, featuresDiscovery::getOrgPropagationMarker);
OrgGuardEnforcer orgGuardEnforcer =
new OrgGuardEnforcer(
config,
featuresDiscovery::getOrgPropagationMarker,
propagationTagsFactory,
this.healthMetrics);
TracingPropagator tracingPropagator =
new TracingPropagator(config.isApmTracingEnabled(), injector, tracingExtractor);
new TracingPropagator(
config.isApmTracingEnabled(), tracingInjector, tracingExtractor, orgGuardEnforcer);
Propagators.register(TRACING_CONCERN, tracingPropagator);
Propagators.register(XRAY_TRACING_CONCERN, new XRayPropagator(config), false);
if (config.isDataStreamsEnabled()) {
Expand Down Expand Up @@ -889,7 +902,6 @@ private CoreTracer(

StatusLogger.logStatus(config);

propagationTagsFactory = PropagationTags.factory(config);
this.profilingContextIntegration = profilingContextIntegration;
this.injectBaggageAsTags = injectBaggageAsTags;
this.injectLinksAsTags = injectLinksAsTags;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ public void onCloseScope() {}

public void onScopeStackOverflow() {}

/**
* Reports that the Org Propagation Guard dropped the inbound Datadog context for an extracted
* trace. {@code reason} is one of {@code "mismatch"} (inbound and local OPMs differ) or {@code
* "strict_missing"} (strict mode + inbound OPM absent).
*/
public void onOrgGuardEnforce(final String reason) {}

public void onSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable
private final LongAdder longRunningTracesDropped = new LongAdder();
private final LongAdder longRunningTracesExpired = new LongAdder();

private final LongAdder orgGuardEnforceMismatch = new LongAdder();
private final LongAdder orgGuardEnforceStrictMissing = new LongAdder();

private final LongAdder clientStatsProcessedSpans = new LongAdder();
private final LongAdder clientStatsProcessedTraces = new LongAdder();
private final LongAdder clientStatsP0DroppedSpans = new LongAdder();
Expand Down Expand Up @@ -285,6 +288,15 @@ public void onScopeStackOverflow() {
scopeStackOverflow.increment();
}

@Override
public void onOrgGuardEnforce(final String reason) {
if ("mismatch".equals(reason)) {
orgGuardEnforceMismatch.increment();
} else if ("strict_missing".equals(reason)) {
orgGuardEnforceStrictMissing.increment();
}
}

@Override
public void onSend(
final int traceCount, final int sizeInBytes, final RemoteApi.Response response) {
Expand Down Expand Up @@ -374,8 +386,11 @@ private static class Flush implements AgentTaskScheduler.Task<TracerHealthMetric
private static final String[] UNSET_TAG = new String[] {"priority:unset"};
private static final String[] SINGLE_SPAN_SAMPLER = new String[] {"sampler:single-span"};
private static final String[] REASON_LRU_EVICTION_TAG = new String[] {"reason:lru_eviction"};
private static final String[] ORG_GUARD_MISMATCH_TAGS = new String[] {"reason:mismatch"};
private static final String[] ORG_GUARD_STRICT_MISSING_TAGS =
new String[] {"reason:strict_missing"};

private final long[] previousCounts = new long[51];
private final long[] previousCounts = new long[53];

@SuppressFBWarnings("AT_STALE_THREAD_WRITE_OF_PRIMITIVE")
private int countIndex;
Expand Down Expand Up @@ -488,6 +503,17 @@ public void run(TracerHealthMetrics target) {
reportIfChanged(
target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS);

reportIfChanged(
target.statsd,
"org_guard.enforce",
target.orgGuardEnforceMismatch,
ORG_GUARD_MISMATCH_TAGS);
reportIfChanged(
target.statsd,
"org_guard.enforce",
target.orgGuardEnforceStrictMissing,
ORG_GUARD_STRICT_MISSING_TAGS);

reportIfChanged(
target.statsd, "stats.traces_in", target.clientStatsProcessedTraces, NO_TAGS);
reportIfChanged(target.statsd, "stats.spans_in", target.clientStatsProcessedSpans, NO_TAGS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package datadog.trace.core.propagation;

import datadog.context.propagation.CarrierSetter;
import datadog.trace.core.DDSpanContext;
import java.util.function.Supplier;

/**
* Decorates an {@link HttpCodec.Injector} so that, just before delegating to the underlying codecs,
* it stamps the local Org Propagation Marker (OPM) onto the span's propagation tags. The codecs
* (Datadog, W3C tracecontext) then serialize whatever is in the propagation tags, which causes the
* local OPM to overwrite any inbound OPM in {@code _dd.p.opm} / {@code t.opm}.
*
* <p>If the supplier returns {@code null} (the agent hasn't reported an OPM yet), this is a no-op
* and any inbound OPM is forwarded as-is, per the RFC.
*/
public final class OpmStampingInjector implements HttpCodec.Injector {

private final HttpCodec.Injector delegate;
private final Supplier<String> localOpmSupplier;

public OpmStampingInjector(HttpCodec.Injector delegate, Supplier<String> localOpmSupplier) {
this.delegate = delegate;
this.localOpmSupplier = localOpmSupplier;
}

@Override
public <C> void inject(DDSpanContext context, C carrier, CarrierSetter<C> setter) {
String localOpm = localOpmSupplier.get();
if (localOpm != null) {
context.getPropagationTags().updateOrgPropagationMarker(localOpm);
}
delegate.inject(context, carrier, setter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package datadog.trace.core.propagation;

import datadog.trace.api.Config;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.TagContext;
import datadog.trace.core.monitor.HealthMetrics;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enforces the Org Propagation Guard (OPG) on extracted contexts. When an inbound trace carries an
* Org Propagation Marker (OPM) that does not match the local one, the Datadog-side context
* (sampling priority, origin, {@code _dd.p.*} propagated tags) is dropped while parent identifiers,
* baggage, and non-{@code dd} tracestate vendor sections are preserved.
*
* <p>Behavior is gated by three configuration knobs:
*
* <ul>
* <li>{@code DD_TRACE_ORG_GUARD_ENABLED} — master switch; when {@code false} this class is a
* no-op.
* <li>{@code DD_TRACE_ORG_GUARD_STRICT} — when {@code true}, also enforces when the inbound OPM
* is absent.
* <li>{@code DD_TRACE_ORG_GUARD_TRUSTED_OPMS} — comma-separated allow-list of inbound OPMs that
* should be treated as trusted.
* </ul>
*
* <p>Enforcement never runs when the local OPM is unknown (the agent has not yet reported one).
*/
public final class OrgGuardEnforcer {

private static final Logger log = LoggerFactory.getLogger(OrgGuardEnforcer.class);

static final String REASON_MISMATCH = "mismatch";
static final String REASON_STRICT_MISSING = "strict_missing";

private final boolean enabled;
private final boolean strict;
private final Set<String> trustedOpms;
private final Supplier<String> localOpmSupplier;
private final PropagationTags.Factory factory;
private final HealthMetrics healthMetrics;

public OrgGuardEnforcer(
Config config,
Supplier<String> localOpmSupplier,
PropagationTags.Factory factory,
HealthMetrics healthMetrics) {
this(
config.isTraceOrgGuardEnabled(),
config.isTraceOrgGuardStrict(),
config.getTraceOrgGuardTrustedOpms(),
localOpmSupplier,
factory,
healthMetrics);
}

// Visible for testing.
OrgGuardEnforcer(
boolean enabled,
boolean strict,
Set<String> trustedOpms,
Supplier<String> localOpmSupplier,
PropagationTags.Factory factory,
HealthMetrics healthMetrics) {
this.enabled = enabled;
this.strict = strict;
this.trustedOpms = trustedOpms;
this.localOpmSupplier = localOpmSupplier;
this.factory = factory;
this.healthMetrics = healthMetrics;
}

/**
* Returns {@code extracted} unchanged unless OPG enforcement applies, in which case it returns a
* fresh {@link ExtractedContext} with the Datadog-side context dropped.
*/
public TagContext maybeStrip(TagContext extracted) {
if (!enabled || !(extracted instanceof ExtractedContext)) {
return extracted;
}
ExtractedContext ctx = (ExtractedContext) extracted;
String localOpm = localOpmSupplier.get();
if (localOpm == null) {
// We don't know our own OPM yet — never enforce.
return extracted;
}
CharSequence inboundCs = ctx.getPropagationTags().getOrgPropagationMarker();
String inbound = inboundCs == null ? null : inboundCs.toString();

if (inbound == null) {
if (!strict) {
return extracted;
}
return strip(ctx, REASON_STRICT_MISSING, localOpm, null);
}
if (localOpm.equals(inbound) || trustedOpms.contains(inbound)) {
return extracted;
}
return strip(ctx, REASON_MISMATCH, localOpm, inbound);
}

private ExtractedContext strip(
ExtractedContext ctx, String reason, String localOpm, String inbound) {
log.debug(
"OPG enforcement: dropping dd context (reason={}, inbound={}, local={})",
reason,
inbound,
localOpm);
healthMetrics.onOrgGuardEnforce(reason);

PropagationTags stripped = factory.emptyW3C(ctx.getPropagationTags().getW3CTracestate());
return new ExtractedContext(
ctx.getTraceId(),
ctx.getSpanId(),
PrioritySampling.UNSET,
/* origin */ null,
ctx.getEndToEndStartTime(),
ctx.getBaggage(),
ctx.getTags(),
/* httpHeaders */ null,
stripped,
ctx.getTraceConfig(),
ctx.getPropagationStyle());
}
}
Loading
Loading