Skip to content

Commit 9f264f5

Browse files
committed
Added ExtractDependency Table
Added a new table for handling extract dependencies and a method to check for dependencies, which is called when an extract attempts to switch to 'loading' status. Started working on #14
1 parent b36cdfa commit 9f264f5

File tree

6 files changed

+63
-3
lines changed

6 files changed

+63
-3
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ snowflake-sqlalchemy = "*"
2121
pymysql = "*"
2222
cx-oracle = "*"
2323
pymssql = "*"
24+
black = "*"
2425

2526
[requires]
2627
python_version = "3.7"

process_tracker/extract_tracker.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
# Used in the creation and editing of extract records. Used in conjunction with process tracking.
33
from datetime import datetime
44
import logging
5-
import os
65
from os.path import join
76

7+
from sqlalchemy.orm import aliased
88

99
from process_tracker.data_store import DataStore
1010
from process_tracker.location_tracker import LocationTracker
1111
from process_tracker.utilities.settings import SettingsManager
12-
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
12+
from process_tracker.models.extract import Extract, ExtractDependency, ExtractProcess, ExtractStatus
1313

1414

1515
class ExtractTracker:
@@ -97,6 +97,11 @@ def change_extract_status(self, new_status):
9797
"""
9898
status_date = datetime.now()
9999
if new_status in self.extract_status_types:
100+
101+
if new_status == self.extract_status_loading:
102+
103+
self.extract_dependency_check()
104+
100105
self.logger.info('Setting extract status to %s' % new_status)
101106

102107
new_status = self.extract_status_types[new_status]
@@ -113,6 +118,29 @@ def change_extract_status(self, new_status):
113118
raise Exception('%s is not a valid extract status type. '
114119
'Please add the status to extract_status_lkup' % new_status)
115120

121+
def extract_dependency_check(self):
122+
"""
123+
Determine if the extract file has any unloaded dependencies before trying to load the file.
124+
:return:
125+
"""
126+
child_extract = aliased(Extract)
127+
parent_extract = aliased(Extract)
128+
129+
dependency_hold = self.session.query(ExtractDependency) \
130+
.join(parent_extract, ExtractDependency.parent_extract_id == parent_extract.extract_id) \
131+
.join(child_extract, ExtractDependency.child_extract_id == child_extract.extract_id) \
132+
.join(Extract, Extract.extract_id == parent_extract.extract_id) \
133+
.join(ExtractStatus, ExtractStatus.extract_status_id == Extract.extract_status_id) \
134+
.filter(child_extract.extract_id == self.extract.extract_id) \
135+
.filter(ExtractStatus.extract_status_name.in_(('loading', 'initializing', 'ready'))) \
136+
.count()
137+
138+
if dependency_hold > 0:
139+
self.logger.error('Extract files that this extract file is dependent on have not been loaded, are being '
140+
'created, or are in the process of loading.')
141+
raise Exception('Extract files that this extract file is dependent on have not been loaded, are being '
142+
'created, or are in the process of loading.')
143+
116144
def get_extract_status_types(self):
117145
"""
118146
Get list of process status types and return dictionary.

process_tracker/models/extract.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ def full_filepath(self):
5050
return join(self.locations.location_path, self.extract_filename)
5151

5252

53+
class ExtractDependency(Base):
54+
55+
__tablename__ = 'extract_dependency'
56+
57+
parent_extract_id = Column(Integer, ForeignKey('extract_tracking.extract_id'), primary_key=True)
58+
child_extract_id = Column(Integer, ForeignKey('extract_tracking.extract_id'), primary_key=True)
59+
60+
child_extract = relationship('Extract', foreign_keys=[child_extract_id])
61+
parent_extract = relationship('Extract', foreign_keys=[parent_extract_id])
62+
63+
def __repr__(self):
64+
65+
return "<ExtractDependency (parent_extract=%s, child_extract=%s)>" % (self.parent_extract_id
66+
, self.child_extract_id)
67+
68+
5369
class ExtractProcess(Base):
5470

5571
__tablename__ = "extract_process_tracking"

process_tracker/models/system.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ class System(Base):
1616

1717
def __repr__(self):
1818

19-
return "<System (system_key=%s)>" % self.system_key
19+
return "<System (system_key=%s)>" % self.system_key

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
],
3232
extras_requires={
3333
'dev': [
34+
'black >= 19.3b0',
3435
'coverage >= "4.0.3',
3536
'coveralls >= 1.7.0',
3637
'moto >= 1.3.8',

tests/test_extract_tracker.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,20 @@ def test_derive_location_name_s3(self):
153153

154154
self.assertEqual(expected_result, given_result)
155155

156+
def test_extract_dependency_check(self):
157+
"""
158+
Testing that if no dependencies are in a state that stops an extract from being loaded, then the extract
159+
is loaded.
160+
:return:
161+
"""
162+
163+
def test_extract_dependency_check_blocked(self):
164+
"""
165+
Testing that if a dependency is in a state that stops an extract from being loaded, then the extract triggers an
166+
error blocking the file from being processed.
167+
:return:
168+
"""
169+
156170
def test_location_name_provided(self):
157171
"""
158172
Testing that if a location name is provided (like with default extract), one is not created.

0 commit comments

Comments
 (0)