Skip to main content
The MessageBus provides communication infrastructure for agents to send messages using point-to-point, broadcast, and pub/sub patterns.

Overview

MoFA provides two message bus implementations:
  • AgentBus - Kernel-level bus with serialization
  • SimpleMessageBus - Runtime-level bus for SimpleRuntime

AgentBus (Kernel)

The AgentBus from mofa-kernel provides low-level message routing with serialization support.

Creating a Bus

use mofa_kernel::bus::AgentBus;

let bus = AgentBus::new();

Communication Modes

/// Selects how a message is routed through the bus.
pub enum CommunicationMode {
    /// Point-to-point delivery. The `String` is the peer agent's ID: the
    /// target when sending, the sender when registering or receiving.
    PointToPoint(String),
    /// Delivery to all agents.
    Broadcast,
    /// Publish/subscribe delivery. The `String` is the topic name.
    PubSub(String),
}

register_channel

Register a communication channel for an agent.
  • agent_metadata (&AgentMetadata) - Agent metadata
  • mode (CommunicationMode) - Communication mode to register

use mofa_kernel::bus::CommunicationMode;

// Register for point-to-point
bus.register_channel(
    &agent_metadata,
    CommunicationMode::PointToPoint("sender-id".to_string())
).await?;

// Register for pub/sub
bus.register_channel(
    &agent_metadata,
    CommunicationMode::PubSub("events".to_string())
).await?;

// Broadcast doesn't need registration

send_message

Send a message through the bus.
  • sender_id (&str) - Sender agent ID
  • mode (CommunicationMode) - Communication mode
  • message (&AgentMessage) - Message to send

use mofa_kernel::message::AgentMessage;

// Point-to-point
let message = AgentMessage::TaskRequest {
    task_id: "task-1".to_string(),
    content: "Process this".to_string(),
};

bus.send_message(
    "sender-1",
    CommunicationMode::PointToPoint("receiver-1".to_string()),
    &message
).await?;

// Broadcast
bus.send_message(
    "sender-1",
    CommunicationMode::Broadcast,
    &message
).await?;

// Pub/sub
bus.send_message(
    "sender-1",
    CommunicationMode::PubSub("tasks".to_string()),
    &message
).await?;

receive_message

Receive a message from the bus.
  • id (&str) - Receiver agent ID
  • mode (CommunicationMode) - Communication mode to receive from

// Point-to-point receive
if let Some(message) = bus.receive_message(
    "receiver-1",
    CommunicationMode::PointToPoint("sender-1".to_string())
).await? {
    println!("Received: {:?}", message);
}

// Broadcast receive
if let Some(message) = bus.receive_message(
    "receiver-1",
    CommunicationMode::Broadcast
).await? {
    println!("Broadcast: {:?}", message);
}

// Topic receive
if let Some(message) = bus.receive_message(
    "subscriber-1",
    CommunicationMode::PubSub("tasks".to_string())
).await? {
    println!("Topic message: {:?}", message);
}

unsubscribe_topic

Unsubscribe from a topic.
  • id (&str) - Agent ID
  • topic (&str) - Topic name

bus.unsubscribe_topic("subscriber-1", "tasks").await?;

AgentBus Example

use mofa_kernel::bus::{AgentBus, CommunicationMode};
use mofa_kernel::message::AgentMessage;
use mofa_kernel::agent::{AgentMetadata, AgentCapabilities, AgentState};

/// AgentBus walkthrough: registers a point-to-point channel for the receiver,
/// sends one `TaskRequest` from the sender, then reads it back and prints it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bus = AgentBus::new();

    // Both agents share every metadata field except their ID and display name,
    // so build them from one template.
    let describe = |id: &str, name: &str| AgentMetadata {
        id: id.to_string(),
        name: name.to_string(),
        description: None,
        version: None,
        capabilities: AgentCapabilities::default(),
        state: AgentState::Ready,
    };
    let sender_meta = describe("sender", "Sender Agent");
    let receiver_meta = describe("receiver", "Receiver Agent");

    // The receiver registers a point-to-point channel keyed by the sender's ID.
    let inbound_from_sender = CommunicationMode::PointToPoint("sender".to_string());
    bus.register_channel(&receiver_meta, inbound_from_sender).await?;

    // Build the request and address it to the receiver.
    let message = AgentMessage::TaskRequest {
        task_id: "task-1".to_string(),
        content: "Hello!".to_string(),
    };
    let to_receiver = CommunicationMode::PointToPoint("receiver".to_string());
    bus.send_message("sender", to_receiver, &message).await?;

    // Read back on the receiver side, using the same sender-keyed mode it
    // registered with.
    let delivery = bus
        .receive_message(
            "receiver",
            CommunicationMode::PointToPoint("sender".to_string()),
        )
        .await?;
    if let Some(received) = delivery {
        println!("Received: {:?}", received);
    }

    Ok(())
}

SimpleMessageBus (Runtime)

The SimpleMessageBus is used by SimpleRuntime and works with AgentEvent.

Creating a Bus

use mofa_runtime::SimpleMessageBus;

let bus = SimpleMessageBus::new();

register

Register an agent together with the sender half of its event channel.
  • agent_id (&str) - Agent ID
  • tx (Sender<AgentEvent>) - Event sender channel

let (tx, rx) = tokio::sync::mpsc::channel(100);
bus.register("agent-1", tx).await;

subscribe

Subscribe an agent to a topic.
  • agent_id (&str) - Agent ID
  • topic (&str) - Topic name

bus.subscribe("agent-1", "notifications").await;

send_to

Send an event to a specific agent.
  • target_id (&str) - Target agent ID
  • event (AgentEvent) - Event to send

let event = AgentEvent::TaskReceived(task);
bus.send_to("agent-2", event).await?;

broadcast

Broadcast an event to all registered agents.
let event = AgentEvent::Shutdown;
bus.broadcast(event).await?;

publish

Publish an event to all subscribers of a topic.
  • topic (&str) - Topic name
  • event (AgentEvent) - Event to publish

let event = AgentEvent::Custom("update".to_string(), vec![]);
bus.publish("notifications", event).await?;

SimpleMessageBus Example

use mofa_runtime::SimpleMessageBus;
use mofa_kernel::message::{AgentEvent, TaskRequest};
use std::sync::Arc;

/// SimpleMessageBus walkthrough: three registered agents receive a direct
/// event, a topic publication and a broadcast, each printing what it gets.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bus = Arc::new(SimpleMessageBus::new());

    // One bounded channel (capacity 100) per agent. The sender half goes to
    // the bus; the receiver half is drained by a background task below.
    let (first_tx, mut first_rx) = tokio::sync::mpsc::channel(100);
    let (second_tx, mut second_rx) = tokio::sync::mpsc::channel(100);
    let (third_tx, mut third_rx) = tokio::sync::mpsc::channel(100);

    bus.register("agent-1", first_tx).await;
    bus.register("agent-2", second_tx).await;
    bus.register("agent-3", third_tx).await;

    // Topic membership: agents 1 and 2 follow "tasks", agent 3 follows
    // "notifications".
    for (agent, topic) in [
        ("agent-1", "tasks"),
        ("agent-2", "tasks"),
        ("agent-3", "notifications"),
    ] {
        bus.subscribe(agent, topic).await;
    }

    // Each receiver task prints events until its channel is closed.
    tokio::spawn(async move {
        while let Some(event) = first_rx.recv().await {
            println!("[Agent 1] Received: {:?}", event);
        }
    });
    tokio::spawn(async move {
        while let Some(event) = second_rx.recv().await {
            println!("[Agent 2] Received: {:?}", event);
        }
    });
    tokio::spawn(async move {
        while let Some(event) = third_rx.recv().await {
            println!("[Agent 3] Received: {:?}", event);
        }
    });

    // Point-to-point: only agent-1 is addressed.
    let direct_event = AgentEvent::Custom("direct".to_string(), vec![]);
    bus.send_to("agent-1", direct_event).await?;

    // Topic publication: delivered to the "tasks" subscribers (agent-1 and agent-2).
    let task_event = AgentEvent::TaskReceived(TaskRequest {
        task_id: "task-1".to_string(),
        content: "Work".to_string(),
    });
    bus.publish("tasks", task_event).await?;

    // Broadcast: every registered agent receives the shutdown event.
    let shutdown_event = AgentEvent::Shutdown;
    bus.broadcast(shutdown_event).await?;

    // Pause briefly before returning so the spawned receiver tasks get a
    // chance to run and print.
    let drain_window = tokio::time::Duration::from_millis(100);
    tokio::time::sleep(drain_window).await;
    Ok(())
}

Message Serialization

AgentBus uses bincode for message serialization:
// Shown for illustration only: the bus does this for you.
// Serialization happens automatically
let message_bytes = bincode::serialize(&message)?;

// Deserialization happens automatically; it reads the bytes produced above
let message: AgentMessage = bincode::deserialize(&message_bytes)?;

Error Handling

use mofa_kernel::bus::BusError;

match bus.send_message(sender_id, mode, &message).await {
    Ok(_) => println!("Sent"),
    Err(BusError::AgentNotRegistered(id)) => {
        eprintln!("Agent {} not registered", id);
    }
    Err(BusError::ChannelNotFound(msg)) => {
        eprintln!("Channel error: {}", msg);
    }
    Err(BusError::SendFailed(msg)) => {
        eprintln!("Send failed: {}", msg);
    }
    Err(BusError::Serialization(msg)) => {
        eprintln!("Serialization error: {}", msg);
    }
}

Best Practices

Channel Sizing

// Choose appropriate buffer size
let (tx, rx) = tokio::sync::mpsc::channel(1000); // Large buffer
let (tx, rx) = tokio::sync::mpsc::channel(10);   // Small buffer

Error Recovery

// Retry on failure: up to three attempts, pausing 100 ms between tries.
for attempt in 1..=3 {
    // `send_message` takes the mode by value, so each attempt needs its own
    // copy; passing `mode` directly would move it on the first iteration.
    // NOTE(review): assumes `CommunicationMode` implements `Clone` — confirm.
    match bus.send_message(sender, mode.clone(), &message).await {
        Ok(_) => break,
        // Attempts 1 and 2: log the error, wait, and try again.
        Err(e) if attempt < 3 => {
            eprintln!("Attempt {} failed: {}", attempt, e);
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
        // Final attempt: give up and propagate the error to the caller.
        Err(e) => return Err(e.into()),
    }
}

Topic Management

// All three steps use the AgentBus API, so `bus` is an `AgentBus` here.

// Subscribe: register a pub/sub channel for the agent on the topic
bus.register_channel(
    &agent_metadata,
    CommunicationMode::PubSub("events".to_string())
).await?;

// Use topic: publish a message to the topic
bus.send_message(
    "agent-1",
    CommunicationMode::PubSub("events".to_string()),
    &message
).await?;

// Cleanup when done
bus.unsubscribe_topic("agent-1", "events").await?;

Performance Considerations

  • AgentBus: Adds serialization overhead, suitable for cross-process communication
  • SimpleMessageBus: Zero-copy in-process messaging, faster for single-process
  • Use bounded channels to prevent memory issues
  • Consider batch operations for high-throughput scenarios

Build docs developers (and LLMs) love