The companion to id-and-naming.html. Where the ID work was about identity, this work is about knobs: which task queue an activity dispatches to, what timeouts and retry policy apply, how the worker process itself is tuned. Three independent config layers, each addressing a real operational pain.
Send each activity (and optionally each handle within an activity) to a dedicated task queue. Isolate GPU-bound image-gen runners from cheap LLM workers, route a specific OCR backend to a dedicated queue, scale OpenAI vs Anthropic pools independently.
Set start_to_close_timeout, retry policy, heartbeat, and rate caps per task queue. Override for a single handle when one model variant on a backend needs different tuning. The workflow's own deadline is no longer spent by a single in-flight activity.
Named worker-runtime profiles bundle the Worker(...) tuning (slots, pollers, heartbeat throttles, graceful shutdown). One worker process picks one profile via pipelex worker --profile <name>, plus a --scope selecting which activities it registers.
activity_queues
queue_options · handle_options
worker_runtime_profiles
Layer A — routing. Per activity, per handle, which task queue do we dispatch to? Submitter-side.
Layer B — options. For that resolved queue (and optionally that specific handle), what timeouts and retry policy apply? Submitter-side.
Layer C — worker. Inside one worker process, how do we tune slots/pollers/heartbeats? Worker-side, selected via --profile.
The activity_queues table on WorkerConfig is the entry point. Each entry declares a default queue for one activity and an optional by_handle map keyed by runtime handle (LLM model handle, image-gen handle, extract handle).
```toml
# pipelex.toml
[temporal.worker_config]
default_task_queue = "temporal_task_queue"
default_activity_start_to_close_timeout = "0:10:00"

# Route LLM text by model family. Anthropic runs on a dedicated pool;
# everything else falls back to the activity default.
[temporal.worker_config.activity_queues.act_llm_gen_text]
default = "llm-default-queue"

# A sub-table rather than a multi-line inline table: TOML 1.0 inline
# tables must fit on one line and cannot end with a trailing comma.
[temporal.worker_config.activity_queues.act_llm_gen_text.by_handle]
"claude-4.6-sonnet" = "anthropic-queue"
"claude-4.6-opus" = "anthropic-queue"
"gpt-5.1" = "openai-queue"

# Image generation on a GPU pool.
[temporal.worker_config.activity_queues.act_img_gen_images]
default = "img-gen-gpu-queue"

# Extract: route Azure Doc Intel separately from Mistral OCR.
[temporal.worker_config.activity_queues.act_extract_gen_extract_pages]
default = "extract-default-queue"
by_handle = { "azure-doc-intel" = "extract-azure-queue" }
```
```text
WorkerConfig.resolve_queue(activity_name, routing_key) -> str | None
  1. activity_queues[activity_name].by_handle[routing_key]   # most specific
  2. activity_queues[activity_name].default                  # activity-level default
  3. self.default_task_queue                                 # worker-wide default
```
When activity_queues is fully empty (the default config, with no routing configured), resolve_queue returns None. The dispatch path then omits the task_queue kwarg, so Temporal routes the activity to the workflow's own queue. This preserves the pre-v1 single-queue default and the with_conditional_worker test pattern. As soon as any activity_queues entry exists, the operator has opted into a routing topology, and unmapped activities fall back explicitly to default_task_queue.
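The three-step lookup and the empty-table rule can be sketched as a standalone function. This is a minimal illustration, not the code in config_temporal.py. ActivityRoute is a hypothetical stand-in for ActivityRouteConfig, and the function takes plain dicts instead of being a WorkerConfig method.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ActivityRoute:
    # Hypothetical stand-in for one activity_queues entry (ActivityRouteConfig).
    default: str
    by_handle: dict[str, str] = field(default_factory=dict)


def resolve_queue(
    activity_queues: dict[str, ActivityRoute],
    default_task_queue: str,
    activity_name: str,
    routing_key: str | None,
) -> str | None:
    # No routing topology at all: None tells the caller to omit task_queue,
    # so the activity rides the workflow's own queue.
    if not activity_queues:
        return None
    route = activity_queues.get(activity_name)
    if route is None:
        # 3. Routing is configured but this activity is unmapped.
        return default_task_queue
    if routing_key is not None and routing_key in route.by_handle:
        # 1. Most specific: per-handle mapping.
        return route.by_handle[routing_key]
    # 2. Activity-level default.
    return route.default


activity_queues = {
    "act_llm_gen_text": ActivityRoute(
        default="llm-default-queue",
        by_handle={"claude-4.6-opus": "anthropic-queue", "gpt-5.1": "openai-queue"},
    ),
}
print(resolve_queue(activity_queues, "temporal_task_queue", "act_llm_gen_text", "claude-4.6-opus"))
# → anthropic-queue
```

In the empty case, the function returns None rather than default_task_queue. That keeps single-queue deployments and the with_conditional_worker tests unchanged, because the caller never sends a task_queue kwarg at all.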
queue_options declares the submitter options (timeouts, retry policy, queue-level rate cap) for one task queue. handle_options (inside an activity_queues entry) declares the rare per-handle override on top of the queue's options.
```toml
# pipelex.toml: queue-level submitter options
[temporal.queue_options.temporal_task_queue]
start_to_close_timeout = "0:10:00"
heartbeat_timeout = "0:01:00"
max_task_queue_activities_per_second = 1000

[temporal.queue_options.anthropic-queue]
start_to_close_timeout = "0:05:00"
heartbeat_timeout = "0:01:00"
max_task_queue_activities_per_second = 50

[temporal.queue_options.anthropic-queue.retry_policy_config]
initial_interval = "0:00:01"
backoff_coefficient = 2.0
maximum_interval = "0:01:00"
maximum_attempts = 5
# IMPORTANT: overlay uses _extra (additive), not the baseline field name.
non_retryable_error_types_extra = ["AnthropicAuthError"]

[temporal.queue_options.img-gen-gpu-queue]
start_to_close_timeout = "0:30:00"  # GPU calls take longer
heartbeat_timeout = "0:02:00"
max_task_queue_activities_per_second = 10

# Rare per-handle override for ONE long-context model on the anthropic queue
[temporal.worker_config.activity_queues.act_llm_gen_text.handle_options."claude-4.6-opus"]
start_to_close_timeout = "0:15:00"
```
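Layering, from lowest to highest precedence: baseline (worker_config.default_activity_start_to_close_timeout, worker_config.default_activity_heartbeat_timeout, worker_config.retry_policy_config) → queue_options[<queue>] → handle_options[<handle>]. The resolved output covers task_queue · start_to_close_timeout · schedule_to_close_timeout · schedule_to_start_timeout · heartbeat_timeout · retry_policy.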
Scalars (timeouts, attempts, intervals) are last-wins. non_retryable_error_types is additive — every layer can only add to the no-retry list, never remove from it. The schema enforces this asymmetry by splitting the retry-policy model in two:
| Layer | Class | Field |
|---|---|---|
| Baseline only | RetryPolicyConfig | non_retryable_error_types: list[str] — the main list |
| Queue / handle overlay | RetryPolicyConfigOverlay | non_retryable_error_types_extra: list[str] — additive |
Pydantic's extra="forbid" rejects the wrong field on each layer, so a config like queue_options.foo.retry_policy_config.non_retryable_error_types = [...] fails at load time with a clear validation error — you can't accidentally bypass the additive rule.
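The two-model split and the merge rule (scalars are last-wins, the no-retry list is additive) can be sketched with stdlib dataclasses. This is an illustration under a few assumptions. The real classes are pydantic models, so from_mapping below only imitates extra="forbid". The default values and the maximum_attempts=3 baseline are also made up for the example.

```python
from __future__ import annotations

from dataclasses import dataclass, fields, replace
from datetime import timedelta
from typing import Any

SCALARS = ("initial_interval", "backoff_coefficient", "maximum_interval", "maximum_attempts")


@dataclass(frozen=True)
class RetryPolicyConfig:
    # Baseline layer: the only place the main no-retry list lives.
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    maximum_interval: timedelta | None = None
    maximum_attempts: int = 0  # 0 = unlimited in Temporal
    non_retryable_error_types: tuple[str, ...] = ()


@dataclass(frozen=True)
class RetryPolicyConfigOverlay:
    # Queue/handle layer: None means "not set here"; the list is _extra only.
    initial_interval: timedelta | None = None
    backoff_coefficient: float | None = None
    maximum_interval: timedelta | None = None
    maximum_attempts: int | None = None
    non_retryable_error_types_extra: tuple[str, ...] = ()

    @classmethod
    def from_mapping(cls, data: dict[str, Any]) -> RetryPolicyConfigOverlay:
        # Imitates extra="forbid": the baseline-only field name is rejected here.
        unknown = set(data) - {f.name for f in fields(cls)}
        if unknown:
            raise ValueError(f"extra fields not permitted: {sorted(unknown)}")
        data = dict(data)
        if "non_retryable_error_types_extra" in data:
            data["non_retryable_error_types_extra"] = tuple(data["non_retryable_error_types_extra"])
        return cls(**data)


def apply_overlay(base: RetryPolicyConfig, overlay: RetryPolicyConfigOverlay) -> RetryPolicyConfig:
    # Scalars: last wins, but only when the overlay actually sets them.
    updates: dict[str, Any] = {
        name: getattr(overlay, name) for name in SCALARS if getattr(overlay, name) is not None
    }
    # No-retry list: additive, order-preserving, de-duplicated; it never shrinks.
    updates["non_retryable_error_types"] = tuple(
        dict.fromkeys(base.non_retryable_error_types + overlay.non_retryable_error_types_extra)
    )
    return replace(base, **updates)


baseline = RetryPolicyConfig(maximum_attempts=3, non_retryable_error_types=("OAIBillingError",))
queue = RetryPolicyConfigOverlay.from_mapping(
    {"maximum_attempts": 5, "non_retryable_error_types_extra": ["AnthropicAuthError"]}
)
resolved = apply_overlay(baseline, queue)
print(resolved.maximum_attempts, resolved.non_retryable_error_types)
# → 5 ('OAIBillingError', 'AnthropicAuthError')
```

A handle-level overlay would be applied the same way on top of the queue result, for example apply_overlay(apply_overlay(baseline, queue), handle).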
```python
# WorkerConfig.resolve_dispatch(activity_name, routing_key, queue_options_by_queue, is_traced) -> DispatchOptions

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from temporalio.common import RetryPolicy


@dataclass
class DispatchOptions:
    task_queue: str | None  # None ⇒ omit kwarg, ride workflow's queue
    start_to_close_timeout: timedelta
    retry_policy: RetryPolicy
    schedule_to_close_timeout: timedelta | None = None
    schedule_to_start_timeout: timedelta | None = None
    heartbeat_timeout: timedelta | None = None

    def to_execute_kwargs(self) -> dict[str, Any]:
        # Splat into workflow.execute_activity(...); omits keys whose value is None
        # so the Temporal SDK's own defaults kick in rather than receiving None.
        ...
```
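The body of to_execute_kwargs is elided above. Below is one way it could be written, as a sketch only. It relies on the dataclass field names matching the keyword arguments of temporalio's workflow.execute_activity (task_queue, start_to_close_timeout, schedule_to_close_timeout, schedule_to_start_timeout, heartbeat_timeout, retry_policy), so a None-filtered dict can be splatted directly. RetryPolicyStandIn is a hypothetical placeholder that keeps the example runnable without the Temporal SDK installed.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from datetime import timedelta
from typing import Any


@dataclass
class RetryPolicyStandIn:
    # Hypothetical placeholder for temporalio.common.RetryPolicy.
    maximum_attempts: int = 0


@dataclass
class DispatchOptions:
    task_queue: str | None
    start_to_close_timeout: timedelta
    retry_policy: RetryPolicyStandIn
    schedule_to_close_timeout: timedelta | None = None
    schedule_to_start_timeout: timedelta | None = None
    heartbeat_timeout: timedelta | None = None

    def to_execute_kwargs(self) -> dict[str, Any]:
        # Field names double as execute_activity keyword names; drop unset ones.
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }


opts = DispatchOptions(
    task_queue=None,
    start_to_close_timeout=timedelta(minutes=10),
    retry_policy=RetryPolicyStandIn(maximum_attempts=5),
)
print(sorted(opts.to_execute_kwargs()))
# → ['retry_policy', 'start_to_close_timeout']
```

Because task_queue=None is filtered out, the single-queue case never sends task_queue at all, so the activity rides the workflow's queue.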
When activity_queues is empty (no routing topology) but queue_options[default_task_queue] exists, the resolver still applies that queue's overlay. This lets single-queue deployments tune timeouts, retry, and rate caps without opting into the routing model. Dispatch returns task_queue=None, so the activity rides the workflow's own queue, but the queue's options still flow through.
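This rule separates two decisions: where the activity runs and whose options apply. Below is a minimal sketch of that split. The helper name is hypothetical, and plain dicts stand in for the QueueOptions models.

```python
from __future__ import annotations

from typing import Any


def pick_queue_and_options(
    resolved_queue: str | None,
    default_task_queue: str,
    queue_options: dict[str, dict[str, Any]],
) -> tuple[str | None, dict[str, Any]]:
    # Where it runs: the resolved queue, or None (= the workflow's own queue).
    # Whose options apply: the resolved queue, else the default queue's entry.
    options_key = resolved_queue if resolved_queue is not None else default_task_queue
    return resolved_queue, queue_options.get(options_key, {})


queue_options = {"temporal_task_queue": {"heartbeat_timeout": "0:01:00"}}
print(pick_queue_and_options(None, "temporal_task_queue", queue_options))
# → (None, {'heartbeat_timeout': '0:01:00'})
```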
Every workflow.execute_activity(...) in ContentGeneratorInWorkflow splats the resolver output. One uniform pattern across LLM, image-gen, jinja2, and extract calls:
```python
async def make_llm_text(self, job_metadata, llm_setting_main, llm_prompt_for_text):
    # llm_assignment is prepared earlier in this method (elided in this excerpt).
    temporal = get_config().temporal
    dispatch_kwargs = temporal.worker_config.resolve_dispatch(
        activity_name=act_llm_gen_text.__name__,
        routing_key=llm_assignment.llm_handle,  # drives by_handle / handle_options
        queue_options_by_queue=temporal.queue_options,
        is_traced=temporal.temporal_config.temporal_log_config.is_dispatch_resolution_traced,
    ).to_execute_kwargs()
    return await workflow.execute_activity(
        act_llm_gen_text,
        arg=llm_assignment,
        summary=build_activity_summary("LLM text", job_metadata, extras={"model": llm_assignment.llm_handle}),
        # task_queue, the timeouts, and retry_policy; None-valued keys are already dropped
        **dispatch_kwargs,
    )
```
A single worker process selects one named profile via pipelex worker --profile <name>. The profile bundles every Worker(...) constructor knob.
```toml
[temporal.worker_runtime_profiles]
default_profile = "balanced"

[temporal.worker_runtime_profiles.profiles.balanced]
tuning_mode = "explicit"
max_cached_workflows = 1000
max_concurrent_workflow_tasks = 100
max_concurrent_activities = 100
max_concurrent_local_activities = 100
max_concurrent_workflow_task_polls = 5
max_concurrent_activity_task_polls = 5
max_activities_per_second = 1000  # worker-local rate cap
sticky_queue_schedule_to_start_timeout = "0:00:10"
max_heartbeat_throttle_interval = "0:01:00"
default_heartbeat_throttle_interval = "0:00:30"
graceful_shutdown_timeout = "0:00:30"

# Router profile: workflow-only, no activities at all.
[temporal.worker_runtime_profiles.profiles.router]
tuning_mode = "explicit"
max_cached_workflows = 2000
max_concurrent_workflow_tasks = 200
max_concurrent_activities = 0  # 👈 ge=0 allows workflow-only workers
max_concurrent_local_activities = 0
max_concurrent_workflow_task_polls = 10
max_concurrent_activity_task_polls = 0
max_activities_per_second = 1
sticky_queue_schedule_to_start_timeout = "0:00:10"
max_heartbeat_throttle_interval = "0:01:00"
default_heartbeat_throttle_interval = "0:00:30"
graceful_shutdown_timeout = "0:00:30"

# GPU profile: small slot count, long heartbeats
[temporal.worker_runtime_profiles.profiles.gpu]
tuning_mode = "explicit"
max_cached_workflows = 100
max_concurrent_workflow_tasks = 10
max_concurrent_activities = 4  # GPU memory-bound
max_concurrent_local_activities = 0
max_concurrent_workflow_task_polls = 2
max_concurrent_activity_task_polls = 2
max_activities_per_second = 10
sticky_queue_schedule_to_start_timeout = "0:00:30"
max_heartbeat_throttle_interval = "0:05:00"
default_heartbeat_throttle_interval = "0:01:00"
graceful_shutdown_timeout = "0:01:00"
```
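Each profile is a flat set of Worker(...) keyword arguments, some of them written as duration strings. Below is a sketch of the translation into constructor kwargs. It assumes the "H:MM:SS" strings map to timedelta. It also assumes tuning_mode is a Pipelex-side switch rather than a Worker argument, since temporalio's Worker has no tuning_mode parameter. profile_to_worker_kwargs and parse_hms are hypothetical names.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any

# Profile keys whose TOML values are "H:MM:SS" strings (assumed format).
DURATION_KEYS = {
    "sticky_queue_schedule_to_start_timeout",
    "max_heartbeat_throttle_interval",
    "default_heartbeat_throttle_interval",
    "graceful_shutdown_timeout",
}


def parse_hms(value: str) -> timedelta:
    # "0:01:30" -> timedelta(seconds=90)
    hours, minutes, seconds = (int(part) for part in value.split(":"))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def profile_to_worker_kwargs(profile: dict[str, Any]) -> dict[str, Any]:
    kwargs: dict[str, Any] = {}
    for key, value in profile.items():
        if key == "tuning_mode":
            continue  # assumed Pipelex-side switch, not a Worker(...) argument
        kwargs[key] = parse_hms(value) if key in DURATION_KEYS else value
    return kwargs


gpu_profile = {
    "tuning_mode": "explicit",
    "max_concurrent_activities": 4,
    "max_heartbeat_throttle_interval": "0:05:00",
    "graceful_shutdown_timeout": "0:01:00",
}
print(profile_to_worker_kwargs(gpu_profile))
# Would then be splatted, e.g.:
#   Worker(client, task_queue="img-gen-gpu-queue", activities=[...], **profile_to_worker_kwargs(gpu_profile))
```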
Boot a specialized worker:
```shell
$ pipelex worker --task-queue img-gen-gpu-queue --profile gpu --scope runner-img-gen
$ pipelex worker --task-queue temporal_task_queue --profile router --scope router
$ pipelex worker --task-queue anthropic-queue --profile balanced --scope runner-llm
```
Specialized worker scopes (introduced in the same change): runner-llm, runner-img-gen, runner-extract, and runner-jinja2. Each scope registers only its own activities. act_render_page_views is registered under both runner-img-gen and runner-extract, as belt-and-suspenders for the two paths that need it.
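A scope is essentially a filter over the activity registry. The sketch below is illustrative only. It lists just the activity names that appear on this page, so runner-jinja2 is left out, and runner-llm may register more activities in practice. It assumes the router scope registers no activities, matching the workflow-only router profile. SCOPE_ACTIVITIES and the helper functions are hypothetical names.

```python
from __future__ import annotations

# Hypothetical scope table, limited to activity names that appear on this page.
SCOPE_ACTIVITIES: dict[str, frozenset[str]] = {
    "router": frozenset(),  # assumption: workflows only, like the router profile
    "runner-llm": frozenset({"act_llm_gen_text"}),
    "runner-img-gen": frozenset({"act_img_gen_images", "act_render_page_views"}),
    "runner-extract": frozenset({"act_extract_gen_extract_pages", "act_render_page_views"}),
}


def activities_for_scope(scope: str) -> frozenset[str]:
    # What a worker booted with --scope <scope> would register.
    try:
        return SCOPE_ACTIVITIES[scope]
    except KeyError:
        raise ValueError(f"unknown scope {scope!r}; known: {sorted(SCOPE_ACTIVITIES)}") from None


def scopes_registering(activity_name: str) -> list[str]:
    # Reverse lookup: which scopes can execute a given activity.
    return sorted(scope for scope, acts in SCOPE_ACTIVITIES.items() if activity_name in acts)


print(scopes_registering("act_render_page_views"))
# → ['runner-extract', 'runner-img-gen']
```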
pipelex worker --task-queue X now fails fast if X isn't declared anywhere in config: not as default_task_queue, not in any activity_queues entry, and not as a queue_options key. The error includes a Levenshtein "did you mean?" suggestion.
```text
$ pipelex worker --task-queue antrhopic-queue
WorkerTaskQueueUnknownError: task queue 'antrhopic-queue' is not declared in config.
Known queues: ['anthropic-queue', 'img-gen-gpu-queue', 'openai-queue',
               'temporal_task_queue', 'extract-default-queue', 'extract-azure-queue']
Did you mean: 'anthropic-queue'?
```
This is the strict counterpart to the lenient WARN emitted at config load for activity_queues entries that name unknown queues. Routing config may be loose in that one direction (warn), but a worker boots on exactly one queue, and that queue must exist (fail).
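The fast-fail check with a suggestion could look like the sketch below. It assumes a plain Levenshtein distance and a cutoff of 3 edits before a suggestion is offered. The real cutoff isn't stated here, and neither is the real error class's base type. The local WorkerTaskQueueUnknownError is a stand-in, and check_task_queue is a hypothetical name.

```python
from __future__ import annotations


class WorkerTaskQueueUnknownError(ValueError):
    """Stand-in for the error raised by pipelex worker at boot."""


def levenshtein(a: str, b: str) -> int:
    # Two-row dynamic programming edit distance (insert/delete/substitute = 1).
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            current.append(
                min(
                    previous[j] + 1,  # delete char_a
                    current[j - 1] + 1,  # insert char_b
                    previous[j - 1] + (char_a != char_b),  # substitute (free if equal)
                )
            )
        previous = current
    return previous[-1]


def check_task_queue(requested: str, known_queues: set[str], max_distance: int = 3) -> None:
    # Fail fast unless the queue is declared somewhere in config.
    if requested in known_queues:
        return
    ordered = sorted(known_queues)
    message = f"task queue {requested!r} is not declared in config.\nKnown queues: {ordered}"
    if ordered:
        best = min(ordered, key=lambda queue: levenshtein(requested, queue))
        if levenshtein(requested, best) <= max_distance:
            message += f"\nDid you mean: {best!r}?"
    raise WorkerTaskQueueUnknownError(message)


try:
    check_task_queue("antrhopic-queue", {"anthropic-queue", "openai-queue", "temporal_task_queue"})
except WorkerTaskQueueUnknownError as error:
    print(error)
```

The swapped "rh" costs two substitutions, so 'anthropic-queue' sits well inside the cutoff while every other known queue is much farther away.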
Flip a config flag to see exactly which layer each scalar came from on every activity call:
```toml
[temporal.temporal_config.temporal_log_config]
is_dispatch_resolution_traced = true
```

```text
INFO dispatch act_llm_gen_text routing_key=claude-4.6-opus
  task_queue=anthropic-queue (activity_queues.by_handle)
  start_to_close_timeout=0:15:00 (handle_options)
  heartbeat_timeout=0:01:00 (queue_options)
  retry.maximum_attempts=5 (queue_options)
  retry.non_retryable=['AnthropicAuthError','OAIBillingError'] (baseline + queue_extra)
```
Tracing is off by default because it is verbose. Turn it on when debugging mis-tuned timeouts or a queue that is receiving traffic it shouldn't.
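The trace amounts to a last-wins merge that also remembers which layer supplied each winning value. Below is a minimal sketch for scalar fields only; the additive no-retry list needs its own rule, as described for non_retryable_error_types. resolve_with_provenance is a hypothetical name, and the baseline values in the example, including retry.maximum_attempts=3, are invented.

```python
from __future__ import annotations

from typing import Any


def resolve_with_provenance(
    layers: list[tuple[str, dict[str, Any]]],
) -> dict[str, tuple[Any, str]]:
    # Layers are ordered lowest to highest precedence; a later layer that sets
    # a key (non-None) replaces both the value and the recorded source.
    resolved: dict[str, tuple[Any, str]] = {}
    for source, values in layers:
        for key, value in values.items():
            if value is not None:
                resolved[key] = (value, source)
    return resolved


layers = [
    ("baseline", {"start_to_close_timeout": "0:10:00", "retry.maximum_attempts": 3}),
    ("queue_options", {"start_to_close_timeout": "0:05:00", "heartbeat_timeout": "0:01:00", "retry.maximum_attempts": 5}),
    ("handle_options", {"start_to_close_timeout": "0:15:00", "heartbeat_timeout": None}),
]
for key, (value, source) in sorted(resolve_with_provenance(layers).items()):
    print(f"{key}={value} ({source})")
# heartbeat_timeout=0:01:00 (queue_options)
# retry.maximum_attempts=5 (queue_options)
# start_to_close_timeout=0:15:00 (handle_options)
```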
Two distinct budgets, set in distinct places:
- Workflow budget: workflow_execution_timeout on WorkerConfig bounds the workflow as a whole.
- Activity budget, layered per call:
  - default_activity_start_to_close_timeout on WorkerConfig (ships at 10 min) is the baseline budget for each activity call.
  - queue_options[<queue>].start_to_close_timeout overrides it per queue.
  - handle_options[<handle>].start_to_close_timeout overrides it for one specific handle when the queue baseline doesn't fit.

Each kind of activity ends up with a timeout shaped to its real cost: a jinja2 render fails fast, a slow PDF extract gets the headroom it actually needs, and the workflow's own deadline isn't spent by a single in-flight activity.
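A worked example shows why the two budgets must be sized together. Take the anthropic-queue retry policy above: 5 attempts, a 1 s initial interval, a coefficient of 2.0, and a 1 min cap. A call that times out on every attempt then occupies about attempts × start_to_close plus the backoff waits. The sketch assumes each attempt runs to its full start_to_close_timeout. It ignores time spent waiting in the queue (schedule-to-start) and early exits on non-retryable errors. worst_case_activity_time is a hypothetical helper.

```python
from __future__ import annotations

from datetime import timedelta


def worst_case_activity_time(
    start_to_close: timedelta,
    maximum_attempts: int,
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
) -> timedelta:
    # Every attempt uses its full start_to_close budget...
    total = start_to_close * maximum_attempts
    # ...plus the exponential backoff waits between attempts (none after the last).
    interval = initial_interval
    for _ in range(maximum_attempts - 1):
        total += min(interval, maximum_interval)
        interval = interval * backoff_coefficient
    return total


# anthropic-queue baseline (5 min per attempt) vs. the claude-4.6-opus override (15 min).
print(worst_case_activity_time(timedelta(minutes=5), 5, timedelta(seconds=1), 2.0, timedelta(minutes=1)))   # → 0:25:15
print(worst_case_activity_time(timedelta(minutes=15), 5, timedelta(seconds=1), 2.0, timedelta(minutes=1)))  # → 1:15:15
```

So a single Opus call under this policy can legitimately run for over an hour. workflow_execution_timeout has to leave room for that, plus everything else the workflow does, or the workflow deadline will end the call instead of the activity budget. If set, a schedule_to_close_timeout caps the whole retry sequence in Temporal.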
| Knob | Where | Semantics |
|---|---|---|
max_task_queue_activities_per_second | [temporal.queue_options.<queue>] | Cluster-wide cap on this queue. Conveyed to the Temporal server by every worker on the queue. Latest value wins. |
max_activities_per_second | [temporal.worker_runtime_profiles.profiles.<name>] | Worker-local cap on this process. Caps what one worker pulls regardless of cluster headroom. |
The shipping default [temporal.queue_options.temporal_task_queue] sets max_task_queue_activities_per_second = 1000. Deployments using non-default queue names should add their own [temporal.queue_options.<queue>] entries with the cap appropriate for that backend pool.
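The two caps compose as an upper bound, not a sum. A queue can't dispatch faster than its server-side cap, and it can't dispatch faster than all of its workers' local caps added together. Below is a sketch of that ceiling. It ignores slot limits such as max_concurrent_activities, which can bind first. effective_rate_ceiling is a hypothetical helper, and the worker counts are an invented deployment.

```python
from __future__ import annotations


def effective_rate_ceiling(queue_cap: float, worker_caps: list[float]) -> float:
    # The server-side queue cap and the combined worker-local caps must both allow a dispatch.
    return min(queue_cap, sum(worker_caps))


# img-gen-gpu-queue: queue cap 10/s, three "gpu"-profile workers at 10/s each.
print(effective_rate_ceiling(10, [10, 10, 10]))  # → 10
# anthropic-queue: queue cap 50/s, two "balanced"-profile workers at 1000/s each.
print(effective_rate_ceiling(50, [1000, 1000]))  # → 50
```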
Say someone calls a pipe whose router needs an Anthropic Opus completion. Here's the trace from submit to the wire:
1. content_generator.make_llm_text(...) runs with LLM handle "claude-4.6-opus".
2. worker_config.resolve_dispatch("act_llm_gen_text", routing_key="claude-4.6-opus", queue_options_by_queue=...)
3. resolve_queue → activity_queues["act_llm_gen_text"].by_handle["claude-4.6-opus"] = "anthropic-queue"
4. queue_options["anthropic-queue"] → start_to_close=5m, heartbeat=1m, retry: 5 attempts, +AnthropicAuthError
5. activity_queues["act_llm_gen_text"].handle_options["claude-4.6-opus"] → start_to_close=15m (longer context)
6. DispatchOptions(task_queue="anthropic-queue", start_to_close_timeout=15m, heartbeat_timeout=1m, retry_policy=<5 attempts, no-retry: [...]>)
7. workflow.execute_activity(act_llm_gen_text, **dispatch_kwargs, summary="LLM text · pipe=translate_doc · model=claude-4.6-opus")
8. A worker polling anthropic-queue with the balanced profile picks up the task.

Where the code lives:

- pipelex/temporal/config_temporal.py: WorkerConfig, QueueOptions, HandleOptions, ActivityRouteConfig, WorkerRuntimeProfile, DispatchOptions
- WorkerConfig.resolve_queue and WorkerConfig.resolve_dispatch, in the same file
- pipelex/temporal/tprl_content_generation/content_generator_in_workflow.py: every make_* method
- pipelex/pipelex.toml, under [temporal.worker_config], [temporal.queue_options.*], [temporal.worker_runtime_profiles.*]
- .pipelex/pipelex.toml ships the same blocks commented out