Skip to content

Commit 5cff6d4

Browse files
committed
Refactor: Refactor the RadasReceiver
* Support timeout mechanism for Receiver exit
1 parent 6949e52 commit 5cff6d4

File tree

4 files changed

+214
-35
lines changed

4 files changed

+214
-35
lines changed

charon/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def __init__(self, data: Dict):
4242
self.__radas_sign_timeout_retry_interval: int = data.get(
4343
"radas_sign_timeout_retry_interval", 60
4444
)
45+
self.__radas_receiver_timeout: int = int(data.get("radas_receiver_timeout", 1800))
4546

4647
def validate(self) -> bool:
4748
if not self.__umb_host:
@@ -112,6 +113,9 @@ def radas_sign_timeout_retry_count(self) -> int:
112113
def radas_sign_timeout_retry_interval(self) -> int:
113114
return self.__radas_sign_timeout_retry_interval
114115

116+
def receiver_timeout(self) -> int:
117+
return self.__radas_receiver_timeout
118+
115119

116120
class CharonConfig(object):
117121
"""CharonConfig is used to store all configurations for charon

charon/pkgs/radas_sign.py

Lines changed: 79 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
import logging
1818
import json
1919
import os
20-
import asyncio
2120
import sys
21+
import asyncio
2222
import uuid
23+
import time
2324
from typing import List, Any, Tuple, Callable, Dict, Optional
2425
from charon.config import RadasConfig
2526
from charon.pkgs.oras_client import OrasClient
26-
from proton import SSLDomain, Message, Event, Sender
27+
from proton import SSLDomain, Message, Event, Sender, Connection
2728
from proton.handlers import MessagingHandler
2829
from proton.reactor import Container
2930

@@ -40,6 +41,8 @@ class RadasReceiver(MessagingHandler):
4041
from the cmd flag,should register UmbListener when the client starts
4142
request_id (str):
4243
Identifier of the request for the signing result
44+
rconf (RadasConfig):
45+
the configurations for the radas messaging system.
4346
sign_result_status (str):
4447
Result of the signing(success/failed)
4548
sign_result_errors (list):
@@ -50,10 +53,13 @@ def __init__(self, sign_result_loc: str, request_id: str, rconf: RadasConfig) ->
5053
super().__init__()
5154
self.sign_result_loc = sign_result_loc
5255
self.request_id = request_id
53-
self.conn = None
56+
self.conn: Optional[Connection] = None
57+
self.message_handled = False
5458
self.sign_result_status: Optional[str] = None
5559
self.sign_result_errors: List[str] = []
5660
self.rconf = rconf
61+
self.start_time = 0.0
62+
self.timeout_check_delay = 30.0
5763
self.ssl = SSLDomain(SSLDomain.MODE_CLIENT)
5864
self.ssl.set_trusted_ca_db(self.rconf.root_ca())
5965
self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
@@ -62,27 +68,58 @@ def __init__(self, sign_result_loc: str, request_id: str, rconf: RadasConfig) ->
6268
self.rconf.client_key(),
6369
self.rconf.client_key_password()
6470
)
71+
self.log = logging.getLogger("charon.pkgs.radas_sign.RadasReceiver")
6572

6673
def on_start(self, event: Event) -> None:
67-
self.conn = event.container.connect(
68-
url=self.rconf.umb_target(),
69-
ssl_domain=self.ssl
74+
umb_target = self.rconf.umb_target()
75+
container = event.container
76+
self.conn = container.connect(
77+
url=umb_target,
78+
ssl_domain=self.ssl,
79+
heartbeat=500
7080
)
71-
event.container.create_receiver(
72-
self.conn, self.rconf.result_queue(), dynamic=True
81+
receiver = container.create_receiver(
82+
context=self.conn, source=self.rconf.result_queue(),
7383
)
74-
logger.info("Listening on %s, queue: %s",
75-
self.rconf.umb_target(),
76-
self.rconf.result_queue())
84+
self.log.info("Listening on %s, queue: %s",
85+
umb_target,
86+
receiver.source.address)
87+
self.start_time = time.time()
88+
container.schedule(self.timeout_check_delay, self)
89+
90+
def on_timer_task(self, event: Event) -> None:
91+
current = time.time()
92+
timeout = self.rconf.receiver_timeout()
93+
idle_time = current - self.start_time
94+
self.log.debug("Checking timeout: passed %s seconds, timeout time %s seconds",
95+
idle_time, timeout)
96+
if idle_time > self.rconf.receiver_timeout():
97+
self.log.error("The receiver did not receive messages for more than %s seconds,"
98+
" and needs to stop receiving and quit.", timeout)
99+
self._close(event)
100+
else:
101+
event.container.schedule(self.timeout_check_delay, self)
77102

78103
def on_message(self, event: Event) -> None:
104+
self.log.debug("Got message: %s", event.message.body)
79105
self._process_message(event.message.body)
106+
if self.message_handled:
107+
self.log.debug("The signing result is handled.")
108+
self._close(event)
80109

81-
def on_connection_error(self, event: Event) -> None:
82-
logger.error("Received an error event:\n%s", event)
110+
def on_error(self, event: Event) -> None:
111+
self.log.error("Received an error event:\n%s", event.message.body)
83112

84113
def on_disconnected(self, event: Event) -> None:
85-
logger.error("Disconnected from AMQP broker.")
114+
self.log.info("Disconnected from AMQP broker: %s",
115+
event.connection.connected_address)
116+
117+
def _close(self, event: Event) -> None:
118+
if event:
119+
if event.connection:
120+
event.connection.close()
121+
if event.container:
122+
event.container.stop()
86123

87124
def _process_message(self, msg: Any) -> None:
88125
"""
@@ -93,32 +130,37 @@ def _process_message(self, msg: Any) -> None:
93130
msg_dict = json.loads(msg)
94131
msg_request_id = msg_dict.get("request_id")
95132
if msg_request_id != self.request_id:
96-
logger.info(
133+
self.log.info(
97134
"Message request_id %s does not match the request_id %s from sender, ignoring",
98135
msg_request_id,
99136
self.request_id,
100137
)
101138
return
102139

103-
logger.info(
140+
self.message_handled = True
141+
self.log.info(
104142
"Start to process the sign event message, request_id %s is matched", msg_request_id
105143
)
106144
self.sign_result_status = msg_dict.get("signing_status")
107145
self.sign_result_errors = msg_dict.get("errors", [])
108-
result_reference_url = msg_dict.get("result_reference")
109-
if not result_reference_url:
110-
logger.warning("Not found result_reference in message,ignore.")
111-
return
146+
if self.sign_result_status == "success":
147+
result_reference_url = msg_dict.get("result_reference")
148+
if not result_reference_url:
149+
self.log.warning("Not found result_reference in message,ignore.")
150+
return
112151

113-
logger.info("Using SIGN RESULT LOC: %s", self.sign_result_loc)
114-
sign_result_parent_dir = os.path.dirname(self.sign_result_loc)
115-
os.makedirs(sign_result_parent_dir, exist_ok=True)
152+
self.log.info("Using SIGN RESULT LOC: %s", self.sign_result_loc)
153+
sign_result_parent_dir = os.path.dirname(self.sign_result_loc)
154+
os.makedirs(sign_result_parent_dir, exist_ok=True)
116155

117-
oras_client = OrasClient()
118-
files = oras_client.pull(
119-
result_reference_url=result_reference_url, sign_result_loc=self.sign_result_loc
120-
)
121-
logger.info("Number of files pulled: %d, path: %s", len(files), files[0])
156+
oras_client = OrasClient()
157+
files = oras_client.pull(
158+
result_reference_url=result_reference_url, sign_result_loc=self.sign_result_loc
159+
)
160+
self.log.info("Number of files pulled: %d, path: %s", len(files), files[0])
161+
else:
162+
self.log.error("The signing result received with failed status. Errors: %s",
163+
self.sign_result_errors)
122164

123165

124166
class RadasSender(MessagingHandler):
@@ -141,7 +183,6 @@ def __init__(self, payload: Any, rconf: RadasConfig):
141183
self.message: Optional[Message] = None
142184
self.container: Optional[Container] = None
143185
self.sender: Optional[Sender] = None
144-
self.log = logging.getLogger("charon.pkgs.radas_sign.RadasSender")
145186
self.ssl = SSLDomain(SSLDomain.MODE_CLIENT)
146187
self.ssl.set_trusted_ca_db(self.rconf.root_ca())
147188
self.ssl.set_peer_authentication(SSLDomain.VERIFY_PEER)
@@ -150,6 +191,7 @@ def __init__(self, payload: Any, rconf: RadasConfig):
150191
self.rconf.client_key(),
151192
self.rconf.client_key_password()
152193
)
194+
self.log = logging.getLogger("charon.pkgs.radas_sign.RadasSender")
153195

154196
def on_start(self, event):
155197
self.container = event.container
@@ -329,7 +371,6 @@ def sign_in_radas(repo_url: str,
329371
repo_url, requester, sign_key, result_path)
330372
request_id = str(uuid.uuid4())
331373
exclude = ignore_patterns if ignore_patterns else []
332-
333374
payload = {
334375
"request_id": request_id,
335376
"requested_by": requester,
@@ -347,8 +388,12 @@ def sign_in_radas(repo_url: str,
347388
logger.error("Something wrong happened in message sending, see logs")
348389
sys.exit(1)
349390

350-
listener = RadasReceiver(result_path, request_id, radas_config)
351-
Container(listener).run()
391+
# request_id = "some-request-id-1" # for test purpose
392+
receiver = RadasReceiver(result_path, request_id, radas_config)
393+
Container(receiver).run()
352394

353-
if listener.conn:
354-
listener.conn.close()
395+
status = receiver.sign_result_status
396+
if status != "success":
397+
logger.error("The signing result is processed with errors: %s",
398+
receiver.sign_result_errors)
399+
sys.exit(1)

tests/test_radas_sign_receiver.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from unittest import mock
2+
import unittest
3+
import tempfile
4+
import time
5+
import json
6+
from charon.pkgs.radas_sign import RadasReceiver
7+
8+
9+
class RadasSignReceiverTest(unittest.TestCase):
10+
def setUp(self) -> None:
11+
super().setUp()
12+
13+
def tearDown(self) -> None:
14+
super().tearDown()
15+
16+
def reset_receiver(self, r_receiver: RadasReceiver) -> None:
17+
r_receiver.message_handled = False
18+
r_receiver.sign_result_errors = []
19+
r_receiver.sign_result_status = None
20+
21+
def test_radas_receiver(self):
22+
# Mock configuration
23+
mock_radas_config = mock.MagicMock()
24+
mock_radas_config.validate.return_value = True
25+
mock_radas_config.client_ca.return_value = "test-client-ca"
26+
mock_radas_config.client_key.return_value = "test-client-key"
27+
mock_radas_config.client_key_password.return_value = "test-client-key-pass"
28+
mock_radas_config.root_ca.return_value = "test-root-ca"
29+
mock_radas_config.receiver_timeout.return_value = 60
30+
31+
# Mock Container run to avoid real AMQP connection
32+
with mock.patch(
33+
"charon.pkgs.radas_sign.Container") as mock_container, \
34+
mock.patch("charon.pkgs.radas_sign.SSLDomain") as ssl_domain, \
35+
mock.patch("charon.pkgs.radas_sign.OrasClient") as oras_client, \
36+
mock.patch("charon.pkgs.radas_sign.Event") as event:
37+
test_result_path = tempfile.mkdtemp()
38+
test_request_id = "test-request-id"
39+
r_receiver = RadasReceiver(test_result_path, test_request_id, mock_radas_config)
40+
self.assertEqual(ssl_domain.call_count, 1)
41+
self.assertEqual(r_receiver.sign_result_loc, test_result_path)
42+
self.assertEqual(r_receiver.request_id, test_request_id)
43+
44+
# prepare mock
45+
mock_receiver = mock.MagicMock()
46+
mock_conn = mock.MagicMock()
47+
mock_container.connect.return_value = mock_conn
48+
mock_container.create_receiver.return_value = mock_receiver
49+
event.container = mock_container
50+
event.message = mock.MagicMock()
51+
event.connection = mock.MagicMock()
52+
53+
# test on_start
54+
r_receiver.on_start(event)
55+
self.assertEqual(mock_container.connect.call_count, 1)
56+
self.assertEqual(mock_container.create_receiver.call_count, 1)
57+
self.assertTrue(r_receiver.start_time > 0.0)
58+
self.assertTrue(r_receiver.start_time < time.time())
59+
self.assertEqual(mock_container.schedule.call_count, 1)
60+
61+
# test on_message: unmatched case
62+
test_ummatch_result = {
63+
"request_id": "test-request-id-no-match",
64+
"file_reference": "quay.io/example/test-repo",
65+
"result_reference": "quay.io/example-sign/sign-repo",
66+
"sig_keyname": "testkey",
67+
"signing_status": "success",
68+
"errors": []
69+
}
70+
event.message.body = json.dumps(test_ummatch_result)
71+
r_receiver.on_message(event)
72+
self.assertEqual(event.connection.close.call_count, 0)
73+
self.assertEqual(mock_container.stop.call_count, 0)
74+
self.assertFalse(r_receiver.message_handled)
75+
self.assertIsNone(r_receiver.sign_result_status)
76+
self.assertEqual(r_receiver.sign_result_errors, [])
77+
self.assertEqual(oras_client.call_count, 0)
78+
79+
# test on_message: matched case with failed status
80+
self.reset_receiver(r_receiver)
81+
test_failed_result = {
82+
"request_id": "test-request-id",
83+
"file_reference": "quay.io/example/test-repo",
84+
"result_reference": "quay.io/example-sign/sign-repo",
85+
"sig_keyname": "testkey",
86+
"signing_status": "failed",
87+
"errors": ["error1", "error2"]
88+
}
89+
event.message.body = json.dumps(test_failed_result)
90+
r_receiver.on_message(event)
91+
self.assertEqual(event.connection.close.call_count, 1)
92+
self.assertEqual(mock_container.stop.call_count, 1)
93+
self.assertTrue(r_receiver.message_handled)
94+
self.assertEqual(r_receiver.sign_result_status, "failed")
95+
self.assertEqual(r_receiver.sign_result_errors, ["error1", "error2"])
96+
self.assertEqual(oras_client.call_count, 0)
97+
98+
# test on_message: matched case with success status
99+
self.reset_receiver(r_receiver)
100+
test_success_result = {
101+
"request_id": "test-request-id",
102+
"file_reference": "quay.io/example/test-repo",
103+
"result_reference": "quay.io/example-sign/sign-repo",
104+
"sig_keyname": "testkey",
105+
"signing_status": "success",
106+
"errors": []
107+
}
108+
event.message.body = json.dumps(test_success_result)
109+
r_receiver.on_message(event)
110+
self.assertEqual(event.connection.close.call_count, 2)
111+
self.assertEqual(mock_container.stop.call_count, 2)
112+
self.assertTrue(r_receiver.message_handled)
113+
self.assertEqual(r_receiver.sign_result_status, "success")
114+
self.assertEqual(r_receiver.sign_result_errors, [])
115+
self.assertEqual(oras_client.call_count, 1)
116+
oras_client_call = oras_client.return_value
117+
self.assertEqual(oras_client_call.pull.call_count, 1)
118+
119+
# test on_timer_task: not timeout
120+
r_receiver.on_timer_task(event)
121+
self.assertEqual(event.connection.close.call_count, 2)
122+
self.assertEqual(mock_container.stop.call_count, 2)
123+
self.assertEqual(mock_container.schedule.call_count, 2)
124+
125+
# test on_timer_task: timeout
126+
mock_radas_config.receiver_timeout.return_value = 0
127+
r_receiver.on_timer_task(event)
128+
self.assertEqual(event.connection.close.call_count, 3)
129+
self.assertEqual(mock_container.stop.call_count, 3)
130+
self.assertEqual(mock_container.schedule.call_count, 2)

tests/test_radas_sign_sender.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from charon.pkgs.radas_sign import RadasSender
55

66

7-
class RadasSignHandlerTest(unittest.TestCase):
7+
class RadasSignSenderTest(unittest.TestCase):
88
def setUp(self) -> None:
99
super().setUp()
1010

0 commit comments

Comments
 (0)