From cc1310cb60ef143e9d501a2cc1fb54f9245c9a7d Mon Sep 17 00:00:00 2001 From: Joey French Date: Thu, 20 Nov 2025 02:53:36 -0600 Subject: [PATCH] Refactor extensions to replace TableIngestClient with FileClient and MaterializationClient This commit removes the deprecated TableIngestClient and integrates FileClient and MaterializationClient into the extensions module. The changes include updates to the initialization of RoboSystemsExtensions, adjustments in the __init__.py file to reflect the new client structure, and modifications to tests to ensure proper configuration and functionality of the new clients. This refactoring enhances the API's file management capabilities and streamlines the client architecture. --- robosystems_client/extensions/__init__.py | 38 +- robosystems_client/extensions/extensions.py | 15 +- robosystems_client/extensions/file_client.py | 380 ++++++++++++++ .../extensions/materialization_client.py | 211 ++++++++ robosystems_client/extensions/table_client.py | 161 ++++++ .../extensions/table_ingest_client.py | 463 ------------------ tests/test_extensions.py | 41 +- tests/test_table_ingest_client.py | 112 ----- 8 files changed, 828 insertions(+), 593 deletions(-) create mode 100644 robosystems_client/extensions/file_client.py create mode 100644 robosystems_client/extensions/materialization_client.py create mode 100644 robosystems_client/extensions/table_client.py delete mode 100644 robosystems_client/extensions/table_ingest_client.py delete mode 100644 tests/test_table_ingest_client.py diff --git a/robosystems_client/extensions/__init__.py b/robosystems_client/extensions/__init__.py index edf3281..c5ff74d 100644 --- a/robosystems_client/extensions/__init__.py +++ b/robosystems_client/extensions/__init__.py @@ -28,12 +28,22 @@ OperationProgress, OperationResult, ) -from .table_ingest_client import ( - TableIngestClient, - UploadOptions, - IngestOptions, - UploadResult, +from .file_client import ( + FileClient, + FileUploadOptions, + FileUploadResult, + FileInfo, +) +from .materialization_client import ( + MaterializationClient, + MaterializationOptions, + MaterializationResult, + MaterializationStatus, +) +from .table_client import ( + TableClient, TableInfo, + QueryResult as TableQueryResult, ) from .graph_client import ( GraphClient, @@ -177,12 +187,20 @@ "OperationStatus", "OperationProgress", "OperationResult", - # Table Ingest Client - "TableIngestClient", - "UploadOptions", - "IngestOptions", - "UploadResult", + # File Client + "FileClient", + "FileUploadOptions", + "FileUploadResult", + "FileInfo", + # Materialization Client + "MaterializationClient", + "MaterializationOptions", + "MaterializationResult", + "MaterializationStatus", + # Table Client + "TableClient", "TableInfo", + "TableQueryResult", # Graph Client "GraphClient", "GraphMetadata", diff --git a/robosystems_client/extensions/extensions.py b/robosystems_client/extensions/extensions.py index a048af0..f1d031c 100644 --- a/robosystems_client/extensions/extensions.py +++ b/robosystems_client/extensions/extensions.py @@ -9,7 +9,9 @@ from .query_client import QueryClient from .agent_client import AgentClient from .operation_client import OperationClient -from .table_ingest_client import TableIngestClient +from .file_client import FileClient +from .materialization_client import MaterializationClient +from .table_client import TableClient from .graph_client import GraphClient from .sse_client import SSEClient @@ -61,7 +63,9 @@ def __init__(self, config: RoboSystemsExtensionConfig = None): self.query = 
QueryClient(self.config) self.agent = AgentClient(self.config) self.operations = OperationClient(self.config) - self.tables = TableIngestClient(self.config) + self.files = FileClient(self.config) + self.materialization = MaterializationClient(self.config) + self.tables = TableClient(self.config) self.graphs = GraphClient(self.config) def monitor_operation( @@ -92,7 +96,12 @@ def close(self): self.query.close() self.agent.close() self.operations.close_all() - self.tables.close() + if hasattr(self.files, "close"): + self.files.close() + if hasattr(self.materialization, "close"): + self.materialization.close() + if hasattr(self.tables, "close"): + self.tables.close() self.graphs.close() # Convenience methods that delegate to the appropriate clients diff --git a/robosystems_client/extensions/file_client.py b/robosystems_client/extensions/file_client.py new file mode 100644 index 0000000..6c01105 --- /dev/null +++ b/robosystems_client/extensions/file_client.py @@ -0,0 +1,380 @@ +"""File Client for RoboSystems API + +Manages file operations as first-class resources with multi-layer status tracking. +Files are independent entities with their own lifecycle (S3 → DuckDB → Graph). +""" + +from dataclasses import dataclass +from io import BytesIO +from pathlib import Path +from typing import Dict, Any, Optional, Callable, Union, BinaryIO +import logging +import httpx + +from ..api.files.create_file_upload import ( + sync_detailed as create_file_upload, +) +from ..api.files.update_file import ( + sync_detailed as update_file, +) +from ..api.files.list_files import ( + sync_detailed as list_files, +) +from ..api.files.get_file import ( + sync_detailed as get_file, +) +from ..api.files.delete_file import ( + sync_detailed as delete_file, +) +from ..models.file_upload_request import FileUploadRequest +from ..models.file_status_update import FileStatusUpdate + +logger = logging.getLogger(__name__) + + +@dataclass +class FileUploadOptions: + """Options for file upload operations""" + + on_progress: Optional[Callable[[str], None]] = None + fix_localstack_url: bool = True + ingest_to_graph: bool = False + + +@dataclass +class FileUploadResult: + """Result from file upload operation""" + + file_id: str + file_size: int + row_count: int + table_name: str + file_name: str + success: bool = True + error: Optional[str] = None + + +@dataclass +class FileInfo: + """Information about a file""" + + file_id: str + file_name: str + file_format: str + size_bytes: int + row_count: Optional[int] + upload_status: str + table_name: str + created_at: Optional[str] + uploaded_at: Optional[str] + layers: Optional[Dict[str, Any]] = None + + +class FileClient: + """Client for managing files as first-class resources""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + self._http_client = httpx.Client(timeout=120.0) + + def upload( + self, + graph_id: str, + table_name: str, + file_or_buffer: Union[Path, str, BytesIO, BinaryIO], + options: Optional[FileUploadOptions] = None, + ) -> FileUploadResult: + """ + Upload a file to a table. + + This handles the complete 3-step upload process: + 1. Get presigned upload URL + 2. Upload file to S3 + 3. 
Mark file as 'uploaded' (triggers DuckDB staging) + + Args: + graph_id: Graph database identifier + table_name: Table to associate file with + file_or_buffer: File path, Path object, BytesIO, or file-like object + options: Upload options (progress callback, LocalStack URL fix, auto-ingest) + + Returns: + FileUploadResult with file metadata and status + """ + options = options or FileUploadOptions() + + try: + # Determine file name and read content + if isinstance(file_or_buffer, (str, Path)): + file_path = Path(file_or_buffer) + file_name = file_path.name + with open(file_path, "rb") as f: + file_content = f.read() + elif isinstance(file_or_buffer, BytesIO): + file_name = "data.parquet" + file_content = file_or_buffer.getvalue() + elif hasattr(file_or_buffer, "read"): + file_name = getattr(file_or_buffer, "name", "data.parquet") + file_content = file_or_buffer.read() + else: + raise ValueError(f"Unsupported file type: {type(file_or_buffer)}") + + # Step 1: Get presigned upload URL + if options.on_progress: + options.on_progress( + f"Getting upload URL for {file_name} → table '{table_name}'..." + ) + + upload_request = FileUploadRequest( + file_name=file_name, + content_type="application/x-parquet", + table_name=table_name, + ) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": upload_request, + } + + response = create_file_upload(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Failed to get upload URL: {response.status_code}" + return FileUploadResult( + file_id="", + file_size=0, + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error=error_msg, + ) + + upload_data = response.parsed + upload_url = upload_data.upload_url + file_id = upload_data.file_id + + # Fix LocalStack URL if needed + if options.fix_localstack_url and "localstack:4566" in upload_url: + upload_url = upload_url.replace("localstack:4566", "localhost:4566") + + # Step 2: Upload file to S3 + if options.on_progress: + options.on_progress(f"Uploading {file_name} to S3...") + + s3_response = self._http_client.put( + upload_url, + content=file_content, + headers={"Content-Type": "application/x-parquet"}, + ) + + if s3_response.status_code not in [200, 204]: + return FileUploadResult( + file_id=file_id, + file_size=len(file_content), + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error=f"S3 upload failed: {s3_response.status_code}", + ) + + # Step 3: Mark file as uploaded + if options.on_progress: + options.on_progress(f"Marking {file_name} as uploaded...") + + status_update = FileStatusUpdate( + status="uploaded", + ingest_to_graph=options.ingest_to_graph, + ) + + update_kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + "body": status_update, + } + + update_response = update_file(**update_kwargs) + + if update_response.status_code != 200 or not update_response.parsed: + return FileUploadResult( + file_id=file_id, + file_size=len(file_content), + row_count=0, + table_name=table_name, + file_name=file_name, + success=False, + error="Failed to complete file upload", + ) + + # Extract metadata from response + response_data = update_response.parsed + actual_file_size = getattr(response_data, "file_size_bytes", len(file_content)) + actual_row_count = getattr(response_data, "row_count", 0) + + if options.on_progress: + options.on_progress( + f"✅ Uploaded {file_name} ({actual_file_size:,} bytes, {actual_row_count:,} rows)" + ) + + return 
FileUploadResult( + file_id=file_id, + file_size=actual_file_size, + row_count=actual_row_count, + table_name=table_name, + file_name=file_name, + success=True, + ) + + except Exception as e: + logger.error(f"File upload failed: {e}") + return FileUploadResult( + file_id="", + file_size=0, + row_count=0, + table_name=table_name, + file_name=getattr(file_or_buffer, "name", "unknown"), + success=False, + error=str(e), + ) + + def list( + self, + graph_id: str, + table_name: Optional[str] = None, + status: Optional[str] = None, + ) -> list[FileInfo]: + """ + List files in a graph with optional filtering. + + Args: + graph_id: Graph database identifier + table_name: Optional table name filter + status: Optional upload status filter (uploaded, pending, etc.) + + Returns: + List of FileInfo objects + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + if table_name: + kwargs["table_name"] = table_name + if status: + kwargs["status"] = status + + response = list_files(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to list files: {response.status_code}") + return [] + + files_data = response.parsed + files = getattr(files_data, "files", []) + + return [ + FileInfo( + file_id=f.file_id, + file_name=f.file_name, + file_format=f.file_format, + size_bytes=f.size_bytes or 0, + row_count=f.row_count, + upload_status=f.upload_status, + table_name=getattr(f, "table_name", ""), + created_at=f.created_at, + uploaded_at=f.uploaded_at, + ) + for f in files + ] + + except Exception as e: + logger.error(f"Failed to list files: {e}") + return [] + + def get(self, graph_id: str, file_id: str) -> Optional[FileInfo]: + """ + Get detailed information about a specific file. + + Args: + graph_id: Graph database identifier + file_id: File ID + + Returns: + FileInfo with multi-layer status tracking, or None if not found + """ + try: + kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + } + + response = get_file(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to get file {file_id}: {response.status_code}") + return None + + file_data = response.parsed + + return FileInfo( + file_id=file_data.file_id, + file_name=file_data.file_name, + file_format=file_data.file_format, + size_bytes=file_data.size_bytes or 0, + row_count=file_data.row_count, + upload_status=file_data.upload_status, + table_name=file_data.table_name or "", + created_at=file_data.created_at, + uploaded_at=file_data.uploaded_at, + layers=getattr(file_data, "layers", None), + ) + + except Exception as e: + logger.error(f"Failed to get file {file_id}: {e}") + return None + + def delete(self, graph_id: str, file_id: str, cascade: bool = False) -> bool: + """ + Delete a file from all layers. 
+ + Args: + graph_id: Graph database identifier + file_id: File ID to delete + cascade: If True, delete from all layers including DuckDB and graph + + Returns: + True if deletion succeeded, False otherwise + """ + try: + kwargs = { + "graph_id": graph_id, + "file_id": file_id, + "client": self.config.get("client"), + "cascade": cascade, + } + + response = delete_file(**kwargs) + + if response.status_code not in [200, 204]: + logger.error(f"Failed to delete file {file_id}: {response.status_code}") + return False + + return True + + except Exception as e: + logger.error(f"Failed to delete file {file_id}: {e}") + return False + + def __del__(self): + """Cleanup HTTP client on deletion""" + if hasattr(self, "_http_client"): + self._http_client.close() diff --git a/robosystems_client/extensions/materialization_client.py b/robosystems_client/extensions/materialization_client.py new file mode 100644 index 0000000..2ddbbbe --- /dev/null +++ b/robosystems_client/extensions/materialization_client.py @@ -0,0 +1,211 @@ +"""Materialization Client for RoboSystems API + +Manages graph materialization from DuckDB staging tables. +Treats the graph database as a materialized view of the mutable DuckDB data lake. +""" + +from dataclasses import dataclass +from typing import Dict, Any, Optional, Callable +import logging + +from ..api.materialization.materialize_graph import ( + sync_detailed as materialize_graph, +) +from ..api.materialization.get_materialization_status import ( + sync_detailed as get_materialization_status, +) +from ..models.materialize_request import MaterializeRequest + +logger = logging.getLogger(__name__) + + +@dataclass +class MaterializationOptions: + """Options for graph materialization operations""" + + ignore_errors: bool = True + rebuild: bool = False + force: bool = False + on_progress: Optional[Callable[[str], None]] = None + + +@dataclass +class MaterializationResult: + """Result from materialization operation""" + + status: str + was_stale: bool + stale_reason: Optional[str] + tables_materialized: list[str] + total_rows: int + execution_time_ms: float + message: str + success: bool = True + error: Optional[str] = None + + +@dataclass +class MaterializationStatus: + """Status information about graph materialization""" + + graph_id: str + is_stale: bool + stale_reason: Optional[str] + stale_since: Optional[str] + last_materialized_at: Optional[str] + materialization_count: int + hours_since_materialization: Optional[float] + message: str + + +class MaterializationClient: + """Client for managing graph materialization operations""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + + def materialize( + self, + graph_id: str, + options: Optional[MaterializationOptions] = None, + ) -> MaterializationResult: + """ + Materialize graph from DuckDB staging tables. + + Rebuilds the complete graph database from the current state of DuckDB + staging tables. Automatically discovers all tables, materializes them in + the correct order (nodes before relationships), and clears the staleness flag. 
+ + Args: + graph_id: Graph database identifier + options: Materialization options (ignore_errors, rebuild, force) + + Returns: + MaterializationResult with detailed execution information + + When to use: + - After batch uploads (files uploaded with ingest_to_graph=false) + - After cascade file deletions (graph marked stale) + - Periodic full refresh to ensure consistency + - Recovery from partial materialization failures + """ + options = options or MaterializationOptions() + + try: + if options.on_progress: + options.on_progress("Starting graph materialization...") + + request = MaterializeRequest( + ignore_errors=options.ignore_errors, + rebuild=options.rebuild, + force=options.force, + ) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": request, + } + + response = materialize_graph(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Materialization failed: {response.status_code}" + if hasattr(response, "content"): + try: + import json + + error_data = json.loads(response.content) + error_msg = error_data.get("detail", error_msg) + except Exception: + pass + + return MaterializationResult( + status="failed", + was_stale=False, + stale_reason=None, + tables_materialized=[], + total_rows=0, + execution_time_ms=0, + message=error_msg, + success=False, + error=error_msg, + ) + + result_data = response.parsed + + if options.on_progress: + options.on_progress( + f"✅ Materialization complete: {len(result_data.tables_materialized)} tables, " + f"{result_data.total_rows:,} rows in {result_data.execution_time_ms:.2f}ms" + ) + + return MaterializationResult( + status=result_data.status, + was_stale=result_data.was_stale, + stale_reason=result_data.stale_reason, + tables_materialized=result_data.tables_materialized, + total_rows=result_data.total_rows, + execution_time_ms=result_data.execution_time_ms, + message=result_data.message, + success=True, + ) + + except Exception as e: + logger.error(f"Materialization failed: {e}") + return MaterializationResult( + status="failed", + was_stale=False, + stale_reason=None, + tables_materialized=[], + total_rows=0, + execution_time_ms=0, + message=str(e), + success=False, + error=str(e), + ) + + def status(self, graph_id: str) -> Optional[MaterializationStatus]: + """ + Get current materialization status for the graph. + + Shows whether the graph is stale (DuckDB has changes not yet in graph database), + when it was last materialized, and how long since last materialization. 
+ + Args: + graph_id: Graph database identifier + + Returns: + MaterializationStatus with staleness and timing information + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + response = get_materialization_status(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to get materialization status: {response.status_code}") + return None + + status_data = response.parsed + + return MaterializationStatus( + graph_id=status_data.graph_id, + is_stale=status_data.is_stale, + stale_reason=status_data.stale_reason, + stale_since=status_data.stale_since, + last_materialized_at=status_data.last_materialized_at, + materialization_count=status_data.materialization_count, + hours_since_materialization=status_data.hours_since_materialization, + message=status_data.message, + ) + + except Exception as e: + logger.error(f"Failed to get materialization status: {e}") + return None diff --git a/robosystems_client/extensions/table_client.py b/robosystems_client/extensions/table_client.py new file mode 100644 index 0000000..00f40f1 --- /dev/null +++ b/robosystems_client/extensions/table_client.py @@ -0,0 +1,161 @@ +"""Table Client for RoboSystems API + +Manages DuckDB staging table operations. +Tables provide SQL-queryable staging layer before graph materialization. +""" + +from dataclasses import dataclass +from typing import Dict, Any, Optional +import logging + +from ..api.tables.list_tables import ( + sync_detailed as list_tables, +) +from ..api.tables.query_tables import ( + sync_detailed as query_tables, +) +from ..models.table_query_request import TableQueryRequest + +logger = logging.getLogger(__name__) + + +@dataclass +class TableInfo: + """Information about a DuckDB staging table""" + + table_name: str + table_type: str + row_count: int + file_count: int + total_size_bytes: int + + +@dataclass +class QueryResult: + """Result from SQL query execution""" + + columns: list[str] + rows: list[list[Any]] + row_count: int + execution_time_ms: float + success: bool = True + error: Optional[str] = None + + +class TableClient: + """Client for managing DuckDB staging tables""" + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.base_url = config["base_url"] + self.headers = config.get("headers", {}) + self.token = config.get("token") + + def list(self, graph_id: str) -> list[TableInfo]: + """ + List all DuckDB staging tables in a graph. + + Args: + graph_id: Graph database identifier + + Returns: + List of TableInfo objects with metadata + """ + try: + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + } + + response = list_tables(**kwargs) + + if response.status_code != 200 or not response.parsed: + logger.error(f"Failed to list tables: {response.status_code}") + return [] + + table_data = response.parsed + tables = getattr(table_data, "tables", []) + + return [ + TableInfo( + table_name=t.table_name, + table_type=t.table_type, + row_count=t.row_count, + file_count=t.file_count or 0, + total_size_bytes=t.total_size_bytes or 0, + ) + for t in tables + ] + + except Exception as e: + logger.error(f"Failed to list tables: {e}") + return [] + + def query( + self, graph_id: str, sql_query: str, limit: Optional[int] = None + ) -> QueryResult: + """ + Execute SQL query against DuckDB staging tables. 
+ + Args: + graph_id: Graph database identifier + sql_query: SQL query to execute + limit: Optional row limit + + Returns: + QueryResult with columns and rows + + Example: + >>> result = client.tables.query( + ... graph_id, + ... "SELECT * FROM Entity WHERE entity_type = 'CORPORATION'" + ... ) + >>> for row in result.rows: + ... print(row) + """ + try: + final_query = sql_query + if limit is not None: + final_query = f"{sql_query.rstrip(';')} LIMIT {limit}" + + request = TableQueryRequest(sql=final_query) + + kwargs = { + "graph_id": graph_id, + "client": self.config.get("client"), + "body": request, + } + + response = query_tables(**kwargs) + + if response.status_code != 200 or not response.parsed: + error_msg = f"Query failed: {response.status_code}" + return QueryResult( + columns=[], + rows=[], + row_count=0, + execution_time_ms=0, + success=False, + error=error_msg, + ) + + result_data = response.parsed + + return QueryResult( + columns=result_data.columns, + rows=result_data.rows, + row_count=len(result_data.rows), + execution_time_ms=getattr(result_data, "execution_time_ms", 0), + success=True, + ) + + except Exception as e: + logger.error(f"Query failed: {e}") + return QueryResult( + columns=[], + rows=[], + row_count=0, + execution_time_ms=0, + success=False, + error=str(e), + ) diff --git a/robosystems_client/extensions/table_ingest_client.py b/robosystems_client/extensions/table_ingest_client.py deleted file mode 100644 index 8a14757..0000000 --- a/robosystems_client/extensions/table_ingest_client.py +++ /dev/null @@ -1,463 +0,0 @@ -"""Table Ingest Client for RoboSystems API - -Simplifies uploading Parquet files to staging tables and ingesting them into graphs. -""" - -from dataclasses import dataclass -from io import BytesIO -from pathlib import Path -from typing import Dict, Any, Optional, Callable, List, Union, BinaryIO -import json -import logging -import httpx - -from ..api.files.create_file_upload import ( - sync_detailed as create_file_upload, -) -from ..api.files.update_file import ( - sync_detailed as update_file, -) -from ..api.tables.list_tables import ( - sync_detailed as list_tables, -) -from ..api.materialization.materialize_graph import ( - sync_detailed as materialize_graph, -) -from ..models.file_upload_request import FileUploadRequest -from ..models.file_status_update import FileStatusUpdate -from ..models.materialize_request import MaterializeRequest - -logger = logging.getLogger(__name__) - - -@dataclass -class UploadOptions: - """Options for file upload operations""" - - on_progress: Optional[Callable[[str], None]] = None - fix_localstack_url: bool = True # Auto-fix LocalStack URLs for localhost - file_name: Optional[str] = None # Override file name (useful for buffer uploads) - - -@dataclass -class IngestOptions: - """Options for table ingestion operations""" - - ignore_errors: bool = True - rebuild: bool = False - on_progress: Optional[Callable[[str], None]] = None - - -@dataclass -class UploadResult: - """Result from file upload operation""" - - file_id: str - file_size: int - row_count: int - table_name: str - file_name: str - success: bool = True - error: Optional[str] = None - - -@dataclass -class TableInfo: - """Information about a staging table""" - - table_name: str - row_count: int - file_count: int - total_size_bytes: int - - -class TableIngestClient: - """Enhanced table ingest client with simplified upload workflow""" - - def __init__(self, config: Dict[str, Any]): - self.config = config - self.base_url = config["base_url"] - self.headers = 
config.get("headers", {}) - self.token = config.get("token") - # Create httpx client for S3 uploads - self._http_client = httpx.Client(timeout=120.0) - - def upload_parquet_file( - self, - graph_id: str, - table_name: str, - file_or_buffer: Union[Path, str, BytesIO, BinaryIO], - options: Optional[UploadOptions] = None, - ) -> UploadResult: - """ - Upload a Parquet file to a staging table. - - This method handles the complete 3-step upload process: - 1. Get presigned upload URL - 2. Upload file to S3 - 3. Mark file as 'uploaded' (backend validates, calculates size/row count) - - Args: - graph_id: The graph ID - table_name: Name of the staging table - file_or_buffer: Path to the Parquet file or BytesIO/BinaryIO buffer - options: Upload options - - Returns: - UploadResult with upload details (size/row count calculated by backend) - """ - if options is None: - options = UploadOptions() - - # Auto-detect if this is a file path or buffer - is_buffer = isinstance(file_or_buffer, (BytesIO, BinaryIO)) or hasattr( - file_or_buffer, "read" - ) - - # Initialize file_path for type checking - file_path: Optional[Path] = None - - if is_buffer: - # Handle buffer upload - file_name = options.file_name or "data.parquet" - else: - # Handle file path upload - file_path = Path(file_or_buffer) - file_name = file_path.name - if not file_path.exists(): - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=f"File not found: {file_path}", - ) - - try: - # Import client here to avoid circular imports - from ..client import AuthenticatedClient - - # Create authenticated client with X-API-Key - # The token is extracted from X-API-Key header in extensions.py - if not self.token: - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error="No API key provided. Set X-API-Key in headers.", - ) - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", # No prefix for X-API-Key - auth_header_name="X-API-Key", # Use X-API-Key header instead of Authorization - headers=self.headers, - ) - - # Step 1: Get presigned upload URL - if options.on_progress: - options.on_progress( - f"Getting upload URL for {file_name} -> table '{table_name}'..." 
- ) - - upload_request = FileUploadRequest( - file_name=file_name, content_type="application/x-parquet", table_name=table_name - ) - - kwargs = { - "graph_id": graph_id, - "client": client, - "body": upload_request, - } - - response = create_file_upload(**kwargs) - - if not response.parsed: - error_msg = f"Failed to get upload URL (status: {response.status_code})" - if hasattr(response, "content"): - try: - error_detail = json.loads(response.content) - error_msg = f"{error_msg}: {error_detail}" - except (json.JSONDecodeError, ValueError): - error_msg = f"{error_msg}: {response.content[:200]}" - - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=error_msg, - ) - - upload_url = response.parsed.upload_url - file_id = response.parsed.file_id - - # Fix LocalStack URL if needed - if options.fix_localstack_url and "localstack:4566" in upload_url: - upload_url = upload_url.replace("localstack:4566", "localhost:4566") - logger.debug("Fixed LocalStack URL for localhost access") - - # Step 2: Upload file to S3 - if options.on_progress: - options.on_progress(f"Uploading {file_name} to S3...") - - # Read file content - handle both paths and buffers - if is_buffer: - # Read from buffer - if hasattr(file_or_buffer, "getvalue"): - file_content = file_or_buffer.getvalue() - else: - # BinaryIO or file-like object - file_or_buffer.seek(0) - file_content = file_or_buffer.read() - else: - # Read from file path - if file_path is None: - raise ValueError("file_path should not be None when not using buffer") - with open(file_path, "rb") as f: - file_content = f.read() - - s3_response = self._http_client.put( - upload_url, - content=file_content, - headers={"Content-Type": "application/x-parquet"}, - ) - s3_response.raise_for_status() - - # Step 3: Mark file as uploaded (backend validates and calculates size/row count) - if options.on_progress: - options.on_progress(f"Marking {file_name} as uploaded...") - - status_update = FileStatusUpdate(status="uploaded") - - kwargs = { - "graph_id": graph_id, - "file_id": file_id, - "client": client, - "body": status_update, - } - - update_response = update_file(**kwargs) - - if not update_response.parsed: - logger.error( - f"No parsed response from update_file. 
Status code: {update_response.status_code}" - ) - return UploadResult( - file_id=file_id, - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error="Failed to complete file upload", - ) - - response_data = update_response.parsed - - if isinstance(response_data, dict): - file_size = response_data.get("file_size_bytes", 0) - row_count = response_data.get("row_count", 0) - elif hasattr(response_data, "additional_properties"): - file_size = response_data.additional_properties.get("file_size_bytes", 0) - row_count = response_data.additional_properties.get("row_count", 0) - else: - file_size = getattr(response_data, "file_size_bytes", 0) - row_count = getattr(response_data, "row_count", 0) - - if options.on_progress: - options.on_progress( - f"✅ Uploaded {file_name} ({file_size:,} bytes, {row_count:,} rows)" - ) - - return UploadResult( - file_id=file_id, - file_size=file_size, - row_count=row_count, - table_name=table_name, - file_name=file_name, - success=True, - ) - - except Exception as e: - logger.error(f"Upload failed for {file_name}: {e}") - return UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name=table_name, - file_name=file_name, - success=False, - error=str(e), - ) - - def list_staging_tables(self, graph_id: str) -> List[TableInfo]: - """ - List all staging tables in a graph. - - Args: - graph_id: The graph ID - - Returns: - List of TableInfo objects - """ - try: - from ..client import AuthenticatedClient - - if not self.token: - logger.error("No API key provided") - return [] - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", - auth_header_name="X-API-Key", - headers=self.headers, - ) - - kwargs = {"graph_id": graph_id, "client": client} - - response = list_tables(**kwargs) - - if not response.parsed: - logger.error("Failed to list tables") - return [] - - tables = [] - for table_data in response.parsed.tables: - tables.append( - TableInfo( - table_name=table_data.table_name, - row_count=table_data.row_count, - file_count=table_data.file_count, - total_size_bytes=table_data.total_size_bytes, - ) - ) - - return tables - - except Exception as e: - logger.error(f"Failed to list tables: {e}") - return [] - - def ingest_all_tables( - self, graph_id: str, options: Optional[IngestOptions] = None - ) -> Dict[str, Any]: - """ - Materialize the graph from all staging tables. - - This rebuilds the complete graph database from the current state of DuckDB staging tables. 
- - Args: - graph_id: The graph ID - options: Ingest options - - Returns: - Dictionary with materialization results - """ - if options is None: - options = IngestOptions() - - try: - from ..client import AuthenticatedClient - - if not self.token: - return {"success": False, "error": "No API key provided"} - - client = AuthenticatedClient( - base_url=self.base_url, - token=self.token, - prefix="", - auth_header_name="X-API-Key", - headers=self.headers, - ) - - if options.on_progress: - options.on_progress("Starting table materialization...") - - materialize_request = MaterializeRequest( - ignore_errors=options.ignore_errors, rebuild=options.rebuild, force=True - ) - - kwargs = { - "graph_id": graph_id, - "client": client, - "body": materialize_request, - } - - response = materialize_graph(**kwargs) - - if not response.parsed: - return {"success": False, "error": "Failed to materialize graph"} - - result = { - "success": True, - "operation_id": getattr(response.parsed, "operation_id", None), - "message": getattr(response.parsed, "message", "Materialization started"), - } - - if options.on_progress: - options.on_progress("✅ Graph materialization completed") - - return result - - except Exception as e: - logger.error(f"Failed to materialize graph: {e}") - return {"success": False, "error": str(e)} - - def upload_and_ingest( - self, - graph_id: str, - table_name: str, - file_path: Path, - upload_options: Optional[UploadOptions] = None, - ingest_options: Optional[IngestOptions] = None, - ) -> Dict[str, Any]: - """ - Convenience method to upload a file and immediately ingest it. - - Args: - graph_id: The graph ID - table_name: Name of the staging table - file_path: Path to the Parquet file - upload_options: Upload options - ingest_options: Ingest options - - Returns: - Dictionary with upload and ingest results - """ - # Upload the file - upload_result = self.upload_parquet_file( - graph_id, table_name, file_path, upload_options - ) - - if not upload_result.success: - return { - "success": False, - "upload": upload_result, - "ingest": None, - "error": upload_result.error, - } - - # Ingest the table - ingest_result = self.ingest_all_tables(graph_id, ingest_options) - - return { - "success": upload_result.success and ingest_result.get("success", False), - "upload": upload_result, - "ingest": ingest_result, - } - - def close(self): - """Close HTTP client connections""" - if self._http_client: - self._http_client.close() diff --git a/tests/test_extensions.py b/tests/test_extensions.py index c43ba20..ffbab09 100644 --- a/tests/test_extensions.py +++ b/tests/test_extensions.py @@ -4,7 +4,9 @@ from robosystems_client.extensions import ( RoboSystemsExtensions, RoboSystemsExtensionConfig, - TableIngestClient, + FileClient, + MaterializationClient, + TableClient, QueryClient, OperationClient, GraphClient, @@ -22,10 +24,11 @@ def test_extensions_initialization_default(self): assert extensions.config["base_url"] == "http://localhost:8000" assert isinstance(extensions.query, QueryClient) assert isinstance(extensions.operations, OperationClient) - assert isinstance(extensions.tables, TableIngestClient) + assert isinstance(extensions.files, FileClient) + assert isinstance(extensions.materialization, MaterializationClient) + assert isinstance(extensions.tables, TableClient) assert isinstance(extensions.graphs, GraphClient) - # Cleanup extensions.close() def test_extensions_initialization_with_config(self): @@ -103,8 +106,36 @@ def test_operation_client_receives_config(self): extensions.close() - def 
test_table_ingest_client_receives_config(self): - """Test that TableIngestClient receives proper config.""" + def test_file_client_receives_config(self): + """Test that FileClient receives proper config.""" + config = RoboSystemsExtensionConfig( + base_url="https://api.robosystems.ai", + headers={"X-API-Key": "test-token"}, + ) + + extensions = RoboSystemsExtensions(config) + + assert extensions.files.base_url == "https://api.robosystems.ai" + assert "X-API-Key" in extensions.files.headers + + extensions.close() + + def test_materialization_client_receives_config(self): + """Test that MaterializationClient receives proper config.""" + config = RoboSystemsExtensionConfig( + base_url="https://api.robosystems.ai", + headers={"X-API-Key": "test-token"}, + ) + + extensions = RoboSystemsExtensions(config) + + assert extensions.materialization.base_url == "https://api.robosystems.ai" + assert "X-API-Key" in extensions.materialization.headers + + extensions.close() + + def test_table_client_receives_config(self): + """Test that TableClient receives proper config.""" config = RoboSystemsExtensionConfig( base_url="https://api.robosystems.ai", headers={"X-API-Key": "test-token"}, diff --git a/tests/test_table_ingest_client.py b/tests/test_table_ingest_client.py deleted file mode 100644 index cfae7da..0000000 --- a/tests/test_table_ingest_client.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Unit tests for TableIngestClient.""" - -import pytest -from robosystems_client.extensions.table_ingest_client import ( - TableIngestClient, - UploadOptions, - IngestOptions, - UploadResult, - TableInfo, -) - - -@pytest.mark.unit -class TestTableIngestClient: - """Test suite for TableIngestClient.""" - - def test_client_initialization(self, mock_config): - """Test that client initializes correctly with config.""" - client = TableIngestClient(mock_config) - - assert client.base_url == "http://localhost:8000" - assert client.token == "test-api-key" - assert client.headers == {"X-API-Key": "test-api-key"} - - def test_upload_options_dataclass(self): - """Test UploadOptions dataclass.""" - options = UploadOptions( - on_progress=lambda msg: print(msg), - fix_localstack_url=True, - file_name="test.parquet", - ) - - assert options.fix_localstack_url is True - assert options.file_name == "test.parquet" - assert options.on_progress is not None - - def test_ingest_options_dataclass(self): - """Test IngestOptions dataclass.""" - options = IngestOptions( - ignore_errors=False, rebuild=True, on_progress=lambda msg: print(msg) - ) - - assert options.ignore_errors is False - assert options.rebuild is True - assert options.on_progress is not None - - def test_upload_result_dataclass(self): - """Test UploadResult dataclass.""" - result = UploadResult( - file_id="file-123", - file_size=5000, - row_count=100, - table_name="Entity", - file_name="data.parquet", - success=True, - ) - - assert result.file_id == "file-123" - assert result.file_size == 5000 - assert result.row_count == 100 - assert result.table_name == "Entity" - assert result.success is True - assert result.error is None - - def test_upload_result_with_error(self): - """Test UploadResult with error.""" - result = UploadResult( - file_id="", - file_size=0, - row_count=0, - table_name="Entity", - file_name="data.parquet", - success=False, - error="Upload failed", - ) - - assert result.success is False - assert result.error == "Upload failed" - - def test_table_info_dataclass(self): - """Test TableInfo dataclass.""" - info = TableInfo( - table_name="Entity", row_count=1000, 
file_count=5, total_size_bytes=50000 - ) - - assert info.table_name == "Entity" - assert info.row_count == 1000 - assert info.file_count == 5 - assert info.total_size_bytes == 50000 - - def test_close_method(self, mock_config): - """Test that close method exists and can be called.""" - client = TableIngestClient(mock_config) - - # Should not raise any exceptions - client.close() - - def test_default_upload_options(self): - """Test default UploadOptions values.""" - options = UploadOptions() - - assert options.on_progress is None - assert options.fix_localstack_url is True - assert options.file_name is None - - def test_default_ingest_options(self): - """Test default IngestOptions values.""" - options = IngestOptions() - - assert options.ignore_errors is True - assert options.rebuild is False - assert options.on_progress is None