From 98d44117d0df593517c5e64f706bf2fc42c01549 Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Sun, 13 Jul 2025 15:59:25 +0800 Subject: [PATCH 1/4] Update Pipeline name Update Pipeline name --- .github/workflows/file_automation_dev_python3_12.yml | 2 +- .github/workflows/file_automation_stable_python3_12.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/file_automation_dev_python3_12.yml b/.github/workflows/file_automation_dev_python3_12.yml index 12fde69..b7ca17e 100644 --- a/.github/workflows/file_automation_dev_python3_12.yml +++ b/.github/workflows/file_automation_dev_python3_12.yml @@ -1,4 +1,4 @@ -name: FileAutomation Dev Python3.9 +name: FileAutomation Dev Python3.12 on: push: diff --git a/.github/workflows/file_automation_stable_python3_12.yml b/.github/workflows/file_automation_stable_python3_12.yml index 8227e50..9c1a4cd 100644 --- a/.github/workflows/file_automation_stable_python3_12.yml +++ b/.github/workflows/file_automation_stable_python3_12.yml @@ -1,4 +1,4 @@ -name: FileAutomation Stable Python3.9 +name: FileAutomation Stable Python3.12 on: push: From 0b32f8a002c1c9eef26088e29574cd55e14709fe Mon Sep 17 00:00:00 2001 From: JE-Chen <33644111+JE-Chen@users.noreply.github.com> Date: Sat, 19 Jul 2025 17:01:02 +0800 Subject: [PATCH 2/4] Update both version Update exception tags --- .../utils/exception/exception_tags.py | 27 +++++++++++-------- stable.toml => dev.toml | 6 ++--- pyproject.toml | 6 ++--- 3 files changed, 22 insertions(+), 17 deletions(-) rename stable.toml => dev.toml (92%) diff --git a/automation_file/utils/exception/exception_tags.py b/automation_file/utils/exception/exception_tags.py index ed28c1e..769d7b4 100644 --- a/automation_file/utils/exception/exception_tags.py +++ b/automation_file/utils/exception/exception_tags.py @@ -1,16 +1,21 @@ -token_is_exist: str = "token file is exists" +token_is_exist: str = "token file already exists" + # Callback executor -get_bad_trigger_method: str = "get bad trigger method, only accept kwargs and args" -get_bad_trigger_function: str = "get bad trigger function only accept function in event_dict" +get_bad_trigger_method: str = "invalid trigger method: only kwargs and args accepted" +get_bad_trigger_function: str = "invalid trigger function: only functions in event_dict accepted" + # add command -add_command_exception: str = "command value type should be as method or function" +add_command_exception: str = "command value type must be a method or function" + # executor -executor_list_error: str = "executor receive wrong data list is none or wrong type" +executor_list_error: str = "executor received invalid data: list is empty or of wrong type" + # json tag -cant_execute_action_error: str = "cant execute action" -cant_generate_json_report: str = "can't generate json report" -cant_find_json_error: str = "cant find json file" -cant_save_json_error: str = "cant save json file" -action_is_null_error: str = "json action is null" +cant_execute_action_error: str = "can't execute action" +cant_generate_json_report: str = "can't generate JSON report" +cant_find_json_error: str = "can't find JSON file" +cant_save_json_error: str = "can't save JSON file" +action_is_null_error: str = "JSON action is null" + # argparse -argparse_get_wrong_data: str = "argparse receive wrong data" +argparse_get_wrong_data: str = "argparse received invalid data" \ No newline at end of file diff --git a/stable.toml b/dev.toml similarity index 92% rename from stable.toml rename to dev.toml index 1bee462..e66ed89 100644 --- a/stable.toml +++ b/dev.toml @@ -5,15 +5,15 @@ requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [project] -name = "automation_file" -version = "0.0.27" +name = "automation_file_dev" +version = "0.0.30" authors = [ { name = "JE-Chen", email = "zenmailman@gmail.com" }, ] description = "" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" -license-files = ["LICENSE"] +license = { text = "MIT" } dependencies = [ "google-api-python-client", "google-auth-httplib2", diff --git a/pyproject.toml b/pyproject.toml index 7b2a596..0c1da04 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,15 +5,15 @@ requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [project] -name = "automation_file_dev" -version = "0.0.29" +name = "automation_file" +version = "0.0.28" authors = [ { name = "JE-Chen", email = "zenmailman@gmail.com" }, ] description = "" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" -license = { text = "MIT" } +license-files = ["LICENSE"] dependencies = [ "google-api-python-client", "google-auth-httplib2", From 25d75a4195542d9f5f5dc9b44615b9fceeb8ca68 Mon Sep 17 00:00:00 2001 From: JE-Chen Date: Wed, 5 Nov 2025 01:24:57 +0800 Subject: [PATCH 3/4] Update dev version * Add zh-tw & en comments * Fix some grammar error --- README.md | 103 ++++++++++--- automation_file/local/dir/dir_process.py | 65 ++++++-- automation_file/local/file/file_process.py | 94 ++++++++---- automation_file/local/zip/zip_process.py | 142 ++++++++++-------- automation_file/remote/download/file.py | 44 ++++-- .../google_drive/delete/delete_manager.py | 23 ++- .../remote/google_drive/dir/folder_manager.py | 33 +++- .../google_drive/download/download_file.py | 57 +++++-- .../remote/google_drive/driver_instance.py | 45 ++++-- .../google_drive/search/search_drive.py | 69 ++++++--- .../remote/google_drive/share/share_file.py | 70 +++++---- .../google_drive/upload/upload_to_driver.py | 97 +++++++----- .../callback/callback_function_executor.py | 93 +++++++++--- .../utils/executor/action_executor.py | 99 ++++++++---- .../utils/file_process/get_dir_file_list.py | 26 ++-- automation_file/utils/json/json_file.py | 54 ++++--- .../utils/logging/loggin_instance.py | 32 +++- .../package_manager/package_manager_class.py | 88 ++++++----- .../utils/project/create_project_structure.py | 87 +++++++---- .../file_automation_socket_server.py | 43 +++++- pyproject.toml | 6 +- dev.toml => stable.toml | 6 +- 22 files changed, 942 insertions(+), 434 deletions(-) rename dev.toml => stable.toml (92%) diff --git a/README.md b/README.md index d058e9f..b56e0a2 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,90 @@ -### FileAutomation +# FileAutomation -### Documentation +This project provides a modular framework for file automation and Google Drive integration. +It supports local file and directory operations, ZIP archive handling, +Google Drive CRUD (create, search, upload, download, delete, share), +and remote execution through a TCP Socket Server. -* TODO +# Features +## Local File and Directory Operations +- Create, delete, copy, and rename files +- Create, delete, and copy directories +- Recursively search for files by extension ---- -> Project Kanban \ -> https://github.com/orgs/Integration-Automation/projects/2/views/1 -> * FileAutomation is used to manager files and dirs. -> * Easily file automation. -> * Automatically backup. -> * Automatically download from Google Drive. -> * Automatically upload to Google Drive. -> * Automatically zip and unzip file. -> * Automatically manager specify file. -> * OS Independent. -> * Remote automation support. -> * Project & Template support. -> * Detailed log file. -> * Scheduler. +## ZIP Archive Handling +- Create ZIP archives +- Extract single files or entire archives +- Set ZIP archive passwords +- Read archive information + +## Google Drive Integration +- Upload: single files, entire directories, to root or specific folders +- Download: single files or entire folders +- Search: by name, MIME type, or custom fields +- Delete: remove files from Drive +- Share: with specific users, domains, or via public link +- Folder Management: create new folders in Drive + +## Automation Executors +- Executor: central manager for all executable functions, supports action lists +- CallbackExecutor: supports callback functions for flexible workflows +- PackageManager: dynamically loads packages and registers functions into executors + +# JSON Configuration +- Read and write JSON-based action lists +- Define automation workflows in JSON format + +# TCP Socket Server +- Start a TCP server to receive JSON commands and execute corresponding actions +- Supports remote control and returns execution results + +## Installation and Requirements + +- Requirements + - Python 3.9+ + - Google API Client + - Google Drive API enabled and credentials.json downloaded ## install > pip install automation_file -## Requires -> python 3.9 or later +# Usage + +1. Initialize Google Drive +```python +from automation_file.remote.google_drive.driver_instance import driver_instance + +driver_instance.later_init("token.json", "credentials.json") +``` + +2. Upload a File +```python +from automation_file.remote.google_drive.upload.upload_to_driver import drive_upload_to_drive + +drive_upload_to_drive("example.txt") +``` + +3. Search Files +```python +from automation_file.remote.google_drive.search.search_drive import drive_search_all_file + +files = drive_search_all_file() +print(files) +``` + +4. Start TCP Server +```python +from automation_file.utils.socket_server.file_automation_socket_server import start_autocontrol_socket_server + +server = start_autocontrol_socket_server("localhost", 9943) +``` -### Architecture Diagram -![architecture_diagram](architecture_diagram/FileAutomation.drawio.png) +# Example JSON Action +```json +[ + ["FA_create_file", {"file_path": "test.txt"}], + ["FA_drive_upload_to_drive", {"file_path": "test.txt"}], + ["FA_drive_search_all_file"] +] +``` \ No newline at end of file diff --git a/automation_file/local/dir/dir_process.py b/automation_file/local/dir/dir_process.py index c65269c..dcd088c 100644 --- a/automation_file/local/dir/dir_process.py +++ b/automation_file/local/dir/dir_process.py @@ -1,78 +1,113 @@ import shutil from pathlib import Path +# 匯入自訂例外與日誌工具 +# Import custom exception and logging utility from automation_file.utils.exception.exceptions import DirNotExistsException from automation_file.utils.logging.loggin_instance import file_automation_logger def copy_dir(dir_path: str, target_dir_path: str) -> bool: """ - Copy dir to target path (path need as dir path) - :param dir_path: which dir do we want to copy (str path) - :param target_dir_path: copy dir to this path - :return: True if success else False + 複製資料夾到目標路徑 + Copy directory to target path + :param dir_path: 要複製的資料夾路徑 (str) + Directory path to copy (str) + :param target_dir_path: 複製到的目標資料夾路徑 (str) + Target directory path (str) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ - dir_path = Path(dir_path) + dir_path = Path(dir_path) # 轉換為 Path 物件 / Convert to Path object target_dir_path = Path(target_dir_path) - if dir_path.is_dir(): + if dir_path.is_dir(): # 確認來源是否為資料夾 / Check if source is a directory try: + # 複製整個資料夾,若目標已存在則允許覆蓋 + # Copy entire directory, allow overwrite if target exists shutil.copytree(dir_path, target_dir_path, dirs_exist_ok=True) file_automation_logger.info(f"Copy dir {dir_path}") return True except shutil.Error as error: + # 複製失敗時記錄錯誤 + # Log error if copy fails file_automation_logger.error(f"Copy dir {dir_path} failed: {repr(error)}") else: + # 若來源資料夾不存在,記錄錯誤 + # Log error if source directory does not exist file_automation_logger.error(f"Copy dir {dir_path} failed: {repr(DirNotExistsException)}") return False + return False def remove_dir_tree(dir_path: str) -> bool: """ - :param dir_path: which dir do we want to remove (str path) - :return: True if success else False + 刪除整個資料夾樹 + Remove entire directory tree + :param dir_path: 要刪除的資料夾路徑 (str) + Directory path to remove (str) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ dir_path = Path(dir_path) - if dir_path.is_dir(): + if dir_path.is_dir(): # 確認是否為資料夾 / Check if directory exists try: - shutil.rmtree(dir_path) + shutil.rmtree(dir_path) # 遞迴刪除資料夾 / Recursively delete directory file_automation_logger.info(f"Remove dir tree {dir_path}") return True except shutil.Error as error: file_automation_logger.error(f"Remove dir tree {dir_path} error: {repr(error)}") return False + return False def rename_dir(origin_dir_path, target_dir: str) -> bool: """ - :param origin_dir_path: which dir do we want to rename (str path) - :param target_dir: target name as str full path - :return: True if success else False + 重新命名資料夾 + Rename directory + :param origin_dir_path: 原始資料夾路徑 (str) + Original directory path (str) + :param target_dir: 新的完整路徑 (str) + Target directory path (str) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ origin_dir_path = Path(origin_dir_path) if origin_dir_path.exists() and origin_dir_path.is_dir(): try: + # 使用 Path.rename 重新命名資料夾 + # Rename directory using Path.rename Path.rename(origin_dir_path, target_dir) file_automation_logger.info( f"Rename dir origin dir path: {origin_dir_path}, target dir path: {target_dir}") return True except Exception as error: + # 捕捉所有例外並記錄 + # Catch all exceptions and log file_automation_logger.error( f"Rename dir error: {repr(error)}, " f"Rename dir origin dir path: {origin_dir_path}, " f"target dir path: {target_dir}") else: + # 若來源資料夾不存在,記錄錯誤 + # Log error if source directory does not exist file_automation_logger.error( f"Rename dir error: {repr(DirNotExistsException)}, " f"Rename dir origin dir path: {origin_dir_path}, " f"target dir path: {target_dir}") return False + return False def create_dir(dir_path: str) -> None: """ - :param dir_path: create dir on dir_path + 建立資料夾 + Create directory + :param dir_path: 要建立的資料夾路徑 (str) + Directory path to create (str) :return: None """ dir_path = Path(dir_path) + # 若資料夾已存在則不會報錯 + # Create directory, no error if already exists dir_path.mkdir(exist_ok=True) - file_automation_logger.info(f"Create dir {dir_path}") + file_automation_logger.info(f"Create dir {dir_path}") \ No newline at end of file diff --git a/automation_file/local/file/file_process.py b/automation_file/local/file/file_process.py index c6a1081..21c1257 100644 --- a/automation_file/local/file/file_process.py +++ b/automation_file/local/file/file_process.py @@ -2,24 +2,32 @@ import sys from pathlib import Path +# 匯入自訂例外與日誌工具 +# Import custom exceptions and logging utility from automation_file.utils.exception.exceptions import FileNotExistsException, DirNotExistsException from automation_file.utils.logging.loggin_instance import file_automation_logger def copy_file(file_path: str, target_path: str, copy_metadata: bool = True) -> bool: """ - :param file_path: which file do we want to copy (str path) - :param target_path: put copy file on target path - :param copy_metadata: copy file metadata or not - :return: True if success else False + 複製單一檔案 + Copy a single file + :param file_path: 要複製的檔案路徑 (str) + File path to copy (str) + :param target_path: 複製到的目標路徑 (str) + Target path (str) + :param copy_metadata: 是否複製檔案的中繼資料 (預設 True) + Whether to copy file metadata (default True) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ file_path = Path(file_path) if file_path.is_file() and file_path.exists(): try: if copy_metadata: - shutil.copy2(file_path, target_path) + shutil.copy2(file_path, target_path) # 複製檔案與中繼資料 / Copy file with metadata else: - shutil.copy(file_path, target_path) + shutil.copy(file_path, target_path) # 只複製檔案內容 / Copy file only file_automation_logger.info(f"Copy file origin path: {file_path}, target path : {target_path}") return True except shutil.Error as error: @@ -27,20 +35,28 @@ def copy_file(file_path: str, target_path: str, copy_metadata: bool = True) -> b else: file_automation_logger.error(f"Copy file failed: {repr(FileNotExistsException)}") return False + return False def copy_specify_extension_file( file_dir_path: str, target_extension: str, target_path: str, copy_metadata: bool = True) -> bool: """ - :param file_dir_path: which dir do we want to search - :param target_extension: what extension we will search - :param target_path: copy file to target path - :param copy_metadata: copy file metadata or not - :return: True if success else False + 複製指定副檔名的檔案 + Copy files with a specific extension + :param file_dir_path: 要搜尋的資料夾路徑 (str) + Directory path to search (str) + :param target_extension: 要搜尋的副檔名 (str) + File extension to search (str) + :param target_path: 複製到的目標路徑 (str) + Target path (str) + :param copy_metadata: 是否複製檔案中繼資料 (bool) + Whether to copy metadata (bool) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ file_dir_path = Path(file_dir_path) if file_dir_path.exists() and file_dir_path.is_dir(): - for file in file_dir_path.glob(f"**/*.{target_extension}"): + for file in file_dir_path.glob(f"**/*.{target_extension}"): # 遞迴搜尋指定副檔名 / Recursively search files copy_file(str(file), target_path, copy_metadata=copy_metadata) file_automation_logger.info( f"Copy specify extension file on dir" @@ -55,15 +71,20 @@ def copy_specify_extension_file( def copy_all_file_to_dir(dir_path: str, target_dir_path: str) -> bool: """ - :param dir_path: copy all file on dir - :param target_dir_path: put file to target dir - :return: True if success else False + 將整個資料夾移動到目標資料夾 + Move entire directory into target directory + :param dir_path: 要移動的資料夾路徑 (str) + Directory path to move (str) + :param target_dir_path: 目標資料夾路徑 (str) + Target directory path (str) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ dir_path = Path(dir_path) target_dir_path = Path(target_dir_path) if dir_path.is_dir() and target_dir_path.is_dir(): try: - shutil.move(str(dir_path), str(target_dir_path)) + shutil.move(str(dir_path), str(target_dir_path)) # 移動資料夾 / Move directory file_automation_logger.info( f"Copy all file to dir, " f"origin dir: {dir_path}, " @@ -80,25 +101,32 @@ def copy_all_file_to_dir(dir_path: str, target_dir_path: str) -> bool: else: print(repr(DirNotExistsException), file=sys.stderr) return False + return False def rename_file(origin_file_path, target_name: str, file_extension=None) -> bool: """ - :param origin_file_path: which dir do we want to search file - :param target_name: rename file to target name - :param file_extension: Which extension do we search - :return: True if success else False + 重新命名資料夾內的檔案 + Rename files inside a directory + :param origin_file_path: 要搜尋檔案的資料夾路徑 (str) + Directory path to search (str) + :param target_name: 新的檔案名稱 (str) + New file name (str) + :param file_extension: 指定副檔名 (可選) (str) + File extension filter (optional) (str) + :return: 成功回傳 True,失敗回傳 False + Return True if success, else False """ origin_file_path = Path(origin_file_path) if origin_file_path.exists() and origin_file_path.is_dir(): if file_extension is None: - file_list = list(origin_file_path.glob("**/*")) + file_list = list(origin_file_path.glob("**/*")) # 全部檔案 / All files else: - file_list = list(origin_file_path.glob(f"**/*.{file_extension}")) + file_list = list(origin_file_path.glob(f"**/*.{file_extension}")) # 指定副檔名 / Specific extension try: file_index = 0 for file in file_list: - file.rename(Path(origin_file_path, target_name)) + file.rename(Path(origin_file_path, target_name)) # 重新命名檔案 / Rename file file_index = file_index + 1 file_automation_logger.info( f"Renamed file: origin file path:{origin_file_path}, with new name: {target_name}") @@ -115,24 +143,32 @@ def rename_file(origin_file_path, target_name: str, file_extension=None) -> bool file_automation_logger.error( f"Rename file failed, error: {repr(DirNotExistsException)}") return False + return False def remove_file(file_path: str) -> None: """ - :param file_path: which file do we want to remove + 刪除檔案 + Remove a file + :param file_path: 要刪除的檔案路徑 (str) + File path to remove (str) :return: None """ file_path = Path(file_path) if file_path.exists() and file_path.is_file(): - file_path.unlink(missing_ok=True) + file_path.unlink(missing_ok=True) # 刪除檔案,若不存在則忽略 / Delete file, ignore if missing file_automation_logger.info(f"Remove file, file path: {file_path}") def create_file(file_path: str, content: str) -> None: """ - :param file_path: create file on path - :param content: what content will write to file + 建立檔案並寫入內容 + Create a file and write content + :param file_path: 檔案路徑 (str) + File path (str) + :param content: 要寫入的內容 (str) + Content to write (str) :return: None """ - with open(file_path, "w+") as file: - file.write(content) + with open(file_path, "w+") as file: # "w+" 表示寫入模式,若不存在則建立 / "w+" means write mode, create if not exists + file.write(content) \ No newline at end of file diff --git a/automation_file/local/zip/zip_process.py b/automation_file/local/zip/zip_process.py index bd4e970..0510ea1 100644 --- a/automation_file/local/zip/zip_process.py +++ b/automation_file/local/zip/zip_process.py @@ -1,147 +1,163 @@ import zipfile from pathlib import Path from shutil import make_archive -from typing import List +from typing import List, Dict, Union from zipfile import ZipInfo +# 匯入自訂例外與日誌工具 +# Import custom exception and logging utility from automation_file.utils.exception.exceptions import ZIPGetWrongFileException from automation_file.utils.logging.loggin_instance import file_automation_logger def zip_dir(dir_we_want_to_zip: str, zip_name: str) -> None: """ - :param dir_we_want_to_zip: dir str path - :param zip_name: zip file name + 壓縮整個資料夾成 zip 檔 + Zip an entire directory + :param dir_we_want_to_zip: 要壓縮的資料夾路徑 (str) + Directory path to zip (str) + :param zip_name: 壓縮檔名稱 (str) + Zip file name (str) :return: None """ make_archive(root_dir=dir_we_want_to_zip, base_name=zip_name, format="zip") file_automation_logger.info(f"Dir to zip: {dir_we_want_to_zip}, zip file name: {zip_name}") -def zip_file(zip_file_path: str, file: [str, List[str]]) -> None: +def zip_file(zip_file_path: str, file: Union[str, List[str]]) -> None: """ - :param zip_file_path: add file to zip file - :param file: single file path or list of file path (str) to add into zip + 將單一檔案或多個檔案加入 zip + Add single or multiple files into a zip + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) + :param file: 檔案路徑或檔案路徑清單 (str 或 List[str]) + File path or list of file paths :return: None """ current_zip = zipfile.ZipFile(zip_file_path, mode="w") if isinstance(file, str): file_name = Path(file) - current_zip.write(file, file_name.name) - file_automation_logger.info( - f"Write file: {file_name} to zip: {current_zip}" - ) + current_zip.write(file, file_name.name) # 寫入單一檔案 / Write single file + file_automation_logger.info(f"Write file: {file_name} to zip: {current_zip}") else: if isinstance(file, list): for writeable in file: file_name = Path(writeable) - current_zip.write(writeable, file_name.name) - file_automation_logger.info( - f"Write file: {writeable} to zip: {current_zip}" - ) + current_zip.write(writeable, file_name.name) # 寫入多個檔案 / Write multiple files + file_automation_logger.info(f"Write file: {writeable} to zip: {current_zip}") else: - file_automation_logger.error( - repr(ZIPGetWrongFileException)) + file_automation_logger.error(repr(ZIPGetWrongFileException)) current_zip.close() -def read_zip_file(zip_file_path: str, file_name: str, password: [str, None] = None) -> bytes: +def read_zip_file(zip_file_path: str, file_name: str, password: Union[str, None] = None) -> bytes: """ - :param zip_file_path: which zip do we want to read - :param file_name: which file on zip do we want to read - :param password: if zip have password use this password to unzip zip file - :return: + 讀取 zip 檔中的指定檔案 + Read a specific file inside a zip + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) + :param file_name: zip 中的檔案名稱 (str) + File name inside zip (str) + :param password: 若 zip 有密碼,需提供 (str 或 None) + Password if zip is protected + :return: 檔案內容 (bytes) + File content (bytes) """ current_zip = zipfile.ZipFile(zip_file_path, mode="r") with current_zip.open(name=file_name, mode="r", pwd=password, force_zip64=True) as read_file: data = read_file.read() current_zip.close() - file_automation_logger.info( - f"Read zip file: {zip_file_path}" - ) + file_automation_logger.info(f"Read zip file: {zip_file_path}") return data -def unzip_file( - zip_file_path: str, extract_member, extract_path: [str, None] = None, password: [str, None] = None) -> None: +def unzip_file(zip_file_path: str, extract_member, extract_path: Union[str, None] = None, + password: Union[str, None] = None) -> None: """ - :param zip_file_path: which zip we want to unzip - :param extract_member: which member we want to unzip - :param extract_path: extract member to path - :param password: if zip have password use this password to unzip zip file + 解壓縮 zip 中的單一檔案 + Extract a single file from a zip + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) + :param extract_member: 要解壓縮的檔案名稱 (str) + File name to extract + :param extract_path: 解壓縮到的路徑 (str 或 None) + Path to extract to + :param password: 若 zip 有密碼,需提供 (str 或 None) + Password if zip is protected :return: None """ current_zip = zipfile.ZipFile(zip_file_path, mode="r") current_zip.extract(member=extract_member, path=extract_path, pwd=password) file_automation_logger.info( - f"Unzip file: {zip_file_path}, " - f"extract member: {extract_member}, " - f"extract path: {extract_path}, " - f"password: {password}" + f"Unzip file: {zip_file_path}, extract member: {extract_member}, extract path: {extract_path}, password: {password}" ) current_zip.close() -def unzip_all( - zip_file_path: str, extract_member: [str, None] = None, - extract_path: [str, None] = None, password: [str, None] = None) -> None: +def unzip_all(zip_file_path: str, extract_member: Union[str, None] = None, + extract_path: Union[str, None] = None, password: Union[str, None] = None) -> None: """ - :param zip_file_path: which zip do we want to unzip - :param extract_member: which member do we want to unzip - :param extract_path: extract to path - :param password: if zip have password use this password to unzip zip file + 解壓縮 zip 中的所有檔案 + Extract all files from a zip + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) + :param extract_member: 指定要解壓縮的檔案 (可選) (str 或 None) + Specific members to extract (optional) + :param extract_path: 解壓縮到的路徑 (str 或 None) + Path to extract to + :param password: 若 zip 有密碼,需提供 (str 或 None) + Password if zip is protected :return: None """ current_zip = zipfile.ZipFile(zip_file_path, mode="r") current_zip.extractall(members=extract_member, path=extract_path, pwd=password) file_automation_logger.info( - f"Unzip file: {zip_file_path}, " - f"extract member: {extract_member}, " - f"extract path: {extract_path}, " - f"password: {password}" + f"Unzip file: {zip_file_path}, extract member: {extract_member}, extract path: {extract_path}, password: {password}" ) current_zip.close() def zip_info(zip_file_path: str) -> List[ZipInfo]: """ - :param zip_file_path: read zip file info + 取得 zip 檔案的詳細資訊 (ZipInfo 物件) + Get detailed info of a zip file (ZipInfo objects) + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) :return: List[ZipInfo] """ current_zip = zipfile.ZipFile(zip_file_path, mode="r") - info_list = current_zip.infolist() + info_list = current_zip.infolist() # 回傳 ZipInfo 物件清單 / Return list of ZipInfo objects current_zip.close() - file_automation_logger.info( - f"Show zip info: {zip_file_path}" - ) + file_automation_logger.info(f"Show zip info: {zip_file_path}") return info_list def zip_file_info(zip_file_path: str) -> List[str]: """ - :param zip_file_path: read inside zip file info + 取得 zip 檔案內所有檔案名稱 + Get list of file names inside a zip + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) :return: List[str] """ current_zip = zipfile.ZipFile(zip_file_path, mode="r") - name_list = current_zip.namelist() + name_list = current_zip.namelist() # 回傳檔案名稱清單 / Return list of file names current_zip.close() - file_automation_logger.info( - f"Show zip file info: {zip_file_path}" - ) + file_automation_logger.info(f"Show zip file info: {zip_file_path}") return name_list def set_zip_password(zip_file_path: str, password: bytes) -> None: """ - :param zip_file_path: which zip do we want to set password - :param password: password will be set + 設定 zip 檔案的密碼 (注意:標準 zipfile 僅支援讀取密碼,不支援加密寫入) + Set password for a zip file (Note: standard zipfile only supports reading with password, not writing encrypted zips) + :param zip_file_path: zip 檔路徑 (str) + Zip file path (str) + :param password: 密碼 (bytes) + Password (bytes) :return: None """ current_zip = zipfile.ZipFile(zip_file_path) - current_zip.setpassword(pwd=password) + current_zip.setpassword(pwd=password) # 設定解壓縮時的密碼 / Set password for extraction current_zip.close() - file_automation_logger.info( - f"Set zip file password, " - f"zip file: {zip_file_path}, " - f"zip password: {password}" - ) + file_automation_logger.info(f"Set zip file password, zip file: {zip_file_path}, zip password: {password}") \ No newline at end of file diff --git a/automation_file/remote/download/file.py b/automation_file/remote/download/file.py index a0e0d3c..c9d1239 100644 --- a/automation_file/remote/download/file.py +++ b/automation_file/remote/download/file.py @@ -1,29 +1,56 @@ import requests from tqdm import tqdm +# 匯入自訂的日誌工具 +# Import custom logging utility from automation_file.utils.logging.loggin_instance import file_automation_logger def download_file(file_url: str, file_name: str, chunk_size: int = 1024, timeout: int = 10): + """ + 下載檔案並顯示進度條 + Download a file with progress bar + :param file_url: 檔案下載網址 (str) + File download URL (str) + :param file_name: 儲存檔案名稱 (str) + File name to save as (str) + :param chunk_size: 每次下載的資料塊大小,預設 1024 bytes + Size of each download chunk, default 1024 bytes + :param timeout: 請求逾時時間 (秒),預設 10 + Request timeout in seconds, default 10 + :return: None + """ try: - response = requests.get(file_url, stream=True, timeout=10) - response.raise_for_status() + # 發送 HTTP GET 請求,使用串流模式避免一次載入大檔案 + # Send HTTP GET request with streaming to avoid loading large file at once + response = requests.get(file_url, stream=True, timeout=timeout) + response.raise_for_status() # 若狀態碼非 200,則拋出例外 / Raise exception if status code is not 200 + + # 從回應標頭取得檔案大小 (若伺服器有提供) + # Get total file size from response headers (if available) total_size = int(response.headers.get('content-length', 0)) + + # 以二進位寫入模式開啟檔案 + # Open file in binary write mode with open(file_name, 'wb') as file: if total_size > 0: - with tqdm( - total=total_size, unit='B', unit_scale=True, desc=file_name - ) as progress: + # 使用 tqdm 顯示下載進度條 + # Use tqdm to show download progress bar + with tqdm(total=total_size, unit='B', unit_scale=True, desc=file_name) as progress: for chunk in response.iter_content(chunk_size=chunk_size): - if chunk: + if chunk: # 避免空資料塊 / Avoid empty chunks file.write(chunk) - progress.update(len(chunk)) + progress.update(len(chunk)) # 更新進度條 / Update progress bar else: + # 若無法取得檔案大小,仍逐塊下載 + # If file size is unknown, still download in chunks for chunk in response.iter_content(chunk_size=chunk_size): if chunk: file.write(chunk) file_automation_logger.info(f"File download is complete. Saved as: {file_name}") + + # 錯誤處理區塊 / Error handling except requests.exceptions.HTTPError as http_err: file_automation_logger.error(f"HTTP error:{http_err}") except requests.exceptions.ConnectionError: @@ -31,5 +58,4 @@ def download_file(file_url: str, file_name: str, chunk_size: int = 1024, timeout except requests.exceptions.Timeout: file_automation_logger.error("Request timed out. The server did not respond.") except Exception as err: - file_automation_logger.error(f"Error:{err}") - + file_automation_logger.error(f"Error:{err}") \ No newline at end of file diff --git a/automation_file/remote/google_drive/delete/delete_manager.py b/automation_file/remote/google_drive/delete/delete_manager.py index dfa5e25..5f46657 100644 --- a/automation_file/remote/google_drive/delete/delete_manager.py +++ b/automation_file/remote/google_drive/delete/delete_manager.py @@ -2,22 +2,35 @@ from googleapiclient.errors import HttpError +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger def drive_delete_file(file_id: str) -> Union[Dict[str, str], None]: """ - :param file_id: Google Drive file id - :return: Dict[str, str] or None + 刪除 Google Drive 上的檔案 + Delete a file from Google Drive + :param file_id: Google Drive 檔案 ID (str) + Google Drive file ID (str) + :return: 若成功,回傳刪除結果 (Dict),否則回傳 None + Return deletion result (Dict) if success, else None """ try: + # 呼叫 Google Drive API 刪除檔案 + # Call Google Drive API to delete file file = driver_instance.service.files().delete(fileId=file_id).execute() + + # 記錄刪除成功的訊息 + # Log successful deletion file_automation_logger.info(f"Delete drive file: {file_id}") return file + except HttpError as error: + # 捕捉 Google API 錯誤並記錄 + # Catch Google API error and log it file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Delete file failed, error: {error}" ) - return None + return None \ No newline at end of file diff --git a/automation_file/remote/google_drive/dir/folder_manager.py b/automation_file/remote/google_drive/dir/folder_manager.py index b537abd..dd6073d 100644 --- a/automation_file/remote/google_drive/dir/folder_manager.py +++ b/automation_file/remote/google_drive/dir/folder_manager.py @@ -2,31 +2,48 @@ from googleapiclient.errors import HttpError +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger def drive_add_folder(folder_name: str) -> Union[dict, None]: """ - :param folder_name: folder name will create on Google Drive - :return: dict or None + 在 Google Drive 建立資料夾 + Create a folder on Google Drive + :param folder_name: 要建立的資料夾名稱 (str) + Folder name to create (str) + :return: 若成功,回傳資料夾 ID (dict),否則回傳 None + Return folder ID (dict) if success, else None """ try: + # 設定資料夾的中繼資料 (名稱與 MIME 類型) + # Define folder metadata (name and MIME type) file_metadata = { "name": folder_name, "mimeType": "application/vnd.google-apps.folder" } + + # 呼叫 Google Drive API 建立資料夾,並只回傳 id 欄位 + # Call Google Drive API to create folder, return only "id" file = driver_instance.service.files().create( body=file_metadata, fields="id" ).execute() - file_automation_logger.info( - f"Add drive folder: {folder_name}" - ) + + # 記錄建立成功的訊息 + # Log successful folder creation + file_automation_logger.info(f"Add drive folder: {folder_name}") + + # 回傳資料夾 ID + # Return folder ID return file.get("id") + except HttpError as error: + # 捕捉 Google API 錯誤並記錄 + # Catch Google API error and log it file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Add folder failed, error: {error}" ) - return None + return None \ No newline at end of file diff --git a/automation_file/remote/google_drive/download/download_file.py b/automation_file/remote/google_drive/download/download_file.py index 24d5289..54c2c7d 100644 --- a/automation_file/remote/google_drive/download/download_file.py +++ b/automation_file/remote/google_drive/download/download_file.py @@ -5,34 +5,56 @@ from googleapiclient.errors import HttpError from googleapiclient.http import MediaIoBaseDownload +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger def drive_download_file(file_id: str, file_name: str) -> Union[BytesIO, None]: """ - :param file_id: file have this id will download - :param file_name: file save on local name - :return: file + 從 Google Drive 下載單一檔案 + Download a single file from Google Drive + :param file_id: Google Drive 檔案 ID (str) + Google Drive file ID (str) + :param file_name: 本地端儲存檔案名稱 (str) + Local file name to save as (str) + :return: BytesIO 物件 (檔案內容) 或 None + BytesIO object (file content) or None """ try: + # 建立下載請求 + # Create download request request = driver_instance.service.files().get_media(fileId=file_id) + + # 使用 BytesIO 暫存檔案內容 + # Use BytesIO to temporarily store file content file = io.BytesIO() + + # 建立下載器 + # Create downloader downloader = MediaIoBaseDownload(file, request) done = False + + # 逐區塊下載檔案,直到完成 + # Download file in chunks until done while done is False: status, done = downloader.next_chunk() file_automation_logger.info( f"Download {file_name} {int(status.progress() * 100)}%." ) + except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Download file failed, error: {error}" ) return None + + # 將下載完成的檔案寫入本地端 + # Save downloaded file to local storage with open(file_name, "wb") as output_file: output_file.write(file.getbuffer()) + file_automation_logger.info( f"Download file: {file_id} with name: {file_name}" ) @@ -41,29 +63,44 @@ def drive_download_file(file_id: str, file_name: str) -> Union[BytesIO, None]: def drive_download_file_from_folder(folder_name: str) -> Union[dict, None]: """ - :param folder_name: which folder do we want to download file - :return: dict or None + 從 Google Drive 指定資料夾下載所有檔案 + Download all files from a specific Google Drive folder + :param folder_name: 資料夾名稱 (str) + Folder name (str) + :return: 檔案名稱與 ID 的字典,或 None + Dictionary of file names and IDs, or None """ try: files = dict() + + # 先找到指定名稱的資料夾 + # Find the folder by name response = driver_instance.service.files().list( q=f"mimeType = 'application/vnd.google-apps.folder' and name = '{folder_name}'" ).execute() + folder = response.get("files", [])[0] folder_id = folder.get("id") + + # 列出該資料夾下的所有檔案 + # List all files inside the folder response = driver_instance.service.files().list( q=f"'{folder_id}' in parents" ).execute() + + # 逐一下載檔案 + # Download each file for file in response.get("files", []): drive_download_file(file.get("id"), file.get("name")) files.update({file.get("name"): file.get("id")}) + file_automation_logger.info( f"Download all file on {folder_name} done." ) return files + except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Download file failed, error: {error}" ) - return None + return None \ No newline at end of file diff --git a/automation_file/remote/google_drive/driver_instance.py b/automation_file/remote/google_drive/driver_instance.py index 591eb92..cdfa84c 100644 --- a/automation_file/remote/google_drive/driver_instance.py +++ b/automation_file/remote/google_drive/driver_instance.py @@ -12,45 +12,66 @@ class GoogleDrive(object): def __init__(self): + # Google Drive 實例相關屬性 + # Attributes for Google Drive instance self.google_drive_instance = None self.creds = None self.service = None + # 權限範圍:完整存取 Google Drive + # Scope: full access to Google Drive self.scopes = ["https://www.googleapis.com/auth/drive"] def later_init(self, token_path: str, credentials_path: str): """ - :param token_path: Google Drive token path - :param credentials_path: Google Drive credentials path + 初始化 Google Drive API 驅動 + Initialize Google Drive API driver + :param token_path: Google Drive token 檔案路徑 (str) + Path to token.json file + :param credentials_path: Google Drive credentials 憑證檔案路徑 (str) + Path to credentials.json file :return: None """ token_path = Path(token_path) credentials_path = Path(credentials_path) creds = None - # The file token.json stores the user's access and refresh tokens, and is - # created automatically when the authorization flow completes for the first - # time. + + # token.json 儲存使用者的 access 與 refresh token + # token.json stores user's access and refresh tokens if token_path.exists(): - file_automation_logger.info("Token exists try to load.") + file_automation_logger.info("Token exists, try to load.") creds = Credentials.from_authorized_user_file(str(token_path), self.scopes) - # If there are no (valid) credentials available, let the user log in. + + # 如果沒有有效的憑證,則重新登入 + # If no valid credentials, perform login if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: + # 如果憑證過期但有 refresh token,則刷新 + # Refresh credentials if expired but refresh token exists creds.refresh(Request()) else: + # 使用 OAuth2 流程重新登入 + # Use OAuth2 flow for login flow = InstalledAppFlow.from_client_secrets_file( - str(credentials_path), self.scopes) + str(credentials_path), self.scopes + ) creds = flow.run_local_server(port=0) - # Save the credentials for the next run + + # 儲存憑證到 token.json,供下次使用 + # Save credentials to token.json for future use with open(str(token_path), 'w') as token: token.write(creds.to_json()) + try: + # 建立 Google Drive API service + # Build Google Drive API service self.service = build('drive', 'v3', credentials=creds) file_automation_logger.info("Loading service successfully.") except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Init service failed, error: {error}" ) -driver_instance = GoogleDrive() +# 建立單例,供其他模組使用 +# Create a singleton instance for other modules to use +driver_instance = GoogleDrive() \ No newline at end of file diff --git a/automation_file/remote/google_drive/search/search_drive.py b/automation_file/remote/google_drive/search/search_drive.py index 19a2c47..3cccf40 100644 --- a/automation_file/remote/google_drive/search/search_drive.py +++ b/automation_file/remote/google_drive/search/search_drive.py @@ -2,81 +2,100 @@ from googleapiclient.errors import HttpError +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger def drive_search_all_file() -> Union[dict, None]: """ - Search all file on Google Drive - :return: dict or None + 搜尋 Google Drive 上的所有檔案 + Search all files on Google Drive + :return: 檔案名稱與 ID 的字典,或 None + Dictionary of file names and IDs, or None """ try: item = dict() + # 呼叫 Google Drive API 取得所有檔案 + # Call Google Drive API to list all files response = driver_instance.service.files().list().execute() for file in response.get("files", []): item.update({file.get("name"): file.get("id")}) - file_automation_logger.info( - f"Search all file on drive" - ) + + file_automation_logger.info("Search all file on drive") return item + except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Search file failed, error: {error}" ) return None def drive_search_file_mimetype(mime_type: str) -> Union[dict, None]: """ - :param mime_type: search all file with mime_type on Google Drive - :return: dict or None + 搜尋 Google Drive 上指定 MIME 類型的檔案 + Search all files with a specific MIME type on Google Drive + :param mime_type: MIME 類型 (str) + MIME type (str) + :return: 檔案名稱與 ID 的字典,或 None + Dictionary of file names and IDs, or None """ try: files = dict() page_token = None while True: - # pylint: disable=maybe-no-member + # 呼叫 Google Drive API,依 MIME 類型搜尋檔案 + # Call Google Drive API to search files by MIME type response = driver_instance.service.files().list( q=f"mimeType='{mime_type}'", fields="nextPageToken, files(id, name)", - pageToken=page_token).execute() + pageToken=page_token + ).execute() + for file in response.get("files", []): files.update({file.get("name"): file.get("id")}) + + # 處理分頁結果 + # Handle pagination page_token = response.get('nextPageToken', None) if page_token is None: break - file_automation_logger.info( - f"Search all {mime_type} file on drive" - ) + + file_automation_logger.info(f"Search all {mime_type} file on drive") return files + except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Search file failed, error: {error}" ) return None def drive_search_field(field_pattern: str) -> Union[dict, None]: """ - :param field_pattern: what pattern will we use to search - :return: dict or None + 使用自訂欄位模式搜尋檔案 + Search files with a custom field pattern + :param field_pattern: 欄位模式 (str) + Field pattern (str) + :return: 檔案名稱與 ID 的字典,或 None + Dictionary of file names and IDs, or None """ try: files = dict() + # 呼叫 Google Drive API,依指定欄位模式搜尋 + # Call Google Drive API with custom field pattern response = driver_instance.service.files().list(fields=field_pattern).execute() + for file in response.get("files", []): files.update({file.get("name"): file.get("id")}) - file_automation_logger.info( - f"Search all {field_pattern}" - ) + + file_automation_logger.info(f"Search all {field_pattern}") return files + except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Search file failed, error: {error}" ) - return None - + return None \ No newline at end of file diff --git a/automation_file/remote/google_drive/share/share_file.py b/automation_file/remote/google_drive/share/share_file.py index 7d81aa4..bf1d020 100644 --- a/automation_file/remote/google_drive/share/share_file.py +++ b/automation_file/remote/google_drive/share/share_file.py @@ -2,6 +2,8 @@ from googleapiclient.errors import HttpError +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger @@ -9,10 +11,16 @@ def drive_share_file_to_user( file_id: str, user: str, user_role: str = "writer") -> Union[dict, None]: """ - :param file_id: which file do we want to share - :param user: what user do we want to share - :param user_role: what role do we want to share - :return: dict or None + 分享檔案給指定使用者 + Share a file with a specific user + :param file_id: 要分享的檔案 ID (str) + File ID to share (str) + :param user: 使用者的 email (str) + User email address (str) + :param user_role: 權限角色 (預設 writer) + Permission role (default writer) + :return: 成功回傳 dict,失敗回傳 None + Return dict if success, else None """ try: service = driver_instance.service @@ -22,27 +30,30 @@ def drive_share_file_to_user( "emailAddress": user } file_automation_logger.info( - f"Share file: {file_id}, " - f"to user: {user}, " - f"with user role: {user_role}" + f"Share file: {file_id}, to user: {user}, with user role: {user_role}" ) return service.permissions().create( fileId=file_id, body=user_permission, - fields='id', ).execute() + fields='id', + ).execute() except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Share file failed, error: {error}" ) return None def drive_share_file_to_anyone(file_id: str, share_role: str = "reader") -> Union[dict, None]: """ - :param file_id: which file do we want to share - :param share_role: what role do we want to share - :return: dict or None + 分享檔案給任何人(公開連結) + Share a file with anyone (public link) + :param file_id: 要分享的檔案 ID (str) + File ID to share (str) + :param share_role: 權限角色 (預設 reader) + Permission role (default reader) + :return: 成功回傳 dict,失敗回傳 None + Return dict if success, else None """ try: service = driver_instance.service @@ -52,16 +63,16 @@ def drive_share_file_to_anyone(file_id: str, share_role: str = "reader") -> Unio "role": share_role } file_automation_logger.info( - f"Share file to anyone file: {file_id} with role: {share_role}" + f"Share file to anyone, file: {file_id} with role: {share_role}" ) return service.permissions().create( fileId=file_id, body=user_permission, - fields='id', ).execute() + fields='id', + ).execute() except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Share file failed, error: {error}" ) return None @@ -69,10 +80,16 @@ def drive_share_file_to_anyone(file_id: str, share_role: str = "reader") -> Unio def drive_share_file_to_domain( file_id: str, domain: str, domain_role: str = "reader") -> Union[dict, None]: """ - :param file_id: which file do we want to share - :param domain: what domain do we want to share - :param domain_role: what role do we want to share - :return: dict or None + 分享檔案給指定網域的所有使用者 + Share a file with all users in a specific domain + :param file_id: 要分享的檔案 ID (str) + File ID to share (str) + :param domain: 網域名稱 (str),例如 "example.com" + Domain name (str), e.g., "example.com" + :param domain_role: 權限角色 (預設 reader) + Permission role (default reader) + :return: 成功回傳 dict,失敗回傳 None + Return dict if success, else None """ try: service = driver_instance.service @@ -82,16 +99,15 @@ def drive_share_file_to_domain( "domain": domain } file_automation_logger.info( - f"Share file to domain: {domain}, " - f"with domain role: {domain_role}" + f"Share file to domain: {domain}, with domain role: {domain_role}" ) return service.permissions().create( fileId=file_id, body=domain_permission, - fields='id', ).execute() + fields='id', + ).execute() except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Share file failed, error: {error}" ) - return None + return None \ No newline at end of file diff --git a/automation_file/remote/google_drive/upload/upload_to_driver.py b/automation_file/remote/google_drive/upload/upload_to_driver.py index 4a9f6a3..a700fc8 100644 --- a/automation_file/remote/google_drive/upload/upload_to_driver.py +++ b/automation_file/remote/google_drive/upload/upload_to_driver.py @@ -4,15 +4,22 @@ from googleapiclient.errors import HttpError from googleapiclient.http import MediaFileUpload +# 匯入 Google Drive 驅動實例與日誌工具 +# Import Google Drive driver instance and logging utility from automation_file.remote.google_drive.driver_instance import driver_instance from automation_file.utils.logging.loggin_instance import file_automation_logger def drive_upload_to_drive(file_path: str, file_name: str = None) -> Union[dict, None]: """ - :param file_path: which file do we want to upload - :param file_name: file name on Google Drive - :return: dict or None + 上傳單一檔案到 Google Drive 根目錄 + Upload a single file to Google Drive root + :param file_path: 要上傳的檔案路徑 (str) + File path to upload (str) + :param file_name: 在 Google Drive 上的檔案名稱 (可選) + File name on Google Drive (optional) + :return: 成功回傳 dict (包含檔案 ID),失敗回傳 None + Return dict (with file ID) if success, else None """ try: file_path = Path(file_path) @@ -32,28 +39,33 @@ def drive_upload_to_drive(file_path: str, file_name: str = None) -> Union[dict, fields="id" ).execute() file_automation_logger.info( - f"Upload file to drive file: {file_path}, " - f"with name: {file_name}" + f"Upload file to drive file: {file_path}, with name: {file_name}" ) return file_id else: - file_automation_logger.error( - FileNotFoundError - ) + # 若檔案不存在,記錄錯誤 + # Log error if file does not exist + file_automation_logger.error(FileNotFoundError) except HttpError as error: + # ⚠️ 原本寫成 Delete file failed,應改為 Upload file failed file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Upload file failed, error: {error}" ) return None def drive_upload_to_folder(folder_id: str, file_path: str, file_name: str = None) -> Union[dict, None]: """ - :param folder_id: which folder do we want to upload file into - :param file_path: which file do we want to upload - :param file_name: file name on Google Drive - :return: dict or None + 上傳單一檔案到 Google Drive 指定資料夾 + Upload a single file into a specific Google Drive folder + :param folder_id: 目標資料夾 ID (str) + Target folder ID (str) + :param file_path: 要上傳的檔案路徑 (str) + File path to upload (str) + :param file_name: 在 Google Drive 上的檔案名稱 (可選) + File name on Google Drive (optional) + :return: 成功回傳 dict (包含檔案 ID),失敗回傳 None + Return dict (with file ID) if success, else None """ try: file_path = Path(file_path) @@ -74,27 +86,26 @@ def drive_upload_to_folder(folder_id: str, file_path: str, file_name: str = None fields="id" ).execute() file_automation_logger.info( - f"Upload file to folder: {folder_id}," - f"file_path: {file_path}, " - f"with name: {file_name}" + f"Upload file to folder: {folder_id}, file_path: {file_path}, with name: {file_name}" ) return file_id else: - file_automation_logger.error( - FileNotFoundError - ) + file_automation_logger.error(FileNotFoundError) except HttpError as error: file_automation_logger.error( - f"Delete file failed," - f"error: {error}" + f"Upload file failed, error: {error}" ) return None -def drive_upload_dir_to_drive(dir_path: str) -> List[Optional[set]]: +def drive_upload_dir_to_drive(dir_path: str) -> List[Optional[dict]] | None: """ - :param dir_path: which dir do we want to upload to drive - :return: List[Optional[set]] + 上傳整個資料夾中的所有檔案到 Google Drive 根目錄 + Upload all files from a local directory to Google Drive root + :param dir_path: 要上傳的資料夾路徑 (str) + Directory path to upload (str) + :return: 檔案 ID 清單 (List[dict]),或空清單 + List of file IDs (List[dict]) or empty list """ dir_path = Path(dir_path) ids = list() @@ -108,29 +119,41 @@ def drive_upload_dir_to_drive(dir_path: str) -> List[Optional[set]]: ) return ids else: - file_automation_logger.error( - FileNotFoundError - ) + file_automation_logger.error(FileNotFoundError) + return None -def drive_upload_dir_to_folder(folder_id: str, dir_path: str) -> List[Optional[set]]: +def drive_upload_dir_to_folder(folder_id: str, dir_path: str) -> List[Optional[dict]] | None: """ - :param folder_id: which folder do we want to put dir into - :param dir_path: which dir do we want to upload - :return: List[Optional[set]] + 上傳整個資料夾中的所有檔案到 Google Drive 指定資料夾 + Upload all files from a local directory into a specific Google Drive folder + + :param folder_id: 目標 Google Drive 資料夾 ID (str) + Target Google Drive folder ID (str) + :param dir_path: 本地端要上傳的資料夾路徑 (str) + Local directory path to upload (str) + :return: 檔案 ID 清單 (List[dict]),或 None + List of file IDs (List[dict]) or None """ dir_path = Path(dir_path) - ids = list() + ids: List[Optional[dict]] = [] + if dir_path.is_dir(): path_list = dir_path.iterdir() for path in path_list: if path.is_file(): + # 呼叫單檔上傳函式 (drive_upload_to_folder),並收集回傳的檔案 ID + # Call single-file upload function and collect returned file ID ids.append(drive_upload_to_folder(folder_id, str(path.absolute()), path.name)) + file_automation_logger.info( - f"Upload all file on dir: {dir_path} to folder: {folder_id}" + f"Upload all files in dir: {dir_path} to folder: {folder_id}" ) return ids else: - file_automation_logger.error( - FileNotFoundError - ) + # 若資料夾不存在,記錄錯誤 + # Log error if directory does not exist + file_automation_logger.error(FileNotFoundError) + + return None + diff --git a/automation_file/utils/callback/callback_function_executor.py b/automation_file/utils/callback/callback_function_executor.py index 99c6a04..2e4f257 100644 --- a/automation_file/utils/callback/callback_function_executor.py +++ b/automation_file/utils/callback/callback_function_executor.py @@ -1,28 +1,54 @@ import typing +# 匯入本地檔案與資料夾處理函式 +# Import local file and directory processing functions from automation_file.local.dir.dir_process import copy_dir, create_dir, remove_dir_tree -from automation_file.local.file.file_process import copy_file, remove_file, rename_file, copy_specify_extension_file, \ - copy_all_file_to_dir -from automation_file.local.zip.zip_process import zip_dir, zip_file, zip_info, zip_file_info, set_zip_password, \ - read_zip_file, unzip_file, unzip_all +from automation_file.local.file.file_process import ( + copy_file, remove_file, rename_file, + copy_specify_extension_file, copy_all_file_to_dir +) +from automation_file.local.zip.zip_process import ( + zip_dir, zip_file, zip_info, zip_file_info, + set_zip_password, read_zip_file, unzip_file, unzip_all +) + +# 匯入 Google Drive 功能 +# Import Google Drive functions from automation_file.remote.google_drive.delete.delete_manager import drive_delete_file from automation_file.remote.google_drive.dir.folder_manager import drive_add_folder -from automation_file.remote.google_drive.download.download_file import drive_download_file, drive_download_file_from_folder +from automation_file.remote.google_drive.download.download_file import ( + drive_download_file, drive_download_file_from_folder +) from automation_file.remote.google_drive.driver_instance import driver_instance -from automation_file.remote.google_drive.search.search_drive import \ +from automation_file.remote.google_drive.search.search_drive import ( drive_search_all_file, drive_search_field, drive_search_file_mimetype -from automation_file.remote.google_drive.share.share_file import \ +) +from automation_file.remote.google_drive.share.share_file import ( drive_share_file_to_anyone, drive_share_file_to_domain, drive_share_file_to_user -from automation_file.remote.google_drive.upload.upload_to_driver import \ - drive_upload_dir_to_folder, drive_upload_to_folder, drive_upload_dir_to_drive, drive_upload_to_drive +) +from automation_file.remote.google_drive.upload.upload_to_driver import ( + drive_upload_dir_to_folder, drive_upload_to_folder, + drive_upload_dir_to_drive, drive_upload_to_drive +) + +# 匯入例外與日誌工具 +# Import exceptions and logging from automation_file.utils.exception.exception_tags import get_bad_trigger_function, get_bad_trigger_method from automation_file.utils.exception.exceptions import CallbackExecutorException from automation_file.utils.logging.loggin_instance import file_automation_logger class CallbackFunctionExecutor(object): + """ + CallbackFunctionExecutor 負責: + - 管理所有可觸發的函式 (event_dict) + - 執行指定的 trigger function + - 在 trigger function 執行後,呼叫 callback function + """ def __init__(self): + # event_dict 對應 trigger_function_name 與實際函式 + # event_dict maps trigger_function_name to actual function self.event_dict: dict = { "FA_copy_file": copy_file, "FA_rename_file": rename_file, @@ -61,41 +87,66 @@ def callback_function( self, trigger_function_name: str, callback_function: typing.Callable, - callback_function_param: [dict, None] = None, + callback_function_param: typing.Optional[dict] = None, callback_param_method: str = "kwargs", **kwargs ) -> typing.Any: """ - :param trigger_function_name: what function we want to trigger only accept function in event_dict - :param callback_function: what function we want to callback - :param callback_function_param: callback function's param only accept dict - :param callback_param_method: what type param will use on callback function only accept kwargs and args - :param kwargs: trigger_function's param - :return: trigger_function_name return value + 執行指定的 trigger function,並在完成後執行 callback function + Execute a trigger function, then run a callback function + + :param trigger_function_name: 要觸發的函式名稱 (必須存在於 event_dict) + Function name to trigger (must exist in event_dict) + :param callback_function: 要執行的 callback function + Callback function to execute + :param callback_function_param: callback function 的參數 (dict) + Parameters for callback function (dict) + :param callback_param_method: callback function 的參數傳遞方式 ("kwargs" 或 "args") + Parameter passing method ("kwargs" or "args") + :param kwargs: trigger function 的參數 + Parameters for trigger function + :return: trigger function 的回傳值 + Return value of trigger function """ try: if trigger_function_name not in self.event_dict.keys(): raise CallbackExecutorException(get_bad_trigger_function) - file_automation_logger.info(f"Callback trigger {trigger_function_name} with param {kwargs}") + + file_automation_logger.info( + f"Callback trigger {trigger_function_name} with param {kwargs}" + ) + + # 執行 trigger function + # Execute trigger function execute_return_value = self.event_dict.get(trigger_function_name)(**kwargs) + + # 執行 callback function if callback_function_param is not None: if callback_param_method not in ["kwargs", "args"]: raise CallbackExecutorException(get_bad_trigger_method) + if callback_param_method == "kwargs": callback_function(**callback_function_param) file_automation_logger.info( - f"Callback function {callback_function} with param {callback_function_param}") + f"Callback function {callback_function} with param {callback_function_param}" + ) else: callback_function(*callback_function_param) file_automation_logger.info( - f"Callback function {callback_function} with param {callback_function_param}") + f"Callback function {callback_function} with param {callback_function_param}" + ) else: callback_function() file_automation_logger.info(f"Callback function {callback_function}") + return execute_return_value + except Exception as error: file_automation_logger.error( - f"Callback function failed. {repr(error)}") + f"Callback function failed. {repr(error)}" + ) -callback_executor = CallbackFunctionExecutor() +# 建立單例,供其他模組使用 +# Create a singleton instance for other modules to use +callback_executor = CallbackFunctionExecutor() \ No newline at end of file diff --git a/automation_file/utils/executor/action_executor.py b/automation_file/utils/executor/action_executor.py index 28b74ec..6d1996b 100644 --- a/automation_file/utils/executor/action_executor.py +++ b/automation_file/utils/executor/action_executor.py @@ -3,24 +3,43 @@ from inspect import getmembers, isbuiltin from typing import Union, Any +# 匯入本地檔案與資料夾處理函式 +# Import local file and directory processing functions from automation_file.local.dir.dir_process import copy_dir, create_dir, remove_dir_tree -from automation_file.local.file.file_process import copy_file, remove_file, rename_file, copy_specify_extension_file, \ - copy_all_file_to_dir, create_file -from automation_file.local.zip.zip_process import zip_dir, zip_file, zip_info, zip_file_info, set_zip_password, \ - read_zip_file, unzip_file, unzip_all +from automation_file.local.file.file_process import ( + copy_file, remove_file, rename_file, + copy_specify_extension_file, copy_all_file_to_dir, create_file +) +from automation_file.local.zip.zip_process import ( + zip_dir, zip_file, zip_info, zip_file_info, + set_zip_password, read_zip_file, unzip_file, unzip_all +) + +# 匯入 Google Drive 功能 +# Import Google Drive functions from automation_file.remote.google_drive.delete.delete_manager import drive_delete_file from automation_file.remote.google_drive.dir.folder_manager import drive_add_folder -from automation_file.remote.google_drive.download.download_file import drive_download_file, \ - drive_download_file_from_folder +from automation_file.remote.google_drive.download.download_file import ( + drive_download_file, drive_download_file_from_folder +) from automation_file.remote.google_drive.driver_instance import driver_instance -from automation_file.remote.google_drive.search.search_drive import \ +from automation_file.remote.google_drive.search.search_drive import ( drive_search_all_file, drive_search_field, drive_search_file_mimetype -from automation_file.remote.google_drive.share.share_file import \ +) +from automation_file.remote.google_drive.share.share_file import ( drive_share_file_to_anyone, drive_share_file_to_domain, drive_share_file_to_user -from automation_file.remote.google_drive.upload.upload_to_driver import \ - drive_upload_dir_to_folder, drive_upload_to_folder, drive_upload_dir_to_drive, drive_upload_to_drive -from automation_file.utils.exception.exception_tags import add_command_exception, executor_list_error, \ +) +from automation_file.remote.google_drive.upload.upload_to_driver import ( + drive_upload_dir_to_folder, drive_upload_to_folder, + drive_upload_dir_to_drive, drive_upload_to_drive +) + +# 匯入例外、JSON 工具、日誌工具與套件管理器 +# Import exceptions, JSON utils, logging, and package manager +from automation_file.utils.exception.exception_tags import ( + add_command_exception, executor_list_error, action_is_null_error, cant_execute_action_error +) from automation_file.utils.exception.exceptions import ExecuteActionException, AddCommandException from automation_file.utils.json.json_file import read_action_json from automation_file.utils.logging.loggin_instance import file_automation_logger @@ -28,6 +47,12 @@ class Executor(object): + """ + Executor 負責: + - 維護一個 event_dict,將字串名稱對應到實際函式 + - 執行 action list 中的動作 + - 支援從 JSON 檔讀取 action list 並執行 + """ def __init__(self): self.event_dict: dict = { @@ -67,37 +92,47 @@ def __init__(self): "FA_drive_delete_file": drive_delete_file, "FA_drive_download_file": drive_download_file, "FA_drive_download_file_from_folder": drive_download_file_from_folder, - # Execute + # Executor 自身功能 "FA_execute_action": self.execute_action, "FA_execute_files": self.execute_files, "FA_add_package_to_executor": package_manager.add_package_to_executor, } - # get all builtin function and add to event dict + + # 將所有 Python 內建函式加入 event_dict + # Add all Python built-in functions into event_dict for function in getmembers(builtins, isbuiltin): self.event_dict.update({str(function[0]): function[1]}) def _execute_event(self, action: list): + """ + 執行單一 action + Execute a single action + :param action: [函式名稱, 參數] + :return: 函式回傳值 + """ event = self.event_dict.get(action[0]) if len(action) == 2: if isinstance(action[1], dict): - return event(**action[1]) + return event(**action[1]) # 使用 kwargs else: - return event(*action[1]) + return event(*action[1]) # 使用 args elif len(action) == 1: return event() else: raise ExecuteActionException(cant_execute_action_error + " " + str(action)) - def execute_action(self, action_list: [list, dict]) -> dict: + def execute_action(self, action_list: Union[list, dict]) -> dict: """ - use to execute all action on action list(action file or program list) - :param action_list the list include action - for loop the list and execute action + 執行 action list + Execute all actions in action list + :param action_list: list 或 dict (若為 dict,需包含 "auto_control") + :return: 執行紀錄 dict """ if isinstance(action_list, dict): action_list: list = action_list.get("auto_control") if action_list is None: raise ExecuteActionException(executor_list_error) + execute_record_dict = dict() try: if len(action_list) == 0 or isinstance(action_list, list) is False: @@ -106,13 +141,12 @@ def execute_action(self, action_list: [list, dict]) -> dict: file_automation_logger.error( f"Execute {action_list} failed. {repr(error)}" ) + for action in action_list: try: event_response = self._execute_event(action) execute_record = "execute: " + str(action) - file_automation_logger.info( - f"Execute {action}" - ) + file_automation_logger.info(f"Execute {action}") execute_record_dict.update({execute_record: event_response}) except Exception as error: file_automation_logger.error( @@ -120,15 +154,21 @@ def execute_action(self, action_list: [list, dict]) -> dict: ) execute_record = "execute: " + str(action) execute_record_dict.update({execute_record: repr(error)}) + + # 輸出執行結果 + # Print execution results for key, value in execute_record_dict.items(): print(key, flush=True) print(value, flush=True) + return execute_record_dict def execute_files(self, execute_files_list: list) -> list: """ - :param execute_files_list: list include execute files path - :return: every execute detail as list + 從 JSON 檔讀取並執行 action list + Execute action lists from JSON files + :param execute_files_list: JSON 檔案路徑清單 + :return: 每個檔案的執行結果 list """ execute_detail_list: list = list() for file in execute_files_list: @@ -136,17 +176,18 @@ def execute_files(self, execute_files_list: list) -> list: return execute_detail_list +# 建立單例,供其他模組使用 executor = Executor() package_manager.executor = executor def add_command_to_executor(command_dict: dict): """ - :param command_dict: dict include command we want to add to event_dict + 動態新增指令到 event_dict + Dynamically add commands to event_dict + :param command_dict: dict {command_name: function} """ - file_automation_logger.info( - f"Add command to executor {command_dict}" - ) + file_automation_logger.info(f"Add command to executor {command_dict}") for command_name, command in command_dict.items(): if isinstance(command, (types.MethodType, types.FunctionType)): executor.event_dict.update({command_name: command}) @@ -159,4 +200,4 @@ def execute_action(action_list: list) -> dict: def execute_files(execute_files_list: list) -> list: - return executor.execute_files(execute_files_list) + return executor.execute_files(execute_files_list) \ No newline at end of file diff --git a/automation_file/utils/file_process/get_dir_file_list.py b/automation_file/utils/file_process/get_dir_file_list.py index 852f84a..c5300ae 100644 --- a/automation_file/utils/file_process/get_dir_file_list.py +++ b/automation_file/utils/file_process/get_dir_file_list.py @@ -1,7 +1,5 @@ -from os import getcwd -from os import walk -from os.path import abspath -from os.path import join +from os import getcwd, walk +from os.path import abspath, join from typing import List @@ -9,13 +7,19 @@ def get_dir_files_as_list( dir_path: str = getcwd(), default_search_file_extension: str = ".json") -> List[str]: """ - get dir file when end with default_search_file_extension - :param dir_path: which dir we want to walk and get file list - :param default_search_file_extension: which extension we want to search - :return: [] if nothing searched or [file1, file2.... files] file was searched + 遞迴搜尋資料夾下所有符合副檔名的檔案,並回傳完整路徑清單 + Recursively search for files with a specific extension in a directory and return absolute paths + + :param dir_path: 要搜尋的資料夾路徑 (預設為當前工作目錄) + Directory path to search (default: current working directory) + :param default_search_file_extension: 要搜尋的副檔名 (預設為 ".json") + File extension to search (default: ".json") + :return: 若無符合檔案則回傳空清單,否則回傳檔案完整路徑清單 + [] if no files found, else [file1, file2, ...] """ return [ - abspath(join(dir_path, file)) for root, dirs, files in walk(dir_path) + abspath(join(root, file)) + for root, dirs, files in walk(dir_path) for file in files - if file.endswith(default_search_file_extension.lower()) - ] + if file.lower().endswith(default_search_file_extension.lower()) + ] \ No newline at end of file diff --git a/automation_file/utils/json/json_file.py b/automation_file/utils/json/json_file.py index ebf3b86..8b4aef6 100644 --- a/automation_file/utils/json/json_file.py +++ b/automation_file/utils/json/json_file.py @@ -6,43 +6,61 @@ from automation_file.utils.exception.exceptions import JsonActionException from automation_file.utils.logging.loggin_instance import file_automation_logger +# 全域鎖,避免多執行緒同時讀寫 JSON 檔案 +# Global lock to prevent concurrent read/write on JSON files _lock = Lock() def read_action_json(json_file_path: str) -> list: """ - use to read action file - :param json_file_path json file's path to read + 讀取 JSON 檔案並回傳內容 + Read a JSON file and return its content + + :param json_file_path: JSON 檔案路徑 (str) + Path to JSON file (str) + :return: JSON 內容 (list) + JSON content (list) """ _lock.acquire() try: file_path = Path(json_file_path) if file_path.exists() and file_path.is_file(): - file_automation_logger.info( - f"Read json file {json_file_path}" - ) - with open(json_file_path) as read_file: - return json.loads(read_file.read()) + file_automation_logger.info(f"Read json file {json_file_path}") + with open(json_file_path, encoding="utf-8") as read_file: + return json.load(read_file) + else: + # 若檔案不存在,丟出自訂例外 + # Raise custom exception if file not found + raise JsonActionException(cant_find_json_error) except JsonActionException: - raise JsonActionException(cant_find_json_error) + raise + except Exception as error: + # 捕捉其他例外並轉換成 JsonActionException + # Catch other exceptions and raise JsonActionException + raise JsonActionException(f"{cant_find_json_error}: {repr(error)}") finally: _lock.release() def write_action_json(json_save_path: str, action_json: list) -> None: """ - use to save action file - :param json_save_path json save path - :param action_json the json str include action to write + 將資料寫入 JSON 檔案 + Write data into a JSON file + + :param json_save_path: JSON 檔案儲存路徑 (str) + Path to save JSON file (str) + :param action_json: 要寫入的 JSON 資料 (list) + JSON data to write (list) + :return: None """ _lock.acquire() try: - file_automation_logger.info( - f"Write {action_json} as file {json_save_path}" - ) - with open(json_save_path, "w+") as file_to_write: - file_to_write.write(json.dumps(action_json, indent=4)) + file_automation_logger.info(f"Write {action_json} as file {json_save_path}") + with open(json_save_path, "w+", encoding="utf-8") as file_to_write: + json.dump(action_json, file_to_write, indent=4, ensure_ascii=False) except JsonActionException: - raise JsonActionException(cant_save_json_error) + raise + except Exception as error: + raise JsonActionException(f"{cant_save_json_error}: {repr(error)}") finally: - _lock.release() + _lock.release() \ No newline at end of file diff --git a/automation_file/utils/logging/loggin_instance.py b/automation_file/utils/logging/loggin_instance.py index 6887b04..b82a87f 100644 --- a/automation_file/utils/logging/loggin_instance.py +++ b/automation_file/utils/logging/loggin_instance.py @@ -1,16 +1,30 @@ import logging +# 設定 root logger 的等級為 DEBUG +# Set root logger level to DEBUG logging.root.setLevel(logging.DEBUG) + +# 建立一個專用 logger +# Create a dedicated logger file_automation_logger = logging.getLogger("File Automation") + +# 設定 log 格式 +# Define log format formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)s | %(message)s') -# File handler -file_handler = logging.FileHandler(filename="FileAutomation.log", mode="w") + +# === File handler === +# 將 log 輸出到檔案 FileAutomation.log +# Write logs to file FileAutomation.log +file_handler = logging.FileHandler(filename="FileAutomation.log", mode="w", encoding="utf-8") file_handler.setFormatter(formatter) file_automation_logger.addHandler(file_handler) -class FileAutomationLoggingHandler(logging.Handler): - # redirect logging stderr output to queue +class FileAutomationLoggingHandler(logging.Handler): + """ + 自訂 logging handler,將 log 訊息輸出到標準輸出 (print) + Custom logging handler to redirect logs to stdout (print) + """ def __init__(self): super().__init__() @@ -18,10 +32,12 @@ def __init__(self): self.setLevel(logging.DEBUG) def emit(self, record: logging.LogRecord) -> None: + # 將 log 訊息格式化後輸出到 console + # Print formatted log message to console print(self.format(record)) -# Stream handler -file_automation_logger.addHandler(FileAutomationLoggingHandler()) - - +# === Stream handler === +# 將 log 輸出到 console +# Add custom stream handler to logger +file_automation_logger.addHandler(FileAutomationLoggingHandler()) \ No newline at end of file diff --git a/automation_file/utils/package_manager/package_manager_class.py b/automation_file/utils/package_manager/package_manager_class.py index 2e4eb0c..1606783 100644 --- a/automation_file/utils/package_manager/package_manager_class.py +++ b/automation_file/utils/package_manager/package_manager_class.py @@ -7,17 +7,26 @@ class PackageManager(object): + """ + PackageManager 負責: + - 檢查套件是否存在並載入 + - 將套件中的函式、內建函式、類別註冊到 executor 或 callback_executor + """ def __init__(self): - self.installed_package_dict = { - } + # 已安裝套件快取,避免重複 import + # Cache for installed packages + self.installed_package_dict = {} self.executor = None self.callback_executor = None def check_package(self, package: str): """ - :param package: package to check exists or not - :return: package if find else None + 檢查並載入套件 + Check if a package exists and import it + + :param package: 套件名稱 (str) + :return: 套件模組物件,若不存在則回傳 None """ if self.installed_package_dict.get(package, None) is None: found_spec = find_spec(package) @@ -25,76 +34,63 @@ def check_package(self, package: str): try: installed_package = import_module(found_spec.name) self.installed_package_dict.update( - {found_spec.name: installed_package}) + {found_spec.name: installed_package} + ) except ModuleNotFoundError as error: print(repr(error), file=stderr) return self.installed_package_dict.get(package, None) def add_package_to_executor(self, package): - file_automation_logger.info(f"add_package_to_executor, package: {package}") """ - :param package: package's function will add to executor + 將套件的成員加入 executor 的 event_dict + Add package members to executor's event_dict """ - self.add_package_to_target( - package=package, - target=self.executor - ) + file_automation_logger.info(f"add_package_to_executor, package: {package}") + self.add_package_to_target(package=package, target=self.executor) def add_package_to_callback_executor(self, package): - file_automation_logger.info(f"add_package_to_callback_executor, package: {package}") """ - :param package: package's function will add to callback_executor + 將套件的成員加入 callback_executor 的 event_dict + Add package members to callback_executor's event_dict """ - self.add_package_to_target( - package=package, - target=self.callback_executor - ) + file_automation_logger.info(f"add_package_to_callback_executor, package: {package}") + self.add_package_to_target(package=package, target=self.callback_executor) def get_member(self, package, predicate, target): """ - :param package: package we want to get member - :param predicate: predicate - :param target: which event_dict will be added + 取得套件成員並加入目標 event_dict + Get members of a package and add them to target's event_dict + + :param package: 套件名稱 + :param predicate: 過濾條件 (isfunction, isbuiltin, isclass) + :param target: 目標 executor/callback_executor """ installed_package = self.check_package(package) if installed_package is not None and target is not None: for member in getmembers(installed_package, predicate): target.event_dict.update( - {str(package) + "_" + str(member[0]): member[1]}) + {f"{package}_{member[0]}": member[1]} + ) elif installed_package is None: - print(repr(ModuleNotFoundError(f"Can't find package {package}")), - file=stderr) + print(repr(ModuleNotFoundError(f"Can't find package {package}")), file=stderr) else: print(f"Executor error {self.executor}", file=stderr) def add_package_to_target(self, package, target): """ - :param package: package we want to get member - :param target: which event_dict will be added + 將套件的 function、builtin、class 成員加入指定 target + Add functions, builtins, and classes from a package to target + + :param package: 套件名稱 + :param target: 目標 executor/callback_executor """ try: - self.get_member( - package=package, - predicate=isfunction, - target=target - ) - self.get_member( - package=package, - predicate=isbuiltin, - target=target - ) - self.get_member( - package=package, - predicate=isfunction, - target=target - ) - self.get_member( - package=package, - predicate=isclass, - target=target - ) + self.get_member(package=package, predicate=isfunction, target=target) + self.get_member(package=package, predicate=isbuiltin, target=target) + self.get_member(package=package, predicate=isclass, target=target) except Exception as error: print(repr(error), file=stderr) -package_manager = PackageManager() +# 建立單例,供其他模組使用 +package_manager = PackageManager() \ No newline at end of file diff --git a/automation_file/utils/project/create_project_structure.py b/automation_file/utils/project/create_project_structure.py index c2287ac..95de3c9 100644 --- a/automation_file/utils/project/create_project_structure.py +++ b/automation_file/utils/project/create_project_structure.py @@ -4,65 +4,88 @@ from automation_file.utils.json.json_file import write_action_json from automation_file.utils.logging.loggin_instance import file_automation_logger -from automation_file.utils.project.template.template_executor import executor_template_1, \ - executor_template_2, bad_executor_template_1 -from automation_file.utils.project.template.template_keyword import template_keyword_1, \ - template_keyword_2, bad_template_1 +from automation_file.utils.project.template.template_executor import ( + executor_template_1, executor_template_2, bad_executor_template_1 +) +from automation_file.utils.project.template.template_keyword import ( + template_keyword_1, template_keyword_2, bad_template_1 +) def create_dir(dir_name: str) -> None: """ - :param dir_name: create dir use dir name + 建立資料夾 (若不存在則自動建立) + Create a directory (auto-create if not exists) + + :param dir_name: 資料夾名稱或路徑 :return: None """ - Path(dir_name).mkdir( - parents=True, - exist_ok=True - ) + Path(dir_name).mkdir(parents=True, exist_ok=True) def create_template(parent_name: str, project_path: str = None) -> None: + """ + 在專案目錄下建立 keyword JSON 與 executor Python 檔案 + Create keyword JSON files and executor Python files under project directory + + :param parent_name: 專案主資料夾名稱 + :param project_path: 專案路徑 (預設為當前工作目錄) + """ if project_path is None: project_path = getcwd() - keyword_dir_path = Path(project_path + "/" + parent_name + "/keyword") - executor_dir_path = Path(project_path + "/" + parent_name + "/executor") + + keyword_dir_path = Path(f"{project_path}/{parent_name}/keyword") + executor_dir_path = Path(f"{project_path}/{parent_name}/executor") + lock = Lock() + + # === 建立 keyword JSON 檔案 === if keyword_dir_path.exists() and keyword_dir_path.is_dir(): - write_action_json(project_path + "/" + parent_name + "/keyword/keyword1.json", template_keyword_1) - write_action_json(project_path + "/" + parent_name + "/keyword/keyword2.json", template_keyword_2) - write_action_json(project_path + "/" + parent_name + "/keyword/bad_keyword_1.json", bad_template_1) - if executor_dir_path.exists() and keyword_dir_path.is_dir(): - lock.acquire() - try: - with open(project_path + "/" + parent_name + "/executor/executor_one_file.py", "w+") as file: + write_action_json(str(keyword_dir_path / "keyword1.json"), template_keyword_1) + write_action_json(str(keyword_dir_path / "keyword2.json"), template_keyword_2) + write_action_json(str(keyword_dir_path / "bad_keyword_1.json"), bad_template_1) + + # === 建立 executor Python 檔案 === + if executor_dir_path.exists() and executor_dir_path.is_dir(): + with lock: + with open(executor_dir_path / "executor_one_file.py", "w+", encoding="utf-8") as file: file.write( executor_template_1.replace( - "{temp}", - project_path + "/" + parent_name + "/keyword/keyword1.json" + "{temp}", str(keyword_dir_path / "keyword1.json") ) ) - with open(project_path + "/" + parent_name + "/executor/executor_bad_file.py", "w+") as file: + with open(executor_dir_path / "executor_bad_file.py", "w+", encoding="utf-8") as file: file.write( bad_executor_template_1.replace( - "{temp}", - project_path + "/" + parent_name + "/keyword/bad_keyword_1.json" + "{temp}", str(keyword_dir_path / "bad_keyword_1.json") ) ) - with open(project_path + "/" + parent_name + "/executor/executor_folder.py", "w+") as file: + with open(executor_dir_path / "executor_folder.py", "w+", encoding="utf-8") as file: file.write( executor_template_2.replace( - "{temp}", - project_path + "/" + parent_name + "/keyword" + "{temp}", str(keyword_dir_path) ) ) - finally: - lock.release() def create_project_dir(project_path: str = None, parent_name: str = "FileAutomation") -> None: - file_automation_logger.info(f"create_project_dir, project_path: {project_path}, parent_name: {parent_name}") + """ + 建立專案目錄結構 (包含 keyword 與 executor 資料夾),並生成範例檔案 + Create project directory structure (with keyword and executor folders) and generate template files + + :param project_path: 專案路徑 (預設為當前工作目錄) + :param parent_name: 專案主資料夾名稱 (預設 "FileAutomation") + """ + file_automation_logger.info( + f"create_project_dir, project_path: {project_path}, parent_name: {parent_name}" + ) + if project_path is None: project_path = getcwd() - create_dir(project_path + "/" + parent_name + "/keyword") - create_dir(project_path + "/" + parent_name + "/executor") - create_template(parent_name) + + # 建立 keyword 與 executor 資料夾 + create_dir(f"{project_path}/{parent_name}/keyword") + create_dir(f"{project_path}/{parent_name}/executor") + + # 建立範例檔案 + create_template(parent_name, project_path) \ No newline at end of file diff --git a/automation_file/utils/socket_server/file_automation_socket_server.py b/automation_file/utils/socket_server/file_automation_socket_server.py index a9669ad..f58fb5c 100644 --- a/automation_file/utils/socket_server/file_automation_socket_server.py +++ b/automation_file/utils/socket_server/file_automation_socket_server.py @@ -7,24 +7,44 @@ class TCPServerHandler(socketserver.BaseRequestHandler): + """ + TCPServerHandler 負責處理每個 client 的請求 + TCPServerHandler handles each client request + """ def handle(self): + # 接收 client 傳來的資料 (最大 8192 bytes) + # Receive data from client command_string = str(self.request.recv(8192).strip(), encoding="utf-8") socket = self.request print("command is: " + command_string, flush=True) + + # 若收到 quit_server 指令,則關閉伺服器 + # Shutdown server if quit_server command received if command_string == "quit_server": self.server.shutdown() self.server.close_flag = True print("Now quit server", flush=True) else: try: + # 將接收到的 JSON 字串轉換為 Python 物件 + # Parse JSON string into Python object execute_str = json.loads(command_string) + + # 執行對應的動作,並將結果逐一回傳給 client + # Execute actions and send results back to client for execute_function, execute_return in execute_action(execute_str).items(): socket.sendto(str(execute_return).encode("utf-8"), self.client_address) socket.sendto("\n".encode("utf-8"), self.client_address) + + # 傳送結束標記,讓 client 知道資料已傳完 + # Send end marker to indicate data transmission is complete socket.sendto("Return_Data_Over_JE".encode("utf-8"), self.client_address) socket.sendto("\n".encode("utf-8"), self.client_address) + except Exception as error: + # 錯誤處理:將錯誤訊息輸出到 stderr 並回傳給 client + # Error handling: log to stderr and send back to client print(repr(error), file=sys.stderr) try: socket.sendto(str(error).encode("utf-8"), self.client_address) @@ -36,6 +56,10 @@ def handle(self): class TCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + """ + 自訂 TCPServer,支援多執行緒處理 + Custom TCPServer with threading support + """ def __init__(self, server_address, request_handler_class): super().__init__(server_address, request_handler_class) @@ -43,13 +67,30 @@ def __init__(self, server_address, request_handler_class): def start_autocontrol_socket_server(host: str = "localhost", port: int = 9943): + """ + 啟動自動控制 TCP Socket Server + Start the auto-control TCP Socket Server + + :param host: 主機位址 (預設 localhost) + Host address (default: localhost) + :param port: 監聽埠號 (預設 9943) + Port number (default: 9943) + :return: server instance + """ + # 支援從命令列參數指定 host 與 port + # Support overriding host and port from command line arguments if len(sys.argv) == 2: host = sys.argv[1] elif len(sys.argv) == 3: host = sys.argv[1] port = int(sys.argv[2]) + server = TCPServer((host, port), TCPServerHandler) + + # 使用背景執行緒啟動 server + # Start server in a background thread server_thread = threading.Thread(target=server.serve_forever) server_thread.daemon = True server_thread.start() - return server + + return server \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 0c1da04..b351cd6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,15 +5,15 @@ requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [project] -name = "automation_file" -version = "0.0.28" +name = "automation_file_dev" +version = "0.0.31" authors = [ { name = "JE-Chen", email = "zenmailman@gmail.com" }, ] description = "" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" -license-files = ["LICENSE"] +license = { text = "MIT" } dependencies = [ "google-api-python-client", "google-auth-httplib2", diff --git a/dev.toml b/stable.toml similarity index 92% rename from dev.toml rename to stable.toml index e66ed89..0c1da04 100644 --- a/dev.toml +++ b/stable.toml @@ -5,15 +5,15 @@ requires = ["setuptools>=61.0"] build-backend = "setuptools.build_meta" [project] -name = "automation_file_dev" -version = "0.0.30" +name = "automation_file" +version = "0.0.28" authors = [ { name = "JE-Chen", email = "zenmailman@gmail.com" }, ] description = "" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" -license = { text = "MIT" } +license-files = ["LICENSE"] dependencies = [ "google-api-python-client", "google-auth-httplib2", From 87e252450adb0be7930ef380953d040589208aab Mon Sep 17 00:00:00 2001 From: JE-Chen Date: Wed, 5 Nov 2025 01:39:24 +0800 Subject: [PATCH 4/4] Update stable version --- pyproject.toml => dev.toml | 0 stable.toml | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename pyproject.toml => dev.toml (100%) diff --git a/pyproject.toml b/dev.toml similarity index 100% rename from pyproject.toml rename to dev.toml diff --git a/stable.toml b/stable.toml index 0c1da04..5b97907 100644 --- a/stable.toml +++ b/stable.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "automation_file" -version = "0.0.28" +version = "0.0.29" authors = [ { name = "JE-Chen", email = "zenmailman@gmail.com" }, ]