Skip to content

Commit 1973d18

Browse files
author
Alex Meadows
committed
process_tracker_python-102 Add relationship to extract and extract location
✨ Added automated code to associate process source objects/sources to any extracts used by the process. Sources/Source Objects now are associated to extracts and the extract's location. Closes #102
1 parent c30b221 commit 1973d18

File tree

5 files changed

+311
-4
lines changed

5 files changed

+311
-4
lines changed

process_tracker/extract_tracker.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,16 @@
1717
ExtractDatasetType,
1818
ExtractDependency,
1919
ExtractProcess,
20+
ExtractSource,
21+
ExtractSourceObject,
2022
ExtractStatus,
2123
)
24+
from process_tracker.models.source import (
25+
Source,
26+
SourceLocation,
27+
SourceObjectAttribute,
28+
SourceObjectLocation,
29+
)
2230
from process_tracker.models.source import DatasetType
2331

2432

@@ -104,6 +112,7 @@ def __init__(
104112
self.full_filename = self.get_full_filename()
105113
self.dataset_types = self.get_dataset_types()
106114
self.extract_process = self.retrieve_extract_process()
115+
self.sources = self.extract.extract_sources
107116

108117
else:
109118
if filename is None:
@@ -192,6 +201,28 @@ def __init__(
192201

193202
self.extract_process = self.retrieve_extract_process()
194203

204+
if self.process_run.source_objects is not None:
205+
self.logger.info(
206+
"Associating source system(s) object(s) with extract and location."
207+
)
208+
self.source_objects = self.register_extract_sources(
209+
source_objects=self.process_run.source_objects
210+
)
211+
self.sources = self.source_objects
212+
213+
elif self.process_run.process.sources is not None:
214+
self.logger.info(
215+
"Associating source system(s) with extract and location."
216+
)
217+
218+
self.sources = self.register_extract_sources(
219+
sources=self.process_run.sources
220+
)
221+
222+
else:
223+
self.logger.info("No source system(s) to associate to.")
224+
self.sources = None
225+
195226
if status is not None:
196227
self.logger.info("Status was provided by user.")
197228
self.change_extract_status(new_status=status)
@@ -422,6 +453,60 @@ def register_extract_dataset_types(self, dataset_types):
422453

423454
return dataset_types
424455

456+
def register_extract_sources(self, sources=None, source_objects=None):
457+
"""
458+
For the provided sources from process_run instance, associate with given Extract instance.
459+
:param sources: List of sources from process_run record.
460+
:param source_objects: List of sources and their objects from process_run record.
461+
:return:
462+
"""
463+
source_list = list()
464+
465+
if source_objects is not None:
466+
467+
for object in source_objects:
468+
self.logger.debug(
469+
"Associating extract %s to source %s."
470+
% (self.extract.extract_id, object.source_object_id)
471+
)
472+
473+
source_object = self.data_store.get_or_create_item(
474+
model=ExtractSourceObject,
475+
extract_id=self.extract.extract_id,
476+
source_object_id=object.source_object_id,
477+
)
478+
source_list.append(source_object)
479+
480+
self.data_store.get_or_create_item(
481+
SourceObjectLocation,
482+
source_object_id=object.source_object_id,
483+
location_id=self.extract.extract_location_id,
484+
)
485+
486+
elif sources is not None:
487+
488+
for source in sources:
489+
490+
self.logger.debug(
491+
"Associating extract %s to source %s."
492+
% (self.extract.extract_id, source.source_id)
493+
)
494+
extract_source = self.data_store.get_or_create_item(
495+
model=ExtractSource,
496+
extract_id=self.extract.extract_id,
497+
source_id=source.source_id,
498+
)
499+
source_list.append(extract_source)
500+
self.logger.debug("Extract source record created. %s" % extract_source)
501+
502+
self.data_store.get_or_create_item(
503+
model=SourceLocation,
504+
source_id=source.source_id,
505+
location_id=self.extract.extract_location_id,
506+
)
507+
508+
return source_list
509+
425510
def retrieve_extract_process(self):
426511
"""
427512
Create and initialize or retrieve the process/extract relationship.

process_tracker/models/extract.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class Extract(Base):
139139
extract_process = relationship(
140140
"ExtractProcess", back_populates="process_extracts", passive_deletes="all"
141141
)
142+
extract_sources = relationship("ExtractSource")
142143
extract_status = relationship("ExtractStatus", foreign_keys=[extract_status_id])
143144
locations = relationship("Location", foreign_keys=[extract_location_id])
144145

@@ -254,6 +255,68 @@ def __repr__(self):
254255
)
255256

256257

258+
class ExtractSource(Base):
259+
260+
__tablename__ = "extract_source"
261+
__table_args__ = {"schema": "process_tracker"}
262+
263+
extract_id = Column(
264+
Integer,
265+
ForeignKey("process_tracker.extract_tracking.extract_id"),
266+
primary_key=True,
267+
nullable=False,
268+
)
269+
source_id = Column(
270+
Integer,
271+
ForeignKey("process_tracker.source_lkup.source_id"),
272+
primary_key=True,
273+
nullable=False,
274+
)
275+
276+
UniqueConstraint(extract_id, source_id)
277+
278+
extracts = relationship("Extract")
279+
sources = relationship("Source")
280+
281+
def __repr__(self):
282+
283+
return "<ExtractSource extract_id=%s, source_id=%s>" % (
284+
self.extract_id,
285+
self.source_id,
286+
)
287+
288+
289+
class ExtractSourceObject(Base):
290+
291+
__tablename__ = "extract_source_object"
292+
__table_args__ = {"schema": "process_tracker"}
293+
294+
extract_id = Column(
295+
Integer,
296+
ForeignKey("process_tracker.extract_tracking.extract_id"),
297+
primary_key=True,
298+
nullable=False,
299+
)
300+
source_object_id = Column(
301+
Integer,
302+
ForeignKey("process_tracker.source_object_lkup.source_object_id"),
303+
primary_key=True,
304+
nullable=False,
305+
)
306+
307+
UniqueConstraint(extract_id, source_object_id)
308+
309+
extracts = relationship("Extract")
310+
source_objects = relationship("SourceObject")
311+
312+
def __repr__(self):
313+
314+
return "<ExtractSourceObject extract_id=%s, source_object_id=%s>" % (
315+
self.extract_id,
316+
self.source_object_id,
317+
)
318+
319+
257320
class LocationType(Base):
258321

259322
__tablename__ = "location_type_lkup"

process_tracker/models/source.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,37 @@ def __repr__(self):
153153
)
154154

155155

156+
class SourceLocation(Base):
157+
158+
__tablename__ = "source_location"
159+
__table_args__ = {"schema": "process_tracker"}
160+
161+
source_id = Column(
162+
Integer,
163+
ForeignKey("process_tracker.source_lkup.source_id"),
164+
nullable=False,
165+
primary_key=True,
166+
)
167+
location_id = Column(
168+
Integer,
169+
ForeignKey("process_tracker.location_lkup.location_id"),
170+
nullable=False,
171+
primary_key=True,
172+
)
173+
174+
UniqueConstraint(source_id, location_id)
175+
176+
sources = relationship("Source")
177+
locations = relationship("Location")
178+
179+
def __repr__(self):
180+
181+
return "<SourceLocation source_id=%s, location_id=%s>" % (
182+
self.source_id,
183+
self.location_id,
184+
)
185+
186+
156187
class SourceObject(Base):
157188

158189
__tablename__ = "source_object_lkup"
@@ -194,6 +225,35 @@ def __repr__(self):
194225
)
195226

196227

228+
class SourceObjectLocation(Base):
229+
__tablename__ = "source_object_location"
230+
__table_args__ = {"schema": "process_tracker"}
231+
232+
source_object_id = Column(
233+
Integer,
234+
ForeignKey("process_tracker.source_object_lkup.source_object_id"),
235+
nullable=False,
236+
primary_key=True,
237+
)
238+
location_id = Column(
239+
Integer,
240+
ForeignKey("process_tracker.location_lkup.location_id"),
241+
nullable=False,
242+
primary_key=True,
243+
)
244+
245+
UniqueConstraint(source_object_id, location_id)
246+
247+
source_objects = relationship("SourceObject")
248+
locations = relationship("Location")
249+
250+
def __repr__(self):
251+
return "<SourceObjectLocation source_object_id=%s, location_id=%s>" % (
252+
self.source_object_id,
253+
self.location_id,
254+
)
255+
256+
197257
class SourceObjectAttribute(Base):
198258

199259
__tablename__ = "source_object_attribute_lkup"
@@ -202,7 +262,7 @@ class SourceObjectAttribute(Base):
202262
source_object_attribute_id = Column(
203263
Integer,
204264
Sequence(
205-
"source_object_attribute_lkup_source_object_attribute_id_seq",
265+
"source_object_attribute_source_object_attribute_id_seq",
206266
schema="process_tracker",
207267
),
208268
primary_key=True,

process_tracker/process_tracker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,19 @@ def __init__(
196196

197197
# sources, source_objects, or source_object_attributes should be set, not multiple. Always go with lower grain if possible.
198198

199+
self.source_object_attributes = None
200+
self.source_objects = None
201+
199202
if source_object_attributes is not None:
200-
self.sources = self.register_process_sources(
203+
self.source_object_attributes = self.register_process_sources(
201204
source_object_attributes=source_object_attributes
202205
)
206+
self.sources = self.source_object_attributes
203207
elif source_objects is not None:
204-
self.sources = self.register_process_sources(
208+
self.source_objects = self.register_process_sources(
205209
source_objects=source_objects
206210
)
211+
self.sources = self.source_objects
207212
elif sources is not None:
208213
self.sources = self.register_process_sources(sources=sources)
209214
else:

0 commit comments

Comments
 (0)