
Overview

EduMate uses Redis Queue (RQ) for handling long-running background tasks asynchronously, including:
  • PDF chunking and vector indexing
  • MCQ generation via Gemini AI
  • Document processing pipelines
RQ is a simple Python library for queueing jobs and processing them in the background with workers.
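For orientation, here is a minimal RQ round trip (a sketch; myapp.tasks and add are hypothetical, and the task function must live in an importable module so a worker process can load it):
from redis import Redis
from rq import Queue

from myapp.tasks import add   # hypothetical task: def add(a, b): return a + b

queue = Queue(connection=Redis())
job = queue.enqueue(add, 2, 3)   # returns immediately; a worker executes add(2, 3)
print(job.id)                    # use the id to poll status and fetch job.result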

Why Redis Queue?

Processing large PDFs and generating AI assessments can take several minutes. Using RQ allows:
  • Non-blocking API responses
  • Job status tracking
  • Failure handling and retry logic
  • Scalable worker processes

Installation

Install Redis

Ubuntu/Debian:
sudo apt update
sudo apt install redis-server
macOS:
brew install redis
Windows:
# Use WSL or download from: https://redis.io/download

Install Python Dependencies

pip install redis rq

Configuration

Redis Client Setup

Location: backend/client/rq_client.py:1-9
from redis import Redis
from rq import Queue

queue = Queue(
    connection=Redis(
        host='localhost',
        port=6379,
    )
)

  • host (string, default: "localhost"): Redis server hostname.
  • port (integer, default: 6379): Redis server port number.

Environment Variables

You can configure Redis via environment variables:
REDIS_HOST=localhost
REDIS_PORT=6379
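The client shown above hardcodes its connection values; a variant that reads these variables instead (an illustrative sketch, not the shipped code) would look like:
import os
from redis import Redis
from rq import Queue

queue = Queue(
    connection=Redis(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379")),
    )
)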

Background Jobs

1. Document Chunking Job

Location: backend/queue/doc_chunking.py:61-110
Processes PDF files into chunks and stores them in the Qdrant vector database.
def chunk(doc_path, collection_name: str):
    # Find and load PDFs
    pdf_paths = find_pdfs(doc_path)
    docs = load_all(pdf_paths)
    
    # Split into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=15000,
        chunk_overlap=4000
    )
    chunks = text_splitter.split_documents(documents=docs)
    
    # Generate embeddings and store
    embedding_model = OllamaEmbeddings(
        model='qwen3-embedding:0.6b',
        base_url='http://localhost:11434'
    )
    
    vector_store = QdrantVectorStore.from_documents(
        documents=chunks,
        embedding=embedding_model,
        url='http://localhost:6333',
        collection_name=collection_name,
    )
    
    return {
        "stored": True,
        "chunks": len(chunks),
        "source": str(pdf_paths[0]),
        "collection_name": collection_name,
    }
Chunk Configuration:
  • chunk_size: 15,000 characters
  • chunk_overlap: 4,000 characters
  • Embedding model: qwen3-embedding:0.6b
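To sanity-check how these settings split a document, the splitter can be run standalone (a sketch assuming the langchain-text-splitters package and a local sample.txt):
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(chunk_size=15000, chunk_overlap=4000)
chunks = splitter.split_text(open("sample.txt").read())
print(len(chunks))   # each chunk is at most 15,000 characters, overlapping by up to 4,000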

2. MCQ Generation Job

Location: backend/queue/chat.py:98-156
Generates MCQ assessments using Gemini AI based on retrieved context.
def search_and_ask(user_query, collection_name: str, 
                   blooms_requirements: str = "5 remember, 3 understand, 4 apply, 3 analyze, 2 evaluate, 3 create", 
                   top_k=5):
    
    # Search vector database
    vector_db = _vector_db(collection_name=collection_name)
    search_results = vector_db.similarity_search(query=user_query, k=top_k)
    
    # Build context from search results
    context_blocks = []
    for result in search_results:
        block = (
            f"--- ADMIN METADATA (DO NOT MENTION IN OUTPUT) ---\n"
            f"Source: {result.metadata['source']}\n"
            f"Page: {result.metadata['page_label']}\n"
            f"--- EDUCATIONAL CONTENT ---\n"
            f"{result.page_content}\n"
        )
        context_blocks.append(block)
    
    context = "\n\n".join(context_blocks)

    # Generate MCQs with Gemini, grounding the request in the
    # retrieved context and the requested Bloom's distribution
    response = open_ai_client.chat.completions.parse(
        model='gemini-2.5-flash-lite',
        response_format=OutputFormat,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"{user_query}\n\n"
                                        f"Bloom's requirements: {blooms_requirements}\n\n"
                                        f"Context:\n{context}"},
        ],
    )
    
    return response.choices[0].message.parsed.model_dump()
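SYSTEM_PROMPT and OutputFormat are defined elsewhere in backend/queue/chat.py and not shown here. For orientation, a structured-output schema for MCQs would be a Pydantic model along these lines (illustrative field names, not the real schema):
from pydantic import BaseModel

class MCQ(BaseModel):
    question: str
    options: list[str]
    correct_option: int   # index into options
    blooms_level: str     # e.g. "remember", "apply"

class OutputFormat(BaseModel):
    mcqs: list[MCQ]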

API Integration

Enqueue Jobs

Location: backend/server.py:190-241
Chunking endpoint:
@app.post('/chunking')
def chunking(file: UploadFile | None = File(None)):
    collection_name = f"edu_mate_{uuid.uuid4().hex}"
    
    # Save uploaded file
    save_path = os.path.join(UPLOADS_DIR, f"{uuid.uuid4().hex}_{file.filename}")
    with open(save_path, "wb") as f:
        f.write(file.file.read())
    
    # Enqueue job with 10-minute timeout
    job = queue.enqueue(chunk, [save_path], collection_name, job_timeout=600)
    
    return {"status": "queued", "job_id": job.id, "collection_name": collection_name}
Chat endpoint:
@app.post('/chat')
def chat(query: str, collection_name: str, blooms_requirements: str):
    job = queue.enqueue(search_and_ask, query, collection_name, 
                       blooms_requirements, job_timeout=600)
    return {"status": "queued", "job_id": job.id}

Check Job Status

@app.get('/job_status')
def get_result(job_id: str):
    job = queue.fetch_job(job_id=job_id)
    
    if job is None:
        return {"status": None}
    
    if job.is_finished:
        return {"status": "finished", "result": job.result}
    
    if job.is_failed:
        return {"status": "failed", "error": str(job.exc_info)}
    
    return {"status": job.get_status()}

Running the Worker

Start Redis Server

# Start Redis
redis-server

# Verify Redis is running
redis-cli ping
# Should return: PONG

Start RQ Worker

Create a worker file or run directly:
# Option 1: Using RQ CLI (recommended)
rq worker --with-scheduler

# Option 2: Python script
python -c "
from backend.client.rq_client import queue
from rq import Worker

worker = Worker([queue], connection=queue.connection)
worker.work()
"
Run the worker in a separate terminal or as a background service.

Production Deployment

For production, run the worker under a process manager.
Using systemd:
# /etc/systemd/system/edumate-worker.service
[Unit]
Description=EduMate RQ Worker
After=network.target redis.service

[Service]
Type=simple
User=www-data
WorkingDirectory=/path/to/edumate
ExecStart=/path/to/venv/bin/rq worker --with-scheduler
Restart=always

[Install]
WantedBy=multi-user.target
Enable and start the service:
sudo systemctl enable edumate-worker
sudo systemctl start edumate-worker
Using Supervisor:
[program:edumate-worker]
command=/path/to/venv/bin/rq worker --with-scheduler
directory=/path/to/edumate
autostart=true
autorestart=true
stdout_logfile=/var/log/edumate-worker.log
stderr_logfile=/var/log/edumate-worker-error.log

Job Configuration

Timeouts

Both chunking and chat jobs use a 600-second (10-minute) timeout:
job = queue.enqueue(chunk, doc_path, collection_name, job_timeout=600)
Adjust timeout based on your PDF sizes and server capacity.

Job Result Expiration

By default, RQ keeps job results for 500 seconds. Configure this:
job = queue.enqueue(
    chunk, 
    doc_path, 
    collection_name,
    job_timeout=600,
    result_ttl=3600  # Keep results for 1 hour
)

Monitoring

Check Queue Status

# Python shell
from backend.client.rq_client import queue

# Get queue length
print(queue.count)  # Number of jobs waiting

# List all jobs
for job in queue.jobs:
    print(f"{job.id}: {job.get_status()}")

RQ Dashboard (Optional)

Install and run the web dashboard:
pip install rq-dashboard
rq-dashboard
Access at http://localhost:9181

Troubleshooting

Redis Connection Errors

# Check if Redis is running
redis-cli ping

# Check Redis logs
sudo journalctl -u redis -f

Worker Not Processing Jobs

# Ensure worker is running
ps aux | grep "rq worker"

# Check queue and worker status
rq info

Job Failures

Check the job exception info:
job = queue.fetch_job(job_id)
if job.is_failed:
    print(job.exc_info)
Common failures include Ollama not running or Qdrant connection issues.
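For transient failures such as a brief Qdrant or Ollama outage, RQ's built-in Retry can re-enqueue the job automatically (a sketch; retry intervals are honored by workers started with --with-scheduler, as recommended above):
from rq import Retry

# Retry up to 3 times, waiting 10s, 30s, then 60s between attempts
job = queue.enqueue(chunk, [save_path], collection_name,
                    job_timeout=600, retry=Retry(max=3, interval=[10, 30, 60]))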
