Skip to content

Commit 5e9f28d

Browse files
authored
fix(tracing): make SGP processor stateless to stop dropping span closes (#354)
1 parent 2c528c6 commit 5e9f28d

2 files changed

Lines changed: 129 additions & 157 deletions

File tree

Lines changed: 43 additions & 84 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import override
3+
from typing import cast, override
44

55
import scale_gp_beta.lib.tracing as tracing
66
from scale_gp_beta import SGPClient, AsyncSGPClient
@@ -27,6 +27,39 @@ def _get_span_type(span: Span) -> str:
2727
return "STANDALONE"
2828

2929

30+
def _add_source_to_span(span: Span, env_vars: EnvironmentVariables) -> None:
    """Stamp ``span.data`` in place with agentex provenance keys.

    A ``None`` payload is first replaced with an empty dict. A payload that is
    not a ``dict`` is left untouched. ``"__source__"`` is always written; the
    ACP type, agent name and agent id keys are written only when the matching
    ``env_vars`` attribute is not ``None``.
    """
    if span.data is None:
        span.data = {}

    # Re-read the attribute after the assignment so we mutate whatever object
    # the span actually stores.
    payload = span.data
    if not isinstance(payload, dict):
        return

    payload["__source__"] = "agentex"
    optional_tags = (
        ("__acp_type__", env_vars.ACP_TYPE),
        ("__agent_name__", env_vars.AGENT_NAME),
        ("__agent_id__", env_vars.AGENT_ID),
    )
    for tag_key, tag_value in optional_tags:
        if tag_value is not None:
            payload[tag_key] = tag_value
41+
42+
43+
def _build_sgp_span(span: Span, env_vars: EnvironmentVariables) -> SGPSpan:
    """Build an SGPSpan from an agentex Span. Idempotent on span_id at the SGP backend.

    The agentex span's ``data`` is tagged with provenance keys (mutating the
    passed-in span) and then forwarded as the SGP span's metadata. The SGP
    span's ``start_time`` is set to the ISO-8601 string of ``span.start_time``.
    No ``end_time`` is set here; callers closing a span assign it themselves.
    """
    # Tag first so the metadata handed to create_span already carries the keys.
    _add_source_to_span(span, env_vars)

    raw_span = create_span(
        name=span.name,
        span_type=_get_span_type(span),
        span_id=span.id,
        parent_id=span.parent_id,
        trace_id=span.trace_id,
        input=span.input,
        output=span.output,
        metadata=span.data,
    )
    sgp_span = cast(SGPSpan, raw_span)

    started_at = span.start_time.isoformat()  # type: ignore[union-attr]
    sgp_span.start_time = started_at
    return sgp_span
61+
62+
3063
class SGPSyncTracingProcessor(SyncTracingProcessor):
3164
def __init__(self, config: SGPTracingProcessorConfig):
3265
disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
@@ -38,63 +71,27 @@ def __init__(self, config: SGPTracingProcessorConfig):
3871
),
3972
disabled=disabled,
4073
)
41-
self._spans: dict[str, SGPSpan] = {}
4274
self.env_vars = EnvironmentVariables.refresh()
4375

44-
def _add_source_to_span(self, span: Span) -> None:
45-
if span.data is None:
46-
span.data = {}
47-
if isinstance(span.data, dict):
48-
span.data["__source__"] = "agentex"
49-
if self.env_vars.ACP_TYPE is not None:
50-
span.data["__acp_type__"] = self.env_vars.ACP_TYPE
51-
if self.env_vars.AGENT_NAME is not None:
52-
span.data["__agent_name__"] = self.env_vars.AGENT_NAME
53-
if self.env_vars.AGENT_ID is not None:
54-
span.data["__agent_id__"] = self.env_vars.AGENT_ID
55-
5676
@override
5777
def on_span_start(self, span: Span) -> None:
58-
self._add_source_to_span(span)
59-
60-
sgp_span = create_span(
61-
name=span.name,
62-
span_type=_get_span_type(span),
63-
span_id=span.id,
64-
parent_id=span.parent_id,
65-
trace_id=span.trace_id,
66-
input=span.input,
67-
output=span.output,
68-
metadata=span.data,
69-
)
70-
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
78+
sgp_span = _build_sgp_span(span, self.env_vars)
7179
sgp_span.flush(blocking=False)
7280

73-
self._spans[span.id] = sgp_span
74-
7581
@override
7682
def on_span_end(self, span: Span) -> None:
77-
sgp_span = self._spans.pop(span.id, None)
78-
if sgp_span is None:
79-
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
80-
return
81-
82-
self._add_source_to_span(span)
83-
sgp_span.output = span.output # type: ignore[assignment]
84-
sgp_span.metadata = span.data # type: ignore[assignment]
83+
sgp_span = _build_sgp_span(span, self.env_vars)
8584
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
8685
sgp_span.flush(blocking=False)
8786

8887
@override
8988
def shutdown(self) -> None:
90-
self._spans.clear()
9189
flush_queue()
9290

9391

9492
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
9593
def __init__(self, config: SGPTracingProcessorConfig):
9694
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
97-
self._spans: dict[str, SGPSpan] = {}
9895
import httpx
9996

10097
# Disable keepalive so each HTTP call gets a fresh TCP connection,
@@ -113,18 +110,6 @@ def __init__(self, config: SGPTracingProcessorConfig):
113110
)
114111
self.env_vars = EnvironmentVariables.refresh()
115112

116-
def _add_source_to_span(self, span: Span) -> None:
117-
if span.data is None:
118-
span.data = {}
119-
if isinstance(span.data, dict):
120-
span.data["__source__"] = "agentex"
121-
if self.env_vars.ACP_TYPE is not None:
122-
span.data["__acp_type__"] = self.env_vars.ACP_TYPE
123-
if self.env_vars.AGENT_NAME is not None:
124-
span.data["__agent_name__"] = self.env_vars.AGENT_NAME
125-
if self.env_vars.AGENT_ID is not None:
126-
span.data["__agent_id__"] = self.env_vars.AGENT_ID
127-
128113
@override
129114
async def on_span_start(self, span: Span) -> None:
130115
await self.on_spans_start([span])
@@ -138,22 +123,7 @@ async def on_spans_start(self, spans: list[Span]) -> None:
138123
if not spans:
139124
return
140125

141-
sgp_spans: list[SGPSpan] = []
142-
for span in spans:
143-
self._add_source_to_span(span)
144-
sgp_span = create_span(
145-
name=span.name,
146-
span_type=_get_span_type(span),
147-
span_id=span.id,
148-
parent_id=span.parent_id,
149-
trace_id=span.trace_id,
150-
input=span.input,
151-
output=span.output,
152-
metadata=span.data,
153-
)
154-
sgp_span.start_time = span.start_time.isoformat() # type: ignore[union-attr]
155-
self._spans[span.id] = sgp_span
156-
sgp_spans.append(sgp_span)
126+
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
157127

158128
if self.disabled:
159129
logger.warning("SGP is disabled, skipping span upsert")
@@ -167,29 +137,18 @@ async def on_spans_end(self, spans: list[Span]) -> None:
167137
if not spans:
168138
return
169139

170-
to_upsert: list[SGPSpan] = []
140+
sgp_spans: list[SGPSpan] = []
171141
for span in spans:
172-
sgp_span = self._spans.pop(span.id, None)
173-
if sgp_span is None:
174-
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
175-
continue
176-
177-
self._add_source_to_span(span)
178-
sgp_span.input = span.input # type: ignore[assignment]
179-
sgp_span.output = span.output # type: ignore[assignment]
180-
sgp_span.metadata = span.data # type: ignore[assignment]
142+
sgp_span = _build_sgp_span(span, self.env_vars)
181143
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
182-
to_upsert.append(sgp_span)
144+
sgp_spans.append(sgp_span)
183145

184-
if self.disabled or not to_upsert:
146+
if self.disabled:
185147
return
186148
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
187-
items=[s.to_request_params() for s in to_upsert]
149+
items=[s.to_request_params() for s in sgp_spans]
188150
)
189151

190152
@override
191153
async def shutdown(self) -> None:
192-
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
193-
items=[sgp_span.to_request_params() for sgp_span in self._spans.values()]
194-
)
195-
self._spans.clear()
154+
pass

0 commit comments

Comments
 (0)