The Scribe pipeline implements a sophisticated error handling system that distinguishes between fatal errors (stop execution) and non-fatal errors (continue with degraded service).

StepResult Pattern

Every pipeline step returns a StepResult object that encapsulates success/failure state, error messages, and warnings.

StepResult Structure

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class StepResult:
    """
    Result of a pipeline step execution.
    
    Returned by BasePipelineStep.execute() to indicate success/failure.
    """
    
    success: bool
    """Whether the step completed successfully"""
    
    step_name: str
    """Name of the step that produced this result"""
    
    error: Optional[str] = None
    """Error message if success=False"""
    
    metadata: Optional[Dict[str, Any]] = None
    """
    Optional metadata about execution:
    - duration: float (seconds)
    - output_size: int (bytes/chars)
    - api_calls_made: int
    - retries_attempted: int
    """
    
    warnings: List[str] = field(default_factory=list)
    """Non-fatal warnings (e.g., 'some URLs failed to scrape')"""
The StepResult pattern provides a clean separation between success with warnings (continue pipeline) and failure (stop pipeline).

Example: Successful Step

# Email composer successfully generates email
return StepResult(
    success=True,
    step_name="email_composer",
    metadata={
        "email_id": str(email_id),
        "word_count": len(email_text.split()),
        "duration": 3.1
    }
)

Example: Success with Warnings

# Web scraper succeeds but some URLs failed
return StepResult(
    success=True,
    step_name="web_scraper",
    warnings=["2 out of 5 URLs failed to scrape"],
    metadata={
        "citation_count": 3,
        "failed_urls": ["https://example.com/timeout"]
    }
)

Example: Fatal Failure

# Template parser failed - cannot continue
return StepResult(
    success=False,
    step_name="template_parser",
    error="Failed to classify template type: LLM timeout"
)
If success=False, the error field must be populated. The StepResult.__post_init__ method validates this:
def __post_init__(self):
    """Validation: if success=False, error must be set"""
    if not self.success and not self.error:
        raise ValueError("StepResult with success=False must have error message")
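A minimal sketch of this guard in action, using a stripped-down local stand-in for the pipeline's StepResult:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepResult:
    """Stripped-down stand-in for the pipeline's StepResult."""
    success: bool
    step_name: str
    error: Optional[str] = None

    def __post_init__(self):
        # Same invariant as the pipeline: failures must carry a message
        if not self.success and not self.error:
            raise ValueError("StepResult with success=False must have error message")


# A failure without an error message is rejected at construction time
try:
    StepResult(success=False, step_name="template_parser")
except ValueError as e:
    print(e)  # StepResult with success=False must have error message
```

Because the check runs in `__post_init__`, an invalid result can never be constructed, so downstream code in the runner can trust that `error` is populated whenever `success` is False.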

Error Categories

Fatal Errors (Pipeline Stops)

Fatal errors indicate the pipeline cannot continue and should immediately terminate. When to use:
  • Template parser fails (no search terms to proceed)
  • Email composer database write fails
  • Missing required input fields
  • Invalid user authentication
Implementation in runner.py:247-254:
# Execute step
result = await step.execute(pipeline_data, progress_callback)

# Check for failure
if not result.success:
    raise StepExecutionError(
        step.step_name,
        Exception(result.error or "Unknown error")
    )

Non-Fatal Errors (Pipeline Continues)

Non-fatal errors are logged but don’t stop the pipeline. The step returns success=True with warnings. When to use:
  • Some web scraping URLs timeout (use successful ones)
  • ArXiv API unavailable (skip academic papers)
  • Email validation warnings (still persist email)
  • JSON parsing fallback (use plain text)
Example from email_composer/main.py:86-99:
try:
    parsed = json.loads(response_text)
    email_text = parsed["email"]
    is_confident = parsed.get("is_confident", False)
except (json.JSONDecodeError, KeyError) as e:
    logfire.warning(
        "Failed to parse JSON response, falling back to plain text",
        error=str(e),
        response_preview=response_text[:200]
    )
    # Fallback: treat entire response as email text
    email_text = response_text
    is_confident = False  # Continue with degraded confidence
Non-fatal errors should still be logged to Logfire for monitoring and debugging. Use logfire.warning() to track degraded service patterns.

Custom Exceptions

The pipeline defines a hierarchy of custom exceptions so that callers (including Celery's retry logic) can distinguish retriable failures from fatal ones.

Exception Hierarchy

class PipelineExecutionError(Exception):
    """
    Base exception for pipeline execution failures.
    
    All step-specific exceptions inherit from this.
    Celery can catch this for retry logic.
    """
    pass


class StepExecutionError(PipelineExecutionError):
    """
    Raised when a pipeline step fails.
    
    Attributes:
        step_name: Name of the failed step
        original_error: The underlying exception
    """
    
    def __init__(self, step_name: str, original_error: Exception):
        self.step_name = step_name
        self.original_error = original_error
        # Embed step_name in message for Celery serialization
        error_message = f"Step '{step_name}' failed: {str(original_error)}"
        super().__init__(error_message)


class ValidationError(PipelineExecutionError):
    """
    Raised when step input/output validation fails.
    
    Example: Required field missing from previous step
    """
    pass


class ExternalAPIError(PipelineExecutionError):
    """
    Raised when external API calls fail (Anthropic, Exa, ArXiv).
    
    This is a retriable error - Celery should retry with exponential backoff.
    """
    pass
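The hierarchy makes retriability a type-level property: a caller can branch on the exception class instead of parsing messages. A hedged sketch (the `is_retriable` helper is illustrative, not part of the pipeline):

```python
class PipelineExecutionError(Exception):
    """Base exception for pipeline execution failures (as defined above)."""


class ExternalAPIError(PipelineExecutionError):
    """External API failure - the retriable case."""


class ValidationError(PipelineExecutionError):
    """Input/output validation failure - retrying would fail again."""


def is_retriable(exc: Exception) -> bool:
    """Hypothetical helper: only external API failures warrant a retry."""
    return isinstance(exc, ExternalAPIError)


print(is_retriable(ExternalAPIError("Exa search timed out")))  # True
print(is_retriable(ValidationError("user_id is missing")))     # False
```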

Using Custom Exceptions

In step implementation (web_scraper/main.py:124-132):
try:
    result = await self.exa_client.dual_answer(
        background_query=background_query,
        publications_query=publications_query,
        timeout=45.0
    )
except TimeoutError as e:
    logfire.error("Exa timeout", error=str(e), recipient=pipeline_data.recipient_name)
    raise ExternalAPIError(f"Exa search timed out: {e}")
except ConnectionError as e:
    logfire.error("Exa connection error", error=str(e))
    raise ExternalAPIError(f"Failed to connect to Exa API: {e}")
except Exception as e:
    logfire.error("Exa failed", error_type=type(e).__name__, error=str(e))
    raise ExternalAPIError(f"Exa search failed ({type(e).__name__}): {e}")
In BasePipelineStep (runner.py:114-136):
except Exception as e:
    # Calculate duration even on failure
    duration = time.perf_counter() - start_time
    
    # Log error with full context
    logfire.error(
        f"{self.step_name} failed",
        task_id=pipeline_data.task_id,
        error=str(e),
        error_type=type(e).__name__,
        duration=duration,
        exc_info=True  # Include stack trace
    )
    
    # Record error in pipeline data
    pipeline_data.add_error(self.step_name, str(e))
    
    # Wrap exception for clarity
    raise StepExecutionError(self.step_name, e) from e

Error Propagation

Errors propagate through multiple layers: Step → Runner → Celery Task → Queue System.

Layer 1: Step Execution

In BasePipelineStep.execute() (runner.py:66-136):
with logfire.span(
    f"pipeline.{self.step_name}",
    task_id=pipeline_data.task_id,
    step=self.step_name
):
    try:
        # Execute the step-specific logic
        result = await self._execute_step(pipeline_data)
        
        # Log success
        logfire.info(
            f"{self.step_name} completed",
            task_id=pipeline_data.task_id,
            success=result.success
        )
        
        return result
        
    except Exception as e:
        # Log error with context
        logfire.error(
            f"{self.step_name} failed",
            error=str(e),
            error_type=type(e).__name__,
            exc_info=True
        )
        
        # Record in pipeline data for debugging
        pipeline_data.add_error(self.step_name, str(e))
        
        # Wrap and re-raise
        raise StepExecutionError(self.step_name, e) from e

Layer 2: Pipeline Runner

In PipelineRunner.run() (runner.py:223-274):
with logfire.span(
    "pipeline.full_run",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id
):
    # Execute each step sequentially
    for i, step in enumerate(self.steps):
        # Execute step
        result = await step.execute(pipeline_data, progress_callback)
        
        # Check for failure
        if not result.success:
            raise StepExecutionError(
                step.step_name,
                Exception(result.error or "Unknown error")
            )
    
    # Verify email_id was set
    email_id = pipeline_data.metadata.get("email_id")
    if not email_id:
        raise ValueError(
            "Pipeline completed but email_id not set. "
            "EmailComposer step must set pipeline_data.metadata['email_id']"
        )
    
    return email_id

Layer 3: Celery Task

In generate_email_task() (tasks/email_tasks.py:238-281):
try:
    email_id = asyncio.run(_execute_pipeline())
except (StepExecutionError, PipelineExecutionError) as exc:
    failed_step = getattr(exc, "step_name", None)
    error_message = str(exc)
    
    is_timeout = "timed out" in error_message.lower()
    is_final_attempt = self.request.retries >= self.max_retries
    
    # Retry logic for timeouts
    if is_timeout and not is_final_attempt:
        logfire.warning(
            "Pipeline timed out, scheduling retry",
            task_id=public_task_id,
            failed_step=failed_step,
            attempt=self.request.retries + 1
        )
        reset_queue_item_for_retry()
        raise self.retry(exc=exc)
    
    # Final failure
    logfire.error(
        "Pipeline execution failed",
        error=error_message,
        failed_step=failed_step
    )
    
    # Update queue status
    update_queue_status(QueueStatus.FAILED, error_message=error_message)
    
    # Prevent Celery from overwriting FAILURE state
    raise Ignore()

Layer 4: Queue System

Error truncation for database storage (email_tasks.py:100-107):
if error_message:
    # Truncate error message to 1000 characters to prevent database bloat
    # Stack traces can be very large and degrade query performance
    MAX_ERROR_LENGTH = 1000
    if len(error_message) > MAX_ERROR_LENGTH:
        item.error_message = error_message[:MAX_ERROR_LENGTH] + "... [truncated]"
    else:
        item.error_message = error_message
Error messages stored in the database are truncated to 1000 characters. Full stack traces are available in Logfire for debugging.

Retry Strategies

Celery Task Retries

Configuration in email_tasks.py:30:
@celery_app.task(bind=True, max_retries=1)
def generate_email_task(self, *, queue_item_id: Optional[str] = None, ...):
    # Task implementation
Retry conditions:
  • External API timeouts (Anthropic, Exa, ArXiv)
  • Database connection errors
  • Network failures
Non-retriable errors:
  • Invalid input data (would fail again)
  • User not found in database
  • Validation errors
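The retry decision in the Layer 3 task above reduces to two checks: does the error look like a timeout, and are attempts left? A self-contained sketch of that logic (the function name `should_retry` is illustrative, not from the codebase):

```python
def should_retry(error_message: str, retries_so_far: int, max_retries: int) -> bool:
    """Mirror of the Layer 3 decision: retry only timeouts, and only
    while attempts remain (same checks as generate_email_task)."""
    is_timeout = "timed out" in error_message.lower()
    is_final_attempt = retries_so_far >= max_retries
    return is_timeout and not is_final_attempt


print(should_retry("Step 'web_scraper' failed: Exa search timed out", 0, 1))  # True
print(should_retry("Step 'web_scraper' failed: Exa search timed out", 1, 1))  # False
print(should_retry("Validation failed: missing required field", 0, 1))        # False
```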

LLM Agent Retries

Built into agent creation (email_composer/main.py:30-38):
self.composition_agent = create_agent(
    model=self.model,
    system_prompt=SYSTEM_PROMPT,
    temperature=self.temperature,
    max_tokens=self.max_tokens,
    retries=3,  # 3 automatic retries for LLM calls
    timeout=90.0  # 90 second timeout per attempt
)

Exa Search Timeout

Per-request timeout in web_scraper/main.py:64-68:
result = await self.exa_client.dual_answer(
    background_query=background_query,
    publications_query=publications_query,
    timeout=45.0  # 45 second timeout
)

Error Recording in PipelineData

Non-fatal errors are tracked in the PipelineData.errors list for debugging and monitoring. Helper method in core.py:148-150:
def add_error(self, step_name: str, error_message: str) -> None:
    """Record non-fatal error"""
    self.errors.append(f"{step_name}: {error_message}")
Usage example:
# In step implementation
try:
    await scrape_url(url)
except TimeoutError as e:
    # Record but continue
    pipeline_data.add_error("web_scraper", f"URL timeout: {url}")
    logfire.warning("URL scraping timeout", url=url, error=str(e))
    # Continue with other URLs
Final logging in runner.py:266-272:
logfire.info(
    "Pipeline execution completed",
    task_id=pipeline_data.task_id,
    email_id=email_id,
    total_duration=total_duration,
    step_timings=pipeline_data.step_timings,
    errors=pipeline_data.errors  # Include all non-fatal errors
)

Best Practices

1. Always Return StepResult

# ✅ Good: Explicit success/failure
if validation_failed:
    return StepResult(
        success=False,
        step_name=self.step_name,
        error="Validation failed: missing required field"
    )

return StepResult(
    success=True,
    step_name=self.step_name,
    metadata={"items_processed": count}
)
# ❌ Bad: Raising exception for non-fatal issues
if some_urls_failed:
    raise Exception("Some URLs failed")  # Stops pipeline unnecessarily

2. Use Appropriate Exception Types

# ✅ Good: Use ExternalAPIError for retriable failures
try:
    response = await external_api_call()
except TimeoutError as e:
    raise ExternalAPIError(f"API timeout: {e}")  # Celery can retry
# ❌ Bad: Generic exception loses retry information
try:
    response = await external_api_call()
except TimeoutError as e:
    raise Exception(f"API failed: {e}")  # Celery doesn't know if retriable

3. Log Errors with Context

# ✅ Good: Rich context for debugging
logfire.error(
    "Email composer failed",
    task_id=pipeline_data.task_id,
    user_id=pipeline_data.user_id,
    error=str(e),
    error_type=type(e).__name__,
    recipient=pipeline_data.recipient_name,
    exc_info=True  # Include stack trace
)
# ❌ Bad: Missing context
logfire.error("Failed", error=str(e))

4. Validate Input Early

# ✅ Good: Fail fast with clear error
async def _validate_input(self, pipeline_data: PipelineData) -> Optional[str]:
    if not pipeline_data.scraped_content:
        return "scraped_content is missing (Step 2 must run first)"
    
    if not pipeline_data.user_id:
        return "user_id is missing (required for database write)"
    
    return None
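A hedged sketch of the call-site pattern this enables: run validation first and convert any returned message straight into a failing StepResult before doing any work. The surrounding step class is simplified for illustration; only the `_validate_input` body matches the snippet above.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepResult:
    """Simplified stand-in for the pipeline's StepResult."""
    success: bool
    step_name: str
    error: Optional[str] = None


class ExampleStep:
    step_name = "email_composer"

    async def _validate_input(self, pipeline_data: dict) -> Optional[str]:
        # Fail fast with a clear error, as in the snippet above
        if not pipeline_data.get("scraped_content"):
            return "scraped_content is missing (Step 2 must run first)"
        if not pipeline_data.get("user_id"):
            return "user_id is missing (required for database write)"
        return None

    async def execute(self, pipeline_data: dict) -> StepResult:
        # A validation message becomes a failing StepResult before any work runs
        error = await self._validate_input(pipeline_data)
        if error:
            return StepResult(success=False, step_name=self.step_name, error=error)
        return StepResult(success=True, step_name=self.step_name)


result = asyncio.run(ExampleStep().execute({"user_id": "u1"}))
print(result.error)  # scraped_content is missing (Step 2 must run first)
```

Returning an `Optional[str]` rather than raising keeps validation failures on the StepResult path, so they surface as clean pipeline errors instead of wrapped exceptions.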

5. Track Both Fatal and Non-Fatal Errors

# ✅ Good: Distinguish error severities
warnings = []
for url in urls:
    try:
        content = await scrape(url)
        results.append(content)
    except TimeoutError:
        warnings.append(f"URL timeout: {url}")
        pipeline_data.add_error("web_scraper", f"Timeout: {url}")
        # Continue with other URLs

if not results:
    # No successful scrapes - fatal
    return StepResult(
        success=False,
        step_name=self.step_name,
        error="All scraping attempts failed"
    )

return StepResult(
    success=True,
    step_name=self.step_name,
    warnings=warnings  # Non-fatal issues logged
)
