Skip to content

Commit 1143893

Browse files
committed
✨ Add ability to trigger errors
✨ Add ability to change process run state
1 parent 4ca68ea commit 1143893

File tree

7 files changed

+314
-45
lines changed

7 files changed

+314
-45
lines changed

Pipfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ sqlalchemy = "*"
1010
sqlalchemy-utils = "*"
1111
python-dateutil = "*"
1212
psycopg2-binary = "*"
13+
coverage = "*"
1314

1415
[requires]
1516
python_version = "3.7"

Pipfile.lock

Lines changed: 38 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

process_tracker/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868

6969
Session = sessionmaker(bind=engine)
7070

71-
session = Session()
71+
session = Session(expire_on_commit=False)
7272
session.execute("SET search_path TO %s" % data_store_name)
7373

7474
else:

process_tracker/data_store.py

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
21
from process_tracker import session
3-
from models.process import ProcessTracking
42

53

64
class DataStore:
@@ -28,26 +26,6 @@ def get_or_create(model, create=True, **kwargs):
2826
session.commit()
2927

3028
else:
31-
raise Exception('There is no record match in %s' % model.__tablename__)
32-
exit()
33-
34-
return instance
35-
36-
@staticmethod
37-
def get_latest_tracking_record(process):
38-
"""
39-
For the given process, find the latest tracking record.
40-
:param process: The process' process_id.
41-
:type process: integer
42-
:return:
43-
"""
44-
45-
instance = session.query(ProcessTracking)\
46-
.filter(ProcessTracking.process_id == process)\
47-
.order_by(ProcessTracking.process_run_id.desc())\
48-
.first()
49-
50-
if instance is None:
51-
return False
29+
raise Exception('There is no record match in %s .' % model.__tablename__)
5230

5331
return instance

process_tracker/extract_tracking.py

Whitespace-only changes.

process_tracker/process_tracking.py

Lines changed: 124 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
21
from datetime import datetime
32
import logging
43

54
from process_tracker import session
65
from process_tracker.data_store import DataStore
76
from models.actor import Actor
8-
from models.process import Process, ProcessTracking, ProcessType, ProcessStatus
7+
from models.process import ErrorTracking, ErrorType, Process, ProcessTracking, ProcessStatus, ProcessType
98
from models.source import Source
109
from models.tool import Tool
1110

@@ -21,7 +20,7 @@ def __init__(self, process_name, process_type, actor_name, tool_name, source_nam
2120
:param source_name: Name of the source that the data is coming from.
2221
"""
2322

24-
self.logging = logging.getLogger(__name__)
23+
self.logger = logging.getLogger(__name__)
2524

2625
self.data_store = DataStore()
2726

@@ -35,23 +34,25 @@ def __init__(self, process_name, process_type, actor_name, tool_name, source_nam
3534
, process_type_id=self.process_type.process_type_id
3635
, process_tool_id=self.tool.tool_id)
3736

38-
self.process_status_running = session.query(ProcessStatus)\
39-
.filter(ProcessStatus.process_status_name == 'running').with_entities(ProcessStatus.process_status_id)
40-
self.process_status_complete = session.query(ProcessStatus)\
41-
.filter(ProcessStatus.process_status_name == 'complete').with_entities(ProcessStatus.process_status_id)
42-
self.process_status_failed = session.query(ProcessStatus)\
43-
.filter(ProcessStatus.process_status_name == 'failed').with_entities(ProcessStatus.process_status_id)
37+
self.process_name = process_name
38+
self.process_tracking_run = ProcessTracking()
39+
40+
self.process_status_types = self.get_process_status_types()
41+
42+
self.process_status_running = self.process_status_types['running']
43+
self.process_status_complete = self.process_status_types['completed']
44+
self.process_status_failed = self.process_status_types['failed']
4445

4546
def register_new_process_run(self):
4647
"""
4748
When a new process instance is starting, register the run in process tracking.
4849
:return:
4950
"""
5051

51-
last_run = self.data_store.get_latest_tracking_record(process=self.process.process_id)
52+
last_run = self.get_latest_tracking_record(process=self.process)
5253

5354
new_run_flag = True
54-
new_run_id = 0
55+
new_run_id = 1
5556

5657
if last_run:
5758
# Must validate that the process is not currently running.
@@ -72,4 +73,115 @@ def register_new_process_run(self):
7273
session.add(new_run)
7374
session.commit()
7475

75-
self.logging.info('Process tracking record added for %s' % new_run)
76+
self.process_tracking_run = new_run
77+
78+
self.logger.info('Process tracking record added for %s' % self.process_name)
79+
80+
else:
81+
raise Exception('The process %s is currently running.' % self.process_name)
82+
exit()
83+
84+
def change_run_status(self, new_status, end_date=None):
85+
"""
86+
Change a process tracking run record from 'running' to another status.
87+
:param new_status: The name of the status that the run is being switched to.
88+
:type new_status: string
89+
:param end_date: If there is a specific time required (i.e. to keep timestamps consistent between log and data
90+
store)
91+
:type end_date: datetime
92+
:return:
93+
"""
94+
if end_date is None:
95+
end_date = datetime.now()
96+
97+
if self.process_status_types[new_status]:
98+
99+
self.process_tracking_run.process_status_id = self.process_status_types[new_status]
100+
101+
if (self.process_status_types[new_status] == self.process_status_complete) \
102+
or (self.process_status_types[new_status] == self.process_status_failed):
103+
104+
self.logger.info("Process status changing to failed or completed.")
105+
106+
self.process_tracking_run.process_run_end_date_time = end_date
107+
108+
if self.process_status_types[new_status] == self.process_status_failed:
109+
110+
self.logger.info("Process recording as failed. Setting process last_failed_run_date_time.")
111+
112+
self.process.last_failed_run_date_time = end_date
113+
114+
session.commit()
115+
116+
else:
117+
raise Exception('%s is not a valid process status type. '
118+
'Please add the status to process_status_type_lkup' % new_status)
119+
120+
def raise_run_error(self, error_type_name, error_description=None, fail_run=False, end_date=None):
121+
"""
122+
Raise a runtime error and fail the process run if the error is severe enough. The error also is recorded in
123+
error_tracking.
124+
:param error_type_name: The name of the type of error being triggered.
125+
:param error_description: The description of the error to store in error tracking.
126+
:param fail_run: Flag for triggering a run failure, default False
127+
:type fail_run: Boolean
128+
:param end_date: If a specific datetime is required for the process_tracking end datetime.
129+
:type end_date: datetime
130+
:return:
131+
"""
132+
if end_date is None:
133+
end_date = datetime.now() # Need the date to match across all parts of the event in case the run is failed.
134+
135+
if error_description is None:
136+
error_description = 'Unspecified error.'
137+
138+
error_type = self.data_store.get_or_create(model=ErrorType, create=False, error_type_name=error_type_name)
139+
140+
run_error = ErrorTracking(error_type_id=error_type.error_type_id
141+
, error_description=error_description
142+
, process_tracking_id=self.process_tracking_run.process_tracking_id
143+
, error_occurrence_date_time=end_date)
144+
145+
self.logger.error('%s - %s - %s' % (self.process_name, error_type_name, error_description))
146+
session.add(run_error)
147+
session.commit()
148+
149+
if fail_run:
150+
self.change_run_status(new_status='failed', end_date=end_date)
151+
session.commit()
152+
raise Exception('Process halting. An error triggered the process to fail.')
153+
154+
@staticmethod
155+
def get_process_status_types():
156+
"""
157+
Get list of process status types and return dictionary.
158+
:return:
159+
"""
160+
status_types ={}
161+
162+
for record in session.query(ProcessStatus):
163+
status_types[record.process_status_name] = record.process_status_id
164+
165+
return status_types
166+
167+
@staticmethod
168+
def get_latest_tracking_record(process):
169+
"""
170+
For the given process, find the latest tracking record.
171+
:param process: The process' process_id.
172+
:type process: integer
173+
:return:
174+
"""
175+
176+
instance = session.query(ProcessTracking)\
177+
.filter(ProcessTracking.process_id == process.process_id)\
178+
.order_by(ProcessTracking.process_run_id.desc())\
179+
.first()
180+
181+
if instance is None:
182+
return False
183+
184+
return instance
185+
186+
187+

0 commit comments

Comments
 (0)