Skip to content

Commit cf249b9

Browse files
authored
Fix Redis stream leak: MAXLEN on xadd + sliding TTL on stream keys (#339)
1 parent 09a816c commit cf249b9

1 file changed

Lines changed: 57 additions & 6 deletions

File tree

src/agentex/lib/core/adapters/streams/adapter_redis.py

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,40 @@
1515
logger = make_logger(__name__)
1616

1717

18+
_DEFAULT_STREAM_MAXLEN = 10000
19+
_DEFAULT_STREAM_TTL_SECONDS = 3600
20+
21+
1822
class RedisStreamRepository(StreamRepository):
1923
"""
2024
A simplified Redis implementation of the EventStreamRepository interface.
2125
Optimized for text/JSON streaming with SSE.
2226
"""
2327

24-
def __init__(self, redis_url: str | None = None):
28+
def __init__(
29+
self,
30+
redis_url: str | None = None,
31+
stream_maxlen: int | None = None,
32+
stream_ttl_seconds: int | None = None,
33+
):
2534
# Get Redis URL from environment if not provided
2635
self.redis_url = redis_url or os.environ.get(
2736
"REDIS_URL", "redis://localhost:6379"
2837
)
2938
self.redis = redis.from_url(self.redis_url)
39+
self.stream_maxlen = (
40+
stream_maxlen
41+
if stream_maxlen is not None
42+
else int(os.environ.get("REDIS_STREAM_MAXLEN", _DEFAULT_STREAM_MAXLEN))
43+
)
44+
# 0 disables sliding TTL.
45+
self.stream_ttl_seconds = (
46+
stream_ttl_seconds
47+
if stream_ttl_seconds is not None
48+
else int(
49+
os.environ.get("REDIS_STREAM_TTL_SECONDS", _DEFAULT_STREAM_TTL_SECONDS)
50+
)
51+
)
3052

3153
@override
3254
async def send_event(self, topic: str, event: dict[str, Any]) -> str:
@@ -47,11 +69,40 @@ async def send_event(self, topic: str, event: dict[str, Any]) -> str:
4769
# # Uncomment to debug
4870
# logger.info(f"Sending event to Redis stream {topic}: {event_json}")
4971

50-
# Add to Redis stream with a reasonable max length
51-
message_id = await self.redis.xadd(
52-
name=topic,
53-
fields={"data": event_json},
54-
)
72+
# Pipeline XADD + EXPIRE in one round-trip so the stream key gets
73+
# a sliding TTL — orphaned streams (no writes for the TTL window)
74+
# self-delete. Mirrors the server-side adapter (scaleapi/scale-agentex#215).
75+
if self.stream_ttl_seconds > 0:
76+
async with self.redis.pipeline(transaction=False) as pipe:
77+
pipe.xadd(
78+
name=topic,
79+
fields={"data": event_json},
80+
maxlen=self.stream_maxlen,
81+
approximate=True,
82+
)
83+
pipe.expire(name=topic, time=self.stream_ttl_seconds)
84+
# raise_on_error=False so an EXPIRE failure does not surface
85+
# to the caller after XADD already succeeded — that would
86+
# risk callers retrying and duplicating messages. A failed
87+
# TTL refresh is recoverable: MAXLEN still caps RAM and the
88+
# next write resets the clock.
89+
results = await pipe.execute(raise_on_error=False)
90+
# results[0] = xadd message ID (or Exception)
91+
# results[1] = expire bool (or Exception)
92+
message_id = results[0]
93+
if isinstance(message_id, Exception):
94+
raise message_id
95+
if isinstance(results[1], Exception):
96+
logger.warning(
97+
f"Failed to refresh TTL on stream {topic}: {results[1]}"
98+
)
99+
else:
100+
message_id = await self.redis.xadd(
101+
name=topic,
102+
fields={"data": event_json},
103+
maxlen=self.stream_maxlen,
104+
approximate=True,
105+
)
55106

56107
return message_id
57108
except Exception as e:

0 commit comments

Comments
 (0)