Skip to content

Commit 2264fc8

Browse files
author
Alex Meadows
committed
process_tracker_python-143 Add insert/update counts for process_tracking
✨ Added ability to break out record counts by processing type (inserts, updates). Instead of just having an overall total record count for process runs, support is added for breaking out the total count by inserts and updates for a given run. Totals are still available, but this allows for a finer understanding of what was actually done during the process run. Closes #143
1 parent a072861 commit 2264fc8

File tree

5 files changed

+120
-9
lines changed

5 files changed

+120
-9
lines changed

dbscripts/mysql_process_tracker.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ create table process_tracking
338338
process_run_actor_id int null,
339339
is_latest_run tinyint(1) not null,
340340
process_run_name varchar(250) null,
341+
process_run_insert_count int default 0 not null,
342+
process_run_update_count int default 0 not null,
341343
constraint process_tracking_process_run_name_uindex
342344
unique (process_run_name),
343345
constraint process_tracking_ibfk_1

dbscripts/postgresql_process_tracker.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,9 @@ create table process_tracking
365365
constraint process_tracking_fk03
366366
references actor_lkup,
367367
is_latest_run boolean default false,
368-
process_run_name varchar(250) null
368+
process_run_name varchar(250) null,
369+
process_run_insert_count int default 0 not null,
370+
process_run_update_count int default 0 not null
369371
);
370372

371373
comment on table process_tracking is 'Tracking table of process runs.';

process_tracker/models/process.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,8 @@ class ProcessTracking(Base):
508508
process_run_start_date_time = Column(DateTime, nullable=False)
509509
process_run_end_date_time = Column(DateTime, nullable=True)
510510
process_run_record_count = Column(Integer, nullable=False, default=0)
511+
process_run_insert_count = Column(Integer, nullable=False, default=0)
512+
process_run_update_count = Column(Integer, nullable=False, default=0)
511513
process_run_actor_id = Column(
512514
Integer, ForeignKey("process_tracker.actor_lkup.actor_id"), nullable=False
513515
)

process_tracker/process_tracker.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,22 +1323,53 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
13231323

13241324
self.session.commit()
13251325

1326-
def set_process_run_record_count(self, num_records):
1326+
def record_count_manager(self, original_count, num_records):
1327+
"""
1328+
Given two record counts, one the overall count and the other the current count, process and return the adjusted
1329+
amount.
1330+
:param original_count: The original overall count of records
1331+
:type original_count: int
1332+
:param num_records: The adjusted count of records
1333+
:type num_records: int
1334+
"""
1335+
if original_count == 0 or original_count is None:
1336+
return_count = num_records
1337+
else:
1338+
return_count = original_count + (num_records - original_count)
1339+
1340+
return return_count
1341+
1342+
def set_process_run_record_count(self, num_records, processing_type=None):
13271343
"""
13281344
For the given process run, set the process_run_record_count for the number of records processed. Will also
13291345
update the process' total_record_count - the total number of records ever processed by that process
1330-
:param num_records:
1346+
:param num_records: Count of number of records processed.
1347+
:type num_records: int
1348+
:param processing_type: Type of records being processed. Valid values: None, insert, update. None will only
1349+
update overall counts
13311350
:return:
13321351
"""
13331352
process_run_records = self.process.total_record_count
1353+
process_run_inserts = self.process_tracking_run.process_run_insert_count
1354+
process_run_updates = self.process_tracking_run.process_run_update_count
13341355

1335-
if process_run_records == 0 or process_run_records is None:
1336-
1337-
self.process.total_record_count = num_records
1338-
else:
1339-
self.process.total_record_count = self.process.total_record_count + (
1340-
num_records - process_run_records
1356+
if processing_type == "insert":
1357+
self.process_tracking_run.process_run_insert_count = self.record_count_manager(
1358+
original_count=process_run_inserts, num_records=num_records
1359+
)
1360+
elif processing_type == "update":
1361+
self.process_tracking_run.process_run_update_count = self.record_count_manager(
1362+
original_count=process_run_updates, num_records=num_records
13411363
)
1364+
elif processing_type is not None:
1365+
error_msg = "Processing type not recognized."
1366+
self.logger.error(error_msg)
1367+
raise Exception(error_msg)
1368+
1369+
self.process.total_record_count = self.record_count_manager(
1370+
original_count=process_run_records, num_records=num_records
1371+
)
13421372

13431373
self.process_tracking_run.process_run_record_count = num_records
1374+
13441375
self.session.commit()

tests/test_process_tracker.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,80 @@ def test_set_process_run_record_count_twice(self):
18771877

18781878
self.assertEqual(expected_result, given_result)
18791879

1880+
def test_set_process_run_record_count_insert(self):
1881+
"""
1882+
Testing that if record counts are provided for a given process_run, and the processing type is insert,
1883+
update the insert count for the process run.
1884+
"""
1885+
initial_record_count = 1000
1886+
1887+
self.process_tracker.set_process_run_record_count(
1888+
num_records=initial_record_count, processing_type="insert"
1889+
)
1890+
1891+
given_count = self.process_tracker.process_tracking_run.process_run_insert_count
1892+
expected_count = 1000
1893+
1894+
self.assertEqual(expected_count, given_count)
1895+
1896+
def test_set_process_run_record_count_insert_twice(self):
1897+
"""
1898+
Testing that if record counts are provided for a given process_run multiple times, and the processing type is insert,
1899+
update the insert count for the process run.
1900+
"""
1901+
initial_record_count = 1000
1902+
modified_record_count = 1500
1903+
1904+
self.process_tracker.set_process_run_record_count(
1905+
num_records=initial_record_count, processing_type="insert"
1906+
)
1907+
1908+
self.process_tracker.set_process_run_record_count(
1909+
num_records=modified_record_count, processing_type="insert"
1910+
)
1911+
1912+
given_count = self.process_tracker.process_tracking_run.process_run_insert_count
1913+
expected_count = 1500
1914+
1915+
self.assertEqual(expected_count, given_count)
1916+
1917+
def test_set_process_run_record_count_update(self):
1918+
"""
1919+
Testing that if record counts are provided for a given process_run, and the processing type is update,
1920+
update the update count for the process run.
1921+
"""
1922+
initial_record_count = 1000
1923+
1924+
self.process_tracker.set_process_run_record_count(
1925+
num_records=initial_record_count, processing_type="update"
1926+
)
1927+
1928+
given_count = self.process_tracker.process_tracking_run.process_run_update_count
1929+
expected_count = 1000
1930+
1931+
self.assertEqual(expected_count, given_count)
1932+
1933+
def test_set_process_run_record_count_update_twice(self):
1934+
"""
1935+
Testing that if record counts are provided for a given process_run multiple times, and the processing type is update,
1936+
update the update count for the process run.
1937+
"""
1938+
initial_record_count = 1000
1939+
modified_record_count = 1500
1940+
1941+
self.process_tracker.set_process_run_record_count(
1942+
num_records=initial_record_count, processing_type="update"
1943+
)
1944+
1945+
self.process_tracker.set_process_run_record_count(
1946+
num_records=modified_record_count, processing_type="update"
1947+
)
1948+
1949+
given_count = self.process_tracker.process_tracking_run.process_run_update_count
1950+
expected_count = 1500
1951+
1952+
self.assertEqual(expected_count, given_count)
1953+
18801954
def test_register_source_dataset_type(self):
18811955
"""
18821956
When both a source and dataset_type are provided, the source is registered to the dataset_type.

0 commit comments

Comments
 (0)