Orchestrating Gemini with LangChain and LlamaIndex

Overview

Orchestration frameworks enable you to build sophisticated AI applications by connecting language models with external data sources, tools, and APIs. This guide covers using LangChain and LlamaIndex to orchestrate Gemini models in production environments.
Orchestration frameworks help you move beyond simple question-answering to build data-aware and agentic applications that can reason and take action.

LangChain Integration

LangChain provides modular abstractions for working with LLMs, making it easy to swap components and build complex applications.

Setting Up LangChain with Gemini

Step 1

Install Dependencies

Install the LangChain packages with Google integrations:
pip install langchain langchain-core langchain-google-genai \
  langchain-community langchain-text-splitters langchain-chroma
Step 2

Initialize the Model

Configure LangChain to use Gemini models:
from langchain_google_genai import ChatGoogleGenerativeAI
import os

# PROJECT_ID is None when GOOGLE_CLOUD_PROJECT is unset; set the env var
# before running this snippet.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
LOCATION = "us-central1"

# Initialize Gemini model
# NOTE(review): `project`/`location` are Vertex AI-style parameters;
# langchain-google-genai's ChatGoogleGenerativeAI normally authenticates with
# an API key instead — confirm the installed version accepts these kwargs,
# or switch to ChatVertexAI from langchain-google-vertexai.
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp",
    project=PROJECT_ID,
    location=LOCATION,
    temperature=0.7,  # moderate creativity; lower this for deterministic output
)
Step 3

Create a Chain

Build a simple chain with prompt templates:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Define prompt template: a fixed system instruction plus the user's turn;
# {input} is substituted at invoke time.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful AI assistant."),
    ("user", "{input}")
])

# Create chain with LCEL pipe syntax: prompt -> model -> plain-string output.
chain = prompt | llm | StrOutputParser()

# Invoke chain; the dict key must match the template variable name ({input}).
response = chain.invoke({"input": "Explain quantum computing in simple terms"})
print(response)

Building RAG Applications

Retrieval-Augmented Generation combines document retrieval with language generation:
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA

# Load the PDF and split it into overlapping chunks so each chunk fits the
# embedding model's input limits and retrieval keeps local context intact.
loader = PyPDFLoader("technical_docs.pdf")
documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200  # overlap preserves sentences that straddle chunk edges
)
splits = text_splitter.split_documents(documents)

# Create embeddings and vector store.
# The Generative AI embeddings API expects the "models/" prefix on the name
# (bare "text-embedding-004" is rejected).
embeddings = GoogleGenerativeAIEmbeddings(
    model="models/text-embedding-004",
    project=PROJECT_ID  # NOTE(review): confirm this kwarg is accepted; the
                        # class normally authenticates via API key
)

vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory="./chroma_db"  # persist so the index survives restarts
)

# Create retrieval chain: "stuff" concatenates the top-k chunks into a single
# prompt for the LLM.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(
        search_kwargs={"k": 3}  # fetch the 3 most similar chunks
    )
)

# Query the system; RetrievalQA's documented input key is "query", so pass a
# dict rather than a bare string.
query = "What are the key features discussed in the document?"
response = qa_chain.invoke({"query": query})
print(response)

LangGraph for Multi-Step Workflows

LangGraph enables building stateful, multi-agent workflows:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    """Shared state threaded through the LangGraph workflow nodes."""

    query: str  # the user's question
    # Retrieved documents: similarity_search returns Document objects
    # (generate() reads .page_content from each), so this is not list[str].
    documents: list
    generation: str  # final answer text produced by generate()

def retrieve(state):
    """Fetch the top-3 most similar documents for the query in *state*."""
    user_query = state["query"]
    return {
        "documents": vectorstore.similarity_search(user_query, k=3),
        "query": user_query,
    }

def generate(state):
    """Answer the query using the retrieved documents as grounding context."""
    # Join every retrieved chunk into one context string, then prompt the LLM.
    context = "\n\n".join(doc.page_content for doc in state["documents"])
    answer = llm.invoke(
        f"Context: {context}\n\nQuestion: {state['query']}\n\nAnswer:"
    )
    return {"generation": answer.content}

# Build graph: each node is a plain function that receives and returns
# partial GraphState updates.
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)

# Wire the linear flow: entry -> retrieve -> generate -> END.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)

# Compile into a runnable application.
app = workflow.compile()

# Execute workflow; the input dict seeds the initial GraphState.
result = app.invoke({"query": "How does the system handle errors?"})
print(result["generation"])

LlamaIndex Workflows

LlamaIndex provides event-driven workflows for building complex RAG applications.

Creating a Workflow

from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.vertex import Vertex
from llama_index.embeddings.vertex import VertexTextEmbedding
from llama_index.core.workflow import (
    Workflow,
    Event,
    StartEvent,
    StopEvent,
    step
)
import vertexai

# Replace with your real project before running.
PROJECT_ID = "your-project-id"
LOCATION = "us-central1"

# Authenticate the Vertex AI SDK once per process.
vertexai.init(project=PROJECT_ID, location=LOCATION)

# Configure LlamaIndex settings
# Settings is a process-wide singleton: every index and query engine created
# afterwards uses this LLM and embedding model unless explicitly overridden.
Settings.llm = Vertex(
    model="gemini-2.0-flash-exp",
    project=PROJECT_ID,
    location=LOCATION
)

Settings.embed_model = VertexTextEmbedding(
    model_name="text-embedding-004",
    project=PROJECT_ID,
    location=LOCATION
)

Multi-Step Query Decomposition

Break complex queries into sub-questions:
from llama_index.core.indices.query.query_transform.base import (
    StepDecomposeQueryTransform
)
from llama_index.core.postprocessor.llm_rerank import LLMRerank

class MultiStepQueryEvent(Event):
    """Workflow event carrying the original query and its decomposition."""

    query: str  # the user's original question
    # Simpler questions derived from `query`. The mutable [] default is safe
    # here because Event is pydantic-based and copies defaults per instance.
    sub_questions: list[str] = []

class RerankEvent(Event):
    """Workflow event carrying sub-question results to the rerank step.

    Also carries the original user query forward so a downstream step can
    score/rerank results against it. `query` defaults to "" so existing
    callers that only pass `results` keep working.
    """

    results: list  # one response per answered sub-question
    query: str = ""  # original user query ("" if the producer didn't set it)

class AdvancedRAGWorkflow(Workflow):
    """Three-step RAG workflow: decompose -> answer sub-questions -> rerank.

    NOTE(review): the steps read `self.index`, which is never assigned in
    this class — attach an index (e.g. ``wf.index = VectorStoreIndex(...)``)
    before running, or the steps below raise AttributeError.
    """

    @step
    async def decompose_query(self, ev: StartEvent) -> MultiStepQueryEvent:
        """Break down the incoming complex query into simpler sub-questions."""
        query_transform = StepDecomposeQueryTransform(
            llm=Settings.llm,
            verbose=True
        )

        sub_questions = query_transform.run(ev.query)
        return MultiStepQueryEvent(
            query=ev.query,
            sub_questions=sub_questions
        )

    @step
    async def answer_sub_questions(self, ev: MultiStepQueryEvent) -> RerankEvent:
        """Answer each sub-question independently against the index."""
        results = []
        for sub_q in ev.sub_questions:
            response = await self.index.aquery(sub_q)
            results.append(response)

        # Carry the original query forward: the rerank step needs it, and the
        # original code read ev.query off an event that never stored it.
        return RerankEvent(results=results, query=ev.query)

    @step
    async def rerank_and_synthesize(self, ev: RerankEvent) -> StopEvent:
        """Rerank intermediate results and synthesize the final answer."""
        # LLMRerank needs the query text to judge relevance of each node.
        # NOTE(review): postprocess_nodes expects NodeWithScore objects, but
        # ev.results holds query Response objects — if this raises at runtime,
        # flatten each response's source_nodes first.
        reranker = LLMRerank(llm=Settings.llm, top_n=5)
        reranked = reranker.postprocess_nodes(ev.results, query_str=ev.query)

        final_response = await self.index.asynthesize(
            query=ev.query,
            nodes=reranked
        )

        return StopEvent(result=final_response)

Workflow Patterns

Sequential Chains

Execute steps in order, passing outputs to next steps

Parallel Execution

Run multiple operations concurrently for faster processing

Conditional Routing

Branch workflows based on intermediate results

Human-in-the-Loop

Incorporate human feedback at key decision points

Sequential Chain Pattern

from langchain.chains import SequentialChain, LLMChain

# Define multiple chains
# NOTE(review): LLMChain/SequentialChain are the legacy chain API; newer
# LangChain releases recommend LCEL (prompt | llm) composition — confirm
# these classes still exist in the installed version.
chain_1 = LLMChain(
    llm=llm,
    prompt=ChatPromptTemplate.from_template(
        "Summarize this text: {text}"
    ),
    output_key="summary"  # downstream chains reference this key by name
)

chain_2 = LLMChain(
    llm=llm,
    prompt=ChatPromptTemplate.from_template(
        "Translate to Spanish: {summary}"
    ),
    output_key="translation"
)

# Combine into sequential chain: chain_1's "summary" output automatically
# feeds chain_2's {summary} input variable.
sequential_chain = SequentialChain(
    chains=[chain_1, chain_2],
    input_variables=["text"],
    output_variables=["summary", "translation"]  # expose intermediate + final
)

result = sequential_chain.invoke({
    "text": "Long technical document..."
})

Best Practices

Always implement error handling and retry logic in production workflows. External API calls can fail, and document retrieval may return empty results.

Error Handling

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10)
)
def robust_llm_call(prompt: str) -> str:
    """Invoke the LLM, retrying up to 3 times with exponential backoff.

    Retries are driven entirely by the tenacity decorator; the handler below
    only reports the failure and re-raises so tenacity can retry or give up.
    """
    try:
        return llm.invoke(prompt).content
    except Exception as e:
        print(f"Error: {e}")
        raise

Monitoring and Logging

import logging
from langchain.callbacks import StdOutCallbackHandler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Add callbacks for observability: StdOutCallbackHandler prints each
# chain/LLM event as it happens.
# StrOutputParser (imported earlier in this guide) is appended so `result`
# is a plain string — without it, invoke() returns an AIMessage and the
# len(result) call below raises TypeError.
chain = prompt | llm.with_config(
    callbacks=[StdOutCallbackHandler()]
) | StrOutputParser()

logger.info("Starting workflow execution")
# NOTE(review): `query` and `prompt` must already be defined by the earlier
# examples for this snippet to run on its own.
result = chain.invoke({"input": query})
# Lazy %-style args avoid formatting when the log level is disabled.
logger.info("Workflow completed: %d characters generated", len(result))

Next Steps

Explore agent tool calling, evaluation frameworks, and deployment guides to move these orchestration patterns into production.