Skip to content

Commit 695f5e4

Browse files
committed
Wrote tests for ExtractTracking
✅ Wrote tests for ExtractTracking. 🐛 Debugged an issue with ProcessTracking initialization. While testing ExtractTracking, I figured out that the ProcessTracking module should itself initialize a new run instead of making the user do so. This way, when the user creates a ProcessTracker, it will automatically try to initialize a new run.
1 parent f84a265 commit 695f5e4

File tree

5 files changed

+95
-36
lines changed

5 files changed

+95
-36
lines changed

Pipfile.lock

100644 → 100755
File mode changed.

process_tracker/extract_tracking.py

100644 → 100755
Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@ def __init__(self, process_run, filename, location_path, location_name=None):
4343
, extract_location_id=self.location.location_id
4444
, extract_source_id=self.source.source_id)
4545

46-
self.extract_process = self.retrieve_extract_process()
47-
4846
# Getting all status types in the event there are custom status types added later.
4947
self.extract_status_types = self.get_extract_status_types()
5048

@@ -58,6 +56,8 @@ def __init__(self, process_run, filename, location_path, location_name=None):
5856
self.extract_status_deleted = self.extract_status_types['deleted']
5957
self.extract_status_error = self.extract_status_types['error']
6058

59+
self.extract_process = self.retrieve_extract_process()
60+
6161
def change_extract_status(self, new_status):
6262
"""
6363
Change an extract record status.
@@ -66,10 +66,10 @@ def change_extract_status(self, new_status):
6666
status_date = datetime.now()
6767
new_status = self.extract_status_types[new_status]
6868

69-
if self.extract_status_types[new_status]:
69+
if new_status:
7070
self.extract.extract_status_id = new_status
7171

72-
self.extract_process.extract_status_id = new_status
72+
self.extract_process.extract_process_status_id = new_status
7373
self.extract_process.extract_process_event_date_time = status_date
7474

7575
session.commit()
@@ -88,16 +88,19 @@ def derive_location_name(location_path):
8888
# Idea is to generalize things like grabbing the last directory name in the path,
8989
# what type of path is it (normal, s3, etc.)
9090

91+
location_prefix = None
92+
location_name = ""
93+
9194
location_path = location_path.lower() # Don't care about casing.
9295

9396
if "s3" in location_path:
9497
# If the path is an S3 Bucket, prefix to name.
9598

9699
location_prefix = "s3"
97-
else:
98-
location_prefix = ""
99100

100-
location_name = location_prefix + " - "
101+
if location_prefix is not None:
102+
103+
location_name = location_prefix + " - "
101104

102105
location_name += basename(normpath(location_path))
103106

@@ -124,7 +127,8 @@ def retrieve_extract_process(self):
124127

125128
extract_process = self.data_store.get_or_create(model=ExtractProcess
126129
, extract_tracking_id=self.extract.extract_id
127-
, process_tracking_id=self.process_run.process_tracking_id)
130+
, process_tracking_id=self.process_run.process_tracking_run
131+
.process_tracking_id)
128132

129133
# Only need to set to 'initializing' when it's the first time a process run is trying to work with files.
130134
if extract_process.extract_process_status_id is None:

process_tracker/process_tracking.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def __init__(self, process_name, process_type, actor_name, tool_name, source_nam
4141
, process_tool_id=self.tool.tool_id)
4242

4343
self.process_name = process_name
44-
self.process_tracking_run = ProcessTracking()
4544

4645
# Getting all status types in the event there are custom status types added later.
4746
self.process_status_types = self.get_process_status_types()
@@ -51,6 +50,8 @@ def __init__(self, process_name, process_type, actor_name, tool_name, source_nam
5150
self.process_status_complete = self.process_status_types['completed']
5251
self.process_status_failed = self.process_status_types['failed']
5352

53+
self.process_tracking_run = self.register_new_process_run()
54+
5455
def change_run_status(self, new_status, end_date=None):
5556
"""
5657
Change a process tracking run record from 'running' to another status.
@@ -241,12 +242,12 @@ def register_new_process_run(self):
241242
, process_run_id=new_run_id
242243
, process_run_start_date_time=datetime.now()
243244
, process_run_actor_id=self.actor.actor_id
244-
, is_latest_run = True)
245+
, is_latest_run=True)
245246

246247
session.add(new_run)
247248
session.commit()
248249

249-
self.process_tracking_run = new_run
250+
return new_run
250251

251252
self.logger.info('Process tracking record added for %s' % self.process_name)
252253

tests/test_extract_tracking.py

100644 → 100755
Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from datetime import datetime
44
import unittest
55

6-
from models.extract import Extract, ExtractProcess, ExtractStatus
6+
from models.extract import Extract, ExtractProcess, ExtractStatus, Location
77
from models.process import Process, ProcessTracking
88

99
from process_tracker import session
@@ -21,12 +21,12 @@ def setUpClass(cls):
2121
, tool_name='Spark'
2222
, source_name='Unittests')
2323

24-
cls.process_tracker.register_new_process_run()
25-
26-
cls.process_run = cls.process_tracker.process_tracking_run
24+
cls.process_run = cls.process_tracker
2725

2826
@classmethod
2927
def tearDownClass(cls):
28+
29+
session.query(ProcessTracking).delete()
3030
session.query(Process).delete()
3131
session.commit()
3232

@@ -46,9 +46,9 @@ def tearDown(self):
4646
Need to clean up tables to return them to pristine state for other tests.
4747
:return:
4848
"""
49-
session.query(ProcessTracking).delete()
5049
session.query(ExtractProcess).delete()
5150
session.query(Extract).delete()
51+
session.query(Location).delete()
5252
session.commit()
5353

5454
def test_change_extract_status(self):
@@ -69,4 +69,63 @@ def test_change_extract_status(self):
6969
expected_result = [self.extract.extract_status_ready
7070
, self.extract.extract_status_ready]
7171

72-
self.assertEqual(given_result, expected_result)
72+
self.assertEqual(expected_result, given_result)
73+
74+
def test_change_extract_status_initialization(self):
75+
"""
76+
Testing that when the extract is first being worked on by a process, the status is set to 'initializing'
77+
:return:
78+
"""
79+
extract_id = self.extract.extract.extract_id
80+
self.extract.retrieve_extract_process()
81+
82+
extract_process_record = session.query(ExtractProcess).filter(ExtractProcess.extract_tracking_id == extract_id)
83+
84+
given_result = extract_process_record[0].extract_process_status_id
85+
expected_result = self.extract.extract_status_initializing
86+
87+
self.assertEqual(expected_result, given_result)
88+
89+
def test_derive_location_name_local_path(self):
90+
"""
91+
Testing that if a location name is not provided, one is created from the local path provided.
92+
:return:
93+
"""
94+
extract = ExtractTracker(process_run=self.process_run
95+
, filename='test_extract_filename2.csv'
96+
, location_path='/home/test/extract_dir2')
97+
98+
location = session.query(Location).filter(Location.location_id == extract.extract.extract_location_id)
99+
100+
given_result = location[0].location_name
101+
expected_result = 'extract_dir2'
102+
103+
self.assertEqual(expected_result, given_result)
104+
105+
def test_derive_location_name_s3(self):
106+
"""
107+
Testing that if a location name is not provided, one is created from the s3 path provided.
108+
:return:
109+
"""
110+
extract = ExtractTracker(process_run=self.process_run
111+
, filename='test_extract_filename2.csv'
112+
, location_path='https://test-test.s3.amazonaws.com/test/extract_dir')
113+
114+
location = session.query(Location).filter(Location.location_id == extract.extract.extract_location_id)
115+
116+
given_result = location[0].location_name
117+
expected_result = 's3 - extract_dir'
118+
119+
self.assertEqual(expected_result, given_result)
120+
121+
def test_location_name_provided(self):
122+
"""
123+
Testing that if a location name is provided (like with default extract), one is not created.
124+
:return:
125+
"""
126+
location = session.query(Location).filter(Location.location_id == self.extract.extract.extract_location_id)
127+
128+
given_result = location[0].location_name
129+
expected_result = 'Test Location'
130+
131+
self.assertEqual(expected_result, given_result)

tests/test_process_tracking.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,6 @@
1212

1313
class TestProcessTracking(unittest.TestCase):
1414

15-
@classmethod
16-
def setUpClass(cls):
17-
cls.process_tracker = ProcessTracker(process_name='Testing Process Tracking Initialization'
18-
, process_type='Extract'
19-
, actor_name='UnitTesting'
20-
, tool_name='Spark'
21-
, source_name='Unittests')
22-
2315
@classmethod
2416
def tearDownClass(cls):
2517
session.query(Process).delete()
@@ -30,8 +22,11 @@ def setUp(self):
3022
Creating an initial process tracking run record for testing.
3123
:return:
3224
"""
33-
34-
self.process_tracker.register_new_process_run()
25+
self.process_tracker = ProcessTracker(process_name='Testing Process Tracking Initialization'
26+
, process_type='Extract'
27+
, actor_name='UnitTesting'
28+
, tool_name='Spark'
29+
, source_name='Unittests')
3530

3631
self.process_id = self.process_tracker.process.process_id
3732
self.provided_end_date = datetime.now()
@@ -55,7 +50,7 @@ def test_verify_session_variables_postgresql(self):
5550
given_result = data_store_type
5651
expected_result = 'postgresql'
5752

58-
self.assertEqual(given_result, expected_result)
53+
self.assertEqual(expected_result, given_result)
5954

6055
def test_initializing_process_tracking(self):
6156
"""
@@ -65,7 +60,7 @@ def test_initializing_process_tracking(self):
6560
given_result = self.process_tracker.actor.actor_name
6661
expected_result = 'UnitTesting'
6762

68-
self.assertEqual(given_result, expected_result)
63+
self.assertEqual(expected_result, given_result)
6964

7065
def test_register_new_process_run(self):
7166
"""
@@ -77,7 +72,7 @@ def test_register_new_process_run(self):
7772
given_result = session.query(ProcessTracking).filter(ProcessTracking.process_id == self.process_id).count()
7873
expected_result = 1
7974

80-
self.assertEqual(given_result, expected_result)
75+
self.assertEqual(expected_result, given_result)
8176

8277
def test_register_new_process_run_exception(self):
8378
"""
@@ -110,7 +105,7 @@ def test_register_new_process_run_with_previous_run(self):
110105
given_result = process_runs[0].is_latest_run
111106
expected_result = False
112107

113-
self.assertEqual(given_result, expected_result)
108+
self.assertEqual(expected_result, given_result)
114109

115110
def test_change_run_status_complete(self):
116111
"""
@@ -124,7 +119,7 @@ def test_change_run_status_complete(self):
124119
given_result = run_record[0].process_status_id
125120
expected_result = self.process_tracker.process_status_complete
126121

127-
self.assertEqual(given_result, expected_result)
122+
self.assertEqual(expected_result, given_result)
128123

129124
def test_change_run_status_failed(self):
130125
"""
@@ -138,7 +133,7 @@ def test_change_run_status_failed(self):
138133
given_result = run_record[0].process_status_id
139134
expected_result = self.process_tracker.process_status_failed
140135

141-
self.assertEqual(given_result, expected_result)
136+
self.assertEqual(expected_result, given_result)
142137

143138
def test_change_run_status_with_end_date(self):
144139
"""
@@ -153,7 +148,7 @@ def test_change_run_status_with_end_date(self):
153148
given_result = run_record[0].process_run_end_date_time
154149
expected_result = self.provided_end_date
155150

156-
self.assertEqual(given_result, expected_result)
151+
self.assertEqual(expected_result, given_result)
157152

158153
def test_raise_run_error_type_exists_no_fail(self):
159154
"""
@@ -172,7 +167,7 @@ def test_raise_run_error_type_exists_no_fail(self):
172167

173168
expected_result = 1
174169

175-
self.assertEqual(given_result, expected_result)
170+
self.assertEqual(expected_result, given_result)
176171

177172
def test_raise_run_error_type_not_exists(self):
178173
"""
@@ -219,7 +214,7 @@ def test_raise_run_error_with_fail(self):
219214
, self.provided_end_date]
220215

221216
with self.subTest():
222-
self.assertEqual(given_result, expected_result)
217+
self.assertEqual(expected_result, given_result)
223218
with self.subTest():
224219
self.assertTrue('Process halting. An error triggered the process to fail.' in str(context.exception))
225220

0 commit comments

Comments
 (0)