Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit 3076e2e

Browse files
authored
Merge pull request #601 from michalskrivanek/shell-cli
add CLI support for Shell driver methods, stream output
2 parents 850f5cc + 5f9689c commit 3076e2e

5 files changed

Lines changed: 368 additions & 43 deletions

File tree

packages/jumpstarter-driver-shell/README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,53 @@ methods will be generated dynamically, and they will be available as follows:
6363

6464
:returns: A tuple(stdout, stderr, return_code)
6565
```
66+
67+
## CLI Usage
68+
69+
The shell driver also provides a CLI when using `jmp shell`. All configured methods become available as CLI commands, except for methods starting with `_` which are considered private and hidden from the end user:
70+
71+
```console
72+
$ jmp shell --exporter shell-exporter
73+
$ j shell
74+
Usage: j shell [OPTIONS] COMMAND [ARGS]...
75+
76+
Shell command executor
77+
78+
Commands:
79+
env_var Execute the env_var shell method
80+
ls Execute the ls shell method
81+
method2 Execute the method2 shell method
82+
method3 Execute the method3 shell method
83+
```
84+
85+
### CLI Command Usage
86+
87+
Each configured method becomes a CLI command with the following options:
88+
89+
```console
90+
$ j shell ls --help
91+
Usage: j shell ls [OPTIONS] [ARGS]...
92+
93+
Execute the ls shell method
94+
95+
Options:
96+
-e, --env TEXT Environment variables in KEY=VALUE format
97+
--help Show this message and exit.
98+
```
99+
100+
### Examples
101+
102+
```console
103+
# Execute simple commands
104+
$ j shell ls
105+
file1.txt file2.txt directory/
106+
107+
# Pass arguments to shell methods
108+
$ j shell method3 "first arg" "second arg"
109+
Hello World first arg
110+
Hello World second arg
111+
112+
# Set environment variables
113+
$ j shell env_var arg1 arg2 --env ENV_VAR=myvalue
114+
arg1,arg2,myvalue
115+
```

packages/jumpstarter-driver-shell/jumpstarter_driver_shell/client.py

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import sys
12
from dataclasses import dataclass
23

4+
import click
5+
36
from jumpstarter.client import DriverClient
47

58

@@ -11,8 +14,8 @@ class ShellClient(DriverClient):
1114
Client interface for Shell driver.
1215
1316
This client dynamically checks that the method is configured
14-
on the driver, and if it is, it will call it and get the results
15-
in the form of (stdout, stderr, returncode).
17+
on the driver, and if it is, it will call it with live streaming output.
18+
Output chunks are displayed as they arrive.
1619
"""
1720

1821
def _check_method_exists(self, method):
@@ -24,4 +27,59 @@ def _check_method_exists(self, method):
2427
## capture any method calls dynamically
2528
def __getattr__(self, name):
2629
self._check_method_exists(name)
27-
return lambda *args, **kwargs: tuple(self.call("call_method", name, kwargs, *args))
30+
def execute(*args, **kwargs):
31+
returncode = 0
32+
for stdout, stderr, code in self.streamingcall("call_method", name, kwargs, *args):
33+
if stdout:
34+
print(stdout, end='', flush=True)
35+
if stderr:
36+
print(stderr, end='', file=sys.stderr, flush=True)
37+
if code is not None:
38+
returncode = code
39+
return returncode
40+
return execute
41+
42+
def cli(self):
43+
"""Create CLI interface for dynamically configured shell methods"""
44+
@click.group
45+
def base():
46+
"""Shell command executor"""
47+
pass
48+
49+
# Get available methods from the driver
50+
if self._methods is None:
51+
self._methods = self.call("get_methods")
52+
53+
# Create a command for each configured method
54+
for method_name in self._methods:
55+
self._add_method_command(base, method_name)
56+
57+
return base
58+
59+
def _add_method_command(self, group, method_name):
60+
"""Add a Click command for a specific shell method"""
61+
@group.command(
62+
name=method_name,
63+
context_settings={"ignore_unknown_options": True, "allow_interspersed_args": False},
64+
)
65+
@click.argument('args', nargs=-1, type=click.UNPROCESSED)
66+
@click.option('--env', '-e', multiple=True,
67+
help='Environment variables in KEY=VALUE format')
68+
def method_command(args, env):
69+
# Parse environment variables
70+
env_dict = {}
71+
for env_var in env:
72+
if '=' in env_var:
73+
key, value = env_var.split('=', 1)
74+
env_dict[key] = value
75+
else:
76+
raise click.BadParameter(f"Invalid --env value '{env_var}'. Use KEY=VALUE.")
77+
78+
returncode = getattr(self, method_name)(*args, **env_dict)
79+
80+
# Exit with the same return code as the shell command
81+
if returncode != 0:
82+
raise click.exceptions.Exit(returncode)
83+
84+
# Update the docstring dynamically
85+
method_command.__doc__ = f"Execute the {method_name} shell method"
Lines changed: 126 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import asyncio
12
import os
3+
import signal
24
import subprocess
35
from dataclasses import dataclass, field
6+
from typing import AsyncGenerator
47

58
from jumpstarter.driver import Driver, export
69

@@ -27,41 +30,38 @@ def get_methods(self) -> list[str]:
2730
return methods
2831

2932
@export
30-
def call_method(self, method: str, env, *args):
33+
async def call_method(self, method: str, env, *args) -> AsyncGenerator[tuple[str, str, int | None], None]:
34+
"""
35+
Execute a shell method with live streaming output.
36+
Yields (stdout_chunk, stderr_chunk, returncode) tuples.
37+
returncode is None until the process completes, then it's the final return code.
38+
"""
3139
self.logger.info(f"calling {method} with args: {args} and kwargs as env: {env}")
3240
if method not in self.methods:
3341
raise ValueError(f"Method '{method}' not found in available methods: {list(self.methods.keys())}")
3442
script = self.methods[method]
3543
self.logger.debug(f"running script: {script}")
44+
3645
try:
37-
result = self._run_inline_shell_script(method, script, *args, env_vars=env)
38-
if result.returncode != 0:
39-
self.logger.info(f"{method} return code: {result.returncode}")
40-
if result.stderr != "":
41-
stderr = result.stderr.rstrip("\n")
42-
self.logger.debug(f"{method} stderr:\n{stderr}")
43-
if result.stdout != "":
44-
stdout = result.stdout.rstrip("\n")
45-
self.logger.debug(f"{method} stdout:\n{stdout}")
46-
return result.stdout, result.stderr, result.returncode
46+
async for stdout_chunk, stderr_chunk, returncode in self._run_inline_shell_script(
47+
method, script, *args, env_vars=env
48+
):
49+
if stdout_chunk:
50+
self.logger.debug(f"{method} stdout:\n{stdout_chunk.rstrip()}")
51+
if stderr_chunk:
52+
self.logger.debug(f"{method} stderr:\n{stderr_chunk.rstrip()}")
53+
54+
if returncode is not None and returncode != 0:
55+
self.logger.info(f"{method} return code: {returncode}")
56+
57+
yield stdout_chunk, stderr_chunk, returncode
4758
except subprocess.TimeoutExpired as e:
4859
self.logger.error(f"Timeout expired while running {method}: {e}")
49-
return "", f"Timeout expired while running {method}: {e}", 199
50-
51-
def _run_inline_shell_script(self, method, script, *args, env_vars=None):
52-
"""
53-
Run the given shell script (as a string) with optional arguments and
54-
environment variables. Returns a CompletedProcess with stdout, stderr, and returncode.
55-
56-
:param script: The shell script contents as a string.
57-
:param args: Arguments to pass to the script (mapped to $1, $2, etc. in the script).
58-
:param env_vars: A dict of environment variables to make available to the script.
59-
60-
:return: A subprocess.CompletedProcess object (Python 3.5+).
61-
"""
60+
yield "", f"\nTimeout expired while running {method}: {e}\n", 199
6261

62+
def _validate_script_params(self, script, args, env_vars):
63+
"""Validate script parameters and return combined environment."""
6364
# Merge parent environment with the user-supplied env_vars
64-
# so that we don't lose existing environment variables.
6565
combined_env = os.environ.copy()
6666
if env_vars:
6767
# Validate environment variable names
@@ -82,16 +82,108 @@ def _run_inline_shell_script(self, method, script, *args, env_vars=None):
8282
if self.cwd and not os.path.isdir(self.cwd):
8383
raise ValueError(f"Working directory does not exist: {self.cwd}")
8484

85+
return combined_env
86+
87+
async def _read_process_output(self, process, read_all=False):
88+
"""Read data from stdout and stderr streams.
89+
90+
:param process: The subprocess to read from
91+
:param read_all: If True, read all remaining data. If False, read with timeout.
92+
:return: Tuple of (stdout_data, stderr_data)
93+
"""
94+
stdout_data = ""
95+
stderr_data = ""
96+
97+
# Read from stdout
98+
if process.stdout:
99+
try:
100+
if read_all:
101+
chunk = await process.stdout.read()
102+
else:
103+
chunk = await asyncio.wait_for(process.stdout.read(1024), timeout=0.01)
104+
if chunk:
105+
stdout_data = chunk.decode('utf-8', errors='replace')
106+
except (asyncio.TimeoutError, Exception):
107+
pass
108+
109+
# Read from stderr
110+
if process.stderr:
111+
try:
112+
if read_all:
113+
chunk = await process.stderr.read()
114+
else:
115+
chunk = await asyncio.wait_for(process.stderr.read(1024), timeout=0.01)
116+
if chunk:
117+
stderr_data = chunk.decode('utf-8', errors='replace')
118+
except (asyncio.TimeoutError, Exception):
119+
pass
120+
121+
return stdout_data, stderr_data
122+
123+
async def _run_inline_shell_script(
124+
self, method, script, *args, env_vars=None
125+
) -> AsyncGenerator[tuple[str, str, int | None], None]:
126+
"""
127+
Run the given shell script with live streaming output.
128+
129+
:param method: The method name (for logging).
130+
:param script: The shell script contents as a string.
131+
:param args: Arguments to pass to the script (mapped to $1, $2, etc. in the script).
132+
:param env_vars: A dict of environment variables to make available to the script.
133+
134+
:yields: Tuples of (stdout_chunk, stderr_chunk, returncode).
135+
returncode is None until the process completes.
136+
"""
137+
combined_env = self._validate_script_params(script, args, env_vars)
85138
cmd = self.shell + [script, method] + list(args)
86139

87-
# Run the command
88-
result = subprocess.run(
89-
cmd,
90-
capture_output=True, # Captures stdout and stderr
91-
text=True, # Returns stdout/stderr as strings (not bytes)
92-
env=combined_env, # Pass our merged environment
93-
cwd=self.cwd, # Run in the working directory (if set)
94-
timeout=self.timeout,
140+
# Start the process with pipes for streaming and new process group
141+
process = await asyncio.create_subprocess_exec(
142+
*cmd,
143+
stdout=asyncio.subprocess.PIPE,
144+
stderr=asyncio.subprocess.PIPE,
145+
env=combined_env,
146+
cwd=self.cwd,
147+
start_new_session=True, # Create new process group
95148
)
96149

97-
return result
150+
# Create a task to monitor the process timeout
151+
start_time = asyncio.get_event_loop().time()
152+
153+
# Read output in real-time
154+
while process.returncode is None:
155+
self.logger.debug(f"running {method} with cmd: {cmd} and env: {combined_env} and args: {args}")
156+
if asyncio.get_event_loop().time() - start_time > self.timeout:
157+
# Send SIGTERM to entire process group for graceful termination
158+
try:
159+
os.killpg(process.pid, signal.SIGTERM)
160+
except (ProcessLookupError, OSError):
161+
# Process group might already be gone
162+
pass
163+
try:
164+
await asyncio.wait_for(process.wait(), timeout=5.0)
165+
except asyncio.TimeoutError:
166+
try:
167+
os.killpg(process.pid, signal.SIGKILL)
168+
self.logger.warning(f"SIGTERM failed to terminate {process.pid}, sending SIGKILL")
169+
except (ProcessLookupError, OSError):
170+
pass
171+
raise subprocess.TimeoutExpired(cmd, self.timeout) from None
172+
173+
try:
174+
stdout_data, stderr_data = await self._read_process_output(process, read_all=False)
175+
176+
# Yield any data we got
177+
if stdout_data or stderr_data:
178+
yield stdout_data, stderr_data, None
179+
180+
# Small delay to prevent busy waiting
181+
await asyncio.sleep(0.1)
182+
183+
except Exception:
184+
break
185+
186+
# Process completed, get return code and final output
187+
returncode = process.returncode
188+
remaining_stdout, remaining_stderr = await self._read_process_output(process, read_all=True)
189+
yield remaining_stdout, remaining_stderr, returncode

0 commit comments

Comments
 (0)