Overview
Orchestration frameworks enable you to build sophisticated AI applications by connecting language models with external data sources, tools, and APIs. This guide covers using LangChain and LlamaIndex to orchestrate Gemini models in production environments.
Orchestration frameworks help you move beyond simple question-answering to build data-aware and agentic applications that can reason and take action.
LangChain Integration
LangChain provides modular abstractions for working with LLMs, making it easy to swap components and build complex applications.
Setting Up LangChain with Gemini
Install Dependencies
Install the LangChain packages with Google integrations:
pip install langchain langchain-core langchain-google-genai langchain-community langchain-text-splitters
Initialize the Model
Configure LangChain to use Gemini models: from langchain_google_genai import ChatGoogleGenerativeAI
import os
PROJECT_ID = os.environ.get( "GOOGLE_CLOUD_PROJECT" )
LOCATION = "us-central1"
# Initialize Gemini model
llm = ChatGoogleGenerativeAI(
model = "gemini-2.0-flash-exp" ,
project = PROJECT_ID ,
location = LOCATION ,
temperature = 0.7 ,
)
Create a Chain
Build a simple chain with prompt templates: from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# Define prompt template
prompt = ChatPromptTemplate.from_messages([
( "system" , "You are a helpful AI assistant." ),
( "user" , " {input} " )
])
# Create chain
chain = prompt | llm | StrOutputParser()
# Invoke chain
response = chain.invoke({ "input" : "Explain quantum computing in simple terms" })
print (response)
Building RAG Applications
Retrieval-Augmented Generation combines document retrieval with language generation:
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA

# Load the PDF and split it into overlapping chunks so retrieval returns
# focused passages while the overlap preserves context across boundaries.
loader = PyPDFLoader("technical_docs.pdf")
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
)
splits = text_splitter.split_documents(documents)

# Embed the chunks and persist them in a local Chroma vector store.
# NOTE(review): some langchain-google-genai versions expect the model name as
# "models/text-embedding-004" — confirm against the installed version.
embeddings = GoogleGenerativeAIEmbeddings(
    model="text-embedding-004",
    project=PROJECT_ID,
)
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory="./chroma_db",
)

# "stuff" chain type concatenates the top-k retrieved chunks into one prompt.
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
)

# Query the system. RetrievalQA.invoke takes a dict keyed by the chain's
# input variable ("query"); the documented form rather than a bare string.
query = "What are the key features discussed in the document?"
response = qa_chain.invoke({"query": query})
print(response)
LangGraph for Multi-Step Workflows
LangGraph enables building stateful, multi-agent workflows:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
    """Shared state dict passed between LangGraph nodes."""

    query: str        # the user's question
    # NOTE(review): retrieve() stores the objects returned by
    # vectorstore.similarity_search here (Document objects, not str) —
    # the annotation looks loose; confirm and tighten if desired.
    documents: list[str]
    generation: str   # final LLM answer produced by the generate node
def retrieve(state):
    """LangGraph node: fetch the top-3 most similar documents for the query.

    Relies on the module-level `vectorstore` built earlier in the guide.
    Returns a partial state update with the retrieved documents and the
    (unchanged) query.
    """
    query = state["query"]
    documents = vectorstore.similarity_search(query, k=3)
    return {"documents": documents, "query": query}
def generate(state):
    """LangGraph node: answer the query from the retrieved documents.

    Joins the documents' page content into one context string, prompts the
    module-level `llm`, and returns the answer text under "generation".
    """
    query = state["query"]
    documents = state["documents"]
    # Separator is exactly two newlines (the extracted original had stray
    # spaces around it and an invalid `f "..."` literal; reconstructed here).
    context = "\n\n".join(doc.page_content for doc in documents)
    prompt = f"Context: {context}\n\nQuestion: {query}\n\nAnswer:"
    generation = llm.invoke(prompt)
    return {"generation": generation.content}
# Build graph: a linear two-step pipeline, retrieve -> generate -> END.
workflow = StateGraph(GraphState)
workflow.add_node("retrieve", retrieve)
workflow.add_node("generate", generate)
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)
app = workflow.compile()

# Execute workflow with an initial state containing only the query;
# the nodes fill in "documents" and "generation".
result = app.invoke({"query": "How does the system handle errors?"})
print(result["generation"])
LlamaIndex Workflows
LlamaIndex provides event-driven workflows for building complex RAG applications.
Creating a Workflow
Setup
Workflow Definition
Execute Workflow
from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.vertex import Vertex
from llama_index.embeddings.vertex import VertexTextEmbedding
from llama_index.core.workflow import (
    Workflow,
    Event,
    StartEvent,
    StopEvent,
    step,
)
import vertexai

PROJECT_ID = "your-project-id"  # replace with your GCP project ID
LOCATION = "us-central1"

vertexai.init(project=PROJECT_ID, location=LOCATION)

# Configure LlamaIndex global settings: every component that needs an LLM
# or embedding model picks these up from Settings by default.
Settings.llm = Vertex(
    model="gemini-2.0-flash-exp",
    project=PROJECT_ID,
    location=LOCATION,
)
Settings.embed_model = VertexTextEmbedding(
    model_name="text-embedding-004",
    project=PROJECT_ID,
    location=LOCATION,
)
Multi-Step Query Decomposition
Break complex queries into sub-questions:
from llama_index.core.indices.query.query_transform.base import (
StepDecomposeQueryTransform
)
from llama_index.core.postprocessor.llm_rerank import LLMRerank
class MultiStepQueryEvent(Event):
    """Carries the original query plus its decomposed sub-questions."""

    query: str
    # NOTE(review): a mutable default is safe only if Event is a pydantic
    # model (pydantic copies field defaults per instance) — confirm for the
    # installed llama_index version.
    sub_questions: list[str] = []
class RerankEvent(Event):
    """Carries the per-sub-question query responses awaiting reranking."""

    results: list
class AdvancedRAGWorkflow(Workflow):
    """Three-step RAG workflow: decompose -> answer sub-questions -> rerank.

    NOTE(review): `self.index` is never assigned anywhere in this guide;
    the workflow needs an index attached (e.g. in __init__) before it can run.
    """

    @step
    async def decompose_query(self, ev: StartEvent) -> MultiStepQueryEvent:
        """Break down a complex query into simpler sub-questions."""
        query_transform = StepDecomposeQueryTransform(
            llm=Settings.llm,
            verbose=True,
        )
        sub_questions = query_transform.run(ev.query)
        # Stash the original query on the workflow: RerankEvent does not
        # carry it, but the final synthesis step needs it.
        self._query = ev.query
        return MultiStepQueryEvent(
            query=ev.query,
            sub_questions=sub_questions,
        )

    @step
    async def answer_sub_questions(self, ev: MultiStepQueryEvent) -> RerankEvent:
        """Answer each sub-question independently against the index."""
        results = []
        for sub_q in ev.sub_questions:
            response = await self.index.aquery(sub_q)
            results.append(response)
        return RerankEvent(results=results)

    @step
    async def rerank_and_synthesize(self, ev: RerankEvent) -> StopEvent:
        """Keep the 5 best results and synthesize the final answer."""
        reranker = LLMRerank(llm=Settings.llm, top_n=5)
        reranked = reranker.postprocess_nodes(ev.results)
        # BUG FIX: the original read `ev.query`, but RerankEvent defines no
        # `query` field; use the query stashed in decompose_query instead.
        final_response = await self.index.asynthesize(
            query=self._query,
            nodes=reranked,
        )
        return StopEvent(result=final_response)
Workflow Patterns
Sequential Chains Execute steps in order, passing outputs to next steps
Parallel Execution Run multiple operations concurrently for faster processing
Conditional Routing Branch workflows based on intermediate results
Human-in-the-Loop Incorporate human feedback at key decision points
Sequential Chain Pattern
from langchain.chains import SequentialChain, LLMChain

# NOTE(review): LLMChain/SequentialChain are legacy APIs (deprecated in
# LangChain 0.2+); the LCEL pipe syntax shown earlier is the modern form.

# Step 1: summarize the input text; output lands under "summary".
chain_1 = LLMChain(
    llm=llm,
    prompt=ChatPromptTemplate.from_template(
        "Summarize this text: {text}"
    ),
    output_key="summary",
)

# Step 2: translate the summary; chain_1's output_key feeds this template.
chain_2 = LLMChain(
    llm=llm,
    prompt=ChatPromptTemplate.from_template(
        "Translate to Spanish: {summary}"
    ),
    output_key="translation",
)

# Combine into a sequential chain exposing both intermediate and final outputs.
sequential_chain = SequentialChain(
    chains=[chain_1, chain_2],
    input_variables=["text"],
    output_variables=["summary", "translation"],
)

result = sequential_chain.invoke({
    "text": "Long technical document...",
})
Best Practices
Always implement error handling and retry logic in production workflows. External API calls can fail, and document retrieval may return empty results.
Error Handling
from tenacity import retry, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def robust_llm_call(prompt: str) -> str:
    """Invoke the module-level `llm` with up to 3 attempts and exponential backoff.

    Args:
        prompt: the text prompt to send to the model.

    Returns:
        The model's response text.

    Raises:
        Whatever `llm.invoke` raised on the final failed attempt.
    """
    try:
        response = llm.invoke(prompt)
        return response.content
    except Exception as e:
        # Log-and-reraise: swallowing the error here would defeat the
        # @retry decorator, which needs to see the exception to reschedule.
        print(f"Error: {e}")
        raise
Monitoring and Logging
import logging

from langchain.callbacks import StdOutCallbackHandler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# StdOutCallbackHandler echoes each chain/LLM event to stdout for quick
# observability during development.
chain = prompt | llm.with_config(
    callbacks=[StdOutCallbackHandler()]
)

logger.info("Starting workflow execution")
result = chain.invoke({"input": query})
# Lazy %-style args avoid building the message when INFO is disabled.
logger.info("Workflow completed: %d characters generated", len(result))
Next Steps