Multi-Agent Delegation¶
Pipelit supports hierarchical multi-agent task delegation. An agent can receive a complex goal, decompose it into tasks, create or discover workflows to execute those tasks, track progress through a persistent registry, and learn from outcomes.
This architecture builds entirely on Pipelit's existing primitives (workflow CRUD, agent nodes, tool sub-components, subworkflow execution) with the addition of a task registry and specialized agent tools.
Architecture Overview¶
graph TB
subgraph Trigger["Trigger Layer"]
Chat[Chat Trigger]
TG[Telegram Trigger]
WH[Webhook Trigger]
end
subgraph MainAgent["Main Agent"]
Agent[Agent Node + Tools]
Decision{Decision}
Agent --> Decision
Decision -->|Simple| Direct[Respond Directly]
Decision -->|Multi-step| Epic[Create Epic]
Decision -->|Familiar| Search[Search Registry]
end
subgraph Registry["Task Registry"]
EpicModel[Epic<br/>Top-level goal]
Tasks[Tasks<br/>Discrete units of work]
EpicModel --> Tasks
end
subgraph Execution["Workflow Execution"]
Inline[Inline Tool Calls]
Spawn[spawn_and_await<br/>Subworkflow]
Create[workflow_create<br/>+ spawn_and_await]
end
Chat --> Agent
TG --> Agent
WH --> Agent
Epic --> Tasks
Search --> Tasks
Tasks -->|Simple task| Inline
Tasks -->|Complex task| Spawn
Tasks -->|Novel task| Create
Spawn -->|Results| Registry
Create -->|Results| Registry
classDef trigger fill:#fed7aa,stroke:#ea580c
classDef agent fill:#dbeafe,stroke:#2563eb
classDef registry fill:#d1fae5,stroke:#059669
classDef exec fill:#fce7f3,stroke:#db2777
class Chat,TG,WH trigger
class Agent,Decision,Direct,Epic,Search agent
class EpicModel,Tasks registry
class Inline,Spawn,Create exec
Key Architectural Decisions¶
Workflows Over Agents¶
The unit of delegation is the workflow, not the individual agent. An agent is a single node, whereas a workflow is a composable graph with triggers, tools, routing, and memory, which makes it strictly more expressive. Delegating to a workflow therefore subsumes delegating to an agent.
Dynamic Subworkflows Over JSON Plans¶
Agents create executable workflow graphs, not static JSON task plans. A dynamically created subworkflow IS the plan AND is immediately executable:
- Nodes = subtasks
- Edges = dependencies
- Fan-out topology = parallel groups
This eliminates the "plan then interpret then execute" pipeline.
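To make the mapping concrete, the sketch below treats a subworkflow as a dependency graph and derives its parallel groups by topological layering. It relies only on Python's standard graphlib module; the node names and the parallel_groups helper are hypothetical illustrations, not part of Pipelit.

```python
from graphlib import TopologicalSorter


def parallel_groups(depends_on):
    """Group nodes into waves; all nodes in one wave can run concurrently."""
    sorter = TopologicalSorter(depends_on)  # maps node -> set of predecessors
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every node whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical subworkflow: "fetch" fans out to two analysis nodes that join at "report".
graph = {
    "fetch": set(),
    "analyze_a": {"fetch"},
    "analyze_b": {"fetch"},
    "report": {"analyze_a", "analyze_b"},
}
waves = parallel_groups(graph)
print(waves)
```

The second wave contains both analysis nodes, which is the fan-out topology acting as a parallel group.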
Two-Level Task Hierarchy¶
The registry uses two levels only -- Epics and Tasks:
- Epic = Top-level goal (e.g., "Write comprehensive tests for platform auth"). Spans multiple tasks, tracks budget, aggregates cost.
- Task = Discrete unit of work (e.g., "Analyze coverage gaps in auth module"). Maps to one workflow execution or one inline tool call sequence.
- Subtask = A node within a workflow. Subtasks already exist as workflow internals and are not modeled in the registry.
Task Registry¶
Epic Model¶
class Epic(Base):
    __tablename__ = "epics"

    id: str                     # ULID primary key (ep_xxxxxxxxxxxx)
    title: str                  # Short goal name
    description: str            # Detailed goal, constraints, criteria
    tags: list                  # For discovery (JSON column)
    status: str                 # planning | active | paused | completed | failed | cancelled
    priority: int               # 1=critical, 2=high, 3=medium, 4=low

    # Budget tracking
    budget_tokens: int | None   # Token ceiling
    budget_usd: float | None    # USD ceiling
    spent_tokens: int           # Running total from child tasks
    spent_usd: float            # Running total from child tasks
    agent_overhead_tokens: int  # Main agent's reasoning cost

    # Progress
    total_tasks: int
    completed_tasks: int
    failed_tasks: int
Task Model¶
class Task(Base):
    __tablename__ = "tasks"

    id: str                     # ULID primary key (tk_xxxxxxxxxxxx)
    epic_id: str                # FK -> epics
    title: str
    status: str                 # pending | blocked | running | completed | failed | cancelled

    # Workflow linkage
    workflow_slug: str | None   # Assigned workflow
    execution_id: str | None    # Current execution (soft reference)
    workflow_source: str        # inline | existing | created | template

    # Dependencies
    depends_on: list            # List of task IDs (JSON column)

    # Requirements for workflow matching
    requirements: dict          # {"model": "gpt-4", "tools": ["code"], ...}

    # Cost tracking
    actual_tokens: int
    actual_usd: float
    retry_count: int
    max_retries: int
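The depends_on list and the pending and blocked statuses suggest a simple readiness rule: a task is blocked while any dependency is not yet completed. A minimal sketch of that rule follows, assuming an in-memory dictionary of tasks; TaskRow and refresh_blocked are hypothetical names, and the real registry may resolve dependencies differently.

```python
from dataclasses import dataclass, field


@dataclass
class TaskRow:
    """Hypothetical in-memory stand-in for a row of the tasks table."""
    id: str
    status: str = "pending"
    depends_on: list = field(default_factory=list)


def refresh_blocked(tasks):
    """Flip tasks between pending and blocked based on their dependencies."""
    for task in tasks.values():
        if task.status not in ("pending", "blocked"):
            continue  # running and terminal tasks are left alone
        unmet = [dep for dep in task.depends_on if tasks[dep].status != "completed"]
        task.status = "blocked" if unmet else "pending"


tasks = {
    "tk_a": TaskRow("tk_a", status="completed"),
    "tk_b": TaskRow("tk_b", depends_on=["tk_a"]),
    "tk_c": TaskRow("tk_c", depends_on=["tk_b"]),
}
refresh_blocked(tasks)
print({t.id: t.status for t in tasks.values()})
```

Calling refresh_blocked again after tk_b completes moves tk_c from blocked back to pending.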
Status Lifecycles¶
stateDiagram-v2
state "Epic Lifecycle" as EpicLC {
[*] --> planning
planning --> active
active --> completed
active --> paused
paused --> active
active --> failed
active --> cancelled
paused --> cancelled
}
state "Task Lifecycle" as TaskLC {
[*] --> pending
pending --> blocked
blocked --> pending : dependencies met
pending --> running
running --> completed
running --> failed
failed --> pending : retry
pending --> cancelled
blocked --> cancelled
}
Cost Aggregation¶
Costs roll up automatically from Task to Epic:
def sync_epic_costs(epic: Epic):
    tasks = epic.tasks
    epic.spent_tokens = sum(t.actual_tokens for t in tasks)
    epic.spent_usd = sum(t.actual_usd for t in tasks)
    epic.total_tasks = len(tasks)
    epic.completed_tasks = sum(1 for t in tasks if t.status == "completed")
    epic.failed_tasks = sum(1 for t in tasks if t.status == "failed")
Budget is checked before spawning new tasks:
def check_budget(epic: Epic, estimated_tokens: int) -> tuple[bool, str]:
    if epic.budget_tokens and (epic.spent_tokens + estimated_tokens > epic.budget_tokens):
        return False, "Would exceed token budget"
    return True, "ok"
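The check above covers only the token ceiling. Since the Epic model also carries budget_usd and spent_usd, one possible extension gates on both ceilings. This is a sketch under assumptions: EpicBudget, check_budget_both, and the estimated_usd parameter are hypothetical, and the comparison against None (so that an explicit ceiling of 0 is still enforced) is a design choice not confirmed by the source.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class EpicBudget:
    """Hypothetical subset of the Epic budget fields."""
    budget_tokens: Optional[int] = None
    budget_usd: Optional[float] = None
    spent_tokens: int = 0
    spent_usd: float = 0.0


def check_budget_both(epic: EpicBudget, estimated_tokens: int,
                      estimated_usd: float = 0.0) -> Tuple[bool, str]:
    # "is not None" keeps an explicit ceiling of 0 enforceable
    if (epic.budget_tokens is not None
            and epic.spent_tokens + estimated_tokens > epic.budget_tokens):
        return False, "Would exceed token budget"
    if (epic.budget_usd is not None
            and epic.spent_usd + estimated_usd > epic.budget_usd):
        return False, "Would exceed USD budget"
    return True, "ok"


print(check_budget_both(EpicBudget(budget_tokens=1000, spent_tokens=900), 200))
```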
Agent Tools¶
Registry Tools¶
These are registered as tool sub-components and connected to agent nodes via tool edges:
| Tool | Function Name | Description |
|---|---|---|
| epic_create | create_epic | Create a tracked epic with budget and tags |
| epic_status | epic_status | Get progress, cost, and task breakdown |
| epic_update | update_epic | Transition status, adjust budget, record outcome |
| epic_search | search_epics | Search past epics by description and tags |
| task_create | create_task | Create a task with dependencies and requirements |
| task_list | list_tasks | List tasks filtered by epic, status, or tags |
| task_update | update_task | Update status, add notes, record results |
| task_cancel | cancel_task | Cancel task and its running execution |
spawn_and_await¶
The critical delegation tool. It spawns a subworkflow execution and returns the child's results through LangGraph's interrupt() primitive, so no RQ worker is blocked while the child runs.
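The suspend and resume hand-off can be approximated in plain Python. The sketch below is a simulation only and does not use LangGraph or Pipelit APIs: the checkpoints dictionary stands in for the checkpointer, the _subworkflow key mirrors the signal named in this section, and run_agent, run_child, orchestrate, and the child-flow slug are all hypothetical.

```python
# Plain-Python simulation; none of this is LangGraph or Pipelit API.
checkpoints = {}  # stand-in for the checkpointer, keyed by thread ID


def run_agent(thread_id, messages=None, resume=None):
    """One worker invocation of the agent node."""
    if resume is None:
        # First pass: the agent decides to delegate. Save the state and
        # return a signal instead of waiting for the child.
        checkpoints[thread_id] = {
            "messages": list(messages),
            "pending_tool": "spawn_and_await",
        }
        return {"_subworkflow": {"workflow_slug": "child-flow",
                                 "input": messages[-1]}}
    # Second pass: restore the state and hand the child result to the
    # pending tool call.
    state = checkpoints.pop(thread_id)
    state["messages"].append(f"{state['pending_tool']} returned: {resume}")
    return {"output": state["messages"]}


def run_child(request):
    """Stand-in for the child workflow execution."""
    return f"done({request['input']})"


def orchestrate(thread_id, messages):
    result = run_agent(thread_id, messages=messages)
    if "_subworkflow" in result:
        # In the real flow the first worker is released here and the child
        # runs on another worker; this sketch runs it inline.
        child_result = run_child(result["_subworkflow"])
        result = run_agent(thread_id, resume=child_result)
    return result["output"]


print(orchestrate("thread-1", ["summarize the repo"]))
```

The point of the design is that the first invocation returns immediately with a signal, so nothing waits on the child; all continuity lives in the saved state.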
sequenceDiagram
participant RQ1 as RQ Worker 1
participant Agent as Agent (LangGraph)
participant Check as Checkpointer
participant Orch as Orchestrator
participant RQ2 as RQ Worker 2
participant Child as Child Workflow
RQ1->>Agent: invoke(messages)
Agent->>Agent: LLM reasons: "delegate this task"
Agent->>Agent: Call spawn_and_await tool
Agent->>Check: interrupt() - save full state
Note over Check: Conversation + pending<br/>tool call saved
Agent-->>RQ1: Return _subworkflow signal
RQ1->>Orch: Handle _subworkflow
Orch->>RQ2: Enqueue child execution
Note over RQ1: Worker released
RQ2->>Child: Execute child workflow
Child-->>RQ2: Child result
RQ2->>Orch: Child completed
Orch->>Orch: Inject result into state
Orch->>RQ1: Re-enqueue agent node
RQ1->>Check: Load saved state
RQ1->>Agent: Command(resume=child_result)
Agent->>Agent: interrupt() returns child result
Agent->>Agent: LLM continues: "The subworkflow returned..."
Agent-->>RQ1: Normal completion
Dual Checkpointer Strategy¶
spawn_and_await requires a checkpointer to save/restore agent state during interrupt/resume. Two backends are used depending on configuration:
| Scenario | Checkpointer | Storage | Thread ID | Lifecycle |
|---|---|---|---|---|
| conversation_memory ON | SqliteSaver | SQLite (checkpoints.db) | {user_id}:{chat_id}:{workflow_id} | Permanent -- conversation history persists |
| conversation_memory OFF + has spawn_and_await | Redis checkpointer | Redis | exec:{execution_id}:{node_id} | Ephemeral -- auto-expires with 1h TTL |
| Neither | None | -- | -- | One-shot, no checkpointing |
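The table reduces to a small selection rule. The sketch below encodes it as a pure function that returns the backend name, thread ID, and TTL; CheckpointPlan and plan_checkpointer are hypothetical names, the backend strings are placeholders for the actual checkpointer objects, and 3600 seconds is simply the 1h TTL from the table.

```python
from typing import NamedTuple, Optional


class CheckpointPlan(NamedTuple):
    backend: Optional[str]      # "sqlite", "redis", or None
    thread_id: Optional[str]
    ttl_seconds: Optional[int]  # None means no expiry


def plan_checkpointer(conversation_memory, has_spawn_and_await, *,
                      user_id="", chat_id="", workflow_id="",
                      execution_id="", node_id=""):
    if conversation_memory:
        # Permanent: conversation history persists across executions
        return CheckpointPlan("sqlite", f"{user_id}:{chat_id}:{workflow_id}", None)
    if has_spawn_and_await:
        # Ephemeral: state only needs to outlive one interrupt/resume cycle
        return CheckpointPlan("redis", f"exec:{execution_id}:{node_id}", 3600)
    return CheckpointPlan(None, None, None)  # one-shot, no checkpointing


print(plan_checkpointer(False, True, execution_id="ex1", node_id="agent_1"))
```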
workflow_create¶
Creates workflows programmatically from a YAML DSL specification. Supports two modes:
- Create from scratch -- Full DSL with trigger, steps, tools, and model declarations
- Fork and patch -- Start from an existing workflow, apply incremental modifications
See the Workflow DSL page for the full specification.
workflow_discover¶
Searches existing workflows with gap-analysis scoring against declared requirements:
# Agent calls workflow_discover with requirements
discover_workflows(
query="webhook verification",
requirements='{"trigger": "webhook", "tools": ["code"]}'
)
# Returns ranked results with gap analysis
[{
"slug": "moltbook-verify",
"match_score": 0.95,
"has": ["code", "webhook"],
"missing": [],
"extra": ["http_request"],
"success_rate": 0.92,
}]
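The has, missing, and extra fields in the result can be derived with set operations. The following is a minimal sketch, assuming requirements and workflow capabilities are both flattened to sets of names. The gap_analysis helper and its coverage value are hypothetical; coverage is not the match_score above, whose formula is not specified here.

```python
def gap_analysis(required, provided):
    """Compare declared requirements with what a workflow provides."""
    has = sorted(required & provided)
    missing = sorted(required - provided)
    extra = sorted(provided - required)
    # Hypothetical coverage ratio: fraction of requirements that are met
    coverage = len(has) / len(required) if required else 1.0
    return {"has": has, "missing": missing, "extra": extra, "coverage": coverage}


report = gap_analysis({"webhook", "code"}, {"webhook", "code", "http_request"})
print(report)
```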
Three-tier reuse decision:
| Match Score | Action |
|---|---|
| >= 0.95 | Reuse as-is -- spawn_and_await directly |
| >= 0.50 | Fork and patch -- workflow_create with based_on + patches |
| < 0.50 | Create from scratch -- workflow_create with full DSL |
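The three tiers translate directly into a threshold function. The thresholds come from the table; the function name and the returned labels are hypothetical.

```python
def reuse_action(match_score):
    """Map a match score to one of the three reuse tiers."""
    if match_score >= 0.95:
        return "reuse"                # spawn_and_await directly
    if match_score >= 0.50:
        return "fork_and_patch"       # workflow_create with based_on + patches
    return "create_from_scratch"      # workflow_create with full DSL


for score in (0.97, 0.72, 0.31):
    print(score, reuse_action(score))
```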
Execution Walkthrough¶
A concrete trace of the full architecture handling the request: "Read moltbook.com/skill.md and follow the instructions to join Moltbook."
Step 1: Agent Creates Epic¶
Tool: epic_create({
title: "Join Moltbook",
description: "Read moltbook.com/skill.md and follow join instructions",
tags: ["moltbook", "onboarding"]
})
-> { epic_id: "ep_01JKXYZ" }
Step 2: Research Task (Inline)¶
Tool: task_create({ epic_id: "ep_01JKXYZ", title: "Fetch and analyze skill.md" })
Tool: http_request({ url: "https://moltbook.com/skill.md" })
Tool: task_update({ task_id: "tk_01JKABC", status: "completed",
result_summary: "Join requires: register, submit profile, verify webhook" })
Step 3: Registration Task (Inline)¶
Tool: task_create({ epic_id: "ep_01JKXYZ", title: "Register with Moltbook API" })
Tool: http_request({ url: "https://moltbook.com/api/register", method: "POST", ... })
Tool: task_update({ task_id: "tk_01JKDEF", status: "completed" })
Step 4: Webhook Task (Dynamic Workflow Creation)¶
Tool: task_create({
epic_id: "ep_01JKXYZ",
title: "Set up webhook for verification",
depends_on: ["tk_01JKDEF"]
})
Tool: workflow_create({
dsl: "name: Moltbook Webhook\ntrigger:\n type: webhook\nsteps:\n ..."
})
Tool: task_update({ task_id: "tk_01JKGHI", status: "completed",
workflow_slug: "moltbook-verify" })
Step 5: Epic Completed¶
Tool: epic_update({
epic_id: "ep_01JKXYZ",
status: "completed",
result_summary: "Registered with Moltbook. Webhook endpoint live."
})
Next Time: Reuse¶
When a similar request arrives ("Set up a webhook for ServiceX"):
Tool: epic_search({ query: "webhook verification" })
-> Returns ep_01JKXYZ with workflow "moltbook-verify"
Tool: workflow_discover({ requirements: {"trigger": "webhook", "tools": ["code"]} })
-> [{"slug": "moltbook-verify", "match_score": 0.95}]
Tool: workflow_create({
dsl: "based_on: moltbook-verify\npatches:\n - action: update_prompt\n ..."
})
Fork instead of reinvent. The proven structure is preserved.
Integration Points¶
Orchestrator Cost Sync¶
After a child execution completes, token usage and USD costs are synced from the execution to the task, then rolled up to the epic:
Child execution completes
-> _persist_execution_costs() writes to WorkflowExecution
-> _sync_task_costs() writes to Task
-> sync_epic_costs() rolls up to Epic
-> _check_budget() gates next task
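The chain above can be sketched as a single completion handler. This is an illustration under assumptions, not the orchestrator's code: TaskCosts, EpicCosts, and on_child_completed are hypothetical, the execution's usage arrives as plain arguments, and usage is assumed to accumulate on the task across retries.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TaskCosts:
    actual_tokens: int = 0
    actual_usd: float = 0.0


@dataclass
class EpicCosts:
    budget_tokens: Optional[int] = None
    spent_tokens: int = 0
    spent_usd: float = 0.0
    tasks: List[TaskCosts] = field(default_factory=list)


def on_child_completed(epic, task, exec_tokens, exec_usd, next_estimate):
    """Sync costs upward, then return True if the next task may be spawned."""
    # Execution -> Task (assumption: usage accumulates across retries)
    task.actual_tokens += exec_tokens
    task.actual_usd += exec_usd
    # Task -> Epic roll-up
    epic.spent_tokens = sum(t.actual_tokens for t in epic.tasks)
    epic.spent_usd = sum(t.actual_usd for t in epic.tasks)
    # Budget gate for the next task
    if epic.budget_tokens is None:
        return True
    return epic.spent_tokens + next_estimate <= epic.budget_tokens


task = TaskCosts()
epic = EpicCosts(budget_tokens=1000, tasks=[task])
first_ok = on_child_completed(epic, task, 600, 0.03, next_estimate=300)
second_ok = on_child_completed(epic, task, 300, 0.01, next_estimate=300)
print(first_ok, second_ok, epic.spent_tokens)
```

Running the roll-up before the gate means the budget decision always sees the cost of the execution that just finished.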
WebSocket Events¶
New channels and event types for real-time task tracking:
- Channel epic:<epic_id> carries task_created, task_updated, and epic_updated events
- Agents subscribed to epic channels receive real-time status updates
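As a rough illustration of the channel naming and event types, the sketch below uses an in-memory publish/subscribe bus in place of the WebSocket layer. The channel format and the three event names come from the list above; EpicEventBus and the JSON message shape (channel, type, data) are assumptions.

```python
import json
from collections import defaultdict

EVENT_TYPES = {"task_created", "task_updated", "epic_updated"}


class EpicEventBus:
    """Hypothetical in-memory stand-in for the WebSocket broadcast layer."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, epic_id, callback):
        self._subscribers[f"epic:{epic_id}"].append(callback)

    def publish(self, epic_id, event_type, data):
        if event_type not in EVENT_TYPES:
            raise ValueError(f"unknown event type: {event_type}")
        channel = f"epic:{epic_id}"
        message = json.dumps({"channel": channel, "type": event_type, "data": data})
        for callback in self._subscribers[channel]:
            callback(message)
        return message


received = []
bus = EpicEventBus()
bus.subscribe("ep_01JKXYZ", received.append)
bus.publish("ep_01JKXYZ", "task_updated", {"task_id": "tk_01JKABC", "status": "completed"})
bus.publish("ep_other", "task_created", {"task_id": "tk_x"})  # no subscriber on this channel
print(received)
```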
Feedback Loop¶
After an epic completes successfully, it is persisted as procedural memory:
memory_write(
key=f"procedure:{epic.id}",
value={
"goal": epic.title,
"tags": epic.tags,
"workflow_ids": [t.workflow_id for t in epic.tasks],
"success_rate": epic.completed_tasks / epic.total_tasks,
},
fact_type="procedure"
)
Future agents discover successful patterns via both the registry (structured query through epic_search) and memory (semantic search through memory_read).
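One way to assemble the procedure record before it is written is sketched here. It is hedged on several points: DoneEpic, DoneTask, and build_procedure_record are hypothetical; the record lists workflow_slug values, following the Task model shown earlier, in place of workflow_id; and the guard for an epic with zero tasks is an added assumption.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class DoneTask:
    workflow_slug: Optional[str] = None  # None for inline tasks


@dataclass
class DoneEpic:
    id: str
    title: str
    tags: List[str]
    total_tasks: int = 0
    completed_tasks: int = 0
    tasks: List[DoneTask] = field(default_factory=list)


def build_procedure_record(epic):
    """Return the (key, value) pair that would be handed to memory_write."""
    # Deduplicate slugs and skip inline tasks that have no workflow
    slugs = sorted({t.workflow_slug for t in epic.tasks if t.workflow_slug})
    # Guard against division by zero for an epic without tasks
    rate = epic.completed_tasks / epic.total_tasks if epic.total_tasks else 0.0
    return f"procedure:{epic.id}", {
        "goal": epic.title,
        "tags": epic.tags,
        "workflow_slugs": slugs,
        "success_rate": rate,
    }


epic = DoneEpic(
    id="ep_01JKXYZ", title="Join Moltbook", tags=["moltbook", "onboarding"],
    total_tasks=3, completed_tasks=3,
    tasks=[DoneTask(), DoneTask(), DoneTask("moltbook-verify")],
)
key, value = build_procedure_record(epic)
print(key, value)
```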