Add @Flow.model functional API #206

Open
NeejWeej wants to merge 13 commits into Point72:main from NeejWeej:nk/auto_deps_auto_callable_model

NeejWeej commented May 4, 2026

PR Summary: @Flow.model

Branch: nk/auto_deps_auto_callable_model

Replaces #171. Reopened from a personal fork.

This PR adds @Flow.model, an authoring API that turns a typed Python function
into a real CallableModel factory. The goal is to make common DAG stages
easier to write while keeping execution inside existing ccflow machinery:
CallableModel, evaluators, caches, dependency graphs, registry/Hydra loading,
and serialization.

Core API

@Flow.model splits function parameters into two categories:

  • Regular parameters: ordinary unmarked parameters. These are construction-time
    model inputs and may be literals, defaults, or direct upstream
    CallableModel dependencies.
  • Contextual parameters: parameters marked with FromContext[T]. These are
    runtime inputs supplied by context, .flow.compute(...), construction-time
    contextual defaults, or .flow.with_context(...).

from ccflow import Flow, FromContext


@Flow.model
def add(a: int, b: FromContext[int]) -> int:
    return a + b


model = add(a=10)
assert model.flow.compute(b=5).value == 15

When a function returns a non-ResultBase value, the generated model wraps it in
GenericResult. Explicit ResultBase returns are preserved.
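
The wrapping rule above can be pictured with a small stand-alone sketch. It does not import ccflow: ResultStub, GenericResultStub, PriceResult, and wrap_result are hypothetical stand-ins chosen for illustration, not the PR's implementation.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ResultStub:
    """Hypothetical stand-in for ccflow's ResultBase."""


@dataclass
class GenericResultStub(ResultStub):
    """Hypothetical stand-in for GenericResult: holds a single value."""

    value: Any = None


@dataclass
class PriceResult(ResultStub):
    """An explicit result type that a function might return directly."""

    price: float = 0.0


def wrap_result(value: Any) -> ResultStub:
    """Pass explicit results through unchanged; wrap everything else."""
    if isinstance(value, ResultStub):
        return value
    return GenericResultStub(value=value)


wrapped = wrap_result(15)             # plain int gets wrapped
explicit = PriceResult(price=9.5)     # already a result, preserved as-is
assert wrapped.value == 15
assert wrap_result(explicit) is explicit
```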

Dependency Wiring

Regular parameters can be bound directly to upstream models:

@Flow.model
def source(value: FromContext[int]) -> int:
    return value * 10


@Flow.model
def root(x: int, bonus: FromContext[int]) -> int:
    return x + bonus


model = root(x=source())
assert model.flow.compute(value=3, bonus=7).value == 37

Only direct regular-parameter values are treated as upstream dependencies in
this first version. Containers such as list, tuple, dict, and set are
ordinary literal values; @Flow.model does not scan them for nested model
dependencies.
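
As a rough illustration of the direct-only rule, the sketch below picks out dependencies from a set of bound arguments without looking inside containers. StubModel and direct_dependencies are hypothetical names used only here; the real detection logic in the PR may differ.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class StubModel:
    """Hypothetical stand-in for an upstream CallableModel."""

    name: str


def direct_dependencies(bound_args: Dict[str, Any]) -> Dict[str, StubModel]:
    """Return only the arguments whose value is itself a model.

    Containers are deliberately not scanned, so a model nested inside a
    list or dict is treated as part of an ordinary literal value.
    """
    return {k: v for k, v in bound_args.items() if isinstance(v, StubModel)}


upstream = StubModel("source")
args = {"x": upstream, "items": [upstream, 1, 2], "scale": 3}
deps = direct_dependencies(args)
```

Here only "x" is reported as a dependency; the copy of upstream inside "items" is ignored.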

Generated __deps__ methods expose non-lazy direct upstream dependencies to the
existing graph evaluator. Lazy[T] is supported for direct dependency thunks
when a dependency should only be evaluated if user code calls it.
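
The idea behind a lazy dependency thunk can be shown in plain Python. This sketch does not use Lazy[T] itself; expensive and choose are hypothetical names, and the thunk is just a zero-argument callable.

```python
from typing import Callable, List

evaluated: List[str] = []


def expensive() -> int:
    """Stands in for an upstream dependency that is costly to evaluate."""
    evaluated.append("expensive")
    return 42


def choose(use_fallback: bool, fallback: Callable[[], int]) -> int:
    """Only call the thunk when its value is actually needed."""
    return fallback() if use_fallback else 0


skipped = choose(False, expensive)   # thunk is never called
assert evaluated == []
used = choose(True, expensive)       # thunk is called exactly once
assert evaluated == ["expensive"]
```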

Context Rewrites

This PR adds .flow.with_context(...) plus @Flow.context_transform.

with_context(...) rewrites runtime context for one dependency edge without
mutating the wrapped model. This supports fan-out patterns, where the same model
is evaluated against different contextual inputs in different branches.

@Flow.context_transform
def previous_day(day: FromContext[int]) -> int:
    return day - 1


previous = source().flow.with_context(value=previous_day())

Context bindings are stored as one ordered operation stream. Chained
with_context(...) calls preserve write order. Context transforms read from the
original ambient context, not from values written by earlier bindings in the
chain. Earlier field bindings overwritten by later field bindings do not run or
require inputs; patch transforms remain conservative because their output keys
can be dynamic.
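
The field-binding rules above can be sketched without ccflow. This is a simplified model under stated assumptions: FieldTransform and apply_field_bindings are hypothetical names, contexts are plain dicts, and patch transforms are left out entirely.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

Context = Dict[str, Any]


@dataclass(frozen=True)
class FieldTransform:
    """Hypothetical marker for a bound field transform."""

    fn: Callable[[Context], Any]


def apply_field_bindings(ambient: Context, bindings: List[Tuple[str, Any]]) -> Context:
    # Keep only the last binding per field: earlier overwritten ones never run.
    last_write: Dict[str, Any] = {}
    for field, value in bindings:
        last_write.pop(field, None)
        last_write[field] = value

    result = dict(ambient)
    for field, value in last_write.items():
        if isinstance(value, FieldTransform):
            # Transforms read the original ambient context, never `result`.
            result[field] = value.fn(ambient)
        else:
            result[field] = value  # static value
    return result


ambient = {"day": 10}
bindings = [
    ("day", FieldTransform(lambda ctx: ctx["missing"])),  # overwritten below
    ("day", FieldTransform(lambda ctx: ctx["day"] - 1)),
    ("prev", FieldTransform(lambda ctx: ctx["day"] - 1)),
]
rewritten = apply_field_bindings(ambient, bindings)
assert rewritten == {"day": 9, "prev": 9}
```

The first binding would fail if it ran, because the ambient context has no "missing" key; it is skipped because a later binding overwrites the same field. The "prev" transform sees day=10 from the ambient context, not the rewritten day=9.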

Positional with_context(...) arguments must be bound @Flow.context_transform
results that return mappings. Keyword field bindings may be static values or
bound field transforms. Callable keyword values are allowed when the target
contextual field type validates them, for example FromContext[Callable[..., T]].

Execution And Introspection Helpers

Every CallableModel now exposes model.flow.

The public .flow surface is intentionally small:

  • compute(...): ergonomic execution from a context object or contextual kwargs.
  • with_context(...): edge-local context rewrites.
  • inspect(...): structured debugging and introspection.

inspect(...) returns a FlowInspection object:

inspection = model.flow.inspect()

inspection.inputs
inspection.context_inputs
inspection.runtime_inputs
inspection.required_inputs
inspection.bound_inputs
inspection.dependencies

The top-level inspection fields are current-level only. They describe the model
or wrapper being inspected, not a flattened view of the whole dependency graph.

  • inputs: a dict from function input name to InputSpec, including type,
    required/default/value/source information.
  • context_inputs: declared contextual contract for the model or wrapped model.
  • runtime_inputs: direct runtime inputs the current model or wrapper may read
    after its own bindings.
  • required_inputs: required direct runtime inputs still unsatisfied by
    defaults or bindings.
  • bound_inputs: concrete values already fixed on the current model or wrapper.
  • dependencies: dependency edges discovered from direct generated-model
    regular inputs.

context_inputs intentionally remains faithful to the declared model contract.
For bound wrappers, use runtime_inputs, required_inputs, bound_inputs, and
inputs on the inspection object to understand the effective caller-facing
context after bindings.

@Flow.model
def add(a: int, b: FromContext[int], c: FromContext[int] = 5) -> int:
    return a + b + c


@Flow.context_transform
def from_seed(seed: FromContext[int]) -> int:
    return seed + 1


bound = add(a=10).flow.with_context(b=from_seed())
inspection = bound.flow.inspect()

assert inspection.context_inputs == {"b": int, "c": int}
assert inspection.runtime_inputs == {"c": int, "seed": int}
assert inspection.required_inputs == {"seed": int}
assert inspection.bound_inputs == {"a": 10}

inspect(...) can also take a proposed context object or contextual kwargs.
Those values are used structurally: known direct inputs get values, and
dependency edges get projected context. inspect(...) does not validate unused
runtime fields, does not report missing runtime fields as a separate check
object, and does not try to flatten graph-wide requirements. A strict debug-time
input checker is intentionally deferred until current-model versus graph-wide
semantics are explicit.

Dependency depth is controlled by one option:

model.flow.inspect(dependencies="direct")     # default
model.flow.inspect(dependencies="recursive")  # generated-model dependency tree
model.flow.inspect(dependencies="none")       # skip dependency inspection

dependencies="direct" lists immediate dependency edges. dependencies="none"
leaves dependencies empty. dependencies="recursive" follows inspect-visible
dependencies from constructed @Flow.model inputs and with_context(...)
wrappers.
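
The three depth options can be illustrated with a toy walk over a dependency mapping. This is only a sketch: GRAPH and list_dependencies are hypothetical, and the result is flattened into a list of names for brevity, whereas the PR describes the recursive mode as a dependency tree.

```python
from typing import Dict, List

# Hypothetical dependency edges: root -> mid -> leaf.
GRAPH: Dict[str, List[str]] = {"root": ["mid"], "mid": ["leaf"], "leaf": []}


def list_dependencies(graph: Dict[str, List[str]], node: str,
                      dependencies: str = "direct") -> List[str]:
    if dependencies == "none":
        return []
    if dependencies == "direct":
        return list(graph[node])
    if dependencies == "recursive":
        found: List[str] = []
        stack = list(reversed(graph[node]))
        while stack:
            dep = stack.pop()
            if dep in found:
                continue  # already visited via another edge
            found.append(dep)
            stack.extend(reversed(graph[dep]))
        return found
    raise ValueError(f"unknown dependencies option: {dependencies!r}")


direct = list_dependencies(GRAPH, "root")
recursive = list_dependencies(GRAPH, "root", dependencies="recursive")
skipped = list_dependencies(GRAPH, "root", dependencies="none")
```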

This is not a full evaluator graph browser. A handwritten CallableModel can
appear as a dependency target when it is bound to an @Flow.model regular input,
but inspect(...) does not expand that handwritten model's custom
CallableModel.__deps__ implementation. That broader graph introspection is a
follow-on feature.

compute() deliberately does not bind regular parameters. If a kwarg matches a
regular parameter or model configuration field, compute() raises an error
instead of silently treating runtime context as model construction input.
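
A minimal sketch of that guard, assuming a plain set of regular parameter names. check_compute_kwargs is a hypothetical helper, and the choice of TypeError is an assumption; the PR text does not say which exception type is raised.

```python
from typing import Any, Dict, Set


def check_compute_kwargs(regular_params: Set[str], kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Reject kwargs that name construction-time (regular) parameters."""
    clashes = sorted(set(kwargs) & regular_params)
    if clashes:
        # Exception type is an assumption made for this sketch.
        raise TypeError(f"compute() does not accept regular parameters: {clashes}")
    return dict(kwargs)


runtime = check_compute_kwargs({"a"}, {"b": 5})  # contextual kwarg: accepted

try:
    check_compute_kwargs({"a"}, {"a": 1, "b": 5})  # "a" is a regular parameter
except TypeError as exc:
    error_message = str(exc)
else:
    error_message = None
```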

Flow.call(auto_context=...)

The PR also adds Flow.call(auto_context=...) as a narrow opt-in for handwritten
CallableModel.__call__ methods that want to declare context fields as
keyword-only parameters.

This is not the main @Flow.model authoring path. It does not add
FromContext[...], dependency wiring, generated factories, or
.flow.with_context(...) semantics by itself.

Serialization

Importable module-level @Flow.model functions produce generated classes with
stable module import paths, so JSON/config-style round trips can work across
processes when the defining module is importable.

Generated models defined locally, in nested scopes, or in __main__ are supported
on a best-effort basis for pickle/cloudpickle object transport; they are not
stable config artifacts. Their analyzed function contract is serialized so that
restore does not need to re-run type-hint resolution in the receiving process.

Generated model and BoundModel pickle restore use portable validation data
instead of raw Pydantic state. This avoids fragile process-local generic classes
such as GenericResult[int] leaking into pickle/Ray payloads.

@Flow.context_transform bindings always store a serialized analyzed config.
They do not rely on import-path detection, because during decoration the module
global usually still points at the undecorated function.

Cache And Graph Identity

Public cache_key(...) remains structural by default.

Generated and bound models also support effective identity for model
evaluations. Effective identity describes the parts of an invocation that affect
the result, so unused ambient FlowContext fields do not split built-in cache
entries or graph nodes.
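
The difference between the two kinds of identity can be sketched with plain dicts. This is not how ccflow's cache_key(...) is implemented; structural_key and effective_key are hypothetical helpers, and the keys are JSON strings only to keep the example short.

```python
import json
from typing import Any, Dict, Iterable


def structural_key(model_name: str, context: Dict[str, Any]) -> str:
    """Key over the whole context: any ambient field changes the key."""
    return json.dumps([model_name, context], sort_keys=True, default=str)


def effective_key(model_name: str, context: Dict[str, Any],
                  used_fields: Iterable[str]) -> str:
    """Key over only the fields the model declares that it reads."""
    used = {name: context[name] for name in used_fields if name in context}
    return json.dumps([model_name, used], sort_keys=True, default=str)


ctx1 = {"day": "2024-01-01", "request_id": "a"}
ctx2 = {"day": "2024-01-01", "request_id": "b"}

# The unused request_id field splits structural keys but not effective keys.
assert structural_key("day_name", ctx1) != structural_key("day_name", ctx2)
assert effective_key("day_name", ctx1, ["day"]) == effective_key("day_name", ctx2, ["day"])
```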

The built-in MemoryCacheEvaluator uses:

cache_key(context, effective=True)

Custom evaluators can use the same public API if they want generated-model-aware
keys:

from ccflow.evaluators import cache_key


key = cache_key(model_evaluation_context, effective=True)

The default remains structural:

cache_key(model_evaluation_context)

Ordinary handwritten CallableModel classes continue to use structural
identity. This is intentional: arbitrary CallableModel.__call__
implementations can inspect context in ways ccflow cannot infer safely.

Opaque evaluators also use structural identity, because they may read context
fields that are not part of a generated model's declared signature.

Unexpected errors while deriving effective identity propagate. The only
structural fallback is the explicit internal "effective key unavailable" path,
such as recursive effective identity.

Why Effective Identity Matters

The existing structural key can over-split cache entries when callers pass a
richer context than the model semantically uses. With structural context
identity, adding or changing an ambient field for one branch of a DAG can
invalidate cache reuse in another branch that does not use that field.

For ordinary handwritten models, ccflow cannot safely infer what Python code
uses. A normal __call__ implementation might inspect type(context), call
context.model_dump(), read subclass-only fields, or otherwise depend on the
full runtime context object.

@Flow.model improves this case because consumed contextual inputs are explicit
via FromContext[...], so generated models can safely ignore unused ambient
fields in effective cache and graph identity.

from datetime import date

from ccflow import Flow, FlowContext, FlowOptionsOverride, FromContext
from ccflow.evaluators import MemoryCacheEvaluator


calls = {"count": 0}


@Flow.model
def day_name(day: FromContext[date]) -> str:
    calls["count"] += 1
    return day.strftime("%A")


model = day_name()
cache = MemoryCacheEvaluator()

ctx1 = FlowContext(day=date(2024, 1, 1), request_id="a")
ctx2 = FlowContext(day=date(2024, 1, 1), request_id="b")

with FlowOptionsOverride(options={"evaluator": cache, "cacheable": True}):
    assert model(ctx1).value == "Monday"
    assert model(ctx2).value == "Monday"

assert calls["count"] == 1

Validation And Error Behavior

The generated model remains a Pydantic model, but ccflow owns runtime binding
and coercion semantics.

The generated model's stored Pydantic fields use SkipValidation[...]. This is
implemented in _generated_field_annotation(...), which is used when
create_model(...) builds the generated CallableModel subclass. The public
factory signature still shows the user-facing annotations (T,
FromContext[T], Lazy[T]); SkipValidation[...] is only for the internal
Pydantic fields stored on the generated model instance.

That prevents Pydantic field validation from forcing registry resolution,
dependency handling, lazy handling, or contextual-default handling before
ccflow's generated-model validator can apply the correct rules.

The generated Pydantic field schema keeps useful type information when Pydantic
can build a schema for it. If Pydantic cannot build a schema for known schema
construction reasons, only the Pydantic field schema falls back to Any;
runtime coercion still uses the real annotation.

Validation is literal-first for regular parameter values. Where a value is
ambiguous, a dictionary that looks like a serialized dependency (one using
type_ or _target_) is interpreted as a dependency only after normal literal
validation fails.
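
One way to picture literal-first resolution is the sketch below. It is an illustration under assumptions, not the PR's validator: resolve_regular_value and the two toy validators are hypothetical, and real validation would go through the parameter's type annotation.

```python
from typing import Any, Callable, Tuple


def validate_int(value: Any) -> int:
    if not isinstance(value, int):
        raise TypeError("expected int")
    return value


def validate_str_dict(value: Any) -> dict:
    if not isinstance(value, dict) or not all(isinstance(v, str) for v in value.values()):
        raise TypeError("expected dict of str values")
    return value


def resolve_regular_value(value: Any, validate_literal: Callable[[Any], Any]) -> Tuple[str, Any]:
    """Try the value as a literal first; fall back to a dependency reading."""
    try:
        return ("literal", validate_literal(value))
    except (TypeError, ValueError):
        if isinstance(value, dict) and ("_target_" in value or "type_" in value):
            return ("dependency", value)
        raise


config = {"_target_": "pkg.Model"}
as_literal = resolve_regular_value(config, validate_str_dict)  # valid dict literal
as_dependency = resolve_regular_value(config, validate_int)    # not an int
```

The same dictionary is kept as a literal when the parameter accepts a dict, and treated as a serialized dependency only when literal validation fails.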

Unexpected errors from type hint resolution, type adapter construction, runtime
validation internals, and effective identity derivation propagate instead of
being masked by broad fallback paths.

Dependency evaluation preserves the original exception type and adds dependency
path context when the Python runtime supports exception notes.
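
Exception notes are available through BaseException.add_note on Python 3.11 and later. The sketch below shows the general pattern of annotating and re-raising the same exception object; call_dependency, the path string, and the note wording are hypothetical, not taken from the PR.

```python
from typing import Any, Callable


def call_dependency(path: str, fn: Callable[[], Any]) -> Any:
    """Evaluate a dependency, adding path context to any failure."""
    try:
        return fn()
    except Exception as exc:
        if hasattr(exc, "add_note"):  # Python 3.11+
            exc.add_note(f"while evaluating dependency {path}")
        raise  # re-raise the same object, so the original type is preserved


def failing() -> int:
    raise KeyError("price")


try:
    call_dependency("root.x", failing)
except KeyError as exc:
    caught = exc

assert type(caught) is KeyError
```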

Compatibility

The PR is additive:

  • Existing CallableModel implementations continue to work.
  • Existing Flow.call behavior is preserved.
  • cache_key(...) remains structural unless effective=True is explicitly
    requested.
  • Plain CallableModel cache keys and graph keys remain structural.
  • FlowContext is an open runtime carrier for generated models.
  • Declared context_type=... can still be used to validate FromContext[...]
    fields against an existing nominal context.

Test Coverage

The test suite covers:

  • generated model execution and validation,
  • contextual defaults and runtime precedence,
  • direct dependency wiring and lazy dependencies,
  • regular container values remaining literals,
  • with_context(...) field and patch transforms,
  • ordered chained bindings and original-ambient transform reads,
  • declared context type validation,
  • model.flow.inspect(...) introspection,
  • dependency context projection for nested inspection,
  • registry and Hydra-style construction,
  • pickle/cloudpickle, Ray, and cross-process serialization,
  • stable import-path JSON round trips for importable generated models,
  • cache_key(..., effective=True) behavior,
  • dependency graph integration,
  • ordinary CallableModel compatibility,
  • error propagation for type hints, type adapters, validators, and effective
    identity.

codecov Bot commented May 4, 2026

Codecov Report

❌ Patch coverage is 87.18535% with 168 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.88%. Comparing base (3c8fd19) to head (5d73ed6).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...amples/flow_model/flow_model_hydra_builder_demo.py | 0.00% | 57 Missing ⚠️ |
| ccflow/examples/flow_model/flow_model_example.py | 0.00% | 49 Missing ⚠️ |
| ccflow/_flow_model_binding.py | 93.07% | 18 Missing and 11 partials ⚠️ |
| ccflow/tests/test_callable.py | 91.01% | 15 Missing ⚠️ |
| ccflow/tests/test_flow_context.py | 97.61% | 7 Missing ⚠️ |
| ccflow/evaluators/common.py | 93.02% | 4 Missing and 2 partials ⚠️ |
| ccflow/context.py | 93.10% | 1 Missing and 1 partial ⚠️ |
| ccflow/tests/flow_model_hydra_fixtures.py | 91.30% | 1 Missing and 1 partial ⚠️ |
| ccflow/tests/evaluators/test_common.py | 98.70% | 1 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #206      +/-   ##
==========================================
- Coverage   95.37%   93.88%   -1.49%     
==========================================
  Files         142      150       +8     
  Lines       11404    17011    +5607     
  Branches      620     1090     +470     
==========================================
+ Hits        10876    15971    +5095     
- Misses        399      820     +421     
- Partials      129      220      +91     

@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch 4 times, most recently from dddfb5b to 9f1755c Compare May 4, 2026 08:22
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from 9f1755c to a074f8f Compare May 4, 2026 08:41
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch 7 times, most recently from 4038d26 to af73a7f Compare May 14, 2026 12:45
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from af73a7f to 6ff5409 Compare May 14, 2026 15:19
NeejWeej added 2 commits May 14, 2026 11:37
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from 7d4e83c to 40f3adb Compare May 14, 2026 15:39
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from fdd9b8a to c193211 Compare May 14, 2026 16:40
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
@NeejWeej NeejWeej force-pushed the nk/auto_deps_auto_callable_model branch from 16896d0 to 7c620e2 Compare May 15, 2026 09:13
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>