diff --git a/grpc/lib/grpc/client/application.ex b/grpc/lib/grpc/client/application.ex index 16d4a230..b131a8e9 100644 --- a/grpc/lib/grpc/client/application.ex +++ b/grpc/lib/grpc/client/application.ex @@ -4,6 +4,7 @@ defmodule GRPC.Client.Application do def start(_type, _args) do children = [ + {Registry, [keys: :unique, name: GRPC.Client.Registry]}, {DynamicSupervisor, [name: GRPC.Client.Supervisor]} ] diff --git a/grpc/lib/grpc/client/connection.ex b/grpc/lib/grpc/client/connection.ex index 7e7a6aca..f51a6e26 100644 --- a/grpc/lib/grpc/client/connection.ex +++ b/grpc/lib/grpc/client/connection.ex @@ -6,8 +6,8 @@ defmodule GRPC.Client.Connection do A `Conn` process manages one or more underlying gRPC connections (`GRPC.Channel` structs) and exposes a virtual channel to be used by client stubs. The orchestration process runs as a `GenServer` registered - globally (via `:global`), so only one orchestrator exists per connection - in a BEAM node. + in a node-local `Registry`, so named channels are scoped to the current + BEAM node. ## Overview @@ -513,7 +513,7 @@ defmodule GRPC.Client.Connection do defp channel_alive?(_), do: false defp via(ref) do - {:global, {__MODULE__, ref}} + {:via, Registry, {GRPC.Client.Registry, {__MODULE__, ref}}} end defp do_disconnect(adapter, channel) do diff --git a/grpc/test/grpc/client/connection_test.exs b/grpc/test/grpc/client/connection_test.exs index 3d29e327..c868efa6 100644 --- a/grpc/test/grpc/client/connection_test.exs +++ b/grpc/test/grpc/client/connection_test.exs @@ -4,6 +4,8 @@ defmodule GRPC.Client.ConnectionTest do alias GRPC.Channel alias GRPC.Client.Connection + @peer if Code.ensure_loaded?(:peer), do: :peer, else: GRPC.Test.PeerShim + setup do %{ ref: make_ref(), @@ -77,7 +79,7 @@ defmodule GRPC.Client.ConnectionTest do } do {:ok, channel} = Connection.connect(target, adapter: adapter, name: ref) - [{pid, _}] = :global.whereis_name({Connection, ref}) |> List.wrap() |> Enum.map(&{&1, nil}) + pid = whereis_name(ref) {:ok, _} = Connection.disconnect(channel) @@ -103,7 +105,7 @@ defmodule GRPC.Client.ConnectionTest do } do {:ok, channel} = Connection.connect(target, adapter: adapter, name: ref) - pid = :global.whereis_name({Connection, ref}) + pid = whereis_name(ref) ref_mon = Process.monitor(pid) GenServer.stop(pid, :shutdown) assert_receive {:DOWN, ^ref_mon, :process, ^pid, :shutdown}, 500 @@ -111,4 +113,66 @@ defmodule GRPC.Client.ConnectionTest do assert {:error, :no_connection} = Connection.pick_channel(channel) end end + + describe "connect/2 - distributed named channels" do + test "named channels do not conflict across connected nodes" do + {:ok, _, port} = GRPC.Server.start(FeatureServer, 0) + + on_exit(fn -> + :ok = GRPC.Server.stop(FeatureServer) + end) + + {peer1, node1} = start_peer() + {peer2, node2} = start_peer() + + on_exit(fn -> + stop_peer(peer1) + stop_peer(peer2) + end) + + assert :pong == @peer.call(peer1, :net_adm, :ping, [node2]) + assert :pong == @peer.call(peer2, :net_adm, :ping, [node1]) + assert :ok == @peer.call(peer1, :global, :sync, []) + assert :ok == @peer.call(peer2, :global, :sync, []) + + target = "ipv4:127.0.0.1:#{port}" + ref = :shared_channel + + assert {:ok, %Channel{ref: ^ref}} = + @peer.call(peer1, Connection, :connect, [target, [name: ref]]) + + assert {:ok, %Channel{ref: ^ref}} = + @peer.call(peer2, Connection, :connect, [target, [name: ref]]) + end + end + + defp start_peer do + {:ok, peer, node} = + @peer.start_link(%{ + name: @peer.random_name(~c"grpcpeer"), + longnames: true, + host: ~c"127.0.0.1", + connection: :standard_io, + args: [~c"-setcookie", ~c"grpcpeer"] + }) + + :ok = @peer.call(peer, :code, :add_paths, [:code.get_path()]) + {:ok, _apps} = @peer.call(peer, Application, :ensure_all_started, [:grpc]) + + {peer, node} + end + + defp stop_peer(peer) do + @peer.stop(peer) + catch + :exit, _reason -> + :ok + end + + defp whereis_name(ref) do + case Registry.lookup(GRPC.Client.Registry, {Connection, ref}) do + [{pid, _value}] -> pid + [] -> nil + end + end end diff --git a/grpc/test/grpc/client/dns_resolver_test.exs b/grpc/test/grpc/client/dns_resolver_test.exs index a0e49cad..8029529a 100644 --- a/grpc/test/grpc/client/dns_resolver_test.exs +++ b/grpc/test/grpc/client/dns_resolver_test.exs @@ -64,9 +64,16 @@ defmodule GRPC.Client.ReResolveTest do } end + defp whereis_name(ref) do + case Registry.lookup(GRPC.Client.Registry, {Connection, ref}) do + [{pid, _value}] -> pid + [] -> nil + end + end + defp disconnect_and_wait(channel) do ref = channel.ref - pid = :global.whereis_name({Connection, ref}) + pid = whereis_name(ref) if pid && Process.alive?(pid) do state = :sys.get_state(pid) @@ -134,7 +141,7 @@ defmodule GRPC.Client.ReResolveTest do end defp get_state(ref) do - pid = :global.whereis_name({Connection, ref}) + pid = whereis_name(ref) :sys.get_state(pid) end @@ -1083,7 +1090,7 @@ defmodule GRPC.Client.ReResolveTest do # Wait for several :refresh cycles (15s default, but we'll trigger manually). # Round-robin will eventually pick 10.0.0.2. Without the fix, this crashes. - pid = :global.whereis_name({Connection, ctx.ref}) + pid = whereis_name(ctx.ref) for _ <- 1..5 do send(pid, :refresh) @@ -1157,7 +1164,7 @@ defmodule GRPC.Client.ReResolveTest do Process.exit(original_pid, :kill) Process.sleep(100) - conn_pid = :global.whereis_name({Connection, ctx.ref}) + conn_pid = whereis_name(ctx.ref) assert Process.alive?(conn_pid) # resolver_state should have a NEW worker pid diff --git a/grpc/test/support/peer_shim.ex b/grpc/test/support/peer_shim.ex new file mode 100644 index 00000000..f3456619 --- /dev/null +++ b/grpc/test/support/peer_shim.ex @@ -0,0 +1,38 @@ +if not Code.ensure_loaded?(:peer) do + defmodule GRPC.Test.PeerShim do + @moduledoc """ + Minimal `:peer` compatibility shim for OTP 24, since `:peer` was added in OTP 25. + """ + + def start_link(options) do + name = Map.fetch!(options, :name) + [~c"-setcookie", cookie] = Map.fetch!(options, :args) + + ensure_origin_node(cookie) + + {:ok, node} = :slave.start_link(~c"127.0.0.1", name, ~c"-setcookie " ++ cookie) + + {:ok, node, node} + end + + def call(node, module, function, args), do: :rpc.call(node, module, function, args) + + def random_name(prefix), do: ~c"#{prefix}-#{:erlang.unique_integer([:positive])}" + + def stop(node), do: :slave.stop(node) + + defp ensure_origin_node(cookie) do + if not Node.alive?() do + :os.cmd(~c"epmd -daemon") + + {:ok, _} = + :net_kernel.start([ + :"grpcorigin-#{:os.getpid()}-#{:erlang.unique_integer([:positive])}@127.0.0.1", + :longnames + ]) + end + + Node.set_cookie(String.to_atom(to_string(cookie))) + end + end +end