Skip to content

Commit b36cdfa

Browse files
authored
Merge pull request #34 from OpenDataAlex/process_tracker_python-30
process_tracker_python-30 Extract Lookups should return Extract objects
2 parents d4e569e + c16e0a0 commit b36cdfa

File tree

4 files changed

+72
-82
lines changed

4 files changed

+72
-82
lines changed

process_tracker/models/actor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,7 @@ class Actor(Base):
1111

1212
actor_id = Column(Integer, Sequence('actor_lkup_actor_id_seq'), primary_key=True)
1313
actor_name = Column(String(250), nullable=False, unique=True)
14+
15+
def __repr__(self):
16+
return "<Actor id=%s, name=%s>" % (self.actor_id
17+
, self.actor_name)

process_tracker/models/extract.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# Models for Extract (Data) entities
33

44
from datetime import datetime
5+
from os.path import join
56

67
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Sequence, String
78
from sqlalchemy.orm import relationship
@@ -18,6 +19,11 @@ class ExtractStatus(Base):
1819

1920
extracts = relationship("ExtractProcess")
2021

22+
def __repr__(self):
23+
24+
return "<Extract Status id=%s, name=%s>" % (self.extract_status_id
25+
, self.extract_status_name)
26+
2127

2228
class Extract(Base):
2329

@@ -32,6 +38,17 @@ class Extract(Base):
3238
extract_process = relationship("ExtractProcess", back_populates='process_extracts')
3339
locations = relationship("Location", foreign_keys=[extract_location_id])
3440

41+
def __repr__(self):
42+
43+
return "<Extract id=%s, filename=%s, location=%s, status=%s>" % (self.extract_id
44+
, self.extract_filename
45+
, self.extract_location_id
46+
, self.extract_status_id)
47+
48+
def full_filepath(self):
49+
50+
return join(self.locations.location_path, self.extract_filename)
51+
3552

3653
class ExtractProcess(Base):
3754

@@ -45,6 +62,12 @@ class ExtractProcess(Base):
4562
process_extracts = relationship('Extract', foreign_keys=[extract_tracking_id])
4663
extract_processes = relationship('ProcessTracking', foreign_keys=[process_tracking_id])
4764

65+
def __repr__(self):
66+
67+
return "<ExtractProcess extract=%s, process_run=%s, extract_status=%s>" % (self.extract_tracking_id
68+
, self.process_tracking_id
69+
, self.extract_process_status_id)
70+
4871

4972
class LocationType(Base):
5073

@@ -54,7 +77,12 @@ class LocationType(Base):
5477
location_type_name = Column(String(25), unique=True, nullable=False)
5578

5679
locations = relationship('Location', back_populates='location_types')
57-
80+
81+
def __repr__(self):
82+
83+
return "<LocationType id=%s, name=%s>" % (self.location_type_id
84+
, self.location_type_name)
85+
5886

5987
class Location(Base):
6088

@@ -68,3 +96,9 @@ class Location(Base):
6896
extracts = relationship("Extract")
6997

7098
location_types = relationship('LocationType', foreign_keys=[location_type])
99+
100+
def __repr__(self):
101+
102+
return "<Location id=%s, name=%s, type=%s>" % (self.location_id
103+
, self.location_name
104+
, self.location_path)

process_tracker/process_tracker.py

Lines changed: 20 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from datetime import datetime
55
import logging
66
import os
7-
from os.path import join
87

98
from sqlalchemy.orm import aliased
109

@@ -16,8 +15,7 @@
1615

1716
from process_tracker.models.actor import Actor
1817
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
19-
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking\
20-
, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
18+
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessDependency, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
2119
from process_tracker.models.source import Source
2220
from process_tracker.models.tool import Tool
2321

@@ -80,13 +78,10 @@ def change_run_status(self, new_status, end_date=None):
8078
:return:
8179
"""
8280
if end_date is None:
83-
self.logger.info('Eng date was not set. Setting to current timestamp.')
8481
end_date = datetime.now()
8582

8683
if self.process_status_types[new_status]:
8784

88-
self.logger.info('New status exists. Setting run to %s' % new_status)
89-
9085
self.process_tracking_run.process_status_id = self.process_status_types[new_status]
9186

9287
if (self.process_status_types[new_status] == self.process_status_complete) \
@@ -104,7 +99,6 @@ def change_run_status(self, new_status, end_date=None):
10499
self.session.commit()
105100

106101
else:
107-
self.logger.error('%s is not a valid process status type.' % new_status)
108102
raise Exception('%s is not a valid process status type. '
109103
'Please add the status to process_status_type_lkup' % new_status)
110104

@@ -115,71 +109,54 @@ def find_ready_extracts_by_filename(self, filename):
115109
:param filename:
116110
:return:
117111
"""
118-
extract_files = []
119-
120-
self.logger.info('Searching for extracts with full/partial filename of %s' % filename)
121112

122-
process_files = self.session.query(Extract.extract_filename, Location.location_path)\
123-
.join(Location)\
113+
process_files = self.session.query(Extract)\
124114
.join(ExtractStatus)\
125115
.filter(Extract.extract_filename.like("%" + filename + "%"))\
126116
.filter(ExtractStatus.extract_status_name == 'ready') \
127117
.order_by(Extract.extract_registration_date_time)\
128-
.order_by(Extract.extract_id)
129-
130-
for record in process_files:
131-
extract_files.append(join(record.location_path, record.extract_filename))
132-
self.logger.debug('Found file %s' % record.extract_filename)
118+
.order_by(Extract.extract_id)\
119+
.all()
133120

134-
return extract_files
121+
return process_files
135122

136123
def find_ready_extracts_by_location(self, location):
137124
"""
138125
For the given location name, find all matching extracts that are ready for processing
139126
:param location:
140127
:return:
141128
"""
142-
extract_files = []
143129

144-
self.logger.info('Searching for extracts found at the location named %s' % location)
145-
146-
process_files = self.session.query(Extract.extract_filename, Location.location_path)\
130+
process_files = self.session.query(Extract)\
147131
.join(Location)\
148132
.join(ExtractStatus)\
149133
.filter(ExtractStatus.extract_status_name == 'ready')\
150134
.filter(Location.location_name == location) \
151-
.order_by(Extract.extract_registration_date_time)
152-
153-
for record in process_files:
154-
extract_files.append(join(record.location_path, record.extract_filename))
155-
self.logger.debug('Found file %s' % record.extract_filename)
135+
.order_by(Extract.extract_registration_date_time)\
136+
.all()
156137

157-
return extract_files
138+
return process_files
158139

159140
def find_ready_extracts_by_process(self, extract_process_name):
160141
"""
161142
For the given named process, find the extracts that are ready for processing.
162143
:return: List of OS specific filepaths with filenames.
163144
"""
164-
extract_files = []
165145

166-
self.logger.info('Searching for extracts related to process %s' % extract_process_name)
167-
168-
process_files = self.session.query(Extract.extract_filename, Location.location_path) \
146+
process_files = self.session.query(Extract) \
169147
.join(ExtractStatus, Extract.extract_status_id == ExtractStatus.extract_status_id) \
170148
.join(Location, Extract.extract_location_id == Location.location_id) \
171149
.join(ExtractProcess, Extract.extract_id == ExtractProcess.extract_tracking_id) \
172150
.join(ProcessTracking) \
173151
.join(Process) \
174152
.filter(Process.process_name == extract_process_name
175153
, ExtractStatus.extract_status_name == 'ready') \
176-
.order_by(Extract.extract_registration_date_time)
154+
.order_by(Extract.extract_registration_date_time)\
155+
.all()
177156

178-
for record in process_files:
179-
extract_files.append(join(record.location_path, record.extract_filename))
180-
self.logger.debug('Found file %s' % record.extract_filename)
157+
self.logger.info('Returning extract files by process.')
181158

182-
return extract_files
159+
return process_files
183160

184161
def get_latest_tracking_record(self, process):
185162
"""
@@ -188,14 +165,13 @@ def get_latest_tracking_record(self, process):
188165
:type process: integer
189166
:return:
190167
"""
191-
self.logger.info('Searching for latest process run for process %s' % process.process_name)
168+
192169
instance = self.session.query(ProcessTracking)\
193170
.filter(ProcessTracking.process_id == process.process_id)\
194171
.order_by(ProcessTracking.process_run_id.desc())\
195172
.first()
196173

197174
if instance is None:
198-
self.logger.info('Process run not found.')
199175
return False
200176

201177
return instance
@@ -207,12 +183,7 @@ def get_process_status_types(self):
207183
"""
208184
status_types = {}
209185

210-
self.logger.info('Getting all process status types.')
211-
212186
for record in self.session.query(ProcessStatus):
213-
214-
self.logger.debug('Found process status %s' % record.process_status_name)
215-
216187
status_types[record.process_status_name] = record.process_status_id
217188

218189
return status_types
@@ -229,21 +200,14 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals
229200
:type end_date: datetime
230201
:return:
231202
"""
232-
self.logger.info('Raising run error.')
233-
234203
if end_date is None:
235-
self.logger.info('Setting end date since one was not provided.')
236204
end_date = datetime.now() # Need the date to match across all parts of the event in case the run is failed.
237205

238206
if error_description is None:
239-
self.logger.info('Setting default error description since one was not provided.')
240207
error_description = 'Unspecified error.'
241208

242-
self.logger.info('Getting error type.')
243-
244209
error_type = self.data_store.get_or_create_item(model=ErrorType, create=False, error_type_name=error_type_name)
245210

246-
self.logger.info('Setting run error.')
247211
run_error = ErrorTracking(error_type_id=error_type.error_type_id
248212
, error_description=error_description
249213
, process_tracking_id=self.process_tracking_run.process_tracking_id
@@ -256,7 +220,6 @@ def raise_run_error(self, error_type_name, error_description=None, fail_run=Fals
256220
if fail_run:
257221
self.change_run_status(new_status='failed', end_date=end_date)
258222
self.session.commit()
259-
self.logger.error('Process halting. An error triggered the process to fail.')
260223
raise Exception('Process halting. An error triggered the process to fail.')
261224

262225
def register_extracts_by_location(self, location_path, location_name=None):
@@ -266,7 +229,6 @@ def register_extracts_by_location(self, location_path, location_name=None):
266229
:param location_path: Path of the location
267230
:return:
268231
"""
269-
self.logger.info('Getting location info for %s' % location_path)
270232
location = LocationTracker(location_path=location_path, location_name=location_name)
271233

272234
# if location.location_type.location_type_name == "s3":
@@ -286,7 +248,6 @@ def register_extracts_by_location(self, location_path, location_name=None):
286248
# , status='ready')
287249
# else:
288250
for file in os.listdir(location_path):
289-
self.logger.debug('Registering file %s' % file)
290251
ExtractTracker(process_run=self
291252
, filename=file
292253
, location=location
@@ -300,15 +261,12 @@ def register_new_process_run(self):
300261
child_process = aliased(Process)
301262
parent_process = aliased(Process)
302263

303-
self.logger.info('Finding latest process run.')
304-
305264
last_run = self.get_latest_tracking_record(process=self.process)
306265

307266
new_run_flag = True
308267
new_run_id = 1
309268

310269
# Need to check the status of any dependencies. If dependencies are running or failed, halt this process.
311-
self.logger.info('Checking process dependencies.')
312270

313271
dependency_hold = self.session.query(ProcessDependency)\
314272
.join(parent_process, ProcessDependency.parent_process_id == parent_process.process_id)\
@@ -320,18 +278,15 @@ def register_new_process_run(self):
320278
.count()
321279

322280
if dependency_hold > 0:
323-
self.logger.error('Processes that this process is dependent on are running or failed.')
324281
raise Exception('Processes that this process is dependent on are running or failed.')
325282

326283
if last_run:
327284
# Must validate that the process is not currently running.
328-
self.logger.info('Process run found. Verifying that the process is not running.')
329285

330286
if last_run.process_status_id != self.process_status_running:
331287
last_run.is_latest_run = False
332288
new_run_flag = True
333289
new_run_id = last_run.process_run_id + 1
334-
self.logger.info('Previous process run not running. Creating new run.')
335290
else:
336291
new_run_flag = False
337292

@@ -346,13 +301,12 @@ def register_new_process_run(self):
346301
self.session.add(new_run)
347302
self.session.commit()
348303

349-
self.logger.info('Process tracking record added for %s' % self.process_name)
350-
351304
return new_run
352305

306+
self.logger.info('Process tracking record added for %s' % self.process_name)
307+
353308
else:
354-
self.logger.error('Process %s is currently running.' % self.process_name)
355-
raise Exception('Process %s is currently running.' % self.process_name)
309+
raise Exception('The process %s is currently running.' % self.process_name)
356310

357311
def register_process_sources(self, sources):
358312
"""
@@ -361,12 +315,10 @@ def register_process_sources(self, sources):
361315
:return: List of source objects.
362316
"""
363317
if isinstance(sources, str):
364-
self.logger.info('Only one source provided. Turning into list for processing.')
365318
sources = [sources]
366319
source_list = []
367320

368321
for source in sources:
369-
self.logger.debug('Registering source %s' % source)
370322
source = self.data_store.get_or_create_item(model=Source, source_name=source)
371323

372324
self.data_store.get_or_create_item(model=ProcessSource, source_id=source.source_id
@@ -382,12 +334,10 @@ def register_process_targets(self, targets):
382334
:return: List of source objects.
383335
"""
384336
if isinstance(targets, str):
385-
self.logger.info('Only one target provided. Turning into list for processing.')
386337
targets = [targets]
387338
target_list = []
388339

389340
for target in targets:
390-
self.logger.debug('Registering target %s' % target)
391341
source = self.data_store.get_or_create_item(model=Source, source_name=target)
392342

393343
self.data_store.get_or_create_item(model=ProcessTarget, target_source_id=source.source_id
@@ -410,11 +360,9 @@ def set_process_run_low_high_dates(self, low_date=None, high_date=None):
410360
previous_high_date_time = self.process_tracking_run.process_run_low_date_time
411361

412362
if low_date is not None and (previous_low_date_time is None or low_date < previous_low_date_time):
413-
self.logger.info('Setting process run low date to %s' % low_date)
414363
self.process_tracking_run.process_run_low_date_time = low_date
415364

416365
if high_date is not None and (previous_high_date_time is None or high_date > previous_high_date_time):
417-
self.logger.info('Setting process run high date to %s' % high_date)
418366
self.process_tracking_run.process_run_high_date_time = high_date
419367

420368
self.session.commit()
@@ -429,13 +377,10 @@ def set_process_run_record_count(self, num_records):
429377
process_run_records = self.process.total_record_count
430378

431379
if process_run_records == 0:
432-
self.logger.info('Adding %s records to total record count.' % num_records)
380+
433381
self.process.total_record_count += num_records
434382
else:
435-
new_records = num_records - process_run_records
436-
437-
self.logger.info('Adding %s records to total record count.' % new_records)
438-
self.process.total_record_count = self.process.total_record_count + new_records
383+
self.process.total_record_count = self.process.total_record_count + (num_records - process_run_records)
439384

440385
self.process_tracking_run.process_run_record_count = num_records
441386
self.session.commit()

0 commit comments

Comments
 (0)