diff --git a/src/aws_durable_execution_sdk_python_testing/executor.py b/src/aws_durable_execution_sdk_python_testing/executor.py index d2ff0c6..9319508 100644 --- a/src/aws_durable_execution_sdk_python_testing/executor.py +++ b/src/aws_durable_execution_sdk_python_testing/executor.py @@ -11,7 +11,11 @@ DurableExecutionInvocationOutput, InvocationStatus, ) -from aws_durable_execution_sdk_python.lambda_service import ErrorObject, OperationUpdate +from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + Operation, + OperationUpdate, +) from aws_durable_execution_sdk_python_testing.exceptions import ( ExecutionAlreadyStartedException, @@ -33,6 +37,7 @@ StartDurableExecutionInput, StartDurableExecutionOutput, StopDurableExecutionResponse, + TERMINAL_STATUSES, ) from aws_durable_execution_sdk_python_testing.model import ( Event as HistoryEvent, @@ -54,8 +59,8 @@ class Executor(ExecutionObserver): - MAX_CONSECUTIVE_FAILED_ATTEMPTS = 5 - RETRY_BACKOFF_SECONDS = 5 + MAX_CONSECUTIVE_FAILED_ATTEMPTS: int = 5 + RETRY_BACKOFF_SECONDS: int = 5 def __init__(self, store: ExecutionStore, scheduler: Scheduler, invoker: Invoker): self._store = store @@ -393,20 +398,18 @@ def get_execution_state( def get_execution_history( self, execution_arn: str, - include_execution_data: bool = False, # noqa: FBT001, FBT002, ARG002 - reverse_order: bool = False, # noqa: FBT001, FBT002, ARG002 + include_execution_data: bool = False, # noqa: FBT001, FBT002 + reverse_order: bool = False, # noqa: FBT001, FBT002 marker: str | None = None, max_items: int | None = None, ) -> GetDurableExecutionHistoryResponse: """Get execution history with events. - TODO: incomplete - Args: execution_arn: The execution ARN include_execution_data: Whether to include execution data in events reverse_order: Return events in reverse chronological order - marker: Pagination marker + marker: Pagination marker (event_id) max_items: Maximum items to return Returns: @@ -415,30 +418,79 @@ def get_execution_history( Raises: ResourceNotFoundException: If execution does not exist """ - execution = self.get_execution(execution_arn) # noqa: F841 - - # Convert operations to events - # This is a simplified implementation - real implementation would need - # to generate proper event history from operations - events: list[HistoryEvent] = [] + execution: Execution = self.get_execution(execution_arn) + + # Generate events + all_events: list[HistoryEvent] = [] + event_id: int = 1 + ops: list[Operation] = execution.operations + for op in ops: + if op.start_timestamp is not None: + started = HistoryEvent.from_operation_started( + op, event_id, include_execution_data + ) + all_events.append(started) + event_id += 1 + if op.end_timestamp is not None and op.status in TERMINAL_STATUSES: + finished = HistoryEvent.from_operation_finished( + op, event_id, include_execution_data + ) + all_events.append(finished) + event_id += 1 - # Apply pagination + # Apply cursor-based pagination if max_items is None: max_items = 100 - start_index = 0 + # Handle pagination marker + start_index: int = 0 if marker: try: - start_index = int(marker) + marker_event_id: int = int(marker) + # Find the index of the first event with event_id >= marker + start_index = len(all_events) + for i, e in enumerate(all_events): + if e.event_id >= marker_event_id: + start_index = i + break except ValueError: start_index = 0 - end_index = start_index + max_items - paginated_events = events[start_index:end_index] - - next_marker = None - if end_index < len(events): - next_marker = str(end_index) + # Apply reverse order after pagination setup + if reverse_order: + all_events.reverse() + # Adjust start_index for reversed order + if marker: + try: + marker_event_id = int(marker) + # In reverse order, we want events with event_id < marker + start_index = len(all_events) + for i, e in enumerate(all_events): + if e.event_id < marker_event_id: + start_index = i + break + except ValueError: + start_index = 0 + + # Get paginated events + end_index: int = start_index + max_items + paginated_events: list[HistoryEvent] = all_events[start_index:end_index] + + # Generate next marker + next_marker: str | None = None + if end_index < len(all_events): + if reverse_order: + # Next marker is the event_id of the last returned event + next_marker = ( + str(paginated_events[-1].event_id) if paginated_events else None + ) + else: + # Next marker is the event_id of the next event after the last returned + next_marker = ( + str(all_events[end_index].event_id) + if end_index < len(all_events) + else None + ) return GetDurableExecutionHistoryResponse( events=paginated_events, next_marker=next_marker diff --git a/src/aws_durable_execution_sdk_python_testing/model.py b/src/aws_durable_execution_sdk_python_testing/model.py index 49f30fb..cf85f27 100644 --- a/src/aws_durable_execution_sdk_python_testing/model.py +++ b/src/aws_durable_execution_sdk_python_testing/model.py @@ -4,6 +4,7 @@ import datetime from dataclasses import dataclass, replace +from enum import Enum from typing import Any # Import existing types from the main SDK - REUSE EVERYTHING POSSIBLE @@ -36,6 +37,43 @@ ) +class EventType(Enum): + """Event types for durable execution events.""" + + EXECUTION_STARTED = "ExecutionStarted" + EXECUTION_SUCCEEDED = "ExecutionSucceeded" + EXECUTION_FAILED = "ExecutionFailed" + EXECUTION_TIMED_OUT = "ExecutionTimedOut" + EXECUTION_STOPPED = "ExecutionStopped" + CONTEXT_STARTED = "ContextStarted" + CONTEXT_SUCCEEDED = "ContextSucceeded" + CONTEXT_FAILED = "ContextFailed" + WAIT_STARTED = "WaitStarted" + WAIT_SUCCEEDED = "WaitSucceeded" + WAIT_CANCELLED = "WaitCancelled" + STEP_STARTED = "StepStarted" + STEP_SUCCEEDED = "StepSucceeded" + STEP_FAILED = "StepFailed" + CHAINED_INVOKE_STARTED = "ChainedInvokeStarted" + CHAINED_INVOKE_SUCCEEDED = "ChainedInvokeSucceeded" + CHAINED_INVOKE_FAILED = "ChainedInvokeFailed" + CHAINED_INVOKE_TIMED_OUT = "ChainedInvokeTimedOut" + CHAINED_INVOKE_STOPPED = "ChainedInvokeStopped" + CALLBACK_STARTED = "CallbackStarted" + CALLBACK_SUCCEEDED = "CallbackSucceeded" + CALLBACK_FAILED = "CallbackFailed" + CALLBACK_TIMED_OUT = "CallbackTimedOut" + + +TERMINAL_STATUSES: set[OperationStatus] = { + OperationStatus.SUCCEEDED, + OperationStatus.FAILED, + OperationStatus.TIMED_OUT, + OperationStatus.STOPPED, + OperationStatus.CANCELLED, +} + + @dataclass(frozen=True) class LambdaContext(LambdaContextProtocol): """Lambda context for testing.""" @@ -58,6 +96,7 @@ def log(self, msg) -> None: pass # No-op for testing +# region web_api_models # Web API specific models (not in Smithy but needed for web interface) @dataclass(frozen=True) class StartDurableExecutionInput: @@ -141,6 +180,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion web_api_models + + +# region smithy_api_models # Smithy-based API models @dataclass(frozen=True) class GetDurableExecutionRequest: @@ -424,6 +467,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion smithy_api_models + + +# region event_structures # Event-related structures from Smithy model @dataclass(frozen=True) class EventInput: @@ -445,6 +492,17 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload return result + @classmethod + def from_details( + cls, + details: ExecutionDetails, + include: bool = True, # noqa: FBT001, FBT002 + ) -> EventInput: + details_result: str | None = details.input_payload if details else None + payload: str | None = details_result if include else None + truncated: bool = not include + return cls(payload=payload, truncated=truncated) + @dataclass(frozen=True) class EventResult: @@ -466,6 +524,17 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload return result + @classmethod + def from_details( + cls, + details: CallbackDetails | StepDetails | ChainedInvokeDetails | ContextDetails, + include: bool = True, # noqa: FBT001, FBT002 + ) -> EventResult: + details_result: str | None = details.result if details else None + payload: str | None = details_result if include else None + truncated: bool = not include + return cls(payload=payload, truncated=truncated) + @dataclass(frozen=True) class EventError: @@ -491,6 +560,14 @@ def to_dict(self) -> dict[str, Any]: result["Payload"] = self.payload.to_dict() return result + @classmethod + def from_details( + cls, + details: CallbackDetails | StepDetails | ChainedInvokeDetails | ContextDetails, + ) -> EventError: + error_object: ErrorObject | None = details.error if details else None + return cls(error_object) + @dataclass(frozen=True) class RetryDetails: @@ -1013,6 +1090,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion event_structures + + +# region event_class @dataclass(frozen=True) class Event: """Event structure from Smithy model.""" @@ -1272,7 +1353,646 @@ def to_dict(self) -> dict[str, Any]: ) return result + # region execution + @classmethod + def create_execution_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + event_input: EventInput | None = None, + event_result: EventResult | None = None, + event_error: EventError | None = None, + execution_timeout: int | None = None, + ) -> Event: + """Create execution event based on action.""" + operation_status = operation_status or operation.status + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.EXECUTION_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + execution_started_details=ExecutionStartedDetails( + input=event_input, + execution_timeout=execution_timeout, + ), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.EXECUTION_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + execution_succeeded_details=ExecutionSucceededDetails( + result=event_result + ), + ) + case OperationStatus.FAILED: + return cls( + event_type=EventType.EXECUTION_FAILED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + execution_failed_details=ExecutionFailedDetails(error=event_error), + ) + case OperationStatus.TIMED_OUT: + return cls( + event_type=EventType.EXECUTION_TIMED_OUT.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + execution_timed_out_details=ExecutionTimedOutDetails( + error=event_error + ), + ) + case OperationStatus.STOPPED: + return cls( + event_type=EventType.EXECUTION_STOPPED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + execution_stopped_details=ExecutionStoppedDetails( + error=event_error + ), + ) + case _: + msg = f"Operation status {operation_status} is not valid for execution operations. Valid statuses are: STARTED, SUCCEEDED, FAILED, TIMED_OUT, STOPPED" + raise InvalidParameterValueException(msg) + + # endregion execution + + # region context + @classmethod + def create_context_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + include_execution_data: bool = False, + ) -> Event: + """Create context event based on action.""" + operation_status = operation_status or operation.status + context_details: ContextDetails | None = operation.context_details + event_error: EventError | None = ( + EventError.from_details(context_details) if context_details else None + ) + event_result: EventResult | None = ( + EventResult.from_details(context_details, include_execution_data) + if context_details + else None + ) + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.CONTEXT_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + context_started_details=ContextStartedDetails(), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.CONTEXT_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + context_succeeded_details=ContextSucceededDetails( + result=event_result + ), + ) + case OperationStatus.FAILED: + return cls( + event_type=EventType.CONTEXT_FAILED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + context_failed_details=ContextFailedDetails(error=event_error), + ) + case _: + msg = ( + f"Operation status {operation_status} is not valid for context operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED" + ) + raise InvalidParameterValueException(msg) + + # endregion context + + # region wait + @classmethod + def create_wait_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + ) -> Event: + """Create wait event based on action.""" + operation_status = operation_status or operation.status + wait_details: WaitDetails | None = operation.wait_details + scheduled_end_timestamp: datetime.datetime | None = ( + wait_details.scheduled_end_timestamp if wait_details else None + ) + duration: int | None = None + if ( + wait_details + and wait_details.scheduled_end_timestamp + and operation.start_timestamp + ): + duration = int( + ( + wait_details.scheduled_end_timestamp - operation.start_timestamp + ).total_seconds() + ) + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.WAIT_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + wait_started_details=WaitStartedDetails( + duration=duration, + scheduled_end_timestamp=scheduled_end_timestamp, + ), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.WAIT_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + wait_succeeded_details=WaitSucceededDetails(duration=duration), + ) + case OperationStatus.CANCELLED: + return cls( + event_type=EventType.WAIT_CANCELLED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + wait_cancelled_details=WaitCancelledDetails(), + ) + case _: + msg = ( + f"Operation status {operation_status} is not valid for wait operations. " + f"Valid statuses are: STARTED, SUCCEEDED, CANCELLED" + ) + raise InvalidParameterValueException(msg) + + # endregion wait + + # region step + @classmethod + def create_step_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + include_execution_data: bool = False, + ) -> Event: + """Create step event based on action.""" + operation_status = operation_status or operation.status + step_details: StepDetails | None = operation.step_details + event_error: EventError | None = ( + EventError.from_details(step_details) if step_details else None + ) + event_result: EventResult | None = ( + EventResult.from_details(step_details, include_execution_data) + if step_details + else None + ) + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.STEP_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + step_started_details=StepStartedDetails(), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.STEP_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + step_succeeded_details=StepSucceededDetails(result=event_result), + ) + case OperationStatus.FAILED: + return cls( + event_type=EventType.STEP_FAILED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + step_failed_details=StepFailedDetails(error=event_error), + ) + case _: + msg = ( + f"Operation status {operation_status} is not valid for step operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED" + ) + raise InvalidParameterValueException(msg) + + # endregion step + + # region chained_invoke + @classmethod + def create_chained_invoke_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + include_execution_data: bool = False, + ) -> Event: + """Create chained invoke event based on action.""" + operation_status = operation_status or operation.status + chained_invoke_details: ChainedInvokeDetails | None = ( + operation.chained_invoke_details + ) + event_error: EventError | None = ( + EventError.from_details(chained_invoke_details) + if chained_invoke_details + else None + ) + event_result: EventResult | None = ( + EventResult.from_details(chained_invoke_details, include_execution_data) + if chained_invoke_details + else None + ) + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.CHAINED_INVOKE_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + chained_invoke_started_details=ChainedInvokeStartedDetails( + input=EventInput(payload=None, truncated=False) + ), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.CHAINED_INVOKE_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + chained_invoke_succeeded_details=ChainedInvokeSucceededDetails( + result=event_result + ), + ) + case OperationStatus.FAILED: + return cls( + event_type=EventType.CHAINED_INVOKE_FAILED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + chained_invoke_failed_details=ChainedInvokeFailedDetails( + error=event_error + ), + ) + case OperationStatus.TIMED_OUT: + return cls( + event_type=EventType.CHAINED_INVOKE_TIMED_OUT.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + chained_invoke_timed_out_details=ChainedInvokeTimedOutDetails( + error=event_error + ), + ) + case OperationStatus.STOPPED: + return cls( + event_type=EventType.CHAINED_INVOKE_STOPPED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + chained_invoke_stopped_details=ChainedInvokeStoppedDetails( + error=event_error + ), + ) + case _: + msg = ( + f"Operation status {operation_status} is not valid for chained invoke operations. Valid statuses are: " + f"STARTED, SUCCEEDED, FAILED, TIMED_OUT, STOPPED" + ) + raise InvalidParameterValueException(msg) + + # endregion chained_invoke + + # region callback + @classmethod + def create_callback_event( + cls, + operation: Operation, + event_timestamp: datetime.datetime, + event_id: int, + operation_status: OperationStatus | None = None, + include_execution_data: bool = False, + ) -> Event: + """Create callback event based on action.""" + operation_status = operation_status or operation.status + callback_details: CallbackDetails | None = operation.callback_details + callback_id: str | None = ( + callback_details.callback_id if callback_details else None + ) + event_error: EventError | None = ( + EventError.from_details(callback_details) if callback_details else None + ) + event_result: EventResult | None = ( + EventResult.from_details(callback_details, include_execution_data) + if callback_details + else None + ) + event_sub_type: str | None = ( + operation.sub_type.value if operation.sub_type else None + ) + + match operation_status: + case OperationStatus.STARTED: + return cls( + event_type=EventType.CALLBACK_STARTED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + callback_started_details=CallbackStartedDetails( + callback_id=callback_id + ), + ) + case OperationStatus.SUCCEEDED: + return cls( + event_type=EventType.CALLBACK_SUCCEEDED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + callback_succeeded_details=CallbackSucceededDetails( + result=event_result + ), + ) + case OperationStatus.FAILED: + return cls( + event_type=EventType.CALLBACK_FAILED.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + callback_failed_details=CallbackFailedDetails(error=event_error), + ) + case OperationStatus.TIMED_OUT: + return cls( + event_type=EventType.CALLBACK_TIMED_OUT.value, + event_timestamp=event_timestamp, + sub_type=event_sub_type, + event_id=event_id, + operation_id=operation.operation_id, + name=operation.name, + parent_id=operation.parent_id, + callback_timed_out_details=CallbackTimedOutDetails( + error=event_error + ), + ) + case _: + msg = ( + f"Operation status {operation_status} is not valid for callback operations. " + f"Valid statuses are: STARTED, SUCCEEDED, FAILED, TIMED_OUT" + ) + raise InvalidParameterValueException(msg) + + # endregion callback + + @classmethod + def from_operation_started( + cls, + operation: Operation, + event_id: int, + include_execution_data: bool = False, # noqa: FBT001, FBT002 + ) -> Event: + """Convert operation to started event.""" + if operation.start_timestamp is None: + msg: str = "Operation start timestamp cannot be None when converting to started event" + raise InvalidParameterValueException(msg) + match operation.operation_type: + case OperationType.EXECUTION: + event_input: EventInput | None = ( + EventInput.from_details( + operation.execution_details, + include=include_execution_data, + ) + if operation.execution_details + else None + ) + return cls.create_execution_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + event_input=event_input, + ) + + case OperationType.STEP: + return cls.create_step_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + ) + + case OperationType.WAIT: + return cls.create_wait_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + ) + + case OperationType.CALLBACK: + return cls.create_callback_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + ) + + case OperationType.CHAINED_INVOKE: + return cls.create_chained_invoke_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + ) + case OperationType.CONTEXT: + return cls.create_context_event( + operation=operation, + event_timestamp=operation.start_timestamp, + event_id=event_id, + operation_status=OperationStatus.STARTED, + ) + case _: + msg = f"Unknown operation type: {operation.operation_type}" + raise InvalidParameterValueException(msg) + + @classmethod + def from_operation_finished( + cls, + operation: Operation, + event_id: int, + include_execution_data: bool = False, # noqa: FBT001, FBT002 + ) -> Event: + """Convert operation to finished event.""" + if operation.end_timestamp is None: + msg: str = "Operation end timestamp cannot be None when converting to finished event" + raise InvalidParameterValueException(msg) + + if operation.status not in TERMINAL_STATUSES: + msg = f"Operation status must be one of SUCCEEDED, FAILED, TIMED_OUT, STOPPED, or CANCELLED. Got: {operation.status}" + raise InvalidParameterValueException(msg) + + match operation.operation_type: + case OperationType.EXECUTION: + return cls.create_execution_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + ) + + case OperationType.WAIT: + return cls.create_wait_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + ) + + case OperationType.STEP: + return cls.create_step_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + include_execution_data=include_execution_data, + ) + + case OperationType.CALLBACK: + return cls.create_callback_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + include_execution_data=include_execution_data, + ) + + case OperationType.CHAINED_INVOKE: + return cls.create_chained_invoke_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + include_execution_data=include_execution_data, + ) + + case OperationType.CONTEXT: + return cls.create_context_event( + operation=operation, + event_timestamp=operation.end_timestamp, + event_id=event_id, + include_execution_data=include_execution_data, + ) + + case _: + msg = f"Unknown operation type: {operation.operation_type}" + raise InvalidParameterValueException(msg) + + +# endregion event_class + + +# region history_models @dataclass(frozen=True) class HistoryEventTypeConfig: """Configuration for how to process a specific event type.""" @@ -1477,7 +2197,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: List of operations, one per unique operation ID Raises: - ValueError: When required fields are missing from an event + InvalidParameterValueException: When required fields are missing from an event Note: InvocationCompleted events are currently skipped as they don't represent @@ -1489,7 +2209,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: for event in events: if not event.event_type: msg = "Missing required 'event_type' field in event" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # Get event type configuration event_config: HistoryEventTypeConfig | None = HISTORY_EVENT_TYPES.get( @@ -1497,7 +2217,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: ) if not event_config: msg = f"Unknown event type: {event.event_type}" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # TODO: add support for populating invocation information from InvocationCompleted event if event.event_type == "InvocationCompleted": @@ -1505,7 +2225,7 @@ def events_to_operations(events: list[Event]) -> list[Operation]: if not event.operation_id: msg = f"Missing required 'operation_id' field in event {event.event_id}" - raise ValueError(msg) + raise InvalidParameterValueException(msg) # Get previous operation if it exists previous_operation: Operation | None = operations_map.get(event.operation_id) @@ -1523,8 +2243,8 @@ def events_to_operations(events: list[Event]) -> list[Operation]: if event.sub_type: try: sub_type = OperationSubType(event.sub_type) - except ValueError: - pass + except ValueError as e: + raise InvalidParameterValueException(str(e)) from e # Create base operation operation = Operation( @@ -1851,6 +2571,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion history_models + + +# region callback_models # Callback-related models @dataclass(frozen=True) class SendDurableExecutionCallbackSuccessRequest: @@ -1927,6 +2651,10 @@ class SendDurableExecutionCallbackHeartbeatResponse: """Response from sending callback heartbeat.""" +# endregion callback_models + + +# region checkpoint_models # Checkpoint-related models @dataclass(frozen=True) class CheckpointUpdatedExecutionState: @@ -2049,6 +2777,10 @@ def to_dict(self) -> dict[str, Any]: return result +# endregion checkpoint_models + + +# region error_models # Error response structure for consistent error handling @dataclass(frozen=True) class ErrorResponse: @@ -2094,3 +2826,6 @@ def to_dict(self) -> dict[str, Any]: error_data["requestId"] = self.request_id return {"error": error_data} + + +# endregion error_models diff --git a/tests/checkpoint/processors/base_test.py b/tests/checkpoint/processors/base_test.py index d058944..ee588c4 100644 --- a/tests/checkpoint/processors/base_test.py +++ b/tests/checkpoint/processors/base_test.py @@ -318,9 +318,11 @@ def test_create_invoke_details_no_options(): def test_create_wait_details_with_current_operation(): processor = MockProcessor() - scheduled_time = datetime.datetime.now(tz=datetime.UTC) + scheduled_end_timestamp = datetime.datetime.now(tz=datetime.UTC) current_op = Mock() - current_op.wait_details = WaitDetails(scheduled_end_timestamp=scheduled_time) + current_op.wait_details = WaitDetails( + scheduled_end_timestamp=scheduled_end_timestamp + ) wait_options = WaitOptions(wait_seconds=30) update = OperationUpdate( @@ -333,7 +335,7 @@ def test_create_wait_details_with_current_operation(): result = processor.create_wait_details(update, current_op) assert isinstance(result, WaitDetails) - assert result.scheduled_end_timestamp == scheduled_time + assert result.scheduled_end_timestamp == scheduled_end_timestamp def test_create_wait_details_without_current_operation(): diff --git a/tests/event_factory_test.py b/tests/event_factory_test.py new file mode 100644 index 0000000..43a8ad2 --- /dev/null +++ b/tests/event_factory_test.py @@ -0,0 +1,1284 @@ +"""Tests for Event factory methods. + +This module tests all the event creation factory methods in the Event class. +""" + +from datetime import UTC, datetime + +import pytest +from aws_durable_execution_sdk_python.lambda_service import ( + ErrorObject, + OperationStatus, + OperationType, +) + +from aws_durable_execution_sdk_python_testing.exceptions import ( + InvalidParameterValueException, +) +from aws_durable_execution_sdk_python_testing.model import ( + CheckpointDurableExecutionRequest, + ErrorResponse, + Event, + EventError, + EventInput, + EventResult, + Execution, + ExecutionStartedDetails, + LambdaContext, + StartDurableExecutionInput, +) + + +# Helper function to create mock operations +def create_mock_operation( + operation_id: str = "op-1", + name: str = "test_op", + parent_id=None, + status: OperationStatus = OperationStatus.STARTED, +): + from unittest.mock import Mock + + op = Mock() + op.operation_id = operation_id + op.name = name + op.parent_id = parent_id + op.status = status + return op + + +# region execution-tests +def test_create_execution_started(): + operation = create_mock_operation( + "op-1", "test_execution", status=OperationStatus.STARTED + ) + event_input = EventInput(payload='{"test": "data"}', truncated=False) + event = Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + event_input=event_input, + execution_timeout=300, + ) + + assert event.event_type == "ExecutionStarted" + assert event.operation_id == "op-1" + assert event.name == "test_execution" + assert event.execution_started_details.input.payload == '{"test": "data"}' + assert event.execution_started_details.execution_timeout == 300 + + +def test_create_execution_succeeded(): + operation = create_mock_operation("op-1", status=OperationStatus.SUCCEEDED) + event_result = EventResult(payload='{"result": "success"}', truncated=False) + event = Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=2, + event_result=event_result, + ) + + assert event.event_type == "ExecutionSucceeded" + assert event.execution_succeeded_details.result.payload == '{"result": "success"}' + + +def test_create_execution_failed(): + operation = create_mock_operation("op-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Execution failed") + event_error = EventError(payload=error_obj, truncated=False) + event = Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=3, + event_error=event_error, + ) + + assert event.event_type == "ExecutionFailed" + assert event.execution_failed_details.error.payload.message == "Execution failed" + + +def test_create_execution_timed_out(): + operation = create_mock_operation("op-1", status=OperationStatus.TIMED_OUT) + error_obj = ErrorObject.from_message("Execution timed out") + event_error = EventError(payload=error_obj, truncated=False) + event = Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=4, + event_error=event_error, + ) + + assert event.event_type == "ExecutionTimedOut" + assert ( + event.execution_timed_out_details.error.payload.message == "Execution timed out" + ) + + +def test_create_execution_stopped(): + operation = create_mock_operation("op-1", status=OperationStatus.STOPPED) + error_obj = ErrorObject.from_message("Execution stopped") + event_error = EventError(payload=error_obj, truncated=False) + event = Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=5, + event_error=event_error, + ) + + assert event.event_type == "ExecutionStopped" + assert event.execution_stopped_details.error.payload.message == "Execution stopped" + + +def test_create_execution_invalid_status(): + operation = create_mock_operation("op-1", status=OperationStatus.CANCELLED) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for execution operations", + ): + Event.create_execution_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.CANCELLED, + ) + + +# endregion execution-tests + + +# region context-tests +def test_create_context_started(): + operation = create_mock_operation( + "ctx-1", "test_context", status=OperationStatus.STARTED + ) + event = Event.create_context_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + ) + + assert event.event_type == "ContextStarted" + assert event.operation_id == "ctx-1" + assert event.name == "test_context" + assert event.context_started_details is not None + + +def test_create_context_succeeded(): + operation = create_mock_operation("ctx-1", status=OperationStatus.SUCCEEDED) + operation.context_details = type( + "MockDetails", (), {"result": '{"context": "result"}', "error": None} + )() + event = Event.create_context_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=2, + include_execution_data=True, + ) + + assert event.event_type == "ContextSucceeded" + assert event.context_succeeded_details.result.payload == '{"context": "result"}' + + +def test_create_context_failed(): + operation = create_mock_operation("ctx-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Context failed") + operation.context_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + event = Event.create_context_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=3, + ) + + assert event.event_type == "ContextFailed" + assert event.context_failed_details.error.payload.message == "Context failed" + + +def test_create_context_invalid_status(): + operation = create_mock_operation("ctx-1", status=OperationStatus.TIMED_OUT) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for context operations", + ): + Event.create_context_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.TIMED_OUT, + ) + + +# endregion context-tests + + +# region wait-tests +def test_create_wait_started(): + operation = create_mock_operation("wait-1", status=OperationStatus.STARTED) + operation.start_timestamp = datetime.fromisoformat("2024-01-01T12:00:00Z") + operation.wait_details = type( + "MockDetails", + (), + {"scheduled_end_timestamp": datetime.fromisoformat("2024-01-01T12:05:00Z")}, + )() + event = Event.create_wait_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + ) + + assert event.event_type == "WaitStarted" + assert event.wait_started_details.duration == 300 + assert event.wait_started_details.scheduled_end_timestamp == datetime.fromisoformat( + "2024-01-01T12:05:00Z" + ) + + +def test_create_wait_succeeded(): + operation = create_mock_operation("wait-1", status=OperationStatus.SUCCEEDED) + operation.start_timestamp = datetime.fromisoformat("2024-01-01T12:00:00Z") + operation.wait_details = type( + "MockDetails", + (), + {"scheduled_end_timestamp": datetime.fromisoformat("2024-01-01T12:05:00Z")}, + )() + event = Event.create_wait_event( + operation=operation, + event_timestamp="2024-01-01T12:05:00Z", + event_id=2, + ) + + assert event.event_type == "WaitSucceeded" + assert event.wait_succeeded_details.duration == 300 + + +def test_create_wait_cancelled(): + operation = create_mock_operation("wait-1", status=OperationStatus.CANCELLED) + operation.wait_details = None + event = Event.create_wait_event( + operation=operation, + event_timestamp="2024-01-01T12:03:00Z", + event_id=3, + ) + + assert event.event_type == "WaitCancelled" + assert event.wait_cancelled_details is not None + + +def test_create_wait_invalid_status(): + operation = create_mock_operation("wait-1", status=OperationStatus.FAILED) + operation.wait_details.scheduled_end_timestamp = operation.start_timestamp = ( + datetime.fromisoformat("2024-01-01T12:00:00Z") + ) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for wait operations", + ): + Event.create_wait_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.FAILED, + ) + + +# endregion wait-tests + + +# region step-tests +def test_create_step_started(): + operation = create_mock_operation( + "step-1", "test_step", status=OperationStatus.STARTED + ) + event = Event.create_step_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + ) + + assert event.event_type == "StepStarted" + assert event.operation_id == "step-1" + assert event.name == "test_step" + assert event.step_started_details is not None + + +def test_create_step_succeeded(): + operation = create_mock_operation("step-1", status=OperationStatus.SUCCEEDED) + operation.step_details = type( + "MockDetails", (), {"result": '{"step": "result"}', "error": None} + )() + event = Event.create_step_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=2, + include_execution_data=True, + ) + + assert event.event_type == "StepSucceeded" + assert event.step_succeeded_details.result.payload == '{"step": "result"}' + + +def test_create_step_failed(): + operation = create_mock_operation("step-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Step failed") + operation.step_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + event = Event.create_step_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=3, + ) + + assert event.event_type == "StepFailed" + assert event.step_failed_details.error.payload.message == "Step failed" + + +def test_create_step_invalid_status(): + operation = create_mock_operation("step-1", status=OperationStatus.TIMED_OUT) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for step operations", + ): + Event.create_step_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.TIMED_OUT, + ) + + +# endregion step-tests + + +# region chained_invoke +def test_create_chained_invoke_started(): + operation = create_mock_operation( + "invoke-1", "test_invoke", status=OperationStatus.STARTED + ) + event = Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + ) + + assert event.event_type == "ChainedInvokeStarted" + assert event.operation_id == "invoke-1" + assert event.name == "test_invoke" + assert event.chained_invoke_started_details is not None + + +# endregion callback + + +# endregion helpers-test + + +def test_create_chained_invoke_succeeded(): + operation = create_mock_operation("invoke-1", status=OperationStatus.SUCCEEDED) + operation.chained_invoke_details = type( + "MockDetails", (), {"result": '{"invoke": "result"}', "error": None} + )() + event = Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=2, + include_execution_data=True, + ) + + assert event.event_type == "ChainedInvokeSucceeded" + assert ( + event.chained_invoke_succeeded_details.result.payload == '{"invoke": "result"}' + ) + + +def test_create_chained_invoke_failed(): + operation = create_mock_operation("invoke-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Invoke failed") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + event = Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=3, + ) + + assert event.event_type == "ChainedInvokeFailed" + assert event.chained_invoke_failed_details.error.payload.message == "Invoke failed" + + +def test_create_chained_invoke_timed_out(): + operation = create_mock_operation("invoke-1", status=OperationStatus.TIMED_OUT) + error_obj = ErrorObject.from_message("Invoke timed out") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + event = Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=4, + ) + + assert event.event_type == "ChainedInvokeTimedOut" + assert ( + event.chained_invoke_timed_out_details.error.payload.message + == "Invoke timed out" + ) + + +def test_create_chained_invoke_stopped(): + operation = create_mock_operation("invoke-1", status=OperationStatus.STOPPED) + error_obj = ErrorObject.from_message("Invoke stopped") + operation.chained_invoke_details = type( + "MockDetails", (), {"result": None, "error": error_obj} + )() + event = Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=5, + ) + + assert event.event_type == "ChainedInvokeStopped" + assert ( + event.chained_invoke_stopped_details.error.payload.message == "Invoke stopped" + ) + + +def test_create_chained_invoke_invalid_status(): + operation = create_mock_operation("invoke-1", status=OperationStatus.CANCELLED) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for chained invoke operations", + ): + Event.create_chained_invoke_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.CANCELLED, + ) + + +# endregion chained_invoke + + +# region callback-tests +def test_create_callback_started(): + operation = create_mock_operation( + "callback-1", "test_callback", status=OperationStatus.STARTED + ) + operation.callback_details = type( + "MockDetails", (), {"callback_id": "cb-123", "result": None, "error": None} + )() + event = Event.create_callback_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STARTED, + ) + + assert event.event_type == "CallbackStarted" + assert event.operation_id == "callback-1" + assert event.name == "test_callback" + assert event.callback_started_details.callback_id == "cb-123" + + +def test_create_callback_succeeded(): + operation = create_mock_operation("callback-1", status=OperationStatus.SUCCEEDED) + operation.callback_details = type( + "MockDetails", + (), + {"callback_id": None, "result": '{"callback": "result"}', "error": None}, + )() + event = Event.create_callback_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=2, + include_execution_data=True, + ) + + assert event.event_type == "CallbackSucceeded" + assert event.callback_succeeded_details.result.payload == '{"callback": "result"}' + + +def test_create_callback_failed(): + operation = create_mock_operation("callback-1", status=OperationStatus.FAILED) + error_obj = ErrorObject.from_message("Callback failed") + operation.callback_details = type( + "MockDetails", (), {"callback_id": None, "result": None, "error": error_obj} + )() + event = Event.create_callback_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=3, + ) + + assert event.event_type == "CallbackFailed" + assert event.callback_failed_details.error.payload.message == "Callback failed" + + +def test_create_callback_timed_out(): + operation = create_mock_operation("callback-1", status=OperationStatus.TIMED_OUT) + error_obj = ErrorObject.from_message("Callback timed out") + operation.callback_details = type( + "MockDetails", (), {"callback_id": None, "result": None, "error": error_obj} + )() + event = Event.create_callback_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=4, + ) + + assert event.event_type == "CallbackTimedOut" + assert ( + event.callback_timed_out_details.error.payload.message == "Callback timed out" + ) + + +def test_create_callback_invalid_status(): + operation = create_mock_operation("callback-1", status=OperationStatus.STOPPED) + with pytest.raises( + InvalidParameterValueException, + match="Operation status .* is not valid for callback operations", + ): + Event.create_callback_event( + operation=operation, + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_status=OperationStatus.STOPPED, + ) + + +# endregion callback-tests + + +# region model-tests +def test_lambda_context(): + context = LambdaContext(aws_request_id="test-123") + assert context.aws_request_id == "test-123" + assert context.get_remaining_time_in_millis() == 900000 + context.log("test message") # Should not raise + + +def test_start_durable_execution_input_missing_field(): + with pytest.raises( + InvalidParameterValueException, match="Missing required field: AccountId" + ): + StartDurableExecutionInput.from_dict({}) + + +def test_start_durable_execution_input_to_dict_with_optionals(): + input_obj = StartDurableExecutionInput( + account_id="123456789", + function_name="test-func", + function_qualifier="$LATEST", + execution_name="test-exec", + execution_timeout_seconds=300, + execution_retention_period_days=7, + invocation_id="inv-123", + trace_fields={"key": "value"}, + tenant_id="tenant-123", + input='{"test": "data"}', + ) + result = input_obj.to_dict() + assert result["InvocationId"] == "inv-123" + assert result["TraceFields"] == {"key": "value"} + assert result["TenantId"] == "tenant-123" + assert result["Input"] == '{"test": "data"}' + + +def test_execution_from_dict_empty_function_arn(): + data = { + "DurableExecutionArn": "arn:aws:lambda:us-east-1:123456789:function:test", + "DurableExecutionName": "test-exec", + "Status": "SUCCEEDED", + "StartTimestamp": 1640995200.0, + } + execution = Execution.from_dict(data) + assert execution.function_arn == "" + + +def test_execution_to_dict_with_function_arn(): + execution = Execution( + durable_execution_arn="arn:aws:lambda:us-east-1:123456789:function:test", + durable_execution_name="test-exec", + function_arn="arn:aws:lambda:us-east-1:123456789:function:test", + status="SUCCEEDED", + start_timestamp=1640995200.0, + ) + result = execution.to_dict() + assert "FunctionArn" in result + + +def test_event_input_from_details(): + from aws_durable_execution_sdk_python.lambda_service import ExecutionDetails + + details = ExecutionDetails(input_payload='{"test": "data"}') + event_input = EventInput.from_details(details, include=True) + assert event_input.payload == '{"test": "data"}' + assert not event_input.truncated + + event_input_truncated = EventInput.from_details(details, include=False) + assert event_input_truncated.payload is None + assert event_input_truncated.truncated + + +def test_event_result_from_details(): + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + details = StepDetails(result='{"result": "success"}') + event_result = EventResult.from_details(details, include=True) + assert event_result.payload == '{"result": "success"}' + assert not event_result.truncated + + +def test_event_error_from_details(): + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + error_obj = ErrorObject.from_message("Test error") + details = StepDetails(error=error_obj) + event_error = EventError.from_details(details) + assert event_error.payload.message == "Test error" + + +def test_event_from_dict_with_all_details(): + data = { + "EventType": "ExecutionStarted", + "EventTimestamp": datetime.fromisoformat("2024-01-01T12:00:00Z"), + "EventId": 1, + "Id": "op-1", + "Name": "test", + "ParentId": "parent-1", + "SubType": "test-subtype", + "ExecutionStartedDetails": { + "Input": {"Payload": '{"test": "data"}', "Truncated": False}, + "ExecutionTimeout": 300, + }, + } + event = Event.from_dict(data) + assert event.sub_type == "test-subtype" + assert event.parent_id == "parent-1" + + +def test_event_to_dict_with_all_details(): + event = Event( + event_type="ExecutionStarted", + event_timestamp=datetime.fromisoformat("2024-01-01T12:00:00Z"), + event_id=1, + operation_id="op-1", + name="test", + parent_id="parent-1", + sub_type="test-subtype", + execution_started_details=ExecutionStartedDetails( + input=EventInput(payload='{"test": "data"}', truncated=False), + execution_timeout=300, + ), + ) + result = event.to_dict() + assert result["SubType"] == "test-subtype" + assert result["ParentId"] == "parent-1" + assert result["ExecutionStartedDetails"]["ExecutionTimeout"] == 300 + + +def test_error_response_from_dict_nested(): + data = { + "error": { + "type": "ValidationError", + "message": "Invalid input", + "code": "400", + "requestId": "req-123", + } + } + error_response = ErrorResponse.from_dict(data) + assert error_response.error_type == "ValidationError" + assert error_response.error_message == "Invalid input" + assert error_response.error_code == "400" + assert error_response.request_id == "req-123" + + +def test_error_response_from_dict_flat(): + data = {"type": "ValidationError", "message": "Invalid input"} + error_response = ErrorResponse.from_dict(data) + assert error_response.error_type == "ValidationError" + assert error_response.error_message == "Invalid input" + + +def test_checkpoint_durable_execution_request_from_dict(): + token: str = "token-123" + data = { + "CheckpointToken": token, + "Updates": [ + {"Id": "op-1", "Type": "STEP", "Action": "START", "SubType": "Step"} + ], + } + request = CheckpointDurableExecutionRequest.from_dict(data, "arn:test") + assert request.checkpoint_token == token + assert len(request.updates) == 1 + assert request.updates[0].operation_id == "op-1" + + +# endregion model-tests + + +# region from_operation_started_tests +class TestFromOperationStarted: + """Tests for Event.from_operation_started method.""" + + def test_from_operation_started_execution(self): + """Test converting execution operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + execution_details = Mock() + execution_details.input_payload = '{"test": "data"}' + operation.execution_details = execution_details + + event = Event.from_operation_started( + operation, event_id=1, include_execution_data=True + ) + + assert event.event_type == "ExecutionStarted" + assert event.operation_id == "exec-123" + assert event.name == "test_execution" + assert event.parent_id == "parent-123" + assert event.execution_started_details.input.payload == '{"test": "data"}' + assert not event.execution_started_details.input.truncated + + def test_from_operation_started_execution_no_data(self): + """Test execution operation with include_execution_data=False.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + execution_details = Mock() + execution_details.input_payload = '{"test": "data"}' + operation.execution_details = execution_details + + event = Event.from_operation_started( + operation, event_id=1, include_execution_data=False + ) + + assert event.event_type == "ExecutionStarted" + assert event.execution_started_details.input.payload is None + assert event.execution_started_details.input.truncated + + def test_from_operation_started_step(self): + """Test converting step operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + event = Event.from_operation_started(operation, event_id=2) + + assert event.event_type == "StepStarted" + assert event.operation_id == "step-123" + assert event.name == "test_step" + assert event.parent_id == "parent-123" + assert event.step_started_details is not None + + def test_from_operation_started_wait(self): + """Test converting wait operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + wait_details = Mock() + wait_details.scheduled_end_timestamp = datetime( + 2024, 1, 1, 12, 5, 0, tzinfo=UTC + ) + operation.wait_details = wait_details + + event = Event.from_operation_started(operation, event_id=3) + + assert event.event_type == "WaitStarted" + assert event.operation_id == "wait-123" + assert event.name == "test_wait" + assert event.parent_id == "parent-123" + assert event.wait_started_details.duration == 300 + assert ( + event.wait_started_details.scheduled_end_timestamp + == datetime.fromisoformat("2024-01-01T12:05:00+00:00") + ) + + def test_from_operation_started_callback(self): + """Test converting callback operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.callback_id = "cb-456" + operation.callback_details = callback_details + + event = Event.from_operation_started(operation, event_id=4) + + assert event.event_type == "CallbackStarted" + assert event.operation_id == "callback-123" + assert event.name == "test_callback" + assert event.parent_id == "parent-123" + assert event.callback_started_details.callback_id == "cb-456" + + def test_from_operation_started_chained_invoke(self): + """Test converting chained invoke operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + event = Event.from_operation_started(operation, event_id=5) + + assert event.event_type == "ChainedInvokeStarted" + assert event.operation_id == "invoke-123" + assert event.name == "test_invoke" + assert event.parent_id == "parent-123" + assert event.chained_invoke_started_details is not None + + def test_from_operation_started_context(self): + """Test converting context operation to started event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + event = Event.from_operation_started(operation, event_id=6) + + assert event.event_type == "ContextStarted" + assert event.operation_id == "context-123" + assert event.name == "test_context" + assert event.parent_id == "parent-123" + assert event.context_started_details is not None + + def test_from_operation_started_no_timestamp(self): + """Test error when operation has no start timestamp.""" + from unittest.mock import Mock + + operation = Mock() + operation.start_timestamp = None + + with pytest.raises( + InvalidParameterValueException, + match="Operation start timestamp cannot be None", + ): + Event.from_operation_started(operation, event_id=1) + + def test_from_operation_started_unknown_type(self): + """Test error with unknown operation type.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_type = "UNKNOWN_TYPE" + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + + with pytest.raises( + InvalidParameterValueException, match="Unknown operation type: UNKNOWN_TYPE" + ): + Event.from_operation_started(operation, event_id=1) + + +# endregion from_operation_started_tests + + +# region from_operation_finished_tests +class TestFromOperationFinished: + """Tests for Event.from_operation_finished method.""" + + def test_from_operation_finished_execution_succeeded(self): + """Test converting succeeded execution operation to finished event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + event = Event.from_operation_finished(operation, event_id=1) + + assert event.event_type == "ExecutionSucceeded" + assert event.operation_id == "exec-123" + assert event.name == "test_execution" + assert event.parent_id == "parent-123" + + def test_from_operation_finished_execution_failed(self): + """Test converting failed execution operation to finished event.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "exec-123" + operation.name = "test_execution" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.EXECUTION + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + event = Event.from_operation_finished(operation, event_id=1) + + assert event.event_type == "ExecutionFailed" + assert event.operation_id == "exec-123" + + def test_from_operation_finished_step_with_result(self): + """Test converting succeeded step operation with result.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + step_details = Mock() + step_details.result = '{"result": "success"}' + step_details.error = None + operation.step_details = step_details + + event = Event.from_operation_finished( + operation, event_id=2, include_execution_data=True + ) + + assert event.event_type == "StepSucceeded" + assert event.operation_id == "step-123" + assert event.step_succeeded_details.result.payload == '{"result": "success"}' + + def test_from_operation_finished_step_with_error(self): + """Test converting failed step operation with error.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + step_details = Mock() + step_details.result = None + step_details.error = ErrorObject.from_message("Step failed") + operation.step_details = step_details + + event = Event.from_operation_finished(operation, event_id=2) + + assert event.event_type == "StepFailed" + assert event.step_failed_details.error.payload.message == "Step failed" + + def test_from_operation_finished_wait_succeeded(self): + """Test converting succeeded wait operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.status = OperationStatus.SUCCEEDED + operation.start_timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=UTC) + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + wait_details = Mock() + wait_details.scheduled_end_timestamp = datetime( + 2024, 1, 1, 12, 5, 0, tzinfo=UTC + ) + operation.wait_details = wait_details + + event = Event.from_operation_finished(operation, event_id=3) + + assert event.event_type == "WaitSucceeded" + assert event.wait_succeeded_details.duration == 300 + + def test_from_operation_finished_wait_cancelled(self): + """Test converting cancelled wait operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "wait-123" + operation.name = "test_wait" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.WAIT + operation.status = OperationStatus.CANCELLED + operation.end_timestamp = datetime(2024, 1, 1, 12, 3, 0, tzinfo=UTC) + operation.wait_details = None + + event = Event.from_operation_finished(operation, event_id=3) + + assert event.event_type == "WaitCancelled" + assert event.wait_cancelled_details is not None + + def test_from_operation_finished_callback_succeeded(self): + """Test converting succeeded callback operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.result = '{"callback": "result"}' + callback_details.error = None + operation.callback_details = callback_details + + event = Event.from_operation_finished( + operation, event_id=4, include_execution_data=True + ) + + assert event.event_type == "CallbackSucceeded" + assert ( + event.callback_succeeded_details.result.payload == '{"callback": "result"}' + ) + + def test_from_operation_finished_callback_timed_out(self): + """Test converting timed out callback operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "callback-123" + operation.name = "test_callback" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CALLBACK + operation.status = OperationStatus.TIMED_OUT + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + callback_details = Mock() + callback_details.result = None + callback_details.error = ErrorObject.from_message("Callback timed out") + operation.callback_details = callback_details + + event = Event.from_operation_finished(operation, event_id=4) + + assert event.event_type == "CallbackTimedOut" + assert ( + event.callback_timed_out_details.error.payload.message + == "Callback timed out" + ) + + def test_from_operation_finished_chained_invoke_succeeded(self): + """Test converting succeeded chained invoke operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + chained_invoke_details = Mock() + chained_invoke_details.result = '{"invoke": "result"}' + chained_invoke_details.error = None + operation.chained_invoke_details = chained_invoke_details + + event = Event.from_operation_finished( + operation, event_id=5, include_execution_data=True + ) + + assert event.event_type == "ChainedInvokeSucceeded" + assert ( + event.chained_invoke_succeeded_details.result.payload + == '{"invoke": "result"}' + ) + + def test_from_operation_finished_chained_invoke_stopped(self): + """Test converting stopped chained invoke operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "invoke-123" + operation.name = "test_invoke" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CHAINED_INVOKE + operation.status = OperationStatus.STOPPED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + chained_invoke_details = Mock() + chained_invoke_details.result = None + chained_invoke_details.error = ErrorObject.from_message("Invoke stopped") + operation.chained_invoke_details = chained_invoke_details + + event = Event.from_operation_finished(operation, event_id=5) + + assert event.event_type == "ChainedInvokeStopped" + assert ( + event.chained_invoke_stopped_details.error.payload.message + == "Invoke stopped" + ) + + def test_from_operation_finished_context_succeeded(self): + """Test converting succeeded context operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context_details = Mock() + context_details.result = '{"context": "result"}' + context_details.error = None + operation.context_details = context_details + operation.result = None + operation.error = None + + event = Event.from_operation_finished( + operation, event_id=6, include_execution_data=True + ) + + assert event.event_type == "ContextSucceeded" + assert event.context_succeeded_details.result.payload == '{"context": "result"}' + + def test_from_operation_finished_context_failed(self): + """Test converting failed context operation.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "context-123" + operation.name = "test_context" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.CONTEXT + operation.status = OperationStatus.FAILED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + context_details = Mock() + context_details.result = None + context_details.error = ErrorObject.from_message("Context failed") + operation.context_details = context_details + operation.result = None + operation.error = None + + event = Event.from_operation_finished(operation, event_id=6) + + assert event.event_type == "ContextFailed" + assert event.context_failed_details.error.payload.message == "Context failed" + + def test_from_operation_finished_no_end_timestamp(self): + """Test error when operation has no end timestamp.""" + from unittest.mock import Mock + + operation = Mock() + operation.end_timestamp = None + + with pytest.raises( + InvalidParameterValueException, + match="Operation end timestamp cannot be None", + ): + Event.from_operation_finished(operation, event_id=1) + + def test_from_operation_finished_invalid_status(self): + """Test error with invalid operation status.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.status = OperationStatus.STARTED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + with pytest.raises( + InvalidParameterValueException, + match="Operation status must be one of SUCCEEDED, FAILED, TIMED_OUT, STOPPED, or CANCELLED", + ): + Event.from_operation_finished(operation, event_id=1) + + def test_from_operation_finished_unknown_type(self): + """Test error with unknown operation type.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_type = "UNKNOWN_TYPE" + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + + with pytest.raises( + InvalidParameterValueException, match="Unknown operation type: UNKNOWN_TYPE" + ): + Event.from_operation_finished(operation, event_id=1) + + def test_from_operation_finished_no_details(self): + """Test operations with no detail objects.""" + from datetime import UTC, datetime + from unittest.mock import Mock + + operation = Mock() + operation.operation_id = "step-123" + operation.name = "test_step" + operation.parent_id = "parent-123" + operation.operation_type = OperationType.STEP + operation.status = OperationStatus.SUCCEEDED + operation.end_timestamp = datetime(2024, 1, 1, 12, 5, 0, tzinfo=UTC) + operation.step_details = None + + event = Event.from_operation_finished(operation, event_id=2) + + assert event.event_type == "StepSucceeded" + assert event.step_succeeded_details.result is None + + +# endregion from_operation_finished_tests diff --git a/tests/executor_test.py b/tests/executor_test.py index 78a067e..e9ad1b1 100644 --- a/tests/executor_test.py +++ b/tests/executor_test.py @@ -1938,6 +1938,7 @@ def test_get_execution_state_invalid_token(executor, mock_store): def test_get_execution_history(executor, mock_store): """Test get_execution_history method.""" mock_execution = Mock() + mock_execution.operations = [] # Empty operations list mock_store.load.return_value = mock_execution result = executor.get_execution_history("test-arn") @@ -1947,6 +1948,115 @@ def test_get_execution_history(executor, mock_store): mock_store.load.assert_called_once_with("test-arn") +def test_get_execution_history_with_events(executor, mock_store): + """Test get_execution_history with actual events.""" + from aws_durable_execution_sdk_python.lambda_service import StepDetails + + # Create operations that will generate events + op1 = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + step_details=StepDetails(result="test_result"), + ) + + mock_execution = Mock() + mock_execution.operations = [op1] + mock_store.load.return_value = mock_execution + + result = executor.get_execution_history("test-arn", include_execution_data=True) + + assert len(result.events) == 2 # Started + Succeeded events + assert result.events[0].event_type == "StepStarted" + assert result.events[1].event_type == "StepSucceeded" + + +def test_get_execution_history_reverse_order(executor, mock_store): + """Test get_execution_history with reverse order.""" + op1 = Operation( + operation_id="op-1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + + mock_execution = Mock() + mock_execution.operations = [op1] + mock_store.load.return_value = mock_execution + + result = executor.get_execution_history("test-arn", reverse_order=True) + + assert len(result.events) == 2 + # In reverse order, succeeded event should come first + assert result.events[0].event_type == "StepSucceeded" + assert result.events[1].event_type == "StepStarted" + + +def test_get_execution_history_pagination(executor, mock_store): + """Test get_execution_history with pagination.""" + # Create multiple operations to generate many events + operations = [] + for i in range(3): + op = Operation( + operation_id=f"op-{i}", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + operations.append(op) + + mock_execution = Mock() + mock_execution.operations = operations + mock_store.load.return_value = mock_execution + + # Test with max_items=2 + result = executor.get_execution_history("test-arn", max_items=2) + + assert len(result.events) == 2 + assert result.next_marker == "3" # Next event_id + + +def test_get_execution_history_pagination_with_marker(executor, mock_store): + """Test get_execution_history pagination with marker.""" + operations = [] + for i in range(3): + op = Operation( + operation_id=f"op-{i}", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=datetime.now(UTC), + end_timestamp=datetime.now(UTC), + ) + operations.append(op) + + mock_execution = Mock() + mock_execution.operations = operations + mock_store.load.return_value = mock_execution + + # Test with marker (start from event_id 3) + result = executor.get_execution_history("test-arn", marker="3", max_items=2) + + assert len(result.events) == 2 + # Should get events with event_id >= 3 + + +def test_get_execution_history_invalid_marker(executor, mock_store): + """Test get_execution_history with invalid marker.""" + mock_execution = Mock() + mock_execution.operations = [] + mock_store.load.return_value = mock_execution + + # Invalid marker should default to 1 + result = executor.get_execution_history("test-arn", marker="invalid") + + assert result.events == [] + assert result.next_marker is None + + def test_checkpoint_execution(executor, mock_store): """Test checkpoint_execution method.""" mock_execution = Mock() diff --git a/tests/model_test.py b/tests/model_test.py index 2cd3fde..9b2404f 100644 --- a/tests/model_test.py +++ b/tests/model_test.py @@ -6,6 +6,10 @@ import pytest +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from aws_durable_execution_sdk_python_testing.exceptions import ( InvalidParameterValueException, ) @@ -63,6 +67,7 @@ WaitCancelledDetails, WaitStartedDetails, WaitSucceededDetails, + events_to_operations, ) @@ -2955,20 +2960,6 @@ def test_events_to_operations_empty_list(): def test_events_to_operations_execution_started(): """Test events_to_operations with ExecutionStarted event.""" - import datetime - - from aws_durable_execution_sdk_python.lambda_service import ( - OperationStatus, - OperationType, - ) - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - EventInput, - ExecutionStartedDetails, - events_to_operations, - ) - event = Event( event_type="ExecutionStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -2990,20 +2981,6 @@ def test_events_to_operations_execution_started(): def test_events_to_operations_callback_lifecycle(): """Test events_to_operations with complete callback lifecycle.""" - import datetime - - from aws_durable_execution_sdk_python.lambda_service import ( - OperationStatus, - OperationType, - ) - - from aws_durable_execution_sdk_python_testing.model import ( - CallbackStartedDetails, - CallbackSucceededDetails, - Event, - EventResult, - events_to_operations, - ) started_event = Event( event_type="CallbackStarted", @@ -3036,57 +3013,42 @@ def test_events_to_operations_callback_lifecycle(): def test_events_to_operations_missing_event_type(): """Test events_to_operations raises error for missing event_type.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type=None, event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), ) - with pytest.raises(ValueError, match="Missing required 'event_type' field"): + with pytest.raises( + InvalidParameterValueException, match="Missing required 'event_type' field" + ): events_to_operations([event]) def test_events_to_operations_unknown_event_type(): """Test events_to_operations raises error for unknown event type.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="UnknownEventType", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id="op-1", ) - with pytest.raises(ValueError, match="Unknown event type: UnknownEventType"): + with pytest.raises( + InvalidParameterValueException, match="Unknown event type: UnknownEventType" + ): events_to_operations([event]) def test_events_to_operations_missing_operation_id(): """Test events_to_operations raises error for missing operation_id.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id=None, ) - with pytest.raises(ValueError, match="Missing required 'operation_id' field"): + with pytest.raises( + InvalidParameterValueException, match="Missing required 'operation_id' field" + ): events_to_operations([event]) @@ -3243,13 +3205,6 @@ def test_events_to_operations_chained_invoke_succeeded(): def test_events_to_operations_skips_invocation_completed(): """Test events_to_operations skips InvocationCompleted events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - invocation_event = Event( event_type="InvocationCompleted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3550,13 +3505,6 @@ def test_events_to_operations_merges_timestamps(): def test_events_to_operations_preserves_parent_id(): """Test events_to_operations preserves parent_id from events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3573,13 +3521,6 @@ def test_events_to_operations_preserves_parent_id(): def test_events_to_operations_preserves_sub_type(): """Test events_to_operations preserves sub_type from events.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), @@ -3595,23 +3536,17 @@ def test_events_to_operations_preserves_sub_type(): def test_events_to_operations_invalid_sub_type(): - """Test events_to_operations handles invalid sub_type gracefully.""" - import datetime - - from aws_durable_execution_sdk_python_testing.model import ( - Event, - events_to_operations, - ) - + """Test events_to_operations raises InvalidParameterValueException when sub_type is invalid.""" + invalid_sub_type: str = "INVALID_SUB_TYPE" event = Event( event_type="StepStarted", event_timestamp=datetime.datetime(2023, 1, 1, 0, 0, 0, tzinfo=datetime.UTC), operation_id="step-1", - sub_type="INVALID_SUB_TYPE", + sub_type=invalid_sub_type, ) - operations = events_to_operations([event]) - - assert len(operations) == 1 - # Invalid sub_type should be ignored (set to None) - assert operations[0].sub_type is None + with pytest.raises( + InvalidParameterValueException, + match=f"'{invalid_sub_type}' is not a valid OperationSubType", + ): + events_to_operations([event])