Skip to content

Commit 6685600

Browse files
committed
Further cleanups to broker code.
1 parent 82f89d0 commit 6685600

2 files changed

Lines changed: 142 additions & 112 deletions

File tree

src/mqtt_broker.c

Lines changed: 141 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*
77
* wolfMQTT is free software; you can redistribute it and/or modify
88
* it under the terms of the GNU General Public License as published by
9-
* the Free Software Foundation; either version 2 of the License, or
9+
* the Free Software Foundation; either version 3 of the License, or
1010
* (at your option) any later version.
1111
*
1212
* wolfMQTT is distributed in the hope that it will be useful,
@@ -24,6 +24,7 @@
2424
#include <config.h>
2525
#endif
2626

27+
#include "wolfmqtt/mqtt_types.h"
2728
#include "wolfmqtt/mqtt_broker.h"
2829
#include "wolfmqtt/mqtt_client.h"
2930
#include "wolfmqtt/mqtt_packet.h"
@@ -46,22 +47,14 @@
4647
#include <sys/socket.h>
4748
#include <time.h>
4849
#include <unistd.h>
49-
#ifdef ENABLE_MQTT_TLS
50-
#include <wolfssl/ssl.h>
51-
#endif
5250
#endif /* !WOLFMQTT_BROKER_CUSTOM_NET */
5351

5452
/* -------------------------------------------------------------------------- */
5553
/* Default time abstraction */
5654
/* -------------------------------------------------------------------------- */
5755
#ifndef WOLFMQTT_BROKER_GET_TIME_S
58-
#ifndef WOLFMQTT_BROKER_CUSTOM_NET
59-
#define WOLFMQTT_BROKER_GET_TIME_S() \
60-
((WOLFMQTT_BROKER_TIME_T)time(NULL))
61-
#else
62-
#error "WOLFMQTT_BROKER_CUSTOM_NET requires " \
63-
"WOLFMQTT_BROKER_GET_TIME_S to be defined"
64-
#endif
56+
#define WOLFMQTT_BROKER_GET_TIME_S() \
57+
((WOLFMQTT_BROKER_TIME_T)time(NULL))
6558
#endif
6659

6760
/* -------------------------------------------------------------------------- */
@@ -70,11 +63,8 @@
7063
#ifndef BROKER_SLEEP_MS
7164
#ifdef USE_WINDOWS_API
7265
#define BROKER_SLEEP_MS(ms) Sleep(ms)
73-
#elif !defined(WOLFMQTT_BROKER_CUSTOM_NET)
74-
#define BROKER_SLEEP_MS(ms) usleep((unsigned)(ms) * 1000)
7566
#else
76-
#error "WOLFMQTT_BROKER_CUSTOM_NET requires " \
77-
"BROKER_SLEEP_MS to be defined"
67+
#define BROKER_SLEEP_MS(ms) usleep((unsigned)(ms) * 1000)
7868
#endif
7969
#endif
8070

@@ -351,84 +341,96 @@ int MqttBrokerNet_Init(MqttBrokerNet* net)
351341
#ifdef ENABLE_MQTT_TLS
352342
static int BrokerTls_Init(MqttBroker* broker)
353343
{
354-
WOLFSSL_CTX* ctx;
344+
WOLFSSL_CTX* ctx = NULL;
355345
int rc;
356346

357347
rc = wolfSSL_Init();
358348
if (rc != WOLFSSL_SUCCESS) {
359349
PRINTF("broker: wolfSSL_Init failed %d", rc);
360-
return MQTT_CODE_ERROR_BAD_ARG;
350+
rc = MQTT_CODE_ERROR_BAD_ARG;
361351
}
362352

363353
/* Select TLS method based on version preference */
364-
if (broker->tls_version == 12) {
365-
ctx = wolfSSL_CTX_new(wolfTLSv1_2_server_method());
366-
}
367-
else if (broker->tls_version == 13) {
368-
ctx = wolfSSL_CTX_new(wolfTLSv1_3_server_method());
369-
}
370-
else {
371-
/* Default: auto-negotiate (supports TLS 1.2 and 1.3) */
372-
ctx = wolfSSL_CTX_new(wolfSSLv23_server_method());
373-
}
374-
if (ctx == NULL) {
375-
PRINTF("broker: wolfSSL_CTX_new failed");
376-
wolfSSL_Cleanup();
377-
return MQTT_CODE_ERROR_MEMORY;
354+
if (rc == WOLFSSL_SUCCESS) {
355+
if (broker->tls_version == 12) {
356+
ctx = wolfSSL_CTX_new(wolfTLSv1_2_server_method());
357+
}
358+
else if (broker->tls_version == 13) {
359+
ctx = wolfSSL_CTX_new(wolfTLSv1_3_server_method());
360+
}
361+
else {
362+
ctx = wolfSSL_CTX_new(wolfSSLv23_server_method());
363+
}
364+
if (ctx == NULL) {
365+
PRINTF("broker: wolfSSL_CTX_new failed");
366+
rc = MQTT_CODE_ERROR_MEMORY;
367+
}
378368
}
379369

380370
/* Load server certificate */
381-
if (broker->tls_cert == NULL) {
382-
PRINTF("broker: TLS cert not set (-c)");
383-
wolfSSL_CTX_free(ctx);
384-
wolfSSL_Cleanup();
385-
return MQTT_CODE_ERROR_BAD_ARG;
371+
if (rc == WOLFSSL_SUCCESS) {
372+
if (broker->tls_cert == NULL) {
373+
PRINTF("broker: TLS cert not set (-c)");
374+
rc = MQTT_CODE_ERROR_BAD_ARG;
375+
}
386376
}
387-
rc = wolfSSL_CTX_use_certificate_file(ctx, broker->tls_cert,
388-
WOLFSSL_FILETYPE_PEM);
389-
if (rc != WOLFSSL_SUCCESS) {
390-
PRINTF("broker: load cert failed %d (%s)", rc, broker->tls_cert);
391-
wolfSSL_CTX_free(ctx);
392-
wolfSSL_Cleanup();
393-
return MQTT_CODE_ERROR_BAD_ARG;
377+
if (rc == WOLFSSL_SUCCESS) {
378+
rc = wolfSSL_CTX_use_certificate_file(ctx, broker->tls_cert,
379+
WOLFSSL_FILETYPE_PEM);
380+
if (rc != WOLFSSL_SUCCESS) {
381+
PRINTF("broker: load cert failed %d (%s)", rc, broker->tls_cert);
382+
rc = MQTT_CODE_ERROR_BAD_ARG;
383+
}
394384
}
395385

396386
/* Load server private key */
397-
if (broker->tls_key == NULL) {
398-
PRINTF("broker: TLS key not set (-K)");
399-
wolfSSL_CTX_free(ctx);
400-
wolfSSL_Cleanup();
401-
return MQTT_CODE_ERROR_BAD_ARG;
387+
if (rc == WOLFSSL_SUCCESS) {
388+
if (broker->tls_key == NULL) {
389+
PRINTF("broker: TLS key not set (-K)");
390+
rc = MQTT_CODE_ERROR_BAD_ARG;
391+
}
402392
}
403-
rc = wolfSSL_CTX_use_PrivateKey_file(ctx, broker->tls_key,
404-
WOLFSSL_FILETYPE_PEM);
405-
if (rc != WOLFSSL_SUCCESS) {
406-
PRINTF("broker: load key failed %d (%s)", rc, broker->tls_key);
407-
wolfSSL_CTX_free(ctx);
408-
wolfSSL_Cleanup();
409-
return MQTT_CODE_ERROR_BAD_ARG;
393+
if (rc == WOLFSSL_SUCCESS) {
394+
rc = wolfSSL_CTX_use_PrivateKey_file(ctx, broker->tls_key,
395+
WOLFSSL_FILETYPE_PEM);
396+
if (rc != WOLFSSL_SUCCESS) {
397+
PRINTF("broker: load key failed %d (%s)", rc, broker->tls_key);
398+
rc = MQTT_CODE_ERROR_BAD_ARG;
399+
}
410400
}
411401

412-
/* Set wolfSSL IO callbacks (reuse existing WOLFMQTT_API functions) */
413-
wolfSSL_CTX_SetIORecv(ctx, MqttSocket_TlsSocketReceive);
414-
wolfSSL_CTX_SetIOSend(ctx, MqttSocket_TlsSocketSend);
402+
/* Set wolfSSL IO callbacks */
403+
if (rc == WOLFSSL_SUCCESS) {
404+
wolfSSL_CTX_SetIORecv(ctx, MqttSocket_TlsSocketReceive);
405+
wolfSSL_CTX_SetIOSend(ctx, MqttSocket_TlsSocketSend);
406+
}
415407

416408
/* Mutual TLS: load CA and require client certificate */
417-
if (broker->tls_ca != NULL) {
409+
if (rc == WOLFSSL_SUCCESS && broker->tls_ca != NULL) {
418410
rc = wolfSSL_CTX_load_verify_locations(ctx, broker->tls_ca, NULL);
419411
if (rc != WOLFSSL_SUCCESS) {
420412
PRINTF("broker: load CA failed %d (%s)", rc, broker->tls_ca);
421-
wolfSSL_CTX_free(ctx);
422-
wolfSSL_Cleanup();
423-
return MQTT_CODE_ERROR_BAD_ARG;
413+
rc = MQTT_CODE_ERROR_BAD_ARG;
414+
}
415+
else {
416+
wolfSSL_CTX_set_verify(ctx,
417+
WOLFSSL_VERIFY_PEER | WOLFSSL_VERIFY_FAIL_IF_NO_PEER_CERT,
418+
NULL);
419+
PRINTF("broker: mutual TLS enabled (CA=%s)", broker->tls_ca);
424420
}
425-
wolfSSL_CTX_set_verify(ctx,
426-
WOLFSSL_VERIFY_PEER | WOLFSSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
427-
PRINTF("broker: mutual TLS enabled (CA=%s)", broker->tls_ca);
428421
}
429422

430-
broker->tls_ctx = ctx;
431-
return MQTT_CODE_SUCCESS;
423+
if (rc == WOLFSSL_SUCCESS) {
424+
broker->tls_ctx = ctx;
425+
rc = MQTT_CODE_SUCCESS;
426+
}
427+
else {
428+
if (ctx != NULL) {
429+
wolfSSL_CTX_free(ctx);
430+
}
431+
wolfSSL_Cleanup();
432+
}
433+
return rc;
432434
}
433435

434436
static void BrokerTls_Free(MqttBroker* broker)
@@ -1179,6 +1181,7 @@ static int BrokerPendingWill_Add(MqttBroker* broker, BrokerClient* bc)
11791181
{
11801182
BrokerPendingWill* pw = NULL;
11811183
WOLFMQTT_BROKER_TIME_T now = WOLFMQTT_BROKER_GET_TIME_S();
1184+
int rc = MQTT_CODE_SUCCESS;
11821185

11831186
#ifdef WOLFMQTT_STATIC_MEMORY
11841187
{
@@ -1190,69 +1193,96 @@ static int BrokerPendingWill_Add(MqttBroker* broker, BrokerClient* bc)
11901193
}
11911194
}
11921195
if (pw == NULL) {
1193-
return MQTT_CODE_ERROR_MEMORY;
1196+
rc = MQTT_CODE_ERROR_MEMORY;
11941197
}
1195-
XMEMSET(pw, 0, sizeof(*pw));
1196-
pw->in_use = 1;
1197-
XSTRNCPY(pw->client_id, bc->client_id, BROKER_MAX_CLIENT_ID_LEN - 1);
1198-
pw->client_id[BROKER_MAX_CLIENT_ID_LEN - 1] = '\0';
1199-
XSTRNCPY(pw->topic, bc->will_topic, BROKER_MAX_TOPIC_LEN - 1);
1200-
pw->topic[BROKER_MAX_TOPIC_LEN - 1] = '\0';
1201-
if (bc->will_payload_len > 0) {
1202-
word16 len = bc->will_payload_len;
1203-
if (len > BROKER_MAX_WILL_PAYLOAD_LEN) {
1204-
len = BROKER_MAX_WILL_PAYLOAD_LEN;
1198+
if (rc == MQTT_CODE_SUCCESS) {
1199+
XMEMSET(pw, 0, sizeof(*pw));
1200+
pw->in_use = 1;
1201+
{
1202+
int len = (int)XSTRLEN(bc->client_id);
1203+
if (len >= BROKER_MAX_CLIENT_ID_LEN) {
1204+
len = BROKER_MAX_CLIENT_ID_LEN - 1;
1205+
}
1206+
XMEMCPY(pw->client_id, bc->client_id, len);
1207+
pw->client_id[len] = '\0';
1208+
}
1209+
{
1210+
int len = (int)XSTRLEN(bc->will_topic);
1211+
if (len >= BROKER_MAX_TOPIC_LEN) {
1212+
len = BROKER_MAX_TOPIC_LEN - 1;
1213+
}
1214+
XMEMCPY(pw->topic, bc->will_topic, len);
1215+
pw->topic[len] = '\0';
1216+
}
1217+
if (bc->will_payload_len > 0) {
1218+
word16 len = bc->will_payload_len;
1219+
if (len > BROKER_MAX_WILL_PAYLOAD_LEN) {
1220+
len = BROKER_MAX_WILL_PAYLOAD_LEN;
1221+
}
1222+
XMEMCPY(pw->payload, bc->will_payload, len);
1223+
pw->payload_len = len;
12051224
}
1206-
XMEMCPY(pw->payload, bc->will_payload, len);
1207-
pw->payload_len = len;
12081225
}
12091226
}
12101227
#else
12111228
pw = (BrokerPendingWill*)WOLFMQTT_MALLOC(sizeof(BrokerPendingWill));
12121229
if (pw == NULL) {
1213-
return MQTT_CODE_ERROR_MEMORY;
1230+
rc = MQTT_CODE_ERROR_MEMORY;
12141231
}
1215-
XMEMSET(pw, 0, sizeof(*pw));
1216-
{
1232+
if (rc == MQTT_CODE_SUCCESS) {
12171233
int id_len = (int)XSTRLEN(bc->client_id);
1234+
int t_len = (int)XSTRLEN(bc->will_topic);
1235+
XMEMSET(pw, 0, sizeof(*pw));
12181236
pw->client_id = (char*)WOLFMQTT_MALLOC((size_t)id_len + 1);
1219-
if (pw->client_id == NULL) {
1220-
WOLFMQTT_FREE(pw);
1221-
return MQTT_CODE_ERROR_MEMORY;
1237+
if (pw->client_id != NULL) {
1238+
XMEMCPY(pw->client_id, bc->client_id, (size_t)id_len + 1);
12221239
}
1223-
XMEMCPY(pw->client_id, bc->client_id, (size_t)id_len + 1);
1224-
}
1225-
{
1226-
int t_len = (int)XSTRLEN(bc->will_topic);
1227-
pw->topic = (char*)WOLFMQTT_MALLOC((size_t)t_len + 1);
1228-
if (pw->topic == NULL) {
1229-
WOLFMQTT_FREE(pw->client_id);
1230-
WOLFMQTT_FREE(pw);
1231-
return MQTT_CODE_ERROR_MEMORY;
1240+
else {
1241+
rc = MQTT_CODE_ERROR_MEMORY;
1242+
}
1243+
if (rc == MQTT_CODE_SUCCESS) {
1244+
pw->topic = (char*)WOLFMQTT_MALLOC((size_t)t_len + 1);
1245+
if (pw->topic != NULL) {
1246+
XMEMCPY(pw->topic, bc->will_topic, (size_t)t_len + 1);
1247+
}
1248+
else {
1249+
rc = MQTT_CODE_ERROR_MEMORY;
1250+
}
1251+
}
1252+
if (rc == MQTT_CODE_SUCCESS && bc->will_payload_len > 0) {
1253+
pw->payload = (byte*)WOLFMQTT_MALLOC(bc->will_payload_len);
1254+
if (pw->payload != NULL) {
1255+
XMEMCPY(pw->payload, bc->will_payload, bc->will_payload_len);
1256+
pw->payload_len = bc->will_payload_len;
1257+
}
1258+
else {
1259+
rc = MQTT_CODE_ERROR_MEMORY;
1260+
}
12321261
}
1233-
XMEMCPY(pw->topic, bc->will_topic, (size_t)t_len + 1);
12341262
}
1235-
if (bc->will_payload_len > 0) {
1236-
pw->payload = (byte*)WOLFMQTT_MALLOC(bc->will_payload_len);
1237-
if (pw->payload == NULL) {
1263+
if (rc == MQTT_CODE_SUCCESS) {
1264+
pw->next = broker->pending_wills;
1265+
broker->pending_wills = pw;
1266+
}
1267+
else if (pw != NULL) {
1268+
if (pw->topic) {
12381269
WOLFMQTT_FREE(pw->topic);
1270+
}
1271+
if (pw->client_id) {
12391272
WOLFMQTT_FREE(pw->client_id);
1240-
WOLFMQTT_FREE(pw);
1241-
return MQTT_CODE_ERROR_MEMORY;
12421273
}
1243-
XMEMCPY(pw->payload, bc->will_payload, bc->will_payload_len);
1244-
pw->payload_len = bc->will_payload_len;
1274+
WOLFMQTT_FREE(pw);
12451275
}
1246-
pw->next = broker->pending_wills;
1247-
broker->pending_wills = pw;
12481276
#endif
1249-
pw->qos = bc->will_qos;
1250-
pw->retain = bc->will_retain;
1251-
pw->publish_time = now + (WOLFMQTT_BROKER_TIME_T)bc->will_delay_sec;
12521277

1253-
PRINTF("broker: will deferred sock=%d client_id=%s delay=%u",
1254-
(int)bc->sock, bc->client_id, (unsigned)bc->will_delay_sec);
1255-
return MQTT_CODE_SUCCESS;
1278+
if (rc == MQTT_CODE_SUCCESS) {
1279+
pw->qos = bc->will_qos;
1280+
pw->retain = bc->will_retain;
1281+
pw->publish_time = now + (WOLFMQTT_BROKER_TIME_T)bc->will_delay_sec;
1282+
PRINTF("broker: will deferred sock=%d client_id=%s delay=%u",
1283+
(int)bc->sock, bc->client_id, (unsigned)bc->will_delay_sec);
1284+
}
1285+
return rc;
12561286
}
12571287

12581288
/* Cancel a pending will for the given client_id (client reconnected) */

wolfmqtt/mqtt_broker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*
77
* wolfMQTT is free software; you can redistribute it and/or modify
88
* it under the terms of the GNU General Public License as published by
9-
* the Free Software Foundation; either version 2 of the License, or
9+
* the Free Software Foundation; either version 3 of the License, or
1010
* (at your option) any later version.
1111
*
1212
* wolfMQTT is distributed in the hope that it will be useful,

0 commit comments

Comments
 (0)