|
18 | 18 | import asyncio |
19 | 19 | import os |
20 | 20 | import mimetypes |
| 21 | +import io |
21 | 22 | import aiofiles |
22 | 23 | from typing import List, Union, Callable, Dict, Awaitable, Optional, Mapping, cast, TypeVar |
23 | 24 | from abc import abstractmethod, ABC |
@@ -664,6 +665,82 @@ async def send_file( |
664 | 665 |
|
665 | 666 | return writer.info |
666 | 667 |
|
| 668 | + async def send_bytes( |
| 669 | + self, |
| 670 | + data: Union[bytes, bytearray, memoryview, io.IOBase], |
| 671 | + name: str, |
| 672 | + *, |
| 673 | + mime_type: str = "application/octet-stream", |
| 674 | + topic: str = "", |
| 675 | + destination_identities: Optional[List[str]] = None, |
| 676 | + attributes: Optional[Dict[str, str]] = None, |
| 677 | + stream_id: str | None = None, |
| 678 | + ) -> ByteStreamInfo: |
| 679 | + """ |
| 680 | + Send in-memory bytes or a file-like object as a byte stream. |
| 681 | +
|
| 682 | + Accepts common Python byte/blob types: bytes, bytearray, memoryview, and readable io.IOBase |
| 683 | + (e.g., io.BytesIO, buffered readers). The name is used for the stream metadata. |
| 684 | + """ |
| 685 | + # Bytes-like input path |
| 686 | + if isinstance(data, (bytes, bytearray, memoryview)): |
| 687 | + buffer = bytes(data) |
| 688 | + total_size = len(buffer) |
| 689 | + |
| 690 | + writer: ByteStreamWriter = await self.stream_bytes( |
| 691 | + name=name, |
| 692 | + total_size=total_size, |
| 693 | + mime_type=mime_type, |
| 694 | + attributes=attributes, |
| 695 | + stream_id=stream_id, |
| 696 | + destination_identities=destination_identities, |
| 697 | + topic=topic, |
| 698 | + ) |
| 699 | + |
| 700 | + offset = 0 |
| 701 | + while offset < total_size: |
| 702 | + end = min(offset + STREAM_CHUNK_SIZE, total_size) |
| 703 | + await writer.write(buffer[offset:end]) |
| 704 | + offset = end |
| 705 | + |
| 706 | + await writer.aclose() |
| 707 | + return writer.info |
| 708 | + |
| 709 | + # File-like input path |
| 710 | + if isinstance(data, io.IOBase) and data.readable(): |
| 711 | + total_size: Optional[int] = None |
| 712 | + try: |
| 713 | + if data.seekable(): |
| 714 | + current_pos = data.tell() |
| 715 | + data.seek(0, io.SEEK_END) |
| 716 | + end_pos = data.tell() |
| 717 | + total_size = end_pos - current_pos |
| 718 | + data.seek(current_pos, io.SEEK_SET) |
| 719 | + except Exception: |
| 720 | + total_size = None |
| 721 | + |
| 722 | + writer = await self.stream_bytes( |
| 723 | + name=name, |
| 724 | + total_size=total_size, |
| 725 | + mime_type=mime_type, |
| 726 | + attributes=attributes, |
| 727 | + stream_id=stream_id, |
| 728 | + destination_identities=destination_identities, |
| 729 | + topic=topic, |
| 730 | + ) |
| 731 | + |
| 732 | + while True: |
| 733 | + chunk = data.read(STREAM_CHUNK_SIZE) |
| 734 | + if not chunk: |
| 735 | + break |
| 736 | + await writer.write(chunk) |
| 737 | + await writer.aclose() |
| 738 | + return writer.info |
| 739 | + |
| 740 | + raise TypeError( |
| 741 | + "Unsupported data type for send_bytes. Expected bytes, bytearray, memoryview, or a readable io.IOBase." |
| 742 | + ) |
| 743 | + |
667 | 744 | async def publish_track( |
668 | 745 | self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions() |
669 | 746 | ) -> LocalTrackPublication: |
|
0 commit comments