Skip to content
2 changes: 2 additions & 0 deletions unstract/core/src/unstract/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
PipelineStatus,
PipelineStatusUpdateRequest,
QueueName,
SkipReason,
StatusMappings,
TaskError,
TaskExecutionContext,
Expand Down Expand Up @@ -76,6 +77,7 @@
"WebhookResult",
"FileExecutionResult",
"BatchExecutionResult",
"SkipReason",
"CallbackExecutionData",
"WorkflowExecutionUpdateRequest",
"PipelineStatusUpdateRequest",
Expand Down
175 changes: 170 additions & 5 deletions unstract/core/src/unstract/core/worker_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,32 @@
FAILED = "Failed"


class SkipReason(str, Enum):
"""Reasons a per-file execution can be skipped.

Comment thread
muhammad-ali-e marked this conversation as resolved.
Closed vocabulary so typos at producer call sites fail at
construction time rather than silently producing an
unrecognisable value on the wire. Subclasses ``str`` so members
compare equal to their underlying string value; wire serialisation
extracts ``.value`` via ``serialize_dataclass_to_dict``'s
``isinstance(value, Enum)`` branch (this is *not*
:class:`enum.StrEnum`, which would also change ``__str__`` to
return the value — that distinction matters if any caller ever
does ``str(SkipReason.X)`` rather than ``SkipReason.X.value``).

Values mirror the batch-level skip counters on
:class:`BatchExecutionResult` (``skipped_already_completed`` /
``skipped_active_duplicate``) so per-file and batch-level
vocabulary stay in lockstep.
"""

# WorkflowFileExecution.status was already COMPLETED — another
# worker finished this file before we got to it.
ALREADY_COMPLETED = "already_completed"
# Another worker is currently processing the same file content.
ACTIVE_DUPLICATE = "active_duplicate"


class NotificationMethod(str, Enum):
"""Notification delivery methods."""

Expand Down Expand Up @@ -239,7 +265,14 @@

@dataclass
class FileExecutionResult:
"""Structured result for file execution tasks."""
"""Structured result for file execution tasks.

The API-deployment chord path historically returned dicts with
``file_name`` / ``result_data`` / ``skipped`` rather than the
canonical ``file`` / ``result`` shape. The optional aliases below
Comment thread
muhammad-ali-e marked this conversation as resolved.
let producers populate either vocabulary without the consumer
needing to know which path produced the result.
"""

file: str
file_execution_id: str | None
Expand All @@ -249,6 +282,21 @@
metadata: dict[str, Any] | None = None
processing_time: float = 0.0
file_size: int = 0
# TODO(UN-3516): remove these three legacy API-path aliases once

Check warning on line 285 in unstract/core/src/unstract/core/worker_models.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this "TODO" comment.

See more on https://sonarcloud.io/project/issues?id=Zipstack_unstract&issues=AZ6mVAM7uvU_i541iDUW&open=AZ6mVAM7uvU_i541iDUW&pullRequest=2020
# consumers have migrated to the canonical ``file`` / ``result``
# fields and to a typed skip-reason. Kept additive in the meantime
# so the chord-callback producer can preserve its current dict
# vocabulary without consumer breakage.
file_name: str | None = None
Comment thread
muhammad-ali-e marked this conversation as resolved.
result_data: Any | None = None
Comment thread
muhammad-ali-e marked this conversation as resolved.
skipped: SkipReason | None = None
# Storage backend's acknowledgement for a successfully-processed
# file. Forwarded verbatim from
# ``api_client.store_file_execution_result``. No in-tree consumer
# reads it today, but external integrations may inspect it on the
# wire — preserving it as a typed field rather than dict-spreading
# so it survives the BatchExecutionResult round-trip.
storage_result: Any | None = None
Comment thread
muhammad-ali-e marked this conversation as resolved.

def __post_init__(self) -> None:
Comment thread
muhammad-ali-e marked this conversation as resolved.
Comment thread
muhammad-ali-e marked this conversation as resolved.
Comment thread
muhammad-ali-e marked this conversation as resolved.
if self.error:
Expand All @@ -275,6 +323,37 @@
"""Convert to JSON-serializable dict for backward compatibility."""
return self.to_api_dict()
Comment thread
muhammad-ali-e marked this conversation as resolved.

@staticmethod
def _parse_skipped(
raw: Any, file_execution_id: str | None = None
) -> "SkipReason | None":
"""Lenient ``SkipReason`` parser for the consumer side.

Producer call sites are typed (constructor takes the enum, typos
fail at construction). On the consumer side we accept an unknown
wire value gracefully — a newer-producer / older-consumer
rolling-deploy must not crash the entire batch task on a value
the consumer doesn't recognise. Standard "strict on emit,
lenient on receive" posture.

``file_execution_id`` is optional but threaded through from
``from_dict`` so the warning log carries file context — without
it, a debugger seeing the warning has no way to find which file
triggered the unknown value.
"""
if not raw:
return None
try:
return SkipReason(raw)
except ValueError:
logger.warning(
"Unknown SkipReason on wire: %r (file_execution_id=%s); "
"treating as None",
raw,
file_execution_id,
)
return None

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "FileExecutionResult":
"""Create from dictionary (e.g., task result)."""
Expand All @@ -284,15 +363,22 @@
if data.get("error")
else ApiDeploymentResultStatus.SUCCESS
)
file_execution_id = data.get("file_execution_id")
return cls(
file=data.get("file", ""),
file_execution_id=data.get("file_execution_id"),
file_execution_id=file_execution_id,
status=status,
error=data.get("error"),
result=data.get("result"),
metadata=data.get("metadata"),
processing_time=data.get("processing_time", 0.0),
file_size=data.get("file_size", 0),
file_name=data.get("file_name"),
result_data=data.get("result_data"),
skipped=cls._parse_skipped(
data.get("skipped"), file_execution_id=file_execution_id
),
storage_result=data.get("storage_result"),
)

def is_successful(self) -> bool:
Expand All @@ -306,7 +392,13 @@

@dataclass
class BatchExecutionResult:
"""Structured result for batch execution tasks."""
"""Structured result for batch execution tasks.

The general workflow chord path additionally tracks skipped-file
sub-counts and the org context on the wire. The optional fields
below capture that vocabulary without changing existing field
semantics — strictly additive.
"""

total_files: int
successful_files: int
Expand All @@ -315,6 +407,12 @@
file_results: list[FileExecutionResult] = field(default_factory=list)
batch_id: str | None = None
errors: list[str] = field(default_factory=list)
# Optional general-path fields. Producers populate; consumers that
# don't know about them are unaffected (existing reads use ``.get()``
# with defaults).
skipped_already_completed: int = 0
Comment thread
muhammad-ali-e marked this conversation as resolved.
skipped_active_duplicate: int = 0
organization_id: str | None = None
Comment thread
muhammad-ali-e marked this conversation as resolved.

@property
def success_rate(self) -> float:
Expand All @@ -324,8 +422,23 @@
return (self.successful_files / self.total_files) * 100

def to_dict(self) -> dict[str, Any]:
Comment thread
muhammad-ali-e marked this conversation as resolved.
"""Convert to dictionary for API response."""
return serialize_dataclass_to_dict(self)
"""Convert to dictionary for API response.

Strips ``None`` from nested ``file_results`` so a per-file dict
read from ``batch["file_results"][i]`` has the same wire shape
as a standalone ``FileExecutionResult.to_dict()`` — both omit
unset optional fields. ``serialize_dataclass_to_dict`` only
strips ``None`` at the outer level, so without this fixup the
per-file dicts nested in the batch wire would carry explicit
``"file_name": None`` etc. entries and break consumers that
rely on membership checks.
"""
wire = serialize_dataclass_to_dict(self)
Comment thread
muhammad-ali-e marked this conversation as resolved.
wire["file_results"] = [
{k: v for k, v in fr.items() if v is not None}
for fr in wire.get("file_results", [])
]
return wire

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BatchExecutionResult":
Expand All @@ -343,6 +456,58 @@
file_results=file_results,
batch_id=data.get("batch_id"),
errors=data.get("errors", []),
skipped_already_completed=data.get("skipped_already_completed", 0),
skipped_active_duplicate=data.get("skipped_active_duplicate", 0),
organization_id=data.get("organization_id"),
)

@classmethod
def from_file_results(
cls,
file_results: list[FileExecutionResult],
*,
execution_time: float,
organization_id: str | None = None,
batch_id: str | None = None,
errors: list[str] | None = None,
) -> "BatchExecutionResult":
"""Build a batch result by deriving counters from typed file results.

Counts ``successful_files`` / ``failed_files`` / ``skipped_*``
from the file results themselves rather than letting the caller
pass them as parameters — removes the class of bug where a
producer's hand-rolled counters drift from the underlying
``file_results`` (e.g. a string-match on the wire that misses a
new ``SkipReason`` member). Existing call sites that need to
keep their own counter semantics can keep using the
constructor directly; this is purely additive.

Note that ``successful_files`` here matches
``BatchExecutionResult.is_successful()`` semantics — a file with
``skipped`` set is *also* considered successful (consistent with
the API-path producer's "skipped files count as successful"
rule). Callers that need a different split should compute it
themselves and use the constructor.
"""
successful = sum(1 for fr in file_results if fr.is_successful())
failed = sum(1 for fr in file_results if not fr.is_successful())
skipped_already_completed = sum(
1 for fr in file_results if fr.skipped == SkipReason.ALREADY_COMPLETED
)
skipped_active_duplicate = sum(
1 for fr in file_results if fr.skipped == SkipReason.ACTIVE_DUPLICATE
)
return cls(
total_files=len(file_results),
successful_files=successful,
failed_files=failed,
execution_time=execution_time,
file_results=file_results,
batch_id=batch_id,
errors=errors or [],
skipped_already_completed=skipped_already_completed,
skipped_active_duplicate=skipped_active_duplicate,
organization_id=organization_id,
)

def add_file_result(self, file_result: FileExecutionResult):
Expand Down
Loading
Loading