-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstdio_server.py
More file actions
49 lines (34 loc) · 1.28 KB
/
stdio_server.py
File metadata and controls
49 lines (34 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import sys
from typing import Optional
import anyio
from anyio.streams.file import FileReadStream
from anyio.streams.file import FileWriteStream
from anyio.streams.stapled import StapledByteStream
from .handler import Handler
from .stream_io import PrefixStreamIO
from .stream_io import StreamIO
from .stream_io import StreamServer
class StdioClient(StreamIO):
    """StreamIO endpoint backed by the process's stdin and stdout.

    Bytes are read from ``sys.stdin.buffer`` and written to
    ``sys.stdout.buffer``; both are wrapped in anyio file streams and
    combined into a single bidirectional ``StapledByteStream``.
    """

    def __init__(self):
        # Keep a direct handle on the binary stdout layer so write() can
        # flush it after each send.
        self._stdout_buffer = sys.stdout.buffer
        outgoing = FileWriteStream(self._stdout_buffer)
        incoming = FileReadStream(sys.stdin.buffer)
        # StapledByteStream takes the send half first, then the receive half.
        self.stream = StapledByteStream(outgoing, incoming)

    async def read(self, size: int) -> bytes:
        """Return whatever the stapled stream's ``receive(size)`` yields."""
        chunk = await self.stream.receive(size)
        return chunk

    async def write(self, data: bytes):
        """Send *data* to stdout, then flush the stdout buffer."""
        await self.stream.send(data)
        # The binary stdout layer is buffered, so push the bytes out
        # explicitly. flush() is a blocking call, hence the worker thread.
        flush_stdout = self._stdout_buffer.flush
        await anyio.to_thread.run_sync(flush_stdout)

    async def close(self):
        """Close the stapled stream.

        NOTE(review): presumably this closes the underlying stdin/stdout
        file objects as well -- confirm against anyio's file stream docs.
        """
        await self.stream.aclose()
class StdioServer(StreamServer):
    """StreamServer that serves exactly one client: the process's stdio."""

    # Assigned in run(); holds the StdioClient being served.
    _client: StdioClient

    def __init__(self, handler: Optional[Handler] = None):
        """Pass *handler* straight through to ``StreamServer``."""
        super().__init__(handler)

    def wrap_io(self, client: StreamIO) -> StreamIO:
        """Return *client* wrapped in a ``PrefixStreamIO``."""
        wrapped = PrefixStreamIO(client)
        return wrapped

    async def run(self):
        """Announce startup, create the stdio client, and handle it."""
        # The banner goes to stderr because StdioClient writes its data
        # to stdout.
        print("StdioServer started", file=sys.stderr, flush=True)
        stdio_client = StdioClient()
        self._client = stdio_client
        await self._handle_client(stdio_client)