Skip to content

Commit c30b221

Browse files
author
Alex Meadows
committed
process_tracker_python-124 Add Extract Tracker finder
✨ Added ability to recreate an ExtractTracker object based on extract_id. As with ProcessTracker, ExtractTracker can now re-instantiate objects that are in the middle of processing so that they can continue to be processed. Closes #124
1 parent a80e9ca commit c30b221

File tree

3 files changed

+213
-95
lines changed

3 files changed

+213
-95
lines changed

process_tracker/extract_tracker.py

Lines changed: 157 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
ExtractProcess,
2020
ExtractStatus,
2121
)
22+
from process_tracker.models.source import DatasetType
2223

2324

2425
class ExtractTracker:
2526
def __init__(
2627
self,
2728
process_run,
28-
filename,
29+
filename=None,
2930
location=None,
3031
location_name=None,
3132
location_path=None,
3233
status=None,
3334
compression_type=None,
3435
filetype=None,
3536
config_location=None,
37+
extract_id=None,
3638
):
3739
"""
3840
ExtractTracker is the primary engine for tracking data extracts
@@ -55,6 +57,9 @@ def __init__(
5557
:type filetype: string
5658
:param config_location: Optional location for the process_tracker configuration file.
5759
:type config_location: string
60+
:param extract_id: If trying to work with a specific extract that's in process, provide the id and it will be
61+
reconstructed.
62+
:type extract_id: int
5863
"""
5964
log_level = SettingsManager(
6065
config_location=config_location
@@ -68,117 +73,133 @@ def __init__(
6873
self.data_store = self.process_run.data_store
6974
self.session = self.process_run.session
7075

71-
self.filename = filename
72-
73-
if location is not None:
74-
self.logger.info("Location object provided.")
75-
self.location = location
76-
elif location_path is not None:
77-
self.logger.info("Location path provided. Creating Location object.")
78-
self.location = LocationTracker(
79-
location_name=location_name,
80-
location_path=location_path,
81-
data_store=self.data_store,
76+
# Getting all status types in the event there are custom status types added later.
77+
self.extract_status_types = self.get_extract_status_types()
78+
79+
# For specific status types, need to retrieve their ids to be used for those status types' logic.
80+
81+
self.extract_status_initializing = self.extract_status_types["initializing"]
82+
self.extract_status_ready = self.extract_status_types["ready"]
83+
self.extract_status_loading = self.extract_status_types["loading"]
84+
self.extract_status_loaded = self.extract_status_types["loaded"]
85+
self.extract_status_archived = self.extract_status_types["archived"]
86+
self.extract_status_deleted = self.extract_status_types["deleted"]
87+
self.extract_status_error = self.extract_status_types["error"]
88+
89+
if extract_id is not None:
90+
self.logger.info("Extract id provided. Attempting to reconstruct.")
91+
92+
extract = self.data_store.get_or_create_item(
93+
model=Extract, extract_id=extract_id, create=False
8294
)
83-
else:
84-
raise Exception("A location object or location_path must be provided.")
95+
self.filename = extract.extract_filename
96+
self.location = extract.locations
97+
self.compression_type = extract.compression_type
98+
if self.compression_type is None:
99+
self.compression_type_id = None
100+
else:
101+
self.compression_type_id = self.compression_type.compression_type_id
102+
self.filetype = extract.extract_filetype
103+
self.extract = extract
104+
self.full_filename = self.get_full_filename()
105+
self.dataset_types = self.get_dataset_types()
106+
self.extract_process = self.retrieve_extract_process()
85107

86-
if compression_type is not None:
87-
self.logger.info("Finding compression type.")
88-
try:
89-
self.compression_type = self.data_store.get_or_create_item(
90-
model=ExtractCompressionType,
91-
create=False,
92-
extract_compression_type=compression_type,
93-
)
94-
except Exception:
95-
error_msg = "%s is not a valid compression type." % compression_type
108+
else:
109+
if filename is None:
110+
error_msg = "Filename must be provided."
96111
self.logger.error(error_msg)
97112
raise Exception(error_msg)
98113

99-
self.compression_type_id = self.compression_type.extract_compression_type_id
100-
else:
101-
self.compression_type_id = None
114+
self.filename = filename
115+
116+
if location is not None:
117+
self.logger.info("Location object provided.")
118+
self.location = location
119+
elif location_path is not None:
120+
self.logger.info("Location path provided. Creating Location object.")
121+
self.location = LocationTracker(
122+
location_name=location_name,
123+
location_path=location_path,
124+
data_store=self.data_store,
125+
)
126+
else:
127+
raise Exception("A location object or location_path must be provided.")
128+
129+
if compression_type is not None:
130+
self.logger.info("Finding compression type.")
131+
try:
132+
self.compression_type = self.data_store.get_or_create_item(
133+
model=ExtractCompressionType,
134+
create=False,
135+
extract_compression_type=compression_type,
136+
)
137+
except Exception:
138+
error_msg = "%s is not a valid compression type." % compression_type
139+
self.logger.error(error_msg)
140+
raise Exception(error_msg)
102141

103-
if filetype is not None:
104-
self.logger.info("File type provided. Verifying it is a valid filetype.")
105-
try:
106-
self.filetype = self.data_store.get_or_create_item(
107-
model=ExtractFileType, create=False, extract_filetype=filetype
142+
self.compression_type_id = (
143+
self.compression_type.extract_compression_type_id
108144
)
109-
except Exception:
110-
error_msg = "%s is not a valid file type." % filetype
111-
self.logger.error(error_msg)
112-
raise Exception(error_msg)
113-
else:
114-
# Need to try to determine the filetype based on the extension of the filename.
115-
file_extension = os.path.splitext(filename)[1]
116-
file_extension = file_extension.replace(".", "")
117-
self.logger.info(
118-
"Trying to find record for file extension: %s" % file_extension
119-
)
120-
self.filetype = self.data_store.get_or_create_item(
121-
model=ExtractFileType,
122-
create=False,
123-
extract_filetype_code=file_extension,
124-
)
145+
else:
146+
self.compression_type_id = None
125147

126-
self.logger.info("Registering extract.")
148+
if filetype is not None:
149+
self.logger.info(
150+
"File type provided. Verifying it is a valid filetype."
151+
)
152+
try:
153+
self.filetype = self.data_store.get_or_create_item(
154+
model=ExtractFileType, create=False, extract_filetype=filetype
155+
)
156+
except Exception:
157+
error_msg = "%s is not a valid file type." % filetype
158+
self.logger.error(error_msg)
159+
raise Exception(error_msg)
160+
else:
161+
# Need to try to determine the filetype based on the extension of the filename.
162+
file_extension = os.path.splitext(filename)[1]
163+
file_extension = file_extension.replace(".", "")
164+
self.logger.info(
165+
"Trying to find record for file extension: %s" % file_extension
166+
)
167+
self.filetype = self.data_store.get_or_create_item(
168+
model=ExtractFileType,
169+
create=False,
170+
extract_filetype_code=file_extension,
171+
)
127172

128-
self.extract = self.data_store.get_or_create_item(
129-
model=Extract,
130-
extract_filename=filename,
131-
extract_location_id=self.location.location.location_id,
132-
extract_compression_type_id=self.compression_type_id,
133-
extract_filetype_id=self.filetype.extract_filetype_id,
134-
)
173+
self.logger.info("Registering extract.")
135174

136-
if location_path is not None:
137-
self.logger.info(
138-
"Location path was provided so building file path from it."
175+
self.extract = self.data_store.get_or_create_item(
176+
model=Extract,
177+
extract_filename=filename,
178+
extract_location_id=self.location.location.location_id,
179+
extract_compression_type_id=self.compression_type_id,
180+
extract_filetype_id=self.filetype.extract_filetype_id,
139181
)
140182

141-
self.full_filename = str(Path(location_path).joinpath(filename))
142-
else:
143-
self.logger.info("Location provided so building file path from it.")
183+
self.full_filename = self.get_full_filename(location_path=location_path)
144184

145-
self.full_filename = str(
146-
Path(self.location.location_path).joinpath(
147-
self.extract.extract_filename
185+
if self.process_run.dataset_types is not None:
186+
self.logger.info("Associating dataset type(s) with extract.")
187+
self.dataset_types = self.register_extract_dataset_types(
188+
dataset_types=self.process_run.dataset_types
148189
)
149-
)
190+
else:
191+
self.dataset_types = None
150192

151-
if self.process_run.dataset_types is not None:
152-
self.logger.info("Associating dataset type(s) with extract.")
153-
self.dataset_types = self.register_extract_dataset_types(
154-
dataset_types=self.process_run.dataset_types
155-
)
156-
else:
157-
self.dataset_types = None
158-
159-
# Getting all status types in the event there are custom status types added later.
160-
self.extract_status_types = self.get_extract_status_types()
161-
162-
# For specific status types, need to retrieve their ids to be used for those status types' logic.
163-
164-
self.extract_status_initializing = self.extract_status_types["initializing"]
165-
self.extract_status_ready = self.extract_status_types["ready"]
166-
self.extract_status_loading = self.extract_status_types["loading"]
167-
self.extract_status_loaded = self.extract_status_types["loaded"]
168-
self.extract_status_archived = self.extract_status_types["archived"]
169-
self.extract_status_deleted = self.extract_status_types["deleted"]
170-
self.extract_status_error = self.extract_status_types["error"]
193+
self.extract_process = self.retrieve_extract_process()
171194

172-
self.extract_process = self.retrieve_extract_process()
195+
if status is not None:
196+
self.logger.info("Status was provided by user.")
197+
self.change_extract_status(new_status=status)
198+
else:
199+
self.logger.info("Status was not provided. Initializing.")
200+
self.extract.extract_status_id = self.extract_status_initializing
173201

174-
if status is not None:
175-
self.logger.info("Status was provided by user.")
176-
self.change_extract_status(new_status=status)
177-
else:
178-
self.logger.info("Status was not provided. Initializing.")
179-
self.extract.extract_status_id = self.extract_status_initializing
180-
181-
self.session.commit()
202+
self.session.commit()
182203

183204
def add_dependency(self, dependency_type, dependency):
184205
"""
@@ -330,6 +351,21 @@ def extract_dependency_check(self, extracts=None):
330351
else:
331352
return False
332353

354+
def get_dataset_types(self):
    """
    Get list of dataset types associated to extract and return list.

    Walks the extract's ``extract_dataset_types`` associations and looks up the
    ``DatasetType`` record for each association's ``dataset_type_id``.

    :return: List with one looked-up dataset type per association, in association
             order. Empty list when the extract has no associations.
    """
    # create=False keeps this a read-only lookup, matching the other lookups in
    # this class that must not insert records as a side effect.
    return [
        self.data_store.get_or_create_item(
            model=DatasetType,
            dataset_type_id=association.dataset_type_id,
            create=False,
        )
        for association in self.extract.extract_dataset_types
    ]
368+
333369
def get_extract_status_types(self):
334370
"""
335371
Get list of process status types and return dictionary.
@@ -344,6 +380,32 @@ def get_extract_status_types(self):
344380

345381
return status_types
346382

383+
def get_full_filename(self, location_path=None):
    """
    Build the full filepath to the file from a base path and the filename.

    :param location_path: If provided, it is used as the base path and joined with
                          ``self.filename``. Otherwise the base path is taken from
                          ``self.location.location_path`` and joined with the
                          extract record's ``extract_filename``.
    :type location_path: str
    :return: full filepath as string
    """
    if location_path is None:
        self.logger.info("Location provided so building file path from it.")
        base_path = self.location.location_path
        file_name = self.extract.extract_filename
    else:
        self.logger.info(
            "Location path was provided so building file path from it."
        )
        base_path = location_path
        file_name = self.filename

    return str(Path(base_path) / file_name)
408+
347409
def register_extract_dataset_types(self, dataset_types):
348410
"""
349411
For the provided dataset types from process_run instance, associate with given Extract instance.

process_tracker/models/extract.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ class Extract(Base):
129129
nullable=True,
130130
)
131131

132+
compression_type = relationship("ExtractCompressionType")
133+
extract_filetype = relationship("ExtractFileType")
132134
extract_dataset_types = relationship(
133135
"ExtractDatasetType",
134136
back_populates="dataset_type_extracts",

tests/test_extract_tracker.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,3 +606,57 @@ def test_set_file_type_invalid(self):
606606
)
607607

608608
self.assertTrue("zap is not a valid file type" in str(context.exception))
609+
610+
def test_extract_tracker_with_extract_id(self):
    """
    Testing that when providing an extract_id to ExtractTracker, the existing instance is
    reconstructed instead of a new one being created.
    :return:
    """
    original = self.extract

    rebuilt = ExtractTracker(
        extract_id=original.extract.extract_id, process_run=self.process_run
    )

    # Gather every (expected, given) pair before asserting anything, so all attribute
    # lookups happen up front and in the same order as the individual comparisons.
    comparisons = [
        (original.extract.extract_filename, rebuilt.extract.extract_filename),
        (original.location.location_name, rebuilt.location.location_name),
        (original.compression_type_id, rebuilt.compression_type_id),
        (original.extract.extract_filetype, rebuilt.extract.extract_filetype),
        (original.full_filename, rebuilt.full_filename),
        (original.dataset_types, rebuilt.dataset_types),
        (
            original.extract_process.process_tracking_id,
            rebuilt.extract_process.process_tracking_id,
        ),
    ]

    for expected, given in comparisons:
        self.assertEqual(expected, given)
650+
651+
def test_ensure_nulls_caught_on_instantiation(self):
    """
    With the addition of extract_id, filename had to become optional on ExtractTracker.
    If ExtractTracker is instantiated with neither an extract_id nor a filename, an error
    should be raised.
    :return:
    """
    with self.assertRaises(Exception) as context:
        ExtractTracker(process_run=self.extract.process_run)

    # Checked after the with block: a statement placed after the raising call inside
    # the block would never execute. assertIn reports both operands on failure.
    self.assertIn("Filename must be provided.", str(context.exception))

0 commit comments

Comments
 (0)