From 94595b390b7d5e69be55430b913807915a92c556 Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Wed, 30 Apr 2025 17:53:24 -0700 Subject: [PATCH 01/12] experiment new data loading protocal --- .env.template | 14 +- py-src/data_formulator/app.py | 1 + .../data_loader/external_data_loader.py | 51 +++ .../data_loader/kusto_data_loader.py | 140 +++++++ .../data_loader/mysql_data_loader.py | 91 +++++ py-src/data_formulator/db_manager.py | 41 +-- py-src/data_formulator/tables_routes.py | 208 ++++++----- src/app/utils.tsx | 5 +- src/views/DBTableManager.tsx | 341 +++++++++++++++--- 9 files changed, 673 insertions(+), 219 deletions(-) create mode 100644 py-src/data_formulator/data_loader/external_data_loader.py create mode 100644 py-src/data_formulator/data_loader/kusto_data_loader.py create mode 100644 py-src/data_formulator/data_loader/mysql_data_loader.py diff --git a/.env.template b/.env.template index ffce64dc..2405af77 100644 --- a/.env.template +++ b/.env.template @@ -5,16 +5,4 @@ DISABLE_DISPLAY_KEYS=false # if true, the display keys will not be shown in the frontend EXEC_PYTHON_IN_SUBPROCESS=false # if true, the python code will be executed in a subprocess to avoid crashing the main app, but it will increase the time of response -LOCAL_DB_DIR= # the directory to store the local database, if not provided, the app will use the temp directory - -# External atabase connection settings -# check https://duckdb.org/docs/stable/extensions/mysql.html -# and https://duckdb.org/docs/stable/extensions/postgres.html -USE_EXTERNAL_DB=false # if true, the app will use an external database instead of the one in the app -DB_NAME=mysql_db # the name to refer to this database connection -DB_TYPE=mysql # mysql or postgresql -DB_HOST=localhost -DB_PORT=0 -DB_DATABASE=mysql -DB_USER=root -DB_PASSWORD= \ No newline at end of file +LOCAL_DB_DIR= # the directory to store the local database, if not provided, the app will use the temp directory \ No newline at end of file diff --git a/py-src/data_formulator/app.py b/py-src/data_formulator/app.py index c2950612..edc8f55f 100644 --- a/py-src/data_formulator/app.py +++ b/py-src/data_formulator/app.py @@ -37,6 +37,7 @@ from data_formulator.tables_routes import tables_bp from data_formulator.agent_routes import agent_bp + app = Flask(__name__, static_url_path='', static_folder=os.path.join(APP_ROOT, "dist")) app.secret_key = secrets.token_hex(16) # Generate a random secret key for sessions diff --git a/py-src/data_formulator/data_loader/external_data_loader.py b/py-src/data_formulator/data_loader/external_data_loader.py new file mode 100644 index 00000000..ea354b0b --- /dev/null +++ b/py-src/data_formulator/data_loader/external_data_loader.py @@ -0,0 +1,51 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, List +import pandas as pd +import json +import duckdb +import random +import string + +class ExternalDataLoader(ABC): + + def ingest_df_to_duckdb(self, df: pd.DataFrame, table_name: str): + + base_name = table_name + counter = 1 + while True: + # Check if table exists + exists = self.duck_db_conn.execute(f"SELECT COUNT(*) FROM duckdb_tables() WHERE table_name = '{table_name}'").fetchone()[0] > 0 + if not exists: + break + # If exists, append counter to base name + table_name = f"{base_name}_{counter}" + counter += 1 + + # Create table + random_suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=6)) + self.duck_db_conn.register(f'df_temp_{random_suffix}', df) + self.duck_db_conn.execute(f"CREATE TABLE {table_name} AS SELECT * 
FROM df_temp_{random_suffix}") + self.duck_db_conn.execute(f"DROP VIEW df_temp_{random_suffix}") # Drop the temporary view after creating the table + + @staticmethod + @abstractmethod + def list_params() -> List[Dict[str, Any]]: + pass + + @abstractmethod + def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection): + pass + + @abstractmethod + def list_tables(self) -> List[Dict[str, Any]]: + # should include: table_name, column_names, column_types, sample_data + pass + + @abstractmethod + def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000): + pass + + @abstractmethod + def ingest_data_from_query(self, query: str, name_as: str): + pass + diff --git a/py-src/data_formulator/data_loader/kusto_data_loader.py b/py-src/data_formulator/data_loader/kusto_data_loader.py new file mode 100644 index 00000000..d32bc8c6 --- /dev/null +++ b/py-src/data_formulator/data_loader/kusto_data_loader.py @@ -0,0 +1,140 @@ +from typing import Dict, Any, List +import pandas as pd +import json +import duckdb +import random +import string + +from azure.kusto.data import KustoClient, KustoConnectionStringBuilder +from azure.kusto.data.helpers import dataframe_from_result_table + +from data_formulator.data_loader.external_data_loader import ExternalDataLoader + +def sanitize_table_name(table_name: str) -> str: + return table_name.replace(".", "_").replace("-", "_") + +class KustoDataLoader(ExternalDataLoader): + + @staticmethod + def list_params() -> bool: + params_list = [ + {"name": "kusto_cluster", "type": "string", "required": True}, + {"name": "kusto_database", "type": "string", "required": True}, + {"name": "client_id", "type": "string", "required": False}, + {"name": "client_secret", "type": "string", "required": False}, + {"name": "tenant_id", "type": "string", "required": False} + ] + return params_list + + def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection): + + self.kusto_cluster = params.get("kusto_cluster", None) + self.kusto_database = params.get("kusto_database", None) + + self.client_id = params.get("client_id", None) + self.client_secret = params.get("client_secret", None) + self.tenant_id = params.get("tenant_id", None) + + try: + if self.client_id and self.client_secret and self.tenant_id: + # This function provides an interface to Kusto. It uses AAD application key authentication. + self.client = KustoClient(KustoConnectionStringBuilder.with_aad_application_key_authentication( + self.kusto_cluster, self.client_id, self.client_secret, self.tenant_id)) + else: + # This function provides an interface to Kusto. It uses Azure CLI auth, but you can also use other auth types. 
+ self.client = KustoClient(KustoConnectionStringBuilder.with_az_cli_authentication(self.kusto_cluster)) + except Exception as e: + raise Exception(f"Error creating Kusto client: {e}, please authenticate with Azure CLI when starting the app.") + + self.duck_db_conn = duck_db_conn + + def query(self, kql: str) -> pd.DataFrame: + result = self.client.execute(self.kusto_database, kql) + return dataframe_from_result_table(result.primary_results[0]) + + def list_tables(self) -> List[Dict[str, Any]]: + query = ".show tables" + tables_df = self.query(query) + + results = [] + for table in tables_df.to_dict(orient="records"): + table_name = table['TableName'] + schema_result = self.query(f".show table ['{table_name}'] schema as json").to_dict(orient="records") + columns = [{ + 'name': r["Name"], + 'type': r["Type"] + } for r in json.loads(schema_result[0]['Schema'])['OrderedColumns']] + + row_count_result = self.query(f".show table ['{table_name}'] details").to_dict(orient="records") + row_count = row_count_result[0]["TotalRowCount"] + + sample_query = f"['{table_name}'] | take {10}" + sample_result = self.query(sample_query).to_dict(orient="records") + + table_metadata = { + "row_count": row_count, + "columns": columns, + "sample_rows": sample_result + } + + results.append({ + "name": table_name, + "metadata": table_metadata + }) + + return results + + def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000) -> pd.DataFrame: + if name_as is None: + name_as = table_name + + # Create a subquery that applies random ordering once with a fixed seed + total_rows_ingested = 0 + first_chunk = True + chunk_size = 100000 + + size_estimate_query = f"['{table_name}'] | take {10000} | summarize Total=sum(estimate_data_size(*))" + size_estimate_result = self.query(size_estimate_query) + size_estimate = size_estimate_result['Total'].values[0] + print(f"size_estimate: {size_estimate}") + + chunk_size = min(64 * 1024 * 1024 / size_estimate * 0.9 * 10000, 5000000) + print(f"estimated_chunk_size: {chunk_size}") + + while total_rows_ingested < size: + try: + query = f"['{table_name}'] | serialize | extend rn=row_number() | where rn >= {total_rows_ingested} and rn < {total_rows_ingested + chunk_size} | project-away rn" + chunk_df = self.query(query) + except Exception as e: + chunk_size = int(chunk_size * 0.8) + continue + + print(f"total_rows_ingested: {total_rows_ingested}") + print(chunk_df.head()) + + # Stop if no more data + if chunk_df.empty: + break + + # Sanitize the table name for SQL compatibility + name_as = sanitize_table_name(name_as) + + # For first chunk, create new table; for subsequent chunks, append + if first_chunk: + self.ingest_df_to_duckdb(chunk_df, name_as) + first_chunk = False + else: + # Append to existing table + random_suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=6)) + self.duck_db_conn.register(f'df_temp_{random_suffix}', chunk_df) + self.duck_db_conn.execute(f"INSERT INTO {name_as} SELECT * FROM df_temp_{random_suffix}") + self.duck_db_conn.execute(f"DROP VIEW df_temp_{random_suffix}") + + total_rows_ingested += len(chunk_df) + + + def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame: + # Sanitize the table name for SQL compatibility + name_as = sanitize_table_name(name_as) + df = self.query(query) + self.ingest_df_to_duckdb(df, name_as) \ No newline at end of file diff --git a/py-src/data_formulator/data_loader/mysql_data_loader.py b/py-src/data_formulator/data_loader/mysql_data_loader.py new file mode 100644 index 
00000000..c6b86cba --- /dev/null +++ b/py-src/data_formulator/data_loader/mysql_data_loader.py @@ -0,0 +1,91 @@ +import json + +import pandas as pd +import duckdb + +from data_formulator.data_loader.external_data_loader import ExternalDataLoader +from typing import Dict, Any + +class MySQLDataLoader(ExternalDataLoader): + + @staticmethod + def list_params() -> bool: + params_list = [ + {"name": "user", "type": "string", "required": True, "default": "root"}, + {"name": "password", "type": "string", "required": False, "default": ""}, + {"name": "host", "type": "string", "required": True, "default": "localhost"}, + {"name": "database", "type": "string", "required": True, "default": "mysql"} + ] + return params_list + + def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection): + self.params = params + self.duck_db_conn = duck_db_conn + + # Install and load the MySQL extension + self.duck_db_conn.install_extension("mysql") + self.duck_db_conn.load_extension("mysql") + + attatch_string = "" + for key, value in self.params.items(): + if value: + attatch_string += f"{key}={value} " + + # Register MySQL connection + self.duck_db_conn.execute(f"ATTACH '{attatch_string}' AS mysqldb (TYPE mysql);") + + def list_tables(self): + tables_df = self.duck_db_conn.execute(f""" + SELECT TABLE_SCHEMA, TABLE_NAME FROM mysqldb.information_schema.tables + WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys') + """).fetch_df() + + results = [] + + for schema, table_name in tables_df.values: + + full_table_name = f"{schema}.{table_name}" + + # Get column information using DuckDB's information schema + columns_df = self.duck_db_conn.execute(f"DESCRIBE mysqldb.{full_table_name}").df() + columns = [{ + 'name': row['column_name'], + 'type': row['column_type'] + } for _, row in columns_df.iterrows()] + + # Get sample data + sample_df = self.duck_db_conn.execute(f"SELECT * FROM mysqldb.{full_table_name} LIMIT 10").df() + sample_rows = json.loads(sample_df.to_json(orient="records")) + + # get row count + row_count = self.duck_db_conn.execute(f"SELECT COUNT(*) FROM mysqldb.{full_table_name}").fetchone()[0] + + table_metadata = { + "row_count": row_count, + "columns": columns, + "sample_rows": sample_rows + } + + results.append({ + "name": full_table_name, + "metadata": table_metadata + }) + + return results + + def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000): + # Create table in the main DuckDB database from MySQL data + if name_as is None: + name_as = table_name.split('.')[-1] + + self.duck_db_conn.execute(f""" + CREATE OR REPLACE TABLE {name_as} AS + SELECT * FROM mysqldb.{table_name} + LIMIT {size} + """) + + def ingest_data_from_query(self, query: str, name_as: str) -> pd.DataFrame: + self.duck_db_conn.execute(f""" + CREATE OR REPLACE TABLE main.{name_as} AS + SELECT * FROM ({query}) + """) \ No newline at end of file diff --git a/py-src/data_formulator/db_manager.py b/py-src/data_formulator/db_manager.py index 0f8e4888..bb914d99 100644 --- a/py-src/data_formulator/db_manager.py +++ b/py-src/data_formulator/db_manager.py @@ -9,13 +9,9 @@ from dotenv import load_dotenv class DuckDBManager: - def __init__(self, external_db_connections: Dict[str, Dict[str, Any]], local_db_dir: str): + def __init__(self, local_db_dir: str): # Store session db file paths self._db_files: Dict[str, str] = {} - - # External db connections and tracking of installed extensions - self._external_db_connections: Dict[str, Dict[str, Any]] = 
external_db_connections - self._installed_extensions: Dict[str, List[str]] = {} self._local_db_dir: str = local_db_dir @contextmanager @@ -26,7 +22,6 @@ def connection(self, session_id: str) -> ContextManager[duckdb.DuckDBPyConnectio conn = self.get_connection(session_id) yield conn finally: - # Close the connection after use if conn: conn.close() @@ -40,8 +35,6 @@ def get_connection(self, session_id: str) -> duckdb.DuckDBPyConnection: db_file = os.path.join(db_dir, f"df_{session_id}.duckdb") print(f"=== Creating new db file: {db_file}") self._db_files[session_id] = db_file - # Initialize extension tracking for this file - self._installed_extensions[db_file] = [] else: print(f"=== Using existing db file: {self._db_files[session_id]}") db_file = self._db_files[session_id] @@ -49,43 +42,11 @@ def get_connection(self, session_id: str) -> duckdb.DuckDBPyConnection: # Create a fresh connection to the database file conn = duckdb.connect(database=db_file) - if self._external_db_connections and self._external_db_connections['db_type'] in ['mysql', 'postgresql']: - db_name = self._external_db_connections['db_name'] - db_type = self._external_db_connections['db_type'] - - print(f"=== connecting to {db_type} extension") - # Only install if not already installed for this db file - if db_type not in self._installed_extensions.get(db_file, []): - conn.execute(f"INSTALL {db_type};") - self._installed_extensions[db_file].append(db_type) - - conn.execute(f"LOAD {db_type};") - conn.execute(f"""CREATE SECRET ( - TYPE {db_type}, - HOST '{self._external_db_connections['host']}', - PORT '{self._external_db_connections['port']}', - DATABASE '{self._external_db_connections['database']}', - USER '{self._external_db_connections['user']}', - PASSWORD '{self._external_db_connections['password']}'); - """) - conn.execute(f"ATTACH '' AS {db_name} (TYPE {db_type});") - # result = conn.execute(f"SELECT * FROM {db_name}.information_schema.tables WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys');").fetch_df() - # print(f"=== result: {result}") - return conn env = load_dotenv() # Initialize the DB manager db_manager = DuckDBManager( - external_db_connections={ - "db_name": os.getenv('DB_NAME'), - "db_type": os.getenv('DB_TYPE'), - "host": os.getenv('DB_HOST'), - "port": os.getenv('DB_PORT'), - "database": os.getenv('DB_DATABASE'), - "user": os.getenv('DB_USER'), - "password": os.getenv('DB_PASSWORD') - } if os.getenv('USE_EXTERNAL_DB') == 'true' else None, local_db_dir=os.getenv('LOCAL_DB_DIR') ) \ No newline at end of file diff --git a/py-src/data_formulator/tables_routes.py b/py-src/data_formulator/tables_routes.py index 2beb4549..6ffe3015 100644 --- a/py-src/data_formulator/tables_routes.py +++ b/py-src/data_formulator/tables_routes.py @@ -8,7 +8,7 @@ mimetypes.add_type('application/javascript', '.js') mimetypes.add_type('application/javascript', '.mjs') import json - +import traceback from flask import request, send_from_directory, session, jsonify, Blueprint import pandas as pd import random @@ -16,6 +16,9 @@ from pathlib import Path from data_formulator.db_manager import db_manager +from data_formulator.data_loader.external_data_loader import ExternalDataLoader +from data_formulator.data_loader.mysql_data_loader import MySQLDataLoader +from data_formulator.data_loader.kusto_data_loader import KustoDataLoader import re from typing import Tuple @@ -44,11 +47,11 @@ def list_tables(): table_metadata_list = db.execute(""" SELECT database_name, schema_name, table_name, 
schema_name==current_schema() as is_current_schema, 'table' as object_type FROM duckdb_tables() - WHERE internal=False + WHERE internal=False AND database_name == current_database() UNION ALL SELECT database_name, schema_name, view_name as table_name, schema_name==current_schema() as is_current_schema, 'view' as object_type FROM duckdb_views() - WHERE view_name NOT LIKE 'duckdb_%' AND view_name NOT LIKE 'sqlite_%' AND view_name NOT LIKE 'pragma_%' + WHERE view_name NOT LIKE 'duckdb_%' AND view_name NOT LIKE 'sqlite_%' AND view_name NOT LIKE 'pragma_%' AND database_name == current_database() """).fetchall() @@ -471,107 +474,6 @@ def upload_db_file(): }), status_code -def validate_db_connection_params(db_type: str, db_host: str, db_port: int, - db_database: str, db_user: str, db_password: str): - """Validate database connection parameters""" - # Validate db_type - valid_db_types = ['postgresql', 'mysql'] - if not db_type or db_type.lower() not in valid_db_types: - raise ValueError(f"Invalid database type. Must be one of: {', '.join(valid_db_types)}") - - # Validate host (basic DNS/IP format check) - if not db_host or not re.match(r'^[a-zA-Z0-9.-]+$', db_host): - raise ValueError("Invalid host format") - - # Validate port - try: - port = int(db_port) - if not (1 <= port <= 65535): - raise ValueError() - except (ValueError, TypeError): - raise ValueError("Port must be a number between 1 and 65535") - - # Validate database name (alphanumeric and underscores only) - if not db_database or not re.match(r'^[a-zA-Z0-9_]+$', db_database): - raise ValueError("Invalid database name format") - - # Validate username (alphanumeric and some special chars) - if not db_user or not re.match(r'^[a-zA-Z0-9@._-]+$', db_user): - raise ValueError("Invalid username format") - - # Validate password exists - if not db_password: - raise ValueError("Password cannot be empty") - -@tables_bp.route('/attach-external-db', methods=['POST']) -def attach_external_db(): - """Attach an external db to the session""" - try: - data = request.get_json() - db_type = data.get('db_type') - db_host = data.get('db_host') - db_port = data.get('db_port') - db_database = data.get('db_database') - db_user = data.get('db_user') - db_password = data.get('db_password') - - # Generate a random suffix for the database name - suffix = ''.join(random.choices(string.ascii_letters + string.digits, k=2)) - db_name = f"{db_type.lower()}_{suffix}" - - if 'session_id' not in session: - return jsonify({"status": "error", "message": "No session ID found"}), 400 - - with db_manager.connection(session['session_id']) as conn: - # Create secret using parameterized query - - # Install and load the extension - if db_type == 'mysql': - conn.install_extension("mysql") - conn.load_extension("mysql") - elif db_type == 'postgresql': - conn.install_extension("postgres") - conn.load_extension("postgres") - - connect_query = f"""CREATE SECRET ( - TYPE {db_type}, - HOST '{db_host}', - PORT '{db_port}', - DATABASE '{db_database}', - USER '{db_user}', - PASSWORD '{db_password}' - );""" - conn.execute(connect_query) - - # Attach the database - conn.execute(f"ATTACH '' AS {db_name} (TYPE {db_type});") - - result = conn.execute(f"SELECT * FROM {db_name}.information_schema.tables WHERE table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys');").fetch_df() - - print(f"result: {result}") - - result = conn.execute(f"SELECT * FROM {db_name}.sakila.actor LIMIT 10;").fetchdf() - - print(f"result: {result}") - - # Log what we found for debugging - 
logger.info(f"Found {len(result)} tables: {result}") - - return jsonify({ - "status": "success", - "message": "External database attached successfully", - "result": result - }) - - except Exception as e: - logger.error(f"Error attaching external database: {str(e)}") - safe_msg, status_code = sanitize_db_error_message(e) - return jsonify({ - "status": "error", - "message": safe_msg - }), status_code - - @tables_bp.route('/download-db-file', methods=['GET']) def download_db_file(): """Download the db file for a session""" @@ -810,4 +712,100 @@ def sanitize_db_error_message(error: Exception) -> Tuple[str, int]: logger.error(f"Unexpected error occurred: {error_msg}") # Return a generic error message for unknown errors - return "An unexpected error occurred", 500 \ No newline at end of file + return "An unexpected error occurred", 500 + + + +available_data_loaders = { + 'mysql': MySQLDataLoader, + 'kusto': KustoDataLoader +} + +@tables_bp.route('/data-loader/list-params', methods=['POST']) +def data_loader_list_params(): + """List params for a data loader""" + + try: + data = request.get_json() + data_loader_type = data.get('data_loader_type') + + if data_loader_type not in available_data_loaders: + return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(available_data_loaders.keys())}"}), 400 + + data_loader = available_data_loaders[data_loader_type] + + params = data_loader.list_params() + + return jsonify({ + "status": "success", + "params": params + }) + + except Exception as e: + logger.error(f"Error listing params for data loader: {str(e)}") + safe_msg, status_code = sanitize_db_error_message(e) + return jsonify({ + "status": "error", + "message": safe_msg + }), status_code + +@tables_bp.route('/data-loader/list-tables', methods=['POST']) +def data_loader_list_tables(): + """List tables from a data loader""" + + try: + data = request.get_json() + data_loader_type = data.get('data_loader_type') + data_loader_params = data.get('data_loader_params') + + if data_loader_type not in available_data_loaders: + return jsonify({"status": "error", "message": f"Invalid data loader type. Must be one of: {', '.join(available_data_loaders.keys())}"}), 400 + + with db_manager.connection(session['session_id']) as duck_db_conn: + data_loader = available_data_loaders[data_loader_type](data_loader_params, duck_db_conn) + tables = data_loader.list_tables() + + return jsonify({ + "status": "success", + "tables": tables + }) + + except Exception as e: + logger.error(f"Error listing tables from data loader: {str(e)}") + print(traceback.format_exc()) + safe_msg, status_code = sanitize_db_error_message(e) + return jsonify({ + "status": "error", + "message": safe_msg + }), status_code + + +@tables_bp.route('/data-loader/ingest-data', methods=['POST']) +def data_loader_ingest_data(): + """Ingest data from a data loader""" + + try: + data = request.get_json() + data_loader_type = data.get('data_loader_type') + data_loader_params = data.get('data_loader_params') + table_name = data.get('table_name') + + if data_loader_type not in available_data_loaders: + return jsonify({"status": "error", "message": f"Invalid data loader type. 
Must be one of: {', '.join(available_data_loaders.keys())}"}), 400 + + with db_manager.connection(session['session_id']) as duck_db_conn: + data_loader = available_data_loaders[data_loader_type](data_loader_params, duck_db_conn) + data_loader.ingest_data(table_name) + + return jsonify({ + "status": "success", + "message": "Successfully ingested data from data loader" + }) + + except Exception as e: + logger.error(f"Error ingesting data from data loader: {str(e)}") + safe_msg, status_code = sanitize_db_error_message(e) + return jsonify({ + "status": "error", + "message": safe_msg + }), status_code \ No newline at end of file diff --git a/src/app/utils.tsx b/src/app/utils.tsx index 3059f96e..0fbd9331 100644 --- a/src/app/utils.tsx +++ b/src/app/utils.tsx @@ -57,7 +57,6 @@ export function getUrls() { UPLOAD_DB_FILE: `/api/tables/upload-db-file`, DOWNLOAD_DB_FILE: `/api/tables/download-db-file`, RESET_DB_FILE: `/api/tables/reset-db-file`, - ATTACH_EXTERNAL_DB: `/api/tables/attach-external-db`, LIST_TABLES: `/api/tables/list-tables`, TABLE_DATA: `/api/tables/get-table`, @@ -66,6 +65,10 @@ export function getUrls() { GET_COLUMN_STATS: `/api/tables/analyze`, QUERY_TABLE: `/api/tables/query`, SAMPLE_TABLE: `/api/tables/sample-table`, + + DATA_LOADER_LIST_PARAMS: `/api/tables/data-loader/list-params`, + DATA_LOADER_LIST_TABLES: `/api/tables/data-loader/list-tables`, + DATA_LOADER_INGEST_DATA: `/api/tables/data-loader/ingest-data`, }; } diff --git a/src/views/DBTableManager.tsx b/src/views/DBTableManager.tsx index 2c54d48d..1c3b0836 100644 --- a/src/views/DBTableManager.tsx +++ b/src/views/DBTableManager.tsx @@ -30,7 +30,9 @@ import { CircularProgress, ButtonGroup, Tooltip, - MenuItem + MenuItem, + Chip, + Collapse } from '@mui/material'; import DeleteIcon from '@mui/icons-material/Delete'; import UploadFileIcon from '@mui/icons-material/UploadFile'; @@ -44,6 +46,10 @@ import UploadIcon from '@mui/icons-material/Upload'; import DownloadIcon from '@mui/icons-material/Download'; import RestartAltIcon from '@mui/icons-material/RestartAlt'; import PolylineIcon from '@mui/icons-material/Polyline'; +import ExpandLessIcon from '@mui/icons-material/ExpandLess'; +import ExpandMoreIcon from '@mui/icons-material/ExpandMore'; +import TableRowsIcon from '@mui/icons-material/TableRows'; +import RefreshIcon from '@mui/icons-material/Refresh'; import { getUrls } from '../app/utils'; import { CustomReactTable } from './ReactTable'; @@ -102,22 +108,22 @@ interface DBTable { interface TabPanelProps { children?: React.ReactNode; - index: number; - value: number; + key: string; + show: boolean; sx?: SxProps; } function TabPanel(props: TabPanelProps, sx: SxProps) { - const { children, value, index, ...other } = props; + const { children, show, key, ...other } = props; return ( - ); +} + +export const DataQueryForm: React.FC<{ + dataLoaderType: string, + availableTables: {name: string, fields: string[]}[], + dataLoaderParams: Record, + onImport: () => void, + onFinish: (status: "success" | "error", message: string) => void +}> = ({dataLoaderType, availableTables, dataLoaderParams, onImport, onFinish}) => { + + let activeModel = useSelector(dfSelectors.getActiveModel); + + const [selectedTables, setSelectedTables] = useState(availableTables.map(t => t.name).slice(0, 5)); + + const [waiting, setWaiting] = useState(false); + + const [query, setQuery] = useState("-- query the data source / describe your goal and ask AI to help you write the query\n"); + const [queryResult, setQueryResult] = useState<{ + status: string, + 
message: string, + sample: any[], + code: string, + } | undefined>(undefined); + const [queryResultName, setQueryResultName] = useState(""); + + const aiCompleteQuery = (query: string) => { + if (queryResult?.status === "error") { + setQueryResult(undefined); + } + let data = { + data_source_metadata: { + data_loader_type: dataLoaderType, + tables: availableTables.filter(t => selectedTables.includes(t.name)) + }, + query: query, + model: activeModel + } + setWaiting(true); + fetch(getUrls().QUERY_COMPLETION, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(data) + }) + .then(response => response.json()) + .then(data => { + setWaiting(false); + if (data.status === "ok") { + setQuery(data.query); + } else { + onFinish("error", data.reasoning); + } + }) + .catch(error => { + setWaiting(false); + onFinish("error", `Failed to complete query please try again.`); + }); + } + + const handleViewQuerySample = (query: string) => { + setQueryResult(undefined); + setWaiting(true); + fetch(getUrls().DATA_LOADER_VIEW_QUERY_SAMPLE, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + data_loader_type: dataLoaderType, + data_loader_params: dataLoaderParams, + query: query + }) + }) + .then(response => response.json()) + .then(data => { + setWaiting(false); + if (data.status === "success") { + setQueryResult({ + status: "success", + message: "Data loaded successfully", + sample: data.sample, + code: query + }); + let newName = `r_${Math.random().toString(36).substring(2, 4)}`; + setQueryResultName(newName); + } else { + setQueryResult({ + status: "error", + message: data.message, + sample: [], + code: query + }); + } + }) + .catch(error => { + setWaiting(false); + setQueryResult({ + status: "error", + message: `Failed to view query sample, please try again.`, + sample: [], + code: query + }); + }); + } + + const handleImportQueryResult = () => { + setWaiting(true); + fetch(getUrls().DATA_LOADER_INGEST_DATA_FROM_QUERY, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + data_loader_type: dataLoaderType, + data_loader_params: dataLoaderParams, + query: queryResult?.code ?? query, + name_as: queryResultName + }) + }) + .then(response => response.json()) + .then(data => { + setWaiting(false); + if (data.status === "success") { + onFinish("success", "Data imported successfully"); + } else { + onFinish("error", data.reasoning); + } + }) + .catch(error => { + setWaiting(false); + onFinish("error", `Failed to import data, please try again.`); + }); + } + + let queryResultBox = queryResult?.status === "success" ? [ + + ({id: t, label: t}))} rowsPerPageNum={-1} compact={false} /> + , + + + setQueryResultName(event.target.value)} + /> + + + ] : []; + + return ( + + {waiting && + + } + + + query from tables: + + {availableTables.map((table) => ( + : undefined} + color={selectedTables.includes(table.name) ? "primary" : "default"} variant="outlined" + sx={{ fontSize: 11, margin: 0.25, + height: 20, borderRadius: 0.5, + borderColor: selectedTables.includes(table.name) ? "primary.main" : "rgba(0, 0, 0, 0.1)", + color: selectedTables.includes(table.name) ? "primary.main" : "text.secondary", + '&:hover': { + backgroundColor: "rgba(0, 0, 0, 0.07)", + } + }} + size="small" + onClick={() => { + setSelectedTables(selectedTables.includes(table.name) ? 
selectedTables.filter(t => t !== table.name) : [...selectedTables, table.name]); + }} + /> + ))} + + + + { + setQuery(tempCode); + }} + highlight={code => Prism.highlight(code, Prism.languages.sql, 'sql')} + padding={10} + style={{ + minHeight: queryResult ? 60 : 200, + fontFamily: '"Fira code", "Fira Mono", monospace', + fontSize: 12, + paddingBottom: '24px', + backgroundColor: "rgba(0, 0, 0, 0.03)", + + overflowY: "auto" + }} + /> + + {queryResult?.status === "error" && + + {queryResult?.message} + + + } + + + + + {queryResult && queryResultBox} + + + ) } \ No newline at end of file diff --git a/src/views/ModelSelectionDialog.tsx b/src/views/ModelSelectionDialog.tsx index d0e57ac5..461386f7 100644 --- a/src/views/ModelSelectionDialog.tsx +++ b/src/views/ModelSelectionDialog.tsx @@ -91,6 +91,39 @@ export const ModelSelectionButton: React.FC<{}> = ({ }) => { }); }, []); + useEffect(() => { + const findWorkingModel = async () => { + for (let i = 0; i < models.length; i++) { + if (testedModels.find(t => t.id == models[i].id)) { + continue; + } + const model = models[i]; + const message = { + method: 'POST', + headers: { 'Content-Type': 'application/json', }, + body: JSON.stringify({ + model: model, + }), + }; + try { + const response = await fetch(getUrls().TEST_MODEL, {...message }); + const data = await response.json(); + const status = data["status"] || 'error'; + updateModelStatus(model, status, data["message"] || ""); + if (status === 'ok') { + break; + } + } catch (error) { + updateModelStatus(model, 'error', (error as Error).message || 'Failed to test model'); + } + } + }; + + if (models.length > 0 && testedModels.filter(t => t.status == 'ok').length == 0) { + findWorkingModel(); + } + }, []); + let updateModelStatus = (model: ModelConfig, status: 'ok' | 'error' | 'testing' | 'unknown', message: string) => { dispatch(dfActions.updateModelStatus({id: model.id, status, message})); } From b8d3f115a1fea943337815c11402aad815476317 Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 15:45:54 -0700 Subject: [PATCH 07/12] various data-loader templates --- py-src/data_formulator/agents/agent_py_data_rec.py | 2 +- py-src/data_formulator/agents/agent_py_data_transform.py | 4 +--- py-src/data_formulator/db_manager.py | 6 ++---- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/py-src/data_formulator/agents/agent_py_data_rec.py b/py-src/data_formulator/agents/agent_py_data_rec.py index f67ef372..6c1db07b 100644 --- a/py-src/data_formulator/agents/agent_py_data_rec.py +++ b/py-src/data_formulator/agents/agent_py_data_rec.py @@ -165,7 +165,7 @@ def process_gpt_response(self, input_tables, messages, response): if result['status'] == 'ok': result_df = result['content'] result['content'] = { - 'rows': result_df.to_dict(orient='records'), + 'rows': json.loads(result_df.to_json(orient='records')), } else: logger.info(result['content']) diff --git a/py-src/data_formulator/agents/agent_py_data_transform.py b/py-src/data_formulator/agents/agent_py_data_transform.py index e096a449..b3cc999e 100644 --- a/py-src/data_formulator/agents/agent_py_data_transform.py +++ b/py-src/data_formulator/agents/agent_py_data_transform.py @@ -221,13 +221,11 @@ def process_gpt_response(self, input_tables, messages, response): result = py_sandbox.run_transform_in_sandbox2020(code_str, [pd.DataFrame.from_records(t['rows']) for t in input_tables], self.exec_python_in_subprocess) result['code'] = code_str - print(f"result: {result}") - if result['status'] == 'ok': # parse the content result_df = 
result['content'] result['content'] = { - 'rows': result_df.to_dict(orient='records'), + 'rows': json.loads(result_df.to_json(orient='records')), } else: logger.info(result['content']) diff --git a/py-src/data_formulator/db_manager.py b/py-src/data_formulator/db_manager.py index bb914d99..348ef244 100644 --- a/py-src/data_formulator/db_manager.py +++ b/py-src/data_formulator/db_manager.py @@ -1,8 +1,6 @@ import duckdb import pandas as pd -from typing import Optional, Dict, List, ContextManager, Any, Tuple -import time -from flask import session +from typing import Dict import tempfile import os from contextlib import contextmanager @@ -15,7 +13,7 @@ def __init__(self, local_db_dir: str): self._local_db_dir: str = local_db_dir @contextmanager - def connection(self, session_id: str) -> ContextManager[duckdb.DuckDBPyConnection]: + def connection(self, session_id: str): """Get a DuckDB connection as a context manager that will be closed when exiting the context""" conn = None try: From f5554b99aa8686bcd510161c3bd01b6ccb70dac4 Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 16:14:12 -0700 Subject: [PATCH 08/12] update readme and version etc --- README.md | 8 +++++ py-src/data_formulator/data_loader/README.md | 36 ++++++++++++++++++++ pyproject.toml | 2 +- 3 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 py-src/data_formulator/data_loader/README.md diff --git a/README.md b/README.md index 0987577f..69f43c90 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,14 @@ Transform data and create rich visualizations iteratively with AI 🪄. Try Data ## News 🔥🔥🔥 +- [04-23-2025] Data Formulator 0.2.1: External Data Loader + - We introduced an external data loader to make import data easier. + - Example data loaders from MySQL and Azure Data Explorer (Kusto) are provided. + - Call for action: + - Users: let us know which data source you'd like to load data from. + - Developers: help us build more data loaders. + - Created a discord channel for discussions. + - [04-23-2025] Data Formulator 0.2: working with *large* data 📦📦📦 - Explore large data by: 1. Upload large data file to the local database (powered by [DuckDB](https://github.com/duckdb/duckdb)). diff --git a/py-src/data_formulator/data_loader/README.md b/py-src/data_formulator/data_loader/README.md new file mode 100644 index 00000000..c3270c15 --- /dev/null +++ b/py-src/data_formulator/data_loader/README.md @@ -0,0 +1,36 @@ +## Data Loader Module + +This module provides a framework for loading data from various external sources into DuckDB. It follows an abstract base class pattern to ensure consistent implementation across different data sources. + +### Building a New Data Loader + +The abstract class `ExternalDataLoader` defines the data loader interface. Each concrete implementation (e.g., `KustoDataLoader`, `MySQLDataLoader`) handles specific data source connections and data ingestion. + +To create a new data loader: + +1. Create a new class that inherits from `ExternalDataLoader` +2. Implement the required abstract methods: + - `list_params()`: Define required connection parameters + - `__init__()`: Initialize connection to data source + - `list_tables()`: List available tables/views + - `ingest_data()`: Load data from source + - `view_query_sample()`: Preview query results + - `ingest_data_from_query()`: Load data from custom query +3. Register the new class into `__init__.py` so that the front-end can automatically discover the new data loader. 
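To make the steps above concrete, below is a minimal sketch of a hypothetical loader that reads CSV files from a local directory. The class name, its `directory` parameter, and the exact return shape of `view_query_sample()` are illustrative assumptions rather than part of the shipped loaders; the sketch only shows how the `ExternalDataLoader` interface and the inherited `ingest_df_to_duckdb()` helper fit together (see `KustoDataLoader` and `MySQLDataLoader` for loaders that talk to real external engines).

```python
# Illustrative sketch only: names and parameters here are assumptions, not shipped code.
import os
from typing import Any, Dict, List

import duckdb
import pandas as pd

from data_formulator.data_loader.external_data_loader import ExternalDataLoader


class CSVDirDataLoader(ExternalDataLoader):

    @staticmethod
    def list_params() -> List[Dict[str, Any]]:
        # Connection parameters the front-end should collect from the user.
        return [{"name": "directory", "type": "string", "required": True, "default": ""}]

    def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnection):
        self.directory = params.get("directory", ".")
        self.duck_db_conn = duck_db_conn

    def list_tables(self) -> List[Dict[str, Any]]:
        # Each entry carries a name plus metadata: row_count, columns, sample_rows.
        results = []
        for f in os.listdir(self.directory):
            if not f.endswith(".csv"):
                continue
            df = pd.read_csv(os.path.join(self.directory, f))
            results.append({
                "name": f[:-4],
                "metadata": {
                    "row_count": len(df),
                    "columns": [{"name": c, "type": str(t)} for c, t in zip(df.columns, df.dtypes)],
                    "sample_rows": df.head(10).to_dict(orient="records"),
                },
            })
        return results

    def ingest_data(self, table_name: str, name_as: str = None, size: int = 1000000):
        # Load (at most `size` rows of) the source into the session's DuckDB database.
        df = pd.read_csv(os.path.join(self.directory, f"{table_name}.csv")).head(size)
        # Base-class helper: registers the dataframe and creates the DuckDB table.
        self.ingest_df_to_duckdb(df, name_as or table_name)

    def view_query_sample(self, query: str) -> List[Dict[str, Any]]:
        # A real loader would run `query` against the external engine
        # (compare KustoDataLoader.query); here we simply preview it with DuckDB,
        # which can read CSV files referenced in the query directly.
        return self.duck_db_conn.execute(query).df().head(10).to_dict(orient="records")

    def ingest_data_from_query(self, query: str, name_as: str):
        df = self.duck_db_conn.execute(query).df()
        self.ingest_df_to_duckdb(df, name_as)
```

Reusing `ingest_df_to_duckdb()` also gives the new loader the base class's handling of duplicate table names, and registering the class in `__init__.py` (step 3 above) is what makes it discoverable from the front-end.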
+ +The UI automatically provide the query completion option to help user generate queries for the given data loader (from NL or partial queries). + +### Example Implementations + +- `KustoDataLoader`: Azure Data Explorer (Kusto) integration +- `MySQLDataLoader`: MySQL database integration + +### Testing + +Ensure your implementation: +- Handles connection errors gracefully +- Properly sanitizes table names +- Respects size limits for data ingestion +- Returns consistent metadata format + +Launch the front-end and test the data loader. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f049232b..96675706 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "data_formulator" -version = "0.2.1.1" +version = "0.2.1.2" requires-python = ">=3.9" authors = [ From 436f1835cb40b44ca3ffec384bed187f1e0aca9d Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 16:29:07 -0700 Subject: [PATCH 09/12] tweak --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 69f43c90..ecc3b178 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)  [![YouTube](https://img.shields.io/badge/YouTube-white?logo=youtube&logoColor=%23FF0000)](https://youtu.be/3ndlwt0Wi3c)  [![build](https://github.com/microsoft/data-formulator/actions/workflows/python-build.yml/badge.svg)](https://github.com/microsoft/data-formulator/actions/workflows/python-build.yml) +[![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb) @@ -23,12 +24,12 @@ Transform data and create rich visualizations iteratively with AI 🪄. Try Data ## News 🔥🔥🔥 - [04-23-2025] Data Formulator 0.2.1: External Data Loader - - We introduced an external data loader to make import data easier. + - We introduced external data loader to make import data easier. [readme](https://github.com/microsoft/data-formulator/tree/main/py-src/data_formulator/data_loader) [UI demo](https://github.com/microsoft/data-formulator/pull/155) - Example data loaders from MySQL and Azure Data Explorer (Kusto) are provided. - Call for action: - Users: let us know which data source you'd like to load data from. - - Developers: help us build more data loaders. - - Created a discord channel for discussions. + - Developers: let's build more data loaders. + - Created a discord channel for discussions, data formulator, data analysis & AI and more [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). - [04-23-2025] Data Formulator 0.2: working with *large* data 📦📦📦 - Explore large data by: From e8c8c07ebacdc6ff66666dae51249eabe25db799 Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 16:30:25 -0700 Subject: [PATCH 10/12] tweak --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ecc3b178..68d93b19 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Transform data and create rich visualizations iteratively with AI 🪄. Try Data - Call for action: - Users: let us know which data source you'd like to load data from. - Developers: let's build more data loaders. - - Created a discord channel for discussions, data formulator, data analysis & AI and more [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). 
+ - Discord channel for discussions: join here -- [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). - [04-23-2025] Data Formulator 0.2: working with *large* data 📦📦📦 - Explore large data by: From 49bade7efa8fafcfe998d4df09e065c8e78328a8 Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 16:32:25 -0700 Subject: [PATCH 11/12] minor --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 68d93b19..f33ae7b1 100644 --- a/README.md +++ b/README.md @@ -24,12 +24,12 @@ Transform data and create rich visualizations iteratively with AI 🪄. Try Data ## News 🔥🔥🔥 - [04-23-2025] Data Formulator 0.2.1: External Data Loader - - We introduced external data loader to make import data easier. [readme](https://github.com/microsoft/data-formulator/tree/main/py-src/data_formulator/data_loader) [UI demo](https://github.com/microsoft/data-formulator/pull/155) + - We introduced external data loader class to make import data easier. [Readme](https://github.com/microsoft/data-formulator/tree/main/py-src/data_formulator/data_loader) and [Demo](https://github.com/microsoft/data-formulator/pull/155) - Example data loaders from MySQL and Azure Data Explorer (Kusto) are provided. - Call for action: - Users: let us know which data source you'd like to load data from. - Developers: let's build more data loaders. - - Discord channel for discussions: join here -- [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). + - Discord channel for discussions: join us! [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). - [04-23-2025] Data Formulator 0.2: working with *large* data 📦📦📦 - Explore large data by: From 16c2ea929a65e9782d6b2547f2dcac0a0b52d2eb Mon Sep 17 00:00:00 2001 From: Chenglong Wang Date: Tue, 13 May 2025 16:38:39 -0700 Subject: [PATCH 12/12] add some links --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f33ae7b1..d244ad77 100644 --- a/README.md +++ b/README.md @@ -23,13 +23,13 @@ Transform data and create rich visualizations iteratively with AI 🪄. Try Data ## News 🔥🔥🔥 -- [04-23-2025] Data Formulator 0.2.1: External Data Loader +- [05-13-2025] Data Formulator 0.2.1: External Data Loader - We introduced external data loader class to make import data easier. [Readme](https://github.com/microsoft/data-formulator/tree/main/py-src/data_formulator/data_loader) and [Demo](https://github.com/microsoft/data-formulator/pull/155) - Example data loaders from MySQL and Azure Data Explorer (Kusto) are provided. - - Call for action: + - Call for action [link](https://github.com/microsoft/data-formulator/issues/156): - Users: let us know which data source you'd like to load data from. - Developers: let's build more data loaders. - - Discord channel for discussions: join us! [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb). + - Discord channel for discussions: join us! [![Discord](https://img.shields.io/badge/discord-chat-green?logo=discord)](https://discord.gg/mYCZMQKYZb) - [04-23-2025] Data Formulator 0.2: working with *large* data 📦📦📦 - Explore large data by: