+ 🪄 Explore data with visualizations, powered by AI agents.
+
-🪄 Turn data into insights with AI Agents, with the exploration paths you choose. Try Data Formulator now!
+
+
+
+
+
-- 🤖 New in v0.5: agent model + interative control [(video)](https://www.youtube.com/watch?v=GfTE2FLyMrs)
-- 🔥🔥🔥 Try our online demo at [https://data-formulator.ai](https://data-formulator.ai)
-- Any questions, thoughts? Discuss in the Discord channel! [](https://discord.gg/mYCZMQKYZb)
+
+
+
+
+
+
+
+
-https://github.com/user-attachments/assets/8ca57b68-4d7a-42cb-bcce-43f8b1681ce2
-
-
+
+
+
## News 🔥🔥🔥
+[12-08-2025] **Data Formulator 0.5.1** — Connect more, visualize more, move faster
+- 🔌 **Community data loaders**: Google BigQuery, MySQL, Postgres, MongoDB
+- 📊 **New chart types**: US Map & Pie Chart (more to be added soon)
+- ✏️ **Editable reports**: Refine generated reports with [Chartifact](https://github.com/microsoft/chartifact) in markdown style. [demo](https://github.com/microsoft/data-formulator/pull/200#issue-3635408217)
+- ⚡ **Snappier UI**: Noticeably faster interactions across the board
[11-07-2025] Data Formulator 0.5: Vibe with your data, in control
@@ -109,9 +118,9 @@ Here are milestones that lead to the current design:
## Overview
-**Data Formulator** is an application from Microsoft Research that uses AI agents to make it easier to turn data into insights.
+**Data Formulator** is a Microsoft Research prototype for data exploration with visualizations powered by AI agents.
-Data Formulator is an AI-powered tool for analysts to iteratively explore and visualize data. Started with data in any format (screenshot, text, csv, or database), users can work with AI agents with a novel blended interface that combines *user interface interactions (UI)* and *natural language (NL) inputs* to communicate their intents, control branching exploration directions, and create reports to share their insights.
+Data Formulator enables analysts to iteratively explore and visualize data. Starting with data in any format (screenshot, text, csv, or database), users work with AI agents through a novel blended interface that combines *user interface interactions (UI)* and *natural language (NL) inputs* to communicate their intents, control branching exploration directions, and create reports to share their insights.
## Get Started
diff --git a/package.json b/package.json
index e8d88ab3..66bfcfd4 100644
--- a/package.json
+++ b/package.json
@@ -44,7 +44,7 @@
"redux-persist": "^6.0.0",
"typescript": "^4.9.5",
"validator": "^13.15.20",
- "vega": "^5.32.0",
+ "vega": "^6.2.0",
"vega-embed": "^6.21.0",
"vega-lite": "^5.5.0",
"vm-browserify": "^1.1.2"
diff --git a/py-src/data_formulator/agents/agent_interactive_explore.py b/py-src/data_formulator/agents/agent_interactive_explore.py
index 28fc0de1..e1c4f2b2 100644
--- a/py-src/data_formulator/agents/agent_interactive_explore.py
+++ b/py-src/data_formulator/agents/agent_interactive_explore.py
@@ -67,54 +67,42 @@
* when the exploration context is provided, make your suggestion based on the context as well as the original dataset; otherwise leverage the original dataset to suggest questions.
Guidelines for question suggestions:
-1. Suggest a list of question_groups of interesting analytical questions that are not obvious that can uncover nontrivial insights, including both breadth and depth questions.
-
+1. Suggest a list of question_groups of interesting, non-obvious analytical questions that can uncover nontrivial insights.
2. Use a diverse language style to display the questions (can be questions, statements etc)
3. If there are multiple datasets in a thread, consider relationships between them
4. CONCISENESS: the questions should be concise and to the point
5. QUESTION GROUP GENERATION:
- different questions groups should cover different aspects of the data analysis for user to choose from.
- - each question_group should include both 'breadth_questions' and 'depth_questions':
- - breadth_questions: a group of questions that are all relatively simple that helps the user understand the data in a broad sense.
- - depth_questions: a sequence of questions that build on top of each other to answer a specific aspect of the user's goal.
- - you have a budget of generating 4 questions in total (or as directed by the user).
- - allocate 2-3 questions to 'breadth_questions' and 2-3 questions to 'depth_questions' based on the user's goal and the data.
- - each question group should slightly lean towards 'breadth' or 'depth' exploration, but not too much.
- - the more focused area can have more questions than the other area.
+ - each question_group is a sequence of 'questions' that builds on top of each other to answer the user's goal.
- each question group should have a difficulty level (easy / medium / hard),
- simple questions should be short -- single sentence exploratory questions
- medium questions can be 1-2 sentences exploratory questions
- hard questions should introduce some new analysis concept but still make it concise
- if suitable, include a group of questions that are related to statistical analysis: forecasting, regression, or clustering.
6. QUESTIONS WITHIN A QUESTION GROUP:
- - all questions should be a new question based on the thread of exploration the user provided, do not repeat questions that have already been explored in the thread
+ - raise new questions that are related to the user's goal, do not repeat questions that have already been explored in the context provided to you.
- if the user provides a start question, suggested questions should be related to the start question.
- - when suggesting 'breadth_questions' in a question_group, they should be a group of questions:
- - they are related to the user's goal, they should each explore a different aspect of the user's goal in parallel.
- - questions should consider different fields, metrics and statistical methods.
- - each question within the group should be distinct from each other that they will lead to different insights and visualizations
- - when suggesting 'depth_questions' in a question_group, they should be a sequence of questions:
- - start of the question should provide an overview of the data in the direction going to be explored, and it will be refined in the subsequent questions.
- - they progressively dive deeper into the data, building on top of the previous question.
- - each question should be related to the previous question, introducing refined analysis (e.g., updated computation, filtering, different grouping, etc.)
+ - the questions should progressively dive deeper into the data, building on top of the previous question.
+    - the first question should provide an overview of the data in the direction to be explored.
+    - follow-up questions should refine the previous question, introducing refined analysis to dig deeper into the data (e.g., updated computation, filtering, different grouping, etc.)
+ - don't jump too far from the previous question so that readers can understand the flow of the questions.
- every question should be answerable with a visualization.
7. FORMATTING:
- - include "breadth_questions" and "depth_questions" in the question group:
- - each question group should have 2-3 questions (or as directed by the user).
+ - include "questions" in the question group:
+ - each question group should have 2-4 questions (or as directed by the user).
- For each question group, include a 'goal' that summarizes the goal of the question group.
- The goal should all be a short single sentence (<12 words).
    - Meaning of the 'goal' should be clear that the user won't misunderstand the actual question described in 'text'.
- It should capture the key computation and exploration direction of the question (do not omit any information that may lead to ambiguity), but also keep it concise.
- include the **bold** keywords for the attributes / metrics that are important to the question, especially when the goal mentions fields / metrics in the original dataset (don't have to be exact match)
- include 'difficulty' to indicate the difficulty of the question, it should be one of 'easy', 'medium', 'hard'
- - a 'focus' field to indicate whether the overall question group leans more on 'breadth' or 'depth' exploration.
Output should be a list of json objects in the following format, each line should be a json object representing a question group, starting with 'data: ':
Format:
-data: {"breadth_questions": [...], "depth_questions": [...], "goal": ..., "difficulty": ..., "focus": "..."}
-data: {"breadth_questions": [...], "depth_questions": [...], "goal": ..., "difficulty": ..., "focus": "..."}
+data: {"questions": [...], "goal": ..., "difficulty": ...}
+data: {"questions": [...], "goal": ..., "difficulty": ...}
... // more question groups
'''
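
For reference, a minimal sketch of what a single streamed question group in the simplified format could look like; the dataset, field names, and question wording below are illustrative and not taken from this PR.

```python
import json

# Hypothetical question group matching the new output spec:
# 2-4 questions that build on each other, a short goal (<12 words)
# with **bold** keywords, and a difficulty of easy/medium/hard.
question_group = {
    "questions": [
        "How have total sales changed by year?",
        "Which region drives the largest year-over-year growth?",
        "Within the top region, how does growth split by product category?",
    ],
    "goal": "Trace **sales** growth from yearly totals down to categories",
    "difficulty": "medium",
}

# Each group is emitted as one line prefixed with 'data: '.
print("data: " + json.dumps(question_group))
```
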
diff --git a/py-src/data_formulator/agents/agent_query_completion.py b/py-src/data_formulator/agents/agent_query_completion.py
index f60a2fa9..5e82f383 100644
--- a/py-src/data_formulator/agents/agent_query_completion.py
+++ b/py-src/data_formulator/agents/agent_query_completion.py
@@ -54,6 +54,10 @@ def __init__(self, client):
def run(self, data_source_metadata, query):
+ # For MongoDB, treat it as a SQL-like data source for query generation
+ if data_source_metadata['data_loader_type'] == "mongodb":
+ data_source_metadata['data_loader_type'] = "SQL"
+
user_query = f"[DATA SOURCE]\n\n{json.dumps(data_source_metadata, indent=2)}\n\n[USER INPUTS]\n\n{query}\n\n"
logger.info(user_query)
diff --git a/py-src/data_formulator/data_loader/__init__.py b/py-src/data_formulator/data_loader/__init__.py
index 1c964f8f..7015330e 100644
--- a/py-src/data_formulator/data_loader/__init__.py
+++ b/py-src/data_formulator/data_loader/__init__.py
@@ -5,6 +5,8 @@
from data_formulator.data_loader.s3_data_loader import S3DataLoader
from data_formulator.data_loader.azure_blob_data_loader import AzureBlobDataLoader
from data_formulator.data_loader.postgresql_data_loader import PostgreSQLDataLoader
+from data_formulator.data_loader.mongodb_data_loader import MongoDBDataLoader
+from data_formulator.data_loader.bigquery_data_loader import BigQueryDataLoader
DATA_LOADERS = {
"mysql": MySQLDataLoader,
@@ -12,8 +14,9 @@
"kusto": KustoDataLoader,
"s3": S3DataLoader,
"azure_blob": AzureBlobDataLoader,
- "postgresql": PostgreSQLDataLoader
+ "postgresql": PostgreSQLDataLoader,
+ "mongodb": MongoDBDataLoader,
+ "bigquery": BigQueryDataLoader
}
-__all__ = ["ExternalDataLoader", "MySQLDataLoader", "MSSQLDataLoader", "KustoDataLoader", "S3DataLoader", "AzureBlobDataLoader","PostgreSQLDataLoader","DATA_LOADERS"]
-
+__all__ = ["ExternalDataLoader", "MySQLDataLoader", "MSSQLDataLoader", "KustoDataLoader", "S3DataLoader", "AzureBlobDataLoader","PostgreSQLDataLoader", "MongoDBDataLoader", "BigQueryDataLoader", "DATA_LOADERS"]
\ No newline at end of file
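
A hedged sketch of how the extended `DATA_LOADERS` registry might be used directly from Python; the connection parameters below are placeholders, a reachable MongoDB instance is assumed, and each loader's `list_params()` documents the exact keys it expects.

```python
import duckdb
from data_formulator.data_loader import DATA_LOADERS

# Look up a loader class by its registry key and bind it to a DuckDB session.
loader_cls = DATA_LOADERS["mongodb"]  # or "bigquery", "mysql", "postgresql", ...

params = {  # placeholder values; see loader_cls.list_params() for the full schema
    "host": "localhost",
    "port": 27017,
    "database": "analytics",
}

duck_conn = duckdb.connect()
loader = loader_cls(params, duck_conn)

# Every loader exposes the same browsing interface.
for table in loader.list_tables():
    print(table["name"], table["metadata"]["row_count"])
```
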
diff --git a/py-src/data_formulator/data_loader/azure_blob_data_loader.py b/py-src/data_formulator/data_loader/azure_blob_data_loader.py
index 1959125f..da97a3bd 100644
--- a/py-src/data_formulator/data_loader/azure_blob_data_loader.py
+++ b/py-src/data_formulator/data_loader/azure_blob_data_loader.py
@@ -7,6 +7,13 @@
from typing import Dict, Any, List
from data_formulator.security import validate_sql_query
+try:
+ from azure.storage.blob import BlobServiceClient, ContainerClient
+ from azure.identity import DefaultAzureCredential, AzureCliCredential, ManagedIdentityCredential, EnvironmentCredential, ChainedTokenCredential
+ AZURE_BLOB_AVAILABLE = True
+except ImportError:
+ AZURE_BLOB_AVAILABLE = False
+
class AzureBlobDataLoader(ExternalDataLoader):
@staticmethod
@@ -59,6 +66,12 @@ def auth_instructions() -> str:
"""
def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ if not AZURE_BLOB_AVAILABLE:
+ raise ImportError(
+ "Azure storage libraries are required for Azure Blob connections. "
+ "Install with: pip install azure-storage-blob azure-identity"
+ )
+
self.params = params
self.duck_db_conn = duck_db_conn
@@ -368,7 +381,7 @@ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
if not result:
raise ValueError(error_message)
- return self.duck_db_conn.execute(query).df().head(10).to_dict(orient="records")
+ return json.loads(self.duck_db_conn.execute(query).df().head(10).to_json(orient="records"))
def ingest_data_from_query(self, query: str, name_as: str):
# Execute the query and get results as a DataFrame
diff --git a/py-src/data_formulator/data_loader/bigquery_data_loader.py b/py-src/data_formulator/data_loader/bigquery_data_loader.py
new file mode 100644
index 00000000..c0383e87
--- /dev/null
+++ b/py-src/data_formulator/data_loader/bigquery_data_loader.py
@@ -0,0 +1,324 @@
+import json
+import logging
+import re
+from typing import Dict, Any, List, Optional
+import pandas as pd
+import duckdb
+
+from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name
+from data_formulator.security import validate_sql_query
+
+try:
+ from google.cloud import bigquery
+ from google.oauth2 import service_account
+ BIGQUERY_AVAILABLE = True
+except ImportError:
+ BIGQUERY_AVAILABLE = False
+
+log = logging.getLogger(__name__)
+
+class BigQueryDataLoader(ExternalDataLoader):
+ """BigQuery data loader implementation"""
+
+ @staticmethod
+ def list_params() -> List[Dict[str, Any]]:
+ return [
+ {"name": "project_id", "type": "text", "required": True, "description": "Google Cloud Project ID", "default": ""},
+ {"name": "dataset_id", "type": "text", "required": False, "description": "Dataset ID(s) - leave empty for all, or specify one (e.g., 'billing') or multiple separated by commas (e.g., 'billing,enterprise_collected,ga_api')", "default": ""},
+ {"name": "credentials_path", "type": "text", "required": False, "description": "Path to service account JSON file (optional)", "default": ""},
+ {"name": "location", "type": "text", "required": False, "description": "BigQuery location (default: US)", "default": "US"}
+ ]
+
+ @staticmethod
+ def auth_instructions() -> str:
+ return """BigQuery Authentication Instructions
+
+Authentication Options (choose one):
+
+Option 1 - Application Default Credentials (Recommended)
+ - Install Google Cloud SDK: https://cloud.google.com/sdk/docs/install
+ - Run `gcloud auth application-default login` in your terminal
+ - Leave `credentials_path` parameter empty
+ - Requires Google Cloud Project ID
+
+Option 2 - Service Account Key File
+ - Create a service account in Google Cloud Console
+ - Download the JSON key file
+ - Provide the full path to the JSON file in `credentials_path` parameter
+ - Grant the service account BigQuery Data Viewer role (or appropriate permissions)
+
+Option 3 - Environment Variables
+ - Set GOOGLE_APPLICATION_CREDENTIALS environment variable to point to your service account JSON file
+ - Leave `credentials_path` parameter empty
+
+Required Permissions:
+ - BigQuery Data Viewer (for reading data)
+ - BigQuery Job User (for running queries)
+
+Parameters:
+ - project_id: Your Google Cloud Project ID (required)
+ - dataset_id: Specific dataset to browse (optional - leave empty to see all datasets)
+ - location: BigQuery location/region (default: US)
+ - credentials_path: Path to service account JSON file (optional)
+
+Supported Operations:
+ - Browse datasets and tables
+ - Preview table schemas and data
+ - Import data from tables
+ - Execute custom SQL queries
+"""
+
+ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ if not BIGQUERY_AVAILABLE:
+ raise ImportError(
+ "google-cloud-bigquery is required for BigQuery connections. "
+ "Install with: pip install google-cloud-bigquery google-auth"
+ )
+
+ self.params = params
+ self.duck_db_conn = duck_db_conn
+ self.project_id = params.get("project_id")
+ self.dataset_ids = [d.strip() for d in params.get("dataset_id", "").split(",") if d.strip()] # Support multiple datasets
+ self.location = params.get("location", "US")
+
+ # Initialize BigQuery client
+ if params.get("credentials_path"):
+ credentials = service_account.Credentials.from_service_account_file(params["credentials_path"])
+ self.client = bigquery.Client(
+ project=self.project_id,
+ credentials=credentials,
+ location=self.location
+ )
+ else:
+ # Use default credentials (ADC)
+ self.client = bigquery.Client(
+ project=self.project_id,
+ location=self.location
+ )
+
+ def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
+ """List tables from BigQuery datasets"""
+ results = []
+
+ try:
+ log.info(f"Listing BigQuery datasets for project: {self.project_id}")
+
+ # List datasets with timeout
+ datasets = list(self.client.list_datasets(max_results=50))
+ log.info(f"Found {len(datasets)} datasets")
+
+ # Limit to first 10 datasets if no specific dataset is specified
+ if not self.dataset_ids:
+ datasets = datasets[:10]
+
+ for dataset in datasets:
+ dataset_id = dataset.dataset_id
+
+ # Skip if we have specific datasets and this isn't one of them
+ if self.dataset_ids and dataset_id not in self.dataset_ids:
+ continue
+
+ try:
+ log.info(f"Processing dataset: {dataset_id}")
+ # List tables in dataset with limit
+ tables = list(self.client.list_tables(dataset.reference, max_results=20))
+
+ for table in tables:
+ full_table_name = f"{self.project_id}.{dataset_id}.{table.table_id}"
+
+ # Apply filter if provided
+ if table_filter and table_filter.lower() not in table.table_id.lower():
+ continue
+
+ # Get basic table info without full schema for performance
+ try:
+ table_ref = self.client.get_table(table.reference)
+ columns = [{"name": field.name, "type": field.field_type} for field in table_ref.schema[:10]] # Limit columns shown
+
+ results.append({
+ "name": full_table_name,
+ "metadata": {
+ "row_count": table_ref.num_rows or 0,
+ "columns": columns,
+ "sample_rows": [] # Empty for performance, can be populated later
+ }
+ })
+ except Exception as e:
+ log.warning(f"Error getting schema for table {full_table_name}: {e}")
+ # Add table without detailed schema
+ results.append({
+ "name": full_table_name,
+ "metadata": {
+ "row_count": 0,
+ "columns": [],
+ "sample_rows": []
+ }
+ })
+
+ # Limit total results for performance
+ if len(results) >= 100:
+ log.info("Reached 100 table limit, stopping enumeration")
+ return results
+
+ except Exception as e:
+ log.warning(f"Error accessing dataset {dataset_id}: {e}")
+ continue
+
+ except Exception as e:
+ log.error(f"Error listing BigQuery tables: {e}")
+
+ log.info(f"Returning {len(results)} tables")
+ return results
+
+ def _convert_bigquery_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
+ """Convert BigQuery-specific dtypes to standard pandas dtypes"""
+
+ def safe_convert(x):
+ try:
+ if x is None or pd.isna(x):
+ return None
+ if isinstance(x, (dict, list)):
+ return json.dumps(x, default=str)
+ if hasattr(x, "__dict__"):
+ return json.dumps(x.__dict__, default=str)
+ s = str(x)
+ if "[object Object]" in s:
+ return json.dumps(x, default=str)
+ return s
+ except Exception:
+ return str(x) if x is not None else None
+
+ for col in df.columns:
+ # Convert db_dtypes.DateDtype to standard datetime
+ if hasattr(df[col].dtype, "name") and "dbdate" in str(df[col].dtype).lower():
+ df[col] = pd.to_datetime(df[col])
+ # Convert other db_dtypes if needed
+ elif str(df[col].dtype).startswith("db_dtypes"):
+ try:
+ df[col] = df[col].astype(str)
+ except Exception as e:
+ logging.error(f"Failed to convert column '{col}' to string: {e}")
+ # Handle nested objects/JSON columns
+ elif df[col].dtype == "object":
+ df[col] = df[col].apply(safe_convert)
+
+ return df
+
+ def ingest_data(self, table_name: str, name_as: Optional[str] = None, size: int = 1000000):
+ """Ingest data from BigQuery table into DuckDB with stable, de-duplicated column aliases."""
+ if name_as is None:
+ name_as = table_name.split('.')[-1]
+
+ name_as = sanitize_table_name(name_as)
+
+
+ table_ref = self.client.get_table(table_name)
+
+ select_parts: list[str] = []
+ used_aliases: dict[str, str] = {} # alias -> field_path
+
+ def build_alias(field_path: str) -> str:
+ """
+ Build a human-readable, globally unique alias from a BigQuery field path.
+
+ Examples:
+ 'geo.country' -> 'geo_country'
+ 'device.category' -> 'device_category'
+ 'event_params.value' -> 'event_params_value'
+ """
+ # path "a.b.c" -> "a_b_c"
+ alias = field_path.replace('.', '_')
+
+ # remove weird characters
+ alias = re.sub(r'[^0-9a-zA-Z_]', '_', alias)
+ alias = re.sub(r'_+', '_', alias).strip('_') or "col"
+
+ # must start with letter or underscore
+ if not alias[0].isalpha() and alias[0] != '_':
+ alias = f"_{alias}"
+
+ base_alias = alias
+ counter = 1
+ while alias in used_aliases:
+                # same alias produced by another field path – add a numeric suffix
+ alias = f"{base_alias}_{counter}"
+ counter += 1
+
+ used_aliases[alias] = field_path
+ return alias
+
+ def add_field(field_path: str):
+ alias = build_alias(field_path)
+ select_parts.append(f"`{table_name}`.{field_path} AS `{alias}`")
+
+ def process_field(field, parent_path: str = ""):
+ """
+ Recursively process fields, flattening non-repeated RECORDs.
+ """
+ current_path = f"{parent_path}.{field.name}" if parent_path else field.name
+
+ # Flatten STRUCT / RECORD that is not REPEATED
+ if field.field_type == "RECORD" and field.mode != "REPEATED":
+ for subfield in field.fields:
+ process_field(subfield, current_path)
+ else:
+ # Regular field or REPEATED RECORD/array – select as a single column
+ add_field(current_path)
+
+ # Process all top-level fields
+ for field in table_ref.schema:
+ process_field(field)
+
+ if not select_parts:
+ raise ValueError(f"No fields found for table {table_name}")
+
+ query = f"SELECT {', '.join(select_parts)} FROM `{table_name}` LIMIT {size}"
+
+ df = self.client.query(query).to_dataframe()
+
+ # Safety net: drop exact duplicate names if something slipped through
+ if df.columns.duplicated().any():
+ dupes = df.columns[df.columns.duplicated()].tolist()
+ log.warning(f"Duplicate column names detected in DataFrame, dropping later ones: {dupes}")
+ df = df.loc[:, ~df.columns.duplicated()]
+
+
+ # Convert BigQuery-specific dtypes
+ df = self._convert_bigquery_dtypes(df)
+
+ self.ingest_df_to_duckdb(df, name_as)
+
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
+ """Execute query and return sample results as a list of dictionaries"""
+ result, error_message = validate_sql_query(query)
+ if not result:
+ raise ValueError(error_message)
+
+ # Add LIMIT if not present
+ if "LIMIT" not in query.upper():
+ query += " LIMIT 10"
+
+ df = self.client.query(query).to_dataframe()
+ return json.loads(df.to_json(orient="records"))
+
+ def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame:
+ """Execute custom query and ingest results into DuckDB"""
+ name_as = sanitize_table_name(name_as)
+
+ result, error_message = validate_sql_query(query)
+ if not result:
+ raise ValueError(error_message)
+
+ # Execute query and get DataFrame
+ df = self.client.query(query).to_dataframe()
+
+ # Drop duplicate columns
+ df = df.loc[:, ~df.columns.duplicated()]
+
+ # Convert BigQuery-specific dtypes
+ df = self._convert_bigquery_dtypes(df)
+
+ # Use base class method to ingest DataFrame
+ self.ingest_df_to_duckdb(df, name_as)
+
+ return df
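
A usage sketch for the new BigQuery loader, assuming application default credentials are already configured (Option 1 in the auth instructions above) and that the referenced project, dataset, and table exist; all names are placeholders, and the final query assumes the base class's `ingest_df_to_duckdb` registers the frame under `name_as`, as the other loaders in this diff rely on.

```python
import duckdb
from data_formulator.data_loader.bigquery_data_loader import BigQueryDataLoader

duck_conn = duckdb.connect()

loader = BigQueryDataLoader(
    {
        "project_id": "my-gcp-project",   # placeholder project
        "dataset_id": "billing",          # optional; empty string browses all datasets
        "credentials_path": "",           # empty -> application default credentials
        "location": "US",
    },
    duck_conn,
)

# Copies up to the default 1,000,000 rows into DuckDB; non-repeated RECORD
# fields are flattened into columns such as geo_country or device_category.
loader.ingest_data("my-gcp-project.billing.daily_costs", name_as="daily_costs")

print(duck_conn.execute("SELECT COUNT(*) FROM daily_costs").fetchone())
```
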
diff --git a/py-src/data_formulator/data_loader/external_data_loader.py b/py-src/data_formulator/data_loader/external_data_loader.py
index 204b1802..6bc44254 100644
--- a/py-src/data_formulator/data_loader/external_data_loader.py
+++ b/py-src/data_formulator/data_loader/external_data_loader.py
@@ -108,7 +108,7 @@ def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000)
pass
@abstractmethod
- def view_query_sample(self, query: str) -> str:
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
pass
@abstractmethod
diff --git a/py-src/data_formulator/data_loader/kusto_data_loader.py b/py-src/data_formulator/data_loader/kusto_data_loader.py
index b2166375..88016215 100644
--- a/py-src/data_formulator/data_loader/kusto_data_loader.py
+++ b/py-src/data_formulator/data_loader/kusto_data_loader.py
@@ -8,11 +8,15 @@
import string
from datetime import datetime
-from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
-from azure.kusto.data.helpers import dataframe_from_result_table
-
from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name
+try:
+ from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
+ from azure.kusto.data.helpers import dataframe_from_result_table
+ KUSTO_AVAILABLE = True
+except ImportError:
+ KUSTO_AVAILABLE = False
+
# Get logger for this module (logging config done in app.py)
logger = logging.getLogger(__name__)
@@ -57,6 +61,11 @@ def auth_instructions() -> str:
"""
def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ if not KUSTO_AVAILABLE:
+ raise ImportError(
+ "azure-kusto-data is required for Kusto/Azure Data Explorer connections. "
+ "Install with: pip install azure-kusto-data"
+ )
self.kusto_cluster = params.get("kusto_cluster", None)
self.kusto_database = params.get("kusto_database", None)
@@ -236,7 +245,7 @@ def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000)
total_rows_ingested += len(chunk_df)
- def view_query_sample(self, query: str) -> str:
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
df = self.query(query).head(10)
return json.loads(df.to_json(orient="records", date_format='iso'))
diff --git a/py-src/data_formulator/data_loader/mongodb_data_loader.py b/py-src/data_formulator/data_loader/mongodb_data_loader.py
new file mode 100644
index 00000000..32ad8cdb
--- /dev/null
+++ b/py-src/data_formulator/data_loader/mongodb_data_loader.py
@@ -0,0 +1,418 @@
+import json
+import string
+import random as rand
+
+import pandas as pd
+import duckdb
+import pymongo
+from bson import ObjectId
+from datetime import datetime
+
+from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name
+
+from data_formulator.security import validate_sql_query
+from typing import Dict, Any, Optional, List
+
+
+class MongoDBDataLoader(ExternalDataLoader):
+
+ @staticmethod
+    def list_params() -> List[Dict[str, Any]]:
+ params_list = [
+ {"name": "host", "type": "string", "required": True, "default": "localhost", "description": ""},
+ {"name": "port", "type": "int", "required": False, "default": 27017, "description": "MongoDB server port (default 27017)"},
+ {"name": "username", "type": "string", "required": False, "default": "", "description": ""},
+ {"name": "password", "type": "string", "required": False, "default": "", "description": ""},
+ {"name": "database", "type": "string", "required": True, "default": "", "description": ""},
+ {"name": "collection", "type": "string", "required": False, "default": "", "description": "If specified, only this collection will be accessed"},
+ {"name": "authSource", "type": "string", "required": False, "default": "", "description": "Authentication database (defaults to target database if empty)"}
+ ]
+ return params_list
+
+ @staticmethod
+ def auth_instructions() -> str:
+ return """
+MongoDB Connection Instructions:
+
+1. Local MongoDB Setup:
+ - Ensure MongoDB server is running on your machine
+ - Default connection: host='localhost', port=27017
+ - If authentication is not enabled, leave username and password empty
+
+2. Remote MongoDB Connection:
+ - Obtain host address, port, username, and password from your database administrator
+ - Ensure the MongoDB server allows remote connections
+
+3. Common Connection Parameters:
+ - host: MongoDB server address (default: 'localhost')
+ - port: MongoDB server port (default: 27017)
+ - username: Your MongoDB username (leave empty if no auth)
+ - password: Your MongoDB password (leave empty if no auth)
+ - database: Target database name to connect to
+ - collection: (Optional) Specific collection to access, leave empty to list all collections
+
+4. Troubleshooting:
+ - Verify MongoDB service is running: `mongod --version`
+ - Test connection: `mongosh --host [host] --port [port]`
+"""
+
+ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ self.params = params
+ self.duck_db_conn = duck_db_conn
+
+ try:
+ # Create MongoDB client
+ host = self.params.get("host", "localhost")
+ port = int(self.params.get("port", 27017))
+ username = self.params.get("username", "")
+ password = self.params.get("password", "")
+ database = self.params.get("database", "")
+ collection = self.params.get("collection", "")
+ auth_source = self.params.get("authSource", "") or database # Default to target database
+
+ if username and password:
+ # Use authSource to specify which database contains user credentials
+ self.mongo_client = pymongo.MongoClient(
+ host=host,
+ port=port,
+ username=username,
+ password=password,
+ authSource=auth_source
+ )
+ else:
+ self.mongo_client = pymongo.MongoClient(host=host, port=port)
+
+ self.db = self.mongo_client[database]
+ self.database_name = database
+
+ self.collection = self.db[collection] if collection else None
+
+ except Exception as e:
+ raise Exception(f"Failed to connect to MongoDB: {e}")
+
+ def close(self):
+ """Close the MongoDB connection"""
+ if hasattr(self, 'mongo_client') and self.mongo_client is not None:
+ try:
+ self.mongo_client.close()
+ self.mongo_client = None
+ except Exception as e:
+ print(f"Warning: Failed to close MongoDB connection: {e}")
+
+ def __enter__(self):
+ """Context manager entry"""
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Context manager exit - ensures connection is closed"""
+ self.close()
+ return False
+
+ def __del__(self):
+ """Destructor to ensure connection is closed"""
+ self.close()
+
+ @staticmethod
+ def _flatten_document(doc: Dict[str, Any], parent_key: str = '', sep: str = '_') -> Dict[str, Any]:
+ """
+ Use recursion to flatten nested MongoDB documents
+ """
+ items = []
+ for key, value in doc.items():
+ new_key = f"{parent_key}{sep}{key}" if parent_key else key
+
+ if isinstance(value, dict):
+ items.extend(MongoDBDataLoader._flatten_document(value, new_key, sep).items())
+ elif isinstance(value, list):
+ if len(value) == 0:
+ items.append((new_key, None))
+ else:
+ for idx, item in enumerate(value, start=1):
+ item_key = f"{new_key}{sep}{idx}"
+ if isinstance(item, dict):
+ items.extend(MongoDBDataLoader._flatten_document(item, item_key, sep).items())
+ else:
+ items.append((item_key, item))
+ else:
+ items.append((new_key, value))
+
+ return dict(items)
+
+ @staticmethod
+ def _convert_special_types(doc: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Convert MongoDB special types (ObjectId, datetime, etc.) to serializable types
+ """
+ result = {}
+ for key, value in doc.items():
+ if isinstance(value, ObjectId):
+ result[key] = str(value)
+ elif isinstance(value, datetime):
+ result[key] = value.isoformat()
+ elif isinstance(value, bytes):
+ result[key] = value.decode('utf-8', errors='ignore')
+ elif isinstance(value, dict):
+ result[key] = MongoDBDataLoader._convert_special_types(value)
+ elif isinstance(value, list):
+ result[key] = [
+ MongoDBDataLoader._convert_special_types(item) if isinstance(item, dict)
+ else str(item) if isinstance(item, ObjectId)
+ else item.isoformat() if isinstance(item, datetime)
+ else item
+ for item in value
+ ]
+ else:
+ result[key] = value
+ return result
+
+ def _process_documents(self, documents: List[Dict[str, Any]]) -> pd.DataFrame:
+ """
+ Process MongoDB documents list, flatten and convert to DataFrame
+ """
+ if not documents:
+ return pd.DataFrame()
+
+ processed_docs = []
+ for doc in documents:
+ converted = self._convert_special_types(doc)
+ flattened = self._flatten_document(converted)
+ processed_docs.append(flattened)
+
+ df = pd.DataFrame(processed_docs)
+ return df
+
+ def list_tables(self, table_filter: str = None):
+ """
+ List all collections
+ """
+ results = []
+
+ # Get specified collection or all collections
+ collection_param = self.params.get("collection", "")
+
+ if collection_param:
+ collection_names = [collection_param]
+ else:
+ collection_names = self.db.list_collection_names()
+
+ for collection_name in collection_names:
+ # Apply filter
+ if table_filter and table_filter.lower() not in collection_name.lower():
+ continue
+
+ try:
+ full_table_name = f"{collection_name}"
+ collection = self.db[collection_name]
+
+ # Get row count
+ row_count = collection.count_documents({})
+
+ # Get sample data
+ sample_data = list(collection.find().limit(10))
+
+ if sample_data:
+ df = self._process_documents(sample_data)
+
+ # Construct column information
+ columns = [{
+ 'name': col,
+ 'type': str(df[col].dtype)
+ } for col in df.columns]
+
+ # Convert sample_data for return
+ sample_rows = json.loads(df.to_json(orient="records"))
+ else:
+ columns = []
+ sample_rows = []
+
+ table_metadata = {
+ "row_count": row_count,
+ "columns": columns,
+ "sample_rows": sample_rows
+ }
+
+ results.append({
+ "name": full_table_name,
+ "metadata": table_metadata
+ })
+ except Exception as e:
+ continue
+
+ return results
+
+ def ingest_data(self, table_name: str, name_as: Optional[str] = None, size: int = 100000):
+ """
+ Import MongoDB collection data into DuckDB
+ """
+ # Extract collection name from full table name
+ parts = table_name.split('.')
+ if len(parts) >= 3:
+ collection_name = parts[-1]
+ else:
+ collection_name = table_name
+
+ if name_as is None:
+ name_as = collection_name
+
+ # Get and process data from MongoDB (limit rows)
+ collection = self.db[collection_name]
+ data_cursor = collection.find().limit(size)
+ data_list = list(data_cursor)
+ if not data_list:
+ raise Exception(f"No data found in MongoDB collection '{collection_name}'.")
+ df = self._process_documents(data_list)
+
+ name_as = sanitize_table_name(name_as)
+
+ self._load_dataframe_to_duckdb(df, name_as, size)
+ return
+
+
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
+
+ self._existed_collections_in_duckdb()
+ self._difference_collections()
+ self._preload_all_collections(self.collection.name if self.collection else "")
+
+ result, error_message = validate_sql_query(query)
+ if not result:
+ print(error_message)
+ raise ValueError(error_message)
+
+ result_query = json.loads(self.duck_db_conn.execute(query).df().head(10).to_json(orient="records"))
+
+ self._drop_all_loaded_tables()
+
+ for collection_name, df in self.existed_collections.items():
+ self._load_dataframe_to_duckdb(df, collection_name)
+
+ return result_query
+
+ def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame:
+ """
+ Create a new table from query results
+ """
+ result, error_message = validate_sql_query(query)
+ if not result:
+ raise ValueError(error_message)
+
+ name_as = sanitize_table_name(name_as)
+
+ self._existed_collections_in_duckdb()
+ self._difference_collections()
+ self._preload_all_collections(self.collection.name if self.collection else "")
+
+ query_result_df = self.duck_db_conn.execute(query).df()
+
+ self._drop_all_loaded_tables()
+
+ for collection_name, existing_df in self.existed_collections.items():
+ self._load_dataframe_to_duckdb(existing_df, collection_name)
+
+ self._load_dataframe_to_duckdb(query_result_df, name_as)
+
+ return query_result_df
+
+ @staticmethod
+ def _quote_identifier(name: str) -> str:
+ """
+ Safely quote a SQL identifier to prevent SQL injection.
+ Double quotes are escaped by doubling them.
+ """
+ # Escape any double quotes in the identifier by doubling them
+ escaped = name.replace('"', '""')
+ return f'"{escaped}"'
+
+ def _existed_collections_in_duckdb(self):
+ """
+ Return the names and contents of tables already loaded into DuckDB
+ """
+ self.existed_collections = {}
+ duckdb_tables = self.duck_db_conn.execute("SHOW TABLES").df()
+ for _, row in duckdb_tables.iterrows():
+ collection_name = row['name']
+ quoted_name = self._quote_identifier(collection_name)
+ df = self.duck_db_conn.execute(f"SELECT * FROM {quoted_name}").df()
+ self.existed_collections[collection_name] = df
+
+
+ def _difference_collections(self):
+ """
+ Return the difference between all collections and loaded collections
+ """
+ self.diff_collections = []
+ all_collections = set(self.db.list_collection_names())
+ loaded_collections = set(self.existed_collections)
+ diff_collections = all_collections - loaded_collections
+ self.diff_collections = list(diff_collections)
+ print(f'Difference collections: {self.diff_collections}')
+
+ def _drop_all_loaded_tables(self):
+ """
+ Drop all tables loaded into DuckDB
+ """
+ for table_name in self.loaded_tables.values():
+ try:
+ quoted_name = self._quote_identifier(table_name)
+ self.duck_db_conn.execute(f"DROP TABLE IF EXISTS main.{quoted_name}")
+ print(f"Dropped loaded table: {table_name}")
+ except Exception as e:
+ print(f"Warning: Failed to drop table '{table_name}': {e}")
+
+ def _preload_all_collections(self, specified_collection: str = "", size: int = 100000):
+ """
+ Preload all MongoDB collections into DuckDB memory
+ """
+ # Get the list of collections to load
+ if specified_collection:
+ collection_names = [specified_collection]
+ else:
+ collection_names = self.db.list_collection_names()
+
+ # Record loaded tables
+ self.loaded_tables = {}
+
+ for collection_name in collection_names:
+ try:
+ collection = self.db[collection_name]
+
+ # Get data
+ data_cursor = collection.find().limit(size)
+ data_list = list(data_cursor)
+
+ if not data_list:
+ print(f"Skipping empty collection: {collection_name}")
+ continue
+
+ df = self._process_documents(data_list)
+
+ # Generate table name
+ table_name = sanitize_table_name(collection_name)
+
+ # Load into DuckDB
+ self._load_dataframe_to_duckdb(df, table_name)
+
+ # Record mapping
+ self.loaded_tables[collection_name] = table_name
+ print(f"Preloaded collection '{collection_name}' as table '{table_name}' ({len(data_list)} rows)")
+
+ except Exception as e:
+ print(f"Warning: Failed to preload collection '{collection_name}': {e}")
+
+ def _load_dataframe_to_duckdb(self, df: pd.DataFrame, table_name: str, size: int = 1000000):
+ """
+ Load DataFrame into DuckDB
+ """
+ # Create table using a temporary view
+ random_suffix = ''.join(rand.choices(string.ascii_letters + string.digits, k=6))
+ temp_view_name = f'df_temp_{random_suffix}'
+
+ self.duck_db_conn.register(temp_view_name, df)
+ # Use CREATE OR REPLACE to directly replace existing table
+ # Quote identifiers to prevent SQL injection
+ quoted_table_name = self._quote_identifier(table_name)
+ quoted_temp_view = self._quote_identifier(temp_view_name)
+ # Ensure size is an integer to prevent injection via size parameter
+ safe_size = int(size)
+ self.duck_db_conn.execute(f"CREATE OR REPLACE TABLE main.{quoted_table_name} AS SELECT * FROM {quoted_temp_view} LIMIT {safe_size}")
+ self.duck_db_conn.execute(f"DROP VIEW {quoted_temp_view}")
\ No newline at end of file
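
To make the flattening behaviour of the new MongoDB loader concrete, a small worked example of what `_convert_special_types` followed by `_flatten_document` produces for a nested document; the document contents are invented.

```python
from datetime import datetime
from bson import ObjectId

from data_formulator.data_loader.mongodb_data_loader import MongoDBDataLoader

doc = {
    "_id": ObjectId("64b64c3f2f8fb814c8ef0001"),
    "user": {"name": "Ada", "address": {"city": "Seattle"}},
    "tags": ["alpha", "beta"],
    "created_at": datetime(2024, 1, 1, 12, 0, 0),
}

# ObjectId and datetime become strings, then nested dicts/lists are flattened
# into underscore-joined column names (list items are numbered from 1).
converted = MongoDBDataLoader._convert_special_types(doc)
flat = MongoDBDataLoader._flatten_document(converted)
print(flat)
# {'_id': '64b64c3f2f8fb814c8ef0001',
#  'user_name': 'Ada',
#  'user_address_city': 'Seattle',
#  'tags_1': 'alpha',
#  'tags_2': 'beta',
#  'created_at': '2024-01-01T12:00:00'}
```
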
diff --git a/py-src/data_formulator/data_loader/mssql_data_loader.py b/py-src/data_formulator/data_loader/mssql_data_loader.py
index 130f78c8..eeb8c4f7 100644
--- a/py-src/data_formulator/data_loader/mssql_data_loader.py
+++ b/py-src/data_formulator/data_loader/mssql_data_loader.py
@@ -1,6 +1,6 @@
import json
import logging
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
import duckdb
import pandas as pd
@@ -148,17 +148,12 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
log.info("Initializing MSSQL DataLoader with parameters: %s", params)
if not PYODBC_AVAILABLE:
- error_msg = """
-pyodbc is required for MSSQL connections but is not properly installed.
-
-Installation steps for macOS:
-1. Install unixodbc: brew install unixodbc
-2. Install pyodbc: pip install pyodbc
-3. Install Microsoft ODBC Driver for SQL Server
-
-For other platforms, see: https://github.com/mkleehammer/pyodbc/wiki
-"""
- raise ImportError(error_msg.strip())
+ raise ImportError(
+ "pyodbc is required for MSSQL connections. "
+ "Install with: pip install pyodbc\n"
+ "Note for macOS: You may also need to run 'brew install unixodbc' first.\n"
+ "For other platforms, see: https://github.com/mkleehammer/pyodbc/wiki"
+ )
self.params = params
self.duck_db_conn = duck_db_conn
@@ -418,7 +413,7 @@ def ingest_data(self, table_name: str, name_as: Optional[str] = None, size: int
log.error(f"Failed to ingest data from {table_name}: {e}")
raise
- def view_query_sample(self, query: str) -> str:
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
"""Execute a custom query and return sample results"""
try:
# Add TOP 10 if not already present for SELECT queries
diff --git a/py-src/data_formulator/data_loader/mysql_data_loader.py b/py-src/data_formulator/data_loader/mysql_data_loader.py
index 184eb0f5..9e29f3b7 100644
--- a/py-src/data_formulator/data_loader/mysql_data_loader.py
+++ b/py-src/data_formulator/data_loader/mysql_data_loader.py
@@ -1,4 +1,5 @@
import json
+import logging
import pandas as pd
import duckdb
@@ -6,13 +7,21 @@
from data_formulator.data_loader.external_data_loader import ExternalDataLoader, sanitize_table_name
from data_formulator.security import validate_sql_query
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
+
+try:
+ import pymysql
+ PYMYSQL_AVAILABLE = True
+except ImportError:
+ PYMYSQL_AVAILABLE = False
+
+logger = logging.getLogger(__name__)
class MySQLDataLoader(ExternalDataLoader):
@staticmethod
- def list_params() -> bool:
+ def list_params() -> List[Dict[str, Any]]:
params_list = [
{"name": "user", "type": "string", "required": True, "default": "root", "description": ""},
{"name": "password", "type": "string", "required": False, "default": "", "description": "leave blank for no password"},
@@ -50,96 +59,245 @@ def auth_instructions() -> str:
"""
def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ if not PYMYSQL_AVAILABLE:
+ raise ImportError(
+ "pymysql is required for MySQL connections. "
+ "Install with: pip install pymysql"
+ )
+
self.params = params
self.duck_db_conn = duck_db_conn
- # Install and load the MySQL extension
- self.duck_db_conn.install_extension("mysql")
- self.duck_db_conn.load_extension("mysql")
+ # Get params as-is from frontend
+ host = self.params.get('host', '')
+ user = self.params.get('user', '')
+ password = self.params.get('password', '')
+ database = self.params.get('database', '')
+
+ # Validate required params
+ if not host:
+ raise ValueError("MySQL host is required")
+ if not user:
+ raise ValueError("MySQL user is required")
+ if not database:
+ raise ValueError("MySQL database is required")
+
+ # Handle port (only field with sensible default)
+ port = self.params.get('port', '')
+ if isinstance(port, str):
+ port = int(port) if port else 3306
+ elif not port:
+ port = 3306
- attach_string = ""
- for key, value in self.params.items():
- if value is not None and value != "":
- attach_string += f"{key}={value} "
-
- # Detach existing mysqldb connection if it exists
try:
- self.duck_db_conn.execute("DETACH mysqldb;")
- except:
- pass # Ignore if mysqldb doesn't exist
+ self.mysql_conn = pymysql.connect(
+ host=host,
+ user=user,
+ password=password,
+ database=database,
+ port=port,
+ cursorclass=pymysql.cursors.DictCursor,
+ charset='utf8mb4'
+ )
+ self.database = database
+ logger.info(f"Successfully connected to MySQL database: {self.database}")
+ except Exception as e:
+ logger.error(f"Failed to connect to MySQL: {e}")
+ raise
+
+ def _execute_query(self, query: str, params: tuple = None) -> pd.DataFrame:
+ """Execute a query using native MySQL connection and return a DataFrame.
- # Register MySQL connection
- self.duck_db_conn.execute(f"ATTACH '{attach_string}' AS mysqldb (TYPE mysql);")
+ Args:
+ query: SQL query string. Use %s for parameterized queries.
+ params: Optional tuple of parameters for parameterized queries.
+ """
+ try:
+ with self.mysql_conn.cursor() as cursor:
+ cursor.execute(query, params)
+ rows = cursor.fetchall()
+ if rows:
+ return pd.DataFrame(rows)
+ else:
+ # Return empty DataFrame with column names
+ return pd.DataFrame()
+ except Exception as e:
+ logger.error(f"Error executing MySQL query: {e}")
+ # Try to reconnect if connection was lost
+ self._reconnect_if_needed()
+ raise
- def list_tables(self, table_filter: str = None):
- tables_df = self.duck_db_conn.execute(f"""
- SELECT TABLE_SCHEMA, TABLE_NAME FROM mysqldb.information_schema.tables
- WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')
- """).fetch_df()
+ def _reconnect_if_needed(self):
+ """Attempt to reconnect to MySQL if the connection was lost."""
+ try:
+ self.mysql_conn.ping(reconnect=True)
+ except Exception as e:
+ logger.warning(f"Reconnection attempt failed: {e}")
+ # Try to create a new connection using stored params
+ host = self.params.get('host', '')
+ user = self.params.get('user', '')
+ password = self.params.get('password', '')
+
+ port = self.params.get('port', '')
+ if isinstance(port, str):
+ port = int(port) if port else 3306
+ elif not port:
+ port = 3306
+
+ self.mysql_conn = pymysql.connect(
+ host=host,
+ user=user,
+ password=password,
+ database=self.database,
+ port=port,
+ cursorclass=pymysql.cursors.DictCursor,
+ charset='utf8mb4'
+ )
- results = []
+ def list_tables(self, table_filter: str = None) -> List[Dict[str, Any]]:
+ # Get list of tables from the connected database
+ # Filter by the specific database we're connected to for better performance
+ tables_query = """
+ SELECT TABLE_SCHEMA, TABLE_NAME
+ FROM information_schema.tables
+ WHERE TABLE_SCHEMA = %s
+ AND TABLE_TYPE = 'BASE TABLE'
+ """
+ tables_df = self._execute_query(tables_query, (self.database,))
- for schema, table_name in tables_df.values:
+ if tables_df.empty:
+ return []
- full_table_name = f"mysqldb.{schema}.{table_name}"
+ results = []
+
+ for _, row in tables_df.iterrows():
+ schema = row['TABLE_SCHEMA']
+ table_name = row['TABLE_NAME']
# Apply table filter if provided
if table_filter and table_filter.lower() not in table_name.lower():
continue
- # Get column information using DuckDB's information schema
- columns_df = self.duck_db_conn.execute(f"DESCRIBE {full_table_name}").df()
- columns = [{
- 'name': row['column_name'],
- 'type': row['column_type']
- } for _, row in columns_df.iterrows()]
-
- # Get sample data
- sample_df = self.duck_db_conn.execute(f"SELECT * FROM {full_table_name} LIMIT 10").df()
- sample_rows = json.loads(sample_df.to_json(orient="records"))
-
- # get row count
- row_count = self.duck_db_conn.execute(f"SELECT COUNT(*) FROM {full_table_name}").fetchone()[0]
-
- table_metadata = {
- "row_count": row_count,
- "columns": columns,
- "sample_rows": sample_rows
- }
-
- results.append({
- "name": full_table_name,
- "metadata": table_metadata
- })
+ full_table_name = f"{schema}.{table_name}"
+
+ try:
+ # Get column information from MySQL
+ columns_query = (
+ "SELECT COLUMN_NAME, DATA_TYPE "
+ "FROM information_schema.columns "
+ "WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s "
+ "ORDER BY ORDINAL_POSITION"
+ )
+ columns_df = self._execute_query(columns_query, (schema, table_name))
+ columns = [{
+ 'name': col_row['COLUMN_NAME'],
+ 'type': col_row['DATA_TYPE']
+ } for _, col_row in columns_df.iterrows()]
+
+ # Get sample data
+ sample_query = "SELECT * FROM `{}`.`{}` LIMIT 10".format(schema, table_name)
+ sample_df = self._execute_query(sample_query)
+ sample_rows = json.loads(sample_df.to_json(orient="records", date_format='iso'))
+
+ # Get row count
+ count_query = "SELECT COUNT(*) as cnt FROM `{}`.`{}`".format(schema, table_name)
+ count_df = self._execute_query(count_query)
+ row_count = int(count_df['cnt'].iloc[0]) if not count_df.empty else 0
+
+ table_metadata = {
+ "row_count": row_count,
+ "columns": columns,
+ "sample_rows": sample_rows
+ }
+
+ results.append({
+ "name": full_table_name,
+ "metadata": table_metadata
+ })
+ except Exception as e:
+ logger.warning(f"Error processing table {full_table_name}: {e}")
+ continue
return results
def ingest_data(self, table_name: str, name_as: Optional[str] = None, size: int = 1000000):
- # Create table in the main DuckDB database from MySQL data
+ """Fetch data from MySQL and ingest into DuckDB."""
if name_as is None:
name_as = table_name.split('.')[-1]
name_as = sanitize_table_name(name_as)
- self.duck_db_conn.execute(f"""
- CREATE OR REPLACE TABLE main.{name_as} AS
- SELECT * FROM {table_name}
- LIMIT {size}
- """)
+    # Validate that size is a positive integer, then sanitize table name components
+ sanitized_size = None
+ try:
+ sanitized_size = int(size)
+ if sanitized_size <= 0:
+ raise ValueError("Size must be a positive integer.")
+ except Exception:
+ raise ValueError("Size parameter must be a positive integer.")
+
+ if '.' in table_name:
+ parts = table_name.split('.')
+ schema = sanitize_table_name(parts[0])
+ tbl = sanitize_table_name(parts[1])
+ query = f"SELECT * FROM `{schema}`.`{tbl}` LIMIT {sanitized_size}"
+ else:
+ sanitized_table_name = sanitize_table_name(table_name)
+ query = f"SELECT * FROM `{sanitized_table_name}` LIMIT {sanitized_size}"
- def view_query_sample(self, query: str) -> str:
+ # Fetch data from MySQL
+ df = self._execute_query(query)
+
+ if df.empty:
+ logger.warning(f"No data fetched from table {table_name}")
+ return
+
+ # Ingest into DuckDB using the base class method
+ self.ingest_df_to_duckdb(df, name_as)
+ logger.info(f"Successfully ingested {len(df)} rows from {table_name} into DuckDB table {name_as}")
+
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
result, error_message = validate_sql_query(query)
if not result:
raise ValueError(error_message)
- return json.loads(self.duck_db_conn.execute(query).df().head(10).to_json(orient="records"))
+ # Execute query via native MySQL connection
+ df = self._execute_query(query)
+ return json.loads(df.head(10).to_json(orient="records", date_format='iso'))
def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame:
- # Execute the query and get results as a DataFrame
+ """Execute custom query and ingest results into DuckDB."""
result, error_message = validate_sql_query(query)
if not result:
raise ValueError(error_message)
- df = self.duck_db_conn.execute(query).df()
- # Use the base class's method to ingest the DataFrame
- self.ingest_df_to_duckdb(df, sanitize_table_name(name_as))
\ No newline at end of file
+ # Execute query via native MySQL connection
+ df = self._execute_query(query)
+
+ # Ingest into DuckDB using the base class method
+ self.ingest_df_to_duckdb(df, sanitize_table_name(name_as))
+ return df
+
+ def close(self):
+ """Explicitly close the MySQL connection."""
+ if hasattr(self, 'mysql_conn') and self.mysql_conn:
+ try:
+ self.mysql_conn.close()
+ except Exception as e:
+ logger.warning(f"Error closing MySQL connection: {e}")
+
+ def __enter__(self):
+ """Support context manager entry."""
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """Support context manager exit and cleanup."""
+ self.close()
+
+ def __del__(self):
+ """Clean up MySQL connection when the loader is destroyed."""
+ try:
+ self.close()
+ except Exception:
+ # Ignore errors during destruction to prevent exceptions in garbage collection
+ pass
\ No newline at end of file
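
Because the rewritten MySQL loader now owns a native pymysql connection and implements `__enter__`/`__exit__`, it can be scoped with a `with` block so `close()` runs even if ingestion fails. A sketch with placeholder credentials and table names:

```python
import duckdb
from data_formulator.data_loader.mysql_data_loader import MySQLDataLoader

params = {  # placeholder connection details
    "host": "localhost",
    "port": 3306,
    "user": "root",
    "password": "",
    "database": "sales_db",
}

duck_conn = duckdb.connect()

# The context manager closes the pymysql connection on exit.
with MySQLDataLoader(params, duck_conn) as loader:
    for table in loader.list_tables(table_filter="orders"):
        print(table["name"], table["metadata"]["row_count"])
    loader.ingest_data("sales_db.orders", name_as="orders")
```
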
diff --git a/py-src/data_formulator/data_loader/postgresql_data_loader.py b/py-src/data_formulator/data_loader/postgresql_data_loader.py
index e35671ee..b0453763 100644
--- a/py-src/data_formulator/data_loader/postgresql_data_loader.py
+++ b/py-src/data_formulator/data_loader/postgresql_data_loader.py
@@ -29,15 +29,33 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
self.params = params
self.duck_db_conn = duck_db_conn
+ # Get params as-is from frontend
+ host = self.params.get('host', '')
+ port = self.params.get('port', '') or '5432' # Only port has a sensible default
+ user = self.params.get('user', '')
+ database = self.params.get('database', '')
+ password = self.params.get('password', '')
+
+ # Validate required params
+ if not host:
+ raise ValueError("PostgreSQL host is required")
+ if not user:
+ raise ValueError("PostgreSQL user is required")
+ if not database:
+ raise ValueError("PostgreSQL database is required")
+
+ # Create a sanitized version for logging (excludes password)
+ sanitized_attach_string = f"host={host} port={port} user={user} dbname={database}"
+
try:
# Install and load the Postgres extension
self.duck_db_conn.install_extension("postgres")
self.duck_db_conn.load_extension("postgres")
# Prepare the connection string for Postgres
- port = self.params.get('port', '5432')
- password_part = f" password={self.params.get('password', '')}" if self.params.get('password') else ""
- attach_string = f"host={self.params['host']} port={port} user={self.params['user']}{password_part} dbname={self.params['database']}"
+ # Note: attach_string contains sensitive credentials - do not log it
+ password_part = f" password={password}" if password else ""
+ attach_string = f"host={host} port={port} user={user}{password_part} dbname={database}"
# Detach existing postgres connection if it exists
try:
@@ -47,11 +65,13 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
# Register Postgres connection
self.duck_db_conn.execute(f"ATTACH '{attach_string}' AS mypostgresdb (TYPE postgres);")
- print(f"Successfully connected to PostgreSQL database: {self.params['database']}")
+ print(f"Successfully connected to PostgreSQL database: {database}")
except Exception as e:
- print(f"Failed to connect to PostgreSQL: {e}")
- raise
+ # Log error with sanitized connection string to avoid exposing password
+ error_type = type(e).__name__
+ print(f"Failed to connect to PostgreSQL ({sanitized_attach_string}): {error_type}")
+ raise ValueError(f"Failed to connect to PostgreSQL database '{database}' on host '{host}': {error_type}")
def list_tables(self):
try:
@@ -123,7 +143,7 @@ def ingest_data(self, table_name: str, name_as: Optional[str] = None, size: int
LIMIT {size}
""")
- def view_query_sample(self, query: str) -> str:
+ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
result, error_message = validate_sql_query(query)
if not result:
raise ValueError(error_message)
diff --git a/py-src/data_formulator/data_loader/s3_data_loader.py b/py-src/data_formulator/data_loader/s3_data_loader.py
index d00eb71a..074172ad 100644
--- a/py-src/data_formulator/data_loader/s3_data_loader.py
+++ b/py-src/data_formulator/data_loader/s3_data_loader.py
@@ -7,6 +7,12 @@
from typing import Dict, Any, List
from data_formulator.security import validate_sql_query
+try:
+ import boto3
+ BOTO3_AVAILABLE = True
+except ImportError:
+ BOTO3_AVAILABLE = False
+
class S3DataLoader(ExternalDataLoader):
@staticmethod
@@ -58,6 +64,12 @@ def auth_instructions() -> str:
"""
def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
+ if not BOTO3_AVAILABLE:
+ raise ImportError(
+ "boto3 is required for S3 connections. "
+ "Install with: pip install boto3"
+ )
+
self.params = params
self.duck_db_conn = duck_db_conn
@@ -202,7 +214,7 @@ def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
if not result:
raise ValueError(error_message)
- return self.duck_db_conn.execute(query).df().head(10).to_dict(orient="records")
+ return json.loads(self.duck_db_conn.execute(query).df().head(10).to_json(orient="records"))
def ingest_data_from_query(self, query: str, name_as: str):
# Execute the query and get results as a DataFrame
diff --git a/py-src/data_formulator/workflows/exploration_flow.py b/py-src/data_formulator/workflows/exploration_flow.py
index de3554b1..dc241a8b 100644
--- a/py-src/data_formulator/workflows/exploration_flow.py
+++ b/py-src/data_formulator/workflows/exploration_flow.py
@@ -8,7 +8,6 @@
from typing import Dict, List, Any, Optional, Tuple, Generator
from data_formulator.agents.agent_exploration import ExplorationAgent
-from data_formulator.agents.agent_interactive_explore import InteractiveExploreAgent
from data_formulator.agents.agent_py_data_rec import PythonDataRecAgent
from data_formulator.agents.agent_sql_data_rec import SQLDataRecAgent
from data_formulator.agents.client_utils import Client
diff --git a/pyproject.toml b/pyproject.toml
index 1454f73d..71e11f8b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "data_formulator"
-version = "0.5.0.2"
+version = "0.5.1"
requires-python = ">=3.9"
authors = [
@@ -26,21 +26,27 @@ dependencies = [
"flask",
"flask-cors",
"openai",
- "azure-identity",
- "azure-kusto-data",
- "azure-keyvault-secrets",
- "azure-storage-blob",
"python-dotenv",
"vega_datasets",
"litellm",
"duckdb",
- "pyodbc",
"numpy",
"vl-convert-python",
- "boto3",
"backoff",
"beautifulsoup4",
- "scikit-learn"
+ "scikit-learn",
+
+ "azure-identity",
+ "azure-kusto-data",
+ "azure-keyvault-secrets",
+ "azure-storage-blob",
+ "google-cloud-bigquery",
+ "google-auth",
+ "db-dtypes",
+ "boto3",
+ "pymysql",
+ "pyodbc",
+ "pymongo"
]
[project.urls]
diff --git a/requirements.txt b/requirements.txt
index 17adffe4..45ad17e8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,21 +1,31 @@
+# Core dependencies (always required)
jupyter
pandas
numpy
flask
flask-cors
openai
-azure-identity
-azure-kusto-data
-azure-keyvault-secrets
-azure-storage-blob
python-dotenv
vega_datasets
litellm
duckdb
-boto3
-pyodbc
vl-convert-python
backoff
beautifulsoup4
scikit-learn
--e . #also need to install data formulator itself
\ No newline at end of file
+
+# External data loaders (Azure, BigQuery, AWS S3, MySQL, MSSQL, MongoDB)
+azure-identity
+azure-kusto-data
+azure-keyvault-secrets
+azure-storage-blob
+google-cloud-bigquery
+google-auth
+db-dtypes
+boto3
+pymysql
+pyodbc
+pymongo
+
+# Install data_formulator itself in editable mode
+-e .
\ No newline at end of file
diff --git a/src/app/App.tsx b/src/app/App.tsx
index de2864ef..fdf0396d 100644
--- a/src/app/App.tsx
+++ b/src/app/App.tsx
@@ -169,7 +169,7 @@ export const ExportStateButton: React.FC<{}> = ({ }) => {
// Fields to exclude from serialization
const excludedFields = new Set([
'models',
- 'modelSlots',
+ 'selectedModelId',
'testedModels',
'dataLoaderConnectParams',
'sessionId',
@@ -221,7 +221,19 @@ export interface AppFCProps {
// Extract menu components into separate components to prevent full app re-renders
const TableMenu: React.FC = () => {
const [anchorEl, setAnchorEl] = useState(null);
+ const [openDialog, setOpenDialog] = useState<'database' | 'extract' | 'paste' | 'upload' | null>(null);
+    const fileInputRef = React.useRef<HTMLInputElement>(null);
const open = Boolean(anchorEl);
+
+ const handleOpenDialog = (dialog: 'database' | 'extract' | 'paste' | 'upload') => {
+ setAnchorEl(null);
+ if (dialog === 'upload') {
+ // For file upload, trigger the hidden file input
+ fileInputRef.current?.click();
+ } else {
+ setOpenDialog(dialog);
+ }
+ };
return (
<>
@@ -246,40 +258,48 @@ const TableMenu: React.FC = () => {
}}
aria-labelledby="add-table-button"
sx={{
- '& .MuiMenuItem-root': { padding: 0, margin: 0 } ,
- '& .MuiTypography-root': { fontSize: 14, display: 'flex', alignItems: 'center', textTransform: 'none',gap: 1 }
+ '& .MuiMenuItem-root': { padding: '4px 8px' },
+ '& .MuiTypography-root': { fontSize: 14, display: 'flex', alignItems: 'center', textTransform: 'none', gap: 1 }
}}
>
-