diff --git a/src/lsf_mcp_server.egg-info/PKG-INFO b/src/lsf_mcp_server.egg-info/PKG-INFO deleted file mode 100644 index de7c285..0000000 --- a/src/lsf_mcp_server.egg-info/PKG-INFO +++ /dev/null @@ -1,462 +0,0 @@ -Metadata-Version: 2.4 -Name: lsf-mcp-server -Version: 0.1.0 -Summary: MCP server for IBM Spectrum LSF job management and file operations -Author: LSF MCP Server Contributors -Requires-Python: >=3.10 -Description-Content-Type: text/markdown -License-File: LICENSE -Requires-Dist: mcp>=1.0.0 -Requires-Dist: httpx>=0.27.0 -Requires-Dist: pydantic>=2.0.0 -Requires-Dist: anyio>=4.0.0 -Provides-Extra: dev -Requires-Dist: pytest>=8.0.0; extra == "dev" -Requires-Dist: pytest-asyncio>=0.23.0; extra == "dev" -Requires-Dist: pytest-httpx>=0.30.0; extra == "dev" -Dynamic: license-file - -# LSF MCP Server - -A Model Context Protocol (MCP) server for IBM Spectrum LSF (Load Sharing Facility) that provides comprehensive job management and file operation capabilities through the LSF REST API. - -## Features - -### Job Management (3 tools) -- **submit_job** - Submit jobs with simple or advanced options -- **query_jobs** - Query job status with flexible filtering -- **kill_job** - Terminate running or pending jobs - -### Cluster Information (6 tools) -- **list_hosts** - List cluster hosts with status and load -- **list_queues** - List available queues and their configuration -- **check_load** - Check system load on hosts -- **list_host_info** - Get detailed host information -- **get_cluster_id** - Get LSF cluster identifier and version -- **get_cluster_info** - Get comprehensive cluster information via API - -### File Operations (4 tools) -- **upload_file** - Upload files to LSF server -- **download_file** - Download files from LSF server -- **list_files** - List files in directories -- **delete_file** - Delete files on LSF server - -## Requirements - -- Python 3.10 or higher -- Access to an LSF REST API server -- LSF credentials (username and password) - -## Installation - 
-### 1. Clone or Download - -```bash -# Clone the repository or download and extract the source code -cd lsf-mcp-server -``` - -### 2. Create Virtual Environment - -```bash -# Use Python 3.10 or higher -python3 -m venv venv -source venv/bin/activate # On Windows: venv\Scripts\activate -``` - -### 3. Install Dependencies - -```bash -pip install -e . -``` - -## Configuration - -### MCP Settings File - -The MCP server is configured through IBM Bob's settings file. You need to manually edit this file to add your LSF credentials. - -**Settings File Location:** -``` -~/Library/Application Support/IBM Bob/User/globalStorage/ibm.bob-code/settings/mcp_settings.json -``` - -### Configuration Steps - -1. Open the MCP settings file in a text editor -2. Add the LSF MCP server configuration: - -```json -{ - "mcpServers": { - "lsf": { - "command": "/absolute/path/to/lsf-mcp-server/venv/bin/python", - "args": ["-m", "lsf_mcp_server.server"], - "env": { - "LSF_SERVER_URL": "http://party1.dev.fyre.ibm.com:8088", - "LSF_USERNAME": "your-lsf-username", - "LSF_PASSWORD": "your-lsf-password" - } - } - } -} -``` - -3. Replace the placeholder values: - - `command`: **Use the absolute path to your venv Python binary** (e.g., `/Users/username/lsf-mcp-server/venv/bin/python`) - - `LSF_SERVER_URL`: Your LSF REST API server URL - - `LSF_USERNAME`: Your LSF username - - `LSF_PASSWORD`: Your LSF password - -4. Save the file and restart IBM Bob - -**Important:** The `command` field must point to the Python binary inside your virtual environment. This ensures Bob uses the correct Python with all required dependencies installed. Do not use just `"python"` as it may not find the installed packages. - -## Usage - -Once configured, you can interact with the LSF MCP server through IBM Bob using natural language. 
Here are some examples: - -### Job Management - -**Submit a simple job:** -``` -Submit a job to run 'hostname' on the LSF cluster -``` - -**Submit a job with specific resources:** -``` -Submit a job named 'my-analysis' to run 'python analyze.py' on queue 'normal' with 4 processors and 8GB memory -``` - -**Submit a job with advanced options:** -``` -Submit a job with advanced options: -J myjob -q normal -n 8 -R "span[ptile=4]" -W 2:00 ./run.sh -``` - -**Query all jobs:** -``` -Show me all my LSF jobs -``` - -**Query a specific job:** -``` -What's the status of job 12345? -``` - -**Query jobs in a queue:** -``` -Show me all jobs in the 'normal' queue -``` - -**Kill a job:** -``` -Kill job 12345 -``` - -### Cluster Information - -**List all hosts:** -``` -List all hosts in the LSF cluster -``` - -**Check specific host:** -``` -Show me information about host node01 -``` - -**List queues:** -``` -What queues are available? -``` - -**Check system load:** -``` -What's the current load on the cluster? -``` - -**Get cluster ID:** -``` -What's the LSF cluster ID? -``` - -### File Operations - -**Upload a file:** -``` -Upload mydata.txt to /home/user/data/ on the LSF server -``` - -**Download a file:** -``` -Download /home/user/results.txt from the LSF server -``` - -**List files:** -``` -List files in /home/user/jobs/ on the LSF server -``` - -**Delete a file:** -``` -Delete /home/user/temp/old_file.txt on the LSF server -``` - -## Tool Reference - -### submit_job - -Submit a job to the LSF cluster. 
- -**Parameters:** -- `command` (required): Command to execute -- `job_name` (optional): Job name -- `queue` (optional): Queue name -- `num_processors` (optional): Number of processors -- `memory_mb` (optional): Memory in MB -- `wall_time` (optional): Wall time limit (HH:MM format) -- `output_file` (optional): Standard output file path -- `error_file` (optional): Standard error file path -- `working_directory` (optional): Working directory -- `advanced_options` (optional): Full LSF options string for advanced control - -**Example Response:** -```json -{ - "success": true, - "command": "bsub -J myjob hostname", - "result": { - "jobId": "12345", - "status": "submitted" - } -} -``` - -### query_jobs - -Query job status and information. - -**Parameters:** -- `job_id` (optional): Specific job ID to query -- `user` (optional): Filter by username -- `queue` (optional): Filter by queue name -- `status` (optional): Filter by job status - -**Example Response:** -```json -{ - "success": true, - "result": { - "jobs": [ - { - "jobid": "12345", - "stat": "RUN", - "queue": "normal", - "user": "username", - "job_name": "myjob" - } - ] - } -} -``` - -### kill_job - -Kill a running or pending job. - -**Parameters:** -- `job_id` (required): Job ID to kill -- `force` (optional): Force kill (default: false) - -### list_hosts - -List LSF cluster hosts. - -**Parameters:** -- `host_name` (optional): Specific host to query - -### list_queues - -List available LSF queues. - -**Parameters:** -- `queue_name` (optional): Specific queue to query - -### check_load - -Check system load on hosts. - -**Parameters:** -- `host_name` (optional): Specific host to check - -### list_host_info - -Get detailed host information. - -**Parameters:** -- `host_name` (optional): Specific host to query - -### get_cluster_id - -Get LSF cluster identifier and version. - -**Parameters:** None - -### get_cluster_info - -Get comprehensive cluster information via API. 
- -**Parameters:** None - -### upload_file - -Upload a file to the LSF server. - -**Parameters:** -- `local_path` (required): Path to local file -- `remote_path` (required): Destination path on LSF server - -### download_file - -Download a file from the LSF server. - -**Parameters:** -- `remote_path` (required): Path on LSF server -- `local_path` (optional): Local destination (returns content if not provided) - -### list_files - -List files in a directory. - -**Parameters:** -- `path` (required): Directory path to list - -### delete_file - -Delete a file on the LSF server. - -**Parameters:** -- `file_path` (required): Path to file to delete - -## Architecture - -The LSF MCP server runs locally on your machine and communicates with: -1. **IBM Bob** via stdio (standard input/output) -2. **LSF REST API** via HTTP - -``` -┌─────────────────────────────────────────────────────────────┐ -│ Your Local Machine │ -│ │ -│ ┌────────────────┐ stdio ┌──────────────┐ │ -│ │ IBM Bob │◄──────────────────────►│ LSF MCP │ │ -│ │ (AI Client) │ (stdin/stdout) │ Server │ │ -│ └────────────────┘ └──────┬───────┘ │ -│ │ │ -└────────────────────────────────────────────────────┼─────────┘ - │ - │ HTTP - ▼ - ┌────────────────────────────┐ - │ LSF REST API Server │ - │ party1.dev.fyre.ibm.com │ - │ :8088 │ - └────────────────────────────┘ -``` - -## Security - -- **Credentials**: Stored in environment variables (not in code) -- **Session Management**: Automatic login/logout with token storage -- **Local Execution**: All code runs on your machine with your permissions -- **No Cloud Storage**: Credentials never leave your machine - -## Troubleshooting - -### Server Won't Start - -**Problem:** MCP server fails to start - -**Solutions:** -1. Check that all environment variables are set correctly in `mcp_settings.json` -2. Verify Python 3.10+ is being used -3. Ensure the virtual environment is activated -4. 
Check the LSF server URL is accessible - -### Authentication Fails - -**Problem:** Cannot authenticate with LSF server - -**Solutions:** -1. Verify your LSF username and password are correct -2. Check that the LSF REST API server is running -3. Ensure you have network access to the LSF server -4. Check the server logs for detailed error messages - -### Tool Execution Fails - -**Problem:** Tools return errors - -**Solutions:** -1. Check that you're authenticated (server handles this automatically) -2. Verify the LSF command syntax is correct -3. Ensure you have permissions for the requested operation -4. Check the LSF server logs for more details - -### View Server Logs - -Server logs are written to stderr and can be viewed in IBM Bob's output panel or terminal. - -## Development - -### Project Structure - -``` -lsf-mcp-server/ -├── src/ -│ └── lsf_mcp_server/ -│ ├── __init__.py -│ ├── server.py # Main MCP server -│ ├── lsf_client.py # LSF REST API client -│ ├── auth.py # Authentication manager -│ ├── models.py # Pydantic models -│ └── tools/ -│ ├── __init__.py -│ ├── jobs.py # Job management tools -│ ├── cluster.py # Cluster information tools -│ └── files.py # File operation tools -├── pyproject.toml # Project configuration -├── README.md # This file -└── LICENSE -``` - -### Running Tests - -```bash -# Install dev dependencies -pip install -e ".[dev]" - -# Run tests (when implemented) -pytest -``` - -## License - -See LICENSE file for details. - -## Support - -For issues, questions, or contributions, please refer to the project repository. 
- -## Version - -Current version: 0.1.0 - -## Changelog - -### 0.1.0 (2026-02-25) -- Initial release -- 13 tools for LSF job management, cluster information, and file operations -- Support for LSF REST API authentication -- Comprehensive error handling -- Full MCP protocol implementation diff --git a/src/lsf_mcp_server.egg-info/SOURCES.txt b/src/lsf_mcp_server.egg-info/SOURCES.txt deleted file mode 100644 index 627c914..0000000 --- a/src/lsf_mcp_server.egg-info/SOURCES.txt +++ /dev/null @@ -1,18 +0,0 @@ -LICENSE -README.md -pyproject.toml -src/lsf_mcp_server/__init__.py -src/lsf_mcp_server/auth.py -src/lsf_mcp_server/lsf_client.py -src/lsf_mcp_server/models.py -src/lsf_mcp_server/server.py -src/lsf_mcp_server.egg-info/PKG-INFO -src/lsf_mcp_server.egg-info/SOURCES.txt -src/lsf_mcp_server.egg-info/dependency_links.txt -src/lsf_mcp_server.egg-info/entry_points.txt -src/lsf_mcp_server.egg-info/requires.txt -src/lsf_mcp_server.egg-info/top_level.txt -src/lsf_mcp_server/tools/__init__.py -src/lsf_mcp_server/tools/cluster.py -src/lsf_mcp_server/tools/files.py -src/lsf_mcp_server/tools/jobs.py \ No newline at end of file diff --git a/src/lsf_mcp_server.egg-info/dependency_links.txt b/src/lsf_mcp_server.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/src/lsf_mcp_server.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/lsf_mcp_server.egg-info/entry_points.txt b/src/lsf_mcp_server.egg-info/entry_points.txt deleted file mode 100644 index a64999a..0000000 --- a/src/lsf_mcp_server.egg-info/entry_points.txt +++ /dev/null @@ -1,2 +0,0 @@ -[console_scripts] -lsf-mcp-server = lsf_mcp_server.server:main diff --git a/src/lsf_mcp_server.egg-info/requires.txt b/src/lsf_mcp_server.egg-info/requires.txt deleted file mode 100644 index 9c7748c..0000000 --- a/src/lsf_mcp_server.egg-info/requires.txt +++ /dev/null @@ -1,9 +0,0 @@ -mcp>=1.0.0 -httpx>=0.27.0 -pydantic>=2.0.0 -anyio>=4.0.0 - -[dev] -pytest>=8.0.0 
-pytest-asyncio>=0.23.0 -pytest-httpx>=0.30.0 diff --git a/src/lsf_mcp_server.egg-info/top_level.txt b/src/lsf_mcp_server.egg-info/top_level.txt deleted file mode 100644 index de1e027..0000000 --- a/src/lsf_mcp_server.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -lsf_mcp_server diff --git a/src/lsf_mcp_server/__init__.py b/src/lsf_mcp_server/__init__.py index d4571b6..6ac1e5d 100644 --- a/src/lsf_mcp_server/__init__.py +++ b/src/lsf_mcp_server/__init__.py @@ -20,4 +20,4 @@ # when running with python -m lsf_mcp_server.server # Users should import directly: from lsf_mcp_server.server import LSFMCPServer, main -__all__ = ['__version__'] \ No newline at end of file +__all__ = ["__version__"] diff --git a/src/lsf_mcp_server/auth.py b/src/lsf_mcp_server/auth.py index e154e17..8509a31 100644 --- a/src/lsf_mcp_server/auth.py +++ b/src/lsf_mcp_server/auth.py @@ -16,19 +16,19 @@ import logging from typing import Dict, Optional -from .lsf_client import LSFClient +from .lsf_client import LSFClient logger = logging.getLogger(__name__) class AuthManager: """Manages authentication with LSF REST API.""" - + def __init__(self, client: LSFClient, username: str, password: str): """ Initialize authentication manager. - + Args: client: LSF API client username: LSF username @@ -38,88 +38,88 @@ def __init__(self, client: LSFClient, username: str, password: str): self.username = username self.password = password self.session_info: Optional[Dict] = None - + async def login(self) -> Dict: """ Authenticate with LSF REST API. 
- + Returns: Session information from the API - + Raises: Exception: If authentication fails """ - logger.info(f"Logging in as user: {self.username}") - + logger.info("Logging in as user: %s", self.username) + try: response = await self.client.post( - '/lsf/v1/auth/logon', + "/lsf/v1/auth/logon", json={ - 'name': self.username, - 'originalName': self.username, - 'pass': self.password - } + "name": self.username, + "originalName": self.username, + "pass": self.password, + }, ) - + session_data = response.json() - + # Extract session token from response # The token is typically in the Set-Cookie header or response body - if 'token' in session_data: - token = session_data['token'] + if "token" in session_data: + token = session_data["token"] else: # Try to extract from cookies cookies = response.cookies - if 'LSF_SESSION' in cookies: - token = cookies['LSF_SESSION'] + if "LSF_SESSION" in cookies: + token = cookies["LSF_SESSION"] else: raise Exception("No session token found in response") - + self.client.set_session_token(token) self.session_info = session_data - + logger.info("Successfully authenticated with LSF API") return session_data - + except Exception as e: - logger.error(f"Authentication failed: {str(e)}") + logger.error("Authentication failed: %s", str(e)) raise Exception(f"Failed to authenticate with LSF API: {str(e)}") - + async def logout(self): """ Log out from LSF REST API. 
- + Raises: Exception: If logout fails """ if not self.session_info: logger.warning("No active session to logout") return - + logger.info("Logging out from LSF API") - + try: - await self.client.post('/lsf/v1/auth/logout') + await self.client.post("/lsf/v1/auth/logout") self.client.clear_session_token() self.session_info = None logger.info("Successfully logged out") - + except Exception as e: - logger.error(f"Logout failed: {str(e)}") + logger.error("Logout failed: %s", str(e)) # Clear session anyway self.client.clear_session_token() self.session_info = None - + async def ensure_authenticated(self): """ Ensure we have a valid session, re-authenticate if needed. - + This method can be called before making API requests to ensure the session is still valid. """ if not self.session_info: await self.login() - + def is_authenticated(self) -> bool: """Check if currently authenticated.""" - return self.session_info is not None \ No newline at end of file + return self.session_info is not None diff --git a/src/lsf_mcp_server/lsf_client.py b/src/lsf_mcp_server/lsf_client.py index ede5b9a..da4c0e0 100644 --- a/src/lsf_mcp_server/lsf_client.py +++ b/src/lsf_mcp_server/lsf_client.py @@ -16,144 +16,138 @@ import base64 import logging -from typing import Any, Dict, Optional -import httpx +from typing import Dict, Optional +import httpx logger = logging.getLogger(__name__) class LSFClient: """HTTP client for LSF REST API.""" - + def __init__(self, base_url: str, timeout: float = 30.0): """ Initialize LSF API client. 
- + Args: base_url: Base URL of the LSF REST API (e.g., http://host:8088) timeout: Request timeout in seconds """ - self.base_url = base_url.rstrip('/') + self.base_url = base_url.rstrip("/") self.timeout = timeout self.session_token: Optional[str] = None self._client = httpx.AsyncClient(timeout=timeout) - + async def close(self): """Close the HTTP client.""" await self._client.aclose() - + def set_session_token(self, token: str): """Set the session token for authenticated requests.""" self.session_token = token - + def clear_session_token(self): """Clear the session token.""" self.session_token = None - - def _get_headers(self, additional_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: + + def _get_headers( + self, additional_headers: Optional[Dict[str, str]] = None + ) -> Dict[str, str]: """ Get headers for requests, including session token if available. - + Args: additional_headers: Additional headers to include - + Returns: Dictionary of headers """ headers = {} - + if self.session_token: - headers['Authorization'] = self.session_token - + headers["Authorization"] = self.session_token + if additional_headers: headers.update(additional_headers) - + return headers - - async def request( - self, - method: str, - endpoint: str, - **kwargs - ) -> httpx.Response: + + async def request(self, method: str, endpoint: str, **kwargs) -> httpx.Response: """ Make an HTTP request to the LSF API. - + Args: method: HTTP method (GET, POST, DELETE, etc.) 
endpoint: API endpoint (e.g., /v1/cluster) **kwargs: Additional arguments to pass to httpx - + Returns: HTTP response - + Raises: httpx.HTTPError: If the request fails """ url = f"{self.base_url}{endpoint}" - + # Merge headers - headers = self._get_headers(kwargs.pop('headers', None)) - - logger.debug(f"{method} {url}") - + headers = self._get_headers(kwargs.pop("headers", None)) + + logger.debug("%s %s", method, url) + try: response = await self._client.request( - method=method, - url=url, - headers=headers, - **kwargs + method=method, url=url, headers=headers, **kwargs ) - - logger.debug(f"Response status: {response.status_code}") - + + logger.debug("Response status: %s", response.status_code) + # Raise for 4xx and 5xx status codes response.raise_for_status() - + return response - + except httpx.HTTPStatusError as e: - logger.error(f"HTTP error: {e.response.status_code} - {e.response.text}") + logger.error("HTTP error: %s - %s", e.response.status_code, e.response.text) raise except httpx.RequestError as e: - logger.error(f"Request error: {str(e)}") + logger.error("Request error: %s", str(e)) raise - + async def get(self, endpoint: str, **kwargs) -> httpx.Response: """Make a GET request.""" - return await self.request('GET', endpoint, **kwargs) - + return await self.request("GET", endpoint, **kwargs) + async def post(self, endpoint: str, **kwargs) -> httpx.Response: """Make a POST request.""" - return await self.request('POST', endpoint, **kwargs) - + return await self.request("POST", endpoint, **kwargs) + async def delete(self, endpoint: str, **kwargs) -> httpx.Response: """Make a DELETE request.""" - return await self.request('DELETE', endpoint, **kwargs) - + return await self.request("DELETE", endpoint, **kwargs) + @staticmethod def encode_path(path: str) -> str: """ Encode a file path to base64 for use in API endpoints. 
- + Args: path: File path to encode - + Returns: Base64 encoded path """ return base64.b64encode(path.encode()).decode() - + @staticmethod def decode_path(encoded_path: str) -> str: """ Decode a base64 encoded file path. - + Args: encoded_path: Base64 encoded path - + Returns: Decoded file path """ - return base64.b64decode(encoded_path.encode()).decode() \ No newline at end of file + return base64.b64decode(encoded_path.encode()).decode() diff --git a/src/lsf_mcp_server/models.py b/src/lsf_mcp_server/models.py index fb65a2f..8cd1883 100644 --- a/src/lsf_mcp_server/models.py +++ b/src/lsf_mcp_server/models.py @@ -14,26 +14,33 @@ """Pydantic models for request/response validation.""" -from typing import Optional, List +from typing import Optional + from pydantic import BaseModel, Field class JobSubmitRequest(BaseModel): """Request model for job submission.""" + command: str = Field(..., description="Command to execute") job_name: Optional[str] = Field(None, description="Job name") queue: Optional[str] = Field(None, description="Queue name") - num_processors: Optional[int] = Field(None, description="Number of processors", ge=1) + num_processors: Optional[int] = Field( + None, description="Number of processors", ge=1 + ) memory_mb: Optional[int] = Field(None, description="Memory in MB", ge=1) wall_time: Optional[str] = Field(None, description="Wall time limit (HH:MM format)") output_file: Optional[str] = Field(None, description="Standard output file path") error_file: Optional[str] = Field(None, description="Standard error file path") working_directory: Optional[str] = Field(None, description="Working directory") - advanced_options: Optional[str] = Field(None, description="Advanced LSF options string") + advanced_options: Optional[str] = Field( + None, description="Advanced LSF options string" + ) class JobQueryRequest(BaseModel): """Request model for job query.""" + job_id: Optional[str] = Field(None, description="Specific job ID to query") user: Optional[str] = 
Field(None, description="Filter by username") queue: Optional[str] = Field(None, description="Filter by queue name") @@ -42,53 +49,65 @@ class JobQueryRequest(BaseModel): class JobKillRequest(BaseModel): """Request model for killing a job.""" + job_id: str = Field(..., description="Job ID to kill") force: bool = Field(False, description="Force kill the job") class FileUploadRequest(BaseModel): """Request model for file upload.""" + local_path: str = Field(..., description="Local file path to upload") remote_path: str = Field(..., description="Remote destination path on LSF server") class FileDownloadRequest(BaseModel): """Request model for file download.""" + remote_path: str = Field(..., description="Remote file path on LSF server") - local_path: Optional[str] = Field(None, description="Local destination path (optional)") + local_path: Optional[str] = Field( + None, description="Local destination path (optional)" + ) class FileListRequest(BaseModel): """Request model for listing files.""" + path: str = Field(..., description="Directory path to list") class FileDeleteRequest(BaseModel): """Request model for file deletion.""" + file_path: str = Field(..., description="File path to delete") class LSFCommandRequest(BaseModel): """Request model for executing LSF commands.""" + command: str = Field(..., description="LSF command to execute") parse_json: bool = Field(True, description="Whether to parse JSON output") class HostListRequest(BaseModel): """Request model for listing hosts.""" + host_name: Optional[str] = Field(None, description="Specific host name to query") class QueueListRequest(BaseModel): """Request model for listing queues.""" + queue_name: Optional[str] = Field(None, description="Specific queue name to query") class LoadCheckRequest(BaseModel): """Request model for checking system load.""" + host_name: Optional[str] = Field(None, description="Specific host to check") class HostInfoRequest(BaseModel): """Request model for host information.""" - host_name: 
Optional[str] = Field(None, description="Specific host name to query") \ No newline at end of file + + host_name: Optional[str] = Field(None, description="Specific host name to query") diff --git a/src/lsf_mcp_server/server.py b/src/lsf_mcp_server/server.py index 8104a3e..33cc8e4 100644 --- a/src/lsf_mcp_server/server.py +++ b/src/lsf_mcp_server/server.py @@ -15,6 +15,7 @@ """Main MCP server implementation for LSF.""" import asyncio +import json import logging import os import sys @@ -22,55 +23,56 @@ from mcp.server import Server from mcp.server.stdio import stdio_server -from mcp.types import Tool, TextContent +from mcp.types import TextContent, Tool -from .lsf_client import LSFClient from .auth import AuthManager -from .tools import JobTools, ClusterTools, FileTools - +from .lsf_client import LSFClient +from .tools import ClusterTools, FileTools, JobTools # Configure logging logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - stream=sys.stderr + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + stream=sys.stderr, ) logger = logging.getLogger(__name__) class LSFMCPServer: """MCP Server for LSF operations.""" - + def __init__(self): """Initialize the LSF MCP server.""" self.server = Server("lsf-mcp-server") - + # Get configuration from environment - self.lsf_url = os.getenv('LSF_SERVER_URL') - self.lsf_username = os.getenv('LSF_USERNAME') - self.lsf_password = os.getenv('LSF_PASSWORD') - + self.lsf_url = os.getenv("LSF_SERVER_URL") + self.lsf_username = os.getenv("LSF_USERNAME") + self.lsf_password = os.getenv("LSF_PASSWORD") + if not all([self.lsf_url, self.lsf_username, self.lsf_password]): raise ValueError( "Missing required environment variables: " "LSF_SERVER_URL, LSF_USERNAME, LSF_PASSWORD" ) - + # Initialize LSF client and auth - self.client = LSFClient(self.lsf_url) - self.auth = AuthManager(self.client, self.lsf_username, self.lsf_password) - + self.client = LSFClient(str(self.lsf_url)) 
+ self.auth = AuthManager( + self.client, str(self.lsf_username), str(self.lsf_password) + ) + # Initialize tool handlers self.job_tools = JobTools(self.client, self.auth) self.cluster_tools = ClusterTools(self.client, self.auth) self.file_tools = FileTools(self.client, self.auth) - + # Register handlers self._register_handlers() - + def _register_handlers(self): """Register MCP server handlers.""" - + @self.server.list_tools() async def list_tools() -> list[Tool]: """List available tools.""" @@ -83,47 +85,47 @@ async def list_tools() -> list[Tool]: "properties": { "command": { "type": "string", - "description": "Command to execute" + "description": "Command to execute", }, "job_name": { "type": "string", - "description": "Job name (optional)" + "description": "Job name (optional)", }, "queue": { "type": "string", - "description": "Queue name (optional)" + "description": "Queue name (optional)", }, "num_processors": { "type": "integer", - "description": "Number of processors (optional)" + "description": "Number of processors (optional)", }, "memory_mb": { "type": "integer", - "description": "Memory in MB (optional)" + "description": "Memory in MB (optional)", }, "wall_time": { "type": "string", - "description": "Wall time limit in HH:MM format (optional)" + "description": "Wall time limit in HH:MM format (optional)", }, "output_file": { "type": "string", - "description": "Standard output file path (optional)" + "description": "Standard output file path (optional)", }, "error_file": { "type": "string", - "description": "Standard error file path (optional)" + "description": "Standard error file path (optional)", }, "working_directory": { "type": "string", - "description": "Working directory (optional)" + "description": "Working directory (optional)", }, "advanced_options": { "type": "string", - "description": "Advanced LSF options string for full control (optional)" - } + "description": "Advanced LSF options string for full control (optional)", + }, }, - "required": 
["command"] - } + "required": ["command"], + }, ), Tool( name="query_jobs", @@ -133,22 +135,22 @@ async def list_tools() -> list[Tool]: "properties": { "job_id": { "type": "string", - "description": "Specific job ID to query (optional)" + "description": "Specific job ID to query (optional)", }, "user": { "type": "string", - "description": "Filter by username (optional)" + "description": "Filter by username (optional)", }, "queue": { "type": "string", - "description": "Filter by queue name (optional)" + "description": "Filter by queue name (optional)", }, "status": { "type": "string", - "description": "Filter by job status (optional)" - } - } - } + "description": "Filter by job status (optional)", + }, + }, + }, ), Tool( name="kill_job", @@ -158,15 +160,15 @@ async def list_tools() -> list[Tool]: "properties": { "job_id": { "type": "string", - "description": "Job ID to kill" + "description": "Job ID to kill", }, "force": { "type": "boolean", - "description": "Force kill the job (optional, default: false)" - } + "description": "Force kill the job (optional, default: false)", + }, }, - "required": ["job_id"] - } + "required": ["job_id"], + }, ), Tool( name="list_hosts", @@ -176,10 +178,10 @@ async def list_tools() -> list[Tool]: "properties": { "host_name": { "type": "string", - "description": "Specific host name to query (optional)" + "description": "Specific host name to query (optional)", } - } - } + }, + }, ), Tool( name="list_queues", @@ -189,10 +191,10 @@ async def list_tools() -> list[Tool]: "properties": { "queue_name": { "type": "string", - "description": "Specific queue name to query (optional)" + "description": "Specific queue name to query (optional)", } - } - } + }, + }, ), Tool( name="check_load", @@ -202,10 +204,10 @@ async def list_tools() -> list[Tool]: "properties": { "host_name": { "type": "string", - "description": "Specific host to check (optional)" + "description": "Specific host to check (optional)", } - } - } + }, + }, ), Tool( 
name="list_host_info", @@ -215,26 +217,20 @@ async def list_tools() -> list[Tool]: "properties": { "host_name": { "type": "string", - "description": "Specific host name to query (optional)" + "description": "Specific host name to query (optional)", } - } - } + }, + }, ), Tool( name="get_cluster_id", description="Get LSF cluster identifier and version information.", - inputSchema={ - "type": "object", - "properties": {} - } + inputSchema={"type": "object", "properties": {}}, ), Tool( name="get_cluster_info", description="Get comprehensive LSF cluster information via API.", - inputSchema={ - "type": "object", - "properties": {} - } + inputSchema={"type": "object", "properties": {}}, ), Tool( name="upload_file", @@ -244,15 +240,15 @@ async def list_tools() -> list[Tool]: "properties": { "local_path": { "type": "string", - "description": "Path to local file to upload" + "description": "Path to local file to upload", }, "remote_path": { "type": "string", - "description": "Destination path on LSF server" - } + "description": "Destination path on LSF server", + }, }, - "required": ["local_path", "remote_path"] - } + "required": ["local_path", "remote_path"], + }, ), Tool( name="download_file", @@ -262,15 +258,15 @@ async def list_tools() -> list[Tool]: "properties": { "remote_path": { "type": "string", - "description": "Path on LSF server" + "description": "Path on LSF server", }, "local_path": { "type": "string", - "description": "Local destination path (optional, returns content if not provided)" - } + "description": "Local destination path (optional, returns content if not provided)", + }, }, - "required": ["remote_path"] - } + "required": ["remote_path"], + }, ), Tool( name="list_files", @@ -280,11 +276,11 @@ async def list_tools() -> list[Tool]: "properties": { "path": { "type": "string", - "description": "Directory path to list" + "description": "Directory path to list", } }, - "required": ["path"] - } + "required": ["path"], + }, ), Tool( name="delete_file", @@ 
-294,86 +290,85 @@ async def list_tools() -> list[Tool]: "properties": { "file_path": { "type": "string", - "description": "Path to file to delete" + "description": "Path to file to delete", } }, - "required": ["file_path"] - } - ) + "required": ["file_path"], + }, + ), ] - + @self.server.call_tool() async def call_tool(name: str, arguments: Any) -> Sequence[TextContent]: """Handle tool calls.""" try: # Route to appropriate tool handler - if name == "submit_job": - result = await self.job_tools.submit_job(**arguments) - elif name == "query_jobs": - result = await self.job_tools.query_jobs(**arguments) - elif name == "kill_job": - result = await self.job_tools.kill_job(**arguments) - elif name == "list_hosts": - result = await self.cluster_tools.list_hosts(**arguments) - elif name == "list_queues": - result = await self.cluster_tools.list_queues(**arguments) - elif name == "check_load": - result = await self.cluster_tools.check_load(**arguments) - elif name == "list_host_info": - result = await self.cluster_tools.list_host_info(**arguments) - elif name == "get_cluster_id": - result = await self.cluster_tools.get_cluster_id() - elif name == "get_cluster_info": - result = await self.cluster_tools.get_cluster_info() - elif name == "upload_file": - result = await self.file_tools.upload_file(**arguments) - elif name == "download_file": - result = await self.file_tools.download_file(**arguments) - elif name == "list_files": - result = await self.file_tools.list_files(**arguments) - elif name == "delete_file": - result = await self.file_tools.delete_file(**arguments) - else: - raise ValueError(f"Unknown tool: {name}") - + match name: + case "submit_job": + result = await self.job_tools.submit_job(**arguments) + case "query_jobs": + result = await self.job_tools.query_jobs(**arguments) + case "kill_job": + result = await self.job_tools.kill_job(**arguments) + case "list_hosts": + result = await self.cluster_tools.list_hosts(**arguments) + case "list_queues": + result = 
await self.cluster_tools.list_queues(**arguments) + case "check_load": + result = await self.cluster_tools.check_load(**arguments) + case "list_host_info": + result = await self.cluster_tools.list_host_info(**arguments) + case "get_cluster_id": + result = await self.cluster_tools.get_cluster_id() + case "get_cluster_info": + result = await self.cluster_tools.get_cluster_info() + case "upload_file": + result = await self.file_tools.upload_file(**arguments) + case "download_file": + result = await self.file_tools.download_file(**arguments) + case "list_files": + result = await self.file_tools.list_files(**arguments) + case "delete_file": + result = await self.file_tools.delete_file(**arguments) + case _: + raise ValueError(f"Unknown tool: {name}") + # Format result as JSON string - import json result_text = json.dumps(result, indent=2) - + return [TextContent(type="text", text=result_text)] - + except Exception as e: - logger.error(f"Error executing tool {name}: {str(e)}") - import json - error_result = { - "success": False, - "error": str(e), - "tool": name - } - return [TextContent(type="text", text=json.dumps(error_result, indent=2))] - + logger.error("Error executing tool %s: %s", name, str(e)) + error_result = {"success": False, "error": str(e), "tool": name} + return [ + TextContent(type="text", text=json.dumps(error_result, indent=2)) + ] + async def run(self): """Run the MCP server.""" logger.info("Starting LSF MCP Server") - logger.info(f"LSF Server URL: {self.lsf_url}") - logger.info(f"LSF Username: {self.lsf_username}") - + logger.info("LSF Server URL: %s", self.lsf_url) + logger.info("LSF Username: %s", self.lsf_username) + try: # Don't authenticate immediately - let ensure_authenticated() handle it # This prevents the server from crashing if LSF is temporarily unavailable - logger.info("LSF MCP Server initialized (authentication will occur on first request)") - + logger.info( + "LSF MCP Server initialized (authentication will occur on first request)" + ) 
+ # Run the server async with stdio_server() as (read_stream, write_stream): logger.info("LSF MCP Server is ready") await self.server.run( read_stream, write_stream, - self.server.create_initialization_options() + self.server.create_initialization_options(), ) - + except Exception as e: - logger.error(f"Server error: {str(e)}") + logger.error("Server error: %s", str(e)) raise finally: # Cleanup @@ -383,7 +378,7 @@ async def run(self): await self.client.close() logger.info("LSF MCP Server shutdown complete") except Exception as e: - logger.error(f"Error during cleanup: {str(e)}") + logger.error("Error during cleanup: %s", str(e)) def main(): @@ -394,9 +389,9 @@ def main(): except KeyboardInterrupt: logger.info("Server interrupted by user") except Exception as e: - logger.error(f"Fatal error: {str(e)}") + logger.error("Fatal error: %s", str(e)) sys.exit(1) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/lsf_mcp_server/tools/__init__.py b/src/lsf_mcp_server/tools/__init__.py index 944f164..58b5ef1 100644 --- a/src/lsf_mcp_server/tools/__init__.py +++ b/src/lsf_mcp_server/tools/__init__.py @@ -14,8 +14,8 @@ """LSF MCP Server tools package.""" -from .jobs import JobTools from .cluster import ClusterTools from .files import FileTools +from .jobs import JobTools -__all__ = ['JobTools', 'ClusterTools', 'FileTools'] \ No newline at end of file +__all__ = ["JobTools", "ClusterTools", "FileTools"] diff --git a/src/lsf_mcp_server/tools/cluster.py b/src/lsf_mcp_server/tools/cluster.py index 7d3c254..2e1bc39 100644 --- a/src/lsf_mcp_server/tools/cluster.py +++ b/src/lsf_mcp_server/tools/cluster.py @@ -15,165 +15,151 @@ """Cluster information tools for LSF.""" import logging -from typing import Dict, Any, Optional -from ..lsf_client import LSFClient -from ..auth import AuthManager +from typing import Any, Dict, Optional +from ..auth import AuthManager +from ..lsf_client import LSFClient logger = logging.getLogger(__name__) class 
ClusterTools: """Tools for LSF cluster information and monitoring.""" - + def __init__(self, client: LSFClient, auth: AuthManager): """ Initialize cluster tools. - + Args: client: LSF API client auth: Authentication manager """ self.client = client self.auth = auth - + async def _execute_lsf_command(self, command: str) -> Dict[str, Any]: """ Execute an LSF command via the API. - + Args: command: LSF command to execute - + Returns: Command execution result """ await self.auth.ensure_authenticated() - - logger.info(f"Executing LSF command: {command}") - + + logger.info("Executing LSF command: %s", command) + try: response = await self.client.post( - '/lsf/v1/cluster/usercmd', - data={'command': command, 'env': ''}, - headers={'Content-Type': 'application/x-www-form-urlencoded'} + "/lsf/v1/cluster/usercmd", + data={"command": command, "env": ""}, + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) - + result = response.json() - logger.info(f"Command executed successfully") - - return { - 'success': True, - 'command': command, - 'result': result - } - + logger.info("Command executed successfully") + + return {"success": True, "command": command, "result": result} + except Exception as e: - logger.error(f"Failed to execute command: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'command': command - } - + logger.error("Failed to execute command: %s", str(e)) + return {"success": False, "error": str(e), "command": command} + async def list_hosts(self, host_name: Optional[str] = None) -> Dict[str, Any]: """ List LSF cluster hosts using bhosts command. 
- + Args: host_name: Specific host name to query (optional) - + Returns: Host information """ cmd = "bhosts -o 'HOST_NAME STATUS jl_u MAX NJOBS RUN SSUSP USUSP RSV' -json" if host_name: cmd = f"bhosts {host_name} -o 'HOST_NAME STATUS jl_u MAX NJOBS RUN SSUSP USUSP RSV' -json" - + return await self._execute_lsf_command(cmd) - + async def list_queues(self, queue_name: Optional[str] = None) -> Dict[str, Any]: """ List LSF queues using bqueues command. - + Args: queue_name: Specific queue name to query (optional) - + Returns: Queue information """ cmd = "bqueues -o 'QUEUE_NAME PRIO STATUS MAX NJOBS PEND RUN SUSP' -json" if queue_name: cmd = f"bqueues {queue_name} -o 'QUEUE_NAME PRIO STATUS MAX NJOBS PEND RUN SUSP' -json" - + return await self._execute_lsf_command(cmd) - + async def check_load(self, host_name: Optional[str] = None) -> Dict[str, Any]: """ Check system load using lsload command. - + Args: host_name: Specific host to check (optional) - + Returns: Load information """ cmd = "lsload -o 'HOST_NAME status r15s r1m r15m ut pg ls it tmp swp mem' -json" if host_name: cmd = f"lsload {host_name} -o 'HOST_NAME status r15s r1m r15m ut pg ls it tmp swp mem' -json" - + return await self._execute_lsf_command(cmd) - + async def list_host_info(self, host_name: Optional[str] = None) -> Dict[str, Any]: """ Get detailed host information using lshosts command. - + Args: host_name: Specific host name to query (optional) - + Returns: Detailed host information """ cmd = "lshosts -o 'HOST_NAME type model cpuf ncpus maxmem maxswp server RESOURCES' -json" if host_name: cmd = f"lshosts {host_name} -o 'HOST_NAME type model cpuf ncpus maxmem maxswp server RESOURCES' -json" - + return await self._execute_lsf_command(cmd) - + async def get_cluster_id(self) -> Dict[str, Any]: """ Get LSF cluster identifier using lsid command. 
- + Returns: Cluster ID and version information """ cmd = "lsid" return await self._execute_lsf_command(cmd) - + async def get_cluster_info(self) -> Dict[str, Any]: """ Get LSF cluster information via API endpoint. - + Returns: Cluster configuration and status """ await self.auth.ensure_authenticated() - + logger.info("Getting cluster information via API") - + try: - response = await self.client.get('/lsf/v1/cluster') + response = await self.client.get("/lsf/v1/cluster") result = response.json() - + logger.info("Cluster information retrieved successfully") - - return { - 'success': True, - 'result': result - } - + + return {"success": True, "result": result} + except Exception as e: - logger.error(f"Failed to get cluster info: {str(e)}") - return { - 'success': False, - 'error': str(e) - } \ No newline at end of file + logger.error("Failed to get cluster info: %s", str(e)) + return {"success": False, "error": str(e)} diff --git a/src/lsf_mcp_server/tools/files.py b/src/lsf_mcp_server/tools/files.py index 2f39686..d777d0a 100644 --- a/src/lsf_mcp_server/tools/files.py +++ b/src/lsf_mcp_server/tools/files.py @@ -16,216 +16,180 @@ import logging import os -from typing import Dict, Any, Optional -from ..lsf_client import LSFClient -from ..auth import AuthManager +from typing import Any, Dict, Optional +import httpx + +from ..auth import AuthManager +from ..lsf_client import LSFClient logger = logging.getLogger(__name__) class FileTools: """Tools for file operations on LSF server.""" - + def __init__(self, client: LSFClient, auth: AuthManager): """ Initialize file tools. - + Args: client: LSF API client auth: Authentication manager """ self.client = client self.auth = auth - - async def upload_file( - self, - local_path: str, - remote_path: str - ) -> Dict[str, Any]: + + async def upload_file(self, local_path: str, remote_path: str) -> Dict[str, Any]: """ Upload a file to the LSF server. 
- + Args: local_path: Path to local file remote_path: Destination path on LSF server - + Returns: Upload result """ await self.auth.ensure_authenticated() - - logger.info(f"Uploading file: {local_path} -> {remote_path}") - + + logger.info("Uploading file: %s -> %s", local_path, remote_path) + try: # Check if local file exists if not os.path.exists(local_path): raise FileNotFoundError(f"Local file not found: {local_path}") - + # Read file content - with open(local_path, 'rb') as f: + with open(local_path, "rb") as f: file_content = f.read() - + # Prepare multipart form data - files = { - 'file': (os.path.basename(local_path), file_content) - } - data = { - 'path': remote_path - } - - response = await self.client.post( - '/lsf/v1/files', - files=files, - data=data - ) - + files = {"file": (os.path.basename(local_path), file_content)} + data = {"path": remote_path} + + response = await self.client.post("/lsf/v1/files", files=files, data=data) + result = response.json() - logger.info(f"File uploaded successfully") - + logger.info("File uploaded successfully") + return { - 'success': True, - 'local_path': local_path, - 'remote_path': remote_path, - 'result': result + "success": True, + "local_path": local_path, + "remote_path": remote_path, + "result": result, } - + except Exception as e: - logger.error(f"Failed to upload file: {str(e)}") + logger.error("Failed to upload file: %s", str(e)) return { - 'success': False, - 'error': str(e), - 'local_path': local_path, - 'remote_path': remote_path + "success": False, + "error": str(e), + "local_path": local_path, + "remote_path": remote_path, } - + async def download_file( - self, - remote_path: str, - local_path: Optional[str] = None + self, remote_path: str, local_path: Optional[str] = None ) -> Dict[str, Any]: """ Download a file from the LSF server. 
- + Args: remote_path: Path on LSF server local_path: Local destination path (optional) - + Returns: Download result with file content or saved path """ await self.auth.ensure_authenticated() - - logger.info(f"Downloading file: {remote_path}") - + + logger.info("Downloading file: %s", remote_path) + try: # Encode the remote path as base64 encoded_path = self.client.encode_path(remote_path) - - response = await self.client.get(f'/lsf/v1/files/{encoded_path}') - + + response = await self.client.get(f"/lsf/v1/files/{encoded_path}") + # If local_path is provided, save to file if local_path: - with open(local_path, 'wb') as f: + with open(local_path, "wb") as f: f.write(response.content) - - logger.info(f"File downloaded and saved to: {local_path}") - + + logger.info("File downloaded and saved to: %s", local_path) + return { - 'success': True, - 'remote_path': remote_path, - 'local_path': local_path, - 'size_bytes': len(response.content) + "success": True, + "remote_path": remote_path, + "local_path": local_path, + "size_bytes": len(response.content), } else: # Return content as text - logger.info(f"File downloaded successfully") - + logger.info("File downloaded successfully") + return { - 'success': True, - 'remote_path': remote_path, - 'content': response.text, - 'size_bytes': len(response.content) + "success": True, + "remote_path": remote_path, + "content": response.text, + "size_bytes": len(response.content), } - + except Exception as e: - logger.error(f"Failed to download file: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'remote_path': remote_path - } - + logger.error("Failed to download file: %s", str(e)) + return {"success": False, "error": str(e), "remote_path": remote_path} + async def list_files(self, path: str) -> Dict[str, Any]: """ List files in a directory on the LSF server. 
- + Args: path: Directory path to list - + Returns: List of files with metadata """ await self.auth.ensure_authenticated() - - logger.info(f"Listing files in: {path}") - + + logger.info("Listing files in: %s", path) + try: - response = await self.client.get( - '/lsf/v1/files', - params={'path': path} - ) - + response = await self.client.get("/lsf/v1/files", params={"path": path}) + result = response.json() - logger.info(f"Files listed successfully") - - return { - 'success': True, - 'path': path, - 'result': result - } - + logger.info("Files listed successfully") + + return {"success": True, "path": path, "result": result} + except Exception as e: - logger.error(f"Failed to list files: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'path': path - } - + logger.error("Failed to list files: %s", str(e)) + return {"success": False, "error": str(e), "path": path} + async def delete_file(self, file_path: str) -> Dict[str, Any]: """ Delete a file on the LSF server. - + Args: file_path: Path to file to delete - + Returns: Deletion result """ await self.auth.ensure_authenticated() - - logger.info(f"Deleting file: {file_path}") - + + logger.info("Deleting file: %s", file_path) + try: # Encode the file path as base64 encoded_path = self.client.encode_path(file_path) - - response = await self.client.delete(f'/lsf/v1/files/{encoded_path}') - + + response = await self.client.delete(f"/lsf/v1/files/{encoded_path}") + result = response.json() - logger.info(f"File deleted successfully") - - return { - 'success': True, - 'file_path': file_path, - 'result': result - } - + logger.info("File deleted successfully") + + return {"success": True, "file_path": file_path, "result": result} except Exception as e: - logger.error(f"Failed to delete file: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'file_path': file_path - } \ No newline at end of file + logger.error("Failed to delete file: %s", str(e)) + return {"success": False, "error": str(e), "file_path": 
file_path} diff --git a/src/lsf_mcp_server/tools/jobs.py b/src/lsf_mcp_server/tools/jobs.py index 1ee5251..0c6ea33 100644 --- a/src/lsf_mcp_server/tools/jobs.py +++ b/src/lsf_mcp_server/tools/jobs.py @@ -14,30 +14,29 @@ """Job management tools for LSF.""" -import json import logging -from typing import Dict, Any, Optional -from ..lsf_client import LSFClient -from ..auth import AuthManager +from typing import Any, Dict, Optional +from ..auth import AuthManager +from ..lsf_client import LSFClient logger = logging.getLogger(__name__) class JobTools: """Tools for managing LSF jobs.""" - + def __init__(self, client: LSFClient, auth: AuthManager): """ Initialize job tools. - + Args: client: LSF API client auth: Authentication manager """ self.client = client self.auth = auth - + def _build_bsub_command( self, command: str, @@ -48,39 +47,39 @@ def _build_bsub_command( wall_time: Optional[str] = None, output_file: Optional[str] = None, error_file: Optional[str] = None, - working_directory: Optional[str] = None + working_directory: Optional[str] = None, ) -> str: """Build a bsub command string from parameters.""" cmd_parts = ["bsub"] - + if job_name: cmd_parts.append(f"-J {job_name}") - + if queue: cmd_parts.append(f"-q {queue}") - + if num_processors: cmd_parts.append(f"-n {num_processors}") - + if memory_mb: cmd_parts.append(f"-M {memory_mb}") - + if wall_time: cmd_parts.append(f"-W {wall_time}") - + if output_file: cmd_parts.append(f"-o {output_file}") - + if error_file: cmd_parts.append(f"-e {error_file}") - + if working_directory: cmd_parts.append(f"-cwd {working_directory}") - + cmd_parts.append(command) - + return " ".join(cmd_parts) - + async def submit_job( self, command: str, @@ -92,11 +91,11 @@ async def submit_job( output_file: Optional[str] = None, error_file: Optional[str] = None, working_directory: Optional[str] = None, - advanced_options: Optional[str] = None + advanced_options: Optional[str] = None, ) -> Dict[str, Any]: """ Submit a job to LSF. 
- + Args: command: Command to execute job_name: Job name @@ -108,166 +107,154 @@ async def submit_job( error_file: Error file path working_directory: Working directory advanced_options: Advanced LSF options string - + Returns: Job submission result """ await self.auth.ensure_authenticated() - + # Build the bsub command if advanced_options: lsf_command = f"bsub {advanced_options} {command}" else: lsf_command = self._build_bsub_command( - command, job_name, queue, num_processors, - memory_mb, wall_time, output_file, error_file, - working_directory + command, + job_name, + queue, + num_processors, + memory_mb, + wall_time, + output_file, + error_file, + working_directory, ) - - logger.info(f"Submitting job: {lsf_command}") - + + logger.info("Submitting job: %s", lsf_command) + try: response = await self.client.post( - '/lsf/v1/cluster/usercmd', - data={'command': lsf_command, 'env': ''}, - headers={'Content-Type': 'application/x-www-form-urlencoded'} + "/lsf/v1/cluster/usercmd", + data={"command": lsf_command, "env": ""}, + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) - + result = response.json() - logger.info(f"Job submitted successfully: {result}") - - return { - 'success': True, - 'command': lsf_command, - 'result': result - } - + logger.info("Job submitted successfully: %s", result) + + return {"success": True, "command": lsf_command, "result": result} + except Exception as e: - logger.error(f"Failed to submit job: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'command': lsf_command - } - + logger.error("Failed to submit job: %s", str(e)) + return {"success": False, "error": str(e), "command": lsf_command} + async def query_jobs( self, job_id: Optional[str] = None, user: Optional[str] = None, queue: Optional[str] = None, - status: Optional[str] = None + status: Optional[str] = None, ) -> Dict[str, Any]: """ Query job status and information. 
- + Args: job_id: Specific job ID to query user: Filter by username queue: Filter by queue name status: Filter by job status - + Returns: Job information """ await self.auth.ensure_authenticated() - + # Build bjobs command cmd_parts = ["bjobs"] - + if user: cmd_parts.append(f"-u {user}") - + if queue: cmd_parts.append(f"-q {queue}") - + if not job_id: # Show all jobs if no specific job ID cmd_parts.append("-a") - + # Request JSON output with detailed fields - cmd_parts.append("-o 'jobid stat queue user job_name submit_time start_time finish_time run_time cpu_used mem max_mem' -json") - + cmd_parts.append( + "-o 'jobid stat queue user job_name submit_time start_time finish_time run_time cpu_used mem max_mem' -json" + ) + # Job ID must come last if job_id: cmd_parts.append(job_id) - + lsf_command = " ".join(cmd_parts) - logger.info(f"Querying jobs: {lsf_command}") - + logger.info("Querying jobs: %s", lsf_command) + try: response = await self.client.post( - '/lsf/v1/cluster/usercmd', - data={'command': lsf_command, 'env': ''}, - headers={'Content-Type': 'application/x-www-form-urlencoded'} + "/lsf/v1/cluster/usercmd", + data={"command": lsf_command, "env": ""}, + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) - + result = response.json() - logger.info(f"Job query successful") - - return { - 'success': True, - 'command': lsf_command, - 'result': result - } - + logger.info("Job query successful") + + return {"success": True, "command": lsf_command, "result": result} + except Exception as e: - logger.error(f"Failed to query jobs: {str(e)}") - return { - 'success': False, - 'error': str(e), - 'command': lsf_command - } - - async def kill_job( - self, - job_id: str, - force: bool = False - ) -> Dict[str, Any]: + logger.error("Failed to query jobs: %s", str(e)) + return {"success": False, "error": str(e), "command": lsf_command} + + async def kill_job(self, job_id: str, force: bool = False) -> Dict[str, Any]: """ Kill a running or pending job. 
- + Args: job_id: Job ID to kill force: Force kill the job - + Returns: Kill operation result """ await self.auth.ensure_authenticated() - + # Build bkill command cmd_parts = ["bkill"] - + if force: cmd_parts.append("-r") - + cmd_parts.append(job_id) - + lsf_command = " ".join(cmd_parts) - logger.info(f"Killing job: {lsf_command}") - + logger.info("Killing job: %s", lsf_command) + try: response = await self.client.post( - '/lsf/v1/cluster/usercmd', - data={'command': lsf_command, 'env': ''}, - headers={'Content-Type': 'application/x-www-form-urlencoded'} + "/lsf/v1/cluster/usercmd", + data={"command": lsf_command, "env": ""}, + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) - + result = response.json() - logger.info(f"Job killed successfully: {result}") - + logger.info("Job killed successfully: %s", result) + return { - 'success': True, - 'job_id': job_id, - 'command': lsf_command, - 'result': result + "success": True, + "job_id": job_id, + "command": lsf_command, + "result": result, } - + except Exception as e: - logger.error(f"Failed to kill job: {str(e)}") + logger.error("Failed to kill job: %s", str(e)) return { - 'success': False, - 'error': str(e), - 'job_id': job_id, - 'command': lsf_command - } \ No newline at end of file + "success": False, + "error": str(e), + "job_id": job_id, + "command": lsf_command, + } diff --git a/tests/test_lsf_client.py b/tests/test_lsf_client.py index 49e7443..43f5801 100644 --- a/tests/test_lsf_client.py +++ b/tests/test_lsf_client.py @@ -15,63 +15,64 @@ """Tests for LSF client.""" import pytest + from lsf_mcp_server.lsf_client import LSFClient class TestLSFClient: """Test cases for LSFClient.""" - + def test_client_initialization(self): """Test that client initializes with correct base URL.""" client = LSFClient("http://example.com:8088") assert client.base_url == "http://example.com:8088" assert client.session_token is None - + def test_client_strips_trailing_slash(self): """Test that trailing slash is 
removed from base URL.""" client = LSFClient("http://example.com:8088/") assert client.base_url == "http://example.com:8088" - + def test_set_session_token(self): """Test setting session token.""" client = LSFClient("http://example.com:8088") client.set_session_token("test-token-123") assert client.session_token == "test-token-123" - + def test_clear_session_token(self): """Test clearing session token.""" client = LSFClient("http://example.com:8088") client.set_session_token("test-token-123") client.clear_session_token() assert client.session_token is None - + def test_encode_path(self): """Test path encoding to base64.""" path = "/home/user/test.txt" encoded = LSFClient.encode_path(path) assert isinstance(encoded, str) assert len(encoded) > 0 - + def test_decode_path(self): """Test path decoding from base64.""" path = "/home/user/test.txt" encoded = LSFClient.encode_path(path) decoded = LSFClient.decode_path(encoded) assert decoded == path - + def test_get_headers_without_token(self): """Test headers without session token.""" client = LSFClient("http://example.com:8088") headers = client._get_headers() assert "Authorization" not in headers - + def test_get_headers_with_token(self): """Test headers with session token.""" client = LSFClient("http://example.com:8088") client.set_session_token("test-token-123") headers = client._get_headers() assert headers["Authorization"] == "test-token-123" - + def test_get_headers_with_additional(self): """Test headers with additional headers.""" client = LSFClient("http://example.com:8088") @@ -87,4 +88,5 @@ async def test_client_close(): await client.close() # If no exception is raised, the test passes + # Made with Bob
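Note on the logging changes in this diff: the f-string log calls were replaced with `%s`-style arguments throughout. The sketch below illustrates what that changes at runtime. With `%s` arguments the message is only formatted if the record is actually emitted, whereas an f-string is always rendered before the logging call. The `Expensive` class, the `demo_lazy_logging` function, and the logger name are hypothetical and used only for illustration; they are not part of this repository.

```python
import logging


class Expensive:
    """Hypothetical helper that counts how often it is rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "expensive-value"


def demo_lazy_logging():
    """Return (renders caused by %s-style call, renders caused by f-string call)."""
    logger = logging.getLogger("lazy_logging_demo")  # illustrative logger name
    logger.setLevel(logging.INFO)  # DEBUG records are filtered out
    logger.propagate = False
    logger.addHandler(logging.NullHandler())

    value = Expensive()

    # %-style arguments: the record is dropped before any formatting happens,
    # so __str__ is never called.
    logger.debug("value: %s", value)
    lazy_renders = value.renders

    # f-string: the message is built before logger.debug() is even entered,
    # so __str__ runs although the record is dropped.
    logger.debug(f"value: {value}")
    eager_renders = value.renders - lazy_renders

    return lazy_renders, eager_renders
```

Under this setup, the `%s` call triggers no string rendering while the f-string call triggers one. One consequence for the code in this diff: with `%s` arguments the explicit `str(e)` wrappers are no longer required, since the logging module converts the argument itself when it formats the record.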