File tree Expand file tree Collapse file tree
internal/pkg/service/stream
definition/repository/sink
storage/node/coordinator/metacleanup Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -12,3 +12,7 @@ import (
1212func (r * Repository ) GetAllAndWatch (ctx context.Context , opts ... etcd.OpOption ) * etcdop.RestartableWatchStreamT [definition.Sink ] {
1313 return r .schema .Active ().GetAllAndWatch (ctx , r .client , opts ... )
1414}
15+
16+ func (r * Repository ) GetActivePlusDeletedAndWatch (ctx context.Context , opts ... etcd.OpOption ) * etcdop.RestartableWatchStreamT [definition.Sink ] {
17+ return r .schema .GetAllAndWatch (ctx , r .client , opts ... )
18+ }
Original file line number Diff line number Diff line change @@ -101,7 +101,7 @@ func Start(d dependencies, cfg Config) error {
101101 })
102102
103103 n .sinks = etcdop .SetupMirrorMap [definition.Sink , key.SinkKey , * sinkData ](
104- n .definitionRepository .Sink ().GetAllAndWatch (ctx ),
104+ n .definitionRepository .Sink ().GetActivePlusDeletedAndWatch (ctx ),
105105 func (_ string , sink definition.Sink ) key.SinkKey {
106106 return sink .SinkKey
107107 },
@@ -165,10 +165,6 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) {
165165
166166 // Process all sink keys
167167 n .sinks .ForEach (func (sinkKey key.SinkKey , sink * sinkData ) (stop bool ) {
168- if ! sink .Enabled {
169- return false
170- }
171-
172168 grp .Go (func () error {
173169 // There can be several cleanup nodes, each node processes an own part.
174170 owner , err := n .dist .IsOwner (sinkKey .ProjectID .String ())
You can’t perform that action at this time.
0 commit comments