Skip to content

Commit f84a265

Browse files
committed
🗃️ Built initial version of extract tracking.
✅ Wrote initial framework for unit testing extract tracking.
1 parent b8b0ded commit f84a265

File tree

5 files changed

+345
-79
lines changed

5 files changed

+345
-79
lines changed

models/extract.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# SQLAlchemy Models
22
# Models for Extract (Data) entities
33

4-
from sqlalchemy import Column, ForeignKey, Integer, Sequence, String
4+
from datetime import datetime
5+
6+
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Sequence, String
57
from sqlalchemy.orm import relationship
68

79
from models.model_base import Base
@@ -14,7 +16,7 @@ class ExtractStatus(Base):
1416
extract_status_id = Column(Integer, Sequence('extract_status_lkup_status_id_seq'), primary_key=True)
1517
extract_status_name = Column(String(75), nullable=False, unique=True)
1618

17-
extracts = relationship("Extract")
19+
extracts = relationship("ExtractProcess")
1820

1921

2022
class Extract(Base):
@@ -25,10 +27,26 @@ class Extract(Base):
2527
extract_source_id = Column(Integer, ForeignKey("source_lkup.source_id"))
2628
extract_filename = Column(String(750), nullable=False, unique=True)
2729
extract_location_id = Column(Integer, ForeignKey('location_lkup.location_id'))
28-
extract_process_run_id = Column(Integer, ForeignKey('process_tracking.process_tracking_id'))
30+
# extract_process_run_id = Column(Integer, ForeignKey('process_tracking.process_tracking_id'))
2931
extract_status_id = Column(Integer, ForeignKey('extract_status_lkup.extract_status_id'))
32+
extract_registration_date_time = Column(DateTime, nullable=False, default=datetime.now())
33+
# extract_load_date_time = Column(DateTime, nullable=False, default=default_date)
34+
# extract_archive_date_time = Column(DateTime, nullable=False, default=default_date)
35+
36+
extract_process = relationship("ExtractProcess", back_populates='process_extracts')
37+
38+
39+
class ExtractProcess(Base):
40+
41+
__tablename__ = "extract_process_tracking"
42+
43+
extract_tracking_id = Column(Integer, ForeignKey("extract_tracking.extract_id"), primary_key=True)
44+
process_tracking_id = Column(Integer, ForeignKey("process_tracking.process_tracking_id"), primary_key=True)
45+
extract_process_status_id = Column(Integer, ForeignKey("extract_status_lkup.extract_status_id"))
46+
extract_process_event_date_time = Column(DateTime, nullable=False, default=datetime.now())
3047

31-
process_tracking = relationship("ProcessTracking")
48+
process_extracts = relationship('Extract', foreign_keys=[extract_tracking_id])
49+
extract_processes = relationship('ProcessTracking', foreign_keys=[process_tracking_id])
3250

3351

3452
class Location(Base):

models/process.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class ProcessTracking(Base):
136136
is_latest_run = Column(Boolean, nullable=False, default=False)
137137

138138
errors = relationship("ErrorTracking", back_populates="error_tracking")
139-
extracts = relationship("Extract", back_populates="process_tracking")
139+
extracts = relationship("ExtractProcess", back_populates="extract_processes")
140140
process = relationship("Process", back_populates="process_tracking")
141141

142142
def __repr__(self):
Lines changed: 122 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,135 @@
11
# Extract Tracking
22
# Used in the creation and editing of extract records. Used in conjunction with process tracking.
3+
from datetime import datetime
4+
from os.path import basename, normpath
35

4-
from models.extract import Extract, Location
5-
from models.source import Source
6+
from process_tracker import session
7+
from process_tracker.data_store import DataStore
8+
9+
from models.extract import Extract, ExtractProcess, ExtractStatus, Location
610

711

812
class ExtractTracker:
913

10-
def __init__(self, source_name, filename, location, process_name):
14+
def __init__(self, process_run, filename, location_path, location_name=None):
1115
"""
1216
ExtractTracker is the primary engine for tracking data extracts
13-
:param source_name: Name of the source where data extract is from.
14-
:type source_name: string
17+
:param process_run: The process object working with extracts (either creating or consuming)
18+
:type process_run: ProcessTracker object
1519
:param filename: Name of the data extract file.
1620
:type filename: string
17-
:param location: Location (filepath, s3 bucket, etc.) where the file is stored
18-
:type location: string
19-
:param process_name: Name of the process that produced the data extract.
20-
:type process_name: string
21+
:param location_path: Location (filepath, s3 bucket, etc.) where the file is stored
22+
:type location_path: string
23+
:param location_name: Optional parameter to provide a specific name for the location. If not provided, will use
24+
the last directory in the path as the location name. If type of location can be
25+
determined (i.e. S3 bucket), the location type will be prepended.
26+
:type location_name: string
27+
"""
28+
self.data_store = DataStore()
29+
self.process_run = process_run
30+
31+
if location_name is None:
32+
location_name = self.derive_location_name(location_path=location_path)
33+
34+
self.source = self.process_run.source
35+
self.filename = filename
36+
37+
self.location = self.data_store.get_or_create(model=Location
38+
, location_name=location_name
39+
, location_path=location_path)
40+
41+
self.extract = self.data_store.get_or_create(model=Extract
42+
, extract_filename=filename
43+
, extract_location_id=self.location.location_id
44+
, extract_source_id=self.source.source_id)
45+
46+
self.extract_process = self.retrieve_extract_process()
47+
48+
# Getting all status types in the event there are custom status types added later.
49+
self.extract_status_types = self.get_extract_status_types()
50+
51+
# For specific status types, need to retrieve their ids to be used for those status types' logic.
52+
53+
self.extract_status_initializing = self.extract_status_types['initializing']
54+
self.extract_status_ready = self.extract_status_types['ready']
55+
self.extract_status_loading = self.extract_status_types['loading']
56+
self.extract_status_loaded = self.extract_status_types['loaded']
57+
self.extract_status_archived = self.extract_status_types['archived']
58+
self.extract_status_deleted = self.extract_status_types['deleted']
59+
self.extract_status_error = self.extract_status_types['error']
60+
61+
def change_extract_status(self, new_status):
62+
"""
63+
Change an extract record status.
64+
:return:
65+
"""
66+
status_date = datetime.now()
67+
new_status = self.extract_status_types[new_status]
68+
69+
if self.extract_status_types[new_status]:
70+
self.extract.extract_status_id = new_status
71+
72+
self.extract_process.extract_status_id = new_status
73+
self.extract_process.extract_process_event_date_time = status_date
74+
75+
session.commit()
76+
77+
else:
78+
raise Exception('%s is not a valid extract status type. '
79+
'Please add the status to extract_status_lkup' % new_status)
80+
81+
@staticmethod
82+
def derive_location_name(location_path):
2183
"""
84+
If location name is not provided, attempt to derive name from path.
85+
:param location_path: The data extract file location path.
86+
:return:
87+
"""
88+
# Idea is to generalize things like grabbing the last directory name in the path,
89+
# what type of path is it (normal, s3, etc.)
90+
91+
location_path = location_path.lower() # Don't care about casing.
92+
93+
if "s3" in location_path:
94+
# If the path is an S3 Bucket, prefix to name.
95+
96+
location_prefix = "s3"
97+
else:
98+
location_prefix = ""
99+
100+
location_name = location_prefix + " - "
101+
102+
location_name += basename(normpath(location_path))
103+
104+
return location_name
105+
106+
@staticmethod
107+
def get_extract_status_types():
108+
"""
109+
Get list of process status types and return dictionary.
110+
:return:
111+
"""
112+
status_types = {}
113+
114+
for record in session.query(ExtractStatus):
115+
status_types[record.extract_status_name] = record.extract_status_id
116+
117+
return status_types
118+
119+
def retrieve_extract_process(self):
120+
"""
121+
Create and initialize or retrieve the process/extract relationship.
122+
:return:
123+
"""
124+
125+
extract_process = self.data_store.get_or_create(model=ExtractProcess
126+
, extract_tracking_id=self.extract.extract_id
127+
, process_tracking_id=self.process_run.process_tracking_id)
128+
129+
# Only need to set to 'initializing' when it's the first time a process run is trying to work with files.
130+
if extract_process.extract_process_status_id is None:
131+
132+
extract_process.extract_process_status_id = self.extract_status_initializing
133+
session.commit()
22134

135+
return extract_process

0 commit comments

Comments
 (0)