@@ -41,71 +41,78 @@ def data_received(self, data):
4141
4242 async def process_requests (self ):
4343 try :
44- async for hs , meta , name , params in self .unpacker :
45- if hs is not None :
46- out = bytes (hs )
47- out = struct .pack (">L" , len (out )) + out
48- self .transport .write (out )
49- if hs .match == "NONE" :
50- name = ""
51-
52- out_meta = io .BytesIO ()
53- fastavro .schemaless_writer (
54- out_meta , {"type" : "map" , "values" : "bytes" }, meta
55- )
56- length = out_meta .tell ()
57- self .transport .write (struct .pack (">L" , length ) + out_meta .getvalue ())
58- self .logger .debug (f"Wrote meta, { meta } , { out_meta .getvalue ()} " )
59- try :
60- response_out = io .BytesIO ()
61- response = None
62- response_schema = "null"
63- if name :
64- fun = getattr (self ._daemon , name )
65- if params is None :
66- params = []
67- response = fun (* params )
68- response_schema = fastavro .parse_schema (
69- self ._avro_protocol ["messages" ][name ].get (
70- "response" , "null"
71- ),
72- expand = True ,
73- named_schemas = self ._named_types ,
74- )
75- # Needed twice for nested types... Probably can be fixed upstream
76- response_schema = fastavro .parse_schema (
77- response_schema ,
78- expand = True ,
79- named_schemas = self ._named_types ,
80- )
81- fastavro .schemaless_writer (response_out , response_schema , response )
82- except Exception as e :
83- self .logger .error (f"Caught exception: { type (e )} in message { name } " )
84- self .logger .debug (traceback .format_exc ())
85- self .transport .write (struct .pack (">L" , 1 ) + b"\1 " )
86- error_out = io .BytesIO ()
87- fastavro .schemaless_writer (error_out , ["string" ], repr (e ))
88- length = error_out .tell ()
89- self .transport .write (
90- struct .pack (">L" , length ) + error_out .getvalue ()
91- )
92- else :
93- self .transport .write (struct .pack (">L" , 1 ) + b"\0 " )
94- self .logger .debug (f"Wrote non-error flag" )
95- length = response_out .tell ()
96- self .transport .write (
97- struct .pack (">L" , length ) + response_out .getvalue ()
98- )
99- self .logger .debug (
100- f"Wrote response { response } , { response_out .getvalue ()} "
101- )
102- self .transport .write (struct .pack (">L" , 0 ))
103- if name == "shutdown" :
104- self .logger .debug ("Closing transport" )
105- self .transport .close ()
44+ await self ._process_requests ()
10645 except asyncio .CancelledError as e :
107- self .logger .debug ("task cancellation caught " )
46+ self .logger .debug ("cancelling process_requests " )
10847 await self .unpacker .__aexit__ (None , None , None )
10948 self .transport .close ()
110- self .logger .debug (f"file closed? { self .unpacker ._file .closed } " )
11149 raise e
50+
51+ async def _process_requests (self ):
52+ async for hs , meta , name , params in self .unpacker :
53+ if hs is not None :
54+ out = bytes (hs )
55+ out = struct .pack (">L" , len (out )) + out
56+ self .transport .write (out )
57+ if hs .match == "NONE" :
58+ name = ""
59+
60+ meta_out = io .BytesIO ()
61+ fastavro .schemaless_writer (
62+ meta_out , {"type" : "map" , "values" : "bytes" }, meta
63+ )
64+ length = meta_out .tell ()
65+ self .transport .write (struct .pack (">L" , length ) + meta_out .getvalue ())
66+ self .logger .debug (f"Wrote meta, { meta } , { meta_out .getvalue ()} " )
67+ try :
68+ response_out = io .BytesIO ()
69+ response = None
70+ response_schema = "null"
71+ if name :
72+ fun = getattr (self ._daemon , name )
73+ if params is None :
74+ params = []
75+ response = fun (* params )
76+ response_schema = fastavro .parse_schema (
77+ self ._avro_protocol ["messages" ][name ].get (
78+ "response" , "null"
79+ ),
80+ expand = True ,
81+ named_schemas = self ._named_types ,
82+ )
83+ # Needed twice for nested types... Probably can be fixed upstream
84+ response_schema = fastavro .parse_schema (
85+ response_schema ,
86+ expand = True ,
87+ named_schemas = self ._named_types ,
88+ )
89+ fastavro .schemaless_writer (response_out , response_schema , response )
90+ except Exception as e :
91+ self .logger .error (f"Caught exception: { type (e )} in message { name } " )
92+ self .logger .debug (traceback .format_exc ())
93+ self .transport .write (struct .pack (">L" , 1 ) + b"\1 " )
94+ error_out = io .BytesIO ()
95+ fastavro .schemaless_writer (error_out , ["string" ], repr (e ))
96+ length = error_out .tell ()
97+ self .transport .write (
98+ struct .pack (">L" , length ) + error_out .getvalue ()
99+ )
100+ error_out .close ()
101+ else :
102+ self .transport .write (struct .pack (">L" , 1 ) + b"\0 " )
103+ self .logger .debug (f"Wrote non-error flag" )
104+ length = response_out .tell ()
105+ self .transport .write (
106+ struct .pack (">L" , length ) + response_out .getvalue ()
107+ )
108+ self .logger .debug (
109+ f"Wrote response { response } , { response_out .getvalue ()} "
110+ )
111+ finally :
112+ response_out .close ()
113+ meta_out .close ()
114+ self .transport .write (struct .pack (">L" , 0 ))
115+ self .unpacker ._file = io .BytesIO ()
116+ if name == "shutdown" :
117+ self .logger .debug ("Closing transport" )
118+ self .transport .close ()
0 commit comments