Connection objects manage database sessions, execute statements, and handle transactions. They are obtained from the Engine and provide the interface for interacting with the database.
from sqlalchemy import create_engine, textengine = create_engine('postgresql://user:pass@localhost/db')# Get a connectionconn = engine.connect()try: result = conn.execute(text("SELECT 1")) print(result.scalar())finally: conn.close()
# Auto-commit transactionwith engine.begin() as conn: conn.execute(users.insert().values(name='alice')) conn.execute(orders.insert().values(user_id=1, total=100)) # Automatically commits on success, rolls back on exception
from sqlalchemy import text# Using text() with named parametersstmt = text("SELECT * FROM users WHERE name = :name AND age > :age")with engine.connect() as conn: result = conn.execute( stmt, {"name": "alice", "age": 18} ) for row in result: print(row)
from sqlalchemy import bindparam# Using bindparamstmt = select(users).where( users.c.name == bindparam('username'))with engine.connect() as conn: result = conn.execute(stmt, {"username": "alice"})
from sqlalchemy import selectwith engine.connect() as conn: result = conn.execute(select(users)) # Result is a CursorResult object print(type(result)) # <class 'sqlalchemy.engine.CursorResult'>
# Get single valuewith engine.connect() as conn: # Get first column of first row count = conn.execute( select(func.count(users.c.id)) ).scalar() print(f"Total users: {count}") # scalar_one() - raises if not exactly one row user_id = conn.execute( select(users.c.id).where(users.c.email == '[email protected]') ).scalar_one() # scalar_one_or_none() - None if no rows, raises if multiple user_id = conn.execute( select(users.c.id).where(users.c.email == '[email protected]') ).scalar_one_or_none()
# Automatic commit/rollbackwith engine.begin() as conn: conn.execute(users.insert().values(name='alice')) conn.execute(orders.insert().values(user_id=1, total=100)) # Automatically commits on success # Automatically rolls back on exception
Use engine.begin() for automatic transaction management. It commits on success and rolls back on exceptions.
from sqlalchemy import selectstmt = select(users).execution_options( autocommit=True, compiled_cache=None # Disable cache for this statement)with engine.connect() as conn: result = conn.execute(stmt)
# PostgreSQL server-side cursorwith engine.connect() as conn: result = conn.execution_options( stream_results=True, max_row_buffer=1000 ).execute(select(users)) # Process in batches for partition in result.partitions(100): for row in partition: print(row)
# For ORM queries (shown for reference)from sqlalchemy.orm import Sessionwith Session(engine) as session: # Fetch in batches of 1000 for row in session.execute( select(User) ).yield_per(1000): print(row)
def process_large_query(conn, stmt, batch_size=1000): """Process large result set in batches.""" result = conn.execute(stmt) while True: batch = result.fetchmany(batch_size) if not batch: break # Process batch for row in batch: process_row(row)with engine.connect() as conn: process_large_query(conn, select(users))