Commit 499dcdf

process_tracker_python-30 Extract Lookups should return Extract objects
All the extract file finders now return Extract objects so they can be modified as they are used. A helper method, full_filepath, was also added that provides the full filename of a given extract. Also added string representations that were missing from a number of the data models. Closes #30
2 parents 78fbc7a + d4e569e commit 499dcdf
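As a rough illustration of the behavior described in the commit message, callers can now modify the Extract objects that the lookup helpers return. The function below is a sketch only: the full_filepath() call form is assumed from the commit message, while extract_status_id and the session-commit pattern come from the diff itself.

def mark_extracts_loading(extracts, loading_status_id, session):
    """Illustrative only: mutate the Extract ORM objects returned by a lookup
    and persist the change. full_filepath() is assumed, per the commit message,
    to be a zero-argument helper on the extract; it is not shown in this diff."""
    paths = []
    for extract in extracts:
        extract.extract_status_id = loading_status_id  # modify the returned object directly
        paths.append(extract.full_filepath())          # helper added in this commit (call form assumed)
    session.commit()                                   # persist the status changes
    return paths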

5 files changed (+50 additions, -10 deletions)

process_tracker/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 from process_tracker.extract_tracker import ExtractTracker
 from process_tracker.process_tracker import ProcessTracker
 
-__version__ = '0.1.6'
+__version__ = '0.2.0'

process_tracker/data_store.py

Lines changed: 19 additions & 4 deletions
@@ -62,14 +62,21 @@ def get_or_create_item(self, model, create=True, **kwargs):
         :param kwargs: The filter criteria required to find the specific entity instance.
         :return:
         """
-
+        self.logger.debug('Attempting to obtain record.')
         instance = self.session.query(model).filter_by(**kwargs).first()
 
         if instance is None:
+
+            self.logger.debug('Record did not exist.')
+
             if create:
-                self.logger.info('creating instance')
+
+                self.logger.info('Creating instance.')
+
                 instance = model(**kwargs)
+
                 try:
+                    self.logger.debug('Committing instance.')
                     self.session.add(instance)
                 except Exception as e:
                     self.logger.error(e)

@@ -95,7 +102,7 @@ def initialize_data_store(self, overwrite=False):
             self.logger.warn('ALERT - DATA STORE TO BE OVERWRITTEN - ALL DATA WILL BE LOST')
 
             for table in Base.metadata.table_names():
-                self.logger.debug('Table will be deleted: %s' % table)
+                self.logger.info('Table will be deleted: %s' % table)
                 table.drop(self.engine)
 
         version = None

@@ -330,7 +337,7 @@ def verify_and_connect_to_data_store(self):
         Based on environment variables, create the data store connection engine.
         :return:
         """
-
+        self.logger.info('Obtaining application configuration.')
         config = SettingsManager(config_location=self.config_location).config
 
         data_store_type = config['DEFAULT']['data_store_type']

@@ -371,8 +378,13 @@ def verify_and_connect_to_data_store(self):
         engine = ''
         meta = ''
         session = ''
+        self.logger.info('Data store is supported.')
 
         if data_store_type in relational_stores:
+
+            self.logger.info('Data store is relational.')
+            self.logger.info('Data store is %s' % data_store_type)
+
             if data_store_type == 'postgresql' \
                     or data_store_type == 'oracle'\
                     or data_store_type == 'snowflake':

@@ -398,8 +410,11 @@ def verify_and_connect_to_data_store(self):
                                                              , data_store_port))
 
             if database_exists(engine.url):
+
                 self.logger.info("Data store exists. Continuing to work.")
+
             else:
+
                 self.logger.error('Data store does not exist. Please create and try again.')
                 raise Exception('Data store does not exist. Please create and try again.')
 
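For reference, a sketch of the get_or_create_item call pattern documented above: the keyword arguments are the filter criteria, and the matching (or, when create=True, newly created) SQLAlchemy instance is returned so the caller can modify it. The wrapper and example values below are illustrative, not part of the library.

def fetch_or_register(data_store, model, **filters):
    """Illustrative wrapper: filters are the lookup criteria, and the returned
    instance can be modified in place by the caller."""
    instance = data_store.get_or_create_item(model=model, **filters)
    return instance

# e.g. fetch_or_register(data_store, Extract,
#                        extract_filename='orders_2019_05_01.csv',
#                        extract_location_id=1)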
process_tracker/extract_tracker.py

Lines changed: 17 additions & 3 deletions
@@ -43,19 +43,27 @@ def __init__(self, process_run, filename, location=None, location_name=None, loc
         self.filename = filename
 
         if location is not None:
+            self.logger.info('Location object provided.')
             self.location = location
         elif location_path is not None:
+            self.logger.info('Location path provided. Creating Location object.')
             self.location = LocationTracker(location_name=location_name, location_path=location_path)
         else:
             raise Exception('A location object or location_path must be provided.')
 
+        self.logger.info('Registering extract.')
+
         self.extract = self.data_store.get_or_create_item(model=Extract
                                                            , extract_filename=filename
                                                            , extract_location_id=self.location.location.location_id)
 
         if location_path is not None:
+            self.logger.info('Location path was provided so building file path from it.')
+
             self.full_filename = join(location_path, filename)
         else:
+            self.logger.info('Location provided so building file path from it.')
+
             self.full_filename = join(self.location.location_path, self.extract.extract_filename)
 
         # Getting all status types in the event there are custom status types added later.

@@ -74,8 +82,10 @@ def __init__(self, process_run, filename, location=None, location_name=None, loc
         self.extract_process = self.retrieve_extract_process()
 
         if status is not None:
+            self.logger.info('Status was provided by user.')
             self.change_extract_status(new_status=status)
         else:
+            self.logger.info('Status was not provided. Initializing.')
             self.extract.extract_status_id = self.extract_status_initializing
 
         self.session.commit()

@@ -85,7 +95,6 @@ def change_extract_status(self, new_status):
         Change an extract record status.
         :return:
         """
-
         status_date = datetime.now()
         if new_status in self.extract_status_types:
             self.logger.info('Setting extract status to %s' % new_status)

@@ -100,6 +109,7 @@ def change_extract_status(self, new_status):
             self.session.commit()
 
         else:
+            self.logger.error('%s is not a valid extract status type.' % new_status)
             raise Exception('%s is not a valid extract status type. '
                             'Please add the status to extract_status_lkup' % new_status)
 

@@ -108,7 +118,9 @@ def get_extract_status_types(self):
         Get list of process status types and return dictionary.
         :return:
         """
-        status_types = {}
+        self.logger.info('Obtaining extract status types.')
+
+        status_types = dict()
 
         for record in self.session.query(ExtractStatus):
             status_types[record.extract_status_name] = record.extract_status_id

@@ -121,14 +133,16 @@ def retrieve_extract_process(self):
         :return:
         """
 
+        self.logger.info('Associating extract to given process.')
+
         extract_process = self.data_store.get_or_create_item(model=ExtractProcess
                                                               , extract_tracking_id=self.extract.extract_id
                                                               , process_tracking_id=self.process_run.process_tracking_run
                                                               .process_tracking_id)
 
         # Only need to set to 'initializing' when it's the first time a process run is trying to work with files.
         if extract_process.extract_process_status_id is None:
-
+            self.logger.info('Extract process status must also be set. Initializing.')
             extract_process.extract_process_status_id = self.extract_status_initializing
             self.session.commit()
 
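A usage sketch for the two constructor paths shown above (location_path vs. a pre-built location object). Argument values are illustrative, and process_run is assumed to be an existing process run object.

from process_tracker.extract_tracker import ExtractTracker

def register_daily_extract(process_run):
    """Illustrative helper: register a file against an existing process run.
    With a location_path, full_filename becomes join(location_path, filename),
    as shown in the diff above."""
    extract = ExtractTracker(process_run=process_run,
                             filename='orders_2019_05_01.csv',
                             location_path='/data/extracts',
                             location_name='local extracts')
    return extract

# Alternatively, pass a pre-built location object instead of a path, e.g.
# ExtractTracker(process_run=process_run, filename='orders_2019_05_01.csv', location=loc).
# When no status is supplied, the extract is initialized to the 'initializing' status.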
process_tracker/location_tracker.py

Lines changed: 12 additions & 1 deletion
@@ -24,11 +24,16 @@ def __init__(self, location_path, location_name=None):
         self.location_path = location_path.lower()
 
         if location_name is None:
+            self.logger.info('Location name not provided. Generating.')
             self.location_name = self.derive_location_name()
         else:
+            self.logger.info('Using provided location name: %s' % location_name)
             self.location_name = location_name
 
         self.location_type = self.derive_location_type()
+
+        self.logger.info('Registering extract location.')
+
         self.location = self.data_store.get_or_create_item(model=Location
                                                             , location_name=self.location_name
                                                             , location_path=location_path

@@ -48,10 +53,11 @@ def derive_location_name(self):
 
         if "s3" in self.location_path:
             # If the path is an S3 Bucket, prefix to name.
-
+            self.logger.info('Location appears to be s3 related. Setting prefix.')
             location_prefix = "s3"
 
         if location_prefix is not None:
+            self.logger.info('Location prefix provided. Appending to location name.')
             location_name = location_prefix + " - "
 
         location_name += basename(normpath(self.location_path))

@@ -66,10 +72,15 @@ def derive_location_type(self):
 
         if "s3" in self.location_path or "s3" in self.location_name:
 
+            self.logger.info('Location appears to be s3 related. Setting type to s3.')
+
             location_type = self.data_store.get_or_create_item(model=LocationType
                                                                 , location_type_name="s3")
 
         else:
+
+            self.logger.info('Location did not match special types. Assuming local directory path.')
+
             location_type = self.data_store.get_or_create_item(model=LocationType
                                                                 , location_type_name="local directory")
 
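A short sketch of the naming and typing behavior in derive_location_name and derive_location_type above. Path values are illustrative; the import path is inferred from the file layout, and constructing a LocationTracker registers the location in the configured data store.

from process_tracker.location_tracker import LocationTracker

def build_locations():
    """Illustrative only: shows how location names and types are derived."""
    # An s3-style path gets the "s3" prefix and the "s3" location type;
    # the generated name is the prefix plus the basename of the normalized path.
    s3_location = LocationTracker(location_path='s3://my-bucket/exports')
    # s3_location.location_name -> roughly "s3 - exports"

    # Anything else falls back to the "local directory" type, and a provided
    # location_name is used as-is instead of being derived.
    local_location = LocationTracker(location_path='/data/exports',
                                     location_name='landing zone')
    return s3_location, local_location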
tests/test_process_tracker.py

Lines changed: 1 addition & 1 deletion
@@ -432,7 +432,7 @@ def test_register_new_process_run_exception(self):
         # Running registration a second time to mimic job being run twice
         self.process_tracker.register_new_process_run()
 
-        return self.assertTrue('The process Testing Process Tracking Initialization '
+        return self.assertTrue('Process Testing Process Tracking Initialization '
                                'is currently running.' in str(context.exception))
 
     def test_register_new_process_run_with_previous_run(self):
