Skip to content

Commit fe49124

Browse files
committed
process_tracker_python-7 Register extracts by location
✨ Can now register extracts by given location 🐛 Have been unable to write test for s3 locations. Tried writing test for s3 locations to register, but having issues with moto library. Waiting to hear back from stackoverflow, but wanted to at least commit what I've done before switching to another issue in the meantime.
1 parent b6ce2ab commit fe49124

File tree

3 files changed

+46
-12
lines changed

3 files changed

+46
-12
lines changed

process_tracker/location_tracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ def __init__(self, location_path, location_name=None):
1414
self.data_store = DataStore()
1515

1616
self.location_path = location_path.lower()
17-
self.location_type = self.derive_location_type()
1817

1918
if location_name is None:
2019
self.location_name = self.derive_location_name()
2120
else:
2221
self.location_name = location_name
2322

23+
self.location_type = self.derive_location_type()
2424
self.location = self.data_store.get_or_create(model=Location
2525
, location_name=self.location_name
2626
, location_path=location_path
@@ -56,7 +56,7 @@ def derive_location_type(self):
5656
:return:
5757
"""
5858

59-
if "s3" in self.location_path:
59+
if "s3" in self.location_path or "s3" in self.location_name:
6060

6161
location_type = self.data_store.get_or_create(model=LocationType
6262
, location_type_name="s3")

process_tracker/process_tracker.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,15 @@ def register_extracts_by_location(self, location_path, location_name=None):
228228
"""
229229
location = LocationTracker(location_path=location_path, location_name=location_name)
230230

231-
if location.location_type == "s3":
232-
s3 = boto3.resource('s3')
233-
bucket = s3.Bucket(location.location_path)
231+
if location.location_type.location_type_name == "s3":
232+
s3 = boto3.resource("s3")
233+
234+
path = location.location_path
235+
236+
if path.startswith("s3://"):
237+
path = path[len("s3://")]
238+
239+
bucket = s3.Bucket(path)
234240

235241
for file in bucket.objects.all():
236242
ExtractTracker(process_run=self

tests/test_process_tracker.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime, timedelta
44
import unittest
55

6+
import boto3
67
import moto
78
from sqlalchemy.orm import aliased, Session
89

@@ -316,13 +317,40 @@ def test_register_extracts_by_location_local(self):
316317

317318
self.assertEqual(expected_result, given_result)
318319

319-
def test_register_extracts_by_location_s3(self):
320-
"""
321-
Testing that when the location is s3, all the extracts are registered and set to 'ready' status.
322-
:return:
323-
"""
324-
325-
320+
# def test_register_extracts_by_location_s3(self):
321+
# """
322+
# Testing that when the location is s3, all the extracts are registered and set to 'ready' status.
323+
# The process/extract relationship should also be set to 'ready' since that is the last status the process set
324+
# the extracts to.
325+
# :return:
326+
# """
327+
# process_status = aliased(ExtractStatus)
328+
# extract_status = aliased(ExtractStatus)
329+
#
330+
# expected_keys = 'test_local_dir_1.csv', 'test_local_dir_2.csv'
331+
#
332+
# with moto.mock_s3():
333+
# conn = boto3.resource('s3', region_name='us-east-1')
334+
# conn.create_bucket(Bucket='test_bucket')
335+
#
336+
# for file in expected_keys:
337+
# conn.Object('test_bucket', file)
338+
#
339+
# self.process_tracker.register_extracts_by_location(location_path='s3://test_bucket')
340+
#
341+
# extracts = self.session.query(Extract.extract_filename, extract_status.extract_status_name, process_status.extract_status_name)\
342+
# .join(ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id) \
343+
# .join(extract_status, Extract.extract_status_id == extract_status.extract_status_id) \
344+
# .join(process_status, ExtractProcess.extract_process_status_id == process_status.extract_status_id) \
345+
# .filter(ExtractProcess.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id)
346+
#
347+
# given_result = [[extracts[0].extract_filename, extracts[0].extract_status_name, extracts[0].extract_status_name]
348+
# ,[extracts[1].extract_filename, extracts[1].extract_status_name, extracts[1].extract_status_name]]
349+
#
350+
# expected_result = [['test_local_dir_1.csv', 'ready', 'ready']
351+
# ,['test_local_dir_2.csv', 'ready', 'ready']]
352+
#
353+
# self.assertEqual(expected_result, given_result)
326354

327355
def test_register_new_process_run(self):
328356
"""

0 commit comments

Comments
 (0)