|
17 | 17 |
|
18 | 18 | import attr |
19 | 19 |
|
20 | | -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes |
| 20 | +from synapse.api.constants import EventContentFields, RelationTypes |
21 | 21 | from synapse.api.room_versions import KNOWN_ROOM_VERSIONS |
22 | 22 | from synapse.events import make_event_from_dict |
23 | 23 | from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause |
@@ -71,10 +71,6 @@ class _BackgroundUpdates: |
71 | 71 |
|
72 | 72 | EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index" |
73 | 73 |
|
74 | | - POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING = ( |
75 | | - "populate_membership_event_stream_ordering" |
76 | | - ) |
77 | | - |
78 | 74 |
|
79 | 75 | @attr.s(slots=True, frozen=True, auto_attribs=True) |
80 | 76 | class _CalculateChainCover: |
@@ -103,10 +99,6 @@ def __init__( |
103 | 99 | ): |
104 | 100 | super().__init__(database, db_conn, hs) |
105 | 101 |
|
106 | | - self.db_pool.updates.register_background_update_handler( |
107 | | - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, |
108 | | - self._populate_membership_event_stream_ordering, |
109 | | - ) |
110 | 102 | self.db_pool.updates.register_background_update_handler( |
111 | 103 | _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, |
112 | 104 | self._background_reindex_origin_server_ts, |
@@ -1506,97 +1498,3 @@ def _populate_txn(txn: LoggingTransaction) -> bool: |
1506 | 1498 | ) |
1507 | 1499 |
|
1508 | 1500 | return batch_size |
1509 | | - |
1510 | | - async def _populate_membership_event_stream_ordering( |
1511 | | - self, progress: JsonDict, batch_size: int |
1512 | | - ) -> int: |
1513 | | - def _populate_membership_event_stream_ordering( |
1514 | | - txn: LoggingTransaction, |
1515 | | - ) -> bool: |
1516 | | - |
1517 | | - if "max_stream_ordering" in progress: |
1518 | | - max_stream_ordering = progress["max_stream_ordering"] |
1519 | | - else: |
1520 | | - txn.execute("SELECT max(stream_ordering) FROM events") |
1521 | | - res = txn.fetchone() |
1522 | | - if res is None or res[0] is None: |
1523 | | - return True |
1524 | | - else: |
1525 | | - max_stream_ordering = res[0] |
1526 | | - |
1527 | | - start = progress.get("stream_ordering", 0) |
1528 | | - stop = start + batch_size |
1529 | | - |
1530 | | - sql = f""" |
1531 | | - SELECT room_id, event_id, stream_ordering |
1532 | | - FROM events |
1533 | | - WHERE |
1534 | | - type = '{EventTypes.Member}' |
1535 | | - AND stream_ordering >= ? |
1536 | | - AND stream_ordering < ? |
1537 | | - """ |
1538 | | - txn.execute(sql, (start, stop)) |
1539 | | - |
1540 | | - rows: List[Tuple[str, str, int]] = cast( |
1541 | | - List[Tuple[str, str, int]], txn.fetchall() |
1542 | | - ) |
1543 | | - |
1544 | | - event_ids: List[Tuple[str]] = [] |
1545 | | - event_stream_orderings: List[Tuple[int]] = [] |
1546 | | - |
1547 | | - for _, event_id, event_stream_ordering in rows: |
1548 | | - event_ids.append((event_id,)) |
1549 | | - event_stream_orderings.append((event_stream_ordering,)) |
1550 | | - |
1551 | | - self.db_pool.simple_update_many_txn( |
1552 | | - txn, |
1553 | | - table="current_state_events", |
1554 | | - key_names=("event_id",), |
1555 | | - key_values=event_ids, |
1556 | | - value_names=("event_stream_ordering",), |
1557 | | - value_values=event_stream_orderings, |
1558 | | - ) |
1559 | | - |
1560 | | - self.db_pool.simple_update_many_txn( |
1561 | | - txn, |
1562 | | - table="room_memberships", |
1563 | | - key_names=("event_id",), |
1564 | | - key_values=event_ids, |
1565 | | - value_names=("event_stream_ordering",), |
1566 | | - value_values=event_stream_orderings, |
1567 | | - ) |
1568 | | - |
1569 | | - # NOTE: local_current_membership has no index on event_id, so only |
1570 | | - # the room ID here will reduce the query rows read. |
1571 | | - for room_id, event_id, event_stream_ordering in rows: |
1572 | | - txn.execute( |
1573 | | - """ |
1574 | | - UPDATE local_current_membership |
1575 | | - SET event_stream_ordering = ? |
1576 | | - WHERE room_id = ? AND event_id = ? |
1577 | | - """, |
1578 | | - (event_stream_ordering, room_id, event_id), |
1579 | | - ) |
1580 | | - |
1581 | | - self.db_pool.updates._background_update_progress_txn( |
1582 | | - txn, |
1583 | | - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING, |
1584 | | - { |
1585 | | - "stream_ordering": stop, |
1586 | | - "max_stream_ordering": max_stream_ordering, |
1587 | | - }, |
1588 | | - ) |
1589 | | - |
1590 | | - return stop > max_stream_ordering |
1591 | | - |
1592 | | - finished = await self.db_pool.runInteraction( |
1593 | | - "_populate_membership_event_stream_ordering", |
1594 | | - _populate_membership_event_stream_ordering, |
1595 | | - ) |
1596 | | - |
1597 | | - if finished: |
1598 | | - await self.db_pool.updates._end_background_update( |
1599 | | - _BackgroundUpdates.POPULATE_MEMBERSHIP_EVENT_STREAM_ORDERING |
1600 | | - ) |
1601 | | - |
1602 | | - return batch_size |
0 commit comments