Skip to content

Commit 4a56e34

Browse files
feat(grpc): Support span streaming (#6204)
Drop the `type` attribute in streaming lifecycle mode. Use `rpc.method` and `rpc.response.status_code` instead of `method` and `code` in the streaming mode.
1 parent d750512 commit 4a56e34

8 files changed

Lines changed: 1266 additions & 441 deletions

File tree

sentry_sdk/consts.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -899,6 +899,12 @@ class SPANDATA:
899899
Example: "com.example.ExampleService/exampleMethod"
900900
"""
901901

902+
RPC_RESPONSE_STATUS_CODE = "rpc.response.status_code"
903+
"""
904+
Status code of the RPC returned by the RPC server or generated by the client.
905+
Example: "DEADLINE_EXCEEDED"
906+
"""
907+
902908
SERVER_ADDRESS = "server.address"
903909
"""
904910
Name of the database host.

sentry_sdk/integrations/grpc/aio/client.py

Lines changed: 82 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from typing import Callable, Union, AsyncIterable, Any
22

33
import sentry_sdk
4-
from sentry_sdk.consts import OP
4+
from sentry_sdk.consts import OP, SPANDATA
55
from sentry_sdk.integrations import DidNotEnable
66
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
7+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
78

89
try:
910
from grpc.aio import (
@@ -49,23 +50,47 @@ async def intercept_unary_unary(
4950
) -> "Union[UnaryUnaryCall, Message]":
5051
method = client_call_details.method
5152

52-
with sentry_sdk.start_span(
53-
op=OP.GRPC_CLIENT,
54-
name="unary unary call to %s" % method.decode(),
55-
origin=SPAN_ORIGIN,
56-
) as span:
57-
span.set_data("type", "unary unary")
58-
span.set_data("method", method)
59-
60-
client_call_details = self._update_client_call_details_metadata_from_scope(
61-
client_call_details
62-
)
63-
64-
response = await continuation(client_call_details, request)
65-
status_code = await response.code()
66-
span.set_data("code", status_code.name)
67-
68-
return response
53+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
54+
if span_streaming:
55+
with sentry_sdk.traces.start_span(
56+
name="unary unary call to %s" % method.decode(),
57+
attributes={
58+
"sentry.op": OP.GRPC_CLIENT,
59+
"sentry.origin": SPAN_ORIGIN,
60+
SPANDATA.RPC_METHOD: method.decode(),
61+
},
62+
) as span:
63+
client_call_details = (
64+
self._update_client_call_details_metadata_from_scope(
65+
client_call_details
66+
)
67+
)
68+
69+
response = await continuation(client_call_details, request)
70+
status_code = await response.code()
71+
span.set_attribute(SPANDATA.RPC_RESPONSE_STATUS_CODE, status_code.name)
72+
73+
return response
74+
else:
75+
with sentry_sdk.start_span(
76+
op=OP.GRPC_CLIENT,
77+
name="unary unary call to %s" % method.decode(),
78+
origin=SPAN_ORIGIN,
79+
) as span:
80+
span.set_data("type", "unary unary")
81+
span.set_data("method", method)
82+
83+
client_call_details = (
84+
self._update_client_call_details_metadata_from_scope(
85+
client_call_details
86+
)
87+
)
88+
89+
response = await continuation(client_call_details, request)
90+
status_code = await response.code()
91+
span.set_data("code", status_code.name)
92+
93+
return response
6994

7095

7196
class SentryUnaryStreamClientInterceptor(
@@ -80,20 +105,42 @@ async def intercept_unary_stream(
80105
) -> "Union[AsyncIterable[Any], UnaryStreamCall]":
81106
method = client_call_details.method
82107

83-
with sentry_sdk.start_span(
84-
op=OP.GRPC_CLIENT,
85-
name="unary stream call to %s" % method.decode(),
86-
origin=SPAN_ORIGIN,
87-
) as span:
88-
span.set_data("type", "unary stream")
89-
span.set_data("method", method)
90-
91-
client_call_details = self._update_client_call_details_metadata_from_scope(
92-
client_call_details
93-
)
94-
95-
response = await continuation(client_call_details, request)
96-
# status_code = await response.code()
97-
# span.set_data("code", status_code)
98-
99-
return response
108+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
109+
if span_streaming:
110+
with sentry_sdk.traces.start_span(
111+
name="unary stream call to %s" % method.decode(),
112+
attributes={
113+
"sentry.op": OP.GRPC_CLIENT,
114+
"sentry.origin": SPAN_ORIGIN,
115+
SPANDATA.RPC_METHOD: method.decode(),
116+
},
117+
) as span:
118+
client_call_details = (
119+
self._update_client_call_details_metadata_from_scope(
120+
client_call_details
121+
)
122+
)
123+
124+
response = await continuation(client_call_details, request)
125+
126+
return response
127+
else:
128+
with sentry_sdk.start_span(
129+
op=OP.GRPC_CLIENT,
130+
name="unary stream call to %s" % method.decode(),
131+
origin=SPAN_ORIGIN,
132+
) as span:
133+
span.set_data("type", "unary stream")
134+
span.set_data("method", method)
135+
136+
client_call_details = (
137+
self._update_client_call_details_metadata_from_scope(
138+
client_call_details
139+
)
140+
)
141+
142+
response = await continuation(client_call_details, request)
143+
# status_code = await response.code()
144+
# span.set_data("code", status_code)
145+
146+
return response

sentry_sdk/integrations/grpc/aio/server.py

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
55
from sentry_sdk.tracing import TransactionSource
66
from sentry_sdk.utils import event_from_exception
7+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
78

89
from typing import TYPE_CHECKING
910

@@ -52,27 +53,57 @@ async def wrapped(request: "Any", context: "ServicerContext") -> "Any":
5253
if not name:
5354
return await handler(request, context)
5455

55-
# What if the headers are empty?
56-
transaction = sentry_sdk.continue_trace(
57-
dict(context.invocation_metadata()),
58-
op=OP.GRPC_SERVER,
59-
name=name,
60-
source=TransactionSource.CUSTOM,
61-
origin=SPAN_ORIGIN,
56+
span_streaming = has_span_streaming_enabled(
57+
sentry_sdk.get_client().options
6258
)
63-
64-
with sentry_sdk.start_transaction(transaction=transaction):
65-
try:
66-
return await handler.unary_unary(request, context)
67-
except AbortError:
68-
raise
69-
except Exception as exc:
70-
event, hint = event_from_exception(
71-
exc,
72-
mechanism={"type": "grpc", "handled": False},
73-
)
74-
sentry_sdk.capture_event(event, hint=hint)
75-
raise
59+
if span_streaming:
60+
# What if the headers are empty?
61+
sentry_sdk.traces.continue_trace(
62+
dict(context.invocation_metadata())
63+
)
64+
65+
with sentry_sdk.traces.start_span(
66+
name=name,
67+
attributes={
68+
"sentry.op": OP.GRPC_SERVER,
69+
"sentry.span.source": TransactionSource.CUSTOM.value,
70+
"sentry.origin": SPAN_ORIGIN,
71+
},
72+
parent_span=None,
73+
):
74+
try:
75+
return await handler.unary_unary(request, context)
76+
except AbortError:
77+
raise
78+
except Exception as exc:
79+
event, hint = event_from_exception(
80+
exc,
81+
mechanism={"type": "grpc", "handled": False},
82+
)
83+
sentry_sdk.capture_event(event, hint=hint)
84+
raise
85+
else:
86+
# What if the headers are empty?
87+
transaction = sentry_sdk.continue_trace(
88+
dict(context.invocation_metadata()),
89+
op=OP.GRPC_SERVER,
90+
name=name,
91+
source=TransactionSource.CUSTOM,
92+
origin=SPAN_ORIGIN,
93+
)
94+
95+
with sentry_sdk.start_transaction(transaction=transaction):
96+
try:
97+
return await handler.unary_unary(request, context)
98+
except AbortError:
99+
raise
100+
except Exception as exc:
101+
event, hint = event_from_exception(
102+
exc,
103+
mechanism={"type": "grpc", "handled": False},
104+
)
105+
sentry_sdk.capture_event(event, hint=hint)
106+
raise
76107

77108
elif not handler.request_streaming and handler.response_streaming:
78109
handler_factory = grpc.unary_stream_rpc_method_handler

sentry_sdk/integrations/grpc/client.py

Lines changed: 85 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import sentry_sdk
2-
from sentry_sdk.consts import OP
2+
from sentry_sdk.consts import OP, SPANDATA
33
from sentry_sdk.integrations import DidNotEnable
44
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
5+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
56

67
from typing import TYPE_CHECKING
78

@@ -30,22 +31,47 @@ def intercept_unary_unary(
3031
) -> "_UnaryOutcome":
3132
method = client_call_details.method
3233

33-
with sentry_sdk.start_span(
34-
op=OP.GRPC_CLIENT,
35-
name="unary unary call to %s" % method,
36-
origin=SPAN_ORIGIN,
37-
) as span:
38-
span.set_data("type", "unary unary")
39-
span.set_data("method", method)
40-
41-
client_call_details = self._update_client_call_details_metadata_from_scope(
42-
client_call_details
43-
)
44-
45-
response = continuation(client_call_details, request)
46-
span.set_data("code", response.code().name)
47-
48-
return response
34+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
35+
if span_streaming:
36+
with sentry_sdk.traces.start_span(
37+
name="unary unary call to %s" % method,
38+
attributes={
39+
"sentry.op": OP.GRPC_CLIENT,
40+
"sentry.origin": SPAN_ORIGIN,
41+
SPANDATA.RPC_METHOD: method,
42+
},
43+
) as span:
44+
client_call_details = (
45+
self._update_client_call_details_metadata_from_scope(
46+
client_call_details
47+
)
48+
)
49+
50+
response = continuation(client_call_details, request)
51+
span.set_attribute(
52+
SPANDATA.RPC_RESPONSE_STATUS_CODE, response.code().name
53+
)
54+
55+
return response
56+
else:
57+
with sentry_sdk.start_span(
58+
op=OP.GRPC_CLIENT,
59+
name="unary unary call to %s" % method,
60+
origin=SPAN_ORIGIN,
61+
) as span:
62+
span.set_data("type", "unary unary")
63+
span.set_data("method", method)
64+
65+
client_call_details = (
66+
self._update_client_call_details_metadata_from_scope(
67+
client_call_details
68+
)
69+
)
70+
71+
response = continuation(client_call_details, request)
72+
span.set_data("code", response.code().name)
73+
74+
return response
4975

5076
def intercept_unary_stream(
5177
self: "ClientInterceptor",
@@ -55,23 +81,48 @@ def intercept_unary_stream(
5581
) -> "Union[Iterator[Message], Call]":
5682
method = client_call_details.method
5783

58-
with sentry_sdk.start_span(
59-
op=OP.GRPC_CLIENT,
60-
name="unary stream call to %s" % method,
61-
origin=SPAN_ORIGIN,
62-
) as span:
63-
span.set_data("type", "unary stream")
64-
span.set_data("method", method)
65-
66-
client_call_details = self._update_client_call_details_metadata_from_scope(
67-
client_call_details
68-
)
69-
70-
response: "UnaryStreamCall" = continuation(client_call_details, request)
71-
# Setting code on unary-stream leads to execution getting stuck
72-
# span.set_data("code", response.code().name)
73-
74-
return response
84+
span_streaming = has_span_streaming_enabled(sentry_sdk.get_client().options)
85+
response: "UnaryStreamCall"
86+
if span_streaming:
87+
with sentry_sdk.traces.start_span(
88+
name="unary stream call to %s" % method,
89+
attributes={
90+
"sentry.op": OP.GRPC_CLIENT,
91+
"sentry.origin": SPAN_ORIGIN,
92+
SPANDATA.RPC_METHOD: method,
93+
},
94+
) as span:
95+
client_call_details = (
96+
self._update_client_call_details_metadata_from_scope(
97+
client_call_details
98+
)
99+
)
100+
101+
response = continuation(client_call_details, request)
102+
# Setting code on unary-stream leads to execution getting stuck
103+
# span.set_data("code", response.code().name)
104+
105+
return response
106+
else:
107+
with sentry_sdk.start_span(
108+
op=OP.GRPC_CLIENT,
109+
name="unary stream call to %s" % method,
110+
origin=SPAN_ORIGIN,
111+
) as span:
112+
span.set_data("type", "unary stream")
113+
span.set_data("method", method)
114+
115+
client_call_details = (
116+
self._update_client_call_details_metadata_from_scope(
117+
client_call_details
118+
)
119+
)
120+
121+
response = continuation(client_call_details, request)
122+
# Setting code on unary-stream leads to execution getting stuck
123+
# span.set_data("code", response.code().name)
124+
125+
return response
75126

76127
@staticmethod
77128
def _update_client_call_details_metadata_from_scope(

0 commit comments

Comments
 (0)