tprl_content_generation/ workflow layer

What. Replaced seven near-identical WfMake* child workflows with direct workflow.execute_activity(act_*, …) calls inside WfPipeRouter. Deleted two near-duplicate generator implementations (ContentGeneratorTop + ContentGeneratorChild) and their factories, and a small assignment-types models file. Added one new in-workflow generator: ContentGeneratorInWorkflow.
Why. Every surviving WfMake* was a structurally identical 5-line wrapper around a single start_activity(…) call. The child-workflow indirection added ≈9 Temporal history events per content-generation call with zero behavioral payoff. On deep PipeSequences this was the dominant factor approaching the 10K-event soft limit.
Impact. History events per content-generation call drop from ≈12 → 3. Per-LLM-call durability is preserved — the activity boundary is unchanged. Retry policy, timeouts, and the asymmetric inference_task_queue routing rule are re-applied at the new dispatch site. Net deletion is concentrated and mechanical: 11 files gone, 1 file added.
When a PipeLLM operator inside WfPipeRouter needed to generate text, the call path went through three execution levels: router workflow, content-generation workflow, activity.
WfPipeRouter (parent workflow)
│
├─ pipe_llm._live_run_operator_pipe(...)
│ └─ get_content_generator() ◄── returns ContentGeneratorChild
│ └─ content_generator.make_llm_text(...)
│ │
│ ▼
│ ContentGeneratorChild.make_llm_text(...)
│ │
│ └─ WorkflowExecutorFactory[...].execute_child_workflow(WfMakeLLMText, ...)
│ │
│ ▼
│ ┌───────────────────────────────────────────────┐
│ │ WfMakeLLMText (child workflow) │
│ │ │
│ │ workflow.start_activity( │
│ │ activity=act_llm_gen_text, │
│ │ arg=llm_assignment, │
│ │ start_to_close_timeout=..., │
│ │ retry_policy=..., │
│ │ task_queue=inference_task_queue, ◄── LLM-text only
│ │ ) │
│ │ │
│ │ try / except ActivityError → │
│ │ TemporalError.from_app_error(...) │
│ └───────────────────────────────────────────────┘
│ │
│ ▼
│ act_llm_gen_text (activity)
│ │
└───── result ◄──────────┘
Every WfMake* workflow had the exact same shape:
@workflow.defn(name="wf_…")
class WfMakeXxx(WorkflowClass[XxxAssignment, ResultType]):
    @workflow.run
    async def run(self, workflow_arg: XxxAssignment) -> ResultType:
        worker_config = get_config().temporal.worker_config
        try:
            result = await workflow.start_activity(
                activity=act_xxx,
                arg=workflow_arg,
                start_to_close_timeout=worker_config.workflow_execution_timeout,
                retry_policy=worker_config.retry_policy,
                # task_queue=worker_config.inference_task_queue  ← LLM-text only
            )
        except ActivityError as exc:
            # Unwrap application errors into the project's typed error.
            if isinstance(exc.cause, ApplicationError):
                raise TemporalError.from_app_error(exc=exc.cause) from exc
            raise
        return result
Seven copies of the same template, one per content-generation activity:
| Workflow | Activity | Special routing |
|---|---|---|
| WfMakeLLMText | act_llm_gen_text | task_queue=inference_task_queue |
| WfMakeObject | act_llm_gen_object | none |
| WfMakeObjectList | act_llm_gen_object_list | none |
| WfMakeImages | act_img_gen_images | none |
| WfMakeJinja2Text | act_jinja2_gen_text | none |
| WfMakeExtract | act_extract_gen_extract_pages | none |
| WfRenderPageViews | act_render_page_views | none |
There was no remaining workflow body that did anything between activities. The previous text-then-object stack — the one nontrivial case in the v1 plan — had already been removed in commit 16b775b8. Every surviving WfMake* was just 1 activity = 1 result.
A single make_llm_text from inside WfPipeRouter produced this event chain in the parent's history:
[parent: WfPipeRouter]
    StartChildWorkflowExecutionInitiated
    ChildWorkflowExecutionStarted            ◄── child framing
    ChildWorkflowExecutionCompleted/Failed

[child: WfMakeLLMText]
    WorkflowExecutionStarted
    WorkflowTaskScheduled/Started/Completed
    ActivityTaskScheduled
    ActivityTaskStarted
    ActivityTaskCompleted/Failed
    WorkflowTaskScheduled/Started/Completed
    WorkflowExecutionCompleted

≈ 12 events per LLM call
After the collapse, the same call inside the parent is just:
[parent: WfPipeRouter]
    ActivityTaskScheduled
    ActivityTaskStarted
    ActivityTaskCompleted/Failed

3 events per LLM call
For a long PipeSequence of N LLM steps, the history goes from "N child-workflow scheduling pairs in the parent + N small child histories" to "N activity scheduling pairs in the parent". Per-LLM-call durability is preserved — the activity boundary is unchanged, so the retry semantics and the durable-checkpoint guarantee on each LLM call are unaffected.
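As a rough back-of-the-envelope check on those figures, the sketch below multiplies the approximate per-call counts from the two event chains (≈12 before, 3 after) by the number of content-generation steps. The function names are hypothetical, the "before" figure counts parent and child histories together, and the estimate ignores every other event the router records, so treat the output as an order-of-magnitude illustration only.

```python
# Approximate per-call event counts taken from the two event chains above.
EVENTS_PER_CALL_BEFORE = 12  # child-workflow path (parent + child histories)
EVENTS_PER_CALL_AFTER = 3    # direct activity path (parent history only)


def history_events(n_calls: int, events_per_call: int) -> int:
    """Estimate the events attributable to content generation alone."""
    if n_calls < 0:
        raise ValueError("n_calls must be non-negative")
    return n_calls * events_per_call


def events_saved(n_calls: int) -> int:
    """Events removed by the collapse for a PipeSequence of n_calls steps."""
    before = history_events(n_calls, EVENTS_PER_CALL_BEFORE)
    after = history_events(n_calls, EVENTS_PER_CALL_AFTER)
    return before - after


if __name__ == "__main__":
    for n in (10, 50, 200):
        print(
            n,
            history_events(n, EVENTS_PER_CALL_BEFORE),
            history_events(n, EVENTS_PER_CALL_AFTER),
            events_saved(n),
        )
```

The saving grows linearly with N, which is why the effect is most visible on deep PipeSequences.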
WfPipeRouter (parent workflow)
│
├─ pipe_llm._live_run_operator_pipe(...)
│ └─ get_content_generator() ◄── returns ContentGeneratorInWorkflow
│ └─ content_generator.make_llm_text(...)
│ │
│ ▼
│ ContentGeneratorInWorkflow.make_llm_text(...)
│ │
│ └─ workflow.execute_activity( ◄── one frame, not two
│ act_llm_gen_text,
│ llm_assignment,
│ start_to_close_timeout=...,
│ retry_policy=...,
│ task_queue=inference_task_queue, ◄── LLM-text only
│ activity_id=wfid, ◄── for Temporal UI
│ )
│
└───── result
The activities themselves (act_llm_gen_text, act_llm_gen_object, …) are unchanged. The worker registration is unchanged (the seven act_* functions still live in the crafting TaskPack). The only thing that disappears is the layer between the router and the activity.
≈12 → 3 events per content-generation call. On deep PipeSequences this is the dominant factor approaching Temporal's 10K-event soft limit. The router's history stays compact.
The activity boundary is where retries and durable checkpoints live. We collapsed the workflow wrapper above it, not the activity below it. retry_policy and start_to_close_timeout are re-applied at the new dispatch site.
Two near-duplicate generator implementations (ContentGeneratorTop + ContentGeneratorChild, ≈770 lines combined — the only real difference was execute_workflow vs execute_child_workflow) collapse into one in-workflow generator.
Before: WfPipeRouter → WfMakeLLMText → act_llm_gen_text. After: WfPipeRouter → act_llm_gen_text. The activity now sits one level below the controller it belongs to — matching the actual semantics.
Before the collapse, the retry / timeout / task-queue knobs were spread across the WfMake* body, ContentGeneratorChildFactory plumbing, and a test that mutates worker_config.inference_task_queue. Post-collapse, all of it applies at the single execute_activity(…) call site.
The activities, the act_* functions, the crafting pack registration — all already existed. The refactor is largely deletion: 11 files out, 1 file in.
ContentGeneratorInWorkflow

The InWorkflow suffix flags a load-bearing constraint at every import site: this class only works when called from inside a workflow's run(), because each method calls workflow.execute_activity(…), which hard-fails outside a workflow context. The previous ContentGeneratorChild had the same constraint but the name didn't reflect it, which made it easy to misuse from a test that tried to construct one and call its methods directly.

activity_id= for Temporal UI observability

Today, ContentGeneratorChild builds a child_workflow_id from the parent's id plus a wfid kwarg (e.g. "craft-text", "craft-object-direct", "extract"). Operators and tests pass wfid= to give a Temporal-UI-visible name to a specific content-generation step. After the collapse, child workflow IDs are gone, but the activity_id= kwarg on execute_activity serves the same role: Temporal Web UI shows it as the per-step breadcrumb that ops use to triage failures.
We thread wfid into activity_id= at every dispatch site. Default wfid values are method-specific constants ("craft-text", "craft-object-direct", "craft-image-single", …), so the breadcrumb survives intact for ops triage.
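Because the defaults double as activity ids, two methods sharing a default would collide as soon as both ran in one workflow. The following is a minimal sketch of a static check for that, not code from the PR. The helper name is hypothetical, and the mapping lists only defaults named in this document, with "extract" assumed to be the make_extract_pages default and "craft-image" being the pre-existing shared value that was later split.

```python
from collections import Counter

# Defaults named in this document; other methods are omitted on purpose.
# "extract" as the make_extract_pages default is an assumption.
DEFAULT_WFIDS_BEFORE = {
    "make_llm_text": "craft-text",
    "make_single_image": "craft-image",
    "make_image_list": "craft-image",
    "make_extract_pages": "extract",
}

DEFAULT_WFIDS_AFTER = {
    "make_llm_text": "craft-text",
    "make_single_image": "craft-image-single",
    "make_image_list": "craft-image-list",
    "make_extract_pages": "extract",
}


def duplicated_defaults(defaults: dict[str, str]) -> dict[str, list[str]]:
    """Return {wfid: [methods...]} for every wfid shared by 2+ methods."""
    counts = Counter(defaults.values())
    return {
        wfid: sorted(m for m, w in defaults.items() if w == wfid)
        for wfid, n in counts.items()
        if n > 1
    }


if __name__ == "__main__":
    print(duplicated_defaults(DEFAULT_WFIDS_BEFORE))
    print(duplicated_defaults(DEFAULT_WFIDS_AFTER))
```

A check of this shape could run as a plain unit test, since it needs no Temporal context.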
Today's WfMake* child workflows get globally-unique IDs because make_child_workflow_id prepends the parent's workflow_id. After collapse, activity_ids only get the workflow's own scope — no parent prefix to fall back on. Temporal requires activity_ids to be unique within a workflow execution.
A Phase 0 audit confirmed today's invariant: at most one call per ContentGeneratorProtocol method per WfPipeRouter execution. PipeLLM._live_run_operator_pipe's branching ensures make_llm_text / make_object / make_object_list are mutually exclusive; PipeImgGen branches between make_single_image and make_image_list; PipeExtract calls make_extract_pages exactly once.
Strategy (i) — "default wfid is unique per workflow" — was chosen over Strategy (ii) (per-method counter) because the latter adds complexity for a problem we don't have today and would be harder to read in the Temporal UI. Three mitigations bolt the invariant down:
- Distinct default wfids, fixing the pre-existing duplicate "craft-image" shared by make_single_image and make_image_list, now split into "craft-image-single" / "craft-image-list".
- make_extract_pages: this is the only method that dispatches two activities, so we explicitly construct f"{wfid}-pages" for act_extract_gen_extract_pages and f"{wfid}-render-page-views" for the conditional act_render_page_views.
- Runtime check. The generator carries a self._seen_activity_ids: dict[str, set[str]] keyed by workflow.info().workflow_id. Each make_* method adds its computed activity_id to the per-workflow set and raises ContentGenerationError on duplicate insert.

Two non-obvious choices:

- Keyed by workflow_id. The generator instance is set once on the hub and reused across many workflow runs (pipelex.py wires it once). A flat set[str] would produce cross-run false positives the second time any workflow hits the same default wfid.
- Gated by workflow.unsafe.is_replaying(). During replay, the check short-circuits. Without this, a cache-eviction replay on the same worker process would raise a spurious "duplicate activity_id" against the populated set from the original execution, because the worker process holds the set in memory across replays of the same workflow_id.

The audit proves no current call site triggers this, so the check is a regression guard, not a runtime cost. (Follow-up: the dict has no eviction; see Follow-up TODOs.)
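To make the two choices concrete, here is a standalone sketch that contrasts a flat set with a per-workflow dict and models the replay gate as a plain boolean. Both class names are hypothetical, ValueError stands in for ContentGenerationError, and the `is_replaying` argument replaces the real workflow.unsafe.is_replaying() call so the example runs without a Temporal context.

```python
class FlatRegistry:
    """Naive variant: one set shared by every workflow run."""

    def __init__(self) -> None:
        self._seen: set[str] = set()

    def check(self, workflow_id: str, activity_id: str) -> None:
        # workflow_id is ignored on purpose: that is the flaw being shown.
        if activity_id in self._seen:
            raise ValueError(f"Duplicate activity_id {activity_id!r}")
        self._seen.add(activity_id)


class PerWorkflowRegistry:
    """Variant described above: one set per workflow_id, skipped on replay."""

    def __init__(self) -> None:
        self._seen: dict[str, set[str]] = {}

    def check(
        self, workflow_id: str, activity_id: str, *, is_replaying: bool = False
    ) -> None:
        if is_replaying:
            # The set already holds the ids from the original execution.
            return
        seen = self._seen.setdefault(workflow_id, set())
        if activity_id in seen:
            raise ValueError(
                f"Duplicate activity_id {activity_id!r} in workflow {workflow_id!r}"
            )
        seen.add(activity_id)


if __name__ == "__main__":
    registry = PerWorkflowRegistry()
    registry.check("run-1", "craft-text")
    registry.check("run-2", "craft-text")  # different run: accepted
    registry.check("run-1", "craft-text", is_replaying=True)  # replay: skipped
```

With FlatRegistry, the second run's "craft-text" would raise even though the two runs never share a Temporal history, which is the cross-run false positive described above.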
inference_task_queue rule: provisional, isolated

Today's routing rule: LLM-text goes to inference_task_queue, everything else goes to the workflow's default queue. This was introduced as a quick split-worker test setup; it is NOT the long-term design. Real deployments will want per-provider, per-model, per-activity-class routing (full design in per-activity-queue-routing-v1.md, out of scope for this PR).
To survive cleanly until that lands, we extracted a tiny module-private helper:
def _inference_dispatch_kwargs(worker_config) -> dict[str, Any]:
    """Stopgap. Single deletion point when per-activity routing lands.

    See wip/temporal-primitives/per-activity-queue-routing-v1.md.
    """
    return {"task_queue": worker_config.inference_task_queue}
Called at the single LLM-text dispatch site by **-unpacking. Unit tests pin both directions: LLM-text passes task_queue=worker_config.inference_task_queue, every other method must NOT pass task_queue= (over-routing would break split-worker production where the runner doesn't register image-gen / extract on the inference queue).
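The two-direction pin can be illustrated without Temporal. In this sketch, `build_dispatch_kwargs` is a hypothetical helper that assembles the keyword arguments a dispatch site would pass, and the worker config is a SimpleNamespace with made-up values; only `_inference_dispatch_kwargs` mirrors the helper shown above.

```python
from types import SimpleNamespace
from typing import Any


def _inference_dispatch_kwargs(worker_config: Any) -> dict[str, Any]:
    """Stopgap routing rule, as in the helper shown above."""
    return {"task_queue": worker_config.inference_task_queue}


def build_dispatch_kwargs(
    worker_config: Any, activity_id: str, *, is_llm_text: bool
) -> dict[str, Any]:
    """Hypothetical: collect the kwargs for one execute_activity call."""
    kwargs: dict[str, Any] = {
        "start_to_close_timeout": worker_config.workflow_execution_timeout,
        "retry_policy": worker_config.retry_policy,
        "activity_id": activity_id,
    }
    if is_llm_text:
        # Only the LLM-text site opts in to the inference queue.
        kwargs.update(_inference_dispatch_kwargs(worker_config))
    return kwargs


# Illustrative values only; the real worker_config has its own types.
wc = SimpleNamespace(
    inference_task_queue="inference",
    workflow_execution_timeout=600,
    retry_policy=None,
)

llm_kwargs = build_dispatch_kwargs(wc, "craft-text", is_llm_text=True)
img_kwargs = build_dispatch_kwargs(wc, "craft-image-single", is_llm_text=False)
```

Asserting that "task_queue" is absent from the non-LLM kwargs, rather than merely different, is what guards against over-routing.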
The whole refactor boils down to substituting one Temporal SDK call for another. Below is the LLM-text path. The other six methods follow the identical pattern.
Before:

# content_generator_child.py
async def make_llm_text(
    self,
    llm_assignment: LLMAssignment,
    wfid: str = "craft-text",
    ...
) -> str:
    child_workflow_id = make_child_workflow_id(
        parent_id=workflow.info().workflow_id,
        wfid=wfid,
    )
    return await self._workflow_executor.execute_child_workflow(
        WfMakeLLMText,
        arg=llm_assignment,
        id=child_workflow_id,
        ...
    )

# wf_make_llm_text.py
@workflow.defn(name="wf_make_llm_text")
class WfMakeLLMText(WorkflowClass[LLMAssignment, str]):
    @workflow.run
    async def run(self, arg: LLMAssignment) -> str:
        wc = get_config().temporal.worker_config
        try:
            return await workflow.start_activity(
                activity=act_llm_gen_text,
                arg=arg,
                start_to_close_timeout=wc.workflow_execution_timeout,
                retry_policy=wc.retry_policy,
                task_queue=wc.inference_task_queue,
            )
        except ActivityError as exc:
            if isinstance(exc.cause, ApplicationError):
                raise TemporalError.from_app_error(exc=exc.cause) from exc
            raise

After:

# content_generator_in_workflow.py
async def make_llm_text(
    self,
    llm_assignment: LLMAssignment,
    wfid: str = "craft-text",
    ...
) -> str:
    self._check_activity_id_unique(wfid)
    wc = get_config().temporal.worker_config
    try:
        # LLM-text only: route to inference queue.
        # See _inference_dispatch_kwargs docstring.
        return await workflow.execute_activity(
            act_llm_gen_text,
            llm_assignment,
            start_to_close_timeout=wc.workflow_execution_timeout,
            retry_policy=wc.retry_policy,
            activity_id=wfid,
            **_inference_dispatch_kwargs(wc),
        )
    except ActivityError as exc:
        if isinstance(exc.cause, ApplicationError):
            raise TemporalError.from_app_error(exc=exc.cause) from exc
        raise

# wf_make_llm_text.py
# ──────── DELETED ────────
Notice what stayed identical: the try/except ActivityError → TemporalError.from_app_error(…) translation, the start_to_close_timeout + retry_policy values, and the asymmetric task_queue routing. The behavioral surface is preserved; only the indirection is gone.
class ContentGeneratorInWorkflow:
    def __init__(self):
        # Keyed by workflow_id because this generator instance is set once on
        # the hub and reused across many workflow runs. A flat set[str] would
        # produce cross-run false positives.
        self._seen_activity_ids: dict[str, set[str]] = {}

    def _check_activity_id_unique(self, activity_id: str) -> None:
        # During replay, Temporal re-walks the history. The set is already
        # populated from the original execution; without this guard, a
        # cache-eviction replay would raise a spurious duplicate.
        if workflow.unsafe.is_replaying():
            return
        wf_id = workflow.info().workflow_id
        seen = self._seen_activity_ids.setdefault(wf_id, set())
        if activity_id in seen:
            msg = f"Duplicate activity_id {activity_id!r} in workflow {wf_id!r}"
            raise ContentGenerationError(msg)
        seen.add(activity_id)

No current call site triggers this check: the Phase 0 audit found at most one call per protocol method per PipeOperator execution. The check is here to catch future regressions. A new operator call site that calls the same protocol method twice would otherwise break on Temporal's "duplicate activity_id" error; with the check it fails fast with a typed Pipelex error and a clear message.
make_extract_pages page-views fix

This is the one place where the refactor isn't pure deletion: it fixes a pre-existing behavioral divergence between the direct-mode and Temporal-mode generators.
The direct-mode ContentGenerator.make_extract_pages augments its return when extract_job_params.should_include_page_views is true: it calls make_render_page_views(…) for multi-page document_uri inputs, or constructs a single-element page view inline for image_uri inputs.
The Temporal-mode ContentGeneratorChild.make_extract_pages did NOT. It just dispatched WfMakeExtract and returned the page contents as-is, missing both augmentations. Per the project's "flag and fix existing bugs" rule, the collapse is the right moment to fix it.
The new make_extract_pages mirrors the direct generator exactly — and it is the only protocol method that dispatches more than one activity, so it earns special handling for activity_id uniqueness:
 async def make_extract_pages(self, ...) -> ExtractPagesOutput:
     base_id = wfid  # e.g. "extract"
     pages_id = f"{base_id}-pages"
     page_views_id = f"{base_id}-render-page-views"

     self._check_activity_id_unique(pages_id)
     # First activity: real extraction.
     output = await workflow.execute_activity(
         act_extract_gen_extract_pages, ...,
         activity_id=pages_id,
     )
     if not extract_job_params.should_include_page_views:
         return output  # no double-emit when false

+    if extract_input.document_uri is not None:
+        # Second activity: render page views from the document.
+        self._check_activity_id_unique(page_views_id)
+        page_views = await workflow.execute_activity(
+            act_render_page_views, ...,
+            activity_id=page_views_id,
+        )
+    elif extract_input.image_uri is not None:
+        # Single image → single-element page view, no second activity.
+        page_views = [ImageContent(url=extract_input.image_uri)]
+    else:
+        page_views = []
+
+    if len(page_views) != len(output.page_contents):
+        raise ContentGenerationError("page_views length mismatch")
+
+    for page_content in output.page_contents:
+        page_content.page_view = page_views.pop(0)
     return output
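The pairing step at the end of that method can be exercised in isolation. The sketch below is an illustration under assumptions: `PageContent` is a hypothetical stand-in for the real page-content model, plain strings stand in for ImageContent, and ValueError replaces ContentGenerationError. It also swaps the pop(0) loop for zip, which pairs items in the same order without consuming the input list.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PageContent:  # hypothetical stand-in for the real page-content model
    text: str
    page_view: Optional[str] = None


def attach_page_views(
    page_contents: list[PageContent], page_views: list[str]
) -> None:
    """Pair page_views[i] with page_contents[i], in order."""
    if len(page_views) != len(page_contents):
        raise ValueError("page_views length mismatch")
    for page_content, page_view in zip(page_contents, page_views):
        page_content.page_view = page_view


if __name__ == "__main__":
    pages = [PageContent("page one"), PageContent("page two")]
    attach_page_views(pages, ["view-1", "view-2"])
    print([p.page_view for p in pages])
```

A two-page input is the smallest case that can expose an ordering mistake, since a single page pairs correctly under any order.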
The two-activity branch is the most fragile post-collapse path — it's the only place where the activity_id uniqueness mitigation actually matters in production. It's covered three ways:
- Unit tests cover all three branches: should_include_page_views false, image_uri, document_uri.
- TestTprlContentGeneratorPdfPageViews exercises the multi-page document_uri branch end-to-end through real Temporal, against a local 2-page PDF (multi-page catches pop(0) ordering bugs that a 1-page PDF would not).
- TestSplitWorkerExtractPages asserts via WorkflowHandle.fetch_history() that two ActivityTaskScheduled events appear with activity_ids "extract-pages" and "extract-render-page-views", which pins the contract.

Deleted, all under pipelex/temporal/tprl_content_generation/:
- wf_make_llm_text.py
- wf_make_object.py (held both WfMakeObject and WfMakeObjectList)
- wf_make_images.py
- wf_make_jinja2_text.py
- wf_make_extract.py
- wf_render_page_views.py
- content_generator_top.py + content_generator_top_factory.py (test-only, dispatched WfMake* from the submitter side)
- content_generator_child.py + content_generator_child_factory.py
- content_generator_models.py (the AssignmentType / ResultType unions, only consumed by the two deleted generators)

Plus four test files: test_tprl_content_generator_top.py, test_tprl_make_content_generator.py, and two already-commented historical files test_wf_gen_text.py + test_wf_jinja2.py.
Added:

- content_generator_in_workflow.py: the single new generator implementing ContentGeneratorProtocol, one method per activity, ~300 lines.
- content_generator_in_workflow_factory.py: its producer.
- tests/unit/pipelex/temporal/test_content_generator_in_workflow.py: covers task_queue positive/negative, activity_id threading, the craft-image-single / craft-image-list split, the two-activity branch in make_extract_pages, the runtime duplicate check, replay-safety of the check, and ActivityError → TemporalError translation (both branches).

Modified:

- pipelex/temporal/tasks.py: dropped the WfMake* + WfRenderPageViews imports; crafting TaskPack's workflow_list is now empty. Activity list unchanged.
- pipelex/pipelex.py: when temporal.is_enabled, constructs ContentGeneratorInWorkflowFactory unconditionally. The feature flag (briefly named PIPELEX_USE_IN_WORKFLOW_CONTENT_GENERATOR, then flipped and renamed to PIPELEX_USE_LEGACY_CONTENT_GENERATOR) was deleted at Phase 6.
- pipelex/temporal/test_extras/wf_test_content_generator_child.py: re-pointed to construct the new generator.
- content_generation/ test fixtures: removed the top_crafter / child_crafter fixtures and their imports.
- test_split_worker_usage.py and helpers.py: now describe the direct-activity-call topology.
- docs/under-the-hood/pipe-routing-and-execution.md: renamed "Content Generation Workflows" section to "Content Generation Activities"; replaced the two-column workflow|activity table with a single activity table.
- docs/under-the-hood/temporal-integration.md: four reference updates (PipeSequence topology box, Controller Dispatch Patterns table, "Internal sub-workflows" subsection renamed, Known Limitation section).
- CHANGELOG.md [Unreleased]: a new Changed entry covering the deletions, durability invariant, activity_id naming, the isolated inference_task_queue rule, and the page-views fix.

Each phase ends with make agent-check && make agent-test. Two checkpoints (natural commit boundaries) are placed at A (ready to flip the flag) and B (codebase in target state).
| Phase | What lands | Risk gate |
|---|---|---|
| 0 | Pre-flight audit: confirm per-workflow uniqueness invariant for Strategy (i). No code changes — produces an entry in Decisions. | 30 min audit vs. duplicate-activity_id runtime errors later. |
| 1 | Build the new ContentGeneratorInWorkflow + factory + unit tests. No wiring. | Compile-only. |
| 2 | Wire behind env-flag PIPELEX_USE_IN_WORKFLOW_CONTENT_GENERATOR in pipelex.py + both Temporal conftests. Default OFF. | Both flag values pass the targeted Temporal subsets. |
| 3 | Re-point WfTestContentGeneratorChild to the new generator (test fixture; exercises new code regardless of flag). | content_generation/ integration suite green. |
| 4 | Fix the make_extract_pages page-views asymmetry (folded into Phase 1 since it's a Phase 1 method). | Unit tests cover all three branches. |
| ✅ A | Checkpoint A — new generator validated end-to-end, ready to flip. Commit boundary. | |
| 5 | Flip the env-flag default. Rename to PIPELEX_USE_LEGACY_CONTENT_GENERATOR so polarity is self-documenting. | Full make agent-test green under new default. |
| 6 | Delete the old surface in one commit: 11 source files, 4 test files, plus the env-flag branch. Re-generate _generated_model_sets.py after make cleanderived. | make tb + make agent-test green. |
| 7 | Cross-process coverage: TestSplitWorkerObjectGen (Tier 9), Tier 10 image-gen split-worker negative-routing assertion, TestSplitWorkerExtractPages (Tier 11). Extract _inference_dispatch_kwargs helper to isolate the provisional 2-queue model. | Three new tests pass. |
| 8 | Update docstrings + docs in docs/under-the-hood/, the tracing docstrings, and CHANGELOG. | Grep verifies no stale WfMake* / wf_make_* references outside CHANGELOG. |
| ✅ B | Checkpoint B — codebase in target state. Commit boundary. | |
| 9 | Final verification: full real-server pytest run + Temporal history smoke against split workers + replay-history grep. | Zero wf_make_* / wf_render_page_views workflow types observed in temporal workflow list. |
task_queue=inference_task_queue rule. Easy to forget at the LLM-text site or to over-apply elsewhere. Mis-routing image-gen to the inference queue would break split-worker production where the runner doesn't register the image-gen activity. Covered by the unit test pinning the helper symbol on both positive (LLM-text) and negative (other methods) assertions.
activity_id collisions under repeated calls. The default wfid values are method-specific constants; they do NOT disambiguate repeated calls to the same method. The Phase 1 runtime check (dict[workflow_id, set[str]], gated by workflow.unsafe.is_replaying()) converts this from a documented invariant to a checked one and is replay-safe.
model_validate(obj.model_dump(mode="json", serialize_as_any=True)) round-trips for make_object / make_object_list. Required because the activity boundary returns a generic BaseModel. Use mode="json" on BOTH — ContentGeneratorChild.make_object_list omitted it (pre-existing asymmetry). Tier 9 cross-process test (TestSplitWorkerObjectGen) validates the nested-field round-trip through Temporal's data converter.
make_extract_pages. Mirror the direct generator's branching exactly: no double-emit when should_include_page_views is false; handle both document_uri (multi-page render) and image_uri (single-image) inputs; assert length match. Triple-covered (unit + real-PDF + cross-process).
tests/integration/pipelex/temporal/conftest.py and test_payload_codec_pipeline.py explicitly call pipelex_hub.set_content_generator(...) after Pipelex.make(), so the env-flag branch in pipelex.py was bypassed in tests during Phase 2. Both conftests had to mirror the flag, then drop the branch at Phase 6.
- Replace the inference_task_queue binary model with a general routing system covering LLM, image-gen, and extract, keyed by (activity_name, runtime_handle). Full design in wip/temporal-primitives/per-activity-queue-routing-v1.md. The Phase 7 stopgap helper _inference_dispatch_kwargs is the single deletion point when this lands.
- Today, a PipeSequence of N inference calls schedules N child workflows, decoupling activity scheduling across child task lists. Post-collapse, all N activities run through the parent's task list (or inference_task_queue for LLM-text). For deep pipelines (N > 50), single-queue QPS could spike. Watch ScheduleToStartLatency on both queues. Not blocking; upside dominates.
- Remove make_render_page_views from ContentGeneratorProtocol if no operator caller emerges within one release cycle. Today it's only invoked internally from make_extract_pages; the Protocol method is dead code from the operator side. Estimated savings: ~50 lines across four files.
- Fix the make_extract_pages Protocol-violating signature in the direct generator (defaults extract_job_params / extract_job_config to None while the Protocol requires non-None).
- _seen_activity_ids growth. The dict never evicts. A long-running worker accumulates one entry per processed workflow_id over its lifetime. Add an eviction hook on workflow completion, or move the set into workflow-local state via a contextvar set at the start of WfPipeRouter.run.

In summary:

- Seven WfMake* child workflows were structurally identical 5-line wrappers around one activity each. Gone.
- Two near-duplicate generators (ContentGeneratorTop, ContentGeneratorChild) collapse into one ContentGeneratorInWorkflow that calls workflow.execute_activity(…) directly.
- The inference_task_queue rule survives: isolated in _inference_dispatch_kwargs, marked provisional, single deletion point when per-activity routing lands.
- Every dispatch site passes activity_id=wfid; the runtime uniqueness check is replay-safe and per-workflow.
- The make_extract_pages page-views asymmetry between direct-mode and Temporal-mode is fixed in passing (project's "flag and fix" rule).

Companion docs: collapse-content-generation-workflow-layer-v2.md (the why), TODOS.md (the executable plan), per-activity-queue-routing-v1.md (the deletion target for the provisional queue rule).