Overview

The ConversionManager is the core orchestration system for Frame’s conversion tasks. It manages a queue of pending tasks, enforces concurrency limits, and coordinates worker processes.

Architecture

The manager uses an actor-like pattern with message passing:
  • Main event loop - Processes messages and manages queue state
  • Worker tasks - Spawned once per conversion; each runs an FFmpeg process
  • Message channel - Async communication between components
┌─────────────────────────────────────────┐
│         ConversionManager               │
├─────────────────────────────────────────┤
│  Message Channel (mpsc)                 │
│  ┌─────────────────────────────────┐   │
│  │  Event Loop                     │   │
│  │  - Queue: VecDeque<Task>       │   │
│  │  - Running: HashMap<id, ()>    │   │
│  │  - Max Concurrency: AtomicUsize│   │
│  └─────────────────────────────────┘   │
│                                         │
│  Active Tasks Map                       │
│  ┌─────────────────────────────────┐   │
│  │  HashMap<task_id, process_pid>  │   │
│  └─────────────────────────────────┘   │
│                                         │
│  Cancelled Tasks Set                    │
│  ┌─────────────────────────────────┐   │
│  │  HashSet<task_id>               │   │
│  └─────────────────────────────────┘   │
└─────────────────────────────────────────┘

          ├─> Worker 1 (FFmpeg)
          ├─> Worker 2 (FFmpeg)
          └─> Worker N (FFmpeg)
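
The actor-like pattern above can be sketched with nothing but the standard library. This is a minimal illustration, not Frame's implementation: it uses std::sync::mpsc and a thread where the real manager appears to use an async channel, and the names Msg, run_event_loop, and demo are hypothetical. The point it shows is that one loop owns the queue state and changes it only in response to messages.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::thread;

// Hypothetical, reduced message set; the real ManagerMessage carries more data.
enum Msg {
    Enqueue(String),
    TaskCompleted(String),
    Shutdown,
}

/// Owns all queue state and mutates it only in response to messages.
/// Returns the order in which task IDs were started.
fn run_event_loop(rx: mpsc::Receiver<Msg>, max_concurrency: usize) -> Vec<String> {
    let mut queue: VecDeque<String> = VecDeque::new();
    let mut running: Vec<String> = Vec::new();
    let mut started: Vec<String> = Vec::new();

    for msg in rx {
        match msg {
            Msg::Enqueue(id) => queue.push_back(id),
            Msg::TaskCompleted(id) => running.retain(|r| r != &id),
            Msg::Shutdown => break,
        }
        // After every state change, fill any free slots from the queue.
        while running.len() < max_concurrency {
            match queue.pop_front() {
                Some(id) => {
                    running.push(id.clone());
                    started.push(id);
                }
                None => break,
            }
        }
    }
    started
}

/// Drives the loop from another thread with a concurrency limit of 1.
fn demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || run_event_loop(rx, 1));
    tx.send(Msg::Enqueue("a".into())).unwrap();
    tx.send(Msg::Enqueue("b".into())).unwrap();
    tx.send(Msg::TaskCompleted("a".into())).unwrap();
    tx.send(Msg::Shutdown).unwrap();
    handle.join().unwrap()
}
```

Because only the loop touches the queue, senders never need a lock; they only need a clone of the channel's sending half.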

ManagerMessage Types

Messages sent through the internal channel (manager.rs:27-33):
  • Enqueue (Box<ConversionTask>) - Add a new task to the queue. Triggers queue processing.
  • ConcurrencyUpdated (no payload) - Max concurrency setting changed. Triggers queue processing to start additional tasks if slots are available.
  • TaskStarted (String, u32) - Worker has started processing. Contains task ID and FFmpeg process PID.
  • TaskCompleted (String) - Task finished successfully. Contains task ID. Triggers queue processing.
  • TaskError (String, ConversionError) - Task failed with an error. Contains task ID and error details. Triggers queue processing.
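
As a rough sketch, the message types above map naturally onto a Rust enum. The variant names and payload types come from the descriptions above; ConversionTask and ConversionError are reduced to placeholders here, and the helper triggers_queue_processing is a hypothetical addition that only encodes which messages are described as triggering queue processing.

```rust
// Placeholder types: the real ConversionTask and ConversionError hold more data.
#[derive(Debug)]
struct ConversionTask {
    id: String,
}

#[derive(Debug)]
struct ConversionError(String);

#[derive(Debug)]
enum ManagerMessage {
    Enqueue(Box<ConversionTask>),
    ConcurrencyUpdated,
    TaskStarted(String, u32),
    TaskCompleted(String),
    TaskError(String, ConversionError),
}

impl ManagerMessage {
    /// Hypothetical helper: true for every message described as triggering
    /// queue processing, which is all of them except TaskStarted.
    fn triggers_queue_processing(&self) -> bool {
        !matches!(self, ManagerMessage::TaskStarted(_, _))
    }
}
```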

Task Lifecycle

1. Enqueue

manager.sender.send(ManagerMessage::Enqueue(Box::new(task))).await
  • Task added to queue (VecDeque)
  • Task ID added to queued_ids set (prevents duplicates)
  • Removed from cancelled_tasks (allows re-queuing)
  • process_queue() called to potentially start task
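
The enqueue bookkeeping listed above might look roughly like this. QueueState and Task are hypothetical stand-ins, and silently rejecting a duplicate ID is an assumption; the source only says that queued_ids prevents duplicates.

```rust
use std::collections::{HashSet, VecDeque};

// Hypothetical stand-in for ConversionTask.
struct Task {
    id: String,
}

#[derive(Default)]
struct QueueState {
    queue: VecDeque<Task>,
    queued_ids: HashSet<String>,
    cancelled_tasks: HashSet<String>,
}

impl QueueState {
    /// Returns true if the task was added, false if its ID was already queued.
    fn enqueue(&mut self, task: Task) -> bool {
        // HashSet::insert returns false when the value was already present.
        if !self.queued_ids.insert(task.id.clone()) {
            return false;
        }
        // A re-queued task must not be skipped because of an earlier cancel.
        self.cancelled_tasks.remove(&task.id);
        self.queue.push_back(task);
        true
    }
}
```

In the real manager, a successful enqueue would be followed by a call to process_queue().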

2. Started

When a worker slot is available:
  • Task removed from queue
  • Task ID removed from queued_ids
  • Task ID added to running_tasks map
  • Worker spawned with run_ffmpeg_worker()
  • Worker sends TaskStarted(id, pid) message
  • PID stored in active_tasks map
If task was cancelled before starting:
  • Process terminated immediately
  • Task removed from running_tasks and active_tasks
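
The handling of a TaskStarted message, including the cancelled-before-start case, can be sketched as a small state transition. RunState and StartOutcome are hypothetical names; the actual process termination is left to the caller so the example stays free of platform code.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct RunState {
    running_tasks: HashMap<String, ()>,
    active_tasks: HashMap<String, u32>,
    cancelled_tasks: HashSet<String>,
}

#[derive(Debug, PartialEq)]
enum StartOutcome {
    /// PID recorded; the task keeps running.
    Tracked,
    /// Task was cancelled while starting; the caller should kill this PID.
    KillImmediately(u32),
}

impl RunState {
    fn on_task_started(&mut self, id: &str, pid: u32) -> StartOutcome {
        if self.cancelled_tasks.contains(id) {
            // Cancelled before the worker reported in: drop all tracking.
            self.running_tasks.remove(id);
            self.active_tasks.remove(id);
            return StartOutcome::KillImmediately(pid);
        }
        self.active_tasks.insert(id.to_string(), pid);
        StartOutcome::Tracked
    }
}
```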

3. Progress Updates

Worker emits events during execution:
app.emit("conversion-progress", ProgressPayload { id, progress })
app.emit("conversion-log", LogPayload { id, line })

4. Completion

On success:
  • Worker sends TaskCompleted(id) message
  • Task removed from running_tasks, cancelled_tasks, active_tasks
  • conversion-completed event emitted with output path
  • process_queue() called to start next queued task
On error:
  • Worker sends TaskError(id, error) message
  • Same cleanup as completion
  • conversion-error event emitted
  • Error logged to console and sent to frontend

5. Cancellation

User-initiated cancel:
  • Task ID added to cancelled_tasks set
  • If running, FFmpeg process terminated (SIGKILL/TerminateProcess)
  • Temporary upscale directory cleaned up
  • If queued but not started, will be skipped when dequeued
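
A cancel request reduces to two steps: record the ID, then look up whether a process exists to terminate. The sketch below assumes hypothetical names (CancelState, cancel) and does not model the TaskNotFound error or the temporary directory cleanup.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct CancelState {
    active_tasks: HashMap<String, u32>,
    cancelled_tasks: HashSet<String>,
}

impl CancelState {
    /// Marks the task as cancelled. Returns Some(pid) if a process is running
    /// and should be terminated, or None if the task has no known process yet.
    fn cancel(&mut self, id: &str) -> Option<u32> {
        self.cancelled_tasks.insert(id.to_string());
        self.active_tasks.get(id).copied()
    }
}
```

A None result corresponds to the queued case: the ID stays in cancelled_tasks, so the task is skipped when it is dequeued.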

Concurrency Commands

get_max_concurrency

Retrieves the current maximum number of concurrent conversions allowed.

Signature

pub fn get_max_concurrency(
    manager: tauri::State<'_, ConversionManager>,
) -> Result<usize, ConversionError>

Response

  • value (number) - Current maximum concurrency setting. Default: 2

Example

const maxConcurrency = await invoke('get_max_concurrency');
console.log(`Max concurrent tasks: ${maxConcurrency}`);

set_max_concurrency

Updates the maximum number of concurrent conversions. Takes effect immediately.

Signature

pub fn set_max_concurrency(
    manager: tauri::State<'_, ConversionManager>,
    value: usize,
) -> Result<(), ConversionError>

Parameters

  • value (number, required) - New maximum concurrency value. Must be at least 1.

Behavior

When increased:
  • Immediately processes queue to start additional tasks up to new limit
When decreased:
  • Running tasks continue
  • No new tasks start until running count drops below new limit
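
The increase and decrease behavior follows from comparing the limit with the running count. Here is a minimal sketch, assuming a hypothetical Limits type and a String error in place of ConversionError::InvalidInput; the use of saturating_sub is what keeps a lowered limit from underflowing while more tasks are still running than the new limit allows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct Limits {
    max_concurrency: AtomicUsize,
}

impl Limits {
    /// Rejects 0; any other value takes effect for the next queue pass.
    fn set_max(&self, value: usize) -> Result<(), String> {
        if value == 0 {
            return Err("max concurrency must be at least 1".to_string());
        }
        self.max_concurrency.store(value, Ordering::SeqCst);
        Ok(())
    }

    /// Number of tasks that may start right now. Stays at 0 while the
    /// running count is at or above the limit, for example after a decrease.
    fn free_slots(&self, running: usize) -> usize {
        self.max_concurrency
            .load(Ordering::SeqCst)
            .saturating_sub(running)
    }
}
```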

Errors

  • InvalidInput - Value is 0 (the usize parameter cannot hold a negative value)

Example

try {
  await invoke('set_max_concurrency', { value: 4 });
  console.log('Concurrency updated to 4');
} catch (error) {
  console.error('Failed to update concurrency:', error);
}

Process Control

Pause/Resume Implementation

Uses platform-specific process suspension.
Unix (macOS/Linux):
libc::kill(pid, libc::SIGSTOP)  // Pause
libc::kill(pid, libc::SIGCONT)  // Resume
Windows:
NtSuspendProcess(process_handle)  // Pause
NtResumeProcess(process_handle)   // Resume

Termination

Cancellation uses forceful termination.
Unix:
libc::kill(pid, libc::SIGCONT)  // Ensure not paused
libc::kill(pid, libc::SIGKILL)   // Kill
Windows:
NtResumeProcess(process_handle)       // Ensure not paused
TerminateProcess(process_handle, 1)   // Kill

Queue Processing Algorithm

The process_queue() function (manager.rs:201-246) runs whenever:
  • New task enqueued
  • Task completed/failed
  • Concurrency limit changed
Algorithm:
loop {
    if running_tasks.len() >= max_concurrency {
        break; // At capacity
    }

    if let Some(task) = queue.pop_front() {
        // Check if cancelled
        if cancelled_tasks.contains(&task.id) {
            continue; // Skip this task
        }

        // Mark as running (clone the ID; the task itself moves into the worker)
        running_tasks.insert(task.id.clone(), ());

        // Spawn worker
        spawn(run_ffmpeg_worker(task));
    } else {
        break; // Queue empty
    }
}
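
To watch the loop behave on concrete data, the algorithm can be wrapped in a small self-contained type. Scheduler and Task are hypothetical; worker spawning is replaced by returning the IDs that would be started, and removing the ID from queued_ids at dequeue time (even for skipped tasks) is an assumption.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Hypothetical stand-in for ConversionTask.
struct Task {
    id: String,
}

#[derive(Default)]
struct Scheduler {
    queue: VecDeque<Task>,
    queued_ids: HashSet<String>,
    running_tasks: HashMap<String, ()>,
    cancelled_tasks: HashSet<String>,
}

impl Scheduler {
    /// Starts as many queued tasks as the limit allows, in FIFO order,
    /// and returns the IDs for which a worker would be spawned.
    fn process_queue(&mut self, max_concurrency: usize) -> Vec<String> {
        let mut started = Vec::new();
        while self.running_tasks.len() < max_concurrency {
            let task = match self.queue.pop_front() {
                Some(task) => task,
                None => break, // Queue empty
            };
            // Assumption: the ID leaves queued_ids as soon as it is dequeued.
            self.queued_ids.remove(&task.id);
            if self.cancelled_tasks.contains(&task.id) {
                continue; // Cancelled while queued: skip without using a slot
            }
            self.running_tasks.insert(task.id.clone(), ());
            started.push(task.id);
        }
        started
    }
}
```

A skipped task does not consume a slot, so a cancelled entry at the head of the queue never delays the tasks behind it.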

State Synchronization

Thread-safe state managed with:
  • AtomicUsize - Max concurrency (lock-free reads)
  • Mutex HashMap - Active tasks map (PID lookup)
  • Mutex HashSet - Cancelled tasks set (cancellation checks)
Code paths that need more than one lock acquire them in a consistent order to prevent deadlocks.
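
The following sketch shows one way the three primitives could be combined and what a fixed lock order means in practice. The Shared type, its methods, and the chosen order (active_tasks before cancelled_tasks) are assumptions for illustration; the source does not state which order the manager uses.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone)]
struct Shared {
    max_concurrency: Arc<AtomicUsize>,
    active_tasks: Arc<Mutex<HashMap<String, u32>>>,
    cancelled_tasks: Arc<Mutex<HashSet<String>>>,
}

impl Shared {
    fn new(max: usize) -> Self {
        Shared {
            max_concurrency: Arc::new(AtomicUsize::new(max)),
            active_tasks: Arc::new(Mutex::new(HashMap::new())),
            cancelled_tasks: Arc::new(Mutex::new(HashSet::new())),
        }
    }

    /// Records a PID unless the task is cancelled. Locks: active, then cancelled.
    fn register(&self, id: &str, pid: u32) -> bool {
        let mut active = self.active_tasks.lock().unwrap();
        let cancelled = self.cancelled_tasks.lock().unwrap();
        if cancelled.contains(id) {
            return false;
        }
        active.insert(id.to_string(), pid);
        true
    }

    /// Marks a task cancelled and returns its PID, if any. Same lock order.
    fn cancel(&self, id: &str) -> Option<u32> {
        let active = self.active_tasks.lock().unwrap();
        let mut cancelled = self.cancelled_tasks.lock().unwrap();
        cancelled.insert(id.to_string());
        active.get(id).copied()
    }
}

/// Registers a task from a worker thread, then cancels it from this thread.
fn demo() -> (usize, Option<u32>) {
    let shared = Shared::new(2);
    let worker = shared.clone();
    thread::spawn(move || worker.register("a", 7)).join().unwrap();
    // The limit is read without taking any lock.
    (shared.max_concurrency.load(Ordering::Relaxed), shared.cancel("a"))
}
```

Because both methods take the locks in the same order, two threads calling them concurrently cannot each hold one lock while waiting for the other.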

Cleanup Operations

Temporary Upscale Directory

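When ML upscaling is used, frames are extracted to a per-task directory under the system temporary directory (std::env::temp_dir()). On Linux this is typically:
/tmp/frame_upscale_{task_id}/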
Cleaned up on:
  • Task completion (success or error)
  • Task cancellation
Cleanup is best-effort (errors ignored):
let temp_dir = std::env::temp_dir().join(format!("frame_upscale_{}", id));
if temp_dir.exists() {
    let _ = std::fs::remove_dir_all(&temp_dir);
}

Error Handling

Manager-specific errors:
  • Channel - Internal message channel closed (fatal)
  • TaskNotFound - Pause/resume/cancel on non-existent task
  • Shell - Failed to send signal to process (SIGSTOP/SIGKILL)
  • InvalidInput - Invalid concurrency value (0)
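
For orientation, the four manager-specific errors could be modelled as enum variants. Only the variant names come from the list above; the String payloads, the Display messages, and the is_fatal helper are assumptions made for this sketch.

```rust
use std::fmt;

// Variant names follow the list above; the String payloads are assumptions.
#[derive(Debug, PartialEq)]
enum ConversionError {
    Channel(String),
    TaskNotFound(String),
    Shell(String),
    InvalidInput(String),
}

impl ConversionError {
    /// Hypothetical helper: only a closed channel is described as fatal.
    fn is_fatal(&self) -> bool {
        matches!(self, ConversionError::Channel(_))
    }
}

impl fmt::Display for ConversionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConversionError::Channel(m) => write!(f, "channel closed: {}", m),
            ConversionError::TaskNotFound(id) => write!(f, "task not found: {}", id),
            ConversionError::Shell(m) => write!(f, "process signal failed: {}", m),
            ConversionError::InvalidInput(m) => write!(f, "invalid input: {}", m),
        }
    }
}

impl std::error::Error for ConversionError {}
```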

Performance Considerations

Default Concurrency

Default: 2 concurrent tasks (types.rs:3)
Rationale:
  • Video encoding is CPU/GPU intensive
  • Multiple tasks compete for resources
  • Higher concurrency may slow overall throughput

Queue Strategy

FIFO (First-In-First-Out) using VecDeque:
  • Fair processing order
  • Predictable completion for users
  • Simple implementation

Duplicate Prevention

  • queued_ids HashSet prevents duplicate queue entries
  • running_tasks HashMap prevents double-execution
  • Task ID uniqueness is frontend’s responsibility

Example: Full Task Flow

import { invoke } from '@tauri-apps/api/core';
import { listen } from '@tauri-apps/api/event';

// Set up event listeners
await listen('conversion-started', (e) => {
  console.log(`Task ${e.payload.id} started`);
});

await listen('conversion-progress', (e) => {
  console.log(`Task ${e.payload.id}: ${e.payload.progress}%`);
});

await listen('conversion-completed', (e) => {
  console.log(`Task ${e.payload.id} completed: ${e.payload.outputPath}`);
});

await listen('conversion-error', (e) => {
  console.error(`Task ${e.payload.id} failed: ${e.payload.error}`);
});

// Configure concurrency
await invoke('set_max_concurrency', { value: 3 });

// Queue multiple tasks
for (let i = 0; i < 5; i++) {
  await invoke('queue_conversion', {
    id: `task-${i}`,
    filePath: `/path/to/video${i}.mp4`,
    outputName: `output${i}`,
    config: { /* ... */ }
  });
}

// Tasks 0, 1, 2 start immediately
// Tasks 3, 4 wait in queue
// As tasks complete, queued tasks start

// Pause a running task
await invoke('pause_conversion', { id: 'task-1' });

// Resume it later
await invoke('resume_conversion', { id: 'task-1' });

// Cancel a task
await invoke('cancel_conversion', { id: 'task-2' });
