Skip to content

Commit b8b0ded

Browse files
committed
🗃️ Add more robust capabilities to extract_tracking and related tables.
1 parent 1143893 commit b8b0ded

File tree

7 files changed: 71 additions (+71) and 19 deletions (-19)

models/extract.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@
77
from models.model_base import Base
88

99

10+
class ExtractStatus(Base):
11+
12+
__tablename__ = "extract_status_lkup"
13+
14+
extract_status_id = Column(Integer, Sequence('extract_status_lkup_status_id_seq'), primary_key=True)
15+
extract_status_name = Column(String(75), nullable=False, unique=True)
16+
17+
extracts = relationship("Extract")
18+
19+
1020
class Extract(Base):
1121

1222
__tablename__ = "extract_tracking"
@@ -15,6 +25,10 @@ class Extract(Base):
1525
extract_source_id = Column(Integer, ForeignKey("source_lkup.source_id"))
1626
extract_filename = Column(String(750), nullable=False, unique=True)
1727
extract_location_id = Column(Integer, ForeignKey('location_lkup.location_id'))
28+
extract_process_run_id = Column(Integer, ForeignKey('process_tracking.process_tracking_id'))
29+
extract_status_id = Column(Integer, ForeignKey('extract_status_lkup.extract_status_id'))
30+
31+
process_tracking = relationship("ProcessTracking")
1832

1933

2034
class Location(Base):

models/process.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
# Models for Process entities
33

44

5-
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Sequence, String
5+
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, Sequence, String
66
from sqlalchemy.orm import relationship
77

88
from models.model_base import default_date, Base
9+
from models.extract import Extract
910

1011

1112
class ErrorType(Base):
@@ -132,9 +133,11 @@ class ProcessTracking(Base):
132133
process_run_end_date_time = Column(DateTime, nullable=True)
133134
process_run_record_count = Column(Integer, nullable=False, default=0)
134135
process_run_actor_id = Column(Integer, ForeignKey('actor_lkup.actor_id'))
136+
is_latest_run = Column(Boolean, nullable=False, default=False)
135137

136-
process = relationship("Process", back_populates="process_tracking")
137138
errors = relationship("ErrorTracking", back_populates="error_tracking")
139+
extracts = relationship("Extract", back_populates="process_tracking")
140+
process = relationship("Process", back_populates="process_tracking")
138141

139142
def __repr__(self):
140143

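[File name not captured on this page: the hunk below is a newly added 22-line file that defines the ExtractTracker class, not part of models/process.py]

Lines changed: 22 additions & 0 deletions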
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Extract Tracking
2+
# Used in the creation and editing of extract records. Used in conjunction with process tracking.
3+
4+
from models.extract import Extract, Location
5+
from models.source import Source
6+
7+
8+
class ExtractTracker:
9+
10+
def __init__(self, source_name, filename, location, process_name):
11+
"""
12+
ExtractTracker is the primary engine for tracking data extracts
13+
:param source_name: Name of the source where data extract is from.
14+
:type source_name: string
15+
:param filename: Name of the data extract file.
16+
:type filename: string
17+
:param location: Location (filepath, s3 bucket, etc.) where the file is stored
18+
:type location: string
19+
:param process_name: Name of the process that produced the data extract.
20+
:type process_name: string
21+
"""
22+

process_tracker/process.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

process_tracker/process_tracking.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Process Tracking
2+
# Used in the creation and editing of process tracking records.
3+
14
from datetime import datetime
25
import logging
36

@@ -13,7 +16,7 @@ class ProcessTracker:
1316

1417
def __init__(self, process_name, process_type, actor_name, tool_name, source_name):
1518
"""
16-
ProcessTracker is the primary enginer for tracking data integration processes.
19+
ProcessTracker is the primary engine for tracking data integration processes.
1720
:param process_name: Name of the process being tracked.
1821
:param actor_name: Name of the person or environment running the process.
1922
:param tool_name: Name of the tool used to run the process.
@@ -58,6 +61,7 @@ def register_new_process_run(self):
5861
# Must validate that the process is not currently running.
5962

6063
if last_run.process_status_id != self.process_status_running:
64+
last_run.is_latest_run = False
6165
new_run_flag = True
6266
new_run_id = last_run.process_run_id + 1
6367
else:
@@ -68,7 +72,8 @@ def register_new_process_run(self):
6872
, process_status_id=self.process_status_running
6973
, process_run_id=new_run_id
7074
, process_run_start_date_time=datetime.now()
71-
, process_run_actor_id=self.actor.actor_id)
75+
, process_run_actor_id=self.actor.actor_id
76+
, is_latest_run = True)
7277

7378
session.add(new_run)
7479
session.commit()

tests/test_extract_tracking.py

Whitespace-only changes.

tests/test_process_tracking.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ class TestProcessTracking(unittest.TestCase):
1515
@classmethod
1616
def setUpClass(cls):
1717
cls.process_tracker = ProcessTracker(process_name='Testing Process Tracking Initialization'
18-
, process_type='Extract'
19-
, actor_name='UnitTesting'
20-
, tool_name='Spark'
21-
, source_name='Unittests')
18+
, process_type='Extract'
19+
, actor_name='UnitTesting'
20+
, tool_name='Spark'
21+
, source_name='Unittests')
2222

2323
@classmethod
2424
def tearDownClass(cls):
@@ -93,6 +93,25 @@ def test_register_new_process_run_exception(self):
9393
return self.assertTrue('The process Testing Process Tracking Initialization '
9494
'is currently running.' in str(context.exception))
9595

96+
def test_register_new_process_run_with_previous_run(self):
97+
"""
98+
Testing that a new run record is created if there is another instance of same process in 'completed' or 'failed'
99+
status. Also flips the is_latest_run flag on previous run to False.
100+
:return:
101+
"""
102+
103+
self.process_tracker.change_run_status(new_status='completed')
104+
self.process_tracker.register_new_process_run()
105+
106+
process_runs = session.query(ProcessTracking)\
107+
.filter(ProcessTracking.process_id == self.process_id)\
108+
.order_by(ProcessTracking.process_tracking_id)
109+
110+
given_result = process_runs[0].is_latest_run
111+
expected_result = False
112+
113+
self.assertEqual(given_result, expected_result)
114+
96115
def test_change_run_status_complete(self):
97116
"""
98117
Testing that when changing the run status from 'running' to 'complete' the run record updates successfully.

0 commit comments

Comments
 (0)