Skip to content

Commit b634f26

Browse files
committed
Fix issue with ESP-NOW, improve logging and subscription recovery
1 parent 1a2af22 commit b634f26

10 files changed

Lines changed: 64 additions & 23 deletions

File tree

components/robusto/base/src/logging/logging_robusto.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ void rob_log_write(rob_log_level_t level,
4848
va_end(list);
4949
}
5050
#endif
51+
52+
// TODO: Add a Mac address log function
53+
5154
// TODO: Move rob_log_bit_mesh into a macro instead so we don't need this.
5255
void rob_log_bit_mesh(rob_log_level_t level,
5356
const char *tag,

components/robusto/include/robusto_pubsub_client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ typedef enum
2121
TOPIC_STATE_INACTIVE, /* The topic in inactive */
2222
TOPIC_STATE_ACTIVE, /* The topic is inactive */
2323
TOPIC_STATE_PROBLEM, /* The topic has some problems */
24+
TOPIC_STATE_RECOVERING, /* We have recently published data */
2425
TOPIC_STATE_UNKNOWN, /* The topic is unknown by the server */
2526
TOPIC_STATE_PUBLISHED, /* We have recently published data */
2627
TOPIC_STATE_STALE, /* Haven't gotten or published data in stale_time_ms */
@@ -46,8 +47,6 @@ struct subscribed_topic
4647
uint32_t stale_time_ms;
4748
/* Calculated hash used for referencing the topic */
4849
uint32_t topic_hash;
49-
/* Is being restored*/
50-
bool restoring;
5150
/* The peer */
5251
robusto_peer_t *peer;
5352
/* A conversation ID for communication */

components/robusto/misc/src/pubsub/robusto_pubsub_client.c

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void incoming_callback(robusto_message_t *message)
154154
ROB_LOGW(pubsub_client_log_prefix, "Server told us that the topic %s (topic hash %lu) is unknown,", curr_topic->topic_name, curr_topic->topic_hash);
155155
set_topic_state(curr_topic, TOPIC_STATE_UNKNOWN);
156156
} else {
157-
ROB_LOGW(pubsub_client_log_prefix, "Server told us that the %lu topic hash is unknown, it is to us too, newly removed?", *(uint32_t *)(message->binary_data + 1));
157+
ROB_LOGW(pubsub_client_log_prefix, "Server told us that the %lu topic hash is unknown, it is to us too, newly removed?", curr_topic->topic_hash);
158158
}
159159

160160
} else
@@ -255,7 +255,6 @@ subscribed_topic_t *_add_topic_and_conv(robusto_peer_t *peer, char *topic_name,
255255
new_topic->next = NULL;
256256
new_topic->peer = peer;
257257
new_topic->topic_hash = 0;
258-
new_topic->restoring = false;
259258
new_topic->callback = callback;
260259
new_topic->display_offset = display_offset;
261260
new_topic->state = TOPIC_STATE_UNSET;
@@ -302,7 +301,7 @@ subscribed_topic_t *robusto_pubsub_client_get_topic(robusto_peer_t *peer, char *
302301
msg[0] = PUBSUB_GET_TOPIC;
303302
}
304303

305-
ROB_LOGE(pubsub_client_log_prefix, "Sending subscription for %s conversation_id %u!", topic_name, new_topic->conversation_id);
304+
ROB_LOGE(pubsub_client_log_prefix, "Sending subscription for %s conversation_id %u, hash: %lu", topic_name, new_topic->conversation_id, new_topic->topic_hash);
306305
rob_ret_val_t ret_sub = send_message_binary(peer, PUBSUB_SERVER_ID, new_topic->conversation_id, (uint8_t *)msg, data_length, NULL);
307306
if (ret_sub != ROB_OK) {
308307
ROB_LOGE(pubsub_client_log_prefix, "Pub Sub client: Subscription failed, failed to queue or send message %s.", new_topic->topic_name);
@@ -327,18 +326,28 @@ void recover_topic(subscribed_topic_t *topic)
327326
if (topic->state == TOPIC_STATE_PROBLEM && !robusto_waitfor_byte_change(&topic->state, 1000) != ROB_OK)
328327
{
329328
ROB_LOGE(pubsub_client_log_prefix, "Failed to recover %s using the %s peer", topic->topic_name, topic->peer->name);
329+
topic->state = TOPIC_STATE_PROBLEM;
330+
r_delay(5000);
330331
}
331332
else
332333
{
333334
// A successful incoming call will set the state to inactive or active elsewhere
334335
ROB_LOGI(pubsub_client_log_prefix, "Recovered %s using the %s peer!", topic->topic_name, topic->peer->name);
336+
topic->state = TOPIC_STATE_ACTIVE;
335337
}
336-
topic->restoring = false;
338+
337339
robusto_delete_current_task();
338340
}
339341

340342
void create_topic_recovery_task(subscribed_topic_t *topic)
341343
{
344+
topic->state = TOPIC_STATE_RECOVERING;
345+
346+
if (topic->peer->state < PEER_KNOWN_INSECURE){
347+
ROB_LOGW(pubsub_client_log_prefix, "Not creating a topic recovery task for %s topic, peer %s because the peer is not working properly.",
348+
topic->topic_name, topic->peer->name);
349+
return;
350+
}
342351
ROB_LOGW(pubsub_client_log_prefix, "Creating a topic recovery task for %s topic, peer %s",
343352
topic->topic_name, topic->peer->name);
344353
char *task_name;
@@ -355,7 +364,7 @@ void robusto_pubsub_check_topics()
355364
subscribed_topic_t *curr_topic = first_subscribed_topic;
356365
while (curr_topic)
357366
{
358-
if (curr_topic->restoring)
367+
if (curr_topic->state == TOPIC_STATE_RECOVERING)
359368
{
360369
// Do nothing regardless of state to not disturb any existing recovery processes
361370
}
@@ -365,7 +374,7 @@ void robusto_pubsub_check_topics()
365374
if (curr_topic->peer->problematic_media_types != curr_topic->peer->supported_media_types)
366375
{
367376
create_topic_recovery_task(curr_topic);
368-
r_delay(1000);
377+
r_delay(5000);
369378
}
370379
}
371380

components/robusto/misc/src/pubsub/robusto_pubsub_server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ uint32_t robusto_pubsub_add_subscriber(pubsub_server_topic_t *topic, robusto_pee
9797
pubsub_server_subscriber_t * existing = find_subscription_by_subscriber_hash(topic, peer);
9898
// TODO: Handle cases where adding with only callback, they might be double
9999
if (existing) {
100-
ROB_LOGW(pubsub_log_prefix, "%s asked to subscribe to %s, but is already a subscriber, returning hash.", peer->name, topic->name);
100+
ROB_LOGW(pubsub_log_prefix, "%s asked to subscribe to %s, but is already a subscriber, returning hash: %lu.", peer->name, topic->name, topic->hash);
101101
return topic->hash;
102102
}
103103

components/robusto/network/src/incoming/incoming.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ static incoming_callback_cb *incoming_callback = NULL;
4848
rob_ret_val_t robusto_handle_incoming(uint8_t *data, uint32_t data_length, robusto_peer_t *peer, e_media_type media_type, int offset)
4949
{
5050
// TODO: Messages might be too large for SRAM and might need to be stored elsewhere (flash), how to handle? We might also have to *send* large files.
51+
// TODO: As this might be done inside receive callback, it should possibly spawn a task instead
5152
// rob_log_bit_mesh(ROB_LOG_WARN, incoming_log_prefix, data + offset, data_length - offset);
5253
ROB_LOGD(incoming_log_prefix, "In robusto_handle_incoming.");
5354
robusto_message_t *message;

components/robusto/network/src/media/espnow/espnow_control.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ static char *espnow_log_prefix;
5050

5151
void init_wifi()
5252
{
53-
53+
ROB_LOGI(espnow_log_prefix, "Creating default event loop.");
54+
ESP_ERROR_CHECK(esp_event_loop_create_default());
5455
ROB_LOGI(espnow_log_prefix, "Initializing wifi (for ESP-NOW)");
5556
ESP_ERROR_CHECK(esp_netif_init());
5657
ROB_LOGI(espnow_log_prefix, "esp_netif_init done.");

components/robusto/network/src/media/espnow/espnow_messaging.c

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,12 @@
5252

5353
static char *espnow_log_prefix;
5454

55-
#define ESPNOW_MAXDELAY 512
5655
#define ESPNOW_FRAGMENT_SIZE (ESP_NOW_MAX_DATA_LEN - 10)
5756

5857
static void espnow_deinit(espnow_send_param_t *send_param);
5958

6059
static bool has_receipt = false;
61-
static esp_now_send_status_t send_status = ESP_NOW_SEND_FAIL;
60+
static int send_status = -1;
6261

6362
// For synchronously reading async data
6463
static uint8_t *synchro_data = NULL;
@@ -69,9 +68,12 @@ rob_ret_val_t esp_now_send_check(robusto_peer_t *peer, uint8_t *data, uint32_t d
6968
{
7069

7170
// TODO: Recommend more wifi TX buffers and add warning if not enabled
72-
has_receipt = false;
71+
7372
ROB_LOGD(espnow_log_prefix, "esp_now_send_check, sending %lu bytes.", data_length);
7473
int rc = esp_now_send(&peer->base_mac_address, data, data_length);
74+
send_status = -1;
75+
has_receipt = false;
76+
7577
if (rc != ESP_OK)
7678
{
7779
ROB_LOGE(espnow_log_prefix, "Mac address:");
@@ -90,7 +92,8 @@ rob_ret_val_t esp_now_send_check(robusto_peer_t *peer, uint8_t *data, uint32_t d
9092
}
9193
else if (rc == ESP_ERR_ESPNOW_NO_MEM)
9294
{
93-
ROB_LOGE(espnow_log_prefix, "ESP-NOW error: ESP_ERR_ESPNOW_NO_MEM");
95+
ROB_LOGE(espnow_log_prefix, "ESP-NOW error: ESP_ERR_ESPNOW_NO_MEM - Will delay a short while to let it free its memory.");
96+
r_delay(3000);
9497
}
9598
else if (rc == ESP_ERR_ESPNOW_FULL)
9699
{
@@ -121,15 +124,29 @@ rob_ret_val_t esp_now_send_check(robusto_peer_t *peer, uint8_t *data, uint32_t d
121124
rc = ROB_OK;
122125
add_to_history(&peer->espnow_info, true, rc);
123126
}
127+
// We will wait here until the message was sent, for 2 seconds
128+
int32_t start_send = r_millis();
129+
while ((send_status < 0) && (r_millis() < start_send + 2000))
130+
{
131+
robusto_yield();
132+
}
124133

134+
if (send_status == ESP_NOW_SEND_FAIL) {
135+
ROB_LOGE(espnow_log_prefix, "ESP-NOW transmission failed completing after %lu ms. Peer: %s", r_millis() - start_send, peer->name);
136+
return ROB_FAIL;
137+
}
138+
if (send_status < 0) {
139+
ROB_LOGE(espnow_log_prefix, "ESP-NOW transmission didn't complete in time (2000 ms). Peer: %s", peer->name);
140+
return ROB_FAIL;
141+
}
125142
if (!receipt)
126143
{
127144
return rc;
128145
}
129146

130-
// We want to wait to make sure the transmission is done.
147+
// We want to wait to make sure the transmission is received successfully.
131148
int32_t start = r_millis();
132-
while ((!has_receipt) && (r_millis() < start + 2000))
149+
while ((!has_receipt) && (r_millis() < start + 500))
133150
{
134151
robusto_yield();
135152
}
@@ -281,13 +298,14 @@ static void espnow_recv_cb(const esp_now_recv_info_t *esp_now_info, const uint8_
281298
has_receipt = true;
282299
return;
283300
}
284-
285301
// Send a receipt
286302
uint8_t response[2];
287303
response[0] = 0xff;
288304
response[1] = 0x00;
289-
290-
esp_now_send_check(peer, &response, 2, false);
305+
// NOTE: We are calling esp_now_send directly here as we are calling it inside of the receive callback
306+
if (esp_now_send(&peer->base_mac_address, &response, 2) != ROB_OK) {
307+
ROB_LOGE(espnow_log_prefix, ">> espnow_recv_cb failed to send a receipt to %s.", peer->name);
308+
}
291309
if (!handled)
292310
{
293311
// Copy data a ESP-NOW frees it

components/robusto/network/src/message/robusto_message_sending.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,16 +308,19 @@ void send_work_item(media_queue_item_t * queue_item, robusto_media_t *info, e_me
308308

309309
add_to_history(info, true, retval);
310310
} else {
311-
ROB_LOGI(message_sending_log_prefix, ">> As the %s, mt %i is recovering, we need to try some other media for a message (%i, %i)", queue_item->peer->name, media_type, info->state, queue_item->queue_item_type );
311+
312+
ROB_LOGI(message_sending_log_prefix, ">> As the %s, mt %i is recovering, we might need to try some other media for a message (%i, %i)", queue_item->peer->name, media_type, info->state, queue_item->queue_item_type );
312313
}
313314

314315
if ((retval != ROB_OK) && // We only try other medias if we have failed..
315316
(queue_item->receipt) && // ..and if it is receipt required, then we infer that we will try with multiple medias
316-
(retval != ROB_ERR_WHO)) // ..or if the peer knew who we were (it will only respond to unknowns on wired connections)
317+
(retval != ROB_ERR_WHO) && // ..or if the peer knew who we were (it will only respond to unknowns on wired connections)
318+
(get_host_supported_media_types() != (queue_item->exclude_media | media_type))) // And that there are no other medias to try
317319
{
318320
e_media_type next_media_type;
319321
// Add current media type to excluded media
320322
queue_item->exclude_media = queue_item->exclude_media | media_type;
323+
321324
ROB_LOGW(message_sending_log_prefix, "Failed sending using %s, will try some other media.", media_type_to_str(media_type));
322325
// Check suitability again to find another media to try
323326
rob_ret_val_t suitability_res = set_suitable_media(queue_item->peer, queue_item->data_length, queue_item->exclude_media, &next_media_type);

components/robusto/network/src/qos/qos_recovery.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ void task_recover(recover_params_t *params)
7878
set_state(peer, params->info, media_type, media_state_working, media_problem_none);
7979
rec_retval = ROB_OK;
8080
} else {
81+
set_state(peer, params->info, media_type, media_state_problem, media_problem_unknown);
8182
ROB_LOGW(recovery_log_prefix, ">> Sending presentation failed");
83+
rec_retval = ROB_FAIL;
8284
}
8385
}
8486
params->info->postpone_qos = false;
@@ -88,7 +90,7 @@ void task_recover(recover_params_t *params)
8890
send_heartbeat_message(peer, media_type);
8991
}
9092
}
91-
93+
9294
robusto_delete_current_task();
9395
}
9496

components/robusto/network/src/qos/qos_state.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,14 @@ void working_state(robusto_peer_t *peer, robusto_media_t *info, uint64_t last_he
133133
// Problem state behavior
134134
void problem_state(robusto_peer_t *peer, robusto_media_t *info, uint64_t last_heartbeat_time, e_media_type media_type)
135135
{
136-
// Has it been having a problem more than one cycle, mark it as as recovering.
136+
137+
if (info->state == media_state_recovering) {
138+
// If is in recovery, do nothing.
139+
return;
140+
}
137141
if (info->last_state_change < last_heartbeat_time - 1000)
138142
{
143+
// Has it been having a problem more than one cycle, mark it as as recovering and initiate recovery
139144
ROB_LOGE(qos_state_log_prefix, "%s, media type %s has had a problem for too long, going into recovery.", peer->name, media_type_to_str(media_type));
140145
set_state(peer, info, media_type, media_state_recovering, info->problem);
141146
recover_media(peer, info, last_heartbeat_time, media_type);

0 commit comments

Comments
 (0)