Overview

Transmitters are responsible for delivering messages to their final destination. The current implementation supports HTTP webhooks with configurable headers, TLS settings, and retry handling.

Transmitter Interface

All transmitters implement this simple interface:
type Transmitter interface {
    Tx(io.Reader, transmitter.TransmitAttributes) error
}
Parameters:
  • io.Reader: The message body
  • transmitter.TransmitAttributes: Key-value metadata (converted to headers)
Return:
  • error: Standard error for non-retryable failures, or *TransmitRetryableError for retryable failures
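A caller can tell the two failure kinds apart with errors.As. The following is a minimal sketch, not Carrier's own receiver code: TransmitRetryableError is redeclared locally as a stand-in, its Error method is an assumed shape, and retryDelay is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// TransmitRetryableError is a local stand-in for the transmitter package's
// type; only the two fields are taken from the documentation.
type TransmitRetryableError struct {
	Err        error
	RetryAfter time.Duration
}

// Error makes the stand-in satisfy the error interface (assumed shape).
func (e *TransmitRetryableError) Error() string {
	return fmt.Sprintf("retry after %s: %v", e.RetryAfter, e.Err)
}

// retryDelay (hypothetical helper) reports whether err is retryable and,
// if so, how long the caller should wait before trying again.
func retryDelay(err error) (time.Duration, bool) {
	var retryable *TransmitRetryableError
	if errors.As(err, &retryable) {
		return retryable.RetryAfter, true
	}
	return 0, false
}

func main() {
	var err error = &TransmitRetryableError{Err: errors.New("429"), RetryAfter: 30 * time.Second}
	if d, ok := retryDelay(err); ok {
		fmt.Println("retry in", d)
	}
	if _, ok := retryDelay(errors.New("500")); !ok {
		fmt.Println("not retryable")
	}
}
```

Because errors.As walks the wrap chain, the check keeps working if a retryable error is wrapped with additional context further up the call stack.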

Webhook Transmitter

Configuration

From transmitter/webhook/transmitter.go:38-44:
type TransmitterConfig struct {
    Endpoint              string        // Target webhook URL
    TLSInsecureSkipVerify bool          // Skip TLS verification (dev only)
    DefaultContentType    string        // Fallback Content-Type
    RequestTimeout        time.Duration // HTTP client timeout
}

HTTP Client Setup

The transmitter creates a configured HTTP client with TLS settings:
func NewTransmitter(c *TransmitterConfig) *Transmitter {
    var idleConnTimeout time.Duration
    if c.RequestTimeout > 0 {
        idleConnTimeout = c.RequestTimeout + (30 * time.Second)
    }
    return &Transmitter{
        endpoint: c.Endpoint,
        client: &http.Client{
            Timeout: c.RequestTimeout,
            Transport: &http.Transport{
                IdleConnTimeout: idleConnTimeout,
                TLSClientConfig: &tls.Config{
                    InsecureSkipVerify: c.TLSInsecureSkipVerify,
                    MinVersion:         tls.VersionTLS13,
                },
            },
        },
        defaultContentType: c.DefaultContentType,
    }
}
Security: TLSInsecureSkipVerify should only be used in development. Production deployments should always verify TLS certificates.

TLS Configuration

Carrier enforces strong TLS defaults:
TLSClientConfig: &tls.Config{
    InsecureSkipVerify: c.TLSInsecureSkipVerify,
    MinVersion:         tls.VersionTLS13,  // Requires TLS 1.3+
}
Features:
  • Minimum TLS 1.3 for all connections
  • Configurable certificate verification
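  • Connection reuse: idle connections are kept open for RequestTimeout plus 30 seconds (IdleConnTimeout); if RequestTimeout is 0, no idle timeout is set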
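To see what the TLS 1.3 minimum means in practice, the sketch below points a client with the same TLS settings at a local test server whose maximum version is capped. newClient and canConnect are hypothetical helpers written for this illustration; certificate verification is skipped only because the httptest server presents a certificate the client does not trust.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// newClient is a hypothetical helper mirroring the TLS settings shown above.
func newClient(timeout time.Duration, skipVerify bool) *http.Client {
	return &http.Client{
		Timeout: timeout,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: skipVerify,
				MinVersion:         tls.VersionTLS13,
			},
		},
	}
}

// canConnect starts a local TLS test server capped at serverMaxVersion and
// reports whether the client completes a request against it.
func canConnect(serverMaxVersion uint16) bool {
	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	srv.TLS = &tls.Config{MaxVersion: serverMaxVersion}
	srv.StartTLS()
	defer srv.Close()

	// skipVerify is true only because the test server's certificate is untrusted.
	res, err := newClient(5*time.Second, true).Get(srv.URL)
	if err != nil {
		return false
	}
	res.Body.Close()
	return true
}

func main() {
	fmt.Println(canConnect(tls.VersionTLS12)) // false: the handshake is refused
	fmt.Println(canConnect(tls.VersionTLS13)) // true
}
```

A webhook endpoint that only offers TLS 1.2 or older will therefore fail at the handshake, before any request is sent.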

HTTP POST Implementation

Request Creation

From transmitter/webhook/transmitter.go:76-95:
func (t *Transmitter) newRequest(message io.Reader, attributes transmitter.TransmitAttributes) (*http.Request, error) {
    req, err := http.NewRequest(http.MethodPost, t.endpoint, message)
    if err != nil {
        return req, err
    }
    for k, v := range attributes {
        if k == HeaderContentType {
            // send Content-Type header unmodified
            req.Header.Add(k, v)
        } else {
            req.Header.Add(fmt.Sprintf("%s%s", HeaderPrefix, k), v)
        }
    }
    if req.Header.Get(HeaderContentType) == "" && t.defaultContentType != "" {
        // add the default Content-Type header
        req.Header.Add(HeaderContentType, t.defaultContentType)
    }
    return req, err
}
Request properties:
  • Method: POST
  • Body: Raw message from queue
  • Headers: Generated from transmit attributes

Header Handling

X-Carrier-* Headers

Transmit attributes are converted to HTTP headers with the X-Carrier- prefix:
const HeaderPrefix = "X-Carrier-"
Example transformation:
TransmitAttributes{
    "Receive-Count": "1",
    "First-Receive-Time": "1620000000000",
}
Becomes:
X-Carrier-Receive-Count: 1
X-Carrier-First-Receive-Time: 1620000000000
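The transformation can be tried in isolation with a small sketch. prefixedHeaders is a hypothetical helper, and it deliberately ignores the Content-Type special case covered in the next section.

```go
package main

import (
	"fmt"
	"net/http"
)

const headerPrefix = "X-Carrier-" // same value as HeaderPrefix above

// prefixedHeaders (hypothetical helper) converts attributes into prefixed
// HTTP headers the same way the loop in newRequest does for ordinary keys.
func prefixedHeaders(attributes map[string]string) http.Header {
	h := http.Header{}
	for k, v := range attributes {
		h.Add(headerPrefix+k, v)
	}
	return h
}

func main() {
	h := prefixedHeaders(map[string]string{
		"Receive-Count":      "1",
		"first-receive-time": "1620000000000",
	})
	fmt.Println(h.Get("X-Carrier-Receive-Count"))      // 1
	fmt.Println(h.Get("X-Carrier-First-Receive-Time")) // 1620000000000
}
```

http.Header.Add canonicalizes the key, so an attribute named first-receive-time still ends up as X-Carrier-First-Receive-Time.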

Content-Type Handling

Content-Type receives special treatment:
if k == HeaderContentType {
    // send Content-Type header unmodified
    req.Header.Add(k, v)
} else {
    req.Header.Add(fmt.Sprintf("%s%s", HeaderPrefix, k), v)
}
Fallback behavior:
if req.Header.Get(HeaderContentType) == "" && t.defaultContentType != "" {
    req.Header.Add(HeaderContentType, t.defaultContentType)
}
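If the SQS message has a Body.ContentType attribute, its value is sent as the Content-Type header. Otherwise the configured default (application/json unless WEBHOOK_DEFAULT_CONTENT_TYPE overrides it) is applied; if the default is empty, no Content-Type header is added.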
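Both rules can be exercised together in a short sketch. buildHeaders is a hypothetical helper, and the value "Content-Type" for HeaderContentType is an assumption, since the constant's definition is not shown here.

```go
package main

import (
	"fmt"
	"net/http"
)

const (
	headerContentType = "Content-Type" // assumed value of HeaderContentType
	headerPrefix      = "X-Carrier-"
)

// buildHeaders (hypothetical helper) passes Content-Type through unmodified,
// prefixes every other attribute, and applies the fallback when no
// Content-Type was set.
func buildHeaders(attributes map[string]string, fallback string) http.Header {
	h := http.Header{}
	for k, v := range attributes {
		if k == headerContentType {
			h.Add(k, v)
		} else {
			h.Add(headerPrefix+k, v)
		}
	}
	if h.Get(headerContentType) == "" && fallback != "" {
		h.Add(headerContentType, fallback)
	}
	return h
}

func main() {
	withType := buildHeaders(map[string]string{"Content-Type": "text/plain"}, "application/json")
	fmt.Println(withType.Get("Content-Type")) // text/plain

	withoutType := buildHeaders(map[string]string{"Receive-Count": "1"}, "application/json")
	fmt.Println(withoutType.Get("Content-Type")) // application/json

	// The key comparison is case-sensitive, so a lowercase key is prefixed
	// like any other attribute and the fallback applies.
	lower := buildHeaders(map[string]string{"content-type": "text/plain"}, "application/json")
	fmt.Println(lower.Get("Content-Type"), lower.Get("X-Carrier-Content-Type")) // application/json text/plain
}
```

The last case follows from the exact string comparison in the loop: only an attribute key spelled exactly like the constant is treated as the content type.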

Transmission Logic

From transmitter/webhook/transmitter.go:100-133:
func (t *Transmitter) Tx(message io.Reader, attributes transmitter.TransmitAttributes) error {
    req, err := t.newRequest(message, attributes)
    if err != nil {
        return fmt.Errorf("%w: failed to create request: %w", transmitter.ErrTransmitFailed, err)
    }
    res, err := t.client.Do(req)
    if res != nil && res.Body != nil {
        defer res.Body.Close()
    }
    if err != nil {
        return fmt.Errorf("%w: failed to send request: %w", transmitter.ErrTransmitFailed, err)
    }
    switch res.StatusCode {
    case http.StatusOK:
        return nil
    case http.StatusTooManyRequests:
        // Handle 429 with Retry-After
        retryAfter := res.Header.Get(HeaderRetryAfter)
        if retryAfter != "" {
            seconds, err := strconv.Atoi(retryAfter)
            if err != nil {
                return fmt.Errorf("%w: %w: %w", transmitter.ErrTransmitFailed, ErrStatusCode429, err)
            }
            return transmitter.NewTransmitRetryableError(ErrStatusCode429, time.Duration(seconds*int(time.Second)))
        }
        return fmt.Errorf("%w: %w: %w", transmitter.ErrTransmitFailed, ErrStatusCode429, ErrNoRetryAfterHeader)
    default:
        return fmt.Errorf("%w: %w: %d", transmitter.ErrTransmitFailed, ErrNon200StatusCode, res.StatusCode)
    }
}

Status Code Handling

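| Status Code | Behavior |
|---|---|
| 200 OK | Success - message deleted from queue |
| 429 Too Many Requests with an integer Retry-After header | Retryable - visibility timeout updated based on Retry-After |
| 429 Too Many Requests with a missing or non-integer Retry-After header | Non-retryable - message stays in queue, error logged |
| Any other status, including other 2xx codes | Non-retryable - message stays in queue, error logged |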
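The decision logic can be reduced to a pure function for experimentation. This is a sketch under stated assumptions: classify and the outcome type are hypothetical names, and the delay is computed as time.Duration(seconds) * time.Second, which avoids overflowing int on 32-bit platforms.

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// outcome is a hypothetical label for the three cases in the table above.
type outcome string

const (
	delivered  outcome = "delivered"
	retryLater outcome = "retry later"
	failed     outcome = "failed"
)

// classify maps a status code and Retry-After value to an outcome and delay.
func classify(status int, retryAfter string) (outcome, time.Duration) {
	switch status {
	case http.StatusOK:
		return delivered, 0
	case http.StatusTooManyRequests:
		seconds, err := strconv.Atoi(retryAfter)
		if err != nil {
			// Covers both a missing header and an HTTP-date value.
			return failed, 0
		}
		return retryLater, time.Duration(seconds) * time.Second
	default:
		return failed, 0
	}
}

func main() {
	fmt.Println(classify(http.StatusOK, ""))                 // delivered 0s
	fmt.Println(classify(http.StatusTooManyRequests, "120")) // retry later 2m0s
	fmt.Println(classify(http.StatusTooManyRequests, ""))    // failed 0s
	fmt.Println(classify(http.StatusNoContent, ""))          // failed 0s
}
```

Webhook endpoints that want a message retried after a pause should therefore answer 429 with Retry-After given in whole seconds; the HTTP-date form of the header is not parsed by strconv.Atoi.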

Error Handling

Retryable Errors

From transmitter/error.go:14-20:
type TransmitRetryableError struct {
    Err        error
    RetryAfter time.Duration
}
Usage:
return transmitter.NewTransmitRetryableError(
    ErrStatusCode429,
    time.Duration(seconds*int(time.Second)),
)
When a retryable error is returned, the receiver updates the SQS message visibility timeout:
VisibilityTimeout: int32(err.RetryAfter.Seconds())
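SQS accepts visibility timeouts between 0 seconds and 12 hours, so a receiver may want to bound the value before sending it. The sketch below shows one way to do that; visibilityTimeout is a hypothetical helper, and whether Carrier clamps the value is not confirmed by the code shown here.

```go
package main

import (
	"fmt"
	"time"
)

// visibilityTimeout (hypothetical helper) converts a retry delay to whole
// seconds, clamped to the 0s..12h range that SQS accepts.
func visibilityTimeout(retryAfter time.Duration) int32 {
	const maxVisibility = 12 * time.Hour
	if retryAfter < 0 {
		retryAfter = 0
	}
	if retryAfter > maxVisibility {
		retryAfter = maxVisibility
	}
	return int32(retryAfter.Seconds())
}

func main() {
	fmt.Println(visibilityTimeout(90 * time.Second)) // 90
	fmt.Println(visibilityTimeout(24 * time.Hour))   // 43200
	fmt.Println(visibilityTimeout(-5 * time.Second)) // 0
}
```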

Non-Retryable Errors

Non-retryable errors wrap transmitter.ErrTransmitFailed:
return fmt.Errorf("%w: %w: %d",
    transmitter.ErrTransmitFailed,
    ErrNon200StatusCode,
    res.StatusCode,
)
Messages with non-retryable errors remain in the queue until they:
  • Are successfully transmitted on a retry
  • Reach the maximum receive count (then moved to dead-letter queue)
  • Expire based on message retention settings
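Because the error is built with two %w verbs (supported since Go 1.20), errors.Is matches either sentinel. A minimal sketch with locally declared stand-ins follows; the sentinel messages and the statusError and isTransmitFailed helpers are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the sentinel errors named above; the messages are assumed.
var (
	ErrTransmitFailed   = errors.New("transmit failed")
	ErrNon200StatusCode = errors.New("non-200 status code")
)

// statusError wraps both sentinels, mirroring the format string above.
func statusError(code int) error {
	return fmt.Errorf("%w: %w: %d", ErrTransmitFailed, ErrNon200StatusCode, code)
}

// isTransmitFailed reports whether err wraps ErrTransmitFailed.
func isTransmitFailed(err error) bool {
	return errors.Is(err, ErrTransmitFailed)
}

func main() {
	err := statusError(503)
	fmt.Println(err)                                 // transmit failed: non-200 status code: 503
	fmt.Println(isTransmitFailed(err))               // true
	fmt.Println(errors.Is(err, ErrNon200StatusCode)) // true
}
```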

TransmitAttributes Type

From transmitter/attribute.go:3-7:
type TransmitAttributes map[string]string
A simple string map that carries metadata from the receiver to the transmitter. The webhook transmitter converts these to HTTP headers.

Configuration Reference

| Environment Variable | Description | Default |
|---|---|---|
| WEBHOOK_ENDPOINT | Target webhook URL | http://localhost:9000 |
| WEBHOOK_DEFAULT_CONTENT_TYPE | Default Content-Type header | application/json |
| WEBHOOK_REQUEST_TIMEOUT | HTTP request timeout | 60s |
| WEBHOOK_TLS_INSECURE_SKIP_VERIFY | Skip TLS verification | false |
| WEBHOOK_HEALTH_CHECK_ENDPOINT | Health check path | (none) |
| WEBHOOK_HEALTH_CHECK_INTERVAL | Health check frequency | 60s |
| WEBHOOK_HEALTH_CHECK_TIMEOUT | Health check timeout | 10s |
| WEBHOOK_OFFLINE_THRESHOLD_COUNT | Failed checks before offline | 5 |
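The sketch below shows one way these variables could be read into a config struct with the documented defaults. It is not Carrier's own loader: webhookConfig and loadConfig are hypothetical names, only the four transmitter settings are covered, and it assumes durations use Go's duration syntax (as the 60s default suggests).

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// webhookConfig is a hypothetical struct holding the transmitter settings.
type webhookConfig struct {
	Endpoint              string
	DefaultContentType    string
	RequestTimeout        time.Duration
	TLSInsecureSkipVerify bool
}

// loadConfig reads settings through lookup (os.Getenv in production) and
// falls back to the documented default when a variable is unset or empty.
func loadConfig(lookup func(string) string) (webhookConfig, error) {
	get := func(key, fallback string) string {
		if v := lookup(key); v != "" {
			return v
		}
		return fallback
	}
	timeout, err := time.ParseDuration(get("WEBHOOK_REQUEST_TIMEOUT", "60s"))
	if err != nil {
		return webhookConfig{}, fmt.Errorf("WEBHOOK_REQUEST_TIMEOUT: %w", err)
	}
	skipVerify, err := strconv.ParseBool(get("WEBHOOK_TLS_INSECURE_SKIP_VERIFY", "false"))
	if err != nil {
		return webhookConfig{}, fmt.Errorf("WEBHOOK_TLS_INSECURE_SKIP_VERIFY: %w", err)
	}
	return webhookConfig{
		Endpoint:              get("WEBHOOK_ENDPOINT", "http://localhost:9000"),
		DefaultContentType:    get("WEBHOOK_DEFAULT_CONTENT_TYPE", "application/json"),
		RequestTimeout:        timeout,
		TLSInsecureSkipVerify: skipVerify,
	}, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing the lookup function as a parameter keeps the loader testable without touching the process environment.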

Example Webhook Request

Given an SQS message with:
  • Body: {"event": "user.created"}
  • Attribute: Body.ContentType = application/json
  • Receive count: 1
Carrier will send:
POST /webhook HTTP/1.1
Host: example.com
Content-Type: application/json
X-Carrier-Receive-Count: 1
X-Carrier-First-Receive-Time: 1620000000000

{"event": "user.created"}
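On the receiving side, a handler for this request might look like the sketch below. handleWebhook and deliver are hypothetical names; the only behavior taken from this page is that a 200 OK response marks the message as delivered.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// handleWebhook reads the Carrier metadata header and the raw body, then
// answers 200 OK so the message is treated as delivered.
func handleWebhook(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	receiveCount := r.Header.Get("X-Carrier-Receive-Count")
	fmt.Printf("receive count %s, body %s\n", receiveCount, body)
	w.WriteHeader(http.StatusOK)
}

// deliver sends a request shaped like the example above to the handler
// in-process and returns the response status code.
func deliver(body, receiveCount string) int {
	req := httptest.NewRequest(http.MethodPost, "/webhook", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Carrier-Receive-Count", receiveCount)
	rec := httptest.NewRecorder()
	handleWebhook(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(deliver(`{"event": "user.created"}`, "1")) // 200
}
```

Since any status other than 200 leaves the message in the queue, a handler like this should reply 200 only after it has finished processing the body.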

Extending Transmitters

To create a new transmitter type (e.g., gRPC, SNS):
  1. Create a new package (e.g., transmitter/grpc)
  2. Implement the interface:
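    // Needs: context, fmt, io, and google.golang.org/grpc/metadata.
    func (t *GRPCTransmitter) Tx(message io.Reader, attributes transmitter.TransmitAttributes) error {
        // Read the full message body
        body, err := io.ReadAll(message)
        if err != nil {
            return fmt.Errorf("%w: failed to read message: %w", transmitter.ErrTransmitFailed, err)
        }

        // Convert attributes to gRPC metadata
        md := metadata.New(attributes)

        // Send via gRPC; t.client and its Send method are placeholders
        ctx := context.Background()
        return t.client.Send(ctx, body, md)
    }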
    
  3. Update main.go to use the new transmitter
The Transmitter interface is intentionally simple to make it easy to add new destination types.
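As a smaller, self-contained illustration of the same steps, the sketch below implements the interface with a transmitter that writes to an io.Writer. The interface and TransmitAttributes are redeclared locally so the example compiles on its own; WriterTransmitter and transmitToString are hypothetical names, not part of Carrier.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Local copies of the types described above, so the sketch is self-contained.
type TransmitAttributes map[string]string

type Transmitter interface {
	Tx(io.Reader, TransmitAttributes) error
}

// WriterTransmitter is a hypothetical transmitter that writes each message
// to an io.Writer, for example os.Stdout during local debugging.
type WriterTransmitter struct {
	out io.Writer
}

// Compile-time check that the interface is satisfied.
var _ Transmitter = (*WriterTransmitter)(nil)

// Tx reads the whole message and writes it with the attribute count.
func (t *WriterTransmitter) Tx(message io.Reader, attributes TransmitAttributes) error {
	body, err := io.ReadAll(message)
	if err != nil {
		return fmt.Errorf("read message: %w", err)
	}
	_, err = fmt.Fprintf(t.out, "%s (%d attributes)\n", body, len(attributes))
	return err
}

// transmitToString runs one transmission against an in-memory buffer.
func transmitToString(body string, attrs TransmitAttributes) (string, error) {
	var buf bytes.Buffer
	err := (&WriterTransmitter{out: &buf}).Tx(strings.NewReader(body), attrs)
	return buf.String(), err
}

func main() {
	out, err := transmitToString("hello", TransmitAttributes{"Receive-Count": "1"})
	if err != nil {
		fmt.Println("transmit failed:", err)
		return
	}
	fmt.Print(out) // hello (1 attributes)
}
```

A real implementation would also wrap its failures in transmitter.ErrTransmitFailed or return a retryable error, so the receiver can apply the handling described under Error Handling.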
