Skip to content

Commit 87e53f6

Browse files
author
Datata1
committed
context manager for log streams
1 parent 5beb026 commit 87e53f6

File tree

14 files changed

+427
-565
lines changed

14 files changed

+427
-565
lines changed

examples/create_workspace_with_landscape.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
"""
2-
Demo: Create a workspace, deploy a landscape profile, and stream logs.
3-
"""
4-
51
import asyncio
62
import time
73

@@ -14,7 +10,7 @@
1410
)
1511
from codesphere.resources.workspace.logs import LogStage
1612

17-
TEAM_ID = 35698
13+
TEAM_ID = 123
1814

1915

2016
async def main():
@@ -25,8 +21,8 @@ async def main():
2521
raise ValueError("Micro plan not found")
2622

2723
workspace_name = f"pipeline-demo-{int(time.time())}"
28-
print(f"Creating workspace '{workspace_name}'...")
2924

25+
print(f"Creating workspace '{workspace_name}'...")
3026
workspace = await sdk.workspaces.create(
3127
WorkspaceCreate(plan_id=plan.id, team_id=TEAM_ID, name=workspace_name)
3228
)
@@ -45,7 +41,7 @@ async def main():
4541
.add_reactive_service("web")
4642
.plan(plan.id)
4743
.add_step(
48-
'for i in $(seq 1 50); do echo "[$i] Processing request..."; sleep 1; done'
44+
'for i in $(seq 1 20); do echo "[$i] Processing request..."; sleep 1; done'
4945
)
5046
.add_port(3000, public=True)
5147
.add_path("/", port=3000)
@@ -82,17 +78,17 @@ async def main():
8278

8379
print("\n--- Run Stage ---")
8480
await workspace.landscape.start_stage(PipelineStage.RUN, profile="production")
85-
print("Started run stage, waiting for logs...\n")
81+
print("Started run stage\n")
8682

87-
print("Streaming logs from 'web' service:")
83+
print("Streaming logs from 'web' service (using context manager):")
8884
count = 0
89-
async for entry in workspace.logs.stream_server(step=0, server="web"):
90-
if entry.get_text():
91-
print(f" {entry.get_text().strip()}")
92-
count += 1
85+
async with workspace.logs.open_server_stream(step=0, server="web") as stream:
86+
async for entry in stream:
87+
if entry.get_text():
88+
print(f" {entry.get_text().strip()}")
89+
count += 1
9390

9491
print(f"\n✓ Stream ended ({count} log entries)")
95-
print(f"✓ Workspace {workspace.id} is still running.")
9692

9793

9894
if __name__ == "__main__":

src/codesphere/core/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
from .base import ResourceBase
2-
from .operations import APIOperation, AsyncCallable
3-
from .handler import _APIOperationExecutor, APIRequestHandler
1+
from .base import CamelModel, ResourceBase
2+
from .handler import APIRequestHandler, _APIOperationExecutor
3+
from .operations import APIOperation, AsyncCallable, StreamOperation
44

55
__all__ = [
6+
"CamelModel",
67
"ResourceBase",
78
"APIOperation",
89
"_APIOperationExecutor",
910
"APIRequestHandler",
1011
"AsyncCallable",
12+
"StreamOperation",
1113
]

src/codesphere/core/operations.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
1-
from typing import Callable, Awaitable, Generic, Optional, Type, TypeAlias, TypeVar
2-
3-
from pydantic import BaseModel
1+
from typing import Awaitable, Callable, Generic, Optional, Type, TypeAlias, TypeVar
42

3+
from pydantic import BaseModel, ConfigDict
54

65
_T = TypeVar("_T")
76
ResponseT = TypeVar("ResponseT")
87
InputT = TypeVar("InputT")
8+
EntryT = TypeVar("EntryT")
99

1010
AsyncCallable: TypeAlias = Callable[[], Awaitable[_T]]
1111

1212

1313
class APIOperation(BaseModel, Generic[ResponseT, InputT]):
14+
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
15+
1416
method: str
1517
endpoint_template: str
1618
response_model: Type[ResponseT]
1719
input_model: Optional[Type[InputT]] = None
20+
21+
22+
class StreamOperation(BaseModel, Generic[EntryT]):
23+
model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
24+
endpoint_template: str
25+
entry_model: Type[EntryT]

src/codesphere/resources/workspace/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .git import GitHead, WorkspaceGitManager
2-
from .logs import LogEntry, LogProblem, LogStage, WorkspaceLogManager
2+
from .logs import LogEntry, LogProblem, LogStage, LogStream, WorkspaceLogManager
33
from .resources import WorkspacesResource
44
from .schemas import (
55
CommandInput,
@@ -20,6 +20,7 @@
2020
"CommandOutput",
2121
"WorkspaceGitManager",
2222
"GitHead",
23+
"LogStream",
2324
"WorkspaceLogManager",
2425
"LogEntry",
2526
"LogProblem",

src/codesphere/resources/workspace/landscape/models.py

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -107,24 +107,11 @@ async def teardown(self) -> None:
107107
async def scale(self, services: Dict[str, int]) -> None:
108108
await self._execute_operation(_SCALE_OP, data=services)
109109

110-
# Pipeline operations
111-
112110
async def start_stage(
113111
self,
114112
stage: Union[PipelineStage, str],
115113
profile: Optional[str] = None,
116114
) -> None:
117-
"""Start a pipeline stage.
118-
119-
Args:
120-
stage: The pipeline stage to start ('prepare', 'test', or 'run').
121-
profile: Optional profile name. If provided, starts the stage with
122-
that profile. Required for first run after deploy.
123-
124-
Raises:
125-
ValidationError: If the workspace is not running or parameters are invalid.
126-
NotFoundError: If the workspace is not found.
127-
"""
128115
if isinstance(stage, PipelineStage):
129116
stage = stage.value
130117

@@ -137,15 +124,6 @@ async def start_stage(
137124
await self._execute_operation(_START_PIPELINE_STAGE_OP, stage=stage)
138125

139126
async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
140-
"""Stop a pipeline stage.
141-
142-
Args:
143-
stage: The pipeline stage to stop ('prepare', 'test', or 'run').
144-
145-
Raises:
146-
ValidationError: If the workspace is not running or parameters are invalid.
147-
NotFoundError: If the workspace is not found.
148-
"""
149127
if isinstance(stage, PipelineStage):
150128
stage = stage.value
151129

@@ -154,18 +132,6 @@ async def stop_stage(self, stage: Union[PipelineStage, str]) -> None:
154132
async def get_stage_status(
155133
self, stage: Union[PipelineStage, str]
156134
) -> PipelineStatusList:
157-
"""Get the status of a pipeline stage.
158-
159-
Args:
160-
stage: The pipeline stage to get status for ('prepare', 'test', or 'run').
161-
162-
Returns:
163-
List of PipelineStatus objects, one per replica/server.
164-
165-
Raises:
166-
ValidationError: If the workspace is not running or parameters are invalid.
167-
NotFoundError: If the workspace is not found.
168-
"""
169135
if isinstance(stage, PipelineStage):
170136
stage = stage.value
171137

@@ -179,22 +145,6 @@ async def wait_for_stage(
179145
poll_interval: float = 5.0,
180146
server: Optional[str] = None,
181147
) -> PipelineStatusList:
182-
"""Wait for a pipeline stage to complete (success or failure).
183-
184-
Args:
185-
stage: The pipeline stage to wait for.
186-
timeout: Maximum time to wait in seconds (default: 300).
187-
poll_interval: Time between status checks in seconds (default: 5).
188-
server: Optional server name to filter by. If None, waits for all
189-
servers that have steps defined for this stage.
190-
191-
Returns:
192-
Final PipelineStatusList when stage completes.
193-
194-
Raises:
195-
TimeoutError: If the stage doesn't complete within the timeout.
196-
ValidationError: If the workspace is not running.
197-
"""
198148
if poll_interval <= 0:
199149
raise ValueError("poll_interval must be greater than 0")
200150

@@ -204,26 +154,17 @@ async def wait_for_stage(
204154
while elapsed < timeout:
205155
status_list = await self.get_stage_status(stage)
206156

207-
# Filter to relevant servers for THIS stage
208-
# A server is relevant for this stage if:
209-
# - It has steps defined (meaning it participates in this stage)
210-
# - OR it's not in 'waiting' state (meaning it has started)
211157
relevant_statuses = []
212158
for s in status_list:
213159
if server is not None:
214-
# Filter by specific server
215160
if s.server == server:
216161
relevant_statuses.append(s)
217162
else:
218-
# Include servers that have steps for this stage
219-
# Servers with no steps and waiting state don't participate in this stage
220163
if s.steps:
221164
relevant_statuses.append(s)
222165
elif s.state != PipelineState.WAITING:
223-
# Started but no steps visible yet
224166
relevant_statuses.append(s)
225167

226-
# If no relevant statuses yet, keep waiting
227168
if not relevant_statuses:
228169
log.debug(
229170
"Pipeline stage '%s': no servers with steps yet, waiting...",
@@ -233,7 +174,6 @@ async def wait_for_stage(
233174
elapsed += poll_interval
234175
continue
235176

236-
# Check if all relevant servers have completed
237177
all_completed = all(
238178
s.state
239179
in (PipelineState.SUCCESS, PipelineState.FAILURE, PipelineState.ABORTED)
@@ -244,7 +184,6 @@ async def wait_for_stage(
244184
log.debug("Pipeline stage '%s' completed.", stage_name)
245185
return PipelineStatusList(root=relevant_statuses)
246186

247-
# Log current state
248187
states = [f"{s.server}={s.state.value}" for s in relevant_statuses]
249188
log.debug(
250189
"Pipeline stage '%s' status: %s (elapsed: %.1fs)",

src/codesphere/resources/workspace/landscape/operations.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
response_model=type(None),
2626
)
2727

28-
# Pipeline operations
2928
_START_PIPELINE_STAGE_OP = APIOperation(
3029
method="POST",
3130
endpoint_template="/workspaces/{id}/pipeline/{stage}/start",

src/codesphere/resources/workspace/landscape/schemas.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,12 @@
1010

1111

1212
class PipelineStage(str, Enum):
13-
"""Pipeline stage for operations."""
14-
1513
PREPARE = "prepare"
1614
TEST = "test"
1715
RUN = "run"
1816

1917

2018
class PipelineState(str, Enum):
21-
"""State of a pipeline stage or step."""
22-
2319
WAITING = "waiting"
2420
RUNNING = "running"
2521
SUCCESS = "success"
@@ -28,16 +24,12 @@ class PipelineState(str, Enum):
2824

2925

3026
class StepStatus(CamelModel):
31-
"""Status of a single pipeline step."""
32-
3327
state: PipelineState
3428
started_at: Optional[str] = None
3529
finished_at: Optional[str] = None
3630

3731

3832
class PipelineStatus(CamelModel):
39-
"""Status of a pipeline stage execution."""
40-
4133
state: PipelineState
4234
started_at: Optional[str] = None
4335
finished_at: Optional[str] = None
@@ -47,8 +39,6 @@ class PipelineStatus(CamelModel):
4739

4840

4941
class PipelineStatusList(RootModel[List[PipelineStatus]]):
50-
"""List of pipeline status entries (one per replica/server)."""
51-
5242
root: List[PipelineStatus]
5343

5444
def __iter__(self):

src/codesphere/resources/workspace/logs/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
from .models import WorkspaceLogManager
1+
from .models import LogStream, WorkspaceLogManager
22
from .schemas import LogEntry, LogProblem, LogStage
33

44
__all__ = [
5+
"LogStream",
56
"WorkspaceLogManager",
67
"LogEntry",
78
"LogProblem",

0 commit comments

Comments
 (0)