Skip to content

Commit 0205ac4

Browse files
Merge pull request #2476 from keboola/jt-cleanup
fix: Cleanup deleted and disabled sinks
2 parents 8db6246 + 1fe0ea0 commit 0205ac4

2 files changed

Lines changed: 5 additions & 5 deletions

File tree

internal/pkg/service/stream/definition/repository/sink/sink_watch.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ import (
1212
func (r *Repository) GetAllAndWatch(ctx context.Context, opts ...etcd.OpOption) *etcdop.RestartableWatchStreamT[definition.Sink] {
1313
return r.schema.Active().GetAllAndWatch(ctx, r.client, opts...)
1414
}
15+
16+
func (r *Repository) GetActivePlusDeletedAndWatch(ctx context.Context, opts ...etcd.OpOption) *etcdop.RestartableWatchStreamT[definition.Sink] {
17+
return r.schema.GetAllAndWatch(ctx, r.client, opts...)
18+
}

internal/pkg/service/stream/storage/node/coordinator/metacleanup/metacleanup.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func Start(d dependencies, cfg Config) error {
101101
})
102102

103103
n.sinks = etcdop.SetupMirrorMap[definition.Sink, key.SinkKey, *sinkData](
104-
n.definitionRepository.Sink().GetAllAndWatch(ctx),
104+
n.definitionRepository.Sink().GetActivePlusDeletedAndWatch(ctx),
105105
func(_ string, sink definition.Sink) key.SinkKey {
106106
return sink.SinkKey
107107
},
@@ -165,10 +165,6 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) {
165165

166166
// Process all sink keys
167167
n.sinks.ForEach(func(sinkKey key.SinkKey, sink *sinkData) (stop bool) {
168-
if !sink.Enabled {
169-
return false
170-
}
171-
172168
grp.Go(func() error {
173169
// There can be several cleanup nodes, each node processes an own part.
174170
owner, err := n.dist.IsOwner(sinkKey.ProjectID.String())

0 commit comments

Comments
 (0)