Skip to content

Commit e01fcc7

Browse files
authored
Merge pull request #391 from capocchi/version-5.1
Version 5.1
2 parents 3c37930 + baa58e8 commit e01fcc7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+3606
-975
lines changed

devsimpy/DEVSKernel/BrokerDEVS/ARCHITECTURE.md

Lines changed: 419 additions & 0 deletions
Large diffs are not rendered by default.

devsimpy/DEVSKernel/BrokerDEVS/Brokers/mqtt/MqttAdapter.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import json
2323
from typing import Any, Dict, Optional, Callable
2424
from threading import Event
25+
from queue import Queue
2526

2627
from DEVSKernel.BrokerDEVS.Core.BrokerAdapter import BrokerAdapter
2728

@@ -99,9 +100,8 @@ def __init__(self, broker_address: str, port: int = 1883, **kwargs):
99100
self.port = port
100101
self.client = mqtt.Client()
101102
self.subscribed_topics = []
102-
self.message_queue = []
103+
self.message_queue = Queue() # Thread-safe queue for messages
103104
self.connected_event = Event()
104-
self.last_message = None
105105

106106
# Set up callbacks
107107
self.client.on_connect = self._on_connect
@@ -145,7 +145,7 @@ def _on_connect(self, client, userdata, flags, rc):
145145
def _on_message(self, client, userdata, msg):
146146
"""Callback for when a PUBLISH message is received from the broker."""
147147
mqtt_message = MqttMessage(msg.topic, msg.payload, msg.qos)
148-
self.last_message = mqtt_message
148+
self.message_queue.put(mqtt_message)
149149

150150
def _on_disconnect(self, client, userdata, rc):
151151
"""Callback for when client disconnects from broker."""
@@ -181,14 +181,12 @@ def poll(self, timeout: float = 1.0) -> Optional[MqttMessage]:
181181
Returns:
182182
MqttMessage if available, None otherwise
183183
"""
184-
if self.last_message is not None:
185-
msg = self.last_message
186-
self.last_message = None
187-
return msg
188-
189-
# Wait for a message
190-
time.sleep(min(timeout, 0.1))
191-
return self.last_message
184+
try:
185+
# Try to get a message from the queue with timeout
186+
return self.message_queue.get(timeout=timeout)
187+
except:
188+
# Queue is empty or timeout occurred
189+
return None
192190

193191
def close(self) -> None:
194192
"""Close the consumer connection."""
0 Bytes
Binary file not shown.
File renamed without changes.

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/MS4MeKafkaWorker.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/MS4MeKafkaWorker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
logger = logging.getLogger("DEVSKernel.BrokerDEVS.InMemoryKafkaWorker")
77
logger.setLevel(LOGGING_LEVEL)
88

9-
from DEVSKernel.BrokerDEVS.InMemoryKafkaWorker import InMemoryKafkaWorker
9+
from DEVSKernel.BrokerDEVS.Workers.InMemoryKafkaWorker import InMemoryKafkaWorker
1010

1111
from DEVSKernel.BrokerDEVS.Core.BrokerMessageTypes import (
1212
BaseMessage,
@@ -22,7 +22,7 @@
2222
SimulationDone,
2323
)
2424

25-
from DEVSKernel.BrokerDEVS.MS4Me.ms4me_kafka_wire_adapters import StandardWireAdapter
25+
from DEVSKernel.BrokerDEVS.DEVSStreaming.ms4me_kafka_wire_adapters import StandardWireAdapter
2626
from DomainInterface.Object import Message
2727

2828
class MS4MeKafkaWorker(InMemoryKafkaWorker):

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/MS4MeMqttWorker.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/MS4MeMqttWorker.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#
1313
# GENERAL NOTES AND REMARKS:
1414
#
15-
# MQTT-specific worker for MS4Me distributed DEVS simulation.
15+
# MQTT-specific worker for DEVSStreaming distributed DEVS simulation.
1616
# Mirrors MS4MeKafkaWorker but uses MQTT as the message broker.
1717
#
1818
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
@@ -53,15 +53,15 @@
5353

5454
class MS4MeMqttWorker(threading.Thread):
5555
"""
56-
MQTT-based worker thread for MS4Me distributed DEVS simulation.
56+
MQTT-based worker thread for DEVSStreaming distributed DEVS simulation.
5757
5858
Manages one atomic DEVS model running in-memory while communicating
5959
with the coordinator via MQTT topics.
6060
6161
Features:
6262
- Receives commands from coordinator on input topic
6363
- Sends model outputs to coordinator on output topic
64-
- Handles MS4Me message serialization/deserialization
64+
- Handles DEVSStreaming message serialization/deserialization
6565
- Thread-safe operations
6666
- Automatic reconnection on disconnect
6767
"""
@@ -119,7 +119,12 @@ def __init__(
119119
self.password = password
120120

121121
# Wire adapter for message serialization
122-
self.wire_adapter = wire_adapter
122+
if wire_adapter is None:
123+
# Default to StandardWireAdapter (pickle-based) to match proxy defaults
124+
from DEVSKernel.BrokerDEVS.DEVSStreaming.ms4me_mqtt_wire_adapters import StandardWireAdapter
125+
self.wire_adapter = StandardWireAdapter()
126+
else:
127+
self.wire_adapter = wire_adapter
123128

124129
# MQTT client - try VERSION2 API first (paho-mqtt 2.x)
125130
try:

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/SimStrategyKafkaMS4Me.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/SimStrategyKafkaMS4Me.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
from DEVSKernel.PyDEVS.SimStrategies import DirectCouplingPyDEVSSimStrategy
1515
from DomainInterface import DomainStructure, DomainBehavior
16-
from DEVSKernel.BrokerDEVS.MS4Me.MS4MeKafkaWorker import MS4MeKafkaWorker
17-
from DEVSKernel.BrokerDEVS.MS4Me.ms4me_kafka_wire_adapters import StandardWireAdapter
16+
from DEVSKernel.BrokerDEVS.DEVSStreaming.MS4MeKafkaWorker import MS4MeKafkaWorker
17+
from DEVSKernel.BrokerDEVS.DEVSStreaming.ms4me_kafka_wire_adapters import StandardWireAdapter
1818
from DEVSKernel.BrokerDEVS.Core.BrokerMessageTypes import (
1919
BaseMessage,
2020
SimTime,
@@ -27,11 +27,11 @@
2727
TransitionDone,
2828
SimulationDone,
2929
)
30-
from DEVSKernel.BrokerDEVS.Proxies.BrokerReceiverProxy import KafkaReceiverProxy
31-
from DEVSKernel.BrokerDEVS.Proxies.BrokerStreamProxy import KafkaStreamProxy
32-
from DEVSKernel.BrokerDEVS.MS4Me.auto_kafka import ensure_kafka_broker
30+
from DEVSKernel.BrokerDEVS.Proxies.kafka import KafkaReceiverProxy
31+
from DEVSKernel.BrokerDEVS.Proxies.kafka import KafkaStreamProxy
32+
from DEVSKernel.BrokerDEVS.DEVSStreaming.auto_kafka import ensure_kafka_broker
3333
from DEVSKernel.BrokerDEVS.logconfig import configure_logging, LOGGING_LEVEL
34-
from DEVSKernel.BrokerDEVS.MS4Me.kafkaconfig import KAFKA_BOOTSTRAP, AUTO_START_KAFKA_BROKER
34+
from DEVSKernel.BrokerDEVS.DEVSStreaming.kafkaconfig import KAFKA_BOOTSTRAP, AUTO_START_KAFKA_BROKER
3535

3636

3737
configure_logging()
@@ -238,13 +238,11 @@ def __init__(
238238
# Instantiate proxies (replaces direct Producer/Consumer)
239239
self._stream_proxy = StreamProxyClass(
240240
self.bootstrap,
241-
wire_adapter=self.wire,
242241
)
243242

244243
self._receiver_proxy = ReceiverProxyClass(
245244
self.bootstrap,
246245
group_id,
247-
wire_adapter=self.wire,
248246
)
249247

250248
# Subscribe to output topic
@@ -666,7 +664,7 @@ def _simulate_for_ms4me(self, T=1e8):
666664
)
667665

668666
self._terminate_workers()
669-
logger.info("MS4Me BrokerDEVS Simulation Ended")
667+
logger.info("DEVSStreaming BrokerDEVS Simulation Ended")
670668

671669
def __del__(self):
672670
"""Cleanup: close proxies properly"""

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/SimStrategyMqttMS4Me.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/SimStrategyMqttMS4Me.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# -*- coding: utf-8 -*-
22
"""
33
SimStrategyMqttMS4Me - MQTT-based distributed DEVS simulation strategy
4-
with in-memory workers (threads) using typed DEVS messages (MS4Me format).
4+
with in-memory workers (threads) using typed DEVS messages (DEVSStreaming format).
55
66
Mirrors SimStrategyKafkaMS4Me but uses MQTT as the message broker instead of Kafka.
7-
Maintains compatibility with MS4Me message standardization and worker coordination.
7+
Maintains compatibility with DEVSStreaming message standardization and worker coordination.
88
"""
99

1010
import logging
@@ -13,8 +13,8 @@
1313

1414
from DEVSKernel.PyDEVS.SimStrategies import DirectCouplingPyDEVSSimStrategy
1515
from DomainInterface import DomainStructure, DomainBehavior
16-
from DEVSKernel.BrokerDEVS.MS4Me.MS4MeMqttWorker import MS4MeMqttWorker
17-
from DEVSKernel.BrokerDEVS.MS4Me.ms4me_mqtt_wire_adapters import StandardWireAdapter
16+
from DEVSKernel.BrokerDEVS.DEVSStreaming.MS4MeMqttWorker import MS4MeMqttWorker
17+
from DEVSKernel.BrokerDEVS.DEVSStreaming.ms4me_mqtt_wire_adapters import StandardWireAdapter
1818
from DEVSKernel.BrokerDEVS.Core.BrokerMessageTypes import (
1919
BaseMessage,
2020
SimTime,
@@ -27,11 +27,11 @@
2727
TransitionDone,
2828
SimulationDone,
2929
)
30-
from DEVSKernel.BrokerDEVS.Proxies.MqttReceiverProxy import MqttReceiverProxy
31-
from DEVSKernel.BrokerDEVS.Proxies.MqttStreamProxy import MqttStreamProxy
32-
from DEVSKernel.BrokerDEVS.MS4Me.auto_mqtt import ensure_mqtt_broker
30+
from DEVSKernel.BrokerDEVS.Proxies.mqtt import MqttReceiverProxy
31+
from DEVSKernel.BrokerDEVS.Proxies.mqtt import MqttStreamProxy
32+
from DEVSKernel.BrokerDEVS.DEVSStreaming.auto_mqtt import ensure_mqtt_broker
3333
from DEVSKernel.BrokerDEVS.logconfig import configure_logging, LOGGING_LEVEL
34-
from DEVSKernel.BrokerDEVS.MS4Me.mqttconfig import MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT, AUTO_START_MQTT_BROKER
34+
from DEVSKernel.BrokerDEVS.DEVSStreaming.mqttconfig import MQTT_BROKER_ADDRESS, MQTT_BROKER_PORT, AUTO_START_MQTT_BROKER
3535

3636

3737
configure_logging()
@@ -225,20 +225,20 @@ def resolve_hierarchical(model, target_port, visited=None):
225225

226226
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
227227
#
228-
# MQTT SIMULATION STRATEGY WITH MS4Me MESSAGES
228+
# MQTT SIMULATION STRATEGY WITH DEVSStreaming MESSAGES
229229
#
230230
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
231231

232232

233233
class SimStrategyMqttMS4Me(DirectCouplingPyDEVSSimStrategy):
234234
"""
235-
MQTT-based distributed DEVS simulation strategy with MS4Me message format.
235+
MQTT-based distributed DEVS simulation strategy with DEVSStreaming message format.
236236
237237
Orchestrates distributed DEVS simulation where:
238238
- Each atomic model runs in a worker thread
239239
- Workers communicate via MQTT topics
240240
- Coordinator manages simulation timing and routing
241-
- Messages follow MS4Me standardization
241+
- Messages follow DEVSStreaming standardization
242242
243243
Features:
244244
- Thread-based in-memory workers
@@ -262,7 +262,7 @@ def __init__(
262262
mqtt_password: str = None,
263263
):
264264
"""
265-
Initialize MQTT MS4Me simulation strategy.
265+
Initialize MQTT DEVSStreaming simulation strategy.
266266
267267
Args:
268268
simulator: PyDEVS simulator instance
@@ -349,7 +349,6 @@ def __init__(
349349
self.broker_address,
350350
self.broker_port,
351351
client_id=f"coordinator-{group_id}",
352-
wire_adapter=self.wire,
353352
username=self.mqtt_username,
354353
password=self.mqtt_password,
355354
)
@@ -358,7 +357,6 @@ def __init__(
358357
self.broker_address,
359358
self.broker_port,
360359
client_id=f"receiver-{group_id}",
361-
wire_adapter=self.wire,
362360
username=self.mqtt_username,
363361
password=self.mqtt_password,
364362
)
@@ -418,7 +416,6 @@ def _create_workers(self):
418416
broker_port=self.broker_port,
419417
in_topic=in_topic,
420418
out_topic=out_topic,
421-
wire_adapter=self.wire,
422419
username=self.mqtt_username,
423420
password=self.mqtt_password,
424421
)
@@ -470,7 +467,7 @@ def _await_msgs_from_mqtt(self, pending: Optional[List] = None) -> Dict:
470467
# ------------------------------------------------------------------
471468

472469
def _simulate_for_ms4me(self, T=1e8):
473-
"""Simulate using standard MQTT MS4Me message routing."""
470+
"""Simulate using standard MQTT DEVSStreaming message routing."""
474471
try:
475472
# STEP 0: distributed init
476473
logger.info("Initializing atomic models...")
@@ -687,7 +684,7 @@ def _simulate_for_ms4me(self, T=1e8):
687684
)
688685

689686
self._terminate_workers()
690-
logger.info("MS4Me MqttMS4Me Simulation Ended")
687+
logger.info("DEVSStreaming MqttMS4Me Simulation Ended")
691688

692689
# Call terminate to set end_flag and exit the simulation thread loop
693690
self._simulator.terminate()

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/auto_broker.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/auto_broker.py

File renamed without changes.

devsimpy/DEVSKernel/BrokerDEVS/MS4Me/auto_kafka.py renamed to devsimpy/DEVSKernel/BrokerDEVS/DEVSStreaming/auto_kafka.py

File renamed without changes.

0 commit comments

Comments
 (0)