Execution
An execution is a single run of a workflow. When a trigger fires, Pipelit compiles the reachable portion of the workflow into a directed graph, then the orchestrator runs each node asynchronously via RQ workers, broadcasting status updates over WebSocket in real time.
Execution Lifecycle
sequenceDiagram
participant Trigger as Trigger (chat/telegram/schedule)
participant Handler as Trigger Handler
participant DB as Database
participant Builder as Topology Builder
participant Redis as Redis State
participant RQ as RQ Worker
participant Orch as Orchestrator
participant WS as WebSocket
participant UI as Frontend
Trigger->>Handler: Event arrives
Handler->>DB: Create WorkflowExecution (status: pending)
Handler->>RQ: Enqueue start_execution job
RQ->>Orch: start_execution(execution_id)
Orch->>DB: Set status: running
Orch->>WS: execution_started
WS->>UI: Reset node statuses
Orch->>Builder: build_topology(workflow, trigger_node_id)
Builder->>DB: Query nodes & edges
Builder-->>Orch: Topology (entry nodes, edges, fan-in counts)
Orch->>Redis: Save topology + initial state
loop For each entry node
Orch->>RQ: Enqueue execute_node_job
end
RQ->>Orch: execute_node_job(execution_id, node_id)
Orch->>WS: node_status: running
WS->>UI: Show spinning badge
Orch->>Orch: Resolve expressions, load component, execute
Orch->>Redis: Merge result into state
Orch->>DB: Write ExecutionLog
Orch->>WS: node_status: success (with output)
WS->>UI: Show checkmark badge + output link
Orch->>Orch: Advance to successor nodes
Note over Orch,RQ: Repeat for each downstream node
Orch->>DB: Set status: completed, extract final output
Orch->>WS: execution_completed
WS->>UI: Update execution status
Orch->>Redis: Cleanup execution keys
Trigger-Scoped Execution
When a trigger fires, the builder does not compile the entire workflow. Instead, it performs a BFS (breadth-first search) from the trigger node over direct edges to find all reachable downstream nodes. Only these nodes are included in the execution.
This design enables several patterns:
- Multiple trigger branches -- a single workflow can have a chat trigger and a Telegram trigger, each leading to different processing paths. Firing one trigger does not execute the other branch.
- Unused nodes -- nodes that are not connected to any trigger (e.g., nodes being configured but not yet wired in) are silently ignored.
- No build errors from disconnected nodes -- incomplete branches do not cause validation failures during execution.
Sub-Component Exclusion
Sub-components (ai_model, tools, output_parser, memory_read, memory_write, etc.) are excluded from the execution graph even if reachable. They are resolved at build time by their parent nodes, not executed independently by the orchestrator.
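The traversal described above can be sketched as a plain breadth-first search. This is a minimal illustration, not Pipelit's actual builder: the function name `reachable_nodes`, the `(source, target)` edge tuples, the `node_types` mapping, and the exact contents of `SUB_COMPONENT_TYPES` are all assumptions made for the example.

```python
from collections import deque

# Hypothetical set of sub-component types; the real list may be longer.
SUB_COMPONENT_TYPES = {"ai_model", "tools", "output_parser", "memory_read", "memory_write"}


def reachable_nodes(trigger_id, edges, node_types):
    """Return executable node IDs reachable from trigger_id, in BFS order.

    edges: iterable of (source_id, target_id) pairs for direct edges.
    node_types: mapping of node_id -> component type string.
    """
    successors = {}
    for source, target in edges:
        successors.setdefault(source, []).append(target)

    visited = {trigger_id}
    order = []
    queue = deque([trigger_id])
    while queue:
        node = queue.popleft()
        # Sub-components are visited but never added to the execution graph.
        if node_types.get(node) not in SUB_COMPONENT_TYPES:
            order.append(node)
        for nxt in successors.get(node, []):
            if nxt not in visited:
                visited.add(nxt)
                queue.append(nxt)
    return order
```

With a chat branch and a Telegram branch in the same workflow, calling `reachable_nodes("chat", ...)` returns only the chat branch, which is why firing one trigger never runs the other.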
Node Status Lifecycle
Each node in an execution progresses through a status lifecycle:
stateDiagram-v2
[*] --> pending : Node enqueued on RQ
pending --> running : Worker picks up job
running --> success : Component returns result
running --> failed : Component raises exception
running --> waiting : Subworkflow spawned
running --> skipped : Condition not met
failed --> running : Retry (up to 3 attempts)
waiting --> running : Child execution completes
| Status | Description | Canvas Badge |
|---|---|---|
| pending | Node is queued on RQ, waiting for a worker | -- |
| running | Node component is currently executing | Spinning circle |
| success | Node completed and produced output | Green checkmark |
| failed | Node raised an exception after all retries | Red X |
| skipped | Node was skipped (e.g., conditional routing took another path) | Gray dash |
| waiting | Node is waiting for a child subworkflow to complete | -- |
Retry Logic
If a node raises an exception, the orchestrator retries it up to 3 times with exponential backoff:
| Attempt | Delay |
|---|---|
| 1st retry | 1 second |
| 2nd retry | 2 seconds |
| 3rd retry | 4 seconds |
If the third retry also fails, the execution fails with the error from the last attempt, unless the node is inside a loop body with on_error=continue, in which case the loop advances to the next iteration.
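The delay schedule in the table doubles on each retry, which can be written as `base * 2 ** (retry - 1)`. The sketch below is an in-process illustration only: `retry_delay` and `run_with_retries` are hypothetical names, it assumes one initial attempt followed by up to three retries, and the real orchestrator presumably schedules retries through RQ rather than sleeping in a worker.

```python
import time

MAX_RETRIES = 3
BASE_DELAY_SECONDS = 1.0


def retry_delay(retry_number):
    """Delay in seconds before the given retry (1-based): 1, 2, 4."""
    if not 1 <= retry_number <= MAX_RETRIES:
        raise ValueError(f"retry_number must be between 1 and {MAX_RETRIES}")
    return BASE_DELAY_SECONDS * 2 ** (retry_number - 1)


def run_with_retries(component, sleep=time.sleep):
    """Call component(); on exception, retry with exponential backoff.

    Re-raises the error from the last attempt once retries are exhausted.
    """
    last_error = None
    for attempt in range(MAX_RETRIES + 1):  # initial attempt plus retries
        if attempt > 0:
            sleep(retry_delay(attempt))
        try:
            return component()
        except Exception as exc:
            last_error = exc
    raise last_error
```

Passing `sleep` as a parameter keeps the function testable without real waiting.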
ExecutionLog Entries
Every node execution produces an ExecutionLog entry in the database:
| Field | Type | Description |
|---|---|---|
| execution_id | UUID | Parent execution |
| node_id | string | Which node ran |
| status | string | completed or failed |
| duration_ms | int | Wall-clock execution time in milliseconds |
| output | dict | Node output (for successful executions) |
| error | string | Error message (for failed executions, truncated to 2,000 chars) |
| error_code | string | Exception class name (e.g., ValueError, TimeoutError) |
| log_metadata | dict | Additional metadata including token usage |
Logs are visible on the Execution Detail page, where each row can be expanded to show the full output in a <pre> block.
NodeResult
The orchestrator wraps every node's raw output into a NodeResult Pydantic model:
class NodeResult(BaseModel):
    status: NodeStatus          # success, failed, skipped
    data: dict[str, Any]        # The component's output data
    error: NodeError | None     # Error details if failed
    metadata: dict[str, Any]    # Token usage, timing, etc.
    started_at: datetime
    completed_at: datetime
Factory methods provide a clean API for creating results:
- NodeResult.success(data={...}) -- successful execution with output data.
- NodeResult.failed(error_code="ValueError", message="...") -- failed execution with error details.
- NodeResult.skipped(reason="Route did not match") -- skipped node with explanation.
All NodeResult objects are stored in state["node_results"] keyed by node ID and persisted to Redis during execution.
RQ Worker Processing
Executions run asynchronously on RQ (Redis Queue) workers:
- Trigger handlers enqueue a start_execution job on the workflows queue.
- The start_execution job builds the topology, initializes state, and enqueues one execute_node_job per entry node.
- Each execute_node_job executes a single node, then enqueues jobs for successor nodes.
- When no more in-flight nodes remain, the orchestrator finalizes the execution.
RQ Queue: "workflows"
├── start_execution(exec_id)
│ ├── execute_node_job(exec_id, node_A)
│ │ ├── execute_node_job(exec_id, node_B)
│ │ └── execute_node_job(exec_id, node_C) # fan-out
│ │ └── execute_node_job(exec_id, node_D)
The default job timeout is 600 seconds (10 minutes). An in-flight counter in Redis tracks how many nodes are currently executing. When it reaches zero, _finalize() is called.
Worker Requirement
At least one RQ worker must be running to process executions. Without a worker, jobs will queue indefinitely. Start a worker with: rq worker workflows --url $REDIS_URL
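The in-flight counter can be illustrated with a small in-memory stand-in. This is a sketch under stated assumptions: `InFlightCounter` and its method names are hypothetical, a plain integer replaces the Redis counter, and the `on_zero` callback stands in for `_finalize()`. A real multi-worker implementation would need atomic Redis operations.

```python
class InFlightCounter:
    """In-memory stand-in for the Redis in-flight counter (illustrative only)."""

    def __init__(self, on_zero):
        self.count = 0
        self.on_zero = on_zero  # called when no nodes remain in flight

    def node_enqueued(self):
        """Record that one node job was placed on the queue."""
        self.count += 1

    def node_finished(self, successor_count):
        """Record that a node finished and enqueued successor_count successors.

        Successors are counted before the finished node is subtracted, so the
        counter cannot touch zero while downstream work is still pending.
        """
        self.count += successor_count
        self.count -= 1
        if self.count == 0:
            self.on_zero()
```

For the fan-out tree shown earlier, the counter goes 1 (node_A), 2 (node_B and node_C), then back down to zero only after the last leaf finishes.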
Component Output Convention
Components (the Python functions that implement each node type) return flat dictionaries. The orchestrator interprets the keys:
Regular Keys
Any key that does not start with an underscore is treated as a port output value and stored in state["node_outputs"][node_id]:
# Component returns:
{"output": "Hello world", "confidence": 0.95}
# Orchestrator stores:
state["node_outputs"]["agent_abc123"] = {
    "output": "Hello world",
    "confidence": 0.95
}
These values are accessible to downstream nodes via expressions: {{ agent_abc123.output }}.
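To make the lookup concrete, the sketch below resolves `{{ node_id.port }}` placeholders against `node_outputs` using a regular expression. This is a deliberately simplified stand-in: the source does not say how Pipelit evaluates expressions (it may well use a full template engine), and `resolve_expressions` is a hypothetical name.

```python
import re

# Matches {{ node_id.port }} with optional whitespace inside the braces.
EXPRESSION = re.compile(r"\{\{\s*(\w+)\.(\w+)\s*\}\}")


def resolve_expressions(template, node_outputs):
    """Replace each {{ node_id.port }} with the stored port value.

    Raises KeyError if the node or port has no stored output.
    """
    def replace(match):
        node_id, port = match.group(1), match.group(2)
        return str(node_outputs[node_id][port])

    return EXPRESSION.sub(replace, template)
```

For example, with the state shown above, `resolve_expressions("Reply: {{ agent_abc123.output }}", state["node_outputs"])` yields `"Reply: Hello world"`.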
Reserved Underscore-Prefixed Keys
Keys starting with _ are side-effect directives that control execution state:
| Key | Type | Effect |
|---|---|---|
| _route | string | Sets state["route"] for conditional edge routing. Used by switch nodes and AI routers. |
| _messages | list | Appended to state["messages"] (the LangGraph message list). Used by agent nodes to persist the full conversation. |
| _state_patch | dict | Merged into the global state dict. Protected keys (messages, node_outputs, node_results) cannot be overwritten. |
| _delay_seconds | float | Delays enqueuing successor nodes by this many seconds. Used by wait nodes. |
| _loop | dict | Contains items list for loop node iteration. |
| _subworkflow | dict | Contains child_execution_id for subworkflow nodes waiting on a child. |
| _token_usage | dict | Token counts, cost, and tool invocation stats. Accumulated into execution-level totals. |
Legacy Output Format
For backwards compatibility, components that return {"node_outputs": {...}} directly (the legacy format) are also supported. The orchestrator detects this and merges accordingly.
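The split between port outputs and directives might look roughly like the following. It is a sketch covering only `_route`, `_messages`, and `_state_patch`; the function name `merge_component_output` is hypothetical, the legacy format is not handled, and silently skipping protected keys (rather than raising) is an assumption.

```python
PROTECTED_STATE_KEYS = {"messages", "node_outputs", "node_results"}


def merge_component_output(state, node_id, result):
    """Merge a component's flat return dict into the execution state."""
    # Keys without a leading underscore are port outputs.
    ports = {key: value for key, value in result.items() if not key.startswith("_")}
    state.setdefault("node_outputs", {})[node_id] = ports

    # Underscore-prefixed keys are directives with side effects on state.
    if "_route" in result:
        state["route"] = result["_route"]
    if "_messages" in result:
        state.setdefault("messages", []).extend(result["_messages"])
    for key, value in result.get("_state_patch", {}).items():
        if key not in PROTECTED_STATE_KEYS:  # assumption: skip, do not raise
            state[key] = value
    return state
```

Keeping directives out of `node_outputs` means downstream expressions only ever see port values, never control keys.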
State Management
Execution state is stored in Redis with a 1-hour TTL. The state dict contains:
| Key | Description |
|---|---|
| messages | LangGraph message list (HumanMessage, AIMessage, ToolMessage, etc.) |
| trigger | The trigger payload that started the execution |
| user_context | User profile ID and Telegram chat ID |
| execution_id | UUID of the current execution |
| route | Current routing value for conditional edges |
| node_outputs | Dict of {node_id: {port: value}} for all completed nodes |
| node_results | Dict of {node_id: NodeResult} for all completed nodes |
| current_node | ID of the node currently being executed |
| loop | Loop iteration context (item, index, total) when inside a loop body |
State is loaded and saved around each node execution. Multiple RQ workers can process different nodes of the same execution concurrently, with Redis providing the shared state.
Fan-Out and Fan-In
The orchestrator supports parallel execution branches:
- Fan-out -- when a node has multiple direct outgoing edges, all target nodes are enqueued simultaneously on RQ.
- Fan-in -- merge nodes track incoming edge counts. A merge node only executes when all of its upstream branches have completed. This is tracked with a Redis counter per merge node.
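The fan-in rule can be sketched with a per-merge-node arrival counter. Here a dict stands in for the Redis counters, and `FanInTracker` with its `branch_completed` method is a hypothetical name; across multiple workers the increment would need to be atomic (for example, Redis `INCR`).

```python
class FanInTracker:
    """Tracks how many upstream branches have reached each merge node."""

    def __init__(self, fan_in_counts):
        # fan_in_counts: mapping of merge_node_id -> number of incoming edges
        self.expected = dict(fan_in_counts)
        self.arrived = {}

    def branch_completed(self, merge_node_id):
        """Record one finished upstream branch.

        Returns True only for the arrival that completes the set, so the
        merge node is enqueued exactly once.
        """
        count = self.arrived.get(merge_node_id, 0) + 1
        self.arrived[merge_node_id] = count
        return count == self.expected[merge_node_id]
```

Returning `True` only on the final arrival is what prevents a merge node from running once per upstream branch.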
Conditional Routing
Switch nodes and AI router nodes set state["route"] via the _route key. The orchestrator matches this value against condition_value on each outgoing conditional edge to determine which branch to take. Only the matched target node is enqueued.
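A minimal sketch of the matching step, assuming each conditional edge is a dict with a `condition_value` (named in the text) and a `target` field (a hypothetical name chosen for this example):

```python
def select_targets(route, conditional_edges):
    """Return the target node IDs whose condition_value equals the route."""
    return [
        edge["target"]
        for edge in conditional_edges
        if edge["condition_value"] == route
    ]
```

If a switch node returns `{"_route": "billing"}`, only the edge whose `condition_value` is `"billing"` yields a target; all other branches are left unenqueued.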
Finalization
When all in-flight nodes complete (the Redis counter reaches zero), the orchestrator finalizes:
- Extracts the final output from state (last AI message, or node_outputs).
- Sets execution.status = "completed" and persists cost data.
- Publishes execution_completed via WebSocket.
- Runs output delivery (Telegram reply, webhook callback, etc.).
- Completes the memory episode for the execution.
- If this is a child execution, resumes the parent at its waiting node.
- Cleans up all Redis keys for the execution.