Temporal primitives · session note

Temporal queues & options — per-activity routing, per-queue tuning, worker-runtime profiles

The companion to id-and-naming.html. Where the ID work was about identity, this work is about knobs: which task queue an activity dispatches to, what timeouts and retry policy apply, how the worker process itself is tuned. Three independent config layers, each addressing a real operational pain.

1. The three things this layer lets you do

CAPABILITY #1

Route by activity and by handle

Send each activity (and optionally each handle within an activity) to a dedicated task queue. Isolate GPU-bound image-gen runners from cheap LLM workers, route a specific OCR backend to a dedicated queue, scale OpenAI vs Anthropic pools independently.

CAPABILITY #2

Tune dispatch per queue and per handle

Set start_to_close_timeout, retry policy, heartbeat, and rate caps per task queue. Override for a single handle when one model variant on a backend needs different tuning. The workflow's own deadline is no longer spent by a single in-flight activity.

CAPABILITY #3

Specialize worker processes

Named worker-runtime profiles bundle the Worker(...) tuning (slots, pollers, heartbeat throttles, graceful shutdown). One worker process picks one profile via pipelex worker --profile <name>, plus a --scope selecting which activities it registers.

2. The shape of the solution — three orthogonal layers

Layer A (where it goes): activity_queues
+
Layer B (how it's dispatched): queue_options, handle_options
+
Layer C (how the worker behaves): worker_runtime_profiles

Layer A — routing. Per activity, per handle, which task queue do we dispatch to? Submitter-side.
Layer B — options. For that resolved queue (and optionally that specific handle), what timeouts and retry policy apply? Submitter-side.
Layer C — worker. Inside one worker process, how do we tune slots/pollers/heartbeats? Worker-side, selected via --profile.

3. Layer A — per-activity, per-handle routing

The activity_queues table on WorkerConfig is the entry point. Each entry declares a default queue for one activity and an optional by_handle map keyed by runtime handle (LLM model handle, image-gen handle, extract handle).

# pipelex.toml
[temporal.worker_config]
default_task_queue = "temporal_task_queue"
default_activity_start_to_close_timeout = "0:10:00"

# Route LLM text by model family. Anthropic runs on a dedicated pool;
# everything else falls back to the activity default.
[temporal.worker_config.activity_queues.act_llm_gen_text]
default = "llm-default-queue"
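# Declared as a sub-table: TOML 1.0 inline tables must fit on one line
# and cannot carry a trailing comma.
[temporal.worker_config.activity_queues.act_llm_gen_text.by_handle]
"claude-4.6-sonnet" = "anthropic-queue"
"claude-4.6-opus"   = "anthropic-queue"
"gpt-5.1"           = "openai-queue"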

# Image generation on a GPU pool.
[temporal.worker_config.activity_queues.act_img_gen_images]
default = "img-gen-gpu-queue"

# Extract: route Azure Doc Intel separately from Mistral OCR.
[temporal.worker_config.activity_queues.act_extract_gen_extract_pages]
default = "extract-default-queue"
by_handle = { "azure-doc-intel" = "extract-azure-queue" }

The resolver: three steps, first match wins

WorkerConfig.resolve_queue(activity_name, routing_key) -> str | None

  1. activity_queues[activity_name].by_handle[routing_key]   # most specific
  2. activity_queues[activity_name].default                  # activity-level default
  3. self.default_task_queue                                 # worker-wide default
Hybrid fallback semantics. When activity_queues is completely empty (the default config, with no routing configured), resolve_queue returns None, and the dispatch path omits the task_queue kwarg so that Temporal routes the activity to the workflow's own queue. This preserves the pre-v1 single-queue default and the with_conditional_worker test pattern. As soon as any activity_queues entry exists, the operator has opted into a routing topology, and unmapped activities fall back explicitly to default_task_queue.
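The lookup order and the empty-config special case can be sketched as follows. This is a minimal illustration, not the actual WorkerConfig: the class names `ActivityQueueEntry` and `WorkerConfigSketch` are hypothetical, and the real model presumably carries more fields and validation.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ActivityQueueEntry:  # hypothetical stand-in for one activity_queues entry
    default: str
    by_handle: dict[str, str] = field(default_factory=dict)


@dataclass
class WorkerConfigSketch:  # hypothetical stand-in for WorkerConfig
    default_task_queue: str
    activity_queues: dict[str, ActivityQueueEntry] = field(default_factory=dict)

    def resolve_queue(self, activity_name: str, routing_key: str | None) -> str | None:
        # No routing topology at all: return None so the caller omits task_queue
        # and the activity rides the workflow's own queue.
        if not self.activity_queues:
            return None
        entry = self.activity_queues.get(activity_name)
        if entry is None:
            # Routing is configured, but not for this activity.
            return self.default_task_queue
        if routing_key is not None and routing_key in entry.by_handle:
            return entry.by_handle[routing_key]  # most specific match
        return entry.default  # activity-level default


config = WorkerConfigSketch(
    default_task_queue="temporal_task_queue",
    activity_queues={
        "act_llm_gen_text": ActivityQueueEntry(
            default="llm-default-queue",
            by_handle={"claude-4.6-opus": "anthropic-queue"},
        )
    },
)
```

With this config, a call for "claude-4.6-opus" lands on anthropic-queue, any other LLM handle on llm-default-queue, and an activity with no entry on temporal_task_queue.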

4. Layer B — per-queue and per-handle options

queue_options declares the submitter options (timeouts, retry policy, queue-level rate cap) for one task queue. handle_options (inside an activity_queues entry) declares the rare per-handle override on top of the queue's options.

# pipelex.toml — queue-level submitter options
[temporal.queue_options.temporal_task_queue]
start_to_close_timeout = "0:10:00"
heartbeat_timeout      = "0:01:00"
max_task_queue_activities_per_second = 1000

[temporal.queue_options.anthropic-queue]
start_to_close_timeout = "0:05:00"
heartbeat_timeout      = "0:01:00"
max_task_queue_activities_per_second = 50
[temporal.queue_options.anthropic-queue.retry_policy_config]
initial_interval     = "0:00:01"
backoff_coefficient  = 2.0
maximum_interval     = "0:01:00"
maximum_attempts     = 5
# IMPORTANT: overlay uses ``_extra`` (additive), not the baseline field name.
non_retryable_error_types_extra = ["AnthropicAuthError"]

[temporal.queue_options.img-gen-gpu-queue]
start_to_close_timeout = "0:30:00"     # GPU calls take longer
heartbeat_timeout      = "0:02:00"
max_task_queue_activities_per_second = 10

# Rare per-handle override for ONE long-context model on the anthropic queue
[temporal.worker_config.activity_queues.act_llm_gen_text.handle_options."claude-4.6-opus"]
start_to_close_timeout = "0:15:00"

Resolution — three layers, last-wins for scalars

1. baseline
   worker_config.default_activity_start_to_close_timeout
   worker_config.default_activity_heartbeat_timeout
   worker_config.retry_policy_config
   ↓ overlay
2. queue_options[resolved_queue]
   Per-queue timeouts, retry overlay, queue-level rate cap.
   ↓ overlay
3. activity_queues[activity].handle_options[handle]
   Per-handle timeout / retry override for a single model variant.
   ↓ result
DispatchOptions
   task_queue · start_to_close_timeout · schedule_to_close_timeout · schedule_to_start_timeout · heartbeat_timeout · retry_policy
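The last-wins overlay for scalars can be illustrated with a small sketch. It assumes each layer is a plain dict in which None means "not set at this layer"; the function name `overlay_scalars` and the per-value source label are hypothetical, and the baseline heartbeat value is left unset here only because the note does not state it.

```python
from __future__ import annotations

from datetime import timedelta
from typing import Any


def overlay_scalars(layers: list[tuple[str, dict[str, Any]]]) -> dict[str, tuple[Any, str]]:
    """Apply layers in order; a later non-None value replaces an earlier one.

    Returns each resolved value together with the name of the layer it came from.
    """
    resolved: dict[str, tuple[Any, str]] = {}
    for source, values in layers:
        for key, value in values.items():
            if value is not None:
                resolved[key] = (value, source)
    return resolved


resolved = overlay_scalars(
    [
        ("baseline", {"start_to_close_timeout": timedelta(minutes=10), "heartbeat_timeout": None}),
        ("queue_options", {"start_to_close_timeout": timedelta(minutes=5), "heartbeat_timeout": timedelta(minutes=1)}),
        ("handle_options", {"start_to_close_timeout": timedelta(minutes=15)}),
    ]
)
```

For the claude-4.6-opus example, start_to_close_timeout ends up at 15 minutes from handle_options, while heartbeat_timeout stays at the 1 minute set in queue_options.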

The additive rule for non_retryable_error_types

Scalars (timeouts, attempts, intervals) are last-wins. non_retryable_error_types is additive — every layer can only add to the no-retry list, never remove from it. The schema enforces this asymmetry by splitting the retry-policy model in two:

Layer | Class | Field
Baseline only | RetryPolicyConfig | non_retryable_error_types: list[str] (the main list)
Queue / handle overlay | RetryPolicyConfigOverlay | non_retryable_error_types_extra: list[str] (additive)

Pydantic's extra="forbid" rejects the wrong field on each layer, so a config like queue_options.foo.retry_policy_config.non_retryable_error_types = [...] fails at load time with a clear validation error — you can't accidentally bypass the additive rule.
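A minimal sketch of the additive composition, covering the merge only and not the schema validation. The function name `compose_non_retryable` is hypothetical, de-duplication and ordering are assumptions, and the baseline value ["OAIBillingError"] is inferred from the trace output in section 8 rather than from any config shown here.

```python
def compose_non_retryable(baseline: list[str], *extras: list[str]) -> list[str]:
    """Start from the baseline list and append each overlay's *_extra entries.

    Nothing is ever removed; duplicates are skipped (an assumption of this sketch).
    """
    composed = list(baseline)
    for extra in extras:
        for error_type in extra:
            if error_type not in composed:
                composed.append(error_type)
    return composed


composed = compose_non_retryable(
    ["OAIBillingError"],     # baseline non_retryable_error_types (inferred, see lead-in)
    ["AnthropicAuthError"],  # queue overlay: non_retryable_error_types_extra
    [],                      # handle overlay adds nothing in this example
)
```

Because an overlay can only contribute extras, no queue or handle layer can make a baseline non-retryable error retryable again.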

The resolver in code

WorkerConfig.resolve_dispatch(activity_name, routing_key, queue_options_by_queue, is_traced) -> DispatchOptions

from dataclasses import dataclass
from datetime import timedelta
from typing import Any

from temporalio.common import RetryPolicy


@dataclass
class DispatchOptions:
    task_queue: str | None                         # None ⇒ omit kwarg, ride workflow's queue
    start_to_close_timeout: timedelta
    retry_policy: RetryPolicy
    schedule_to_close_timeout: timedelta | None = None
    schedule_to_start_timeout: timedelta | None = None
    heartbeat_timeout: timedelta | None = None

    def to_execute_kwargs(self) -> dict[str, Any]:
        # Splat into workflow.execute_activity(...). Keys whose value is None are
        # omitted so the Temporal SDK's own defaults apply instead of receiving None.
        ...
queue_options without activity_queues — the asymmetry. If activity_queues is empty (no routing topology) but queue_options[default_task_queue] exists, the resolver still applies that queue's overlay. This lets single-queue deployments tune timeouts/retry/rate without opting into the routing model. Dispatch returns task_queue=None (activity rides the workflow's own queue) but the queue's options still flow through.
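One way the None-omitting behavior of to_execute_kwargs could be written is sketched below. This is an assumption about the implementation, not the actual code: `DispatchOptionsSketch` is a hypothetical copy of the dataclass, and retry_policy is typed as Any so the example runs without the Temporal SDK installed.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from datetime import timedelta
from typing import Any


@dataclass
class DispatchOptionsSketch:  # hypothetical mirror of DispatchOptions
    task_queue: str | None
    start_to_close_timeout: timedelta
    retry_policy: Any  # stands in for temporalio.common.RetryPolicy
    schedule_to_close_timeout: timedelta | None = None
    schedule_to_start_timeout: timedelta | None = None
    heartbeat_timeout: timedelta | None = None

    def to_execute_kwargs(self) -> dict[str, Any]:
        # Keep only the fields that were actually resolved to a value.
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }


# Single-queue deployment: no routing topology, so task_queue is None,
# but the queue's timeout still flows through.
single_queue = DispatchOptionsSketch(
    task_queue=None,
    start_to_close_timeout=timedelta(minutes=10),
    retry_policy={"maximum_attempts": 5},  # placeholder value for the sketch
)
kwargs = single_queue.to_execute_kwargs()
```

Here kwargs carries start_to_close_timeout and retry_policy but no task_queue key, which is what lets the activity ride the workflow's own queue.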

5. The call-site shape

Every workflow.execute_activity(...) in ContentGeneratorInWorkflow splats the resolver output. One uniform pattern across LLM, image-gen, jinja2, and extract calls:

async def make_llm_text(self, job_metadata, llm_setting_main, llm_prompt_for_text):
    # NOTE: llm_assignment is not defined in this excerpt; its construction
    # (presumably from llm_setting_main and llm_prompt_for_text) is elided.
    worker_config = get_config().temporal.worker_config
    dispatch_kwargs = worker_config.resolve_dispatch(
        activity_name=act_llm_gen_text.__name__,
        routing_key=llm_assignment.llm_handle,                             # for by_handle / handle_options
        queue_options_by_queue=get_config().temporal.queue_options,
        is_traced=get_config().temporal.temporal_config.temporal_log_config.is_dispatch_resolution_traced,
    ).to_execute_kwargs()
    return await workflow.execute_activity(
        act_llm_gen_text,
        arg=llm_assignment,
        summary=build_activity_summary("LLM text", job_metadata,
                                       extras={"model": llm_assignment.llm_handle}),
        **dispatch_kwargs,    # task_queue + start_to_close + schedule_to_close + heartbeat + retry_policy (None values omitted)
    )

6. Layer C — worker-runtime profiles

A single worker process selects one named profile via pipelex worker --profile <name>. The profile bundles every Worker(...) constructor knob.

[temporal.worker_runtime_profiles]
default_profile = "balanced"

[temporal.worker_runtime_profiles.profiles.balanced]
tuning_mode = "explicit"
max_cached_workflows               = 1000
max_concurrent_workflow_tasks      = 100
max_concurrent_activities          = 100
max_concurrent_local_activities    = 100
max_concurrent_workflow_task_polls = 5
max_concurrent_activity_task_polls = 5
max_activities_per_second          = 1000          # worker-local rate cap
sticky_queue_schedule_to_start_timeout = "0:00:10"
max_heartbeat_throttle_interval        = "0:01:00"
default_heartbeat_throttle_interval    = "0:00:30"
graceful_shutdown_timeout              = "0:00:30"

# Router profile — workflow-only, no activities at all.
[temporal.worker_runtime_profiles.profiles.router]
tuning_mode = "explicit"
max_cached_workflows               = 2000
max_concurrent_workflow_tasks      = 200
max_concurrent_activities          = 0             # 👈 ge=0 allows workflow-only workers
max_concurrent_local_activities    = 0
max_concurrent_workflow_task_polls = 10
max_concurrent_activity_task_polls = 0
max_activities_per_second          = 1
sticky_queue_schedule_to_start_timeout = "0:00:10"
max_heartbeat_throttle_interval        = "0:01:00"
default_heartbeat_throttle_interval    = "0:00:30"
graceful_shutdown_timeout              = "0:00:30"

# GPU profile — small slot count, long heartbeats
[temporal.worker_runtime_profiles.profiles.gpu]
tuning_mode = "explicit"
max_cached_workflows               = 100
max_concurrent_workflow_tasks      = 10
max_concurrent_activities          = 4             # GPU memory-bound
max_concurrent_local_activities    = 0
max_concurrent_workflow_task_polls = 2
max_concurrent_activity_task_polls = 2
max_activities_per_second          = 10
sticky_queue_schedule_to_start_timeout = "0:00:30"
max_heartbeat_throttle_interval        = "0:05:00"
default_heartbeat_throttle_interval    = "0:01:00"
graceful_shutdown_timeout              = "0:01:00"
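The profile keys appear to mirror keyword arguments of the Temporal Python SDK's Worker(...) constructor, so turning a profile into constructor kwargs is mostly a matter of parsing durations. The sketch below assumes durations are written as "H:MM:SS" and that tuning_mode is profile metadata rather than a constructor argument; `parse_duration` and `profile_to_worker_kwargs` are hypothetical names, and the real loader may rely on Pydantic's own timedelta parsing instead.

```python
from datetime import timedelta
from typing import Any

# Profile keys whose values are duration strings (taken from the profiles above).
DURATION_KEYS = {
    "sticky_queue_schedule_to_start_timeout",
    "max_heartbeat_throttle_interval",
    "default_heartbeat_throttle_interval",
    "graceful_shutdown_timeout",
}


def parse_duration(text: str) -> timedelta:
    """Parse an "H:MM:SS" string such as "0:05:00" (format assumed for this sketch)."""
    hours, minutes, seconds = (int(part) for part in text.split(":"))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def profile_to_worker_kwargs(profile: dict[str, Any]) -> dict[str, Any]:
    kwargs: dict[str, Any] = {}
    for key, value in profile.items():
        if key == "tuning_mode":
            continue  # treated as metadata here, not passed to Worker(...)
        kwargs[key] = parse_duration(value) if key in DURATION_KEYS else value
    return kwargs


gpu_profile = {
    "tuning_mode": "explicit",
    "max_cached_workflows": 100,
    "max_concurrent_workflow_tasks": 10,
    "max_concurrent_activities": 4,
    "max_concurrent_local_activities": 0,
    "max_concurrent_workflow_task_polls": 2,
    "max_concurrent_activity_task_polls": 2,
    "max_activities_per_second": 10,
    "sticky_queue_schedule_to_start_timeout": "0:00:30",
    "max_heartbeat_throttle_interval": "0:05:00",
    "default_heartbeat_throttle_interval": "0:01:00",
    "graceful_shutdown_timeout": "0:01:00",
}
worker_kwargs = profile_to_worker_kwargs(gpu_profile)
```

The resulting dict could then be splatted into the worker constructor alongside the client, task queue, and the workflows and activities selected by --scope.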

Boot a specialized worker:

$ pipelex worker --task-queue img-gen-gpu-queue --profile gpu --scope runner-img-gen
$ pipelex worker --task-queue temporal_task_queue   --profile router  --scope router
$ pipelex worker --task-queue anthropic-queue       --profile balanced --scope runner-llm

Scopes (specialized worker scopes from the same diff): runner-llm, runner-img-gen, runner-extract, runner-jinja2. Each scope registers only its activities. act_render_page_views is registered under both runner-img-gen and runner-extract (belt-and-suspenders for the two paths that need it).

7. Strict CLI validation

pipelex worker --task-queue X now fails fast if X isn't declared anywhere (default_task_queue, any activity_queues entry, or any queue_options key) and offers a Levenshtein-based "did you mean?" suggestion.

$ pipelex worker --task-queue antrhopic-queue
WorkerTaskQueueUnknownError: task queue 'antrhopic-queue' is not declared in config.
  Known queues: ['anthropic-queue', 'img-gen-gpu-queue', 'openai-queue',
                 'temporal_task_queue', 'extract-default-queue', 'extract-azure-queue']
  Did you mean: 'anthropic-queue'?

This is the strict counterpart to the lenient WARN at config-load on activity_queues entries that name unknown queues. Routing config can be sloppy in one direction (warn); the worker boot picks one queue and that one must exist (fail).
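The "did you mean?" lookup can be sketched with a plain Levenshtein distance. The function names and the max_distance threshold of 3 are assumptions for illustration; the actual CLI may use a different cutoff or tie-breaking rule.

```python
from __future__ import annotations


def levenshtein(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance (insert, delete, substitute)."""
    previous = list(range(len(b) + 1))
    for i, char_a in enumerate(a, start=1):
        current = [i]
        for j, char_b in enumerate(b, start=1):
            cost = 0 if char_a == char_b else 1
            current.append(
                min(
                    previous[j] + 1,         # deletion
                    current[j - 1] + 1,      # insertion
                    previous[j - 1] + cost,  # substitution or match
                )
            )
        previous = current
    return previous[-1]


def suggest_queue(name: str, known: list[str], max_distance: int = 3) -> str | None:
    """Return the closest known queue, or None if nothing is close enough."""
    if not known:
        return None
    best = min(known, key=lambda candidate: levenshtein(name, candidate))
    return best if levenshtein(name, best) <= max_distance else None


known_queues = [
    "anthropic-queue",
    "img-gen-gpu-queue",
    "openai-queue",
    "temporal_task_queue",
    "extract-default-queue",
    "extract-azure-queue",
]
suggestion = suggest_queue("antrhopic-queue", known_queues)
```

The transposed "rh" in the typo counts as two substitutions under plain Levenshtein, which is still well inside the threshold, so the sketch suggests anthropic-queue.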

8. Dispatch resolution tracing

Flip a config flag to see exactly which layer each scalar came from on every activity call:

[temporal.temporal_config.temporal_log_config]
is_dispatch_resolution_traced = true

INFO  dispatch  act_llm_gen_text  routing_key=claude-4.6-opus
        task_queue=anthropic-queue            (activity_queues.by_handle)
        start_to_close_timeout=0:15:00        (handle_options)
        heartbeat_timeout=0:01:00             (queue_options)
        retry.maximum_attempts=5              (queue_options)
        retry.non_retryable=['AnthropicAuthError','OAIBillingError']  (baseline + queue_extra)

Off by default — verbose; turn it on when debugging mis-tuned timeouts or a queue that shouldn't be receiving traffic.

9. Workflow timeout vs activity timeout — separated

Two distinct budgets, set in distinct places:

Each kind of activity ends up with a timeout shaped to its real cost — a jinja2 render fails fast, a slow PDF extract gets the headroom it actually needs, and the workflow's own deadline isn't spent by a single in-flight activity.

10. Rate caps live in two distinct places

Knob | Where | Semantics
max_task_queue_activities_per_second | [temporal.queue_options.<queue>] | Cluster-wide cap on this queue. Conveyed to the Temporal server by every worker on the queue. Latest value wins.
max_activities_per_second | [temporal.worker_runtime_profiles.profiles.<name>] | Worker-local cap on this process. Caps what one worker pulls regardless of cluster headroom.

The shipping default [temporal.queue_options.temporal_task_queue] sets max_task_queue_activities_per_second = 1000. Deployments using non-default queue names should add their own [temporal.queue_options.<queue>] entries with the cap appropriate for that backend pool.
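As a rough way to reason about how the two caps combine, the upper bound on a queue's activity start rate is the smaller of the queue-level cap and the sum of the worker-local caps of the processes polling it. The helper below is a back-of-the-envelope sketch with a hypothetical name; it ignores slot counts, poller limits, and actual backend latency.

```python
def effective_rate_ceiling(queue_cap: float, worker_caps: list[float]) -> float:
    """Upper bound on activities/second for one queue (a simplification)."""
    return min(queue_cap, sum(worker_caps))


# anthropic-queue: cluster-wide cap of 50, served by three "balanced" workers
# that each allow up to 1000 activities/second locally.
anthropic_ceiling = effective_rate_ceiling(50, [1000, 1000, 1000])

# A queue with a generous cluster cap but only two small workers is
# bounded by the workers instead.
small_pool_ceiling = effective_rate_ceiling(1000, [10, 10])
```

In the first case the queue-level cap is the binding limit (50 per second); in the second, the worker-local caps are (20 per second).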

11. The whole picture, in one example

Say someone calls a pipe whose router needs an Anthropic Opus completion. Here's the trace from submit to the wire:

  // inside WfPipeRouter: content_generator.make_llm_text(...)
  // llm_assignment.llm_handle resolves to "claude-4.6-opus"
  worker_config.resolve_dispatch("act_llm_gen_text", routing_key="claude-4.6-opus", queue_options_by_queue=...)
    // step 1: resolve_queue
    //   activity_queues["act_llm_gen_text"].by_handle["claude-4.6-opus"] = "anthropic-queue"
    // step 2: overlay queue_options["anthropic-queue"]
    //   start_to_close=5m, heartbeat=1m, retry: 5 attempts, +AnthropicAuthError
    // step 3: overlay activity_queues["act_llm_gen_text"].handle_options["claude-4.6-opus"]
    //   start_to_close=15m (longer context)
    // step 4: non_retryable composed additively: baseline + queue._extra + handle._extra
  DispatchOptions(task_queue="anthropic-queue", start_to_close_timeout=15m, heartbeat_timeout=1m, retry_policy=<5 attempts, no-retry: [...]>)
  workflow.execute_activity(act_llm_gen_text, **dispatch_kwargs, summary="LLM text · pipe=translate_doc · model=claude-4.6-opus")
  → Temporal server routes the task to a worker polling anthropic-queue with the balanced profile.

12. Where to look