diff --git a/.gitignore b/.gitignore index 46a7e1f7..6c8853a7 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ CMakeCache.txt Makefile cmake_install.cmake out +build.log build/ build-debug/ build-release/ diff --git a/CMakeLists.txt b/CMakeLists.txt index a7a312d9..86eb1e49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -335,6 +335,7 @@ add_library(livekit SHARED src/room.cpp src/room_proto_converter.cpp src/room_proto_converter.h + src/subscription_thread_dispatcher.cpp src/local_participant.cpp src/remote_participant.cpp src/stats.cpp @@ -723,4 +724,4 @@ add_custom_target(clean_all COMMAND ${CMAKE_COMMAND} -E chdir "${CMAKE_SOURCE_DIR}" ${CMAKE_COMMAND} -E rm -rf "${CMAKE_BINARY_DIR}" COMMENT "Full clean: CMake outputs + Rust target + generated protos + delete build/" VERBATIM -) \ No newline at end of file +) diff --git a/bridge/CMakeLists.txt b/bridge/CMakeLists.txt index 1d045a19..369c98df 100644 --- a/bridge/CMakeLists.txt +++ b/bridge/CMakeLists.txt @@ -9,8 +9,6 @@ add_library(livekit_bridge SHARED src/livekit_bridge.cpp src/bridge_audio_track.cpp src/bridge_video_track.cpp - src/bridge_room_delegate.cpp - src/bridge_room_delegate.h src/rpc_constants.cpp src/rpc_controller.cpp src/rpc_controller.h diff --git a/bridge/README.md b/bridge/README.md index cebb09b2..5f7c54cf 100644 --- a/bridge/README.md +++ b/bridge/README.md @@ -1,5 +1,7 @@ # LiveKit Bridge +# **WARNING: This library is deprecated, use the base sdk found in src/** + A simplified, high-level C++ wrapper around the [LiveKit C++ SDK](../README.md). The bridge abstracts away room lifecycle management, track creation, publishing, and subscription boilerplate so that external codebases can interface with LiveKit in just a few lines. It is intended that this library will be used to bridge the LiveKit C++ SDK into other SDKs such as, but not limited to, Foxglove, ROS, and Rerun. It is intended that this library closely matches the style of the core LiveKit C++ SDK. @@ -98,36 +100,33 @@ Your Application **`BridgeAudioTrack` / `BridgeVideoTrack`** -- RAII handles for published local tracks. Created via `createAudioTrack()` / `createVideoTrack()`. When the `shared_ptr` is dropped, the track is automatically unpublished and all underlying SDK resources are freed. Call `pushFrame()` to send audio/video data to remote participants. -**`BridgeRoomDelegate`** -- Internal (not part of the public API; lives in `src/`). Listens for `onTrackSubscribed` / `onTrackUnsubscribed` events from the LiveKit SDK and wires up reader threads automatically. - ### What is a Reader? A **reader** is a background thread that receives decoded media frames from a remote participant. -When a remote participant publishes an audio or video track and the bridge subscribes to it (auto-subscribe is enabled by default), the bridge creates an `AudioStream` or `VideoStream` from that track and spins up a dedicated thread. This thread loops on `stream->read()`, which blocks until a new frame arrives. Each received frame is forwarded to the user's registered callback. +When a remote participant publishes an audio or video track and the room subscribes to it (auto-subscribe is enabled by default), `Room` creates an `AudioStream` or `VideoStream` from that track and spins up a dedicated thread. This thread loops on `stream->read()`, which blocks until a new frame arrives. Each received frame is forwarded to the user's registered callback. In short: - **Sending** (you -> remote): `BridgeAudioTrack::pushFrame()` / `BridgeVideoTrack::pushFrame()` - **Receiving** (remote -> you): reader threads invoke your registered callbacks -Reader threads are managed entirely by the bridge. They are created when a matching remote track is subscribed, and torn down (stream closed, thread joined) when the track is unsubscribed, the callback is unregistered, or `disconnect()` is called. +Reader threads are managed by `Room` internally. They are created when a matching remote track is subscribed, and torn down (stream closed, thread joined) when the track is unsubscribed, the callback is unregistered, or the `Room` is destroyed. ### Callback Registration Timing -Callbacks are keyed by `(participant_identity, track_source)`. You can register them **before** the remote participant has joined the room. The bridge stores the callback and automatically wires it up when the matching track is subscribed. +Callbacks are keyed by `(participant_identity, track_source)`. You can register them **after connecting** but before the remote participant has joined the room. `Room` stores the callback and automatically wires it up when the matching track is subscribed. > **Note:** Only one callback may be set per `(participant_identity, track_source)` pair. Calling `setOnAudioFrameCallback` or `setOnVideoFrameCallback` again with the same identity and source will silently replace the previous callback. If you need to fan-out a single stream to multiple consumers, do so inside your callback. This means the typical pattern is: ```cpp -// Register first, connect second -- or register after connect but before -// the remote participant joins. -bridge.setOnAudioFrameCallback("robot-1", livekit::TrackSource::SOURCE_MICROPHONE, my_callback); +// Connect first, then register callbacks before the remote participant joins. livekit::RoomOptions options; options.auto_subscribe = true; bridge.connect(url, token, options); +bridge.setOnAudioFrameCallback("robot-1", livekit::TrackSource::SOURCE_MICROPHONE, my_callback); // When robot-1 joins and publishes a mic track, my_callback starts firing. ``` @@ -135,7 +134,7 @@ bridge.connect(url, token, options); - `LiveKitBridge` uses a mutex to protect the callback map and active reader state. - Frame callbacks fire on background reader threads. If your callback accesses shared application state, you are responsible for synchronization. -- `disconnect()` closes all streams and joins all reader threads before returning -- it is safe to destroy the bridge immediately after. +- `disconnect()` destroys the `Room`, which closes all streams and joins all reader threads before returning -- it is safe to destroy the bridge immediately after. ## API Reference @@ -226,9 +225,8 @@ The human will print periodic summaries like: ## Testing The bridge includes a unit test suite built with [Google Test](https://github.com/google/googletest). Tests cover -1. `CallbackKey` hashing/equality, -2. `BridgeAudioTrack`/`BridgeVideoTrack` state management, and -3. `LiveKitBridge` pre-connection behaviour (callback registration, error handling). +1. `BridgeAudioTrack`/`BridgeVideoTrack` state management, and +2. `LiveKitBridge` pre-connection behaviour (callback registration, error handling). ### Building and running tests diff --git a/bridge/include/livekit_bridge/bridge_audio_track.h b/bridge/include/livekit_bridge/bridge_audio_track.h index 5683e069..cdd22342 100644 --- a/bridge/include/livekit_bridge/bridge_audio_track.h +++ b/bridge/include/livekit_bridge/bridge_audio_track.h @@ -140,6 +140,7 @@ class BridgeAudioTrack { std::shared_ptr source_; std::shared_ptr track_; + /* DEPRECATED. use track_->publication() instead */ std::shared_ptr publication_; livekit::LocalParticipant *participant_ = nullptr; // not owned }; diff --git a/bridge/include/livekit_bridge/bridge_video_track.h b/bridge/include/livekit_bridge/bridge_video_track.h index 8057b7a9..27169151 100644 --- a/bridge/include/livekit_bridge/bridge_video_track.h +++ b/bridge/include/livekit_bridge/bridge_video_track.h @@ -138,6 +138,7 @@ class BridgeVideoTrack { std::shared_ptr source_; std::shared_ptr track_; + /* DEPRECATED. use track_->publication() instead */ std::shared_ptr publication_; livekit::LocalParticipant *participant_ = nullptr; // not owned }; diff --git a/bridge/include/livekit_bridge/livekit_bridge.h b/bridge/include/livekit_bridge/livekit_bridge.h index 7f152597..47f13d46 100644 --- a/bridge/include/livekit_bridge/livekit_bridge.h +++ b/bridge/include/livekit_bridge/livekit_bridge.h @@ -29,44 +29,35 @@ #include "livekit/rpc_error.h" #include -#include #include #include #include -#include -#include #include namespace livekit { class Room; class AudioFrame; class VideoFrame; -class AudioStream; -class VideoStream; -class Track; enum class TrackSource; } // namespace livekit namespace livekit_bridge { -class BridgeRoomDelegate; class RpcController; namespace test { -class CallbackKeyTest; class LiveKitBridgeTest; } // namespace test /// Callback type for incoming audio frames. -/// Called on a background reader thread. -using AudioFrameCallback = std::function; +/// Called on a background reader thread owned by Room. +using AudioFrameCallback = livekit::AudioFrameCallback; /// Callback type for incoming video frames. -/// Called on a background reader thread. +/// Called on a background reader thread owned by Room. /// @param frame The decoded video frame (RGBA by default). /// @param timestamp_us Presentation timestamp in microseconds. -using VideoFrameCallback = std::function; +using VideoFrameCallback = livekit::VideoFrameCallback; /** * High-level bridge to the LiveKit C++ SDK. @@ -74,6 +65,9 @@ using VideoFrameCallback = std::function audio_stream; - std::shared_ptr video_stream; - std::thread thread; - bool is_audio = false; - }; - - /// Called by BridgeRoomDelegate when a remote track is subscribed. - void onTrackSubscribed(const std::string &participant_identity, - livekit::TrackSource source, - const std::shared_ptr &track); - - /// Called by BridgeRoomDelegate when a remote track is unsubscribed. - void onTrackUnsubscribed(const std::string &participant_identity, - livekit::TrackSource source); - - /// Extract the thread for the given callback key. - /// @pre Caller must hold @c mutex_. - std::thread extractReaderThread(const CallbackKey &key); - - /// Start a reader thread for a subscribed track. - /// @return The reader thread for this track. - /// @pre Caller must hold @c mutex_. - std::thread startAudioReader(const CallbackKey &key, - const std::shared_ptr &track, - AudioFrameCallback cb); - /// @copydoc startAudioReader - std::thread startVideoReader(const CallbackKey &key, - const std::shared_ptr &track, - VideoFrameCallback cb); - /// Execute a track action (mute/unmute) by track name. /// Used as the TrackActionFn callback for RpcController. /// Throws livekit::RpcError if the track is not found. @@ -415,23 +361,9 @@ class LiveKitBridge { bool connecting_; // guards against concurrent connect() calls bool sdk_initialized_; - static constexpr int kMaxActiveReaders = 20; - std::unique_ptr room_; - std::unique_ptr delegate_; std::unique_ptr rpc_controller_; - /// Registered callbacks (may be registered before tracks are subscribed). - std::unordered_map - audio_callbacks_; - /// @copydoc audio_callbacks_ - std::unordered_map - video_callbacks_; - - /// Active reader threads for subscribed tracks. - std::unordered_map - active_readers_; - /// All tracks created by this bridge. The bridge retains a shared_ptr so /// it can force-release every track on disconnect() before the room is /// destroyed, preventing dangling @c participant_ pointers. diff --git a/bridge/src/bridge_audio_track.cpp b/bridge/src/bridge_audio_track.cpp index 66577145..1fa58cd2 100644 --- a/bridge/src/bridge_audio_track.cpp +++ b/bridge/src/bridge_audio_track.cpp @@ -23,7 +23,6 @@ #include "livekit/audio_source.h" #include "livekit/local_audio_track.h" #include "livekit/local_participant.h" -#include "livekit/local_track_publication.h" #include @@ -111,9 +110,9 @@ void BridgeAudioTrack::release() { released_ = true; // Unpublish the track from the room - if (participant_ && publication_) { + if (participant_ && track_ && track_->publication()) { try { - participant_->unpublishTrack(publication_->sid()); + participant_->unpublishTrack(track_->publication()->sid()); } catch (...) { // Best-effort cleanup; ignore errors during teardown LK_LOG_WARN("BridgeAudioTrack unpublishTrack error, continuing with " @@ -122,7 +121,6 @@ void BridgeAudioTrack::release() { } // Release SDK objects in reverse order - publication_.reset(); track_.reset(); source_.reset(); participant_ = nullptr; diff --git a/bridge/src/bridge_room_delegate.cpp b/bridge/src/bridge_room_delegate.cpp deleted file mode 100644 index 118627c6..00000000 --- a/bridge/src/bridge_room_delegate.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2025 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// @file bridge_room_delegate.cpp -/// @brief Implementation of BridgeRoomDelegate event forwarding. - -#include "bridge_room_delegate.h" - -#include "livekit/remote_participant.h" -#include "livekit/remote_track_publication.h" -#include "livekit/track.h" -#include "livekit_bridge/livekit_bridge.h" - -namespace livekit_bridge { - -void BridgeRoomDelegate::onTrackSubscribed( - livekit::Room & /*room*/, const livekit::TrackSubscribedEvent &ev) { - if (!ev.track || !ev.participant || !ev.publication) { - return; - } - - const std::string identity = ev.participant->identity(); - const livekit::TrackSource source = ev.publication->source(); - - bridge_.onTrackSubscribed(identity, source, ev.track); -} - -void BridgeRoomDelegate::onTrackUnsubscribed( - livekit::Room & /*room*/, const livekit::TrackUnsubscribedEvent &ev) { - if (!ev.participant || !ev.publication) { - return; - } - - const std::string identity = ev.participant->identity(); - const livekit::TrackSource source = ev.publication->source(); - - bridge_.onTrackUnsubscribed(identity, source); -} - -} // namespace livekit_bridge diff --git a/bridge/src/bridge_room_delegate.h b/bridge/src/bridge_room_delegate.h deleted file mode 100644 index 8f5dd311..00000000 --- a/bridge/src/bridge_room_delegate.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2025 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// @file bridge_room_delegate.h -/// @brief Internal RoomDelegate forwarding SDK events to LiveKitBridge. - -#pragma once - -#include "livekit/room_delegate.h" - -namespace livekit_bridge { - -class LiveKitBridge; - -/** - * Internal RoomDelegate that forwards SDK room events to the LiveKitBridge. - * - * Handles track subscribe/unsubscribe lifecycle. Not part of the public API, - * so its in src/ instead of include/. - */ -class BridgeRoomDelegate : public livekit::RoomDelegate { -public: - explicit BridgeRoomDelegate(LiveKitBridge &bridge) : bridge_(bridge) {} - - /// Forwards a track-subscribed event to LiveKitBridge::onTrackSubscribed(). - void onTrackSubscribed(livekit::Room &room, - const livekit::TrackSubscribedEvent &ev) override; - - /// Forwards a track-unsubscribed event to - /// LiveKitBridge::onTrackUnsubscribed(). - void onTrackUnsubscribed(livekit::Room &room, - const livekit::TrackUnsubscribedEvent &ev) override; - -private: - LiveKitBridge &bridge_; -}; - -} // namespace livekit_bridge diff --git a/bridge/src/bridge_video_track.cpp b/bridge/src/bridge_video_track.cpp index bb763422..7b8a6cc8 100644 --- a/bridge/src/bridge_video_track.cpp +++ b/bridge/src/bridge_video_track.cpp @@ -20,7 +20,6 @@ #include "livekit_bridge/bridge_video_track.h" #include "livekit/local_participant.h" -#include "livekit/local_track_publication.h" #include "livekit/local_video_track.h" #include "livekit/video_frame.h" #include "livekit/video_source.h" @@ -110,9 +109,9 @@ void BridgeVideoTrack::release() { released_ = true; // Unpublish the track from the room - if (participant_ && publication_) { + if (participant_ && track_ && track_->publication()) { try { - participant_->unpublishTrack(publication_->sid()); + participant_->unpublishTrack(track_->publication()->sid()); } catch (...) { // Best-effort cleanup; ignore errors during teardown LK_LOG_WARN("BridgeVideoTrack unpublishTrack error, continuing with " @@ -121,7 +120,6 @@ void BridgeVideoTrack::release() { } // Release SDK objects in reverse order - publication_.reset(); track_.reset(); source_.reset(); participant_ = nullptr; diff --git a/bridge/src/livekit_bridge.cpp b/bridge/src/livekit_bridge.cpp index 809aae6a..9f782904 100644 --- a/bridge/src/livekit_bridge.cpp +++ b/bridge/src/livekit_bridge.cpp @@ -18,13 +18,11 @@ /// @brief Implementation of the LiveKitBridge high-level API. #include "livekit_bridge/livekit_bridge.h" -#include "bridge_room_delegate.h" #include "livekit_bridge/rpc_constants.h" #include "rpc_controller.h" #include "livekit/audio_frame.h" #include "livekit/audio_source.h" -#include "livekit/audio_stream.h" #include "livekit/livekit.h" #include "livekit/local_audio_track.h" #include "livekit/local_participant.h" @@ -34,28 +32,12 @@ #include "livekit/track.h" #include "livekit/video_frame.h" #include "livekit/video_source.h" -#include "livekit/video_stream.h" #include #include namespace livekit_bridge { -// --------------------------------------------------------------- -// CallbackKey -// --------------------------------------------------------------- - -bool LiveKitBridge::CallbackKey::operator==(const CallbackKey &o) const { - return identity == o.identity && source == o.source; -} - -std::size_t -LiveKitBridge::CallbackKeyHash::operator()(const CallbackKey &k) const { - std::size_t h1 = std::hash{}(k.identity); - std::size_t h2 = std::hash{}(static_cast(k.source)); - return h1 ^ (h2 << 1); -} - // --------------------------------------------------------------- // Construction / Destruction // --------------------------------------------------------------- @@ -99,8 +81,8 @@ bool LiveKitBridge::connect(const std::string &url, const std::string &token, // ---- Phase 2: create room and connect without holding the lock ---- // This avoids blocking other threads during the network handshake and - // eliminates the risk of deadlock if the SDK delivers delegate callbacks - // synchronously during Connect(). + // eliminates the risk of deadlock if the SDK delivers callbacks synchronously + // during Connect(). auto room = std::make_unique(); assert(room != nullptr); @@ -111,19 +93,11 @@ bool LiveKitBridge::connect(const std::string &url, const std::string &token, return false; } - // ---- Phase 3: commit and attach delegate under lock ---- - // Setting the delegate here (after Connect) ensures that any queued - // onTrackSubscribed events are delivered only after - // room_/delegate_/connected_ are all in a consistent state. - - auto delegate = std::make_unique(*this); - assert(delegate != nullptr); - room->setDelegate(delegate.get()); + // ---- Phase 3: commit under lock ---- livekit::LocalParticipant *lp = nullptr; { std::lock_guard lock(mutex_); room_ = std::move(room); - delegate_ = std::move(delegate); connected_ = true; connecting_ = false; @@ -142,8 +116,6 @@ void LiveKitBridge::disconnect() { rpc_controller_->disable(); } - // Collect threads to join outside the lock to avoid deadlock. - std::vector threads_to_join; bool should_shutdown_sdk = false; { @@ -157,8 +129,6 @@ void LiveKitBridge::disconnect() { connected_ = false; connecting_ = false; - // Release all published tracks while the room/participant are still alive. - // This calls unpublishTrack() on each, ensuring participant_ is valid. for (auto &track : published_audio_tracks_) { track->release(); } @@ -168,29 +138,7 @@ void LiveKitBridge::disconnect() { published_audio_tracks_.clear(); published_video_tracks_.clear(); - // Close all streams (unblocks read loops) and collect threads - for (auto &[key, reader] : active_readers_) { - if (reader.audio_stream) { - reader.audio_stream->close(); - } - if (reader.video_stream) { - reader.video_stream->close(); - } - if (reader.thread.joinable()) { - threads_to_join.emplace_back(std::move(reader.thread)); - } - } - active_readers_.clear(); - - // Clear callback registrations - audio_callbacks_.clear(); - video_callbacks_.clear(); - - // Tear down the room - if (room_) { - room_->setDelegate(nullptr); - } - delegate_.reset(); + // Room destructor handles stopping all reader threads room_.reset(); if (sdk_initialized_) { @@ -199,14 +147,6 @@ void LiveKitBridge::disconnect() { } } - // Join threads outside the lock - for (auto &t : threads_to_join) { - if (t.joinable()) { - t.join(); - } - } - - // Shut down the SDK outside the lock (may block) if (should_shutdown_sdk) { livekit::shutdown(); } @@ -246,12 +186,13 @@ LiveKitBridge::createAudioTrack(const std::string &name, int sample_rate, auto lp = room_->localParticipant(); assert(lp != nullptr); - auto publication = lp->publishTrack(track, opts); + lp->publishTrack(track, opts); + auto publication = track->publication(); // 4. Wrap in handle and retain a reference auto bridge_track = std::shared_ptr(new BridgeAudioTrack( name, sample_rate, num_channels, std::move(audio_source), - std::move(track), std::move(publication), lp)); + std::move(track), publication, lp)); published_audio_tracks_.emplace_back(bridge_track); return bridge_track; } @@ -280,12 +221,13 @@ LiveKitBridge::createVideoTrack(const std::string &name, int width, int height, auto lp = room_->localParticipant(); assert(lp != nullptr); - auto publication = lp->publishTrack(track, opts); + lp->publishTrack(track, opts); + auto publication = track->publication(); // 4. Wrap in handle and retain a reference auto bridge_track = std::shared_ptr( new BridgeVideoTrack(name, width, height, std::move(video_source), - std::move(track), std::move(publication), lp)); + std::move(track), publication, lp)); published_video_tracks_.emplace_back(bridge_track); return bridge_track; } @@ -298,57 +240,42 @@ void LiveKitBridge::setOnAudioFrameCallback( const std::string &participant_identity, livekit::TrackSource source, AudioFrameCallback callback) { std::lock_guard lock(mutex_); - - CallbackKey key{participant_identity, source}; - audio_callbacks_[key] = std::move(callback); - - // If there is already an active reader for this key (e.g., track was - // subscribed before the callback was registered), we don't need to do - // anything special -- the next time onTrackSubscribed fires it will - // pick up the callback. However, since auto_subscribe is on, the track - // may have already been subscribed. We don't have a way to retroactively - // query subscribed tracks here, so the user should register callbacks - // before connecting or before the remote participant joins. In practice, - // the delegate fires onTrackSubscribed when the track arrives, so if we - // register the callback first (before the participant joins), it will - // be picked up. + if (!room_) { + LK_LOG_WARN("setOnAudioFrameCallback called before connect(); ignored"); + return; + } + room_->setOnAudioFrameCallback(participant_identity, source, + std::move(callback)); } void LiveKitBridge::setOnVideoFrameCallback( const std::string &participant_identity, livekit::TrackSource source, VideoFrameCallback callback) { std::lock_guard lock(mutex_); - - CallbackKey key{participant_identity, source}; - video_callbacks_[key] = std::move(callback); + if (!room_) { + LK_LOG_WARN("setOnVideoFrameCallback called before connect(); ignored"); + return; + } + room_->setOnVideoFrameCallback(participant_identity, source, + std::move(callback)); } void LiveKitBridge::clearOnAudioFrameCallback( const std::string &participant_identity, livekit::TrackSource source) { - std::thread thread_to_join; - { - std::lock_guard lock(mutex_); - CallbackKey key{participant_identity, source}; - audio_callbacks_.erase(key); - thread_to_join = extractReaderThread(key); - } - if (thread_to_join.joinable()) { - thread_to_join.join(); + std::lock_guard lock(mutex_); + if (!room_) { + return; } + room_->clearOnAudioFrameCallback(participant_identity, source); } void LiveKitBridge::clearOnVideoFrameCallback( const std::string &participant_identity, livekit::TrackSource source) { - std::thread thread_to_join; - { - std::lock_guard lock(mutex_); - CallbackKey key{participant_identity, source}; - video_callbacks_.erase(key); - thread_to_join = extractReaderThread(key); - } - if (thread_to_join.joinable()) { - thread_to_join.join(); + std::lock_guard lock(mutex_); + if (!room_) { + return; } + room_->clearOnVideoFrameCallback(participant_identity, source); } // --------------------------------------------------------------- @@ -494,159 +421,4 @@ void LiveKitBridge::executeTrackAction(const rpc::track_control::Action &action, "track not found: " + track_name); } -// --------------------------------------------------------------- -// Internal: track subscribe / unsubscribe from delegate -// --------------------------------------------------------------- - -void LiveKitBridge::onTrackSubscribed( - const std::string &participant_identity, livekit::TrackSource source, - const std::shared_ptr &track) { - std::thread old_thread; - { - std::lock_guard lock(mutex_); - - CallbackKey key{participant_identity, source}; - - if (track->kind() == livekit::TrackKind::KIND_AUDIO) { - auto it = audio_callbacks_.find(key); - if (it != audio_callbacks_.end()) { - old_thread = startAudioReader(key, track, it->second); - } - } else if (track->kind() == livekit::TrackKind::KIND_VIDEO) { - auto it = video_callbacks_.find(key); - if (it != video_callbacks_.end()) { - old_thread = startVideoReader(key, track, it->second); - } - } - } - // If this key already had a reader (e.g. track was re-subscribed), the old - // reader's stream was closed inside startAudioReader/startVideoReader. We - // join its thread here -- outside the lock -- to guarantee it has finished - // invoking the old callback before we return. - if (old_thread.joinable()) { - old_thread.join(); - } -} - -void LiveKitBridge::onTrackUnsubscribed(const std::string &participant_identity, - livekit::TrackSource source) { - std::thread thread_to_join; - { - std::lock_guard lock(mutex_); - CallbackKey key{participant_identity, source}; - thread_to_join = extractReaderThread(key); - } - if (thread_to_join.joinable()) { - thread_to_join.join(); - } -} - -// --------------------------------------------------------------- -// Internal: reader thread management -// --------------------------------------------------------------- - -std::thread LiveKitBridge::extractReaderThread(const CallbackKey &key) { - // Caller must hold mutex_. - // Closes the stream and extracts the thread for the caller to join. - auto it = active_readers_.find(key); - if (it == active_readers_.end()) { - return {}; - } - - auto &reader = it->second; - - // Close the stream to unblock the read() loop - if (reader.audio_stream) { - reader.audio_stream->close(); - } - if (reader.video_stream) { - reader.video_stream->close(); - } - - auto thread = std::move(reader.thread); - active_readers_.erase(it); - return thread; -} - -std::thread -LiveKitBridge::startAudioReader(const CallbackKey &key, - const std::shared_ptr &track, - AudioFrameCallback cb) { - // Caller must hold mutex_. - // Returns the old reader thread (if any) for the caller to join outside - // the lock. - auto old_thread = extractReaderThread(key); - - livekit::AudioStream::Options opts; - auto stream = livekit::AudioStream::fromTrack(track, opts); - if (!stream) { - LK_LOG_ERROR("Failed to create AudioStream for {}", key.identity); - return old_thread; - } - - auto stream_copy = stream; - - ActiveReader reader; - reader.audio_stream = std::move(stream); - reader.is_audio = true; - reader.thread = std::thread([stream_copy, cb]() { - livekit::AudioFrameEvent ev; - while (stream_copy->read(ev)) { - try { - cb(ev.frame); - } catch (const std::exception &e) { - LK_LOG_ERROR("Audio callback exception: {}", e.what()); - } - } - }); - - active_readers_[key] = std::move(reader); - if (active_readers_.size() > kMaxActiveReaders) { - LK_LOG_WARN("More than expected active readers. Need to evaluate how much " - "to expect/support."); - } - return old_thread; -} - -std::thread -LiveKitBridge::startVideoReader(const CallbackKey &key, - const std::shared_ptr &track, - VideoFrameCallback cb) { - // Caller must hold mutex_. - // Returns the old reader thread (if any) for the caller to join outside - // the lock. - auto old_thread = extractReaderThread(key); - - livekit::VideoStream::Options opts; - opts.format = livekit::VideoBufferType::RGBA; - auto stream = livekit::VideoStream::fromTrack(track, opts); - if (!stream) { - LK_LOG_ERROR("Failed to create VideoStream for {}", key.identity); - return old_thread; - } - - auto stream_copy = stream; - - ActiveReader reader; - reader.video_stream = std::move(stream); - reader.is_audio = false; - reader.thread = std::thread([stream_copy, cb]() { - livekit::VideoFrameEvent ev; - while (stream_copy->read(ev)) { - try { - cb(ev.frame, ev.timestamp_us); - } catch (const std::exception &e) { - LK_LOG_ERROR("Video callback exception: {}", e.what()); - } - } - }); - - active_readers_[key] = std::move(reader); - if (active_readers_.size() > kMaxActiveReaders) { - LK_LOG_WARN("More than expected active readers. Need to evaluate how much " - "to expect/support."); - } - return old_thread; -} - } // namespace livekit_bridge diff --git a/bridge/tests/test_callback_key.cpp b/bridge/tests/test_callback_key.cpp deleted file mode 100644 index 29974f6f..00000000 --- a/bridge/tests/test_callback_key.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2025 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/// @file test_callback_key.cpp -/// @brief Unit tests for LiveKitBridge::CallbackKey hash and equality. - -#include -#include - -#include - -#include - -namespace livekit_bridge { -namespace test { - -class CallbackKeyTest : public ::testing::Test { -protected: - // Type aliases for convenience -- these are private types in LiveKitBridge, - // accessible via the friend declaration. - using CallbackKey = LiveKitBridge::CallbackKey; - using CallbackKeyHash = LiveKitBridge::CallbackKeyHash; -}; - -TEST_F(CallbackKeyTest, EqualKeysCompareEqual) { - CallbackKey a{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - - EXPECT_TRUE(a == b) << "Identical keys should compare equal"; -} - -TEST_F(CallbackKeyTest, DifferentIdentityComparesUnequal) { - CallbackKey a{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"bob", livekit::TrackSource::SOURCE_MICROPHONE}; - - EXPECT_FALSE(a == b) << "Keys with different identities should not be equal"; -} - -TEST_F(CallbackKeyTest, DifferentSourceComparesUnequal) { - CallbackKey a{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", livekit::TrackSource::SOURCE_CAMERA}; - - EXPECT_FALSE(a == b) << "Keys with different sources should not be equal"; -} - -TEST_F(CallbackKeyTest, EqualKeysProduceSameHash) { - CallbackKey a{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey b{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKeyHash hasher; - - EXPECT_EQ(hasher(a), hasher(b)) - << "Equal keys must produce the same hash value"; -} - -TEST_F(CallbackKeyTest, DifferentKeysProduceDifferentHashes) { - CallbackKeyHash hasher; - - CallbackKey mic{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey cam{"alice", livekit::TrackSource::SOURCE_CAMERA}; - CallbackKey bob{"bob", livekit::TrackSource::SOURCE_MICROPHONE}; - - // While hash collisions are technically allowed, these simple cases - // should not collide with a reasonable hash function. - EXPECT_NE(hasher(mic), hasher(cam)) - << "Different sources should (likely) produce different hashes"; - EXPECT_NE(hasher(mic), hasher(bob)) - << "Different identities should (likely) produce different hashes"; -} - -TEST_F(CallbackKeyTest, WorksAsUnorderedMapKey) { - std::unordered_map map; - - CallbackKey key1{"alice", livekit::TrackSource::SOURCE_MICROPHONE}; - CallbackKey key2{"bob", livekit::TrackSource::SOURCE_CAMERA}; - CallbackKey key3{"alice", livekit::TrackSource::SOURCE_CAMERA}; - - // Insert - map[key1] = 1; - map[key2] = 2; - map[key3] = 3; - - EXPECT_EQ(map.size(), 3u) - << "Three distinct keys should produce three entries"; - - // Find - EXPECT_EQ(map[key1], 1); - EXPECT_EQ(map[key2], 2); - EXPECT_EQ(map[key3], 3); - - // Overwrite - map[key1] = 42; - EXPECT_EQ(map[key1], 42) << "Inserting with same key should overwrite"; - EXPECT_EQ(map.size(), 3u) << "Size should not change after overwrite"; - - // Erase - map.erase(key2); - EXPECT_EQ(map.size(), 2u); - EXPECT_EQ(map.count(key2), 0u) << "Erased key should not be found"; -} - -TEST_F(CallbackKeyTest, EmptyIdentityWorks) { - CallbackKey empty{"", livekit::TrackSource::SOURCE_UNKNOWN}; - CallbackKey also_empty{"", livekit::TrackSource::SOURCE_UNKNOWN}; - CallbackKeyHash hasher; - - EXPECT_TRUE(empty == also_empty); - EXPECT_EQ(hasher(empty), hasher(also_empty)); -} - -} // namespace test -} // namespace livekit_bridge diff --git a/bridge/tests/test_livekit_bridge.cpp b/bridge/tests/test_livekit_bridge.cpp index 38b1d7d3..43c8f6fb 100644 --- a/bridge/tests/test_livekit_bridge.cpp +++ b/bridge/tests/test_livekit_bridge.cpp @@ -94,10 +94,10 @@ TEST_F(LiveKitBridgeTest, CreateVideoTrackBeforeConnectThrows) { } // ============================================================================ -// Callback registration (pre-connection, pure map operations) +// Callback registration (pre-connection — warns but does not crash) // ============================================================================ -TEST_F(LiveKitBridgeTest, RegisterAndUnregisterAudioCallbackDoesNotCrash) { +TEST_F(LiveKitBridgeTest, SetAndClearAudioCallbackBeforeConnectDoesNotCrash) { LiveKitBridge bridge; EXPECT_NO_THROW({ @@ -107,11 +107,10 @@ TEST_F(LiveKitBridgeTest, RegisterAndUnregisterAudioCallbackDoesNotCrash) { bridge.clearOnAudioFrameCallback("remote-participant", livekit::TrackSource::SOURCE_MICROPHONE); - }) << "Registering and unregistering an audio callback should be safe " - "even without a connection"; + }) << "set/clear audio callback before connect should be safe (warns)"; } -TEST_F(LiveKitBridgeTest, RegisterAndUnregisterVideoCallbackDoesNotCrash) { +TEST_F(LiveKitBridgeTest, SetAndClearVideoCallbackBeforeConnectDoesNotCrash) { LiveKitBridge bridge; EXPECT_NO_THROW({ @@ -121,11 +120,10 @@ TEST_F(LiveKitBridgeTest, RegisterAndUnregisterVideoCallbackDoesNotCrash) { bridge.clearOnVideoFrameCallback("remote-participant", livekit::TrackSource::SOURCE_CAMERA); - }) << "Registering and unregistering a video callback should be safe " - "even without a connection"; + }) << "set/clear video callback before connect should be safe (warns)"; } -TEST_F(LiveKitBridgeTest, UnregisterNonExistentCallbackIsNoOp) { +TEST_F(LiveKitBridgeTest, ClearNonExistentCallbackIsNoOp) { LiveKitBridge bridge; EXPECT_NO_THROW({ @@ -133,52 +131,7 @@ TEST_F(LiveKitBridgeTest, UnregisterNonExistentCallbackIsNoOp) { livekit::TrackSource::SOURCE_MICROPHONE); bridge.clearOnVideoFrameCallback("nonexistent", livekit::TrackSource::SOURCE_CAMERA); - }) << "Unregistering a callback that was never registered should be a no-op"; -} - -TEST_F(LiveKitBridgeTest, MultipleRegistrationsSameKeyOverwrites) { - LiveKitBridge bridge; - - int call_count = 0; - - // Register a first callback - bridge.setOnAudioFrameCallback("alice", - livekit::TrackSource::SOURCE_MICROPHONE, - [](const livekit::AudioFrame &) {}); - - // Register a second callback for the same key -- should overwrite - bridge.setOnAudioFrameCallback( - "alice", livekit::TrackSource::SOURCE_MICROPHONE, - [&call_count](const livekit::AudioFrame &) { call_count++; }); - - // Unregister once should be enough (only one entry per key) - EXPECT_NO_THROW(bridge.clearOnAudioFrameCallback( - "alice", livekit::TrackSource::SOURCE_MICROPHONE)); -} - -TEST_F(LiveKitBridgeTest, RegisterCallbacksForMultipleParticipants) { - LiveKitBridge bridge; - - EXPECT_NO_THROW({ - bridge.setOnAudioFrameCallback("alice", - livekit::TrackSource::SOURCE_MICROPHONE, - [](const livekit::AudioFrame &) {}); - - bridge.setOnVideoFrameCallback( - "bob", livekit::TrackSource::SOURCE_CAMERA, - [](const livekit::VideoFrame &, std::int64_t) {}); - - bridge.setOnAudioFrameCallback( - "charlie", livekit::TrackSource::SOURCE_SCREENSHARE_AUDIO, - [](const livekit::AudioFrame &) {}); - }) << "Should be able to register callbacks for multiple participants"; - - // Cleanup - bridge.clearOnAudioFrameCallback("alice", - livekit::TrackSource::SOURCE_MICROPHONE); - bridge.clearOnVideoFrameCallback("bob", livekit::TrackSource::SOURCE_CAMERA); - bridge.clearOnAudioFrameCallback( - "charlie", livekit::TrackSource::SOURCE_SCREENSHARE_AUDIO); + }) << "Clearing a callback that was never registered should be a no-op"; } } // namespace test diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2eb68be1..19a37781 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -56,7 +56,6 @@ set(EXAMPLES_ALL # Bridge examples (need livekit_bridge DLL/shared lib in addition to livekit_ffi) set(EXAMPLES_BRIDGE BridgeRobot - BridgeHuman BridgeMuteCaller BridgeMuteReceiver BridgeRpcCaller @@ -265,7 +264,7 @@ target_include_directories(BridgeHuman PRIVATE ${EXAMPLES_PRIVATE_INCLUDE_DIRS} ${EXAMPLES_COMMON_DIR} ) -target_link_libraries(BridgeHuman PRIVATE livekit_bridge spdlog::spdlog SDL3::SDL3) +target_link_libraries(BridgeHuman PRIVATE livekit spdlog::spdlog SDL3::SDL3) # --- bridge_rpc examples (headless custom RPC caller + receiver) --- diff --git a/examples/bridge_human_robot/human.cpp b/examples/bridge_human_robot/human.cpp index e323f76b..81989eb5 100644 --- a/examples/bridge_human_robot/human.cpp +++ b/examples/bridge_human_robot/human.cpp @@ -18,6 +18,11 @@ * Human example -- receives audio and video frames from a robot in a * LiveKit room and renders them using SDL3. * + * This example demonstrates the base SDK's convenience frame callback API + * (Room::setOnAudioFrameCallback / Room::setOnVideoFrameCallback) which + * eliminates the need for a RoomDelegate subclass, manual AudioStream/ + * VideoStream creation, and reader threads. + * * The robot publishes two video tracks and two audio tracks: * - "robot-cam" (SOURCE_CAMERA) -- webcam or placeholder * - "robot-sim-frame" (SOURCE_SCREENSHARE) -- simulated diagnostic @@ -44,9 +49,9 @@ */ #include "livekit/audio_frame.h" +#include "livekit/livekit.h" #include "livekit/track.h" #include "livekit/video_frame.h" -#include "livekit_bridge/livekit_bridge.h" #include "sdl_media.h" #include @@ -66,24 +71,21 @@ static std::atomic g_running{true}; static void handleSignal(int) { g_running.store(false); } // ---- Video source selection ---- -enum class VideoSource : int { Webcam = 0, SimFrame = 1 }; +enum class SelectedSource : int { Webcam = 0, SimFrame = 1 }; static std::atomic g_selected_source{ - static_cast(VideoSource::Webcam)}; + static_cast(SelectedSource::Webcam)}; // ---- Thread-safe video frame slot ---- -// renderFrame() writes the latest frame here; the main loop reads it. struct LatestVideoFrame { std::mutex mutex; std::vector data; int width = 0; int height = 0; - bool dirty = false; // true when a new frame has been written + bool dirty = false; }; static LatestVideoFrame g_latest_video; -/// Store a video frame for the main loop to render. -/// Called from bridge callbacks when their source is the active selection. static void renderFrame(const livekit::VideoFrame &frame) { const std::uint8_t *src = frame.data(); const std::size_t size = frame.dataSize(); @@ -132,7 +134,7 @@ int main(int argc, char *argv[]) { } } if (url.empty()) - url = positional[0], token = positional[1]; // fallback by position + url = positional[0], token = positional[1]; } else { const char *e = std::getenv("LIVEKIT_URL"); if (e) @@ -179,24 +181,24 @@ int main(int argc, char *argv[]) { return 1; } - // Texture for displaying video frames (lazily recreated on size change) SDL_Texture *texture = nullptr; int tex_width = 0; int tex_height = 0; // ----- SDL speaker for audio playback ----- - // We lazily initialize the DDLSpeakerSink on the first audio frame, - // so we know the sample rate and channel count. std::unique_ptr speaker; std::mutex speaker_mutex; - // ----- Connect to LiveKit ----- - livekit_bridge::LiveKitBridge bridge; + // ----- Connect to LiveKit using the base SDK ----- + livekit::initialize(); + + auto room = std::make_unique(); std::cout << "[human] Connecting to " << url << " ...\n"; livekit::RoomOptions options; options.auto_subscribe = true; - if (!bridge.connect(url, token, options)) { + if (!room->Connect(url, token, options)) { LK_LOG_ERROR("[human] Failed to connect."); + livekit::shutdown(); SDL_DestroyRenderer(renderer); SDL_DestroyWindow(window); SDL_Quit(); @@ -228,64 +230,58 @@ int main(int argc, char *argv[]) { speaker->enqueue(samples.data(), frame.samples_per_channel()); }; - // ----- set audio callbacks ----- - // Real mic (SOURCE_MICROPHONE) -- plays only when 'w' is selected - bridge.setOnAudioFrameCallback( + // ----- Set audio callbacks using Room::setOnAudioFrameCallback ----- + room->setOnAudioFrameCallback( "robot", livekit::TrackSource::SOURCE_MICROPHONE, [playAudio, no_audio](const livekit::AudioFrame &frame) { g_audio_frames.fetch_add(1, std::memory_order_relaxed); if (!no_audio && g_selected_source.load(std::memory_order_relaxed) == - static_cast(VideoSource::Webcam)) { + static_cast(SelectedSource::Webcam)) { playAudio(frame); } }); - // Sim audio / siren (SOURCE_SCREENSHARE_AUDIO) -- plays only when 's' is - // selected - bridge.setOnAudioFrameCallback( + room->setOnAudioFrameCallback( "robot", livekit::TrackSource::SOURCE_SCREENSHARE_AUDIO, [playAudio, no_audio](const livekit::AudioFrame &frame) { g_audio_frames.fetch_add(1, std::memory_order_relaxed); if (!no_audio && g_selected_source.load(std::memory_order_relaxed) == - static_cast(VideoSource::SimFrame)) { + static_cast(SelectedSource::SimFrame)) { playAudio(frame); } }); - // ----- set video callbacks ----- - // Webcam feed (SOURCE_CAMERA) -- renders only when 'w' is selected - bridge.setOnVideoFrameCallback( + // ----- Set video callbacks using Room::setOnVideoFrameCallback ----- + room->setOnVideoFrameCallback( "robot", livekit::TrackSource::SOURCE_CAMERA, [](const livekit::VideoFrame &frame, std::int64_t /*timestamp_us*/) { g_video_frames.fetch_add(1, std::memory_order_relaxed); if (g_selected_source.load(std::memory_order_relaxed) == - static_cast(VideoSource::Webcam)) { + static_cast(SelectedSource::Webcam)) { renderFrame(frame); } }); - // Sim frame feed (SOURCE_SCREENSHARE) -- renders only when 's' is selected - bridge.setOnVideoFrameCallback( + room->setOnVideoFrameCallback( "robot", livekit::TrackSource::SOURCE_SCREENSHARE, [](const livekit::VideoFrame &frame, std::int64_t /*timestamp_us*/) { g_video_frames.fetch_add(1, std::memory_order_relaxed); if (g_selected_source.load(std::memory_order_relaxed) == - static_cast(VideoSource::SimFrame)) { + static_cast(SelectedSource::SimFrame)) { renderFrame(frame); } }); - // ----- Stdin input thread (for switching when the SDL window is not focused) - // ----- + // ----- Stdin input thread ----- std::thread input_thread([&]() { std::string line; while (g_running.load() && std::getline(std::cin, line)) { if (line == "w" || line == "W") { - g_selected_source.store(static_cast(VideoSource::Webcam), + g_selected_source.store(static_cast(SelectedSource::Webcam), std::memory_order_relaxed); std::cout << "[human] Switched to webcam + mic.\n"; } else if (line == "s" || line == "S") { - g_selected_source.store(static_cast(VideoSource::SimFrame), + g_selected_source.store(static_cast(SelectedSource::SimFrame), std::memory_order_relaxed); std::cout << "[human] Switched to sim frame + siren.\n"; } @@ -300,30 +296,26 @@ int main(int argc, char *argv[]) { auto last_report = std::chrono::steady_clock::now(); - // Reused across iterations so the swap gives the producer a pre-allocated - // buffer back, avoiding repeated allocations in steady state. std::vector local_pixels; while (g_running.load()) { - // Pump SDL events (including keyboard input for source selection) SDL_Event ev; while (SDL_PollEvent(&ev)) { if (ev.type == SDL_EVENT_QUIT) { g_running.store(false); } else if (ev.type == SDL_EVENT_KEY_DOWN) { if (ev.key.key == SDLK_W) { - g_selected_source.store(static_cast(VideoSource::Webcam), + g_selected_source.store(static_cast(SelectedSource::Webcam), std::memory_order_relaxed); std::cout << "[human] Switched to webcam + mic.\n"; } else if (ev.key.key == SDLK_S) { - g_selected_source.store(static_cast(VideoSource::SimFrame), + g_selected_source.store(static_cast(SelectedSource::SimFrame), std::memory_order_relaxed); std::cout << "[human] Switched to sim frame + siren.\n"; } } } - // Grab the latest video frame under a minimal lock, then render outside it. int fw = 0, fh = 0; bool have_frame = false; { @@ -339,7 +331,6 @@ int main(int argc, char *argv[]) { } if (have_frame) { - // Recreate texture if size changed if (fw != tex_width || fh != tex_height) { if (texture) SDL_DestroyTexture(texture); @@ -352,7 +343,6 @@ int main(int argc, char *argv[]) { tex_height = fh; } - // Upload pixels to texture if (texture) { void *pixels = nullptr; int pitch = 0; @@ -367,7 +357,6 @@ int main(int argc, char *argv[]) { } } - // Render SDL_SetRenderDrawColor(renderer, 0, 0, 0, 255); SDL_RenderClear(renderer); if (texture) { @@ -375,13 +364,12 @@ int main(int argc, char *argv[]) { } SDL_RenderPresent(renderer); - // Periodic status auto now = std::chrono::steady_clock::now(); if (now - last_report >= std::chrono::seconds(5)) { last_report = now; const char *src_name = g_selected_source.load(std::memory_order_relaxed) == - static_cast(VideoSource::Webcam) + static_cast(SelectedSource::Webcam) ? "webcam" : "sim_frame"; std::cout << "[human] Status: " << g_audio_frames.load() @@ -389,7 +377,6 @@ int main(int argc, char *argv[]) { << " video frames received (showing: " << src_name << ").\n"; } - // ~60fps render loop SDL_Delay(16); } @@ -398,12 +385,13 @@ int main(int argc, char *argv[]) { std::cout << "[human] Total received: " << g_audio_frames.load() << " audio frames, " << g_video_frames.load() << " video frames.\n"; - // The input thread blocks on std::getline; detach it since there is no - // portable way to interrupt blocking stdin reads. if (input_thread.joinable()) input_thread.detach(); - bridge.disconnect(); + // Room destructor calls stopAllReaders() which closes streams and joins + // reader threads, then tears down FFI state. + room.reset(); + livekit::shutdown(); { std::lock_guard lock(speaker_mutex); diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index 4b2e2e16..bd7c6abc 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -332,11 +332,9 @@ int main(int argc, char *argv[]) { audioOpts.source = TrackSource::SOURCE_MICROPHONE; audioOpts.dtx = false; audioOpts.simulcast = false; - std::shared_ptr audioPub; try { - // publishTrack takes std::shared_ptr, LocalAudioTrack derives from - // Track - audioPub = room->localParticipant()->publishTrack(audioTrack, audioOpts); + room->localParticipant()->publishTrack(audioTrack, audioOpts); + const auto audioPub = audioTrack->publication(); std::cout << "Published track:\n" << " SID: " << audioPub->sid() << "\n" @@ -354,17 +352,18 @@ int main(int argc, char *argv[]) { // Setup Video Source / Track auto videoSource = std::make_shared(1280, 720); - std::shared_ptr videoTrack = - LocalVideoTrack::createLocalVideoTrack("cam", videoSource); + auto videoTrack = LocalVideoTrack::createLocalVideoTrack("cam", videoSource); + TrackPublishOptions videoOpts; videoOpts.source = TrackSource::SOURCE_CAMERA; videoOpts.dtx = false; videoOpts.simulcast = true; - std::shared_ptr videoPub; try { // publishTrack takes std::shared_ptr, LocalAudioTrack derives from // Track - videoPub = room->localParticipant()->publishTrack(videoTrack, videoOpts); + room->localParticipant()->publishTrack(videoTrack, videoOpts); + + const auto videoPub = videoTrack->publication(); std::cout << "Published track:\n" << " SID: " << videoPub->sid() << "\n" @@ -400,11 +399,14 @@ int main(int argc, char *argv[]) { // Must be cleaned up before FfiClient::instance().shutdown(); room->setDelegate(nullptr); - // Clean up the audio track publishment - room->localParticipant()->unpublishTrack(audioPub->sid()); - - // Clean up the video track publishment - room->localParticipant()->unpublishTrack(videoPub->sid()); + if (audioTrack->publication()) { + room->localParticipant()->unpublishTrack(audioTrack->publication()->sid()); + } + if (videoTrack->publication()) { + room->localParticipant()->unpublishTrack(videoTrack->publication()->sid()); + } + audioTrack.reset(); + videoTrack.reset(); room.reset(); diff --git a/include/livekit/local_audio_track.h b/include/livekit/local_audio_track.h index 1d99c0a5..9c46819a 100644 --- a/include/livekit/local_audio_track.h +++ b/include/livekit/local_audio_track.h @@ -16,6 +16,8 @@ #pragma once +#include "audio_frame.h" +#include "local_track_publication.h" #include "track.h" #include #include @@ -55,6 +57,8 @@ class LocalAudioTrack : public Track { /// @param name Human-readable name for the track. This may appear to /// remote participants and in analytics/debug logs. /// @param source The audio source that produces PCM frames for this track. + /// The caller retains ownership and should use this source + /// directly for frame capture. /// /// @return A shared pointer to the newly constructed `LocalAudioTrack`. static std::shared_ptr @@ -74,8 +78,24 @@ class LocalAudioTrack : public Track { /// including its SID and name. Useful for debugging and logging. std::string to_string() const; + /// Returns the publication that owns this track, or nullptr if the track is + /// not published. + std::shared_ptr publication() const noexcept { + return local_publication_; + } + + /// Sets the publication that owns this track. + void setPublication(const std::shared_ptr + &publication) noexcept override { + local_publication_ = std::move(publication); + } + private: explicit LocalAudioTrack(FfiHandle handle, const proto::OwnedTrack &track); + + /// The publication that owns this track. This is a nullptr until the track + /// is published, and then points to the publication that owns this track. + std::shared_ptr local_publication_; }; } // namespace livekit \ No newline at end of file diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 3d0954c6..edd7c945 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -17,6 +17,8 @@ #pragma once #include "livekit/ffi_handle.h" +#include "livekit/local_audio_track.h" +#include "livekit/local_video_track.h" #include "livekit/participant.h" #include "livekit/room_event_types.h" #include "livekit/rpc_error.h" @@ -55,6 +57,7 @@ class LocalParticipant : public Participant { public: using PublicationMap = std::unordered_map>; + using TrackMap = std::unordered_map>; /** * Type of callback used to handle incoming RPC method invocations. @@ -74,10 +77,13 @@ class LocalParticipant : public Participant { std::unordered_map attributes, ParticipantKind kind, DisconnectReason reason); - /// Track publications associated with this participant, keyed by track SID. - const PublicationMap &trackPublications() const noexcept { - return track_publications_; - } + /** + * Track publications for this participant, keyed by publication SID. + * + * Built on each call from published local tracks (see \ref publishTrack). + * Expired track handles are removed from the internal map while building. + */ + PublicationMap trackPublications() const; /** * Publish arbitrary data to the room. @@ -124,9 +130,32 @@ class LocalParticipant : public Participant { * * Throws std::runtime_error on error (e.g. publish failure). */ - std::shared_ptr - publishTrack(const std::shared_ptr &track, - const TrackPublishOptions &options); + void publishTrack(const std::shared_ptr &track, + const TrackPublishOptions &options); + + /** + * Create a \ref LocalVideoTrack backed by the given \ref VideoSource, + * publish it, and return the track. + * + * The caller retains ownership of \p source and should use it directly + * for frame capture on the video thread. + */ + std::shared_ptr + publishVideoTrack(const std::string &name, + const std::shared_ptr &source, + TrackSource track_source); + + /** + * Create a \ref LocalAudioTrack backed by the given \ref AudioSource, + * publish it, and return the track. + * + * The caller retains ownership of \p source and should use it directly + * for frame capture on the audio thread. + */ + std::shared_ptr + publishAudioTrack(const std::string &name, + const std::shared_ptr &source, + TrackSource track_source); /** * Unpublish a track from the room by SID. @@ -211,7 +240,10 @@ class LocalParticipant : public Participant { friend class Room; private: - PublicationMap track_publications_; + /// Publication SID → local track (\ref unpublishTrack clears the track’s + /// cached publication). \c mutable so \ref trackPublications() const can + /// prune expired \c weak_ptr entries. + mutable TrackMap published_tracks_by_sid_; std::unordered_map rpc_handlers_; // Shared state for RPC invocation tracking. Using shared_ptr so the state diff --git a/include/livekit/local_video_track.h b/include/livekit/local_video_track.h index 2e0e22fe..a431740c 100644 --- a/include/livekit/local_video_track.h +++ b/include/livekit/local_video_track.h @@ -16,7 +16,9 @@ #pragma once +#include "local_track_publication.h" #include "track.h" + #include #include @@ -37,9 +39,10 @@ class VideoSource; * * Typical usage: * - * auto source = VideoSource::create(...); + * auto source = std::make_shared(1280, 720); * auto track = LocalVideoTrack::createLocalVideoTrack("cam", source); * room->localParticipant()->publishTrack(track); + * // Capture frames on the video thread via `source`, not via the track. * * Muting a local video track stops transmitting video to the room, but * the underlying source may continue capturing depending on platform @@ -55,6 +58,8 @@ class LocalVideoTrack : public Track { /// @param name Human-readable name for the track. This may appear to /// remote participants and in analytics/debug logs. /// @param source The video source that produces video frames for this track. + /// The caller retains ownership and should use this source + /// directly for frame capture. /// /// @return A shared pointer to the newly constructed `LocalVideoTrack`. static std::shared_ptr @@ -74,8 +79,24 @@ class LocalVideoTrack : public Track { /// including its SID and name. Useful for debugging and logging. std::string to_string() const; + /// Returns the publication that owns this track, or nullptr if the track is + /// not published. + std::shared_ptr publication() const noexcept { + return local_publication_; + } + + /// Sets the publication that owns this track. + void setPublication(const std::shared_ptr + &publication) noexcept override { + local_publication_ = std::move(publication); + } + private: explicit LocalVideoTrack(FfiHandle handle, const proto::OwnedTrack &track); + + /// The publication that owns this track. This is a nullptr until the track + /// is published, and then points to the publication that owns this track. + std::shared_ptr local_publication_; }; } // namespace livekit \ No newline at end of file diff --git a/include/livekit/room.h b/include/livekit/room.h index b0375840..d808ecd4 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -21,6 +21,9 @@ #include "livekit/e2ee.h" #include "livekit/ffi_handle.h" #include "livekit/room_event_types.h" +#include "livekit/subscription_thread_dispatcher.h" + +#include #include #include @@ -233,6 +236,66 @@ class Room { */ E2EEManager *e2eeManager() const; + // --------------------------------------------------------------- + // Frame callbacks + // --------------------------------------------------------------- + + /** + * Set a callback for audio frames from a specific remote participant and + * track source. + * + * A dedicated reader thread is spawned for each (participant, source) pair + * when the track is subscribed. If the track is already subscribed, the + * reader starts immediately. If not, it starts when the track arrives. + * + * Only one callback may exist per (participant, source) pair. Re-calling + * with the same pair replaces the previous callback. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source (e.g. SOURCE_MICROPHONE). + * @param callback Function invoked per audio frame. + * @param opts AudioStream options (capacity, noise + * cancellation). + */ + void setOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source, AudioFrameCallback callback, + AudioStream::Options opts = {}); + + /** + * Set a callback for video frames from a specific remote participant and + * track source. + * + * @see setOnAudioFrameCallback for threading and lifecycle semantics. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source (e.g. SOURCE_CAMERA). + * @param callback Function invoked per video frame. + * @param opts VideoStream options (capacity, pixel format). + */ + void setOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source, VideoFrameCallback callback, + VideoStream::Options opts = {}); + + /** + * Clear the audio frame callback for a specific (participant, source) pair. + * Stops and joins any active reader thread. + * No-op if no callback is registered for this key. + * @param participant_identity Identity of the remote participant. + * @param source Track source (e.g. SOURCE_MICROPHONE). + */ + void clearOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source); + + /** + * Clear the video frame callback for a specific (participant, source) pair. + * Stops and joins any active reader thread. + * No-op if no callback is registered for this key. + * @param participant_identity Identity of the remote participant. + * @param source Track source (e.g. SOURCE_CAMERA). + */ + void clearOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source); + private: mutable std::mutex lock_; ConnectionState connection_state_ = ConnectionState::Disconnected; @@ -251,6 +314,7 @@ class Room { byte_stream_readers_; // E2EE std::unique_ptr e2ee_manager_; + std::shared_ptr subscription_thread_dispatcher_; // FfiClient listener ID (0 means no listener registered) int listener_id_{0}; diff --git a/include/livekit/subscription_thread_dispatcher.h b/include/livekit/subscription_thread_dispatcher.h new file mode 100644 index 00000000..3e843541 --- /dev/null +++ b/include/livekit/subscription_thread_dispatcher.h @@ -0,0 +1,267 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H +#define LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H + +#include "livekit/audio_stream.h" +#include "livekit/video_stream.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { + +class AudioFrame; +class Track; +class VideoFrame; + +/// Callback type for incoming audio frames. +/// Invoked on a dedicated reader thread per (participant, source) pair. +using AudioFrameCallback = std::function; + +/// Callback type for incoming video frames. +/// Invoked on a dedicated reader thread per (participant, source) pair. +using VideoFrameCallback = + std::function; + +/** + * Owns subscription callback registration and per-subscription reader threads. + * + * `SubscriptionThreadDispatcher` is the low-level companion to \ref Room's + * remote track subscription flow. `Room` forwards user-facing callback + * registration requests here, and then calls \ref handleTrackSubscribed and + * \ref handleTrackUnsubscribed as room events arrive. + * + * For each registered `(participant identity, TrackSource)` pair, this class + * may create a dedicated \ref AudioStream or \ref VideoStream and a matching + * reader thread. That thread blocks on stream reads and invokes the + * registered callback with decoded frames. + * + * This type is intentionally independent from \ref RoomDelegate. High-level + * room events such as `RoomDelegate::onTrackSubscribed()` remain in \ref Room, + * while this dispatcher focuses only on callback registration, stream + * ownership, and reader-thread lifecycle. + * + * The design keeps track-type-specific startup isolated so additional track + * kinds can be added later without pushing more thread state back into + * \ref Room. + */ +class SubscriptionThreadDispatcher { +public: + /// Constructs an empty dispatcher with no registered callbacks or readers. + SubscriptionThreadDispatcher(); + + /// Stops all active readers and clears all registered callbacks. + ~SubscriptionThreadDispatcher(); + + /** + * Register or replace an audio frame callback for a remote subscription. + * + * The callback is keyed by remote participant identity plus \p source. + * If the matching remote audio track is already subscribed, \ref Room may + * immediately call \ref handleTrackSubscribed to start a reader. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source to match. + * @param callback Function invoked for each decoded audio frame. + * @param opts Options used when creating the backing + * \ref AudioStream. + */ + void setOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source, AudioFrameCallback callback, + AudioStream::Options opts = {}); + + /** + * Register or replace a video frame callback for a remote subscription. + * + * The callback is keyed by remote participant identity plus \p source. + * If the matching remote video track is already subscribed, \ref Room may + * immediately call \ref handleTrackSubscribed to start a reader. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source to match. + * @param callback Function invoked for each decoded video frame. + * @param opts Options used when creating the backing + * \ref VideoStream. + */ + void setOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source, VideoFrameCallback callback, + VideoStream::Options opts = {}); + + /** + * Remove an audio callback registration and stop any active reader. + * + * If an audio reader thread is active for the given key, its stream is + * closed and the thread is joined before this call returns. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source to clear. + */ + void clearOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source); + + /** + * Remove a video callback registration and stop any active reader. + * + * If a video reader thread is active for the given key, its stream is + * closed and the thread is joined before this call returns. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source to clear. + */ + void clearOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source); + + /** + * Start or restart reader dispatch for a newly subscribed remote track. + * + * \ref Room calls this after it has processed a track-subscription event and + * updated its publication state. If a matching callback registration exists, + * the dispatcher creates the appropriate stream type and launches a reader + * thread for the `(participant, source)` key. + * + * If no matching callback is registered, this is a no-op. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source associated with the subscription. + * @param track Subscribed remote track to read from. + */ + void handleTrackSubscribed(const std::string &participant_identity, + TrackSource source, + const std::shared_ptr &track); + + /** + * Stop reader dispatch for an unsubscribed remote track. + * + * \ref Room calls this when a remote track is unsubscribed. Any active + * reader stream for the given `(participant, source)` key is closed and its + * thread is joined. Callback registration is preserved so future + * re-subscription can start dispatch again automatically. + * + * @param participant_identity Identity of the remote participant. + * @param source Track source associated with the subscription. + */ + void handleTrackUnsubscribed(const std::string &participant_identity, + TrackSource source); + + /** + * Stop all readers and clear all callback registrations. + * + * This is used during room teardown or EOS handling to ensure no reader + * thread survives beyond the lifetime of the owning \ref Room. + */ + void stopAll(); + +private: + friend class SubscriptionThreadDispatcherTest; + + /// Compound lookup key for a remote participant identity and track source. + struct CallbackKey { + std::string participant_identity; + TrackSource source; + + bool operator==(const CallbackKey &o) const { + return participant_identity == o.participant_identity && + source == o.source; + } + }; + + /// Hash function for \ref CallbackKey so it can be used in unordered maps. + struct CallbackKeyHash { + std::size_t operator()(const CallbackKey &k) const { + auto h1 = std::hash{}(k.participant_identity); + auto h2 = std::hash{}(static_cast(k.source)); + return h1 ^ (h2 << 1); + } + }; + + /// Active read-side resources for one subscription dispatch slot. + struct ActiveReader { + std::shared_ptr audio_stream; + std::shared_ptr video_stream; + std::thread thread; + }; + + /// Stored audio callback registration plus stream-construction options. + struct RegisteredAudioCallback { + AudioFrameCallback callback; + AudioStream::Options options; + }; + + /// Stored video callback registration plus stream-construction options. + struct RegisteredVideoCallback { + VideoFrameCallback callback; + VideoStream::Options options; + }; + + /// Remove and close the active reader for \p key, returning its thread. + /// + /// Must be called with \ref lock_ held. The returned thread, if joinable, + /// must be joined after releasing the lock. + std::thread extractReaderThreadLocked(const CallbackKey &key); + + /// Select the appropriate reader startup path for \p track. + /// + /// Must be called with \ref lock_ held. + std::thread startReaderLocked(const CallbackKey &key, + const std::shared_ptr &track); + + /// Start an audio reader thread for \p key using \p track. + /// + /// Must be called with \ref lock_ held. Any previous reader for the same key + /// is extracted and returned to the caller for joining outside the lock. + std::thread startAudioReaderLocked(const CallbackKey &key, + const std::shared_ptr &track, + AudioFrameCallback cb, + const AudioStream::Options &opts); + + /// Start a video reader thread for \p key using \p track. + /// + /// Must be called with \ref lock_ held. Any previous reader for the same key + /// is extracted and returned to the caller for joining outside the lock. + std::thread startVideoReaderLocked(const CallbackKey &key, + const std::shared_ptr &track, + VideoFrameCallback cb, + const VideoStream::Options &opts); + + /// Protects callback registration maps and active reader state. + mutable std::mutex lock_; + + /// Registered audio frame callbacks keyed by `(participant, source)`. + std::unordered_map + audio_callbacks_; + + /// Registered video frame callbacks keyed by `(participant, source)`. + std::unordered_map + video_callbacks_; + + /// Active stream/thread state keyed by `(participant, source)`. + std::unordered_map + active_readers_; + + /// Hard limit on concurrently active per-subscription reader threads. + static constexpr int kMaxActiveReaders = 20; +}; + +} // namespace livekit + +#endif /* LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H */ diff --git a/include/livekit/track.h b/include/livekit/track.h index 850e359b..7630f6ec 100644 --- a/include/livekit/track.h +++ b/include/livekit/track.h @@ -29,6 +29,8 @@ namespace livekit { +class LocalTrackPublication; + enum class TrackKind { KIND_UNKNOWN = 0, KIND_AUDIO = 1, @@ -94,6 +96,13 @@ class Track { // Async get stats std::future> getStats() const; + /// After publishing a local track, associates the \ref LocalTrackPublication + /// with this track. Default implementation is a no-op (e.g. remote tracks). + virtual void setPublication( + const std::shared_ptr &publication) noexcept { + (void)publication; + } + // Internal updates (called by Room) void setStreamState(StreamState s) noexcept { state_ = s; } void setMuted(bool m) noexcept { muted_ = m; } diff --git a/src/local_participant.cpp b/src/local_participant.cpp index c908cc9e..8aea35ff 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -17,7 +17,9 @@ #include "livekit/local_participant.h" #include "livekit/ffi_handle.h" +#include "livekit/local_audio_track.h" #include "livekit/local_track_publication.h" +#include "livekit/local_video_track.h" #include "livekit/room_delegate.h" #include "livekit/track.h" @@ -32,6 +34,24 @@ #include #include +namespace { + +std::shared_ptr +localTrackPublication(const std::shared_ptr &t) { + if (!t) { + return nullptr; + } + if (auto v = std::dynamic_pointer_cast(t)) { + return v->publication(); + } + if (auto a = std::dynamic_pointer_cast(t)) { + return a->publication(); + } + return nullptr; +} + +} // namespace + namespace livekit { using proto::FfiRequest; @@ -167,9 +187,8 @@ void LocalParticipant::setTrackSubscriptionPermissions( // Track publish / unpublish // ---------------------------------------------------------------------------- -std::shared_ptr -LocalParticipant::publishTrack(const std::shared_ptr &track, - const TrackPublishOptions &options) { +void LocalParticipant::publishTrack(const std::shared_ptr &track, + const TrackPublishOptions &options) { if (!track) { throw std::invalid_argument( "LocalParticipant::publishTrack: track is null"); @@ -196,11 +215,32 @@ LocalParticipant::publishTrack(const std::shared_ptr &track, // Construct a LocalTrackPublication from the proto publication. auto publication = std::make_shared(owned_pub); - // Cache in local map by track SID. const std::string sid = publication->sid(); - track_publications_[sid] = publication; + published_tracks_by_sid_[sid] = std::weak_ptr(track); + + track->setPublication(publication); +} + +std::shared_ptr +LocalParticipant::publishVideoTrack(const std::string &name, + const std::shared_ptr &source, + TrackSource track_source) { + auto track = LocalVideoTrack::createLocalVideoTrack(name, source); + TrackPublishOptions opts; + opts.source = track_source; + publishTrack(track, opts); + return track; +} - return publication; +std::shared_ptr +LocalParticipant::publishAudioTrack(const std::string &name, + const std::shared_ptr &source, + TrackSource track_source) { + auto track = LocalAudioTrack::createLocalAudioTrack(name, source); + TrackPublishOptions opts; + opts.source = track_source; + publishTrack(track, opts); + return track; } void LocalParticipant::unpublishTrack(const std::string &track_sid) { @@ -220,7 +260,30 @@ void LocalParticipant::unpublishTrack(const std::string &track_sid) { fut.get(); - track_publications_.erase(track_sid); + if (auto it = published_tracks_by_sid_.find(track_sid); + it != published_tracks_by_sid_.end()) { + if (auto t = it->second.lock()) { + t->setPublication(nullptr); + } + published_tracks_by_sid_.erase(it); + } +} + +LocalParticipant::PublicationMap LocalParticipant::trackPublications() const { + PublicationMap out; + for (auto it = published_tracks_by_sid_.begin(); + it != published_tracks_by_sid_.end();) { + auto t = it->second.lock(); + if (!t) { + it = published_tracks_by_sid_.erase(it); + continue; + } + if (auto pub = localTrackPublication(t)) { + out.emplace(it->first, std::move(pub)); + } + ++it; + } + return out; } std::string LocalParticipant::performRpc( @@ -389,11 +452,20 @@ void LocalParticipant::handleRpcMethodInvocation( std::shared_ptr LocalParticipant::findTrackPublication(const std::string &sid) const { - auto it = track_publications_.find(sid); - if (it == track_publications_.end()) { + auto it = published_tracks_by_sid_.find(sid); + if (it == published_tracks_by_sid_.end()) { + return nullptr; + } + auto t = it->second.lock(); + if (!t) { + published_tracks_by_sid_.erase(it); + return nullptr; + } + auto pub = localTrackPublication(t); + if (!pub) { return nullptr; } - return std::static_pointer_cast(it->second); + return std::static_pointer_cast(pub); } } // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index 6e464079..ab7ab286 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -26,7 +26,6 @@ #include "livekit/remote_video_track.h" #include "livekit/room_delegate.h" #include "livekit/room_event_types.h" -#include "livekit/video_stream.h" #include "ffi.pb.h" #include "ffi_client.h" @@ -65,9 +64,15 @@ createRemoteParticipant(const proto::OwnedParticipant &owned) { } } // namespace -Room::Room() {} +Room::Room() + : subscription_thread_dispatcher_( + std::make_unique()) {} Room::~Room() { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->stopAll(); + } + int listener_to_remove = 0; std::unique_ptr local_participant_to_cleanup; { @@ -254,11 +259,50 @@ void Room::unregisterByteStreamHandler(const std::string &topic) { byte_stream_handlers_.erase(topic); } +// ------------------------------------------------------------------- +// Frame callback registration +// ------------------------------------------------------------------- + +void Room::setOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source, + AudioFrameCallback callback, + AudioStream::Options opts) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->setOnAudioFrameCallback( + participant_identity, source, std::move(callback), std::move(opts)); + } +} + +void Room::setOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source, + VideoFrameCallback callback, + VideoStream::Options opts) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->setOnVideoFrameCallback( + participant_identity, source, std::move(callback), std::move(opts)); + } +} + +void Room::clearOnAudioFrameCallback(const std::string &participant_identity, + TrackSource source) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->clearOnAudioFrameCallback( + participant_identity, source); + } +} + +void Room::clearOnVideoFrameCallback(const std::string &participant_identity, + TrackSource source) { + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->clearOnVideoFrameCallback( + participant_identity, source); + } +} + void Room::OnEvent(const FfiEvent &event) { // Take a snapshot of the delegate under lock, but do NOT call it under the // lock. RoomDelegate *delegate_snapshot = nullptr; - { std::lock_guard guard(lock_); delegate_snapshot = delegate_; @@ -365,7 +409,7 @@ void Room::OnEvent(const FfiEvent &event) { } const auto <p = re.local_track_published(); const std::string &sid = ltp.track_sid(); - auto &pubs = local_participant_->trackPublications(); + const auto pubs = local_participant_->trackPublications(); auto it = pubs.find(sid); if (it == pubs.end()) { LK_LOG_WARN("local_track_published for unknown sid: {}", sid); @@ -389,7 +433,7 @@ void Room::OnEvent(const FfiEvent &event) { } const auto <u = re.local_track_unpublished(); const std::string &pub_sid = ltu.publication_sid(); - auto &pubs = local_participant_->trackPublications(); + const auto pubs = local_participant_->trackPublications(); auto it = pubs.find(pub_sid); if (it == pubs.end()) { LK_LOG_WARN("local_track_unpublished for unknown publication sid: {}", @@ -412,7 +456,7 @@ void Room::OnEvent(const FfiEvent &event) { } const auto <s = re.local_track_subscribed(); const std::string &sid = lts.track_sid(); - auto &pubs = local_participant_->trackPublications(); + const auto pubs = local_participant_->trackPublications(); auto it = pubs.find(sid); if (it == pubs.end()) { LK_LOG_WARN("local_track_subscribed for unknown sid: {}", sid); @@ -539,19 +583,26 @@ void Room::OnEvent(const FfiEvent &event) { if (delegate_snapshot) { delegate_snapshot->onTrackSubscribed(*this, ev); } + + if (subscription_thread_dispatcher_ && remote_track && rpublication) { + subscription_thread_dispatcher_->handleTrackSubscribed( + identity, rpublication->source(), remote_track); + } break; } case proto::RoomEvent::kTrackUnsubscribed: { TrackUnsubscribedEvent ev; + TrackSource unsub_source = TrackSource::SOURCE_UNKNOWN; + std::string unsub_identity; { std::lock_guard guard(lock_); const auto &tu = re.track_unsubscribed(); - const std::string &identity = tu.participant_identity(); + unsub_identity = tu.participant_identity(); const std::string &track_sid = tu.track_sid(); - auto pit = remote_participants_.find(identity); + auto pit = remote_participants_.find(unsub_identity); if (pit == remote_participants_.end()) { LK_LOG_WARN("track_unsubscribed for unknown participant: {}", - identity); + unsub_identity); break; } RemoteParticipant *rparticipant = pit->second.get(); @@ -560,10 +611,11 @@ void Room::OnEvent(const FfiEvent &event) { if (pubIt == pubs.end()) { LK_LOG_WARN("track_unsubscribed for unknown publication sid {} " "(participant {})", - track_sid, identity); + track_sid, unsub_identity); break; } auto publication = pubIt->second; + unsub_source = publication->source(); auto track = publication->track(); publication->setTrack(nullptr); publication->setSubscribed(false); @@ -575,6 +627,12 @@ void Room::OnEvent(const FfiEvent &event) { if (delegate_snapshot) { delegate_snapshot->onTrackUnsubscribed(*this, ev); } + + if (subscription_thread_dispatcher_ && + unsub_source != TrackSource::SOURCE_UNKNOWN) { + subscription_thread_dispatcher_->handleTrackUnsubscribed(unsub_identity, + unsub_source); + } break; } case proto::RoomEvent::kTrackSubscriptionFailed: { @@ -1000,7 +1058,10 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kEos: { - // Remove listener since no more events will come for this room + if (subscription_thread_dispatcher_) { + subscription_thread_dispatcher_->stopAll(); + } + int listener_to_remove = 0; // Move state out of lock scope before destroying to avoid holding lock diff --git a/src/subscription_thread_dispatcher.cpp b/src/subscription_thread_dispatcher.cpp new file mode 100644 index 00000000..a7f9a2a7 --- /dev/null +++ b/src/subscription_thread_dispatcher.cpp @@ -0,0 +1,362 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/subscription_thread_dispatcher.h" + +#include "livekit/lk_log.h" +#include "livekit/track.h" + +#include +#include +#include + +namespace livekit { + +namespace { + +const char *trackKindName(TrackKind kind) { + if (kind == TrackKind::KIND_AUDIO) + return "audio"; + if (kind == TrackKind::KIND_VIDEO) + return "video"; + if (kind == TrackKind::KIND_UNKNOWN) + return "unknown"; + + return "unsupported"; +} + +} // namespace + +SubscriptionThreadDispatcher::SubscriptionThreadDispatcher() = default; + +SubscriptionThreadDispatcher::~SubscriptionThreadDispatcher() { + LK_LOG_DEBUG("Destroying SubscriptionThreadDispatcher"); + stopAll(); +} + +void SubscriptionThreadDispatcher::setOnAudioFrameCallback( + const std::string &participant_identity, TrackSource source, + AudioFrameCallback callback, AudioStream::Options opts) { + CallbackKey key{participant_identity, source}; + std::lock_guard lock(lock_); + const bool replacing = audio_callbacks_.find(key) != audio_callbacks_.end(); + audio_callbacks_[key] = + RegisteredAudioCallback{std::move(callback), std::move(opts)}; + LK_LOG_DEBUG("Registered audio frame callback for participant={} source={} " + "replacing_existing={} total_audio_callbacks={}", + participant_identity, static_cast(source), replacing, + audio_callbacks_.size()); +} + +void SubscriptionThreadDispatcher::setOnVideoFrameCallback( + const std::string &participant_identity, TrackSource source, + VideoFrameCallback callback, VideoStream::Options opts) { + CallbackKey key{participant_identity, source}; + std::lock_guard lock(lock_); + const bool replacing = video_callbacks_.find(key) != video_callbacks_.end(); + video_callbacks_[key] = + RegisteredVideoCallback{std::move(callback), std::move(opts)}; + LK_LOG_DEBUG("Registered video frame callback for participant={} source={} " + "replacing_existing={} total_video_callbacks={}", + participant_identity, static_cast(source), replacing, + video_callbacks_.size()); +} + +void SubscriptionThreadDispatcher::clearOnAudioFrameCallback( + const std::string &participant_identity, TrackSource source) { + CallbackKey key{participant_identity, source}; + std::thread old_thread; + bool removed_callback = false; + { + std::lock_guard lock(lock_); + removed_callback = audio_callbacks_.erase(key) > 0; + old_thread = extractReaderThreadLocked(key); + LK_LOG_DEBUG( + "Clearing audio frame callback for participant={} source={} " + "removed_callback={} stopped_reader={} remaining_audio_callbacks={}", + participant_identity, static_cast(source), removed_callback, + old_thread.joinable(), audio_callbacks_.size()); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + +void SubscriptionThreadDispatcher::clearOnVideoFrameCallback( + const std::string &participant_identity, TrackSource source) { + CallbackKey key{participant_identity, source}; + std::thread old_thread; + bool removed_callback = false; + { + std::lock_guard lock(lock_); + removed_callback = video_callbacks_.erase(key) > 0; + old_thread = extractReaderThreadLocked(key); + LK_LOG_DEBUG( + "Clearing video frame callback for participant={} source={} " + "removed_callback={} stopped_reader={} remaining_video_callbacks={}", + participant_identity, static_cast(source), removed_callback, + old_thread.joinable(), video_callbacks_.size()); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + +void SubscriptionThreadDispatcher::handleTrackSubscribed( + const std::string &participant_identity, TrackSource source, + const std::shared_ptr &track) { + if (!track) { + LK_LOG_WARN( + "Ignoring subscribed track dispatch for participant={} source={} " + "because track is null", + participant_identity, static_cast(source)); + return; + } + + LK_LOG_DEBUG("Handling subscribed track for participant={} source={} kind={}", + participant_identity, static_cast(source), + trackKindName(track->kind())); + + CallbackKey key{participant_identity, source}; + std::thread old_thread; + { + std::lock_guard lock(lock_); + old_thread = startReaderLocked(key, track); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + +void SubscriptionThreadDispatcher::handleTrackUnsubscribed( + const std::string &participant_identity, TrackSource source) { + CallbackKey key{participant_identity, source}; + std::thread old_thread; + { + std::lock_guard lock(lock_); + old_thread = extractReaderThreadLocked(key); + LK_LOG_DEBUG("Handling unsubscribed track for participant={} source={} " + "stopped_reader={}", + participant_identity, static_cast(source), + old_thread.joinable()); + } + if (old_thread.joinable()) { + old_thread.join(); + } +} + +void SubscriptionThreadDispatcher::stopAll() { + std::vector threads; + std::size_t active_reader_count = 0; + std::size_t audio_callback_count = 0; + std::size_t video_callback_count = 0; + { + std::lock_guard lock(lock_); + active_reader_count = active_readers_.size(); + audio_callback_count = audio_callbacks_.size(); + video_callback_count = video_callbacks_.size(); + LK_LOG_DEBUG("Stopping all subscription readers active_readers={} " + "audio_callbacks={} video_callbacks={}", + active_reader_count, audio_callback_count, + video_callback_count); + for (auto &[key, reader] : active_readers_) { + LK_LOG_TRACE("Closing active reader for participant={} source={}", + key.participant_identity, static_cast(key.source)); + if (reader.audio_stream) { + reader.audio_stream->close(); + } + if (reader.video_stream) { + reader.video_stream->close(); + } + if (reader.thread.joinable()) { + threads.push_back(std::move(reader.thread)); + } + } + active_readers_.clear(); + audio_callbacks_.clear(); + video_callbacks_.clear(); + } + for (auto &thread : threads) { + thread.join(); + } + LK_LOG_DEBUG("Stopped {} subscription reader threads", threads.size()); +} + +std::thread SubscriptionThreadDispatcher::extractReaderThreadLocked( + const CallbackKey &key) { + auto it = active_readers_.find(key); + if (it == active_readers_.end()) { + LK_LOG_TRACE("No active reader to extract for participant={} source={}", + key.participant_identity, static_cast(key.source)); + return {}; + } + + LK_LOG_DEBUG("Extracting active reader for participant={} source={}", + key.participant_identity, static_cast(key.source)); + ActiveReader reader = std::move(it->second); + active_readers_.erase(it); + + if (reader.audio_stream) { + reader.audio_stream->close(); + } + if (reader.video_stream) { + reader.video_stream->close(); + } + return std::move(reader.thread); +} + +std::thread SubscriptionThreadDispatcher::startReaderLocked( + const CallbackKey &key, const std::shared_ptr &track) { + if (track->kind() == TrackKind::KIND_AUDIO) { + auto it = audio_callbacks_.find(key); + if (it == audio_callbacks_.end()) { + LK_LOG_TRACE("Skipping audio reader start for participant={} source={} " + "because no audio callback is registered", + key.participant_identity, static_cast(key.source)); + return {}; + } + return startAudioReaderLocked(key, track, it->second.callback, + it->second.options); + } + if (track->kind() == TrackKind::KIND_VIDEO) { + auto it = video_callbacks_.find(key); + if (it == video_callbacks_.end()) { + LK_LOG_TRACE("Skipping video reader start for participant={} source={} " + "because no video callback is registered", + key.participant_identity, static_cast(key.source)); + return {}; + } + return startVideoReaderLocked(key, track, it->second.callback, + it->second.options); + } + if (track->kind() == TrackKind::KIND_UNKNOWN) { + LK_LOG_WARN( + "Skipping reader start for participant={} source={} because track " + "kind is unknown", + key.participant_identity, static_cast(key.source)); + return {}; + } + + LK_LOG_WARN( + "Skipping reader start for participant={} source={} because track kind " + "is unsupported", + key.participant_identity, static_cast(key.source)); + return {}; +} + +std::thread SubscriptionThreadDispatcher::startAudioReaderLocked( + const CallbackKey &key, const std::shared_ptr &track, + AudioFrameCallback cb, const AudioStream::Options &opts) { + LK_LOG_DEBUG("Starting audio reader for participant={} source={}", + key.participant_identity, static_cast(key.source)); + auto old_thread = extractReaderThreadLocked(key); + + if (static_cast(active_readers_.size()) >= kMaxActiveReaders) { + LK_LOG_ERROR( + "Cannot start audio reader for {} source={}: active reader limit ({}) " + "reached", + key.participant_identity, static_cast(key.source), + kMaxActiveReaders); + return old_thread; + } + + auto stream = AudioStream::fromTrack(track, opts); + if (!stream) { + LK_LOG_ERROR("Failed to create AudioStream for {} source={}", + key.participant_identity, static_cast(key.source)); + return old_thread; + } + + ActiveReader reader; + reader.audio_stream = stream; + auto stream_copy = stream; + const std::string participant_identity = key.participant_identity; + const TrackSource source = key.source; + reader.thread = + std::thread([stream_copy, cb, participant_identity, source]() { + LK_LOG_DEBUG("Audio reader thread started for participant={} source={}", + participant_identity, static_cast(source)); + AudioFrameEvent ev; + while (stream_copy->read(ev)) { + try { + cb(ev.frame); + } catch (const std::exception &e) { + LK_LOG_ERROR("Audio frame callback exception: {}", e.what()); + } + } + LK_LOG_DEBUG("Audio reader thread exiting for participant={} source={}", + participant_identity, static_cast(source)); + }); + active_readers_[key] = std::move(reader); + LK_LOG_DEBUG("Started audio reader for participant={} source={} " + "active_readers={}", + key.participant_identity, static_cast(key.source), + active_readers_.size()); + return old_thread; +} + +std::thread SubscriptionThreadDispatcher::startVideoReaderLocked( + const CallbackKey &key, const std::shared_ptr &track, + VideoFrameCallback cb, const VideoStream::Options &opts) { + LK_LOG_DEBUG("Starting video reader for participant={} source={}", + key.participant_identity, static_cast(key.source)); + auto old_thread = extractReaderThreadLocked(key); + + if (static_cast(active_readers_.size()) >= kMaxActiveReaders) { + LK_LOG_ERROR( + "Cannot start video reader for {} source={}: active reader limit ({}) " + "reached", + key.participant_identity, static_cast(key.source), + kMaxActiveReaders); + return old_thread; + } + + auto stream = VideoStream::fromTrack(track, opts); + if (!stream) { + LK_LOG_ERROR("Failed to create VideoStream for {} source={}", + key.participant_identity, static_cast(key.source)); + return old_thread; + } + + ActiveReader reader; + reader.video_stream = stream; + auto stream_copy = stream; + const std::string participant_identity = key.participant_identity; + const TrackSource source = key.source; + reader.thread = + std::thread([stream_copy, cb, participant_identity, source]() { + LK_LOG_DEBUG("Video reader thread started for participant={} source={}", + participant_identity, static_cast(source)); + VideoFrameEvent ev; + while (stream_copy->read(ev)) { + try { + cb(ev.frame, ev.timestamp_us); + } catch (const std::exception &e) { + LK_LOG_ERROR("Video frame callback exception: {}", e.what()); + } + } + LK_LOG_DEBUG("Video reader thread exiting for participant={} source={}", + participant_identity, static_cast(source)); + }); + active_readers_[key] = std::move(reader); + LK_LOG_DEBUG("Started video reader for participant={} source={} " + "active_readers={}", + key.participant_identity, static_cast(key.source), + active_readers_.size()); + return old_thread; +} + +} // namespace livekit diff --git a/src/tests/integration/test_subscription_thread_dispatcher.cpp b/src/tests/integration/test_subscription_thread_dispatcher.cpp new file mode 100644 index 00000000..71601a18 --- /dev/null +++ b/src/tests/integration/test_subscription_thread_dispatcher.cpp @@ -0,0 +1,383 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// @file test_subscription_thread_dispatcher.cpp +/// @brief Unit tests for SubscriptionThreadDispatcher registration state. + +#include +#include + +#include +#include +#include +#include + +namespace livekit { + +class SubscriptionThreadDispatcherTest : public ::testing::Test { +protected: + void SetUp() override { + livekit::initialize(livekit::LogLevel::Info, livekit::LogSink::kConsole); + } + + void TearDown() override { livekit::shutdown(); } + + using CallbackKey = SubscriptionThreadDispatcher::CallbackKey; + using CallbackKeyHash = SubscriptionThreadDispatcher::CallbackKeyHash; + + static auto &audioCallbacks(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.audio_callbacks_; + } + static auto &videoCallbacks(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.video_callbacks_; + } + static auto &activeReaders(SubscriptionThreadDispatcher &dispatcher) { + return dispatcher.active_readers_; + } + static int maxActiveReaders() { + return SubscriptionThreadDispatcher::kMaxActiveReaders; + } +}; + +// ============================================================================ +// CallbackKey equality +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyEqualKeysCompareEqual) { + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE}; + EXPECT_TRUE(a == b); +} + +TEST_F(SubscriptionThreadDispatcherTest, + CallbackKeyDifferentIdentityNotEqual) { + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey b{"bob", TrackSource::SOURCE_MICROPHONE}; + EXPECT_FALSE(a == b); +} + +TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyDifferentSourceNotEqual) { + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey b{"alice", TrackSource::SOURCE_CAMERA}; + EXPECT_FALSE(a == b); +} + +// ============================================================================ +// CallbackKeyHash +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + CallbackKeyHashEqualKeysProduceSameHash) { + CallbackKey a{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey b{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKeyHash hasher; + EXPECT_EQ(hasher(a), hasher(b)); +} + +TEST_F(SubscriptionThreadDispatcherTest, + CallbackKeyHashDifferentKeysLikelyDifferentHash) { + CallbackKeyHash hasher; + CallbackKey mic{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey cam{"alice", TrackSource::SOURCE_CAMERA}; + CallbackKey bob{"bob", TrackSource::SOURCE_MICROPHONE}; + + EXPECT_NE(hasher(mic), hasher(cam)); + EXPECT_NE(hasher(mic), hasher(bob)); +} + +TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyWorksAsUnorderedMapKey) { + std::unordered_map map; + + CallbackKey k1{"alice", TrackSource::SOURCE_MICROPHONE}; + CallbackKey k2{"bob", TrackSource::SOURCE_CAMERA}; + CallbackKey k3{"alice", TrackSource::SOURCE_CAMERA}; + + map[k1] = 1; + map[k2] = 2; + map[k3] = 3; + + EXPECT_EQ(map.size(), 3u); + EXPECT_EQ(map[k1], 1); + EXPECT_EQ(map[k2], 2); + EXPECT_EQ(map[k3], 3); + + map[k1] = 42; + EXPECT_EQ(map[k1], 42); + EXPECT_EQ(map.size(), 3u); + + map.erase(k2); + EXPECT_EQ(map.size(), 2u); + EXPECT_EQ(map.count(k2), 0u); +} + +TEST_F(SubscriptionThreadDispatcherTest, CallbackKeyEmptyIdentityWorks) { + CallbackKey a{"", TrackSource::SOURCE_UNKNOWN}; + CallbackKey b{"", TrackSource::SOURCE_UNKNOWN}; + CallbackKeyHash hasher; + EXPECT_TRUE(a == b); + EXPECT_EQ(hasher(a), hasher(b)); +} + +// ============================================================================ +// kMaxActiveReaders +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, MaxActiveReadersIs20) { + EXPECT_EQ(maxActiveReaders(), 20); +} + +// ============================================================================ +// Registration and clearing +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, SetAudioCallbackStoresRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, SetVideoCallbackStoresRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + + EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, ClearAudioCallbackRemovesRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + ASSERT_EQ(audioCallbacks(dispatcher).size(), 1u); + + dispatcher.clearOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE); + EXPECT_EQ(audioCallbacks(dispatcher).size(), 0u); +} + +TEST_F(SubscriptionThreadDispatcherTest, ClearVideoCallbackRemovesRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + ASSERT_EQ(videoCallbacks(dispatcher).size(), 1u); + + dispatcher.clearOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA); + EXPECT_EQ(videoCallbacks(dispatcher).size(), 0u); +} + +TEST_F(SubscriptionThreadDispatcherTest, ClearNonExistentCallbackIsNoOp) { + SubscriptionThreadDispatcher dispatcher; + EXPECT_NO_THROW(dispatcher.clearOnAudioFrameCallback( + "nobody", TrackSource::SOURCE_MICROPHONE)); + EXPECT_NO_THROW( + dispatcher.clearOnVideoFrameCallback("nobody", TrackSource::SOURCE_CAMERA)); +} + +TEST_F(SubscriptionThreadDispatcherTest, OverwriteAudioCallbackKeepsSingleEntry) { + SubscriptionThreadDispatcher dispatcher; + std::atomic counter1{0}; + std::atomic counter2{0}; + + dispatcher.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, + [&counter1](const AudioFrame &) { counter1++; }); + dispatcher.setOnAudioFrameCallback( + "alice", TrackSource::SOURCE_MICROPHONE, + [&counter2](const AudioFrame &) { counter2++; }); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u) + << "Re-registering with the same key should overwrite, not add"; +} + +TEST_F(SubscriptionThreadDispatcherTest, OverwriteVideoCallbackKeepsSingleEntry) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + + EXPECT_EQ(videoCallbacks(dispatcher).size(), 1u); +} + +TEST_F(SubscriptionThreadDispatcherTest, + MultipleDistinctCallbacksAreIndependent) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnVideoFrameCallback("alice", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + dispatcher.setOnAudioFrameCallback("bob", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnVideoFrameCallback("bob", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 2u); + EXPECT_EQ(videoCallbacks(dispatcher).size(), 2u); + + dispatcher.clearOnAudioFrameCallback("alice", + TrackSource::SOURCE_MICROPHONE); + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); + EXPECT_EQ(videoCallbacks(dispatcher).size(), 2u); +} + +TEST_F(SubscriptionThreadDispatcherTest, ClearingOneSourceDoesNotAffectOther) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnAudioFrameCallback("alice", + TrackSource::SOURCE_SCREENSHARE_AUDIO, + [](const AudioFrame &) {}); + ASSERT_EQ(audioCallbacks(dispatcher).size(), 2u); + + dispatcher.clearOnAudioFrameCallback("alice", + TrackSource::SOURCE_MICROPHONE); + EXPECT_EQ(audioCallbacks(dispatcher).size(), 1u); + + CallbackKey remaining{"alice", TrackSource::SOURCE_SCREENSHARE_AUDIO}; + EXPECT_EQ(audioCallbacks(dispatcher).count(remaining), 1u); +} + +// ============================================================================ +// Active readers state (no real streams, just map state) +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, NoActiveReadersInitially) { + SubscriptionThreadDispatcher dispatcher; + EXPECT_TRUE(activeReaders(dispatcher).empty()); +} + +TEST_F(SubscriptionThreadDispatcherTest, + ActiveReadersEmptyAfterCallbackRegistration) { + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + EXPECT_TRUE(activeReaders(dispatcher).empty()) + << "Registering a callback without a subscribed track should not spawn " + "readers"; +} + +// ============================================================================ +// Destruction safety +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, + DestroyDispatcherWithRegisteredCallbacksIsSafe) { + EXPECT_NO_THROW({ + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnVideoFrameCallback("bob", TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, std::int64_t) {}); + }); +} + +TEST_F(SubscriptionThreadDispatcherTest, + DestroyDispatcherAfterClearingCallbacksIsSafe) { + EXPECT_NO_THROW({ + SubscriptionThreadDispatcher dispatcher; + dispatcher.setOnAudioFrameCallback("alice", TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.clearOnAudioFrameCallback("alice", + TrackSource::SOURCE_MICROPHONE); + }); +} + +// ============================================================================ +// Thread-safety of registration/clearing +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, ConcurrentRegistrationDoesNotCrash) { + SubscriptionThreadDispatcher dispatcher; + constexpr int kThreads = 8; + constexpr int kIterations = 100; + + std::vector threads; + threads.reserve(kThreads); + + for (int t = 0; t < kThreads; ++t) { + threads.emplace_back([&dispatcher, t]() { + for (int i = 0; i < kIterations; ++i) { + std::string id = "participant-" + std::to_string(t); + dispatcher.setOnAudioFrameCallback(id, TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.clearOnAudioFrameCallback(id, + TrackSource::SOURCE_MICROPHONE); + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + EXPECT_TRUE(audioCallbacks(dispatcher).empty()) + << "All callbacks should be cleared after concurrent register/clear"; +} + +TEST_F(SubscriptionThreadDispatcherTest, + ConcurrentMixedAudioVideoRegistration) { + SubscriptionThreadDispatcher dispatcher; + constexpr int kThreads = 4; + constexpr int kIterations = 50; + + std::vector threads; + + for (int t = 0; t < kThreads; ++t) { + threads.emplace_back([&dispatcher, t]() { + std::string id = "p-" + std::to_string(t); + for (int i = 0; i < kIterations; ++i) { + dispatcher.setOnAudioFrameCallback(id, TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + dispatcher.setOnVideoFrameCallback(id, TrackSource::SOURCE_CAMERA, + [](const VideoFrame &, + std::int64_t) {}); + } + }); + } + + for (auto &thread : threads) { + thread.join(); + } + + EXPECT_EQ(audioCallbacks(dispatcher).size(), static_cast(kThreads)); + EXPECT_EQ(videoCallbacks(dispatcher).size(), static_cast(kThreads)); +} + +// ============================================================================ +// Bulk registration +// ============================================================================ + +TEST_F(SubscriptionThreadDispatcherTest, ManyDistinctCallbacksCanBeRegistered) { + SubscriptionThreadDispatcher dispatcher; + constexpr int kCount = 50; + + for (int i = 0; i < kCount; ++i) { + dispatcher.setOnAudioFrameCallback("participant-" + std::to_string(i), + TrackSource::SOURCE_MICROPHONE, + [](const AudioFrame &) {}); + } + + EXPECT_EQ(audioCallbacks(dispatcher).size(), static_cast(kCount)); + + for (int i = 0; i < kCount; ++i) { + dispatcher.clearOnAudioFrameCallback("participant-" + std::to_string(i), + TrackSource::SOURCE_MICROPHONE); + } + + EXPECT_EQ(audioCallbacks(dispatcher).size(), 0u); +} + +} // namespace livekit diff --git a/src/tests/stress/test_latency_measurement.cpp b/src/tests/stress/test_latency_measurement.cpp index 2c95859f..ed988b79 100644 --- a/src/tests/stress/test_latency_measurement.cpp +++ b/src/tests/stress/test_latency_measurement.cpp @@ -190,9 +190,9 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { LocalAudioTrack::createLocalAudioTrack("latency-test", audio_source); TrackPublishOptions publish_options; - auto publication = sender_room->localParticipant()->publishTrack( - audio_track, publish_options); - ASSERT_NE(publication, nullptr) << "Failed to publish audio track"; + sender_room->localParticipant()->publishTrack(audio_track, publish_options); + ASSERT_NE(audio_track->publication(), nullptr) + << "Failed to publish audio track"; std::cout << "Audio track published, waiting for subscription..." << std::endl; @@ -343,7 +343,8 @@ TEST_F(LatencyMeasurementTest, AudioLatency) { } // Clean up - sender_room->localParticipant()->unpublishTrack(publication->sid()); + sender_room->localParticipant()->unpublishTrack( + audio_track->publication()->sid()); EXPECT_GT(stats.count(), 0) << "At least one audio latency measurement should be recorded";