Skip to content

Commit f378eef

Browse files
committed
process_tracker_python-9 Process can have more than one source
✨ Process now can have more than one source. Can also have more than one target Processes now have the ability to be registered with more than one source and more than one target (which is an alias of source).
1 parent fe49124 commit f378eef

File tree

7 files changed

+195
-63
lines changed

7 files changed

+195
-63
lines changed

dbscripts/postgresql_process_tracker.sql

Lines changed: 75 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,13 @@ create schema process_tracking;
44

55
alter schema process_tracking owner to pt_admin;
66

7-
create sequence actor_lkup_actor_id_seq;
8-
9-
alter sequence actor_lkup_actor_id_seq owner to pt_admin;
10-
11-
create table location_lkup
12-
(
13-
location_id serial not null
14-
constraint location_lkup_pk
15-
primary key,
16-
location_name varchar(750) not null,
17-
location_path varchar(750) not null
18-
);
19-
20-
comment on table location_lkup is 'Locations where files are located.';
7+
create schema process_tracking;
218

22-
alter table location_lkup owner to pt_admin;
9+
alter schema process_tracking owner to pt_admin;
2310

24-
create unique index location_lkup_udx01
25-
on location_lkup (location_name);
11+
create sequence actor_lkup_actor_id_seq;
2612

27-
create unique index location_lkup_udx02
28-
on location_lkup (location_path);
13+
alter sequence actor_lkup_actor_id_seq owner to pt_admin;
2914

3015
create table error_type_lkup
3116
(
@@ -136,9 +121,6 @@ create table process
136121
constraint process_pk
137122
primary key,
138123
process_name varchar(250) not null,
139-
process_source_id integer not null
140-
constraint process_fk01
141-
references source_lkup,
142124
total_record_count integer default 0 not null,
143125
process_type_id integer not null
144126
constraint process_fk02
@@ -153,8 +135,6 @@ comment on table process is 'Processes being tracked';
153135

154136
comment on column process.process_name is 'Unique name for process.';
155137

156-
comment on column process.process_source_id is 'The source that the process is extracting from.';
157-
158138
comment on column process.total_record_count is 'Total number of records processed over all runs of process.';
159139

160140
comment on column process.process_type_id is 'The type of process being tracked.';
@@ -274,12 +254,45 @@ alter table extract_status_lkup owner to pt_admin;
274254
create unique index extract_status_lkup_extract_status_name_uindex
275255
on extract_status_lkup (extract_status_name);
276256

257+
create table location_type_lkup
258+
(
259+
location_type_id serial not null
260+
constraint location_type_lkup_pk
261+
primary key,
262+
location_type_name varchar(25) not null
263+
);
264+
265+
comment on table location_type_lkup is 'Listing of location types';
266+
267+
alter table location_type_lkup owner to pt_admin;
268+
269+
create table location_lkup
270+
(
271+
location_id serial not null
272+
constraint location_lkup_pk
273+
primary key,
274+
location_name varchar(750) not null,
275+
location_path varchar(750) not null,
276+
location_type integer not null
277+
constraint location_lkup_fk01
278+
references location_type_lkup
279+
);
280+
281+
comment on table location_lkup is 'Locations where files are located.';
282+
283+
alter table location_lkup owner to pt_admin;
284+
285+
create unique index location_lkup_udx01
286+
on location_lkup (location_name);
287+
288+
create unique index location_lkup_udx02
289+
on location_lkup (location_path);
290+
277291
create table extract_tracking
278292
(
279293
extract_id serial not null
280294
constraint extract_tracking_pk
281295
primary key,
282-
extract_source_id integer not null,
283296
extract_filename varchar(750) not null,
284297
extract_location_id integer not null
285298
constraint extract_tracking_fk01
@@ -295,8 +308,6 @@ create table extract_tracking
295308

296309
comment on table extract_tracking is 'Tracking table for all extract/staging data files.';
297310

298-
comment on column extract_tracking.extract_source_id is 'Source identifier (source_lkup) for where the extract originated.';
299-
300311
comment on column extract_tracking.extract_filename is 'The unique filename for a given extract from a given source.';
301312

302313
comment on column extract_tracking.extract_location_id is 'The location where the given extract can be found.';
@@ -309,9 +320,6 @@ comment on column extract_tracking.extract_registration_date_time is 'The dateti
309320

310321
alter table extract_tracking owner to pt_admin;
311322

312-
create unique index extract_tracking_udx01
313-
on extract_tracking (extract_source_id, extract_filename);
314-
315323
create table extract_process_tracking
316324
(
317325
extract_tracking_id integer not null
@@ -332,6 +340,43 @@ comment on table extract_process_tracking is 'Showing which processes have impac
332340

333341
alter table extract_process_tracking owner to pt_admin;
334342

343+
create unique index location_type_lkup_udx01
344+
on location_type_lkup (location_type_name);
345+
346+
create table process_source
347+
(
348+
source_id integer not null
349+
constraint process_source_fk01
350+
references source_lkup,
351+
process_id integer not null
352+
constraint process_source_fk02
353+
references process,
354+
constraint process_source_pk
355+
primary key (source_id, process_id)
356+
);
357+
358+
comment on table process_source is 'List of sources for given processes';
359+
360+
alter table process_source owner to pt_admin;
361+
362+
create table process_target
363+
(
364+
target_source_id integer not null
365+
constraint process_target_fk01
366+
references source_lkup,
367+
process_id integer not null
368+
constraint process_target_fk02
369+
references process,
370+
constraint process_target_pk
371+
primary key (target_source_id, process_id)
372+
);
373+
374+
comment on table process_target is 'List of targets for given processes';
375+
376+
alter table process_target owner to pt_admin;
377+
378+
379+
335380
INSERT INTO process_tracking.extract_status_lkup (extract_status_id, extract_status_name) VALUES (1, 'initializing');
336381
INSERT INTO process_tracking.extract_status_lkup (extract_status_id, extract_status_name) VALUES (2, 'ready');
337382
INSERT INTO process_tracking.extract_status_lkup (extract_status_id, extract_status_name) VALUES (3, 'loading');

process_tracker/extract_tracker.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ def __init__(self, process_run, filename, location=None, location_name=None, loc
3636
self.session = self.data_store.session
3737
self.process_run = process_run
3838

39-
self.source = self.process_run.source
4039
self.filename = filename
4140

4241
if location is not None:
@@ -48,8 +47,7 @@ def __init__(self, process_run, filename, location=None, location_name=None, loc
4847

4948
self.extract = self.data_store.get_or_create(model=Extract
5049
, extract_filename=filename
51-
, extract_location_id=self.location.location.location_id
52-
, extract_source_id=self.source.source_id)
50+
, extract_location_id=self.location.location.location_id)
5351

5452
if location_path is not None:
5553
self.full_filename = join(location_path, filename)

process_tracker/models/extract.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,10 @@ class Extract(Base):
2424
__tablename__ = "extract_tracking"
2525

2626
extract_id = Column(Integer, Sequence('extract_tracking_extract_id_seq'), primary_key=True)
27-
extract_source_id = Column(Integer, ForeignKey("source_lkup.source_id"))
2827
extract_filename = Column(String(750), nullable=False, unique=True)
2928
extract_location_id = Column(Integer, ForeignKey('location_lkup.location_id'))
30-
# extract_process_run_id = Column(Integer, ForeignKey('process_tracking.process_tracking_id'))
3129
extract_status_id = Column(Integer, ForeignKey('extract_status_lkup.extract_status_id'))
3230
extract_registration_date_time = Column(DateTime, nullable=False, default=datetime.now())
33-
# extract_load_date_time = Column(DateTime, nullable=False, default=default_date)
34-
# extract_archive_date_time = Column(DateTime, nullable=False, default=default_date)
3531

3632
extract_process = relationship("ExtractProcess", back_populates='process_extracts')
3733
locations = relationship("Location", foreign_keys=[extract_location_id])

process_tracker/models/process.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -75,31 +75,52 @@ class Process(Base):
7575

7676
process_id = Column(Integer, Sequence('process_process_id_seq'), primary_key=True)
7777
process_name = Column(String(250), nullable=False, unique=True)
78-
process_source_id = Column(Integer, ForeignKey('source_lkup.source_id'))
79-
# latest_run_low_date_time = Column(DateTime(timezone=True), nullable=False, default=default_date)
80-
# latest_run_high_date_time = Column(DateTime(timezone=True), nullable=False, default=default_date)
81-
# latest_run_id = Column(Integer, nullable=False, default=0)
82-
# latest_run_start_date_time = Column(DateTime(timezone=True), nullable=False, default=default_date)
83-
# latest_run_end_date_time = Column(DateTime(timezone=True), nullable=False, default=default_date)
84-
# latest_run_process_status = Column(Integer, nullable=False, default=0)
85-
# latest_run_record_count = Column(Integer, nullable=False, default=0)
8678
total_record_count = Column(Integer, nullable=False, default=0)
87-
# latest_run_actor_id = Column(id.idType, ForeignKey('actor.actor_id'))
8879
process_type_id = Column(Integer, ForeignKey('process_type_lkup.process_type_id'))
8980
process_tool_id = Column(Integer, ForeignKey('tool_lkup.tool_id'))
9081
last_failed_run_date_time = Column(DateTime(timezone=True), nullable=False, default=default_date)
9182

9283
process_tracking = relationship("ProcessTracking")
9384
process_type = relationship("ProcessType", back_populates="processes")
94-
source = relationship("Source")
85+
sources = relationship("ProcessSource")
86+
targets = relationship("ProcessTarget")
9587
tool = relationship("Tool")
9688

9789
def __repr__(self):
9890

99-
return "<Process (id=%s, name=%s, source=%s, type=%s)>" % (self.process_id
100-
, self.process_name
101-
, self.process_source_id
102-
, self.process_type_id)
91+
return "<Process (id=%s, name=%s, type=%s)>" % (self.process_id
92+
, self.process_name
93+
, self.process_type_id)
94+
95+
96+
class ProcessSource(Base):
97+
98+
__tablename__ = 'process_source'
99+
100+
source_id = Column(Integer, ForeignKey('source_lkup.source_id'), primary_key=True)
101+
process_id = Column(Integer, ForeignKey('process.process_id'), primary_key=True)
102+
103+
sources = relationship("Source")
104+
processes = relationship("Process")
105+
106+
def __repr__(self):
107+
108+
return "<ProcessSource (process=%s, source=%s)>" % (self.process_id
109+
, self.source_id)
110+
111+
112+
class ProcessTarget(Base):
113+
__tablename__ = 'process_target'
114+
115+
target_source_id = Column(Integer, ForeignKey('source_lkup.source_id'), primary_key=True)
116+
process_id = Column(Integer, ForeignKey('process.process_id'), primary_key=True)
117+
118+
targets = relationship("Source")
119+
processes = relationship("Process")
120+
121+
def __repr__(self):
122+
return "<ProcessSource (process=%s, target_source=%s)>" % (self.process_id
123+
, self.source_id)
103124

104125

105126
class ProcessDependency(Base):

process_tracker/process_tracker.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,21 @@
1414

1515
from process_tracker.models.actor import Actor
1616
from process_tracker.models.extract import Extract, ExtractProcess, ExtractStatus, Location
17-
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessTracking, ProcessStatus, ProcessType
17+
from process_tracker.models.process import ErrorTracking, ErrorType, Process, ProcessTracking, ProcessStatus, ProcessSource, ProcessTarget, ProcessType
1818
from process_tracker.models.source import Source
1919
from process_tracker.models.tool import Tool
2020

2121

2222
class ProcessTracker:
2323

24-
def __init__(self, process_name, process_type, actor_name, tool_name, source_name):
24+
def __init__(self, process_name, process_type, actor_name, tool_name, sources, targets):
2525
"""
2626
ProcessTracker is the primary engine for tracking data integration processes.
2727
:param process_name: Name of the process being tracked.
2828
:param actor_name: Name of the person or environment runnning the process.
2929
:param tool_name: Name of the tool used to run the process.
30-
:param source_name: Name of the source that the data is coming from.
30+
:param sources: A single source name or list of source names for the given process.
31+
:type sources: list
3132
"""
3233

3334
self.logger = logging.getLogger(__name__)
@@ -37,14 +38,15 @@ def __init__(self, process_name, process_type, actor_name, tool_name, source_nam
3738

3839
self.actor = self.data_store.get_or_create(model=Actor, actor_name=actor_name)
3940
self.process_type = self.data_store.get_or_create(model=ProcessType, process_type_name=process_type)
40-
self.source = self.data_store.get_or_create(model=Source, source_name=source_name)
4141
self.tool = self.data_store.get_or_create(model=Tool, tool_name=tool_name)
4242

4343
self.process = self.data_store.get_or_create(model=Process, process_name=process_name
44-
, process_source_id=self.source.source_id
4544
, process_type_id=self.process_type.process_type_id
4645
, process_tool_id=self.tool.tool_id)
4746

47+
self.sources = self.register_process_sources(sources=sources)
48+
self.targets = self.register_process_targets(targets=targets)
49+
4850
self.process_name = process_name
4951

5052
# Getting all status types in the event there are custom status types added later.
@@ -289,6 +291,46 @@ def register_new_process_run(self):
289291
else:
290292
raise Exception('The process %s is currently running.' % self.process_name)
291293

294+
def register_process_sources(self, sources):
295+
"""
296+
Register source(s) to a given process.
297+
:param sources: List of source name(s)
298+
:return: List of source objects.
299+
"""
300+
if sources != list:
301+
sources = [sources]
302+
303+
source_list = []
304+
305+
for source in sources:
306+
source = self.data_store.get_or_create(model=Source, source_name=source)
307+
308+
self.data_store.get_or_create(model=ProcessSource, source_id=source.source_id
309+
, process_id=self.process.process_id)
310+
311+
source_list.append(source)
312+
return source_list
313+
314+
def register_process_targets(self, targets):
315+
"""
316+
Register target source(s) to a given process.
317+
:param targets: List of source name(s)
318+
:return: List of source objects.
319+
"""
320+
if targets != list:
321+
targets = [targets]
322+
323+
source_list = []
324+
325+
for target in targets:
326+
source = self.data_store.get_or_create(model=Source, source_name=target)
327+
328+
self.data_store.get_or_create(model=ProcessTarget, target_source_id=source.source_id
329+
, process_id=self.process.process_id)
330+
331+
source_list.append(source)
332+
return source_list
333+
292334
def set_process_run_low_high_dates(self, low_date=None, high_date=None):
293335
"""
294336
For the given process run, set the process_run_low_date_time and/or process_run_high_date_time.

tests/test_extract_tracker.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import unittest
44

55
from process_tracker.models.extract import Extract, ExtractProcess, Location
6-
from process_tracker.models.process import Process, ProcessTracking
6+
from process_tracker.models.process import Process, ProcessSource, ProcessTarget, ProcessTracking
77

88
from process_tracker.data_store import DataStore
99
from process_tracker.extract_tracker import ExtractTracker
@@ -18,7 +18,8 @@ def setUpClass(cls):
1818
, process_type='Load'
1919
, actor_name='UnitTesting'
2020
, tool_name='Spark'
21-
, source_name='Unittests')
21+
, sources='Unittests'
22+
, targets='Unittests')
2223

2324
cls.process_run = cls.process_tracker
2425

@@ -29,6 +30,8 @@ def setUpClass(cls):
2930
def tearDownClass(cls):
3031

3132
cls.session.query(ProcessTracking).delete()
33+
cls.session.query(ProcessSource).delete()
34+
cls.session.query(ProcessTarget).delete()
3235
cls.session.query(Process).delete()
3336
cls.session.commit()
3437

0 commit comments

Comments
 (0)