@@ -21,20 +21,17 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
2121 # Ensure directory exists
2222 db_dir = os .path .dirname (db_path )
2323 if db_dir and not os .path .exists (db_dir ):
24- os .makedirs (db_dir )
25-
24+ os .makedirs (db_dir )
2625 conn = None
2726 try :
2827 # Connect to database
2928 conn = sqlite3 .connect (db_path )
3029 cursor = conn .cursor ()
3130
3231 # TODO (Find & Fix): Table creation and schema logic missing
33-
34- # Idempotency check (should avoid duplicate inserts)
3532 cursor .execute (f"""
3633 CREATE TABLE IF NOT EXISTS { table_name } (
37- employee_id INTEGER PRIMARY KEY,
34+ employee_id TEXT PRIMARY KEY,
3835 name TEXT,
3936 email TEXT,
4037 age INTEGER,
@@ -50,15 +47,11 @@ def load(df: pd.DataFrame, db_path: str = "etl_data.db", table_name: str = "proc
5047 )
5148 """ )
5249
53- data_to_insert = [tuple (row ) for row in df .itertuples (index = False , name = None )]
5450 placeholders = ", " .join (["?" ] * len (df .columns ))
5551 column_names = ", " .join (df .columns )
56- sql_query = f"INSERT OR IGNORE INTO { table_name } ({ column_names } ) VALUES ({ placeholders } )"
57- cursor .executemany (sql_query , data_to_insert )
52+ sql_query = f"INSERT OR REPLACE INTO { table_name } ({ column_names } ) VALUES ({ placeholders } )"
53+ cursor .executemany (sql_query , df . itertuples ( index = False , name = None ) )
5854 conn .commit ()
59- # TODO (Find & Fix): Bulk insert without checking for duplicates
60-
61-
6255 except sqlite3 .Error as e :
6356 if conn :
6457 conn .rollback ()
0 commit comments