Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
0eec736
mqtt: renaming
viacheslauK Jan 14, 2026
2260aeb
mqtt: componentStatus error for PublisherFB if connection lost
viacheslauK Jan 14, 2026
55dae62
mqtt: warning status for a publisher if there are no connected input …
viacheslauK Jan 14, 2026
3d599bd
mqtt: warning for a publisher FB if the topic property is empty
viacheslauK Jan 14, 2026
da6eff2
mqtt: removing SignalName property from a Publisher FB
viacheslauK Jan 14, 2026
835c248
mqtt: merging two FB - Raw and JSON to MQTTSubscriberFb
viacheslauK Jan 15, 2026
bb3b570
mqtt: Topics property for publisherFB
viacheslauK Jan 15, 2026
8d90647
mqtt: renaming; removing unused code
viacheslauK Jan 15, 2026
821c758
mqtt: SignalValueJSONKey property for publisher FB
viacheslauK Jan 15, 2026
19dc87d
mqtt: EnablePreviewSignal for subscriber FB
viacheslauK Jan 15, 2026
71cf5c3
mqtt: EnablePreviewSignal for publisher FB
viacheslauK Jan 16, 2026
3b67aa3
mqtt: Schema property for publisher FB
viacheslauK Jan 16, 2026
828314e
mqtt: renaming, README updating
viacheslauK Jan 19, 2026
0172d86
mqtt: status helper - check if status has been set
viacheslauK Jan 19, 2026
acfb7de
mqtt: MqttPublisherFbImpl::readProperties fix
viacheslauK Jan 19, 2026
024aed4
mqtt: fix for subscriber component status
viacheslauK Jan 19, 2026
d7a884f
mqtt: memory leak fix
viacheslauK Jan 20, 2026
2be52e5
mqtt: publisher test
viacheslauK Jan 20, 2026
5d9f6b3
mqtt: SharedTs property has been removed from publisher; removing of …
viacheslauK Jan 21, 2026
445dd39
mqtt: removed() for publisher FB
viacheslauK Jan 21, 2026
3b7ce49
mqtt: parent for all publisher handlers; propper addInputPots for mul…
viacheslauK Jan 22, 2026
627a484
mqtt: new checks for publisher FB signal validation
viacheslauK Jan 23, 2026
e784024
mqtt: publisher FB statuses improvement
viacheslauK Jan 23, 2026
0512e59
gitignore
viacheslauK Jan 23, 2026
eb867a4
mqtt: removing SubscribingStatus from subscriber FB
viacheslauK Jan 23, 2026
ec7f92e
mqtt: removing parsing status from JSON decoder FB; updating parsing …
viacheslauK Jan 23, 2026
8aba451
mqtt: publisher FB deadlock fix
viacheslauK Jan 26, 2026
6e413ea
mqtt: array parsing for decoder FB
viacheslauK Jan 26, 2026
ebf5c01
mqtt: publisher fix - processing SampleCount() to extract all samples…
viacheslauK Jan 26, 2026
4c740d2
mqtt: new publisher mode with shared ts arrays
viacheslauK Jan 28, 2026
70bcd95
mqtt: SampleType::String for preview signals
viacheslauK Jan 28, 2026
49a255f
mqtt: use existing ComponentStatusType for client FB
viacheslauK Jan 28, 2026
56d308a
mqtt: internal buffer for AtomicSignalSampleArrayHandler
viacheslauK Jan 28, 2026
1ce672b
mqtt: template for tracking descriptor changes in publisher handlers
viacheslauK Jan 28, 2026
975f6a3
mqtt: tracking descriptor changes in publisher handlers
viacheslauK Jan 29, 2026
df82d7b
mqtt: CoreEvent checking for DataDescriptorChanged for connected signals
viacheslauK Jan 30, 2026
0b27c22
mqtt: proper initBuilders for GroupSignalSharedTsArrHandler
viacheslauK Jan 30, 2026
78f2daf
mqtt: improving port numeration for Publisher FB
viacheslauK Jan 30, 2026
d69bb2f
mqtt: updating app examples
viacheslauK Jan 30, 2026
32424d7
mqtt: main branch for openDAQ
viacheslauK Jan 30, 2026
937f527
mqtt: README updating
viacheslauK Jan 30, 2026
fb33e31
mqtt: version 0.1.0
viacheslauK Feb 2, 2026
9ce17e7
new layout
viacheslauK Feb 2, 2026
dff2d5b
ci.yml
viacheslauK Feb 2, 2026
4330fae
mqtt: export only if tests are enabled
viacheslauK Feb 2, 2026
5c4c297
CI: tests and mosquitto
viacheslauK Feb 3, 2026
2927f65
mqtt: disable tests that require 2 mqtt brokers
viacheslauK Feb 3, 2026
bec1204
CI: remove tests for Windows
viacheslauK Feb 3, 2026
42176a5
mqtt: openDAQ version update
viacheslauK Feb 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Build and Test

on:
pull_request:
types: [opened, reopened, synchronize, ready_for_review]

jobs:
build-and-test:
strategy:
fail-fast: false
matrix:
include:
- os: ubuntu-latest
generator: Ninja
- os: windows-latest
generator: "Visual Studio 17 2022"

runs-on: ${{ matrix.os }}

steps:
- name: Install additional dependencies
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get install -y --no-install-recommends mono-runtime libmono-system-json-microsoft4.0-cil libmono-system-data4.0-cil

- name: Checkout project repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.branch || github.event.client_payload.branch || github.ref }}

- name: Configure project with CMake
run: cmake -B build/output -S . -G "${{ matrix.generator }}" -DOPENDAQ_MQTT_ENABLE_TESTS=ON -DCMAKE_BUILD_TYPE=Debug

- name: Build project with CMake
run: cmake --build build/output --config Debug

- name: Install and run mosquitto Linux
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get install -y --no-install-recommends mosquitto

- name: Run project tests with CMake
if: matrix.os == 'ubuntu-latest'
run: ctest --test-dir build/output --output-on-failure -C Debug
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ include/*
lib/*
bin/*
test/test_runner
docker/*
build*/*

CMakeLists.txt.user
.vscode
.clang-format
.dockerignore

11 changes: 8 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@ if(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
add_compile_definitions(OPENDAQ_MQTT_MODULE_ENABLE_SSL)
endif()

set(MQTT_MODULE_VERSION "0.1.0" CACHE STRING "MQTT module version" FORCE)

if(OPENDAQ_MQTT_ENABLE_TESTS)
enable_testing()
endif()

add_subdirectory(external)
add_subdirectory(helper_utils)
add_subdirectory(mqtt_streaming_protocol)
add_subdirectory(mqtt_streaming_module)
add_subdirectory(shared)
add_subdirectory(modules)

if(OPENDAQ_DEVICE_EXAMPLE_ENABLE_EXAMPLE_APPS)
message(STATUS "Example applications have been enabled")
Expand Down
100 changes: 47 additions & 53 deletions README.md

Large diffs are not rendered by default.

23 changes: 12 additions & 11 deletions examples/custom-mqtt-sub/src/custom-mqtt-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ std::string to_string(daq::DataPacketPtr packet)
case SampleType::Int64:
data = std::to_string(*(static_cast<int64_t*>(packet.getData())));
break;
case SampleType::String:
case SampleType::Binary:
data = '\"' + std::string(static_cast<char*>(packet.getData()), packet.getDataSize()) + '\"';
break;
Expand Down Expand Up @@ -123,24 +124,24 @@ int main(int argc, char* argv[])

// Create OpenDAQ instance and add MQTT broker FB
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
const std::string rootFbName = "RootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
const std::string clientFbName = "MQTTClientFB";
auto clientFbConfig = instance.getAvailableFunctionBlockTypes().get(clientFbName).createDefaultConfig();
clientFbConfig.setPropertyValue("BrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(clientFbName, clientFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();

const std::string jsonFbName = "JsonSubscriberMqttFb";
std::cout << "Try to add the " << jsonFbName << std::endl;
const std::string subFbName = "MQTTSubscriberFB";
std::cout << "Try to add the " << subFbName << std::endl;

auto config = availableFbs.get(jsonFbName).createDefaultConfig();
config.setPropertyValue("JsonConfigFile", appConfig.configFilePath);
auto config = availableFbs.get(subFbName).createDefaultConfig();
config.setPropertyValue("JSONConfigFile", appConfig.configFilePath);

// Add the JSON function block to the broker FB
daq::FunctionBlockPtr jsonFb = brokerFB.addFunctionBlock(jsonFbName, config);
// Add the subscriber function block to the broker FB
daq::FunctionBlockPtr subFb = brokerFB.addFunctionBlock(subFbName, config);

// Create packet readers for all signals
auto signals = List<daq::ISignal>();
const auto fbs = jsonFb.getFunctionBlocks();
const auto fbs = subFb.getFunctionBlocks();
for (const auto& fb : fbs)
{
const auto sig = fb.getSignals();
Expand Down
20 changes: 11 additions & 9 deletions examples/raw-mqtt-sub/src/raw-mqtt-sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,26 @@ int main(int argc, char* argv[])

// Create OpenDAQ instance and add MQTT broker FB
const InstancePtr instance = InstanceBuilder().addModulePath(MODULE_PATH).build();
const std::string rootFbName = "RootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
const std::string clientFbName = "MQTTClientFB";
auto clientFbConfig = instance.getAvailableFunctionBlockTypes().get(clientFbName).createDefaultConfig();
clientFbConfig.setPropertyValue("BrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(clientFbName, clientFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();

const std::string fbName = "RawSubscriberMqttFb";
const std::string fbName = "MQTTSubscriberFB";
std::cout << "Try to add the " << fbName << std::endl;

// Create RAW function block configuration
// Create subscriber function block configuration
auto config = availableFbs.get(fbName).createDefaultConfig();
config.setPropertyValue("Topic", appConfig.topic);
config.setPropertyValue("EnablePreviewSignal", True);
config.setPropertyValue("MessageIsString", True);

// Add the RAW function block to the broker FB
daq::FunctionBlockPtr rawFb = brokerFB.addFunctionBlock(fbName, config);
// Add the subscriber function block to the broker FB
daq::FunctionBlockPtr subFb = brokerFB.addFunctionBlock(fbName, config);

// Create packet readers for a signal
const auto signal = rawFb.getSignals()[0];
const auto signal = subFb.getSignals()[0];
PacketReaderPtr reader = daq::PacketReader(signal);

// Start a thread to read packets from the reader
Expand Down
81 changes: 36 additions & 45 deletions examples/ref-dev-mqtt-pub/src/ref-dev-mqtt-pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
using namespace daq;

enum class Mode {
ATOMIC_SIGNAL_ATOMIC_SAMPLE = 0,
ATOMIC_SIGNAL_SAMPLE_ARRAY,
SIGNAL_ARRAY_ATOMIC_SAMPLE,
GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS,
TopicPerSignal = 0,
SingleTopic,
_COUNT
};

struct ConfigStruct {
std::string brokerAddress;
Mode mode;
bool useArray = false;
size_t arraySize = 0;
bool exit = true;
int error = 0;
};
Expand All @@ -27,12 +27,11 @@ ConfigStruct StartUp(int argc, char* argv[])
args.addArg("--help", "Show help message");
args.addArg("--address", "MQTT broker address", true);
args.addArg("--mode", "publisher FB mode", true);
args.addArg("--array", "pablish samples as arrays with specified size", true);
args.setUsageHelp(APP_NAME " [options]\n"
"Available modes:\n"
" 0 - ATOMIC_SIGNAL_ATOMIC_SAMPLE\n"
" 1 - ATOMIC_SIGNAL_SAMPLE_ARRAY\n"
" 2 - SIGNAL_ARRAY_ATOMIC_SAMPLE\n"
" 3 - GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS");
" 0 - Topic per signal\n"
" 1 - Single topic\n");
args.parse(argc, argv);

if (args.hasArg("--help") || args.hasUnknownArgs())
Expand All @@ -54,6 +53,19 @@ ConfigStruct StartUp(int argc, char* argv[])
return config;
}
config.mode = static_cast<Mode>(mode);
if (args.hasArg("--array"))
{
config.useArray = true;
config.arraySize = std::stoi(args.getArgValue("--array", "0"));
if (config.arraySize == 0)
{
std::cout << "Invalid array size value. It must be greater than 0." << std::endl;
args.printHelp();
config.error = -1;
config.exit = true;
return config;
}
}
return config;
}

Expand All @@ -73,59 +85,38 @@ int main(int argc, char* argv[])

// Configure channels
const auto channels = refDevice.getChannelsRecursive();
channels[0].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS);
channels[0].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::SingleTopic);
channels[0].setPropertyValue("SampleRate", 10);
channels[0].setPropertyValue("Frequency", 1);
channels[0].setPropertyValue("Waveform", 1);
channels[1].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS);
channels[1].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::SingleTopic);
channels[1].setPropertyValue("SampleRate", 20);
channels[1].setPropertyValue("Frequency", 1);
channels[1].setPropertyValue("Waveform", 3);
channels[2].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS);
channels[2].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::SingleTopic);
channels[2].setPropertyValue("SampleRate", 50);
channels[2].setPropertyValue("Frequency", 4);
channels[3].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS);
channels[3].setPropertyValue("UseGlobalSampleRate", appConfig.mode == Mode::SingleTopic);
channels[3].setPropertyValue("SampleRate", 100);
channels[3].setPropertyValue("Frequency", 20);

// Create and configure MQTT server
const std::string rootFbName = "RootMqttFb";
auto rootFbConfig = instance.getAvailableFunctionBlockTypes().get(rootFbName).createDefaultConfig();
rootFbConfig.setPropertyValue("MqttBrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(rootFbName, rootFbConfig);
const std::string clientFbName = "MQTTClientFB";
auto clientFbConfig = instance.getAvailableFunctionBlockTypes().get(clientFbName).createDefaultConfig();
clientFbConfig.setPropertyValue("BrokerAddress", appConfig.brokerAddress);
auto brokerFB = instance.addFunctionBlock(clientFbName, clientFbConfig);
auto availableFbs = brokerFB.getAvailableFunctionBlockTypes();
const std::string fbName = "PublisherMqttFb";
const std::string fbName = "MQTTJSONPublisherFB";
std::cout << "Try to add the " << fbName << std::endl;

auto config = availableFbs.get(fbName).createDefaultConfig();
config.setPropertyValue("MqttQoS", 1);
config.setPropertyValue("ReaderPeriod", 20);
config.setPropertyValue("UseSignalNames", True);
switch (appConfig.mode) {
case Mode::ATOMIC_SIGNAL_ATOMIC_SAMPLE:
config.setPropertyValue("SharedTimestamp", False);
config.setPropertyValue("TopicMode", 0);
config.setPropertyValue("GroupValues", False);
break;
case Mode::ATOMIC_SIGNAL_SAMPLE_ARRAY:
config.setPropertyValue("SharedTimestamp", False);
config.setPropertyValue("TopicMode", 0);
config.setPropertyValue("GroupValues", True);
config.setPropertyValue("GroupValuesPackSize", 3);
break;
case Mode::SIGNAL_ARRAY_ATOMIC_SAMPLE:
config.setPropertyValue("SharedTimestamp", False);
config.setPropertyValue("TopicMode", 1);
config.setPropertyValue("GroupValues", False);
break;
case Mode::GROUP_SIGNAL_ATOMIC_SAMPLE_SHARED_TS:
config.setPropertyValue("SharedTimestamp", True);
config.setPropertyValue("TopicMode", 1);
config.setPropertyValue("GroupValues", False);
break;
default:
break;
}
config.setPropertyValue("QoS", 1);
config.setPropertyValue("ReaderWaitPeriod", 20);
config.setPropertyValue("SignalValueJSONKey", 2);
config.setPropertyValue("TopicMode", (appConfig.mode == Mode::TopicPerSignal) ? 0 : 1);
config.setPropertyValue("GroupValues", (appConfig.useArray) ? True : False);
config.setPropertyValue("SamplesPerMessage", appConfig.arraySize);
config.setPropertyValue("Topic", "opendaq/test/values");


// Add the publisher function block to the broker device
Expand Down
2 changes: 1 addition & 1 deletion external/opendaq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set(OPENDAQ_ENABLE_TESTS false)
FetchContent_Declare(
opendaq
GIT_REPOSITORY https://github.com/openDAQ/openDAQ.git
GIT_TAG origin/main
GIT_TAG 41396d19a1567ab6aecacfe7e381ea616dfdad6c
GIT_PROGRESS ON
EXCLUDE_FROM_ALL
SYSTEM
Expand Down
4 changes: 4 additions & 0 deletions modules/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cmake_minimum_required(VERSION 3.10)
list(APPEND CMAKE_MESSAGE_CONTEXT modules)

add_subdirectory(mqtt_streaming_module)
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
cmake_minimum_required(VERSION 3.10)
set_cmake_folder_context(TARGET_FOLDER_NAME)

set(MQTT_MODULE_VERSION ${OPENDAQ_PACKAGE_VERSION})
set(MQTT_MODULE_PRJ_NAME "OpenDaqMqttModule")

message(STATUS "${MQTT_MODULE_PRJ_NAME} version: ${MQTT_MODULE_VERSION}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@

#pragma once

#include <opendaq/function_block_ptr.h>
#include <mqtt_streaming_module/handler_base.h>

BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

class AtomicSignalAtomicSampleHandler : public HandlerBase
{
public:
explicit AtomicSignalAtomicSampleHandler(bool useSignalNames);
explicit AtomicSignalAtomicSampleHandler(WeakRefPtr<IFunctionBlock> parentFb, SignalValueJSONKey signalNamesMode);

MqttData processSignalContexts(std::vector<SignalContext>& signalContexts) override;
ProcedureStatus validateSignalContexts(const std::vector<SignalContext>& signalContexts) const override;
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override
{
return ProcedureStatus{true, {}};
};

ListPtr<IString> getTopics(const std::vector<SignalContext>& signalContexts) override;
std::string getSchema() override;
protected:
bool useSignalNames;

virtual MqttData processSignalContext(SignalContext& signalContext);
void
processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
MqttDataSample processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket);
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet);
MqttDataSample processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket, size_t offset);
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet, size_t offset);
std::string buildTopicName(const SignalContext& signalContext);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,42 @@

#pragma once

#include <opendaq/function_block_ptr.h>
#include <mqtt_streaming_module/atomic_signal_atomic_sample_handler.h>
#include <list>

BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

class AtomicSignalSampleArrayHandler : public AtomicSignalAtomicSampleHandler
{
public:
explicit AtomicSignalSampleArrayHandler(bool useSignalNames, size_t packSize);

struct SignalBuffer
{
std::list<DataPacketPtr> data;
size_t dataSize = 0;
size_t offset = 0;
void clear()
{
data.clear();
dataSize = 0;
offset = 0;
}
};
explicit AtomicSignalSampleArrayHandler(WeakRefPtr<IFunctionBlock> parentFb, SignalValueJSONKey signalNamesMode, size_t packSize);
ProcedureStatus signalListChanged(std::vector<SignalContext>& signalContexts) override;
std::string getSchema() override;

protected:
size_t packSize;
std::unordered_map<std::string, SignalBuffer> signalBuffers;

MqttData processSignalContext(SignalContext& signalContext) override;
MqttDataSample processDataPackets(SignalContext& signalContext, const std::vector<DataPacketPtr>& dataPacket);
std::string toString(const std::string valueFieldName, const std::vector<DataPacketPtr>& dataPackets);
MqttDataSample processDataPackets(SignalContext& signalContext);
std::string toString(const std::string valueFieldName, SignalContext& signalContext);
std::pair<DataPacketPtr, size_t> getSample(SignalContext& signalContext);
void
processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
};

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Loading