Skip to content

Commit 04f6b06

Browse files
committed
Implement memberlist KV delete
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent 5ad7985 commit 04f6b06

8 files changed

Lines changed: 538 additions & 32 deletions

File tree

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4569,6 +4569,10 @@ The `memberlist_config` configures the Gossip memberlist.
45694569
# CLI flag: -memberlist.left-ingesters-timeout
45704570
[left_ingesters_timeout: <duration> | default = 5m]
45714571
4572+
# How long to keep deleted keys (tombstones) in the KV store
4573+
# CLI flag: -memberlist.tombstone-timeout
4574+
[tombstone_timeout: <duration> | default = 5m]
4575+
45724576
# Timeout for leaving memberlist cluster.
45734577
# CLI flag: -memberlist.leave-timeout
45744578
[leave_timeout: <duration> | default = 5s]

pkg/ha/ha_tracker_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ha
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"strings"
78
"testing"
89
"time"
@@ -20,6 +21,7 @@ import (
2021

2122
"github.com/cortexproject/cortex/pkg/ring"
2223
"github.com/cortexproject/cortex/pkg/ring/kv"
24+
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
2325
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
2426
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
2527
"github.com/cortexproject/cortex/pkg/util"
@@ -138,6 +140,89 @@ func TestHATrackerConfig_Validate(t *testing.T) {
138140
}
139141
}
140142

143+
type dnsProviderMock struct {
144+
resolved []string
145+
}
146+
147+
func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string, flushOld bool) error {
148+
p.resolved = addrs
149+
return nil
150+
}
151+
152+
func (p dnsProviderMock) Addresses() []string {
153+
return p.resolved
154+
}
155+
156+
// TestHATracker_CleanupDeletesArePropagated demonstrates that when the HA tracker's
157+
// background cleanup loop removes stale replicas, it correctly triggers a memberlist
158+
// KV.Delete which in turn generates a tombstone broadcast.
159+
func TestHATracker_CleanupDeletesArePropagatedWithMemberlist(t *testing.T) {
160+
ctx := context.Background()
161+
logger := log.NewNopLogger()
162+
reg := prometheus.NewPedanticRegistry()
163+
164+
var cfg memberlist.KVConfig
165+
flagext.DefaultValues(&cfg)
166+
replicaDescCodec := GetReplicaDescCodec()
167+
cfg.Codecs = []codec.Codec{replicaDescCodec}
168+
cfg.RetransmitMult = 1
169+
170+
mkv := memberlist.NewKV(cfg, logger, &dnsProviderMock{}, reg)
171+
require.NoError(t, services.StartAndAwaitRunning(ctx, mkv))
172+
defer services.StopAndAwaitTerminated(ctx, mkv) //nolint:errcheck
173+
174+
client, err := memberlist.NewClient(mkv, replicaDescCodec)
175+
require.NoError(t, err)
176+
177+
trackerCfg := HATrackerConfig{
178+
EnableHATracker: false,
179+
KVStore: kv.Config{
180+
Store: "memberlist",
181+
},
182+
}
183+
tracker, err := NewHATracker(trackerCfg, nil, HATrackerStatusConfig{}, reg, "", logger)
184+
require.NoError(t, err)
185+
186+
// Inject our test memberlist client into the tracker
187+
tracker.cfg.EnableHATracker = true
188+
tracker.client = client
189+
190+
userID := "user1"
191+
cluster := "cluster1"
192+
replica := "replica0"
193+
key := userID + "/" + cluster
194+
195+
// Inject initial HA data (simulates receiving a sample)
196+
now := time.Now()
197+
err = tracker.CheckReplica(ctx, userID, cluster, replica, now)
198+
require.NoError(t, err)
199+
200+
// Drain the broadcast queue to clear out the CAS notification from CheckReplica
201+
mkv.GetBroadcasts(0, math.MaxInt32)
202+
203+
// To call c.client.Delete(ctx, key) in cleanupOldReplicas
204+
futureDeadline := now.Add(time.Hour)
205+
206+
// This will see (ReceivedAt < deadline) and CAS the entry to set DeletedAt = time.Now()
207+
tracker.cleanupOldReplicas(ctx, futureDeadline)
208+
require.Equal(t, float64(1), testutil.ToFloat64(tracker.replicasMarkedForDeletion))
209+
require.Equal(t, float64(0), testutil.ToFloat64(tracker.deletedReplicas))
210+
211+
// This will see (DeletedAt > 0) and (DeletedAt < deadline), calling c.client.Delete(ctx, key)
212+
tracker.cleanupOldReplicas(ctx, futureDeadline)
213+
require.Equal(t, float64(1), testutil.ToFloat64(tracker.deletedReplicas))
214+
require.Equal(t, float64(0), testutil.ToFloat64(tracker.markingForDeletionsFailed))
215+
216+
// Verify Local Deletion
217+
val, err := client.Get(ctx, key)
218+
require.NoError(t, err)
219+
require.Nil(t, val, "HA Tracker cleanup should have deleted the key locally")
220+
221+
// Verify Broadcast Generation
222+
broadcasts := mkv.GetBroadcasts(0, math.MaxInt32)
223+
require.NotEmpty(t, broadcasts, "Cleanup Delete should generate a broadcast for tombstone propagation")
224+
}
225+
141226
// Test that values are set in the HATracker after WatchPrefix has found it in the KVStore.
142227
func TestWatchPrefixAssignment(t *testing.T) {
143228
t.Parallel()

pkg/ring/kv/memberlist/kv.pb.go

Lines changed: 107 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/ring/kv/memberlist/kv.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,10 @@ message KeyValuePair {
1919

2020
// ID of the codec used to write the value
2121
string codec = 3;
22+
23+
// Indicates whether the key is marked as logically deleted (Tombstone).
24+
bool deleted = 4;
25+
26+
// Unix timestamp (in nanoseconds) of the last update or deletion.
27+
int64 updatedAt = 5;
2228
}

0 commit comments

Comments
 (0)