A walk-through of the redesign on feature/Temporal-ids. We rebuilt Pipelex's Temporal identity layer around the principle: Pipelex already mints the right IDs — thread them straight through Temporal's primitives, don't invent new ones. Phases 1–6 are shipped; this note explains the shape so you can navigate the diff without reading every plan file.
Pipelex was customizing activity_id="craft-text" for LLM-text calls. When a pipe made two LLM calls in the same workflow run, Temporal rejected the second one because activity IDs must be unique among open activities of a run. The root cause was a disambiguator living on the wrong layer — at the worker, instead of on the workflow.
Workflow IDs looked like EdgdJ-HR5fd-TemporalPipeRun-pipe-router — a truncated session prefix, a random short-uuid, and the calling class name. No way to filter by pipe, no way to find a run from our app's pipeline_run_id, no semantic context in the Workflow Type column either (everything registered as wf_pipe_run / wf_pipe_router).
Temporal makes a hard distinction between IDs (per-instance, unique, runtime) and Types (per-class, registered, static). Get those mixed up and the design will be wrong before it starts.
| Primitive | What it is | UI visibility | Determinism rule |
|---|---|---|---|
| Workflow ID | User-supplied per-instance string. Unique among open workflows in a namespace. | Primary clickable identifier in every list / detail view. | Set client-side: anything. Set inside workflow code (child): must be replay-safe. |
| Run ID | Server-assigned UUID, one per run inside a chain. | Shown next to Workflow ID. Disambiguates retries / continue-as-new. | You don't control it. |
| Workflow Type | Registered class name. Static. Same set on every worker polling the queue. | "Type" column in the list view. | n/a — registered at worker boot. |
| Activity ID | Per-invocation string. Unique among open activities of a run. | Shown in Event History next to Activity Type. | Inside workflow code: replay-safe. SDK default = sequential integer by history position. |
| Activity Type | Registered function name. Static. | "Activity Type" in Event History — tells you "this was an LLM call". | n/a — registered at worker boot. |
| Task Queue | Routing channel that workers poll. | Shown per-execution; filterable. | n/a — operational. |
| Search Attributes | Typed, indexed, filterable key/value pairs. | Filterable columns in the list view. | n/a — derived from workflow input. |
| Static Summary / Details | 200-byte / 20 KB Markdown set at workflow start. | Purple text in list / detail views. | Set at submit time — no replay concern. |
| Per-activity Summary | 200-byte Markdown set per execute_activity call. | Purple text on ActivityTaskScheduled events. | Must be a pure function of workflow input. |
One sheet, one decision per row. No information is lost on the way down, no information is invented on the way up.
| Pipelex concept | Temporal primitive | Why |
|---|---|---|
| pipeline_run_id (existing UUID) | Workflow ID (top-level) | Already unique. Already used by our app. No reason to truncate or wrap it. |
| Implicit "child role" | Slash-suffix on parent's Workflow ID | Path-like, trivially parseable with split("/"). |
| Pipe family | Workflow Type (registered: wf_pipe_run, wf_pipe_router) | Two is enough. Filter by pipe via search attributes instead of registering hundreds of types. |
| Generation method | Activity Type (registered: act_llm_gen_text, …) | Already good — bug was at the ID layer, not the type layer. |
| One call to a generator method | Activity ID (SDK-default integer) | Stop customizing. Let Temporal assign "1", "2", … per history position. Deterministic by construction. |
| pipe_code | Search attribute PipeCode + leading segment of static_summary | Filterable column + readable column. |
| domain_code / user_id / session_id | Search attributes DomainCode / UserId / SessionId | Filterable. Five total: PipeCode, PipelineRunId, SessionId, UserId, DomainCode. |
| Pipe description | Tail of static_summary | e.g. translate_doc — Translate EN→FR. 200-byte Markdown. |
| Per-call context (model handle, class name…) | Per-activity summary= | This is the right place — not activity_id. |
The top-level Workflow ID is {env_prefix}{pipeline_run_id}. env_prefix is one of ut- (unit test), ci- (CI test), cc- (codex cloud), cct- (codex cloud test), or empty for NORMAL runs.
pipeline_run_id is Pipelex's existing UUID from JobMetadata.pipeline_run_id — set by PipelineFactory.make_pipeline_run_id.
Examples:
ut-3f9c8b2a-1e4d-4f5b-9c7a-2d8e1f0a6b3c # unit test
3f9c8b2a-1e4d-4f5b-9c7a-2d8e1f0a6b3c # production
Separator is / (not -): UUIDs already contain -, and a fresh separator keeps the path structure unambiguous.
// Top-level
ut-3f9c…b3c
// Fixed-role child
ut-3f9c…b3c/pipe-router
// Dynamic sub-pipe child
ut-3f9c…b3c/pipe-router/translate_doc-7c1e2f8a
// Nested router
ut-3f9c…b3c/pipe-router/translate_doc-7c1e2f8a/extract_text-4d9b1c83
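Because "/" never occurs inside a UUID or an env prefix, a plain split("/") recovers the whole hierarchy. Below is a minimal sketch of a parser for IDs shaped like the ones above. parse_workflow_id, ParsedWorkflowId and pipe_code_of are hypothetical helpers for illustration, not Pipelex API, and the prefix list is copied from the run-mode prefixes listed earlier.

```python
from __future__ import annotations

from dataclasses import dataclass

# Env prefixes as listed earlier in this note; NORMAL runs have no prefix.
ENV_PREFIXES = ("ut-", "ci-", "cc-", "cct-")
FIXED_ROLE = "pipe-router"


@dataclass(frozen=True)
class ParsedWorkflowId:
    env_prefix: str                  # "" for NORMAL runs
    pipeline_run_id: str             # the top-level UUID
    child_segments: tuple[str, ...]  # one entry per level below the top-level workflow

    @property
    def depth(self) -> int:
        """0 for the top-level workflow, 1 for its router child, and so on."""
        return len(self.child_segments)


def parse_workflow_id(workflow_id: str) -> ParsedWorkflowId:
    head, *segments = workflow_id.split("/")
    env_prefix = ""
    for prefix in ENV_PREFIXES:
        # A bare UUID never matches: its first 8 characters are hex digits,
        # so no "-" can sit where a prefix ends.
        if head.startswith(prefix):
            env_prefix = prefix
            break
    return ParsedWorkflowId(env_prefix, head[len(env_prefix):], tuple(segments))


def pipe_code_of(segment: str) -> str | None:
    """Pipe code of a dynamic sub-pipe segment, or None for the fixed router role."""
    if segment == FIXED_ROLE:
        return None
    # The suffix is 8 hex chars with no "-", so the last "-" separates code from suffix.
    code, _, _suffix = segment.rpartition("-")
    return code
```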
Two suffix conventions:
• Fixed-role child (1:1, known role): pipe-router — used by wf_pipe_run spawning its sole wf_pipe_router child.
• Dynamic sub-pipe child: {pipe_code}-{8-hex-chars} — used inside the router when dispatching a sub-pipe. The 8 hex chars come from workflow.uuid4(), which is replay-safe (Temporal's workflow.uuid4 is deterministic; stdlib uuid.uuid4 is forbidden inside workflows).
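The two suffix conventions can be sketched as plain functions. Here a seeded random.Random stands in for workflow.uuid4(). Temporal's version is, as far as we know, backed by the workflow's deterministic random source, and the stand-in has the same property that matters: the same seed yields the same child IDs, which is exactly what a replay needs. The helper names are hypothetical.

```python
from __future__ import annotations

import random
import uuid

FIXED_ROLE_SUFFIX = "pipe-router"


def deterministic_uuid4(rng: random.Random) -> uuid.UUID:
    # Stand-in for workflow.uuid4(): a seeded RNG, so a replay re-derives the same UUIDs.
    return uuid.UUID(int=rng.getrandbits(128), version=4)


def fixed_role_child_id(parent_workflow_id: str) -> str:
    # 1:1 child with a known role: wf_pipe_run -> its sole wf_pipe_router.
    return f"{parent_workflow_id}/{FIXED_ROLE_SUFFIX}"


def sub_pipe_child_id(parent_workflow_id: str, pipe_code: str, rng: random.Random) -> str:
    # Dynamic sub-pipe: pipe code plus the first 8 hex chars of a replay-safe UUID.
    return f"{parent_workflow_id}/{pipe_code}-{str(deterministic_uuid4(rng))[:8]}"
```

Running the router twice with the same seed, as a replay would, yields byte-identical child IDs. Calling Python's uuid.uuid4() in the same spot would not.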
// pipelex/temporal/temporal_manager.py (make_top_workflow_id)
- def make_top_workflow_id(self, base_id: str) -> str:
-     # Truncated session + random shortuuid + caller class name
-     return f"{prefix}{self.session_id[:5]}-{random_short_uuid()}-{base_id}"
+ def make_top_workflow_id(self, pipeline_run_id: str) -> str:
+     prefix: str
+     match runtime_manager.run_mode:
+         case RunMode.UNIT_TEST: prefix = "ut-"
+         case RunMode.NORMAL: prefix = ""
+         case RunMode.CI_TEST: prefix = "ci-"
+         case RunMode.CODEX_CLOUD: prefix = "cc-"
+         case RunMode.CODEX_CLOUD_TEST: prefix = "cct-"
+     return f"{prefix}{pipeline_run_id}"
// pipelex/temporal/tprl_pipe/temporal_pipe_router.py (child branch)
  if is_in_temporal_workflow():
      parent_workflow_id = workflow.info().workflow_id
+     # Replay-safe: workflow.uuid4() is deterministic by design.
+     child_workflow_id = f"{parent_workflow_id}/{pipe_job.pipe.code}-{str(workflow.uuid4())[:8]}"
+     pipe_output = await workflow.execute_child_workflow(
+         WfPipeRouter.run,
+         arg=pipe_job,
+         id=child_workflow_id,
+         search_attributes=build_search_attributes(pipe_job),
+         static_summary=build_static_summary(pipe_job.pipe),
+         static_details=build_static_details(pipe_job),
+     )
Because the top-level Workflow ID is now {env_prefix}{pipeline_run_id}, callers that pass a stable pipeline_run_id to PipelineFactory.make_pipeline and re-execute after the previous run has closed now land on the same Temporal Workflow Execution Chain: same Workflow ID, fresh run_id, as allowed by the SDK-default ALLOW_DUPLICATE reuse policy. The old scheme produced a fresh workflow_id per execution by accident, via the random short-uuid. This is now documented behavior, not a bug.
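The stability argument is easy to check against a stand-alone version of the new function. This sketch mirrors the diff above but replaces match with a lookup table. That turns the mapping into data a test can check for completeness, and an unmapped mode fails with KeyError. RunMode here is a hypothetical stand-in for Pipelex's enum: member names follow the diff, and the string values are invented.

```python
from __future__ import annotations

from enum import Enum


class RunMode(Enum):
    # Hypothetical stand-in; member names follow the diff, values are invented.
    UNIT_TEST = "unit_test"
    NORMAL = "normal"
    CI_TEST = "ci_test"
    CODEX_CLOUD = "codex_cloud"
    CODEX_CLOUD_TEST = "codex_cloud_test"


ENV_PREFIX_BY_MODE: dict[RunMode, str] = {
    RunMode.UNIT_TEST: "ut-",
    RunMode.NORMAL: "",
    RunMode.CI_TEST: "ci-",
    RunMode.CODEX_CLOUD: "cc-",
    RunMode.CODEX_CLOUD_TEST: "cct-",
}


def make_top_workflow_id(run_mode: RunMode, pipeline_run_id: str) -> str:
    """Pure function of its inputs: the same pipeline_run_id always yields the same ID."""
    return f"{ENV_PREFIX_BY_MODE[run_mode]}{pipeline_run_id}"
```

With no random component left, whatever happens on a re-execution is decided by Temporal's Workflow ID reuse policy, not by the ID builder.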
Pipelex no longer passes activity_id= anywhere. The Python SDK auto-assigns sequential integers ("1", "2", …) per workflow run. They are:
• deterministic: assigned by position in the run's history, so a replay reproduces the same IDs;
• unique within (workflow_id, run_id) by construction.
Per-call meaning lives in summary=. Every make_* method in ContentGeneratorInWorkflow looks like this:
async def make_llm_text(self, job_metadata, llm_setting_main, llm_prompt_for_text) -> str:
    # ... (elided: construction of llm_assignment) ...
    dispatch_kwargs = worker_config.resolve_dispatch(
        activity_name=act_llm_gen_text.__name__,
        routing_key=llm_assignment.llm_handle,
        queue_options_by_queue=get_config().temporal.queue_options,
        is_traced=get_config().temporal.temporal_config.temporal_log_config.is_dispatch_resolution_traced,
    ).to_execute_kwargs()
    return await workflow.execute_activity(
        act_llm_gen_text,
        arg=llm_assignment,
        summary=build_activity_summary("LLM text", job_metadata, extras={"model": llm_assignment.llm_handle}),
        **dispatch_kwargs,  # task_queue + timeouts + retry, see queues-and-options.html
    )
Pipe authors never think about Temporal IDs: identity flows entirely through JobMetadata.pipeline_run_id, the SDK assigns activity IDs, and the per-call summary carries the call-specific meaning.
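A toy model makes both halves of the fix concrete. A fixed custom ID collides on the second call, while counter-assigned IDs are unique and come out identical on every replay. ToyWorkflowRun is not the Temporal SDK. It only encodes the two rules this note relies on: uniqueness among a run's open activities, and a sequential default ID.

```python
from __future__ import annotations


class DuplicateActivityIdError(Exception):
    """Raised when an ID is already used by an open activity of the run."""


class ToyWorkflowRun:
    """Toy model of one workflow run's activity-ID rules. NOT the Temporal SDK."""

    def __init__(self) -> None:
        self._seq = 0
        self.open_activity_ids: set[str] = set()
        self.history: list[tuple[str, str, str]] = []  # (activity_id, activity_type, summary)

    def schedule(self, activity_type: str, summary: str, activity_id: str | None = None) -> str:
        self._seq += 1
        # Default: the N-th scheduled activity gets str(N), a pure function of position.
        resolved = activity_id if activity_id is not None else str(self._seq)
        if resolved in self.open_activity_ids:
            raise DuplicateActivityIdError(resolved)
        self.open_activity_ids.add(resolved)
        self.history.append((resolved, activity_type, summary))
        return resolved


def run_two_llm_calls(run: ToyWorkflowRun, custom_id: str | None = None) -> list[str]:
    """Schedule two LLM-text activities in one run, like the pipe that hit the bug."""
    summary = "LLM text \u00b7 pipe=translate_doc \u00b7 model=gpt-4o"
    return [run.schedule("act_llm_gen_text", summary, custom_id) for _ in range(2)]
```

Passing custom_id="craft-text" reproduces the original failure on the second call. Leaving it unset gives ["1", "2"] on every run. The summary, not the ID, tells the two calls apart.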
We weren't using any of Temporal's display primitives. Now we use all the relevant ones, in three layers.
Static summary: set once at workflow start. 200-byte single-line Markdown.
{pipe_code} — {description} // e.g. "translate_doc — Translate a document from English to French"
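The 200-byte budget is counted in bytes, not characters, and the separator itself is a 3-byte UTF-8 character. A naive slice can therefore cut a multi-byte character in half. A minimal sketch of a UTF-8-safe clamp, assuming the limit applies to the encoded string; build_static_summary_sketch is a hypothetical stand-in for build_static_summary.

```python
from __future__ import annotations

STATIC_SUMMARY_MAX_BYTES = 200  # budget quoted in this note


def clamp_utf8(text: str, max_bytes: int) -> str:
    """Truncate to at most max_bytes of UTF-8 without splitting a character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops the trailing partial multi-byte sequence, if any.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


def build_static_summary_sketch(pipe_code: str, description: str) -> str:
    # Single line: collapse whitespace (including newlines) in the description.
    one_line = " ".join(description.split())
    return clamp_utf8(f"{pipe_code} \u2014 {one_line}", STATIC_SUMMARY_MAX_BYTES)
```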
Static details: set once at workflow start. 20 KB Markdown table.
| Field | Value |
|---------------|------------------------------------------------|
| Pipe | `translate_doc` |
| Domain | `documents` |
| Pipeline run | `3f9c8b2a-1e4d-4f5b-9c7a-2d8e1f0a6b3c` |
| User | `acme-corp` |
| Session | `EdgdJ7Yk4Q3HF2pXyZv9w8` |
| Library crate | `documents@2.1.4` |
| Input | `source_text`, `target_language` |
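Clamping a Markdown table at an arbitrary byte would leave a half row. A sketch that renders the table above and drops whole rows once the budget is reached. The field dict and build_static_details_sketch are hypothetical, and "20 KB" is read here as 20 KiB, which is an assumption.

```python
from __future__ import annotations

STATIC_DETAILS_MAX_BYTES = 20 * 1024  # "20 KB" from this note, read here as 20 KiB


def _cell(value: str | list[str]) -> str:
    values = value if isinstance(value, list) else [value]
    # Render each value as inline code; escape "|" so it cannot split the cell.
    return ", ".join("`" + v.replace("|", "\\|") + "`" for v in values)


def build_static_details_sketch(fields: dict[str, str | list[str]]) -> str:
    lines = ["| Field | Value |", "|---|---|"]
    used = sum(len(line.encode("utf-8")) + 1 for line in lines)  # +1 per newline
    for name, value in fields.items():
        row = f"| {name} | {_cell(value)} |"
        cost = len(row.encode("utf-8")) + 1
        if used + cost > STATIC_DETAILS_MAX_BYTES:
            break  # drop whole rows instead of cutting one in half
        lines.append(row)
        used += cost
    return "\n".join(lines)
```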
Per-activity summary: set per workflow.execute_activity(...) call. 200 bytes. This is where the call-specific meaning lives.
| Method | Summary format |
|---|---|
| make_llm_text | LLM text · pipe={code} · model={handle} |
| make_object | LLM object · pipe={code} · class={class_name} |
| make_object_list | LLM object list · pipe={code} · class={class_name} |
| make_single_image | Img gen 1× · pipe={code} · model={handle} |
| make_image_list | Img gen N× · pipe={code} · model={handle} · n={count} |
| make_templated_text | Templated text · pipe={code} |
| make_extract_pages | Extract pages · pipe={code} · handle={extract_handle} |
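Every format in the table follows one pattern: a label, then pipe=, then optional key=value extras, joined by " · ". A sketch of that pattern as a pure function (no I/O, time, or randomness), which is what keeps it replay-safe. The signature is simplified to take the pipe code directly and is hypothetical. The real build_activity_summary takes job metadata.

```python
from __future__ import annotations

ACTIVITY_SUMMARY_MAX_BYTES = 200
SEP = " \u00b7 "  # middle dot, as in the formats above


def build_activity_summary_sketch(label: str, pipe_code: str,
                                  extras: dict[str, object] | None = None) -> str:
    parts = [label, f"pipe={pipe_code}"]
    # Dict order is preserved, so extras render in the order the call site gives them.
    parts += [f"{key}={value}" for key, value in (extras or {}).items()]
    encoded = SEP.join(parts).encode("utf-8")
    # Clamp by bytes without splitting a multi-byte character.
    return encoded[:ACTIVITY_SUMMARY_MAX_BYTES].decode("utf-8", errors="ignore")
```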
Formatting is centralized in one place — pipelex/temporal/tprl/observability.py — so call sites stay thin:
build_search_attributes(pipe_job)        # 5 typed keys → TypedSearchAttributes
build_static_summary(pipe)               # "{code} — {description}", 200B clamp
build_static_details(pipe_job)           # markdown table, 20KB clamp
build_activity_summary(label, metadata,  # "{label} · pipe={code} · {extras…}"
                       extras={...})
Five custom Keyword attributes, registered once per namespace, set on every workflow start:
| Attribute | Type | Source |
|---|---|---|
| PipeCode | Keyword | pipe_job.pipe.code |
| PipelineRunId | Keyword | pipe_job.job_metadata.pipeline_run_id |
| SessionId | Keyword | stamped at submitter boundary from TemporalManager.session_id |
| UserId | Keyword | pipe_job.job_metadata.user_id |
| DomainCode | Keyword | pipe_job.pipe.domain_code |
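The mapping above can be sketched as a function from workflow input to the five keys. The stub dataclasses are hypothetical stand-ins for Pipelex's PipeJob and JobMetadata. SessionId is read from job_metadata.session_id, where the submitter stamps it. The real helper returns TypedSearchAttributes. With temporalio that would presumably be built from SearchAttributeKey.for_keyword(...) pairs, which this stdlib-only sketch leaves out by returning a plain dict.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class JobMetadataStub:
    pipeline_run_id: str
    user_id: str
    session_id: str  # stamped once at the submitter boundary


@dataclass(frozen=True)
class PipeStub:
    code: str
    domain_code: str


@dataclass(frozen=True)
class PipeJobStub:
    pipe: PipeStub
    job_metadata: JobMetadataStub


def build_search_attributes_sketch(pipe_job: PipeJobStub) -> dict[str, str]:
    # Every value comes from workflow input, so child starts are identical on replay.
    return {
        "PipeCode": pipe_job.pipe.code,
        "PipelineRunId": pipe_job.job_metadata.pipeline_run_id,
        "SessionId": pipe_job.job_metadata.session_id,
        "UserId": pipe_job.job_metadata.user_id,
        "DomainCode": pipe_job.pipe.domain_code,
    }
```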
A real Temporal cluster rejects every StartWorkflowExecution RPC that references an unregistered attribute. We used to soft-warn at worker boot; now we hard-fail with the exact registration command embedded in the exception:
$ pipelex worker
SearchAttributeRegistrationError: namespace "default" is missing required attributes:
PipeCode, PipelineRunId, SessionId, UserId, DomainCode
Register via Pipelex CLI:
pipelex setup-temporal-namespace
Or via the Temporal CLI:
temporal operator search-attribute create \
--namespace default \
--name PipeCode --type Keyword \
--name PipelineRunId --type Keyword \
--name SessionId --type Keyword \
--name UserId --type Keyword \
--name DomainCode --type Keyword
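The hard-fail check amounts to a set difference plus a message that embeds the fix. A sketch, assuming the set of registered names has already been fetched (in practice from the namespace's search-attribute listing on the operator service). Here it is simply passed in. check_namespace and registration_command are hypothetical names. Unlike the sample output, which lists all five attributes because all five were missing, this sketch emits --name flags only for the missing attributes.

```python
from __future__ import annotations

REQUIRED_ATTRIBUTES = ("PipeCode", "PipelineRunId", "SessionId", "UserId", "DomainCode")


class SearchAttributeRegistrationError(RuntimeError):
    pass


def registration_command(namespace: str, missing: list[str]) -> str:
    # One --name/--type pair per missing attribute, with shell line continuations.
    lines = ["temporal operator search-attribute create", f"  --namespace {namespace}"]
    lines += [f"  --name {name} --type Keyword" for name in missing]
    return " \\\n".join(lines)


def check_namespace(namespace: str, registered: set[str]) -> None:
    missing = [name for name in REQUIRED_ATTRIBUTES if name not in registered]
    if missing:
        raise SearchAttributeRegistrationError(
            f'namespace "{namespace}" is missing required attributes:\n'
            f"  {', '.join(missing)}\n"
            "Register via Pipelex CLI:\n"
            "  pipelex setup-temporal-namespace\n"
            "Or via the Temporal CLI:\n"
            + registration_command(namespace, missing)
        )
```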
# pipelex.toml — turn off custom attributes entirely
[temporal.search_attributes]
enabled = false
attributes = ["PipeCode", "PipelineRunId", "SessionId", "UserId", "DomainCode"]
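Catching a typo like "PipelineRunID" at load time only requires checking each configured name against the known set. A minimal sketch using difflib to suggest the intended name. The function and exception names are hypothetical. Validating names even when enabled = false is a design choice of this sketch, so a typo surfaces before anyone flips the switch, and may not match Pipelex's behavior.

```python
from __future__ import annotations

import difflib

KNOWN_ATTRIBUTES = ("PipeCode", "PipelineRunId", "SessionId", "UserId", "DomainCode")


class SearchAttributeConfigError(ValueError):
    pass


def validate_search_attribute_config(enabled: bool, attributes: list[str]) -> list[str]:
    """Return the attribute names to attach; raise on any unknown name."""
    for name in attributes:
        if name not in KNOWN_ATTRIBUTES:
            hint = difflib.get_close_matches(name, KNOWN_ATTRIBUTES, n=1)
            suggestion = f' (did you mean "{hint[0]}"?)' if hint else ""
            raise SearchAttributeConfigError(f'unknown search attribute "{name}"{suggestion}')
    # Disabled: attach nothing, and (per the config above) skip the worker-boot check too.
    return list(attributes) if enabled else []
```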
• enabled = false skips both the worker-boot check and the per-workflow attribute attachment. Use it when the namespace is owned by a third party who will not register the attributes.
• Unknown names in attributes, such as the typo "PipelineRunID", fail at config load instead of silently producing no attribute.
• The pipelex setup-temporal-namespace CLI wraps OperatorService.AddSearchAttributes, so operators don't need a separate Temporal CLI install. --dry-run prints the equivalent raw command for handing off to a namespace admin.
The load-bearing fact is that Temporal's SDK assigns activity IDs by history position, so replays produce the same IDs without Pipelex maintaining any worker-side state.
| Choice | Why it is replay-safe |
|---|---|
| Top-level Workflow ID set on the submitter | Set outside workflow code; no replay concern. |
| SDK-default activity_id | Assigned by the SDK from history position, so a replay re-derives the same IDs. |
| Child Workflow ID disambiguator uses workflow.uuid4() | Temporal's workflow.uuid4() is deterministic by design (stdlib uuid.uuid4() is forbidden in workflow code). |
| Child Workflow ID uses pipe_job.pipe.code | Read from workflow input, not from outside state. |
| Search attribute values | Derived from pipe_job + JobMetadata (workflow input). |
| Static summary / details | Set at submit time, outside workflow code. |
| Per-activity summary | Pure function of job_metadata + activity input — no I/O, time, or randomness. |
| SessionId stamped at the submitter boundary | Threaded through every child workflow via JobMetadata.session_id, so child-workflow starts stay byte-equal across replays even after a worker restart. |
Before:
| Workflow ID | Type | PipeCode |
|---|---|---|
| EdgdJ-HR5fd-TemporalPipeRun-pipe-router | wf_pipe_run | (unset) |
| EdgdJ-HR5fd-TemporalPipeRun-pipe-router (child) | wf_pipe_router | (unset) |
After:
| Workflow ID | Type | PipeCode | Summary |
|---|---|---|---|
| ut-3f9c…b3c | wf_pipe_run | translate_doc | translate_doc — Translate EN→FR |
| ut-3f9c…b3c/pipe-router | wf_pipe_router | translate_doc | translate_doc — Translate EN→FR |
| ut-3f9c…b3c/pipe-router/extract_text-4d9b1c83 | wf_pipe_router | extract_text | extract_text — Extract paragraphs |
// BEFORE
ActivityTaskScheduled: id=craft-text, type=act_llm_gen_text
ActivityTaskScheduled: id=craft-text, type=act_llm_gen_text   ← collision, workflow crashes
// AFTER
ActivityTaskScheduled: id=1, type=act_llm_gen_text
  Summary: LLM text · pipe=translate_doc · model=gpt-4o
ActivityTaskScheduled: id=2, type=act_llm_gen_text
  Summary: LLM text · pipe=translate_doc · model=gpt-4o
ActivityTaskScheduled: id=3, type=act_llm_gen_object
  Summary: LLM object · pipe=translate_doc · class=Section
• wip/temporal-primitives/01-id-and-naming-design.md
• wip/temporal-primitives/02-id-and-naming-plan.md
• wip/temporal-primitives/00-temporal-id-primitives.md
• pipelex/temporal/tprl/observability.py
• pipelex/temporal/tprl/namespace_check.py, pipelex/cli/commands/setup_temporal_namespace_cmd.py