diff --git a/README.md b/README.md index 6b39669..3a8fc9d 100644 --- a/README.md +++ b/README.md @@ -1,56 +1,35 @@ -## LoadDensity -[![Downloads](https://static.pepy.tech/badge/je-load-density)](https://pepy.tech/project/je-load-density) - -[![Codacy Badge](https://app.codacy.com/project/badge/Grade/b3f05488c16a44959cbf0ec28d4c977c)](https://www.codacy.com/gh/JE-Chen/LoadDensity/dashboard?utm_source=github.com&utm_medium=referral&utm_content=JE-Chen/LoadDensity&utm_campaign=Badge_Grade) - -[![LoadDensity Stable Python3.8](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_8.yml/badge.svg)](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_8.yml) - -[![LoadDensity Stable Python3.9](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_9.yml/badge.svg)](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_9.yml) - -[![LoadDensity Stable Python3.10](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_10.yml/badge.svg)](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_10.yml) - -[![LoadDensity Stable Python3.11](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_11.yml/badge.svg)](https://github.com/Intergration-Automation-Testing/LoadDensity/actions/workflows/stable_python3_11.yml) - -### Documentation - -[![Documentation Status](https://readthedocs.org/projects/loaddensity/badge/?version=latest)](https://loaddensity.readthedocs.io/en/latest/?badge=latest) - -[LoadDensity Doc Click Here!](https://loaddensity.readthedocs.io/en/latest/) - ---- -> Project Kanban \ -> https://github.com/orgs/Integration-Automation/projects/2/views/1 -> * Load automation. -> * Easy setup user template. -> * Load Density script. -> * Generate JSON/HTML/XML report. -> * 1 sec / thousands requests. -> * Fast spawn users. -> * Multi test on one task. -> * Specify test time. -> * OS Independent. -> * Remote automation support. -> * Project & Template support. ---- - -## install +# LoadDensity +A high‑performance load testing and automation tool. +It supports fast user spawning, flexible templates, and generates reports in multiple formats. +Designed to be cross‑platform and easy to integrate into your projects. + +- Load automation: Quickly set up and run load tests +- User templates: Simple configuration for reusable test users +- Load Density scripts: Define and execute repeatable scenarios +- Report generation: Export results in JSON, HTML, or XML +- High throughput: Thousands of requests per second +- Fast user spawning: Scale up test users instantly +- Multi‑test support: Run multiple tests on a single task +- Configurable test duration: Specify how long tests should run +- OS independent: Works across major operating systems +- Remote automation: Execute tests remotely +- Project & template support: Organize and reuse test setup + +## Installation ``` pip install je_locust_wrapper ``` -## require +## Require ``` -python 3.8 or later +python 3.9 or later ``` ->* Test on ->> * windows 10 ~ 11 ->> * osx 10.5 ~ 11 big sur ->> * ubuntu 20.0.4 ->> * raspberry pi 3B+ ->> * All test in test dir ->> * -### Architecture Diagram -![Architecture Diagram](architecture_diagram/LoadDnesity_Archirecture.drawio.png) \ No newline at end of file +## Tested Platforms +- Windows 10 ~ 11 +- macOS 10.15 ~ 11 Big Sur +- Ubuntu 20.04 +- Raspberry Pi 3B+ +- All test cases are located in the test directory diff --git a/architecture_diagram/LoadDnesity_Archirecture.drawio b/architecture_diagram/LoadDnesity_Archirecture.drawio deleted file mode 100644 index 08f8628..0000000 --- a/architecture_diagram/LoadDnesity_Archirecture.drawio +++ /dev/nulldiff --git a/architecture_diagram/LoadDnesity_Archirecture.drawio.png b/architecture_diagram/LoadDnesity_Archirecture.drawio.png deleted file mode 100644 index 082c47c..0000000 Binary files a/architecture_diagram/LoadDnesity_Archirecture.drawio.png and /dev/null differ diff --git a/dev.toml b/dev.toml index 91cad16..2d993e5 100644 --- a/dev.toml +++ b/dev.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "je_load_density_dev" -version = "0.0.77" +version = "0.0.78" authors = [ { name = "JE-Chen", email = "jechenmailman@gmail.com" }, ] diff --git a/dev_requirements.txt b/dev_requirements.txt index 371d98e..d2286ac 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -3,3 +3,4 @@ sphinx twine sphinx-rtd-theme build +pytest diff --git a/je_load_density/gui/load_density_gui_thread.py b/je_load_density/gui/load_density_gui_thread.py index ed20b54..dd858cc 100644 --- a/je_load_density/gui/load_density_gui_thread.py +++ b/je_load_density/gui/load_density_gui_thread.py @@ -1,26 +1,59 @@ from PySide6.QtCore import QThread from je_load_density.wrapper.start_wrapper.start_test import start_test +# 定義常數,避免硬編碼字串 +# Define constant to avoid hard-coded string +DEFAULT_USER_TYPE = "fast_http_user" + class LoadDensityGUIThread(QThread): + """ + GUI 測試執行緒 + GUI Test Thread + + 用於在背景執行負載測試,避免阻塞主介面。 + Used to run load tests in the background without blocking the main GUI. + """ - def __init__(self): + def __init__(self, + request_url: str = None, + test_duration: int = None, + user_count: int = None, + spawn_rate: int = None, + http_method: str = None): + """ + 初始化執行緒參數 + Initialize thread parameters + + :param request_url: 測試目標 URL (Target request URL) + :param test_duration: 測試持續時間 (Test duration in seconds) + :param user_count: 使用者數量 (Number of simulated users) + :param spawn_rate: 使用者生成速率 (User spawn rate) + :param http_method: HTTP 方法 (HTTP method, e.g., "GET", "POST") + """ super().__init__() - self.url = None - self.test_time = None - self.user_count = None - self.spawn_rate = None - self.method = None + self.request_url = request_url + self.test_duration = test_duration + self.user_count = user_count + self.spawn_rate = spawn_rate + self.http_method = http_method def run(self): + """ + 執行負載測試 + Run the load test + """ + if not self.request_url or not self.http_method: + # 基本檢查,避免傳入 None + # Basic validation to avoid None values + raise ValueError("Request URL and HTTP method must be provided.") + start_test( - { - "user": "fast_http_user", - }, - self.user_count, self.spawn_rate, self.test_time, - **{ - "tasks": { - self.method: {"request_url": self.url}, - } + {"user": DEFAULT_USER_TYPE}, # 使用者類型 (User type) + self.user_count, + self.spawn_rate, + self.test_duration, + tasks={ + self.http_method: {"request_url": self.request_url} } ) \ No newline at end of file diff --git a/je_load_density/gui/log_to_ui_filter.py b/je_load_density/gui/log_to_ui_filter.py index ffefbcc..16b7291 100644 --- a/je_load_density/gui/log_to_ui_filter.py +++ b/je_load_density/gui/log_to_ui_filter.py @@ -1,10 +1,30 @@ import logging import queue -locust_log_queue = queue.Queue() +# 建立一個佇列,用來存放攔截到的日誌訊息 +# Create a queue to store intercepted log messages +log_message_queue: queue.Queue[str] = queue.Queue() + class InterceptAllFilter(logging.Filter): + """ + 攔截所有日誌訊息並存入佇列 + Intercept all log messages and store them into a queue + + 此 Filter 可用於將 logging 模組的輸出導向 GUI 或其他處理流程。 + This filter can be used to redirect logging outputs to a GUI or other processing pipelines. + """ + + def filter(self, record: logging.LogRecord) -> bool: + """ + 攔截日誌紀錄並存入佇列 + Intercept log record and put it into the queue - def filter(self, record): - locust_log_queue.put(record.getMessage()) - return False + :param record: logging.LogRecord 物件 (Log record object) + :return: False → 阻止訊息繼續傳遞到其他 Handler + False → Prevents the message from propagating to other handlers + """ + # 只存放訊息文字,也可以改成存整個 record 以保留更多資訊 + # Only store the message text; alternatively, store the whole record for more details + log_message_queue.put(record.getMessage()) + return False \ No newline at end of file diff --git a/je_load_density/gui/main_widget.py b/je_load_density/gui/main_widget.py index 99f8534..8542b4c 100644 --- a/je_load_density/gui/main_widget.py +++ b/je_load_density/gui/main_widget.py @@ -1,27 +1,52 @@ import logging +import queue +from typing import Optional from PySide6.QtCore import QTimer from PySide6.QtGui import QIntValidator -from PySide6.QtWidgets import QWidget, QFormLayout, QLineEdit, QComboBox, QPushButton, QTextEdit, QVBoxLayout, QLabel +from PySide6.QtWidgets import ( + QWidget, QFormLayout, QLineEdit, QComboBox, + QPushButton, QTextEdit, QVBoxLayout, QLabel, QMessageBox +) from je_load_density.gui.load_density_gui_thread import LoadDensityGUIThread from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper -from je_load_density.gui.log_to_ui_filter import InterceptAllFilter, locust_log_queue +from je_load_density.gui.log_to_ui_filter import InterceptAllFilter, log_message_queue class LoadDensityWidget(QWidget): + """ + 負載測試 GUI 控制元件 + Load Test GUI Widget - def __init__(self, parent=None): + 提供使用者輸入測試參數並啟動負載測試, + 並將日誌訊息即時顯示在 GUI 中。 + Provides input fields for test parameters, starts load tests, + and displays log messages in real-time. + """ + + def __init__(self, parent: Optional[QWidget] = None): super().__init__(parent) - # from + + # === 表單區域 (Form Section) === form_layout = QFormLayout() + + # URL 輸入框 (Target URL input) self.url_input = QLineEdit() + + # 測試時間 (Test duration, must be int) self.test_time_input = QLineEdit() self.test_time_input.setValidator(QIntValidator()) + + # 使用者數量 (User count) self.user_count_input = QLineEdit() self.user_count_input.setValidator(QIntValidator()) + + # 生成速率 (Spawn rate) self.spawn_rate_input = QLineEdit() self.spawn_rate_input.setValidator(QIntValidator()) + + # HTTP 方法選擇 (HTTP method selection) self.method_combobox = QComboBox() self.method_combobox.addItems([ language_wrapper.language_word_dict.get("get"), @@ -32,50 +57,75 @@ def __init__(self, parent=None): language_wrapper.language_word_dict.get("head"), language_wrapper.language_word_dict.get("options"), ]) + + # 將元件加入表單 (Add widgets to form layout) form_layout.addRow(language_wrapper.language_word_dict.get("url"), self.url_input) form_layout.addRow(language_wrapper.language_word_dict.get("test_time"), self.test_time_input) form_layout.addRow(language_wrapper.language_word_dict.get("user_count"), self.user_count_input) form_layout.addRow(language_wrapper.language_word_dict.get("spawn_rate"), self.spawn_rate_input) form_layout.addRow(language_wrapper.language_word_dict.get("test_method"), self.method_combobox) + # === 啟動按鈕 (Start button) === self.start_button = QPushButton(language_wrapper.language_word_dict.get("start_button")) self.start_button.clicked.connect(self.run_load_density) - # Log panel + # === 日誌面板 (Log panel) === self.log_panel = QTextEdit() self.log_panel.setReadOnly(True) - # Add widget to vertical layout + # === 主版面配置 (Main layout) === main_layout = QVBoxLayout() main_layout.addLayout(form_layout) main_layout.addWidget(self.start_button) main_layout.addWidget(QLabel(language_wrapper.language_word_dict.get("log"))) main_layout.addWidget(self.log_panel) - # Param - self.run_load_density_thread = None + # === 執行緒與計時器 (Thread & Timer) === + self.run_load_density_thread: Optional[LoadDensityGUIThread] = None self.pull_log_timer = QTimer() - self.pull_log_timer.setInterval(20) + self.pull_log_timer.setInterval(50) # 稍微放大間隔,避免 UI 卡頓 self.pull_log_timer.timeout.connect(self.add_text_to_log) self.setLayout(main_layout) - def run_load_density(self): + def run_load_density(self) -> None: + """ + 啟動負載測試 + Start the load test + """ + try: + test_time = int(self.test_time_input.text()) + user_count = int(self.user_count_input.text()) + spawn_rate = int(self.spawn_rate_input.text()) + except ValueError: + QMessageBox.warning(self, "Invalid Input", "請輸入有效的數字\nPlease enter valid numbers") + return + self.run_load_density_thread = LoadDensityGUIThread() - self.run_load_density_thread.url = self.url_input.text() - self.run_load_density_thread.test_time = int(self.test_time_input.text()) - self.run_load_density_thread.user_count = int(self.user_count_input.text()) - self.run_load_density_thread.spawn_rate = int(self.spawn_rate_input.text()) - self.run_load_density_thread.method = self.method_combobox.currentText().lower() - log_handler_list = [handler for handler in logging.getLogger("root").handlers if handler.name == "log_reader"] + self.run_load_density_thread.request_url = self.url_input.text() + self.run_load_density_thread.test_duration = test_time + self.run_load_density_thread.user_count = user_count + self.run_load_density_thread.spawn_rate = spawn_rate + self.run_load_density_thread.http_method = self.method_combobox.currentText().lower() + + # 設定日誌攔截器 (Attach log filter) + root_logger = logging.getLogger("root") + log_handler_list = [handler for handler in root_logger.handlers if handler.name == "log_reader"] if log_handler_list: log_handler = log_handler_list[0] - log_handler.addFilter(InterceptAllFilter()) + # 避免重複新增 Filter (Prevent duplicate filters) + if not any(isinstance(f, InterceptAllFilter) for f in log_handler.filters): + log_handler.addFilter(InterceptAllFilter()) + + # 啟動執行緒與計時器 (Start thread & timer) self.run_load_density_thread.start() - self.pull_log_timer.stop() self.pull_log_timer.start() self.log_panel.clear() - def add_text_to_log(self): - if not locust_log_queue.empty(): - self.log_panel.append(locust_log_queue.get_nowait()) + def add_text_to_log(self) -> None: + """ + 將日誌訊息加入到 GUI 面板 + Append log messages to GUI panel + """ + while not log_message_queue.empty(): + self.log_panel.append(log_message_queue.get_nowait()) \ No newline at end of file diff --git a/je_load_density/gui/main_window.py b/je_load_density/gui/main_window.py index 2c4e167..8d14828 100644 --- a/je_load_density/gui/main_window.py +++ b/je_load_density/gui/main_window.py @@ -1,6 +1,7 @@ import sys +from typing import Optional -from PySide6.QtWidgets import QMainWindow, QApplication +from PySide6.QtWidgets import QMainWindow, QApplication, QWidget from qt_material import QtStyleTools from je_load_density.gui.language_wrapper.multi_language_wrapper import language_wrapper @@ -8,18 +9,41 @@ class LoadDensityUI(QMainWindow, QtStyleTools): + """ + 負載測試主視窗 + Load Test Main Window - def __init__(self): - super().__init__() + 提供 GUI 介面,整合測試控制元件與樣式設定。 + Provides the main GUI window, integrating the load test widget and applying styles. + """ + + def __init__(self, parent: Optional[QWidget] = None): + super().__init__(parent) + + # 應用程式名稱 (Application name) self.id = language_wrapper.language_word_dict.get("application_name") + + # 在 Windows 平台設定 AppUserModelID,讓工作列顯示正確的應用程式名稱 + # Set AppUserModelID on Windows so the taskbar shows the correct application name if sys.platform in ["win32", "cygwin", "msys"]: from ctypes import windll windll.shell32.SetCurrentProcessExplicitAppUserModelID(self.id) + + # 設定字體樣式 (Set font style) self.setStyleSheet( - f"font-size: 12pt;" - f"font-family: 'Lato';" + "font-size: 12pt;" + "font-family: 'Lato';" ) + + # 套用 qt-material 樣式 (Apply qt-material theme) self.apply_stylesheet(self, "dark_amber.xml") + + # 建立並設定主要控制元件 (Create and set main widget) self.load_density_widget = LoadDensityWidget() self.setCentralWidget(self.load_density_widget) +if __name__ == "__main__": + app = QApplication(sys.argv) + window = LoadDensityUI() + window.show() + sys.exit(app.exec()) \ No newline at end of file diff --git a/je_load_density/utils/callback/callback_function_executor.py b/je_load_density/utils/callback/callback_function_executor.py index a206b19..4807fe1 100644 --- a/je_load_density/utils/callback/callback_function_executor.py +++ b/je_load_density/utils/callback/callback_function_executor.py @@ -1,18 +1,41 @@ import typing from sys import stderr -from je_load_density.utils.exception.exception_tags import get_bad_trigger_function, get_bad_trigger_method +from je_load_density.utils.exception.exception_tags import ( + get_bad_trigger_function, + get_bad_trigger_method, +) from je_load_density.utils.exception.exceptions import CallbackExecutorException -from je_load_density.utils.generate_report.generate_html_report import generate_html, generate_html_report -from je_load_density.utils.generate_report.generate_json_report import generate_json, generate_json_report -from je_load_density.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report +from je_load_density.utils.generate_report.generate_html_report import ( + generate_html, + generate_html_report, +) +from je_load_density.utils.generate_report.generate_json_report import ( + generate_json, + generate_json_report, +) +from je_load_density.utils.generate_report.generate_xml_report import ( + generate_xml, + generate_xml_report, +) from je_load_density.wrapper.start_wrapper.start_test import start_test -class CallbackFunctionExecutor(object): +class CallbackFunctionExecutor: + """ + 回呼函式執行器 + Callback Function Executor - def __init__(self): - self.event_dict: dict = { + 提供事件觸發與回呼機制,先執行指定的 trigger function, + 再執行 callback function。 + Provides a mechanism to trigger a function from event_dict, + then execute a callback function. + """ + + def __init__(self) -> None: + # 事件字典,定義可觸發的函式 + # Event dictionary, defines available trigger functions + self.event_dict: dict[str, typing.Callable] = { "user_test": start_test, "LD_generate_html": generate_html, "LD_generate_html_report": generate_html_report, @@ -23,25 +46,36 @@ def __init__(self): } def callback_function( - self, - trigger_function_name: str, - callback_function: typing.Callable, - callback_function_param: [dict, None] = None, - callback_param_method: str = "kwargs", - **kwargs - ): + self, + trigger_function_name: str, + callback_function: typing.Callable, + callback_function_param: typing.Optional[typing.Union[dict, list]] = None, + callback_param_method: str = "kwargs", + **kwargs, + ) -> typing.Any: """ - :param trigger_function_name: what function we want to trigger only accept function in event_dict - :param callback_function: what function we want to callback - :param callback_function_param: callback function's param only accept dict - :param callback_param_method: what type param will use on callback function only accept kwargs and args - :param kwargs: trigger_function's param - :return: + 執行事件函式並呼叫回呼函式 + Execute trigger function and then call callback function + + :param trigger_function_name: 事件函式名稱 (must exist in event_dict) + :param callback_function: 回呼函式 (callback function to execute) + :param callback_function_param: 回呼函式參數 (dict for kwargs, list for args) + :param callback_param_method: 參數傳遞方式 ("kwargs" or "args") + :param kwargs: 傳給事件函式的參數 (parameters for trigger function) + :return: 事件函式的回傳值 (return value of trigger function) """ try: - if trigger_function_name not in self.event_dict.keys(): + # 檢查事件函式是否存在 + # Validate trigger function existence + if trigger_function_name not in self.event_dict: raise CallbackExecutorException(get_bad_trigger_function) - execute_return_value = self.event_dict.get(trigger_function_name)(**kwargs) + + # 執行事件函式 + # Execute trigger function + execute_return_value = self.event_dict[trigger_function_name](**kwargs) + + # 執行回呼函式 + # Execute callback function if callback_function_param is not None: if callback_param_method not in ["kwargs", "args"]: raise CallbackExecutorException(get_bad_trigger_method) @@ -51,9 +85,15 @@ def callback_function( callback_function(*callback_function_param) else: callback_function() + return execute_return_value + except Exception as error: + # 目前只輸出錯誤,可以改成 logging 或 raise + # Currently prints error; can be replaced with logging or re-raise print(repr(error), file=stderr) -callback_executor = CallbackFunctionExecutor() +# 建立全域執行器實例 +# Create global executor instance +callback_executor = CallbackFunctionExecutor() \ No newline at end of file diff --git a/je_load_density/utils/executor/action_executor.py b/je_load_density/utils/executor/action_executor.py index 5537710..4bb19ee 100644 --- a/je_load_density/utils/executor/action_executor.py +++ b/je_load_density/utils/executor/action_executor.py @@ -4,21 +4,43 @@ from inspect import getmembers, isbuiltin from typing import Union, Any -from je_load_density.utils.exception.exception_tags import executor_data_error, add_command_exception_tag -from je_load_density.utils.exception.exception_tags import executor_list_error +from je_load_density.utils.exception.exception_tags import ( + executor_data_error, + add_command_exception_tag, + executor_list_error, +) from je_load_density.utils.exception.exceptions import LoadDensityTestExecuteException -from je_load_density.utils.generate_report.generate_html_report import generate_html, generate_html_report -from je_load_density.utils.generate_report.generate_json_report import generate_json, generate_json_report -from je_load_density.utils.generate_report.generate_xml_report import generate_xml, generate_xml_report +from je_load_density.utils.generate_report.generate_html_report import ( + generate_html, + generate_html_report, +) +from je_load_density.utils.generate_report.generate_json_report import ( + generate_json, + generate_json_report, +) +from je_load_density.utils.generate_report.generate_xml_report import ( + generate_xml, + generate_xml_report, +) from je_load_density.utils.json.json_file.json_file import read_action_json from je_load_density.utils.package_manager.package_manager_class import package_manager from je_load_density.wrapper.start_wrapper.start_test import start_test -class Executor(object): +class Executor: + """ + 執行器 (Executor) + Event-driven executor - def __init__(self): - self.event_dict = { + 提供事件字典 (event_dict),可根據動作名稱執行對應函式, + 並支援批次執行與檔案驅動。 + Provides an event dictionary to execute functions by name, + supporting batch execution and file-driven execution. + """ + + def __init__(self) -> None: + # 初始化事件字典 (Initialize event dictionary) + self.event_dict: dict[str, Any] = { "LD_start_test": start_test, "LD_generate_html": generate_html, "LD_generate_html_report": generate_html_report, @@ -26,21 +48,29 @@ def __init__(self): "LD_generate_json_report": generate_json_report, "LD_generate_xml": generate_xml, "LD_generate_xml_report": generate_xml_report, - # Execute + # Executor internal methods "LD_execute_action": self.execute_action, "LD_execute_files": self.execute_files, "LD_add_package_to_executor": package_manager.add_package_to_executor, } - # get all builtin function and add to event dict - for function in getmembers(builtins, isbuiltin): - self.event_dict.update({str(function[0]): function[1]}) - def _execute_event(self, action: list): + # 將所有 Python 內建函式加入事件字典 + # Add all Python built-in functions to event_dict + for name, func in getmembers(builtins, isbuiltin): + self.event_dict[name] = func + + def _execute_event(self, action: list) -> Any: """ - :param action: execute action - :return: what event return + 執行單一事件 + Execute a single event + + :param action: 事件結構,例如 ["function_name", {"param": value}] + :return: 事件回傳值 (return value of executed event) """ event = self.event_dict.get(action[0]) + if event is None: + raise LoadDensityTestExecuteException(executor_data_error + " " + str(action)) + if len(action) == 2: if isinstance(action[1], dict): return event(**action[1]) @@ -51,68 +81,87 @@ def _execute_event(self, action: list): else: raise LoadDensityTestExecuteException(executor_data_error + " " + str(action)) - def execute_action(self, action_list: [list, dict]) -> dict: + def execute_action(self, action_list: Union[list, dict]) -> dict[str, Any]: """ - execute all action in action list - :param action_list: like this structure - [ - ["method on event_dict", {"param": params}], - ["method on event_dict", {"param": params}] - ] - for loop and use execute_event function to execute - :return: recode string, response as list + 執行多個事件 + Execute multiple actions + + :param action_list: 事件列表,例如: + [ + ["LD_start_test", {"param": value}], + ["LD_generate_json", {"param": value}] + ] + :return: 執行紀錄字典 (execution record dict) """ if isinstance(action_list, dict): action_list = action_list.get("load_density", None) if action_list is None: raise LoadDensityTestExecuteException(executor_list_error) - execute_record_dict = dict() + + execute_record_dict: dict[str, Any] = {} + try: - if len(action_list) == 0 or isinstance(action_list, list) is False: + if not isinstance(action_list, list) or len(action_list) == 0: raise LoadDensityTestExecuteException(executor_list_error) except Exception as error: print(repr(error), file=sys.stderr) + for action in action_list: try: event_response = self._execute_event(action) - execute_record = "execute: " + str(action) - execute_record_dict.update({execute_record: event_response}) + execute_record = f"execute: {action}" + execute_record_dict[execute_record] = event_response except Exception as error: print(repr(error), file=sys.stderr) print(action, file=sys.stderr) - execute_record = "execute: " + str(action) - execute_record_dict.update({execute_record: repr(error)}) + execute_record = f"execute: {action}" + execute_record_dict[execute_record] = repr(error) + + # 輸出執行結果 (Print execution results) for key, value in execute_record_dict.items(): print(key) print(value) + return execute_record_dict - def execute_files(self, execute_files_list: list): + def execute_files(self, execute_files_list: list[str]) -> list[dict[str, Any]]: """ - execute action on all file in execute_files_list - :param execute_files_list: list include execute files path - :return: every execute detail as list + 執行檔案中的事件 + Execute actions from files + + :param execute_files_list: 檔案路徑列表 (list of file paths) + :return: 每個檔案的執行結果列表 (list of execution results per file) """ - execute_detail_list = list() + execute_detail_list: list[dict[str, Any]] = [] for file in execute_files_list: execute_detail_list.append(self.execute_action(read_action_json(file))) return execute_detail_list + +# 建立全域執行器 (Global executor instance) executor = Executor() package_manager.executor = executor -def add_command_to_executor(command_dict: dict): +def add_command_to_executor(command_dict: dict[str, Any]) -> None: + """ + 新增自訂命令到執行器 + Add custom commands to executor + + :param command_dict: {command_name: function} + """ for command_name, command in command_dict.items(): if isinstance(command, (types.MethodType, types.FunctionType)): - executor.event_dict.update({command_name: command}) + executor.event_dict[command_name] = command else: raise LoadDensityTestExecuteException(add_command_exception_tag) -def execute_action(action_list: list) -> dict: +def execute_action(action_list: list) -> dict[str, Any]: + """全域執行事件 (Global execute action)""" return executor.execute_action(action_list) -def execute_files(execute_files_list: list) -> list: - return executor.execute_files(execute_files_list) +def execute_files(execute_files_list: list[str]) -> list[dict[str, Any]]: + """全域執行檔案事件 (Global execute files)""" + return executor.execute_files(execute_files_list) \ No newline at end of file diff --git a/je_load_density/utils/file_process/create_project_structure.py b/je_load_density/utils/file_process/create_project_structure.py index 52828bb..af6ba05 100644 --- a/je_load_density/utils/file_process/create_project_structure.py +++ b/je_load_density/utils/file_process/create_project_structure.py @@ -1,17 +1,35 @@ from pathlib import Path +from typing import Optional +# 定義常數,避免硬編碼 +# Define constant to avoid hard-coded string +TEMPLATE_DIR = "je_load_density/template" -def _create_dir(dir_name: str): + +def _create_dir(dir_name: str) -> Optional[Path]: """ - create project dir - :param dir_name: create dir use dir name - :return: None + 建立專案目錄 + Create project directory + + :param dir_name: 要建立的目錄名稱 (Directory name to create) + :return: 成功建立或已存在的 Path 物件 (Path object if created/existed), None if failed """ - Path(dir_name).mkdir( - parents=True, - exist_ok=True - ) + try: + path = Path(dir_name) + path.mkdir(parents=True, exist_ok=True) + return path + except Exception as error: + # 錯誤處理:避免因權限或路徑問題導致程式崩潰 + # Error handling: prevent crash due to permission or path issues + print(f"Failed to create directory {dir_name}: {error}") + return None -def create_template_dir(): - _create_dir("je_load_density/template") +def create_template_dir() -> Optional[Path]: + """ + 建立模板目錄 + Create template directory + + :return: Path 物件或 None (Path object or None) + """ + return _create_dir(TEMPLATE_DIR) \ No newline at end of file diff --git a/je_load_density/utils/file_process/get_dir_file_list.py b/je_load_density/utils/file_process/get_dir_file_list.py index aa28fe6..690de02 100644 --- a/je_load_density/utils/file_process/get_dir_file_list.py +++ b/je_load_density/utils/file_process/get_dir_file_list.py @@ -1,18 +1,28 @@ -from os import getcwd -from os import walk -from os.path import abspath -from os.path import join +from pathlib import Path +from typing import List -def get_dir_files_as_list(dir_path: str = getcwd(), default_search_file_extension: str = ".json") -> list: +def get_dir_files_as_list( + dir_path: str = str(Path.cwd()), + default_search_file_extension: str = ".json" +) -> List[str]: """ - get dir file when end with default_search_file_extension - :param dir_path: which dir we want to walk and get file list - :param default_search_file_extension: which extension we want to search - :return: [] if nothing searched or [file1, file2.... files] file was searched + 取得指定目錄下所有符合副檔名的檔案清單 + Get all files in a directory that end with the given extension + + :param dir_path: 要搜尋的目錄路徑 (Directory path to search) + :param default_search_file_extension: 要搜尋的副檔名 (File extension to search, e.g. ".json") + :return: 檔案絕對路徑清單 (List of absolute file paths) """ - return [ - abspath(join(dir_path, file)) for root, dirs, files in walk(dir_path) - for file in files - if file.endswith(default_search_file_extension.lower()) - ] + try: + path_obj = Path(dir_path) + if not path_obj.exists() or not path_obj.is_dir(): + raise FileNotFoundError(f"Directory not found: {dir_path}") + + return [ + str(file.resolve()) + for file in path_obj.rglob(f"*{default_search_file_extension.lower()}") + ] + except Exception as error: + print(f"Error while scanning directory {dir_path}: {error}") + return [] \ No newline at end of file diff --git a/je_load_density/utils/generate_report/generate_html_report.py b/je_load_density/utils/generate_report/generate_html_report.py index 08525b2..e5097bb 100644 --- a/je_load_density/utils/generate_report/generate_html_report.py +++ b/je_load_density/utils/generate_report/generate_html_report.py @@ -1,222 +1,129 @@ import sys from threading import Lock +from typing import List, Tuple from je_load_density.utils.exception.exceptions import LoadDensityHTMLException from je_load_density.utils.exception.exception_tags import html_generate_no_data_tag from je_load_density.utils.test_record.test_record_class import test_record_instance -_html_string_head = \ - """ - - - - - Load Density Report - - - -

- Test Report -

- """.strip() +# HTML 標頭 (HTML head) +_HTML_STRING_HEAD = """ + + + + Load Density Report + + + +

Test Report

+""".strip() -_html_string_bottom = \ - """ - - - """.strip() +# HTML 結尾 (HTML bottom) +_HTML_STRING_BOTTOM = """""".strip() -_success_table = \ - r""" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Test Report
Method{Method}
test_url{test_url}
name{name}
status_code{status_code}
text{text}
content{content}
headers{headers}
-
- """.strip() +# 成功測試表格模板 (Success table template) +_SUCCESS_TABLE = r""" + + + + + + + + + + + + + +
Test Report
Method{Method}
test_url{test_url}
name{name}
status_code{status_code}
text{text}
content{content}
headers{headers}
+
+""".strip() -_failure_table = \ - r""" - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Test Report
http_method{http_method}
test_url{test_url}
name{name}
status_code{status_code}
error{error}
-
- """.strip() +# 失敗測試表格模板 (Failure table template) +_FAILURE_TABLE = r""" + + + + + + + + + + + +
Test Report
http_method{http_method}
test_url{test_url}
name{name}
status_code{status_code}
error{error}
+
+""".strip() -def generate_html(): +def generate_html() -> Tuple[List[str], List[str]]: """ - :return: success_test_dict, failure_test_dict + 產生 HTML 片段 (Generate HTML fragments) + + :return: (成功測試清單, 失敗測試清單) + (list of success test HTML fragments, list of failure test HTML fragments) """ - if len(test_record_instance.test_record_list) == 0 and len(test_record_instance.error_record_list) == 0: + if not test_record_instance.test_record_list and not test_record_instance.error_record_list: raise LoadDensityHTMLException(html_generate_no_data_tag) - else: - success_list = list() - for record_data in test_record_instance.test_record_list: - success_list.append( - _success_table.format( - Method=record_data.get("Method"), - test_url=record_data.get("test_url"), - name=record_data.get("name"), - status_code=record_data.get("status_code"), - text=record_data.get("text"), - content=record_data.get("content"), - headers=record_data.get("headers"), - ) - ) - failure_list = list() - if len(test_record_instance.error_record_list) == 0: - pass - else: - for record_data in test_record_instance.error_record_list: - failure_list.append( - _failure_table.format( - http_method=record_data.get("Method"), - test_url=record_data.get("test_url"), - name=record_data.get("name"), - status_code=record_data.get("status_code"), - error=record_data.get("error"), - ) - ) + + success_list: List[str] = [ + _SUCCESS_TABLE.format( + Method=record.get("Method"), + test_url=record.get("test_url"), + name=record.get("name"), + status_code=record.get("status_code"), + text=record.get("text"), + content=record.get("content"), + headers=record.get("headers"), + ) + for record in test_record_instance.test_record_list + ] + + failure_list: List[str] = [ + _FAILURE_TABLE.format( + http_method=record.get("Method"), + test_url=record.get("test_url"), + name=record.get("name"), + status_code=record.get("status_code"), + error=record.get("error"), + ) + for record in test_record_instance.error_record_list + ] + return success_list, failure_list -def generate_html_report(html_name: str = "default_name"): +def generate_html_report(html_name: str = "default_name") -> str: """ - format html_string and output html file - :param html_name: save html file name - :return: html_string + 產生完整 HTML 報告並輸出檔案 + Generate full HTML report and save to file + + :param html_name: 輸出檔案名稱 (Output file name, without extension) + :return: HTML 字串 (HTML string) """ _lock = Lock() success_list, failure_list = generate_html() + try: - _lock.acquire() - with open(html_name + ".html", "w+") as file_to_write: - file_to_write.writelines( - _html_string_head - ) - for success in success_list: - file_to_write.write(success) - for failure in failure_list: - file_to_write.write(failure) - file_to_write.writelines( - _html_string_bottom - ) + with _lock: # 使用 with 確保自動 acquire/release + html_path = f"{html_name}.html" + with open(html_path, "w+", encoding="utf-8") as file_to_write: + file_to_write.write(_HTML_STRING_HEAD) + file_to_write.writelines(success_list) + file_to_write.writelines(failure_list) + file_to_write.write(_HTML_STRING_BOTTOM) + return html_path except Exception as error: print(repr(error), file=sys.stderr) - finally: - _lock.release() + return "" \ No newline at end of file diff --git a/je_load_density/utils/generate_report/generate_json_report.py b/je_load_density/utils/generate_report/generate_json_report.py index ad1be6c..e082d13 100644 --- a/je_load_density/utils/generate_report/generate_json_report.py +++ b/je_load_density/utils/generate_report/generate_json_report.py @@ -1,72 +1,76 @@ import json import sys from threading import Lock +from typing import Tuple, Dict from je_load_density.utils.exception.exception_tags import cant_generate_json_report from je_load_density.utils.exception.exceptions import LoadDensityGenerateJsonReportException from je_load_density.utils.test_record.test_record_class import test_record_instance -def generate_json(): - if len(test_record_instance.test_record_list) == 0 and len(test_record_instance.error_record_list) == 0: +def generate_json() -> Tuple[Dict[str, dict], Dict[str, dict]]: + """ + 產生測試紀錄的 JSON 結構 + Generate JSON structure for test records + + :return: (成功測試字典, 失敗測試字典) + (success_dict, failure_dict) + """ + if not test_record_instance.test_record_list and not test_record_instance.error_record_list: raise LoadDensityGenerateJsonReportException(cant_generate_json_report) - else: - success_dict = dict() - failure_dict = dict() - failure_count: int = 1 - failure_test_str: str = "Failure_Test" - success_count: int = 1 - success_test_str: str = "Success_Test" - for record_data in test_record_instance.test_record_list: - success_dict.update( - { - success_test_str + str(success_count): { - "Method": str(record_data.get("Method")), - "test_url": str(record_data.get("test_url")), - "name": str(record_data.get("name")), - "status_code": str(record_data.get("status_code")), - "text": str(record_data.get("text")), - "content": str(record_data.get("content")), - "headers": str(record_data.get("headers")) - } - } - ) - success_count = success_count + 1 - for record_data in test_record_instance.error_record_list: - failure_dict.update( - { - failure_test_str + str(failure_count): { - "Method": str(record_data.get("Method")), - "test_url": str(record_data.get("test_url")), - "name": str(record_data.get("name")), - "status_code": str(record_data.get("status_code")), - "error": str(record_data.get("error")) - } - } - ) - failure_count = failure_count + 1 + + success_dict: Dict[str, dict] = {} + failure_dict: Dict[str, dict] = {} + + # 成功測試紀錄 (Success records) + for idx, record_data in enumerate(test_record_instance.test_record_list, start=1): + success_dict[f"Success_Test{idx}"] = { + "Method": str(record_data.get("Method")), + "test_url": str(record_data.get("test_url")), + "name": str(record_data.get("name")), + "status_code": str(record_data.get("status_code")), + "text": str(record_data.get("text")), + "content": str(record_data.get("content")), + "headers": str(record_data.get("headers")), + } + + # 失敗測試紀錄 (Failure records) + for idx, record_data in enumerate(test_record_instance.error_record_list, start=1): + failure_dict[f"Failure_Test{idx}"] = { + "Method": str(record_data.get("Method")), + "test_url": str(record_data.get("test_url")), + "name": str(record_data.get("name")), + "status_code": str(record_data.get("status_code")), + "error": str(record_data.get("error")), + } + return success_dict, failure_dict -def generate_json_report(json_file_name: str = "default_name"): +def generate_json_report(json_file_name: str = "default_name") -> Tuple[str, str]: """ - :param json_file_name: save json file's name + 輸出測試紀錄 JSON 報告 + Generate JSON report files for test records + + :param json_file_name: 輸出檔案名稱前綴 (Output file name prefix) + :return: (成功檔案路徑, 失敗檔案路徑) """ lock = Lock() success_dict, failure_dict = generate_json() + + success_path = f"{json_file_name}_success.json" + failure_path = f"{json_file_name}_failure.json" + try: - lock.acquire() - with open(json_file_name + "_success.json", "w+") as file_to_write: - json.dump(dict(success_dict), file_to_write, indent=4) - except Exception as error: - print(repr(error), file=sys.stderr) - finally: - lock.release() - try: - lock.acquire() - with open(json_file_name + "_failure.json", "w+") as file_to_write: - json.dump(dict(failure_dict), file_to_write, indent=4) + with lock: # 使用 with 確保自動 acquire/release + with open(success_path, "w+", encoding="utf-8") as file_to_write: + json.dump(success_dict, file_to_write, indent=4, ensure_ascii=False) + + with open(failure_path, "w+", encoding="utf-8") as file_to_write: + json.dump(failure_dict, file_to_write, indent=4, ensure_ascii=False) + + return success_path, failure_path + except Exception as error: print(repr(error), file=sys.stderr) - finally: - lock.release() + return "", "" \ No newline at end of file diff --git a/je_load_density/utils/generate_report/generate_xml_report.py b/je_load_density/utils/generate_report/generate_xml_report.py index cc9fb1b..11dd7c0 100644 --- a/je_load_density/utils/generate_report/generate_xml_report.py +++ b/je_load_density/utils/generate_report/generate_xml_report.py @@ -1,45 +1,58 @@ import sys from threading import Lock from xml.dom.minidom import parseString +from typing import Tuple + from je_load_density.utils.generate_report.generate_json_report import generate_json from je_load_density.utils.xml.change_xml_structure.change_xml_structure import dict_to_elements_tree -def generate_xml(): +def generate_xml() -> Tuple[str, str]: """ - :return: + 產生 XML 字串 (Generate XML strings) + + :return: (成功測試 XML 字串, 失敗測試 XML 字串) + (success_xml_str, failure_xml_str) """ success_dict, failure_dict = generate_json() - success_dict = dict({"xml_data": success_dict}) - failure_dict = dict({"xml_data": failure_dict}) - success_json_to_xml = dict_to_elements_tree(success_dict) - failure_json_to_xml = dict_to_elements_tree(failure_dict) - return success_json_to_xml, failure_json_to_xml + # 包裝成 xml_data 根節點 (Wrap into xml_data root node) + success_dict = {"xml_data": success_dict} + failure_dict = {"xml_data": failure_dict} + + success_xml_str = dict_to_elements_tree(success_dict) + failure_xml_str = dict_to_elements_tree(failure_dict) + + return success_xml_str, failure_xml_str -def generate_xml_report(xml_file_name: str = "default_name"): + +def generate_xml_report(xml_file_name: str = "default_name") -> Tuple[str, str]: """ - :param xml_file_name: + 輸出 XML 報告檔案 (Generate XML report files) + + :param xml_file_name: 輸出檔案名稱前綴 (Output file name prefix) + :return: (成功檔案路徑, 失敗檔案路徑) """ - success_xml, failure_xml = generate_xml() - success_xml = parseString(success_xml) - failure_xml = parseString(failure_xml) - success_xml = success_xml.toprettyxml() - failure_xml = failure_xml.toprettyxml() + success_xml_str, failure_xml_str = generate_xml() + + # 使用 minidom 美化輸出 (Pretty print XML using minidom) + success_xml = parseString(success_xml_str).toprettyxml() + failure_xml = parseString(failure_xml_str).toprettyxml() + lock = Lock() + success_path = f"{xml_file_name}_success.xml" + failure_path = f"{xml_file_name}_failure.xml" + try: - lock.acquire() - with open(xml_file_name + "_failure.xml", "w+") as file_to_write: - file_to_write.write(failure_xml) - except Exception as error: - print(repr(error), file=sys.stderr) - finally: - lock.release() - try: - lock.acquire() - with open(xml_file_name + "_success.xml", "w+") as file_to_write: - file_to_write.write(success_xml) + with lock: # 使用 with 確保自動 acquire/release + with open(failure_path, "w+", encoding="utf-8") as file_to_write: + file_to_write.write(failure_xml) + + with open(success_path, "w+", encoding="utf-8") as file_to_write: + file_to_write.write(success_xml) + + return success_path, failure_path + except Exception as error: print(repr(error), file=sys.stderr) - finally: - lock.release() + return "", "" \ No newline at end of file diff --git a/je_load_density/utils/get_data_strcture/get_api_data.py b/je_load_density/utils/get_data_strcture/get_api_data.py index db68bf9..c8b3d5f 100644 --- a/je_load_density/utils/get_data_strcture/get_api_data.py +++ b/je_load_density/utils/get_data_strcture/get_api_data.py @@ -1,34 +1,59 @@ +import requests import requests.exceptions +from typing import Union, Dict -def get_api_response_data(response: requests.Response, - start_time: [str, float, int], - end_time: [str, float, int]) -> dict: +def get_api_response_data( + response: requests.Response, + start_time: Union[str, float, int], + end_time: Union[str, float, int] +) -> Dict[str, Union[str, int, dict, bytes]]: """ - use requests response to create data dict - :param response: requests response - :param start_time: test start time - :param end_time: test end time - :return: data dict include [status_code, text, content, headers, history, encoding, cookies, - elapsed, request_time_sec, request_method, request_url, request_body, start_time, end_time] + 使用 requests.Response 建立測試資料字典 + Create a data dictionary from requests.Response + + :param response: requests response 物件 (requests response object) + :param start_time: 測試開始時間 (test start time) + :param end_time: 測試結束時間 (test end time) + :return: 包含以下欄位的字典 (dictionary including): + - status_code + - text + - content + - headers + - encoding + - cookies + - elapsed + - history + - request_method + - request_url + - request_body + - start_time + - end_time + - json (if status_code == 200 and response.json() is valid) """ - response_data = { + response_data: Dict[str, Union[str, int, dict, bytes]] = { "status_code": response.status_code, "text": response.text, "content": response.content, - "headers": response.headers, + "headers": dict(response.headers), "encoding": response.encoding, + "cookies": response.cookies.get_dict(), + "elapsed": response.elapsed.total_seconds() if response.elapsed else None, + "history": [r.url for r in response.history] if response.history else [], "request_method": response.request.method, "request_url": response.request.url, "request_body": response.request.body, "start_time": start_time, - "end_time": end_time + "end_time": end_time, } + + # 嘗試解析 JSON (Try parsing JSON) try: - if response_data.get("status_code") == 200: - response_data.update({"json": response.json()}) + if response.status_code == 200: + response_data["json"] = response.json() else: - response_data.update({"json": None}) - except requests.exceptions.JSONDecodeError: - response_data.update({"json": None}) - return response_data + response_data["json"] = None + except (requests.exceptions.JSONDecodeError, ValueError): + response_data["json"] = None + + return response_data \ No newline at end of file diff --git a/je_load_density/utils/json/json_file/json_file.py b/je_load_density/utils/json/json_file/json_file.py index 146c1e2..a71c678 100644 --- a/je_load_density/utils/json/json_file/json_file.py +++ b/je_load_density/utils/json/json_file/json_file.py @@ -1,42 +1,49 @@ import json from pathlib import Path from threading import Lock +from typing import Any, Union from je_load_density.utils.exception.exceptions import LoadDensityTestJsonException -from je_load_density.utils.exception.exception_tags import cant_find_json_error -from je_load_density.utils.exception.exception_tags import cant_save_json_error +from je_load_density.utils.exception.exception_tags import cant_find_json_error, cant_save_json_error -def read_action_json(json_file_path: str): +def read_action_json(json_file_path: str) -> Union[dict, list]: """ - read json include actions - :param json_file_path json file's path to read + 讀取 JSON 檔案並回傳內容 + Read JSON file and return its content + + :param json_file_path: JSON 檔案路徑 (path to JSON file) + :return: JSON 內容 (dict or list) + :raises LoadDensityTestJsonException: 當檔案不存在或無法讀取時 (if file not found or cannot be read) """ - _lock = Lock() + lock = Lock() try: - _lock.acquire() - file_path = Path(json_file_path) - if file_path.exists() and file_path.is_file(): - with open(json_file_path) as read_file: - return json.load(read_file) - except LoadDensityTestJsonException: - raise LoadDensityTestJsonException(cant_find_json_error) - finally: - _lock.release() + with lock: + file_path = Path(json_file_path) + if file_path.exists() and file_path.is_file(): + with open(json_file_path, "r", encoding="utf-8") as read_file: + return json.load(read_file) + else: + raise LoadDensityTestJsonException(cant_find_json_error) + except Exception as error: + # 捕捉所有錯誤並轉換成自訂例外 + # Catch all errors and raise custom exception + raise LoadDensityTestJsonException(f"{cant_find_json_error}: {error}") -def write_action_json(json_save_path: str, action_json: list): +def write_action_json(json_save_path: str, action_json: Union[dict, list]) -> None: """ - write json file - :param json_save_path json save path - :param action_json the json str include action to write + 將資料寫入 JSON 檔案 + Write data into JSON file + + :param json_save_path: JSON 檔案儲存路徑 (path to save JSON file) + :param action_json: 要寫入的資料 (data to write, dict or list) + :raises LoadDensityTestJsonException: 當檔案無法寫入時 (if file cannot be saved) """ - _lock = Lock() + lock = Lock() try: - _lock.acquire() - with open(json_save_path, "w+") as file_to_write: - file_to_write.write(json.dumps(action_json, indent=4)) - except LoadDensityTestJsonException: - raise LoadDensityTestJsonException(cant_save_json_error) - finally: - _lock.release() + with lock: + with open(json_save_path, "w+", encoding="utf-8") as file_to_write: + json.dump(action_json, file_to_write, indent=4, ensure_ascii=False) + except Exception as error: + raise LoadDensityTestJsonException(f"{cant_save_json_error}: {error}") \ No newline at end of file diff --git a/je_load_density/utils/logging/loggin_instance.py b/je_load_density/utils/logging/loggin_instance.py index dfbbde8..23feede 100644 --- a/je_load_density/utils/logging/loggin_instance.py +++ b/je_load_density/utils/logging/loggin_instance.py @@ -1,15 +1,58 @@ import logging import sys +from logging.handlers import RotatingFileHandler -load_density_logger = logging.getLogger("LoadDensity") -load_density_logger.setLevel(logging.INFO) -formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)s | %(message)s') -# Stream handler -stream_handler = logging.StreamHandler(stream=sys.stderr) -stream_handler.setFormatter(formatter) -stream_handler.setLevel(logging.WARNING) -load_density_logger.addHandler(stream_handler) -# File handler -file_handler = logging.FileHandler(filename="LoadDensity.log", mode="w") -file_handler.setFormatter(formatter) -load_density_logger.addHandler(file_handler) + +class LoadDensityLogger: + """ + 封裝日誌系統 + Encapsulated logging system with rotating file handler + """ + + def __init__(self, + logger_name: str = "LoadDensity", + log_file: str = "LoadDensity.log", + max_bytes: int = 1024 * 1024 * 1024, # 1GB + backup_count: int = 5): + """ + 初始化 Logger + Initialize logger + + :param logger_name: Logger 名稱 (Logger name) + :param log_file: 日誌檔案名稱 (Log file name) + :param max_bytes: 單一檔案最大大小 (Max file size in bytes) + :param backup_count: 保留檔案數量 (Number of backup files) + """ + self.logger = logging.getLogger(logger_name) + self.logger.setLevel(logging.INFO) + + formatter = logging.Formatter( + '%(asctime)s | %(name)s | %(levelname)s | %(message)s' + ) + + # Stream handler (輸出到 stderr) + stream_handler = logging.StreamHandler(stream=sys.stderr) + stream_handler.setFormatter(formatter) + stream_handler.setLevel(logging.WARNING) + + # Rotating file handler (檔案大小限制 + 輪替) + file_handler = RotatingFileHandler( + filename=log_file, + mode="a", + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8" + ) + file_handler.setFormatter(formatter) + file_handler.setLevel(logging.INFO) + + # 加入 handlers + self.logger.addHandler(stream_handler) + self.logger.addHandler(file_handler) + + def get_logger(self) -> logging.Logger: + """取得 logger 實例 (Get logger instance)""" + return self.logger + + +load_density_logger = LoadDensityLogger().get_logger() diff --git a/je_load_density/utils/package_manager/package_manager_class.py b/je_load_density/utils/package_manager/package_manager_class.py index a9a1ee4..a461774 100644 --- a/je_load_density/utils/package_manager/package_manager_class.py +++ b/je_load_density/utils/package_manager/package_manager_class.py @@ -2,35 +2,61 @@ from importlib.util import find_spec from inspect import getmembers, isfunction from sys import stderr +from typing import Optional, Any -class PackageManager(object): +class PackageManager: + """ + 套件管理器 + Package Manager - def __init__(self): - self.installed_package_dict = { - } - self.executor = None + 用於動態載入套件並將其函式加入到 Executor 的事件字典。 + Used to dynamically load packages and register their functions into an Executor. + """ - def check_package(self, package: str): - if self.installed_package_dict.get(package, None) is None: + def __init__(self) -> None: + # 已載入的套件字典 (Dictionary of loaded packages) + self.installed_package_dict: dict[str, Any] = {} + # Executor 參考 (Reference to Executor instance) + self.executor: Optional[Any] = None + + def load_package_if_available(self, package: str) -> Optional[Any]: + """ + 嘗試載入套件 (Try to load a package) + + :param package: 套件名稱 (Package name) + :return: 套件模組或 None (Loaded module or None) + """ + if package not in self.installed_package_dict: found_spec = find_spec(package) if found_spec is not None: try: installed_package = import_module(found_spec.name) - self.installed_package_dict.update({found_spec.name: installed_package}) + self.installed_package_dict[found_spec.name] = installed_package except ModuleNotFoundError as error: print(repr(error), file=stderr) - return self.installed_package_dict.get(package, None) + return None + else: + return None + return self.installed_package_dict.get(package) + + def add_package_to_executor(self, package: str) -> None: + """ + 將套件的所有函式加入 Executor 的事件字典 + Add all functions from a package into the Executor's event dictionary - def add_package_to_executor(self, package): - installed_package = self.check_package(package) + :param package: 套件名稱 (Package name) + """ + installed_package = self.load_package_if_available(package) if installed_package is not None and self.executor is not None: - for function in getmembers(installed_package, isfunction): - self.executor.event_dict.update({str(function): function}) + for name, function in getmembers(installed_package, isfunction): + self.executor.event_dict[name] = function elif installed_package is None: print(repr(ModuleNotFoundError(f"Can't find package {package}")), file=stderr) else: - print(f"Executor error {self.executor}", file=stderr) + print(f"Executor error: {self.executor}", file=stderr) -package_manager = PackageManager() +# 建立全域 PackageManager 實例 +# Create global PackageManager instance +package_manager = PackageManager() \ No newline at end of file diff --git a/je_load_density/utils/project/create_project_structure.py b/je_load_density/utils/project/create_project_structure.py index 94cb525..efa35f6 100644 --- a/je_load_density/utils/project/create_project_structure.py +++ b/je_load_density/utils/project/create_project_structure.py @@ -1,58 +1,73 @@ from os import getcwd from pathlib import Path from threading import Lock +from typing import Optional from je_load_density.utils.json.json_file.json_file import write_action_json -from je_load_density.utils.project.template.template_executor import executor_template_1, \ - executor_template_2 -from je_load_density.utils.project.template.template_keyword import template_keyword_1, \ - template_keyword_2 +from je_load_density.utils.project.template.template_executor import executor_template_1, executor_template_2 +from je_load_density.utils.project.template.template_keyword import template_keyword_1, template_keyword_2 def create_dir(dir_name: str) -> None: """ - :param dir_name: create dir use dir name - :return: None + 建立目錄 + Create directory + + :param dir_name: 要建立的目錄名稱 (Directory name to create) """ - Path(dir_name).mkdir( - parents=True, - exist_ok=True - ) + Path(dir_name).mkdir(parents=True, exist_ok=True) + +def create_template(parent_name: str, project_path: Optional[str] = None) -> None: + """ + 建立模板檔案 (Create template files) -def create_template(parent_name: str, project_path: str = None) -> None: + :param parent_name: 專案主目錄名稱 (Project parent folder name) + :param project_path: 專案路徑 (Project path), 預設為當前工作目錄 (default: current working directory) + """ if project_path is None: project_path = getcwd() - keyword_dir_path = Path(project_path + "/" + parent_name + "/keyword") - executor_dir_path = Path(project_path + "/" + parent_name + "/executor") - lock = Lock() + + project_root = Path(project_path) / parent_name + keyword_dir_path = project_root / "keyword" + executor_dir_path = project_root / "executor" + + # 建立 keyword JSON 檔案 if keyword_dir_path.exists() and keyword_dir_path.is_dir(): - write_action_json(project_path + "/" + parent_name + "/keyword/keyword1.json", template_keyword_1) - write_action_json(project_path + "/" + parent_name + "/keyword/keyword2.json", template_keyword_2) - if executor_dir_path.exists() and keyword_dir_path.is_dir(): - lock.acquire() - try: - with open(project_path + "/" + parent_name + "/executor/executor_one_file.py", "w+") as file: + write_action_json(str(keyword_dir_path) + "keyword1.json", template_keyword_1) + write_action_json(str(keyword_dir_path) + "keyword2.json", template_keyword_2) + + # 建立 executor Python 檔案 + if executor_dir_path.exists() and executor_dir_path.is_dir(): + lock = Lock() + with lock: + with open(executor_dir_path / "executor_one_file.py", "w+", encoding="utf-8") as file: file.write( executor_template_1.replace( "{temp}", - project_path + "/" + parent_name + "/keyword/keyword1.json" + str(keyword_dir_path / "keyword1.json") ) ) - with open(project_path + "/" + parent_name + "/executor/executor_folder.py", "w+") as file: + with open(executor_dir_path / "executor_folder.py", "w+", encoding="utf-8") as file: file.write( executor_template_2.replace( "{temp}", - project_path + "/" + parent_name + "/keyword" + str(keyword_dir_path) ) ) - finally: - lock.release() -def create_project_dir(project_path: str = None, parent_name: str = "LoadDensity") -> None: +def create_project_dir(project_path: Optional[str] = None, parent_name: str = "LoadDensity") -> None: + """ + 建立專案目錄結構 (Create project directory structure) + + :param project_path: 專案路徑 (Project path), 預設為當前工作目錄 (default: current working directory) + :param parent_name: 專案主目錄名稱 (Project parent folder name) + """ if project_path is None: project_path = getcwd() - create_dir(project_path + "/" + parent_name + "/keyword") - create_dir(project_path + "/" + parent_name + "/executor") - create_template(parent_name) + + project_root = Path(project_path) / parent_name + create_dir(str(project_root) + "keyword") + create_dir(str(project_root) + "executor") + create_template(parent_name, project_path) \ No newline at end of file diff --git a/je_load_density/utils/socket_server/load_density_socket_server.py b/je_load_density/utils/socket_server/load_density_socket_server.py index 3f412a8..16a3a5e 100644 --- a/je_load_density/utils/socket_server/load_density_socket_server.py +++ b/je_load_density/utils/socket_server/load_density_socket_server.py @@ -1,6 +1,7 @@ import json import sys from socket import AF_INET, SOCK_STREAM +from typing import Any import gevent from gevent import monkey @@ -9,53 +10,86 @@ from je_load_density.utils.executor.action_executor import execute_action -class TCPServer(object): +class TCPServer: + """ + 基於 gevent 的 TCP 伺服器 + TCP server based on gevent - def __init__(self): + - 接收 JSON 指令並執行對應動作 + - 支援 "quit_server" 指令來關閉伺服器 + """ + + def __init__(self) -> None: self.close_flag: bool = False self.server: socket.socket = socket.socket(AF_INET, SOCK_STREAM) - def socket_server(self, host: str, port: int): + def socket_server(self, host: str, port: int) -> None: + """ + 啟動伺服器 + Start the TCP server + + :param host: 伺服器主機位址 (Server host) + :param port: 伺服器埠號 (Server port) + """ self.server.bind((host, port)) self.server.listen() + print(f"Server started on {host}:{port}", flush=True) + while not self.close_flag: - connection = self.server.accept()[0] - gevent.spawn(self.handle, connection) - sys.exit(0) - - def handle(self, connection): - connection_data = connection.recv(8192) - command_string = str(connection_data.strip(), encoding="utf-8") - print("command is: " + command_string, flush=True) - if command_string == "quit_server": - connection.close() - self.close_flag = True - self.server.close() - print("Now quit server", flush=True) - else: try: - execute_str = json.loads(command_string) - if execute_str is not None: - for execute_return in execute_action(execute_str).values(): - connection.send(str(execute_return).encode("utf-8")) - connection.send("\n".encode("utf-8")) - connection.send("Return_Data_Over_JE".encode("utf-8")) - connection.send("\n".encode("utf-8")) + connection, _ = self.server.accept() + gevent.spawn(self.handle, connection) except Exception as error: + print(f"Server error: {error}", file=sys.stderr) + break + + self.server.close() + print("Server shutdown complete", flush=True) + + def handle(self, connection: socket.socket) -> None: + """ + 處理單一連線 + Handle a single connection + + :param connection: 客戶端連線 (Client connection) + """ + try: + connection_data = connection.recv(8192) + if not connection_data: + return + + command_string = connection_data.strip().decode("utf-8") + print(f"Command received: {command_string}", flush=True) + + if command_string == "quit_server": + self.close_flag = True + connection.send(b"Server shutting down\n") + print("Now quit server", flush=True) + else: try: - connection.send(str(error).encode("utf-8")) - connection.send("\n".encode("utf-8")) - connection.send("Return_Data_Over_JE".encode("utf-8")) - connection.send("\n".encode("utf-8")) + execute_str: Any = json.loads(command_string) + if execute_str is not None: + for execute_return in execute_action(execute_str).values(): + connection.send(f"{execute_return}\n".encode("utf-8")) + connection.send(b"Return_Data_Over_JE\n") except Exception as error: - print(repr(error)) - sys.exit(1) - finally: - connection.close() + connection.send(f"Error: {error}\n".encode("utf-8")) + connection.send(b"Return_Data_Over_JE\n") + + finally: + connection.close() + +def start_load_density_socket_server(host: str = "localhost", port: int = 9940) -> TCPServer: + """ + 啟動 LoadDensity TCP 伺服器 + Start LoadDensity TCP server -def start_load_density_socket_server(host: str = "localhost", port: int = 9940): + :param host: 主機位址 (Host) + :param port: 埠號 (Port) + :return: TCPServer 實例 (TCPServer instance) + """ monkey.patch_all() server = TCPServer() server.socket_server(host, port) - return server + return server \ No newline at end of file diff --git a/je_load_density/utils/test_record/test_record_class.py b/je_load_density/utils/test_record/test_record_class.py index 751cf87..6427873 100644 --- a/je_load_density/utils/test_record/test_record_class.py +++ b/je_load_density/utils/test_record/test_record_class.py @@ -1,15 +1,30 @@ -class TestRecord(object): +from typing import List, Dict + + +class TestRecord: """ - data class to record success and failure test + 測試紀錄類別 + Test record class + + 用來保存成功與失敗的測試紀錄。 + Used to store success and failure test records. """ - def __init__(self): - self.test_record_list = list() - self.error_record_list = list() + def __init__(self) -> None: + # 成功測試紀錄 (Success test records) + self.test_record_list: List[Dict] = [] + # 失敗測試紀錄 (Failure test records) + self.error_record_list: List[Dict] = [] - def clean_record(self): - self.test_record_list = list() - self.error_record_list = list() + def clear_records(self) -> None: + """ + 清除所有測試紀錄 + Clear all test records + """ + self.test_record_list.clear() + self.error_record_list.clear() -test_record_instance = TestRecord() +# 建立全域測試紀錄實例 +# Create global test record instance +test_record_instance = TestRecord() \ No newline at end of file diff --git a/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py b/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py index fcb33ab..4c4a3af 100644 --- a/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py +++ b/je_load_density/utils/xml/change_xml_structure/change_xml_structure.py @@ -1,61 +1,82 @@ from collections import defaultdict from xml.etree import ElementTree +from typing import Union, Dict, Any -def elements_tree_to_dict(elements_tree): +def elements_tree_to_dict(elements_tree: ElementTree.Element) -> Dict[str, Any]: """ - :param elements_tree: full xml string - :return: xml str to dict + 將 XML ElementTree 轉換為字典 + Convert XML ElementTree to dictionary + + :param elements_tree: XML ElementTree 元素 (XML ElementTree element) + :return: 對應的字典結構 (Dictionary representation) """ - elements_dict: dict = {elements_tree.tag: {} if elements_tree.attrib else None} - children: list = list(elements_tree) + elements_dict: Dict[str, Any] = {elements_tree.tag: {} if elements_tree.attrib else None} + children = list(elements_tree) + + # 遞迴處理子節點 (Recursively process children) if children: default_dict = defaultdict(list) for dc in map(elements_tree_to_dict, children): for key, value in dc.items(): default_dict[key].append(value) - elements_dict: dict = { - elements_tree.tag: {key: value[0] if len(value) == 1 else value for key, value in default_dict.items()}} + elements_dict[elements_tree.tag] = { + key: value[0] if len(value) == 1 else value + for key, value in default_dict.items() + } + + # 加入屬性 (Add attributes) if elements_tree.attrib: - elements_dict[elements_tree.tag].update(('@' + key, value) for key, value in elements_tree.attrib.items()) + elements_dict[elements_tree.tag].update( + {f"@{key}": value for key, value in elements_tree.attrib.items()} + ) + + # 加入文字內容 (Add text content) if elements_tree.text: text = elements_tree.text.strip() if children or elements_tree.attrib: if text: - elements_dict[elements_tree.tag]['#text'] = text + elements_dict[elements_tree.tag]["#text"] = text else: elements_dict[elements_tree.tag] = text + return elements_dict -def dict_to_elements_tree(json_dict: dict): +def dict_to_elements_tree(json_dict: Dict[str, Any]) -> str: """ - :param json_dict: json dict - :return: json dict to xml string + 將字典轉換為 XML 字串 + Convert dictionary to XML string + + :param json_dict: JSON 格式字典 (Dictionary in JSON-like format) + :return: XML 字串 (XML string) """ - def _to_elements_tree(json_dict: dict, root): + def _to_elements_tree(json_dict: Any, root: ElementTree.Element) -> None: if isinstance(json_dict, str): root.text = json_dict elif isinstance(json_dict, dict): for key, value in json_dict.items(): - assert isinstance(key, str) - if key.startswith('#'): - assert key == '#text' and isinstance(value, str) + if key.startswith("#"): # 處理文字節點 + if key != "#text" or not isinstance(value, str): + raise TypeError(f"Invalid text node: {key} -> {value}") root.text = value - elif key.startswith('@'): - assert isinstance(value, str) + elif key.startswith("@"): # 處理屬性 + if not isinstance(value, str): + raise TypeError(f"Invalid attribute value: {key} -> {value}") root.set(key[1:], value) - elif isinstance(value, list): - for elements in value: - _to_elements_tree(elements, ElementTree.SubElement(root, key)) - else: + elif isinstance(value, list): # 處理子節點清單 + for element in value: + _to_elements_tree(element, ElementTree.SubElement(root, key)) + else: # 處理單一子節點 _to_elements_tree(value, ElementTree.SubElement(root, key)) else: - raise TypeError('invalid type: ' + str(type(json_dict))) + raise TypeError(f"Invalid type in dict_to_elements_tree: {type(json_dict)}") + + if not isinstance(json_dict, dict) or len(json_dict) != 1: + raise ValueError("Input must be a dictionary with a single root element") - assert isinstance(json_dict, dict) and len(json_dict) == 1 tag, body = next(iter(json_dict.items())) node = ElementTree.Element(tag) _to_elements_tree(body, node) - return str(ElementTree.tostring(node), encoding="utf-8") + return ElementTree.tostring(node, encoding="utf-8").decode("utf-8") \ No newline at end of file diff --git a/je_load_density/utils/xml/xml_file/xml_file.py b/je_load_density/utils/xml/xml_file/xml_file.py index 93400bc..216ace1 100644 --- a/je_load_density/utils/xml/xml_file/xml_file.py +++ b/je_load_density/utils/xml/xml_file/xml_file.py @@ -1,67 +1,96 @@ import xml.dom.minidom from xml.etree import ElementTree +from xml.etree.ElementTree import ParseError +from typing import Optional -from je_load_density.utils.exception.exception_tags import cant_read_xml_error -from je_load_density.utils.exception.exception_tags import xml_type_error -from je_load_density.utils.exception.exceptions import XMLException -from je_load_density.utils.exception.exceptions import XMLTypeException +from je_load_density.utils.exception.exception_tags import cant_read_xml_error, xml_type_error +from je_load_density.utils.exception.exceptions import XMLException, XMLTypeException -def reformat_xml_file(xml_string: str): +def reformat_xml_file(xml_string: str) -> str: + """ + 將 XML 字串重新格式化為漂亮的排版 + Reformat XML string into pretty-printed format + + :param xml_string: 原始 XML 字串 (Raw XML string) + :return: 格式化後的 XML 字串 (Pretty-printed XML string) + """ dom = xml.dom.minidom.parseString(xml_string) - return dom.toprettyxml() + return dom.toprettyxml(indent=" ") + +class XMLParser: + """ + XML 解析器 + XML Parser -class XMLParser(object): + 支援從字串或檔案解析 XML,並能將 XML 寫入檔案。 + Supports parsing XML from string or file, and writing XML to file. + """ - def __init__(self, xml_string: str, xml_type: str = "string"): + def __init__(self, xml_string: str, xml_type: str = "string") -> None: """ - :param xml_string: full xml string - :param xml_type: file or string + 初始化 XMLParser + Initialize XMLParser + + :param xml_string: XML 字串或檔案路徑 (XML string or file path) + :param xml_type: "string" 或 "file" (Parse from string or file) """ - self.element_tree = ElementTree - self.tree = None - self.xml_root = None - self.xml_from_type = "string" - self.xml_string = xml_string.strip() + self.tree: Optional[ElementTree.ElementTree] = None + self.xml_root: Optional[ElementTree.Element] = None + self.xml_from_type: str = "string" + self.xml_string: str = xml_string.strip() + xml_type = xml_type.lower() if xml_type not in ["file", "string"]: raise XMLTypeException(xml_type_error) + if xml_type == "string": self.xml_parser_from_string() else: self.xml_parser_from_file() - def xml_parser_from_string(self, **kwargs): + def xml_parser_from_string(self, **kwargs) -> ElementTree.Element: """ - :param kwargs: any another param - :return: xml root element tree + 從字串解析 XML + Parse XML from string + + :param kwargs: 額外參數 (extra parameters) + :return: XML 根節點 (XML root element) """ try: self.xml_root = ElementTree.fromstring(self.xml_string, **kwargs) - except XMLException: - raise XMLException(cant_read_xml_error) + except ParseError as error: + raise XMLException(f"{cant_read_xml_error}: {error}") return self.xml_root - def xml_parser_from_file(self, **kwargs): + def xml_parser_from_file(self, **kwargs) -> ElementTree.Element: """ - :param kwargs: any another param - :return: xml root element tree + 從檔案解析 XML + Parse XML from file + + :param kwargs: 額外參數 (extra parameters) + :return: XML 根節點 (XML root element) """ try: self.tree = ElementTree.parse(self.xml_string, **kwargs) - except XMLException: - raise XMLException(cant_read_xml_error) + except (ParseError, OSError) as error: + raise XMLException(f"{cant_read_xml_error}: {error}") self.xml_root = self.tree.getroot() self.xml_from_type = "file" return self.xml_root - def write_xml(self, write_xml_filename: str, write_content: str): + def write_xml(self, write_xml_filename: str, write_content: str) -> None: """ - :param write_xml_filename: xml file name - :param write_content: content to write + 將 XML 字串寫入檔案 + Write XML string into file + + :param write_xml_filename: 輸出檔案名稱 (Output file name) + :param write_content: XML 字串內容 (XML string content) """ - write_content = write_content.strip() - content = self.element_tree.fromstring(write_content) - tree = self.element_tree.ElementTree(content) - tree.write(write_xml_filename, encoding="utf-8") + try: + content = ElementTree.fromstring(write_content.strip()) + tree = ElementTree.ElementTree(content) + tree.write(write_xml_filename, encoding="utf-8", xml_declaration=True) + except ParseError as error: + raise XMLException(f"{cant_read_xml_error}: {error}") \ No newline at end of file diff --git a/je_load_density/wrapper/create_locust_env/create_locust_env.py b/je_load_density/wrapper/create_locust_env/create_locust_env.py index 601f580..abe59a2 100644 --- a/je_load_density/wrapper/create_locust_env/create_locust_env.py +++ b/je_load_density/wrapper/create_locust_env/create_locust_env.py @@ -1,3 +1,5 @@ +from typing import List + import gevent from locust import User from locust import events @@ -10,7 +12,7 @@ setup_logging("INFO", None) -def prepare_env(user_class: [User], user_count: int = 50, spawn_rate: int = 10, test_time: int = 60, +def prepare_env(user_class: List[User], user_count: int = 50, spawn_rate: int = 10, test_time: int = 60, web_ui_dict: dict = None, **kwargs): """ @@ -37,7 +39,7 @@ def prepare_env(user_class: [User], user_count: int = 50, spawn_rate: int = 10, env.web_ui.stop() -def create_env(user_class: [User], another_event: events = events): +def create_env(user_class: List[User], another_event: events = events): """ :param another_event: you can use your locust event setting but don't change locust request event :param user_class: locust user class diff --git a/je_load_density/wrapper/event/request_hook.py b/je_load_density/wrapper/event/request_hook.py index bb0c48b..3dab409 100644 --- a/je_load_density/wrapper/event/request_hook.py +++ b/je_load_density/wrapper/event/request_hook.py @@ -4,37 +4,45 @@ @events.request.add_listener def request_hook( - start_time, - url, - request_type, - name, - context, - response, - exception, - response_length, - response_time, - **kwargs + start_time, + url, + request_type, + name, + context, + response, + exception, + response_length, + response_time, + **kwargs ): + """ + Locust request hook + 將每個 request 的結果紀錄到 test_record_instance + """ + if exception is None: + # 成功紀錄 (Success record) test_record_instance.test_record_list.append( { "Method": str(request_type), "test_url": str(url), "name": str(name), + "status_code": str(response.status_code), "text": str(response.text), "content": str(response.content), "headers": str(response.headers), - "status_code": str(response.status_code), - "error": str(exception) + "error": None, # 成功時 error 為 None } ) else: + # 失敗紀錄 (Failure record) test_record_instance.error_record_list.append( { "Method": str(request_type), "test_url": str(url), "name": str(name), - "status_code": str(response.status_code), - "text": str(response.text) + "status_code": str(response.status_code) if response else None, + "text": str(response.text) if response else None, + "error": str(exception), # 失敗時紀錄 exception } - ) + ) \ No newline at end of file diff --git a/je_load_density/wrapper/proxy/proxy_user.py b/je_load_density/wrapper/proxy/proxy_user.py index 40c7a84..6f1e2e3 100644 --- a/je_load_density/wrapper/proxy/proxy_user.py +++ b/je_load_density/wrapper/proxy/proxy_user.py @@ -1,17 +1,45 @@ +from typing import Dict, Any from je_load_density.wrapper.proxy.user.fast_http_user_proxy import ProxyFastHTTPUser from je_load_density.wrapper.proxy.user.http_user_proxy import ProxyHTTPUser -class LocustUserProxy(object): +class LocustUserProxy: + """ + Locust 使用者代理容器 + Locust User Proxy Container - def __init__(self): - self.user_dict = dict() - self.user_dict.update( - { - "fast_http_user": ProxyFastHTTPUser(), - "http_user": ProxyHTTPUser() - } - ) + 用來保存並管理 FastHTTPUser 與 HTTPUser 的代理。 + Used to store and manage FastHTTPUser and HTTPUser proxies. + """ + def __init__(self) -> None: + # 使用者代理字典 (User proxy dictionary) + self.user_dict: Dict[str, Any] = { + "fast_http_user": ProxyFastHTTPUser(), + "http_user": ProxyHTTPUser(), + } -locust_wrapper_proxy = LocustUserProxy() + def get_user(self, user_type: str) -> Any: + """ + 取得指定類型的使用者代理 + Get specified user proxy + + :param user_type: "fast_http_user" 或 "http_user" + :return: 對應的使用者代理 (Corresponding user proxy) + """ + return self.user_dict.get(user_type) + + def set_user(self, user_type: str, user_instance: Any) -> None: + """ + 設定或替換使用者代理 + Set or replace user proxy + + :param user_type: 使用者類型 (User type key) + :param user_instance: 使用者代理實例 (User proxy instance) + """ + self.user_dict[user_type] = user_instance + + +# 建立全域代理容器實例 +# Create global proxy container instance +locust_wrapper_proxy = LocustUserProxy() \ No newline at end of file diff --git a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py b/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py index ff5ad38..d645a2e 100644 --- a/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py +++ b/je_load_density/wrapper/proxy/user/fast_http_user_proxy.py @@ -1,9 +1,28 @@ -class ProxyFastHTTPUser(object): +from typing import Dict, Any, List, Optional - def __init__(self): - self.user_detail_dict = None - self.tasks = None - def setting(self, user_detail_dict: dict, tasks): +class ProxyFastHTTPUser: + """ + 代理使用者類別 + Proxy Fast HTTP User class + + 用來保存使用者細節與任務設定。 + Used to store user details and tasks configuration. + """ + + def __init__(self) -> None: + # 使用者細節 (User details) + self.user_detail_dict: Optional[Dict[str, Any]] = None + # 任務列表 (Tasks list) + self.tasks: Optional[List[Any]] = None + + def configure(self, user_detail_dict: Dict[str, Any], tasks: List[Any]) -> None: + """ + 設定使用者細節與任務 + Configure user details and tasks + + :param user_detail_dict: 使用者細節字典 (User details dictionary) + :param tasks: 任務列表 (List of tasks) + """ self.user_detail_dict = user_detail_dict - self.tasks = tasks + self.tasks = tasks \ No newline at end of file diff --git a/je_load_density/wrapper/proxy/user/http_user_proxy.py b/je_load_density/wrapper/proxy/user/http_user_proxy.py index fe26fd1..2caadb2 100644 --- a/je_load_density/wrapper/proxy/user/http_user_proxy.py +++ b/je_load_density/wrapper/proxy/user/http_user_proxy.py @@ -1,9 +1,40 @@ -class ProxyHTTPUser(object): +from typing import Dict, Any, Callable, List, Optional - def __init__(self): - self.user_detail_dict = None - self.tasks = None - def setting(self, user_detail_dict: dict, tasks: dict): +class ProxyHTTPUser: + """ + 代理 HTTP 使用者類別 + Proxy HTTP User class + + 用來保存使用者細節與任務設定。 + Used to store user details and tasks configuration. + """ + + def __init__(self) -> None: + # 使用者細節 (User details) + self.user_detail_dict: Optional[Dict[str, Any]] = None + # 任務列表 (Tasks list, 可是函式或其他可呼叫物件) + self.tasks: Optional[List[Callable]] = None + + def configure(self, user_detail_dict: Dict[str, Any], tasks: List[Callable]) -> None: + """ + 設定使用者細節與任務 + Configure user details and tasks + + :param user_detail_dict: 使用者細節字典 (User details dictionary) + :param tasks: 任務列表 (List of tasks, functions or callables) + """ self.user_detail_dict = user_detail_dict self.tasks = tasks + + def run_tasks(self) -> None: + """ + 執行所有任務 + Run all tasks + """ + if not self.tasks: + print("No tasks configured.") + return + for task in self.tasks: + if callable(task): + task(self.user_detail_dict) \ No newline at end of file diff --git a/je_load_density/wrapper/start_wrapper/start_test.py b/je_load_density/wrapper/start_wrapper/start_test.py index c2423b0..2eeb3e6 100644 --- a/je_load_density/wrapper/start_wrapper/start_test.py +++ b/je_load_density/wrapper/start_wrapper/start_test.py @@ -1,3 +1,4 @@ +from typing import Dict, Any, Optional from je_load_density.utils.logging.loggin_instance import load_density_logger from je_load_density.wrapper.create_locust_env.create_locust_env import prepare_env from je_load_density.wrapper.user_template.fast_http_user_template import FastHttpUserWrapper, set_wrapper_fasthttp_user @@ -5,37 +6,63 @@ def start_test( - user_detail_dict: dict, - user_count: int = 50, spawn_rate: int = 10, test_time: int = 60, - web_ui_dict: dict = None, - **kwargs -): + user_detail_dict: Dict[str, Any], + user_count: int = 50, + spawn_rate: int = 10, + test_time: Optional[int] = 60, + web_ui_dict: Optional[Dict[str, Any]] = None, + **kwargs +) -> Dict[str, Any]: """ - :param user_detail_dict: dict use to create user - :param user_count: how many user we want to spawn - :param spawn_rate: one time will spawn how many user - :param test_time: total test run time - :param web_ui_dict: web ui dict include host and port like {"host": "127.0.0.1", "port": 8089} - :param kwargs: to catch unknown param - :return: None + 啟動壓力測試 + Start load test + + :param user_detail_dict: 使用者設定字典 (User detail dictionary) + :param user_count: 使用者數量 (Number of users to spawn) + :param spawn_rate: 每秒生成使用者數量 (Spawn rate per second) + :param test_time: 測試持續時間 (Test duration in seconds) + :param web_ui_dict: Web UI 設定,例如 {"host": "127.0.0.1", "port": 8089} + :param kwargs: 其他參數 (extra parameters) + :return: 測試設定摘要字典 (Summary dictionary of test configuration) """ load_density_logger.info( - f"start_test, user_detail_dict: {user_detail_dict}, user_count: {user_count}, " - f"spawn_rate: {spawn_rate}, test_time: {test_time}, web_ui_dict: {web_ui_dict}, " - f"params: {kwargs}" + f"start_test, user_detail_dict={user_detail_dict}, user_count={user_count}, " + f"spawn_rate={spawn_rate}, test_time={test_time}, web_ui_dict={web_ui_dict}, params={kwargs}" ) + + # 使用者類型映射 (User type mapping) user_dict = { "fast_http_user": {"actually_user": FastHttpUserWrapper, "init": set_wrapper_fasthttp_user}, - "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user} + "http_user": {"actually_user": HttpUserWrapper, "init": set_wrapper_http_user}, } - user = user_dict.get(user_detail_dict.get("user", "fast_http_user")) - actually_user = user.get("actually_user", "actually_user") - init_function = user.get("init", "init") + + user_type = user_detail_dict.get("user", "fast_http_user") + user = user_dict.get(user_type) + + if user is None: + raise ValueError(f"Unsupported user type: {user_type}") + + actually_user = user["actually_user"] + init_function = user["init"] + + # 初始化使用者設定 (Initialize user configuration) init_function(user_detail_dict, **kwargs) + # 建立並執行測試環境 (Create and run test environment) prepare_env( - user_class=actually_user, user_count=user_count, spawn_rate=spawn_rate, test_time=test_time, - web_ui_dict=web_ui_dict, **kwargs + user_class=actually_user, + user_count=user_count, + spawn_rate=spawn_rate, + test_time=test_time, + web_ui_dict=web_ui_dict, + **kwargs ) - return str(user_detail_dict) + " " + "user_count: " + str(user_count) + " spawn_rate: " + str( - spawn_rate) + " test_time: " + str(test_time) + + # 回傳結構化結果 (Return structured result) + return { + "user_detail": user_detail_dict, + "user_count": user_count, + "spawn_rate": spawn_rate, + "test_time": test_time, + "web_ui": web_ui_dict, + } \ No newline at end of file diff --git a/je_load_density/wrapper/user_template/fast_http_user_template.py b/je_load_density/wrapper/user_template/fast_http_user_template.py index eab3d3d..9e90614 100644 --- a/je_load_density/wrapper/user_template/fast_http_user_template.py +++ b/je_load_density/wrapper/user_template/fast_http_user_template.py @@ -1,24 +1,30 @@ -from locust import FastHttpUser, between -from locust import task - +from typing import Dict, Any +from locust import FastHttpUser, between, task from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy -def set_wrapper_fasthttp_user(user_detail_dict, **kwargs): - locust_wrapper_proxy.user_dict.get("fast_http_user").setting(user_detail_dict, **kwargs) +def set_wrapper_fasthttp_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 FastHttpUser 的代理使用者 + Configure FastHttpUser proxy user + """ + locust_wrapper_proxy.user_dict.get("fast_http_user").configure(user_detail_dict, **kwargs) return FastHttpUserWrapper class FastHttpUserWrapper(FastHttpUser): """ - locust fast http user use to test + Locust FastHttpUser 包裝類別 + Locust FastHttpUser wrapper class """ + host = "http://localhost" wait_time = between(0.1, 0.2) def __init__(self, environment): super().__init__(environment) - self.method = { + # HTTP 方法映射 (HTTP method mapping) + self.method: Dict[str, Any] = { "get": self.client.get, "post": self.client.post, "put": self.client.put, @@ -29,6 +35,18 @@ def __init__(self, environment): } @task - def test(self): - for test_task_method, test_task_data in locust_wrapper_proxy.user_dict.get("fast_http_user").tasks.items(): - self.method.get(str(test_task_method).lower())(test_task_data.get("request_url")) + def test(self) -> None: + """ + 執行測試任務 + Execute test tasks + """ + proxy_user = locust_wrapper_proxy.user_dict.get("fast_http_user") + if not proxy_user or not proxy_user.tasks: + return + + for test_task_method, test_task_data in proxy_user.tasks.items(): + http_method = self.method.get(str(test_task_method).lower()) + if http_method and isinstance(test_task_data, dict): + request_url = test_task_data.get("request_url") + if request_url: + http_method(request_url) \ No newline at end of file diff --git a/je_load_density/wrapper/user_template/http_user_template.py b/je_load_density/wrapper/user_template/http_user_template.py index 9fec1ac..0e64999 100644 --- a/je_load_density/wrapper/user_template/http_user_template.py +++ b/je_load_density/wrapper/user_template/http_user_template.py @@ -1,23 +1,32 @@ +from typing import Dict, Any + from locust import HttpUser, task, between from je_load_density.wrapper.proxy.proxy_user import locust_wrapper_proxy -def set_wrapper_http_user(user_detail_dict: dict, **kwargs): - locust_wrapper_proxy.user_dict.get("http_user").setting(user_detail_dict, **kwargs) +def set_wrapper_http_user(user_detail_dict: Dict[str, Any], **kwargs) -> type: + """ + 設定 HttpUser 的代理使用者 + Configure HttpUser proxy user + """ + locust_wrapper_proxy.user_dict.get("http_user").configure(user_detail_dict, **kwargs) return HttpUserWrapper class HttpUserWrapper(HttpUser): """ - locust http user use to test + Locust HttpUser 包裝類別 + Locust HttpUser wrapper class """ + host = "http://localhost" wait_time = between(0.1, 0.2) def __init__(self, environment): super().__init__(environment) - self.method = { + # HTTP 方法映射 (HTTP method mapping) + self.method: Dict[str, Any] = { "get": self.client.get, "post": self.client.post, "put": self.client.put, @@ -28,8 +37,18 @@ def __init__(self, environment): } @task - def test(self): - for test_task_method, test_task_data in locust_wrapper_proxy.user_dict.get("http_user").tasks.items(): - self.method.get(str(test_task_method).lower())(test_task_data.get("request_url")) - - + def test(self) -> None: + """ + 執行測試任務 + Execute test tasks + """ + proxy_user = locust_wrapper_proxy.user_dict.get("http_user") + if not proxy_user or not proxy_user.tasks: + return + + for test_task_method, test_task_data in proxy_user.tasks.items(): + http_method = self.method.get(str(test_task_method).lower()) + if http_method and isinstance(test_task_data, dict): + request_url = test_task_data.get("request_url") + if request_url: + http_method(request_url) diff --git a/pyproject.toml b/pyproject.toml index 9053aaa..04f5338 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "je_load_density" -version = "0.0.63" +version = "0.0.64" authors = [ { name = "JE-Chen", email = "jechenmailman@gmail.com" }, ] diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/unit_test/scheduler_test/sec_cron_test.py b/test/unit_test/scheduler_test/sec_cron_test.py deleted file mode 100644 index cf5ecd5..0000000 --- a/test/unit_test/scheduler_test/sec_cron_test.py +++ /dev/null @@ -1,12 +0,0 @@ -from je_load_density import SchedulerManager - - -def test_scheduler(): - print("Test Scheduler") - scheduler.remove_blocking_job(id="test") - scheduler.shutdown_blocking_scheduler() - - -scheduler = SchedulerManager() -scheduler.add_cron_blocking(function=test_scheduler, id="test", second="*") -scheduler.start_block_scheduler() diff --git a/test/unit_test/scheduler_test/sec_interval_test.py b/test/unit_test/scheduler_test/sec_interval_test.py deleted file mode 100644 index c4c1cd9..0000000 --- a/test/unit_test/scheduler_test/sec_interval_test.py +++ /dev/null @@ -1,12 +0,0 @@ -from je_load_density import SchedulerManager - - -def test_scheduler(): - print("Test Scheduler") - scheduler.remove_blocking_job(id="test") - scheduler.shutdown_blocking_scheduler() - - -scheduler = SchedulerManager() -scheduler.add_interval_blocking_secondly(function=test_scheduler, id="test") -scheduler.start_block_scheduler()