Connection Management

The Connection object represents a single database connection and provides methods for executing SQL statements and managing transactions. Connections are obtained from an Engine’s connection pool.

Basic Connection Usage

Acquiring a Connection

Connections are obtained from an Engine using the connect() method:
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/dbname")

# Get a connection from the pool
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))
    for row in result:
        print(row)
# Connection automatically returned to pool
Always use a context manager (with statement) to ensure connections are properly returned to the pool, even if an exception occurs.
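To see why the with form matters, the following minimal sketch raises an error inside the block and then inspects the pool. It assumes an in-memory SQLite database as a stand-in for the PostgreSQL URL above and forces QueuePool so that checkedout() can be used for the check; neither choice comes from the original example.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Assumption: in-memory SQLite stands in for a real database server.
engine = create_engine("sqlite://", poolclass=QueuePool)

try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
        # One connection is checked out while the block is running.
        checked_out_inside = engine.pool.checkedout()
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

# The context manager returned the connection despite the exception.
checked_out_after = engine.pool.checkedout()
print(checked_out_inside, checked_out_after)
```

If the same code called engine.connect() without with and without a finally clause, the connection would stay checked out after the error.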

Connection with Transaction

Use engine.begin() to get a connection with an automatic transaction:
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Alice"})
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Bob"})
# Transaction commits automatically on success, rolls back on exception
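The commit-or-rollback behavior can be checked with a small self-contained sketch. It assumes an in-memory SQLite database shared through StaticPool and a throwaway users table; both are illustrative choices, not part of the example above.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

# Assumption: one shared in-memory SQLite connection for the whole demo.
engine = create_engine("sqlite://", poolclass=StaticPool)

# Succeeds, so the transaction is committed on exit.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Alice"})

# Fails, so the INSERT of Bob is rolled back on exit.
try:
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "Bob"})
        raise ValueError("simulated failure")
except ValueError:
    pass

with engine.connect() as conn:
    names = conn.execute(text("SELECT name FROM users ORDER BY id")).scalars().all()
print(names)
```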

Executing Statements

Text Statements

from sqlalchemy import text

with engine.connect() as conn:
    # Simple query
    result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {"id": 1})
    row = result.fetchone()
    
    # Multiple parameter sets (executemany)
    conn.execute(
        text("INSERT INTO users (name, email) VALUES (:name, :email)"),
        [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"}
        ]
    )
    conn.commit()

Core Expression Language

from sqlalchemy import select, insert, update, delete, MetaData, Table

metadata = MetaData()
users = Table("users", metadata, autoload_with=engine)

with engine.connect() as conn:
    # SELECT
    stmt = select(users).where(users.c.name == "Alice")
    result = conn.execute(stmt)
    
    # INSERT
    stmt = insert(users).values(name="Charlie", email="charlie@example.com")
    conn.execute(stmt)
    
    # UPDATE
    stmt = update(users).where(users.c.id == 1).values(email="new.address@example.com")
    conn.execute(stmt)
    
    # DELETE
    stmt = delete(users).where(users.c.id == 1)
    conn.execute(stmt)
    
    conn.commit()

Transaction Management

Manual Transactions

Explicitly control transaction boundaries:
with engine.connect() as conn:
    # Start a transaction
    trans = conn.begin()
    try:
        conn.execute(text("INSERT INTO accounts (balance) VALUES (100)"))
        conn.execute(text("INSERT INTO transactions (type) VALUES ('deposit')"))
        trans.commit()
    except Exception:
        trans.rollback()
        raise
With engine.begin(), the transaction starts automatically and commits on success:
with engine.begin() as conn:
    conn.execute(text("INSERT INTO accounts (balance) VALUES (100)"))
    conn.execute(text("INSERT INTO transactions (type) VALUES ('deposit')"))
# Auto-commit on exit, auto-rollback on exception

Nested Transactions (Savepoints)

Create savepoints within a transaction:
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
    
    # Create a savepoint
    savepoint = conn.begin_nested()
    try:
        conn.execute(text("INSERT INTO users (name) VALUES ('Bob')"))
        savepoint.commit()
    except Exception:
        savepoint.rollback()
    
    # This INSERT belongs to the outer transaction, so it is committed on exit
    # whether the savepoint was released or rolled back
    conn.execute(text("INSERT INTO users (name) VALUES ('Charlie')"))
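The sketch below exercises a savepoint rollback end to end, using the context-manager form of begin_nested(). It assumes an in-memory SQLite database, a hypothetical users table with a UNIQUE constraint to force a failure, and two event listeners that make the pysqlite driver emit an explicit BEGIN (a workaround commonly used so that SAVEPOINT behaves reliably with that driver). None of this setup is from the example above.

```python
from sqlalchemy import create_engine, event, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)

# Assumption: SQLite-specific workaround, not needed on PostgreSQL or MySQL.
@event.listens_for(engine, "connect")
def _disable_driver_begin(dbapi_conn, connection_record):
    dbapi_conn.isolation_level = None  # stop pysqlite from managing BEGIN

@event.listens_for(engine, "begin")
def _emit_begin(conn):
    conn.exec_driver_sql("BEGIN")  # start the transaction explicitly

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (name TEXT UNIQUE)"))
    conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
    try:
        # Leaving the block with an exception rolls back to the savepoint.
        with conn.begin_nested():
            conn.execute(text("INSERT INTO users (name) VALUES ('Bob')"))
            conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))  # violates UNIQUE
    except IntegrityError:
        pass
    # The outer transaction is still usable after the savepoint rollback.
    conn.execute(text("INSERT INTO users (name) VALUES ('Charlie')"))

with engine.connect() as conn:
    names = conn.execute(text("SELECT name FROM users ORDER BY name")).scalars().all()
print(names)
```

Bob is discarded together with the failing statement, while Alice and Charlie are committed with the outer transaction.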

Transaction Isolation Levels

Set isolation level per connection:
# Set isolation level for this connection
with engine.connect() as conn:
    conn = conn.execution_options(isolation_level="SERIALIZABLE")
    with conn.begin():
        result = conn.execute(text("SELECT * FROM accounts WHERE id = 1"))
        # ... transactional work at SERIALIZABLE level
Available isolation levels (database-dependent):
  • "SERIALIZABLE" - Highest isolation, prevents all anomalies
  • "REPEATABLE READ" - Prevents non-repeatable reads
  • "READ COMMITTED" - Prevents dirty reads (default for most databases)
  • "READ UNCOMMITTED" - Lowest isolation
  • "AUTOCOMMIT" - No transaction, each statement commits immediately
Isolation levels must be set before a transaction begins. You cannot change the isolation level of an active transaction.
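One way to confirm which level a connection is actually using is get_isolation_level(). The sketch below assumes SQLite, which reports SERIALIZABLE by default and also accepts READ UNCOMMITTED; other databases report different defaults.

```python
from sqlalchemy import create_engine

# Assumption: in-memory SQLite, chosen only so the sketch runs anywhere.
engine = create_engine("sqlite://")

with engine.connect() as conn:
    default_level = conn.get_isolation_level()

with engine.connect() as conn:
    # Set the level before any statement starts a transaction.
    conn = conn.execution_options(isolation_level="READ UNCOMMITTED")
    active_level = conn.get_isolation_level()

print(default_level, active_level)
```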

Connection Pooling

SQLAlchemy uses connection pooling to manage database connections efficiently. The pool maintains a set of connections that can be reused.

Pool Types

1. QueuePool (Default)

The default pool for most databases. Keeps up to pool_size connections open and allows up to max_overflow temporary extra connections under load.
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "postgresql://...",
    poolclass=QueuePool,
    pool_size=5,      # Keep 5 connections open
    max_overflow=10,  # Allow up to 15 total connections
    pool_timeout=30   # Wait up to 30 seconds for a connection
)
2. NullPool

No pooling: a new connection is opened for every checkout and closed as soon as it is released.
from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql://...",
    poolclass=NullPool
)
Useful for:
  • Testing
  • Development with frequent schema changes
  • Applications using external connection pooling
3. StaticPool

Single connection for all requests. Useful for SQLite in-memory databases.
from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite:///:memory:",
    poolclass=StaticPool
)
4. SingletonThreadPool

Maintains one connection per thread.
from sqlalchemy.pool import SingletonThreadPool

engine = create_engine(
    "sqlite:///mydb.db",
    poolclass=SingletonThreadPool
)

Pool Configuration

pool_size (int, default: 5)
Number of connections to keep open in the pool.

max_overflow (int, default: 10)
Maximum number of connections that can be opened beyond pool_size. Total connections = pool_size + max_overflow.

pool_timeout (float, default: 30)
Seconds to wait for a free connection before raising sqlalchemy.exc.TimeoutError.

pool_recycle (int, default: -1)
Recycle connections older than this many seconds; the default of -1 disables recycling. Set to 3600 (1 hour) for MySQL to avoid “MySQL server has gone away” errors.
engine = create_engine("mysql://...", pool_recycle=3600)
pool_pre_ping (bool, default: False)
Test connections for liveness before using them. Adds slight overhead but prevents errors from stale connections.
engine = create_engine("postgresql://...", pool_pre_ping=True)
pool_use_lifo (bool, default: False)
Retrieve connections last-in-first-out instead of first-in-first-out. Surplus connections then sit idle long enough for server-side timeouts to close them.

pool_reset_on_return (str, default: "rollback")
How to reset connections when returned to the pool:
  • "rollback" - Issue ROLLBACK (default)
  • "commit" - Issue COMMIT
  • None - No reset action
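The interaction between pool_size, max_overflow and pool_timeout can be observed with a deliberately tiny pool. This sketch assumes in-memory SQLite with an explicit QueuePool and a short timeout so that it finishes quickly; the values are illustrative, not recommendations.

```python
from sqlalchemy import create_engine
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=1,       # one pooled connection
    max_overflow=1,    # plus one overflow connection, two in total
    pool_timeout=0.1,  # give up quickly for the demo
)

first = engine.connect()   # uses the pooled slot
second = engine.connect()  # uses the overflow slot
try:
    engine.connect()       # a third checkout has to wait, then fails
    timed_out = False
except PoolTimeoutError:
    timed_out = True
finally:
    second.close()
    first.close()

print(timed_out, engine.pool.checkedout())
```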

Pool Events

Listen to pool events for monitoring and debugging:
from sqlalchemy import event

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    print("New connection created")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    print("Connection checked out from pool")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    print("Connection returned to pool")
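For monitoring, counters are often more useful than print statements. The following sketch, which assumes an in-memory SQLite engine with QueuePool, tallies the three events above across several checkouts to show that a pooled connection is created once and then reused.

```python
from collections import Counter

from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool)
counts = Counter()

@event.listens_for(engine, "connect")
def count_connect(dbapi_conn, connection_record):
    counts["connect"] += 1    # a new DBAPI connection was opened

@event.listens_for(engine, "checkout")
def count_checkout(dbapi_conn, connection_record, connection_proxy):
    counts["checkout"] += 1   # a connection was handed out by the pool

@event.listens_for(engine, "checkin")
def count_checkin(dbapi_conn, connection_record):
    counts["checkin"] += 1    # a connection was returned to the pool

for _ in range(3):
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))

print(dict(counts))
```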

Execution Options

Modify connection behavior with execution options:
with engine.connect() as conn:
    # Set options on the connection
    conn = conn.execution_options(
        isolation_level="SERIALIZABLE",
        stream_results=True,
        max_row_buffer=1000
    )
    
    result = conn.execute(text("SELECT * FROM large_table"))
    for row in result:
        print(row)

Common Execution Options

isolation_level (str)
Transaction isolation level for this connection.

stream_results (bool, default: False)
Use server-side cursors for large result sets, where the driver supports them (for example PostgreSQL and MySQL).

max_row_buffer (int)
Maximum number of rows to buffer at a time when stream_results is enabled.

compiled_cache (dict | None)
Custom compiled statement cache, or None to disable caching.

schema_translate_map (dict)
Map schema names for multi-tenant applications:
conn = conn.execution_options(
    schema_translate_map={None: "tenant_1"}
)

Working with Results

Fetching Rows

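with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM users"))

    # Each fetch consumes rows from the same result, so order matters.
    # Fetch one row (None if the result is empty)
    row = result.fetchone()
    if row is not None:
        print(row.name, row.email)

    # Fetch the next batch of up to 100 rows
    batch = result.fetchmany(size=100)

    # Fetch all remaining rows
    remaining = result.fetchall()

    # Iterate over a fresh result; the one above is now exhausted
    for row in conn.execute(text("SELECT * FROM users")):
        print(row)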

Accessing Row Data

result = conn.execute(text("SELECT id, name, email FROM users"))
row = result.fetchone()

# By column name
print(row.name)

# By index
print(row[1])

# As dictionary
print(row._mapping["name"])

# Convert to dict
row_dict = dict(row._mapping)
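When every row is wanted as a dictionary, Result.mappings() avoids reaching into row._mapping one row at a time. The sketch below assumes an in-memory SQLite database and sample rows made up for the demo.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)"))
    conn.execute(
        text("INSERT INTO users (name, email) VALUES (:name, :email)"),
        [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
        ],
    )

with engine.connect() as conn:
    result = conn.execute(text("SELECT id, name, email FROM users ORDER BY id"))
    # mappings() yields dict-like rows keyed by column name.
    rows = [dict(mapping) for mapping in result.mappings()]

print(rows)
```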

Scalar Results

from sqlalchemy import text

# Get a single value

result = conn.execute(text("SELECT COUNT(*) FROM users"))
count = result.scalar()
print(f"Total users: {count}")

# Using scalar_one() - raises if not exactly one result
result = conn.execute(text("SELECT name FROM users WHERE id = 1"))
name = result.scalar_one()
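The difference between the scalar accessors shows up when the row count is not exactly one. This sketch assumes an in-memory SQLite database with two sample users, and contrasts scalar_one() with scalar_one_or_none().

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import MultipleResultsFound, NoResultFound
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        text("INSERT INTO users (name) VALUES (:name)"),
        [{"name": "Alice"}, {"name": "Bob"}],
    )

with engine.connect() as conn:
    # Exactly one row: both accessors return the value.
    name = conn.execute(text("SELECT name FROM users WHERE id = 1")).scalar_one()

    # No rows: scalar_one_or_none() returns None, scalar_one() raises.
    missing = conn.execute(text("SELECT name FROM users WHERE id = 99")).scalar_one_or_none()
    try:
        conn.execute(text("SELECT name FROM users WHERE id = 99")).scalar_one()
        no_row_error = False
    except NoResultFound:
        no_row_error = True

    # More than one row: scalar_one() raises as well.
    try:
        conn.execute(text("SELECT name FROM users")).scalar_one()
        many_rows_error = False
    except MultipleResultsFound:
        many_rows_error = True

print(name, missing, no_row_error, many_rows_error)
```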

Connection Properties and Methods

Key Properties

  • conn.closed - Whether the connection is closed
  • conn.invalidated - Whether the connection is invalidated
  • conn.dialect - The Dialect instance
  • conn.engine - The parent Engine

Key Methods

# Get current isolation level
level = conn.get_isolation_level()

# Get the current savepoint (a NestedTransaction), or None if none is active
nested = conn.get_nested_transaction()

# Test if in transaction
if conn.in_transaction():
    print("Transaction active")

# Invalidate the connection
conn.invalidate()

# Close the connection
conn.close()

Best Practices

1. Always Use Context Managers

Ensures connections are returned to the pool:
# Good
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))

# Avoid: the connection is never closed if execute() raises
conn = engine.connect()
result = conn.execute(text("SELECT 1"))
conn.close()
2. Use engine.begin() for Transactions

Simplifies transaction management:
# Good
with engine.begin() as conn:
    conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
# Auto-commit

# More verbose
with engine.connect() as conn:
    with conn.begin():
        conn.execute(text("INSERT INTO users (name) VALUES ('Alice')"))
3. Configure Pool for Your Workload

# Web application with moderate concurrency
engine = create_engine(
    "postgresql://...",
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600,
    pool_pre_ping=True
)
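Each process that creates an engine gets its own pool, so the settings above need to be multiplied by the number of worker processes when checking them against the database's connection limit. The helper and the numbers below are hypothetical and only illustrate the arithmetic.

```python
def max_connections_needed(pool_size: int, max_overflow: int, processes: int) -> int:
    """Upper bound on connections opened by all processes together."""
    return (pool_size + max_overflow) * processes

# Hypothetical deployment: 4 worker processes, database limit of 100 connections.
workers = 4
database_limit = 100

peak = max_connections_needed(pool_size=10, max_overflow=20, processes=workers)
fits = peak <= database_limit
print(peak, fits)
```

In this made-up case the peak exceeds the limit, so pool_size, max_overflow or the worker count would need to come down.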
4. Handle Disconnects Gracefully

from sqlalchemy.exc import DBAPIError

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1"))
except DBAPIError as e:
    if e.connection_invalidated:
        print("Connection was invalidated, retry...")
    raise
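One possible way to act on connection_invalidated is a small retry wrapper. The helper name run_with_retry and its parameters are hypothetical, and the sketch uses in-memory SQLite only so that it runs; a real policy would depend on whether the operation is safe to repeat.

```python
import time

from sqlalchemy import create_engine, text
from sqlalchemy.exc import DBAPIError

def run_with_retry(operation, attempts=3, delay=0.0):
    """Call operation(); retry only when the connection was invalidated."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except DBAPIError as exc:
            # Other database errors, and the final attempt, propagate.
            if not exc.connection_invalidated or attempt == attempts:
                raise
            time.sleep(delay)

engine = create_engine("sqlite://")

def fetch_one():
    # A fresh checkout per attempt lets the pool supply a new connection.
    with engine.connect() as conn:
        return conn.execute(text("SELECT 1")).scalar_one()

value = run_with_retry(fetch_one)
print(value)
```

Retrying a whole unit of work, rather than a single statement, keeps the retry from resuming halfway through a transaction that was lost with the connection.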
5. Use Appropriate Isolation Levels

Choose based on your consistency requirements:
# Financial transaction - use SERIALIZABLE
with engine.connect().execution_options(
    isolation_level="SERIALIZABLE"
) as conn:
    with conn.begin():
        # Transfer money between accounts
        pass

# Read-only report that tolerates dirty reads - READ UNCOMMITTED may reduce
# locking overhead on databases that implement it
with engine.connect().execution_options(
    isolation_level="READ UNCOMMITTED"
) as conn:
    # Generate report
    pass
