6 changes: 4 additions & 2 deletions py-src/data_formulator/data_loader/__init__.py
@@ -1,10 +1,12 @@
 from data_formulator.data_loader.external_data_loader import ExternalDataLoader
 from data_formulator.data_loader.mysql_data_loader import MySQLDataLoader
 from data_formulator.data_loader.kusto_data_loader import KustoDataLoader
+from data_formulator.data_loader.s3_data_loader import S3DataLoader
 
 DATA_LOADERS = {
     "mysql": MySQLDataLoader,
-    "kusto": KustoDataLoader
+    "kusto": KustoDataLoader,
+    "s3": S3DataLoader,
 }
 
-__all__ = ["ExternalDataLoader", "MySQLDataLoader", "KustoDataLoader", "DATA_LOADERS"]
+__all__ = ["ExternalDataLoader", "MySQLDataLoader", "KustoDataLoader", "S3DataLoader", "DATA_LOADERS"]
188 changes: 188 additions & 0 deletions py-src/data_formulator/data_loader/s3_data_loader.py
@@ -0,0 +1,188 @@
import json
import pandas as pd
import duckdb
import os

from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name
from typing import Dict, Any, List

class S3DataLoader(ExternalDataLoader):

    @staticmethod
    def list_params() -> List[Dict[str, Any]]:
        params_list = [
            {"name": "aws_access_key_id", "type": "string", "required": True, "default": "", "description": "AWS access key ID"},
            {"name": "aws_secret_access_key", "type": "string", "required": True, "default": "", "description": "AWS secret access key"},
            {"name": "aws_session_token", "type": "string", "required": False, "default": "", "description": "AWS session token (required for temporary credentials)"},
            {"name": "region_name", "type": "string", "required": True, "default": "us-east-1", "description": "AWS region name"},
            {"name": "bucket", "type": "string", "required": True, "default": "", "description": "S3 bucket name"}
        ]
        return params_list

    def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
        self.params = params
        self.duck_db_conn = duck_db_conn

        # Extract parameters
        self.aws_access_key_id = params.get("aws_access_key_id", "")
        self.aws_secret_access_key = params.get("aws_secret_access_key", "")
        self.aws_session_token = params.get("aws_session_token", "")
        self.region_name = params.get("region_name", "us-east-1")
        self.bucket = params.get("bucket", "")

        # Install and load the httpfs extension for S3 access
        self.duck_db_conn.install_extension("httpfs")
        self.duck_db_conn.load_extension("httpfs")

        # Set AWS credentials for DuckDB
        self.duck_db_conn.execute(f"SET s3_region='{self.region_name}'")
        self.duck_db_conn.execute(f"SET s3_access_key_id='{self.aws_access_key_id}'")
        self.duck_db_conn.execute(f"SET s3_secret_access_key='{self.aws_secret_access_key}'")
        if self.aws_session_token:  # only needed for temporary credentials
            self.duck_db_conn.execute(f"SET s3_session_token='{self.aws_session_token}'")

    def list_tables(self) -> List[Dict[str, Any]]:
        # Use boto3 to list objects in the bucket
        import boto3

        s3_client = boto3.client(
Copilot AI commented on May 29, 2025:
[nitpick] The boto3 client is created in multiple methods; consider extracting common initialization into a private helper to reduce duplication and simplify updates.
            's3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token if self.aws_session_token else None,
            region_name=self.region_name
        )
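A minimal sketch of the helper that comment suggests — the name _s3_client and its placement on S3DataLoader are assumptions, not part of this PR:

import boto3

def _s3_client(self):
    # Intended as a private method on S3DataLoader: one place to build the
    # boto3 client so list_tables() and _estimate_row_count() stop duplicating it.
    return boto3.client(
        's3',
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
        aws_session_token=self.aws_session_token or None,
        region_name=self.region_name,
    )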

        # List objects in the bucket
        response = s3_client.list_objects_v2(Bucket=self.bucket)

        results = []

        if 'Contents' in response:
            for obj in response['Contents']:
                key = obj['Key']

                # Skip directories and non-data files
                if key.endswith('/') or not self._is_supported_file(key):
                    continue

                # Create S3 URL
                s3_url = f"s3://{self.bucket}/{key}"

                try:
                    # Choose the appropriate read function based on file extension
                    if s3_url.lower().endswith('.parquet'):
                        sample_df = self.duck_db_conn.execute(f"SELECT * FROM read_parquet('{s3_url}') LIMIT 10").df()
                    elif s3_url.lower().endswith('.json') or s3_url.lower().endswith('.jsonl'):
                        sample_df = self.duck_db_conn.execute(f"SELECT * FROM read_json_auto('{s3_url}') LIMIT 10").df()
                    elif s3_url.lower().endswith('.csv'):  # CSV is the remaining supported format
                        sample_df = self.duck_db_conn.execute(f"SELECT * FROM read_csv_auto('{s3_url}') LIMIT 10").df()

                    # Get column information
                    columns = [{
                        'name': col,
                        'type': str(sample_df[col].dtype)
                    } for col in sample_df.columns]

                    # Get sample data
                    sample_rows = json.loads(sample_df.to_json(orient="records"))

                    # Estimate row count (this is approximate for CSV files)
                    row_count = self._estimate_row_count(s3_url)

                    table_metadata = {
                        "row_count": row_count,
                        "columns": columns,
                        "sample_rows": sample_rows
                    }

                    results.append({
                        "name": s3_url,
                        "metadata": table_metadata
                    })
                except Exception as e:
                    # Skip files that can't be read
                    print(f"Error reading {s3_url}: {e}")
Copilot AI commented on May 29, 2025:
[nitpick] Replace print statements with a proper logging framework to avoid uncontrolled console output in production libraries.
Suggested change:
-                    print(f"Error reading {s3_url}: {e}")
+                    logging.error(f"Error reading {s3_url}: {e}")
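The suggested change assumes a logging setup the file does not have yet; a minimal sketch of what adopting it could look like:

import logging

# One module-level logger; the embedding application configures handlers and levels.
logger = logging.getLogger(__name__)

# then, inside the except blocks, instead of print(...):
logger.error("Error reading %s: %s", s3_url, e)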
                    continue

        return results

    def _is_supported_file(self, key: str) -> bool:
        """Check if the file type is supported by DuckDB."""
        supported_extensions = ['.csv', '.parquet', '.json', '.jsonl']
        return any(key.lower().endswith(ext) for ext in supported_extensions)

    def _estimate_row_count(self, s3_url: str) -> int:
        """Estimate the number of rows in a file."""
        try:
            # For parquet files, we can get the exact count
            if s3_url.lower().endswith('.parquet'):
Copilot AI commented on May 29, 2025:
Using a full COUNT(*) scan on large Parquet files may be slow; consider reading row-count metadata from the file footer instead of scanning all rows.
                count = self.duck_db_conn.execute(f"SELECT COUNT(*) FROM read_parquet('{s3_url}')").fetchone()[0]
                return count
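One way to act on that suggestion is to read the footer metadata instead of scanning. The sketch below is a hypothetical _parquet_row_count method on S3DataLoader; it assumes a DuckDB build that ships the parquet_metadata() table function, and the column names may differ between DuckDB versions:

def _parquet_row_count(self, s3_url: str) -> int:
    # parquet_metadata() emits one row per column chunk, so de-duplicate by
    # row group before summing the per-row-group counts from the file footer.
    row = self.duck_db_conn.execute(f"""
        SELECT SUM(row_group_num_rows)
        FROM (SELECT DISTINCT row_group_id, row_group_num_rows FROM parquet_metadata('{s3_url}'))
    """).fetchone()
    return int(row[0]) if row and row[0] is not None else 0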

            # For CSV files, we'll sample the file to estimate size
            sample_size = 1000
            sample_df = self.duck_db_conn.execute(f"SELECT * FROM read_csv_auto('{s3_url}') LIMIT {sample_size}").df()

            # Get file size from S3
            import boto3
            s3_client = boto3.client(
                's3',
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                aws_session_token=self.aws_session_token if self.aws_session_token else None,
                region_name=self.region_name
            )

            key = s3_url.replace(f"s3://{self.bucket}/", "")
            response = s3_client.head_object(Bucket=self.bucket, Key=key)
            file_size = response['ContentLength']

            # Estimate based on sample size and file size
            if len(sample_df) > 0:
                # Average row size in bytes, taken from the serialized sample
                avg_row_size = len(sample_df.to_csv(index=False).encode('utf-8')) / len(sample_df)
                estimated_rows = int(file_size / avg_row_size)
                return min(estimated_rows, 1000000)  # Cap at 1 million for UI performance

            return 0
        except Exception as e:
            print(f"Error estimating row count for {s3_url}: {e}")
            return 0

    def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000):
        if name_as is None:
            name_as = table_name.split('/')[-1].split('.')[0]

        name_as = sanitize_table_name(name_as)

        # Determine file type and use appropriate DuckDB function
        if table_name.lower().endswith('.csv'):
            self.duck_db_conn.execute(f"""
                CREATE OR REPLACE TABLE main.{name_as} AS
                SELECT * FROM read_csv_auto('{table_name}')
                LIMIT {size}
            """)
        elif table_name.lower().endswith('.parquet'):
            self.duck_db_conn.execute(f"""
                CREATE OR REPLACE TABLE main.{name_as} AS
                SELECT * FROM read_parquet('{table_name}')
                LIMIT {size}
            """)
        elif table_name.lower().endswith('.json') or table_name.lower().endswith('.jsonl'):
            self.duck_db_conn.execute(f"""
                CREATE OR REPLACE TABLE main.{name_as} AS
                SELECT * FROM read_json_auto('{table_name}')
                LIMIT {size}
            """)
        else:
            raise ValueError(f"Unsupported file type: {table_name}")

    def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
        return self.duck_db_conn.execute(query).df().head(10).to_dict(orient="records")

    def ingest_data_from_query(self, query: str, name_as: str):
        # Execute the query and get results as a DataFrame
        df = self.duck_db_conn.execute(query).df()
        # Use the base class's method to ingest the DataFrame
        self.ingest_df_to_duckdb(df, name_as)
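For orientation, a hypothetical end-to-end use of the loader; the credential values, bucket name, and the standalone duckdb.connect() call are placeholders — Data Formulator itself wires in the connection and parameters:

import duckdb
from data_formulator.data_loader.s3_data_loader import S3DataLoader

conn = duckdb.connect()
loader = S3DataLoader({
    "aws_access_key_id": "<access key>",        # placeholder
    "aws_secret_access_key": "<secret key>",    # placeholder
    "region_name": "us-east-1",
    "bucket": "<bucket name>",                  # placeholder
}, conn)

tables = loader.list_tables()                   # one entry per supported s3:// object
if tables:
    loader.ingest_data(tables[0]["name"])       # copy the object into a local DuckDB table
    print(loader.view_query_sample("SHOW TABLES"))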
5 changes: 4 additions & 1 deletion requirements.txt
@@ -8,8 +8,11 @@ flask
 openai
 azure-identity
 azure-keyvault-secrets
+azure-kusto-data
+azure-storage-blob
 python-dotenv
 vega_datasets
 litellm
 duckdb
--e . #also need to install data formulator itself
+boto3
+-e . #also need to install data formulator itself
2 changes: 1 addition & 1 deletion src/views/DBTableManager.tsx
@@ -653,7 +653,7 @@ export const DBTableSelectionDialog: React.FC<{ buttonElement: any }> = function
                 sx={{px: 0.5}}
             >
                 <Typography variant="caption" sx={{color: "text.secondary", fontWeight: "bold", px: 1}}>connect external data</Typography>
-                {["file upload", "mysql", "kusto"].map((dataLoaderType, i) => (
+                {["file upload", "mysql", "kusto", "s3"].map((dataLoaderType, i) => (
                     <Tab
                         key={`dataLoader:${dataLoaderType}`}
                         wrapped