-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathsse_manager.py
More file actions
76 lines (68 loc) · 2.88 KB
/
sse_manager.py
File metadata and controls
76 lines (68 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import threading
import ld_eventsource
import ld_eventsource.actions
import logging
import ld_eventsource.config
from typing import Callable
logger = logging.getLogger(__name__)
class SSEManager:
def __init__(
self,
handle_state: Callable[[ld_eventsource.actions.Start], None],
handle_error: Callable[[ld_eventsource.actions.Fault], None],
handle_message: Callable[[ld_eventsource.actions.Event], None],
):
self.client: ld_eventsource.SSEClient = None
self.url = ""
self.handle_state = handle_state
self.handle_error = handle_error
self.handle_message = handle_message
self.read_thread = threading.Thread(
target=self.read_events,
args=(self.handle_state, self.handle_error, self.handle_message),
)
def read_events(
self,
handle_state: Callable[[ld_eventsource.actions.Start], None],
handle_error: Callable[[ld_eventsource.actions.Fault], None],
handle_message: Callable[[ld_eventsource.actions.Event], None],
):
try:
self.client.start()
logger.info("DevCycle: SSE connection created successfully")
for event in self.client.all:
if isinstance(event, ld_eventsource.actions.Start):
handle_state(event)
elif isinstance(event, ld_eventsource.actions.Fault):
handle_error(event)
elif isinstance(event, ld_eventsource.actions.Event):
handle_message(event)
elif isinstance(event, ld_eventsource.actions.Comment):
handle_state(None)
except Exception as e:
logger.debug(f"DevCycle SSE: Error in read loop: {e}")
fault_event = ld_eventsource.actions.Fault(error=e)
handle_error(fault_event)
finally:
logger.debug("DevCycle SSE: Connection closed")
def update(self, config: dict):
if self.use_new_config(config["sse"]):
self.url = config["sse"]["hostname"] + config["sse"]["path"]
if self.client is not None:
self.client.close()
if self.read_thread.is_alive():
self.read_thread.join()
self.client = ld_eventsource.SSEClient(
connect=ld_eventsource.config.ConnectStrategy.http(self.url),
error_strategy=ld_eventsource.config.ErrorStrategy.CONTINUE,
)
self.read_thread = threading.Thread(
target=self.read_events,
args=(self.handle_state, self.handle_error, self.handle_message),
)
self.read_thread.start()
def use_new_config(self, config: dict) -> bool:
new_url = config["hostname"] + config["path"]
if (self.url == "" or self.url is None) and new_url != "":
return True
return self.url != new_url