diff --git a/README.md b/README.md
index 6b39669..3a8fc9d 100644
--- a/README.md
+++ b/README.md
@@ -1,56 +1,35 @@
-## LoadDensity
-[](https://pepy.tech/project/je-load-density)
-
-[](https://www.codacy.com/gh/JE-Chen/LoadDensity/dashboard?utm_source=github.com&utm_medium=referral&utm_content=JE-Chen/LoadDensity&utm_campaign=Badge_Grade)
-
-[](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_8.yml)
-
-[](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_9.yml)
-
-[](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_10.yml)
-
-[](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_11.yml)
-
-### Documentation
-
-[](https://loaddensity.readthedocs.io/en/latest/?badge=latest)
-
-[LoadDensity Doc Click Here!](https://loaddensity.readthedocs.io/en/latest/)
-
----
-> Project Kanban \
-> https://github.com/orgs/Integration-Automation/projects/2/views/1
-> * Load automation.
-> * Easy setup user template.
-> * Load Density script.
-> * Generate JSON/HTML/XML report.
-> * 1 sec / thousands requests.
-> * Fast spawn users.
-> * Multi test on one task.
-> * Specify test time.
-> * OS Independent.
-> * Remote automation support.
-> * Project & Template support.
----
-
-## install
+# LoadDensity
+A high‑performance load testing and automation tool.
+It supports fast user spawning, flexible templates, and generates reports in multiple formats.
+Designed to be cross‑platform and easy to integrate into your projects.
+
+- Load automation: Quickly set up and run load tests
+- User templates: Simple configuration for reusable test users
+- Load Density scripts: Define and execute repeatable scenarios
+- Report generation: Export results in JSON, HTML, or XML
+- High throughput: Thousands of requests per second
+- Fast user spawning: Scale up test users instantly
+- Multi‑test support: Run multiple tests on a single task
+- Configurable test duration: Specify how long tests should run
+- OS independent: Works across major operating systems
+- Remote automation: Execute tests remotely
+- Project & template support: Organize and reuse test setup
+
+## Installation
```
pip install je_locust_wrapper
```
-## require
+## Require
```
-python 3.8 or later
+python 3.9 or later
```
->* Test on
->> * windows 10 ~ 11
->> * osx 10.5 ~ 11 big sur
->> * ubuntu 20.0.4
->> * raspberry pi 3B+
->> * All test in test dir
->> *
-### Architecture Diagram
-
\ No newline at end of file
+## Tested Platforms
+- Windows 10 ~ 11
+- macOS 10.15 ~ 11 Big Sur
+- Ubuntu 20.04
+- Raspberry Pi 3B+
+- All test cases are located in the test directory
diff --git a/architecture_diagram/LoadDnesity_Archirecture.drawio b/architecture_diagram/LoadDnesity_Archirecture.drawio
deleted file mode 100644
index 08f8628..0000000
--- a/architecture_diagram/LoadDnesity_Archirecture.drawio
+++ /dev/null
@@ -1,406 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/architecture_diagram/LoadDnesity_Archirecture.drawio.png b/architecture_diagram/LoadDnesity_Archirecture.drawio.png
deleted file mode 100644
index 082c47c..0000000
Binary files a/architecture_diagram/LoadDnesity_Archirecture.drawio.png and /dev/null differ
diff --git a/dev.toml b/dev.toml
index 91cad16..2d993e5 100644
--- a/dev.toml
+++ b/dev.toml
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "je_load_density_dev"
-version = "0.0.77"
+version = "0.0.78"
authors = [
{ name = "JE-Chen", email = "jechenmailman@gmail.com" },
]
diff --git a/dev_requirements.txt b/dev_requirements.txt
index 371d98e..d2286ac 100644
--- a/dev_requirements.txt
+++ b/dev_requirements.txt
@@ -3,3 +3,4 @@ sphinx
twine
sphinx-rtd-theme
build
+pytest
diff --git a/je_load_density/gui/load_density_gui_thread.py b/je_load_density/gui/load_density_gui_thread.py
index ed20b54..dd858cc 100644
--- a/je_load_density/gui/load_density_gui_thread.py
+++ b/je_load_density/gui/load_density_gui_thread.py
@@ -1,26 +1,59 @@
from PySide6.QtCore import QThread
from je_load_density.wrapper.start_wrapper.start_test import start_test
+# 定義常數,避免硬編碼字串
+# Define constant to avoid hard-coded string
+DEFAULT_USER_TYPE = "fast_http_user"
+
class LoadDensityGUIThread(QThread):
+ """
+ GUI 測試執行緒
+ GUI Test Thread
+
+ 用於在背景執行負載測試,避免阻塞主介面。
+ Used to run load tests in the background without blocking the main GUI.
+ """
- def __init__(self):
+ def __init__(self,
+ request_url: str = None,
+ test_duration: int = None,
+ user_count: int = None,
+ spawn_rate: int = None,
+ http_method: str = None):
+ """
+ 初始化執行緒參數
+ Initialize thread parameters
+
+ :param request_url: 測試目標 URL (Target request URL)
+ :param test_duration: 測試持續時間 (Test duration in seconds)
+ :param user_count: 使用者數量 (Number of simulated users)
+ :param spawn_rate: 使用者生成速率 (User spawn rate)
+ :param http_method: HTTP 方法 (HTTP method, e.g., "GET", "POST")
+ """
super().__init__()
- self.url = None
- self.test_time = None
- self.user_count = None
- self.spawn_rate = None
- self.method = None
+ self.request_url = request_url
+ self.test_duration = test_duration
+ self.user_count = user_count
+ self.spawn_rate = spawn_rate
+ self.http_method = http_method
def run(self):
+ """
+ 執行負載測試
+ Run the load test
+ """
+ if not self.request_url or not self.http_method:
+ # 基本檢查,避免傳入 None
+ # Basic validation to avoid None values
+ raise ValueError("Request URL and HTTP method must be provided.")
+
start_test(
- {
- "user": "fast_http_user",
- },
- self.user_count, self.spawn_rate, self.test_time,
- **{
- "tasks": {
- self.method: {"request_url": self.url},
- }
+ {"user": DEFAULT_USER_TYPE}, # 使用者類型 (User type)
+ self.user_count,
+ self.spawn_rate,
+ self.test_duration,
+ tasks={
+ self.http_method: {"request_url": self.request_url}
}
)
\ No newline at end of file
diff --git a/je_load_density/gui/log_to_ui_filter.py b/je_load_density/gui/log_to_ui_filter.py
index ffefbcc..16b7291 100644
--- a/je_load_density/gui/log_to_ui_filter.py
+++ b/je_load_density/gui/log_to_ui_filter.py
@@ -1,10 +1,30 @@
import logging
import queue
-locust_log_queue = queue.Queue()
+# 建立一個佇列,用來存放攔截到的日誌訊息
+# Create a queue to store intercepted log messages
+log_message_queue: queue.Queue[str] = queue.Queue()
+
class InterceptAllFilter(logging.Filter):
+ """
+ 攔截所有日誌訊息並存入佇列
+ Intercept all log messages and store them into a queue
+
+ 此 Filter 可用於將 logging 模組的輸出導向 GUI 或其他處理流程。
+ This filter can be used to redirect logging outputs to a GUI or other processing pipelines.
+ """
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ """
+ 攔截日誌紀錄並存入佇列
+ Intercept log record and put it into the queue
- def filter(self, record):
- locust_log_queue.put(record.getMessage())
- return False
+ :param record: logging.LogRecord 物件 (Log record object)
+ :return: False → 阻止訊息繼續傳遞到其他 Handler
+ False → Prevents the message from propagating to other handlers
+ """
+ # 只存放訊息文字,也可以改成存整個 record 以保留更多資訊
+ # Only store the message text; alternatively, store the whole record for more details
+ log_message_queue.put(record.getMessage())
+ return False
\ No newline at end of file
diff --git a/je_load_density/gui/main_widget.py b/je_load_density/gui/main_widget.py
index 99f8534..8542b4c 100644
--- a/je_load_density/gui/main_widget.py
+++ b/je_load_density/gui/main_widget.py
@@ -1,27 +1,52 @@
import logging
+import queue
+from typing import Optional
from PySide6.QtCore import QTimer
from PySide6.QtGui import QIntValidator
-from PySide6.QtWidgets import QWidget, QFormLayout, QLineEdit, QComboBox, QPushButton, QTextEdit, QVBoxLayout, QLabel
+from PySide6.QtWidgets import (
+ QWidget, QFormLayout, QLineEdit, QComboBox,
+ QPushButton, QTextEdit, QVBoxLayout, QLabel, QMessageBox
+)
from je_load_density.gui.load_density_gui_thread import LoadDensityGUIThread
from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper
-from je_load_density.gui.log_to_ui_filter import InterceptAllFilter, locust_log_queue
+from je_load_density.gui.log_to_ui_filter import InterceptAllFilter, log_message_queue
class LoadDensityWidget(QWidget):
+ """
+ 負載測試 GUI 控制元件
+ Load Test GUI Widget
- def __init__(self, parent=None):
+ 提供使用者輸入測試參數並啟動負載測試,
+ 並將日誌訊息即時顯示在 GUI 中。
+ Provides input fields for test parameters, starts load tests,
+ and displays log messages in real-time.
+ """
+
+ def __init__(self, parent: Optional[QWidget] = None):
super().__init__(parent)
- # from
+
+ # === 表單區域 (Form Section) ===
form_layout = QFormLayout()
+
+ # URL 輸入框 (Target URL input)
self.url_input = QLineEdit()
+
+ # 測試時間 (Test duration, must be int)
self.test_time_input = QLineEdit()
self.test_time_input.setValidator(QIntValidator())
+
+ # 使用者數量 (User count)
self.user_count_input = QLineEdit()
self.user_count_input.setValidator(QIntValidator())
+
+ # 生成速率 (Spawn rate)
self.spawn_rate_input = QLineEdit()
self.spawn_rate_input.setValidator(QIntValidator())
+
+ # HTTP 方法選擇 (HTTP method selection)
self.method_combobox = QComboBox()
self.method_combobox.addItems([
language_wrapper.language_word_dict.get("get"),
@@ -32,50 +57,75 @@ def __init__(self, parent=None):
language_wrapper.language_word_dict.get("head"),
language_wrapper.language_word_dict.get("options"),
])
+
+ # 將元件加入表單 (Add widgets to form layout)
form_layout.addRow(language_wrapper.language_word_dict.get("url"), self.url_input)
form_layout.addRow(language_wrapper.language_word_dict.get("test_time"), self.test_time_input)
form_layout.addRow(language_wrapper.language_word_dict.get("user_count"), self.user_count_input)
form_layout.addRow(language_wrapper.language_word_dict.get("spawn_rate"), self.spawn_rate_input)
form_layout.addRow(language_wrapper.language_word_dict.get("test_method"), self.method_combobox)
+ # === 啟動按鈕 (Start button) ===
self.start_button = QPushButton(language_wrapper.language_word_dict.get("start_button"))
self.start_button.clicked.connect(self.run_load_density)
- # Log panel
+ # === 日誌面板 (Log panel) ===
self.log_panel = QTextEdit()
self.log_panel.setReadOnly(True)
- # Add widget to vertical layout
+ # === 主版面配置 (Main layout) ===
main_layout = QVBoxLayout()
main_layout.addLayout(form_layout)
main_layout.addWidget(self.start_button)
main_layout.addWidget(QLabel(language_wrapper.language_word_dict.get("log")))
main_layout.addWidget(self.log_panel)
- # Param
- self.run_load_density_thread = None
+ # === 執行緒與計時器 (Thread & Timer) ===
+ self.run_load_density_thread: Optional[LoadDensityGUIThread] = None
self.pull_log_timer = QTimer()
- self.pull_log_timer.setInterval(20)
+ self.pull_log_timer.setInterval(50) # 稍微放大間隔,避免 UI 卡頓
self.pull_log_timer.timeout.connect(self.add_text_to_log)
self.setLayout(main_layout)
- def run_load_density(self):
+ def run_load_density(self) -> None:
+ """
+ 啟動負載測試
+ Start the load test
+ """
+ try:
+ test_time = int(self.test_time_input.text())
+ user_count = int(self.user_count_input.text())
+ spawn_rate = int(self.spawn_rate_input.text())
+ except ValueError:
+ QMessageBox.warning(self, "Invalid Input", "請輸入有效的數字\nPlease enter valid numbers")
+ return
+
self.run_load_density_thread = LoadDensityGUIThread()
- self.run_load_density_thread.url = self.url_input.text()
- self.run_load_density_thread.test_time = int(self.test_time_input.text())
- self.run_load_density_thread.user_count = int(self.user_count_input.text())
- self.run_load_density_thread.spawn_rate = int(self.spawn_rate_input.text())
- self.run_load_density_thread.method = self.method_combobox.currentText().lower()
- log_handler_list = [handler for handler in logging.getLogger("root").handlers if handler.name == "log_reader"]
+ self.run_load_density_thread.request_url = self.url_input.text()
+ self.run_load_density_thread.test_duration = test_time
+ self.run_load_density_thread.user_count = user_count
+ self.run_load_density_thread.spawn_rate = spawn_rate
+ self.run_load_density_thread.http_method = self.method_combobox.currentText().lower()
+
+ # 設定日誌攔截器 (Attach log filter)
+ root_logger = logging.getLogger("root")
+ log_handler_list = [handler for handler in root_logger.handlers if handler.name == "log_reader"]
if log_handler_list:
log_handler = log_handler_list[0]
- log_handler.addFilter(InterceptAllFilter())
+ # 避免重複新增 Filter (Prevent duplicate filters)
+ if not any(isinstance(f, InterceptAllFilter) for f in log_handler.filters):
+ log_handler.addFilter(InterceptAllFilter())
+
+ # 啟動執行緒與計時器 (Start thread & timer)
self.run_load_density_thread.start()
- self.pull_log_timer.stop()
self.pull_log_timer.start()
self.log_panel.clear()
- def add_text_to_log(self):
- if not locust_log_queue.empty():
- self.log_panel.append(locust_log_queue.get_nowait())
+ def add_text_to_log(self) -> None:
+ """
+ 將日誌訊息加入到 GUI 面板
+ Append log messages to GUI panel
+ """
+ while not log_message_queue.empty():
+ self.log_panel.append(log_message_queue.get_nowait())
\ No newline at end of file
diff --git a/je_load_density/gui/main_window.py b/je_load_density/gui/main_window.py
index 2c4e167..8d14828 100644
--- a/je_load_density/gui/main_window.py
+++ b/je_load_density/gui/main_window.py
@@ -1,6 +1,7 @@
import sys
+from typing import Optional
-from PySide6.QtWidgets import QMainWindow, QApplication
+from PySide6.QtWidgets import QMainWindow, QApplication, QWidget
from qt_material import QtStyleTools
from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper
@@ -8,18 +9,41 @@
class LoadDensityUI(QMainWindow, QtStyleTools):
+ """
+ 負載測試主視窗
+ Load Test Main Window
- def __init__(self):
- super().__init__()
+ 提供 GUI 介面,整合測試控制元件與樣式設定。
+ Provides the main GUI window, integrating the load test widget and applying styles.
+ """
+
+ def __init__(self, parent: Optional[QWidget] = None):
+ super().__init__(parent)
+
+ # 應用程式名稱 (Application name)
self.id = language_wrapper.language_word_dict.get("application_name")
+
+ # 在 Windows 平台設定 AppUserModelID,讓工作列顯示正確的應用程式名稱
+ # Set AppUserModelID on Windows so the taskbar shows the correct application name
if sys.platform in ["win32", "cygwin", "msys"]:
from ctypes import windll
windll.shell32.SetCurrentProcessExplicitAppUserModelID(self.id)
+
+ # 設定字體樣式 (Set font style)
self.setStyleSheet(
- f"font-size: 12pt;"
- f"font-family: 'Lato';"
+ "font-size: 12pt;"
+ "font-family: 'Lato';"
)
+
+ # 套用 qt-material 樣式 (Apply qt-material theme)
self.apply_stylesheet(self, "dark_amber.xml")
+
+ # 建立並設定主要控制元件 (Create and set main widget)
self.load_density_widget = LoadDensityWidget()
self.setCentralWidget(self.load_density_widget)
+if __name__ == "__main__":
+ app = QApplication(sys.argv)
+ window = LoadDensityUI()
+ window.show()
+ sys.exit(app.exec())
\ No newline at end of file
diff --git a/je_load_density/utils/callback/callback_function_executor.py b/je_load_density/utils/callback/callback_function_executor.py
index a206b19..4807fe1 100644
--- a/je_load_density/utils/callback/callback_function_executor.py
+++ b/je_load_density/utils/callback/callback_function_executor.py
@@ -1,18 +1,41 @@
import typing
from sys import stderr
-from je_load_density.utils.exception.exception_tags import get_bad_trigger_function, get_bad_trigger_method
+from je_load_density.utils.exception.exception_tags import (
+ get_bad_trigger_function,
+ get_bad_trigger_method,
+)
from je_load_density.utils.exception.exceptions import CallbackExecutorException
-from je_load_density.utils.generate_report.generate_html_report import generate_html, generate_html_report
-from je_load_density.utils.generate_report.generate_json_report import generate_json, generate_json_report
-from je_load_density.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report
+from je_load_density.utils.generate_report.generate_html_report import (
+ generate_html,
+ generate_html_report,
+)
+from je_load_density.utils.generate_report.generate_json_report import (
+ generate_json,
+ generate_json_report,
+)
+from je_load_density.utils.generate_report.generate_xml_report import (
+ generate_xml,
+ generate_xml_report,
+)
from je_load_density.wrapper.start_wrapper.start_test import start_test
-class CallbackFunctionExecutor(object):
+class CallbackFunctionExecutor:
+ """
+ 回呼函式執行器
+ Callback Function Executor
- def __init__(self):
- self.event_dict: dict = {
+ 提供事件觸發與回呼機制,先執行指定的 trigger function,
+ 再執行 callback function。
+ Provides a mechanism to trigger a function from event_dict,
+ then execute a callback function.
+ """
+
+ def __init__(self) -> None:
+ # 事件字典,定義可觸發的函式
+ # Event dictionary, defines available trigger functions
+ self.event_dict: dict[str, typing.Callable] = {
"user_test": start_test,
"LD_generate_html": generate_html,
"LD_generate_html_report": generate_html_report,
@@ -23,25 +46,36 @@ def __init__(self):
}
def callback_function(
- self,
- trigger_function_name: str,
- callback_function: typing.Callable,
- callback_function_param: [dict, None] = None,
- callback_param_method: str = "kwargs",
- **kwargs
- ):
+ self,
+ trigger_function_name: str,
+ callback_function: typing.Callable,
+ callback_function_param: typing.Optional[typing.Union[dict, list]] = None,
+ callback_param_method: str = "kwargs",
+ **kwargs,
+ ) -> typing.Any:
"""
- :param trigger_function_name: what function we want to trigger only accept function in event_dict
- :param callback_function: what function we want to callback
- :param callback_function_param: callback function's param only accept dict
- :param callback_param_method: what type param will use on callback function only accept kwargs and args
- :param kwargs: trigger_function's param
- :return:
+ 執行事件函式並呼叫回呼函式
+ Execute trigger function and then call callback function
+
+ :param trigger_function_name: 事件函式名稱 (must exist in event_dict)
+ :param callback_function: 回呼函式 (callback function to execute)
+ :param callback_function_param: 回呼函式參數 (dict for kwargs, list for args)
+ :param callback_param_method: 參數傳遞方式 ("kwargs" or "args")
+ :param kwargs: 傳給事件函式的參數 (parameters for trigger function)
+ :return: 事件函式的回傳值 (return value of trigger function)
"""
try:
- if trigger_function_name not in self.event_dict.keys():
+ # 檢查事件函式是否存在
+ # Validate trigger function existence
+ if trigger_function_name not in self.event_dict:
raise CallbackExecutorException(get_bad_trigger_function)
- execute_return_value = self.event_dict.get(trigger_function_name)(**kwargs)
+
+ # 執行事件函式
+ # Execute trigger function
+ execute_return_value = self.event_dict[trigger_function_name](**kwargs)
+
+ # 執行回呼函式
+ # Execute callback function
if callback_function_param is not None:
if callback_param_method not in ["kwargs", "args"]:
raise CallbackExecutorException(get_bad_trigger_method)
@@ -51,9 +85,15 @@ def callback_function(
callback_function(*callback_function_param)
else:
callback_function()
+
return execute_return_value
+
except Exception as error:
+ # 目前只輸出錯誤,可以改成 logging 或 raise
+ # Currently prints error; can be replaced with logging or re-raise
print(repr(error), file=stderr)
-callback_executor = CallbackFunctionExecutor()
+# 建立全域執行器實例
+# Create global executor instance
+callback_executor = CallbackFunctionExecutor()
\ No newline at end of file
diff --git a/je_load_density/utils/executor/action_executor.py b/je_load_density/utils/executor/action_executor.py
index 5537710..4bb19ee 100644
--- a/je_load_density/utils/executor/action_executor.py
+++ b/je_load_density/utils/executor/action_executor.py
@@ -4,21 +4,43 @@
from inspect import getmembers, isbuiltin
from typing import Union, Any
-from je_load_density.utils.exception.exception_tags import executor_data_error, add_command_exception_tag
-from je_load_density.utils.exception.exception_tags import executor_list_error
+from je_load_density.utils.exception.exception_tags import (
+ executor_data_error,
+ add_command_exception_tag,
+ executor_list_error,
+)
from je_load_density.utils.exception.exceptions import LoadDensityTestExecuteException
-from je_load_density.utils.generate_report.generate_html_report import generate_html, generate_html_report
-from je_load_density.utils.generate_report.generate_json_report import generate_json, generate_json_report
-from je_load_density.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report
+from je_load_density.utils.generate_report.generate_html_report import (
+ generate_html,
+ generate_html_report,
+)
+from je_load_density.utils.generate_report.generate_json_report import (
+ generate_json,
+ generate_json_report,
+)
+from je_load_density.utils.generate_report.generate_xml_report import (
+ generate_xml,
+ generate_xml_report,
+)
from je_load_density.utils.json.json_file.json_file import read_action_json
from je_load_density.utils.package_manager.package_manager_class import package_manager
from je_load_density.wrapper.start_wrapper.start_test import start_test
-class Executor(object):
+class Executor:
+ """
+ 執行器 (Executor)
+ Event-driven executor
- def __init__(self):
- self.event_dict = {
+ 提供事件字典 (event_dict),可根據動作名稱執行對應函式,
+ 並支援批次執行與檔案驅動。
+ Provides an event dictionary to execute functions by name,
+ supporting batch execution and file-driven execution.
+ """
+
+ def __init__(self) -> None:
+ # 初始化事件字典 (Initialize event dictionary)
+ self.event_dict: dict[str, Any] = {
"LD_start_test": start_test,
"LD_generate_html": generate_html,
"LD_generate_html_report": generate_html_report,
@@ -26,21 +48,29 @@ def __init__(self):
"LD_generate_json_report": generate_json_report,
"LD_generate_xml": generate_xml,
"LD_generate_xml_report": generate_xml_report,
- # Execute
+ # Executor internal methods
"LD_execute_action": self.execute_action,
"LD_execute_files": self.execute_files,
"LD_add_package_to_executor": package_manager.add_package_to_executor,
}
- # get all builtin function and add to event dict
- for function in getmembers(builtins, isbuiltin):
- self.event_dict.update({str(function[0]): function[1]})
- def _execute_event(self, action: list):
+ # 將所有 Python 內建函式加入事件字典
+ # Add all Python built-in functions to event_dict
+ for name, func in getmembers(builtins, isbuiltin):
+ self.event_dict[name] = func
+
+ def _execute_event(self, action: list) -> Any:
"""
- :param action: execute action
- :return: what event return
+ 執行單一事件
+ Execute a single event
+
+ :param action: 事件結構,例如 ["function_name", {"param": value}]
+ :return: 事件回傳值 (return value of executed event)
"""
event = self.event_dict.get(action[0])
+ if event is None:
+ raise LoadDensityTestExecuteException(executor_data_error + " " + str(action))
+
if len(action) == 2:
if isinstance(action[1], dict):
return event(**action[1])
@@ -51,68 +81,87 @@ def _execute_event(self, action: list):
else:
raise LoadDensityTestExecuteException(executor_data_error + " " + str(action))
- def execute_action(self, action_list: [list, dict]) -> dict:
+ def execute_action(self, action_list: Union[list, dict]) -> dict[str, Any]:
"""
- execute all action in action list
- :param action_list: like this structure
- [
- ["method on event_dict", {"param": params}],
- ["method on event_dict", {"param": params}]
- ]
- for loop and use execute_event function to execute
- :return: recode string, response as list
+ 執行多個事件
+ Execute multiple actions
+
+ :param action_list: 事件列表,例如:
+ [
+ ["LD_start_test", {"param": value}],
+ ["LD_generate_json", {"param": value}]
+ ]
+ :return: 執行紀錄字典 (execution record dict)
"""
if isinstance(action_list, dict):
action_list = action_list.get("load_density", None)
if action_list is None:
raise LoadDensityTestExecuteException(executor_list_error)
- execute_record_dict = dict()
+
+ execute_record_dict: dict[str, Any] = {}
+
try:
- if len(action_list) == 0 or isinstance(action_list, list) is False:
+ if not isinstance(action_list, list) or len(action_list) == 0:
raise LoadDensityTestExecuteException(executor_list_error)
except Exception as error:
print(repr(error), file=sys.stderr)
+
for action in action_list:
try:
event_response = self._execute_event(action)
- execute_record = "execute: " + str(action)
- execute_record_dict.update({execute_record: event_response})
+ execute_record = f"execute: {action}"
+ execute_record_dict[execute_record] = event_response
except Exception as error:
print(repr(error), file=sys.stderr)
print(action, file=sys.stderr)
- execute_record = "execute: " + str(action)
- execute_record_dict.update({execute_record: repr(error)})
+ execute_record = f"execute: {action}"
+ execute_record_dict[execute_record] = repr(error)
+
+ # 輸出執行結果 (Print execution results)
for key, value in execute_record_dict.items():
print(key)
print(value)
+
return execute_record_dict
- def execute_files(self, execute_files_list: list):
+ def execute_files(self, execute_files_list: list[str]) -> list[dict[str, Any]]:
"""
- execute action on all file in execute_files_list
- :param execute_files_list: list include execute files path
- :return: every execute detail as list
+ 執行檔案中的事件
+ Execute actions from files
+
+ :param execute_files_list: 檔案路徑列表 (list of file paths)
+ :return: 每個檔案的執行結果列表 (list of execution results per file)
"""
- execute_detail_list = list()
+ execute_detail_list: list[dict[str, Any]] = []
for file in execute_files_list:
execute_detail_list.append(self.execute_action(read_action_json(file)))
return execute_detail_list
+
+# 建立全域執行器 (Global executor instance)
executor = Executor()
package_manager.executor = executor
-def add_command_to_executor(command_dict: dict):
+def add_command_to_executor(command_dict: dict[str, Any]) -> None:
+ """
+ 新增自訂命令到執行器
+ Add custom commands to executor
+
+ :param command_dict: {command_name: function}
+ """
for command_name, command in command_dict.items():
if isinstance(command, (types.MethodType, types.FunctionType)):
- executor.event_dict.update({command_name: command})
+ executor.event_dict[command_name] = command
else:
raise LoadDensityTestExecuteException(add_command_exception_tag)
-def execute_action(action_list: list) -> dict:
+def execute_action(action_list: list) -> dict[str, Any]:
+ """全域執行事件 (Global execute action)"""
return executor.execute_action(action_list)
-def execute_files(execute_files_list: list) -> list:
- return executor.execute_files(execute_files_list)
+def execute_files(execute_files_list: list[str]) -> list[dict[str, Any]]:
+ """全域執行檔案事件 (Global execute files)"""
+ return executor.execute_files(execute_files_list)
\ No newline at end of file
diff --git a/je_load_density/utils/file_process/create_project_structure.py b/je_load_density/utils/file_process/create_project_structure.py
index 52828bb..af6ba05 100644
--- a/je_load_density/utils/file_process/create_project_structure.py
+++ b/je_load_density/utils/file_process/create_project_structure.py
@@ -1,17 +1,35 @@
from pathlib import Path
+from typing import Optional
+# 定義常數,避免硬編碼
+# Define constant to avoid hard-coded string
+TEMPLATE_DIR = "je_load_density/template"
-def _create_dir(dir_name: str):
+
+def _create_dir(dir_name: str) -> Optional[Path]:
"""
- create project dir
- :param dir_name: create dir use dir name
- :return: None
+ 建立專案目錄
+ Create project directory
+
+ :param dir_name: 要建立的目錄名稱 (Directory name to create)
+ :return: 成功建立或已存在的 Path 物件 (Path object if created/existed), None if failed
"""
- Path(dir_name).mkdir(
- parents=True,
- exist_ok=True
- )
+ try:
+ path = Path(dir_name)
+ path.mkdir(parents=True, exist_ok=True)
+ return path
+ except Exception as error:
+ # 錯誤處理:避免因權限或路徑問題導致程式崩潰
+ # Error handling: prevent crash due to permission or path issues
+ print(f"Failed to create directory {dir_name}: {error}")
+ return None
-def create_template_dir():
- _create_dir("je_load_density/template")
+def create_template_dir() -> Optional[Path]:
+ """
+ 建立模板目錄
+ Create template directory
+
+ :return: Path 物件或 None (Path object or None)
+ """
+ return _create_dir(TEMPLATE_DIR)
\ No newline at end of file
diff --git a/je_load_density/utils/file_process/get_dir_file_list.py b/je_load_density/utils/file_process/get_dir_file_list.py
index aa28fe6..690de02 100644
--- a/je_load_density/utils/file_process/get_dir_file_list.py
+++ b/je_load_density/utils/file_process/get_dir_file_list.py
@@ -1,18 +1,28 @@
-from os import getcwd
-from os import walk
-from os.path import abspath
-from os.path import join
+from pathlib import Path
+from typing import List
-def get_dir_files_as_list(dir_path: str = getcwd(), default_search_file_extension: str = ".json") -> list:
+def get_dir_files_as_list(
+ dir_path: str = str(Path.cwd()),
+ default_search_file_extension: str = ".json"
+) -> List[str]:
"""
- get dir file when end with default_search_file_extension
- :param dir_path: which dir we want to walk and get file list
- :param default_search_file_extension: which extension we want to search
- :return: [] if nothing searched or [file1, file2.... files] file was searched
+ 取得指定目錄下所有符合副檔名的檔案清單
+ Get all files in a directory that end with the given extension
+
+ :param dir_path: 要搜尋的目錄路徑 (Directory path to search)
+ :param default_search_file_extension: 要搜尋的副檔名 (File extension to search, e.g. ".json")
+ :return: 檔案絕對路徑清單 (List of absolute file paths)
"""
- return [
- abspath(join(dir_path, file)) for root, dirs, files in walk(dir_path)
- for file in files
- if file.endswith(default_search_file_extension.lower())
- ]
+ try:
+ path_obj = Path(dir_path)
+ if not path_obj.exists() or not path_obj.is_dir():
+ raise FileNotFoundError(f"Directory not found: {dir_path}")
+
+ return [
+ str(file.resolve())
+ for file in path_obj.rglob(f"*{default_search_file_extension.lower()}")
+ ]
+ except Exception as error:
+ print(f"Error while scanning directory {dir_path}: {error}")
+ return []
\ No newline at end of file
diff --git a/je_load_density/utils/generate_report/generate_html_report.py b/je_load_density/utils/generate_report/generate_html_report.py
index 08525b2..e5097bb 100644
--- a/je_load_density/utils/generate_report/generate_html_report.py
+++ b/je_load_density/utils/generate_report/generate_html_report.py
@@ -1,222 +1,129 @@
import sys
from threading import Lock
+from typing import List, Tuple
from je_load_density.utils.exception.exceptions import LoadDensityHTMLException
from je_load_density.utils.exception.exception_tags import html_generate_no_data_tag
from je_load_density.utils.test_record.test_record_class import test_record_instance
-_html_string_head = \
- """
-
-
-
-
- Load Density Report
-
-
-
-
- Test Report
-
- """.strip()
+# HTML 標頭 (HTML head)
+_HTML_STRING_HEAD = """
+
+
+
+ Load Density Report
+
+
+
+Test Report
+""".strip()
-_html_string_bottom = \
- """
-
-
- """.strip()
+# HTML 結尾 (HTML bottom)
+_HTML_STRING_BOTTOM = """""".strip()
-_success_table = \
- r"""
-
-
-
- | Test Report |
-
-
-
-
- | Method |
- {Method} |
-
-
- | test_url |
- {test_url} |
-
-
- | name |
- {name} |
-
-
- | status_code |
- {status_code} |
-
-
- | text |
- {text} |
-
-
- | content |
- {content} |
-
-
- | headers |
- {headers} |
-
-
-
-
- """.strip()
+# 成功測試表格模板 (Success table template)
+_SUCCESS_TABLE = r"""
+
+
+| Test Report |
+
+
+| Method | {Method} |
+| test_url | {test_url} |
+| name | {name} |
+| status_code | {status_code} |
+| text | {text} |
+| content | {content} |
+| headers | {headers} |
+
+
+
+""".strip()
-_failure_table = \
- r"""
-
-
-
- | Test Report |
-
-
-
-
- | http_method |
- {http_method} |
-
-
- | test_url |
- {test_url} |
-
-
- | name |
- {name} |
-
-
- | status_code |
- {status_code} |
-
-
- | error |
- {error} |
-
-
-
-
- """.strip()
+# 失敗測試表格模板 (Failure table template)
+_FAILURE_TABLE = r"""
+
+
+| Test Report |
+
+
+| http_method | {http_method} |
+| test_url | {test_url} |
+| name | {name} |
+| status_code | {status_code} |
+| error | {error} |
+
+
+
+""".strip()
-def generate_html():
+def generate_html() -> Tuple[List[str], List[str]]:
"""
- :return: success_test_dict, failure_test_dict
+ 產生 HTML 片段 (Generate HTML fragments)
+
+ :return: (成功測試清單, 失敗測試清單)
+ (list of success test HTML fragments, list of failure test HTML fragments)
"""
- if len(test_record_instance.test_record_list) == 0 and len(test_record_instance.error_record_list) == 0:
+ if not test_record_instance.test_record_list and not test_record_instance.error_record_list:
raise LoadDensityHTMLException(html_generate_no_data_tag)
- else:
- success_list = list()
- for record_data in test_record_instance.test_record_list:
- success_list.append(
- _success_table.format(
- Method=record_data.get("Method"),
- test_url=record_data.get("test_url"),
- name=record_data.get("name"),
- status_code=record_data.get("status_code"),
- text=record_data.get("text"),
- content=record_data.get("content"),
- headers=record_data.get("headers"),
- )
- )
- failure_list = list()
- if len(test_record_instance.error_record_list) == 0:
- pass
- else:
- for record_data in test_record_instance.error_record_list:
- failure_list.append(
- _failure_table.format(
- http_method=record_data.get("Method"),
- test_url=record_data.get("test_url"),
- name=record_data.get("name"),
- status_code=record_data.get("status_code"),
- error=record_data.get("error"),
- )
- )
+
+ success_list: List[str] = [
+ _SUCCESS_TABLE.format(
+ Method=record.get("Method"),
+ test_url=record.get("test_url"),
+ name=record.get("name"),
+ status_code=record.get("status_code"),
+ text=record.get("text"),
+ content=record.get("content"),
+ headers=record.get("headers"),
+ )
+ for record in test_record_instance.test_record_list
+ ]
+
+ failure_list: List[str] = [
+ _FAILURE_TABLE.format(
+ http_method=record.get("Method"),
+ test_url=record.get("test_url"),
+ name=record.get("name"),
+ status_code=record.get("status_code"),
+ error=record.get("error"),
+ )
+ for record in test_record_instance.error_record_list
+ ]
+
return success_list, failure_list
-def generate_html_report(html_name: str = "default_name"):
+def generate_html_report(html_name: str = "default_name") -> str:
"""
- format html_string and output html file
- :param html_name: save html file name
- :return: html_string
+ 產生完整 HTML 報告並輸出檔案
+ Generate full HTML report and save to file
+
+ :param html_name: 輸出檔案名稱 (Output file name, without extension)
+ :return: HTML 字串 (HTML string)
"""
_lock = Lock()
success_list, failure_list = generate_html()
+
try:
- _lock.acquire()
- with open(html_name + ".html", "w+") as file_to_write:
- file_to_write.writelines(
- _html_string_head
- )
- for success in success_list:
- file_to_write.write(success)
- for failure in failure_list:
- file_to_write.write(failure)
- file_to_write.writelines(
- _html_string_bottom
- )
+ with _lock: # 使用 with 確保自動 acquire/release
+ html_path = f"{html_name}.html"
+ with open(html_path, "w+", encoding="utf-8") as file_to_write:
+ file_to_write.write(_HTML_STRING_HEAD)
+ file_to_write.writelines(success_list)
+ file_to_write.writelines(failure_list)
+ file_to_write.write(_HTML_STRING_BOTTOM)
+ return html_path
except Exception as error:
print(repr(error), file=sys.stderr)
- finally:
- _lock.release()
+ return ""
\ No newline at end of file
diff --git a/je_load_density/utils/generate_report/generate_json_report.py b/je_load_density/utils/generate_report/generate_json_report.py
index ad1be6c..e082d13 100644
--- a/je_load_density/utils/generate_report/generate_json_report.py
+++ b/je_load_density/utils/generate_report/generate_json_report.py
@@ -1,72 +1,76 @@
import json
import sys
from threading import Lock
+from typing import Tuple, Dict
from je_load_density.utils.exception.exception_tags import cant_generate_json_report
from je_load_density.utils.exception.exceptions import LoadDensityGenerateJsonReportException
from je_load_density.utils.test_record.test_record_class import test_record_instance
-def generate_json():
- if len(test_record_instance.test_record_list) == 0 and len(test_record_instance.error_record_list) == 0:
+def generate_json() -> Tuple[Dict[str, dict], Dict[str, dict]]:
+ """
+ 產生測試紀錄的 JSON 結構
+ Generate JSON structure for test records
+
+ :return: (成功測試字典, 失敗測試字典)
+ (success_dict, failure_dict)
+ """
+ if not test_record_instance.test_record_list and not test_record_instance.error_record_list:
raise LoadDensityGenerateJsonReportException(cant_generate_json_report)
- else:
- success_dict = dict()
- failure_dict = dict()
- failure_count: int = 1
- failure_test_str: str = "Failure_Test"
- success_count: int = 1
- success_test_str: str = "Success_Test"
- for record_data in test_record_instance.test_record_list:
- success_dict.update(
- {
- success_test_str + str(success_count): {
- "Method": str(record_data.get("Method")),
- "test_url": str(record_data.get("test_url")),
- "name": str(record_data.get("name")),
- "status_code": str(record_data.get("status_code")),
- "text": str(record_data.get("text")),
- "content": str(record_data.get("content")),
- "headers": str(record_data.get("headers"))
- }
- }
- )
- success_count = success_count + 1
- for record_data in test_record_instance.error_record_list:
- failure_dict.update(
- {
- failure_test_str + str(failure_count): {
- "Method": str(record_data.get("Method")),
- "test_url": str(record_data.get("test_url")),
- "name": str(record_data.get("name")),
- "status_code": str(record_data.get("status_code")),
- "error": str(record_data.get("error"))
- }
- }
- )
- failure_count = failure_count + 1
+
+ success_dict: Dict[str, dict] = {}
+ failure_dict: Dict[str, dict] = {}
+
+ # 成功測試紀錄 (Success records)
+ for idx, record_data in enumerate(test_record_instance.test_record_list, start=1):
+ success_dict[f"Success_Test{idx}"] = {
+ "Method": str(record_data.get("Method")),
+ "test_url": str(record_data.get("test_url")),
+ "name": str(record_data.get("name")),
+ "status_code": str(record_data.get("status_code")),
+ "text": str(record_data.get("text")),
+ "content": str(record_data.get("content")),
+ "headers": str(record_data.get("headers")),
+ }
+
+ # 失敗測試紀錄 (Failure records)
+ for idx, record_data in enumerate(test_record_instance.error_record_list, start=1):
+ failure_dict[f"Failure_Test{idx}"] = {
+ "Method": str(record_data.get("Method")),
+ "test_url": str(record_data.get("test_url")),
+ "name": str(record_data.get("name")),
+ "status_code": str(record_data.get("status_code")),
+ "error": str(record_data.get("error")),
+ }
+
return success_dict, failure_dict
-def generate_json_report(json_file_name: str = "default_name"):
+def generate_json_report(json_file_name: str = "default_name") -> Tuple[str, str]:
"""
- :param json_file_name: save json file's name
+ 輸出測試紀錄 JSON 報告
+ Generate JSON report files for test records
+
+ :param json_file_name: 輸出檔案名稱前綴 (Output file name prefix)
+ :return: (成功檔案路徑, 失敗檔案路徑)
"""
lock = Lock()
success_dict, failure_dict = generate_json()
+
+ success_path = f"{json_file_name}_success.json"
+ failure_path = f"{json_file_name}_failure.json"
+
try:
- lock.acquire()
- with open(json_file_name + "_success.json", "w+") as file_to_write:
- json.dump(dict(success_dict), file_to_write, indent=4)
- except Exception as error:
- print(repr(error), file=sys.stderr)
- finally:
- lock.release()
- try:
- lock.acquire()
- with open(json_file_name + "_failure.json", "w+") as file_to_write:
- json.dump(dict(failure_dict), file_to_write, indent=4)
+ with lock: # 使用 with 確保自動 acquire/release
+ with open(success_path, "w+", encoding="utf-8") as file_to_write:
+ json.dump(success_dict, file_to_write, indent=4, ensure_ascii=False)
+
+ with open(failure_path, "w+", encoding="utf-8") as file_to_write:
+ json.dump(failure_dict, file_to_write, indent=4, ensure_ascii=False)
+
+ return success_path, failure_path
+
except Exception as error:
print(repr(error), file=sys.stderr)
- finally:
- lock.release()
+ return "", ""
\ No newline at end of file
diff --git a/je_load_density/utils/generate_report/generate_xml_report.py b/je_load_density/utils/generate_report/generate_xml_report.py
index cc9fb1b..11dd7c0 100644
--- a/je_load_density/utils/generate_report/generate_xml_report.py
+++ b/je_load_density/utils/generate_report/generate_xml_report.py
@@ -1,45 +1,58 @@
import sys
from threading import Lock
from xml.dom.minidom import parseString
+from typing import Tuple
+
from je_load_density.utils.generate_report.generate_json_report import generate_json
from je_load_density.utils.xml.change_xml_structure.change_xml_structure import dict_to_elements_tree
-def generate_xml():
+def generate_xml() -> Tuple[str, str]:
"""
- :return:
+ 產生 XML 字串 (Generate XML strings)
+
+ :return: (成功測試 XML 字串, 失敗測試 XML 字串)
+ (success_xml_str, failure_xml_str)
"""
success_dict, failure_dict = generate_json()
- success_dict = dict({"xml_data": success_dict})
- failure_dict = dict({"xml_data": failure_dict})
- success_json_to_xml = dict_to_elements_tree(success_dict)
- failure_json_to_xml = dict_to_elements_tree(failure_dict)
- return success_json_to_xml, failure_json_to_xml
+ # 包裝成 xml_data 根節點 (Wrap into xml_data root node)
+ success_dict = {"xml_data": success_dict}
+ failure_dict = {"xml_data": failure_dict}
+
+ success_xml_str = dict_to_elements_tree(success_dict)
+ failure_xml_str = dict_to_elements_tree(failure_dict)
+
+ return success_xml_str, failure_xml_str
-def generate_xml_report(xml_file_name: str = "default_name"):
+
+def generate_xml_report(xml_file_name: str = "default_name") -> Tuple[str, str]:
"""
- :param xml_file_name:
+ 輸出 XML 報告檔案 (Generate XML report files)
+
+ :param xml_file_name: 輸出檔案名稱前綴 (Output file name prefix)
+ :return: (成功檔案路徑, 失敗檔案路徑)
"""
- success_xml, failure_xml = generate_xml()
- success_xml = parseString(success_xml)
- failure_xml = parseString(failure_xml)
- success_xml = success_xml.toprettyxml()
- failure_xml = failure_xml.toprettyxml()
+ success_xml_str, failure_xml_str = generate_xml()
+
+ # 使用 minidom 美化輸出 (Pretty print XML using minidom)
+ success_xml = parseString(success_xml_str).toprettyxml()
+ failure_xml = parseString(failure_xml_str).toprettyxml()
+
lock = Lock()
+ success_path = f"{xml_file_name}_success.xml"
+ failure_path = f"{xml_file_name}_failure.xml"
+
try:
- lock.acquire()
- with open(xml_file_name + "_failure.xml", "w+") as file_to_write:
- file_to_write.write(failure_xml)
- except Exception as error:
- print(repr(error), file=sys.stderr)
- finally:
- lock.release()
- try:
- lock.acquire()
- with open(xml_file_name + "_success.xml", "w+") as file_to_write:
- file_to_write.write(success_xml)
+ with lock: # 使用 with 確保自動 acquire/release
+ with open(failure_path, "w+", encoding="utf-8") as file_to_write:
+ file_to_write.write(failure_xml)
+
+ with open(success_path, "w+", encoding="utf-8") as file_to_write:
+ file_to_write.write(success_xml)
+
+ return success_path, failure_path
+
except Exception as error:
print(repr(error), file=sys.stderr)
- finally:
- lock.release()
+ return "", ""
\ No newline at end of file
diff --git a/je_load_density/utils/get_data_strcture/get_api_data.py b/je_load_density/utils/get_data_strcture/get_api_data.py
index db68bf9..c8b3d5f 100644
--- a/je_load_density/utils/get_data_strcture/get_api_data.py
+++ b/je_load_density/utils/get_data_strcture/get_api_data.py
@@ -1,34 +1,59 @@
+import requests
import requests.exceptions
+from typing import Union, Dict
-def get_api_response_data(response: requests.Response,
- start_time: [str, float, int],
- end_time: [str, float, int]) -> dict:
+def get_api_response_data(
+ response: requests.Response,
+ start_time: Union[str, float, int],
+ end_time: Union[str, float, int]
+) -> Dict[str, Union[str, int, dict, bytes]]:
"""
- use requests response to create data dict
- :param response: requests response
- :param start_time: test start time
- :param end_time: test end time
- :return: data dict include [status_code, text, content, headers, history, encoding, cookies,
- elapsed, request_time_sec, request_method, request_url, request_body, start_time, end_time]
+ 使用 requests.Response 建立測試資料字典
+ Create a data dictionary from requests.Response
+
+ :param response: requests response 物件 (requests response object)
+ :param start_time: 測試開始時間 (test start time)
+ :param end_time: 測試結束時間 (test end time)
+ :return: 包含以下欄位的字典 (dictionary including):
+ - status_code
+ - text
+ - content
+ - headers
+ - encoding
+ - cookies
+ - elapsed
+ - history
+ - request_method
+ - request_url
+ - request_body
+ - start_time
+ - end_time
+ - json (if status_code == 200 and response.json() is valid)
"""
- response_data = {
+ response_data: Dict[str, Union[str, int, dict, bytes]] = {
"status_code": response.status_code,
"text": response.text,
"content": response.content,
- "headers": response.headers,
+ "headers": dict(response.headers),
"encoding": response.encoding,
+ "cookies": response.cookies.get_dict(),
+ "elapsed": response.elapsed.total_seconds() if response.elapsed else None,
+ "history": [r.url for r in response.history] if response.history else [],
"request_method": response.request.method,
"request_url": response.request.url,
"request_body": response.request.body,
"start_time": start_time,
- "end_time": end_time
+ "end_time": end_time,
}
+
+ # 嘗試解析 JSON (Try parsing JSON)
try:
- if response_data.get("status_code") == 200:
- response_data.update({"json": response.json()})
+ if response.status_code == 200:
+ response_data["json"] = response.json()
else:
- response_data.update({"json": None})
- except requests.exceptions.JSONDecodeError:
- response_data.update({"json": None})
- return response_data
+ response_data["json"] = None
+ except (requests.exceptions.JSONDecodeError, ValueError):
+ response_data["json"] = None
+
+ return response_data
\ No newline at end of file
diff --git a/je_load_density/utils/json/json_file/json_file.py b/je_load_density/utils/json/json_file/json_file.py
index 146c1e2..a71c678 100644
--- a/je_load_density/utils/json/json_file/json_file.py
+++ b/je_load_density/utils/json/json_file/json_file.py
@@ -1,42 +1,49 @@
import json
from pathlib import Path
from threading import Lock
+from typing import Any, Union
from je_load_density.utils.exception.exceptions import LoadDensityTestJsonException
-from je_load_density.utils.exception.exception_tags import cant_find_json_error
-from je_load_density.utils.exception.exception_tags import cant_save_json_error
+from je_load_density.utils.exception.exception_tags import cant_find_json_error, cant_save_json_error
-def read_action_json(json_file_path: str):
+def read_action_json(json_file_path: str) -> Union[dict, list]:
"""
- read json include actions
- :param json_file_path json file's path to read
+ 讀取 JSON 檔案並回傳內容
+ Read JSON file and return its content
+
+ :param json_file_path: JSON 檔案路徑 (path to JSON file)
+ :return: JSON 內容 (dict or list)
+ :raises LoadDensityTestJsonException: 當檔案不存在或無法讀取時 (if file not found or cannot be read)
"""
- _lock = Lock()
+ lock = Lock()
try:
- _lock.acquire()
- file_path = Path(json_file_path)
- if file_path.exists() and file_path.is_file():
- with open(json_file_path) as read_file:
- return json.load(read_file)
- except LoadDensityTestJsonException:
- raise LoadDensityTestJsonException(cant_find_json_error)
- finally:
- _lock.release()
+ with lock:
+ file_path = Path(json_file_path)
+ if file_path.exists() and file_path.is_file():
+ with open(json_file_path, "r", encoding="utf-8") as read_file:
+ return json.load(read_file)
+ else:
+ raise LoadDensityTestJsonException(cant_find_json_error)
+ except Exception as error:
+ # 捕捉所有錯誤並轉換成自訂例外
+ # Catch all errors and raise custom exception
+ raise LoadDensityTestJsonException(f"{cant_find_json_error}: {error}")
-def write_action_json(json_save_path: str, action_json: list):
+def write_action_json(json_save_path: str, action_json: Union[dict, list]) -> None:
"""
- write json file
- :param json_save_path json save path
- :param action_json the json str include action to write
+ 將資料寫入 JSON 檔案
+ Write data into JSON file
+
+ :param json_save_path: JSON 檔案儲存路徑 (path to save JSON file)
+ :param action_json: 要寫入的資料 (data to write, dict or list)
+ :raises LoadDensityTestJsonException: 當檔案無法寫入時 (if file cannot be saved)
"""
- _lock = Lock()
+ lock = Lock()
try:
- _lock.acquire()
- with open(json_save_path, "w+") as file_to_write:
- file_to_write.write(json.dumps(action_json, indent=4))
- except LoadDensityTestJsonException:
- raise LoadDensityTestJsonException(cant_save_json_error)
- finally:
- _lock.release()
+ with lock:
+ with open(json_save_path, "w+", encoding="utf-8") as file_to_write:
+ json.dump(action_json, file_to_write, indent=4, ensure_ascii=False)
+ except Exception as error:
+ raise LoadDensityTestJsonException(f"{cant_save_json_error}: {error}")
\ No newline at end of file
diff --git a/je_load_density/utils/logging/loggin_instance.py b/je_load_density/utils/logging/loggin_instance.py
index dfbbde8..23feede 100644
--- a/je_load_density/utils/logging/loggin_instance.py
+++ b/je_load_density/utils/logging/loggin_instance.py
@@ -1,15 +1,58 @@
import logging
import sys
+from logging.handlers import RotatingFileHandler
-load_density_logger = logging.getLogger("LoadDensity")
-load_density_logger.setLevel(logging.INFO)
-formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)s | %(message)s')
-# Stream handler
-stream_handler = logging.StreamHandler(stream=sys.stderr)
-stream_handler.setFormatter(formatter)
-stream_handler.setLevel(logging.WARNING)
-load_density_logger.addHandler(stream_handler)
-# File handler
-file_handler = logging.FileHandler(filename="LoadDensity.log", mode="w")
-file_handler.setFormatter(formatter)
-load_density_logger.addHandler(file_handler)
+
+class LoadDensityLogger:
+ """
+ 封裝日誌系統
+ Encapsulated logging system with rotating file handler
+ """
+
+ def __init__(self,
+ logger_name: str = "LoadDensity",
+ log_file: str = "LoadDensity.log",
+ max_bytes: int = 1024 * 1024 * 1024, # 1GB
+ backup_count: int = 5):
+ """
+ 初始化 Logger
+ Initialize logger
+
+ :param logger_name: Logger 名稱 (Logger name)
+ :param log_file: 日誌檔案名稱 (Log file name)
+ :param max_bytes: 單一檔案最大大小 (Max file size in bytes)
+ :param backup_count: 保留檔案數量 (Number of backup files)
+ """
+ self.logger = logging.getLogger(logger_name)
+ self.logger.setLevel(logging.INFO)
+
+ formatter = logging.Formatter(
+ '%(asctime)s | %(name)s | %(levelname)s | %(message)s'
+ )
+
+ # Stream handler (輸出到 stderr)
+ stream_handler = logging.StreamHandler(stream=sys.stderr)
+ stream_handler.setFormatter(formatter)
+ stream_handler.setLevel(logging.WARNING)
+
+ # Rotating file handler (檔案大小限制 + 輪替)
+ file_handler = RotatingFileHandler(
+ filename=log_file,
+ mode="a",
+ maxBytes=max_bytes,
+ backupCount=backup_count,
+ encoding="utf-8"
+ )
+ file_handler.setFormatter(formatter)
+ file_handler.setLevel(logging.INFO)
+
+ # 加入 handlers
+ self.logger.addHandler(stream_handler)
+ self.logger.addHandler(file_handler)
+
+ def get_logger(self) -> logging.Logger:
+ """取得 logger 實例 (Get logger instance)"""
+ return self.logger
+
+
+load_density_logger = LoadDensityLogger().get_logger()
diff --git a/je_load_density/utils/package_manager/package_manager_class.py b/je_load_density/utils/package_manager/package_manager_class.py
index a9a1ee4..a461774 100644
--- a/je_load_density/utils/package_manager/package_manager_class.py
+++ b/je_load_density/utils/package_manager/package_manager_class.py
@@ -2,35 +2,61 @@
from importlib.util import find_spec
from inspect import getmembers, isfunction
from sys import stderr
+from typing import Optional, Any
-class PackageManager(object):
+class PackageManager:
+ """
+ 套件管理器
+ Package Manager
- def __init__(self):
- self.installed_package_dict = {
- }
- self.executor = None
+ 用於動態載入套件並將其函式加入到 Executor 的事件字典。
+ Used to dynamically load packages and register their functions into an Executor.
+ """
- def check_package(self, package: str):
- if self.installed_package_dict.get(package, None) is None:
+ def __init__(self) -> None:
+ # 已載入的套件字典 (Dictionary of loaded packages)
+ self.installed_package_dict: dict[str, Any] = {}
+ # Executor 參考 (Reference to Executor instance)
+ self.executor: Optional[Any] = None
+
+ def load_package_if_available(self, package: str) -> Optional[Any]:
+ """
+ 嘗試載入套件 (Try to load a package)
+
+ :param package: 套件名稱 (Package name)
+ :return: 套件模組或 None (Loaded module or None)
+ """
+ if package not in self.installed_package_dict:
found_spec = find_spec(package)
if found_spec is not None:
try:
installed_package = import_module(found_spec.name)
- self.installed_package_dict.update({found_spec.name: installed_package})
+ self.installed_package_dict[found_spec.name] = installed_package
except ModuleNotFoundError as error:
print(repr(error), file=stderr)
- return self.installed_package_dict.get(package, None)
+ return None
+ else:
+ return None
+ return self.installed_package_dict.get(package)
+
+ def add_package_to_executor(self, package: str) -> None:
+ """
+ 將套件的所有函式加入 Executor 的事件字典
+ Add all functions from a package into the Executor's event dictionary
- def add_package_to_executor(self, package):
- installed_package = self.check_package(package)
+ :param package: 套件名稱 (Package name)
+ """
+ installed_package = self.load_package_if_available(package)
if installed_package is not None and self.executor is not None:
- for function in getmembers(installed_package, isfunction):
- self.executor.event_dict.update({str(function): function})
+ for name, function in getmembers(installed_package, isfunction):
+ self.executor.event_dict[name] = function
elif installed_package is None:
print(repr(ModuleNotFoundError(f"Can't find package {package}")), file=stderr)
else:
- print(f"Executor error {self.executor}", file=stderr)
+ print(f"Executor error: {self.executor}", file=stderr)
-package_manager = PackageManager()
+# 建立全域 PackageManager 實例
+# Create global PackageManager instance
+package_manager = PackageManager()
\ No newline at end of file
diff --git a/je_load_density/utils/project/create_project_structure.py b/je_load_density/utils/project/create_project_structure.py
index 94cb525..efa35f6 100644
--- a/je_load_density/utils/project/create_project_structure.py
+++ b/je_load_density/utils/project/create_project_structure.py
@@ -1,58 +1,73 @@
from os import getcwd
from pathlib import Path
from threading import Lock
+from typing import Optional
from je_load_density.utils.json.json_file.json_file import write_action_json
-from je_load_density.utils.project.template.template_executor import executor_template_1, \
- executor_template_2
-from je_load_density.utils.project.template.template_keyword import template_keyword_1, \
- template_keyword_2
+from je_load_density.utils.project.template.template_executor import executor_template_1, executor_template_2
+from je_load_density.utils.project.template.template_keyword import template_keyword_1, template_keyword_2
def create_dir(dir_name: str) -> None:
"""
- :param dir_name: create dir use dir name
- :return: None
+ 建立目錄
+ Create directory
+
+ :param dir_name: 要建立的目錄名稱 (Directory name to create)
"""
- Path(dir_name).mkdir(
- parents=True,
- exist_ok=True
- )
+ Path(dir_name).mkdir(parents=True, exist_ok=True)
+
+def create_template(parent_name: str, project_path: Optional[str] = None) -> None:
+ """
+ 建立模板檔案 (Create template files)
-def create_template(parent_name: str, project_path: str = None) -> None:
+ :param parent_name: 專案主目錄名稱 (Project parent folder name)
+ :param project_path: 專案路徑 (Project path), 預設為當前工作目錄 (default: current working directory)
+ """
if project_path is None:
project_path = getcwd()
- keyword_dir_path = Path(project_path + "/" + parent_name + "/keyword")
- executor_dir_path = Path(project_path + "/" + parent_name + "/executor")
- lock = Lock()
+
+ project_root = Path(project_path) / parent_name
+ keyword_dir_path = project_root / "keyword"
+ executor_dir_path = project_root / "executor"
+
+ # 建立 keyword JSON 檔案
if keyword_dir_path.exists() and keyword_dir_path.is_dir():
- write_action_json(project_path + "/" + parent_name + "/keyword/keyword1.json", template_keyword_1)
- write_action_json(project_path + "/" + parent_name + "/keyword/keyword2.json", template_keyword_2)
- if executor_dir_path.exists() and keyword_dir_path.is_dir():
- lock.acquire()
- try:
- with open(project_path + "/" + parent_name + "/executor/executor_one_file.py", "w+") as file:
+ write_action_json(str(keyword_dir_path) + "keyword1.json", template_keyword_1)
+ write_action_json(str(keyword_dir_path) + "keyword2.json", template_keyword_2)
+
+ # 建立 executor Python 檔案
+ if executor_dir_path.exists() and executor_dir_path.is_dir():
+ lock = Lock()
+ with lock:
+ with open(executor_dir_path / "executor_one_file.py", "w+", encoding="utf-8") as file:
file.write(
executor_template_1.replace(
"{temp}",
- project_path + "/" + parent_name + "/keyword/keyword1.json"
+ str(keyword_dir_path / "keyword1.json")
)
)
- with open(project_path + "/" + parent_name + "/executor/executor_folder.py", "w+") as file:
+ with open(executor_dir_path / "executor_folder.py", "w+", encoding="utf-8") as file:
file.write(
executor_template_2.replace(
"{temp}",
- project_path + "/" + parent_name + "/keyword"
+ str(keyword_dir_path)
)
)
- finally:
- lock.release()
-def create_project_dir(project_path: str = None, parent_name: str = "LoadDensity") -> None:
+def create_project_dir(project_path: Optional[str] = None, parent_name: str = "LoadDensity") -> None:
+ """
+ 建立專案目錄結構 (Create project directory structure)
+
+ :param project_path: 專案路徑 (Project path), 預設為當前工作目錄 (default: current working directory)
+ :param parent_name: 專案主目錄名稱 (Project parent folder name)
+ """
if project_path is None:
project_path = getcwd()
- create_dir(project_path + "/" + parent_name + "/keyword")
- create_dir(project_path + "/" + parent_name + "/executor")
- create_template(parent_name)
+
+ project_root = Path(project_path) / parent_name
+ create_dir(str(project_root) + "keyword")
+ create_dir(str(project_root) + "executor")
+ create_template(parent_name, project_path)
\ No newline at end of file
diff --git a/je_load_density/utils/socket_server/load_density_socket_server.py b/je_load_density/utils/socket_server/load_density_socket_server.py
index 3f412a8..16a3a5e 100644
--- a/je_load_density/utils/socket_server/load_density_socket_server.py
+++ b/je_load_density/utils/socket_server/load_density_socket_server.py
@@ -1,6 +1,7 @@
import json
import sys
from socket import AF_INET, SOCK_STREAM
+from typing import Any
import gevent
from gevent import monkey
@@ -9,53 +10,86 @@
from je_load_density.utils.executor.action_executor import execute_action
-class TCPServer(object):
+class TCPServer:
+ """
+ 基於 gevent 的 TCP 伺服器
+ TCP server based on gevent
- def __init__(self):
+ - 接收 JSON 指令並執行對應動作
+ - 支援 "quit_server" 指令來關閉伺服器
+ """
+
+ def __init__(self) -> None:
self.close_flag: bool = False
self.server: socket.socket = socket.socket(AF_INET, SOCK_STREAM)
- def socket_server(self, host: str, port: int):
+ def socket_server(self, host: str, port: int) -> None:
+ """
+ 啟動伺服器
+ Start the TCP server
+
+ :param host: 伺服器主機位址 (Server host)
+ :param port: 伺服器埠號 (Server port)
+ """
self.server.bind((host, port))
self.server.listen()
+ print(f"Server started on {host}:{port}", flush=True)
+
while not self.close_flag:
- connection = self.server.accept()[0]
- gevent.spawn(self.handle, connection)
- sys.exit(0)
-
- def handle(self, connection):
- connection_data = connection.recv(8192)
- command_string = str(connection_data.strip(), encoding="utf-8")
- print("command is: " + command_string, flush=True)
- if command_string == "quit_server":
- connection.close()
- self.close_flag = True
- self.server.close()
- print("Now quit server", flush=True)
- else:
try:
- execute_str = json.loads(command_string)
- if execute_str is not None:
- for execute_return in execute_action(execute_str).values():
- connection.send(str(execute_return).encode("utf-8"))
- connection.send("\n".encode("utf-8"))
- connection.send("Return_Data_Over_JE".encode("utf-8"))
- connection.send("\n".encode("utf-8"))
+ connection, _ = self.server.accept()
+ gevent.spawn(self.handle, connection)
except Exception as error:
+ print(f"Server error: {error}", file=sys.stderr)
+ break
+
+ self.server.close()
+ print("Server shutdown complete", flush=True)
+
+ def handle(self, connection: socket.socket) -> None:
+ """
+ 處理單一連線
+ Handle a single connection
+
+ :param connection: 客戶端連線 (Client connection)
+ """
+ try:
+ connection_data = connection.recv(8192)
+ if not connection_data:
+ return
+
+ command_string = connection_data.strip().decode("utf-8")
+ print(f"Command received: {command_string}", flush=True)
+
+ if command_string == "quit_server":
+ self.close_flag = True
+ connection.send(b"Server shutting down\n")
+ print("Now quit server", flush=True)
+ else:
try:
- connection.send(str(error).encode("utf-8"))
- connection.send("\n".encode("utf-8"))
- connection.send("Return_Data_Over_JE".encode("utf-8"))
- connection.send("\n".encode("utf-8"))
+ execute_str: Any = json.loads(command_string)
+ if execute_str is not None:
+ for execute_return in execute_action(execute_str).values():
+ connection.send(f"{execute_return}\n".encode("utf-8"))
+ connection.send(b"Return_Data_Over_JE\n")
except Exception as error:
- print(repr(error))
- sys.exit(1)
- finally:
- connection.close()
+ connection.send(f"Error: {error}\n".encode("utf-8"))
+ connection.send(b"Return_Data_Over_JE\n")
+
+ finally:
+ connection.close()
+
+def start_load_density_socket_server(host: str = "localhost", port: int = 9940) -> TCPServer:
+ """
+ 啟動 LoadDensity TCP 伺服器
+ Start LoadDensity TCP server
-def start_load_density_socket_server(host: str = "localhost", port: int = 9940):
+ :param host: 主機位址 (Host)
+ :param port: 埠號 (Port)
+ :return: TCPServer 實例 (TCPServer instance)
+ """
monkey.patch_all()
server = TCPServer()
server.socket_server(host, port)
- return server
+ return server
\ No newline at end of file
diff --git a/je_load_density/utils/test_record/test_record_class.py b/je_load_density/utils/test_record/test_record_class.py
index 751cf87..6427873 100644
--- a/je_load_density/utils/test_record/test_record_class.py
+++ b/je_load_density/utils/test_record/test_record_class.py
@@ -1,15 +1,30 @@
-class TestRecord(object):
+from typing import List, Dict
+
+
+class TestRecord:
"""
- data class to record success and failure test
+ 測試紀錄類別
+ Test record class
+
+ 用來保存成功與失敗的測試紀錄。
+ Used to store success and failure test records.
"""
- def __init__(self):
- self.test_record_list = list()
- self.error_record_list = list()
+ def __init__(self) -> None:
+ # 成功測試紀錄 (Success test records)
+ self.test_record_list: List[Dict] = []
+ # 失敗測試紀錄 (Failure test records)
+ self.error_record_list: List[Dict] = []
- def clean_record(self):
- self.test_record_list = list()
- self.error_record_list = list()
+ def clear_records(self) -> None:
+ """
+ 清除所有測試紀錄
+ Clear all test records
+ """
+ self.test_record_list.clear()
+ self.error_record_list.clear()
-test_record_instance = TestRecord()
+# 建立全域測試紀錄實例
+# Create global test record instance
+test_record_instance = TestRecord()
\ No newline at end of file
diff --git a/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py b/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py
index fcb33ab..4c4a3af 100644
--- a/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py
+++ b/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py
@@ -1,61 +1,82 @@
from collections import defaultdict
from xml.etree import ElementTree
+from typing import Union, Dict, Any
-def elements_tree_to_dict(elements_tree):
+def elements_tree_to_dict(elements_tree: ElementTree.Element) -> Dict[str, Any]:
"""
- :param elements_tree: full xml string
- :return: xml str to dict
+ 將 XML ElementTree 轉換為字典
+ Convert XML ElementTree to dictionary
+
+ :param elements_tree: XML ElementTree 元素 (XML ElementTree element)
+ :return: 對應的字典結構 (Dictionary representation)
"""
- elements_dict: dict = {elements_tree.tag: {} if elements_tree.attrib else None}
- children: list = list(elements_tree)
+ elements_dict: Dict[str, Any] = {elements_tree.tag: {} if elements_tree.attrib else None}
+ children = list(elements_tree)
+
+ # 遞迴處理子節點 (Recursively process children)
if children:
default_dict = defaultdict(list)
for dc in map(elements_tree_to_dict, children):
for key, value in dc.items():
default_dict[key].append(value)
- elements_dict: dict = {
- elements_tree.tag: {key: value[0] if len(value) == 1 else value for key, value in default_dict.items()}}
+ elements_dict[elements_tree.tag] = {
+ key: value[0] if len(value) == 1 else value
+ for key, value in default_dict.items()
+ }
+
+ # 加入屬性 (Add attributes)
if elements_tree.attrib:
- elements_dict[elements_tree.tag].update(('@' + key, value) for key, value in elements_tree.attrib.items())
+ elements_dict[elements_tree.tag].update(
+ {f"@{key}": value for key, value in elements_tree.attrib.items()}
+ )
+
+ # 加入文字內容 (Add text content)
if elements_tree.text:
text = elements_tree.text.strip()
if children or elements_tree.attrib:
if text:
- elements_dict[elements_tree.tag]['#text'] = text
+ elements_dict[elements_tree.tag]["#text"] = text
else:
elements_dict[elements_tree.tag] = text
+
return elements_dict
-def dict_to_elements_tree(json_dict: dict):
+def dict_to_elements_tree(json_dict: Dict[str, Any]) -> str:
"""
- :param json_dict: json dict
- :return: json dict to xml string
+ 將字典轉換為 XML 字串
+ Convert dictionary to XML string
+
+ :param json_dict: JSON 格式字典 (Dictionary in JSON-like format)
+ :return: XML 字串 (XML string)
"""
- def _to_elements_tree(json_dict: dict, root):
+ def _to_elements_tree(json_dict: Any, root: ElementTree.Element) -> None:
if isinstance(json_dict, str):
root.text = json_dict
elif isinstance(json_dict, dict):
for key, value in json_dict.items():
- assert isinstance(key, str)
- if key.startswith('#'):
- assert key == '#text' and isinstance(value, str)
+ if key.startswith("#"): # 處理文字節點
+ if key != "#text" or not isinstance(value, str):
+ raise TypeError(f"Invalid text node: {key} -> {value}")
root.text = value
- elif key.startswith('@'):
- assert isinstance(value, str)
+ elif key.startswith("@"): # 處理屬性
+ if not isinstance(value, str):
+ raise TypeError(f"Invalid attribute value: {key} -> {value}")
root.set(key[1:], value)
- elif isinstance(value, list):
- for elements in value:
- _to_elements_tree(elements, ElementTree.SubElement(root, key))
- else:
+ elif isinstance(value, list): # 處理子節點清單
+ for element in value:
+ _to_elements_tree(element, ElementTree.SubElement(root, key))
+ else: # 處理單一子節點
_to_elements_tree(value, ElementTree.SubElement(root, key))
else:
- raise TypeError('invalid type: ' + str(type(json_dict)))
+ raise TypeError(f"Invalid type in dict_to_elements_tree: {type(json_dict)}")
+
+ if not isinstance(json_dict, dict) or len(json_dict) != 1:
+ raise ValueError("Input must be a dictionary with a single root element")
- assert isinstance(json_dict, dict) and len(json_dict) == 1
tag, body = next(iter(json_dict.items()))
node = ElementTree.Element(tag)
_to_elements_tree(body, node)
- return str(ElementTree.tostring(node), encoding="utf-8")
+ return ElementTree.tostring(node, encoding="utf-8").decode("utf-8")
\ No newline at end of file
diff --git a/je_load_density/utils/xml/xml_file/xml_file.py b/je_load_density/utils/xml/xml_file/xml_file.py
index 93400bc..216ace1 100644
--- a/je_load_density/utils/xml/xml_file/xml_file.py
+++ b/je_load_density/utils/xml/xml_file/xml_file.py
@@ -1,67 +1,96 @@
import xml.dom.minidom
from xml.etree import ElementTree
+from xml.etree.ElementTree import ParseError
+from typing import Optional
-from je_load_density.utils.exception.exception_tags import cant_read_xml_error
-from je_load_density.utils.exception.exception_tags import xml_type_error
-from je_load_density.utils.exception.exceptions import XMLException
-from je_load_density.utils.exception.exceptions import XMLTypeException
+from je_load_density.utils.exception.exception_tags import cant_read_xml_error, xml_type_error
+from je_load_density.utils.exception.exceptions import XMLException, XMLTypeException
-def reformat_xml_file(xml_string: str):
+def reformat_xml_file(xml_string: str) -> str:
+ """
+ 將 XML 字串重新格式化為漂亮的排版
+ Reformat XML string into pretty-printed format
+
+ :param xml_string: 原始 XML 字串 (Raw XML string)
+ :return: 格式化後的 XML 字串 (Pretty-printed XML string)
+ """
dom = xml.dom.minidom.parseString(xml_string)
- return dom.toprettyxml()
+ return dom.toprettyxml(indent=" ")
+
+class XMLParser:
+ """
+ XML 解析器
+ XML Parser
-class XMLParser(object):
+ 支援從字串或檔案解析 XML,並能將 XML 寫入檔案。
+ Supports parsing XML from string or file, and writing XML to file.
+ """
- def __init__(self, xml_string: str, xml_type: str = "string"):
+ def __init__(self, xml_string: str, xml_type: str = "string") -> None:
"""
- :param xml_string: full xml string
- :param xml_type: file or string
+ 初始化 XMLParser
+ Initialize XMLParser
+
+ :param xml_string: XML 字串或檔案路徑 (XML string or file path)
+ :param xml_type: "string" 或 "file" (Parse from string or file)
"""
- self.element_tree = ElementTree
- self.tree = None
- self.xml_root = None
- self.xml_from_type = "string"
- self.xml_string = xml_string.strip()
+ self.tree: Optional[ElementTree.ElementTree] = None
+ self.xml_root: Optional[ElementTree.Element] = None
+ self.xml_from_type: str = "string"
+ self.xml_string: str = xml_string.strip()
+
xml_type = xml_type.lower()
if xml_type not in ["file", "string"]:
raise XMLTypeException(xml_type_error)
+
if xml_type == "string":
self.xml_parser_from_string()
else:
self.xml_parser_from_file()
- def xml_parser_from_string(self, **kwargs):
+ def xml_parser_from_string(self, **kwargs) -> ElementTree.Element:
"""
- :param kwargs: any another param
- :return: xml root element tree
+ 從字串解析 XML
+ Parse XML from string
+
+ :param kwargs: 額外參數 (extra parameters)
+ :return: XML 根節點 (XML root element)
"""
try:
self.xml_root = ElementTree.fromstring(self.xml_string, **kwargs)
- except XMLException:
- raise XMLException(cant_read_xml_error)
+ except ParseError as error:
+ raise XMLException(f"{cant_read_xml_error}: {error}")
return self.xml_root
- def xml_parser_from_file(self, **kwargs):
+ def xml_parser_from_file(self, **kwargs) -> ElementTree.Element:
"""
- :param kwargs: any another param
- :return: xml root element tree
+ 從檔案解析 XML
+ Parse XML from file
+
+ :param kwargs: 額外參數 (extra parameters)
+ :return: XML 根節點 (XML root element)
"""
try:
self.tree = ElementTree.parse(self.xml_string, **kwargs)
- except XMLException:
- raise XMLException(cant_read_xml_error)
+ except (ParseError, OSError) as error:
+ raise XMLException(f"{cant_read_xml_error}: {error}")
self.xml_root = self.tree.getroot()
self.xml_from_type = "file"
return self.xml_root
- def write_xml(self, write_xml_filename: str, write_content: str):
+ def write_xml(self, write_xml_filename: str, write_content: str) -> None:
"""
- :param write_xml_filename: xml file name
- :param write_content: content to write
+ 將 XML 字串寫入檔案
+ Write XML string into file
+
+ :param write_xml_filename: 輸出檔案名稱 (Output file name)
+ :param write_content: XML 字串內容 (XML string content)
"""
- write_content = write_content.strip()
- content = self.element_tree.fromstring(write_content)
- tree = self.element_tree.ElementTree(content)
- tree.write(write_xml_filename, encoding="utf-8")
+ try:
+ content = ElementTree.fromstring(write_content.strip())
+ tree = ElementTree.ElementTree(content)
+ tree.write(write_xml_filename, encoding="utf-8", xml_declaration=True)
+ except ParseError as error:
+ raise XMLException(f"{cant_read_xml_error}: {error}")
\ No newline at end of file
diff --git a/je_load_density/wrapper/create_locust_env/create_locust_env.py b/je_load_density/wrapper/create_locust_env/create_locust_env.py
index 601f580..abe59a2 100644
--- a/je_load_density/wrapper/create_locust_env/create_locust_env.py
+++ b/je_load_density/wrapper/create_locust_env/create_locust_env.py
@@ -1,3 +1,5 @@
+from typing import List
+
import gevent
from locust import User
from locust import events
@@ -10,7 +12,7 @@
setup_logging("INFO", None)
-def prepare_env(user_class: [User], user_count: int = 50, spawn_rate: int = 10, test_time: int = 60,
+def prepare_env(user_class: List[User], user_count: int = 50, spawn_rate: int = 10, test_time: int = 60,
web_ui_dict: dict = None,
**kwargs):
"""
@@ -37,7 +39,7 @@ def prepare_env(user_class: [User], user_count: int = 50, spawn_rate: int = 10,
env.web_ui.stop()
-def create_env(user_class: [User], another_event: events = events):
+def create_env(user_class: List[User], another_event: events = events):
"""
:param another_event: you can use your locust event setting but don't change locust request event
:param user_class: locust user class
diff --git a/je_load_density/wrapper/event/request_hook.py b/je_load_density/wrapper/event/request_hook.py
index bb0c48b..3dab409 100644
--- a/je_load_density/wrapper/event/request_hook.py
+++ b/je_load_density/wrapper/event/request_hook.py
@@ -4,37 +4,45 @@
@events.request.add_listener
def request_hook(
- start_time,
- url,
- request_type,
- name,
- context,
- response,
- exception,
- response_length,
- response_time,
- **kwargs
+ start_time,
+ url,
+ request_type,
+ name,
+ context,
+ response,
+ exception,
+ response_length,
+ response_time,
+ **kwargs
):
+ """
+ Locust request hook
+ 將每個 request 的結果紀錄到 test_record_instance
+ """
+
if exception is None:
+ # 成功紀錄 (Success record)
test_record_instance.test_record_list.append(
{
"Method": str(request_type),
"test_url": str(url),
"name": str(name),
+ "status_code": str(response.status_code),
"text": str(response.text),
"content": str(response.content),
"headers": str(response.headers),
- "status_code": str(response.status_code),
- "error": str(exception)
+ "error": None, # 成功時 error 為 None
}
)
else:
+ # 失敗紀錄 (Failure record)
test_record_instance.error_record_list.append(
{
"Method": str(request_type),
"test_url": str(url),
"name": str(name),
- "status_code": str(response.status_code),
- "text": str(response.text)
+ "status_code": str(response.status_code) if response else None,
+ "text": str(response.text) if response else None,
+ "error": str(exception), # 失敗時紀錄 exception
}
- )
+ )
\ No newline at end of file
diff --git a/je_load_density/wrapper/proxy/proxy_user.py b/je_load_density/wrapper/proxy/proxy_user.py
index 40c7a84..6f1e2e3 100644
--- a/je_load_density/wrapper/proxy/proxy_user.py
+++ b/je_load_density/wrapper/proxy/proxy_user.py
@@ -1,17 +1,45 @@
+from typing import Dict, Any
from je_load_density.wrapper.proxy.user.fast_http_user_proxy import ProxyFastHTTPUser
from je_load_density.wrapper.proxy.user.http_user_proxy import ProxyHTTPUser
-class LocustUserProxy(object):
+class LocustUserProxy:
+ """
+ Locust 使用者代理容器
+ Locust User Proxy Container
- def __init__(self):
- self.user_dict = dict()
- self.user_dict.update(
- {
- "fast_http_user": ProxyFastHTTPUser(),
- "http_user": ProxyHTTPUser()
- }
- )
+ 用來保存並管理 FastHTTPUser 與 HTTPUser 的代理。
+ Used to store and manage FastHTTPUser and HTTPUser proxies.
+ """
+ def __init__(self) -> None:
+ # 使用者代理字典 (User proxy dictionary)
+ self.user_dict: Dict[str, Any] = {
+ "fast_http_user": ProxyFastHTTPUser(),
+ "http_user": ProxyHTTPUser(),
+ }
-locust_wrapper_proxy = LocustUserProxy()
+ def get_user(self, user_type: str) -> Any:
+ """
+ 取得指定類型的使用者代理
+ Get specified user proxy
+
+ :param user_type: "fast_http_user" 或 "http_user"
+ :return: 對應的使用者代理 (Corresponding user proxy)
+ """
+ return self.user_dict.get(user_type)
+
+ def set_user(self, user_type: str, user_instance: Any) -> None:
+ """
+ 設定或替換使用者代理
+ Set or replace user proxy
+
+ :param user_type: 使用者類型 (User type key)
+ :param user_instance: 使用者代理實例 (User proxy instance)
+ """
+ self.user_dict[user_type] = user_instance
+
+
+# 建立全域代理容器實例
+# Create global proxy container instance
+locust_wrapper_proxy = LocustUserProxy()
\ No newline at end of file
diff --git a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py b/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py
index ff5ad38..d645a2e 100644
--- a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py
+++ b/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py
@@ -1,9 +1,28 @@
-class ProxyFastHTTPUser(object):
+from typing import Dict, Any, List, Optional
- def __init__(self):
- self.user_detail_dict = None
- self.tasks = None
- def setting(self, user_detail_dict: dict, tasks):
+class ProxyFastHTTPUser:
+ """
+ 代理使用者類別
+ Proxy Fast HTTP User class
+
+ 用來保存使用者細節與任務設定。
+ Used to store user details and tasks configuration.
+ """
+
+ def __init__(self) -> None:
+ # 使用者細節 (User details)
+ self.user_detail_dict: Optional[Dict[str, Any]] = None
+ # 任務列表 (Tasks list)
+ self.tasks: Optional[List[Any]] = None
+
+ def configure(self, user_detail_dict: Dict[str, Any], tasks: List[Any]) -> None:
+ """
+ 設定使用者細節與任務
+ Configure user details and tasks
+
+ :param user_detail_dict: 使用者細節字典 (User details dictionary)
+ :param tasks: 任務列表 (List of tasks)
+ """
self.user_detail_dict = user_detail_dict
- self.tasks = tasks
+ self.tasks = tasks
\ No newline at end of file
diff --git a/je_load_density/wrapper/proxy/user/http_user_proxy.py b/je_load_density/wrapper/proxy/user/http_user_proxy.py
index fe26fd1..2caadb2 100644
--- a/je_load_density/wrapper/proxy/user/http_user_proxy.py
+++ b/je_load_density/wrapper/proxy/user/http_user_proxy.py
@@ -1,9 +1,40 @@
-class ProxyHTTPUser(object):
+from typing import Dict, Any, Callable, List, Optional
- def __init__(self):
- self.user_detail_dict = None
- self.tasks = None
- def setting(self, user_detail_dict: dict, tasks: dict):
+class ProxyHTTPUser:
+ """
+ 代理 HTTP 使用者類別
+ Proxy HTTP User class
+
+ 用來保存使用者細節與任務設定。
+ Used to store user details and tasks configuration.
+ """
+
+ def __init__(self) -> None:
+ # 使用者細節 (User details)
+ self.user_detail_dict: Optional[Dict[str, Any]] = None
+ # 任務列表 (Tasks list, 可是函式或其他可呼叫物件)
+ self.tasks: Optional[List[Callable]] = None
+
+ def configure(self, user_detail_dict: Dict[str, Any], tasks: List[Callable]) -> None:
+ """
+ 設定使用者細節與任務
+ Configure user details and tasks
+
+ :param user_detail_dict: 使用者細節字典 (User details dictionary)
+ :param tasks: 任務列表 (List of tasks, functions or callables)
+ """
self.user_detail_dict = user_detail_dict
self.tasks = tasks
+
+ def run_tasks(self) -> None:
+ """
+ 執行所有任務
+ Run all tasks
+ """
+ if not self.tasks:
+ print("No tasks configured.")
+ return
+ for task in self.tasks:
+ if callable(task):
+ task(self.user_detail_dict)
\ No newline at end of file
diff --git a/je_load_density/wrapper/start_wrapper/start_test.py b/je_load_density/wrapper/start_wrapper/start_test.py
index c2423b0..2eeb3e6 100644
--- a/je_load_density/wrapper/start_wrapper/start_test.py
+++ b/je_load_density/wrapper/start_wrapper/start_test.py
@@ -1,3 +1,4 @@
+from typing import Dict, Any, Optional
from je_load_density.utils.logging.loggin_instance import load_density_logger
from je_load_density.wrapper.create_locust_env.create_locust_env import prepare_env
from je_load_density.wrapper.user_template.fast_http_user_template import FastHttpUserWrapper, set_wrapper_fasthttp_user
@@ -5,37 +6,63 @@
def start_test(
- user_detail_dict: dict,
- user_count: int = 50, spawn_rate: int = 10, test_time: int = 60,
- web_ui_dict: dict = None,
- **kwargs
-):
+ user_detail_dict: Dict[str, Any],
+ user_count: int = 50,
+ spawn_rate: int = 10,
+ test_time: Optional[int] = 60,
+ web_ui_dict: Optional[Dict[str, Any]] = None,
+ **kwargs
+) -> Dict[str, Any]:
"""
- :param user_detail_dict: dict use to create user
- :param user_count: how many user we want to spawn
- :param spawn_rate: one time will spawn how many user
- :param test_time: total test run time
- :param web_ui_dict: web ui dict include host and port like {"host": "127.0.0.1", "port": 8089}
- :param kwargs: to catch unknown param
- :return: None
+ 啟動壓力測試
+ Start load test
+
+ :param user_detail_dict: 使用者設定字典 (User detail dictionary)
+ :param user_count: 使用者數量 (Number of users to spawn)
+ :param spawn_rate: 每秒生成使用者數量 (Spawn rate per second)
+ :param test_time: 測試持續時間 (Test duration in seconds)
+ :param web_ui_dict: Web UI 設定,例如 {"host": "127.0.0.1", "port": 8089}
+ :param kwargs: 其他參數 (extra parameters)
+ :return: 測試設定摘要字典 (Summary dictionary of test configuration)
"""
load_density_logger.info(
- f"start_test, user_detail_dict: {user_detail_dict}, user_count: {user_count}, "
- f"spawn_rate: {spawn_rate}, test_time: {test_time}, web_ui_dict: {web_ui_dict}, "
- f"params: {kwargs}"
+ f"start_test, user_detail_dict={user_detail_dict}, user_count={user_count}, "
+ f"spawn_rate={spawn_rate}, test_time={test_time}, web_ui_dict={web_ui_dict}, params={kwargs}"
)
+
+ # 使用者類型映射 (User type mapping)
user_dict = {
"fast_http_user": {"actually_user": FastHttpUserWrapper, "init": set_wrapper_fasthttp_user},
- "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user}
+ "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user},
}
- user = user_dict.get(user_detail_dict.get("user", "fast_http_user"))
- actually_user = user.get("actually_user", "actually_user")
- init_function = user.get("init", "init")
+
+ user_type = user_detail_dict.get("user", "fast_http_user")
+ user = user_dict.get(user_type)
+
+ if user is None:
+ raise ValueError(f"Unsupported user type: {user_type}")
+
+ actually_user = user["actually_user"]
+ init_function = user["init"]
+
+ # 初始化使用者設定 (Initialize user configuration)
init_function(user_detail_dict, **kwargs)
+ # 建立並執行測試環境 (Create and run test environment)
prepare_env(
- user_class=actually_user, user_count=user_count, spawn_rate=spawn_rate, test_time=test_time,
- web_ui_dict=web_ui_dict, **kwargs
+ user_class=actually_user,
+ user_count=user_count,
+ spawn_rate=spawn_rate,
+ test_time=test_time,
+ web_ui_dict=web_ui_dict,
+ **kwargs
)
- return str(user_detail_dict) + " " + "user_count: " + str(user_count) + " spawn_rate: " + str(
- spawn_rate) + " test_time: " + str(test_time)
+
+ # 回傳結構化結果 (Return structured result)
+ return {
+ "user_detail": user_detail_dict,
+ "user_count": user_count,
+ "spawn_rate": spawn_rate,
+ "test_time": test_time,
+ "web_ui": web_ui_dict,
+ }
\ No newline at end of file
diff --git a/je_load_density/wrapper/user_template/fast_http_user_template.py b/je_load_density/wrapper/user_template/fast_http_user_template.py
index eab3d3d..9e90614 100644
--- a/je_load_density/wrapper/user_template/fast_http_user_template.py
+++ b/je_load_density/wrapper/user_template/fast_http_user_template.py
@@ -1,24 +1,30 @@
-from locust import FastHttpUser, between
-from locust import task
-
+from typing import Dict, Any
+from locust import FastHttpUser, between, task
from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy
-def set_wrapper_fasthttp_user(user_detail_dict, **kwargs):
- locust_wrapper_proxy.user_dict.get("fast_http_user").setting(user_detail_dict, **kwargs)
+def set_wrapper_fasthttp_user(user_detail_dict: Dict[str, Any], **kwargs) -> type:
+ """
+ 設定 FastHttpUser 的代理使用者
+ Configure FastHttpUser proxy user
+ """
+ locust_wrapper_proxy.user_dict.get("fast_http_user").configure(user_detail_dict, **kwargs)
return FastHttpUserWrapper
class FastHttpUserWrapper(FastHttpUser):
"""
- locust fast http user use to test
+ Locust FastHttpUser 包裝類別
+ Locust FastHttpUser wrapper class
"""
+
host = "http://localhost"
wait_time = between(0.1, 0.2)
def __init__(self, environment):
super().__init__(environment)
- self.method = {
+ # HTTP 方法映射 (HTTP method mapping)
+ self.method: Dict[str, Any] = {
"get": self.client.get,
"post": self.client.post,
"put": self.client.put,
@@ -29,6 +35,18 @@ def __init__(self, environment):
}
@task
- def test(self):
- for test_task_method, test_task_data in locust_wrapper_proxy.user_dict.get("fast_http_user").tasks.items():
- self.method.get(str(test_task_method).lower())(test_task_data.get("request_url"))
+ def test(self) -> None:
+ """
+ 執行測試任務
+ Execute test tasks
+ """
+ proxy_user = locust_wrapper_proxy.user_dict.get("fast_http_user")
+ if not proxy_user or not proxy_user.tasks:
+ return
+
+ for test_task_method, test_task_data in proxy_user.tasks.items():
+ http_method = self.method.get(str(test_task_method).lower())
+ if http_method and isinstance(test_task_data, dict):
+ request_url = test_task_data.get("request_url")
+ if request_url:
+ http_method(request_url)
\ No newline at end of file
diff --git a/je_load_density/wrapper/user_template/http_user_template.py b/je_load_density/wrapper/user_template/http_user_template.py
index 9fec1ac..0e64999 100644
--- a/je_load_density/wrapper/user_template/http_user_template.py
+++ b/je_load_density/wrapper/user_template/http_user_template.py
@@ -1,23 +1,32 @@
+from typing import Dict, Any
+
from locust import HttpUser, task, between
from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy
-def set_wrapper_http_user(user_detail_dict: dict, **kwargs):
- locust_wrapper_proxy.user_dict.get("http_user").setting(user_detail_dict, **kwargs)
+def set_wrapper_http_user(user_detail_dict: Dict[str, Any], **kwargs) -> type:
+ """
+ 設定 HttpUser 的代理使用者
+ Configure HttpUser proxy user
+ """
+ locust_wrapper_proxy.user_dict.get("http_user").configure(user_detail_dict, **kwargs)
return HttpUserWrapper
class HttpUserWrapper(HttpUser):
"""
- locust http user use to test
+ Locust HttpUser 包裝類別
+ Locust HttpUser wrapper class
"""
+
host = "http://localhost"
wait_time = between(0.1, 0.2)
def __init__(self, environment):
super().__init__(environment)
- self.method = {
+ # HTTP 方法映射 (HTTP method mapping)
+ self.method: Dict[str, Any] = {
"get": self.client.get,
"post": self.client.post,
"put": self.client.put,
@@ -28,8 +37,18 @@ def __init__(self, environment):
}
@task
- def test(self):
- for test_task_method, test_task_data in locust_wrapper_proxy.user_dict.get("http_user").tasks.items():
- self.method.get(str(test_task_method).lower())(test_task_data.get("request_url"))
-
-
+ def test(self) -> None:
+ """
+ 執行測試任務
+ Execute test tasks
+ """
+ proxy_user = locust_wrapper_proxy.user_dict.get("http_user")
+ if not proxy_user or not proxy_user.tasks:
+ return
+
+ for test_task_method, test_task_data in proxy_user.tasks.items():
+ http_method = self.method.get(str(test_task_method).lower())
+ if http_method and isinstance(test_task_data, dict):
+ request_url = test_task_data.get("request_url")
+ if request_url:
+ http_method(request_url)
diff --git a/pyproject.toml b/pyproject.toml
index 9053aaa..04f5338 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "je_load_density"
-version = "0.0.63"
+version = "0.0.64"
authors = [
{ name = "JE-Chen", email = "jechenmailman@gmail.com" },
]
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/test/unit_test/scheduler_test/sec_cron_test.py b/test/unit_test/scheduler_test/sec_cron_test.py
deleted file mode 100644
index cf5ecd5..0000000
--- a/test/unit_test/scheduler_test/sec_cron_test.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from je_load_density import SchedulerManager
-
-
-def test_scheduler():
- print("Test Scheduler")
- scheduler.remove_blocking_job(id="test")
- scheduler.shutdown_blocking_scheduler()
-
-
-scheduler = SchedulerManager()
-scheduler.add_cron_blocking(function=test_scheduler, id="test", second="*")
-scheduler.start_block_scheduler()
diff --git a/test/unit_test/scheduler_test/sec_interval_test.py b/test/unit_test/scheduler_test/sec_interval_test.py
deleted file mode 100644
index c4c1cd9..0000000
--- a/test/unit_test/scheduler_test/sec_interval_test.py
+++ /dev/null
@@ -1,12 +0,0 @@
-from je_load_density import SchedulerManager
-
-
-def test_scheduler():
- print("Test Scheduler")
- scheduler.remove_blocking_job(id="test")
- scheduler.shutdown_blocking_scheduler()
-
-
-scheduler = SchedulerManager()
-scheduler.add_interval_blocking_secondly(function=test_scheduler, id="test")
-scheduler.start_block_scheduler()