Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CMakeCache.txt
Makefile
cmake_install.cmake
out
build.log
build/
build-debug/
build-release/
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ add_library(livekit SHARED
src/room.cpp
src/room_proto_converter.cpp
src/room_proto_converter.h
src/subscription_thread_dispatcher.cpp
src/local_participant.cpp
src/remote_participant.cpp
src/stats.cpp
Expand Down Expand Up @@ -723,4 +724,4 @@ add_custom_target(clean_all
COMMAND ${CMAKE_COMMAND} -E chdir "${CMAKE_SOURCE_DIR}" ${CMAKE_COMMAND} -E rm -rf "${CMAKE_BINARY_DIR}"
COMMENT "Full clean: CMake outputs + Rust target + generated protos + delete build/"
VERBATIM
)
)
2 changes: 0 additions & 2 deletions bridge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ add_library(livekit_bridge SHARED
src/livekit_bridge.cpp
src/bridge_audio_track.cpp
src/bridge_video_track.cpp
src/bridge_room_delegate.cpp
src/bridge_room_delegate.h
src/rpc_constants.cpp
src/rpc_controller.cpp
src/rpc_controller.h
Expand Down
22 changes: 10 additions & 12 deletions bridge/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# LiveKit Bridge

# **WARNING: This library is deprecated, use the base sdk found in src/**

A simplified, high-level C++ wrapper around the [LiveKit C++ SDK](../README.md). The bridge abstracts away room lifecycle management, track creation, publishing, and subscription boilerplate so that external codebases can interface with LiveKit in just a few lines. It is intended that this library will be used to bridge the LiveKit C++ SDK into other SDKs such as, but not limited to, Foxglove, ROS, and Rerun.

It is intended that this library closely matches the style of the core LiveKit C++ SDK.
Expand Down Expand Up @@ -98,44 +100,41 @@ Your Application

**`BridgeAudioTrack` / `BridgeVideoTrack`** -- RAII handles for published local tracks. Created via `createAudioTrack()` / `createVideoTrack()`. When the `shared_ptr` is dropped, the track is automatically unpublished and all underlying SDK resources are freed. Call `pushFrame()` to send audio/video data to remote participants.

**`BridgeRoomDelegate`** -- Internal (not part of the public API; lives in `src/`). Listens for `onTrackSubscribed` / `onTrackUnsubscribed` events from the LiveKit SDK and wires up reader threads automatically.

### What is a Reader?

A **reader** is a background thread that receives decoded media frames from a remote participant.

When a remote participant publishes an audio or video track and the bridge subscribes to it (auto-subscribe is enabled by default), the bridge creates an `AudioStream` or `VideoStream` from that track and spins up a dedicated thread. This thread loops on `stream->read()`, which blocks until a new frame arrives. Each received frame is forwarded to the user's registered callback.
When a remote participant publishes an audio or video track and the room subscribes to it (auto-subscribe is enabled by default), `Room` creates an `AudioStream` or `VideoStream` from that track and spins up a dedicated thread. This thread loops on `stream->read()`, which blocks until a new frame arrives. Each received frame is forwarded to the user's registered callback.

In short:

- **Sending** (you -> remote): `BridgeAudioTrack::pushFrame()` / `BridgeVideoTrack::pushFrame()`
- **Receiving** (remote -> you): reader threads invoke your registered callbacks

Reader threads are managed entirely by the bridge. They are created when a matching remote track is subscribed, and torn down (stream closed, thread joined) when the track is unsubscribed, the callback is unregistered, or `disconnect()` is called.
Reader threads are managed by `Room` internally. They are created when a matching remote track is subscribed, and torn down (stream closed, thread joined) when the track is unsubscribed, the callback is unregistered, or the `Room` is destroyed.

### Callback Registration Timing

Callbacks are keyed by `(participant_identity, track_source)`. You can register them **before** the remote participant has joined the room. The bridge stores the callback and automatically wires it up when the matching track is subscribed.
Callbacks are keyed by `(participant_identity, track_source)`. You can register them **after connecting** but before the remote participant has joined the room. `Room` stores the callback and automatically wires it up when the matching track is subscribed.

> **Note:** Only one callback may be set per `(participant_identity, track_source)` pair. Calling `setOnAudioFrameCallback` or `setOnVideoFrameCallback` again with the same identity and source will silently replace the previous callback. If you need to fan-out a single stream to multiple consumers, do so inside your callback.

This means the typical pattern is:

```cpp
// Register first, connect second -- or register after connect but before
// the remote participant joins.
bridge.setOnAudioFrameCallback("robot-1", livekit::TrackSource::SOURCE_MICROPHONE, my_callback);
// Connect first, then register callbacks before the remote participant joins.
livekit::RoomOptions options;
options.auto_subscribe = true;
bridge.connect(url, token, options);
bridge.setOnAudioFrameCallback("robot-1", livekit::TrackSource::SOURCE_MICROPHONE, my_callback);
// When robot-1 joins and publishes a mic track, my_callback starts firing.
```

### Thread Safety

- `LiveKitBridge` uses a mutex to protect the callback map and active reader state.
- Frame callbacks fire on background reader threads. If your callback accesses shared application state, you are responsible for synchronization.
- `disconnect()` closes all streams and joins all reader threads before returning -- it is safe to destroy the bridge immediately after.
- `disconnect()` destroys the `Room`, which closes all streams and joins all reader threads before returning -- it is safe to destroy the bridge immediately after.

## API Reference

Expand Down Expand Up @@ -226,9 +225,8 @@ The human will print periodic summaries like:
## Testing

The bridge includes a unit test suite built with [Google Test](https://github.com/google/googletest). Tests cover
1. `CallbackKey` hashing/equality,
2. `BridgeAudioTrack`/`BridgeVideoTrack` state management, and
3. `LiveKitBridge` pre-connection behaviour (callback registration, error handling).
1. `BridgeAudioTrack`/`BridgeVideoTrack` state management, and
2. `LiveKitBridge` pre-connection behaviour (callback registration, error handling).

### Building and running tests

Expand Down
1 change: 1 addition & 0 deletions bridge/include/livekit_bridge/bridge_audio_track.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class BridgeAudioTrack {

std::shared_ptr<livekit::AudioSource> source_;
std::shared_ptr<livekit::LocalAudioTrack> track_;
/* DEPRECATED. use track_->publication() instead */
std::shared_ptr<livekit::LocalTrackPublication> publication_;
livekit::LocalParticipant *participant_ = nullptr; // not owned
};
Expand Down
1 change: 1 addition & 0 deletions bridge/include/livekit_bridge/bridge_video_track.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class BridgeVideoTrack {

std::shared_ptr<livekit::VideoSource> source_;
std::shared_ptr<livekit::LocalVideoTrack> track_;
/* DEPRECATED. use track_->publication() instead */
std::shared_ptr<livekit::LocalTrackPublication> publication_;
livekit::LocalParticipant *participant_ = nullptr; // not owned
};
Expand Down
106 changes: 19 additions & 87 deletions bridge/include/livekit_bridge/livekit_bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,51 +29,45 @@
#include "livekit/rpc_error.h"

#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace livekit {
class Room;
class AudioFrame;
class VideoFrame;
class AudioStream;
class VideoStream;
class Track;
enum class TrackSource;
} // namespace livekit

namespace livekit_bridge {

class BridgeRoomDelegate;
class RpcController;

namespace test {
class CallbackKeyTest;
class LiveKitBridgeTest;
} // namespace test

/// Callback type for incoming audio frames.
/// Called on a background reader thread.
using AudioFrameCallback = std::function<void(const livekit::AudioFrame &)>;
/// Called on a background reader thread owned by Room.
using AudioFrameCallback = livekit::AudioFrameCallback;

/// Callback type for incoming video frames.
/// Called on a background reader thread.
/// Called on a background reader thread owned by Room.
/// @param frame The decoded video frame (RGBA by default).
/// @param timestamp_us Presentation timestamp in microseconds.
using VideoFrameCallback = std::function<void(const livekit::VideoFrame &frame,
std::int64_t timestamp_us)>;
using VideoFrameCallback = livekit::VideoFrameCallback;

/**
* High-level bridge to the LiveKit C++ SDK.
*
* Owns the full room lifecycle: initialize SDK, create Room, connect,
* publish tracks, and manage incoming frame callbacks.
*
* Frame callback reader threads are managed by Room internally via
* Room::setOnAudioFrameCallback / Room::setOnVideoFrameCallback.
*
* The bridge retains a shared_ptr to every track it creates. On
* disconnect(), all tracks are released (unpublished) before the room
* is torn down, guaranteeing safe teardown order. To unpublish a track
Expand Down Expand Up @@ -114,7 +108,7 @@ class LiveKitBridge {
LiveKitBridge();
~LiveKitBridge();

// Non-copyable, non-movable (owns threads, callbacks, room)
// Non-copyable, non-movable (owns room, callbacks)
LiveKitBridge(const LiveKitBridge &) = delete;
LiveKitBridge &operator=(const LiveKitBridge &) = delete;
LiveKitBridge(LiveKitBridge &&) = delete;
Expand All @@ -139,7 +133,6 @@ class LiveKitBridge {
* @param url WebSocket URL of the LiveKit server.
* @param token Access token for authentication.
* @param options Room options.

* @return true if connection succeeded (or was already connected).
*/
bool connect(const std::string &url, const std::string &token,
Expand All @@ -148,8 +141,9 @@ class LiveKitBridge {
/**
* Disconnect from the room and release all resources.
*
* All published tracks are unpublished, all reader threads are joined,
* and the SDK is shut down. Safe to call multiple times.
* All published tracks are unpublished, reader threads are stopped
* by Room's destructor, and the SDK is shut down. Safe to call
* multiple times.
*/
void disconnect();

Expand Down Expand Up @@ -209,17 +203,16 @@ class LiveKitBridge {
livekit::TrackSource source);

// ---------------------------------------------------------------
// Incoming frame callbacks
// Incoming frame callbacks (delegates to Room)
// ---------------------------------------------------------------

/**
* Set the callback for audio frames from a specific remote participant
* and track source.
*
* The callback fires on a background thread whenever a new audio frame
* is received. If the remote participant has not yet connected, the
* callback is stored and auto-wired when the participant's track is
* subscribed.
* Delegates to Room::setOnAudioFrameCallback. The callback fires on a
* dedicated reader thread owned by Room whenever a new audio frame is
* received.
*
* @note Only **one** callback may be registered per (participant, source)
* pair. Calling this again with the same identity and source will
Expand All @@ -237,6 +230,8 @@ class LiveKitBridge {
* Register a callback for video frames from a specific remote participant
* and track source.
*
* Delegates to Room::setOnVideoFrameCallback.
*
* @note Only **one** callback may be registered per (participant, source)
* pair. Calling this again with the same identity and source will
* silently replace the previous callback.
Expand All @@ -253,8 +248,7 @@ class LiveKitBridge {
* Clear the audio frame callback for a specific remote participant + track
* source.
*
* If a reader thread is active for this (identity, source), it is
* stopped and joined.
* Delegates to Room::clearOnAudioFrameCallback.
*/
void clearOnAudioFrameCallback(const std::string &participant_identity,
livekit::TrackSource source);
Expand All @@ -263,8 +257,7 @@ class LiveKitBridge {
* Clear the video frame callback for a specific remote participant + track
* source.
*
* If a reader thread is active for this (identity, source), it is
* stopped and joined.
* Delegates to Room::clearOnVideoFrameCallback.
*/
void clearOnVideoFrameCallback(const std::string &participant_identity,
livekit::TrackSource source);
Expand Down Expand Up @@ -354,55 +347,8 @@ class LiveKitBridge {
const std::string &track_name);

private:
friend class BridgeRoomDelegate;
friend class test::CallbackKeyTest;
friend class test::LiveKitBridgeTest;

/// Composite key for the callback map: (participant_identity, source).
/// Only one callback can exist per key -- re-registering overwrites.
struct CallbackKey {
std::string identity;
livekit::TrackSource source;

bool operator==(const CallbackKey &o) const;
};

struct CallbackKeyHash {
std::size_t operator()(const CallbackKey &k) const;
};

/// Active reader thread + stream for an incoming track.
struct ActiveReader {
std::shared_ptr<livekit::AudioStream> audio_stream;
std::shared_ptr<livekit::VideoStream> video_stream;
std::thread thread;
bool is_audio = false;
};

/// Called by BridgeRoomDelegate when a remote track is subscribed.
void onTrackSubscribed(const std::string &participant_identity,
livekit::TrackSource source,
const std::shared_ptr<livekit::Track> &track);

/// Called by BridgeRoomDelegate when a remote track is unsubscribed.
void onTrackUnsubscribed(const std::string &participant_identity,
livekit::TrackSource source);

/// Extract the thread for the given callback key.
/// @pre Caller must hold @c mutex_.
std::thread extractReaderThread(const CallbackKey &key);

/// Start a reader thread for a subscribed track.
/// @return The reader thread for this track.
/// @pre Caller must hold @c mutex_.
std::thread startAudioReader(const CallbackKey &key,
const std::shared_ptr<livekit::Track> &track,
AudioFrameCallback cb);
/// @copydoc startAudioReader
std::thread startVideoReader(const CallbackKey &key,
const std::shared_ptr<livekit::Track> &track,
VideoFrameCallback cb);

/// Execute a track action (mute/unmute) by track name.
/// Used as the TrackActionFn callback for RpcController.
/// Throws livekit::RpcError if the track is not found.
Expand All @@ -415,23 +361,9 @@ class LiveKitBridge {
bool connecting_; // guards against concurrent connect() calls
bool sdk_initialized_;

static constexpr int kMaxActiveReaders = 20;

std::unique_ptr<livekit::Room> room_;
std::unique_ptr<BridgeRoomDelegate> delegate_;
std::unique_ptr<RpcController> rpc_controller_;

/// Registered callbacks (may be registered before tracks are subscribed).
std::unordered_map<CallbackKey, AudioFrameCallback, CallbackKeyHash>
audio_callbacks_;
/// @copydoc audio_callbacks_
std::unordered_map<CallbackKey, VideoFrameCallback, CallbackKeyHash>
video_callbacks_;

/// Active reader threads for subscribed tracks.
std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash>
active_readers_;

/// All tracks created by this bridge. The bridge retains a shared_ptr so
/// it can force-release every track on disconnect() before the room is
/// destroyed, preventing dangling @c participant_ pointers.
Expand Down
6 changes: 2 additions & 4 deletions bridge/src/bridge_audio_track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "livekit/audio_source.h"
#include "livekit/local_audio_track.h"
#include "livekit/local_participant.h"
#include "livekit/local_track_publication.h"

#include <stdexcept>

Expand Down Expand Up @@ -111,9 +110,9 @@ void BridgeAudioTrack::release() {
released_ = true;

// Unpublish the track from the room
if (participant_ && publication_) {
if (participant_ && track_ && track_->publication()) {
try {
participant_->unpublishTrack(publication_->sid());
participant_->unpublishTrack(track_->publication()->sid());
} catch (...) {
// Best-effort cleanup; ignore errors during teardown
LK_LOG_WARN("BridgeAudioTrack unpublishTrack error, continuing with "
Expand All @@ -122,7 +121,6 @@ void BridgeAudioTrack::release() {
}

// Release SDK objects in reverse order
publication_.reset();
track_.reset();
source_.reset();
participant_ = nullptr;
Expand Down
Loading
Loading