Skip to content

Commit cbe7802

Browse files
author
Alex Meadows
committed
process_tracker_python-89 Add Process Schedule Frequency
✨ General scheduling for processes is now available. High-level scheduling frequencies are now available to schedule processes by. This is no more fine-grained than saying 'these jobs run hourly or monthly'. Actual scheduling and execution still have to be performed.
1 parent 45b9897 commit cbe7802

File tree

9 files changed

+174
-17
lines changed

9 files changed

+174
-17
lines changed

dbscripts/mysql_process_tracker.sql

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
USE process_tracker;
22

3+
create table schedule_frequency_lkup
4+
(
5+
schedule_frequency_id int auto_increment
6+
primary key,
7+
schedule_frequency_name varchar(25) not null,
8+
constraint schedule_frequency_lkup_schedule_frequency_name_uindex
9+
unique (schedule_frequency_name)
10+
);
11+
312
create table data_type_lkup
413
(
514
data_type_id int auto_increment
@@ -210,14 +219,23 @@ create table process
210219
process_type_id int null,
211220
process_tool_id int null,
212221
last_failed_run_date_time datetime not null,
222+
schedule_frequency_id int default 0 not null,
213223
constraint process_name
214224
unique (process_name),
225+
constraint process_fk03
226+
foreign key (schedule_frequency_id) references schedule_frequency_lkup (schedule_frequency_id),
215227
constraint process_ibfk_1
216228
foreign key (process_type_id) references process_type_lkup (process_type_id),
217229
constraint process_ibfk_2
218230
foreign key (process_tool_id) references tool_lkup (tool_id)
219231
);
220232

233+
create index process_tool_id
234+
on process_tracker.process (process_tool_id);
235+
236+
create index process_type_id
237+
on process_tracker.process (process_type_id);
238+
221239
create index process_tool_id
222240
on process (process_tool_id);
223241

dbscripts/mysql_process_tracker_defaults.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,12 @@ INSERT INTO process_tracker.system_lkup (system_id, system_key, system_value) VA
2424

2525
INSERT INTO process_tracker.extract_compression_type_lkup (extract_compression_type_id, extract_compression_type) VALUES (1, 'zip');
2626

27-
INSERT INTO process_tracker.extract_filetype_lkup (extract_filetype_id, extract_filetype_code, extract_filetype, delimiter_char, quote_char, escape_char) VALUES (1, 'csv', 'Comma Separated Values', ',', '"', '/');
27+
INSERT INTO process_tracker.extract_filetype_lkup (extract_filetype_id, extract_filetype_code, extract_filetype, delimiter_char, quote_char, escape_char) VALUES (1, 'csv', 'Comma Separated Values', ',', '"', '/');
28+
29+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (7, 'annually');
30+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (3, 'daily');
31+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (2, 'hourly');
32+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (5, 'monthly');
33+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (6, 'quarterly');
34+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (0, 'unscheduled');
35+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (4, 'weekly');

dbscripts/postgresql_process_tracker.sql

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ create schema process_tracker;
44

55
alter schema process_tracker owner to pt_admin;
66

7+
create table schedule_frequency_lkup
8+
(
9+
schedule_frequency_id serial not null
10+
constraint schedule_frequency_lkup_pk
11+
primary key,
12+
schedule_frequency_name varchar(25) not null
13+
);
14+
15+
alter table schedule_frequency_lkup owner to pt_admin;
16+
17+
create unique index schedule_frequency_lkup_schedule_frequency_name_uindex
18+
on schedule_frequency_lkup (schedule_frequency_name);
19+
720
create table data_type_lkup
821
(
922
data_type_id serial not null
@@ -180,38 +193,41 @@ alter table process_type_lkup owner to pt_admin;
180193
create unique index process_type_lkup_udx01
181194
on process_type_lkup (process_type_name);
182195

183-
create table process
196+
create table process_tracker.process
184197
(
185198
process_id serial not null
186199
constraint process_pk
187200
primary key,
188201
process_name varchar(250) not null,
189202
total_record_count integer default 0 not null,
190-
process_type_id integer null
203+
process_type_id integer
191204
constraint process_fk02
192-
references process_type_lkup,
193-
process_tool_id integer null
205+
references process_tracker.process_type_lkup,
206+
process_tool_id integer
194207
constraint process_fk03
195-
references tool_lkup,
196-
last_failed_run_date_time timestamp default '1900-01-01 00:00:00'::timestamp without time zone not null
208+
references process_tracker.tool_lkup,
209+
last_failed_run_date_time timestamp default '1900-01-01 00:00:00'::timestamp without time zone not null,
210+
schedule_frequency_id integer default 0 not null
211+
constraint process_fk04
212+
references process_tracker.schedule_frequency_lkup
197213
);
198214

199-
comment on table process is 'Processes being tracked';
215+
comment on table process_tracker.process is 'Processes being tracked';
200216

201-
comment on column process.process_name is 'Unique name for process.';
217+
comment on column process_tracker.process.process_name is 'Unique name for process.';
202218

203-
comment on column process.total_record_count is 'Total number of records processed over all runs of process.';
219+
comment on column process_tracker.process.total_record_count is 'Total number of records processed over all runs of process.';
204220

205-
comment on column process.process_type_id is 'The type of process being tracked.';
221+
comment on column process_tracker.process.process_type_id is 'The type of process being tracked.';
206222

207-
comment on column process.process_tool_id is 'The type of tool used to execute the process.';
223+
comment on column process_tracker.process.process_tool_id is 'The type of tool used to execute the process.';
208224

209-
comment on column process.last_failed_run_date_time is 'The last time the process failed to run.';
225+
comment on column process_tracker.process.last_failed_run_date_time is 'The last time the process failed to run.';
210226

211-
alter table process owner to pt_admin;
227+
alter table process_tracker.process owner to pt_admin;
212228

213229
create unique index process_udx01
214-
on process (process_name);
230+
on process_tracker.process (process_name);
215231

216232
create table process_dependency
217233
(

dbscripts/postgresql_process_tracker_defaults.sql

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,12 @@ INSERT INTO process_tracker.system_lkup (system_id, system_key, system_value) VA
2525

2626
INSERT INTO process_tracker.extract_compression_type_lkup (extract_compression_type_id, extract_compression_type) VALUES (1, 'zip');
2727

28-
INSERT INTO process_tracker.extract_filetype_lkup (extract_filetype_id, extract_filetype_code, extract_filetype, delimiter_char, quote_char, escape_char) VALUES (1, 'csv', 'Comma Separated Values', ',', '"', '/');
28+
INSERT INTO process_tracker.extract_filetype_lkup (extract_filetype_id, extract_filetype_code, extract_filetype, delimiter_char, quote_char, escape_char) VALUES (1, 'csv', 'Comma Separated Values', ',', '"', '/');
29+
30+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (0, 'unscheduled');
31+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (1, 'hourly');
32+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (2, 'daily');
33+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (3, 'weekly');
34+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (4, 'monthly');
35+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (5, 'quarterly');
36+
INSERT INTO process_tracker.schedule_frequency_lkup (schedule_frequency_id, schedule_frequency_name) VALUES (6, 'annually');

process_tracker/models/process.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ class Process(Base):
147147
last_failed_run_date_time = Column(
148148
DateTime(timezone=True), nullable=False, default=default_date
149149
)
150+
schedule_frequency_id = Column(
151+
Integer,
152+
ForeignKey("process_tracker.schedule_frequency_lkup.schedule_frequency_id"),
153+
nullable=False,
154+
default=0,
155+
)
150156

151157
cluster_processes = relationship("ClusterProcess", passive_deletes="all")
152158
process_tracking = relationship("ProcessTracking", passive_deletes="all")

process_tracker/models/schedule.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# SQLAlchemy Models
2+
# Models for Schedule entities
3+
4+
5+
from sqlalchemy import (
6+
Boolean,
7+
Column,
8+
DateTime,
9+
ForeignKey,
10+
Integer,
11+
Sequence,
12+
String,
13+
UniqueConstraint,
14+
)
15+
from sqlalchemy.orm import relationship
16+
17+
from process_tracker.models.model_base import default_date, Base
18+
19+
20+
class ScheduleFrequency(Base):
21+
22+
__tablename__ = "schedule_frequency_lkup"
23+
__table_args__ = {"schema": "process_tracker"}
24+
25+
schedule_frequency_id = Column(
26+
Integer,
27+
Sequence("schedule_frequency_schedule_frequency_id", schema="process_tracker"),
28+
primary_key=True,
29+
nullable=False,
30+
)
31+
schedule_frequency_name = Column(String(25), unique=True, nullable=False)

process_tracker/process_tracker.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
ProcessTargetObjectAttribute,
4141
ProcessType,
4242
)
43+
from process_tracker.models.schedule import ScheduleFrequency
4344
from process_tracker.models.source import (
4445
DatasetType,
4546
Source,
@@ -67,6 +68,7 @@ def __init__(
6768
target_object_attributes=None,
6869
config_location=None,
6970
dataset_types=None,
71+
schedule_frequency=None,
7072
):
7173
"""
7274
ProcessTracker is the primary engine for tracking data integration processes.
@@ -92,6 +94,8 @@ def __init__(
9294
:type config_location: file path
9395
:param dataset_types: A single dataset category type or list of dataset category types for the given process.
9496
:type dataset_types: list
97+
:param schedule_frequency: The general scheduling frequency for the process (e.g. 'hourly')
98+
:type schedule_frequency: string
9599
"""
96100
self.config_location = config_location
97101
self.config = SettingsManager(config_location=self.config_location)
@@ -112,11 +116,21 @@ def __init__(
112116
)
113117
self.tool = self.data_store.get_or_create_item(model=Tool, tool_name=tool_name)
114118

119+
if schedule_frequency is None:
120+
self.schedule_frequency = self.data_store.get_or_create_item(
121+
model=ScheduleFrequency, schedule_frequency_name="unscheduled"
122+
)
123+
else:
124+
self.schedule_frequency = self.data_store.get_or_create_item(
125+
model=ScheduleFrequency, schedule_frequency_name=schedule_frequency
126+
)
127+
115128
self.process = self.data_store.get_or_create_item(
116129
model=Process,
117130
process_name=process_name,
118131
process_type_id=self.process_type.process_type_id,
119132
process_tool_id=self.tool.tool_id,
133+
schedule_frequency_id=self.schedule_frequency.schedule_frequency_id,
120134
)
121135

122136
# Dataset types should be loaded before source and target because they are also used there.
@@ -465,6 +479,25 @@ def find_process_contacts(self, process):
465479

466480
return contacts
467481

482+
def find_process_by_schedule_frequency(self, frequency="daily"):
483+
"""
484+
For the given schedule frequency, find the processes on that schedule frequency.
485+
:param frequency: The desired frequency with which to retrieve processes.
486+
:return:
487+
"""
488+
process_list = list()
489+
490+
processes = (
491+
self.data_store.session.query(Process.process_id)
492+
.join(ScheduleFrequency)
493+
.filter(ScheduleFrequency.schedule_frequency_name == frequency)
494+
)
495+
496+
for process in processes:
497+
process_list.append(process.process_id)
498+
499+
return process_list
500+
468501
def get_latest_tracking_record(self, process):
469502
"""
470503
For the given process, find the latest tracking record.

process_tracker/utilities/data_store.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
ProcessType,
2323
ProcessStatus,
2424
)
25+
from process_tracker.models.schedule import ScheduleFrequency
2526
from process_tracker.models.source import Source
2627
from process_tracker.models.system import System
2728
from process_tracker.models.tool import Tool
@@ -38,7 +39,16 @@
3839
]
3940
preload_process_status_types = ["running", "completed", "failed", "on hold"]
4041
preload_process_types = ["extract", "load"]
41-
preload_system_keys = [{"version", "0.6.0"}]
42+
preload_schedule_frequencies = [
43+
"unscheduled",
44+
"hourly",
45+
"daily",
46+
"weekly",
47+
"monthly",
48+
"quarterly",
49+
"annually",
50+
]
51+
preload_system_keys = [{"version", "0.7.0"}]
4252

4353
supported_data_stores = ["postgresql", "mysql", "oracle", "mssql", "snowflake"]
4454

@@ -184,6 +194,13 @@ def initialize_data_store(self, overwrite=False):
184194
self.logger.info("Adding %s" % key)
185195
self.get_or_create_item(model=System, system_key=key, system_value=value)
186196

197+
self.logger.info("Adding schedule frequencies...")
198+
for frequency in preload_schedule_frequencies:
199+
self.logger.info("Adding %s" % frequency)
200+
self.get_or_create_item(
201+
model=ScheduleFrequency, schedule_frequency_name=frequency
202+
)
203+
187204
self.session.commit()
188205

189206
self.logger.debug("Finished the initialization check.")

tests/test_process_tracker.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,3 +1954,23 @@ def test_register_target_object_dataset_type(self):
19541954

19551955
self.assertEqual(source_expected_result, source_given_result)
19561956
self.assertEqual(object_expected_result, object_given_result)
1957+
1958+
def test_find_process_by_schedule_frequency(self):
1959+
"""Testing that when querying based on a given frequency, the process id(s) associated with that frequency are returned."""
1960+
1961+
process = ProcessTracker(
1962+
process_name="Testing Register Target Object Dataset Type",
1963+
process_type="Extract",
1964+
actor_name="UnitTesting",
1965+
tool_name="Spark",
1966+
source_objects={"Unittests": ["Table1"]},
1967+
target_objects={"Unittest Target": ["Table1"]},
1968+
dataset_types="Category 1",
1969+
schedule_frequency="hourly",
1970+
)
1971+
1972+
given_result = process.find_process_by_schedule_frequency(frequency="hourly")
1973+
1974+
expected_result = [process.process.process_id]
1975+
1976+
self.assertEqual(expected_result, given_result)

0 commit comments

Comments
 (0)