|
1 | | -from collections.abc import Iterable |
2 | | - |
3 | 1 | from pynumaflow.shared.asynciter import NonBlockingIterator |
4 | 2 | from pynumaflow.sourcer import ReadRequest, Message, UserMetadata |
5 | 3 | from pynumaflow.sourcer import ( |
@@ -56,28 +54,6 @@ async def partitions_handler(self) -> PartitionsResponse: |
56 | 54 | return PartitionsResponse(partitions=mock_partitions()) |
57 | 55 |
|
58 | 56 |
|
59 | | -class SyncSource(Sourcer): |
60 | | - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: |
61 | | - payload = b"payload:test_mock_message" |
62 | | - keys = ["test_key"] |
63 | | - offset = mock_offset() |
64 | | - event_time = mock_event_time() |
65 | | - for i in range(10): |
66 | | - yield Message(payload=payload, keys=keys, offset=offset, event_time=event_time) |
67 | | - |
68 | | - def ack_handler(self, ack_request: AckRequest): |
69 | | - return |
70 | | - |
71 | | - def nack_handler(self, nack_request: NackRequest): |
72 | | - return |
73 | | - |
74 | | - def pending_handler(self) -> PendingResponse: |
75 | | - return PendingResponse(count=10) |
76 | | - |
77 | | - def partitions_handler(self) -> PartitionsResponse: |
78 | | - return PartitionsResponse(partitions=mock_partitions()) |
79 | | - |
80 | | - |
81 | 57 | def read_req_source_fn() -> ReadRequest: |
82 | 58 | request = source_pb2.ReadRequest.Request( |
83 | 59 | num_records=10, |
@@ -128,20 +104,3 @@ async def pending_handler(self) -> PendingResponse: |
128 | 104 |
|
129 | 105 | async def partitions_handler(self) -> PartitionsResponse: |
130 | 106 | raise RuntimeError("Got a runtime error from partition handler.") |
131 | | - |
132 | | - |
133 | | -class SyncSourceError(Sourcer): |
134 | | - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: |
135 | | - raise RuntimeError("Got a runtime error from read handler.") |
136 | | - |
137 | | - def ack_handler(self, ack_request: AckRequest): |
138 | | - raise RuntimeError("Got a runtime error from ack handler.") |
139 | | - |
140 | | - def nack_handler(self, nack_request: NackRequest): |
141 | | - raise RuntimeError("Got a runtime error from nack handler.") |
142 | | - |
143 | | - def pending_handler(self) -> PendingResponse: |
144 | | - raise RuntimeError("Got a runtime error from pending handler.") |
145 | | - |
146 | | - def partitions_handler(self) -> PartitionsResponse: |
147 | | - raise RuntimeError("Got a runtime error from partition handler.") |
0 commit comments