Commit 9cd8d2b

docs: refine pynumaflow README (#351)
Signed-off-by: kohlisid <sidhant.kohli@gmail.com>
1 parent 5830a34 commit 9cd8d2b

6 files changed

Lines changed: 674 additions & 112 deletions

packages/pynumaflow/README.md

Lines changed: 42 additions & 112 deletions
Original file line number | Diff line number | Diff line change
@@ -5,18 +5,16 @@
55
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
66
[![Release Version](https://img.shields.io/github/v/release/numaproj/numaflow-python?label=pynumaflow)](https://github.com/numaproj/numaflow-python/releases/latest)
77

8-
This is the Python SDK for [Numaflow](https://numaflow.numaproj.io/).
9-
10-
This SDK provides the interface for writing different functionalities of Numaflow like [UDFs](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/), [UDSinks](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/), [UDSources](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/) and [SideInput](https://numaflow.numaproj.io/specifications/side-inputs/) in Python.
8+
`pynumaflow` is the Python SDK for [Numaflow](https://numaflow.numaproj.io/), a Kubernetes-native stream processing framework. Write a Python function, wire it to a server class, and Numaflow handles the gRPC transport, autoscaling, and deployment — no boilerplate required. The SDK supports synchronous and asynchronous execution models, and both function-based and class-based handler styles.
119

1210
## Installation
1311

14-
Install the package using pip.
1512
```bash
1613
pip install pynumaflow
1714
```
1815

19-
### Build locally
16+
<details>
17+
<summary>Build &amp; develop locally</summary>
2018

2119
This project uses [uv](https://docs.astral.sh/uv/) for dependency management and packaging.
2220
To build the package locally, run the following command from the root of the project.
@@ -40,124 +38,56 @@ Setup [pre-commit](https://pre-commit.com/) hooks:
4038
pre-commit install
4139
```
4240

43-
## Implementing different functionalities
44-
- [Implement User Defined Sources](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/source)
45-
- [Implement User Defined Source Transformers](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/sourcetransform)
46-
- Implement User Defined Functions
47-
- [Map](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/map)
48-
- [Reduce](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/reduce)
49-
- [Map Stream](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/mapstream)
50-
- [Batch Map](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/batchmap)
51-
- [Implement User Defined Sinks](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/sink)
52-
- [Implement User Defined SideInputs](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples/sideinput)
53-
54-
## Server Types
55-
56-
There are different types of gRPC server mechanisms which can be used to serve the UDFs, UDSinks and UDSource.
57-
These have different functionalities and are used for different use cases.
58-
59-
Currently we support the following server types:
60-
61-
- Sync Server
62-
- Asyncronous Server
63-
- MultiProcessing Server
41+
</details>
6442

65-
Not all of the above are supported for all UDFs, UDSource and UDSinks.
43+
## Capabilities
6644

67-
For each of the UDFs, UDSource and UDSinks, there are seperate classes for each of the server types.
68-
This helps in keeping the interface simple and easy to use, and the user can start the specific server type based on the use case.
45+
The SDK covers the full range of Numaflow extension points. Each capability maps to a dedicated set of server classes and handler interfaces.
6946

47+
> [!TIP]
48+
> Each capability below links to working examples in both function-based and class-based handler styles. See the full [examples directory](https://github.com/numaproj/numaflow-python/tree/main/packages/pynumaflow/examples) for all implementations.
7049
71-
#### SyncServer
50+
| | Description | API Reference |
51+
|---|---|---|
52+
| [**User-Defined Functions (UDFs)**](https://numaflow.numaproj.io/user-guide/user-defined-functions/user-defined-functions/) | Process and transform stream data — Map, Reduce, Reduce Stream, Map Stream, Batch Map, Accumulator | [Map](https://numaproj.io/numaflow-python/latest/api/mapper/) · [Reduce](https://numaproj.io/numaflow-python/latest/api/reducer/) · [Reduce Stream](https://numaproj.io/numaflow-python/latest/api/reducestreamer/) · [Map Stream](https://numaproj.io/numaflow-python/latest/api/mapstreamer/) · [Batch Map](https://numaproj.io/numaflow-python/latest/api/batchmapper/) · [Accumulator](https://numaproj.io/numaflow-python/latest/api/accumulator/) |
53+
| [**User-Defined Sources (UDSource)**](https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/) | Ingest data from custom sources with read, ack, pending, and partition handlers | [Sourcer](https://numaproj.io/numaflow-python/latest/api/sourcer/) · [Source Transform](https://numaproj.io/numaflow-python/latest/api/sourcetransformer/) |
54+
| [**User-Defined Sinks (UDSink)**](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) | Deliver data to custom destinations with per-message acknowledgment | [Sinker](https://numaproj.io/numaflow-python/latest/api/sinker/) |
55+
| [**Side Inputs**](https://numaflow.numaproj.io/specifications/side-inputs/) | Broadcast slow-changing reference data to UDF vertices without passing it through the pipeline | [Side Input](https://numaproj.io/numaflow-python/latest/api/sideinput/) |
7256

73-
Syncronous Server is the simplest server type. It is a multithreaded threaded server which can be used for simple UDFs and UDSinks.
74-
Here the server will invoke the handler function for each message. The messaging is synchronous and the server will wait for the handler to return before processing the next message.
57+
## Choosing Your Server Type
7558

76-
```
77-
grpc_server = MapServer(handler)
78-
```
59+
Each functionality is served by a dedicated server class. Choose the server type that matches your workload characteristics:
7960

80-
#### AsyncServer
61+
| | **Sync** | **Async** |
62+
|---|---|---|
63+
| **Concurrency Model** | Multithreaded | asyncio event loop |
64+
| **Handler Signature** | `def handler(...)` | `async def handler(...)` |
65+
| **GIL Behaviour** | Subject to GIL | Subject to GIL |
66+
| **Typical Workloads** | Stateless transforms | I/O-bound operations |
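The Sync and Async columns above can be illustrated with a minimal standard-library sketch. It does not use pynumaflow: `sync_handler`, `async_handler`, `run_sync`, `run_async`, and the simulated latency `IO_DELAY` are hypothetical names, and a thread pool only stands in for a multithreaded server.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

IO_DELAY = 0.01  # simulated I/O latency per message, in seconds (assumption)


def sync_handler(value: bytes) -> bytes:
    """Blocking handler: holds a worker thread while it waits."""
    time.sleep(IO_DELAY)
    return value.upper()


async def async_handler(value: bytes) -> bytes:
    """Coroutine handler: yields to the event loop while it waits."""
    await asyncio.sleep(IO_DELAY)
    return value.upper()


def run_sync(messages: list[bytes], workers: int = 4) -> list[bytes]:
    # A thread pool stands in for a multithreaded server; at most
    # `workers` handlers are in flight at any moment.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(sync_handler, messages))


async def run_async(messages: list[bytes]) -> list[bytes]:
    # gather() schedules every coroutine on one event loop, so all
    # simulated waits overlap on a single thread.
    return list(await asyncio.gather(*(async_handler(m) for m in messages)))


if __name__ == "__main__":
    msgs = [f"msg-{i}".encode() for i in range(8)]
    # Both models produce the same results in the same order.
    assert run_sync(msgs) == asyncio.run(run_async(msgs))
```

Both runners return identical output; they differ only in how waiting is scheduled. In this sketch the thread pool is limited by its worker count, while the event loop overlaps every pending wait, which is why the async style tends to suit I/O-bound handlers.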
8167

82-
Asyncronous Server is a multi threaded server which can be used for UDFs which are asyncronous. Here we utilize the asyncronous capabilities of Python to process multiple messages in parallel. The server will invoke the handler function for each message. The messaging is asyncronous and the server will not wait for the handler to return before processing the next message. Thus this server type is useful for UDFs which are asyncronous.
83-
The handler function for such a server should be an async function.
68+
## Server Class Reference
8469

85-
```py
86-
grpc_server = MapAsyncServer(handler)
87-
```
70+
| Functionality | Server Class(es) |
71+
|---|---|
72+
| [**UDSource**](https://numaproj.io/numaflow-python/latest/api/sourcer/) | [SourceAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/source/simple_source/example.py) |
73+
| [**UDSink**](https://numaproj.io/numaflow-python/latest/api/sinker/) | [SinkServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/sink/log/example.py), [SinkAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/sink/async_log/example.py) |
74+
| [**Side Input**](https://numaproj.io/numaflow-python/latest/api/sideinput/) | [SideInputServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/sideinput/simple_sideinput/example.py) |
75+
| [**Map**](https://numaproj.io/numaflow-python/latest/api/mapper/) | [MapServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/map/even_odd/example.py), [MapAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/map/async_forward_message/example.py) |
76+
| [**Reduce**](https://numaproj.io/numaflow-python/latest/api/reducer/) | [ReduceAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/reduce/counter/example.py) |
77+
| [**Reduce Stream**](https://numaproj.io/numaflow-python/latest/api/reducestreamer/) | [ReduceStreamAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/reducestream/counter/example.py) |
78+
| [**Map Stream**](https://numaproj.io/numaflow-python/latest/api/mapstreamer/) | [MapStreamAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/mapstream/flatmap_stream/example.py) |
79+
| [**Batch Map**](https://numaproj.io/numaflow-python/latest/api/batchmapper/) | [BatchMapAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/batchmap/flatmap/example.py) |
80+
| [**Accumulator**](https://numaproj.io/numaflow-python/latest/api/accumulator/) | [AccumulatorAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/accumulator/streamsorter/example.py) |
81+
| [**Source Transform**](https://numaproj.io/numaflow-python/latest/api/sourcetransformer/) | [SourceTransformServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/sourcetransform/event_time_filter/example.py), [SourceTransformAsyncServer](https://github.com/numaproj/numaflow-python/blob/main/packages/pynumaflow/examples/sourcetransform/async_event_time_filter/example.py) |
8882

89-
#### MultiProcessServer
83+
All server types accept handlers in two styles:
9084

91-
MultiProcess Server is a multi process server which can be used for UDFs which are CPU intensive. Here we utilize the multi process capabilities of Python to process multiple messages in parallel by forking multiple servers in different processes.
92-
The server will invoke the handler function for each message. Individually at the server level the messaging is synchronous and the server will wait for the handler to return before processing the next message. But since we have multiple servers running in parallel, the overall messaging also executes in parallel.
85+
- **Function-based** — pass a plain `def` or `async def` directly to the server. Best for simple, stateless logic.
86+
- **Class-based** — inherit from the corresponding base class (e.g., `Mapper`, `Reducer`, `Sinker`) and implement the `handler` method. Useful when your handler needs initialization arguments, internal state, or helper methods.
9387

94-
This could be an alternative to creating multiple replicas of the same UDF container as here we are using the multi processing capabilities of the system to process multiple messages in parallel but within the same container.
88+
The linked examples above demonstrate both styles for each functionality.
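As a rough illustration of the two handler styles described above, the sketch below uses stand-in types only. `BaseHandler`, `PrefixHandler`, `passthrough`, and `ToyServer` are hypothetical names, not pynumaflow classes, and the assumption that a base class makes instances callable by delegating to `handler` is made for illustration rather than taken from the SDK source.

```python
from abc import ABC, abstractmethod
from typing import Callable, Union


class BaseHandler(ABC):
    """Hypothetical stand-in for a base class such as Mapper."""

    def __call__(self, keys: list[str], value: bytes) -> bytes:
        # Delegating to handler() lets an instance be invoked like a function.
        return self.handler(keys, value)

    @abstractmethod
    def handler(self, keys: list[str], value: bytes) -> bytes:
        ...


class PrefixHandler(BaseHandler):
    """Class-based style: init arguments and state live on the instance."""

    def __init__(self, prefix: bytes) -> None:
        self.prefix = prefix
        self.seen = 0  # internal state: number of messages handled

    def handler(self, keys: list[str], value: bytes) -> bytes:
        self.seen += 1
        return self.prefix + value


def passthrough(keys: list[str], value: bytes) -> bytes:
    """Function-based style: stateless, no setup needed."""
    return value


HandlerType = Union[Callable[[list[str], bytes], bytes], BaseHandler]


class ToyServer:
    """Hypothetical server that accepts either handler style."""

    def __init__(self, handler: HandlerType) -> None:
        self._handler = handler

    def process(self, keys: list[str], value: bytes) -> bytes:
        # Both styles are plain callables from the server's point of view.
        return self._handler(keys, value)
```

Because both styles reduce to a callable with the same signature, the server needs no branching. The class-based form earns its extra lines only when the handler needs configuration or state, such as the `prefix` and `seen` attributes here.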
9589

96-
Thus this server type is useful for UDFs which are CPU intensive.
97-
```
98-
grpc_server = MapMultiProcServer(mapper_instance=handler, server_count=2)
99-
```
90+
## Contributing
10091

101-
#### Currently Supported Server Types for each functionality
102-
103-
These are the class names for the server types supported by each of the functionalities.
104-
105-
- UDFs
106-
- Map
107-
- MapServer
108-
- MapAsyncServer
109-
- MapMultiProcServer
110-
- Reduce
111-
- ReduceAsyncServer
112-
- MapStream
113-
- MapStreamAsyncServer
114-
- BatchMap
115-
- BatchMapAsyncServer
116-
- Source Transform
117-
- SourceTransformServer
118-
- SourceTransformMultiProcServer
119-
- UDSource
120-
- SourceServer
121-
- SourceAsyncServer
122-
- UDSink
123-
- SinkServer
124-
- SinkAsyncServer
125-
- SideInput
126-
- SideInputServer
127-
128-
129-
130-
131-
### Handler Function and Classes
132-
133-
All the server types take a instance of a handler class or a handler function as an argument.
134-
The handler function or class is the function or class which implements the functionality of the UDF, UDSource or UDSink.
135-
For ease of use the user can pass either of the two to the server and the server will handle the rest.
136-
137-
The handler for each of the servers has a specific signature which is defined by the server type and the implentation of the handlers
138-
should follow the same signature.
139-
140-
For using the class based handlers the user can inherit from the base handler class for each of the functionalities and implement the handler function.
141-
The base handler class for each of the functionalities has the same signature as the handler function for the respective server type.
142-
The list of base handler classes for each of the functionalities is given below:
143-
144-
- UDFs
145-
- Map
146-
- Mapper
147-
- Reduce
148-
- Reducer
149-
- MapStream
150-
- MapStreamer
151-
- Source Transform
152-
- SourceTransformer
153-
- Batch Map
154-
- BatchMapper
155-
- UDSource
156-
- Sourcer
157-
- UDSink
158-
- Sinker
159-
- SideInput
160-
- SideInput
161-
162-
More details about the signature of the handler function for each of the server types is given in the
163-
documentation of the respective server type.
92+
For SDK development workflow, testing against a live pipeline, and adding new examples, see the [Developer Guide](../../development.md).
93+
For general contribution guidelines, see the [Numaproj Contributing Guide](https://github.com/numaproj/numaproj/blob/main/CONTRIBUTING.md).
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
1+
####################################################################################################
2+
# Stage 1: Builder - installs all dependencies using uv
3+
####################################################################################################
4+
FROM ghcr.io/astral-sh/uv:python3.13-trixie AS builder
5+
6+
ENV PYSETUP_PATH="/opt/pysetup"
7+
WORKDIR $PYSETUP_PATH
8+
9+
COPY pyproject.toml uv.lock README.md ./
10+
COPY pynumaflow/ ./pynumaflow/
11+
12+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/map/async_forward_message"
13+
COPY examples/map/async_forward_message/ $EXAMPLE_PATH/
14+
15+
WORKDIR $EXAMPLE_PATH
16+
RUN uv sync --no-dev --no-install-project --frozen
17+
18+
####################################################################################################
19+
# Stage 2: Runtime - clean image with only installed packages
20+
####################################################################################################
21+
FROM ghcr.io/astral-sh/uv:python3.13-trixie AS udf
22+
23+
ENV PYSETUP_PATH="/opt/pysetup"
24+
ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/map/async_forward_message"
25+
26+
WORKDIR $EXAMPLE_PATH
27+
COPY --from=builder $EXAMPLE_PATH/.venv $EXAMPLE_PATH/.venv
28+
COPY --from=builder $EXAMPLE_PATH/ $EXAMPLE_PATH/
29+
30+
# NOTE: We cannot use "uv run python example.py" here because uv run reads the
31+
# example's pyproject.toml, finds the pynumaflow path source (path = "../../../"),
32+
# and tries to resolve it. In the runtime stage, the parent pynumaflow source tree
33+
# is not present (by design, to keep the image small), so uv run fails.
34+
# Instead, we activate the pre-built .venv via PATH and run python directly.
35+
ENV PATH="$EXAMPLE_PATH/.venv/bin:$PATH"
36+
CMD ["python", "example.py"]
37+
38+
EXPOSE 5000
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
1+
TAG ?= stable
2+
PUSH ?= false
3+
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/map-async-forward-message:${TAG}
4+
DOCKER_FILE_PATH = examples/map/async_forward_message/Dockerfile
5+
6+
.PHONY: update
7+
update:
8+
	uv lock --check || uv lock
9+
10+
.PHONY: image-push
11+
image-push: update
12+
	cd ../../../ && docker buildx build \
13+
	-f ${DOCKER_FILE_PATH} \
14+
	-t ${IMAGE_REGISTRY} \
15+
	--platform linux/amd64,linux/arm64 . --push
16+
17+
.PHONY: image
18+
image: update
19+
	cd ../../../ && docker build \
20+
	-f ${DOCKER_FILE_PATH} \
21+
	-t ${IMAGE_REGISTRY} .
22+
	@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
1+
import os
2+
3+
from pynumaflow.mapper import Messages, Message, Datum, MapAsyncServer, Mapper
4+
5+
6+
class AsyncMessageForwarder(Mapper):
7+
    """
8+
    This is a class that inherits from the Mapper class.
9+
    It implements the handler method as an async function.
10+
    """
11+
12+
    async def handler(self, keys: list[str], datum: Datum) -> Messages:
13+
        val = datum.value
14+
        _ = datum.event_time
15+
        _ = datum.watermark
16+
        return Messages(Message(value=val, keys=keys))
17+
18+
19+
async def my_handler(keys: list[str], datum: Datum) -> Messages:
20+
    val = datum.value
21+
    _ = datum.event_time
22+
    _ = datum.watermark
23+
    return Messages(Message(value=val, keys=keys))
24+
25+
26+
if __name__ == "__main__":
27+
    """
28+
    Use the class based approach or function based handler
29+
    based on the env variable.
30+
    Both can be used and passed directly to the server class.
31+
    """
32+
    invoke = os.getenv("INVOKE", "func_handler")
33+
    if invoke == "class":
34+
        handler = AsyncMessageForwarder()
35+
    else:
36+
        handler = my_handler
37+
    grpc_server = MapAsyncServer(handler)
38+
    grpc_server.start()
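The handler-selection logic in the example above can be exercised without starting a gRPC server. The following is a sketch under stated assumptions: `FakeDatum`, `forward`, `Forwarder`, and `select_handler` are hypothetical stand-ins written with the standard library only, and the return value is a plain tuple rather than pynumaflow's `Messages`.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Mapping


@dataclass
class FakeDatum:
    """Hypothetical stand-in exposing the attributes the example reads."""

    value: bytes
    event_time: datetime
    watermark: datetime


async def forward(keys: list[str], datum: FakeDatum) -> tuple[list[str], bytes]:
    # Mirrors the example's logic: pass keys and value through unchanged.
    return keys, datum.value


class Forwarder:
    """Class-based variant; an instance is called the same way as `forward`."""

    async def __call__(self, keys: list[str], datum: FakeDatum):
        return await forward(keys, datum)


def select_handler(env: Mapping[str, str]):
    # Same rule as the example: INVOKE=class picks the class-based handler,
    # anything else (including an unset variable) picks the function.
    if env.get("INVOKE", "func_handler") == "class":
        return Forwarder()
    return forward


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    datum = FakeDatum(value=b"hello", event_time=now, watermark=now)
    handler = select_handler({"INVOKE": "class"})
    print(asyncio.run(handler(["key"], datum)))
```

Taking the environment as a mapping argument, rather than reading `os.environ` inside the function, keeps the selection rule easy to test; at the call site `os.environ` can be passed directly.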
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
1+
[project]
2+
name = "map-async-forward-message"
3+
version = "0.1.0"
4+
description = ""
5+
requires-python = ">=3.13"
6+
dependencies = [
7+
"pynumaflow",
8+
]
9+
10+
[tool.uv.sources]
11+
pynumaflow = { path = "../../../" }
12+
13+
[tool.hatch.build.targets.wheel]
14+
packages = []
15+
16+
[build-system]
17+
requires = ["hatchling"]
18+
build-backend = "hatchling.build"
