diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 32aa5e1..c027488 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,6 +12,9 @@ jobs: - stable - beta - nightly + features: + - js-v7 + - python-v4 steps: - uses: actions/checkout@v2 @@ -26,16 +29,20 @@ jobs: - uses: actions-rs/cargo@v1 with: command: build + args: --no-default-features --features ${{ matrix.features }} - uses: actions-rs/cargo@v1 with: command: test + args: --no-default-features --features ${{ matrix.features }} - uses: actions-rs/cargo@v1 with: command: fmt args: --all -- --check + args: --no-default-features --features ${{ matrix.features }} - uses: actions-rs/cargo@v1 with: command: clippy + args: --no-default-features --features ${{ matrix.features }} diff --git a/Cargo.toml b/Cargo.toml index ef19889..87007e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,28 @@ keywords = ["socketio"] license = "MIT" [dependencies] -redis = "0.21.2" -rmp = "0.8.10" -serde = "1.0.130" -serde_derive = "1.0.130" -rmp-serde = "0.15.5" +redis = "0.27" +rmp = "0.8" +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" +rmp-serde = { version = "1.3", optional = true } +thiserror = "1" [dev-dependencies] -testcontainers = "0.12.0" +testcontainers = { version = "0.22.0", features = ["blocking"] } +tempfile = "3.12" + +[features] +default = ["js-v7"] +# Feature structure: {language}-{version of adapter} +# Unfortunatelly, the version of the adapter is not directly related to the version of the protocol, and differs per implementation. + +# Compatible with socket.io-redis-adapter v7 analogous with socket.io-redis-emitter v4 (socket.io protocol version 4) +js-v7 = ["rmp-serde"] +# Compatible with python-socketio v4 (socket.io protocol version 3 or 4) +python-v4 = [] + +[lints.clippy] +unwrap_in_result = "deny" +unwrap_used = "deny" diff --git a/README.md b/README.md index d520acb..01ac78f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # socketio-rust-emitter +Emits messages to a Redis instance for consumption by either: +- (default) socket.io servers (protocol version 5, socket.io-redis-adapter v7) (under the feature flag `js_v7`) +- python-socketio (version 4) (under the feature flag `py_v4`) + [![build status](https://github.com/epli2/socketio-rust-emitter/actions/workflows/ci.yaml/badge.svg?branch=master&event=push)](https://github.com/epli2/socketio-rust-emitter/actions) [![socketio-rust-emitter at crates.io](https://img.shields.io/crates/v/socketio-rust-emitter.svg)](https://crates.io/crates/socketio-rust-emitter) [![socketio-rust-emitter at docs.rs](https://docs.rs/socketio-rust-emitter/badge.svg)](https://docs.rs/socketio-rust-emitter) diff --git a/src/implementations/javascript.rs b/src/implementations/javascript.rs new file mode 100644 index 0000000..29d8546 --- /dev/null +++ b/src/implementations/javascript.rs @@ -0,0 +1,208 @@ +use std::collections::HashMap; + +use crate::{Emitter, Result}; +use redis::Commands; +use rmp_serde::Serializer; +use serde::Serialize; + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct Packet { + #[serde(rename = "type")] + _type: i32, + data: Vec, + nsp: String, +} + +#[derive(Debug, PartialEq, Serialize, Deserialize)] +struct Opts { + rooms: Vec, + flags: HashMap, +} + +impl Emitter { + pub fn json(mut self) -> Emitter { + let mut flags = HashMap::new(); + flags.insert("json".to_string(), true); + self.flags = flags; + self + } + + pub fn volatile(mut self) -> Emitter { + let mut flags = HashMap::new(); + flags.insert("volatile".to_string(), true); + self.flags = flags; + self + } + + pub fn broadcast(mut self) -> Emitter { + let mut flags = HashMap::new(); + flags.insert("broadcast".to_string(), true); + self.flags = flags; + self + } + + pub fn emit(&mut self, message: Vec<&str>) -> Result<()> { + let packet = Packet { + _type: 2, + data: message.iter().map(|s| s.to_string()).collect(), + nsp: self.nsp.clone(), + }; + let opts = Opts { + rooms: self.rooms.clone(), + flags: self.flags.clone(), + }; + let mut msg = Vec::new(); + let val = (self.uid.clone(), packet, opts); + val.serialize(&mut Serializer::new(&mut msg).with_struct_map())?; + + let channel = if self.rooms.len() == 1 { + format!("{}{}#", self.channel, self.rooms.join("#")) + } else { + self.channel.clone() + }; + + let _: () = self.redis.publish(channel, msg)?; + self.rooms = vec![]; + self.flags = HashMap::new(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::{Emitter, Opts, Packet}; + use crate::tests::create_redis; + use redis::Msg; + use rmp_serde::Deserializer; + use serde::Deserialize; + + fn decode_msg(msg: Msg) -> (String, Packet, Opts) { + let payload: Vec = msg.get_payload().unwrap(); + let mut de = Deserializer::new(&payload[..]); + Deserialize::deserialize(&mut de).unwrap() + } + + #[test] + fn emit() { + create_redis!(redis); + let mut con = redis.get_connection().unwrap(); + let mut pubsub = con.as_pubsub(); + pubsub.subscribe("socket.io#/#").unwrap(); + + // act + let mut io = Emitter::new(redis).unwrap(); + io.emit(vec!["test1", "test2"]).unwrap(); + + // assert + let actual = decode_msg(pubsub.get_message().unwrap()); + assert_eq!("emitter", actual.0); + assert_eq!( + Packet { + _type: 2, + data: vec!["test1".to_string(), "test2".to_string()], + nsp: "/".to_string(), + }, + actual.1 + ); + assert_eq!( + Opts { + rooms: vec![], + flags: Default::default() + }, + actual.2 + ); + } + + #[test] + fn emit_in_namespaces() { + create_redis!(redis); + let mut con = redis.get_connection().unwrap(); + let mut pubsub = con.as_pubsub(); + pubsub.subscribe("socket.io#/custom#").unwrap(); + + // act + let io = Emitter::new(redis).unwrap(); + io.of("/custom").emit(vec!["test"]).unwrap(); + + // assert + let actual = decode_msg(pubsub.get_message().unwrap()); + assert_eq!("emitter", actual.0); + assert_eq!( + Packet { + _type: 2, + data: vec!["test".to_string()], + nsp: "/custom".to_string(), + }, + actual.1 + ); + assert_eq!( + Opts { + rooms: vec![], + flags: Default::default() + }, + actual.2 + ); + } + + #[test] + fn emit_to_namespaces() { + create_redis!(redis); + let mut con = redis.get_connection().unwrap(); + let mut pubsub = con.as_pubsub(); + pubsub.subscribe("socket.io#/custom#").unwrap(); + + // act + let io = Emitter::new(redis).unwrap(); + io.of("/custom").emit(vec!["test"]).unwrap(); + + // assert + let actual = decode_msg(pubsub.get_message().unwrap()); + assert_eq!("emitter", actual.0); + assert_eq!( + Packet { + _type: 2, + data: vec!["test".to_string()], + nsp: "/custom".to_string(), + }, + actual.1 + ); + assert_eq!( + Opts { + rooms: vec![], + flags: Default::default() + }, + actual.2 + ); + } + + #[test] + fn emit_to_room() { + create_redis!(redis); + let mut con = redis.get_connection().unwrap(); + let mut pubsub = con.as_pubsub(); + pubsub.subscribe("socket.io#/#room1#").unwrap(); + + // act + let io = Emitter::new(redis).unwrap(); + io.to("room1").emit(vec!["test"]).unwrap(); + + // assert + let actual = decode_msg(pubsub.get_message().unwrap()); + assert_eq!("emitter", actual.0); + assert_eq!( + Packet { + _type: 2, + data: vec!["test".to_string()], + nsp: "/".to_string(), + }, + actual.1 + ); + assert_eq!( + Opts { + rooms: vec!["room1".to_string()], + flags: Default::default() + }, + actual.2 + ); + } +} diff --git a/src/implementations/mod.rs b/src/implementations/mod.rs new file mode 100644 index 0000000..ffa2325 --- /dev/null +++ b/src/implementations/mod.rs @@ -0,0 +1,8 @@ +//! A specific protocol used by the redis adapter. +//! +//! Unfortunately, the line protocol differs between the Python and JS implementations. +//! This module provides a way to abstract over the differences. +#[cfg(feature = "python-v4")] +mod python_socketio; +#[cfg(feature = "js-v7")] +mod javascript; diff --git a/src/implementations/python_socketio.rs b/src/implementations/python_socketio.rs new file mode 100644 index 0000000..81e1be7 --- /dev/null +++ b/src/implementations/python_socketio.rs @@ -0,0 +1,333 @@ +use std::fmt::Display; + +use redis::Commands; +use serde::Serialize; +use serde_json::json; + +use crate::{Emitter, Result}; + +impl Emitter { + /// Overrides the default channel name. + pub fn channel(mut self, name: &str) -> Emitter { + self.channel = name.to_string(); + self + } + + pub fn emit_json( + &mut self, + event: Event, + message: T, + ) -> Result<()> { + fn _emit_json( + redis: &mut redis::Client, + channel: &str, + nsp: &str, + room: Option<&str>, + event: &str, + data: &serde_json::Value, + ) -> Result<()> { + let message = json!({ + "method": "emit", + "event": event, + "data": data, + "namespace": nsp, + "room": room, + "skip_sid": null, + "callback": null, + "host_id": null, + }); + let _: () = redis.publish(channel, message.to_string())?; + Ok(()) + } + let event = event.to_string(); + let data = serde_json::to_value(&message)?; + + if self.rooms.is_empty() { + _emit_json( + &mut self.redis, + &self.channel, + &self.nsp, + None, + &event, + &data, + )?; + } else { + for room in self.rooms.iter() { + _emit_json( + &mut self.redis, + &self.channel, + &self.nsp, + Some(&room), + &event, + &data, + )?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::{io::Write, time::Duration}; + + use super::*; + use serde_json::json; + use testcontainers::{core::ExecCommand, runners::SyncRunner, ImageExt}; + + struct PythonSocketIOImage { + mounts: Vec, + // Keep temp file alive + _temp_file: tempfile::NamedTempFile, + } + + impl PythonSocketIOImage { + fn new(redis_url: String, channel: Option) -> Self { + let mut temp_file = tempfile::NamedTempFile::new().unwrap(); + let channel_python_arg = if let Some(channel) = channel { + format!(",channel='{}'", channel) + } else { + "".to_owned() + }; + temp_file + .as_file_mut() + .write_all( + b" +import socketio +import os +import logging +import asyncio + +logging.basicConfig(level=logging.DEBUG) + +", + ) + .unwrap(); + + temp_file + .as_file_mut() + .write_all( + format!( + r#"client_manager = socketio.AsyncRedisManager(url="{}"{})"#, + redis_url, channel_python_arg + ) + .as_bytes(), + ) + .unwrap(); + + temp_file + .as_file_mut() + .write_all( + b" +loop = asyncio.new_event_loop() + +print('Server running') +with open('/var/run/socketio.ready', 'w') as f: + f.write('ready') +while True: + # We need to flush in order to see the message across the pipe + print('message:', loop.run_until_complete(client_manager._listen()), flush=True) +", + ) + .unwrap(); + let path = temp_file.path().to_str().unwrap().to_string(); + Self { + mounts: vec![testcontainers::core::Mount::bind_mount( + path, + "/opt/socketio/server.py", + )], + _temp_file: temp_file, + } + } + } + + impl PythonSocketIOImage { + fn messages(container: &testcontainers::Container) -> Vec { + // Grab the contents of /var/log/socketio.log + let mut buf = String::new(); + container + .exec( + ExecCommand::new(vec!["cat", "/var/log/socketio.log"]) + .with_cmd_ready_condition(testcontainers::core::CmdWaitFor::exit_code(0)), + ) + .unwrap() + .stdout() + .read_to_string(&mut buf) + .unwrap(); + // Unfortunately this log message doesn't have the contents of the message + buf.lines() + .filter_map(|line| { + if line.starts_with("message: ") { + Some(line[("message: ".len())..].to_string()) + } else { + None + } + }) + .collect() + } + } + + impl testcontainers::Image for PythonSocketIOImage { + fn name(&self) -> &str { + "python" + } + + fn tag(&self) -> &str { + // This is the last supported release of aioredis (which is a dependency of python-socketio) + "3.10" + } + + fn ready_conditions(&self) -> Vec { + vec![] + } + + fn mounts(&self) -> impl IntoIterator { + self.mounts.iter() + } + + fn expose_ports(&self) -> &[testcontainers::core::ContainerPort] { + &[testcontainers::core::ContainerPort::Tcp(8443)] + } + + fn cmd(&self) -> impl IntoIterator>> { + // Sleep for infinity, we will start the server in the test + vec!["sleep", "infinity"].into_iter() + } + } + + fn launch_containers( + channel: Option, + ) -> ( + testcontainers::Container, + String, + testcontainers::Container, + ) { + let redis = crate::tests::launch_redis_container(); + std::thread::sleep(Duration::from_secs(1)); + + // Get the first 12 characters of the container id, this is the hostname + let container_redis_url = format!("redis://{}:{}/0", &redis.id()[..12], 6379); + let python_socketio_server = PythonSocketIOImage::new(container_redis_url.clone(), channel) + .with_network(crate::tests::DOCKER_NETWORK_NAME) + .start() + .unwrap(); + python_socketio_server + .exec( + ExecCommand::new(vec!["pip", "install", "python-socketio==4.6.1", "python-engineio==3.14.2", "six==1.16.0", "aioredis==1.3.1"]) + .with_cmd_ready_condition(testcontainers::core::CmdWaitFor::message_on_stdout( + "Successfully installed aioredis-1.3.1 async-timeout-4.0.3 hiredis-3.0.0 python-engineio-3.14.2 python-socketio-4.6.1 six-1.16.0\n", + )), + ) + .unwrap(); + python_socketio_server + .exec(ExecCommand::new(vec![ + "/bin/bash", + "-c", + "(python /opt/socketio/server.py 2>&1) > /var/log/socketio.log", + ])) + .unwrap(); + // Wait for the server to start by checking the ready file + python_socketio_server + .exec( + ExecCommand::new(vec![ + "/bin/sh", + "-c", + "while [ ! -f /var/run/socketio.ready ]; do sleep 1; done", + ]) + .with_cmd_ready_condition(testcontainers::core::CmdWaitFor::exit_code(0)), + ) + .unwrap(); + + let host_redis_url = format!( + "redis://localhost:{}/0", + redis.get_host_port_ipv4(6379).unwrap() + ); + (redis, host_redis_url, python_socketio_server) + } + + #[test] + fn test_emit_json() { + let (redis, redis_url, container) = launch_containers(None); + let emitter = Emitter::new(redis::Client::open(redis_url).unwrap()).unwrap(); + emitter + .to("room") + .emit_json("my_event", json!({"key": "value"})) + .unwrap(); + // Now check to see if the message was received + let messages = PythonSocketIOImage::messages(&container); + container.stop().unwrap(); + redis.stop().unwrap(); + container.rm().unwrap(); + redis.rm().unwrap(); + assert_eq!( + messages, + vec![ + r#"b'{"callback":null,"data":{"key":"value"},"event":"my_event","host_id":null,"method":"emit","namespace":"/","room":"room","skip_sid":null}'"# + ] + ); + } + + #[test] + fn test_custom_channel() { + let (redis, redis_url, container) = launch_containers(Some("custom_channel".to_string())); + let emitter = Emitter::new(redis::Client::open(redis_url).unwrap()) + .unwrap() + .channel("custom_channel"); + emitter + .to("room") + .emit_json("my_event", json!({"key": "value"})) + .unwrap(); + // Now check to see if the message was received + let messages = PythonSocketIOImage::messages(&container); + container.stop().unwrap(); + redis.stop().unwrap(); + container.rm().unwrap(); + redis.rm().unwrap(); + assert_eq!( + messages, + vec![ + r#"b'{"callback":null,"data":{"key":"value"},"event":"my_event","host_id":null,"method":"emit","namespace":"/","room":"room","skip_sid":null}'"# + ] + ); + } + + #[test] + fn test_no_room() { + let (redis, redis_url, container) = launch_containers(None); + let mut emitter = Emitter::new(redis::Client::open(redis_url).unwrap()).unwrap(); + emitter + .emit_json("my_event", json!({"key": "value"})) + .unwrap(); + // Now check to see if the message was received + let messages = PythonSocketIOImage::messages(&container); + container.stop().unwrap(); + redis.stop().unwrap(); + container.rm().unwrap(); + redis.rm().unwrap(); + assert_eq!( + messages, + vec![ + r#"b'{"callback":null,"data":{"key":"value"},"event":"my_event","host_id":null,"method":"emit","namespace":"/","room":null,"skip_sid":null}'"# + ] + ); + } + + #[test] + fn test_array_data() { + let (redis, redis_url, container) = launch_containers(None); + let mut emitter = Emitter::new(redis::Client::open(redis_url).unwrap()).unwrap(); + emitter.emit_json("my_event", json!([1, 2, 3])).unwrap(); + // Now check to see if the message was received + let messages = PythonSocketIOImage::messages(&container); + container.stop().unwrap(); + redis.stop().unwrap(); + container.rm().unwrap(); + redis.rm().unwrap(); + assert_eq!( + messages, + vec![ + r#"b'{"callback":null,"data":[1,2,3],"event":"my_event","host_id":null,"method":"emit","namespace":"/","room":null,"skip_sid":null}'"# + ] + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index a021811..b52f247 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,11 @@ +#[cfg(feature = "js-v7")] #[macro_use] extern crate serde_derive; - -use redis::Commands; -use rmp_serde::Serializer; -use serde::Serialize; +#[cfg(feature = "js-v7")] use std::collections::HashMap; +mod implementations; + #[derive(Debug, Clone)] pub struct Emitter { redis: redis::Client, @@ -13,24 +13,12 @@ pub struct Emitter { nsp: String, channel: String, rooms: Vec, + #[cfg(feature = "js-v7")] flags: HashMap, + #[cfg(feature = "js-v7")] uid: String, } -#[derive(Debug, PartialEq, Serialize, Deserialize)] -struct Opts { - rooms: Vec, - flags: HashMap, -} - -#[derive(Debug, PartialEq, Serialize, Deserialize)] -pub struct Packet { - #[serde(rename = "type")] - _type: i32, - data: Vec, - nsp: String, -} - #[derive(Debug, PartialEq, Clone, Default)] pub struct EmitterOpts<'a> { pub host: String, @@ -40,31 +28,31 @@ pub struct EmitterOpts<'a> { } pub trait IntoEmitter { - fn into_emitter(self) -> Emitter; + fn into_emitter(self) -> Result; } impl IntoEmitter for redis::Client { - fn into_emitter(self) -> Emitter { - create_emitter(self, "socket.io", "/") + fn into_emitter(self) -> Result { + Ok(create_emitter(self, "socket.io", "/")) } } impl<'a> IntoEmitter for EmitterOpts<'a> { - fn into_emitter(self) -> Emitter { + fn into_emitter(self) -> Result { let addr = format!("redis://{}:{}", self.host, self.port); let prefix = self.key.unwrap_or("socket.io"); - create_emitter(redis::Client::open(addr.as_str()).unwrap(), prefix, "/") + Ok(create_emitter(redis::Client::open(addr.as_str())?, prefix, "/")) } } impl IntoEmitter for &str { - fn into_emitter(self) -> Emitter { - create_emitter( - redis::Client::open(format!("redis://{}", self).as_str()).unwrap(), + fn into_emitter(self) -> Result { + Ok(create_emitter( + redis::Client::open(format!("redis://{}", self).as_str())?, "socket.io", "/", - ) + )) } } @@ -73,15 +61,20 @@ fn create_emitter(redis: redis::Client, prefix: &str, nsp: &str) -> Emitter { redis, prefix: prefix.to_string(), nsp: nsp.to_string(), + #[cfg(feature = "js-v7")] channel: format!("{}#{}#", prefix, nsp), + #[cfg(feature = "python-v4")] + channel: "socketio".to_string(), rooms: Vec::new(), + #[cfg(feature = "js-v7")] flags: HashMap::new(), + #[cfg(feature = "js-v7")] uid: "emitter".to_string(), } } impl Emitter { - pub fn new(data: I) -> Emitter { + pub fn new(data: I) -> Result { data.into_emitter() } @@ -89,202 +82,66 @@ impl Emitter { self.rooms.push(room.to_string()); self } + pub fn of(self, nsp: &str) -> Emitter { create_emitter(self.redis, self.prefix.as_str(), nsp) } - pub fn json(mut self) -> Emitter { - let mut flags = HashMap::new(); - flags.insert("json".to_string(), true); - self.flags = flags; - self - } - pub fn volatile(mut self) -> Emitter { - let mut flags = HashMap::new(); - flags.insert("volatile".to_string(), true); - self.flags = flags; - self - } - pub fn broadcast(mut self) -> Emitter { - let mut flags = HashMap::new(); - flags.insert("broadcast".to_string(), true); - self.flags = flags; - self - } - pub fn emit(mut self, message: Vec<&str>) -> Emitter { - let packet = Packet { - _type: 2, - data: message.iter().map(|s| s.to_string()).collect(), - nsp: self.nsp.clone(), - }; - let opts = Opts { - rooms: self.rooms.clone(), - flags: self.flags.clone(), - }; - let mut msg = Vec::new(); - let val = (self.uid.clone(), packet, opts); - val.serialize(&mut Serializer::new(&mut msg).with_struct_map()) - .unwrap(); - let channel = if self.rooms.len() == 1 { - format!("{}{}#", self.channel, self.rooms.join("#")) - } else { - self.channel.clone() - }; - let _: () = self.redis.publish(channel, msg).unwrap(); - self.rooms = vec![]; - self.flags = HashMap::new(); - self - } + // Emitting functions are added in the implementation modules. } +#[cfg(all(not(feature = "js-v7"), not(feature = "python-v4")))] +compile_error!("At least one of the features 'js-v7' or 'python-v4' must be enabled."); + +#[cfg(all(feature = "js-v7", feature = "python-v4"))] +compile_error!("Only one of the features 'js-v7' or 'python-v4' can be enabled."); + +#[non_exhaustive] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Error connecting to Redis: {0}")] + Redis(#[from] redis::RedisError), + #[error("Error serializing data: {0}")] + #[cfg(feature = "python-v4")] + Serde(#[from] serde_json::Error), + #[error("Error serializing data: {0}")] + #[cfg(feature = "js-v7")] + Serde(#[from] rmp_serde::encode::Error), + +} + +pub type Result = std::result::Result; + #[cfg(test)] -mod tests { - use crate::{Emitter, Opts, Packet}; - use redis::Msg; - use rmp_serde::Deserializer; - use serde::Deserialize; +pub(crate) mod tests { + pub const DOCKER_NETWORK_NAME: &str = "testcontainers-socketio"; + pub(crate) fn launch_redis_container() -> testcontainers::Container + { + use testcontainers::runners::SyncRunner; + let redis = testcontainers::GenericImage::new("redis", "latest") + .with_exposed_port(testcontainers::core::ContainerPort::Tcp(6379)) + .with_wait_for(testcontainers::core::WaitFor::message_on_stdout( + "Ready to accept connections", + )) + .with_network(DOCKER_NETWORK_NAME) + .start() + .unwrap(); + redis + } + #[cfg(feature = "js-v7")] macro_rules! create_redis { ($redis:ident) => { - use testcontainers::{clients, core::RunArgs, images, Docker}; - let docker = clients::Cli::default(); - let container = - docker.run_with_args(images::redis::Redis::default(), RunArgs::default()); + let redis = crate::tests::launch_redis_container(); let redis_url = format!( "redis://localhost:{}", - container.get_host_port(6379).unwrap() + redis.get_host_port_ipv4(6379).unwrap() ); let $redis = redis::Client::open(redis_url.as_str()).unwrap(); }; } - fn decode_msg(msg: Msg) -> (String, Packet, Opts) { - let payload: Vec = msg.get_payload().unwrap(); - let mut de = Deserializer::new(&payload[..]); - Deserialize::deserialize(&mut de).unwrap() - } - - #[test] - fn emit() { - create_redis!(redis); - let mut con = redis.get_connection().unwrap(); - let mut pubsub = con.as_pubsub(); - pubsub.subscribe("socket.io#/#").unwrap(); - - // act - let io = Emitter::new(redis); - io.emit(vec!["test1", "test2"]); - - // assert - let actual = decode_msg(pubsub.get_message().unwrap()); - assert_eq!("emitter", actual.0); - assert_eq!( - Packet { - _type: 2, - data: vec!["test1".to_string(), "test2".to_string()], - nsp: "/".to_string(), - }, - actual.1 - ); - assert_eq!( - Opts { - rooms: vec![], - flags: Default::default() - }, - actual.2 - ); - } - - #[test] - fn emit_in_namespaces() { - create_redis!(redis); - let mut con = redis.get_connection().unwrap(); - let mut pubsub = con.as_pubsub(); - pubsub.subscribe("socket.io#/custom#").unwrap(); - - // act - let io = Emitter::new(redis); - io.of("/custom").emit(vec!["test"]); - - // assert - let actual = decode_msg(pubsub.get_message().unwrap()); - assert_eq!("emitter", actual.0); - assert_eq!( - Packet { - _type: 2, - data: vec!["test".to_string()], - nsp: "/custom".to_string(), - }, - actual.1 - ); - assert_eq!( - Opts { - rooms: vec![], - flags: Default::default() - }, - actual.2 - ); - } - - #[test] - fn emit_to_namespaces() { - create_redis!(redis); - let mut con = redis.get_connection().unwrap(); - let mut pubsub = con.as_pubsub(); - pubsub.subscribe("socket.io#/custom#").unwrap(); - - // act - let io = Emitter::new(redis); - io.of("/custom").emit(vec!["test"]); - - // assert - let actual = decode_msg(pubsub.get_message().unwrap()); - assert_eq!("emitter", actual.0); - assert_eq!( - Packet { - _type: 2, - data: vec!["test".to_string()], - nsp: "/custom".to_string(), - }, - actual.1 - ); - assert_eq!( - Opts { - rooms: vec![], - flags: Default::default() - }, - actual.2 - ); - } - - #[test] - fn emit_to_room() { - create_redis!(redis); - let mut con = redis.get_connection().unwrap(); - let mut pubsub = con.as_pubsub(); - pubsub.subscribe("socket.io#/#room1#").unwrap(); - - // act - let io = Emitter::new(redis); - io.to("room1").emit(vec!["test"]); - - // assert - let actual = decode_msg(pubsub.get_message().unwrap()); - assert_eq!("emitter", actual.0); - assert_eq!( - Packet { - _type: 2, - data: vec!["test".to_string()], - nsp: "/".to_string(), - }, - actual.1 - ); - assert_eq!( - Opts { - rooms: vec!["room1".to_string()], - flags: Default::default() - }, - actual.2 - ); - } + #[cfg(feature = "js-v7")] + pub(crate) use create_redis; + use testcontainers::ImageExt; }