Skip to content

Commit 18af916

Browse files
committed
process_tracker_python-50 Extract Finders By Given Status
🐛 Hit connection threshold to database ✨ Extract finders can now search for any status type. Tweaked ExtractTracker to use the data store and session objects from process_tracker, since an extract tracker must have a process tracker associated with it. This stopped the connection threshold problem as the same connection can be used until the process tracker changes to 'completed' or 'failed' status, which is when the session is closed. Finder methods now have a default status of 'ready' (which should be the common status required) but any status type can be searched for, which allows custom status types to be leveraged for extracts. Closes #50
1 parent b149270 commit 18af916

File tree

6 files changed

+190
-33
lines changed

6 files changed

+190
-33
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,3 @@ pip-selfcheck.json
173173
.idea/misc.xml
174174
.idea/modules.xml
175175
.idea/process_tracker_python.iml
176-
/tests/test_process_tracker.py

process_tracker/extract_tracker.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ def __init__(
4949
self.logger = logging.getLogger(__name__)
5050
self.logger.setLevel(config["DEFAULT"]["log_level"])
5151

52-
self.data_store = DataStore()
53-
self.session = self.data_store.session
5452
self.process_run = process_run
5553

54+
self.data_store = self.process_run.data_store
55+
self.session = self.process_run.session
56+
5657
self.filename = filename
5758

5859
if location is not None:

process_tracker/models/extract.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class Extract(Base):
6060
extract_load_record_count = Column(Integer, nullable=True)
6161

6262
extract_process = relationship("ExtractProcess", back_populates="process_extracts")
63+
extract_status = relationship("ExtractStatus", foreign_keys=[extract_status_id])
6364
locations = relationship("Location", foreign_keys=[extract_location_id])
6465

6566
def __repr__(self):

process_tracker/process_tracker.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,22 +150,29 @@ def change_run_status(self, new_status, end_date=None):
150150

151151
self.session.commit()
152152

153+
if (
154+
self.process_status_types[new_status] == self.process_status_complete
155+
) or (self.process_status_types[new_status] == self.process_status_failed):
156+
self.session.close()
157+
153158
else:
154159
raise Exception("The provided status type %s is invalid." % new_status)
155160

156-
def find_ready_extracts_by_filename(self, filename):
161+
def find_extracts_by_filename(self, filename, status="ready"):
157162
"""
158163
For the given filename, or filename part, find all matching extracts that are ready for processing.
159-
160-
:param filename:
161-
:return:
164+
:param filename: Filename or part of filename.
165+
:type filename: str
166+
:param status: Name of the status type for files being searched. Default 'ready'.
167+
:type status: str
168+
:return: List of Extract SQLAlchemy objects.
162169
"""
163170

164171
process_files = (
165172
self.session.query(Extract)
166173
.join(ExtractStatus)
167174
.filter(Extract.extract_filename.like("%" + filename + "%"))
168-
.filter(ExtractStatus.extract_status_name == "ready")
175+
.filter(ExtractStatus.extract_status_name == status)
169176
.order_by(Extract.extract_registration_date_time)
170177
.order_by(Extract.extract_id)
171178
.all()
@@ -175,22 +182,26 @@ def find_ready_extracts_by_filename(self, filename):
175182

176183
return process_files
177184

178-
def find_ready_extracts_by_location(self, location_name=None, location_path=None):
185+
def find_extracts_by_location(
186+
self, location_name=None, location_path=None, status="ready"
187+
):
179188
"""
180189
For the given location path or location name, find all matching extracts that are ready for processing
181190
:param location_name: The name of the location
182191
:type location_name: str
183192
:param location_path: The path of the location
184193
:type location_path: str
185-
:return: List of extract files that are in 'ready' state'.
194+
:param status: Name of the status type for files being searched. Default 'ready'.
195+
:type status: str
196+
:return: List of Extract SQLAlchemy objects.
186197
"""
187198

188199
if location_path is not None:
189200
process_files = (
190201
self.session.query(Extract)
191202
.join(Location)
192203
.join(ExtractStatus)
193-
.filter(ExtractStatus.extract_status_name == "ready")
204+
.filter(ExtractStatus.extract_status_name == status)
194205
.filter(Location.location_path == location_path)
195206
.order_by(Extract.extract_registration_date_time)
196207
.order_by(Extract.extract_id)
@@ -201,7 +212,7 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None
201212
self.session.query(Extract)
202213
.join(Location)
203214
.join(ExtractStatus)
204-
.filter(ExtractStatus.extract_status_name == "ready")
215+
.filter(ExtractStatus.extract_status_name == status)
205216
.filter(Location.location_name == location_name)
206217
.order_by(Extract.extract_registration_date_time)
207218
.order_by(Extract.extract_id)
@@ -218,10 +229,14 @@ def find_ready_extracts_by_location(self, location_name=None, location_path=None
218229
self.logger.info("Returning extract files by location.")
219230
return process_files
220231

221-
def find_ready_extracts_by_process(self, extract_process_name):
232+
def find_extracts_by_process(self, extract_process_name, status="ready"):
222233
"""
223234
For the given named process, find the extracts that are ready for processing.
224-
:return: List of OS specific filepaths with filenames.
235+
:param extract_process_name: Name of the process that is associated with extracts
236+
:type extract_process_name: str
237+
:param status: Name of the status type for files being searched. Default 'ready'.
238+
:type status: str
239+
:return: List of Extract SQLAlchemy objects.
225240
"""
226241

227242
process_files = (
@@ -238,7 +253,7 @@ def find_ready_extracts_by_process(self, extract_process_name):
238253
.join(Process)
239254
.filter(
240255
Process.process_name == extract_process_name,
241-
ExtractStatus.extract_status_name == "ready",
256+
ExtractStatus.extract_status_name == status,
242257
)
243258
.order_by(Extract.extract_registration_date_time)
244259
.all()

tests/test_extract_tracker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def tearDownClass(cls):
4646
cls.session.query(ProcessDependency).delete()
4747
cls.session.query(Process).delete()
4848
cls.session.commit()
49+
cls.session.close()
4950

5051
def setUp(self):
5152
"""

0 commit comments

Comments
 (0)