-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpython_task_process_manager.py
More file actions
177 lines (165 loc) · 7.23 KB
/
python_task_process_manager.py
File metadata and controls
177 lines (165 loc) · 7.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import json
import queue
import subprocess
import sys
import threading
import typing
from pathlib import Path
from queue import Queue
from threading import Thread
from typing import Union
from PySide6.QtCore import QTimer
from PySide6.QtGui import QTextCharFormat
from je_editor.pyside_ui.main_ui.save_settings.user_color_setting_file import actually_color_dict
from je_editor.utils.venv_check.check_venv import check_and_choose_venv
from automation_ide.automation_editor_ui.show_code_window.code_window import CodeWindow
class TaskProcessManager(object):
def __init__(
self,
main_window: CodeWindow,
task_done_trigger_function: typing.Callable = None,
error_trigger_function: typing.Callable = None,
program_buffer_size: int = 1024,
program_encoding: str = "utf-8"
):
super().__init__()
self.compiler_path = None
# ite_instance param
self.read_program_error_output_from_thread: Union[threading.Thread, None] = None
self.read_program_output_from_thread: Union[threading.Thread, None] = None
self.main_window: CodeWindow = main_window
self.timer: QTimer = QTimer(self.main_window)
self.still_run_program: bool = True
self.program_encoding: str = program_encoding
self.run_output_queue: Queue = Queue()
self.run_error_queue: Queue = Queue()
self.process: Union[subprocess.Popen, None] = None
self.task_done_trigger_function: typing.Callable = task_done_trigger_function
self.error_trigger_function: typing.Callable = error_trigger_function
self.program_buffer_size = program_buffer_size
def renew_path(self) -> None:
if self.main_window.python_compiler is None:
# Renew compiler path
if sys.platform in ["win32", "cygwin", "msys"]:
venv_path = Path(str(Path.cwd()) + "/venv/Scripts")
else:
venv_path = Path(str(Path.cwd()) + "/venv/bin")
self.compiler_path = check_and_choose_venv(venv_path)
else:
self.compiler_path = self.main_window.python_compiler
def start_test_process(self, package: str, exec_str: str):
self.renew_path()
if sys.platform in ["win32", "cygwin", "msys"]:
exec_str = json.dumps(exec_str)
args = [
self.compiler_path,
"-m",
package,
"--execute_str",
exec_str
]
else:
args = " ".join([f"{self.compiler_path}", f"-m {package}", "--execute_str", f"{exec_str}"])
self.process: subprocess.Popen = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
encoding=self.program_encoding
)
self.still_run_program = True
# program output message queue thread
self.read_program_output_from_thread = Thread(
target=self.read_program_output_from_process,
daemon=True
)
self.read_program_output_from_thread.start()
# program error message queue thread
self.read_program_error_output_from_thread = Thread(
target=self.read_program_error_output_from_process,
daemon=True
)
self.read_program_error_output_from_thread.start()
# start Pyside update
# start timer
self.main_window.setWindowTitle(package)
self.main_window.show()
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(self.pull_text)
self.timer.start()
# Pyside UI update method
def pull_text(self):
try:
if not self.run_output_queue.empty():
output_message = self.run_output_queue.get_nowait()
output_message = str(output_message).strip()
if output_message:
text_cursor = self.main_window.code_result.textCursor()
text_format = QTextCharFormat()
text_format.setForeground(actually_color_dict.get("normal_output_color"))
text_cursor.insertText(output_message, text_format)
text_cursor.insertBlock()
if not self.run_error_queue.empty():
error_message = self.run_error_queue.get_nowait()
error_message = str(error_message).strip()
if error_message:
text_cursor = self.main_window.code_result.textCursor()
text_format = QTextCharFormat()
text_format.setForeground(actually_color_dict.get("error_output_color"))
text_cursor.insertText(error_message, text_format)
text_cursor.insertBlock()
except queue.Empty:
pass
if self.process is not None:
if self.process.returncode == 0:
if self.timer.isActive():
self.timer.stop()
self.exit_program()
elif self.process.returncode is not None:
if self.timer.isActive():
self.timer.stop()
self.exit_program()
if self.still_run_program:
# poll return code
self.process.poll()
else:
if self.timer.isActive():
self.timer.stop()
# exit program change run flag to false and clean read thread and queue and process
def exit_program(self):
self.still_run_program = False
if self.read_program_output_from_thread is not None:
self.read_program_output_from_thread = None
if self.read_program_error_output_from_thread is not None:
self.read_program_error_output_from_thread = None
self.print_and_clear_queue()
if self.process is not None:
self.process.terminate()
text_cursor = self.main_window.code_result.textCursor()
text_format = QTextCharFormat()
text_format.setForeground(actually_color_dict.get("normal_output_color"))
text_cursor.insertText(f"Task exit with code {self.process.returncode}", text_format)
text_cursor.insertBlock()
self.process = None
def print_and_clear_queue(self):
self.run_output_queue = queue.Queue()
self.run_error_queue = queue.Queue()
def read_program_output_from_process(self):
while self.still_run_program:
self.process: subprocess.Popen
program_output_data = self.process.stdout.readline(self.program_buffer_size) \
.decode("utf-8", "replace")
if self.process:
self.process.stdout.flush()
if program_output_data.strip() != "":
self.run_output_queue.put(program_output_data)
def read_program_error_output_from_process(self):
while self.still_run_program:
program_error_output_data = self.process.stderr.readline(self.program_buffer_size) \
.decode("utf-8", "replace")
if self.process:
self.process.stderr.flush()
if program_error_output_data.strip() != "":
self.run_error_queue.put(program_error_output_data)