Skip to content

Commit b6ce2ab

Browse files
committed
process_tracker_python-7 Register extracts by location
✨ Can now register extracts by given location. Have tested new extract registration by given location for local directory paths.
1 parent 9239734 commit b6ce2ab

File tree

11 files changed

+551
-109
lines changed

11 files changed

+551
-109
lines changed

Pipfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ verify_ssl = true
55

66
[dev-packages]
77
coverage="*"
8+
moto="*"
89
python-coveralls="*"
910
coveralls="*"
10-
twine="*"
1111

1212
[packages]
13+
boto3="*"
1314
sqlalchemy="*"
1415
sqlalchemy-utils="*"
1516
python-dateutil="*"

Pipfile.lock

Lines changed: 343 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

process_tracker/data_store.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,35 +59,32 @@ def verify_and_connect_to_data_store(self):
5959
data_store_port = os.environ.get('process_tracking_data_store_port')
6060
data_store_name = os.environ.get('process_tracking_data_store_name')
6161

62-
data_store_error_flag = False
62+
errors = []
6363

6464
if data_store_type is None:
65-
raise Exception('Data store type is not set.')
66-
data_store_error_flag = True
65+
errors.append(Exception('Data store type is not set.'))
6766

6867
if data_store_username is None:
69-
raise Exception('Data store username is not set.')
70-
data_store_error_flag = True
68+
errors.append(Exception('Data store username is not set.'))
7169

7270
if data_store_password is None:
73-
raise Exception('Data store password is not set')
74-
data_store_error_flag = True
71+
errors.append(Exception('Data store password is not set'))
7572

7673
if data_store_host is None:
77-
raise Exception('Data store host is not set')
78-
data_store_error_flag = True
74+
errors.append(Exception('Data store host is not set'))
7975

8076
if data_store_port is None:
81-
raise Exception('Data store port is not set')
82-
data_store_error_flag = True
77+
errors.append(Exception('Data store port is not set'))
8378

8479
if data_store_name is None:
85-
raise Exception('Data store name is not set')
86-
data_store_error_flag = True
80+
errors.append(Exception('Data store name is not set'))
8781

88-
if data_store_error_flag:
89-
raise Exception('Data store has not been properly configured. Please read how to set up the Process '
90-
'Tracking data store by going to: <insert read the docs url here>')
82+
if errors:
83+
84+
errors.append(Exception('Data store has not been properly configured. Please read how to set up the Process '
85+
'Tracking data store by going to: <insert read the docs url here>'))
86+
87+
raise Exception(errors)
9188

9289
relational_stores = ['postgresql']
9390
nonrelational_stores = []

process_tracker/extract_tracker.py

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,61 @@
11
# Extract Tracking
22
# Used in the creation and editing of extract records. Used in conjunction with process tracking.
33
from datetime import datetime
4-
from os.path import basename, join, normpath
4+
import logging
5+
from os.path import join
56

6-
from process_tracker.data_store import DataStore
7+
from sqlalchemy.orm import Session
78

9+
from process_tracker.data_store import DataStore
10+
from process_tracker.location_tracker import LocationTracker
811
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
912

1013

1114
class ExtractTracker:
1215

13-
def __init__(self, process_run, filename, location_path, location_name=None):
16+
def __init__(self, process_run, filename, location=None, location_name=None, location_path=None, status=None):
1417
"""
1518
ExtractTracker is the primary engine for tracking data extracts
1619
:param process_run: The process object working with extracts (either creating or consuming)
1720
:type process_run: ProcessTracker object
1821
:param filename: Name of the data extract file.
1922
:type filename: string
23+
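:param location: LocationTracker object for the extract's location; its wrapped SQLAlchemy Location record is read via its .location attribute.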
2024
:param location_path: Location (filepath, s3 bucket, etc.) where the file is stored
2125
:type location_path: string
2226
:param location_name: Optional parameter to provide a specific name for the location. If not provided, will use
2327
the last directory in the path as the location name. If type of location can be
2428
determined (i.e. S3 bucket), the location type will be prepended.
2529
:type location_name: string
30+
:param status: Optional. Name of the extract status to set; defaults to 'initializing' when not provided.
31+
:type status: string
2632
"""
33+
self.logger = logging.getLogger(__name__)
34+
2735
self.data_store = DataStore()
2836
self.session = self.data_store.session
2937
self.process_run = process_run
3038

31-
if location_name is None:
32-
location_name = self.derive_location_name(location_path=location_path)
33-
3439
self.source = self.process_run.source
3540
self.filename = filename
36-
self.full_filename = join(location_path, filename)
3741

38-
self.location = self.data_store.get_or_create(model=Location
39-
, location_name=location_name
40-
, location_path=location_path)
42+
if location is not None:
43+
self.location = location
44+
elif location_path is not None:
45+
self.location = LocationTracker(location_name=location_name, location_path=location_path)
46+
else:
47+
raise Exception('A location object or location_path must be provided.')
4148

4249
self.extract = self.data_store.get_or_create(model=Extract
4350
, extract_filename=filename
44-
, extract_location_id=self.location.location_id
51+
, extract_location_id=self.location.location.location_id
4552
, extract_source_id=self.source.source_id)
4653

54+
if location_path is not None:
55+
self.full_filename = join(location_path, filename)
56+
else:
57+
self.full_filename = join(self.location.location_path, self.extract.extract_filename)
58+
4759
# Getting all status types in the event there are custom status types added later.
4860
self.extract_status_types = self.get_extract_status_types()
4961

@@ -59,7 +71,16 @@ def __init__(self, process_run, filename, location_path, location_name=None):
5971

6072
self.extract_process = self.retrieve_extract_process()
6173

62-
self.extract.extract_status_id = self.extract_status_initializing
74+
if status is not None and self.extract_status_types[status]:
75+
self.logger.info('Setting extract status to %s' % status)
76+
self.extract.extract_status_id = self.extract_status_types[status]
77+
self.extract_process.extract_process_status_id = self.extract_status_types[status]
78+
else:
79+
if status is not None:
80+
self.logger.error('Provided status %s is not in extract_status_types_lkup. '
81+
'Setting to initializing.' % status)
82+
self.extract.extract_status_id = self.extract_status_initializing
83+
6384
self.session.commit()
6485

6586
def change_extract_status(self, new_status):
@@ -82,34 +103,6 @@ def change_extract_status(self, new_status):
82103
raise Exception('%s is not a valid extract status type. '
83104
'Please add the status to extract_status_lkup' % new_status)
84105

85-
@staticmethod
86-
def derive_location_name(location_path):
87-
"""
88-
If location name is not provided, attempt to derive name from path.
89-
:param location_path: The data extract file location path.
90-
:return:
91-
"""
92-
# Idea is to generalize things like grabbing the last directory name in the path,
93-
# what type of path is it (normal, s3, etc.)
94-
95-
location_prefix = None
96-
location_name = ""
97-
98-
location_path = location_path.lower() # Don't care about casing.
99-
100-
if "s3" in location_path:
101-
# If the path is an S3 Bucket, prefix to name.
102-
103-
location_prefix = "s3"
104-
105-
if location_prefix is not None:
106-
107-
location_name = location_prefix + " - "
108-
109-
location_name += basename(normpath(location_path))
110-
111-
return location_name
112-
113106
def get_extract_status_types(self):
114107
"""
115108
Get list of process status types and return dictionary.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Location
2+
# For processes dealing with Extract Locations.
3+
4+
from os.path import basename, normpath
5+
6+
from process_tracker.data_store import DataStore
7+
from process_tracker.models.extract import Location, LocationType
8+
9+
10+
class LocationTracker:
11+
12+
def __init__(self, location_path, location_name=None):
13+
14+
self.data_store = DataStore()
15+
16+
self.location_path = location_path.lower()
17+
self.location_type = self.derive_location_type()
18+
19+
if location_name is None:
20+
self.location_name = self.derive_location_name()
21+
else:
22+
self.location_name = location_name
23+
24+
self.location = self.data_store.get_or_create(model=Location
25+
, location_name=self.location_name
26+
, location_path=location_path
27+
, location_type=self.location_type.location_type_id)
28+
29+
def derive_location_name(self):
30+
"""
31+
If location name is not provided, attempt to derive name from path.
32+
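:return: The derived location name: the last component of the location path, prefixed with "s3 - " when the path contains "s3".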
33+
"""
34+
# Idea is to generalize things like grabbing the last directory name in the path,
35+
# what type of path is it (normal, s3, etc.)
36+
37+
location_prefix = None
38+
39+
location_name = ""
40+
41+
if "s3" in self.location_path:
42+
# If the path is an S3 Bucket, prefix to name.
43+
44+
location_prefix = "s3"
45+
46+
if location_prefix is not None:
47+
location_name = location_prefix + " - "
48+
49+
location_name += basename(normpath(self.location_path))
50+
51+
return location_name
52+
53+
def derive_location_type(self):
54+
"""
55+
Determine the type of location provided.
56+
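:return: The LocationType record obtained via get_or_create: "s3" when the path contains "s3", otherwise "local directory".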
57+
"""
58+
59+
if "s3" in self.location_path:
60+
61+
location_type = self.data_store.get_or_create(model=LocationType
62+
, location_type_name="s3")
63+
64+
else:
65+
location_type = self.data_store.get_or_create(model=LocationType
66+
, location_type_name="local directory")
67+
68+
return location_type

process_tracker/models/extract.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,25 @@ class ExtractProcess(Base):
5050
extract_processes = relationship('ProcessTracking', foreign_keys=[process_tracking_id])
5151

5252

53+
class LocationType(Base):
54+
55+
__tablename__ = "location_type_lkup"
56+
57+
location_type_id = Column(Integer, Sequence('location_type_lkup_location_type_id_seq'), primary_key=True)
58+
location_type_name = Column(String(25), unique=True, nullable=False)
59+
60+
locations = relationship('Location', back_populates='location_types')
61+
62+
5363
class Location(Base):
5464

5565
__tablename__ = "location_lkup"
5666

5767
location_id = Column(Integer, Sequence('location_lkup_location_id_seq'), primary_key=True)
5868
location_name = Column(String(750), nullable=False, unique=True)
5969
location_path = Column(String(750), nullable=False, unique=True)
70+
location_type = Column(Integer, ForeignKey("location_type_lkup.location_type_id"))
6071

6172
extracts = relationship("Extract")
73+
74+
location_types = relationship('LocationType', foreign_keys=[location_type])

process_tracker/process_tracker.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33

44
from datetime import datetime
55
import logging
6+
import os
67
from os.path import join
78

9+
import boto3
10+
811
from process_tracker.data_store import DataStore
12+
from process_tracker.extract_tracker import ExtractTracker
13+
from process_tracker.location_tracker import LocationTracker
914

1015
from process_tracker.models.actor import Actor
1116
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
@@ -214,6 +219,31 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals
214219
self.session.commit()
215220
raise Exception('Process halting. An error triggered the process to fail.')
216221

222+
def register_extracts_by_location(self, location_path, location_name=None):
223+
"""
224+
For a given location, find all files and attempt to register them.
225+
:param location_name: Optional name of the location. If not provided, the name is derived from location_path.
226+
:param location_path: Path of the location
227+
:return:
228+
"""
229+
location = LocationTracker(location_path=location_path, location_name=location_name)
230+
231+
if location.location_type == "s3":
232+
s3 = boto3.resource('s3')
233+
bucket = s3.Bucket(location.location_path)
234+
235+
for file in bucket.objects.all():
236+
ExtractTracker(process_run=self
237+
, filename=file
238+
, location=location
239+
, status='ready')
240+
else:
241+
for file in os.listdir(location_path):
242+
ExtractTracker(process_run=self
243+
, filename=file
244+
, location=location
245+
, status='ready')
246+
217247
def register_new_process_run(self):
218248
"""
219249
When a new process instance is starting, register the run in process tracking.
@@ -252,7 +282,6 @@ def register_new_process_run(self):
252282

253283
else:
254284
raise Exception('The process %s is currently running.' % self.process_name)
255-
exit()
256285

257286
def set_process_run_low_high_dates(self, low_date=None, high_date=None):
258287
"""

setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
packages=setuptools.find_packages(),
1616
test_suite='tests.process_tracker_test_suite',
1717
install_requires=[
18+
'boto3 >= 1.9.150',
1819
'sqlalchemy >= 1.3.3',
1920
'sqlalchemy-utils >= 0.33.11',
2021
'python-dateutil >= 2.8.0',
@@ -23,9 +24,9 @@
2324
extras_requires={
2425
'dev': [
2526
'coverage >= "4.0.3',
27+
'moto >= 1.3.8'
2628
'python-coveralls >= 2.9.1',
2729
'coveralls >= 1.7.0',
28-
'twine >= 1.13.0'
2930
]
3031
},
3132
classifiers=[

tests/test_extract_tracker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ def test_derive_location_name_local_path(self):
9494
:return:
9595
"""
9696
extract = ExtractTracker(process_run=self.process_run
97-
, filename='test_extract_filename2.csv'
98-
, location_path='/home/test/extract_dir2')
97+
, filename='test_extract_filename2.csv'
98+
, location_path='/home/test/extract_dir2')
9999

100100
location = self.session.query(Location).filter(Location.location_id == extract.extract.extract_location_id)
101101

tests/test_location_tracker.py

Whitespace-only changes.

0 commit comments

Comments
 (0)