-
Notifications
You must be signed in to change notification settings - Fork 164
Add Sparkplug B Industrial IoT Example #461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
New example demonstrating the Sparkplug B industrial IoT protocol:
- Edge Node client publishes sensor data and responds to commands
- Host Application client subscribes to data and sends commands
- Implements Sparkplug topic namespace (spBv1.0/{group}/{type}/{node}[/{device}])
- Demonstrates NBIRTH, NDEATH, DDATA, and DCMD message types
- Includes simplified payload encoding (not full protobuf)
- Supports both single-threaded and multi-threaded builds
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
README covers: - Protocol overview and architecture - Message types and simulated metrics - Build instructions (Autotools and CMake) - Command-line options and example output - Configuration and payload format notes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Link to examples/sparkplug/README.md for detailed documentation. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Use WOLF_CRYPT_TYPES_H include guard to skip the word64 typedef when wolfSSL has already defined it, matching the pattern used in wolfmqtt/mqtt_types.h. Fixes build error on macOS where wolfSSL defines word64 as unsigned long vs uint64_t (unsigned long long). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds a new Sparkplug B Industrial IoT example to wolfMQTT, demonstrating Sparkplug-style topic namespaces and message flows using two MQTT clients (Edge Node + Host Application), plus build-system and documentation integration.
Changes:
- Added new Sparkplug example implementation (
examples/sparkplug/sparkplug.c/.h) with simplified (non-protobuf) payload encoding/decoding. - Integrated the new example into Autotools and CMake builds, and updated
.gitignore. - Added user documentation for the example and linked it from the project README.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| examples/sparkplug/sparkplug.h | Defines Sparkplug message/topic helpers and simplified payload encode/decode utilities. |
| examples/sparkplug/sparkplug.c | Implements the Edge Node + Host example clients, message callback handling, and publish/subscribe loops. |
| examples/sparkplug/README.md | Documents Sparkplug concepts, build/run instructions, and example output. |
| examples/include.am | Adds the new example to Autotools build targets and distribution lists. |
| README.md | Adds a top-level section pointing users to the Sparkplug example. |
| CMakeLists.txt | Registers the Sparkplug example target under WOLFMQTT_EXAMPLES. |
| .gitignore | Ignores the built Sparkplug example binary. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| static INLINE int SparkplugTopic_Build(char* buf, int buf_len, | ||
| const char* group_id, SparkplugMsgType msg_type, | ||
| const char* edge_node_id, const char* device_id) | ||
| { | ||
| int len; | ||
| const char* type_str = SparkplugMsgType_ToString(msg_type); | ||
|
|
||
| if (device_id != NULL) { | ||
| len = XSNPRINTF(buf, buf_len, "%s/%s/%s/%s/%s", | ||
| SPARKPLUG_NAMESPACE, group_id, type_str, edge_node_id, device_id); | ||
| } | ||
| else { | ||
| len = XSNPRINTF(buf, buf_len, "%s/%s/%s/%s", | ||
| SPARKPLUG_NAMESPACE, group_id, type_str, edge_node_id); | ||
| } | ||
|
|
||
| return len; | ||
| } |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkplugTopic_Build returns the XSNPRINTF result, but callers don’t check for truncation (return >= buf_len) or errors (negative). If the topic is truncated, publishes/subscribes can silently target the wrong topic. Consider returning an error code on truncation and checking the return value at call sites.
| /* Encode each metric */ | ||
| for (i = 0; i < payload->metric_count; i++) { | ||
| const SparkplugMetric* m = &payload->metrics[i]; | ||
|
|
||
| /* Name length and name */ | ||
| name_len = (word16)XSTRLEN(m->name); | ||
| if (pos + 2 + name_len + 17 > buf_len) { | ||
| return MQTT_CODE_ERROR_OUT_OF_BUFFER; | ||
| } |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkplugPayload_Encode trusts payload->metric_count and iterates that many metrics without bounding it to SPARKPLUG_MAX_METRICS. If a caller sets a larger count, this will read past the metrics array. Clamp/validate metric_count up-front and return BAD_ARG when out of range.
| /* Decode each metric */ | ||
| for (i = 0; i < payload->metric_count && pos < buf_len; i++) { | ||
| SparkplugMetric* m = &payload->metrics[i]; | ||
|
|
||
| /* Name length and name */ | ||
| if (pos + 2 > buf_len) break; | ||
| name_len = ((word16)buf[pos] << 8) | buf[pos+1]; | ||
| pos += 2; | ||
| if (pos + name_len > buf_len || name_len >= sizeof(name_bufs[0])) break; | ||
| XMEMCPY(name_bufs[i], &buf[pos], name_len); |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkplugPayload_Decode can break out of the metric loop on malformed/truncated input, but it does not update payload->metric_count to the number of successfully decoded metrics. Callers then iterate payload.metric_count and may dereference NULL m->name / uninitialized metrics, leading to crashes (e.g., XSTRCMP(m->name, ...)). Update metric_count to i on early exit and consider returning MQTT_CODE_ERROR_MALFORMED_DATA when decoding fails.
| MQTTCtx* mqttCtx = &spCtx->mqttCtx; | ||
| static MqttTopic topics[1]; | ||
|
|
||
| XMEMSET(&mqttCtx->subscribe, 0, sizeof(mqttCtx->subscribe)); | ||
| mqttCtx->subscribe.packet_id = mqtt_get_packetid(); | ||
| mqttCtx->subscribe.topic_count = 1; | ||
| topics[0].topic_filter = topic_filter; | ||
| topics[0].qos = mqttCtx->qos; | ||
| mqttCtx->subscribe.topics = topics; | ||
|
|
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sparkplug_subscribe uses a static MqttTopic topics[1], which is shared across threads. In multi-thread mode the edge and host threads can call subscribe concurrently, causing races/corruption of the subscribe request. Use a stack MqttTopic (or mqttCtx->topics like other examples) instead of a static shared array.
| if (msg_new) { | ||
| /* Parse topic */ | ||
| char topic_str[SPARKPLUG_TOPIC_MAX_LEN]; | ||
| int topic_len = msg->topic_name_len; | ||
| if (topic_len >= (int)sizeof(topic_str)) { | ||
| topic_len = sizeof(topic_str) - 1; | ||
| } | ||
| XMEMCPY(topic_str, msg->topic_name, topic_len); | ||
| topic_str[topic_len] = '\0'; | ||
|
|
||
| rc = SparkplugTopic_Parse(topic_str, group_id, sizeof(group_id), | ||
| &msg_type, node_id, sizeof(node_id), | ||
| device_id, sizeof(device_id)); | ||
|
|
||
| if (rc == MQTT_CODE_SUCCESS) { | ||
| PRINTF("Sparkplug [%s]: Received %s from %s/%s%s%s", | ||
| spCtx->mqttCtx.client_id, | ||
| SparkplugMsgType_ToString(msg_type), | ||
| group_id, node_id, | ||
| device_id[0] ? "/" : "", device_id); | ||
|
|
||
| /* Decode payload if complete */ | ||
| if (msg_done && msg->buffer_len > 0) { | ||
| rc = SparkplugPayload_Decode(msg->buffer, msg->buffer_len, &payload); | ||
| if (rc > 0) { | ||
| #ifdef SPARKPLUG_DEBUG |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mqtt_message_cb only processes messages inside if (msg_new) { ... }, but wolfMQTT can invoke the callback multiple times per PUBLISH (msg_new=1 first chunk, then msg_new=0 for subsequent chunks, with msg_done=1 on the final chunk). With the current structure, multi-chunk payloads will never be decoded/handled because the msg_done path is skipped when msg_new==0. Consider caching the parsed topic/msg_type on msg_new and performing decode/handling when msg_done is true (even if msg_new is false), or accumulate payload until complete.
| /* Cleanup */ | ||
| MqttClientNet_DeInit(&mqttCtx->net); | ||
|
|
||
| if (mqttCtx->tx_buf) { | ||
| WOLFMQTT_FREE(mqttCtx->tx_buf); | ||
| mqttCtx->tx_buf = NULL; | ||
| } | ||
| if (mqttCtx->rx_buf) { | ||
| WOLFMQTT_FREE(mqttCtx->rx_buf); | ||
| mqttCtx->rx_buf = NULL; | ||
| } | ||
|
|
||
| PRINTF("Sparkplug: Disconnected %s", mqttCtx->client_id); | ||
| return rc; |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sparkplug_disconnect frees buffers and deinitializes the network, but it never calls MqttClient_DeInit(&mqttCtx->client). In multi-thread builds MqttClient_Init allocates/initializes semaphores inside the client, so skipping MqttClient_DeInit can leak OS resources. Call MqttClient_DeInit during disconnect (and on connect/init error paths).
| | Type | Description | | ||
| |------|-------------| | ||
| | NBIRTH | Node Birth Certificate - Edge node announces itself | | ||
| | NDEATH | Node Death Certificate - Edge node goes offline (LWT) | | ||
| | DBIRTH | Device Birth Certificate - Device announces itself | | ||
| | DDEATH | Device Death Certificate - Device goes offline | | ||
| | NDATA | Node Data - Metrics from the edge node | | ||
| | DDATA | Device Data - Metrics from a device | | ||
| | NCMD | Node Command - Command to the edge node | | ||
| | DCMD | Device Command - Command to a specific device | | ||
| | STATE | Host Application state | | ||
|
|
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The markdown tables in this README use || instead of standard table | delimiters (e.g., the “Message Types” table). As written, these won’t render as tables on GitHub. Convert them to proper GitHub-flavored markdown tables (single | with a header separator row).
| dist_example_DATA+= examples/mqttnet.c \ | ||
| examples/mqttexample.c \ | ||
| examples/mqttport.c \ | ||
| examples/mqttclient/mqttclient.c \ | ||
| examples/mqttsimple/mqttsimple.c \ | ||
| examples/firmware/fwpush.c \ | ||
| examples/firmware/fwclient.c \ | ||
| examples/azure/azureiothub.c \ | ||
| examples/aws/awsiot.c \ | ||
| examples/wiot/wiot.c | ||
| dist_example_DATA+= examples/nbclient/nbclient.c | ||
| dist_example_DATA+= examples/multithread/multithread.c | ||
| if BUILD_SN | ||
| dist_example_DATA+= examples/sn-client/sn-client.c | ||
| dist_example_DATA+= examples/sn-client/sn-client_qos-1.c | ||
| dist_example_DATA+= examples/sn-client/sn-multithread.c | ||
| endif | ||
| dist_example_DATA+= examples/pub-sub/mqtt-pub.c | ||
| dist_example_DATA+= examples/pub-sub/mqtt-sub.c | ||
| dist_example_DATA+= examples/sparkplug/sparkplug.c | ||
| if BUILD_WEBSOCKET |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
examples/include.am adds the Sparkplug example source to dist_example_DATA, but does not include the new examples/sparkplug/README.md (and possibly sparkplug.h) anywhere for make dist packaging. If the project distributes example docs/headers, add these files to the appropriate dist_*/EXTRA_DIST lists so they’re included in release tarballs.
| /* Initialize outputs */ | ||
| if (group_id) group_id[0] = '\0'; | ||
| if (edge_node_id) edge_node_id[0] = '\0'; | ||
| if (device_id) device_id[0] = '\0'; | ||
|
|
||
| /* Parse topic: spBv1.0/group/type/node[/device] */ | ||
| matched = XSSCANF(topic, "%15[^/]/%63[^/]/%15[^/]/%63[^/]/%63s", | ||
| namespace_buf, group_id, type_buf, edge_node_id, device_id); | ||
|
|
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkplugTopic_Parse initializes outputs only when the pointers are non-NULL, but then unconditionally passes group_id, edge_node_id, and device_id to XSSCANF. This will crash if any of those pointers are NULL, despite the signature implying they’re optional. Make the parameters required (and assert/return BAD_ARG when NULL) or scan into local temp buffers and copy out conditionally.
| /* Parse topic: spBv1.0/group/type/node[/device] */ | ||
| matched = XSSCANF(topic, "%15[^/]/%63[^/]/%15[^/]/%63[^/]/%63s", | ||
| namespace_buf, group_id, type_buf, edge_node_id, device_id); | ||
|
|
||
| if (matched < 4) { | ||
| return MQTT_CODE_ERROR_BAD_ARG; | ||
| } | ||
|
|
||
| /* Verify namespace */ | ||
| if (XSTRCMP(namespace_buf, SPARKPLUG_NAMESPACE) != 0) { | ||
| return MQTT_CODE_ERROR_BAD_ARG; | ||
| } | ||
|
|
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SparkplugTopic_Parse ignores the *_len parameters and uses hard-coded %63 field widths when scanning into group_id/edge_node_id/device_id. If a caller passes smaller buffers, this can overflow. Use widths derived from the provided lengths (len-1) and/or parse manually using delimiters.
Add Sparkplug B Industrial IoT Example
Summary
This PR adds a new example demonstrating the https://sparkplug.eclipse.org/
industrial IoT protocol specification using wolfMQTT. The example creates two
MQTT clients that communicate using the Sparkplug topic namespace and message
types.
Changes
(CMakeLists.txt)
Features
Two Communicating Clients:
--enable-mt)
Sparkplug Protocol Support:
spBv1.0/{group_id}/{message_type}/{edge_node_id}[/{device_id}]
protobuf dependency)
Simulated Metrics:
Build & Test
Autotools - single-threaded (Edge Node only)
./configure --disable-tls
make
Autotools - multi-threaded (both clients)
./configure --enable-mt --disable-tls
make
CMake - multi-threaded
mkdir build && cd build
cmake -DWOLFMQTT_TLS=no -DWOLFMQTT_MT=yes ..
make sparkplug
Run
./examples/sparkplug/sparkplug -h test.mosquitto.org -p 1883
Example Output
Sparkplug B Example
Starting Edge Node and Host Application threads...
Sparkplug: Connected! (client_id=WolfMQTT_Sparkplug_Edge)
Sparkplug: Published NBIRTH to spBv1.0/WolfMQTT/NBIRTH/EdgeNode1
Sparkplug [WolfMQTT_Sparkplug_Host]: Received NBIRTH from WolfMQTT/EdgeNode1
-> Edge Node came online (bdSeq=0)
Sparkplug: Published DDATA to spBv1.0/WolfMQTT/DDATA/EdgeNode1/Device1
Sparkplug [WolfMQTT_Sparkplug_Host]: Received DDATA from
WolfMQTT/EdgeNode1/Device1
-> Device data received:
Temperature = 22.83
Humidity = 45.36
LED = OFF
Sparkplug [Host]: Sending DCMD to spBv1.0/WolfMQTT/DCMD/EdgeNode1/Device1
(LED=ON)
Sparkplug [WolfMQTT_Sparkplug_Edge]: Received DCMD from
WolfMQTT/EdgeNode1/Device1
-> Command received:
LED set to ON
Sparkplug example completed!
Test Plan