Skip to content

Commit 1b3d4ff

Browse files
committed
Add concurrent call tests for nodes
1 parent 8090602 commit 1b3d4ff

2 files changed

Lines changed: 92 additions & 22 deletions

File tree

lib/typesense/api_call.rb

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -130,35 +130,34 @@ def uri_for(endpoint, node)
130130
# But if no healthy nodes are found, it will just return the next node, even if it's unhealthy
131131
# so we can try the request for good measure, in case that node has become healthy since
132132
def next_node
133-
# Check if nearest_node is set and is healthy, if so return it
134-
unless @nearest_node.nil?
135-
@logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}"
136-
if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node)
137-
@logger.debug "Updated current node to Node #{@nearest_node[:index]}"
138-
return @nearest_node
133+
@nodes_mutex.synchronize do
134+
# Check if nearest_node is set and is healthy, if so return it
135+
unless @nearest_node.nil?
136+
@logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}"
137+
if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node)
138+
@logger.debug "Updated current node to Node #{@nearest_node[:index]}"
139+
return @nearest_node
140+
end
141+
@logger.debug 'Falling back to individual nodes'
139142
end
140-
@logger.debug 'Falling back to individual nodes'
141-
end
142143

143-
# Fallback to nodes as usual
144-
@logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}"
145-
candidate_node = @nodes_mutex.synchronize do
146-
node = nil
144+
# Fallback to nodes as usual
145+
@logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}"
146+
candidate_node = nil
147147
(0..@nodes.length).each do |_i|
148148
@current_node_index = (@current_node_index + 1) % @nodes.length
149-
node = @nodes[@current_node_index]
150-
if node[:is_healthy] == true || node_due_for_healthcheck?(node)
151-
@logger.debug "Updated current node to Node #{node[:index]}"
152-
return node
149+
candidate_node = @nodes[@current_node_index]
150+
if candidate_node[:is_healthy] == true || node_due_for_healthcheck?(candidate_node)
151+
@logger.debug "Updated current node to Node #{candidate_node[:index]}"
152+
return candidate_node
153153
end
154154
end
155-
node
156-
end
157155

158-
# None of the nodes are marked healthy, but some of them could have become healthy since last health check.
159-
# So we will just return the next node.
160-
@logger.debug "No healthy nodes were found. Returning the next node, Node #{candidate_node[:index]}"
161-
candidate_node
156+
# None of the nodes are marked healthy, but some of them could have become healthy since last health check.
157+
# So we will just return the next node.
158+
@logger.debug "No healthy nodes were found. Returning the next node, Node #{candidate_node[:index]}"
159+
candidate_node
160+
end
162161
end
163162

164163
def node_due_for_healthcheck?(node)

spec/typesense/api_call_spec.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,37 @@
286286
expect(counts).to all(eq(expected_per_node))
287287
end
288288

289+
it 'never returns a node held unhealthy while next_node is called concurrently' do
290+
unhealthy_node = api_call.instance_variable_get(:@nodes)[1]
291+
api_call.send(:set_node_healthcheck, unhealthy_node, is_healthy: false)
292+
293+
threads = Array.new(8) do
294+
Thread.new do
295+
Array.new(200) { api_call.send(:next_node)[:index] }
296+
end
297+
end
298+
299+
results = threads.flat_map(&:value)
300+
expect(results).not_to include(1)
301+
expect(results).to include(0).and include(2)
302+
end
303+
304+
it 'still returns a node when every node is unhealthy under concurrent calls' do
305+
nodes = api_call.instance_variable_get(:@nodes)
306+
nodes.each { |node| api_call.send(:set_node_healthcheck, node, is_healthy: false) }
307+
308+
threads = Array.new(8) do
309+
Thread.new do
310+
Array.new(50) { api_call.send(:next_node) }
311+
end
312+
end
313+
314+
results = threads.flat_map(&:value)
315+
expect(results.length).to eq(8 * 50)
316+
expect(results).to all(be_a(Hash))
317+
expect(results.map { |n| n[:index] }).to all(be_between(0, nodes.length - 1).inclusive)
318+
end
319+
289320
context 'with a single node' do
290321
let(:typesense) do
291322
Typesense::Client.new(
@@ -314,5 +345,45 @@
314345
expect(node[:last_access_timestamp]).to be_a(Integer)
315346
end
316347
end
348+
349+
context 'with nearest_node configured' do
350+
let(:typesense) do
351+
Typesense::Client.new(
352+
api_key: 'abcd',
353+
nearest_node: { host: 'nearestNode', port: 6108, protocol: 'http' },
354+
nodes: [
355+
{ host: 'node0', port: 8108, protocol: 'http' },
356+
{ host: 'node1', port: 8108, protocol: 'http' },
357+
{ host: 'node2', port: 8108, protocol: 'http' }
358+
],
359+
connection_timeout_seconds: 10,
360+
retry_interval_seconds: 0.01,
361+
log_level: Logger::ERROR
362+
)
363+
end
364+
365+
it 'serializes reads and writes of nearest_node health state under concurrent access' do
366+
nearest_node = api_call.instance_variable_get(:@nearest_node)
367+
368+
writer_threads = Array.new(4) do |i|
369+
Thread.new do
370+
100.times { api_call.send(:set_node_healthcheck, nearest_node, is_healthy: i.even?) }
371+
end
372+
end
373+
374+
reader_threads = Array.new(8) do
375+
Thread.new do
376+
Array.new(100) { api_call.send(:next_node) }
377+
end
378+
end
379+
380+
writer_threads.each(&:join)
381+
reader_results = reader_threads.flat_map(&:value)
382+
383+
expect(reader_results).to all(be_a(Hash))
384+
expect([true, false]).to include(nearest_node[:is_healthy])
385+
expect(nearest_node[:last_access_timestamp]).to be_a(Integer)
386+
end
387+
end
317388
end
318389
end

0 commit comments

Comments
 (0)