Backend Architecture

The Ganimede backend is a Python application built on Starlette with asynchronous managers for notebook operations, kernel execution, and real-time synchronization.

Application Structure

The backend runs two servers:
  1. Starlette App (port 8000): HTTP routes and WebSocket endpoints
  2. Ypy WebSocket Server (port 1234): Yjs CRDT synchronization
# Backend package layout
api/ganimede/
├── main.py              # Application entry point
└── managers/
    ├── RouteManager.py  # HTTP/WebSocket routing
    ├── ConfigManager.py # Configuration endpoint
    ├── Notebook.py      # Notebook state and operations
    ├── Kernel.py        # Jupyter kernel client
    ├── Comms.py         # WebSocket communication
    └── Cell.py          # Cell operations (not shown in main.py)

Main Application (main.py)

The main application coordinates all components:
import y_py as Y
from starlette.applications import Starlette
from ypy_websocket import ASGIServer, WebsocketServer, WebsocketProvider

# Create Starlette app
app = Starlette(debug=True)

# Create Yjs server and document
websocket_server = WebsocketServer()
yapp = ASGIServer(websocket_server)
ydoc = Y.YDoc()
Startup sequence (main.py:47):
@app.on_event("startup")
async def on_startup():
    # 1. Start Ypy WebSocket Server
    #    (loop, server_task, and connect are defined or imported elsewhere in main.py)
    task = loop.create_task(server_task())
    
    # 2. Connect as client to Ypy server
    websocket = await connect("ws://localhost:1234/g-y-room")
    websocket_provider = WebsocketProvider(ydoc, websocket)
    await websocket_provider.start()
    
    # 3. Initialize managers
    route_manager = RouteManager(app)
    config_handler = ConfigHandler()
    comms = Comms()
    kernel = Kernel(comms, ydoc)
    notebook = Notebook(
        notebook_path=os.environ["NOTEBOOK_LOC"],
        kernel=kernel,
        comms=comms,
        ydoc=ydoc
    )
    
    # 4. Register routes
    route_manager.add_route("/config", config_handler.get, ["GET"])
    route_manager.add_websocket_route("/", comms.endpoint)

Manager Components

RouteManager

Location: managers/RouteManager.py
Responsibilities:
  • Serve frontend static files (index.html, assets)
  • Register HTTP and WebSocket routes
  • Handle development vs production file paths
class RouteManager:
    def __init__(self, app):
        self.app = app
        self.default_route()  # / → index.html
        
    # name is optional so the startup calls, which omit it, remain valid
    def add_route(self, path, handler, methods=None, name=None):
        self.app.add_route(path, handler, methods=methods, name=name)
        
    def add_websocket_route(self, path, handler, name=None):
        self.app.add_websocket_route(path, handler, name=name)
Routes:
  • / - Serve index.html
  • /config - Return configuration JSON
  • / (WebSocket) - Comms endpoint
  • /assets/{path} - Static assets

ConfigManager

Location: managers/ConfigManager.py
Responsibilities:
  • Provide frontend configuration (Monaco settings, grid snap, etc.)
class ConfigHandler:
    def __init__(self):
        self.data = {
            "monaco": {"fontSize": 13, "lineNumbers": "on"},
            "outputs": {"fontSize": 14},
            "grid": {"snap": {"x": 20, "y": 20}}
        }

Comms Manager

Location: managers/Comms.py
Responsibilities:
  • WebSocket communication for control messages
  • Route messages to appropriate channel queues
  • Bidirectional message passing
class Comms:
    def __init__(self):
        self.websocket: WebSocket
        self.out_queue = Queue()
        self.channel_queues = {
            "comms": Queue(),
            "notebook": Queue(),
            "kernel": Queue(),
            "config": Queue(),
        }
Message flow (Comms.py:24):
async def endpoint(self, websocket: WebSocket):
    await websocket.accept()
    
    # Concurrent send/receive tasks
    async def send_task():
        while True:
            data = await self.out_queue.get()
            await websocket.send_json(data)
    
    async def receive_task():
        while True:
            data = await websocket.receive_json()
            # Route to appropriate channel
            self.channel_queues[data["channel"]].put_nowait(data)
    
    await asyncio.gather(send_task(), receive_task())
Message format:
{
  "channel": "notebook",
  "method": "queue_cell",
  "message": {"cell_id": "abc123"}
}
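
The routing step can be tried in isolation. The following is a minimal sketch using only the standard library, not the actual Comms implementation: `make_channel_queues` and `route_message` are hypothetical helper names, and dropping messages that name an unknown channel (rather than raising `KeyError`) is an assumed policy.

```python
import asyncio

# Channel names taken from the Comms constructor shown above.
CHANNELS = ("comms", "notebook", "kernel", "config")


def make_channel_queues():
    """Create one asyncio.Queue per channel."""
    return {name: asyncio.Queue() for name in CHANNELS}


def route_message(channel_queues, data):
    """Put a decoded message on its channel queue.

    Returns True when the message was routed and False when it is
    malformed or names an unknown channel (assumed policy: drop it).
    """
    if not isinstance(data, dict):
        return False
    channel = data.get("channel")
    if not isinstance(channel, str) or "method" not in data:
        return False
    queue = channel_queues.get(channel)
    if queue is None:
        return False
    queue.put_nowait(data)
    return True


async def demo():
    queues = make_channel_queues()
    ok = route_message(queues, {
        "channel": "notebook",
        "method": "queue_cell",
        "message": {"cell_id": "abc123"},
    })
    bad = route_message(queues, {"channel": "unknown", "method": "noop"})
    routed = await queues["notebook"].get()
    return ok, bad, routed["message"]["cell_id"]
```

Calling `asyncio.run(demo())` routes one valid message to the notebook queue and rejects the one addressed to an unknown channel.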

Kernel Manager

Location: managers/Kernel.py
Responsibilities:
  • Start and manage Jupyter kernel instances
  • Execute code and process outputs
  • Handle kernel interrupts and restarts
  • Update kernel state in YDoc
class Kernel:
    def __init__(self, comms: Comms, ydoc: Y.YDoc):
        self.kernel_manager = AsyncKernelManager()
        self.kernel_client = None
        self.ydoc = ydoc
        self.ykernel = ydoc.get_map("kernel")
        self._busy = False
Code execution (Kernel.py:115):
async def execute(self, code: str, msg_queue: asyncio.Queue):
    # Start kernel if needed
    if self.kernel_client is None:
        await self.start_kernel()
    
    # Execute code
    async def execute_code(code: str):
        self.kernel_client.execute(code)
        reply = await self.kernel_client.get_shell_msg()
        msg_queue.put_nowait({
            "msg_type": "execute_reply",
            "execution_count": reply["content"]["execution_count"]
        })
    
    # Process IOPub messages until the kernel reports that it is idle
    async def proc_io_msgs():
        self.busy = True
        while self.busy:
            try:
                msg = await self.kernel_client.get_iopub_msg(timeout=1)
            except Empty:  # queue.Empty (from queue import Empty): nothing arrived within 1 s
                continue
            if msg["msg_type"] == "status":
                if msg["content"]["execution_state"] == "idle":
                    self.busy = False
            else:
                msg_queue.put_nowait(self._msg_to_output(msg))
    
    await asyncio.gather(execute_code(code), proc_io_msgs())
Kernel state synchronization (Kernel.py:48): the busy setter mirrors the kernel's busy state into the YDoc "kernel" map:
@property
def busy(self):
    return self._busy

@busy.setter
def busy(self, value):
    self._busy = value
    with self.ydoc.begin_transaction() as t:
        self.ykernel.set(t, "busy", value)

Notebook Manager

Location: managers/Notebook.py
Responsibilities:
  • Load .ipynb files into YDoc
  • Manage cell graph structures (np_graph, pc_graph)
  • Queue cells for execution
  • Handle checkpoint (save) operations
  • Coordinate cell lifecycle
class Notebook:
    def __init__(self, kernel, comms, ydoc, notebook_path):
        self.kernel = kernel
        self.comms = comms
        self.ydoc = ydoc
        self.notebook_path = notebook_path
        
        # Graph structures
        self.np_graph = {}  # next-prev directed graph
        self.pc_graph = {}  # parent-children directed graph
        
        self.notebook_file = self._load_notebook()
        self.init_cells()
Cell initialization (Notebook.py:73): Loads cells from .ipynb into YDoc shared types:
def init_cells(self):
    self.ycells = self.ydoc.get_array("cells")
    
    for cell in self.notebook_file["cells"]:
        cell_id = cell["id"]  # renamed from id to avoid shadowing the built-in
        source = Y.YText("".join(cell["source"]))
        # Markdown cells carry no "outputs" key, so fall back to an empty list
        outputs = Y.YArray(cell.get("outputs", []))
        # gm_metadata (the cell's Ganimede layout metadata) is assigned in code elided from this excerpt
        
        ycell = self.ydoc.get_map(cell_id)
        with self.ydoc.begin_transaction() as t:
            ycell.set(t, "id", cell_id)
            ycell.set(t, "type", cell["cell_type"])
            ycell.set(t, "source", source)
            ycell.set(t, "execution_count", cell.get("execution_count"))
            ycell.set(t, "outputs", outputs)
            ycell.set(t, "top", gm_metadata.get("top"))
            ycell.set(t, "left", gm_metadata.get("left"))
            ycell.set(t, "state", "idle")
            
            self.ycells.append(t, cell_id)
Run queue (Notebook.py:258):
async def queue_cell(self, cell_id: str):
    self.run_queue.put_nowait(cell_id)
    self._change_cell_state(cell_id, "queued")
    with self.ydoc.begin_transaction() as t:
        self.ydoc.get_array("run_queue").append(t, cell_id)

async def run_queue_loop(self):
    while True:
        cell_id = await self.run_queue.get()
        self._change_cell_state(cell_id, "running")
        self._clear_outputs(cell_id)
        await self.run(cell_id)
        with self.ydoc.begin_transaction() as t:
            self.ydoc.get_array("run_queue").delete(t, 0)
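
The queueing behavior above can be exercised without a kernel or a YDoc. The following is a rough sketch under stated assumptions, not the Notebook implementation: `RunQueueSketch`, `run_cell`, `shared_queue`, `states`, and `errors` are hypothetical names, plain Python objects stand in for the YDoc shared types, and returning a cell to "idle" after it finishes (including after a failure) is an assumption.

```python
import asyncio


class RunQueueSketch:
    """Stand-in for the notebook run queue using plain Python objects."""

    def __init__(self, run_cell):
        self.run_cell = run_cell       # hypothetical async callable taking a cell_id
        self.run_queue = asyncio.Queue()
        self.shared_queue = []         # stands in for ydoc.get_array("run_queue")
        self.states = {}               # stands in for each cell's "state" field
        self.errors = {}               # cell_id -> exception raised while running

    def queue_cell(self, cell_id):
        self.run_queue.put_nowait(cell_id)
        self.shared_queue.append(cell_id)
        self.states[cell_id] = "queued"

    async def run_queue_loop(self):
        while True:
            cell_id = await self.run_queue.get()
            self.states[cell_id] = "running"
            try:
                await self.run_cell(cell_id)
            except Exception as exc:
                # Record the failure so one bad cell does not stop the loop.
                self.errors[cell_id] = exc
            finally:
                # Always dequeue the head and reset the state (assumed behavior).
                self.shared_queue.pop(0)
                self.states[cell_id] = "idle"
                self.run_queue.task_done()


async def demo():
    ran = []

    async def fake_run(cell_id):
        if cell_id == "b":
            raise RuntimeError("boom")
        ran.append(cell_id)

    notebook = RunQueueSketch(fake_run)
    worker = asyncio.create_task(notebook.run_queue_loop())
    for cell_id in ("a", "b", "c"):
        notebook.queue_cell(cell_id)
    await notebook.run_queue.join()    # wait until every queued cell is processed
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return ran, notebook.shared_queue, notebook.states, notebook.errors
```

Because a single loop consumes the queue, cells run strictly in the order they were queued, which is why deleting index 0 of the mirrored queue always removes the cell that just ran.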
Graph structures:
  • np_graph: Maps cell_id → [next_cell_ids] (sequential flow)
  • pc_graph: Maps parent_cell_id → [child_cell_ids] (tissue grouping)
These graphs are computed from heading cells (Notebook.py:144) and synchronized to YDoc maps.
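
To make the two graph shapes concrete, here is a small sketch of one way they could be derived from heading cells. It is an illustration under assumptions, not the logic at Notebook.py:144: `heading_level` and `build_graphs` are hypothetical names, a heading is assumed to be a markdown cell whose source starts with one to six `#` characters, np_graph is assumed to link each cell to the cell that follows it in notebook order, and a heading is assumed to own every cell up to the next heading of the same or a higher level.

```python
import re

# One to six '#' characters followed by whitespace and some text.
HEADING_RE = re.compile(r"^(#{1,6})\s+\S")


def heading_level(cell):
    """Return 1-6 for a markdown heading cell, otherwise None."""
    if cell.get("cell_type") != "markdown":
        return None
    source = cell.get("source", "")
    if isinstance(source, list):       # .ipynb stores source as a list of lines
        source = "".join(source)
    match = HEADING_RE.match(source.lstrip())
    return len(match.group(1)) if match else None


def build_graphs(cells):
    """Build (np_graph, pc_graph) as dicts of cell_id -> list of cell_ids."""
    np_graph = {}
    pc_graph = {}
    open_headings = []                 # (level, cell_id), outermost first
    prev_id = None
    for cell in cells:
        cell_id = cell["id"]
        np_graph[cell_id] = []
        if prev_id is not None:
            np_graph[prev_id].append(cell_id)
        prev_id = cell_id

        level = heading_level(cell)
        if level is not None:
            # A heading closes every open heading at the same or a deeper level.
            while open_headings and open_headings[-1][0] >= level:
                open_headings.pop()
        if open_headings:
            pc_graph.setdefault(open_headings[-1][1], []).append(cell_id)
        if level is not None:
            pc_graph.setdefault(cell_id, [])
            open_headings.append((level, cell_id))
    return np_graph, pc_graph


EXAMPLE_CELLS = [
    {"id": "h1", "cell_type": "markdown", "source": ["# Intro"]},
    {"id": "c1", "cell_type": "code", "source": ["x = 1"]},
    {"id": "h2", "cell_type": "markdown", "source": ["## Detail"]},
    {"id": "c2", "cell_type": "code", "source": ["y = 2"]},
    {"id": "h3", "cell_type": "markdown", "source": ["# Next"]},
    {"id": "c3", "cell_type": "code", "source": ["z = 3"]},
]
```

With `EXAMPLE_CELLS`, the second-level heading `h2` becomes a child of `h1`, while the new top-level heading `h3` starts its own group.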

YDoc Integration

The backend uses y_py (the Python bindings for Y-CRDT, which is compatible with Yjs) to create shared types:
ydoc = Y.YDoc()

# Shared types
ycells = ydoc.get_array("cells")           # List of cell IDs
ynp_graph = ydoc.get_map("np_graph")       # Next-prev graph
ypc_graph = ydoc.get_map("pc_graph")       # Parent-children graph
ykernel = ydoc.get_map("kernel")           # Kernel state
yrun_queue = ydoc.get_array("run_queue")   # Execution queue

# Per-cell maps (writes require an open transaction)
ycell = ydoc.get_map(cell_id)
with ydoc.begin_transaction() as t:
    ycell.set(t, "source", Y.YText())      # Collaborative text
    ycell.set(t, "outputs", Y.YArray())    # Output list
All updates use transactions:
with ydoc.begin_transaction() as t:
    ycell.set(t, "state", "running")
    outputs.append(t, output_data)

Async Patterns

The backend uses asyncio throughout:
  • Task creation: asyncio.create_task(self.listen_comms())
  • Queue-based communication: asyncio.Queue() for message passing
  • Concurrent operations: await asyncio.gather(task1, task2)
  • Event loops: asyncio.get_event_loop()
Each manager listens on its own queue:
async def listen_comms(self):
    while True:
        item = await self.comms_queue.get()
        method = item["method"]
        message = item.get("message") or {}  # a missing message would make ** fail on None
        await getattr(self, method)(**message)
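
Because the method name arrives over the WebSocket, dispatching with `getattr` can reach any attribute on the manager. The sketch below shows one possible guard, assuming an allow-list of callable methods. `ManagerSketch`, `ALLOWED_METHODS`, `dispatch`, `queued`, and `rejected` are hypothetical names used only for illustration.

```python
import asyncio


class ManagerSketch:
    """Hypothetical manager showing queue-driven method dispatch."""

    # Only these method names may be invoked from incoming messages.
    ALLOWED_METHODS = frozenset({"queue_cell"})

    def __init__(self):
        self.comms_queue = asyncio.Queue()
        self.queued = []
        self.rejected = []

    async def queue_cell(self, cell_id):
        self.queued.append(cell_id)

    async def dispatch(self, item):
        method = item.get("method")
        if not isinstance(method, str) or method not in self.ALLOWED_METHODS:
            self.rejected.append(method)
            return False
        kwargs = item.get("message") or {}   # tolerate a missing message
        await getattr(self, method)(**kwargs)
        return True

    async def listen_comms(self):
        while True:
            item = await self.comms_queue.get()
            try:
                await self.dispatch(item)
            finally:
                self.comms_queue.task_done()


async def demo():
    manager = ManagerSketch()
    listener = asyncio.create_task(manager.listen_comms())
    manager.comms_queue.put_nowait(
        {"channel": "notebook", "method": "queue_cell", "message": {"cell_id": "abc123"}}
    )
    manager.comms_queue.put_nowait({"channel": "notebook", "method": "__init__"})
    await manager.comms_queue.join()         # wait until both items are handled
    listener.cancel()
    try:
        await listener
    except asyncio.CancelledError:
        pass
    return manager.queued, manager.rejected
```

In this sketch a handler that raises still ends the listener loop; a real manager would probably want to catch and report such errors rather than stop listening.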

Key Patterns

  1. Manager-based architecture: Each concern (kernel, notebook, comms) is a separate manager
  2. YDoc as single source of truth: All shared notebook state (cells, graphs, kernel status, run queue) lives in YDoc shared types
  3. Queue-based messaging: Comms routes messages via asyncio queues
  4. Transaction-based updates: All YDoc changes use transactions for atomicity
  5. Async throughout: All I/O operations are async for concurrency