
Batch Emitter Component

The batch emitter component (internal/emit) delivers discovered data to the ingestion endpoint, combining batching, retries, and on-disk spooling so that transient failures do not lose data.

Overview

The batch emitter aggregates discovered nodes and edges into batches and delivers them to ingestion endpoints. Failed deliveries are retried with exponential backoff, and failed batches are spooled to disk so they can be replayed later.

Core Data Types

Node Types

NodeDomain

Represents a domain entity:

go
type NodeDomain struct {
    Host      string    `json:"host"`       // Full hostname
    Apex      string    `json:"apex"`       // Apex/root domain  
    FirstSeen time.Time `json:"first_seen"` // First observation
    LastSeen  time.Time `json:"last_seen"`  // Last observation
}

NodeIP

Represents an IP address entity:

go
type NodeIP struct {
    IP        string    `json:"ip"`         // IP address
    FirstSeen time.Time `json:"first_seen"` // First observation
    LastSeen  time.Time `json:"last_seen"`  // Last observation
}

NodeCert

Represents a TLS certificate entity:

go
type NodeCert struct {
    SPKI      string    `json:"spki_sha256"` // SHA-256 of Subject Public Key Info
    SubjectCN string    `json:"subject_cn"`  // Certificate subject common name
    IssuerCN  string    `json:"issuer_cn"`   // Certificate issuer common name
    NotBefore time.Time `json:"not_before"`  // Certificate valid from
    NotAfter  time.Time `json:"not_after"`   // Certificate valid until
}

Edge Type

Edge

Represents relationships between entities:

go
type Edge struct {
    Type       string    `json:"type"`        // Edge type (RESOLVES_TO, LINKS_TO, etc.)
    Source     string    `json:"source"`      // Source entity identifier
    Target     string    `json:"target"`      // Target entity identifier
    ObservedAt time.Time `json:"observed_at"` // Observation timestamp
    ProbeID    string    `json:"probe_id"`    // Probe instance identifier
    RunID      string    `json:"run_id"`      // Probe run identifier
}

Supported Edge Types

  • RESOLVES_TO: Domain → IP address (A/AAAA records)
  • USES_NS: Domain → Nameserver (NS records)
  • ALIAS_OF: Domain → CNAME target
  • USES_MX: Domain → Mail exchanger (MX records)
  • LINKS_TO: Domain → External domains (from HTML links)
  • USES_CERT: Domain → TLS certificate (SPKI hash)
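The six edge types above are plain strings in the Type field of Edge. A minimal sketch of how a caller might name and validate them follows; the constant identifiers and the IsSupportedEdgeType helper are hypothetical and are not taken from internal/emit.

```go
package main

// Edge type names as listed above. The Go identifiers are illustrative
// assumptions; only the string values come from the documentation.
const (
	EdgeResolvesTo = "RESOLVES_TO" // Domain -> IP address (A/AAAA records)
	EdgeUsesNS     = "USES_NS"     // Domain -> nameserver (NS records)
	EdgeAliasOf    = "ALIAS_OF"    // Domain -> CNAME target
	EdgeUsesMX     = "USES_MX"     // Domain -> mail exchanger (MX records)
	EdgeLinksTo    = "LINKS_TO"    // Domain -> external domain (HTML links)
	EdgeUsesCert   = "USES_CERT"   // Domain -> TLS certificate (SPKI hash)
)

// supportedEdgeTypes is a set used for constant-time membership checks.
var supportedEdgeTypes = map[string]struct{}{
	EdgeResolvesTo: {},
	EdgeUsesNS:     {},
	EdgeAliasOf:    {},
	EdgeUsesMX:     {},
	EdgeLinksTo:    {},
	EdgeUsesCert:   {},
}

// IsSupportedEdgeType reports whether t is one of the six documented
// edge types. The comparison is case-sensitive.
func IsSupportedEdgeType(t string) bool {
	_, ok := supportedEdgeTypes[t]
	return ok
}
```

Checking the type before calling AddEdge keeps misspelled edge types out of a batch; whether the emitter itself performs such a check is not stated here.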

Batch Structure

Batch

Container for nodes and edges:

go
type Batch struct {
    ProbeID string       `json:"probe_id"`     // Probe instance identifier
    RunID   string       `json:"run_id"`       // Probe run identifier
    NodesD  []NodeDomain `json:"nodes_domain"` // Domain nodes
    NodesIP []NodeIP     `json:"nodes_ip"`     // IP address nodes
    NodesC  []NodeCert   `json:"nodes_cert"`   // Certificate nodes
    Edges   []Edge       `json:"edges"`        // Relationship edges
}

Emitter Architecture

EmitterConfig Structure

All emitter options are passed through EmitterConfig:

go
type EmitterConfig struct {
    Ingest             string        // Ingestion endpoint URL (empty = stdout)
    ProbeID            string        // Probe identifier stamped on batches
    RunID              string        // Run identifier stamped on batches
    BatchMaxEdges      int           // Max edges per batch before forced flush
    FlushEvery         time.Duration // Time-based flush interval
    NodeThresholdRatio float64       // Flush when nodes exceed this ratio of BatchMaxEdges
    SpoolDir           string        // On-disk spool directory for failed batches
    MTLSCert           string        // Client certificate path (PEM)
    MTLSKey            string        // Client key path (PEM)
    MTLSCA             string        // CA bundle path (PEM)
    RetryMaxElapsed    time.Duration // Max total time for exponential-backoff retries
    StdoutWriter       StdoutWriter  // Optional writer for stdout output formats
    Store              BatchStore    // Optional database store (e.g. MongoDB)
}

Core Functions

NewEmitter(cfg EmitterConfig) (*Emitter, error)

Creates a new batch emitter. Returns an error if mTLS certificates are misconfigured.

Configuration:

  • Ingestion URL (Ingest): HTTPS endpoint for batch delivery (empty = stdout)
  • mTLS Support (MTLSCert, MTLSKey, MTLSCA): Client certificate authentication
  • Spool Directory (SpoolDir): On-disk buffer for failed deliveries
  • Batch Max Edges (BatchMaxEdges): Maximum edges per batch (default: 10,000)
  • Flush Interval (FlushEvery): Time-based batch flushing (default: 2s)
  • Retry Max Elapsed (RetryMaxElapsed): Upper bound on total retry time; retries also stop early if the context is cancelled (default: 30s)
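The documented defaults can be summarised in code. This is a sketch only: it assumes that a zero value means "use the default", which this page does not confirm, and emitterSettings and withDefaults are hypothetical names that mirror just three EmitterConfig fields.

```go
package main

import "time"

// emitterSettings mirrors the EmitterConfig fields that have documented
// defaults. It is a stand-in for illustration, not the real config type.
type emitterSettings struct {
	BatchMaxEdges   int
	FlushEvery      time.Duration
	RetryMaxElapsed time.Duration
}

// withDefaults fills unset (zero or negative) fields with the defaults
// listed on this page: 10,000 edges, 2s flush interval, 30s retry budget.
func withDefaults(s emitterSettings) emitterSettings {
	if s.BatchMaxEdges <= 0 {
		s.BatchMaxEdges = 10000
	}
	if s.FlushEvery <= 0 {
		s.FlushEvery = 2 * time.Second
	}
	if s.RetryMaxElapsed <= 0 {
		s.RetryMaxElapsed = 30 * time.Second
	}
	return s
}
```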

ForceFlush(ctx context.Context, log *zap.SugaredLogger)

Immediately flushes the current accumulator to the configured output (stdout, ingest endpoint, or database store). Used by the Control API to drain buffered data on demand without waiting for the next timed flush.

PendingStats() (edges, nodes int)

Returns the number of edges and nodes currently buffered in the accumulator. Used by the Control API's observability endpoints to expose real-time emitter queue depth.

AddEdge(edge Edge)

Adds an edge to the current batch buffer.

Features:

  • Thread-Safe: Mutex-protected buffer access
  • Automatic Flushing: Triggers flush when batch size reached
  • Deduplication: Prevents duplicate edges in same batch
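The three features above can be sketched with a mutex-guarded buffer. This is not the emitter's implementation: the reduced Edge type, the choice of (Type, Source, Target) as the deduplication key, and the accumulator type with its flush callback are all assumptions made for illustration.

```go
package main

import "sync"

// Edge is reduced to the fields used for deduplication in this sketch.
type Edge struct {
	Type   string
	Source string
	Target string
}

// accumulator buffers unique edges and hands full batches to flush.
type accumulator struct {
	mu       sync.Mutex
	maxEdges int
	seen     map[Edge]struct{} // edges already present in the current batch
	edges    []Edge
	flush    func([]Edge) // receives each full batch, called outside the lock
}

func newAccumulator(maxEdges int, flush func([]Edge)) *accumulator {
	return &accumulator{
		maxEdges: maxEdges,
		seen:     make(map[Edge]struct{}),
		flush:    flush,
	}
}

// AddEdge appends e unless an identical edge is already buffered, and
// triggers a flush once the buffer holds maxEdges edges.
func (a *accumulator) AddEdge(e Edge) {
	a.mu.Lock()
	if _, dup := a.seen[e]; dup {
		a.mu.Unlock()
		return
	}
	a.seen[e] = struct{}{}
	a.edges = append(a.edges, e)

	var full []Edge
	if len(a.edges) >= a.maxEdges {
		// Swap in a fresh buffer so other goroutines can keep adding
		// while the full batch is being delivered.
		full = a.edges
		a.edges = nil
		a.seen = make(map[Edge]struct{})
	}
	a.mu.Unlock()

	if full != nil {
		a.flush(full)
	}
}

// Pending returns the number of buffered edges, in the spirit of PendingStats.
func (a *accumulator) Pending() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return len(a.edges)
}
```

Calling flush after releasing the lock means a slow delivery does not block other goroutines that are adding edges.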

AddNode(node interface{})

Adds nodes (Domain, IP, or Certificate) to the current batch.

Type Detection:

  • NodeDomain: Added to domain node array
  • NodeIP: Added to IP node array
  • NodeCert: Added to certificate node array
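Type detection of this kind is usually written as a Go type switch. The sketch below uses reduced node types and a hypothetical nodeBuffer; returning an error for unknown types is an assumption, since the documented AddNode has no return value.

```go
package main

import "fmt"

// Reduced node types; the full definitions appear under Core Data Types.
type NodeDomain struct{ Host string }
type NodeIP struct{ IP string }
type NodeCert struct{ SPKI string }

// nodeBuffer holds one slice per node kind, like the Batch node arrays.
type nodeBuffer struct {
	domains []NodeDomain
	ips     []NodeIP
	certs   []NodeCert
}

// AddNode routes node to the slice matching its dynamic type.
// Unknown types are rejected rather than silently dropped.
func (b *nodeBuffer) AddNode(node interface{}) error {
	switch n := node.(type) {
	case NodeDomain:
		b.domains = append(b.domains, n)
	case NodeIP:
		b.ips = append(b.ips, n)
	case NodeCert:
		b.certs = append(b.certs, n)
	default:
		return fmt.Errorf("unsupported node type %T", node)
	}
	return nil
}
```

In this sketch only value types match; a *NodeDomain would fall through to the default case unless pointer cases are added.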

Reliability Features

Exponential Backoff Retry

  • Initial Delay: 1 second
  • Maximum Delay: 5 minutes
  • Multiplier: 2x per retry
  • Jitter: Randomization to prevent thundering herd
  • Context-Aware: Retries honour the request context; if the context is cancelled (e.g. on shutdown), the retry loop exits immediately rather than waiting for RetryMaxElapsed.
  • Max Elapsed Time: Configurable via RetryMaxElapsed (default: 30s)

On-Disk Spooling

  • Persistent Storage: Failed batches written to disk
  • Automatic Recovery: Spooled batches replayed on startup
  • Format: JSON files with timestamp prefixes
  • Cleanup: Successful deliveries remove spool files

Circuit Breaker Integration

  • Failure Detection: HTTP 5xx and connection errors
  • Automatic Recovery: Gradual traffic restoration
  • Per-Host Isolation: Independent failure tracking

HTTP Delivery

Request Format

http
POST /v1/batch HTTP/1.1
Host: ingest.example.com
Content-Type: application/json
User-Agent: spyder-probe/1.0

{
  "probe_id": "prod-us-west-1",
  "run_id": "run-20231201-143000",
  "nodes_domain": [...],
  "nodes_ip": [...], 
  "nodes_cert": [...],
  "edges": [...]
}

mTLS Authentication

go
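// Client certificate configuration
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
    log.Fatalf("load client certificate: %v", err)
}
client := &http.Client{
    Transport: &http.Transport{
        TLSClientConfig: &tls.Config{
            Certificates: []tls.Certificate{cert}, // presented to the server during the handshake
        },
    },
}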
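The snippet above presents a client certificate but verifies the server against the system trust store. A fuller sketch follows, assuming the MTLSCA bundle is meant to become the client's root pool; buildTLSConfig is a hypothetical helper and the TLS 1.2 minimum is a suggestion, not a documented setting.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"os"
)

// buildTLSConfig loads the client key pair and, if caPath is non-empty,
// restricts server verification to the CAs in that PEM bundle.
func buildTLSConfig(certPath, keyPath, caPath string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certPath, keyPath)
	if err != nil {
		return nil, err
	}
	cfg := &tls.Config{
		Certificates: []tls.Certificate{cert},
		MinVersion:   tls.VersionTLS12,
	}
	if caPath != "" {
		pemBytes, err := os.ReadFile(caPath)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pemBytes) {
			return nil, errors.New("no certificates found in CA bundle")
		}
		// With RootCAs set, only servers signed by these CAs are trusted.
		cfg.RootCAs = pool
	}
	return cfg, nil
}
```

Failing at construction time when a file is missing or malformed matches the documented behaviour of NewEmitter, which returns an error if mTLS certificates are misconfigured.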

Monitoring and Metrics

Delivery Metrics

  • Batch Success Rate: Percentage of successful deliveries
  • Retry Attempts: Distribution of retry attempts per batch
  • Delivery Latency: Time from buffer to successful delivery
  • Spool Usage: Number and size of spooled batches

Performance Metrics

  • Batch Size Distribution: Actual vs configured batch sizes
  • Flush Triggers: Time-based vs size-based flush ratio
  • Buffer Utilization: Current buffer fill percentage
  • Throughput: Batches and items per second

Error Handling

Delivery Failures

  • Temporary Failures: HTTP 5xx, timeouts, connection errors
  • Permanent Failures: HTTP 4xx (except 429), authentication errors
  • Retry Strategy: Exponential backoff for temporary failures
  • Dead Letter: Permanent failures logged and discarded
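The temporary/permanent split above can be expressed as a small classifier over HTTP status codes. The outcome type and classify function are hypothetical, and treating 3xx responses as permanent is an assumption, since this page does not mention redirects.

```go
package main

import "net/http"

// outcome describes what the emitter should do after a delivery attempt.
type outcome int

const (
	delivered outcome = iota // 2xx: batch accepted
	retryable                // 5xx or 429: retry with exponential backoff
	permanent                // other 4xx: log and discard
)

// classify maps an HTTP status code to an outcome. Timeouts and
// connection errors never produce a status code and are retryable
// by definition, so they are handled outside this function.
func classify(status int) outcome {
	switch {
	case status >= 200 && status < 300:
		return delivered
	case status == http.StatusTooManyRequests, status >= 500:
		return retryable
	default:
		return permanent
	}
}
```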

Spool Management

  • Write Failures: Log error and attempt in-memory retry
  • Recovery Failures: Log error and continue with new batches
  • Disk Full: Fallback to in-memory buffering only

Configuration Examples

Basic Configuration

go
emitter, err := emit.NewEmitter(emit.EmitterConfig{
    Ingest:        "https://ingest.example.com/v1/batch",
    BatchMaxEdges: 1000,
    FlushEvery:    30 * time.Second,
    SpoolDir:      "/var/spool/spyder",
})
if err != nil {
    log.Fatalf("create emitter: %v", err)
}

mTLS Configuration

go
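emitter, err := emit.NewEmitter(emit.EmitterConfig{
    Ingest:        "https://ingest.example.com/v1/batch",
    MTLSCert:      "/etc/spyder/client.crt",
    MTLSKey:       "/etc/spyder/client.key",
    BatchMaxEdges: 500,
    FlushEvery:    60 * time.Second,
})
if err != nil {
    // NewEmitter returns an error if the mTLS certificates are misconfigured
    log.Fatalf("create emitter: %v", err)
}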

High-Throughput Configuration

go
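// EmitterConfig has no concurrency field; for parallel delivery,
// run multiple emitter instances (see Performance Tuning).
emitter, err := emit.NewEmitter(emit.EmitterConfig{
    Ingest:        "https://ingest.example.com/v1/batch",
    BatchMaxEdges: 5000,
    FlushEvery:    10 * time.Second,
    SpoolDir:      "/tmp/spyder-spool",
})
if err != nil {
    log.Fatalf("create emitter: %v", err)
}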

Integration Patterns

Probe Integration

go
func (p *Probe) ProcessDomain(domain string) {
    // Discover relationships
    edges := p.discoverEdges(domain)
    nodes := p.discoverNodes(domain)
    
    // Emit discoveries
    for _, edge := range edges {
        p.emitter.AddEdge(edge)
    }
    for _, node := range nodes {
        p.emitter.AddNode(node)
    }
}

Graceful Shutdown

go
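func (p *Probe) Shutdown(ctx context.Context) {
    // Flush remaining batches. ctx bounds how long delivery may retry;
    // p.log is assumed to be the probe's *zap.SugaredLogger.
    p.emitter.ForceFlush(ctx, p.log)
    
    // Wait for in-flight deliveries
    p.emitter.Wait()
    
    // Close resources
    p.emitter.Close()
}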

Best Practices

Batch Sizing

  • Memory Usage: Larger batches use more memory
  • Network Efficiency: Larger batches reduce HTTP overhead
  • Latency: Smaller batches reduce data freshness delay
  • Recommended: 500-2000 items per batch

Error Recovery

  • Spool Monitoring: Monitor spool directory size and age
  • Retry Limits: Bound total retry time with RetryMaxElapsed so that failing batches move to the spool instead of retrying indefinitely
  • Dead Letter Handling: Implement alerting for permanent failures

Performance Tuning

  • Concurrent Delivery: Use multiple emitter instances for high throughput
  • Batch Optimization: Tune batch size based on network and server capacity
  • Flush Frequency: Balance latency requirements with efficiency

Security Considerations

Data Transmission

  • HTTPS Only: All data transmitted over encrypted connections
  • mTLS Authentication: Mutual TLS for secure ingestion endpoints
  • Certificate Validation: Verify server certificates

Data Integrity

  • JSON Validation: Validate batch structure before transmission
  • Retry Limits: Prevent infinite retry loops
  • Spool Security: Secure file permissions for spooled data