Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions fastapi_startkit/src/fastapi_startkit/reverb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Reverb broadcasting module.

Provides a Laravel-style broadcasting API for FastAPI applications:

- :class:`.Channel` / :class:`.PrivateChannel` / :class:`.PresenceChannel` —
channel type declarations.
- :class:`.BroadcastEvent` — base class for broadcastable events.
- :class:`.ChannelAuthRegistry` — pattern-based authorization registry.
- :class:`.Broadcaster` — core dispatcher (bound as ``"broadcast"`` in the
service container).
- :class:`.ReverbProvider` — service provider that auto-wires everything.

Typical usage
-------------
1. Register ``ReverbProvider`` in your application providers.
2. Create ``routes/channels.py`` with ``@Broadcast.channel(...)`` callbacks.
3. Create event classes that extend ``BroadcastEvent`` and implement
``broadcast_on()``.
4. Call ``await event.emit()`` or ``await Broadcast.dispatch(event)`` to
send events to connected clients.
"""

from .broadcaster import Broadcaster
from .channels import Channel, PresenceChannel, PrivateChannel
from .event import BroadcastEvent
from .provider import ReverbProvider
from .registry import ChannelAuthRegistry

__all__ = [
"Channel",
"PrivateChannel",
"PresenceChannel",
"BroadcastEvent",
"ChannelAuthRegistry",
"Broadcaster",
"ReverbProvider",
]
120 changes: 120 additions & 0 deletions fastapi_startkit/src/fastapi_startkit/reverb/broadcaster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Core Broadcaster — dispatches events to the Reverb WebSocket server.

The ``Broadcaster`` is bound into the container under both ``"broadcast"`` and
``"reverb"`` by :class:`~fastapi_startkit.reverb.provider.ReverbProvider`.
Access it via the ``Broadcast`` facade or resolve it directly from the
container.

Typical usage::

# Via facade
await Broadcast.dispatch(OrderShipped(order_id=42))
await Broadcast.emit("orders.42", "OrderShipped", {"order_id": 42})

# Via decorator
@Broadcast.channel("orders.{order_id}")
async def authorize_orders(user, order_id: int) -> bool:
return user.id == order_id
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Callable

if TYPE_CHECKING:
from .event import BroadcastEvent
from .registry import ChannelAuthRegistry
from ..broadcasting.reverb.server import ReverbServer


class Broadcaster:
"""Dispatches broadcast events and manages channel authorization.

Args:
server: The Reverb WebSocket server that delivers messages to
connected clients.
registry: The :class:`~fastapi_startkit.reverb.registry.ChannelAuthRegistry`
that holds ``@Broadcast.channel`` callbacks.
config: Raw broadcasting config dict (from ``BroadcastingConfig``).
"""

def __init__(
self,
server: "ReverbServer | None" = None,
registry: "ChannelAuthRegistry | None" = None,
config: dict | None = None,
) -> None:
self._server = server
self._registry = registry
self._config = config or {}

# ------------------------------------------------------------------
# Primary dispatch path
# ------------------------------------------------------------------

async def dispatch(self, event: "BroadcastEvent") -> None:
"""Broadcast a :class:`~fastapi_startkit.reverb.event.BroadcastEvent`
to every channel returned by its ``broadcast_on()`` method.

Args:
event: A ``BroadcastEvent`` instance. The event name defaults to
``event.__class__.__name__`` when ``event.name`` is ``None``.
"""
if self._server is None:
return

channels = event.broadcast_on()
event_name = event.name if event.name is not None else event.__class__.__name__
payload = event.payload if isinstance(event.payload, dict) else {}

for channel in channels:
await self._server.broadcast_to_channel(channel.name, event_name, payload)

# ------------------------------------------------------------------
# Escape hatch
# ------------------------------------------------------------------

async def emit(self, channel: str, event_name: str, payload: dict) -> None:
"""Broadcast a raw event without wrapping it in a ``BroadcastEvent``.

Useful for quick one-off broadcasts or dynamic channel names that
don't warrant a dedicated event class.

Args:
channel: Full channel name (e.g. ``"orders.42"``).
event_name: Event name sent to subscribers.
payload: Arbitrary JSON-serializable dict.
"""
if self._server is None:
return
await self._server.broadcast_to_channel(channel, event_name, payload)

# ------------------------------------------------------------------
# Channel authorization decorator
# ------------------------------------------------------------------

def channel(self, pattern: str) -> Callable:
"""Register a channel authorization callback.

Use as a decorator in ``routes/channels.py``::

from fastapi_startkit.facades.Broadcast import Broadcast

@Broadcast.channel("orders.{order_id}")
async def authorize_orders(user, order_id: int) -> bool:
return user.id == order_id

Args:
pattern: Channel pattern with ``{name}`` placeholders.

Returns:
A decorator that registers the wrapped callable and returns it
unchanged so it remains importable.
"""

def decorator(callback: Callable) -> Callable:
if self._registry is not None:
self._registry.register(pattern, callback)
return callback

return decorator
55 changes: 55 additions & 0 deletions fastapi_startkit/src/fastapi_startkit/reverb/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Channel types for Reverb broadcasting.

Three channel types control authorization behaviour:

- ``Channel`` — public, no authorization check required.
- ``PrivateChannel`` — authorization is checked via a ``@Broadcast.channel``
callback before a subscription is accepted.
- ``PresenceChannel``— authorization is checked *and* member tracking is
available (tracking is a v2 concern, but the class
must exist for the API to be forward-compatible).
"""

from __future__ import annotations


class Channel:
"""Public channel — subscriptions accepted without any auth check."""

def __init__(self, name: str) -> None:
self.name = name

def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.name!r})"

def __eq__(self, other: object) -> bool:
return isinstance(other, Channel) and self.name == other.name

def __hash__(self) -> int:
return hash((self.__class__.__name__, self.name))


class PrivateChannel(Channel):
"""Private channel — authorization checked via ``@Broadcast.channel``
callback before any subscription is accepted.

The channel name is automatically prefixed with ``private-`` to match
the Pusher/Laravel Echo protocol convention.
"""

def __init__(self, name: str) -> None:
self._raw_name = name
super().__init__(f"private-{name}")


class PresenceChannel(Channel):
"""Presence channel — authorization checked, member tracking available.

The channel name is automatically prefixed with ``presence-`` to match
the Pusher/Laravel Echo protocol convention. Full member-tracking is a
v2 feature; the class exists now so the public API is stable.
"""

def __init__(self, name: str) -> None:
self._raw_name = name
super().__init__(f"presence-{name}")
60 changes: 60 additions & 0 deletions fastapi_startkit/src/fastapi_startkit/reverb/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""BroadcastEvent base class.

Subclass ``BroadcastEvent`` to create broadcastable events::

class OrderShipped(BroadcastEvent):
def __init__(self, order_id: int) -> None:
self.payload = {"order_id": order_id}

def broadcast_on(self):
return [PrivateChannel(f"orders.{self.order_id}")]

# Broadcast synchronously from a FastAPI endpoint:
await OrderShipped(order_id=42).emit()

# Or dispatch via the facade:
await Broadcast.dispatch(OrderShipped(order_id=42))
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Union

from .channels import Channel, PresenceChannel, PrivateChannel


class BroadcastEvent(ABC):
"""Abstract base for all broadcastable events.

Subclasses **must** implement :meth:`broadcast_on`. ``payload`` and
``name`` are intentionally class-level defaults so users can override
them either as class attributes *or* in ``__init__``.
"""

#: Data payload forwarded to subscribers. Override per-instance in
#: ``__init__`` or as a class attribute.
payload: dict = {}

#: Event name sent over the wire. Defaults to the class name when
#: ``None`` so that renaming the Python class also renames the event.
name: str | None = None

@abstractmethod
def broadcast_on(self) -> list[Union[Channel, PrivateChannel, PresenceChannel]]:
"""Return the channels this event should be broadcast on.

Must be overridden by every concrete subclass.
"""
...

async def emit(self) -> None:
"""Convenience shortcut — delegates to ``Broadcast.dispatch(self)``.

Because ``dispatch`` is a coroutine, ``emit`` must be awaited::

await OrderShipped(order_id=42).emit()
"""
from fastapi_startkit.facades.Broadcast import Broadcast

await Broadcast.dispatch(self)
Loading
Loading