Skip to content

Commit 38b2884

Browse files
committed
examples: add streamed parsing example
Example for parsing and processing an ULog file in real-time, without keeping the whole file in memory.
1 parent 3635d1a commit 38b2884

5 files changed

Lines changed: 201 additions & 0 deletions

File tree

examples/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@ add_executable(ulog_writer ulog_writer.cpp)
1414
target_link_libraries(ulog_writer PUBLIC
1515
ulog_cpp::ulog_cpp
1616
)
17+
18+
add_executable(ulog_streamed_parsing ulog_streamed_parsing.cpp)
19+
target_link_libraries(ulog_streamed_parsing PUBLIC
20+
ulog_cpp::ulog_cpp
21+
)

examples/ulog_data.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#include <ulog_cpp/data_container.hpp>
1010
#include <ulog_cpp/reader.hpp>
1111

12+
// Example of how to use the typed data API for accessing topic data
13+
1214
int main(int argc, char** argv)
1315
{
1416
if (argc < 2) {

examples/ulog_info.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include <ulog_cpp/reader.hpp>
1414
#include <variant>
1515

16+
// Example of how to access the different data messages (info, logging, parameters, subscriptions)
17+
1618
int main(int argc, char** argv)
1719
{
1820
if (argc < 2) {

examples/ulog_streamed_parsing.cpp

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/****************************************************************************
2+
* Copyright (c) 2025 PX4 Development Team.
3+
* SPDX-License-Identifier: BSD-3-Clause
4+
****************************************************************************/
5+
6+
#include <cinttypes>
7+
#include <fstream>
8+
#include <iostream>
9+
#include <string>
10+
#include <ulog_cpp/data_container.hpp>
11+
#include <ulog_cpp/reader.hpp>
12+
13+
// Example for parsing and processing an ULog file in real-time, without keeping the whole file
14+
// in memory
15+
16+
class TopicSubscription {
17+
public:
18+
virtual ~TopicSubscription() = default;
19+
20+
virtual void handleData(const ulog_cpp::TypedDataView& data) = 0;
21+
};
22+
23+
// A topic we are interested in
24+
class VehicleStatus : public TopicSubscription {
25+
public:
26+
VehicleStatus(const std::shared_ptr<ulog_cpp::Subscription>& subscription)
27+
{
28+
_timestamp_field = subscription->field("timestamp");
29+
_nav_state_field = subscription->field("nav_state");
30+
// Optional field (e.g. when a message changes)
31+
if (subscription->fieldMap().find("armed_state") != subscription->fieldMap().end()) {
32+
_armed_state_field = subscription->field("armed_state");
33+
}
34+
}
35+
36+
void handleData(const ulog_cpp::TypedDataView& data) override
37+
{
38+
const auto timestamp = data[*_timestamp_field].as<std::uint64_t>();
39+
const auto nav_state = data[*_nav_state_field].as<std::uint32_t>();
40+
uint8_t armed_state = 0;
41+
if (_armed_state_field) {
42+
armed_state = data[*_nav_state_field].as<std::uint8_t>();
43+
}
44+
printf("vehicle_status: t: %" PRId64 ": nav_state: %" PRIu32 ", armed_state: %" PRId8 "\n", timestamp, nav_state, armed_state);
45+
}
46+
47+
private:
48+
std::shared_ptr<ulog_cpp::Field> _timestamp_field;
49+
std::shared_ptr<ulog_cpp::Field> _nav_state_field;
50+
std::shared_ptr<ulog_cpp::Field> _armed_state_field;
51+
};
52+
53+
class ULogDataHandler : public ulog_cpp::DataContainer {
54+
public:
55+
ULogDataHandler()
56+
: ulog_cpp::DataContainer(ulog_cpp::DataContainer::StorageConfig::Header)
57+
{
58+
}
59+
60+
void error(const std::string& msg, bool is_recoverable) override
61+
{
62+
printf("Parsing error: %s\n", msg.c_str());
63+
}
64+
65+
void headerComplete() override
66+
{
67+
ulog_cpp::DataContainer::headerComplete();
68+
}
69+
70+
void messageInfo(const ulog_cpp::MessageInfo& message_info) override
71+
{
72+
DataContainer::messageInfo(message_info);
73+
if (message_info.isMulti()) {
74+
// Multi messages might be continued, but we only know with the next message, so we keep it
75+
// stored and append if needed. We assume that continued multi messages are not interleaved
76+
// with other messages.
77+
if (message_info.isContinued()) {
78+
if (_current_multi_message.field().name() == message_info.field().name()) {
79+
// Append to previous
80+
_current_multi_message.valueRaw().insert(_current_multi_message.valueRaw().end(),
81+
message_info.valueRaw().begin(),
82+
message_info.valueRaw().end());
83+
}
84+
} else {
85+
finishCurrentMultiMessage();
86+
_current_multi_message = message_info;
87+
}
88+
} else {
89+
finishCurrentMultiMessage();
90+
messageInfoComplete(message_info);
91+
}
92+
}
93+
void parameter(const ulog_cpp::Parameter& parameter) override
94+
{
95+
finishCurrentMultiMessage();
96+
DataContainer::parameter(parameter);
97+
}
98+
void addLoggedMessage(const ulog_cpp::AddLoggedMessage& add_logged_message) override
99+
{
100+
finishCurrentMultiMessage();
101+
DataContainer::addLoggedMessage(add_logged_message);
102+
if (_subscriptions_by_message_id.find(add_logged_message.msgId()) !=
103+
_subscriptions_by_message_id.end()) {
104+
throw ulog_cpp::ParsingException("Duplicate AddLoggedMessage message ID");
105+
}
106+
107+
auto format_iter = messageFormats().find(add_logged_message.messageName());
108+
if (format_iter == messageFormats().end()) {
109+
throw ulog_cpp::ParsingException("AddLoggedMessage message format not found");
110+
}
111+
112+
auto ulog_subscription = std::make_shared<ulog_cpp::Subscription>(
113+
add_logged_message, std::vector<ulog_cpp::Data>{}, format_iter->second);
114+
115+
if (add_logged_message.messageName() == "vehicle_status" && add_logged_message.multiId() == 0) {
116+
auto subscription = std::make_shared<VehicleStatus>(ulog_subscription);
117+
_subscriptions_by_message_id.insert(
118+
{add_logged_message.msgId(), SubscriptionData{ulog_subscription, subscription}});
119+
}
120+
}
121+
void logging(const ulog_cpp::Logging& logging) override
122+
{
123+
finishCurrentMultiMessage();
124+
DataContainer::logging(logging);
125+
}
126+
void data(const ulog_cpp::Data& data) override
127+
{
128+
finishCurrentMultiMessage();
129+
const auto iter = _subscriptions_by_message_id.find(data.msgId());
130+
if (iter != _subscriptions_by_message_id.end()) {
131+
const ulog_cpp::TypedDataView data_view(data, *iter->second.ulog_subscription->format());
132+
iter->second.subscription->handleData(data_view);
133+
}
134+
}
135+
136+
private:
137+
struct SubscriptionData {
138+
std::shared_ptr<ulog_cpp::Subscription> ulog_subscription;
139+
std::shared_ptr<TopicSubscription> subscription;
140+
};
141+
142+
void finishCurrentMultiMessage()
143+
{
144+
if (!_current_multi_message.field().name().empty()) {
145+
messageInfoComplete(_current_multi_message);
146+
_current_multi_message.field() = {};
147+
}
148+
}
149+
void messageInfoComplete(const ulog_cpp::MessageInfo& message_info)
150+
{
151+
if (message_info.field().definitionResolved()) {
152+
printf("Info message: %s\n", message_info.field().name().c_str());
153+
}
154+
}
155+
156+
std::map<uint16_t, SubscriptionData> _subscriptions_by_message_id;
157+
ulog_cpp::MessageInfo _current_multi_message{"", ""}; ///< Keep this stored for continued messages
158+
};
159+
160+
int main(int argc, char** argv)
161+
{
162+
if (argc < 2) {
163+
printf("Usage: %s <file.ulg>\n", argv[0]);
164+
return -1;
165+
}
166+
FILE* file = fopen(argv[1], "rb");
167+
if (!file) {
168+
printf("opening file failed\n");
169+
return -1;
170+
}
171+
uint8_t buffer[4096];
172+
int bytes_read;
173+
const auto data_container = std::make_shared<ULogDataHandler>();
174+
ulog_cpp::Reader reader{data_container};
175+
while ((bytes_read = fread(buffer, 1, sizeof(buffer), file)) > 0) {
176+
try {
177+
reader.readChunk(buffer, bytes_read);
178+
} catch (const ulog_cpp::ExceptionBase& exception) {
179+
printf("Failed to parse ulog file: %s\n", exception.what());
180+
return -1;
181+
}
182+
if (data_container->hadFatalError()) {
183+
printf("Failed to parse ulog file\n");
184+
return -1;
185+
}
186+
}
187+
fclose(file);
188+
189+
return 0;
190+
}

examples/ulog_writer.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <thread>
99
#include <ulog_cpp/simple_writer.hpp>
1010

11+
// Example of how to create an ULog file with timeseries, printf messages and parameters
12+
1113
using namespace std::chrono_literals;
1214

1315
namespace {

0 commit comments

Comments
 (0)