Skip to content

Commit ee81388

Browse files
committed
process_tracker_python-9 Process can have more than one source
✨ Processes now fail on dependency running or failed. Dependencies of a given process will now halt the process from starting if they are currently running or failed their last run. Closes: #6
1 parent 70e4dfa commit ee81388

File tree

3 files changed

+99
-6
lines changed

3 files changed

+99
-6
lines changed

process_tracker/process_tracker.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
from os.path import join
88

99
import boto3
10+
from sqlalchemy.orm import aliased
1011

1112
from process_tracker.data_store import DataStore
1213
from process_tracker.extract_tracker import ExtractTracker
1314
from process_tracker.location_tracker import LocationTracker
1415

1516
from process_tracker.models.actor import Actor
1617
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
17-
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
18+
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
1819
from process_tracker.models.source import Source
1920
from process_tracker.models.tool import Tool
2021

@@ -257,12 +258,28 @@ def register_new_process_run(self):
257258
When a new process instance is starting, register the run in process tracking.
258259
:return:
259260
"""
261+
child_process = aliased(Process)
262+
parent_process = aliased(Process)
260263

261264
last_run = self.get_latest_tracking_record(process=self.process)
262265

263266
new_run_flag = True
264267
new_run_id = 1
265268

269+
# Need to check the status of any dependencies. If dependencies are running or failed, halt this process.
270+
271+
dependency_hold = self.session.query(ProcessDependency)\
272+
.join(parent_process, ProcessDependency.parent_process_id == parent_process.process_id)\
273+
.join(child_process, ProcessDependency.child_process_id == child_process.process_id)\
274+
.join(ProcessTracking, ProcessTracking.process_id == parent_process.process_id) \
275+
.join(ProcessStatus, ProcessStatus.process_status_id == ProcessTracking.process_status_id) \
276+
.filter(child_process.process_id == self.process.process_id) \
277+
.filter(ProcessStatus.process_status_name.in_(('running', 'failed'))) \
278+
.count()
279+
280+
if dependency_hold > 0:
281+
raise Exception('Processes that this process is dependent on are running or failed.')
282+
266283
if last_run:
267284
# Must validate that the process is not currently running.
268285

tests/test_extract_tracker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import unittest
44

55
from process_tracker.models.extract import Extract, ExtractProcess, Location
6-
from process_tracker.models.process import Process, ProcessSource, ProcessTarget, ProcessTracking
6+
from process_tracker.models.process import Process, ProcessDependency, ProcessSource, ProcessTarget, ProcessTracking
77

88
from process_tracker.data_store import DataStore
99
from process_tracker.extract_tracker import ExtractTracker
@@ -32,6 +32,7 @@ def tearDownClass(cls):
3232
cls.session.query(ProcessTracking).delete()
3333
cls.session.query(ProcessSource).delete()
3434
cls.session.query(ProcessTarget).delete()
35+
cls.session.query(ProcessDependency).delete()
3536
cls.session.query(Process).delete()
3637
cls.session.commit()
3738

tests/test_process_tracker.py

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from sqlalchemy.orm import aliased, Session
99

1010
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
11-
from process_tracker.models.process import ErrorType, ErrorTracking, Process, ProcessSource, ProcessTarget, ProcessTracking
11+
from process_tracker.models.process import ErrorType, ErrorTracking, Process, ProcessDependency, ProcessSource, ProcessTarget, ProcessTracking
1212

1313
from process_tracker.data_store import DataStore
1414
from process_tracker.extract_tracker import ExtractTracker
@@ -19,15 +19,16 @@ class TestProcessTracking(unittest.TestCase):
1919

2020
@classmethod
2121
def setUpClass(cls):
22-
data_store = DataStore()
23-
cls.session = data_store.session
24-
cls.data_store_type = data_store.data_store_type
22+
cls.data_store = DataStore()
23+
cls.session = cls.data_store.session
24+
cls.data_store_type = cls.data_store.data_store_type
2525

2626
@classmethod
2727
def tearDownClass(cls):
2828
cls.session.query(Location).delete()
2929
cls.session.query(ProcessSource).delete()
3030
cls.session.query(ProcessTarget).delete()
31+
cls.session.query(ProcessDependency).delete()
3132
cls.session.query(Process).delete()
3233
cls.session.commit()
3334

@@ -400,6 +401,80 @@ def test_register_new_process_run_with_previous_run(self):
400401

401402
self.assertEqual(expected_result, given_result)
402403

404+
def test_register_new_process_run_dependencies_completed(self):
405+
"""
406+
Testing that for a given process, if there are completed dependencies, then the process run is created.
407+
:return:
408+
"""
409+
dependent_process = ProcessTracker(process_name='Testing Process Tracking Dependency'
410+
, process_type='Extract'
411+
, actor_name='UnitTesting'
412+
, tool_name='Spark'
413+
, sources='Unittests'
414+
, targets='Unittests')
415+
416+
dependent_process.change_run_status(new_status='completed')
417+
self.process_tracker.change_run_status(new_status='completed')
418+
self.data_store.get_or_create(model=ProcessDependency
419+
, parent_process_id=dependent_process.process_tracking_run.process_id
420+
, child_process_id=self.process_id)
421+
422+
self.process_tracker.register_new_process_run()
423+
424+
given_count = self.session.query(ProcessTracking) \
425+
.filter(ProcessTracking.process_id == self.process_id) \
426+
.count()
427+
428+
expected_count = 2
429+
430+
self.assertEqual(expected_count, given_count)
431+
432+
def test_register_new_process_run_dependencies_running(self):
433+
"""
434+
Testing that for a given process, if there are running dependencies, then the process run is prevented from starting.
435+
:return:
436+
"""
437+
dependent_process = ProcessTracker(process_name='Testing Process Tracking Dependency Running'
438+
, process_type='Extract'
439+
, actor_name='UnitTesting'
440+
, tool_name='Spark'
441+
, sources='Unittests'
442+
, targets='Unittests')
443+
444+
dependent_process.change_run_status(new_status='running')
445+
self.process_tracker.change_run_status(new_status='completed')
446+
self.data_store.get_or_create(model=ProcessDependency
447+
, parent_process_id=dependent_process.process_tracking_run.process_id
448+
, child_process_id=self.process_id)
449+
450+
with self.assertRaises(Exception) as context:
451+
self.process_tracker.register_new_process_run()
452+
453+
return self.assertTrue('Processes that this process is dependent on are running or failed.' in str(context.exception))
454+
455+
def test_register_new_process_run_dependencies_failed(self):
456+
"""
457+
Testing that for a given process, if there are failed dependencies, then the process run is prevented from starting.
458+
:return:
459+
"""
460+
dependent_process = ProcessTracker(process_name='Testing Process Tracking Dependency Failed'
461+
, process_type='Extract'
462+
, actor_name='UnitTesting'
463+
, tool_name='Spark'
464+
, sources='Unittests'
465+
, targets='Unittests')
466+
467+
dependent_process.change_run_status(new_status='failed')
468+
self.process_tracker.change_run_status(new_status='completed')
469+
self.data_store.get_or_create(model=ProcessDependency
470+
, parent_process_id=dependent_process.process_tracking_run.process_id
471+
, child_process_id=self.process_id)
472+
473+
with self.assertRaises(Exception) as context:
474+
self.process_tracker.register_new_process_run()
475+
476+
return self.assertTrue('Processes that this process is dependent on are running or failed.' in str(context.exception))
477+
403478
def test_register_process_sources_one_source(self):
404479
"""
405480
Testing that when a new process is registered, a source registered as well.

0 commit comments

Comments
 (0)