Skip to content

Commit 3299820

Browse files
committed
process_tracker_python-51 Sources/Targets need tracking by individual objects
✨ Processes can track data source/target as well as source/target objects. Added ability to track source and target objects as optional addition. Closes #51
1 parent 373bd27 commit 3299820

File tree

4 files changed

+454
-53
lines changed

4 files changed

+454
-53
lines changed

process_tracker/models/process.py

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ class ProcessSource(Base):
173173
nullable=False,
174174
)
175175

176-
sources = relationship("Source", passive_deletes="all")
177176
processes = relationship("Process", passive_deletes="all")
177+
sources = relationship("Source", passive_deletes="all")
178178

179179
def __repr__(self):
180180

@@ -184,6 +184,33 @@ def __repr__(self):
184184
)
185185

186186

187+
class ProcessSourceObject(Base):
188+
__tablename__ = "process_source_object"
189+
__table_args__ = {"schema": "process_tracker"}
190+
191+
process_id = Column(
192+
Integer,
193+
ForeignKey("process_tracker.process.process_id"),
194+
primary_key=True,
195+
nullable=False,
196+
)
197+
source_object_id = Column(
198+
Integer,
199+
ForeignKey("process_tracker.source_object_lkup.source_object_id"),
200+
primary_key=True,
201+
nullable=False,
202+
)
203+
204+
objects = relationship("SourceObject", passive_deletes="all")
205+
processes = relationship("Process", passive_deletes="all")
206+
207+
def __repr__(self):
208+
return "<ProcessSourceObject (process_id=%s, source_object=%s)>" % (
209+
self.process_id,
210+
self.source_object_id,
211+
)
212+
213+
187214
class ProcessTarget(Base):
188215
__tablename__ = "process_target"
189216
__table_args__ = {"schema": "process_tracker"}
@@ -201,13 +228,41 @@ class ProcessTarget(Base):
201228
nullable=False,
202229
)
203230

204-
targets = relationship("Source", passive_deletes="all")
205231
processes = relationship("Process", passive_deletes="all")
232+
targets = relationship("Source", passive_deletes="all")
206233

207234
def __repr__(self):
208235
return "<ProcessSource (process=%s, target_source=%s)>" % (
209236
self.process_id,
210-
self.source_id,
237+
self.target_source_id,
238+
)
239+
240+
241+
class ProcessTargetObject(Base):
242+
243+
__tablename__ = "process_target_object"
244+
__table_args__ = {"schema": "process_tracker"}
245+
246+
process_id = Column(
247+
Integer,
248+
ForeignKey("process_tracker.process.process_id"),
249+
primary_key=True,
250+
nullable=False,
251+
)
252+
target_object_id = Column(
253+
Integer,
254+
ForeignKey("process_tracker.source_object_lkup.source_object_id"),
255+
primary_key=True,
256+
nullable=False,
257+
)
258+
259+
objects = relationship("SourceObject", passive_deletes="all")
260+
processes = relationship("Process", passive_deletes="all")
261+
262+
def __repr__(self):
263+
return "<ProcessTargetObject (process_id=%s, target_object=%s)>" % (
264+
self.process_id,
265+
self.target_object_id,
211266
)
212267

213268

process_tracker/models/source.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# SQLAlchemy Models
22
# Models for Source entities
33

4-
from sqlalchemy import Column, Integer, Sequence, String
4+
from sqlalchemy import Column, ForeignKey, Integer, Sequence, String, UniqueConstraint
5+
from sqlalchemy.orm import relationship
56

67
from process_tracker.models.model_base import Base
78

@@ -22,3 +23,36 @@ class Source(Base):
2223
def __repr__(self):
2324

2425
return "<Source (name=%s)>" % self.source_name
26+
27+
28+
class SourceObject(Base):
29+
30+
__tablename__ = "source_object_lkup"
31+
__table_args__ = {"schema": "process_tracker"}
32+
33+
source_object_id = Column(
34+
Integer,
35+
Sequence("source_object_lkup_source_object_id_seq", schema="process_tracker"),
36+
primary_key=True,
37+
nullable=False,
38+
)
39+
source_id = Column(
40+
Integer, ForeignKey("process_tracker.source_lkup.source_id"), nullable=False
41+
)
42+
source_object_name = Column(String(250), nullable=False)
43+
44+
UniqueConstraint(source_id, source_object_name)
45+
46+
source_processes = relationship(
47+
"ProcessSourceObject", back_populates="objects", passive_deletes="all"
48+
)
49+
target_processes = relationship(
50+
"ProcessTargetObject", back_populates="objects", passive_deletes="all"
51+
)
52+
53+
def __repr__(self):
54+
55+
return "<SourceObject (source_id=%s, source_object_name=%s)>" % (
56+
self.source_id,
57+
self.source_object_name,
58+
)

process_tracker/process_tracker.py

Lines changed: 143 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@
3030
ProcessTracking,
3131
ProcessStatus,
3232
ProcessSource,
33+
ProcessSourceObject,
3334
ProcessTarget,
35+
ProcessTargetObject,
3436
ProcessType,
3537
)
36-
from process_tracker.models.source import Source
38+
from process_tracker.models.source import Source, SourceObject
3739
from process_tracker.models.tool import Tool
3840

3941

@@ -44,20 +46,29 @@ def __init__(
4446
process_type,
4547
actor_name,
4648
tool_name,
47-
sources,
48-
targets,
49+
sources=None,
50+
targets=None,
51+
source_objects=None,
52+
target_objects=None,
4953
config_location=None,
5054
):
5155
"""
5256
ProcessTracker is the primary engine for tracking data integration processes.
5357
:param process_name: Name of the process being tracked.
5458
:param actor_name: Name of the person or environment runnning the process.
5559
:param tool_name: Name of the tool used to run the process.
56-
:param sources: A single source name or list of source names for the given process.
60+
:param sources: A single source name or list of source names for the given process. If source_objects is set,
61+
sources is ignored. Optional.
5762
:type sources: list
58-
:param targets: A single target name or list of target names for the given process.
63+
:param targets: A single target name or list of target names for the given process. If target_objects is set,
64+
targets is ignored. Optional.
5965
:type targets: list
60-
:param config_location: Location where Process Tracker configuration file is.
66+
:param source_objects: Finer grained list of sources, including source objects (i.e. tables). Optional.
67+
:type source_objects: dict of lists
68+
:param target_objects: Finer grained list of targets, including target objects (i.e. tables). Optional.
69+
:type target_objects: dict of lists
70+
:param config_location: Location where Process Tracker configuration file is. If not set, will use local home
71+
directory.
6172
:type config_location: file path
6273
"""
6374
config = SettingsManager().config
@@ -84,8 +95,23 @@ def __init__(
8495
process_tool_id=self.tool.tool_id,
8596
)
8697

87-
self.sources = self.register_process_sources(sources=sources)
88-
self.targets = self.register_process_targets(targets=targets)
98+
# Either sources or source_objects should be set, not both. Always go with lower grain if possible.
99+
100+
if source_objects is not None:
101+
self.sources = self.register_process_sources(source_objects=source_objects)
102+
elif sources is not None:
103+
self.sources = self.register_process_sources(sources=sources)
104+
else:
105+
self.sources = None
106+
107+
# Either targets or target_objects should be set, not both. Always go with lower grain if possible.
108+
109+
if target_objects is not None:
110+
self.targets = self.register_process_targets(target_objects=target_objects)
111+
elif targets is not None:
112+
self.targets = self.register_process_targets(targets=targets)
113+
else:
114+
self.targets = None
89115

90116
self.process_name = process_name
91117

@@ -448,52 +474,129 @@ def register_new_process_run(self):
448474
else:
449475
raise Exception("The process %s is currently running." % self.process_name)
450476

451-
def register_process_sources(self, sources):
477+
def register_process_sources(self, sources=None, source_objects=None):
452478
"""
453479
Register source(s) to a given process.
454-
:param sources: List of source name(s)
455-
:return: List of source objects.
480+
:param sources: List of source name(s) If source_objects is set, sources is ignored.
481+
:type sources: list
482+
:param source_objects: Finer grained list of source name(s) and their objects used in this process.
483+
:type source_objects: dict of lists
484+
:return: List of source or source object SQLAlchemy objects.
456485
"""
457-
if isinstance(sources, str):
458-
sources = [sources]
459-
source_list = []
486+
source_list = list()
460487

461-
for source in sources:
462-
source = self.data_store.get_or_create_item(
463-
model=Source, source_name=source
464-
)
488+
if source_objects is not None:
489+
if isinstance(source_objects, dict):
490+
for source, objects in source_objects.items():
465491

466-
self.data_store.get_or_create_item(
467-
model=ProcessSource,
468-
source_id=source.source_id,
469-
process_id=self.process.process_id,
470-
)
492+
source = self.data_store.get_or_create_item(
493+
model=Source, source_name=source
494+
)
495+
496+
self.data_store.get_or_create_item(
497+
model=ProcessSource,
498+
source_id=source.source_id,
499+
process_id=self.process.process_id,
500+
)
501+
502+
for item in objects:
503+
504+
source_object = self.data_store.get_or_create_item(
505+
model=SourceObject,
506+
source_id=source.source_id,
507+
source_object_name=item,
508+
)
509+
510+
self.data_store.get_or_create_item(
511+
model=ProcessSourceObject,
512+
process_id=self.process.process_id,
513+
source_object_id=source_object.source_object_id,
514+
)
515+
516+
source_list.append(source_object)
517+
else:
518+
self.logger.error("It appears source_objects is not a dictionary.")
519+
raise Exception("It appears source_objects is not a dictionary.")
520+
521+
elif sources is not None:
522+
if isinstance(sources, str):
523+
sources = [sources]
524+
525+
for source in sources:
526+
source = self.data_store.get_or_create_item(
527+
model=Source, source_name=source
528+
)
529+
530+
self.data_store.get_or_create_item(
531+
model=ProcessSource,
532+
source_id=source.source_id,
533+
process_id=self.process.process_id,
534+
)
535+
536+
source_list.append(source)
471537

472-
source_list.append(source)
473538
return source_list
474539

475-
def register_process_targets(self, targets):
540+
def register_process_targets(self, targets=None, target_objects=None):
476541
"""
477542
Register target source(s) to a given process.
478-
:param targets: List of source name(s)
479-
:return: List of source objects.
543+
:param targets: List of source name(s). If target_objects is set, targets is ignored.
544+
:type targets: list
545+
:param target_objects: Finer grained list of target name(s) and their objects used in this process.
546+
:type target_objects: dict of lists
547+
:return: List of source or source object SQLAlchemy objects.
480548
"""
481-
if isinstance(targets, str):
482-
targets = [targets]
483-
target_list = []
549+
target_list = list()
484550

485-
for target in targets:
486-
source = self.data_store.get_or_create_item(
487-
model=Source, source_name=target
488-
)
551+
if target_objects is not None:
552+
if isinstance(target_objects, dict):
553+
for target, objects in target_objects.items():
489554

490-
self.data_store.get_or_create_item(
491-
model=ProcessTarget,
492-
target_source_id=source.source_id,
493-
process_id=self.process.process_id,
494-
)
555+
target = self.data_store.get_or_create_item(
556+
model=Source, source_name=target
557+
)
558+
559+
self.data_store.get_or_create_item(
560+
model=ProcessTarget,
561+
target_source_id=target.source_id,
562+
process_id=self.process.process_id,
563+
)
564+
565+
for item in objects:
566+
567+
target_object = self.data_store.get_or_create_item(
568+
model=SourceObject,
569+
source_id=target.source_id,
570+
source_object_name=item,
571+
)
572+
573+
self.data_store.get_or_create_item(
574+
model=ProcessTargetObject,
575+
process_id=self.process.process_id,
576+
target_object_id=target_object.source_object_id,
577+
)
578+
579+
target_list.append(target_object)
580+
else:
581+
self.logger.error("It appears target_objects is not a dictionary.")
582+
raise Exception("It appears target_objects is not a dictionary.")
583+
584+
elif targets is not None:
585+
if isinstance(targets, str):
586+
targets = [targets]
587+
588+
for target in targets:
589+
source = self.data_store.get_or_create_item(
590+
model=Source, source_name=target
591+
)
592+
593+
self.data_store.get_or_create_item(
594+
model=ProcessTarget,
595+
target_source_id=source.source_id,
596+
process_id=self.process.process_id,
597+
)
495598

496-
target_list.append(source)
599+
target_list.append(source)
497600
return target_list
498601

499602
def set_process_run_low_high_dates(self, low_date=None, high_date=None):

0 commit comments

Comments
 (0)