Execution Engine¶
The execution engine is the core of Pipelit's runtime. It compiles workflow graphs, resolves execution order, manages state, executes node components, and broadcasts real-time status events. This page covers each service in detail.
Execution Pipeline Overview¶
```mermaid
sequenceDiagram
    participant Trigger as Trigger Handler
    participant RQ as RQ Worker
    participant Builder as Builder
    participant Topo as Topology
    participant Orch as Orchestrator
    participant Expr as Expression Resolver
    participant Comp as Component
    participant State as Redis State
    participant WS as WebSocket Broadcast
    Trigger->>RQ: dispatch_event() -> enqueue job
    RQ->>Builder: build_graph(workflow, trigger_node)
    Builder->>Topo: get_reachable_nodes(trigger_node)
    Topo-->>Builder: reachable node set
    Builder-->>RQ: CompiledGraph
    RQ->>Orch: execute(execution, compiled_graph)
    Orch->>State: Initialize execution state
    loop For each node in topological order
        Orch->>WS: node_status: "running"
        Orch->>Expr: resolve_expressions(node.config, state)
        Expr-->>Orch: resolved config
        Orch->>Comp: execute_component(node, state)
        Comp-->>Orch: component output (flat dict)
        Orch->>Orch: process output (wrap, side effects)
        Orch->>State: store node_outputs[node_id]
        Orch->>WS: node_status: "success" + output
    end
    Orch->>State: Cleanup execution state
    Orch->>WS: execution_completed
```

Builder (services/builder.py)¶
The builder compiles a Workflow database model into a LangGraph CompiledGraph. This is the bridge between Pipelit's database representation and LangGraph's execution model.
Trigger-Scoped Compilation¶
When a trigger fires, the builder only compiles nodes reachable downstream from that specific trigger. This is a critical design decision:
- A single workflow can have multiple trigger branches (e.g., a chat trigger and a Telegram trigger feeding different agent chains)
- Unused nodes on the canvas do not cause build errors
- Each trigger activation produces a minimal, focused execution graph
```mermaid
graph LR
    subgraph "Full Workflow Canvas"
        TC[trigger_chat] --> A1[agent_1]
        TT[trigger_telegram] --> A2[agent_2]
        A2 --> Code[code_1]
        Unused[unused_node]
    end
    subgraph "Compiled Graph (chat trigger fires)"
        TC2[trigger_chat] --> A1b[agent_1]
    end
```

In this example, when the chat trigger fires, only `trigger_chat` and `agent_1` are compiled. `trigger_telegram`, `agent_2`, `code_1`, and `unused_node` are all excluded.
Compilation Steps¶
- Load workflow nodes and edges from the database
- BFS reachability from the trigger node (via `topology.py`)
- Filter nodes and edges to the reachable set
- Resolve LLM configurations for AI-type nodes
- Load tool factories for connected tool nodes
- Build the LangGraph state graph with node functions and edge routing
- Compile and optionally cache the result in Redis
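The filtering step above can be sketched in a few lines. This is an illustrative sketch, not Pipelit's actual code: the `Edge` shape and function name are assumptions, and the real builder works on database models.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Edge:
    """Illustrative stand-in for a workflow edge record."""
    source: str
    target: str

def filter_to_reachable(
    node_ids: set[str],
    edges: list[Edge],
    reachable: set[str],
) -> tuple[set[str], list[Edge]]:
    """Keep only reachable nodes, and only edges whose endpoints both survive."""
    kept_nodes = node_ids & reachable
    kept_edges = [e for e in edges if e.source in kept_nodes and e.target in kept_nodes]
    return kept_nodes, kept_edges
```

Dropping edges whose endpoints fall outside the reachable set is what lets unused canvas nodes exist without causing build errors.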
Graph Caching¶
Compiled graphs are cached in Redis keyed by workflow ID and a structural hash. The cache is invalidated whenever nodes or edges are modified (detected via WebSocket-broadcast mutations).
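A structural hash of this kind can be computed by serializing nodes and edges in a canonical order, so any topology change yields a new cache key. The exact fields Pipelit hashes and the key format below are assumptions for illustration:

```python
import hashlib
import json

def structural_hash(nodes: list[dict], edges: list[dict]) -> str:
    """Hash nodes and edges in a canonical order so equal structures hash equally."""
    canonical = json.dumps(
        {
            "nodes": sorted(nodes, key=lambda n: n["id"]),
            "edges": sorted(edges, key=lambda e: (e["source"], e["target"])),
        },
        sort_keys=True,
    )
    return hashlib.sha256(canonical.encode()).hexdigest()

def cache_key(workflow_id: str, nodes: list[dict], edges: list[dict]) -> str:
    # Hypothetical key layout: workflow ID plus structural hash
    return f"compiled_graph:{workflow_id}:{structural_hash(nodes, edges)}"
```

Because the hash covers structure rather than a timestamp, repeated triggers on an unchanged workflow hit the cache, while any node or edge mutation naturally misses it.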
Topology Analyzer (services/topology.py)¶
The topology service provides BFS-based graph analysis:
```python
def get_reachable_nodes(
    trigger_node_id: str,
    edges: list[WorkflowEdge],
) -> set[str]:
    """BFS from trigger node following direct edges.

    Returns the set of node_ids reachable downstream."""
```
This is used by the builder for trigger-scoped compilation and by the orchestrator to determine execution order.
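A minimal implementation of this BFS might look like the following, with `WorkflowEdge` stubbed as a `(source, target)` pair for illustration:

```python
from collections import deque

def get_reachable_nodes(trigger_node_id: str, edges: list[tuple[str, str]]) -> set[str]:
    """BFS from the trigger node following edges in the downstream direction."""
    adjacency: dict[str, list[str]] = {}
    for source, target in edges:
        adjacency.setdefault(source, []).append(target)

    reachable = {trigger_node_id}
    queue = deque([trigger_node_id])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in reachable:
                reachable.add(nxt)
                queue.append(nxt)
    return reachable
```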
Orchestrator (services/orchestrator.py)¶
The orchestrator is the execution engine's core. It walks through nodes in topological order, resolves template expressions, executes components, manages state, and broadcasts events.
Execution Lifecycle¶
For each node in the execution:
- Status broadcast -- Publish `node_status: "pending"`, then `"running"`, via WebSocket
- Expression resolution -- Resolve `{{ nodeId.portName }}` Jinja2 templates in `system_prompt` and `extra_config`
- Component execution -- Call the component factory to get the node function, then execute it
- Output processing -- Process the component's return dict:
    - Non-underscore keys are wrapped into `node_outputs[node_id]`
    - `_route` sets `state["route"]` for conditional routing
    - `_messages` are appended to `state["messages"]`
    - `_state_patch` is merged into global state
- Result recording -- Wrap output in a `NodeResult` with status, data, error code, and metadata
- State storage -- Store `node_outputs` and `node_results` in Redis
- Status broadcast -- Publish `node_status: "success"` (with output) or `"failed"` (with error)
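Condensed into code, one node's pass through this lifecycle looks roughly like the sketch below. The helper callables (`broadcast`, `resolve_expressions`, `execute_component`) stand in for Pipelit internals and are assumptions, not the real API:

```python
def run_node(node_id, config, state, broadcast, resolve_expressions, execute_component):
    """Sketch of one node's lifecycle: broadcast, resolve, execute, record."""
    broadcast(node_id, "running")
    resolved = resolve_expressions(config, state)   # Jinja2 template resolution
    try:
        output = execute_component(node_id, resolved, state)   # flat dict
    except Exception as exc:
        broadcast(node_id, "failed", error=str(exc))
        return {"status": "failed", "error": str(exc)}
    broadcast(node_id, "success", output=output)
    return {"status": "success", "data": output}
```

The real orchestrator additionally persists `node_outputs` and `node_results` to Redis between these steps.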
State Management¶
Execution state is stored in Redis during execution:
```python
state = {
    "messages": [...],          # LangGraph message list
    "node_outputs": {           # Per-node output data
        "trigger_chat_abc": {"text": "hello", "payload": {...}},
        "agent_def": {"output": "response text"},
    },
    "node_results": {           # Per-node execution results
        "trigger_chat_abc": NodeResult(status="success", ...),
    },
    "route": "category_a",      # Current route for conditional edges
    "execution_id": "exec_xyz",
    "user_context": {...},      # User info from trigger
}
```
Subworkflow Handling¶
When a node returns `{"_subworkflow": {...}}`, the orchestrator:
- Creates a child `WorkflowExecution`
- Enqueues the child execution as an RQ job
- Sets the parent node status to `"waiting"`
- Releases the RQ worker (non-blocking)
- When the child completes, injects the result into `state["_subworkflow_results"]`
- Re-enqueues the parent node for continuation
Error Handling¶
Failed nodes are wrapped in `NodeResult.failed()` with:
- `error` -- Human-readable error message
- `error_code` -- Machine-readable error classification
- `metadata` -- Additional context (stack trace, timing)
The orchestrator decides whether to skip downstream nodes or halt the entire execution based on the error severity and the node's position in the graph.
Expression Resolver (services/expressions.py)¶
Before executing a component, the orchestrator resolves Jinja2 template expressions in `system_prompt` and `extra_config` values.
Template Syntax¶
- `{{ nodeId.portName }}` -- Reference a specific node's output port
- `{{ trigger.text }}` -- The trigger's text input
- `{{ trigger.payload }}` -- The trigger's full payload
- `{{ trigger.payload.key }}` -- A specific key from the trigger payload
Context Variables¶
The expression context includes:
| Variable | Source | Example |
|---|---|---|
| `<node_id>` | `node_outputs[node_id]` | `{{ agent_abc.output }}` |
| `trigger` | The trigger that fired this execution | `{{ trigger.text }}` |
The trigger shorthand always refers to whichever trigger fired the current execution, which is useful in multi-trigger workflows where chat and Telegram triggers feed the same downstream nodes.
Jinja2 Features¶
- Standard filters are supported: `{{ trigger.text | upper }}`
- Undefined variables gracefully fall back to the original template string (no errors)
- Nested access works: `{{ node.output.nested.key }}`
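The fallback behavior can be illustrated with a toy resolver. Note this is a simplified regex-based sketch of the semantics only; the real resolver uses Jinja2, which also provides filters like `| upper`:

```python
import re

EXPR = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")

def resolve(template: str, context: dict) -> str:
    """Resolve {{ a.b.c }} references; unresolvable ones are left verbatim."""
    def lookup(match: re.Match) -> str:
        value = context
        for part in match.group(1).split("."):
            if isinstance(value, dict) and part in value:
                value = value[part]
            else:
                return match.group(0)   # undefined -> keep the original expression
        return str(value)
    return EXPR.sub(lookup, template)
```

Leaving unresolved expressions intact (rather than raising) means a typo in a template degrades to visible placeholder text instead of failing the whole node.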
Frontend Integration¶
The frontend provides an `ExpressionTextarea` component with a `{ }` button that opens a `VariablePicker` popover. The picker performs BFS over upstream nodes and presents clickable `{{ nodeId.port }}` items for insertion.
Executor (services/executor.py)¶
The executor is the top-level wrapper around the orchestrator, responsible for:
- Job setup -- Loading the workflow, execution, and related data from the database
- Graph building -- Calling the builder to compile the LangGraph graph
- Execution dispatch -- Calling the orchestrator to execute the graph
- Finalization -- Updating the execution status, persisting costs, and broadcasting completion events
- Cleanup -- Removing execution state from Redis
RQ Job Wrappers¶
```python
def execute_workflow_job(execution_id: str):
    """RQ job entry point for full workflow execution."""

def execute_node_job(execution_id: str, node_id: str):
    """RQ job entry point for single-node re-invocation
    (used after subworkflow completion)."""
```
State Model (services/state.py)¶
The workflow state extends LangGraph's MessagesState:
```python
from typing import Annotated

from langchain_core.messages import AnyMessage
from langgraph.graph import MessagesState
from langgraph.graph.message import add_messages

class WorkflowState(MessagesState):
    """Global state passed through the LangGraph execution."""
    messages: Annotated[list[AnyMessage], add_messages]
    node_outputs: dict[str, dict]
    node_results: dict[str, NodeResult]
    route: str
    execution_id: str
    user_context: dict
```
The `messages` field uses LangGraph's `add_messages` reducer, which handles deduplication via stable message IDs. This is critical for conversation memory where the system prompt is re-injected on each invocation.
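The merge-by-ID behavior can be illustrated with a simplified reducer. LangGraph's actual `add_messages` does more (message coercion, removal), so treat this as a sketch of the dedup semantics only:

```python
def merge_messages(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Merge by message id: same id replaces in place, new ids append."""
    merged = {m["id"]: m for m in existing}
    for m in incoming:
        merged[m["id"]] = m   # replace on id collision, append otherwise
    return list(merged.values())
```

This is why re-injecting the system prompt with a stable ID on every invocation does not balloon the conversation: each injection overwrites the previous copy instead of duplicating it.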
Component Output Convention¶
Components return flat dicts with their port values:
```python
# Simple component
return {"output": "processed text"}

# Multi-port component
return {"category": "spam", "raw": "original text", "confidence": 0.95}
```
The orchestrator automatically wraps all non-underscore keys into `node_outputs[node_id]`.
Reserved Underscore Keys¶
| Key | Effect |
|---|---|
| `_route` | Sets `state["route"]` for conditional edge routing |
| `_messages` | Appended to `state["messages"]` (LangGraph message list) |
| `_state_patch` | Dict merged into global state (excluding protected keys) |
| `_subworkflow` | Signals subworkflow delegation to orchestrator |
| `_token_usage` | Token usage data accumulated by orchestrator for cost tracking |
Components no longer receive or use their own `node_id`. The legacy format (returning `node_outputs` directly) is still supported for backwards compatibility.
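Put together, output processing might look like the sketch below. The `PROTECTED` set and function name are assumptions; the underscore-key side effects mirror the table above:

```python
# Keys a _state_patch is assumed not to be allowed to overwrite
PROTECTED = {"node_outputs", "node_results", "execution_id"}

def process_output(node_id: str, output: dict, state: dict) -> None:
    """Apply reserved underscore keys, then wrap the rest as the node's ports."""
    if "_route" in output:
        state["route"] = output["_route"]          # conditional edge routing
    if "_messages" in output:
        state["messages"].extend(output["_messages"])
    if "_state_patch" in output:
        state.update({k: v for k, v in output["_state_patch"].items()
                      if k not in PROTECTED})
    # Non-underscore keys become this node's output ports
    state["node_outputs"][node_id] = {
        k: v for k, v in output.items() if not k.startswith("_")
    }
```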
Conditional Routing¶
Switch Node¶
The switch node evaluates rules against input data and emits a _route value:
```python
rules = [
    {"field": "trigger.payload.type", "operator": "equals", "value": "urgent", "route": "fast_path"},
    {"field": "trigger.payload.type", "operator": "equals", "value": "normal", "route": "standard"},
]
# Falls back to "default" route if no rules match
```
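Rule evaluation can be sketched as a first-match-wins scan. Only the `equals` operator is shown, and the dotted-path lookup is an assumption about how `field` is resolved:

```python
def evaluate_rules(rules: list[dict], data: dict) -> dict:
    """Return the _route of the first matching rule, or the default route."""
    def get_field(path: str):
        value = data
        for part in path.split("."):
            value = value.get(part) if isinstance(value, dict) else None
        return value

    for rule in rules:
        if rule["operator"] == "equals" and get_field(rule["field"]) == rule["value"]:
            return {"_route": rule["route"]}
    return {"_route": "default"}
```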
Per-Edge Condition Values¶
Each conditional edge carries a condition_value string. The orchestrator matches state["route"] against each outgoing edge's condition_value to determine the next node:
```mermaid
graph LR
    Switch[switch] -->|condition_value="fast_path"| Agent1[urgent_agent]
    Switch -->|condition_value="standard"| Agent2[normal_agent]
    Switch -->|condition_value="default"| Log[logger]
```

Only switch nodes can originate conditional edges. This is enforced by the edge creation API.