Skip to main content
Energy CMMS is built on a modern, containerized architecture designed for high availability, scalability, and maintainability in demanding industrial environments.

Architecture Overview

The architecture supports both on-premises and cloud deployment via Docker containers managed by Coolify.

Core Components

Django Application Server

The heart of Energy CMMS is a Django 5.1.7 monolith with modular app structure:
# From energia/settings.py
INSTALLED_APPS = [
    'core',           # System core & energy monitoring
    'activos',        # Asset management
    'mantenimiento',  # CMMS
    'documentos',     # Document control
    'proyectos',      # Project management
    'presupuestos',   # Budget control
    'inventarios',    # Inventory management
    'almacen',        # Warehouse operations
    'seguridad',      # Safety management
    'auditorias',     # Audit & quality
    'callcenter',     # Ticketing system
    'comunicaciones', # Transmittals
    'servicios',      # Services & KPIs
    'plantillas',     # Template engine
]
Design Rationale: Monolithic architecture chosen for:
  • Simplified deployment (single container)
  • Consistent transaction boundaries across modules
  • Reduced inter-service communication overhead
  • Easier debugging and development

Database: PostgreSQL with Extensions

Primary data store with specialized extensions:
-- Vector extension for AI embeddings
CREATE EXTENSION IF NOT EXISTS vector;

-- Example embedding storage
CREATE TABLE documentos_documento (
    id SERIAL PRIMARY KEY,
    codigo VARCHAR(50),
    titulo VARCHAR(200),
    embedding vector(768)  -- Google Gemini embedding dimension
);

-- Vector similarity search
CREATE INDEX ON documentos_documento USING ivfflat (embedding vector_cosine_ops);
Schema Characteristics:
  • 150+ tables across all modules
  • Normalized design with foreign key enforcement
  • Strategic denormalization for reporting (e.g., PresupuestoAgrupado)
  • Full audit trail via Django’s automatic history

Object Storage: MinIO

All file uploads stored in S3-compatible MinIO:
# From energia/settings.py - Production configuration
AWS_S3_ENDPOINT_URL = os.environ.get('AWS_S3_ENDPOINT_URL', 
    'http://minio-cksckkgkcoogow4o4kg0gsog:9000')
AWS_STORAGE_BUCKET_NAME = 'energia-media'
AWS_S3_ADDRESSING_STYLE = 'path'  # Critical for MinIO compatibility

STORAGES = {
    "default": {
        "BACKEND": "storages.backends.s3boto3.S3Boto3Storage",
    },
    "staticfiles": {
        "BACKEND": "whitenoise.storage.CompressedManifestStaticFilesStorage",
    },
}
Storage Organization:
energia-media/
├── docs/2026/03/DOC-2026-0123/
│   ├── Manual_Rev1.pdf
│   └── Manual_Rev2.pdf
├── activos/fotos/
│   └── PUMP-001_20260306.jpg
├── ordenes/evidencias/
│   └── OT-2026-0456_completion.jpg
└── comunicaciones/adjuntos/
    └── TRM-2026-0789_attachment.pdf
Benefits:
  • Decoupled from application server (stateless Django containers)
  • Unlimited scaling without database bloat
  • Built-in versioning and lifecycle policies
  • Direct browser access via presigned URLs

Cache & Message Broker: Redis

Dual-purpose Redis instance:

Django Cache

  • Template fragment caching
  • Session storage
  • Query result caching
  • User preference caching

Celery Broker

  • Task queue management
  • Result backend (optional)
  • Task routing
  • Priority queues
# From energia/settings.py
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 
    'redis://default:saul123@lwcc8sss480ks4oc8gcgw4go:6379/0')

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': CELERY_BROKER_URL,
        'OPTIONS': {
            'socket_timeout': 5,
            'socket_connect_timeout': 5,
            'retry_on_timeout': True,
        }
    }
}

Module Architecture

Core Design Patterns

Each Django app follows a consistent structure:
mantenimiento/
├── models.py              # Data models
├── admin.py               # Django admin configuration
├── views.py               # Request handlers
├── urls.py                # URL routing
├── forms.py               # Form validation
├── tasks.py               # Celery async tasks
├── api.py                 # REST API endpoints (optional)
├── templates/
│   └── mantenimiento/
│       ├── ordentrabajo_list.html
│       └── ordentrabajo_detail.html
├── static/
│   └── mantenimiento/
│       ├── css/
│       └── js/
└── migrations/
    ├── 0001_initial.py
    └── ...

Inter-Module Dependencies

Dependency graph (simplified): Key Relationships:
  • Activos is foundational - most modules reference assets
  • Mantenimiento orchestrates inventory, safety, and budgets
  • Documentos serves as an attachment hub for all modules
  • Core provides shared utilities (energy monitoring, user profiles)

Database Relationship Examples

Asset-Centric Model

# From activos/models (pseudocode)
class Ubicacion(models.Model):
    nombre = models.CharField(max_length=200)
    padre = models.ForeignKey('self', null=True)  # Recursive hierarchy

class Activo(models.Model):
    codigo_interno = models.CharField(max_length=50, unique=True)
    ubicacion = models.ForeignKey(Ubicacion)
    categoria = models.ForeignKey(Categoria)
    responsable = models.ForeignKey(User)
    
    # Reverse relations from other modules:
    # - ordentrabajo_set (from Mantenimiento)
    # - documento_set (from Documentos)
    # - puntomedicion_set (from Core)

Maintenance Workflow

# From mantenimiento/models.py
class Rutina(models.Model):
    """Preventive maintenance template"""
    nombre = models.CharField(max_length=200)
    tipo = models.ForeignKey(Tipo)  # Hierarchical classification
    frecuencia = models.ForeignKey(Frecuencia)
    procedimiento = models.ForeignKey(Procedimiento)

class Programacion(models.Model):
    """Schedule for automatic work order generation"""
    rutina = models.ForeignKey(Rutina)
    activo = models.ForeignKey('activos.Activo')
    fecha_inicio = models.DateField()
    fecha_fin = models.DateField()
    horario = models.ForeignKey(Horario)
    activo = models.BooleanField(default=True)

class OrdenTrabajo(models.Model):
    """Work order - can be generated from Rutina or Aviso"""
    numero = models.CharField(max_length=50, unique=True)
    tipo = models.CharField(choices=[('PREVENTIVO', 'Preventive'), 
                                     ('CORRECTIVO', 'Corrective')])
    programacion = models.ForeignKey(Programacion, null=True)
    aviso = models.ForeignKey(Aviso, null=True)  # If corrective
    activo = models.ForeignKey('activos.Activo')
    tecnico_asignado = models.ForeignKey('TecnicoPuesto')
    estado = models.CharField(choices=ESTADOS)
    
class CierreOrdenTrabajo(models.Model):
    """Work order completion record"""
    orden = models.OneToOneField(OrdenTrabajo)
    horas_hombre = models.DecimalField()
    falla = models.ForeignKey(Falla, null=True)
    observaciones = models.TextField()
    cerrado_por = models.ForeignKey(User)
    fecha_cierre = models.DateTimeField(auto_now=True)
Workflow: Programacion → auto-generates OrdenTrabajo → Technician executes → Creates CierreOrdenTrabajo

Integration Architecture

n8n Automation Platform

External automation server for AI and webhook processing: Configured Webhooks (from energia/settings.py):
# Document processing pipeline
N8N_PROCESS_DOCUMENT_WEBHOOK_URL = os.environ.get(
    'N8N_PROCESS_DOCUMENT_WEBHOOK_URL',
    'http://n8n-z8wscww488scgs84oo4os008:5678/webhook/process-document'
)

# AI chat interface
N8N_CHAT_WEBHOOK_URL = os.environ.get(
    'N8N_CHAT_WEBHOOK_URL',
    'http://n8n-z8wscww488scgs84oo4os008:5678/webhook/chat-documento'
)

# Text extraction for search indexing
N8N_EXTRACT_TEXTO_WEBHOOK_URL = os.environ.get(
    'N8N_EXTRACT_TEXTO_WEBHOOK_URL',
    'http://n8n-z8wscww488scgs84oo4os008:5678/webhook/extract-text'
)
n8n Workflow Features:
  • Asynchronous processing (Django doesn’t wait)
  • Retry logic with exponential backoff
  • Error notification via Django callback
  • Supports multiple AI models (Gemini, GPT-4, Claude)

Dynamics 365 Integration

ERP integration for procurement workflows:
# From presupuestos/dynamics_utils.py (conceptual)
class DynamicsConnector:
    def sync_requisition(self, requisicion_id):
        """Push requisition to D365 Purchasing"""
        req = Requisicion.objects.get(id=requisicion_id)
        
        # Map to D365 PurchReqTable format
        payload = {
            'PurchReqId': req.numero,
            'PurchReqName': req.descripcion,
            'WorkerPurchPlayyName': req.solicitante.username,
            'lines': [
                {
                    'ItemId': art.material.codigo,
                    'PurchQty': art.cantidad,
                    'PurchPrice': art.precio_unitario,
                }
                for art in req.articulos.all()
            ]
        }
        
        response = requests.post(
            'https://d365.ccg.hn/api/v1/PurchReqTable',
            json=payload,
            headers={'Authorization': f'Bearer {DYNAMICS_TOKEN}'}
        )
        
        if response.ok:
            req.sincronizado = True
            req.ultima_sync = timezone.now()
            req.save()
        
        return response
Sync Strategy:
  • Celery periodic task runs every 5 minutes
  • Only syncs requisitions in ‘APROBADA’ state
  • Bidirectional: pulls D365 approval status back
  • Conflict resolution: D365 is source of truth for status

External Ticketing (SIG) Integration

Callcenter module scrapes external system:
# From callcenter/scraper.py (simplified)
def scrape_sig_tickets():
    """Fetch new tickets from legacy system"""
    session = requests.Session()
    
    # Login
    session.post('http://sig.ccg.hn/login', data={
        'username': SIG_USER,
        'password': SIG_PASSWORD
    })
    
    # Get ticket list HTML
    response = session.get('http://sig.ccg.hn/tickets?status=open')
    soup = BeautifulSoup(response.text, 'html.parser')
    
    for row in soup.select('table.tickets tbody tr'):
        ticket_num = row.select_one('td.numero').text.strip()
        
        # Check if already exists
        if SolicitudTicket.objects.filter(numero=ticket_num).exists():
            continue
        
        # Create new ticket
        SolicitudTicket.objects.create(
            numero=ticket_num,
            tipo='M2',
            descripcion=row.select_one('td.descripcion').text,
            solicitante=get_default_user(),
            estado='PENDIENTE'
        )
Execution:
  • Celery beat task: every 15 minutes
  • Error handling: logs to Django DB (django_celery_results)
  • Rate limiting: max 100 tickets per sync

Background Task Processing

Celery Configuration

# From energia/settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # Store results in Django DB
CELERY_CACHE_BACKEND = 'django-cache'

CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes max

CELERY_BROKER_CONNECTION_TIMEOUT = 3  # Fast fail if Redis down
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'socket_timeout': 3,
    'socket_connect_timeout': 3,
    'socket_keepalive': True,
}

Periodic Tasks (Celery Beat)

# From energia/settings.py
CELERY_BEAT_SCHEDULE = {
    'sync-document-embeddings-every-minute': {
        'task': 'documentos.tasks.sync_document_embeddings',
        'schedule': 60.0,  # Run every 60 seconds
    },
    'generate-preventive-work-orders-daily': {
        'task': 'mantenimiento.tasks.generate_scheduled_orders',
        'schedule': crontab(hour=6, minute=0),  # 6:00 AM daily
    },
    'sync-dynamics-requisitions': {
        'task': 'presupuestos.tasks.sync_all_requisitions',
        'schedule': 300.0,  # Every 5 minutes
    },
    'scrape-sig-tickets': {
        'task': 'callcenter.tasks.sync_tickets_from_sig',
        'schedule': 900.0,  # Every 15 minutes
    },
}

Task Example: Import Activos

# From activos/tasks.py (conceptual)
from celery import shared_task
from django.core.cache import cache

@shared_task(bind=True)
def import_activos_from_excel(self, file_path, user_id):
    """Background task for large asset imports"""
    import pandas as pd
    from activos.models import Activo, Ubicacion, RegistroImportacion
    
    # Create import record for tracking
    registro = RegistroImportacion.objects.create(
        archivo=file_path,
        usuario_id=user_id,
        estado='EN_PROCESO',
        celery_task_id=self.request.id
    )
    
    try:
        df = pd.read_excel(file_path)
        total = len(df)
        
        for idx, row in df.iterrows():
            # Validate and create activo
            ubicacion = Ubicacion.objects.get(codigo=row['ubicacion_codigo'])
            
            Activo.objects.update_or_create(
                codigo_interno=row['codigo'],
                defaults={
                    'descripcion': row['descripcion'],
                    'ubicacion': ubicacion,
                    'estado': row['estado'],
                }
            )
            
            # Update progress in cache for UI polling
            progress = int((idx + 1) / total * 100)
            cache.set(f'import_progress_{self.request.id}', progress, timeout=3600)
            
            # Update Celery state for monitoring
            self.update_state(state='PROGRESS', meta={'current': idx + 1, 'total': total})
        
        registro.estado = 'COMPLETADO'
        registro.registros_procesados = total
        registro.save()
        
        return {'status': 'success', 'total': total}
    
    except Exception as e:
        registro.estado = 'ERROR'
        registro.mensaje_error = str(e)
        registro.save()
        raise
UI Integration:
// Frontend polling for task progress
function checkImportProgress(taskId) {
    fetch(`/activos/import-progress/${taskId}/`)
        .then(res => res.json())
        .then(data => {
            updateProgressBar(data.progress);
            if (data.state === 'SUCCESS') {
                showSuccessMessage(data.result.total + ' activos imported');
            } else if (data.state !== 'FAILURE') {
                setTimeout(() => checkImportProgress(taskId), 2000);
            }
        });
}

Deployment Architecture

Container Strategy

Designed for Docker Compose / Coolify orchestration:
# Conceptual docker-compose.yml structure
services:
  db:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_DB: energia
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: ${MINIO_USER}
      MINIO_ROOT_PASSWORD: ${MINIO_PASSWORD}
    volumes:
      - minio_data:/data

  web:
    build: .
    command: gunicorn energia.wsgi:application --bind 0.0.0.0:8000 --workers 4
    environment:
      DATABASE_URL: postgres://postgres:${DB_PASSWORD}@db:5432/energia
      CELERY_BROKER_URL: redis://:${REDIS_PASSWORD}@redis:6379/0
      AWS_S3_ENDPOINT_URL: http://minio:9000
    depends_on:
      - db
      - redis
      - minio

  celery_worker:
    build: .
    command: celery -A energia worker -l info
    environment:
      DATABASE_URL: postgres://postgres:${DB_PASSWORD}@db:5432/energia
      CELERY_BROKER_URL: redis://:${REDIS_PASSWORD}@redis:6379/0
    depends_on:
      - db
      - redis

  celery_beat:
    build: .
    command: celery -A energia beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    environment:
      DATABASE_URL: postgres://postgres:${DB_PASSWORD}@db:5432/energia
      CELERY_BROKER_URL: redis://:${REDIS_PASSWORD}@redis:6379/0
    depends_on:
      - db
      - redis

  n8n:
    image: n8nio/n8n
    environment:
      N8N_BASIC_AUTH_USER: admin
      N8N_BASIC_AUTH_PASSWORD: ${N8N_PASSWORD}
      WEBHOOK_URL: https://energia.example.com
    volumes:
      - n8n_data:/home/node/.n8n

Environment Detection

Smart environment configuration:
# From energia/settings.py
DEBUG = os.environ.get('DJANGO_DEBUG', 'False').lower() == 'true'
IS_LOCAL = DEBUG and not os.environ.get('COOLIFY_FQDN')

# Automatic host configuration
coolify_fqdn = os.environ.get('COOLIFY_FQDN')
if coolify_fqdn:
    ALLOWED_HOSTS.append(coolify_fqdn)
    CSRF_TRUSTED_ORIGINS.append(f"https://{coolify_fqdn}")

# Network routing: internal vs. external URLs
if coolify_fqdn:
    SITE_URL = f"http://{coolify_fqdn}"  # Public URL for users
    INTERNAL_SITE_URL = 'http://kgogwsw00cwcw8g0wk0gsogg:8000'  # Docker network
else:
    SITE_URL = 'http://localhost:8000'
    INTERNAL_SITE_URL = 'http://181.115.47.107:8000'  # Dev remote callback

Static & Media File Serving

Production Strategy:
# Static files (CSS/JS): WhiteNoise (served by Django)
STATIC_ROOT = BASE_DIR / 'staticfiles'
STATICFILES_STORAGE = 'whitenoise.storage.CompressedManifestStaticFilesStorage'

# Media files (uploads): MinIO via proxy
MEDIA_URL = '/media-proxy/'  # Proxied through Django to avoid Mixed Content

# Django view for media proxy
def media_proxy_view(request, path):
    """Proxy MinIO files to add HTTPS and authentication"""
    minio_url = f"{AWS_S3_ENDPOINT_URL}/{AWS_STORAGE_BUCKET_NAME}/{path}"
    response = requests.get(minio_url, stream=True)
    
    return HttpResponse(
        response.content,
        content_type=response.headers.get('Content-Type'),
        headers={'Content-Disposition': response.headers.get('Content-Disposition')}
    )
Why proxy instead of direct MinIO access?
  • Avoids mixed content warnings (HTTPS site loading HTTP MinIO)
  • Centralizes authentication (check Django permissions before serving)
  • Simplifies CORS configuration
  • Enables access logging and rate limiting

High Availability Considerations

For production deployments, consider these scaling strategies:
Database:
  • PostgreSQL replication (master-standby)
  • Connection pooling via PgBouncer
  • Read replicas for reporting queries
Application:
  • Horizontal scaling: 4+ Gunicorn workers per container
  • Multiple web containers behind load balancer
  • Session store in Redis (not filesystem)
Celery:
  • Separate worker pools by task type (import, sync, report)
  • Priority queues for user-facing vs. background tasks
  • Auto-scaling based on queue depth
Storage:
  • MinIO distributed mode (4+ nodes)
  • Erasure coding for redundancy
  • S3 lifecycle policies for archival

Security Architecture

Network Isolation

Security Boundaries:
  • Only Traefik exposed to internet (ports 80/443)
  • Database, Redis, MinIO only accessible via internal network
  • n8n authenticates via webhook secrets
  • Celery workers have no external network access

Authentication Flow

# From energia/settings.py
LOGIN_URL = '/admin/login/'
LOGIN_REDIRECT_URL = '/app/'

# Django's built-in session authentication
SESSION_COOKIE_SECURE = not DEBUG  # HTTPS only in production
SESSION_COOKIE_HTTPONLY = True  # Prevent XSS access
SESSION_COOKIE_SAMESITE = 'Lax'  # CSRF protection
User Model Extensions:
# From core/models.py
class PerfilUsuario(models.Model):
    user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='perfil')
    ubicacion_defecto = models.ForeignKey('activos.Ubicacion', null=True)
    jefe_directo = models.ForeignKey(User, null=True, related_name='subordinados')
    telefono = models.CharField(max_length=20, blank=True)
    
    # Custom permissions can be added here
    puede_aprobar_requisiciones = models.BooleanField(default=False)
    limite_aprobacion = models.DecimalField(max_digits=10, decimal_places=2, default=0)

Permission System

Django’s built-in permission framework:
# In views.py
from django.contrib.auth.decorators import login_required, permission_required

@login_required
@permission_required('mantenimiento.view_ordentrabajo', raise_exception=True)
def orden_trabajo_list(request):
    """Only users with 'view_ordentrabajo' permission can access"""
    ordenes = OrdenTrabajo.objects.filter(estado='PENDIENTE')
    return render(request, 'mantenimiento/ordenes.html', {'ordenes': ordenes})

# In admin.py
class OrdenTrabajoAdmin(admin.ModelAdmin):
    def has_add_permission(self, request):
        # Only supervisors can create work orders
        return request.user.groups.filter(name='Supervisores').exists()
Group-Based Access Control: Common user groups:
  • Administradores: Full system access
  • Supervisores: Create/edit work orders, approve budgets
  • Técnicos: View assigned work orders, update status
  • Consultores: Read-only access to reports
  • Operadores: Limited access to call center and asset viewing

Data Protection

SQL Injection Prevention:
# Django ORM automatically escapes parameters
activos = Activo.objects.filter(codigo_interno=user_input)  # Safe

# Raw SQL (avoid, but if needed):
from django.db import connection
cursor = connection.cursor()
cursor.execute("SELECT * FROM activos_activo WHERE codigo = %s", [user_input])  # Safe with parameters
XSS Prevention:
{# Django templates auto-escape by default #}
<p>User input: {{ user_comment }}</p>  {# Automatically escaped #}

{# Explicitly mark safe only for trusted content #}
{{ html_from_admin|safe }}
CSRF Protection:
{# All POST forms must include CSRF token #}
<form method="post">
    {% csrf_token %}
    <input type="text" name="description">
    <button type="submit">Save</button>
</form>
# From energia/settings.py
CSRF_TRUSTED_ORIGINS = [
    'https://softcom.ccg.hn',
    'https://*.ccg.hn',
]

# For AJAX requests
CSRF_COOKIE_HTTPONLY = False  # Allow JavaScript to read for X-CSRFToken header

Audit Logging

Comprehensive audit trail:
# Django admin automatically logs changes
from django.contrib.admin.models import LogEntry, ADDITION, CHANGE, DELETION

# View all changes to a specific object
logs = LogEntry.objects.filter(
    content_type__model='activo',
    object_id='123'
).order_by('-action_time')

for log in logs:
    print(f"{log.action_time}: {log.user.username} {log.get_action_flag_display()} - {log.change_message}")
Custom audit models for sensitive operations:
# From documentos/models.py
class AuditoriaFirmas(models.Model):
    """Immutable record of all document signatures"""
    documento_firmado = models.ForeignKey(DocumentoFirmado)
    firmante = models.ForeignKey(User)
    fecha_firma = models.DateTimeField(auto_now_add=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.TextField()
    hash_documento = models.CharField(max_length=64)  # SHA256 of signed file
    
    class Meta:
        permissions = [
            ("view_auditoria_firmas", "Can view signature audit trail"),
        ]

Performance Optimization

Database Query Optimization

Select Related / Prefetch Related:
# Inefficient: N+1 queries
ordenes = OrdenTrabajo.objects.filter(estado='PENDIENTE')
for orden in ordenes:
    print(orden.activo.descripcion)  # Query for each activo
    print(orden.tecnico_asignado.user.username)  # Query for each tecnico

# Optimized: 1 query with joins
ordenes = OrdenTrabajo.objects.filter(estado='PENDIENTE') \
    .select_related('activo', 'tecnico_asignado__user') \
    .prefetch_related('valorpasoorden_set')

# Now only 2-3 queries total regardless of result count
Database Indexes:
# From activos/models.py
class Activo(models.Model):
    codigo_interno = models.CharField(max_length=50, unique=True, db_index=True)
    ubicacion = models.ForeignKey(Ubicacion, on_delete=models.PROTECT, db_index=True)
    estado = models.CharField(max_length=20, db_index=True)
    
    class Meta:
        indexes = [
            models.Index(fields=['ubicacion', 'estado']),  # Composite index
            models.Index(fields=['-fecha_creacion']),  # Descending for "latest" queries
        ]
Query Result Caching:
from django.core.cache import cache

def get_active_assets_count():
    cache_key = 'active_assets_count'
    count = cache.get(cache_key)
    
    if count is None:
        count = Activo.objects.filter(estado='ACTIVO').count()
        cache.set(cache_key, count, timeout=300)  # Cache for 5 minutes
    
    return count

Template Fragment Caching

{% load cache %}

{% cache 600 sidebar request.user.id %}
    {# Expensive sidebar widget that only changes per user #}
    <div class="sidebar">
        {% for menu_item in get_user_menu %}
            <a href="{{ menu_item.url }}">{{ menu_item.title }}</a>
        {% endfor %}
    </div>
{% endcache %}

Pagination

from django.core.paginator import Paginator

def activo_list(request):
    activos = Activo.objects.select_related('ubicacion').order_by('codigo_interno')
    paginator = Paginator(activos, 50)  # 50 items per page
    
    page_number = request.GET.get('page', 1)
    page_obj = paginator.get_page(page_number)
    
    return render(request, 'activos/list.html', {'page_obj': page_obj})

Celery Task Optimization

Chunking large operations:
from celery import group

@shared_task
def process_single_activo(activo_id):
    """Process one activo (fast)"""
    activo = Activo.objects.get(id=activo_id)
    # ... processing logic
    return activo.codigo_interno

def process_all_activos():
    """Distribute load across workers"""
    activo_ids = Activo.objects.values_list('id', flat=True)
    
    # Create group of parallel tasks
    job = group(process_single_activo.s(aid) for aid in activo_ids)
    result = job.apply_async()
    
    return result

Monitoring & Observability

Celery Task Monitoring

Results stored in Django database:
# From energia/settings.py
CELERY_RESULT_BACKEND = 'django-db'

# Admin interface to view task status
from django_celery_results.models import TaskResult

# Get task results
recent_tasks = TaskResult.objects.filter(
    task_name='documentos.tasks.sync_document_embeddings'
).order_by('-date_created')[:10]

for task in recent_tasks:
    print(f"{task.task_id}: {task.status} - {task.result}")

Application Logging

# From energia/settings.py
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
            'style': '{',
        },
    },
    'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
        },
        'file': {
            'level': 'WARNING',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/energia/django.log',
            'maxBytes': 1024 * 1024 * 10,  # 10 MB
            'backupCount': 5,
            'formatter': 'verbose',
        },
    },
    'loggers': {
        'django': {
            'handlers': ['console', 'file'],
            'level': 'INFO',
            'propagate': True,
        },
        'mantenimiento': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
        },
    },
}

Health Check Endpoints

# From core/views.py
from django.http import JsonResponse
from django.db import connections
from django.core.cache import cache

def health_check(request):
    """Endpoint for load balancer health checks"""
    status = {
        'database': 'unknown',
        'cache': 'unknown',
        'celery': 'unknown',
    }
    
    # Check database
    try:
        connections['default'].cursor()
        status['database'] = 'ok'
    except Exception as e:
        status['database'] = f'error: {str(e)}'
    
    # Check cache
    try:
        cache.set('health_check', 'ok', timeout=10)
        if cache.get('health_check') == 'ok':
            status['cache'] = 'ok'
    except Exception as e:
        status['cache'] = f'error: {str(e)}'
    
    # Check Celery (simple ping to broker)
    try:
        from celery import current_app
        inspect = current_app.control.inspect()
        if inspect.ping():
            status['celery'] = 'ok'
    except Exception as e:
        status['celery'] = f'error: {str(e)}'
    
    all_ok = all(v == 'ok' for v in status.values())
    
    return JsonResponse(status, status=200 if all_ok else 503)

Scalability Roadmap

Current Capacity

  • 50,000+ assets
  • 10,000+ work orders/month
  • 100+ concurrent users
  • 1TB document storage

Future Scaling

  • Microservices extraction (Documents, Inventory)
  • Elasticsearch for full-text search
  • GraphQL API layer
  • Multi-tenancy support

Potential Microservices Split

If monolith becomes unwieldy: Services to Extract First:
  1. Documents: Self-contained with heavy storage needs
  2. Inventory: High transaction volume, could benefit from dedicated optimization
  3. Integrations: n8n, Dynamics sync as separate worker services

Technology Stack Summary

Backend

TechnologyVersionPurpose
Python3.11+Runtime
Django5.1.7Web framework
PostgreSQL16+Primary database
pgvectorlatestVector embeddings
Redis7.xCache & message broker
Celery5.xBackground tasks
Gunicorn21+WSGI server

Storage & Integration

TechnologyPurpose
MinIOS3-compatible object storage
n8nAutomation & AI workflows
Dynamics 365ERP integration
WhiteNoiseStatic file serving

Key Python Packages

# Core Framework
Django==5.1.7
psycopg2-binary==2.9.9
dj-database-url==2.1.0

# Storage & Files
django-storages==1.14.2
boto3==1.34.51
whitenoise==6.6.0

# Admin & UI
django-jazzmin==2.6.0
django-colorfield==0.11.0

# Data Handling
django-import-export==3.3.7
pandas==2.2.0
openpyxl==3.1.2

# Async & Tasks
celery==5.3.6
redis==5.0.1
django-celery-results==2.5.1
django-celery-beat==2.5.0

# API & Integration
requests==2.31.0
django-cors-headers==4.3.1
beautifulsoup4==4.12.3  # For web scraping

# AI & Embeddings
google-generativeai==0.3.2
pgvector==0.2.4

# Security
django-ratelimit==4.1.0
cryptography==42.0.2

Development Workflow

Local Development Setup

# Clone repository
git clone https://github.com/ccg/energia-cmms.git
cd energia-cmms

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export DJANGO_DEBUG=True
export DATABASE_URL=postgres://postgres:admin123@localhost:5434/energia

# Run migrations
python manage.py migrate

# Create superuser
python manage.py createsuperuser

# Start development server
python manage.py runserver

# In separate terminal: Start Celery worker
celery -A energia worker -l info

# In another terminal: Start Celery beat
celery -A energia beat -l info

Testing Strategy

# From mantenimiento/tests.py
from django.test import TestCase
from django.contrib.auth.models import User
from activos.models import Activo, Ubicacion
from mantenimiento.models import OrdenTrabajo, Rutina

class OrdenTrabajoTestCase(TestCase):
    def setUp(self):
        self.ubicacion = Ubicacion.objects.create(nombre='Planta', codigo='PLT01')
        self.activo = Activo.objects.create(
            codigo_interno='PUMP-001',
            ubicacion=self.ubicacion
        )
        self.user = User.objects.create_user('tecnico', '[email protected]', 'pass')
    
    def test_create_orden_correctiva(self):
        orden = OrdenTrabajo.objects.create(
            tipo='CORRECTIVO',
            activo=self.activo,
            descripcion='Bomba con ruido anormal',
            solicitado_por=self.user
        )
        
        self.assertEqual(orden.estado, 'PENDIENTE')
        self.assertIsNotNone(orden.numero)  # Auto-generated
        self.assertTrue(orden.numero.startswith('OT-'))
    
    def test_cierre_orden_requiere_horas(self):
        orden = OrdenTrabajo.objects.create(
            tipo='CORRECTIVO',
            activo=self.activo
        )
        
        # Attempt to close without hours should fail validation
        from mantenimiento.forms import CierreOrdenForm
        form = CierreOrdenForm(data={'observaciones': 'Fixed'})
        self.assertFalse(form.is_valid())
        self.assertIn('horas_hombre', form.errors)

Database Migrations

# Create migration after model changes
python manage.py makemigrations mantenimiento

# Review migration file
cat mantenimiento/migrations/0042_add_cierre_fields.py

# Apply migration
python manage.py migrate mantenimiento

# Rollback if needed
python manage.py migrate mantenimiento 0041

Future Architecture Considerations

GraphQL API Layer

For flexible mobile/external integrations:
# Potential graphene-django implementation
import graphene
from graphene_django import DjangoObjectType
from activos.models import Activo

class ActivoType(DjangoObjectType):
    class Meta:
        model = Activo
        fields = '__all__'

class Query(graphene.ObjectType):
    activos = graphene.List(ActivoType, ubicacion_id=graphene.Int())
    
    def resolve_activos(self, info, ubicacion_id=None):
        qs = Activo.objects.select_related('ubicacion')
        if ubicacion_id:
            qs = qs.filter(ubicacion_id=ubicacion_id)
        return qs

schema = graphene.Schema(query=Query)

Event-Driven Architecture

For real-time updates:
# Django Channels for WebSocket support
from channels.generic.websocket import AsyncWebsocketConsumer
import json

class OrdenTrabajoConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_group_name = 'ordenes_updates'
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
    
    async def orden_updated(self, event):
        await self.send(text_data=json.dumps({
            'type': 'orden_update',
            'orden_id': event['orden_id'],
            'estado': event['estado']
        }))

# Trigger from signal
from django.db.models.signals import post_save
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

@receiver(post_save, sender=OrdenTrabajo)
def notify_orden_update(sender, instance, **kwargs):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'ordenes_updates',
        {
            'type': 'orden_updated',
            'orden_id': instance.id,
            'estado': instance.estado
        }
    )

Multi-Tenancy

For SaaS deployment:
# Tenant-aware models
from django_tenants.models import TenantMixin

class Cliente(TenantMixin):
    name = models.CharField(max_length=100)
    created_on = models.DateField(auto_now_add=True)
    
    auto_create_schema = True  # Automatic schema per tenant

# All models automatically filtered by tenant
activos = Activo.objects.all()  # Only returns current tenant's data

Conclusion

Energy CMMS is architected for: Reliability: Battle-tested Django framework with comprehensive error handling
Scalability: Celery-based background processing, caching, and database optimization
Maintainability: Modular app structure with clear separation of concerns
Extensibility: Plugin architecture for custom modules and integrations
Security: Defense-in-depth with multiple layers of protection
The system is production-ready for industrial environments with 100+ users, 50,000+ assets, and 10,000+ monthly work orders.

Deployment Guide

Ready to deploy? Check out the installation guide for step-by-step instructions.

Build docs developers (and LLMs) love