Connection Management
The Connection object represents a single database connection and provides methods for executing SQL statements and managing transactions. Connections are obtained from an Engine’s connection pool.
Basic Connection Usage
Acquiring a Connection
Connections are obtained from an Engine using the connect() method:
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:pass@localhost/dbname")
# Get a connection from the pool
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)
# Connection automatically returned to pool
Always use a context manager (with statement) to ensure connections are properly returned to the pool, even if an exception occurs.
Connection with Transaction
Use engine.begin() to get a connection with an automatic transaction:
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Alice"})
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Bob"})
# Transaction commits automatically on success, rolls back on exception
Executing Statements
Text Statements
from sqlalchemy import text
with engine.connect() as conn:
    # Simple query
    result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1})
    row = result.fetchone()

    # Multiple parameter sets (executemany)
    conn.execute(
        text("INSERT INTO users (name, email) VALUES (:name, :email)"),
        [
            {"name": "Alice", "email": "[email protected]"},
            {"name": "Bob", "email": "[email protected]"}
        ]
    )
    conn.commit()
Core Expression Language
from sqlalchemy import select, insert, update, delete, MetaData, Table
metadata = MetaData()
users = Table("users", metadata, autoload_with=engine)
with engine.connect() as conn:
    # SELECT
    stmt = select(users).where(users.c.name == "Alice")
    result = conn.execute(stmt)

    # INSERT
    stmt = insert(users).values(name="Charlie", email="[email protected]")
    conn.execute(stmt)

    # UPDATE
    stmt = update(users).where(users.c.id == 1).values(email="[email protected]")
    conn.execute(stmt)

    # DELETE
    stmt = delete(users).where(users.c.id == 1)
    conn.execute(stmt)

    conn.commit()
Transaction Management
Manual Transactions
Explicitly control transaction boundaries:
with engine.connect() as conn:
    # Start a transaction
    trans = conn.begin()
    try:
        conn.execute(text("INSERT INTO accounts (balance) VALUES (100)"))
        conn.execute(text("INSERT INTO transactions (type) VALUES ('deposit')"))
        trans.commit()
    except Exception:
        trans.rollback()
        raise
Automatic Transaction (Recommended)
# Transaction starts automatically and commits on success
with engine.begin() as conn:
    conn.execute(text("INSERT INTO accounts (balance) VALUES (100)"))
    conn.execute(text("INSERT INTO transactions (type) VALUES ('deposit')"))
# Auto-commit on exit, auto-rollback on exception
Nested Transactions (Savepoints)
Create savepoints within a transaction:
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))

    # Create a savepoint
    savepoint = conn.begin_nested()
    try:
        conn.execute(text("INSERT INTO users (name) VALUES ('Bob')"))
        savepoint.commit()
    except Exception:
        savepoint.rollback()

    # This INSERT is committed regardless of savepoint outcome
    conn.execute(text("INSERT INTO users (name) VALUES ('Charlie')"))
Transaction Isolation Levels
Set isolation level per connection:
# Set isolation level for this connection
with engine.connect() as conn:
    conn = conn.execution_options(isolation_level="SERIALIZABLE")
    with conn.begin():
        result = conn.execute(text("SELECT * FROM accounts WHERE id = 1"))
        # ... transactional work at SERIALIZABLE level
Available isolation levels (database-dependent):
"SERIALIZABLE" - Highest isolation, prevents all anomalies
"REPEATABLE READ" - Prevents non-repeatable reads
"READ COMMITTED" - Prevents dirty reads (default for most databases)
"READ UNCOMMITTED" - Lowest isolation
"AUTOCOMMIT" - No transaction, each statement commits immediately
Isolation levels must be set before a transaction begins. You cannot change the isolation level of an active transaction.
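Because of this restriction, a convenient pattern is to fix the level once at engine-creation time so every connection starts with it. A minimal sketch, using SQLite only so the example is self-contained; substitute your own URL and a level your backend supports:

```python
# Fix the isolation level for every connection produced by this engine.
# "sqlite://" is a stand-in URL so the sketch runs without a server.
from sqlalchemy import create_engine

engine = create_engine("sqlite://", isolation_level="SERIALIZABLE")

with engine.connect() as conn:
    # The level is already in effect before any transaction begins
    print(conn.get_isolation_level())
```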
Connection Pooling
SQLAlchemy uses connection pooling to manage database connections efficiently. The pool maintains a set of connections that can be reused.
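The pool behind an Engine can be observed at runtime. A quick sketch: `QueuePool` is forced explicitly here so its counter methods are available, and a SQLite stand-in URL is used so the example runs without a server.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2)

print(engine.pool.status())      # human-readable summary of the pool
print(engine.pool.checkedout())  # 0 - nothing in use yet

with engine.connect() as conn:
    print(engine.pool.checkedout())  # 1 - one connection checked out

engine.dispose()  # close all pooled connections
```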
Pool Types
QueuePool (Default)
The default pool for most databases. Maintains a fixed pool of connections with overflow support.

from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://...",
    poolclass=QueuePool,
    pool_size=5,      # Keep 5 connections open
    max_overflow=10,  # Allow up to 15 total connections
    pool_timeout=30   # Wait up to 30 seconds for a connection
)
NullPool
No pooling - creates and disposes a connection for each request.

from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql://...",
    poolclass=NullPool
)
Useful for:
- Testing
- Development with frequent schema changes
- Applications using external connection pooling
StaticPool
A single connection shared by all requests. Useful for SQLite in-memory databases.

from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite:///:memory:",
    poolclass=StaticPool
)
SingletonThreadPool
Maintains one connection per thread.

from sqlalchemy.pool import SingletonThreadPool

engine = create_engine(
    "sqlite:///mydb.db",
    poolclass=SingletonThreadPool
)
Pool Configuration
pool_size
Number of connections to keep open in the pool.

max_overflow
Maximum connections that can be created beyond pool_size. Total connections = pool_size + max_overflow.

pool_timeout
Seconds to wait for a connection before raising TimeoutError.

pool_recycle
Recycle connections after this many seconds. Set to 3600 (1 hour) for MySQL to avoid "MySQL server has gone away" errors.

engine = create_engine("mysql://...", pool_recycle=3600)

pool_pre_ping
Test connections for liveness before using them. Adds slight overhead but prevents errors from stale connections.

engine = create_engine("postgresql://...", pool_pre_ping=True)

pool_use_lifo
Use last-in-first-out retrieval, so that server-side timeouts can close connections that sit unused.

pool_reset_on_return
How to reset connections when returned to the pool:
- "rollback" - Issue ROLLBACK (default)
- "commit" - Issue COMMIT
- None - No reset action
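Putting the knobs together, an illustrative (not prescriptive) configuration. QueuePool is forced and a SQLite stand-in URL is used so the sketch runs without a server; the numbers are assumptions to tune for your workload.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",                      # stand-in URL; use your real DSN
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,                # refresh connections older than 30 minutes
    pool_pre_ping=True,               # liveness check on checkout
    pool_use_lifo=True,               # prefer recently used connections
    pool_reset_on_return="rollback",  # the default, shown explicitly
)

with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # 1
```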
Pool Events
Listen to pool events for monitoring and debugging:
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection created")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    print("Connection checked out from pool")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    print("Connection returned to pool")
Execution Options
Modify connection behavior with execution options:
with engine.connect() as conn:
    # Set options on the connection
    conn = conn.execution_options(
        isolation_level="SERIALIZABLE",
        stream_results=True,
        max_row_buffer=1000
    )
    result = conn.execute(text("SELECT * FROM large_table"))
    for row in result:
        print(row)
Common Execution Options
isolation_level
Transaction isolation level for this connection.

stream_results
Use server-side cursors for large result sets (PostgreSQL, MySQL).

max_row_buffer
Buffer size when streaming results.

compiled_cache
Custom compiled statement cache, or None to disable caching.

schema_translate_map
Map schema names for multi-tenant applications:

conn = conn.execution_options(
    schema_translate_map={None: "tenant_1"}
)
Working with Results
Fetching Rows
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))

    # Fetch one row
    row = result.fetchone()
    print(row.name, row.email)

    # Fetch all remaining rows
    all_rows = result.fetchall()

    # Fetch up to 100 rows at a time
    batch = result.fetchmany(size=100)

    # Iterate over results
    for row in result:
        print(row)
Accessing Row Data
result = conn.execute(text("SELECT id, name, email FROM users"))
row = result.fetchone()
# By column name
print(row.name)
# By index
print(row[1])
# As dictionary
print(row._mapping["name"])
# Convert to dict
row_dict = dict(row._mapping)
Scalar Results
# Get a single value
result = conn.execute(text("SELECT COUNT(*) FROM users"))
count = result.scalar()
print(f"Total users: {count}")

# Using scalar_one() - raises if not exactly one result
result = conn.execute(text("SELECT name FROM users WHERE id = 1"))
name = result.scalar_one()
Connection Properties and Methods
Key Properties
conn.closed - Whether the connection is closed
conn.invalidated - Whether the connection is invalidated
conn.dialect - The Dialect instance
conn.engine - The parent Engine
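The properties above can be checked directly. A quick sketch, using SQLite as a stand-in backend:

```python
from sqlalchemy import create_engine

engine = create_engine("sqlite://")
conn = engine.connect()

print(conn.closed)            # False while the connection is open
print(conn.invalidated)       # False unless invalidate() was called
print(conn.dialect.name)      # the backend name, here "sqlite"
print(conn.engine is engine)  # True - the parent Engine

conn.close()
print(conn.closed)            # True after close()
```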
Key Methods
# Get current isolation level
level = conn.get_isolation_level()

# Get the current nested (savepoint) transaction, or None if there isn't one
nested = conn.get_nested_transaction()

# Test if in transaction
if conn.in_transaction():
    print("Transaction active")

# Invalidate the underlying DBAPI connection
conn.invalidate()

# Close the connection
conn.close()
Best Practices
Always Use Context Managers
Ensures connections are returned to the pool:

# Good
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))

# Avoid - the connection leaks if execute() raises before close()
conn = engine.connect()
result = conn.execute(text("SELECT 1"))
conn.close()
Use engine.begin() for Transactions
Simplifies transaction management:

# Good
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
# Auto-commit

# More verbose
with engine.connect() as conn:
    with conn.begin():
        conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
Configure Pool for Your Workload
# Web application with moderate concurrency
engine = create_engine(
    "postgresql://...",
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600,
    pool_pre_ping=True
)
Handle Disconnects Gracefully
from sqlalchemy.exc import DBAPIError

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1"))
except DBAPIError as e:
    if e.connection_invalidated:
        print("Connection was invalidated, retry...")
    raise
Use Appropriate Isolation Levels
Choose based on your consistency requirements:

# Financial transaction - use SERIALIZABLE
with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    with conn.begin():
        # Transfer money between accounts
        pass

# Read-only report - READ UNCOMMITTED can reduce locking
# where occasional dirty reads are acceptable
with engine.connect().execution_options(
    isolation_level="READ UNCOMMITTED"
) as conn:
    # Generate report
    pass
See Also