Skip to content

Commit 65f984b

Browse files
committed
process_tracker_python-28 Process Extract Association
✨ Bulk Extract status change for ProcessTracker ProcessTracker can now bulk change status of extracts retrieved from all extract finders. Closes:#28
1 parent de7841c commit 65f984b

File tree

3 files changed

+58
-1
lines changed

3 files changed

+58
-1
lines changed

process_tracker/data_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
]
2929
preload_process_status_types = ["running", "completed", "failed"]
3030
preload_process_types = ["extract", "load"]
31-
preload_system_keys = [{"key": "version", "value": "0.1.0"}]
31+
preload_system_keys = [{"key": "version", "value": "0.2.0"}]
3232

3333
relational_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"]
3434
nonrelational_stores = []

process_tracker/process_tracker.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,20 @@ def __init__(
9898

9999
self.process_tracking_run = self.register_new_process_run()
100100

101+
@staticmethod
102+
def bulk_change_extract_status(extracts, extract_status):
103+
"""
104+
Given a set of extract objects, update the extract process record to reflect the association and updated status
105+
as well as the extract record's' status.
106+
:param extracts: List of Extract SQLAlchemy objects to be bulk updated.
107+
:param extract_status: The status to change the extract files to.
108+
:type extract_status: str
109+
:return:
110+
"""
111+
112+
for extract in extracts:
113+
extract.change_extract_status(new_status=extract_status)
114+
101115
def change_run_status(self, new_status, end_date=None):
102116
"""
103117
Change a process tracking run record from 'running' to another status.
@@ -155,6 +169,8 @@ def find_ready_extracts_by_filename(self, filename):
155169
.all()
156170
)
157171

172+
self.logger.info("Returning extract files by filename.")
173+
158174
return process_files
159175

160176
def find_ready_extracts_by_location(self, location_name=None, location_path=None):
@@ -195,6 +211,7 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None
195211
"A location name or path must be provided. Please try again."
196212
)
197213

214+
self.logger.info("Returning extract files by location.")
198215
return process_files
199216

200217
def find_ready_extracts_by_process(self, extract_process_name):

tests/test_process_tracker.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,46 @@ def tearDown(self):
112112
self.session.query(ErrorType).delete()
113113
self.session.commit()
114114

115+
def test_bulk_change_extract_status(self):
116+
"""
117+
Testing that bulk change occurs when extracts provided.
118+
:return:
119+
"""
120+
extract = ExtractTracker(
121+
process_run=self.process_tracker,
122+
filename="test_extract_filename2.csv",
123+
location_name="Test Location",
124+
location_path="/home/test/extract_dir",
125+
)
126+
127+
extract2 = ExtractTracker(
128+
process_run=self.process_tracker,
129+
filename="test_extract_filename3.csv",
130+
location_name="Test Location",
131+
location_path="/home/test/extract_dir",
132+
)
133+
134+
extracts = [extract, extract2]
135+
136+
self.process_tracker.bulk_change_extract_status(
137+
extracts=extracts, extract_status="loading"
138+
)
139+
140+
given_result = (
141+
self.session.query(ExtractProcess)
142+
.join(ExtractStatus)
143+
.filter(
144+
ExtractProcess.process_tracking_id
145+
== self.process_tracker.process_tracking_run.process_tracking_id
146+
)
147+
.filter(ExtractStatus.extract_status_name == "loading")
148+
.count()
149+
)
150+
151+
expected_result = 2
152+
153+
self.assertEqual(expected_result, given_result)
154+
115155
def test_change_status_invalid_type(self):
116156
"""
117157
Testing that if an invalid process status type is passed, it will trigger an exception.

0 commit comments

Comments
 (0)