Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ def _load_data_table(self, identifier: Identifier) -> FileStoreTable:
table_schema = self.get_table_schema(identifier)

# Create catalog environment for filesystem catalog
# Filesystem catalog doesn't support version management by default
from pypaimon.catalog.filesystem_catalog_loader import FileSystemCatalogLoader
catalog_environment = CatalogEnvironment(
identifier=identifier,
uuid=None, # Filesystem catalog doesn't track table UUIDs
catalog_loader=None, # No catalog loader for filesystem
uuid=None,
catalog_loader=FileSystemCatalogLoader(self.catalog_context),
supports_version_management=False
)

Expand Down
24 changes: 24 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,16 @@ class CoreOptions:
)
)

# Table option "blob-view-field": a string option with no default value.
# It is read by blob_view_fields(), which hands the raw value to
# CoreOptions._parse_field_set; the description text below documents the
# expected comma-separated format.
BLOB_VIEW_FIELD: ConfigOption[str] = (
ConfigOptions.key("blob-view-field")
.string_type()
.no_default_value()
.with_description(
"Comma-separated BLOB field names that should be stored as serialized BlobViewStruct bytes "
"inline in normal data files and resolved from upstream tables at read time."
)
)

TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("target-file-size")
.memory_type()
Expand Down Expand Up @@ -661,6 +671,20 @@ def variant_shredding_schema(self) -> Optional[str]:

def blob_descriptor_fields(self, default=None):
    """Return the field names configured under BLOB_DESCRIPTOR_FIELD.

    Args:
        default: Value used when the option is not set; it is passed
            through the same parsing as a configured value.

    Returns:
        Whatever CoreOptions._parse_field_set produces for the raw value.
    """
    raw_setting = self.options.get(CoreOptions.BLOB_DESCRIPTOR_FIELD, default)
    field_names = CoreOptions._parse_field_set(raw_setting)
    return field_names

def blob_view_fields(self, default=None):
    """Return the field names configured under BLOB_VIEW_FIELD.

    Args:
        default: Value used when the option is not set; it is passed
            through the same parsing as a configured value.

    Returns:
        Whatever CoreOptions._parse_field_set produces for the raw value.
    """
    raw_setting = self.options.get(CoreOptions.BLOB_VIEW_FIELD, default)
    field_names = CoreOptions._parse_field_set(raw_setting)
    return field_names

def blob_inline_fields(self, default=None):
    """Return the union of descriptor fields and view fields as a new set.

    Args:
        default: Forwarded unchanged to both blob_descriptor_fields and
            blob_view_fields.

    Returns:
        A fresh set containing every name reported by either accessor.
    """
    # Descriptor fields are evaluated first, then view fields, matching the
    # order in which the two accessors are consulted.
    descriptor_names = self.blob_descriptor_fields(default)
    view_names = self.blob_view_fields(default)
    return set().union(descriptor_names, view_names)

@staticmethod
def _parse_field_set(value):
if value is None:
return set()
if isinstance(value, str):
Expand Down
178 changes: 151 additions & 27 deletions paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,68 +15,192 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional
from typing import Callable, Optional, Set

import pyarrow
from pyarrow import RecordBatch

from pypaimon.common.options.core_options import CoreOptions
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.table.row.blob import Blob, BlobViewStruct


class BlobDescriptorConvertReader(RecordBatchReader):
def __init__(self, inner: RecordBatchReader, table):
"""Resolves BlobView and BlobDescriptor fields in record batches.

Processing is split into two clear stages:
Stage 1 (BlobView resolution): If view fields exist, use a lightweight
prescan reader (only projecting view columns) to collect
BlobViewStructs, bulk-preload their descriptors, then read
full data from the main reader and replace view field values
with the corresponding BlobDescriptor serialized bytes.
Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
If false, resolve all BlobDescriptor bytes (from both descriptor
fields and view fields) into real blob data bytes.
If true, return as-is.
"""

def __init__(self, inner: RecordBatchReader, table,
prescan_reader_factory: Optional[Callable[[Set[str]], RecordBatchReader]] = None):
"""
Args:
inner: The main data reader (reads all columns).
table: The table instance.
prescan_reader_factory: Optional factory that creates a lightweight
reader projecting only the specified field names. Used for
prescan to collect BlobViewStructs without reading all columns.
Signature: (field_names: Set[str]) -> RecordBatchReader
"""
self._inner = inner
self._table = table
self._prescan_reader_factory = prescan_reader_factory
self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
self.file_io = inner.file_io
self.blob_field_indices = inner.blob_field_indices
self._view_fields = CoreOptions.blob_view_fields(table.options)
self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
self._blob_as_descriptor = CoreOptions.blob_as_descriptor(table.options)
self._prescan_done = False
self._blob_view_lookup = None

def read_arrow_batch(self) -> Optional[RecordBatch]:
import pyarrow
# Ensure prescan is done before reading (only needed for view fields)
if self._view_fields and not self._prescan_done:
self._prescan_view_structs()

batch = self._inner.read_arrow_batch()
if batch is None:
return None
return self._convert_batch(batch, pyarrow)
# Resolve view fields using the preloaded lookup
if self._view_fields and self._blob_view_lookup is not None:
batch = self._resolve_view_fields(batch, self._blob_view_lookup)
# Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
return self._resolve_blob_data(batch)

# ------------------------------------------------------------------
# Stage 1: BlobView prescan (lightweight, only reads view columns)
# ------------------------------------------------------------------

def _prescan_view_structs(self):
    """Collect every BlobViewStruct in the view columns and preload the lookup.

    When a prescan reader factory was supplied, a separate reader created
    by that factory is scanned, so the main reader is left untouched.

    When no factory was supplied (the constructor default is None), the
    main reader is drained instead: its batches are kept in memory, scanned
    for view structs, and then replayed through a _CachedBatchReader that
    replaces ``self._inner``. Without this branch the method would call
    ``None`` and fail with a TypeError.

    Side effects:
        Sets ``self._prescan_done``; may set ``self._blob_view_lookup``;
        in the fallback path closes the drained reader and rebinds
        ``self._inner``.
    """
    from pypaimon.table.row.blob import BlobViewStruct
    from pypaimon.utils.blob_view_lookup import BlobViewLookup

    self._prescan_done = True
    all_view_structs = []

    def collect(batch):
        # Gather the view structs found in the view columns of one batch.
        for field_name in self._view_fields:
            if field_name not in batch.schema.names:
                continue
            for value in batch.column(field_name).to_pylist():
                value = self._normalize_blob_to_bytes(value)
                if isinstance(value, bytes) and BlobViewStruct.is_blob_view_struct(value):
                    all_view_structs.append(BlobViewStruct.deserialize(value))

    if self._prescan_reader_factory is not None:
        prescan_reader = self._prescan_reader_factory(self._view_fields)
        try:
            while True:
                batch = prescan_reader.read_arrow_batch()
                if batch is None:
                    break
                collect(batch)
        finally:
            prescan_reader.close()
    else:
        # Fallback: no projected reader is available, so buffer the main
        # reader's batches and replay them afterwards.
        cached_batches = []
        while True:
            batch = self._inner.read_arrow_batch()
            if batch is None:
                break
            cached_batches.append(batch)
            collect(batch)
        # The drained reader is only closed and swapped after a complete
        # scan; on failure it stays in place so close() still reaches it.
        self._inner.close()
        self._inner = _CachedBatchReader(cached_batches)

    # Bulk-preload BlobViewStruct -> BlobDescriptor mapping
    if all_view_structs:
        self._blob_view_lookup = BlobViewLookup(self._table)
        self._blob_view_lookup.preload(all_view_structs)

result = batch
for field_name in self._descriptor_fields:
if field_name not in result.schema.names:
def _resolve_view_fields(self, batch, blob_view_lookup):
"""Replace BlobViewStruct bytes in view fields with the corresponding
BlobDescriptor serialized bytes."""
for field_name in self._view_fields:
if field_name not in batch.schema.names:
continue
values = result.column(field_name).to_pylist()
values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()]
converted_values = []
for value in values:
if value is None:
converted_values.append(None)
continue
if hasattr(value, 'as_py'):
value = value.as_py()
if isinstance(value, str):
value = value.encode('utf-8')
if isinstance(value, bytearray):
value = bytes(value)
if not isinstance(value, bytes):
converted_values.append(value)
continue
try:
descriptor = BlobDescriptor.deserialize(value)
if descriptor.serialize() != value:
converted_values.append(value)
continue
uri_reader = self._table.file_io.uri_reader_factory.create(descriptor.uri)
converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data())
except Exception:
if not BlobViewStruct.is_blob_view_struct(value):
converted_values.append(value)
continue
view_struct = BlobViewStruct.deserialize(value)
descriptor = blob_view_lookup.resolve_descriptor(view_struct)
converted_values.append(descriptor.serialize())

column_idx = result.schema.names.index(field_name)
result = result.set_column(
column_idx = batch.schema.names.index(field_name)
batch = batch.set_column(
column_idx,
pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
pyarrow.array(converted_values, type=pyarrow.large_binary()),
)
return result
return batch

# ------------------------------------------------------------------
# Stage 2: BlobData resolution (unified exit)
# ------------------------------------------------------------------

def _resolve_blob_data(self, batch):
    """Replace inline blob values with the data returned by Blob.to_data().

    If ``self._blob_as_descriptor`` is truthy the batch is returned
    unchanged. Otherwise every column named in the descriptor fields or the
    view fields that is present in the batch is rebuilt as a nullable
    ``large_binary`` column.

    Args:
        batch: The record batch to transform.

    Returns:
        The input batch, or a batch with the affected columns replaced.
    """
    if self._blob_as_descriptor:
        return batch

    for name in self._descriptor_fields | self._view_fields:
        column_names = batch.schema.names
        if name not in column_names:
            continue

        resolved = []
        for raw in batch.column(name).to_pylist():
            blob = Blob.from_bytes(self._normalize_blob_to_bytes(raw), self._table.file_io)
            # A falsy result from Blob.from_bytes is stored as a null entry.
            resolved.append(blob.to_data() if blob else None)

        batch = batch.set_column(
            column_names.index(name),
            pyarrow.field(name, pyarrow.large_binary(), nullable=True),
            pyarrow.array(resolved, type=pyarrow.large_binary()),
        )
    return batch

# ------------------------------------------------------------------
# Utilities
# ------------------------------------------------------------------

@staticmethod
def _normalize_blob_to_bytes(value):
    """Coerce a cell value into ``bytes`` where a byte form exists.

    Args:
        value: A raw cell value. Objects exposing ``as_py`` are unwrapped
            by calling it first.

    Returns:
        ``None`` for ``None``; UTF-8 encoded bytes for ``str``; a ``bytes``
        copy for ``bytearray`` and ``memoryview``; any other value
        unchanged.
    """
    if value is None:
        return None
    if hasattr(value, 'as_py'):
        value = value.as_py()
    if isinstance(value, str):
        return value.encode('utf-8')
    # memoryview is normalized too, so callers that test
    # isinstance(value, bytes) do not silently skip buffer-backed values.
    if isinstance(value, (bytearray, memoryview)):
        return bytes(value)
    return value

def close(self):
    """Close the reader currently held in ``self._inner``."""
    wrapped = self._inner
    wrapped.close()


class _CachedBatchReader(RecordBatchReader):
    """A simple reader that replays pre-cached RecordBatches.

    Intended as the fallback when no prescan_reader_factory is provided.
    Batches are returned in list order, one per call.
    """

    def __init__(self, batches):
        """
        Args:
            batches: Indexable sequence of record batches to replay. The
                sequence is referenced, not copied.
        """
        self._batches = batches
        self._index = 0

    def read_arrow_batch(self) -> Optional[RecordBatch]:
        """Return the next cached batch, or None when exhausted or closed."""
        batches = self._batches
        # close() drops the cache; treat that as end of stream instead of
        # failing on len(None).
        if batches is None or self._index >= len(batches):
            return None
        batch = batches[self._index]
        self._index += 1
        return batch

    def close(self):
        """Drop the reference to the cached batches; safe to call repeatedly."""
        self._batches = None
36 changes: 32 additions & 4 deletions paimon-python/pypaimon/read/split_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,19 @@ def _push_down_predicate(self) -> Optional[Predicate]:
return None

def create_reader(self) -> RecordReader:
    """Build the reader for this split, adding blob conversion when needed.

    The raw reader is wrapped in a BlobDescriptorConvertReader when the
    table has view fields configured, or when blob-as-descriptor is off and
    descriptor fields are configured. Otherwise the raw reader is returned
    as is.
    """
    reader = self._create_raw_reader()

    # View fields are checked first; the descriptor options are only
    # consulted when no view fields are configured.
    wrap_needed = bool(CoreOptions.blob_view_fields(self.table.options))
    if not wrap_needed:
        wrap_needed = bool(
            not CoreOptions.blob_as_descriptor(self.table.options)
            and CoreOptions.blob_descriptor_fields(self.table.options)
        )
    if not wrap_needed:
        return reader

    def prescan_factory(names):
        # Deferred so the prescan reader is only built if it is requested.
        return self._create_prescan_reader(names)

    return BlobDescriptorConvertReader(
        reader,
        self.table,
        prescan_reader_factory=prescan_factory,
    )

def _create_raw_reader(self) -> RecordReader:
"""Core read logic: split_by_row_id -> suppliers -> ConcatBatchReader -> filter."""
files = self.split.files
suppliers = []

Expand Down Expand Up @@ -760,12 +773,27 @@ def create_reader(self) -> RecordReader:
else:
reader = merge_reader

if (not CoreOptions.blob_as_descriptor(self.table.options)
and CoreOptions.blob_descriptor_fields(self.table.options)):
reader = BlobDescriptorConvertReader(reader, self.table)

return reader

def _create_prescan_reader(self, field_names):
    """Create a raw reader over this split limited to the given fields.

    Args:
        field_names: Container of field names to keep; membership is
            tested against the ``name`` of each entry in
            ``self.read_fields``.

    Returns:
        An EmptyRecordBatchReader when none of the read fields match,
        otherwise the raw reader of a new DataEvolutionSplitRead built with
        the same table, predicate, split and row ranges, and with row
        tracking disabled.
    """
    from pypaimon.read.reader.iface.record_batch_reader import EmptyRecordBatchReader

    projected = []
    for field in self.read_fields:
        if field.name in field_names:
            projected.append(field)

    if len(projected) == 0:
        return EmptyRecordBatchReader()

    narrowed_read = DataEvolutionSplitRead(
        table=self.table,
        predicate=self.predicate,
        read_type=projected,
        split=self.split,
        row_tracking_enabled=False,
    )
    narrowed_read.row_ranges = self.row_ranges
    return narrowed_read._create_raw_reader()

def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
"""Split files by firstRowId for data evolution."""

Expand Down
Loading