Skip to content

Commit e8f94ba

Browse files
committed
feat: improve startup queue management
1 parent ea15b70 commit e8f94ba

1 file changed

Lines changed: 61 additions & 20 deletions

File tree

apps/llm/run_model.py

Lines changed: 61 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -24,13 +24,18 @@
2424
CNV_MAX_SEC = float(os.getenv("CNV_MAX_SEC", "120"))
2525
CNV_READ_CHUNK = int(os.getenv("CNV_READ_CHUNK", "256"))
2626
CNV_STARTUP_DRAIN_MAX = float(os.getenv("CNV_STARTUP_DRAIN_MAX", "4"))
27-
CNV_STARTUP_QUIET = float(os.getenv("CNV_STARTUP_QUIET", "0.4"))
27+
CNV_STARTUP_QUIET_COLD = float(os.getenv("CNV_STARTUP_QUIET", "0.85"))
28+
CNV_STARTUP_QUIET_WARM = float(os.getenv("CNV_STARTUP_QUIET_WARM", "0.42"))
29+
CNV_STARTUP_STRAGGLER_COLD = float(os.getenv("CNV_STARTUP_STRAGGLER_SEC", "0.55"))
30+
CNV_STARTUP_STRAGGLER_WARM = float(os.getenv("CNV_STARTUP_STRAGGLER_WARM", "0.2"))
2831
THREADS = os.getenv("THREADS", "4")
2932
CTX_SIZE = os.getenv("CTX_SIZE", "2048")
3033
TEMP = os.getenv("TEMP", "0.8")
3134

3235
cnv_sessions: dict = {}
3336
cnv_global_lock = threading.Lock()
37+
_startup_warm_lock = threading.Lock()
38+
_cnv_process_startup_warmed = False
3439

3540

3641
def sse_chunk(obj):
@@ -94,28 +99,60 @@ def _stdout_reader(proc: subprocess.Popen, q: queue.Queue) -> None:
9499
q.put(None)
95100

96101

97-
def drain_cnv_startup_queue(q: queue.Queue) -> None:
98-
end = time.monotonic() + CNV_STARTUP_DRAIN_MAX
99-
saw_line = False
100-
quiet_at = None
101-
no_line_deadline = time.monotonic() + 0.5
102-
while time.monotonic() < end:
102+
def _startup_drain_stragglers(q: queue.Queue, budget_sec: float) -> None:
103+
deadline = time.monotonic() + budget_sec
104+
while time.monotonic() < deadline:
105+
drained = False
103106
try:
104-
_ = q.get(timeout=0.12)
105-
if _ is None:
106-
return
107-
saw_line = True
108-
quiet_at = None
107+
while True:
108+
x = q.get_nowait()
109+
if x is None:
110+
return
111+
drained = True
109112
except queue.Empty:
110-
if not saw_line:
111-
if time.monotonic() >= no_line_deadline:
113+
pass
114+
if not drained:
115+
time.sleep(0.07)
116+
117+
118+
def drain_cnv_startup_queue(q: queue.Queue) -> None:
119+
with _startup_warm_lock:
120+
warmed = _cnv_process_startup_warmed
121+
quiet_need = CNV_STARTUP_QUIET_WARM if warmed else CNV_STARTUP_QUIET_COLD
122+
straggler_budget = CNV_STARTUP_STRAGGLER_WARM if warmed else CNV_STARTUP_STRAGGLER_COLD
123+
try:
124+
end = time.monotonic() + CNV_STARTUP_DRAIN_MAX
125+
saw_any = False
126+
quiet_at = None
127+
while time.monotonic() < end:
128+
try:
129+
_ = q.get(timeout=0.12)
130+
if _ is None:
112131
return
113-
continue
114-
now = time.monotonic()
115-
if quiet_at is None:
116-
quiet_at = now
117-
elif now - quiet_at >= CNV_STARTUP_QUIET:
118-
return
132+
saw_any = True
133+
quiet_at = None
134+
except queue.Empty:
135+
if not saw_any:
136+
continue
137+
now = time.monotonic()
138+
if quiet_at is None:
139+
quiet_at = now
140+
elif now - quiet_at >= quiet_need:
141+
got_more = False
142+
try:
143+
while True:
144+
x = q.get_nowait()
145+
if x is None:
146+
return
147+
got_more = True
148+
except queue.Empty:
149+
pass
150+
if got_more:
151+
quiet_at = None
152+
continue
153+
return
154+
finally:
155+
_startup_drain_stragglers(q, straggler_budget)
119156

120157

121158
def _terminate_session(data: dict) -> None:
@@ -194,6 +231,7 @@ def stop():
194231

195232
@app.post("/cnv/init")
196233
def cnv_init():
234+
global _cnv_process_startup_warmed
197235
body = request.get_json(silent=True) or {}
198236
session_id = body.get("session_id") or body.get("sessionId")
199237
system = (body.get("system") or body.get("system_prompt") or DEFAULT_SYSTEM).strip()
@@ -227,6 +265,9 @@ def cnv_init():
227265
"lock": threading.Lock(),
228266
}
229267
drain_cnv_startup_queue(q)
268+
if proc.poll() is None:
269+
with _startup_warm_lock:
270+
_cnv_process_startup_warmed = True
230271
return {"ok": True}
231272

232273

0 commit comments

Comments
 (0)