@@ -14,7 +14,6 @@ def __init__(self):
1414 def query_data (self , query : str , batch_size : int = None ):
1515 """Executes a SELECT query and returns results as a DataFrame."""
1616 self .db .check_connection ()
17-
1817 if self .conn is None :
1918 logging .error ("No active database connection." )
2019 return None
@@ -23,9 +22,8 @@ def query_data(self, query: str, batch_size: int = None):
2322 try :
2423 cursor = self .conn .cursor ()
2524 cursor .execute (query )
26-
25+ rows = []
2726 if batch_size :
28- rows = []
2927 while True :
3028 batch = cursor .fetchmany (batch_size )
3129 if not batch :
@@ -36,8 +34,9 @@ def query_data(self, query: str, batch_size: int = None):
3634
3735 columns = [desc [0 ] for desc in cursor .description ]
3836 df = pd .DataFrame .from_records (rows , columns = columns )
39-
40- logging .info (f"Query executed in { time .time () - start_time :.4f} seconds" )
37+ print ("Data fetched successfully!" )
38+ print ("Dataframe Size" , df .shape )
39+ logging .info (f"Data fetched in { time .time () - start_time :.4f} seconds" )
4140 return df
4241 except Exception as e :
4342 logging .error (f"Error executing query: { e } " )
@@ -46,12 +45,11 @@ def query_data(self, query: str, batch_size: int = None):
4645 cursor .close ()
4746
4847 def execute_query (self , query : str ):
49- """Executes an INSERT, UPDATE, or DELETE query ."""
48+ """Executes INSERT, UPDATE, DELETE queries ."""
5049 return execute_with_retry (lambda : self ._execute_query (query ))
5150
5251 def _execute_query (self , query : str ) -> bool :
5352 self .db .check_connection ()
54-
5553 if self .conn is None :
5654 logging .error ("No active database connection." )
5755 return False
@@ -68,6 +66,50 @@ def _execute_query(self, query: str) -> bool:
6866 finally :
6967 cursor .close ()
7068
69+ def insert_dataframe (self , df : pd .DataFrame , table_name : str ):
70+ """Inserts a DataFrame into the specified SQL table."""
71+ self .db .check_connection ()
72+ if self .conn is None :
73+ logging .error ("No active database connection." )
74+ return False
75+
76+ try :
77+ df .to_sql (table_name , self .conn , if_exists = 'append' , index = False )
78+ logging .info (f"Inserted DataFrame into table '{ table_name } ' successfully!" )
79+ return True
80+ except Exception as e :
81+ logging .error (f"Error inserting DataFrame: { e } " )
82+ return False
83+
84+ def update_table_with_dataframe (self , df : pd .DataFrame , table_name : str , key_columns : list ):
85+ """Updates records in a SQL table using a DataFrame based on key columns."""
86+ self .db .check_connection ()
87+ if self .conn is None :
88+ logging .error ("No active database connection." )
89+ return False
90+
91+ try :
92+ cursor = self .conn .cursor ()
93+ for _ , row in df .iterrows ():
94+ set_clause = ", " .join ([f"{ col } = ?" for col in df .columns if col not in key_columns ])
95+ where_clause = " AND " .join ([f"{ key } = ?" for key in key_columns ])
96+ values = [row [col ] for col in df .columns if col not in key_columns ] + [row [key ] for key in key_columns ]
97+ sql = f"UPDATE { table_name } SET { set_clause } WHERE { where_clause } "
98+ cursor .execute (sql , values )
99+
100+ self .conn .commit ()
101+ logging .info (f"Updated table '{ table_name } ' successfully using DataFrame." )
102+ return True
103+ except Exception as e :
104+ logging .error (f"Error updating table with DataFrame: { e } " )
105+ return False
106+ finally :
107+ cursor .close ()
108+
109+ def create_table (self , create_sql : str ):
110+ """Creates a SQL table using the provided CREATE TABLE SQL statement."""
111+ return self .execute_query (create_sql )
112+
71113 def close (self ):
72114 """Closes the database connection."""
73115 self .db .close_connection ()
0 commit comments