Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpc/lib/grpc/client/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule GRPC.Client.Application do

def start(_type, _args) do
children = [
{Registry, [keys: :unique, name: GRPC.Client.Registry]},
{DynamicSupervisor, [name: GRPC.Client.Supervisor]}
]

Expand Down
6 changes: 3 additions & 3 deletions grpc/lib/grpc/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ defmodule GRPC.Client.Connection do
A `Conn` process manages one or more underlying gRPC connections
(`GRPC.Channel` structs) and exposes a virtual channel to be used by
client stubs. The orchestration process runs as a `GenServer` registered
globally (via `:global`), so only one orchestrator exists per connection
in a BEAM node.
in a node-local `Registry`, so named channels are scoped to the current
BEAM node.

## Overview

Expand Down Expand Up @@ -513,7 +513,7 @@ defmodule GRPC.Client.Connection do
defp channel_alive?(_), do: false

defp via(ref) do
{:global, {__MODULE__, ref}}
{:via, Registry, {GRPC.Client.Registry, {__MODULE__, ref}}}
end

defp do_disconnect(adapter, channel) do
Expand Down
68 changes: 66 additions & 2 deletions grpc/test/grpc/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule GRPC.Client.ConnectionTest do
alias GRPC.Channel
alias GRPC.Client.Connection

@peer if Code.ensure_loaded?(:peer), do: :peer, else: GRPC.Test.PeerShim

setup do
%{
ref: make_ref(),
Expand Down Expand Up @@ -77,7 +79,7 @@ defmodule GRPC.Client.ConnectionTest do
} do
{:ok, channel} = Connection.connect(target, adapter: adapter, name: ref)

[{pid, _}] = :global.whereis_name({Connection, ref}) |> List.wrap() |> Enum.map(&{&1, nil})
pid = whereis_name(ref)

{:ok, _} = Connection.disconnect(channel)

Expand All @@ -103,12 +105,74 @@ defmodule GRPC.Client.ConnectionTest do
} do
{:ok, channel} = Connection.connect(target, adapter: adapter, name: ref)

pid = :global.whereis_name({Connection, ref})
pid = whereis_name(ref)
ref_mon = Process.monitor(pid)
GenServer.stop(pid, :shutdown)
assert_receive {:DOWN, ^ref_mon, :process, ^pid, :shutdown}, 500

assert {:error, :no_connection} = Connection.pick_channel(channel)
end
end

describe "connect/2 - distributed named channels" do
test "named channels do not conflict across connected nodes" do
{:ok, _, port} = GRPC.Server.start(FeatureServer, 0)

on_exit(fn ->
:ok = GRPC.Server.stop(FeatureServer)
end)

{peer1, node1} = start_peer()
{peer2, node2} = start_peer()

on_exit(fn ->
stop_peer(peer1)
stop_peer(peer2)
end)

assert :pong == @peer.call(peer1, :net_adm, :ping, [node2])
assert :pong == @peer.call(peer2, :net_adm, :ping, [node1])
assert :ok == @peer.call(peer1, :global, :sync, [])
assert :ok == @peer.call(peer2, :global, :sync, [])

target = "ipv4:127.0.0.1:#{port}"
ref = :shared_channel

assert {:ok, %Channel{ref: ^ref}} =
@peer.call(peer1, Connection, :connect, [target, [name: ref]])

assert {:ok, %Channel{ref: ^ref}} =
@peer.call(peer2, Connection, :connect, [target, [name: ref]])
end
end

defp start_peer do
{:ok, peer, node} =
@peer.start_link(%{
name: @peer.random_name(~c"grpcpeer"),
longnames: true,
host: ~c"127.0.0.1",
connection: :standard_io,
args: [~c"-setcookie", ~c"grpcpeer"]
})

:ok = @peer.call(peer, :code, :add_paths, [:code.get_path()])
{:ok, _apps} = @peer.call(peer, Application, :ensure_all_started, [:grpc])

{peer, node}
end

defp stop_peer(peer) do
@peer.stop(peer)
catch
:exit, _reason ->
:ok
end

defp whereis_name(ref) do
case Registry.lookup(GRPC.Client.Registry, {Connection, ref}) do
[{pid, _value}] -> pid
[] -> nil
end
end
end
15 changes: 11 additions & 4 deletions grpc/test/grpc/client/dns_resolver_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ defmodule GRPC.Client.ReResolveTest do
}
end

defp whereis_name(ref) do
case Registry.lookup(GRPC.Client.Registry, {Connection, ref}) do
[{pid, _value}] -> pid
[] -> nil
end
end

defp disconnect_and_wait(channel) do
ref = channel.ref
pid = :global.whereis_name({Connection, ref})
pid = whereis_name(ref)

if pid && Process.alive?(pid) do
state = :sys.get_state(pid)
Expand Down Expand Up @@ -134,7 +141,7 @@ defmodule GRPC.Client.ReResolveTest do
end

defp get_state(ref) do
pid = :global.whereis_name({Connection, ref})
pid = whereis_name(ref)
:sys.get_state(pid)
end

Expand Down Expand Up @@ -1083,7 +1090,7 @@ defmodule GRPC.Client.ReResolveTest do

# Wait for several :refresh cycles (15s default, but we'll trigger manually).
# Round-robin will eventually pick 10.0.0.2. Without the fix, this crashes.
pid = :global.whereis_name({Connection, ctx.ref})
pid = whereis_name(ctx.ref)

for _ <- 1..5 do
send(pid, :refresh)
Expand Down Expand Up @@ -1157,7 +1164,7 @@ defmodule GRPC.Client.ReResolveTest do
Process.exit(original_pid, :kill)
Process.sleep(100)

conn_pid = :global.whereis_name({Connection, ctx.ref})
conn_pid = whereis_name(ctx.ref)
assert Process.alive?(conn_pid)

# resolver_state should have a NEW worker pid
Expand Down
38 changes: 38 additions & 0 deletions grpc/test/support/peer_shim.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
if not Code.ensure_loaded?(:peer) do
defmodule GRPC.Test.PeerShim do
@moduledoc """
Minimal `:peer` compatibility shim for OTP 24, since `:peer` was added in OTP 25.
"""

def start_link(options) do
name = Map.fetch!(options, :name)
[~c"-setcookie", cookie] = Map.fetch!(options, :args)

ensure_origin_node(cookie)

{:ok, node} = :slave.start_link(~c"127.0.0.1", name, ~c"-setcookie " ++ cookie)

{:ok, node, node}
end

def call(node, module, function, args), do: :rpc.call(node, module, function, args)

def random_name(prefix), do: ~c"#{prefix}-#{:erlang.unique_integer([:positive])}"

def stop(node), do: :slave.stop(node)

defp ensure_origin_node(cookie) do
if not Node.alive?() do
:os.cmd(~c"epmd -daemon")

{:ok, _} =
:net_kernel.start([
:"grpcorigin-#{:os.getpid()}-#{:erlang.unique_integer([:positive])}@127.0.0.1",
:longnames
])
end

Node.set_cookie(String.to_atom(to_string(cookie)))
end
end
end