Skip to content

Commit 2f257fe

Browse files
fix issues causing command tests to fall and fix and issue in the api_helper where res_id was ignored
1 parent a0eb68f commit 2f257fe

File tree

3 files changed

+106
-32
lines changed

3 files changed

+106
-32
lines changed

conSys4Py/comm/mqtt.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ def __init__(self, url, port=1883, username=None, password=None, path='mqtt', cl
2020
self.__client_id = client_id
2121
self.__transport = transport
2222

23-
self.__client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
23+
self.__client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
2424

2525
if self.__transport == 'websockets':
2626
self.__client.ws_set_options(path=self.__path)
2727

2828
if username is not None and password is not None:
2929
self.__client.username_pw_set(username, password)
30-
self.__client.tls_set()
30+
self.__client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLSv1_2)
3131

3232
self.__client.on_connect = self.on_connect
3333
self.__client.on_subscribe = self.on_subscribe
@@ -52,7 +52,7 @@ def on_message(client, userdata, msg):
5252
print(f'{msg.payload.decode("utf-8")}')
5353

5454
@staticmethod
55-
def on_publish(client, userdata, mid):
55+
def on_publish(client, userdata, mid, info, properties):
5656
print(f'Published: {mid}')
5757

5858
@staticmethod
@@ -61,11 +61,11 @@ def on_log(client, userdata, level, buf):
6161

6262
@staticmethod
6363
def on_disconnect(client, userdata, dc_flag, rc, properties):
64-
print(f'Disconnected: {rc}')
64+
print(f'Client {client} disconnected: {dc_flag} {rc}')
6565

66-
def connect(self):
67-
print(f'Connecting to {self.__url}:{self.__port}')
68-
self.__client.connect(self.__url, self.__port)
66+
def connect(self, keepalive=60):
67+
# print(f'Connecting to {self.__url}:{self.__port}')
68+
self.__client.connect(self.__url, self.__port, keepalive=keepalive)
6969

7070
def subscribe(self, topic, qos=0, msg_callback=None):
7171
"""
@@ -182,13 +182,13 @@ def __toggle_is_connected(self):
182182
def is_connected(self):
183183
return self.__is_connected
184184

185-
# def publish(self, topic, msg):
186-
# self.__client.publish(topic, msg, 1)
187-
188185
@staticmethod
189186
def publish_single(self, topic, msg):
190187
self.__client.single(topic, msg, 0)
191188

192189
@staticmethod
193190
def publish_multiple(self, topic, msgs):
194191
self.__client.multiple(msgs, )
192+
193+
def tls_set(self):
194+
self.__client.tls_set()

conSys4Py/core/default_api_helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def create_resource(self, res_type: APIResourceTypes, json_data: any, parent_res
102102
body=json_data, headers=req_headers)
103103
return api_request.make_request()
104104

105-
def retrieve_resource(self, res_type: APIResourceTypes, res_id: str, parent_res_id: str = None,
105+
def retrieve_resource(self, res_type: APIResourceTypes, res_id: str = None, parent_res_id: str = None,
106106
from_collection: bool = False,
107107
collection_id: str = None, url_endpoint: str = None, req_headers: dict = None):
108108
"""
@@ -118,7 +118,7 @@ def retrieve_resource(self, res_type: APIResourceTypes, res_id: str, parent_res_
118118
:return:
119119
"""
120120
if url_endpoint is None:
121-
url = self.resource_url_resolver(res_type, None, parent_res_id, from_collection)
121+
url = self.resource_url_resolver(res_type, res_id, parent_res_id, from_collection)
122122
else:
123123
url = f'{self.server_url}/{self.api_root}/{url_endpoint}'
124124
api_request = ConnectedSystemAPIRequest(url=url, request_method='GET', auth=self.get_helper_auth(),

tests/test_commands.py

Lines changed: 94 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def test_setup():
6262
# print(cmd_resp)
6363

6464

65-
def test_subscribe_and_command():
65+
def subscribe_and_command():
6666
mqtt_client = MQTTCommClient(url='localhost')
6767

6868
control_streams = ControlChannels.list_all_control_streams(server_url).json()
@@ -74,12 +74,17 @@ def on_message_command(client, userdata, msg):
7474
print("Received Command")
7575
print(f'{msg.payload.decode("utf-8")}')
7676
control_stream_id = control_id
77+
78+
payload = msg.payload.decode("utf-8")
79+
p_dict = json.loads(payload)
80+
id = p_dict["id"]
81+
7782
resp = {
78-
'id': '*******',
83+
'id': id,
7984
'command@id': control_stream_id,
8085
'statusCode': 'COMPLETED'
8186
}
82-
# client.publish(f'/api/controls/{control_stream_id}/status', payload=json.dumps(resp), qos=1)
87+
client.publish(f'/api/controls/{control_stream_id}/status', payload=json.dumps(resp), qos=1)
8388

8489
def on_message_all(client, userdata, msg):
8590
print(f'\nReceived Message:{msg}')
@@ -93,36 +98,105 @@ def on_message_all(client, userdata, msg):
9398
mqtt_client.start()
9499

95100
time.sleep(2)
96-
command_json = CommandJSON(control_id=control_streams["items"][0]["id"],
97-
issue_time=datetime.now().isoformat() + 'Z',
98-
params={"timestamp": datetime.now().timestamp() * 1000, "testcount": 1})
99101

100-
print(f'Issuing Command: {command_json.model_dump_json(exclude_none=True, by_alias=True)}')
101-
cmd_resp = Commands.send_commands_to_specific_control_stream(server_url, control_streams["items"][0]["id"],
102-
command_json.model_dump_json(exclude_none=True,
103-
by_alias=True),
104-
headers=json_headers)
105-
# try issuing a command from the MQTT client
102+
# print(f'Issuing Command: {command_json.model_dump_json(exclude_none=True, by_alias=True)}')
103+
# cmd_resp = Commands.send_commands_to_specific_control_stream(server_url, control_streams["items"][0]["id"],
104+
# command_json.model_dump_json(exclude_none=True,
105+
# by_alias=True),
106+
# headers=json_headers)
107+
# # try issuing a command from the MQTT client
106108
# mqtt_client.publish(f'/api/controls/{control_id}/commands', command_json.model_dump_json(exclude_none=True,
107109
# by_alias=True),
108110
# 1)
109111
# print(f'\n*****Command Response: {cmd_resp}*****')
110-
status_resp = {
111-
'id': '*******',
112-
'command@id': "unknown",
113-
'statusCode': 'COMPLETED'
114-
}
112+
# status_resp = {
113+
# 'id': '*******',
114+
# 'command@id': "unknown",
115+
# 'statusCode': 'COMPLETED'
116+
# }
115117
# Commands.add_command_status_reports(server_url, "0", json.dumps(status_resp))
116118

117119

118-
def test_command_dahua():
120+
def command_dahua():
119121
system_id = "tstk16o31es4m"
120122
control_stream_id = "k08p16h6k4a6c"
121123
control_input = CommandJSON(control_id=control_stream_id, issue_time=datetime.now().isoformat() + 'Z',
122124
params={"pan": 180})
123125
print(f'Issuing Command: {control_input.model_dump_json(exclude_none=True, by_alias=True)}')
124126
cmd_resp = Commands.send_commands_to_specific_control_stream(server_url, control_stream_id,
125127
control_input.model_dump_json(exclude_none=True,
126-
by_alias=True),
128+
by_alias=True),
127129
headers=json_headers)
128-
print(f'\n*****Command Response: {cmd_resp}*****')
130+
print(f'\n*****Command Response: {cmd_resp}*****')
131+
132+
133+
def create_status_listener_client(control_id: str):
134+
def on_connect(client, userdata, flags, rc, props=None):
135+
print(f"Status Updater Client connected with result code {rc}")
136+
137+
def on_command(client, userdata, msg):
138+
payload = msg.payload.decode("utf-8")
139+
print(f"Received Command: {payload}")
140+
p_dict = json.loads(payload)
141+
id = p_dict["id"]
142+
143+
resp = {
144+
'id': id,
145+
# 'command@id': control_id,
146+
'statusCode': 'COMPLETED'
147+
}
148+
149+
print(f'Issuing Status: {resp}')
150+
151+
client.publish(f'/api/controls/{control_id}/status', payload=json.dumps(resp), qos=1)
152+
153+
mqtt_client = MQTTCommClient(url='localhost')
154+
mqtt_client.set_on_connect(on_connect)
155+
mqtt_client.connect(keepalive=60)
156+
mqtt_client.set_on_message_callback(f'/api/controls/{control_id}/commands', on_command)
157+
mqtt_client.subscribe(f'/api/controls/{control_id}/commands')
158+
mqtt_client.start()
159+
160+
return mqtt_client
161+
162+
163+
def create_command_client(control_id: str):
164+
def on_connect(client, userdata, flags, rc, props=None):
165+
print(f"Command Client connected with result code {rc}")
166+
167+
def on_status(client, userdata, msg):
168+
print("")
169+
print(f"Received Status: {msg.payload.decode('utf-8')}")
170+
print("")
171+
172+
mqtt_client = MQTTCommClient(url='localhost')
173+
mqtt_client.set_on_connect(on_connect)
174+
mqtt_client.connect()
175+
mqtt_client.set_on_message_callback(f'/api/controls/o1l72d8md66a0/status', on_status)
176+
mqtt_client.subscribe(f'/api/controls/o1l72d8md66a0/status')
177+
mqtt_client.start()
178+
179+
return mqtt_client
180+
181+
182+
def create_test_command(control_id):
183+
command_json = CommandJSON(control_id=control_id,
184+
issue_time=datetime.now().isoformat() + 'Z',
185+
params={"timestamp": datetime.now().timestamp() * 1000, "testcount": 1})
186+
187+
return command_json.model_dump_json(exclude_none=True, by_alias=True)
188+
189+
190+
def test_command_with_status_updates():
191+
control_streams = ControlChannels.list_all_control_streams(server_url).json()
192+
control_id = control_streams["items"][0]["id"]
193+
194+
# Create a Command Client For Status Updates
195+
status_updater = create_status_listener_client(control_id)
196+
command_sender = create_command_client(control_id)
197+
198+
# Wait for a bit
199+
time.sleep(1)
200+
201+
# Send a Command
202+
command_sender.publish(f'/api/controls/{control_id}/commands', create_test_command(control_id), 0)

0 commit comments

Comments
 (0)