The Query Engine provides a high-level API for traversing and analyzing the knowledge graph. It supports operations such as dependency analysis, impact assessment, ownership tracking, and pathfinding.
List nodes by type with optional property filters:
graph/query.py
def get_nodes(self, node_type: Optional[str] = None, filters: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """List nodes, optionally restricted to a type and to property filters.

    Args:
        node_type: Node type to select (e.g. ``'service'``). Passed through
            unchanged to the storage layer; ``None`` presumably means "any
            type" -- confirm against ``GraphStorage.get_nodes``.
        filters: Mapping of property name to required value, passed through
            unchanged to the storage layer.

    Returns:
        Whatever ``self.storage.get_nodes(node_type, filters)`` returns.
    """
    # All filtering happens in the storage layer; this is a pure delegate.
    nodes = self.storage.get_nodes(node_type, filters)
    return nodes
Example Usage:
# Get all services
services = query_engine.get_nodes('service')

# Get services owned by a specific team
team_services = query_engine.get_nodes(
    'service',
    filters={'team': 'payments'},
)
Find nodes whose named property equals a given value:
graph/query.py
def find_nodes_by_property(self, property_name: str, property_value: Any) -> List[Dict[str, Any]]:
    """Find nodes whose ``property_name`` property equals ``property_value``.

    The property *name* is interpolated into the Cypher text (only the value
    is a bound parameter), so it is validated first: it must be a plain
    identifier -- ASCII letters, digits and underscores, not starting with a
    digit. This prevents Cypher injection through ``property_name``.

    Args:
        property_name: Name of the node property to compare.
        property_value: Value the property must equal (sent as ``$value``).

    Returns:
        Matching nodes as dicts, ordered by ``type`` then ``name``.

    Raises:
        ValueError: If ``property_name`` is not a plain identifier.
    """
    import re

    if not isinstance(property_name, str) or not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", property_name):
        raise ValueError(f"Invalid property name: {property_name!r}")

    query = f"""
    MATCH (n)
    WHERE n.{property_name} = $value
    RETURN n
    ORDER BY n.type, n.name
    """
    result = self.storage.execute_cypher(query, {'value': property_value})
    return [dict(record['n']) for record in result]
Example Usage:
# Find all nodes with port 8080
nodes = query_engine.find_nodes_by_property('port', 8080)
Get all transitive dependencies (what this node depends on):
graph/query.py
def downstream(self, node_id: str, max_depth: int = 10, edge_types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Get all transitive dependencies (what this node depends on).

    Follows outgoing edges from the start node for 1..``max_depth`` hops.

    Args:
        node_id: Starting node ID.
        max_depth: Maximum number of hops to follow; must be a positive int.
            It bounds the cost of the variable-length match and is
            interpolated into the query text, so it is validated first.
        edge_types: Optional edge types to follow (case-insensitive, e.g.
            ``['calls']``). When non-empty, every edge on a path must have one
            of these types. When ``None`` or empty, all edge types are followed.

    Returns:
        One dict per reachable node: the node's properties plus ``distance``,
        the length of the shortest matching path. Sorted by distance, then name.

    Raises:
        ValueError: If ``max_depth`` is not a positive int.
    """
    if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
        raise ValueError(f"max_depth must be a positive int, got {max_depth!r}")

    params: Dict[str, Any] = {'node_id': node_id}
    edge_filter = ""
    if edge_types:
        # A variable-length pattern yields a *list* of relationships and
        # type() accepts a single relationship, so test every edge on the path.
        edge_filter = "WHERE all(rel IN relationships(path) WHERE type(rel) IN $edge_types)"
        params['edge_types'] = [et.upper() for et in edge_types]

    query = f"""
    MATCH path = (start {{id: $node_id}})-[*1..{max_depth}]->(dependency)
    {edge_filter}
    WITH dependency, min(length(path)) AS distance
    RETURN dependency, distance
    ORDER BY distance, dependency.name
    """
    result = self.storage.execute_cypher(query, params)
    return [
        {**dict(record['dependency']), 'distance': record['distance']}
        for record in result
    ]
Example Usage:
# Get all dependencies of payment-service
deps = query_engine.downstream('service:payment-service')
# Returns databases, caches, and other services it depends on

# Follow only CALLS edges (skip USES edges to databases),
# still up to the default depth of 10 hops
service_deps = query_engine.downstream(
    'service:payment-service',
    edge_types=['calls'],
)
Get all transitive dependents (what depends on this node):
graph/query.py
def upstream(self, node_id: str, max_depth: int = 10, edge_types: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Get all transitive dependents (what depends on this node).

    Follows incoming edges into the start node for 1..``max_depth`` hops.

    Args:
        node_id: Starting node ID.
        max_depth: Maximum number of hops to follow; must be a positive int.
            It bounds the cost of the variable-length match and is
            interpolated into the query text, so it is validated first.
        edge_types: Optional edge types to follow (case-insensitive, e.g.
            ``['uses']``). When non-empty, every edge on a path must have one
            of these types. When ``None`` or empty, all edge types are followed.

    Returns:
        One dict per dependent node: the node's properties plus ``distance``,
        the length of the shortest matching path. Sorted by distance, then name.

    Raises:
        ValueError: If ``max_depth`` is not a positive int.
    """
    if isinstance(max_depth, bool) or not isinstance(max_depth, int) or max_depth < 1:
        raise ValueError(f"max_depth must be a positive int, got {max_depth!r}")

    params: Dict[str, Any] = {'node_id': node_id}
    edge_filter = ""
    if edge_types:
        # A variable-length pattern yields a *list* of relationships and
        # type() accepts a single relationship, so test every edge on the path.
        edge_filter = "WHERE all(rel IN relationships(path) WHERE type(rel) IN $edge_types)"
        params['edge_types'] = [et.upper() for et in edge_types]

    query = f"""
    MATCH path = (dependent)-[*1..{max_depth}]->(start {{id: $node_id}})
    {edge_filter}
    WITH dependent, min(length(path)) AS distance
    RETURN dependent, distance
    ORDER BY distance, dependent.name
    """
    result = self.storage.execute_cypher(query, params)
    return [
        {**dict(record['dependent']), 'distance': record['distance']}
        for record in result
    ]
Example Usage:
# Find what depends on the users database
dependents = query_engine.upstream('database:users-db')
# Returns every node that depends on it, directly or transitively

# Keep direct users only (distance == 1)
direct_users = [dep for dep in dependents if dep['distance'] == 1]
Impact analysis that combines upstream and downstream dependencies with the teams that own the affected nodes:
graph/query.py
def blast_radius(self, node_id: str, max_depth: int = 10) -> Dict[str, Any]:
    """Full impact analysis: upstream + downstream dependencies and affected teams.

    Args:
        node_id: Starting node ID.
        max_depth: Maximum traversal depth, passed to ``upstream`` and
            ``downstream``.

    Returns:
        ``{'error': 'Node <id> not found'}`` if ``self.get_node`` finds
        nothing. Otherwise a dict with keys ``center_node``,
        ``upstream_dependencies``, ``downstream_dependencies``,
        ``affected_teams`` (sorted team names), ``total_affected_nodes``
        (distinct IDs including the start node) and ``summary``.
    """
    start_node = self.get_node(node_id)
    if not start_node:
        return {'error': f'Node {node_id} not found'}

    upstream_nodes = self.upstream(node_id, max_depth)
    downstream_nodes = self.downstream(node_id, max_depth)

    # Distinct IDs of every node touched, including the start node itself.
    affected_node_ids = {node_id}
    affected_node_ids.update(node['id'] for node in upstream_nodes)
    affected_node_ids.update(node['id'] for node in downstream_nodes)

    # One query for all owners instead of one round trip per affected node.
    team_query = """
    MATCH (team {type: 'team'})-[:OWNS]->(node)
    WHERE node.id IN $node_ids
    RETURN DISTINCT team
    """
    team_result = self.storage.execute_cypher(team_query, {'node_ids': list(affected_node_ids)})
    # Sorted so the output is deterministic (set order is arbitrary).
    affected_teams = sorted({dict(record['team'])['name'] for record in team_result})

    display_name = start_node.get('name', node_id)
    return {
        'center_node': start_node,
        'upstream_dependencies': upstream_nodes,
        'downstream_dependencies': downstream_nodes,
        'affected_teams': affected_teams,
        'total_affected_nodes': len(affected_node_ids),
        'summary': f"If {display_name} fails, it could affect {len(upstream_nodes)} upstream and {len(downstream_nodes)} downstream components, impacting {len(affected_teams)} teams."
    }
Example Usage:
# Analyze impact of a redis failure
impact = query_engine.blast_radius('cache:redis-main')
print(impact['summary'])
# "If redis-main fails, it could affect 5 upstream and 0 downstream components, impacting 3 teams."
print(impact['affected_teams'])
# ['checkout', 'notifications', 'payments']
Response Structure:
# If the node is not found, blast_radius returns {'error': 'Node <id> not found'} instead.
{
    'center_node': {...},
    'upstream_dependencies': [...],    # What depends on this
    'downstream_dependencies': [...],  # What this depends on
    'affected_teams': ['team1', 'team2'],
    'total_affected_nodes': 12,
    'summary': 'Human-readable impact summary',
}
def get_owner(self, node_id: str) -> Optional[Dict[str, Any]]:
    """Find the team that owns a given node.

    Args:
        node_id: ID of the owned node.

    Returns:
        The owning team's properties as a dict, or ``None`` if no
        ``type: 'team'`` node has an OWNS edge to it. If several teams own
        the node, the one whose name sorts first is returned, so the answer
        is deterministic.
    """
    query = """
    MATCH (team {type: 'team'})-[:OWNS]->(node {id: $node_id})
    RETURN team
    ORDER BY team.name
    LIMIT 1
    """
    result = self.storage.execute_cypher(query, {'node_id': node_id})
    # Iterate instead of indexing so any iterable result works, as the
    # sibling query methods already assume.
    for record in result:
        return dict(record['team'])
    return None
def get_team_assets(self, team_name: str) -> List[Dict[str, Any]]:
    """Return every node the named team has an OWNS edge to.

    Args:
        team_name: ``name`` property of the ``type: 'team'`` node.

    Returns:
        Owned nodes as dicts, ordered by asset type, then asset name.
    """
    params = {'team_name': team_name}
    cypher = """
    MATCH (team {type: 'team', name: $team_name})-[:OWNS]->(asset)
    RETURN asset
    ORDER BY asset.type, asset.name
    """
    rows = self.storage.execute_cypher(cypher, params)
    assets = [dict(row['asset']) for row in rows]
    return assets
Example Usage:
assets = query_engine.get_team_assets('payments')
# Returns all services, databases, etc. owned by the payments team
def get_services_using_database(self, database_name: str) -> List[Dict[str, Any]]:
    """Return every node with a USES edge to the database or cache named ``database_name``.

    The target is matched by its ``name`` property and must have ``type``
    ``'database'`` or ``'cache'``. The using nodes are not filtered by type.

    Returns:
        The using nodes as dicts, ordered by name.
    """
    cypher = """
    MATCH (service)-[:USES]->(db {name: $db_name})
    WHERE db.type IN ['database', 'cache']
    RETURN service
    ORDER BY service.name
    """
    rows = self.storage.execute_cypher(cypher, {'db_name': database_name})
    services = [dict(row['service']) for row in rows]
    return services
Example Usage:
services = query_engine.get_services_using_database('users-db')
# Returns all services that have a USES edge to the users database
The traversal methods (`downstream`, `upstream`, and `blast_radius`) accept a `max_depth` parameter that caps how many hops a traversal follows:
# Default: traverse up to 10 levels
downstream_deps = query_engine.downstream('service:api', max_depth=10)

# Limit to direct dependencies only
direct_deps = query_engine.downstream('service:api', max_depth=1)
On highly connected graphs the number of paths grows rapidly with depth, so very large `max_depth` values can make queries slow. Use the smallest limit that answers your question.
For advanced use cases, execute custom Cypher directly:
from graph.storage import GraphStorage

storage = GraphStorage()

# Find all services with more than 3 distinct dependencies
# (count(DISTINCT ...) so parallel edges to one dependency count once)
query = """
MATCH (service {type: 'service'})-->(dependency)
WITH service, count(DISTINCT dependency) AS dep_count
WHERE dep_count > 3
RETURN service, dep_count
ORDER BY dep_count DESC
"""
results = storage.execute_cypher(query)
# Get all services that depend on users-db
affected = query_engine.upstream('database:users-db')

# Get teams to notify
teams = set()
for service in affected:
    owner = query_engine.get_owner(service['id'])
    if owner:
        teams.add(owner['name'])
Audit team ownership coverage
# Get all services
all_services = query_engine.get_nodes('service')

# Find unowned services
unowned = [
    service['name']
    for service in all_services
    if not query_engine.get_owner(service['id'])
]
Calculate deployment risk
# Get full blast radius
impact = query_engine.blast_radius('service:payment-service')
if 'error' in impact:
    # blast_radius returns {'error': ...} when the node does not exist
    raise LookupError(impact['error'])

# Risk score based on affected components
risk_score = (
    len(impact['upstream_dependencies']) * 2     # Breaking changes
    + len(impact['downstream_dependencies'])     # Dependency issues
    + len(impact['affected_teams']) * 3          # Coordination overhead
)
Map service communication
# Get all service-to-service calls (direct CALLS edges only)
all_services = query_engine.get_nodes('service')
service_graph = {}
for service in all_services:
    deps = query_engine.downstream(
        service['id'],
        edge_types=['calls'],
        max_depth=1,
    )
    service_graph[service['name']] = [d['name'] for d in deps]