
Overview

The Query Engine provides a high-level API for traversing and analyzing the knowledge graph. It supports complex operations like dependency analysis, impact assessment, ownership tracking, and pathfinding.

QueryEngine Class

The QueryEngine class (graph/query.py:12) wraps the storage layer with domain-specific query methods:
graph/query.py
class QueryEngine:
    """Graph query engine for complex traversals and analysis."""
    
    def __init__(self, storage: GraphStorage):
        self.storage = storage
The query engine uses Cypher (Neo4j’s query language) under the hood but provides Python methods for common operations.

Query Types

The engine supports several categories of queries:

Retrieval

Get individual nodes or lists of nodes by type and filters.

Traversal

Navigate relationships to find dependencies and dependents.

Analysis

Perform impact analysis, pathfinding, and ownership queries.

Node Retrieval

Retrieve a single node by its unique ID:
graph/query.py
def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
    """Retrieve single node by ID."""
    return self.storage.get_node(node_id)
Example Usage:
node = query_engine.get_node('service:payment-service')
# Returns:
# {
#   'id': 'service:payment-service',
#   'type': 'service',
#   'name': 'payment-service',
#   'team': 'payments',
#   'port': 8083
# }
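
Because QueryEngine only calls methods on the storage object it is given, the retrieval path can be tried without a running Neo4j instance. The following is a minimal sketch under assumptions: InMemoryStorage is a hypothetical stand-in that is not part of the project, and the QueryEngine shown is a trimmed copy containing only the two methods documented so far.

```python
from typing import Any, Dict, Optional


class InMemoryStorage:
    """Hypothetical stand-in for GraphStorage; not part of the project."""

    def __init__(self, nodes: Dict[str, Dict[str, Any]]):
        self._nodes = nodes

    def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
        # Mirrors the documented contract: a dict for a known ID, else None.
        return self._nodes.get(node_id)


class QueryEngine:
    """Trimmed copy of the documented class so this sketch runs on its own."""

    def __init__(self, storage):
        self.storage = storage

    def get_node(self, node_id: str) -> Optional[Dict[str, Any]]:
        return self.storage.get_node(node_id)


storage = InMemoryStorage({
    "service:payment-service": {
        "id": "service:payment-service",
        "type": "service",
        "name": "payment-service",
    }
})
query_engine = QueryEngine(storage)

found = query_engine.get_node("service:payment-service")
missing = query_engine.get_node("service:does-not-exist")
```

An unknown ID yields None rather than raising, so callers should check the result before indexing into it.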

Dependency Traversal

Get all transitive dependencies (what this node depends on):
graph/query.py
def downstream(self, node_id: str, max_depth: int = 10, 
               edge_types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """
    Get all transitive dependencies (what this node depends on).
    
    Args:
        node_id: Starting node ID
        max_depth: Maximum traversal depth to prevent infinite loops
        edge_types: Optional list of edge types to follow
    """
    params = {'node_id': node_id}
    edge_filter = ""
    if edge_types:
        # In a variable-length pattern r is a list of relationships, so
        # every hop is checked; the types are passed as a query parameter.
        params['edge_types'] = [et.upper() for et in edge_types]
        edge_filter = "WHERE all(rel IN relationships(path) WHERE type(rel) IN $edge_types)"
    
    query = f"""
    MATCH path = (start {{id: $node_id}})-[r*1..{max_depth}]->(dependency)
    {edge_filter}
    WITH dependency, min(length(path)) as distance
    RETURN dependency, distance
    ORDER BY distance, dependency.name
    """
    
    result = self.storage.execute_cypher(query, params)
    return [
        {
            **dict(record['dependency']),
            'distance': record['distance']
        }
        for record in result
    ]
Example Usage:
# Get all dependencies of payment-service
deps = query_engine.downstream('service:payment-service')
# Returns databases, caches, and other services it depends on

# Get only direct service calls (not database uses)
service_deps = query_engine.downstream(
    'service:payment-service',
    edge_types=['calls']
)
Results Include Distance:
# [
#   {'id': 'database:payments-db', 'type': 'database', 'distance': 1},
#   {'id': 'service:auth-service', 'type': 'service', 'distance': 1},
#   {'id': 'database:users-db', 'type': 'database', 'distance': 2}
# ]
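
Since every result carries a distance, direct and transitive dependencies can be separated after the fact. A small sketch, assuming the result shape shown above; group_by_distance is a hypothetical helper, not a QueryEngine method.

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_by_distance(results: List[Dict[str, Any]]) -> Dict[int, List[str]]:
    """Map each distance to the node IDs found at that distance, in input order."""
    grouped: Dict[int, List[str]] = defaultdict(list)
    for node in results:
        grouped[node["distance"]].append(node["id"])
    return dict(grouped)


# Sample data copied from the result shape shown above.
deps = [
    {"id": "database:payments-db", "type": "database", "distance": 1},
    {"id": "service:auth-service", "type": "service", "distance": 1},
    {"id": "database:users-db", "type": "database", "distance": 2},
]

by_distance = group_by_distance(deps)
direct = by_distance.get(1, [])
```

Here direct holds the two distance-1 nodes, and by_distance[2] holds the dependency reached through another node.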

Impact Analysis

Full impact analysis that combines upstream dependents, downstream dependencies, and the teams that own the affected nodes:
graph/query.py
def blast_radius(self, node_id: str, max_depth: int = 10) -> Dict[str, Any]:
    """
    Full impact analysis - upstream + downstream + affected teams.
    
    Args:
        node_id: Starting node ID
        max_depth: Maximum traversal depth
    """
    # Get the starting node
    start_node = self.get_node(node_id)
    if not start_node:
        return {'error': f'Node {node_id} not found'}
    
    # Get upstream and downstream dependencies
    upstream_nodes = self.upstream(node_id, max_depth)
    downstream_nodes = self.downstream(node_id, max_depth)
    
    # Get all affected node IDs
    affected_node_ids = {node_id}
    affected_node_ids.update(node['id'] for node in upstream_nodes)
    affected_node_ids.update(node['id'] for node in downstream_nodes)
    
    # Find teams that own any of the affected nodes
    affected_teams = set()
    for affected_id in affected_node_ids:
        query = """
        MATCH (team {type: 'team'})-[:OWNS]->(node {id: $node_id})
        RETURN team
        """
        team_result = self.storage.execute_cypher(query, {'node_id': affected_id})
        for record in team_result:
            team = dict(record['team'])
            affected_teams.add(team['name'])
    
    return {
        'center_node': start_node,
        'upstream_dependencies': upstream_nodes,
        'downstream_dependencies': downstream_nodes,
        'affected_teams': list(affected_teams),
        'total_affected_nodes': len(affected_node_ids),
        'summary': f"If {start_node['name']} fails, it could affect {len(upstream_nodes)} upstream and {len(downstream_nodes)} downstream components, impacting {len(affected_teams)} teams."
    }
Example Usage:
# Analyze impact of redis failure
impact = query_engine.blast_radius('cache:redis-main')

print(impact['summary'])
# "If redis-main fails, it could affect 5 upstream and 0 downstream components, impacting 3 teams."

print(impact['affected_teams'])
# ['payments', 'checkout', 'notifications']
Response Structure:
{
    'center_node': {...},
    'upstream_dependencies': [...],      # What depends on this
    'downstream_dependencies': [...],    # What this depends on
    'affected_teams': ['team1', 'team2'],
    'total_affected_nodes': 12,
    'summary': 'Human-readable impact summary'
}
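
blast_radius runs one ownership query per affected node. For large blast radii, a single query with an IN filter may reduce round trips to the database. The sketch below is one possible variant, not the project's implementation: affected_team_names and FakeStorage are hypothetical names, and it assumes execute_cypher(query, params) returns an iterable of dict-like records as in the code above.

```python
from typing import Any, Dict, Iterable, List

# One query for all affected nodes instead of one query per node.
OWNERS_QUERY = """
MATCH (team {type: 'team'})-[:OWNS]->(node)
WHERE node.id IN $node_ids
RETURN DISTINCT team.name AS name
ORDER BY name
"""


def affected_team_names(storage, node_ids: Iterable[str]) -> List[str]:
    """Return the names of all teams that own any of node_ids."""
    params = {"node_ids": sorted(set(node_ids))}  # de-duplicate the IDs
    records = storage.execute_cypher(OWNERS_QUERY, params)
    return [record["name"] for record in records]


class FakeStorage:
    """Hypothetical test double: records calls and returns canned rows."""

    def __init__(self, rows: List[Dict[str, Any]]):
        self.rows = rows
        self.calls = []

    def execute_cypher(self, query: str, params: Dict[str, Any]):
        self.calls.append((query, params))
        return self.rows


fake = FakeStorage([{"name": "checkout"}, {"name": "payments"}])
teams = affected_team_names(fake, ["service:a", "service:b", "service:a"])
```

The fake storage only confirms that a single call is issued with de-duplicated IDs; the query itself would still need to be checked against a real graph.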

Pathfinding

Find the shortest path between two nodes:
graph/query.py
def path(self, from_id: str, to_id: str, max_depth: int = 10) -> Dict[str, Any]:
    """
    Find shortest path between two nodes.
    
    Args:
        from_id: Source node ID
        to_id: Target node ID
        max_depth: Maximum path length to search
    """
    query = f"""
    MATCH path = shortestPath((start {{id: $from_id}})-[*1..{max_depth}]-(end {{id: $to_id}}))
    RETURN [node in nodes(path) | {{id: node.id, name: node.name, type: node.type}}] as nodes,
           [rel in relationships(path) | type(rel)] as relationships,
           length(path) as path_length
    """
    
    result = self.storage.execute_cypher(query, {
        'from_id': from_id,
        'to_id': to_id
    })
    
    if result:
        record = result[0]
        return {
            'nodes': record['nodes'],
            'relationships': record['relationships'],
            'path_length': record['path_length'],
            'path_description': self._format_path_description(
                record['nodes'], 
                record['relationships']
            )
        }
    
    return {
        'nodes': [],
        'relationships': [],
        'path_length': 0,
        'path_description': f"No path found between {from_id} and {to_id}"
    }
Example Usage:
# How does frontend connect to payments database?
path_result = query_engine.path(
    'service:frontend',
    'database:payments-db'
)

print(path_result['path_description'])
# "frontend (service) --calls-> api-gateway (service) --calls-> payment-service (service) --uses-> payments-db (database)"

print(path_result['path_length'])
# 3
Response Structure:
{
    'nodes': [
        {'id': 'service:frontend', 'name': 'frontend', 'type': 'service'},
        {'id': 'service:api-gateway', 'name': 'api-gateway', 'type': 'service'},
        # ...
    ],
    'relationships': ['CALLS', 'CALLS', 'USES'],
    'path_length': 3,
    'path_description': 'Human-readable path'
}
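
The helper _format_path_description is not shown on this page. As an illustration of how the nodes and relationships lists line up, here is a sketch of a formatter that reproduces the description format from the example above. It is an assumption about the output format only, not the project's actual helper.

```python
from typing import Dict, List


def format_path_description(nodes: List[Dict[str, str]],
                            relationships: List[str]) -> str:
    """Interleave nodes and relationship types into a readable chain."""
    if not nodes:
        return ""
    parts = [f"{nodes[0]['name']} ({nodes[0]['type']})"]
    # relationships[i] connects nodes[i] to nodes[i + 1].
    for rel, node in zip(relationships, nodes[1:]):
        parts.append(f"--{rel.lower()}-> {node['name']} ({node['type']})")
    return " ".join(parts)


nodes = [
    {"id": "service:frontend", "name": "frontend", "type": "service"},
    {"id": "service:api-gateway", "name": "api-gateway", "type": "service"},
    {"id": "service:payment-service", "name": "payment-service", "type": "service"},
    {"id": "database:payments-db", "name": "payments-db", "type": "database"},
]
relationships = ["CALLS", "CALLS", "USES"]

description = format_path_description(nodes, relationships)
```

A path with N relationships always has N + 1 nodes, which is why path_length equals len(relationships). Because the shortestPath pattern matches relationships in either direction, the arrows in this sketch follow traversal order and may not match the stored edge direction.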

Ownership Queries

Find the team that owns a given node:
graph/query.py
def get_owner(self, node_id: str) -> Optional[Dict[str, Any]]:
    """Find the team that owns a given node."""
    query = """
    MATCH (team {type: 'team'})-[:OWNS]->(node {id: $node_id})
    RETURN team
    """
    
    result = self.storage.execute_cypher(query, {'node_id': node_id})
    if result:
        return dict(result[0]['team'])
    return None
Example Usage:
owner = query_engine.get_owner('service:payment-service')
# Returns:
# {
#   'id': 'team:payments',
#   'type': 'team',
#   'name': 'payments',
#   'lead': 'Alice Smith',
#   'slack_channel': '#team-payments'
# }

Specialized Queries

Find all services that use a specific database (the query also matches cache nodes with that name):
graph/query.py
def get_services_using_database(self, database_name: str) -> List[Dict[str, Any]]:
    """Find all services that use a specific database."""
    query = """
    MATCH (service)-[:USES]->(db {name: $db_name})
    WHERE db.type IN ['database', 'cache']
    RETURN service
    ORDER BY service.name
    """
    
    result = self.storage.execute_cypher(query, {'db_name': database_name})
    return [dict(record['service']) for record in result]
Example Usage:
services = query_engine.get_services_using_database('users-db')
# Returns all services that have a USES edge to the users database

Query Parameters

Max Depth

Most traversal queries accept a max_depth parameter that bounds how many hops a traversal may follow:
# Default: traverse up to 10 levels
downstream_deps = query_engine.downstream('service:api', max_depth=10)

# Limit to direct dependencies only
direct_deps = query_engine.downstream('service:api', max_depth=1)
On highly connected graphs the number of candidate paths can grow rapidly with max_depth, so large values can make queries slow. Keep max_depth as small as the question allows.
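
The traversal queries on this page place max_depth into the Cypher text with an f-string rather than passing it as a query parameter, so it is worth checking the value before it reaches the query. A minimal sketch, assuming a hypothetical helper validated_depth and an arbitrary upper bound of 25:

```python
def validated_depth(max_depth: int, hard_limit: int = 25) -> int:
    """Return max_depth if it is an int between 1 and hard_limit, else raise."""
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(max_depth, bool) or not isinstance(max_depth, int):
        raise TypeError(
            f"max_depth must be an int, got {type(max_depth).__name__}"
        )
    if not 1 <= max_depth <= hard_limit:
        raise ValueError(f"max_depth must be between 1 and {hard_limit}")
    return max_depth


# Only a checked integer is interpolated into the pattern text.
pattern = f"-[r*1..{validated_depth(3)}]->"
```

Rejecting non-integers keeps arbitrary strings out of the query text, and the upper bound gives a single place to tune how deep traversals may go.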

Edge Type Filtering

Traversal queries support filtering by edge types:
# Follow only CALLS relationships
service_calls = query_engine.downstream(
    'service:api',
    edge_types=['calls']
)

# Follow CALLS and USES relationships
all_deps = query_engine.downstream(
    'service:api',
    edge_types=['calls', 'uses']
)
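
Edge types are supplied in lowercase and upper-cased before being matched against relationship types. The sketch below shows one way to normalize and check them up front. normalize_edge_types is a hypothetical helper, and KNOWN_EDGE_TYPES lists only the relationship types that appear on this page; a real graph may define others.

```python
from typing import Iterable, List

# Assumption: only the relationship types mentioned on this page.
KNOWN_EDGE_TYPES = frozenset({"CALLS", "USES", "OWNS"})


def normalize_edge_types(edge_types: Iterable[str]) -> List[str]:
    """Upper-case, de-duplicate, and validate edge type names."""
    normalized: List[str] = []
    for edge_type in edge_types:
        upper = edge_type.strip().upper()
        if upper not in KNOWN_EDGE_TYPES:
            raise ValueError(f"Unknown edge type: {edge_type!r}")
        if upper not in normalized:
            normalized.append(upper)
    return normalized


edge_types = normalize_edge_types(["calls", "Uses", "calls"])
```

Failing early on an unknown type distinguishes a typo such as 'call' from a traversal that legitimately returns no results.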

Performance Considerations

Indexed Lookups

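Node lookups by ID are fast when Neo4j has an index on the id property. Neo4j indexes are defined per label, so a query pattern must name the node label for the planner to use one.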

Traversal Optimization

Neo4j optimizes graph traversals using relationship pointers, not table scans.

Distance Calculation

The min(length(path)) aggregation returns each node once, with its shortest distance from the starting node, even when several paths reach it.

Query Caching

Frequently executed queries benefit from Neo4j’s query plan cache.

Custom Cypher Queries

For advanced use cases, execute custom Cypher directly:
from graph.storage import GraphStorage

storage = GraphStorage()

# Find all services with more than 3 dependencies
query = """
MATCH (service {type: 'service'})-[r]->(dependency)
WITH service, count(r) as dep_count
WHERE dep_count > 3
RETURN service, dep_count
ORDER BY dep_count DESC
"""

results = storage.execute_cypher(query)

Query Examples by Use Case

# Get all services that depend on users-db
affected = query_engine.upstream('database:users-db')

# Get teams to notify
teams = set()
for service in affected:
    owner = query_engine.get_owner(service['id'])
    if owner:
        teams.add(owner['name'])

# Get all services
all_services = query_engine.get_nodes('service')

# Find unowned services
unowned = []
for service in all_services:
    owner = query_engine.get_owner(service['id'])
    if not owner:
        unowned.append(service['name'])

# Get full blast radius
impact = query_engine.blast_radius('service:payment-service')

# Risk score based on affected components
risk_score = (
    len(impact['upstream_dependencies']) * 2 +  # Breaking changes
    len(impact['downstream_dependencies']) +     # Dependency issues
    len(impact['affected_teams']) * 3            # Coordination overhead
)

# Get all service-to-service calls
all_services = query_engine.get_nodes('service')

service_graph = {}
for service in all_services:
    deps = query_engine.downstream(
        service['id'],
        edge_types=['calls'],
        max_depth=1
    )
    service_graph[service['name']] = [d['name'] for d in deps]
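
The service_graph dictionary built above maps each service to the services it calls. Inverting it gives the callers of each service without further queries. A short sketch with hypothetical sample data; invert_graph is an illustrative helper, not part of the query engine.

```python
from typing import Dict, List


def invert_graph(service_graph: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Turn a caller -> callees mapping into a callee -> callers mapping."""
    callers: Dict[str, List[str]] = {name: [] for name in service_graph}
    for caller, callees in service_graph.items():
        for callee in callees:
            # setdefault covers callees that are not keys of service_graph.
            callers.setdefault(callee, []).append(caller)
    return callers


# Hypothetical sample in the same shape as service_graph above.
service_graph = {
    "frontend": ["api-gateway"],
    "api-gateway": ["payment-service", "auth-service"],
    "payment-service": ["auth-service"],
    "auth-service": [],
}

callers = invert_graph(service_graph)
```

In this sample, callers["auth-service"] lists both api-gateway and payment-service, while frontend has no callers.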

Next Steps

Natural Language

Learn how natural language queries are parsed into graph operations.

Knowledge Graph

Review the underlying graph data model.
