diff --git a/pom.xml b/pom.xml index 3716256..c665d14 100644 --- a/pom.xml +++ b/pom.xml @@ -17,7 +17,7 @@ 8.4.0 4.12.0 4.27.2 - 1.19.8 + 1.20.0 diff --git a/src/main/java/io/eigr/spawn/api/Spawn.java b/src/main/java/io/eigr/spawn/api/Spawn.java index a034ba4..47d2e41 100644 --- a/src/main/java/io/eigr/spawn/api/Spawn.java +++ b/src/main/java/io/eigr/spawn/api/Spawn.java @@ -219,7 +219,10 @@ private Map getActors(List entities) { Map tags = new HashMap<>(); ActorOuterClass.Metadata metadata = ActorOuterClass.Metadata.newBuilder() - .setChannelGroup(actorEntity.getChannel()).putAllTags(tags) + .addChannelGroup(ActorOuterClass.Channel.newBuilder() + .setTopic(actorEntity.getChannel()) + .build()) + .putAllTags(tags) .build(); return ActorOuterClass.Actor.newBuilder() diff --git a/src/main/proto/eigr/functions/protocol/actors/actor.proto b/src/main/proto/eigr/functions/protocol/actors/actor.proto index 10df44c..5e3aa71 100644 --- a/src/main/proto/eigr/functions/protocol/actors/actor.proto +++ b/src/main/proto/eigr/functions/protocol/actors/actor.proto @@ -70,11 +70,19 @@ message ActorState { message Metadata { // A channel group represents a way to send actions to various actors // that belong to a certain semantic group. - string channel_group = 1; + repeated Channel channel_group = 1; map tags = 2; } +// Represents a Pub-Sub binding, where a actor can be subscribed to a channel +// and map a specific action to a specific topic if necessary +// if the action is not informed, the default action will be "receive". +message Channel { + string topic = 1; + string action = 2; +} + // The type that defines the runtime characteristics of the Actor. // Regardless of the type of actor it is important that // all actors are registered during the proxy and host initialization phase. diff --git a/src/test/java/io/eigr/spawn/AbstractContainerBaseTest.java b/src/test/java/io/eigr/spawn/AbstractContainerBaseTest.java new file mode 100644 index 0000000..e6d33b6 --- /dev/null +++ b/src/test/java/io/eigr/spawn/AbstractContainerBaseTest.java @@ -0,0 +1,74 @@ +package io.eigr.spawn; + +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import io.eigr.spawn.api.Spawn; +import io.eigr.spawn.api.TransportOpts; +import io.eigr.spawn.api.exceptions.SpawnException; +import io.eigr.spawn.api.extensions.DependencyInjector; +import io.eigr.spawn.api.extensions.SimpleDependencyInjector; +import io.eigr.spawn.test.actors.ActorWithConstructor; +import io.eigr.spawn.test.actors.JoeActor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.Testcontainers; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +abstract class AbstractContainerBaseTest { + + private static final Logger log = LoggerFactory.getLogger(AbstractContainerBaseTest.class); + private static final GenericContainer SPAWN_CONTAINER; + private static final String spawnProxyImage = "eigr/spawn-proxy:1.4.1-rc.1"; + private static final String userFunctionPort = "8091"; + private static final String spawnProxyPort = "9004"; + protected static final Spawn spawnSystem; + protected static final String spawnSystemName = "spawn-system-test"; + + static { + Testcontainers.exposeHostPorts(8091); + + SPAWN_CONTAINER = new GenericContainer<>(DockerImageName.parse(spawnProxyImage)) + .withCreateContainerCmdModifier(e -> e.withHostConfig(HostConfig.newHostConfig() + .withPortBindings(PortBinding.parse("9004:9004")))) + // .withEnv("TZ", "America/Fortaleza") + .withEnv("SPAWN_PROXY_LOGGER_LEVEL", "DEBUG") + .withEnv("SPAWN_STATESTORE_KEY", "3Jnb0hZiHIzHTOih7t2cTEPEpY98Tu1wvQkPfq/XwqE=") + .withEnv("PROXY_ACTOR_SYSTEM_NAME", spawnSystemName) + .withEnv("PROXY_DATABASE_TYPE", "native") + .withEnv("PROXY_DATABASE_DATA_DIR", "mnesia_data") + .withEnv("NODE_COOKIE", "cookie-9ce3712b0c3ee21b582c30f942c0d4da-HLuZyQzy+nt0p0r/PVVFTp2tqfLom5igrdmwkYSuO+Q=") + .withEnv("POD_NAMESPACE", spawnSystemName) + .withEnv("POD_IP", spawnSystemName) + .withEnv("PROXY_HTTP_PORT", spawnProxyPort) + .withEnv("USER_FUNCTION_PORT", userFunctionPort) + .withEnv("USER_FUNCTION_HOST", "host.docker.internal") // Docker + .withExtraHost("host.docker.internal", "host-gateway") // Docker +// .withEnv("USER_FUNCTION_HOST", "host.containers.internal") // Podman +// .withExtraHost("host.containers.internal", "host-gateway") // Podman + .withExposedPorts(9004); + SPAWN_CONTAINER.start(); + + DependencyInjector injector = SimpleDependencyInjector.createInjector(); + injector.bind(String.class, "Hello with Constructor"); + + spawnSystem = new Spawn.SpawnSystem() + .create(spawnSystemName) + .withActor(JoeActor.class) + .withActor(ActorWithConstructor.class, injector, arg -> new ActorWithConstructor((DependencyInjector) arg)) + .withTerminationGracePeriodSeconds(5) + .withTransportOptions(TransportOpts.builder() + .port(8091) + .proxyPort(9004) + .build()) + .build(); + + try { + spawnSystem.start(); + } catch (SpawnException e) { + throw new RuntimeException(e); + } + log.info(String.format("%s started", spawnSystemName)); + } +} + diff --git a/src/test/java/io/eigr/spawn/ContainerTest.java b/src/test/java/io/eigr/spawn/ContainerTest.java new file mode 100644 index 0000000..6440e8a --- /dev/null +++ b/src/test/java/io/eigr/spawn/ContainerTest.java @@ -0,0 +1,37 @@ +package io.eigr.spawn; + +import io.eigr.spawn.api.ActorIdentity; +import io.eigr.spawn.api.ActorRef; +import io.eigr.spawn.api.exceptions.SpawnException; +import io.eigr.spawn.java.test.domain.Actor; +import org.junit.Test; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public final class ContainerTest extends AbstractContainerBaseTest { + + @Test + public void testApp() throws SpawnException { + ActorRef joeActor = spawnSystem.createActorRef( + ActorIdentity.of(spawnSystemName, "test_joe")); + + Actor.Request msg = Actor.Request.newBuilder() + .setLanguage("erlang") + .build(); + + Optional maybeReply = + joeActor.invoke("setLanguage", msg, Actor.Reply.class); + + if (maybeReply.isPresent()) { + Actor.Reply reply = maybeReply.get(); + assertNotNull(reply); + assertEquals("Hello From Java", reply.getResponse()); + } else { + throw new RuntimeException("Error"); + } + + } +}