Skip to content

Commit 5ce544f

Browse files
author
Alex Meadows
committed
process_tracker_python-87 Add process filters for source attributes
✨ Added ability for processes to store their source query filters ✨ Added finders for process source and target attributes
1 parent 8d0136c commit 5ce544f

File tree

9 files changed

+419
-3
lines changed

9 files changed

+419
-3
lines changed

dbscripts/mysql_process_tracker.sql

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,3 +535,37 @@ create table if not exists process_target_object_attribute
535535
constraint process_target_object_attribute_fk02
536536
foreign key (target_object_attribute_id) references source_object_attribute (source_object_attribute_id)
537537
);
538+
539+
create table filter_type_lkup
540+
(
541+
filter_type_id int auto_increment
542+
primary key,
543+
filter_type_code varchar(3) not null,
544+
filter_type_name varchar(75) not null,
545+
constraint filter_type_lkup_filter_type_code_uindex
546+
unique (filter_type_code),
547+
constraint filter_type_lkup_filter_type_name_uindex
548+
unique (filter_type_name)
549+
);
550+
551+
create table process_filter
552+
(
553+
process_filter_id int auto_increment
554+
primary key,
555+
process_id int not null,
556+
source_object_attribute_id int not null,
557+
filter_type_id int not null,
558+
filter_value_string varchar(250) null,
559+
filter_value_numeric decimal null,
560+
constraint process_filter_udx
561+
unique (process_id, source_object_attribute_id, filter_type_id),
562+
constraint process_filter_fk01
563+
foreign key (process_id) references process (process_id),
564+
constraint process_filter_fk02
565+
foreign key (source_object_attribute_id) references source_object_attribute (source_object_attribute_id),
566+
constraint process_filter_fk03
567+
foreign key (filter_type_id) references filter_type_lkup (filter_type_id)
568+
);
569+
570+
571+

dbscripts/mysql_process_tracker_defaults.sql

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,13 @@ INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, sche
3434
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (6, 'quarterly');
3535
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (4, 'weekly');
3636

37-
UPDATE process_tracker.schedule_frequency_lkup SET schedule_frequency_id = 0 WHERE schedule_frequency_name = 'unscheduled';
37+
UPDATE process_tracker.schedule_frequency_lkup SET schedule_frequency_id = 0 WHERE schedule_frequency_name = 'unscheduled';
38+
39+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (1, 'eq', 'equal to');
40+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (2, 'lt', 'less than');
41+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (3, 'gt', 'greater than');
42+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (4, 'lte', 'less than or equal');
43+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (5, 'gte', 'greater than or equal');
44+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (6, 'not', 'not equal');
45+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (7, 'lke', 'like');
46+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (8, 'in', 'in set');

dbscripts/postgresql_process_tracker.sql

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,47 @@ alter table process_type_lkup owner to pt_admin;
193193
create unique index process_type_lkup_udx01
194194
on process_type_lkup (process_type_name);
195195

196+
create table process_tracker.filter_type_lkup
197+
(
198+
filter_type_id serial not null
199+
constraint filter_type_lkup_pk
200+
primary key,
201+
filter_type_code varchar(3) not null,
202+
filter_type_name varchar(75) not null
203+
);
204+
205+
alter table process_tracker.filter_type_lkup owner to pt_admin;
206+
207+
create unique index filter_type_lkup_filter_type_code_uindex
208+
on process_tracker.filter_type_lkup (filter_type_code);
209+
210+
create unique index filter_type_lkup_filter_type_name_uindex
211+
on process_tracker.filter_type_lkup (filter_type_name);
212+
213+
create table process_tracker.process_filter
214+
(
215+
process_filter_id serial not null
216+
constraint process_filter_pk
217+
primary key,
218+
process_id integer not null
219+
constraint process_filter_fk01
220+
references process_tracker.process,
221+
source_object_attribute_id integer not null
222+
constraint process_filter_fk02
223+
references process_tracker.source_object_attribute,
224+
filter_type_id integer not null
225+
constraint process_filter_fk03
226+
references process_tracker.filter_type_lkup,
227+
filter_value_string varchar(250),
228+
filter_value_numeric numeric
229+
);
230+
231+
alter table process_tracker.process_filter owner to pt_admin;
232+
233+
create unique index process_filter_udx01
234+
on process_tracker.process_filter (process_id, source_object_attribute_id, filter_type_id);
235+
236+
196237
create table process_tracker.process
197238
(
198239
process_id serial not null

dbscripts/postgresql_process_tracker_defaults.sql

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,13 @@ INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, sche
3333
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (3, 'weekly');
3434
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (4, 'monthly');
3535
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (5, 'quarterly');
36-
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (6, 'annually');
36+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (6, 'annually');
37+
38+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (1, 'eq', 'equal to');
39+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (2, 'lt', 'less than');
40+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (3, 'gt', 'greater than');
41+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (4, 'lte', 'less than or equal');
42+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (5, 'gte', 'greater than or equal');
43+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (6, 'not', 'not equal');
44+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (7, 'lke', 'like');
45+
INSERT INTO process_tracker.filter_type_lkup (filter_type_id, filter_type_code, filter_type_name) VALUES (8, 'in', 'in set');

process_tracker/models/process.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
DateTime,
99
ForeignKey,
1010
Integer,
11+
Numeric,
1112
Sequence,
1213
String,
1314
UniqueConstraint,
@@ -232,6 +233,48 @@ def __repr__(self):
232233
)
233234

234235

236+
class ProcessFilter(Base):
237+
238+
__tablename__ = "process_filter"
239+
__table_args__ = {"schema": "process_tracker"}
240+
241+
process_filter_id = Column(
242+
Integer,
243+
Sequence("process_filter_process_filter_id_seq", schema="process_tracker"),
244+
primary_key=True,
245+
nullable=False,
246+
)
247+
process_id = Column(
248+
Integer, ForeignKey("process_tracker.process.process_id"), nullable=False
249+
)
250+
source_object_attribute_id = Column(
251+
Integer,
252+
ForeignKey(
253+
"process_tracker.source_object_attribute.source_object_attribute_id"
254+
),
255+
nullable=False,
256+
)
257+
filter_type_id = Column(
258+
Integer,
259+
ForeignKey("process_tracker.filter_type_lkup.filter_type_id"),
260+
nullable=False,
261+
)
262+
filter_value_string = Column(String(250), nullable=True)
263+
filter_value_numeric = Column(Numeric, nullable=True)
264+
265+
attributes = relationship("SourceObjectAttribute")
266+
267+
UniqueConstraint = (process_id, source_object_attribute_id, filter_type_id)
268+
269+
def __repr__(self):
270+
271+
return "<ProcessFilter process=%s, attribute=%s, filter_type=%s>" % (
272+
self.process_id,
273+
self.source_object_attribute_id,
274+
self.filter_type_id,
275+
)
276+
277+
235278
class ProcessSource(Base):
236279

237280
__tablename__ = "process_source"

process_tracker/models/source.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,28 @@ def __repr__(self):
5959
return "<DatasetType %s>" % self.dataset_type
6060

6161

62+
class FilterType(Base):
63+
64+
__tablename__ = "filter_type_lkup"
65+
__table_args__ = {"schema": "process_tracker"}
66+
67+
filter_type_id = Column(
68+
Integer,
69+
Sequence("filter_type_lkup_filter_type_id_seq", schema="process_tracker"),
70+
primary_key=True,
71+
nullable=False,
72+
)
73+
filter_type_code = Column(String(3), nullable=False, unique=True)
74+
filter_type_name = Column(String(75), nullable=False, unique=True)
75+
76+
def __repr__(self):
77+
78+
return "<FilterType code=%s, name=%s>" % (
79+
self.filter_type_code,
80+
self.filter_type_name,
81+
)
82+
83+
6284
class Source(Base):
6385

6486
__tablename__ = "source_lkup"
@@ -162,6 +184,8 @@ class SourceObject(Base):
162184
passive_deletes="all",
163185
)
164186

187+
sources = relationship("Source")
188+
165189
def __repr__(self):
166190

167191
return "<SourceObject (source_id=%s, source_object_name=%s)>" % (
@@ -204,6 +228,8 @@ class SourceObjectAttribute(Base):
204228

205229
UniqueConstraint(source_object_id, source_object_attribute_name)
206230

231+
source_objects = relationship("SourceObject")
232+
207233
def __repr__(self):
208234

209235
return "<Source Object Attribute id=%s, name=%s, source_object=%s>" % (

process_tracker/process_tracker.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
ProcessContact,
3131
ProcessDatasetType,
3232
ProcessDependency,
33+
ProcessFilter,
3334
ProcessTracking,
3435
ProcessStatus,
3536
ProcessSource,
@@ -43,6 +44,7 @@
4344
from process_tracker.models.schedule import ScheduleFrequency
4445
from process_tracker.models.source import (
4546
DatasetType,
47+
FilterType,
4648
Source,
4749
SourceContact,
4850
SourceDatasetType,
@@ -498,6 +500,124 @@ def find_process_by_schedule_frequency(self, frequency="daily"):
498500

499501
return process_list
500502

503+
def find_process_filters(self, process):
504+
"""
505+
For the given process, find the filters required for querying the source system.
506+
:param process: The process' process id.
507+
:type process: integer
508+
"""
509+
510+
filter_list = list()
511+
512+
filters = (
513+
self.session.query(
514+
Source.source_name,
515+
SourceObject.source_object_name,
516+
SourceObjectAttribute.source_object_attribute_name,
517+
FilterType.filter_type_code,
518+
ProcessFilter.filter_value_numeric,
519+
ProcessFilter.filter_value_string,
520+
)
521+
.join(SourceObjectAttribute, ProcessFilter.attributes)
522+
.join(SourceObject, SourceObjectAttribute.source_objects)
523+
.join(Source)
524+
.join(FilterType)
525+
.filter(ProcessFilter.process_id == process)
526+
.orderby(
527+
Source.source_name,
528+
SourceObject.source_object_name,
529+
SourceObjectAttribute.source_object_attribute_name,
530+
)
531+
)
532+
533+
for filter in filters:
534+
filter_list.append(
535+
{
536+
"source_name": filter.source_name,
537+
"source_object_name": filter.source_object_name,
538+
"source_object_attribute_name": filter.source_object_attribute_name,
539+
"filter_type_code": filter.filter_type_code,
540+
"filter_value_numeric": filter.filter_value_numeric,
541+
"filter_value_string": filter.filter_value_string,
542+
}
543+
)
544+
545+
return filter_list
546+
547+
def find_process_source_attributes(self, process):
548+
"""
549+
For the given process, find the attributes used for process sources.
550+
:param process:
551+
:return:
552+
"""
553+
554+
source_attribute_list = list()
555+
556+
source_attributes = (
557+
self.session.query(
558+
Source.source_name,
559+
SourceObject.source_object_name,
560+
SourceObjectAttribute.source_object_attribute_name,
561+
)
562+
.join(SourceObject, SourceObjectAttribute.source_objects)
563+
.join(Source, SourceObject.sources)
564+
.join(ProcessSourceObjectAttribute)
565+
.filter(ProcessSourceObjectAttribute.process_id == process)
566+
.order_by(
567+
Source.source_name,
568+
SourceObject.source_object_name,
569+
SourceObjectAttribute.source_object_attribute_name,
570+
)
571+
)
572+
573+
for attribute in source_attributes:
574+
source_attribute_list.append(
575+
{
576+
"source_name": attribute.source_name,
577+
"source_object_name": attribute.source_object_name,
578+
"source_object_attribute_name": attribute.source_object_attribute_name,
579+
}
580+
)
581+
582+
return source_attribute_list
583+
584+
def find_process_target_attributes(self, process):
585+
"""
586+
For the given process, find the attributes used for process targets.
587+
:param process:
588+
:return:
589+
"""
590+
591+
target_attribute_list = list()
592+
593+
source_attributes = (
594+
self.session.query(
595+
Source.source_name,
596+
SourceObject.source_object_name,
597+
SourceObjectAttribute.source_object_attribute_name,
598+
)
599+
.join(SourceObject, SourceObjectAttribute.source_objects)
600+
.join(Source, SourceObject.sources)
601+
.join(ProcessTargetObjectAttribute)
602+
.filter(ProcessTargetObjectAttribute.process_id == process)
603+
.order_by(
604+
Source.source_name,
605+
SourceObject.source_object_name,
606+
SourceObjectAttribute.source_object_attribute_name,
607+
)
608+
)
609+
610+
for attribute in source_attributes:
611+
target_attribute_list.append(
612+
{
613+
"target_name": attribute.source_name,
614+
"target_object_name": attribute.source_object_name,
615+
"target_object_attribute_name": attribute.source_object_attribute_name,
616+
}
617+
)
618+
619+
return target_attribute_list
620+
501621
def get_latest_tracking_record(self, process):
502622
"""
503623
For the given process, find the latest tracking record.

0 commit comments

Comments
 (0)