Skip to content

Commit 2e6d2e2

Browse files
authored
Merge pull request #6 from Comcast/feature-5
Feature 5
2 parents a9854d3 + 80e6f70 commit 2e6d2e2

9 files changed

Lines changed: 134 additions & 29 deletions

File tree

pyrunner/core/config.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from subprocess import Popen, PIPE
2020
from collections import deque
2121

22-
2322
class Config:
2423
"""
2524
Captures framework-level configurations.

pyrunner/core/engine.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import pyrunner.core.constants as constants
1818
from pyrunner.core.config import Config
1919
from pyrunner.core.context import Context
20+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE
2021
from multiprocessing import Manager
2122

2223
import os, sys, glob
@@ -45,9 +46,10 @@ def __init__(self):
4546
def initiate(self, **kwargs):
4647
"""Begins the execution loop."""
4748

49+
signal_handler = SignalHandler(self.config)
4850
sys.path.append(self.config['worker_dir'])
4951
self.start_time = time.time()
50-
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] > 0 else 0
52+
wait_interval = 1.0/self.config['tickrate'] if self.config['tickrate'] >= 1 else 0
5153
last_save = 0
5254
ab_code = 0
5355

@@ -56,6 +58,13 @@ def initiate(self, **kwargs):
5658
# Execution loop
5759
try:
5860
while self.register.running_nodes or self.register.pending_nodes:
61+
sig_set = signal_handler.consume()
62+
63+
# Check for abort signals
64+
if SIG_ABORT in sig_set:
65+
print('ABORT signal received! Terminating all running Workers.')
66+
self._abort_all_workers()
67+
return -1
5968

6069
# Poll running nodes for completion/failure
6170
for node in self.register.running_nodes.copy():
@@ -106,9 +115,8 @@ def initiate(self, **kwargs):
106115
except KeyboardInterrupt:
107116
print('\nKeyboard Interrupt Received')
108117
print('\nCancelling Execution')
109-
for node in self.register.running_nodes:
110-
node.terminate()
111-
return
118+
self._abort_all_workers()
119+
return -1
112120

113121
if not kwargs.get('silent'):
114122
self._print_final_state(ab_code)
@@ -118,6 +126,13 @@ def initiate(self, **kwargs):
118126

119127
return len(self.register.failed_nodes)
120128

129+
def _abort_all_workers(self):
130+
for node in self.register.running_nodes:
131+
node.terminate()
132+
#self.register.running_nodes.remove(node)
133+
#self.register.aborted_nodes.add(node)
134+
#self.register.set_children_defaulted(node)
135+
121136
def _print_current_state(self):
122137
elapsed = time.time() - self.start_time
123138

pyrunner/core/node.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,16 @@ def execute(self):
8686

8787
try:
8888
worker_class = getattr(importlib.import_module(self.module), self.worker)
89+
90+
# Check if provided worker actually extends the Worker class.
8991
if issubclass(worker_class, Worker):
9092
worker = worker_class(self.context, self._retcode, self.logfile, self.argv)
93+
# If it does not extend the Worker class, initialize a reverse-Worker in which the
94+
# worker extends the provided class.
9195
else:
9296
worker = self.generate_worker()(self.context, self._retcode, self.logfile, self.argv)
97+
98+
# Launch the "run" method of the provided Worker under a new process.
9399
self._thread = multiprocessing.Process(target=worker.protected_run, daemon=False)
94100
self._thread.start()
95101
except Exception as e:
@@ -125,11 +131,15 @@ def poll(self, wait=False):
125131
logger.restart_message(self._attempts)
126132
self._retcode.value = -1
127133

128-
return self.retcode if not running or wait else None
134+
return self.retcode if (not running or wait) else None
129135

130136
def terminate(self):
131137
if self._thread.is_alive():
132138
self._thread.terminate()
139+
logger = lg.FileLogger(self.logfile)
140+
logger.open(False)
141+
logger._system_("Keyboard Interrupt (SIGINT) received. Terminating all Worker and exiting.")
142+
logger.close()
133143
return
134144

135145

pyrunner/core/pyrunner.py

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from pyrunner.core.engine import ExecutionEngine
3030
from pyrunner.core.config import Config
3131
from pyrunner.core.register import NodeRegister
32+
from pyrunner.core.signal import SignalHandler, SIG_ABORT, SIG_PAUSE, SIG_PULSE
3233
from pyrunner.version import __version__
3334

3435
from datetime import datetime as datetime
@@ -41,23 +42,25 @@ def __init__(self, **kwargs):
4142
self._environ = os.environ.copy()
4243
self.config = Config()
4344
self.notification = notification.EmailNotification()
45+
self.signal_handler = SignalHandler(self.config)
4446

4547
self.serde_obj = serde.ListSerDe()
4648
self.register = NodeRegister()
4749
self.engine = ExecutionEngine()
4850

4951
self._init_params = {
50-
'config_file' : kwargs.get('config_file'),
51-
'proc_file' : kwargs.get('proc_file'),
52-
'restart' : kwargs.get('restart', False),
53-
'cvar_list' : [],
54-
'exec_proc_name' : None,
55-
'exec_only_list' : [],
56-
'exec_disable_list' : [],
57-
'exec_from_id' : None,
58-
'exec_to_id' : None
52+
'config_file' : kwargs.get('config_file'),
53+
'proc_file' : kwargs.get('proc_file'),
54+
'restart' : kwargs.get('restart', False),
55+
'cvar_list' : [],
56+
'exec_proc_name' : None,
57+
'exec_only_list' : [],
58+
'exec_disable_list' : [],
59+
'exec_from_id' : None,
60+
'exec_to_id' : None
5961
}
6062

63+
# Lifecycle hooks
6164
self._on_create_func = None
6265
self._on_start_func = None
6366
self._on_restart_func = None
@@ -66,11 +69,23 @@ def __init__(self, **kwargs):
6669
self._on_destroy_func = None
6770

6871
self.parse_args()
72+
73+
if self.dup_proc_is_running():
74+
raise OSError('Another process for "{}" is already running!'.format(self.config['app_name']))
6975

7076
def reset_env(self):
7177
os.environ.clear()
7278
os.environ.update(self._environ)
7379

80+
def dup_proc_is_running(self):
81+
self.signal_handler.emit(SIG_PULSE)
82+
time.sleep(1.1)
83+
if SIG_PULSE not in self.signal_handler.peek():
84+
print(self.signal_handler.peek())
85+
return True
86+
else:
87+
return False
88+
7489
def load_proc_file(self, proc_file, restart=False):
7590
if not proc_file or not os.path.isfile(proc_file):
7691
return False
@@ -121,7 +136,7 @@ def plugin_notification(self, obj):
121136
if not isinstance(obj, notification.Notification): raise Exception('Notification plugin must implement the Notification interface')
122137
self.notification = obj
123138

124-
# App lifecycle hooks
139+
# App lifecycle hooks/decorators
125140
def on_create(self, func):
126141
self._on_create_func = func
127142
def on_start(self, func):
@@ -192,15 +207,15 @@ def run(self):
192207

193208
emit_notification = True
194209

195-
# # App lifecycle - SUCCESS
210+
# App lifecycle - SUCCESS
196211
if retcode == 0:
197212
if self._on_success_func:
198213
self._on_success_func()
199214
if not self.config['email_on_success']:
200215
print('Skipping Email Notification: Property "email_on_success" is set to FALSE.')
201216
emit_notification = False
202-
# # App lifecycle - FAIL
203-
else:
217+
# App lifecycle - FAIL (<0 is for ABORT or other interrupt)
218+
elif retcode > 0:
204219
if self._on_fail_func:
205220
self._on_fail_func()
206221
if not self.config['email_on_fail']:
@@ -265,7 +280,12 @@ def zip_log_files(self, exit_status):
265280

266281
try:
267282

268-
suffix = 'FAILURE' if exit_status else 'SUCCESS'
283+
if exit_status == -1:
284+
suffix = 'ABORT'
285+
elif exit_status > 0:
286+
suffix = 'FAILURE'
287+
else:
288+
suffix = 'SUCCESS'
269289

270290
zip_file = "{}/{}_{}_{}.zip".format(self.config['log_dir'], self.config['app_name'], constants.EXECUTION_TIMESTAMP, suffix)
271291
print('Zipping Up Log Files to: {}'.format(zip_file))
@@ -356,11 +376,13 @@ def exec_from(self, id) : return self.register.exec_from(id)
356376
def exec_disable(self, id_list) : return self.register.exec_disable(id_list)
357377

358378
def parse_args(self):
379+
abort = False
380+
359381
opt_list = 'c:l:n:e:x:N:D:A:t:drhiv'
360382
longopt_list = [
361-
'setup', 'help', 'nozip', 'interactive',
383+
'setup', 'help', 'nozip', 'interactive', 'abort',
362384
'restart', 'version', 'dryrun', 'debug',
363-
'preserve-context', 'dump-logs', 'disable-exclusive-jobs',
385+
'preserve-context', 'dump-logs', 'allow-duplicate-jobs',
364386
'email=', 'email-on-fail=', 'email-on-success=', 'ef=', 'es=',
365387
'env=', 'cvar=', 'context=',
366388
'to=', 'from=', 'descendants=', 'ancestors=',
@@ -417,10 +439,12 @@ def parse_args(self):
417439
self.config['tickrate'] = int(arg)
418440
elif opt in ['--preserve-context']:
419441
self.preserve_context = True
420-
elif opt in ['--disable-exclusive-jobs']:
421-
self.disable_exclusive_jobs = True
442+
elif opt in ['--allow-duplicate-jobs']:
443+
self._init_params['allow_duplicate_jobs'] = True
422444
elif opt in ['--exec-proc-name']:
423445
self._init_params['exec_proc_name'] = arg
446+
elif opt == '--abort':
447+
abort = True
424448
elif opt in ['--serde']:
425449
if arg.lower() == 'json':
426450
self.plugin_serde(serde.JsonSerDe())
@@ -441,6 +465,11 @@ def parse_args(self):
441465
raise RuntimeError('Config file (app_profile) has not been provided')
442466
self.config.source_config_file(self._init_params['config_file'])
443467

468+
if abort:
469+
print('Submitting ABORT signal to running job for: {}'.format(self.config['app_name']))
470+
self.signal_handler.emit(SIG_ABORT)
471+
sys.exit(0)
472+
444473
# Check if restart is possible (ctllog/ctx files exist)
445474
if self._init_params['restart'] and not self.is_restartable():
446475
self._init_params['restart'] = False

pyrunner/core/signal.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Copyright 2019 Comcast Cable Communications Management, LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
# SPDX-License-Identifier: Apache-2.0
16+
17+
import os
18+
19+
SIG_ABORT = 'sig.abort'
20+
SIG_PAUSE = 'sig.pause'
21+
SIG_PULSE = 'sig.pulse'
22+
23+
_valid_signals = (SIG_ABORT, SIG_PAUSE, SIG_PULSE)
24+
25+
class SignalHandler:
26+
27+
def __init__(self, config):
28+
self.config = config
29+
30+
def sig_file(self, sig):
31+
return '{}/.{}.{}'.format(self.config['temp_dir'], self.config['app_name'], sig)
32+
33+
def emit(self, sig):
34+
if sig not in _valid_signals: return ValueError('Unknown signal type: {}'.format(sig))
35+
open(self.sig_file(sig), 'a').close()
36+
37+
def consume(self):
38+
sig_set = self.peek()
39+
for sig in sig_set:
40+
os.remove(self.sig_file(sig))
41+
return sig_set
42+
43+
def peek(self):
44+
return set([ s for s in _valid_signals if os.path.exists(self.sig_file(s)) ])

pyrunner/logger/abstract.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ def error(self, text):
5959
"""
6060
self._emit_('ERROR', text)
6161

62+
def _system_(self, text):
63+
"""
64+
Write a generic SYSTEM level log message.
65+
This is reserved for internal control messages.
66+
"""
67+
self._emit_('SYSTEM', text)
68+
6269
@abstractmethod
6370
def restart_message(self, restart_count):
6471
"""

pyrunner/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '4.2.5'
1+
__version__ = '4.3.0'

pyrunner/worker/abstract.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ def protected_run(self):
5757
methods, if defined.
5858
"""
5959

60-
sys.stdout = open(self.logfile, 'a')
61-
sys.stderr = open(self.logfile, 'a')
60+
if self.logfile:
61+
sys.stdout = open(self.logfile, 'a')
62+
sys.stderr = open(self.logfile, 'a')
6263

6364
# RUN
6465
try:
@@ -83,7 +84,7 @@ def protected_run(self):
8384
else:
8485
# ON FAIL
8586
try:
86-
self.retcode = self.on_fail() or self.retcode
87+
self.on_fail() or self.retcode
8788
except NotImplementedError:
8889
pass
8990
except Exception as e:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,6 @@
3030
license = 'Apache 2.0',
3131
long_description = 'Python utility providing text-based workflow manager.',
3232
entry_points = {
33-
'console_scripts': ['pyrunner=pyrunner.cli:main', 'pyrunner-repo=pyrunner.cli:repo']
33+
'console_scripts': ['pyrunner=pyrunner.cli:main']
3434
}
3535
)

0 commit comments

Comments
 (0)