Skip to content

Commit 012c958

Browse files
committed
feat(conversation_service): add _schema_version to all OTS put_* methods
每次写入 OTS 行时在 attribute_columns 中携带 _schema_version 字段, 用于 SDK 写入端与 Core 读取端(funagent-core)的 schema 兼容性协调。 - 在 model.py 中定义 SCHEMA_VERSION_COLUMN 和 6 个表级版本常量 - 修改 __ots_backend_async_template.py 中所有 put_* 方法 - codegen 生成同步方法(ots_backend.py) - 新增 TestSchemaVersionAsync 测试类验证写入正确性 Made-with: Cursor
1 parent dd40d1d commit 012c958

4 files changed

Lines changed: 178 additions & 24 deletions

File tree

agentrun/conversation_service/__ots_backend_async_template.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from agentrun.conversation_service.model import (
38+
CHECKPOINT_BLOBS_SCHEMA_VERSION,
39+
CHECKPOINT_SCHEMA_VERSION,
40+
CHECKPOINT_WRITES_SCHEMA_VERSION,
41+
CONVERSATION_SCHEMA_VERSION,
3842
ConversationEvent,
3943
ConversationSession,
4044
DEFAULT_APP_STATE_TABLE,
@@ -48,6 +52,9 @@
4852
DEFAULT_STATE_SEARCH_INDEX,
4953
DEFAULT_STATE_TABLE,
5054
DEFAULT_USER_STATE_TABLE,
55+
EVENT_SCHEMA_VERSION,
56+
SCHEMA_VERSION_COLUMN,
57+
STATE_SCHEMA_VERSION,
5158
StateData,
5259
StateScope,
5360
)
@@ -607,6 +614,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
607614
]
608615

609616
attribute_columns = [
617+
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
610618
("created_at", session.created_at),
611619
("updated_at", session.updated_at),
612620
("is_pinned", session.is_pinned),
@@ -991,6 +999,7 @@ async def put_event_async(
991999

9921000
content_json = json.dumps(content, ensure_ascii=False)
9931001
attribute_columns = [
1002+
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
9941003
("type", event_type),
9951004
("content", content_json),
9961005
("created_at", created_at),
@@ -1204,6 +1213,7 @@ async def put_state_async(
12041213
state_json = serialize_state(state)
12051214

12061215
put_cols: list[tuple[str, Any]] = [
1216+
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
12071217
("updated_at", now),
12081218
("version", version + 1),
12091219
]
@@ -1349,6 +1359,7 @@ async def put_checkpoint_async(
13491359
("checkpoint_id", checkpoint_id),
13501360
]
13511361
attribute_columns = [
1362+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
13521363
("checkpoint_type", checkpoint_type),
13531364
("checkpoint_data", checkpoint_data),
13541365
("metadata", metadata_json),
@@ -1502,6 +1513,7 @@ async def put_checkpoint_writes_async(
15021513
("task_idx", w["task_idx"]),
15031514
]
15041515
attrs = [
1516+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
15051517
("task_id", w["task_id"]),
15061518
("task_path", w.get("task_path", "")),
15071519
("channel", w["channel"]),
@@ -1580,6 +1592,7 @@ async def put_checkpoint_blob_async(
15801592
("version", version),
15811593
]
15821594
attribute_columns = [
1595+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
15831596
("blob_type", blob_type),
15841597
("blob_data", blob_data),
15851598
]

agentrun/conversation_service/model.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,36 @@
2828
DEFAULT_CHECKPOINT_WRITES_TABLE = "checkpoint_writes"
2929
DEFAULT_CHECKPOINT_BLOBS_TABLE = "checkpoint_blobs"
3030

31+
# ---------------------------------------------------------------------------
32+
# OTS Schema 版本管理
33+
#
34+
# 每张表独立计数,用于 SDK 写入端与 Core 读取端(funagent-core)的兼容性协调。
35+
# 每次 PutRow 时在 attribute_columns 中写入 _schema_version 字段。
36+
# Core 端读取时检查该字段,版本不匹配时打 WARN 日志并尽力解析。
37+
# 历史数据(无此字段)视为 v0。
38+
#
39+
# 升级流程:
40+
# 1. 递增对应表的 *_SCHEMA_VERSION 常量
41+
# 2. 在 PR 描述中记录变更的列名/类型/语义
42+
# 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量
43+
# 4. 如涉及 breaking change,提供数据迁移指引
44+
#
45+
# 兼容性规则:
46+
# - 只加不删:新增列允许,删除/重命名列视为 breaking change
47+
# - PK 不可变:主键结构永不改变
48+
# - 索引名不可变:Search Index 名称一旦确定不再修改
49+
# - 语义不可变:已有列的类型和含义不改变
50+
# ---------------------------------------------------------------------------
51+
52+
SCHEMA_VERSION_COLUMN = "_schema_version"
53+
54+
CONVERSATION_SCHEMA_VERSION = 1
55+
EVENT_SCHEMA_VERSION = 1
56+
STATE_SCHEMA_VERSION = 1 # state / app_state / user_state 共享
57+
CHECKPOINT_SCHEMA_VERSION = 1
58+
CHECKPOINT_WRITES_SCHEMA_VERSION = 1
59+
CHECKPOINT_BLOBS_SCHEMA_VERSION = 1
60+
3161

3262
# ---------------------------------------------------------------------------
3363
# 枚举

agentrun/conversation_service/ots_backend.py

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
)
4646

4747
from agentrun.conversation_service.model import (
48+
CHECKPOINT_BLOBS_SCHEMA_VERSION,
49+
CHECKPOINT_SCHEMA_VERSION,
50+
CHECKPOINT_WRITES_SCHEMA_VERSION,
51+
CONVERSATION_SCHEMA_VERSION,
4852
ConversationEvent,
4953
ConversationSession,
5054
DEFAULT_APP_STATE_TABLE,
@@ -58,6 +62,9 @@
5862
DEFAULT_STATE_SEARCH_INDEX,
5963
DEFAULT_STATE_TABLE,
6064
DEFAULT_USER_STATE_TABLE,
65+
EVENT_SCHEMA_VERSION,
66+
SCHEMA_VERSION_COLUMN,
67+
STATE_SCHEMA_VERSION,
6168
StateData,
6269
StateScope,
6370
)
@@ -991,21 +998,13 @@ async def _create_state_search_index_async(self) -> None:
991998
self._state_table,
992999
)
9931000
except OTSServiceError as e:
994-
err_str = str(e).lower()
995-
if "already exist" in err_str or (
1001+
if "already exist" in str(e).lower() or (
9961002
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
9971003
):
9981004
logger.warning(
9991005
"Search index %s already exists, skipping.",
10001006
self._state_search_index,
10011007
)
1002-
elif "does not exist" in err_str and "table" in err_str:
1003-
logger.warning(
1004-
"Table %s does not exist, skipping search index creation"
1005-
" for %s.",
1006-
self._state_table,
1007-
self._state_search_index,
1008-
)
10091008
else:
10101009
raise
10111010

@@ -1084,21 +1083,13 @@ def _create_state_search_index(self) -> None:
10841083
self._state_table,
10851084
)
10861085
except OTSServiceError as e:
1087-
err_str = str(e).lower()
1088-
if "already exist" in err_str or (
1086+
if "already exist" in str(e).lower() or (
10891087
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
10901088
):
10911089
logger.warning(
10921090
"Search index %s already exists, skipping.",
10931091
self._state_search_index,
10941092
)
1095-
elif "does not exist" in err_str and "table" in err_str:
1096-
logger.warning(
1097-
"Table %s does not exist, skipping search index creation"
1098-
" for %s.",
1099-
self._state_table,
1100-
self._state_search_index,
1101-
)
11021093
else:
11031094
raise
11041095

@@ -1115,6 +1106,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
11151106
]
11161107

11171108
attribute_columns = [
1109+
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
11181110
("created_at", session.created_at),
11191111
("updated_at", session.updated_at),
11201112
("is_pinned", session.is_pinned),
@@ -1148,6 +1140,7 @@ def put_session(self, session: ConversationSession) -> None:
11481140
]
11491141

11501142
attribute_columns = [
1143+
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
11511144
("created_at", session.created_at),
11521145
("updated_at", session.updated_at),
11531146
("is_pinned", session.is_pinned),
@@ -1844,6 +1837,7 @@ async def put_event_async(
18441837

18451838
content_json = json.dumps(content, ensure_ascii=False)
18461839
attribute_columns = [
1840+
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
18471841
("type", event_type),
18481842
("content", content_json),
18491843
("created_at", created_at),
@@ -1918,6 +1912,7 @@ def put_event(
19181912

19191913
content_json = json.dumps(content, ensure_ascii=False)
19201914
attribute_columns = [
1915+
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
19211916
("type", event_type),
19221917
("content", content_json),
19231918
("created_at", created_at),
@@ -2282,6 +2277,7 @@ async def put_state_async(
22822277
state_json = serialize_state(state)
22832278

22842279
put_cols: list[tuple[str, Any]] = [
2280+
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
22852281
("updated_at", now),
22862282
("version", version + 1),
22872283
]
@@ -2374,6 +2370,7 @@ def put_state(
23742370
state_json = serialize_state(state)
23752371

23762372
put_cols: list[tuple[str, Any]] = [
2373+
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
23772374
("updated_at", now),
23782375
("version", version + 1),
23792376
]
@@ -2542,7 +2539,7 @@ async def delete_state_row_async(
25422539
await self._async_client.delete_row(table_name, row, condition)
25432540

25442541
# -----------------------------------------------------------------------
2545-
# State CRUD(同步
2542+
# Checkpoint CRUD(LangGraph)(异步
25462543
# -----------------------------------------------------------------------
25472544

25482545
def delete_state_row(
@@ -2561,7 +2558,7 @@ def delete_state_row(
25612558
self._client.delete_row(table_name, row, condition)
25622559

25632560
# -----------------------------------------------------------------------
2564-
# Checkpoint CRUD(LangGraph)(异步
2561+
# Checkpoint CRUD(LangGraph)(同步
25652562
# -----------------------------------------------------------------------
25662563

25672564
async def put_checkpoint_async(
@@ -2582,6 +2579,7 @@ async def put_checkpoint_async(
25822579
("checkpoint_id", checkpoint_id),
25832580
]
25842581
attribute_columns = [
2582+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
25852583
("checkpoint_type", checkpoint_type),
25862584
("checkpoint_data", checkpoint_data),
25872585
("metadata", metadata_json),
@@ -2591,10 +2589,6 @@ async def put_checkpoint_async(
25912589
condition = Condition(RowExistenceExpectation.IGNORE)
25922590
await self._async_client.put_row(self._checkpoint_table, row, condition)
25932591

2594-
# -----------------------------------------------------------------------
2595-
# Checkpoint CRUD(LangGraph)(同步)
2596-
# -----------------------------------------------------------------------
2597-
25982592
def put_checkpoint(
25992593
self,
26002594
thread_id: str,
@@ -2613,6 +2607,7 @@ def put_checkpoint(
26132607
("checkpoint_id", checkpoint_id),
26142608
]
26152609
attribute_columns = [
2610+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
26162611
("checkpoint_type", checkpoint_type),
26172612
("checkpoint_data", checkpoint_data),
26182613
("metadata", metadata_json),
@@ -2881,6 +2876,7 @@ async def put_checkpoint_writes_async(
28812876
("task_idx", w["task_idx"]),
28822877
]
28832878
attrs = [
2879+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
28842880
("task_id", w["task_id"]),
28852881
("task_path", w.get("task_path", "")),
28862882
("channel", w["channel"]),
@@ -2928,6 +2924,7 @@ def put_checkpoint_writes(
29282924
("task_idx", w["task_idx"]),
29292925
]
29302926
attrs = [
2927+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
29312928
("task_id", w["task_id"]),
29322929
("task_path", w.get("task_path", "")),
29332930
("channel", w["channel"]),
@@ -3048,6 +3045,7 @@ async def put_checkpoint_blob_async(
30483045
("version", version),
30493046
]
30503047
attribute_columns = [
3048+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
30513049
("blob_type", blob_type),
30523050
("blob_data", blob_data),
30533051
]
@@ -3075,6 +3073,7 @@ def put_checkpoint_blob(
30753073
("version", version),
30763074
]
30773075
attribute_columns = [
3076+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
30783077
("blob_type", blob_type),
30793078
("blob_data", blob_data),
30803079
]

0 commit comments

Comments
 (0)