Connect to Redis from Python using redis-py, the official Redis client
The official Python client for Redis is redis-py. It provides a complete interface to Redis, including support for all Redis data types, commands, and advanced features like pipelining, pub/sub, and Redis modules.
For production applications, consider installing redis[hiredis] for improved performance. The hiredis C parser can significantly speed up response parsing.
import redis# Connect to Redisclient = redis.Redis( host='localhost', port=6379, decode_responses=True # Automatically decode responses to strings)# Test the connectionclient.ping()print("Connected to Redis!")
2
Perform basic operations
Use Redis commands through the client:
# SET and GET operationsclient.set('user:1:name', 'Alice')name = client.get('user:1:name')print(f"Name: {name}") # Output: Name: Alice# Work with numbersclient.set('counter', 0)client.incr('counter')client.incrby('counter', 5)count = client.get('counter')print(f"Counter: {count}") # Output: Counter: 6# Expirationclient.setex('session:abc', 3600, 'session_data') # Expires in 1 hour
3
Handle errors
Implement proper error handling:
import redisfrom redis.exceptions import ConnectionError, TimeoutError, RedisErrortry: client = redis.Redis( host='localhost', port=6379, socket_connect_timeout=5, socket_timeout=5, decode_responses=True ) # Perform operations client.set('key', 'value') value = client.get('key')except ConnectionError as e: print(f"Could not connect to Redis: {e}")except TimeoutError as e: print(f"Redis operation timed out: {e}")except RedisError as e: print(f"Redis error: {e}")finally: # Close the connection client.close()
For production applications, use connection pooling:
import redis# Create a connection poolpool = redis.ConnectionPool( host='localhost', port=6379, max_connections=10, decode_responses=True)# Create clients from the poolclient = redis.Redis(connection_pool=pool)# All operations use connections from the poolclient.set('key', 'value')value = client.get('key')
# Store user data as a hashclient.hset('user:1', mapping={ 'name': 'Alice', 'email': '[email protected]', 'age': 30})# Get specific fieldsname = client.hget('user:1', 'name')# Get all fieldsuser = client.hgetall('user:1')print(user) # {'name': 'Alice', 'email': '[email protected]', 'age': '30'}# Increment numeric fieldclient.hincrby('user:1', 'age', 1)
# Add items to a listclient.rpush('queue', 'task1', 'task2', 'task3')# Get list lengthlength = client.llen('queue')# Pop item from listtask = client.lpop('queue')print(task) # task1# Get range of itemstasks = client.lrange('queue', 0, -1)
# Use WATCH for optimistic lockingwith client.pipeline() as pipe: while True: try: # Watch a key for changes pipe.watch('balance') balance = int(pipe.get('balance') or 0) if balance >= 100: # Execute transaction pipe.multi() pipe.decrby('balance', 100) pipe.execute() break else: pipe.unwatch() raise ValueError("Insufficient balance") except redis.WatchError: # Retry if key was modified continue