Collapsing the tprl_content_generation/ workflow layer

Branch refactor/Temporal-primitives · 9 phases, 2 checkpoints · companion to collapse-content-generation-workflow-layer-v2.md

What. Replaced seven near-identical WfMake* child workflows with direct workflow.execute_activity(act_*, …) calls inside WfPipeRouter. Deleted two near-duplicate generator implementations (ContentGeneratorTop + ContentGeneratorChild) and their factories, and a small assignment-types models file. Added one new in-workflow generator: ContentGeneratorInWorkflow.

Why. Every surviving WfMake* was a structurally identical 5-line wrapper around a single start_activity(…) call. The child-workflow indirection added ≈9 Temporal history events per content-generation call with zero behavioral payoff. On deep PipeSequences this was the dominant contributor to history growth toward the 10K-event soft limit.

Impact. History events per content-generation call drop from ≈12 to 3. Per-LLM-call durability is preserved because the activity boundary is unchanged. Retry policy, timeouts, and the asymmetric inference_task_queue routing rule are re-applied at the new dispatch site. Net deletion is concentrated and mechanical: 11 files gone, 1 file added.

12 → 3 history events / call
−11 files deleted
+1 file added
9 phases shipped

1 — The shape of the problem (before)

When a PipeLLM operator inside WfPipeRouter needed to generate text, the call path went through three execution levels: router workflow, content-generation workflow, activity.

WfPipeRouter  (parent workflow)
    │
    ├─ pipe_llm._live_run_operator_pipe(...)
    │     └─ get_content_generator()           ◄── returns ContentGeneratorChild
    │     └─ content_generator.make_llm_text(...)
    │            │
    │            ▼
    │     ContentGeneratorChild.make_llm_text(...)
    │            │
    │            └─ WorkflowExecutorFactory[...].execute_child_workflow(WfMakeLLMText, ...)
    │                        │
    │                        ▼
    │              ┌───────────────────────────────────────────────┐
    │              │  WfMakeLLMText  (child workflow)              │
    │              │                                               │
    │              │   workflow.start_activity(                    │
    │              │     activity=act_llm_gen_text,                │
    │              │     arg=llm_assignment,                       │
    │              │     start_to_close_timeout=...,               │
    │              │     retry_policy=...,                         │
    │              │     task_queue=inference_task_queue,  ◄── LLM-text only
    │              │   )                                           │
    │              │                                               │
    │              │   try / except ActivityError →                │
    │              │     TemporalError.from_app_error(...)         │
    │              └───────────────────────────────────────────────┘
    │                        │
    │                        ▼
    │                act_llm_gen_text  (activity)
    │                        │
    └───── result ◄──────────┘

Every WfMake* workflow had the exact same shape:

@workflow.defn(name="wf_…")
class WfMakeXxx(WorkflowClass[XxxAssignment, ResultType]):
    @workflow.run
    async def run(self, workflow_arg: XxxAssignment) -> ResultType:
        worker_config = get_config().temporal.worker_config
        try:
            result = await workflow.start_activity(
                activity=act_xxx,
                arg=workflow_arg,
                start_to_close_timeout=worker_config.workflow_execution_timeout,
                retry_policy=worker_config.retry_policy,
                # task_queue=worker_config.inference_task_queue  ← LLM-text only
            )
        except ActivityError as exc:
            if isinstance(exc.cause, ApplicationError):
                raise TemporalError.from_app_error(exc=exc.cause) from exc
            raise
        return result

Seven copies of the same template, one per content-generation activity:

Workflow            Activity                        Special routing
WfMakeLLMText       act_llm_gen_text                task_queue=inference_task_queue
WfMakeObject        act_llm_gen_object
WfMakeObjectList    act_llm_gen_object_list
WfMakeImages        act_img_gen_images
WfMakeJinja2Text    act_jinja2_gen_text
WfMakeExtract       act_extract_gen_extract_pages
WfRenderPageViews   act_render_page_views

There was no remaining workflow body that did anything between activities. The previous text-then-object stack — the one nontrivial case in the v1 plan — had already been removed in commit 16b775b8. Every surviving WfMake* was just 1 activity = 1 result.

2 — The Temporal history cost

A single make_llm_text from inside WfPipeRouter produced this event chain, split between the parent's history and the child's:

[parent: WfPipeRouter]
  StartChildWorkflowExecutionInitiated
  ChildWorkflowExecutionStarted        ◄── child framing
  ChildWorkflowExecutionCompleted/Failed

[child: WfMakeLLMText]
  WorkflowExecutionStarted
  WorkflowTaskScheduled/Started/Completed
  ActivityTaskScheduled
  ActivityTaskStarted
  ActivityTaskCompleted/Failed
  WorkflowTaskScheduled/Started/Completed
  WorkflowExecutionCompleted

≈ 12 events per LLM call

After the collapse, the same call inside the parent is just:

[parent: WfPipeRouter]
  ActivityTaskScheduled
  ActivityTaskStarted
  ActivityTaskCompleted/Failed

3 events per LLM call

For a long PipeSequence of N LLM steps, the history goes from "N child-workflow event groups in the parent + N small child histories" to "N activity event groups in the parent". Per-LLM-call durability is preserved: the activity boundary is unchanged, so the retry semantics and the durable-checkpoint guarantee on each LLM call are unaffected.
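The arithmetic above can be sketched as a back-of-envelope calculation. This takes the approximate per-call counts from this section at face value (12 before, 3 after) and ignores every other event in the router's history, so the results are rough upper bounds on step count rather than measurements. The function names and constants are illustrative and do not come from the codebase.

```python
EVENTS_PER_CALL_BEFORE = 12  # approximate, child-workflow path (from this section)
EVENTS_PER_CALL_AFTER = 3    # direct activity path (from this section)
SOFT_LIMIT = 10_000          # the "10K-event soft limit" quoted in this document


def history_events(n_calls: int, events_per_call: int) -> int:
    """Events attributable to n_calls content-generation calls."""
    return n_calls * events_per_call


def max_calls_under_limit(events_per_call: int, limit: int = SOFT_LIMIT) -> int:
    """Largest call count whose events stay at or below the limit."""
    return limit // events_per_call


if __name__ == "__main__":
    for n in (10, 100, 1000):
        before = history_events(n, EVENTS_PER_CALL_BEFORE)
        after = history_events(n, EVENTS_PER_CALL_AFTER)
        print(f"{n} calls: {before} events before, {after} events after")
    print(max_calls_under_limit(EVENTS_PER_CALL_BEFORE))  # 833
    print(max_calls_under_limit(EVENTS_PER_CALL_AFTER))   # 3333
```

Under these simplified assumptions, the same event budget accommodates roughly four times as many content-generation calls.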

3 — The after picture

WfPipeRouter  (parent workflow)
    │
    ├─ pipe_llm._live_run_operator_pipe(...)
    │     └─ get_content_generator()           ◄── returns ContentGeneratorInWorkflow
    │     └─ content_generator.make_llm_text(...)
    │            │
    │            ▼
    │     ContentGeneratorInWorkflow.make_llm_text(...)
    │            │
    │            └─ workflow.execute_activity(             ◄── one frame, not two
    │                   act_llm_gen_text,
    │                   llm_assignment,
    │                   start_to_close_timeout=...,
    │                   retry_policy=...,
    │                   task_queue=inference_task_queue,   ◄── LLM-text only
    │                   activity_id=wfid,                  ◄── for Temporal UI
    │               )
    │
    └───── result

The activities themselves (act_llm_gen_text, act_llm_gen_object, …) are unchanged. The worker registration is unchanged (the seven act_* functions still live in the crafting TaskPack). The only thing that disappears is the layer between the router and the activity.

4 — Why this is a good trade

History-event reduction

≈12 → 3 events per content-generation call. On deep PipeSequences this was the dominant contributor to history growth toward Temporal's 10K-event soft limit. The router's history stays compact.

Durability preserved

The activity boundary is where retries and durable checkpoints live. We collapsed the workflow wrapper above it, not the activity below it. retry_policy and start_to_close_timeout are re-applied at the new dispatch site.

Less code to maintain

Two near-duplicate generator implementations (ContentGeneratorTop + ContentGeneratorChild, ≈770 lines combined — the only real difference was execute_workflow vs execute_child_workflow) collapse into one in-workflow generator.

One fewer indirection in the UI

Before: WfPipeRouter → WfMakeLLMText → act_llm_gen_text. After: WfPipeRouter → act_llm_gen_text. The activity now sits one level below the controller it belongs to — matching the actual semantics.

One source of truth for dispatch config

Before the collapse, the retry / timeout / task-queue knobs were spread across the WfMake* body, ContentGeneratorChildFactory plumbing, and a test that mutates worker_config.inference_task_queue. Post-collapse, all of them apply at the execute_activity(…) dispatch site in ContentGeneratorInWorkflow.

No new abstractions

The activities, the act_* functions, the crafting pack registration — all already existed. The refactor is largely deletion: 11 files out, 1 file in.

5 — Key design decisions

5.1 · Class name: ContentGeneratorInWorkflow

The InWorkflow suffix flags a load-bearing constraint at every import site: this class only works when called from inside a workflow's run(), because each method calls workflow.execute_activity(…), which hard-fails outside a workflow context. The previous ContentGeneratorChild had the same constraint but the name didn't reflect it — easy to misuse from a test that tried to construct one and call its methods directly.

5.2 · activity_id= for Temporal UI observability

Before the collapse, ContentGeneratorChild built a child_workflow_id from the parent's id plus a wfid kwarg (e.g. "craft-text", "craft-object-direct", "extract"). Operators and tests pass wfid= to give a Temporal-UI-visible name to a specific content-generation step. After the collapse, child workflow IDs are gone, but the activity_id= kwarg on execute_activity serves the same role: the Temporal Web UI shows it as the per-step breadcrumb that ops use to triage failures.

We thread wfid into activity_id= at every dispatch site. Default wfid values are method-specific constants ("craft-text", "craft-object-direct", "craft-image-single", …), so the breadcrumb survives intact for ops triage.

5.3 · Activity-id uniqueness: Strategy (i)

Before the collapse, WfMake* child workflows got globally unique IDs because make_child_workflow_id prepended the parent's workflow_id. After the collapse, activity_ids are scoped only to the workflow's own execution, with no parent prefix to fall back on. Temporal requires activity_ids to be unique within a workflow execution.

A Phase 0 audit confirmed today's invariant: at most one call per ContentGeneratorProtocol method per WfPipeRouter execution. PipeLLM._live_run_operator_pipe's branching ensures make_llm_text / make_object / make_object_list are mutually exclusive; PipeImgGen branches between make_single_image and make_image_list; PipeExtract calls make_extract_pages exactly once.

Strategy (i) — "default wfid is unique per workflow" — was chosen over Strategy (ii) (per-method counter) because the latter adds complexity for a problem we don't have today and would be harder to read in the Temporal UI. Three mitigations bolt the invariant down:

5.4 · Runtime uniqueness check, replay-safe

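The generator carries a self._seen_activity_ids: dict[str, set[str]] keyed by workflow.info().workflow_id. Each make_* method adds its computed activity_id to the per-workflow set and raises ContentGenerationError on duplicate insert. Two non-obvious choices:

1. The dict is keyed by workflow_id because the generator instance is set once on the hub and reused across many workflow runs; a flat set[str] would produce cross-run false positives.
2. The check is skipped when workflow.unsafe.is_replaying() is true, because the set is already populated from the original execution and a cache-eviction replay would otherwise raise a spurious duplicate.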

The audit proves no current call site triggers this, so the check is a regression guard, not a runtime cost. (Follow-up: the dict has no eviction — see Follow-up TODOs.)

5.5 · Asymmetric inference_task_queue rule — provisional, isolated

Today's routing rule: LLM-text goes to inference_task_queue, everything else goes to the workflow's default queue. This was introduced as a quick split-worker test setup; it is NOT the long-term design. Real deployments will want per-provider, per-model, per-activity-class routing (full design in per-activity-queue-routing-v1.md, out of scope for this PR).

To survive cleanly until that lands, we extracted a tiny module-private helper:

pipelex/temporal/tprl_content_generation/content_generator_in_workflow.py
from typing import Any


def _inference_dispatch_kwargs(worker_config) -> dict[str, Any]:
    """Stopgap. Single deletion point when per-activity routing lands.
    See wip/temporal-primitives/per-activity-queue-routing-v1.md."""
    return {"task_queue": worker_config.inference_task_queue}

The helper is called at the single LLM-text dispatch site via ** unpacking. Unit tests pin both directions: LLM-text passes task_queue=worker_config.inference_task_queue, and every other method must NOT pass task_queue= (over-routing would break split-worker production, where the runner doesn't register image-gen / extract on the inference queue).

6 — Code: the LLM-text dispatch, before vs after

The whole refactor boils down to substituting one Temporal SDK call for another. Below is the LLM-text path. The other six methods follow the identical pattern.

Before — ContentGeneratorChild + WfMakeLLMText

# content_generator_child.py
async def make_llm_text(
    self,
    llm_assignment: LLMAssignment,
    wfid: str = "craft-text",
    ...
) -> str:
    child_workflow_id = make_child_workflow_id(
        parent_id=workflow.info().workflow_id,
        wfid=wfid,
    )
    return await self._workflow_executor.execute_child_workflow(
        WfMakeLLMText,
        arg=llm_assignment,
        id=child_workflow_id,
        ...
    )

# wf_make_llm_text.py
@workflow.defn(name="wf_make_llm_text")
class WfMakeLLMText(WorkflowClass[LLMAssignment, str]):
    @workflow.run
    async def run(self, arg: LLMAssignment) -> str:
        wc = get_config().temporal.worker_config
        try:
            return await workflow.start_activity(
                activity=act_llm_gen_text,
                arg=arg,
                start_to_close_timeout=wc.workflow_execution_timeout,
                retry_policy=wc.retry_policy,
                task_queue=wc.inference_task_queue,
            )
        except ActivityError as exc:
            if isinstance(exc.cause, ApplicationError):
                raise TemporalError.from_app_error(exc=exc.cause) from exc
            raise

After — ContentGeneratorInWorkflow only

# content_generator_in_workflow.py
async def make_llm_text(
    self,
    llm_assignment: LLMAssignment,
    wfid: str = "craft-text",
    ...
) -> str:
    self._check_activity_id_unique(wfid)
    wc = get_config().temporal.worker_config
    try:
        # LLM-text only: route to inference queue.
        # See _inference_dispatch_kwargs docstring.
        return await workflow.execute_activity(
            act_llm_gen_text,
            llm_assignment,
            start_to_close_timeout=wc.workflow_execution_timeout,
            retry_policy=wc.retry_policy,
            activity_id=wfid,
            **_inference_dispatch_kwargs(wc),
        )
    except ActivityError as exc:
        if isinstance(exc.cause, ApplicationError):
            raise TemporalError.from_app_error(exc=exc.cause) from exc
        raise

# wf_make_llm_text.py
#  ──────── DELETED ────────

Notice what stayed identical: the try/except ActivityError → TemporalError.from_app_error(…) translation, the start_to_close_timeout + retry_policy values, and the asymmetric task_queue routing. The behavioral surface is preserved; only the indirection is gone.
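To make the preserved error translation concrete, here is a minimal stand-alone sketch of its control flow. The three exception classes are simplified stand-ins defined locally, not the Temporal SDK or Pipelex classes, and run_with_translation is a hypothetical name used only for illustration.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class ApplicationError(Exception):
    """Stand-in for the application-level error raised inside an activity."""


class ActivityError(Exception):
    """Stand-in for the wrapper error surfaced to the workflow."""

    def __init__(self, message: str, cause: Optional[BaseException] = None) -> None:
        super().__init__(message)
        self.cause = cause


class TemporalError(Exception):
    """Stand-in for the typed error the generator raises to its callers."""

    @classmethod
    def from_app_error(cls, exc: ApplicationError) -> "TemporalError":
        return cls(str(exc))


def run_with_translation(call: Callable[[], T]) -> T:
    """Same try/except shape as the dispatch code above, minus Temporal."""
    try:
        return call()
    except ActivityError as exc:
        if isinstance(exc.cause, ApplicationError):
            # Application failures become the typed error, chained to the wrapper.
            raise TemporalError.from_app_error(exc=exc.cause) from exc
        raise  # anything else propagates unchanged


def failing_call() -> str:
    raise ActivityError("activity failed", cause=ApplicationError("bad prompt"))


if __name__ == "__main__":
    try:
        run_with_translation(failing_call)
    except TemporalError as err:
        print(f"translated: {err}")
```

The point of the shape is that callers see the same typed error whether the activity sat behind a child workflow or is dispatched directly.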

7 — The runtime uniqueness check, in code

pipelex/temporal/tprl_content_generation/content_generator_in_workflow.py
class ContentGeneratorInWorkflow:
    def __init__(self):
        # Keyed by workflow_id because this generator instance is set once on
        # the hub and reused across many workflow runs. A flat set[str] would
        # produce cross-run false positives.
        self._seen_activity_ids: dict[str, set[str]] = {}
 
    def _check_activity_id_unique(self, activity_id: str) -> None:
        # During replay, Temporal re-walks the history. The set is already
        # populated from the original execution; without this guard, a
        # cache-eviction replay would raise a spurious duplicate.
        if workflow.unsafe.is_replaying():
            return
        wf_id = workflow.info().workflow_id
        seen = self._seen_activity_ids.setdefault(wf_id, set())
        if activity_id in seen:
            msg = f"Duplicate activity_id {activity_id!r} in workflow {wf_id!r}"
            raise ContentGenerationError(msg)
        seen.add(activity_id)

Why this is a guard, not a hot-path cost. The Phase 0 audit proved no current call site can trigger this (the protocol methods are mutually exclusive within one PipeOperator execution). The check is here to catch future regressions: a new operator call site that calls the same protocol method twice would otherwise surface only as Temporal's "duplicate activity_id" error; with the check it fails fast with a typed Pipelex error and a clear message.
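The keying and replay behavior can be exercised without a Temporal worker using a small model of the guard. In this sketch the workflow id and the replay flag are passed in explicitly instead of being read from workflow.info() and workflow.unsafe.is_replaying(); UniquenessGuard and DuplicateActivityId are hypothetical stand-ins, not names from the codebase.

```python
from typing import Dict, Set


class DuplicateActivityId(Exception):
    """Stand-in for ContentGenerationError."""


class UniquenessGuard:
    """Model of the per-workflow activity_id check, with explicit inputs."""

    def __init__(self) -> None:
        self._seen: Dict[str, Set[str]] = {}

    def check(self, workflow_id: str, activity_id: str, *, replaying: bool = False) -> None:
        if replaying:
            return  # replay re-walks known history; do not re-check
        seen = self._seen.setdefault(workflow_id, set())
        if activity_id in seen:
            raise DuplicateActivityId(
                f"Duplicate activity_id {activity_id!r} in workflow {workflow_id!r}"
            )
        seen.add(activity_id)


if __name__ == "__main__":
    guard = UniquenessGuard()
    guard.check("wf-1", "craft-text")
    guard.check("wf-2", "craft-text")                  # other workflow: allowed
    guard.check("wf-1", "craft-text", replaying=True)  # replay: skipped
    try:
        guard.check("wf-1", "craft-text")              # genuine duplicate
    except DuplicateActivityId as exc:
        print(exc)
```

With a flat set in place of the dict, the second call (same default wfid, different workflow) would already raise, which is the cross-run false positive the code comment warns about.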

8 — The make_extract_pages page-views fix

This is the one place where the refactor isn't pure deletion — it fixes a pre-existing behavioral divergence between the direct-mode and Temporal-mode generators.

The direct-mode ContentGenerator.make_extract_pages augments its return when extract_job_params.should_include_page_views is true: it calls make_render_page_views(…) for multi-page document_uri inputs, or constructs a single-element page view inline for image_uri inputs.

The Temporal-mode ContentGeneratorChild.make_extract_pages did NOT. It just dispatched WfMakeExtract and returned the page contents as-is, missing both augmentations. Per the project's "flag and fix existing bugs" rule, the collapse is the right moment to fix it.

The new make_extract_pages mirrors the direct generator exactly — and it is the only protocol method that dispatches more than one activity, so it earns special handling for activity_id uniqueness:

ContentGeneratorInWorkflow.make_extract_pages
async def make_extract_pages(self, ...) -> ExtractPagesOutput:
    base_id = wfid  # e.g. "extract"
    pages_id = f"{base_id}-pages"
    page_views_id = f"{base_id}-render-page-views"
    self._check_activity_id_unique(pages_id)
 
    # First activity: real extraction.
    output = await workflow.execute_activity(
        act_extract_gen_extract_pages, ...,
        activity_id=pages_id,
    )
 
    if not extract_job_params.should_include_page_views:
        return output  # no double-emit when false
 
+    if extract_input.document_uri is not None:
+        # Second activity: render page views from the document.
+        self._check_activity_id_unique(page_views_id)
+        page_views = await workflow.execute_activity(
+            act_render_page_views, ...,
+            activity_id=page_views_id,
+        )
+    elif extract_input.image_uri is not None:
+        # Single image → single-element page view, no second activity.
+        page_views = [ImageContent(url=extract_input.image_uri)]
+    else:
+        page_views = []
+
+    if len(page_views) != len(output.page_contents):
+        raise ContentGenerationError("page_views length mismatch")
+
+    for page_content in output.page_contents:
+        page_content.page_view = page_views.pop(0)
    return output

The two-activity branch is the most fragile post-collapse path: it is the only place where the activity_id uniqueness mitigation actually matters in production. It's covered three ways: a unit test, a real-PDF test, and a cross-process test.
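The branching itself can be checked in isolation from Temporal. The sketch below models the page-view pairing with plain Python: PageContent is a simplified stand-in for the real model, page views are plain strings, and render_document is a hypothetical callable standing in for the second activity. It pairs with zip rather than pop(0), which is a stylistic substitution, not the code shown above.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class PageContent:
    """Stand-in for one extracted page; not the real Pipelex model."""

    text: str
    page_view: Optional[str] = None


def attach_page_views(
    page_contents: List[PageContent],
    *,
    should_include_page_views: bool,
    document_uri: Optional[str],
    image_uri: Optional[str],
    render_document: Callable[[str], List[str]],
) -> List[PageContent]:
    """Mirror the three branches: off, document render, single image."""
    if not should_include_page_views:
        return page_contents  # nothing rendered, nothing attached
    if document_uri is not None:
        page_views = render_document(document_uri)  # models the second activity
    elif image_uri is not None:
        page_views = [image_uri]  # single image: one view, no render call
    else:
        page_views = []
    if len(page_views) != len(page_contents):
        raise ValueError("page_views length mismatch")
    for page_content, page_view in zip(page_contents, page_views):
        page_content.page_view = page_view
    return page_contents


if __name__ == "__main__":
    pages = [PageContent("page one"), PageContent("page two")]
    result = attach_page_views(
        pages,
        should_include_page_views=True,
        document_uri="report.pdf",
        image_uri=None,
        render_document=lambda uri: [f"{uri}#1", f"{uri}#2"],
    )
    print([page.page_view for page in result])
```

A fake render_document that records its calls is enough to assert that the disabled branch and the single-image branch never trigger the second dispatch.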

9 — File-level inventory

Deleted (−11 files)

All under pipelex/temporal/tprl_content_generation/:

Plus four test files: test_tprl_content_generator_top.py, test_tprl_make_content_generator.py, and two historical files that were already commented out, test_wf_gen_text.py and test_wf_jinja2.py.

Added (+1 file)

Modified (touched)

10 — Phase-by-phase landing plan

Each phase ends with make agent-check && make agent-test. Two checkpoints — natural commit boundaries — are placed at A (ready to flip the flag) and B (codebase in target state).

Phase | What lands | Risk gate
0 | Pre-flight audit: confirm per-workflow uniqueness invariant for Strategy (i). No code changes; produces an entry in Decisions. | 30 min audit vs. duplicate-activity_id runtime errors later.
1 | Build the new ContentGeneratorInWorkflow + factory + unit tests. No wiring. | Compile-only.
2 | Wire behind env-flag PIPELEX_USE_IN_WORKFLOW_CONTENT_GENERATOR in pipelex.py + both Temporal conftests. Default OFF. | Both flag values pass the targeted Temporal subsets.
3 | Re-point WfTestContentGeneratorChild to the new generator (test fixture; exercises new code regardless of flag). | content_generation/ integration suite green.
4 | Fix the make_extract_pages page-views asymmetry (folded into Phase 1 since it's a Phase 1 method). | Unit tests cover all three branches.
✅ A | Checkpoint A: new generator validated end-to-end, ready to flip. Commit boundary. |
5 | Flip the env-flag default. Rename to PIPELEX_USE_LEGACY_CONTENT_GENERATOR so polarity is self-documenting. | Full make agent-test green under new default.
6 | Delete the old surface in one commit: 11 source files, 4 test files, plus the env-flag branch. Re-generate _generated_model_sets.py after make cleanderived. | make tb + make agent-test green.
7 | Cross-process coverage: TestSplitWorkerObjectGen (Tier 9), Tier 10 image-gen split-worker negative-routing assertion, TestSplitWorkerExtractPages (Tier 11). Extract _inference_dispatch_kwargs helper to isolate the provisional 2-queue model. | Three new tests pass.
8 | Update docstrings + docs in docs/under-the-hood/, the tracing docstrings, and CHANGELOG. | Grep verifies no stale WfMake* / wf_make_* references outside CHANGELOG.
✅ B | Checkpoint B: codebase in target state. Commit boundary. |
9 | Final verification: full real-server pytest run + Temporal history smoke against split workers + replay-history grep. | Zero wf_make_* / wf_render_page_views workflow types observed in temporal workflow list.
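The Phase 8 risk gate (no stale WfMake* / wf_make_* references outside CHANGELOG) could be automated along the lines below. This is a sketch under assumptions: it scans an in-memory mapping of path to text rather than the real tree, the changelog file name CHANGELOG.md is a guess, and find_stale_references is a hypothetical helper, not the project's actual check.

```python
import re
from typing import Dict, List, Tuple

# Patterns named in the Phase 8 risk gate: WfMake* classes and wf_make_* workflow names.
STALE_PATTERN = re.compile(r"WfMake\w*|wf_make_\w*")


def find_stale_references(
    files: Dict[str, str],
    allowed_suffixes: Tuple[str, ...] = ("CHANGELOG.md",),  # assumed file name
) -> List[Tuple[str, int, str]]:
    """Return (path, line number, match) for every stale reference found."""
    hits: List[Tuple[str, int, str]] = []
    for path, text in files.items():
        if path.endswith(allowed_suffixes):
            continue  # historical mentions in the changelog are expected
        for line_no, line in enumerate(text.splitlines(), start=1):
            for match in STALE_PATTERN.finditer(line):
                hits.append((path, line_no, match.group(0)))
    return hits


if __name__ == "__main__":
    sample = {
        "pipelex/example.py": "x = 1\nfrom m import WfMakeLLMText\n",
        "CHANGELOG.md": "Removed wf_make_llm_text and friends.\n",
    }
    for path, line_no, name in find_stale_references(sample):
        print(f"{path}:{line_no}: {name}")
```

Extending the pattern to cover wf_render_page_views would bring it in line with the Phase 9 gate as well.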

11 — Where bugs are most likely to hide

1. The asymmetric task_queue=inference_task_queue rule. Easy to forget at the LLM-text site or to over-apply elsewhere. Mis-routing image-gen to the inference queue would break split-worker production where the runner doesn't register the image-gen activity. Covered by the unit test pinning the helper symbol on both positive (LLM-text) and negative (other methods) assertions.
2. activity_id collisions under repeated calls. The default wfid values are method-specific constants; they do NOT disambiguate repeated calls to the same method. The Phase 1 runtime check (dict[workflow_id, set[str]], gated by workflow.unsafe.is_replaying()) converts this from a documented invariant to a checked one and is replay-safe.
3. model_validate(obj.model_dump(mode="json", serialize_as_any=True)) round-trips for make_object / make_object_list. Required because the activity boundary returns a generic BaseModel. Use mode="json" on BOTH — ContentGeneratorChild.make_object_list omitted it (pre-existing asymmetry). Tier 9 cross-process test (TestSplitWorkerObjectGen) validates the nested-field round-trip through Temporal's data converter.
4. Page-views augmentation in make_extract_pages. Mirror the direct generator's branching exactly: no double-emit when should_include_page_views is false; handle both document_uri (multi-page render) and image_uri (single-image) inputs; assert length match. Triple-covered (unit + real-PDF + cross-process).
5. Test infrastructure bypass. Both tests/integration/pipelex/temporal/conftest.py and test_payload_codec_pipeline.py explicitly call pipelex_hub.set_content_generator(...) after Pipelex.make(), so the env-flag branch in pipelex.py was bypassed in tests during Phase 2. Both conftests had to mirror the flag, then drop the branch at Phase 6.

12 — Follow-up TODOs (out of scope)

13 — TL;DR for the reviewer

  1. Seven WfMake* child workflows were structurally identical 5-line wrappers around one activity each. Gone.
  2. Two near-duplicate generators (ContentGeneratorTop, ContentGeneratorChild) collapse into one ContentGeneratorInWorkflow that calls workflow.execute_activity(…) directly.
  3. History events per content-generation call: ≈12 → 3. Per-LLM-call durability preserved (activity boundary unchanged).
  4. The asymmetric inference_task_queue rule survives — isolated in _inference_dispatch_kwargs, marked provisional, single deletion point when per-activity routing lands.
  5. activity_id observability preserved via activity_id=wfid; runtime uniqueness check is replay-safe and per-workflow.
  6. The make_extract_pages page-views asymmetry between direct-mode and Temporal-mode is fixed in passing (project's "flag and fix" rule).
  7. Net: 11 files deleted, 1 added; behavior preserved on every covered path; tests strengthened cross-process for the post-collapse paths.

Companion docs: collapse-content-generation-workflow-layer-v2.md (the why), TODOS.md (the executable plan), per-activity-queue-routing-v1.md (the deletion target for the provisional queue rule).