@@ -73,12 +73,13 @@ def __init__(
7373 config_location = None ,
7474 dataset_types = None ,
7575 schedule_frequency = None ,
76- process_run_id = None ,
76+ process_tracking_id = None ,
7777 ):
7878 """
7979 ProcessTracker is the primary engine for tracking data integration processes.
8080 :param process_name: Name of the process being tracked.
8181 :param process_run_name: Optional name of the process run.
82+ :param process_type: Type of process the process_name is. Optional if process already exists.
8283 :param actor_name: Name of the person or environment runnning the process.
8384 :param tool_name: Name of the tool used to run the process.
8485 :param sources: A single source name or list of source names for the given process. If source_objects is set,
@@ -104,9 +105,9 @@ def __init__(
104105 :type dataset_types: list
105106 :param schedule_frequency: The general scheduling frequency for the process (i.e. hourly)
106107 :type schedule_frequency: string
107- :param process_run_id : If trying to access an already running process, provide the process run's id.
108+ :param process_tracking_id : If trying to access an already running process, provide the process run's id.
108109 Object will be built for that specific process run.
109- :type process_run_id : int
110+ :type process_tracking_id : int
110111 """
111112 self .config_location = config_location
112113 self .config = SettingsManager (config_location = self .config_location )
@@ -128,11 +129,13 @@ def __init__(
128129 self .process_status_failed = self .process_status_types ["failed" ]
129130 self .process_status_hold = self .process_status_types ["on hold" ]
130131
131- if process_run_id is not None :
132+ if process_tracking_id is not None :
132133 self .logger .info ("Process run id provided. Checking if exists." )
133134
134135 process_run = self .data_store .get_or_create_item (
135- model = ProcessTracking , process_tracking_id = process_run_id , create = False
136+ model = ProcessTracking ,
137+ process_tracking_id = process_tracking_id ,
138+ create = False ,
136139 )
137140
138141 if process_run is not None :
@@ -145,32 +148,32 @@ def __init__(
145148
146149 self .dataset_types = process_run .process .dataset_types
147150 self .sources = self .determine_process_sources (
148- process_run_id = process_run_id
151+ process_run_id = process_tracking_id
149152 )
150153 self .targets = self .determine_process_targets (
151- process_run_id = process_run_id
154+ process_run_id = process_tracking_id
152155 )
153156
154157 self .process_name = process_run .process .process_name
155158 self .process_tracking_run = process_run
156159 self .process_run_name = process_run .process_run_name
157160
158161 else :
159- error_msg = "Process run not found based on id %s." % process_run_id
162+ error_msg = (
163+ "Process run not found based on id %s." % process_tracking_id
164+ )
160165 self .logger .error (error_msg )
161166 raise Exception (error_msg )
162167 else :
163- if process_name is None or process_type is None :
164- error_msg = "process_name and process_type must be set."
168+ if process_name is None is None :
169+ error_msg = "process_name must be set."
165170 self .logger .error (error_msg )
166171 raise Exception (error_msg )
167172
168173 self .actor = self .data_store .get_or_create_item (
169174 model = Actor , actor_name = actor_name
170175 )
171- self .process_type = self .data_store .get_or_create_item (
172- model = ProcessType , process_type_name = process_type
173- )
176+
174177 self .tool = self .data_store .get_or_create_item (
175178 model = Tool , tool_name = tool_name
176179 )
@@ -184,13 +187,27 @@ def __init__(
184187 model = ScheduleFrequency , schedule_frequency_name = schedule_frequency
185188 )
186189
187- self .process = self .data_store .get_or_create_item (
188- model = Process ,
189- process_name = process_name ,
190- process_type_id = self .process_type .process_type_id ,
191- process_tool_id = self .tool .tool_id ,
192- schedule_frequency_id = self .schedule_frequency .schedule_frequency_id ,
193- )
190+ if process_type is None :
191+
192+ self .process = self .data_store .get_or_create_item (
193+ model = Process , process_name = process_name , create = False
194+ )
195+
196+ self .process_type = self .process .process_type
197+
198+ else :
199+
200+ self .process_type = self .data_store .get_or_create_item (
201+ model = ProcessType , process_type_name = process_type
202+ )
203+
204+ self .process = self .data_store .get_or_create_item (
205+ model = Process ,
206+ process_name = process_name ,
207+ process_type_id = self .process_type .process_type_id ,
208+ process_tool_id = self .tool .tool_id ,
209+ schedule_frequency_id = self .schedule_frequency .schedule_frequency_id ,
210+ )
194211
195212 # Dataset types should be loaded before source and target because they are also used there.
196213
@@ -325,7 +342,7 @@ def determine_hold_status(self, last_run_status, last_run_id):
325342 Based on the setting 'max_concurrent_failures', count the number of failures for that number of process runs.
326343 If the counts match, process will remain on hold. If last run is 'on_hold' process will remain on hold.
327344 :param last_run_status: The status of the previous run
328- :param last_run_id: The process_run_id of the previous run
345+ :param last_run_id: The process_tracking_id of the previous run
329346 :return:
330347 """
331348 self .logger .debug ("Determining if process should be put on or remain on hold." )
@@ -362,7 +379,7 @@ def determine_hold_status(self, last_run_status, last_run_id):
362379
363380 def determine_process_sources (self , process_run_id ):
364381 """
365- Based on the process_run_id , find the given process' sources - either at the attribute, object, or source level
382+ Based on the process_tracking_id , find the given process' sources - either at the attribute, object, or source level
366383 :param process_run_id: Process run identifier
367384 :type process_run_id: int
368385 :return: Array of source objects at lowest granularity.
@@ -425,7 +442,7 @@ def determine_process_sources(self, process_run_id):
425442
426443 def determine_process_targets (self , process_run_id ):
427444 """
428- Based on the process_run_id , find the given process' targets - either at the attribute, object, or source level
445+ Based on the process_tracking_id , find the given process' targets - either at the attribute, object, or source level
429446 :param process_run_id: Process run identifier
430447 :type process_run_id: int
431448 :return: Array of source objects used as target for the process at lowest granularity.
@@ -641,13 +658,13 @@ def find_process_by_schedule_frequency(self, frequency="daily"):
641658 process_list = list ()
642659
643660 processes = (
644- self .data_store .session .query (Process . process_id )
661+ self .data_store .session .query (Process )
645662 .join (ScheduleFrequency )
646663 .filter (ScheduleFrequency .schedule_frequency_name == frequency )
647664 )
648665
649666 for process in processes :
650- process_list .append (process . process_id )
667+ process_list .append (process )
651668
652669 return process_list
653670
@@ -1319,22 +1336,53 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
13191336
13201337 self .session .commit ()
13211338
1322- def set_process_run_record_count (self , num_records ):
1339+ def record_count_manager (self , original_count , num_records ):
1340+ """
1341+ Given two record counts, one the overall count and the other the current count, process and return the adjusted
1342+ amount.
1343+ :param original_count: The original overall count of records
1344+ :type original_count: int
1345+ :param num_records: The adjusted count of records
1346+ :type num_records: int
1347+ """
1348+ if original_count == 0 or original_count is None :
1349+ return_count = num_records
1350+ else :
1351+ return_count = original_count + (num_records - original_count )
1352+
1353+ return return_count
1354+
1355+ def set_process_run_record_count (self , num_records , processing_type = None ):
13231356 """
13241357 For the given process run, set the process_run_record_count for the number of records processed. Will also
13251358 update the process' total_record_count - the total number of records ever processed by that process
1326- :param num_records:
1359+ :param num_records: Count of number of records processed.
1360+ :type num_records: int
1361+ :param processing_type: Type of records being processed. Valid values: None, insert, update. None will only
1362+ update overall counts
13271363 :return:
13281364 """
13291365 process_run_records = self .process .total_record_count
1366+ process_run_inserts = self .process_tracking_run .process_run_insert_count
1367+ process_run_updates = self .process_tracking_run .process_run_update_count
13301368
1331- if process_run_records == 0 or process_run_records is None :
1332-
1333- self .process .total_record_count = num_records
1334- else :
1335- self .process .total_record_count = self .process .total_record_count + (
1336- num_records - process_run_records
1369+ if processing_type == "insert" :
1370+ self .process_tracking_run .process_run_insert_count = self .record_count_manager (
1371+ original_count = process_run_inserts , num_records = num_records
1372+ )
1373+ elif processing_type == "update" :
1374+ self .process_tracking_run .process_run_update_count = self .record_count_manager (
1375+ original_count = process_run_updates , num_records = num_records
13371376 )
1377+ elif processing_type is not None :
1378+ error_msg = "Processing type not recognized."
1379+ self .logger .error (error_msg )
1380+ raise Exception (error_msg )
1381+
1382+ self .process .total_record_count = self .record_count_manager (
1383+ original_count = process_run_records , num_records = num_records
1384+ )
13381385
13391386 self .process_tracking_run .process_run_record_count = num_records
1387+
13401388 self .session .commit ()
0 commit comments