Skip to content

Commit ebf93a3

Browse files
committed
Register extracts by location
🐛 Proved through mocking that registering extracts by location works. Closes: #7
1 parent 76e7127 commit ebf93a3

File tree

4 files changed

+121
-57
lines changed

4 files changed

+121
-57
lines changed

process_tracker/process_tracker.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import logging
66
import os
77

8+
import boto3
89
from sqlalchemy.orm import aliased
910

1011
from process_tracker.data_store import DataStore
@@ -333,26 +334,29 @@ def register_extracts_by_location(self, location_path, location_name=None):
333334
location_path=location_path, location_name=location_name
334335
)
335336

336-
# if location.location_type.location_type_name == "s3":
337-
# s3 = boto3.resource("s3")
338-
#
339-
# path = location.location_path
340-
#
341-
# if path.startswith("s3://"):
342-
# path = path[len("s3://")]
343-
#
344-
# bucket = s3.Bucket(path)
345-
#
346-
# for file in bucket.objects.all():
347-
# ExtractTracker(process_run=self
348-
# , filename=file
349-
# , location=location
350-
# , status='ready')
351-
# else:
352-
for file in os.listdir(location_path):
353-
ExtractTracker(
354-
process_run=self, filename=file, location=location, status="ready"
355-
)
337+
if location.location_type.location_type_name == "s3":
338+
s3 = boto3.resource("s3")
339+
340+
path = location.location_path
341+
342+
path = path[path.startswith("s3://") and len("s3://") :]
343+
344+
self.logger.debug("Path is now %s" % path)
345+
346+
bucket = s3.Bucket(path)
347+
348+
for file in bucket.objects.all():
349+
ExtractTracker(
350+
process_run=self,
351+
filename=file.key,
352+
location=location,
353+
status="ready",
354+
)
355+
else:
356+
for file in os.listdir(location_path):
357+
ExtractTracker(
358+
process_run=self, filename=file, location=location, status="ready"
359+
)
356360

357361
def register_new_process_run(self):
358362
"""

tests/fixtures/test_local_dir_1.csv

Whitespace-only changes.

tests/fixtures/test_local_dir_2.csv

Whitespace-only changes.

tests/test_process_tracker.py

Lines changed: 97 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import boto3
1010
import botocore
11-
import moto
11+
from moto import mock_s3
1212
from sqlalchemy.orm import aliased, Session
1313

1414
from process_tracker.models.extract import (
@@ -568,42 +568,102 @@ def test_register_extracts_by_location_local(self):
568568

569569
self.assertCountEqual(expected_result, given_result)
570570

571-
# def test_register_extracts_by_location_s3(self):
572-
# """
573-
# Testing that when the location is s3, all the extracts are registered and set to 'ready' status.
574-
# The process/extract relationship should also be set to 'ready' since that is the last status the process set
575-
# the extracts to.
576-
# :return:
577-
# """
578-
# process_status = aliased(ExtractStatus)
579-
# extract_status = aliased(ExtractStatus)
580-
#
581-
# expected_keys = 'test_local_dir_1.csv', 'test_local_dir_2.csv'
582-
#
583-
# client
584-
#
585-
# with moto.mock_s3():
586-
# conn = boto3.resource('s3', region_name='us-east-1')
587-
# conn.create_bucket(Bucket='test_bucket')
588-
#
589-
# for file in expected_keys:
590-
# conn.Object('test_bucket', file)
591-
#
592-
# self.process_tracker.register_extracts_by_location(location_path='s3://test_bucket')
593-
#
594-
# extracts = self.session.query(Extract.extract_filename, extract_status.extract_status_name, process_status.extract_status_name)\
595-
# .join(ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id) \
596-
# .join(extract_status, Extract.extract_status_id == extract_status.extract_status_id) \
597-
# .join(process_status, ExtractProcess.extract_process_status_id == process_status.extract_status_id) \
598-
# .filter(ExtractProcess.process_tracking_id == self.process_tracker.process_tracking_run.process_tracking_id)
599-
#
600-
# given_result = [[extracts[0].extract_filename, extracts[0].extract_status_name, extracts[0].extract_status_name]
601-
# ,[extracts[1].extract_filename, extracts[1].extract_status_name, extracts[1].extract_status_name]]
602-
#
603-
# expected_result = [['test_local_dir_1.csv', 'ready', 'ready']
604-
# , ['test_local_dir_2.csv', 'ready', 'ready']]
605-
#
606-
# self.assertEqual(expected_result, given_result)
571+
@mock_s3
def test_register_extracts_by_location_s3(self):
    """
    Testing that when the location is s3, all the extracts are registered and set to 'ready' status.
    The process/extract relationship should also be set to 'ready' since that is the last status the process set
    the extracts to.

    The test runs under the mock_s3 decorator: a bucket is created, the two fixture files are uploaded to it,
    and the extracts registered for the current process run are compared with the uploaded keys.
    :return:
    """
    process_status = aliased(ExtractStatus)
    extract_status = aliased(ExtractStatus)
    test_bucket = "test_bucket"

    expected_keys = ["test_local_dir_1.csv", "test_local_dir_2.csv"]

    # Placeholder connection settings shared by the client and the resource.
    aws_kwargs = {
        "region_name": "us-east-1",
        "aws_access_key_id": "fake_access_key",
        "aws_secret_access_key": "fake_secret_key",
    }
    client = boto3.client("s3", **aws_kwargs)
    s3 = boto3.resource("s3", **aws_kwargs)

    # Guard: the bucket must not exist before the test creates it. A
    # ClientError from head_bucket is the expected outcome here.
    try:
        s3.meta.client.head_bucket(Bucket=test_bucket)
    except botocore.exceptions.ClientError:
        pass
    else:
        err = "%s should not exist" % test_bucket
        raise EnvironmentError(err)

    client.create_bucket(Bucket=test_bucket)

    current_dir = os.path.dirname(__file__)
    fixtures_dir = os.path.join(current_dir, "fixtures")

    for key_name in expected_keys:
        client.upload_file(
            Filename=os.path.join(fixtures_dir, key_name),
            Bucket=test_bucket,
            # S3 keys always use "/" as the separator, so the key is built
            # explicitly instead of with os.path.join (OS-dependent).
            Key="%s/%s" % (test_bucket, key_name),
        )

    self.process_tracker.register_extracts_by_location(
        location_path="s3://test_bucket"
    )

    extracts = (
        self.session.query(
            Extract.extract_filename,
            extract_status.extract_status_name,
            process_status.extract_status_name,
        )
        .join(
            ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id
        )
        .join(
            extract_status,
            Extract.extract_status_id == extract_status.extract_status_id,
        )
        .join(
            process_status,
            ExtractProcess.extract_process_status_id
            == process_status.extract_status_id,
        )
        .filter(
            ExtractProcess.process_tracking_id
            == self.process_tracker.process_tracking_run.process_tracking_id
        )
    )

    # Both status columns share the name extract_status_name, so rows are
    # unpacked by position; this way the third value really is the
    # process/extract relationship status rather than the extract status
    # read a second time.
    given_result = [
        [filename, extract_state, process_state]
        for filename, extract_state, process_state in extracts
    ]

    expected_result = [
        ["test_bucket/test_local_dir_1.csv", "ready", "ready"],
        ["test_bucket/test_local_dir_2.csv", "ready", "ready"],
    ]

    # The query has no ORDER BY, so compare without regard to row order
    # (same assertion the local-directory test uses).
    self.assertCountEqual(expected_result, given_result)
607667

608668
def test_register_new_process_run(self):
609669
"""

0 commit comments

Comments
 (0)