A worker agent is a specialized AI agent built to perform a specific business process. It’s not a general-purpose assistant — it’s purpose-built, like hiring someone for a defined role. A sales outreach agent knows how to research prospects, craft personalized messages, and follow up. A support triage agent knows how to categorize tickets, pull customer context, and route to the right team.
In Hive, a Coding Agent (like Claude Code or Cursor) generates worker agents from a natural language goal description. You describe what you want the agent to do, and the coding agent produces the graph, nodes, edges, and configuration. The worker agent is the thing that actually runs.
Creating a worker agent
Worker agents are defined by their graph structure and goal:
examples/templates/deep_research_agent/agent.py
from framework.graph import Goal, GraphSpec
from framework.runtime.agent_runtime import AgentRuntime, create_agent_runtime

class DeepResearchAgent:
    """Deep Research Agent — 4-node pipeline with user checkpoints."""

    def __init__(self, config=None):
        self.config = config or default_config
        self.goal = goal
        self.nodes = nodes
        self.edges = edges
        self._agent_runtime: AgentRuntime | None = None

    def _build_graph(self) -> GraphSpec:
        """Build the GraphSpec."""
        return GraphSpec(
            id="deep-research-agent-graph",
            goal_id=self.goal.id,
            version="1.0.0",
            entry_node="intake",
            nodes=self.nodes,
            edges=self.edges,
            default_model="claude-sonnet-4-5",
        )
Sessions
A session is a single execution of a worker agent against a specific input. If your outreach agent processes 50 prospects, that’s 50 sessions.
Each session is isolated — it has its own shared memory, its own execution state, and its own history. This matters because sessions can be long-running. An agent might start researching a prospect, pause for human approval, wait hours or days, and then resume to send the message. The session preserves everything across that gap.
examples/templates/deep_research_agent/agent.py
async def trigger_and_wait(
    self,
    entry_point: str = "default",
    input_data: dict | None = None,
    timeout: float | None = None,
    session_state: dict | None = None,
) -> ExecutionResult | None:
    """Execute the graph and wait for completion."""
    if self._agent_runtime is None:
        raise RuntimeError("Agent not started. Call start() first.")
    return await self._agent_runtime.trigger_and_wait(
        entry_point_id=entry_point,
        input_data=input_data or {},
        session_state=session_state,
    )
Sessions also make debugging straightforward. Every decision the agent made, every tool it called, every retry it attempted — it’s all captured in the session. When something goes wrong, you can trace exactly what happened.
Iterations
Within a session, nodes (especially event_loop nodes) work in iterations. An iteration is one turn of the loop: the LLM reasons about the current state, possibly calls tools, observes results, and produces output. Then the judge evaluates: is this good enough?
If not, the node iterates again. The LLM sees what went wrong and adjusts its approach. This is how agents self-correct without human intervention — through rapid iteration within a single node, not by restarting the whole process.
core/framework/graph/event_loop_node.py
class SubagentJudge:
    """Judge for subagent execution.

    Accepts immediately when all required output keys are filled.
    On RETRY, reminds the subagent of its task with progressive urgency.
    """

    async def evaluate(self, context: dict[str, Any]) -> JudgeVerdict:
        missing = context.get("missing_keys", [])
        if not missing:
            return JudgeVerdict(action="ACCEPT")
        iteration = context.get("iteration", 0)
        remaining = self._max_iterations - iteration - 1
        if remaining <= 3:
            urgency = (
                f"URGENT: Only {remaining} iterations left. "
                f"Stop all other work and call set_output NOW for: {missing}"
            )
        else:
            urgency = f"Missing output keys: {missing}. Use set_output to provide them."
        return JudgeVerdict(action="RETRY", feedback=f"Your task: {self._task}\n{urgency}")
Iterations have limits. You set a maximum per node to prevent runaway loops:
examples/templates/deep_research_agent/agent.py
graph = GraphSpec(
    id="deep-research-agent-graph",
    loop_config={
        "max_iterations": 100,
        "max_tool_calls_per_turn": 30,
        "max_history_tokens": 32000,
    },
)
If a node can’t produce acceptable output within its iteration budget, it fails and the graph’s error-handling edges take over.
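That handoff can be pictured as conditional routing on the node's outcome. The edge fields below are illustrative only, not necessarily Hive's actual EdgeSpec schema:

```python
# Hypothetical sketch: two edges out of "research", selected by outcome.
edges = [
    {"source": "research", "target": "draft", "condition": "on_success"},
    {"source": "research", "target": "escalate", "condition": "on_failure"},
]

def next_node(current: str, failed: bool) -> str:
    """Pick the outgoing edge that matches how the node finished."""
    want = "on_failure" if failed else "on_success"
    for edge in edges:
        if edge["source"] == current and edge["condition"] == want:
            return edge["target"]
    raise LookupError(f"no edge from {current!r} for {want}")

# A node that exhausts its iteration budget counts as failed:
fallback = next_node("research", failed=True)
```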
Headless execution
A lot of business processes need to run continuously — monitoring inboxes, processing incoming leads, watching for events. These agents run headless: no UI, no human sitting at a terminal, just the agent doing its job in the background.
Headless doesn’t mean unsupervised. HITL (human-in-the-loop) nodes still pause execution and wait for human input when the agent hits a decision it shouldn’t make alone. The difference is that instead of a live conversation, the agent sends a notification, waits for a response through whatever channel you’ve configured, and resumes when the human weighs in.
This is the operational model Hive is designed for: agents that run 24/7 as part of your business infrastructure, with humans stepping in only when needed. The goal is to automate the routine and escalate the exceptions.
The runtime
The worker agent runtime manages the lifecycle: starting sessions, executing the graph, handling pauses and resumes, tracking costs, and collecting metrics. It coordinates everything the agent needs — LLM access, tool execution, shared memory, credential management — so individual nodes can focus on their specific job.
core/framework/runtime/agent_runtime.py
class AgentRuntime:
    """Runtime for worker agents.

    Manages:
    - Session lifecycle and isolation
    - Graph execution via ExecutionStream
    - Cost tracking and budget enforcement
    - Decision logging for evolution
    - Crash recovery and checkpointing
    """
Key things the runtime handles:
Cost tracking
Every LLM call is metered. You set budget constraints on the goal, and the runtime enforces them. An agent can’t silently burn through your API credits.
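The metering idea reduces to a running total with a hard stop, roughly like this (a toy meter; Hive's actual budget enforcement lives in the runtime, and the shape of the goal's budget constraints may differ):

```python
class BudgetExceeded(RuntimeError):
    pass

class CostMeter:
    """Toy per-session cost meter with a hard budget cap."""

    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.spent_usd = 0.0

    def charge(self, input_tokens: int, output_tokens: int,
               usd_per_1k_in: float, usd_per_1k_out: float) -> float:
        """Meter one LLM call; refuse it if it would blow the budget."""
        cost = (input_tokens / 1000 * usd_per_1k_in
                + output_tokens / 1000 * usd_per_1k_out)
        if self.spent_usd + cost > self.budget_usd:
            raise BudgetExceeded(
                f"call would bring spend to ${self.spent_usd + cost:.4f} "
                f"against a ${self.budget_usd} budget"
            )
        self.spent_usd += cost
        return cost

meter = CostMeter(budget_usd=0.05)
meter.charge(1000, 500, usd_per_1k_in=0.003, usd_per_1k_out=0.015)  # one call
```

Checking *before* committing the spend is the point: the call that would cross the line is refused, not billed and regretted.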
Decision logging
Every meaningful choice the agent makes is recorded: what it was trying to do, what options it considered, what it chose, and what happened. This isn’t just for debugging — it’s the raw material that evolution uses to improve future generations.
Event streaming
The runtime emits events as the agent works. You can wire these up to dashboards, logs, or alerting systems to monitor agents in real time.
events = [
    "node.started",
    "node.completed",
    "node.failed",
    "tool.called",
    "decision.made",
    "hitl.paused",
    "hitl.resumed",
]
Crash recovery
If execution is interrupted (process crash, deployment, anything), the runtime can resume from the last checkpoint. Conversation state and memory are persisted, so the agent picks up where it left off rather than starting over.
examples/templates/deep_research_agent/agent.py
checkpoint_config = CheckpointConfig(
    enabled=True,
    checkpoint_on_node_start=False,
    checkpoint_on_node_complete=True,
    checkpoint_max_age_days=7,
    async_checkpoint=True,
)

self._agent_runtime = create_agent_runtime(
    graph=self._graph,
    goal=self.goal,
    storage_path=self._storage_path,
    checkpoint_config=checkpoint_config,
)
Starting and stopping agents
Worker agents have a simple lifecycle:
examples/templates/deep_research_agent/agent.py
async def start(self, mock_mode=False) -> None:
    """Set up and start the agent runtime."""
    if self._agent_runtime is None:
        self._setup(mock_mode=mock_mode)
    if not self._agent_runtime.is_running:
        await self._agent_runtime.start()

async def stop(self) -> None:
    """Stop the agent runtime and clean up."""
    if self._agent_runtime and self._agent_runtime.is_running:
        await self._agent_runtime.stop()
    self._agent_runtime = None

async def run(self, context: dict, mock_mode=False) -> ExecutionResult:
    """Run the agent (convenience method for single execution)."""
    await self.start(mock_mode=mock_mode)
    try:
        result = await self.trigger_and_wait("default", context)
        return result or ExecutionResult(success=False, error="Execution timeout")
    finally:
        await self.stop()
Validation
Worker agents can validate their own structure:
examples/templates/deep_research_agent/agent.py
def validate(self):
    """Validate agent structure."""
    errors = []
    warnings = []
    node_ids = {node.id for node in self.nodes}
    for edge in self.edges:
        if edge.source not in node_ids:
            errors.append(f"Edge {edge.id}: source '{edge.source}' not found")
        if edge.target not in node_ids:
            errors.append(f"Edge {edge.id}: target '{edge.target}' not found")
    if self.entry_node not in node_ids:
        errors.append(f"Entry node '{self.entry_node}' not found")
    return {
        "valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
    }
The big picture
The worker agent model is Hive’s answer to a simple question: how do you run AI agents like you’d run a team?
You hire for a role (define the goal), you onboard them with context (provide tools, credentials, domain knowledge), you set expectations (success criteria and constraints), you let them work independently (headless execution), and you check in when something unusual comes up (HITL). When they’re not performing well, you don’t debug them line by line — you evolve them (see Evolution).