forked from attwad/python-osc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathosc_server.py
More file actions
193 lines (149 loc) · 6.25 KB
/
osc_server.py
File metadata and controls
193 lines (149 loc) · 6.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
"""OSC Servers that receive UDP packets and invoke handlers accordingly."""
import asyncio
import os
import socketserver
from socket import socket as _socket
from typing import Any, Coroutine, Tuple, Union, cast
from pythonosc import osc_bundle, osc_message
from pythonosc.dispatcher import Dispatcher
from pythonosc.osc_message_builder import build_msg
_RequestType = Union[_socket, Tuple[bytes, _socket]]
_AddressType = Union[Tuple[str, int], str]
class _UDPHandler(socketserver.BaseRequestHandler):
"""Handles correct UDP messages for all types of server."""
def __init__(self, request, client_address, server):
self.socket = request[1]
super().__init__(request, client_address, server)
def handle(self) -> None:
"""Calls the handlers via dispatcher
This method is called after a basic sanity check was done on the datagram,
whether this datagram looks like an osc message or bundle.
If not the server won't call it and so no new
threads/processes will be spawned.
"""
server = cast(OSCUDPServer, self.server)
resp = server.dispatcher.call_handlers_for_packet(
self.request[0], self.client_address
)
for r in resp:
if not isinstance(r, tuple):
r = [r]
msg = build_msg(r[0], r[1:])
self.socket.sendto(msg.dgram, self.client_address)
def _is_valid_request(request: _RequestType) -> bool:
"""Returns true if the request's data looks like an osc bundle or message.
Returns:
True if request is OSC bundle or OSC message
"""
assert isinstance(
request, tuple
) # TODO: handle requests which are passed just as a socket?
data = request[0]
return osc_bundle.OscBundle.dgram_is_bundle(
data
) or osc_message.OscMessage.dgram_is_message(data)
class OSCUDPServer(socketserver.UDPServer):
"""Superclass for different flavors of OSC UDP servers"""
def __init__(
self,
server_address: Tuple[str, int],
dispatcher: Dispatcher,
bind_and_activate: bool = True,
) -> None:
"""Initialize
Args:
server_address: IP and port of server
dispatcher: Dispatcher this server will use
(optional) bind_and_activate: default=True defines if the server has to start on call of constructor
"""
super().__init__(server_address, _UDPHandler, bind_and_activate)
self._dispatcher = dispatcher
def verify_request(
self, request: _RequestType, client_address: _AddressType
) -> bool:
"""Returns true if the data looks like a valid OSC UDP datagram
Args:
request: Incoming data
client_address: IP and port of client this message came from
Returns:
True if request is OSC bundle or OSC message
"""
return _is_valid_request(request)
@property
def dispatcher(self) -> Dispatcher:
return self._dispatcher
class BlockingOSCUDPServer(OSCUDPServer):
"""Blocking version of the UDP server.
Each message will be handled sequentially on the same thread.
Use this is you don't care about latency in your message handling or don't
have a multiprocess/multithread environment.
"""
class ThreadingOSCUDPServer(socketserver.ThreadingMixIn, OSCUDPServer):
"""Threading version of the OSC UDP server.
Each message will be handled in its own new thread.
Use this when lightweight operations are done by each message handlers.
"""
if hasattr(os, "fork"):
class ForkingOSCUDPServer(socketserver.ForkingMixIn, OSCUDPServer):
"""Forking version of the OSC UDP server.
Each message will be handled in its own new process.
Use this when heavyweight operations are done by each message handlers
and forking a whole new process for each of them is worth it.
"""
class AsyncIOOSCUDPServer:
"""Asynchronous OSC Server
An asynchronous OSC Server using UDP. It creates a datagram endpoint that runs in an event loop.
"""
def __init__(
self,
server_address: Tuple[str, int],
dispatcher: Dispatcher,
loop: asyncio.BaseEventLoop,
) -> None:
"""Initialize
Args:
server_address: IP and port of server
dispatcher: Dispatcher this server shall use
loop: Event loop to add the server task to. Use ``asyncio.get_event_loop()`` unless you know what you're
doing.
"""
self._server_address = server_address
self._dispatcher = dispatcher
self._loop = loop
class _OSCProtocolFactory(asyncio.DatagramProtocol):
"""OSC protocol factory which passes datagrams to dispatcher"""
def __init__(self, dispatcher: Dispatcher) -> None:
self.dispatcher = dispatcher
def connection_made(self, transport):
self.transport = transport
def datagram_received(
self, data: bytes, client_address: Tuple[str, int]
) -> None:
resp = self.dispatcher.call_handlers_for_packet(data, client_address)
for r in resp:
if not isinstance(r, tuple):
r = [r]
msg = build_msg(r[0], r[1:])
self.transport.sendto(msg.dgram, client_address)
def serve(self) -> None:
"""Creates a datagram endpoint and registers it with event loop.
Use this only in synchronous code (i.e. not from within a coroutine). This will start the server and run it
forever or until a ``stop()`` is called on the event loop.
"""
self._loop.run_until_complete(self.create_serve_endpoint())
def create_serve_endpoint(
self,
) -> Coroutine[
Any, Any, Tuple[asyncio.transports.BaseTransport, asyncio.DatagramProtocol]
]:
"""Creates a datagram endpoint and registers it with event loop as coroutine.
Returns:
Awaitable coroutine that returns transport and protocol objects
"""
return self._loop.create_datagram_endpoint(
lambda: self._OSCProtocolFactory(self.dispatcher),
local_addr=self._server_address,
)
@property
def dispatcher(self) -> Dispatcher:
return self._dispatcher