Skip to content

Commit cefef6b

Browse files
authored
Merge pull request #166 from OpenDataAlex/process_tracker_python-143
Process tracker python 143
2 parents c8650a4 + 69af17e commit cefef6b

File tree

5 files changed

+120
-9
lines changed

5 files changed

+120
-9
lines changed

dbscripts/mysql_process_tracker.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ create table process_tracking
338338
process_run_actor_id int null,
339339
is_latest_run tinyint(1) not null,
340340
process_run_name varchar(250) null,
341+
process_run_insert_count int default 0 not null,
342+
process_run_update_count int default 0 not null,
341343
constraint process_tracking_process_run_name_uindex
342344
unique (process_run_name),
343345
constraint process_tracking_ibfk_1

dbscripts/postgresql_process_tracker.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,9 @@ create table process_tracking
365365
constraint process_tracking_fk03
366366
references actor_lkup,
367367
is_latest_run boolean default false,
368-
process_run_name varchar(250) null
368+
process_run_name varchar(250) null,
369+
process_run_insert_count int default 0 not null,
370+
process_run_update_count int default 0 not null
369371
);
370372

371373
comment on table process_tracking is 'Tracking table of process runs.';

process_tracker/models/process.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,8 @@ class ProcessTracking(Base):
508508
process_run_start_date_time = Column(DateTime, nullable=False)
509509
process_run_end_date_time = Column(DateTime, nullable=True)
510510
process_run_record_count = Column(Integer, nullable=False, default=0)
511+
process_run_insert_count = Column(Integer, nullable=False, default=0)
512+
process_run_update_count = Column(Integer, nullable=False, default=0)
511513
process_run_actor_id = Column(
512514
Integer, ForeignKey("process_tracker.actor_lkup.actor_id"), nullable=False
513515
)

process_tracker/process_tracker.py

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,22 +1336,53 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
13361336

13371337
self.session.commit()
13381338

1339-
def set_process_run_record_count(self, num_records):
1339+
def record_count_manager(self, original_count, num_records):
    """
    Resolve the record count that should be stored after a new count has been reported.

    Earlier revisions returned ``num_records`` when ``original_count`` was 0 or None and otherwise computed
    ``original_count + (num_records - original_count)``.  That expression is always equal to ``num_records``, so
    the stored count is simply replaced by the newly reported one.  The value is now returned directly so the
    code states what it actually does.
    :param original_count: The count currently stored.  Kept so existing callers keep working; it does not
                           influence the result.
    :type original_count: int or None
    :param num_records: The newly reported count of records
    :type num_records: int
    :return: The count to store, which is always ``num_records``.
    :rtype: int
    """
    return num_records
1354+
1355+
def set_process_run_record_count(self, num_records, processing_type=None):
    """
    For the given process run, set the process_run_record_count for the number of records processed. Will also
    update the process' total_record_count - the total number of records ever processed by that process.

    The process total is adjusted by the difference between ``num_records`` and the count this run had already
    reported, so reporting a count again for the same run corrects the total instead of overwriting it and counts
    from other runs are preserved.
    :param num_records: Count of number of records processed.
    :type num_records: int
    :param processing_type: Type of records being processed. Valid values: None, insert, update. None will only
                            update overall counts
    :type processing_type: str or None
    :return: None
    :raises Exception: If processing_type is anything other than None, "insert" or "update".  Nothing is modified
                       or committed in that case.
    """
    run = self.process_tracking_run

    # What this run reported before this call; a missing value is treated as zero.
    previous_run_records = run.process_run_record_count or 0

    if processing_type == "insert":
        run.process_run_insert_count = self.record_count_manager(
            original_count=run.process_run_insert_count, num_records=num_records
        )
    elif processing_type == "update":
        run.process_run_update_count = self.record_count_manager(
            original_count=run.process_run_update_count, num_records=num_records
        )
    elif processing_type is not None:
        error_msg = "Processing type not recognized."
        self.logger.error(error_msg)
        raise Exception(error_msg)

    # Apply only the change in this run's count to the running total for the process.
    self.process.total_record_count = (self.process.total_record_count or 0) + (
        num_records - previous_run_records
    )

    run.process_run_record_count = num_records

    self.session.commit()

tests/test_process_tracker.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,80 @@ def test_set_process_run_record_count_twice(self):
18771877

18781878
self.assertEqual(expected_result, given_result)
18791879

1880+
def test_set_process_run_record_count_insert(self):
    """
    Reporting a record count with processing type "insert" stores that count as the run's insert count.
    """
    tracker = self.process_tracker

    tracker.set_process_run_record_count(num_records=1000, processing_type="insert")

    self.assertEqual(1000, tracker.process_tracking_run.process_run_insert_count)
1895+
1896+
def test_set_process_run_record_count_insert_twice(self):
    """
    Reporting record counts twice with processing type "insert" leaves the run's insert count at the
    most recently reported value.
    """
    tracker = self.process_tracker

    # First the initial count, then the modified count.
    for reported_count in (1000, 1500):
        tracker.set_process_run_record_count(
            num_records=reported_count, processing_type="insert"
        )

    self.assertEqual(1500, tracker.process_tracking_run.process_run_insert_count)
1916+
1917+
def test_set_process_run_record_count_update(self):
    """
    Reporting a record count with processing type "update" stores that count as the run's update count.
    """
    tracker = self.process_tracker

    tracker.set_process_run_record_count(num_records=1000, processing_type="update")

    self.assertEqual(1000, tracker.process_tracking_run.process_run_update_count)
1932+
1933+
def test_set_process_run_record_count_update_twice(self):
    """
    Reporting record counts twice with processing type "update" leaves the run's update count at the
    most recently reported value.
    """
    tracker = self.process_tracker

    # First the initial count, then the modified count.
    for reported_count in (1000, 1500):
        tracker.set_process_run_record_count(
            num_records=reported_count, processing_type="update"
        )

    self.assertEqual(1500, tracker.process_tracking_run.process_run_update_count)
1953+
18801954
def test_register_source_dataset_type(self):
18811955
"""
18821956
When both a source and dataset_type are provided, the source is registered to the dataset_type.

0 commit comments

Comments
 (0)