Batch Emitter Component
The batch emitter component (internal/emit) delivers probe data reliably using batching, retries, and on-disk spooling.
Overview
The batch emitter aggregates discovered nodes and edges into batches and delivers them to ingestion endpoints. Failed deliveries are retried with exponential backoff, and batches that still cannot be delivered are spooled to disk for later replay.
Core Data Types
Node Types
NodeDomain
Represents a domain entity:
type NodeDomain struct {
	Host      string    `json:"host"`       // Full hostname
	Apex      string    `json:"apex"`       // Apex/root domain
	FirstSeen time.Time `json:"first_seen"` // First observation
	LastSeen  time.Time `json:"last_seen"`  // Last observation
}
NodeIP
Represents an IP address entity:
type NodeIP struct {
	IP        string    `json:"ip"`         // IP address
	FirstSeen time.Time `json:"first_seen"` // First observation
	LastSeen  time.Time `json:"last_seen"`  // Last observation
}
NodeCert
Represents a TLS certificate entity:
type NodeCert struct {
	SPKI      string    `json:"spki_sha256"` // SHA-256 of Subject Public Key Info
	SubjectCN string    `json:"subject_cn"`  // Certificate subject common name
	IssuerCN  string    `json:"issuer_cn"`   // Certificate issuer common name
	NotBefore time.Time `json:"not_before"`  // Certificate valid from
	NotAfter  time.Time `json:"not_after"`   // Certificate valid until
}
Edge Type
Edge
Represents relationships between entities:
type Edge struct {
	Type       string    `json:"type"`        // Edge type (RESOLVES_TO, LINKS_TO, etc.)
	Source     string    `json:"source"`      // Source entity identifier
	Target     string    `json:"target"`      // Target entity identifier
	ObservedAt time.Time `json:"observed_at"` // Observation timestamp
	ProbeID    string    `json:"probe_id"`    // Probe instance identifier
	RunID      string    `json:"run_id"`      // Probe run identifier
}
Supported Edge Types
- RESOLVES_TO: Domain → IP address (A/AAAA records)
- USES_NS: Domain → Nameserver (NS records)
- ALIAS_OF: Domain → CNAME target
- USES_MX: Domain → Mail exchanger (MX records)
- LINKS_TO: Domain → External domains (from HTML links)
- USES_CERT: Domain → TLS certificate (SPKI hash)
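As an illustration, the supported edge types could be modelled as string constants and checked before an Edge is serialized. This is only a sketch: the constant names, the IsSupportedEdgeType helper, and the sample values are assumptions, and only the Edge fields and JSON tags come from the struct above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Edge mirrors the struct documented above.
type Edge struct {
	Type       string    `json:"type"`
	Source     string    `json:"source"`
	Target     string    `json:"target"`
	ObservedAt time.Time `json:"observed_at"`
	ProbeID    string    `json:"probe_id"`
	RunID      string    `json:"run_id"`
}

// Hypothetical constant names for the documented edge types.
const (
	EdgeResolvesTo = "RESOLVES_TO"
	EdgeUsesNS     = "USES_NS"
	EdgeAliasOf    = "ALIAS_OF"
	EdgeUsesMX     = "USES_MX"
	EdgeLinksTo    = "LINKS_TO"
	EdgeUsesCert   = "USES_CERT"
)

// supportedEdgeTypes is a lookup set built from the constants above.
var supportedEdgeTypes = map[string]bool{
	EdgeResolvesTo: true,
	EdgeUsesNS:     true,
	EdgeAliasOf:    true,
	EdgeUsesMX:     true,
	EdgeLinksTo:    true,
	EdgeUsesCert:   true,
}

// IsSupportedEdgeType reports whether t is one of the documented edge types.
// The helper itself is an assumption made for this example.
func IsSupportedEdgeType(t string) bool {
	return supportedEdgeTypes[t]
}

func main() {
	e := Edge{
		Type:       EdgeResolvesTo,
		Source:     "www.example.com",
		Target:     "192.0.2.10",
		ObservedAt: time.Date(2023, 12, 1, 14, 30, 0, 0, time.UTC),
		ProbeID:    "prod-us-west-1",
		RunID:      "run-20231201-143000",
	}
	out, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	fmt.Println("supported:", IsSupportedEdgeType(e.Type))
}
```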
Batch Structure
Batch
Container for nodes and edges:
type Batch struct {
	ProbeID string       `json:"probe_id"`     // Probe instance identifier
	RunID   string       `json:"run_id"`       // Probe run identifier
	NodesD  []NodeDomain `json:"nodes_domain"` // Domain nodes
	NodesIP []NodeIP     `json:"nodes_ip"`     // IP address nodes
	NodesC  []NodeCert   `json:"nodes_cert"`   // Certificate nodes
	Edges   []Edge       `json:"edges"`        // Relationship edges
}
Emitter Architecture
EmitterConfig Structure
All emitter options are passed through EmitterConfig:
type EmitterConfig struct {
	Ingest             string        // Ingestion endpoint URL (empty = stdout)
	ProbeID            string        // Probe identifier stamped on batches
	RunID              string        // Run identifier stamped on batches
	BatchMaxEdges      int           // Max edges per batch before forced flush
	FlushEvery         time.Duration // Time-based flush interval
	NodeThresholdRatio float64       // Flush when nodes exceed this ratio of BatchMaxEdges
	SpoolDir           string        // On-disk spool directory for failed batches
	MTLSCert           string        // Client certificate path (PEM)
	MTLSKey            string        // Client key path (PEM)
	MTLSCA             string        // CA bundle path (PEM)
	RetryMaxElapsed    time.Duration // Max total time for exponential-backoff retries
	StdoutWriter       StdoutWriter  // Optional writer for stdout output formats
	Store              BatchStore    // Optional database store (e.g. MongoDB)
}
Core Functions
NewEmitter(cfg EmitterConfig) (*Emitter, error)
Creates a new batch emitter. Returns an error if mTLS certificates are misconfigured.
Configuration:
- Ingestion URL: HTTPS endpoint for batch delivery (empty = stdout)
- mTLS Support: Client certificate authentication
- Spool Directory: On-disk buffer for failed deliveries
- Batch Max Edges: Maximum edges per batch (default: 10,000)
- Flush Interval: Time-based batch flushing (default: 2s)
- RetryMaxElapsed: Context-aware exponential backoff stops after this duration
ForceFlush(ctx context.Context, log *zap.SugaredLogger)
Immediately flushes the current accumulator to the configured output (stdout, ingest endpoint, or database store). Used by the Control API to drain buffered data on demand without waiting for the next timed flush.
PendingStats() (edges, nodes int)
Returns the number of edges and nodes currently buffered in the accumulator. Used by the Control API's observability endpoints to expose real-time emitter queue depth.
AddEdge(edge Edge)
Adds an edge to the current batch buffer.
Features:
- Thread-Safe: Mutex-protected buffer access
- Automatic Flushing: Triggers flush when batch size reached
- Deduplication: Prevents duplicate edges in same batch
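The three features above can be sketched with a small mutex-protected accumulator. This is not the emitter's actual implementation: the accumulator type, its flush callback, and the choice of (Type, Source, Target) as the deduplication key are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Edge is reduced to the fields assumed to identify a duplicate.
type Edge struct {
	Type, Source, Target string
}

// accumulator is a hypothetical buffer that deduplicates edges and
// hands a full batch to flush once maxEdges is reached.
type accumulator struct {
	mu       sync.Mutex
	maxEdges int
	seen     map[Edge]struct{}
	edges    []Edge
	flush    func([]Edge)
}

func newAccumulator(maxEdges int, flush func([]Edge)) *accumulator {
	return &accumulator{
		maxEdges: maxEdges,
		seen:     make(map[Edge]struct{}),
		flush:    flush,
	}
}

// AddEdge buffers e unless an identical edge is already in the batch.
func (a *accumulator) AddEdge(e Edge) {
	a.mu.Lock()
	if _, dup := a.seen[e]; dup {
		a.mu.Unlock()
		return
	}
	a.seen[e] = struct{}{}
	a.edges = append(a.edges, e)

	var full []Edge
	if len(a.edges) >= a.maxEdges {
		// Swap the buffer out under the lock, flush outside it so
		// slow delivery does not block other producers.
		full = a.edges
		a.edges = nil
		a.seen = make(map[Edge]struct{})
	}
	a.mu.Unlock()

	if full != nil {
		a.flush(full)
	}
}

// Pending returns the number of buffered edges.
func (a *accumulator) Pending() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return len(a.edges)
}

func main() {
	acc := newAccumulator(2, func(batch []Edge) {
		fmt.Println("flushing", len(batch), "edges")
	})
	e := Edge{Type: "RESOLVES_TO", Source: "example.com", Target: "192.0.2.1"}
	acc.AddEdge(e)
	acc.AddEdge(e) // duplicate, ignored
	acc.AddEdge(Edge{Type: "USES_NS", Source: "example.com", Target: "ns1.example.net"})
	fmt.Println("pending:", acc.Pending())
}
```

Flushing outside the critical section is a design choice in this sketch; it keeps AddEdge callers from waiting on network I/O.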
AddNode(node interface{})
Adds nodes (Domain, IP, or Certificate) to the current batch.
Type Detection:
- NodeDomain: Added to domain node array
- NodeIP: Added to IP node array
- NodeCert: Added to certificate node array
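The type detection described above maps naturally onto a Go type switch. The following is a minimal sketch with reduced node types; the addNode name, its boolean return value, and the handling of unknown types are assumptions and may differ from the real AddNode.

```go
package main

import "fmt"

// Reduced stand-ins for the node types documented earlier.
type NodeDomain struct{ Host string }
type NodeIP struct{ IP string }
type NodeCert struct{ SPKI string }

// Batch holds one slice per node type, as in the Batch struct above.
type Batch struct {
	NodesD  []NodeDomain
	NodesIP []NodeIP
	NodesC  []NodeCert
}

// addNode routes node to the matching slice and reports whether the
// type was recognized. Returning false for unknown types is an
// assumption of this sketch.
func (b *Batch) addNode(node interface{}) bool {
	switch n := node.(type) {
	case NodeDomain:
		b.NodesD = append(b.NodesD, n)
	case NodeIP:
		b.NodesIP = append(b.NodesIP, n)
	case NodeCert:
		b.NodesC = append(b.NodesC, n)
	default:
		return false
	}
	return true
}

func main() {
	var b Batch
	b.addNode(NodeDomain{Host: "www.example.com"})
	b.addNode(NodeIP{IP: "192.0.2.10"})
	ok := b.addNode("not a node")
	fmt.Println(len(b.NodesD), len(b.NodesIP), len(b.NodesC), ok)
}
```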
Reliability Features
Exponential Backoff Retry
- Initial Delay: 1 second
- Maximum Delay: 5 minutes
- Multiplier: 2x per retry
- Jitter: Randomization to prevent thundering herd
- Context-Aware: Retries honour the request context; if the context is cancelled (e.g. on shutdown), the retry loop exits immediately rather than waiting for RetryMaxElapsed.
- Max Elapsed Time: Configurable via RetryMaxElapsed (default: 30s)
On-Disk Spooling
- Persistent Storage: Failed batches written to disk
- Automatic Recovery: Spooled batches replayed on startup
- Format: JSON files with timestamp prefixes
- Cleanup: Successful deliveries remove spool files
Circuit Breaker Integration
- Failure Detection: HTTP 5xx and connection errors
- Automatic Recovery: Gradual traffic restoration
- Per-Host Isolation: Independent failure tracking
HTTP Delivery
Request Format
POST /v1/batch HTTP/1.1
Host: ingest.example.com
Content-Type: application/json
User-Agent: spyder-probe/1.0

{
  "probe_id": "prod-us-west-1",
  "run_id": "run-20231201-143000",
  "nodes_domain": [...],
  "nodes_ip": [...],
  "nodes_cert": [...],
  "edges": [...]
}
mTLS Authentication
// Client certificate configuration
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
	log.Fatalf("load client certificate: %v", err)
}
client := &http.Client{
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{
			Certificates: []tls.Certificate{cert},
		},
	},
}
Monitoring and Metrics
Delivery Metrics
- Batch Success Rate: Percentage of successful deliveries
- Retry Attempts: Distribution of retry attempts per batch
- Delivery Latency: Time from buffer to successful delivery
- Spool Usage: Number and size of spooled batches
Performance Metrics
- Batch Size Distribution: Actual vs configured batch sizes
- Flush Triggers: Time-based vs size-based flush ratio
- Buffer Utilization: Current buffer fill percentage
- Throughput: Batches and items per second
Error Handling
Delivery Failures
- Temporary Failures: HTTP 5xx, timeouts, connection errors
- Permanent Failures: HTTP 4xx (except 429), authentication errors
- Retry Strategy: Exponential backoff for temporary failures
- Dead Letter: Permanent failures logged and discarded
Spool Management
- Write Failures: Log error and attempt in-memory retry
- Recovery Failures: Log error and continue with new batches
- Disk Full: Fall back to in-memory buffering only
Configuration Examples
Basic Configuration
emitter, err := emit.NewEmitter(emit.EmitterConfig{
	Ingest:        "https://ingest.example.com/v1/batch",
	BatchMaxEdges: 1000,
	FlushEvery:    30 * time.Second,
	SpoolDir:      "/var/spool/spyder",
})
if err != nil {
	log.Fatalf("create emitter: %v", err)
}
mTLS Configuration
emitter, err := emit.NewEmitter(emit.EmitterConfig{
	Ingest:        "https://ingest.example.com/v1/batch",
	MTLSCert:      "/etc/spyder/client.crt",
	MTLSKey:       "/etc/spyder/client.key",
	BatchMaxEdges: 500,
	FlushEvery:    60 * time.Second,
})
if err != nil {
	log.Fatalf("create emitter: %v", err)
}
High-Throughput Configuration
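// EmitterConfig has no concurrency field; for parallel delivery,
// run multiple emitter instances.
emitter, err := emit.NewEmitter(emit.EmitterConfig{
	Ingest:        "https://ingest.example.com/v1/batch",
	BatchMaxEdges: 5000,
	FlushEvery:    10 * time.Second,
	SpoolDir:      "/tmp/spyder-spool",
})
if err != nil {
	log.Fatalf("create emitter: %v", err)
}
Integration Patterns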
Probe Integration
func (p *Probe) ProcessDomain(domain string) {
	// Discover relationships
	edges := p.discoverEdges(domain)
	nodes := p.discoverNodes(domain)
	// Emit discoveries
	for _, edge := range edges {
		p.emitter.AddEdge(edge)
	}
	for _, node := range nodes {
		p.emitter.AddNode(node)
	}
}
Graceful Shutdown
func (p *Probe) Shutdown() {
	// Flush remaining batches
	p.emitter.Flush()
	// Wait for in-flight deliveries
	p.emitter.Wait()
	// Close resources
	p.emitter.Close()
}
Best Practices
Batch Sizing
- Memory Usage: Larger batches use more memory
- Network Efficiency: Larger batches reduce HTTP overhead
- Latency: Smaller batches reduce data freshness delay
- Recommended: 500-2000 items per batch
Error Recovery
- Spool Monitoring: Monitor spool directory size and age
- Retry Limits: Set RetryMaxElapsed to a reasonable bound on total retry time per batch
- Dead Letter Handling: Implement alerting for permanent failures
Performance Tuning
- Concurrent Delivery: Use multiple emitter instances for high throughput
- Batch Optimization: Tune batch size based on network and server capacity
- Flush Frequency: Balance latency requirements with efficiency
Security Considerations
Data Transmission
- HTTPS Only: All data transmitted over encrypted connections
- mTLS Authentication: Mutual TLS for secure ingestion endpoints
- Certificate Validation: Verify server certificates
Data Integrity
- JSON Validation: Validate batch structure before transmission
- Retry Limits: Prevent infinite retry loops
- Spool Security: Secure file permissions for spooled data