A multi-step automation graph that an admin or an AI can build without recompiling. Drag-and-drop canvas with node-based steps: AI classification, skill, shell, HTTP, DB query, branch. Triggers: cron / channel / webhook / manual. State is file-based and resumable.
Four sides: trigger (when), action (what), env (config + secrets), data tables (state lookup). Success/failure notifications are explicit action nodes in the graph (there is no separate notify: field). Plus governance (approve, guard). The engine implementation uses the terms "graph" / "node" / "DAG", which is accurate at the technical level (see §6).
Trigger: cron, channel, webhook, manual, schedule_at, error (n8n-style)
Action: classify (AI), agent (skills inside), channel (outbound: reply/send/react), connector (external API), shell / python, http / db_query, datatable_* (wick internal), branch / parallel
Env: text / textarea, secret (encrypted), number / checkbox, dropdown / kvlist / picker
Data tables: the data_tables: field declares an alias, wick_data_table_rows table, wick_data_tables.schema (JSONB), datatable_* nodes in Action
Notifications: a type: channel or type: connector action node at the end of the graph. Success/failure branching uses on_failure: fallback plus a fallback node. The implicit reply-to-source thread for channel triggers still exists (engine convenience).
on_error: { trigger_workflow: "error-handler" } → on failure, the engine fires the "error-handler" workflow, which subscribes to a type: error trigger. The error handler receives a payload (source workflow, failed node, error message, state snapshot). Loop protection: max 3 nested error workflows. A good fit for centralized escalation and auto-retry patterns.
<BaseDir>/workflows/support-triage/
├─ workflow.yaml            # graph + triggers + env schema
├─ env.yaml                 # values (UI-managed, secrets encrypted)
├─ README.md
├─ nodes/                   # per-node assets (optional)
│  ├─ classify-intent.md    # prompt for the classify node
│  ├─ format-answer.md      # prompt for the agent node
│  └─ enrich-user.sql       # query for the db_query node
├─ __tests__/               # fixtures per node
│  └─ classify-intent.json
└─ runs/                    # state per run (auto-managed)
   └─ 01938...uuid/
      ├─ state.json         # snapshot
      ├─ events.jsonl       # append-only log
      └─ nodes/             # output cache per node
# Schema + data live in Postgres, managed via the UI tab:
TABLE wick_data_tables WHERE slug = 'processed-events'
  # columns: slug, name, schema JSONB,
  #          access JSONB, retention JSONB, ...
TABLE wick_data_table_rows WHERE table_slug = 'processed-events'
  # composite PK (table_slug, pk)
  # data JSONB validated at the app layer
Schema and data live in wick's Postgres. The UI Data Tables tab is the source of truth. A workflow links to a table via an alias in its data_tables: field. See §9.
Four concrete patterns. A workflow that fits none of them is scope creep.
Incoming #support message → classify as bug/question/feature → route to a connector / agent.
trigger: channel #support
  ↓
classify (AI): bug | question | feature | other
  ├─ bug      → connector: linear · create_issue → channel reply
  ├─ question → agent: lookup docs + reply → channel reply
  ├─ feature  → connector: airtable · append_row → channel reply
  └─ other    → agent: friendly bounce
Pager fires → fetch 3 sources in parallel → merge → AI analysis → escalate/heal.
trigger: webhook /hooks/pagerduty
  ↓
parallel:
  ├─ http: grafana dashboard
  ├─ shell: ssh prod "df -h"
  └─ db_query: recent errors
  ↓
merge
agent: analysis
  ↓
classify: needs_human | self_heal
  ├─ human     → connector: pagerduty · escalate
  └─ self_heal → shell: runbook.sh
Morning cron → fetch 3 sources in parallel → AI summary → post to Slack.
trigger: cron "0 8 * * *" Asia/Jakarta
  ↓
parallel:
  ├─ connector: github · list_issues (24h)
  ├─ connector: linear · list_issues (24h)
  └─ channel: slack · summarize #product-feedback
  ↓
merge
agent: build markdown digest
  ↓
channel: slack · send_message → #leadership
Inquiry → button → AI summary modal → submit ticket. One workflow, three triggers, three entry_node values. The engine never pauses; the stages chain via channel interaction events.
triggers:
- { type: channel, event: message, entry_node: post-button }
- { type: channel, event: action, entry_node: fetch-thread }
- { type: channel, event: submission, entry_node: create-ticket }
graph (3 stage chains):
STAGE 1 (message → button):
post-button (connector: chat.post_with_button) [run #1 ends]
STAGE 2 (action → AI + modal):
fetch-thread → summarize (agent + skill) → show-modal [run #2]
STAGE 3 (submission → ticket + reply):
create-ticket → confirm-reply [run #3]
Message → "is this a question?" → "what is the topic?" → "open/closed?" → the right handler.
trigger: channel chat #support
↓
classify-1: question | statement
└─ question →
classify-2: product-A | product-B | other
└─ product-A →
classify-3: how-to | bug-report
├─ how-to → agent skills=[product-A-docs] → reply
└─ bug-report → connector tracker.create + reply
Drag nodes from the palette, connect edges, click a node to edit it. Drawflow (vanilla JS, ~28KB) fits the wick stack (templ + tailwind + vanilla JS).
id: 0193e2b4-6c20-7a5f-...
name: Slack Support Triage
enabled: true
triggers:
  - id: trigger-channel-slack
    type: channel
    entry_node: classify-intent
    channel: slack
    target: "#support"
    match:
      mention_bot: true
graph:
  # entry/edges populated by canvas; graph.entry is a legacy
  # fallback only for hand-edited YAML without per-trigger entry_node.
  nodes:
    - id: classify-intent
      type: classify
      preset: classifier-cheap
      prompt: |
        Classify the following support message.
        Message: {{.Event.Payload.text}}
        Answer: bug | question | feature_request | other
      cases:
        bug: handle-bug
        question: handle-question
        feature_request: handle-feature
        default: silent-end
    - id: handle-bug
      type: skill
      skill: create-linear-ticket
      args:
        title: "{{.Event.Payload.text | truncate 80}}"
        project: SUPPORT
      next: reply-link
↑ Live Drawflow demo. Drag a node, or drag from one output to another input. (Read-only in this mockup; the production version is fully editable.)
Each node has its own schema. Output is stamped to {{.Node.<id>.<field>}}, and downstream nodes consume it via templates.
LLM classification → verdict enum. 6-layer reliability (see §5).
type: classify
provider: claude # optional
preset: classifier-cheap
prompt: |
Klasifikasi: {{.Event.Payload.text}}
output_cases: [bug, question]
# edges in graph.edges[]:
# - { from: , case: bug, to: handle-bug }
# - { from: , case: default, to: end }
Agent reasoning, skills inside (provider-specific bundle). Session: new / root / persistent.
type: agent
provider: claude # claude|codex|gemini
preset: support-responder
session: new # new|root|persistent
skills: [docs-search]
prompt: |
Q: {{.Event.Payload.text}}
max_turns: 5
Outbound action via the channel module (reply/send/react). Symmetric with the trigger.
type: channel
channel: slack
op: reply_thread
args:
channel: "{{.Event.Payload.channel_id}}"
thread: "{{.Event.Payload.thread}}"
text: "Tracked: {{.Node.t.html_url}}"
Invoke connector module op. Full reuse connectors/<name>/.
type: connector
module: github
op: create_issue
args:
repo: "acme/inbox"
title: "{{.Event.Payload.text}}"
# edge: { from: , to: reply-link }
Bash/sh exec. Output: .stdout, .exit_code, parse JSON optional.
type: shell
command: [bash, nodes/check.sh]
env:
  HOST: prod-1.abc.com
parse_output: json
timeout_sec: 30
HTTP request with header/body templates. SSRF allowlist enforced.
type: http
method: GET
url: https://api.github.com/...
headers:
Authorization: "Bearer {{.Secret.GH}}"
parse_response: json
retry: {max: 3}
Parameterized SQL only. Output: .rows, .row_count.
type: db_query
database: main
query: |
SELECT id, email FROM users
WHERE last_active > $1
args:
- "{{.Event.At}}"
Deterministic routing, no LLM. Boolean → if/else, string → switch.
type: branch
expr: '{{.Node.t.severity}}'
# edges:
# - { from: , case: critical, to: page-oncall }
# - { from: , case: high, to: notify-team }
# - { from: , case: default, to: silent }
Fan-out N branches, wait-for-all. Output keyed per branch.
type: parallel
branches:
- grafana-fetch
- db-fetch
- shell-check
# edge: { from: , to: merge-results }
jq / gotemplate / jsonpath. Reshapes data between nodes.
type: transform
engine: jq
input: "{{.Node.users.rows}}"
expression: "[.[] | .id]"
# edge: { from: , to: enrich }
venv per workflow, hash-validated. Output JSON parse optional.
type: python
script: nodes/process.py
requirements: requirements.txt
env:
DATA: "{{.Node.fetch.json}}"
parse_output: json
Explicit fan-in. Wait listed nodes, combine output.
type: merge
inputs: [a, b, c]
strategy: object
# edge: { from: , to: process }
"Is there a matching row?" One node does both the check and the branch. The shortest dedup pattern.
type: datatable_exists
table: events
where:
id: "{{.Event.Payload.event_id}}"
is_processed: true
# edges:
# - { from: , case: "true", to: skip }
# - { from: , case: "false", to: process }
Load 1 row by primary key. Branch via found / not_found.
type: datatable_get
table: users
key:
id: "{{.Event.Payload.user}}"
# edges:
# - { from: , case: found, to: enrich } # .row avail
# - { from: , case: not_found, to: create }
Multi-row search. Filter, sort, paginate.
type: datatable_query
table: tickets
where: { status: pending }
order_by:
- { column: created_at, direction: desc }
limit: 50
Plain insert or insert-or-update (idempotent).
type: datatable_upsert
table: events
key: [id]
row:
id: "{{.Event.Payload.event_id}}"
status: received
is_processed: false
Terminator. Optional result as final workflow output.
type: end
result: "{{.Node.summary.text}}"
LLMs are not deterministic: the output may be "Bug.", "I think this is bug", or "bug_report". Wick handles this with six layers of defense. The defaults are already aggressive (layers 1-3 active); the rest are opt-in.
| # | Layer | What it does | Default |
|---|---|---|---|
| 1 | structured_output | The AI is prompted to return the JSON schema {verdict, confidence, reasoning}. CLI output is parsed and validated. Provider abstraction per implementation (see §5.1, CLI vs API SDK). | ON |
| 2 | normalize | Lowercase, trim, strip ./,/"/'. "Bug." → "bug". | ON |
| 3 | exact match | Check the cases keys. Found? Route. Not found? Continue to layer 4. | ON |
| 4 | fuzzy_match | Levenshtein distance < 3 to the nearest key, e.g. "kestion" → "question" (distance 2). "bug_report" → "bug" is distance 7, so it would need prefix or token matching rather than this threshold. | opt-in |
| 5 | retry_on_mismatch | Output does not match? Re-prompt with a stricter message ("ANSWER WITH ONLY one of: ..."). Retry N times. | 1× |
| 6 | confidence_threshold | The AI returns {verdict, confidence}. If confidence < threshold → fire the default case + audit warning. | 0.0 |
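Layers 2-4 can be sketched in a few lines of Go. This is an illustrative sketch only, not wick's implementation: the function names normalize, levenshtein, and resolve are hypothetical, and the distance is computed byte-wise, which is adequate for ASCII case keys.

```go
package main

import (
	"fmt"
	"strings"
)

// normalize lowercases, trims whitespace, and strips surrounding
// punctuation (layer 2).
func normalize(s string) string {
	return strings.Trim(strings.ToLower(strings.TrimSpace(s)), ".,\"' ")
}

func min3(a, b, c int) int {
	if b < a {
		a = b
	}
	if c < a {
		a = c
	}
	return a
}

// levenshtein returns the edit distance between a and b (byte-wise).
func levenshtein(a, b string) int {
	prev := make([]int, len(b)+1)
	for j := range prev {
		prev[j] = j
	}
	for i := 1; i <= len(a); i++ {
		cur := make([]int, len(b)+1)
		cur[0] = i
		for j := 1; j <= len(b); j++ {
			cost := 1
			if a[i-1] == b[j-1] {
				cost = 0
			}
			cur[j] = min3(prev[j]+1, cur[j-1]+1, prev[j-1]+cost)
		}
		prev = cur
	}
	return prev[len(b)]
}

// resolve applies normalize, exact match, then optional fuzzy match.
// It returns the matched case key, or "" when nothing matches.
func resolve(raw string, cases []string, fuzzy bool) string {
	v := normalize(raw)
	for _, c := range cases {
		if c == v {
			return c // layer 3: exact match
		}
	}
	if !fuzzy {
		return ""
	}
	best, bestDist := "", 3 // layer 4: accept only distance < 3
	for _, c := range cases {
		if d := levenshtein(v, c); d < bestDist {
			best, bestDist = c, d
		}
	}
	return best
}

func main() {
	cases := []string{"bug", "question", "feature_request", "other"}
	for _, raw := range []string{"Bug.", "kestion", "bug_report"} {
		fmt.Printf("%q -> %q\n", raw, resolve(raw, cases, true))
	}
}
```

An empty result would be the signal to move on to layer 5 (retry) or the default case.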
The engine derives the JSON schema from the cases keys. The prompt builder sends the schema to the CLI as an instruction, and the parser validates stdout:
{
"name": "classify_response",
"input_schema": {
"type": "object",
"properties": {
"verdict": {
"type": "string",
"enum": ["bug", "question",
"feature_request", "other"]
},
"confidence": {"type": "number"},
"reasoning": {"type": "string"}
},
"required": ["verdict"]
}
}
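A minimal sketch of that derivation, assuming cases is a map from verdict to next node. The function name classifySchema is hypothetical, and excluding the default key from the enum is an assumption: the source does not say how default maps onto the verdict list.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// classifySchema builds the classify_response schema from a node's cases keys.
// Assumption: "default" is a routing fallback, not a verdict, so it is skipped.
func classifySchema(cases map[string]string) map[string]any {
	verdicts := make([]string, 0, len(cases))
	for k := range cases {
		if k != "default" {
			verdicts = append(verdicts, k)
		}
	}
	sort.Strings(verdicts) // map iteration is random; keep the schema stable

	return map[string]any{
		"name": "classify_response",
		"input_schema": map[string]any{
			"type": "object",
			"properties": map[string]any{
				"verdict":    map[string]any{"type": "string", "enum": verdicts},
				"confidence": map[string]any{"type": "number"},
				"reasoning":  map[string]any{"type": "string"},
			},
			"required": []string{"verdict"},
		},
	}
}

func main() {
	cases := map[string]string{
		"bug":             "handle-bug",
		"question":        "handle-question",
		"feature_request": "handle-feature",
		"default":         "silent-end",
	}
	out, err := json.MarshalIndent(classifySchema(cases), "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```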
Embedding sample input → output pairs in the prompt raises reliability significantly:
examples:
  - input: "there is a bug in the chat widget"
    output: bug
  - input: "how do I connect?"
    output: question
  - input: "please add dark mode"
    output: feature_request
If every layer fails: the default case fires, with an audit log warning. A missing default in cases is a config bug; the validator rejects it at parse time.
Wick agents run as subprocesses via internal/agents/provider/: claude, codex, and gemini are CLI binaries, not API SDK calls. This difference matters for the reliability strategy.
| | API SDK (n8n / direct integration) | CLI subprocess (wick) |
|---|---|---|
| Schema enforcement | tool_use / response_format in the API call (provider-level) | Prompt-based + parse stdout (application-level) |
| Output channel | JSON response body | stdout (text, or --output-format json if the CLI supports it) |
| Latency | ~200-1000ms | ~500-2000ms (process spawn overhead) |
| Tool ecosystem | per node config | inherited from the agent session (file/bash/MCP), built-in |
| Cost predictability | high (direct token count) | indirect (parse the CLI usage report) |
| Streaming | server-sent events | stdin/stdout stream |
type Provider interface {
    Name() string
    // ... existing agent methods

    // New for classify-style nodes:
    StructuredCall(ctx, prompt, schema, opts) (StructuredResult, error)
}

type StructuredResult struct {
    Raw    string         // CLI stdout
    Parsed map[string]any // JSON-parsed
    OK     bool           // parse + validate
    Error  string
    Usage  Usage          // tokens, cost
}
claude: claude --output-format json --print <prompt> plus a system prompt that asks for the JSON shape. Parse stdout.
codex: check CLI capabilities; probably prompt-only with a parse fallback.
gemini: check CLI capabilities; probably prompt-only with a parse fallback.
CLI does not support a strict JSON mode → fall back to prompt-only plus aggressive layers 4-5 (fuzzy + retry).
The Capabilities() interface declares structured-mode support, so the engine picks the optimal path. Layers 4-5 stay active as a safety net.
agent complex. A good fit for wick's domain: AI orchestration, not a high-throughput API gateway.
Users need to know why the AI chose a given verdict. confidence + reasoning must be surfaced in the UI run timeline and on the audit page.
{
"verdict": "bug",
"confidence": 0.92,
"reasoning": "mentions production error and widget breakage"
}
confidence + reasoning can be opted out via output_fields: [verdict] when minimal output is preferred.
Reasoning expands on click. The audit page is indexed by verdict + confidence range to trace patterns.
Classify call = CLI subprocess + LLM tokens. A workflow on a busy channel can blow up costs. Three-layer defense:
Per workflow, existing §14 Security. Hitting the cap triggers queue overflow per the workflow config.
MAX_DAILY_CLASSIFY_TOKENS: env field (widget: number, default 0 = unlimited). The engine tracks a running sum per UTC day. On hitting the cap it rejects classify and fires the default case if one exists.
cache_ttl_sec: per-node field. Hashes Event.Text + the prompt template and caches the verdict. Saves N calls for repeated identical messages. Resets when the prompt changes.
# workflow.yaml: apply cost cap
env:
  - name: MAX_DAILY_CLASSIFY_TOKENS
    widget: number
    default: 100000        # cap budget for spam protection
graph:
  nodes:
    - id: classify-intent
      type: classify
      cache_ttl_sec: 3600  # reuse classification for 1h
      prompt: |
        ...
      cases:
        ...
        default: silent-end  # fires when the budget is exceeded
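The daily cap and the cache key could look roughly like the sketch below. The Budget type, its methods, and cacheKey are hypothetical names for illustration; SHA-256 is an assumed hash choice, and the sketch is not safe for concurrent use without a mutex.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"time"
)

// Budget tracks classify tokens per UTC day. Limit 0 means unlimited,
// mirroring the MAX_DAILY_CLASSIFY_TOKENS default.
type Budget struct {
	Limit int
	day   string
	used  int
}

// AllowOn records tokens against the given day (YYYY-MM-DD) and reports
// whether they fit. A new day resets the running sum.
func (b *Budget) AllowOn(day string, tokens int) bool {
	if day != b.day {
		b.day, b.used = day, 0
	}
	if b.Limit > 0 && b.used+tokens > b.Limit {
		return false // caller should fire the default case
	}
	b.used += tokens
	return true
}

// Allow is AllowOn keyed by the UTC date of now.
func (b *Budget) Allow(now time.Time, tokens int) bool {
	return b.AllowOn(now.UTC().Format("2006-01-02"), tokens)
}

// cacheKey hashes the prompt template together with the event text, so
// editing the prompt yields new keys and old verdicts are never reused.
func cacheKey(eventText, promptTemplate string) string {
	h := sha256.Sum256([]byte(promptTemplate + "\x00" + eventText))
	return hex.EncodeToString(h[:])
}

func main() {
	b := &Budget{Limit: 100000}
	fmt.Println(b.Allow(time.Now(), 245))
	fmt.Println(cacheKey("checkout is broken", "Classify: {{.Event.Payload.text}}"))
}
```

Because the prompt template is part of the hash input, "reset on prompt change" needs no explicit invalidation step in this design.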
Skill = a capability bundle specific to each provider (Claude Code skill, Codex skill, Gemini skill). Wick queries the provider for the list, and the AI/UI shows it as a checklist in the agent node Inspector.
type Provider interface {
    Name() string
    Capabilities() Capabilities
    StructuredCall(...) (...)

    // NEW: skill catalog discovery
    ListSkills(ctx) ([]Skill, error)
}

type Skill struct {
    Name        string // "weekly-sync"
    Description string
    InputSchema map[string]any
    Source      string // builtin | bundle
}
claude: claude --list-skills --output-format json, or parse the ~/.claude/skills/ directory. Cached in status_cache.
codex: check CLI capability at implementation time; fall back to an empty list if unsupported.
gemini: check CLI capability at implementation time.
provider: claude on an agent node → the UI calls Provider.ListSkills() (cached) → a checklist appears → the user picks → the selection is written to skills: [...]. The workflow-load validator rejects a skill that does not match the provider (e.g. a Claude skill paired with the Gemini provider).
CLI spawn costs ~500ms-1s of overhead. A workflow with 5 agent nodes pays 5× spawn if every node starts fresh. The per-node session: field controls this behavior.
| Mode | Behavior | Use case |
|---|---|---|
| new (default) | Fresh subprocess per node, no shared context | Isolation, deterministic, parallel-safe |
| root | Shares a single subprocess spawned at the start of the workflow run; all root-session nodes go through the same process. Sequential within a run. | Multi-turn reasoning across nodes, faster (1× spawn) |
| persistent | Subprocess persists across workflow runs; session ID = workflow slug. Context is inherited from the previous run | Long-running assistant pattern, learn-from-history |
- id: classify-intent
  type: classify
  session: new          # default, classify is independent
- id: deep-analysis
  type: agent
  session: root         # shares context with other root nodes
  ...
- id: ongoing-monitor
  type: agent
  session: persistent   # carries context from the previous run
  ...
state.json map: {sessions: {root: <proc_id>, persistent: <proc_id>}}
session: root run)
new + root sessions, detach persistent
current, root re-spawn (loses intermediate context from root mode; known trade-off, documented)
classify = new (independent), agent = new (isolation safe). Opt in to root when shared context is needed; persistent is always explicit.
branch: switch & if/else
Deterministic routing without an LLM. Same node, two patterns. Reach for this first whenever the rule can be encoded exactly: 1ms latency, no cost.
- id: check-priority
  type: branch
  expr: '{{.Node.t.priority}} == "p0"'
  cases:
    "true": page-oncall
    "false": queue-normal
- id: route-severity
  type: branch
  expr: '{{.Node.t.severity}}'
  cases:
    critical: page-oncall
    high: notify-team
    medium: queue-normal
    default: queue-low
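Both patterns reduce to the same lookup once expr has been evaluated to a string. The sketch below assumes that evaluation has already happened; the route function is a hypothetical name, and treating a missing default as an error is an assumption rather than documented engine behavior.

```go
package main

import "fmt"

// route picks the next node for an evaluated branch expression.
// Boolean results arrive as "true"/"false"; any other string acts as a
// switch key. Unknown values fall back to "default" when it is declared.
func route(result string, cases map[string]string) (string, error) {
	if next, ok := cases[result]; ok {
		return next, nil
	}
	if next, ok := cases["default"]; ok {
		return next, nil
	}
	return "", fmt.Errorf("branch: no case for %q and no default", result)
}

func main() {
	severity := map[string]string{
		"critical": "page-oncall",
		"high":     "notify-team",
		"medium":   "queue-normal",
		"default":  "queue-low",
	}
	for _, v := range []string{"critical", "low"} {
		next, err := route(v, severity)
		fmt.Println(v, "->", next, err)
	}
}
```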
| | branch | classify |
|---|---|---|
| Latency | < 1ms | 200-2000ms (LLM call) |
| Cost | free | per-token |
| Determinism | 100% | 95%+ (with 6 layers) |
| Input | structured / typed | natural language |
| When | clear rules | ambiguous text |
Channels (slack/telegram/rest/whatsapp/email) are not hardcoded into workflows. Workflows consume them through a self-describing registry: each channel registers its trigger specs and action specs, and workflows auto-discover them.
type Channel interface {
    Name() string
    IsConfigured() bool

    // Inbound: trigger schema
    TriggerSpecs() []TriggerSpec

    // Outbound: invoked via workflow node type: channel
    Actions() []ActionSpec
    Send(ctx, action, args) (any, error)

    // Event push to router
    Subscribe(handler EventHandler)
}
type: channel (symmetric)
Channel Actions() are accessed directly via a type: channel node. They are not auto-promoted to the skill registry: channels are their own domain (agent conversation), and skill is reserved for Claude Code skill bundles.
Workflow usage: type: channel + channel: slack + op: reply_thread. The AI discovers them via the workflow_channels() MCP op.
type: connector + module: github + op: create_issue. Reuses 100% of the existing connector pattern.
triggers:
  - type: channel
    channel: slack
    event: message
    match_enabled: true   # gate; false = dump-all
    match:
      mode: whitelist
      channel_id: '[{"id":"C123","name":"general"}]'
      text_contains: "support"
Form rendered from EventDescriptor.MatchSchema — each event declares a wick-tagged Go struct via entity.StructToConfigs. Picker chips serialize as [{id,name}] JSON; router does id-membership at dispatch time.
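The id-membership check at dispatch time can be sketched as follows. The chip type and matchesPicker function are hypothetical names; only the [{id,name}] wire shape comes from the text above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chip is one picker entry as serialized by the form.
type chip struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// matchesPicker reports whether id appears in a picker value serialized as
// a JSON array of {id,name} chips. Only id is compared; name is display-only.
func matchesPicker(pickerJSON, id string) (bool, error) {
	var chips []chip
	if err := json.Unmarshal([]byte(pickerJSON), &chips); err != nil {
		return false, fmt.Errorf("picker value is not a chip array: %w", err)
	}
	for _, c := range chips {
		if c.ID == id {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	match := `[{"id":"C123","name":"general"}]`
	ok, err := matchesPicker(match, "C123")
	fmt.Println(ok, err)
}
```

Matching on id rather than name means a renamed Slack channel keeps matching without editing the trigger.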
- id: reply-with-link
  type: channel            # same module as trigger
  channel: slack
  op: reply_thread         # one of channel.Actions()
  args:
    channel: "{{.Event.Payload.channel_id}}"
    thread: "{{.Event.Payload.thread}}"
    text: "Tracked: {{.Node.ticket.url}}"
UI Inspector: pick a channel from the dropdown → the op dropdown auto-populates from Channel.Actions(). The AI discovers them via workflow_channels().
Implicit reply: when the graph has no type: channel action to the source thread, the engine injects a synthetic node at the end of the flow that sends {{.Run.final_result}} to the originating thread. Override via reply_source: false in the trigger spec.
New channel: add internal/agents/channels/<name>/, implement the Channel interface (TriggerSpecs + Actions + Send), and register it in setup.Compose. The trigger router and action dispatcher pick it up automatically. The workflow engine and UI need no changes.
Workflows need config: Slack channel target, GitHub PAT, retry, feature flags. The schema lives in workflow.yaml (developer contract, version-controlled). Values live in <slug>/env.yaml (UI-managed, secrets stored encrypted with the wick_enc_ prefix).
Reuses the existing wick:"..." config-tag vocabulary (docs/reference/config-tags.md): same widget + modifier names, same form renderer, same secret/kvlist/picker UI behavior. The only difference: Go module schemas live in struct tags, workflow schemas live in YAML.
env:
  - name: SLACK_CHANNEL
    widget: text
    desc: "Where to post notifications"
    default: "#support"
  - name: GITHUB_PAT
    widget: secret          # encrypted; UI shows ••• when set
    desc: "GitHub PAT"
    required: true
  - name: MAX_DAILY_RUNS
    widget: number          # auto for int/float
    default: 100
  - name: ESCALATION_MODE
    widget: dropdown
    options: [pager, slack, email]
    default: slack
  - name: GUARD_PROMPT_EXTRA
    widget: textarea
  - name: ENABLE_AUTO_TRIAGE
    widget: checkbox        # auto for bool
    default: true
  - name: ALLOWED_CHANNELS  # multi-row table
    widget: kvlist
    columns: [id, name]
  - name: ALLOWED_USERS     # typeahead from Slack
    widget: picker
    source: slack.users
    visible_when: ENABLE_AUTO_TRIAGE:true
  - name: GITHUB_WEBHOOK_URL
    widget: url
  - name: NOTIFY_EMAIL
    widget: email
# The schema in workflow.yaml is authoritative.
# Storage is identical to the configs.value column.
SLACK_CHANNEL: "#support-prod"
GITHUB_PAT: wick_enc_aGVsbG8gd29ybGQ=
MAX_DAILY_RUNS: 500
ESCALATION_MODE: pager
GUARD_PROMPT_EXTRA: |
  Reject if notifying #leadership without approval.
ENABLE_AUTO_TRIAGE: true
ALLOWED_CHANNELS:          # kvlist = JSON array
  - { id: "C123", name: "#support" }
  - { id: "C456", name: "#support-prod" }
ALLOWED_USERS:             # picker, same shape
  - { id: "U100", name: "Yoga" }
GITHUB_WEBHOOK_URL: "https://hooks.example.com/gh"
NOTIFY_EMAIL: "alerts@abc.com"
- type: skill
  skill: slack.send_dm
  args:
    channel: "{{.Env.SLACK_CHANNEL}}"
- type: http
  headers:
    Authorization: "Bearer {{.Secret.GITHUB_PAT}}"
{{.Env.X}} = non-secret. {{.Secret.X}} = encrypted, auto-decrypted at runtime. The engine rejects mixing: a secret cannot be rendered via .Env (this prevents log leaks).
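One way to get that separation is to split values by widget type before rendering, so a secret simply does not exist under .Env. The sketch assumes the engine renders with Go's text/template, which the {{.Env.X}} syntax suggests but the source does not confirm; field, buildContext, and render are hypothetical names, and decryption is assumed to have happened upstream.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// field mirrors one env schema entry; only the parts needed here.
type field struct {
	Name   string
	Widget string // "secret" marks encrypted values
}

// buildContext splits values into .Env and .Secret by widget type.
// Values are assumed to be already decrypted by the caller.
func buildContext(schema []field, values map[string]string) map[string]any {
	env, secret := map[string]string{}, map[string]string{}
	for _, f := range schema {
		if f.Widget == "secret" {
			secret[f.Name] = values[f.Name]
		} else {
			env[f.Name] = values[f.Name]
		}
	}
	return map[string]any{"Env": env, "Secret": secret}
}

// render fails on any missing key, so {{.Env.GITHUB_PAT}} is an error
// instead of an empty string or a leaked secret.
func render(tmpl string, ctx map[string]any) (string, error) {
	t, err := template.New("node").Option("missingkey=error").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, ctx); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	schema := []field{
		{Name: "SLACK_CHANNEL", Widget: "text"},
		{Name: "GITHUB_PAT", Widget: "secret"},
	}
	ctx := buildContext(schema, map[string]string{
		"SLACK_CHANNEL": "#support",
		"GITHUB_PAT":    "example-token",
	})
	fmt.Println(render("{{.Env.SLACK_CHANNEL}}", ctx))
	fmt.Println(render("{{.Env.GITHUB_PAT}}", ctx))
}
```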
Widgets (see config-tags.md)
| Widget | YAML literal | UI form |
|---|---|---|
text (default) | string | single-line input |
textarea | string | multi-line textarea |
secret | string (encrypted on disk) | password input + Reveal/Rotate |
number | int/float | number input |
checkbox | bool | toggle |
dropdown | string | select (needs options:) |
email / url | string | HTML type="email"/"url" |
color / date / datetime | string | native HTML pickers |
kvlist | JSON array of objects | editable inline table (needs columns:) |
picker | JSON array [{id,name}] | searchable typeahead (needs source: registered LookupProvider) |
Modifiers (see config-tags.md)
| YAML key | Effect |
|---|---|
| desc | help text below field |
| default | seed value when unset |
| required: true | blocks save when empty; c.Missing() flag |
| locked: true | read-only, set once at boot |
| regen: true | regenerate button (needs a registered generator) |
| hidden: true | seeded in the DB but not shown in the form |
| visible_when: field:value | conditional show: visible only when the other field == value |
Same components as the existing configs page: reuse, no new renderer. Auto-save 800ms after the last keystroke.
workflow_get_env_schema(slug)
  → [{name, type, default, description, required}]
workflow_get_env_values(slug, reveal_secrets=false)
  → {SLACK_CHANNEL: "#support", GITHUB_PAT: "wick_enc_..."}
  reveal_secrets=true → requires admin token
workflow_set_env_values(slug, values)
  → atomic write to env.yaml, secrets auto-encrypted
workflow_set_env_values, the result is stored encrypted. Future reads return ciphertext by default.
A workflow without an env: field needs no config. The Settings tab shows "No config required".
Workflows need state lookups beyond a single run: "has this event been handled?", "did the user opt out?", "is the cached enrichment still fresh?". Data Tables is a standalone tool (internal/tools/data-tables/): schema-aware, MCP-discoverable, with a built-in UI table view (n8n-style). It sits alongside Workflows / Workspaces / Presets / Providers / Channels.
"Has event_id X already been handled?" Query by primary key
tickets pending escalation, users opted-out, pending approvals
enriched data with a TTL, avoids re-fetching from an external API
arbitrary records (e.g. "tickets from May"), beyond JobRun
TABLE wick_data_tables (
  slug        TEXT PRIMARY KEY,
  name        TEXT NOT NULL,
  description TEXT,
  schema      JSONB NOT NULL,   -- columns + indexes
  access      JSONB NOT NULL,   -- allowlist + flags
  retention   JSONB,            -- ttl_days, by_column
  created_by  TEXT,
  created_at  TIMESTAMPTZ,
  updated_at  TIMESTAMPTZ
);
-- Optional audit (opt-in via access.audit_changes):
TABLE wick_data_table_audit ( id, table_slug, op, actor, diff, at );
wick database (existing, same as configs / jobs / sessions / connectors)
TABLE wick_data_table_rows (
  table_slug TEXT NOT NULL,
  pk         TEXT NOT NULL,
  data       JSONB NOT NULL,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now(),
  PRIMARY KEY (table_slug, pk)
);
-- Partial functional index per table:
CREATE INDEX idx_events_status
  ON wick_data_table_rows ((data->>'status'))
  WHERE table_slug = 'events';
No dataset.yaml, no git history. The UI is the source of truth (like n8n). The schema lives in wick_data_tables.schema JSONB. Data lives in a shared Postgres table (wick_data_table_rows) with composite PK (table_slug, pk). JSONB is validated at the app layer. Indexes are partial functional, scoped per table. Change auditing is opt-in.
Schema = JSONB in wick_data_tables.schema. A migration is a single UPDATE plus an optional data migration job when rows need to be touched.
# wick_data_tables.schema (JSONB)
{
  "columns": [
    { "name": "event_id", "type": "string", "primary_key": true },
    { "name": "status", "type": "enum", "options": ["received", "done"] },
    { "name": "priority", "type": "enum", "options": ["low", "med", "high"] }
  ],
  "strictness": "strict"
}
# Audit row (opt-in):
INSERT INTO wick_data_table_audit (op, actor, diff, ...) VALUES ('schema_change', ...);
| Op | Data touch? |
|---|---|
| Add column | ✗ default applies |
| Drop column (soft) | ✗ key stays in JSONB |
| Drop column (hard) | ✓ batch UPDATE strip key |
| Rename column | ✓ batch UPDATE rename key |
| Change type | maybe, if lossy |
| Add index | ✗ CONCURRENTLY |
| Drop index | ✗ |
A schema change is an atomic UPDATE of wick_data_tables.schema plus a validator update. There is no CREATE TABLE or ALTER TABLE per data table. 100 tables = 1 rows table.
The composite PK is the natural index for datatable_get/datatable_exists by primary key. Sub-ms lookup.
There are no native column CHECK constraints (validation is app-layer). For a type-strict B-tree index, create a partial functional index as needed.
# wick_data_tables row
slug: processed-events
name: "Processed channel events"
schema:
  columns:
    - { name: event_id, type: string, primary_key: true }
    - { name: workflow_slug, type: string, indexed: true }
    - { name: handled_at, type: timestamp, indexed: true }
    - { name: result, type: json }
  indexes:
    - [workflow_slug, handled_at]
  strictness: strict
retention:
  ttl_days: 90
  by_column: handled_at
access:
  workflows: [support-triage]
  ui_editable: true
  mcp_writable: true
  audit_changes: false
Use datatable_exists: this used to take a query + branch (2 nodes); now 1 node is enough.
data_tables:
  - name: events          # alias
    ref: events
    mode: read_write
graph:
  entry: check-handled
  nodes:
    - id: check-handled
      type: datatable_exists
      table: events
      where:
        id: "{{.Event.Payload.event_id}}"
        is_processed: true
      cases:
        "true": end        # already done
        "false": process
    - id: process
      ...
      next: mark-done
    - id: mark-done
      type: datatable_upsert
      table: events
      key: [id]
      row:
        id: "{{.Event.Payload.event_id}}"
        status: done
        is_processed: true
        handled_at: "{{.Run.StartedAt}}"
Column list, add/remove/rename. Schema change → atomic DB TX + audit entry (if enabled). Destructive change = typed confirmation + dry-run.
Paginated, sortable, filterable. Inline edit if ui_editable. Bulk delete + CSV/JSON export.
| event_id | workflow | handled_at |
|---|---|---|
| E-001 | support-triage | 10:05 |
| E-002 | support-triage | 10:08 |
| E-003 | incident-resp | 10:12 |
~ 1,247 rows
Ad-hoc WHERE/select, execute, show result. Schema-aware autocomplete.
where workflow_slug = 'support-triage' order_by handled_at desc limit 100
Data tables live in wick's DB but cannot be queried freely via db_query. Access goes only through datatable_* node types, MCP ops, the UI Data Tables tab, or the internal Go Service interface.
datatable_list()
datatable_get(slug)
datatable_create(slug, schema)
datatable_update_schema(slug, patch)
datatable_drop(slug)
datatable_infer_schema(slug)
datatable_export(slug, format)
datatable_import(bundle, on_conflict)
datatable_query(slug, where, ...)
datatable_insert(slug, row)
datatable_upsert(slug, row)
datatable_delete(slug, where)
datatable_count(slug, where)
type Service interface {
    List() ([]Table, error)
    Get(slug string) (Table, error)
    Create(t Table) error
    UpdateSchema(slug, patch SchemaPatch) error
    Drop(slug string) error

    Query(slug, q Query) (QueryResult, error)
    Insert(slug, row map[string]any) (any, error)
    Upsert(slug, key []string, row ...) (UpsertResult, error)
    Delete(slug, where Where) (int64, error)
    Count(slug, where Where) (int64, error)
}
The Service translates calls into parameterized SQL. Contract: no string concatenation, no raw SQL passthrough, so OR 1=1-style injection cannot get through.
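A rough sketch of what that translation might look like for an equality filter. buildQuery is a hypothetical helper, not the actual Service code; it assumes the shared wick_data_table_rows layout described earlier and passes even the JSONB key names as bind parameters, so no caller-supplied text ever reaches the SQL string.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildQuery turns an equality filter into parameterized SQL over the
// shared rows table. Keys and values both travel as bind parameters.
func buildQuery(slug string, where map[string]string, limit int) (string, []any) {
	keys := make([]string, 0, len(where))
	for k := range where {
		keys = append(keys, k)
	}
	sort.Strings(keys) // identical filters always yield identical SQL text

	args := []any{slug}
	conds := []string{"table_slug = $1"}
	for _, k := range keys {
		args = append(args, k, where[k])
		// $n::text pins the key parameter to text for the ->> operator.
		conds = append(conds, fmt.Sprintf("data->>$%d::text = $%d", len(args)-1, len(args)))
	}
	args = append(args, limit)
	sql := fmt.Sprintf("SELECT pk, data FROM wick_data_table_rows WHERE %s LIMIT $%d",
		strings.Join(conds, " AND "), len(args))
	return sql, args
}

func main() {
	sql, args := buildQuery("events", map[string]string{"status": "done"}, 100)
	fmt.Println(sql)
	fmt.Println(args...)
}
```

One trade-off worth checking in a real implementation: a key passed as a parameter may not match a partial functional index declared on a literal expression such as (data->>'status'), so a production version might instead whitelist column names from the table schema and inline them.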
db_query node to an external read-replica DB
Sharing a data table is the main use case (dedup events, cross-workflow state, shared lookup). Safety comes from an explicit contract.
# wick_data_tables.schema
{
  "strictness": "strict",              # layer 1
  "columns": [
    { "name": "id", "type": "string", "primary_key": true },
    { "name": "source", "type": "enum", ... },
    { "name": "status", "type": "enum", ... }
  ]
}
# wick_data_tables.access
{
  "workflows": [                       # layer 2: allowlist
    "webhook-handler",
    "calendar-poller",
    "slack-monitor"
  ],
  "read_only_workflows": ["audit-monthly"],
  "row_filter": "none"                 # layer 3: by_creator opt-in
}
data_tables:
  - name: events   # alias
    ref: events
    mode: read_write
Three safety layers for shared tables: strictness, allowlist, row_filter. The default is safe (strict + explicit empty allowlist). Power users can opt out to lax when they need JSONB flexibility.
| Mode | Behavior | Use case |
|---|---|---|
| strict (default) | Rejects an insert that has extra keys outside columns. The workflow must declare all fields | Production, critical data, multi-workflow sharing |
| lax | Accepts extra keys and stores them in JSONB. The UI warns on read when unknown keys are present. Querying by an extra key is a full scan | Dev iteration, exploration, ad-hoc data |
A schema change that touches existing rows is a formal flow with a manual user trigger at every step. Never auto-magic.
datatable_update_schema)
wick_data_tables.schema, audit entry if enabled
The engine stamps _meta.migrated_at per row. A re-run skips rows already stamped after the migration start. A crash mid-job is safe to restart.
A partial failure rolls back the batch transaction. The migration is marked failed in JobRun; the user retries or reverts.
Revert = PATCH the schema back to its previous shape + a reverse migration job. The audit table keeps the diff for replay.
100ms between batches avoids lock contention. Pause/resume mid-flight. The JobRun page shows progress.
UI New data table → tab Upload CSV:
.csv. The engine sniffs the header + first 100 rows.
string default; int/float if all samples are numeric; timestamp if ISO-parseable). The user can override per column.
Initial limits: 10MB CSV, 100k rows per import. Larger imports go through the MCP datatable_import bundle.
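The type-sniffing rule for CSV columns can be sketched as below. inferType is a hypothetical name, and treating "ISO parseable" as RFC 3339 is an assumption (RFC 3339 is a profile of ISO 8601; date-only values would fall back to string here).

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// inferType guesses a column type from sample values: int if every sample
// parses as an integer, float if every sample parses as a number, timestamp
// if every sample parses as RFC 3339, otherwise string. Empty samples are
// ignored; a column with no usable samples stays string.
func inferType(samples []string) string {
	isInt, isFloat, isTime, seen := true, true, true, false
	for _, s := range samples {
		if s == "" {
			continue
		}
		seen = true
		if _, err := strconv.ParseInt(s, 10, 64); err != nil {
			isInt = false
		}
		if _, err := strconv.ParseFloat(s, 64); err != nil {
			isFloat = false
		}
		if _, err := time.Parse(time.RFC3339, s); err != nil {
			isTime = false
		}
	}
	switch {
	case !seen:
		return "string"
	case isInt:
		return "int"
	case isFloat:
		return "float"
	case isTime:
		return "timestamp"
	}
	return "string"
}

func main() {
	fmt.Println(inferType([]string{"1", "2", "3"}))
	fmt.Println(inferType([]string{"1", "2.5"}))
	fmt.Println(inferType([]string{"2025-05-01T10:00:00Z"}))
	fmt.Println(inferType([]string{"E-001", "42"}))
}
```

Since only the first 100 rows are sampled, a later row can still violate the guess, which is one reason the per-column user override matters.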
Orphan rows: present in wick_data_table_rows but with no matching wick_data_tables entry.
Engine detects orphan rows
  ↓
UI list page: "Orphan table" badge + row count
  ↓
Click → adoption modal:
  ├─ [Adopt] → sample N rows, infer the schema from
  │            JSONB keys. User reviews + edits + saves.
  └─ [Drop]  → DELETE rows (typed confirmation)
events.tar.gz
├─ schema.json   # schema + access + retention
└─ rows.jsonl    # ordered by pk

wick datatable import events.tar.gz \
  --on-conflict abort   # abort | overwrite | skip | merge
priority: "1" but the schema expects enum [low|medium|high]):
transform node between query + use, or a migration job that batch-UPDATEs rows to the new shape
db_query vs datatable_*
| | db_query | datatable_* |
|---|---|---|
| Schema source | external system | wick wick_data_tables.schema JSONB |
| Storage | user-configured DSN | wick's Postgres (same DB) |
| Discovery | manual | MCP datatable_list + UI |
| UI | none | 3-panel table view + CSV upload |
| Migration | user's responsibility | wick-managed (audit-able) |
| TTL cleanup | external | built-in |
| When | data lives in another system | data is born from workflow / AI / UI |
AI-first mandate (§2 #0): workflow yang dibuat AI dari prompt harus testable sebelum deploy. 2 layer: unit test (single node + mock) + integration test (full flow + scripted events + mock external).
<BaseDir>/workflows/support-triage/ └─ __tests__/ ├─ nodes/ # unit tests — 1 file per node │ ├─ classify-intent.test.yaml │ ├─ handle-bug.test.yaml │ └─ summarize.test.yaml ├─ integration/ # integration tests — full flow │ ├─ bug-flow.test.yaml │ ├─ feature-flow.test.yaml │ └─ multi-stage.test.yaml ├─ fixtures/ # reusable input data └─ mocks/ # reusable mock responses
# __tests__/nodes/classify-intent.test.yaml node: classify-intent cases: - name: "bug pattern → verdict bug" input: Event: { Text: "production error di checkout" } mocks: provider_response: verdict: bug confidence: 0.92 expect: output.verdict: bug output.confidence: ">= 0.5" assertions: - { type: case_fired, value: bug } - name: "low confidence → default" input: Event: { Text: "lorem ipsum" } mocks: provider_response: { verdict: unclear, confidence: 0.3 } expect: output.verdict: "default" # layer 6 defends runtime_warnings: - "below confidence threshold"
# __tests__/integration/bug-flow.test.yaml
name: "bug inquiry → ticket + reply"
trigger:
  type: channel
  event:
    Type: channel
    Text: "production error di checkout"
    Channel: C123
    Thread: 1234567.89
mocks:
  nodes:
    - node: classify-intent
      response: { verdict: bug, confidence: 0.9 }
  connectors:
    - module: tracker
      op: create_issue
      args_match: { title: contains "checkout" }
      response: { number: 42, url: "https://..." }
  channels:
    - channel: chat
      op: reply_thread
      capture: true
assertions:
  - path_taken: [classify-intent, handle-bug, reply]
  - node: reply
    args.text: contains "issues/42"
  - final_status: success
  - duration_ms: "< 2000"
  - cost_usd: "< 0.01"
| Type | Real call | Mocked in test mode |
|---|---|---|
| Provider (classify, agent) | CLI subprocess | Engine bypass, return scripted |
| Connector (type: connector) | Operation.Execute() | Engine bypass, scripted response per args_match |
| Channel (type: channel) | Channel.Send() | Capture call (record args), scripted response |
| HTTP (type: http) | http.Do() | Match URL pattern + method |
| Data Table (datatable_*) | Postgres query | In-memory test DB or scripted rows |
| Shell (type: shell) | exec.Cmd | Skip exec, scripted stdout/exit |
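The connector row of the table above (engine bypass, scripted response per `args_match`) can be sketched as a small registry. This is a minimal illustration under assumptions: the class `MockConnectors` is hypothetical, and the YAML form `contains "checkout"` is represented here as `{"contains": "checkout"}`, which may not be how the engine parses it.

```python
class MockConnectors:
    """Scripted connector responses, matched by module, op, and args."""

    def __init__(self, rules):
        self.rules = rules
        self.calls = []  # every call is captured for later assertions

    @staticmethod
    def _matches(args_match, args):
        for key, cond in args_match.items():
            value = args.get(key)
            if isinstance(cond, dict) and "contains" in cond:
                # Substring condition (assumed encoding of `contains "..."`).
                if cond["contains"] not in str(value):
                    return False
            elif value != cond:
                return False
        return True

    def execute(self, module, op, args):
        self.calls.append((module, op, args))
        for rule in self.rules:
            if (rule["module"] == module and rule["op"] == op
                    and self._matches(rule.get("args_match", {}), args)):
                return rule["response"]
        # Fail loudly: an unmatched call would otherwise hit the real system.
        raise LookupError(f"no mock for {module}.{op} with {args}")

mocks = MockConnectors([{
    "module": "tracker",
    "op": "create_issue",
    "args_match": {"title": {"contains": "checkout"}},
    "response": {"number": 42},
}])
print(mocks.execute("tracker", "create_issue", {"title": "Bug: checkout fails"}))
```

Raising on an unmatched call is a deliberate choice in this sketch: a test that silently falls through to a real connector would defeat the purpose of test mode.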
wick workflow test <slug>
  --filter node:classify
  --integration
  --watch
  --coverage
  --record <run-id>

# MCP ops:
workflow_test(slug, filter?)
workflow_record_test(slug, run_id)
workflow_test_coverage(slug)
workflow_simulate(slug, event, mocks)
__tests__/nodes/*.test.yaml → __tests__/integration/*.test.yaml → workflow_test(slug) → workflow_request_review
Tests = the AI's verification loop before admin review. Reduces back-and-forth.
handle-feature, silent-end. Suggest adding test cases.
Each run is a folder with state.json (snapshot) + events.jsonl (append-only). A crash mid-flow resumes from the last successful node.
{"ts":"10:00:01","event":"started","trigger":"channel"}
{"ts":"10:00:01","event":"node_started","node":"classify-intent"}
{"ts":"10:00:03","event":"node_completed","node":"classify-intent","output":{"verdict":"bug"},"duration_ms":2100,"tokens":245}
{"ts":"10:00:03","event":"node_started","node":"handle-bug"}
{"ts":"10:00:04","event":"node_completed","node":"handle-bug","output":{"ticket_id":"LINEAR-123"},"duration_ms":1800}
{"ts":"10:00:04","event":"node_started","node":"reply-link"}
{"ts":"10:00:05","event":"node_completed","node":"reply-link"}
{"ts":"10:00:05","event":"completed","duration_ms":4100,"cost_usd":0.0008}
Worker crash → the next start reads state.current and continues from there. Same run ID.
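The text says the worker resumes from `state.current`. As an illustrative cross-check (not the engine's method), the same resume point can be rebuilt from `events.jsonl` alone: the first node without a `node_completed` event. The sketch assumes one JSON object per line and a known linear node order; `resume_point` is a hypothetical helper.

```python
import json

def resume_point(event_lines, node_order):
    """Return the first node that has no node_completed event, or None."""
    completed = set()
    for line in event_lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("event") == "completed":
            return None  # run already finished, nothing to resume
        if event.get("event") == "node_completed":
            completed.add(event["node"])
    for node in node_order:
        if node not in completed:
            return node
    return None

log = [
    '{"ts":"10:00:01","event":"started","trigger":"channel"}',
    '{"ts":"10:00:01","event":"node_started","node":"classify-intent"}',
    '{"ts":"10:00:03","event":"node_completed","node":"classify-intent"}',
    '{"ts":"10:00:03","event":"node_started","node":"handle-bug"}',
]
print(resume_point(log, ["classify-intent", "handle-bug", "reply-link"]))
```

A node that has `node_started` but no `node_completed` is treated as not done, so it is re-run on resume; whether that is safe depends on the node being idempotent.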
Click ↻ Replay in editor in the Runs panel → state + events are loaded into the editor (node badges, Logs, INPUT/OUTPUT cache). No new run is fired. Meant for step-by-step debugging.
Run finished → fire a new run with the same event via ?prefill=<runID>. Dedup is skipped and the audit log records the origin.
Debug: restore state up to node X, then continue from there with new output. Not implemented yet.
A daily cleanup job (reusing the connector-runs-purge pattern) prunes the runs/<id>/ folder + JobRun row.
retention:
  success_days: 30    # default
  failure_days: 90    # keep longer for debug
  keep_max: 10000     # absolute cap
Defaults apply when there is no retention: field. Override per workflow to match its usage pattern.
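The retention rule (age limits per status plus an absolute cap) could be sketched as a pure selection function. This is a hedged illustration only: the run record shape (`id`, `status`, `finished_at`) is hypothetical, and applying `keep_max` across all statuses, oldest first, is an assumption.

```python
from datetime import datetime, timedelta

def runs_to_prune(runs, now, success_days=30, failure_days=90, keep_max=10000):
    """Pick run IDs to delete; runs are dicts with id, status, finished_at."""
    doomed = set()
    for run in runs:
        days = success_days if run["status"] == "success" else failure_days
        if now - run["finished_at"] > timedelta(days=days):
            doomed.add(run["id"])
    # Enforce the absolute cap on what survives, oldest first.
    survivors = sorted((r for r in runs if r["id"] not in doomed),
                       key=lambda r: r["finished_at"])
    excess = len(survivors) - keep_max
    if excess > 0:
        doomed.update(r["id"] for r in survivors[:excess])
    return doomed

now = datetime(2024, 6, 1)
runs = [
    {"id": "a", "status": "success", "finished_at": now - timedelta(days=40)},
    {"id": "b", "status": "failed", "finished_at": now - timedelta(days=40)},
    {"id": "c", "status": "success", "finished_at": now - timedelta(days=1)},
]
print(sorted(runs_to_prune(runs, now)))
```

Keeping the selection separate from the deletion makes the policy easy to test without touching the `runs/<id>/` folders.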
A workflow that only needs "did this event get handled?" → store it in a data table (small, queryable, dedup-friendly). JobRun = audit/debug, not the source of truth.
keep_max → engine suggest data-table approach + tighter retention.
30 days default · prune oldest first when keep_max is hit
90 days default · debug-friendly window
Reaper marks the run Failed when now − updated_at > 2× max_duration
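The reaper condition `now − updated_at > 2× max_duration` is small enough to show directly. The sketch below assumes a hypothetical run record with `id`, `status` and `updated_at`, and that only runs still marked `running` are candidates.

```python
from datetime import datetime, timedelta

def find_stuck_runs(runs, now, max_duration):
    """Return IDs of running runs whose last update is older than 2x max_duration."""
    limit = 2 * max_duration
    return [r["id"] for r in runs
            if r["status"] == "running" and now - r["updated_at"] > limit]

now = datetime(2024, 6, 1, 12, 0)
runs = [
    {"id": "r1", "status": "running", "updated_at": now - timedelta(minutes=25)},
    {"id": "r2", "status": "running", "updated_at": now - timedelta(minutes=5)},
    {"id": "r3", "status": "success", "updated_at": now - timedelta(hours=3)},
]
# With max_duration = 10 minutes, the threshold is 20 minutes of silence.
print(find_stuck_runs(runs, now, timedelta(minutes=10)))
```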
Workflow editing is designed so that any AI, from anywhere, can create/edit workflows whether or not it has a file tool. The MCP server at /mcp (HTTP + bearer) = single source of truth.
| Env | File access | MCP transport | Pattern |
|---|---|---|---|
| Claude Code, Cursor (local CLI) | ✓ native | stdio (local) | File tool + thin MCP introspection |
| Claude Desktop | ✗ | stdio or HTTP/SSE to wick | Full MCP ops (tier 1+2+3) |
| ChatGPT (custom GPT, plugin) | ✗ | HTTP /mcp + bearer | Full MCP ops |
| Gemini Gems / custom action | ✗ | HTTP /mcp + bearer | Full MCP ops |
| Wick built-in UI assistant | ✓ (server-side proxy) | in-process | File tool internal |
workflow_workspace()       → entry point, base_dir, schemas
workflow_node_types()      → [{type, schema, example, when_to_use}]
workflow_trigger_types()   → trigger schema catalog
workflow_channels()        → channel registry self-describe
workflow_connectors()      → connector modules + rows + ops
workflow_skills()          → local Claude Code skill bundles only

workflow_list(filter)              → [{slug, id, name, enabled, approved}]
workflow_get(slug)                 → full workflow definition
workflow_list_files(slug)          → [{path, size, modified}]
workflow_read_file(slug, path)     → content (replace Read tool for remote AI)

workflow_create(slug, template?)
workflow_write_file(slug, path, content)
workflow_delete_file(slug, path)
workflow_delete(slug)

workflow_add_node(slug, node)
workflow_update_node(slug, id, patch)
workflow_delete_node(slug, id)
workflow_connect(slug, from, to, case?)
workflow_disconnect(slug, from, to)
workflow_move_node(slug, id, x, y)
workflow_set_triggers(slug, triggers[])
workflow_toggle(slug, enabled)

workflow_validate(slug)
workflow_simulate(slug, event)
workflow_test(slug)
workflow_run_now(slug, event?)

workflow_get_runs(slug, limit)
workflow_get_run(slug, run_id)
workflow_request_review(slug, msg)
workflow_capture_fixture(slug, run_id, node)
workflow_workspace()
workflow_create(slug)
workflow_validate(slug)
workflow_simulate(slug, evt)
workflow_request_review(slug)

workflow_workspace()
workflow_node_types()   ← discover
workflow_create(slug)
workflow_add_node(slug, ...) ×N
workflow_connect(slug, ...) ×N
workflow_write_file(slug, "nodes/prompt.md", content)
workflow_validate / simulate
workflow_request_review(slug)

The Wick MCP server already exists at /mcp. Workflow ops register onto the existing server. Call:
POST https://wick.your-host.com/mcp
Authorization: Bearer wick_token_...
Content-Type: application/json
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "workflow_add_node",
"arguments": {
"slug": "support-triage",
"node": {...}
}
},
"id": 1
}
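For a client without a ready-made MCP library, the request above can be assembled with the Python standard library. The sketch only builds the request and does not send it. The host, the token value and the node body (`id`, `type`) are placeholders chosen for illustration, since the source elides the node payload; `build_mcp_call` is a hypothetical helper.

```python
import json
import urllib.request

def build_mcp_call(base_url, token, tool, arguments, request_id=1):
    """Build (but do not send) a JSON-RPC tools/call request for the MCP endpoint."""
    payload = {
        "jsonrpc": "2.0",
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
        "id": request_id,
    }
    return urllib.request.Request(
        url=f"{base_url}/mcp",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

req = build_mcp_call(
    "https://wick.your-host.com",
    "wick_token_placeholder",  # placeholder, not a real token
    "workflow_add_node",
    # Hypothetical node body; the real node schema comes from workflow_node_types().
    {"slug": "support-triage", "node": {"id": "classify-intent", "type": "classify"}},
)
print(req.get_method(), req.full_url)
# Sending is left to the caller, e.g. urllib.request.urlopen(req).
```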
mcpServers:
  wick:
    url: https://wick.host.com/mcp
    headers:
      Authorization: Bearer wick_token_...
Action with an OpenAPI spec referencing /mcp. Bearer token in Action authentication.
Function calling with an HTTP action to /mcp.
In-process MCP client; no auth needed (authenticated session).
A token has 3 scope dimensions: workflow allowlist, op allowlist, approval cap. The audit log records every call.
write_file, no Edit/PartialEdit. The engine writes atomically (tmp + rename).
list_files returns paths only; large content must be fetched with read_file per file (more round trips)
workflow_grep in the future if needed
Scope is assigned at token issuance. Default for an AI assistant = write-without-enable + must call workflow_request_review. Admin approves via the UI (see §13 AI guard).
| Use case | Read | File CRUD | Canvas ops | Toggle enable | Approve |
|---|---|---|---|---|---|
| AI assistant default for MCP tokens | ✓ | ✓ | ✓ | ✗ | ✗ |
| Admin user | ✓ | ✓ | ✓ | ✓ | ✓ |
| Read-only viewer | ✓ | ✗ | ✗ | ✗ | ✗ |
| CI/CD push | ✓ | ✓ | ✗ | ✓ (post-CI) | ✗ |
Per-token: ["*"] or specific slugs. MCP calls to workflows outside the allowlist are rejected.
Per-token: list of allowed MCP ops. Defaults follow the use-case template.
Every call: token ID, user, op, args hash, result, timestamp. Reuses the existing audit infra.
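The workflow allowlist, op allowlist and audit record described above could fit together as in the sketch below. It is a minimal illustration under assumptions: the token shape (`id`, `user`, `workflows`, `ops`) is hypothetical, the approval cap is not modeled, and SHA-256 over canonical JSON is just one plausible choice for the args hash.

```python
import hashlib
import json
from datetime import datetime, timezone

def authorize(token, op, slug):
    """Check the workflow allowlist and the op allowlist for one MCP call."""
    workflows = token["workflows"]
    if "*" not in workflows and slug not in workflows:
        return False, "workflow not in allowlist"
    if op not in token["ops"]:
        return False, "op not in allowlist"
    return True, "ok"

def audit_record(token, op, args, result):
    """Build an audit entry; args are stored as a hash, not in the clear."""
    # sort_keys makes the hash independent of argument ordering.
    canonical = json.dumps(args, sort_keys=True).encode("utf-8")
    return {
        "token_id": token["id"],
        "user": token["user"],
        "op": op,
        "args_hash": hashlib.sha256(canonical).hexdigest(),
        "result": result,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }

token = {"id": "tok_1", "user": "ai-assistant",
         "workflows": ["support-triage"],
         "ops": ["workflow_get", "workflow_add_node"]}
allowed, reason = authorize(token, "workflow_toggle", "support-triage")
print(allowed, reason)
```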
An ephemeral reviewer agent reads every node + script + prompt and compares them against the rules. Block / warn / off per config.
No rm -rf, dd, mkfs, fork bombs.
classify prompts do not pass {{.Event.Payload.text}} through to the shell raw. db_query is parameterized only.
HTTP host allowlist. No local IPs (SSRF defense). Secret tag (wick_enc_) is required.
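The HTTP rule (host allowlist, no local IPs) can be approximated with the standard library as below. This is a partial sketch: it only inspects the URL as written, so a production check would also need to validate the addresses a hostname resolves to. `is_url_allowed` is a hypothetical name.

```python
import ipaddress
from urllib.parse import urlparse

def is_url_allowed(url, allowed_hosts):
    """Allow only allowlisted hosts and reject literal local or private IPs."""
    host = urlparse(url).hostname
    if host is None:
        return False
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        ip = None  # not an IP literal, treat as a hostname
    if ip is not None and (ip.is_private or ip.is_loopback or ip.is_link_local):
        # Local addresses are refused even if someone put them on the allowlist.
        return False
    return host in allowed_hosts

allowlist = {"api.example.com"}
print(is_url_allowed("https://api.example.com/v1/items", allowlist))
print(is_url_allowed("http://169.254.169.254/latest/meta-data", allowlist))
```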
User clicks "Enable" in the UI
        │
        ▼
Guard enabled? ──no──► commit
        │ yes
        ▼
Spawn reviewer (preset: guard)
        │
        ▼
Prompt: "Review workflow. Rules: ...
         Graph: ${yaml.graph}. Files: ${all_files}.
         Return: {ok, violations[]}"
        │
        ▼
Parse JSON response
  ├── ok=true ─────────► commit + audit "guard passed"
  ├── ok=false, warn ──► commit + warning banner
  ├── ok=false, block ─► reject + show violations
  └── timeout ─────────► block (fail-closed)

Override: button "Override Guard" + mandatory reasoning → audit log
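The "Parse JSON response" branch of the flow above maps naturally onto a small decision function. The sketch assumes the reviewer returns `{ok, violations[]}` as JSON text, and that an unparseable reply is treated like a timeout (an assumption, chosen to stay fail-closed). The guard-off path and the override button are outside its scope; `guard_decision` and the action names are hypothetical.

```python
import json

def guard_decision(raw_response, mode="block", timed_out=False):
    """Map a reviewer reply to (action, violations). mode is 'block' or 'warn'."""
    if timed_out:
        return "reject", ["guard timeout"]  # fail-closed
    try:
        verdict = json.loads(raw_response)
        ok = verdict["ok"]
        violations = verdict.get("violations", [])
    except (ValueError, KeyError, TypeError, AttributeError):
        # Assumption: an unparseable reply is treated like a timeout.
        return "reject", ["unparseable guard response"]
    if ok is True:
        return "commit", []
    if mode == "warn":
        return "commit_with_warning", violations
    return "reject", violations

reply = '{"ok": false, "violations": ["shell node runs rm -rf"]}'
print(guard_decision(reply, mode="block"))
print(guard_decision(reply, mode="warn"))
```

Only a literal `ok: true` commits silently; any other value falls through to the warn or block path, which keeps a malformed verdict from being read as approval.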
Workflows may exec arbitrary code and call external systems, so the surface area is large. Multi-layer defense.
approved=false by default
<user_input> tag
$1, $2)
require_role: admin on manual trigger
Not a competitor: the AI approach is fundamentally different. n8n = API SDK approach; wick = CLI subprocess approach.
| Aspect | n8n | Wick Workflow |
|---|---|---|
| Decision logic | If/Switch with expressions | branch (if/switch) + classify (AI natural language), both available |
| AI integration | API SDK per node (OpenAI/Anthropic): 1 node = 1 API call | CLI subprocess (claude/codex/gemini): AI = agent with a built-in tool ecosystem (Read/Edit/Bash/MCP) |
| Output reliability | tool_use schema enforcement at the API level | Prompt-based JSON + parser + 5-layer fallback (§5.1) |
| AI nodes | Add-on, generic LLM call | First-class, pool + session reuse, state shared across nodes |
| Editor | UI required | UI or YAML or AI-via-MCP |
| Storage | DB | File-based, gitops-friendly |
| Skill orchestration | Generic webhook | Skill registry first-class, channel actions auto-registered |
| Deployment | Hosted / self-host k8s | Embedded in the wick binary |
| Latency per AI node | ~200-1000ms (API direct) | ~500-2000ms (proc spawn overhead), accepted trade-off |
Calls the OpenAI API per node; every call is independent. Suited to "LLM as one of many service integrations".
Spawns an agent CLI with a tool ecosystem (file/bash/MCP) inherited from the agent session. Suited to "AI orchestration with skills + file manipulation + MCP tools".