Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions grpc/lib/grpc/client/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule GRPC.Client.Application do

def start(_type, _args) do
children = [
{Registry, [keys: :unique, name: GRPC.Client.Registry]},
{DynamicSupervisor, [name: GRPC.Client.Supervisor]}
]

Expand Down
6 changes: 3 additions & 3 deletions grpc/lib/grpc/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ defmodule GRPC.Client.Connection do
A `Conn` process manages one or more underlying gRPC connections
(`GRPC.Channel` structs) and exposes a virtual channel to be used by
client stubs. The orchestration process runs as a `GenServer` registered
globally (via `:global`), so only one orchestrator exists per connection
in a BEAM node.
in a node-local `Registry`, so named channels are scoped to the current
BEAM node rather than shared across connected nodes.
Comment thread
smartinio marked this conversation as resolved.
Outdated

## Overview

Expand Down Expand Up @@ -309,7 +309,7 @@ defmodule GRPC.Client.Connection do
def terminate(_reason, _state), do: :ok

defp via(ref) do
{:global, {__MODULE__, ref}}
{:via, Registry, {GRPC.Client.Registry, {__MODULE__, ref}}}
end

defp do_disconnect(adapter, channel) do
Expand Down
68 changes: 66 additions & 2 deletions grpc/test/grpc/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule GRPC.Client.ConnectionTest do
alias GRPC.Channel
alias GRPC.Client.Connection

@peer if Code.ensure_loaded?(:peer), do: :peer, else: GRPC.Test.PeerShim

setup do
%{
ref: make_ref(),
Expand Down Expand Up @@ -77,7 +79,7 @@ defmodule GRPC.Client.ConnectionTest do
} do
{:ok, channel} = Connection.connect(target, adapter: adapter, name: ref)

[{pid, _}] = :global.whereis_name({Connection, ref}) |> List.wrap() |> Enum.map(&{&1, nil})
pid = whereis_name(ref)

{:ok, _} = Connection.disconnect(channel)

Expand All @@ -103,12 +105,74 @@ defmodule GRPC.Client.ConnectionTest do
} do
{:ok, channel} = Connection.connect(target, adapter: adapter, name: ref)

pid = :global.whereis_name({Connection, ref})
pid = whereis_name(ref)
ref_mon = Process.monitor(pid)
GenServer.stop(pid, :shutdown)
assert_receive {:DOWN, ^ref_mon, :process, ^pid, :shutdown}, 500

assert {:error, :no_connection} = Connection.pick_channel(channel)
end
end

describe "connect/2 - distributed named channels" do
test "named channels do not conflict across connected nodes" do
{:ok, _, port} = GRPC.Server.start(FeatureServer, 0)

on_exit(fn ->
:ok = GRPC.Server.stop(FeatureServer)
end)

{peer1, node1} = start_peer()
{peer2, node2} = start_peer()

on_exit(fn ->
stop_peer(peer1)
stop_peer(peer2)
end)

assert :pong == @peer.call(peer1, :net_adm, :ping, [node2])
assert :pong == @peer.call(peer2, :net_adm, :ping, [node1])
assert :ok == @peer.call(peer1, :global, :sync, [])
assert :ok == @peer.call(peer2, :global, :sync, [])

target = "ipv4:127.0.0.1:#{port}"
ref = :shared_channel

assert {:ok, %Channel{ref: ^ref}} =
@peer.call(peer1, Connection, :connect, [target, [name: ref]])

assert {:ok, %Channel{ref: ^ref}} =
@peer.call(peer2, Connection, :connect, [target, [name: ref]])
end
end

defp start_peer do
{:ok, peer, node} =
@peer.start_link(%{
name: @peer.random_name(~c"grpcpeer"),
longnames: true,
host: ~c"127.0.0.1",
connection: :standard_io,
args: [~c"-setcookie", ~c"grpcpeer"]
})

:ok = @peer.call(peer, :code, :add_paths, [:code.get_path()])
{:ok, _apps} = @peer.call(peer, Application, :ensure_all_started, [:grpc])

{peer, node}
end

defp stop_peer(peer) do
@peer.stop(peer)
catch
:exit, _reason ->
:ok
end

defp whereis_name(ref) do
case Registry.lookup(GRPC.Client.Registry, {Connection, ref}) do
[{pid, _value}] -> pid
[] -> nil
end
end
end
38 changes: 38 additions & 0 deletions grpc/test/support/peer_shim.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
if not Code.ensure_loaded?(:peer) do
defmodule GRPC.Test.PeerShim do
@moduledoc """
Minimal `:peer` compatibility shim for OTP 24, since `:peer` was added in OTP 25.
"""

def start_link(options) do
name = Map.fetch!(options, :name)
[~c"-setcookie", cookie] = Map.fetch!(options, :args)

ensure_origin_node(cookie)

{:ok, node} = :slave.start_link(~c"127.0.0.1", name, ~c"-setcookie " ++ cookie)

{:ok, node, node}
end

def call(node, module, function, args), do: :rpc.call(node, module, function, args)

def random_name(prefix), do: ~c"#{prefix}-#{:erlang.unique_integer([:positive])}"

def stop(node), do: :slave.stop(node)

defp ensure_origin_node(cookie) do
if not Node.alive?() do
:os.cmd(~c"epmd -daemon")

{:ok, _} =
:net_kernel.start([
:"grpcorigin-#{:os.getpid()}-#{:erlang.unique_integer([:positive])}@127.0.0.1",
:longnames
])
end

Node.set_cookie(String.to_atom(to_string(cookie)))
end
end
end
Loading