Skip to main content

Connections

Connection objects manage database sessions, execute statements, and handle transactions. They are obtained from the Engine and provide the interface for interacting with the database.

Obtaining Connections

Using connect()

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://user:pass@localhost/db')

# Get a connection (manual style — the caller is responsible for closing it)
conn = engine.connect()

try:
    result = conn.execute(text("SELECT 1"))
    print(result.scalar())
finally:
    # Return the connection to the pool even if execute() raised
    conn.close()
# Automatically closes connection (preferred style)
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.scalar())
# Connection automatically closed when the with-block exits
Always use context managers (with statements) to ensure connections are properly returned to the pool.

Transaction Context

# Auto-commit transaction: begin() yields a connection with a transaction open
with engine.begin() as conn:
    conn.execute(users.insert().values(name='alice'))
    conn.execute(orders.insert().values(user_id=1, total=100))
    # Automatically commits on success, rolls back on exception

Executing Statements

Basic Execution

from sqlalchemy import select, insert, update, delete

with engine.connect() as conn:
    # SELECT
    result = conn.execute(select(users))
    
    # INSERT
    result = conn.execute(
        insert(users).values(name='bob', email='[email protected]')
    )
    
    # UPDATE
    result = conn.execute(
        update(users).where(users.c.id == 1).values(name='Bobby')
    )
    
    # DELETE
    result = conn.execute(
        delete(users).where(users.c.id == 1)
    )
    
    # Must commit for DML — connect() opens a transaction that is rolled
    # back on close unless explicitly committed ("commit as you go")
    conn.commit()
execute method

Signature: Connection.execute(statement, parameters=None) -> CursorResult. Executes a SQL statement and returns a result object.

Parameterized Execution

from sqlalchemy import text

# Using text() with named parameters (:name style) — values are bound by
# the driver, never string-interpolated into the SQL
stmt = text("SELECT * FROM users WHERE name = :name AND age > :age")

with engine.connect() as conn:
    result = conn.execute(
        stmt,
        {"name": "alice", "age": 18}
    )
    for row in result:
        print(row)

Result Objects

CursorResult

from sqlalchemy import select

with engine.connect() as conn:
    result = conn.execute(select(users))
    
    # Result is a CursorResult object
    print(type(result))  # e.g. <class 'sqlalchemy.engine.cursor.CursorResult'> — exact module path varies by version

Fetching Results

# Fetch single row
with engine.connect() as conn:
    result = conn.execute(select(users).where(users.c.id == 1))
    row = result.fetchone()
    
    # fetchone() returns None when there are no (more) rows, so guard first
    if row:
        print(f"ID: {row.id}, Name: {row.name}")

Accessing Row Data

with engine.connect() as conn:
    result = conn.execute(select(users))
    row = result.fetchone()
    
    # Attribute access (Row behaves like a named tuple)
    print(row.id)
    print(row.name)
    print(row.email)
    
    # Index access
    print(row[0])
    print(row[1])
    
    # Dictionary-like access: SQLAlchemy 1.4+/2.0 Row objects are tuples,
    # so string keys require the row's _mapping view
    print(row._mapping['id'])
    print(row._mapping['name'])
    
    # As tuple
    print(tuple(row))
    
    # As dictionary (dict(row) raises in 2.0 — use the mapping view)
    print(dict(row._mapping))

Scalar Results

# Get single value
from sqlalchemy import func, select  # func was not imported elsewhere on this page

with engine.connect() as conn:
    # Get first column of first row
    count = conn.execute(
        select(func.count(users.c.id))
    ).scalar()
    print(f"Total users: {count}")
    
    # scalar_one() - raises if not exactly one row
    user_id = conn.execute(
        select(users.c.id).where(users.c.email == '[email protected]')
    ).scalar_one()
    
    # scalar_one_or_none() - None if no rows, raises if multiple
    user_id = conn.execute(
        select(users.c.id).where(users.c.email == '[email protected]')
    ).scalar_one_or_none()

Result Metadata

with engine.connect() as conn:
    result = conn.execute(select(users))
    
    # Column names (an iterable key view, e.g. ['id', 'name', 'email'])
    print(result.keys())  # ['id', 'name', 'email']
    
    # Number of rows affected (for INSERT/UPDATE/DELETE)
    result = conn.execute(
        update(users).where(users.c.active == False).values(status='inactive')
    )
    print(f"Updated {result.rowcount} rows")
    
    # Last inserted ID (if supported by the dialect) — a tuple with one
    # value per primary-key column
    result = conn.execute(
        insert(users).values(name='alice')
    )
    print(f"Inserted ID: {result.inserted_primary_key}")

Transactions

Commit and Rollback

with engine.connect() as conn:
    # Start explicit transaction
    trans = conn.begin()
    
    try:
        conn.execute(users.insert().values(name='alice'))
        conn.execute(orders.insert().values(user_id=1, total=100))
        
        # Commit transaction
        trans.commit()
    except Exception as e:
        # Rollback on error, then re-raise so the caller sees the failure
        trans.rollback()
        raise

begin() Context Manager

# Automatic commit/rollback — equivalent to the explicit begin()/commit()
# pattern above, with less boilerplate
with engine.begin() as conn:
    conn.execute(users.insert().values(name='alice'))
    conn.execute(orders.insert().values(user_id=1, total=100))
    # Automatically commits on success
    # Automatically rolls back on exception
Use engine.begin() for automatic transaction management. It commits on success and rolls back on exceptions.

Nested Transactions (Savepoints)

with engine.begin() as conn:
    # Outer transaction
    conn.execute(users.insert().values(name='alice'))
    
    # Create savepoint (emits SAVEPOINT)
    nested = conn.begin_nested()
    try:
        conn.execute(users.insert().values(name='bob'))
        nested.commit()
    except Exception:
        # Rollback to savepoint — only work since begin_nested() is undone
        nested.rollback()
    
    # Outer transaction continues
    conn.execute(users.insert().values(name='charlie'))
    # Commits alice and charlie; bob only if the savepoint committed

Two-Phase Commit

# Two-phase commit (XA transactions) — requires backend support
with engine.connect() as conn:
    trans = conn.begin_twophase()
    
    try:
        conn.execute(users.insert().values(name='alice'))
        
        # Prepare transaction (phase one)
        trans.prepare()
        
        # Commit (phase two)
        trans.commit()
    except Exception:
        trans.rollback()

Execution Options

Connection-Level Options

# Set execution options for connection
with engine.connect() as conn:
    # Set isolation level
    # NOTE(review): in modern SQLAlchemy execution_options() acts on the
    # same connection and returns it; the rebind keeps the pattern portable
    conn = conn.execution_options(
        isolation_level="SERIALIZABLE"
    )
    
    # Read-only mode (PostgreSQL-specific option)
    conn = conn.execution_options(
        postgresql_readonly=True
    )
    
    result = conn.execute(select(users))

Statement-Level Options

from sqlalchemy import select

# NOTE: the legacy ``autocommit=True`` execution option was deprecated in
# SQLAlchemy 1.4 and removed in 2.0 — use engine.begin() or conn.commit()
# for transaction control instead.
stmt = select(users).execution_options(
    compiled_cache=None  # Disable the compiled-statement cache for this statement
)

with engine.connect() as conn:
    result = conn.execute(stmt)

Isolation Levels

# Set isolation level for this connection's transactions
with engine.connect() as conn:
    conn = conn.execution_options(
        isolation_level="REPEATABLE READ"
    )
    
    result = conn.execute(select(users))

# Available levels (database-dependent):
# - READ UNCOMMITTED
# - READ COMMITTED
# - REPEATABLE READ
# - SERIALIZABLE

Connection Events

Statement Execution Events

import time  # required by the timing code below; was missing from the example

from sqlalchemy import event

@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Stack start times on the connection's info dict so overlapping
    # executions pair correctly with after_cursor_execute
    conn.info.setdefault('query_start_time', []).append(time.time())
    print(f"Executing: {statement}")

@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total_time = time.time() - conn.info['query_start_time'].pop()
    print(f"Query took {total_time:.3f}s")

Connection Pool Events

@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    # Fires when the pool creates a brand-new DBAPI connection
    print("New database connection")

@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    # Fires each time a pooled connection is handed out
    print("Connection checked out from pool")

@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    # Fires when a connection is returned to the pool
    print("Connection returned to pool")

Raw DBAPI Access

Getting DBAPI Connection

# Access underlying DBAPI connection
with engine.connect() as conn:
    dbapi_conn = conn.connection.dbapi_connection
    
    # Use DBAPI directly (bypasses SQLAlchemy's result handling)
    cursor = dbapi_conn.cursor()
    cursor.execute("SELECT 1")
    result = cursor.fetchone()
    cursor.close()

Detaching DBAPI Connection

# Detach DBAPI connection from pool
with engine.connect() as conn:
    dbapi_conn = conn.connection.detach()
    
    # Use DBAPI connection outside SQLAlchemy — the pool no longer tracks it
    cursor = dbapi_conn.cursor()
    cursor.execute("SELECT 1")
    
    # Must close manually; a detached connection is never returned to the pool
    dbapi_conn.close()

Streaming Results

Server-Side Cursors

# PostgreSQL server-side cursor — rows stream from the server rather than
# being fully buffered in client memory
with engine.connect() as conn:
    result = conn.execution_options(
        stream_results=True,
        max_row_buffer=1000
    ).execute(select(users))
    
    # Process in batches of up to 100 rows
    for partition in result.partitions(100):
        for row in partition:
            print(row)

Yield Per (ORM)

# For ORM queries (shown for reference)
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Fetch in batches of 1000 rows while iterating the result
    for row in session.execute(
        select(User)
    ).yield_per(1000):
        print(row)

Error Handling

Database Errors

from sqlalchemy import exc

with engine.connect() as conn:
    try:
        conn.execute(text("INVALID SQL"))
    except exc.ProgrammingError as e:
        print(f"SQL error: {e}")
    except exc.IntegrityError as e:
        print(f"Constraint violation: {e}")
    except exc.OperationalError as e:
        print(f"Database connection error: {e}")

Common Exceptions

  • DBAPIError: Base for all DBAPI errors
  • IntegrityError: Constraint violations (unique, foreign key, etc.)
  • ProgrammingError: SQL syntax errors
  • OperationalError: Database connection issues
  • DataError: Data type issues
  • DisconnectionError: Connection lost

Performance Patterns

Bulk Operations

# Bulk insert — passing a list of parameter dicts triggers a DBAPI
# "executemany", inserting all 1000 rows in one call
with engine.begin() as conn:
    conn.execute(
        users.insert(),
        [
            {"name": f"user_{i}", "email": f"user_{i}@example.com"}
            for i in range(1000)
        ]
    )

# Bulk update — bindparam() names the placeholders each parameter dict fills
from sqlalchemy import bindparam

stmt = (
    update(users)
    .where(users.c.id == bindparam('uid'))
    .values(status=bindparam('s'))
)

with engine.begin() as conn:
    conn.execute(stmt, [
        {"uid": 1, "s": "active"},
        {"uid": 2, "s": "inactive"},
        {"uid": 3, "s": "active"}
    ])

Batch Processing

def process_large_query(conn, stmt, batch_size=1000):
    """Stream a large result set, handing each row to process_row.

    Fetches at most ``batch_size`` rows per round trip so the full result
    set is never held in memory at once.
    """
    result = conn.execute(stmt)
    # fetchmany() returns an empty list once the result is exhausted
    while batch := result.fetchmany(batch_size):
        for record in batch:
            process_row(record)

# Stream the whole users table without materializing it in memory
with engine.connect() as conn:
    process_large_query(conn, select(users))

Connection Reuse

# Reuse one connection for multiple operations
with engine.connect() as conn:
    # All operations use the same pooled connection
    users_result = conn.execute(select(users))
    orders_result = conn.execute(select(orders))
    products_result = conn.execute(select(products))
    
    # Fetch rows under new names — the original example assigned to
    # ``users``/``orders``/``products``, clobbering the table objects
    # referenced by the select() calls above
    user_rows = users_result.fetchall()
    order_rows = orders_result.fetchall()
    product_rows = products_result.fetchall()

Testing Connections

Connection Health Check

def check_connection(engine):
    """Probe the database with a trivial query; True if it is reachable."""
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception as e:
        print(f"Connection failed: {e}")
        return False
    return True

# Typical use: fail fast at startup if the database is unreachable
if check_connection(engine):
    print("Database is accessible")

Mock Connections

from sqlalchemy import create_mock_engine

def dump_sql(sql, *args, **kwargs):
    """Executor callback: receives each compiled statement instead of a DB."""
    print(f"SQL: {sql}")

# create_mock_engine() takes a dialect URL (so it knows how to compile the
# SQL) plus the executor callback — the URL argument was missing here
mock_engine = create_mock_engine('postgresql://', dump_sql)

# Generate SQL without executing
stmt = select(users).where(users.c.id == 1)
mock_engine.execute(stmt)
# Prints the compiled SQL, e.g.:
# SQL: SELECT users.id, users.name FROM users WHERE users.id = %(id_1)s

Next Steps

Connection Pooling

Learn about pool configuration and management

Engines

Configure database engines

Build docs developers (and LLMs) love