 - ``<djblob>``: Serialize Python objects to DataJoint's blob format (internal storage)
 - ``<content>``: Content-addressed storage with SHA256 deduplication
 - ``<xblob>``: External serialized blobs using content-addressed storage
+- ``<object>``: Path-addressed storage for files/folders (Zarr, HDF5)
 
 Example - Creating a Custom Type:
     Here's how to define your own AttributeType, modeled after the built-in types::
@@ -237,3 +238,192 @@ def decode(self, stored: bytes, *, key: dict | None = None) -> Any: |
         from . import blob
 
         return blob.unpack(stored, squeeze=False)
+
+
+# =============================================================================
+# Path-Addressed Storage Types (OAS - Object-Augmented Schema)
+# =============================================================================
+
+
+@register_type
+class ObjectType(AttributeType):
+    """
+    Path-addressed storage for files and folders.
+
+    The ``<object>`` type provides managed file/folder storage where the path
+    is derived from the primary key:
+    ``{schema}/{table}/objects/{pk}/{field}_{token}.{ext}``
+
+    Unlike ``<content>`` (content-addressed), each row has its own storage
+    path, and the content is deleted when the row is deleted. This is ideal
+    for:
+
+    - Zarr arrays (hierarchical chunked data)
+    - HDF5 files
+    - Complex multi-file outputs
+    - Any content that shouldn't be deduplicated
+
+    Example::
+
+        @schema
+        class Analysis(dj.Computed):
+            definition = '''
+            -> Recording
+            ---
+            results : <object@mystore>
+            '''
+
+            def make(self, key):
+                # Store a file
+                self.insert1({**key, 'results': '/path/to/results.zarr'})
+
+        # Fetch returns an ObjectRef for lazy access
+        ref = (Analysis & key).fetch1('results')
+        ref.path    # storage path
+        ref.read()  # read file content
+        ref.fsmap   # for zarr.open(ref.fsmap)
+
+    Storage Structure:
+        Objects are stored at::
+
+            {store_root}/{schema}/{table}/objects/{pk}/{field}_{token}.{ext}
+
+        The token ensures uniqueness even if the content is replaced.
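+
+        For example, a row with primary key ``id=1`` in table ``analysis``
+        of schema ``lab`` might store its ``results`` field at (illustrative;
+        the exact ``{pk}`` encoding and token format are generated
+        internally)::
+
+            {store_root}/lab/analysis/objects/id=1/results_8f3a2c.zarr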
+
+    Comparison with ``<content>``::
+
+        | Aspect        | <object>     | <content>            |
+        |---------------|--------------|----------------------|
+        | Addressing    | path (by PK) | hash (by content)    |
+        | Deduplication | no           | yes                  |
+        | Deletion      | with the row | GC when unreferenced |
+        | Use case      | Zarr, HDF5   | blobs, attachments   |
+
+    Note:
+        A store must be specified (``<object@store>``) unless a default
+        store is configured. Fetch returns an ``ObjectRef`` for lazy access.
+    """
+
+    type_name = "object"
+    dtype = "json"
+
+    def encode(
+        self,
+        value: Any,
+        *,
+        key: dict | None = None,
+        store_name: str | None = None,
+    ) -> dict:
+        """
+        Store content and return metadata.
+
+        Args:
+            value: Content to store. Can be:
+
+                - bytes: raw bytes to store as a file
+                - str/Path: path to a local file or folder to upload
+            key: Dict containing context for path construction:
+
+                - ``_schema``: schema name
+                - ``_table``: table name
+                - ``_field``: field/attribute name
+                - all other entries are primary key values
+            store_name: Store to use. If None, uses the default store.
+
+        Returns:
+            Metadata dict suitable for ``ObjectRef.from_json()``.
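+
+        Example (a minimal sketch; ``obj_type`` stands for an ``ObjectType``
+        instance, and the path/token shown are illustrative)::
+
+            meta = obj_type.encode(
+                b"hello",
+                key={"_schema": "lab", "_table": "analysis",
+                     "_field": "results", "id": 1},
+                store_name="mystore",
+            )
+            # meta == {"path": "lab/analysis/objects/id=1/results_8f3a2c",
+            #          "store": "mystore", "size": 5, "ext": None,
+            #          "is_dir": False, "timestamp": "..."}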
+        """
+        from datetime import datetime, timezone
+        from pathlib import Path
+
+        from .content_registry import get_store_backend
+        from .storage import build_object_path
+
+        # Extract context from key; copy first so the caller's dict
+        # is not mutated by the pops below
+        key = dict(key or {})
+        schema = key.pop("_schema", "unknown")
+        table = key.pop("_table", "unknown")
+        field = key.pop("_field", "data")
+        primary_key = {k: v for k, v in key.items() if not k.startswith("_")}
+
+        # Determine content type and extension
+        is_dir = False
+        ext = None
+        size = None
+
+        if isinstance(value, bytes):
+            content = value
+            size = len(content)
+        elif isinstance(value, (str, Path)):
+            source_path = Path(value)
+            if not source_path.exists():
+                raise FileNotFoundError(f"Source path does not exist: {source_path}")
+            is_dir = source_path.is_dir()
+            ext = source_path.suffix if not is_dir else None
+            if is_dir:
+                # For directories, we upload later, once the path is built
+                content = None
+            else:
+                content = source_path.read_bytes()
+                size = len(content)
+        else:
+            raise TypeError(f"<object> expects bytes or path, got {type(value).__name__}")
+
+        # Build the storage path; the token is embedded in the returned path
+        path, token = build_object_path(
+            schema=schema,
+            table=table,
+            field=field,
+            primary_key=primary_key,
+            ext=ext,
+        )
+
+        # Get the storage backend
+        backend = get_store_backend(store_name)
+
+        # Upload the content
+        if is_dir:
+            # Upload the directory recursively (source_path was set above)
+            backend.put_folder(str(source_path), path)
+            # Compute the size by summing all contained files
+            size = sum(f.stat().st_size for f in source_path.rglob("*") if f.is_file())
+        else:
+            backend.put_buffer(content, path)
+
+        # Build the metadata record stored in the database
+        timestamp = datetime.now(timezone.utc)
+        metadata = {
+            "path": path,
+            "store": store_name,
+            "size": size,
+            "ext": ext,
+            "is_dir": is_dir,
+            "timestamp": timestamp.isoformat(),
+        }
+
+        return metadata
+
+    def decode(self, stored: dict, *, key: dict | None = None) -> Any:
+        """
+        Create an ObjectRef handle for lazy access.
+
+        Args:
+            stored: Metadata dict from the database.
+            key: Primary key values (unused).
+
+        Returns:
+            ObjectRef for accessing the stored content.
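+
+        Example (sketch; ``decode()`` is normally invoked by DataJoint during
+        fetch, so users only see the resulting ``ObjectRef``)::
+
+            ref = (Analysis & key).fetch1('results')
+            data = ref.read()         # file bytes
+            z = zarr.open(ref.fsmap)  # open a stored Zarr hierarchy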
+        """
+        from .content_registry import get_store_backend
+        from .objectref import ObjectRef
+
+        store_name = stored.get("store")
+        backend = get_store_backend(store_name)
+        return ObjectRef.from_json(stored, backend=backend)
+
+    def validate(self, value: Any) -> None:
+        """Validate that value is bytes or a str/Path (existence is checked in encode)."""
+        from pathlib import Path
+
+        if isinstance(value, bytes):
+            return
+        if isinstance(value, (str, Path)):
+            return
+        raise TypeError(f"<object> expects bytes or path, got {type(value).__name__}")
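+
+
+# Minimal round-trip sketch (illustrative, not part of this module). These
+# methods are normally invoked by DataJoint during insert/fetch; the store
+# name "mystore" and the key below are assumptions:
+#
+#     obj = ObjectType()
+#     obj.validate(b"some bytes")  # raises TypeError for unsupported values
+#     meta = obj.encode(
+#         b"some bytes",
+#         key={"_schema": "lab", "_table": "analysis",
+#              "_field": "results", "id": 1},
+#         store_name="mystore",
+#     )
+#     ref = obj.decode(meta)       # ObjectRef for lazy access
+#     assert ref.read() == b"some bytes"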