Skip to main content

Overview

Energy CMMS uses Celery with Redis as the message broker for handling long-running operations like data imports, document processing, and scheduled maintenance tasks.

Architecture

Configuration

Environment-Based Setup

Celery automatically configures based on environment:
import os
import sys

# Environment detection.
# NOTE(review): DEBUG is assumed to be defined earlier in settings.py — not
# visible in this snippet. COOLIFY_FQDN is only set on Coolify deployments,
# so its absence (plus DEBUG) marks a local dev environment.
IS_LOCAL = DEBUG and not os.environ.get('COOLIFY_FQDN')

# Celery Broker URL
if IS_LOCAL:
    # Development: Local Redis
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    print(f"[DEBUG] Entorno LOCAL detectado. Redis: {CELERY_BROKER_URL}")
else:
    # Production: Redis in Docker/Coolify
    # SECURITY NOTE(review): a real Redis password is hardcoded here as the
    # fallback default. It should come exclusively from the CELERY_BROKER_URL
    # environment variable; rotate this credential and remove the literal.
    CELERY_BROKER_URL = os.environ.get(
        'CELERY_BROKER_URL',
        'redis://default:saul123@lwcc8sss480ks4oc8gcgw4go:6379/0'
    )
    print(f"[DEBUG] Entorno PRODUCCION detectado. Redis: {CELERY_BROKER_URL}")

# Flush so the environment banner appears immediately in container logs.
sys.stdout.flush()

# Result Backend: task results go to the Django database ('django-db' is the
# django-celery-results backend alias).
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'django-db')
CELERY_CACHE_BACKEND = 'django-cache'

# Task Settings
CELERY_TASK_TRACK_STARTED = True  # report STARTED state, not just PENDING
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes max per task
CELERY_ACCEPT_CONTENT = ['json']  # reject pickle payloads
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# NOTE(review): TIME_ZONE is assumed to be defined earlier in settings.py.
CELERY_TIMEZONE = TIME_ZONE
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

# Aggressive timeouts to prevent hangs
CELERY_BROKER_CONNECTION_TIMEOUT = 3  # 3 seconds to connect
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'socket_timeout': 3,
    'socket_connect_timeout': 3,
    'socket_keepalive': True,
}

# Production settings
if not DEBUG:
    CELERY_BROKER_CONNECTION_RETRY = True
    CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

Redis Cache Configuration

Shared cache for Celery and Django:
settings.py - Cache Configuration
import redis

# NOTE(review): IS_LOCAL and CELERY_BROKER_URL are defined earlier in
# settings.py (Celery configuration section) — not visible in this snippet.
if IS_LOCAL:
    try:
        # Probe Redis with a 1s timeout so a missing local Redis never
        # stalls Django startup.
        redis_url = CELERY_BROKER_URL
        r = redis.from_url(redis_url, socket_connect_timeout=1)
        r.ping()
        
        # Redis available
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.redis.RedisCache',
                'LOCATION': redis_url,
                'OPTIONS': {
                    'socket_timeout': 5,
                    'socket_connect_timeout': 5,
                    'retry_on_timeout': True,
                }
            }
        }
        print(f"[DEBUG] Cache: Redis conectado exitosamente en {redis_url}")
    except Exception as e:
        # Fallback to local memory cache (per-process, not shared — progress
        # tracking via cache will NOT work across worker/web processes here).
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
                'LOCATION': 'unique-snowflake',
            }
        }
        print(f"[DEBUG] Cache: LocMem (Redis no disponible: {e})")
else:
    # Production: Redis cache shared with the Celery broker instance.
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': CELERY_BROKER_URL,
            'OPTIONS': {
                'socket_timeout': 5,
                'socket_connect_timeout': 5,
                'retry_on_timeout': True,
            }
        }
    }
    print(f"[DEBUG] Cache: Redis Produccion ({CELERY_BROKER_URL})")

Task Examples

Asset Import Task

From activos/tasks.py:
activos/tasks.py
from celery import shared_task
from import_export import resources
from django.core.cache import cache
from django.core.files.storage import default_storage

@shared_task(bind=True)
def import_activos_task(self, file_path, file_format, user_id=None, 
                        import_name="Importación sin nombre", 
                        verification_mode=False, dry_run=False):
    """
    Import assets from an uploaded file, tracking progress in the cache.

    Args:
        file_path: storage-relative path of the uploaded file.
        file_format: tablib format name ('csv', 'xlsx', ...).
        user_id: id of the requesting user; also keys the progress cache entry.
        import_name: label stored on the RegistroImportacion audit record.
        verification_mode: reserved flag (not used by this task).
        dry_run: validate only — changes are rolled back and the file is kept.

    Returns:
        dict with 'status', row totals ('new'/'updated'/'skipped'/'errors')
        and 'error_list', or {'status': 'error', 'message': ...} on failure.
    """
    from tablib import Dataset
    from .models import RegistroImportacion, Activo
    from .resources import ActivoResource
    from django.contrib.auth.models import User
    
    user = User.objects.get(id=user_id) if user_id else None
    cache_key = f"import_activos_progress_{user_id}"
    
    # Create import record
    registro = RegistroImportacion.objects.create(
        nombre=import_name,
        usuario=user,
        estado='PROCESANDO'
    )
    
    # Read file from storage
    with default_storage.open(file_path, 'rb') as f:
        file_content = f.read()
        dataset = Dataset().load(file_content, format=file_format)
    
    total_rows = len(dataset)
    registro.total_rows = total_rows
    registro.save()
    
    # Initialize progress so the frontend can poll immediately
    progress_info = {
        'current': 0,
        'total': total_rows,
        'status': 'Procesando...',
        'percent': 0,
        'new': 0,
        'updated': 0,
        'errors': 0
    }
    cache.set(cache_key, progress_info, 3600)
    self.update_state(state='PROGRESS', meta=progress_info)
    
    try:
        # Import data
        resource = ActivoResource()
        result = resource.import_data(dataset, dry_run=dry_run, raise_errors=False)
        
        # Collect dataset-wide AND per-row errors. (Was: base errors only,
        # which silently dropped row-level failures — inconsistent with
        # import_ordenes_task.)
        detailed_errors = []
        for error in result.base_errors:
            detailed_errors.append(f"Error General: {str(error.error)}")
        for line, errors in result.row_errors():
            for error in errors:
                detailed_errors.append(f"Fila {line}: {str(error.error)}")
        
        # Update record
        registro.filas_nuevas = result.totals.get('new', 0)
        registro.filas_actualizadas = result.totals.get('update', 0)
        registro.filas_omitidas = result.totals.get('skip', 0)
        registro.filas_error = len(detailed_errors)
        registro.estado = 'COMPLETADO'
        registro.save()
        
        final_res = {
            'status': 'completed',
            'total': total_rows,
            'new': result.totals.get('new', 0),
            'updated': result.totals.get('update', 0),
            'skipped': result.totals.get('skip', 0),
            'errors': len(detailed_errors),
            'error_list': detailed_errors
        }
    except Exception as e:
        # Was: an unhandled crash left the audit record stuck in PROCESANDO.
        # NOTE(review): assumes 'ERROR' is a valid estado — confirm against
        # the RegistroImportacion model choices.
        registro.estado = 'ERROR'
        registro.save()
        final_res = {'status': 'error', 'message': f"Error crítico: {str(e)}"}
    finally:
        # Cleanup the uploaded file (kept on dry runs so they can be re-run)
        if not dry_run and default_storage.exists(file_path):
            default_storage.delete(file_path)
    
    cache.set(cache_key, final_res, 3600)
    return final_res

Document Processing Task

From documentos/tasks.py:
documentos/tasks.py
import logging
from celery import shared_task
import requests
from django.conf import settings

logger = logging.getLogger(__name__)

@shared_task(name='documentos.tasks.extract_document_metadata')
def extract_document_metadata(revision_id):
    """
    Kick off text/metadata extraction for a document revision via n8n.

    Marks the revision PROCESANDO, then posts the file details plus a
    callback URL to the n8n webhook; n8n performs the extraction and reports
    back through the callback endpoint.

    Args:
        revision_id: primary key of the Revision to process.

    Returns:
        True when the request was handed to n8n, None when the revision has
        no file, False on any error (the revision is flagged ERROR if it
        could be loaded).
    """
    import os  # Fix: os.path.basename below raised NameError (os was never imported)
    from .models import Revision

    # Sentinel so the except block can tell whether the lookup succeeded;
    # previously `revision` was unbound (UnboundLocalError) when
    # Revision.objects.get() itself raised.
    revision = None
    try:
        revision = Revision.objects.get(pk=revision_id)
        revision.estado_extraccion = 'PROCESANDO'
        revision.save()

        if not revision.archivo:
            revision.estado_extraccion = 'ERROR'
            revision.datos_extraidos = {'error': 'No hay archivo asociado'}
            revision.save()
            return

        # Prepare callback URL for n8n
        base_callback_url = settings.INTERNAL_SITE_URL
        callback_url = f"{base_callback_url}/documentos/api/callback-procesamiento/{revision.id}/"

        payload = {
            'revision_id': revision.id,
            'documento_id': revision.documento.id,
            'filename': os.path.basename(revision.archivo.name),
            'file_url': revision.archivo.url,
            'file_key': revision.archivo.name,
            'tipo_documento': revision.documento.tipo_documento.nombre,
            'callback_url': callback_url,
            'metadatos_requeridos': list(
                revision.documento.tipo_documento.metadatos_config.values_list('nombre', flat=True)
            )
        }

        # Send to n8n for processing
        n8n_url = settings.N8N_PROCESS_DOCUMENT_WEBHOOK_URL
        response = requests.post(n8n_url, json=payload, timeout=10)
        logger.info(f"Revision {revision_id}: Enviada a n8n. Status: {response.status_code}")

        return True

    except Exception as e:
        logger.error(f"Error en extract_document_metadata: {str(e)}")
        if revision is not None:
            revision.estado_extraccion = 'ERROR'
            revision.datos_extraidos = {'error': str(e)}
            revision.save()
        return False

Maintenance Task (Import Work Orders)

From mantenimiento/tasks.py:
mantenimiento/tasks.py
from celery import shared_task

@shared_task(bind=True, name='mantenimiento.tasks.import_ordenes_task')
def import_ordenes_task(self, file_path, file_format, user_id=None, 
                        verification_mode=False, dry_run=False, 
                        import_name="Importación OTs"):
    """
    Import work orders (OTs) from an uploaded file with progress tracking.

    Args:
        file_path: storage-relative path of the uploaded file.
        file_format: tablib format name ('csv', 'xlsx', ...).
        user_id: id of the requesting user; keys the progress cache entry.
        verification_mode: when True, no RegistroImportacion is created.
        dry_run: validate only — no audit record, and the file is kept.
        import_name: label for the RegistroImportacion audit record.

    Returns:
        dict with 'status' plus row totals and 'error_list' on success,
        or {'status': 'error', 'message': ...} on a fatal failure.
    """
    from tablib import Dataset
    from django.core.files.storage import default_storage
    from .admin import OrdenTrabajoResource
    from django.core.cache import cache
    from .models import OrdenTrabajo
    from activos.models import RegistroImportacion
    from django.contrib.auth.models import User
    
    user = User.objects.get(id=user_id) if user_id else None
    cache_key = f"import_ordenes_progress_{user_id}"
    
    # Only persist an audit record for real imports (not previews/dry runs).
    # Explicit None sentinel instead of re-testing the flags later.
    registro = None
    if not verification_mode and not dry_run:
        registro = RegistroImportacion.objects.create(
            nombre=import_name,
            tipo='Ordenes Trabajo',
            usuario=user,
            estado='PROCESANDO'
        )
    
    # Read file
    with default_storage.open(file_path, 'rb') as f:
        file_content = f.read()
        dataset = Dataset().load(file_content, format=file_format)
    
    total_rows = len(dataset)
    
    # Hand the resource the task/cache handles so it can publish per-row progress.
    resource = OrdenTrabajoResource()
    resource.celery_task = self
    resource.cache_key = cache_key
    resource.total_rows = total_rows
    
    # Set initial progress so the frontend can poll immediately
    progress_info = {
        'current': 0,
        'total': total_rows,
        'status': 'Iniciando importacion...',
        'percent': 0
    }
    cache.set(cache_key, progress_info, 3600)
    self.update_state(state='PROGRESS', meta=progress_info)
    
    # Import data
    try:
        result = resource.import_data(
            dataset,
            dry_run=dry_run,
            raise_errors=False,
            use_transactions=True
        )
        
        # Collect dataset-wide and per-row errors
        detailed_errors = []
        for error in result.base_errors:
            detailed_errors.append(f"Error General: {str(error.error)}")
        
        for line, errors in result.row_errors():
            for error in errors:
                detailed_errors.append(f"Fila {line}: {str(error.error)}")
        
        # Update the audit record (only exists for real imports)
        if registro is not None:
            registro.filas_nuevas = result.totals.get('new', 0)
            registro.filas_actualizadas = result.totals.get('update', 0)
            registro.filas_omitidas = result.totals.get('skip', 0)
            registro.filas_error = len(detailed_errors)
            registro.estado = 'COMPLETADO'
            if detailed_errors:
                registro.detalles_error = "\n".join(detailed_errors[:50])
            registro.save()
        
        final_res = {
            'status': 'completed',
            'total': total_rows,
            'new': result.totals.get('new', 0),
            'updated': result.totals.get('update', 0),
            'skipped': result.totals.get('skip', 0),
            'errors': len(detailed_errors),
            'error_list': detailed_errors
        }
    except Exception as e:
        error_msg = f"Error crítico: {str(e)}"
        # Was: the audit record stayed in PROCESANDO forever on a crash.
        # NOTE(review): assumes 'ERROR' is a valid estado — confirm against
        # the RegistroImportacion model choices.
        if registro is not None:
            registro.estado = 'ERROR'
            registro.detalles_error = error_msg
            registro.save()
        final_res = {'status': 'error', 'message': error_msg}
    
    # Cleanup file
    if not dry_run and default_storage.exists(file_path):
        default_storage.delete(file_path)
    
    cache.set(cache_key, final_res, 3600)
    return final_res

Periodic Tasks (Celery Beat)

Schedule recurring tasks:
settings.py - Beat Schedule
from celery.schedules import crontab

# Celery Beat periodic task schedule. crontab() times are interpreted in
# CELERY_TIMEZONE (presumably set in the Celery settings section — confirm).
CELERY_BEAT_SCHEDULE = {
    'sync-document-embeddings-every-minute': {
        'task': 'documentos.tasks.sync_document_embeddings',
        'schedule': 60.0,  # Every 60 seconds
    },
    'cleanup-old-imports-daily': {
        'task': 'activos.tasks.cleanup_old_imports',
        'schedule': crontab(hour=2, minute=0),  # 2:00 AM daily
    },
    'generate-maintenance-reports-weekly': {
        'task': 'mantenimiento.tasks.generate_weekly_report',
        'schedule': crontab(day_of_week=1, hour=8, minute=0),  # Monday 8:00 AM
    },
}

Running Celery

Development

# Start Celery worker
celery -A energia worker -l info

Production (Docker)

docker-compose.yml
services:
  celery_worker:
    image: energia-cmms:latest
    command: celery -A energia worker -l info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://user:pass@db:5432/energia
    depends_on:
      - redis
      - db
    volumes:
      - media_data:/app/media
  
  celery_beat:
    image: energia-cmms:latest
    command: celery -A energia beat -l info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://user:pass@db:5432/energia
    depends_on:
      - redis
      - db

Monitoring Task Progress

Check Task Status

from celery.result import AsyncResult

def check_task_status(task_id):
    """
    Return a JSON-serializable status summary for a Celery task.

    Args:
        task_id: the Celery task id to look up.

    Returns:
        dict containing 'state' plus state-specific fields: progress
        counters for PROGRESS, 'result' for SUCCESS, a 'status' string
        otherwise (including the stringified exception on FAILURE).
    """
    result = AsyncResult(task_id)
    state = result.state

    if state == 'PENDING':
        return {'state': state, 'status': 'Pending...'}

    if state == 'PROGRESS':
        info = result.info
        return {
            'state': state,
            'current': info.get('current', 0),
            'total': info.get('total', 1),
            'status': info.get('status', ''),
            'percent': info.get('percent', 0),
        }

    if state == 'SUCCESS':
        return {'state': state, 'result': result.result}

    # FAILURE or any other state: result.info holds the exception/details.
    return {'state': state, 'status': str(result.info)}

Monitor via Cache

from django.core.cache import cache

def get_import_progress(user_id):
    """
    Fetch the cached asset-import progress for a user.

    Args:
        user_id: id of the user whose import progress is tracked; keys the
            cache entry written by import_activos_task.

    Returns:
        The cached progress dict, or {'status': 'No import in progress'}
        when no entry exists.
    """
    progress = cache.get(f"import_activos_progress_{user_id}")
    # Compare against None explicitly: a falsy-but-present cached value
    # (e.g. {}) should still be returned, not treated as "no import".
    if progress is not None:
        return progress
    return {'status': 'No import in progress'}

Troubleshooting

Problem: Celery cannot connect to Redis. Error: `No connection could be made`. Solutions:
  1. Verify Redis is running:
redis-cli ping
# Expected: PONG
  1. Check connection string:
from django.conf import settings
print(settings.CELERY_BROKER_URL)
  1. Test connection:
import redis
r = redis.from_url('redis://localhost:6379/0')
r.ping()  # Should return True
Problem: tasks are queued but never executed. Symptoms:
  • Tasks stay in PENDING state
  • No error messages
Solutions:
  1. Check worker is running:
celery -A energia inspect active
  1. Verify task is registered:
celery -A energia inspect registered
  1. Check timeouts:
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes
CELERY_BROKER_CONNECTION_TIMEOUT = 3
Problem: the worker loses its database connection. Error: `OperationalError: connection timed out`. Solutions:
  1. Verify DATABASE_URL uses correct hostname:
# Bad (for Docker)
DATABASE_URL=postgresql://user:pass@localhost:5432/db

# Good (for Docker)
DATABASE_URL=postgresql://user:pass@db:5432/db
  1. Increase connection timeout:
DATABASES['default']['OPTIONS'].update({
    'connect_timeout': 20,
    'keepalives': 1,
    'keepalives_idle': 30,
})
Problem: worker memory usage grows over time. Solution: restart workers periodically:
# Max tasks before restart
celery -A energia worker --max-tasks-per-child=1000

# Max memory before restart
celery -A energia worker --max-memory-per-child=200000  # 200MB

Best Practices

Use bind=True

Always use @shared_task(bind=True) for tasks that need to track progress or retry

Implement Timeouts

Set CELERY_TASK_TIME_LIMIT to prevent runaway tasks

Clean Up Files

Delete temporary files in finally blocks to prevent storage leaks

Use Idempotent Tasks

Design tasks to be safely retried without side effects

n8n Integration

Trigger n8n workflows from Celery tasks

MinIO Storage

Process files from MinIO in background tasks

Build docs developers (and LLMs) love