Multi-Provider Architecture

One policy engine. Every observability and traffic pipeline. Rate limit OpenTelemetry, Datadog, Fluent Bit, Envoy, Kafka — and anything you build.

Architecture Overview

RLAAS uses a provider-agnostic core with thin adapter layers for each telemetry or traffic provider. The core engine knows only two types: model.RequestContext and model.Decision. Every provider adapter simply converts its native format into this universal contract.

Key insight: The core engine is never coupled to any provider. Adding a new provider is ~100 lines of mapping code with zero changes to the engine.
Provider Adapters (thin mapping)          Core Engine (unchanged)
┌──────────────────────┐                 ┌────────────────────────┐
│ OpenTelemetry        │──┐              │                        │
│ (logs, traces)       │  │   Provider   │  TelemetryRecord       │
├──────────────────────┤  │   SPI        │        ↓               │
│ Datadog Agent        │──┼──────────────→  model.RequestContext  │
│ (logs, metrics)      │  │   Interface  │        ↓               │
├──────────────────────┤  │              │  Policy Matcher        │
│ Fluent Bit           │──┤              │        ↓               │
│ (logs)               │  │              │  Algorithm Engine      │
├──────────────────────┤  │              │        ↓               │
│ Envoy / Istio        │──┤              │  model.Decision        │
│ (HTTP rate limit)    │  │              │                        │
├──────────────────────┤  │              └────────────────────────┘
│ Kafka                │──┤
│ (events)             │  │
├──────────────────────┤  │
│ Your Custom Provider │──┘
└──────────────────────┘

Provider SPI

The Service Provider Interface lives in pkg/provider/ and defines three core types:

TelemetryRecord

The universal input format. Any provider converts its native signals into this struct:

type TelemetryRecord struct {
    Signal      SignalKind        // log, span, metric, event, http
    OrgID       string            // organization
    Service     string            // originating service
    Operation   string            // span name, endpoint, topic
    Severity    string            // INFO, WARN, ERROR
    Environment string            // prod, staging, dev
    Region      string            // us-east-1, eu-west-1
    Endpoint    string            // HTTP path
    Method      string            // HTTP method
    Tags        map[string]string // arbitrary metadata
}

Adapter Interface

Every provider implements this:

type Adapter interface {
    Name() string              // "otel", "datadog", etc.
    SignalKinds() []SignalKind // what signals it handles
    ProcessBatch(ctx context.Context, records []TelemetryRecord) ([]Decision, error) // evaluate a batch
}

BatchProcessor

A reusable concurrent evaluator that any adapter can embed:

bp := provider.BatchProcessor{
    Eval:     rlaasClient,  // any Evaluator
    Workers:  4,            // concurrent goroutines
    FailOpen: true,         // allow on engine error
}
decisions := bp.Process(ctx, records)

OpenTelemetry

Signal Types: Logs, Spans
Package: internal/adapter/otel
Integration: Embedded processor in Collector agent

The OTEL adapter processes log and span batches with concurrent workers. It's the original adapter and implements both the legacy API and the Provider SPI.

Usage — Direct Processor

import "github.com/suresh-p26/RLAAS/internal/adapter/otel"

proc := otel.NewProcessor(rlaasClient, 4, true) // 4 workers, fail-open
keptLogs := proc.ProcessLogs(ctx, logs)
keptSpans := proc.ProcessSpans(ctx, spans)
stats := proc.Stats() // {Allowed: 9500, Dropped: 500, Errors: 0}

Usage — Via Provider SPI

adapter := otel.NewProviderAdapter(rlaasClient, 4, true)
registry.Register(adapter) // registers as "otel"

decisions, _ := adapter.ProcessBatch(ctx, []provider.TelemetryRecord{
  {Signal: provider.SignalLog, OrgID: "northwind", Service: "payments", Severity: "INFO"},
  {Signal: provider.SignalSpan, OrgID: "northwind", Service: "trading", Operation: "execute_trade"},
})

Collector Pipeline Integration

# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  rlaas:
    policy_file: /etc/rlaas/policies.json
    workers: 4
    fail_open: true
    org_id: "northwind"
    service_attribute: "service.name"

exporters:
  otlp:
    endpoint: tempo:4317

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [rlaas]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [rlaas]
      exporters: [otlp]

Datadog

Signal Types: Logs, Metrics
Package: internal/adapter/datadog
Integration: Agent check, log processor

The Datadog adapter converts Datadog-native LogEntry and MetricSample records into RLAAS evaluations. Datadog tag format (key:value) is automatically parsed.

Filter Logs

import "github.com/suresh-p26/RLAAS/internal/adapter/datadog"

adapter := datadog.NewAdapter(rlaasClient, "northwind", 4, true)

logs := []datadog.LogEntry{
    {Service: "payments", Status: "info", Tags: []string{"env:prod", "team:pay"}},
    {Service: "auth",     Status: "warn", Tags: []string{"env:prod"}},
}
kept := adapter.FilterLogs(ctx, logs)
// Only logs within rate limits are kept
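The key:value tag parsing can be sketched in a few lines. parseTags is an illustrative helper, not the adapter's exported API; bare tags (no colon) are kept here as keys with empty values, which is an assumption about the adapter's behavior:

```go
package main

import (
	"fmt"
	"strings"
)

// parseTags converts Datadog-style "key:value" tags into a map so
// they can populate TelemetryRecord.Tags. A tag without a colon
// becomes a key with an empty value.
func parseTags(tags []string) map[string]string {
	out := make(map[string]string, len(tags))
	for _, t := range tags {
		key, value, _ := strings.Cut(t, ":")
		out[key] = value
	}
	return out
}

func main() {
	m := parseTags([]string{"env:prod", "team:pay", "canary"})
	fmt.Println(m["env"], m["team"]) // prod pay
}
```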

Filter Metrics (cost control)

metrics := []datadog.MetricSample{
    {Metric: "cpu.user",     Service: "market-data", Type: "gauge"},
    {Metric: "orders.count", Service: "trading",     Type: "count"},
}
kept := adapter.FilterMetrics(ctx, metrics)
// Enforce Datadog metric submission quotas
Cost control: Datadog charges per metric point. Use the quota algorithm to enforce hourly/daily budgets on metric submissions before they hit intake.

Fluent Bit / Fluentd

Signal Types: Logs
Package: internal/adapter/fluentbit
Integration: Go output plugin, sidecar filter

The Fluent Bit adapter handles the standard Fluent Bit record format: a routing tag plus key-value field map. Service name resolution falls back from field to tag automatically.

Usage

import "github.com/suresh-p26/RLAAS/internal/adapter/fluentbit"

adapter := fluentbit.NewAdapter(rlaasClient, "northwind", 4, true)
// Optionally customize field mapping:
adapter.ServiceField = "kubernetes.container_name"
adapter.SeverityField = "log_level"

records := []fluentbit.Record{
    {Tag: "kube.payments", Fields: map[string]string{
        "kubernetes.container_name": "payments",
        "log_level": "info",
        "message": "transaction processed",
    }},
}
kept := adapter.FilterRecords(ctx, records)
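The field-then-tag fallback for service resolution can be sketched as below. This is illustrative only: the field name mirrors the default shown above, and deriving the service from the tag's last dot-separated segment is an assumed convention, not necessarily what the adapter does:

```go
package main

import (
	"fmt"
	"strings"
)

// resolveService prefers the configured service field; if the field
// is absent or empty, it falls back to the routing tag, using the
// final dot-separated segment (e.g. "kube.payments" -> "payments").
func resolveService(serviceField, tag string, fields map[string]string) string {
	if s := fields[serviceField]; s != "" {
		return s
	}
	if i := strings.LastIndex(tag, "."); i >= 0 {
		return tag[i+1:]
	}
	return tag
}

func main() {
	// Field present: field wins.
	fmt.Println(resolveService("kubernetes.container_name", "kube.payments",
		map[string]string{"kubernetes.container_name": "payments"}))
	// Field missing: tag fallback.
	fmt.Println(resolveService("kubernetes.container_name", "kube.auth", nil))
}
```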

Fluent Bit Pipeline Config

[INPUT]
    Name              tail
    Path              /var/log/containers/*.log
    Parser            docker
    Tag               kube.*

[FILTER]
    Name              kubernetes
    Match             kube.*

# RLAAS rate limiting applied here via Go plugin or sidecar

[OUTPUT]
    Name              splunk
    Match             *
    Host              splunk.internal
    Token             ${SPLUNK_TOKEN}

Envoy / Istio

Signal Types: HTTP traffic
Package: internal/adapter/envoy
Integration: Rate Limit Service, ext_authz

The Envoy adapter supports two integration modes: Envoy's native rate limit service protocol (gRPC descriptors) and the ext_authz external authorization check.

Rate Limit Service

import "github.com/suresh-p26/RLAAS/internal/adapter/envoy"

adapter := envoy.NewAdapter(rlaasClient, "northwind", 4, true)

// Envoy sends descriptors like:
// [{"entries": [{"key": "service", "value": "risk-api"}, {"key": "path", "value": "/v1/risk/assess"}]}]
descriptors := []envoy.Descriptor{
    {Entries: []envoy.DescriptorEntry{
        {Key: "service", Value: "risk-api"},
        {Key: "path", Value: "/v1/risk/assess"},
        {Key: "method", Value: "POST"},
    }},
}
response := adapter.CheckRateLimit(ctx, "northwind", descriptors)
// response.OverallCode == RateLimitOK or RateLimitOverLimit
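Internally, the adapter has to flatten each descriptor's entries into request-context fields before evaluation. A hedged sketch of that mapping follows; HTTPRequest and fromDescriptor are illustrative names, not the adapter's actual types:

```go
package main

import "fmt"

// DescriptorEntry and Descriptor mirror the shapes shown above.
type DescriptorEntry struct{ Key, Value string }
type Descriptor struct{ Entries []DescriptorEntry }

// HTTPRequest is an illustrative intermediate: well-known descriptor
// keys map onto dedicated fields, everything else becomes a tag.
type HTTPRequest struct {
	Service, Path, Method string
	Extra                 map[string]string
}

func fromDescriptor(d Descriptor) HTTPRequest {
	req := HTTPRequest{Extra: map[string]string{}}
	for _, e := range d.Entries {
		switch e.Key {
		case "service":
			req.Service = e.Value
		case "path":
			req.Path = e.Value
		case "method":
			req.Method = e.Value
		default:
			req.Extra[e.Key] = e.Value
		}
	}
	return req
}

func main() {
	d := Descriptor{Entries: []DescriptorEntry{
		{"service", "risk-api"}, {"path", "/v1/risk/assess"}, {"method", "POST"},
	}}
	fmt.Printf("%+v\n", fromDescriptor(d))
}
```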

ext_authz Mode

authResp := adapter.CheckAuth(ctx, envoy.AuthRequest{
    Method:  "POST",
    Path:    "/v1/risk/assess",
    Service: "risk-api",
    Headers: map[string]string{"x-api-key": "key-123"},
})
// authResp.Allowed == true/false
// authResp.StatusCode == 200 or 429
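Mapping a rate-limit decision onto the ext_authz response shape might look like the sketch below. buildAuthResponse and the Retry-After header handling are illustrative, not the adapter's actual code; the standard Retry-After header is a common convention for 429 responses:

```go
package main

import (
	"fmt"
	"net/http"
	"strconv"
)

// AuthResponse mirrors the fields shown above.
type AuthResponse struct {
	Allowed    bool
	StatusCode int
	Headers    map[string]string
}

// buildAuthResponse maps an allow/deny decision onto an ext_authz
// reply: 200 on allow, 429 plus a Retry-After header on deny.
func buildAuthResponse(allowed bool, retryAfterSec int) AuthResponse {
	if allowed {
		return AuthResponse{Allowed: true, StatusCode: http.StatusOK}
	}
	return AuthResponse{
		Allowed:    false,
		StatusCode: http.StatusTooManyRequests,
		Headers:    map[string]string{"Retry-After": strconv.Itoa(retryAfterSec)},
	}
}

func main() {
	fmt.Println(buildAuthResponse(false, 2).StatusCode) // 429
}
```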

Envoy Config

# envoy.yaml
http_filters:
  - name: envoy.filters.http.ratelimit
    typed_config:
      "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
      domain: northwind
      rate_limit_service:
        grpc_service:
          envoy_grpc:
            cluster_name: rlaas
        transport_api_version: V3

clusters:
  - name: rlaas
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: rlaas
      endpoints:
        - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: rlaas-server
                    port_value: 9090

Kafka

Signal Types: Events
Package: internal/adapter/kafka
Integration: Consumer interceptor, producer filter

The Kafka adapter enforces per-topic, per-consumer-group rate limits on event streams. Use it as a consumer interceptor to protect downstream systems from burst traffic.

Usage

import "github.com/suresh-p26/RLAAS/internal/adapter/kafka"

adapter := kafka.NewAdapter(rlaasClient, "northwind", 4, true)

messages := []kafka.Message{
    {Topic: "orders.placed", Service: "order-service", Key: "o-123",
     ConsumerGroup: "order-processor"},
    {Topic: "orders.placed", Service: "order-service", Key: "o-456",
     ConsumerGroup: "order-processor"},
}
kept := adapter.FilterMessages(ctx, messages)
// Process only the messages that pass rate limiting
Pattern: Use the delay action instead of drop to slow consumers rather than losing events. The retry_after field tells the consumer when to retry.

Building a Custom Provider

Adding a new provider takes ~100 lines. Here's the full pattern:

package myprovider

import (
    "context"
    "github.com/suresh-p26/RLAAS/pkg/provider"
)

// MyRecord is your provider's native format.
type MyRecord struct {
    Service string
    Data    map[string]string
}

type Adapter struct {
    bp    provider.BatchProcessor
    orgID string
}

func NewAdapter(eval provider.Evaluator, orgID string) *Adapter {
    return &Adapter{
        bp:    provider.BatchProcessor{Eval: eval, Workers: 4, FailOpen: true},
        orgID: orgID,
    }
}

func (a *Adapter) Name() string { return "myprovider" }

func (a *Adapter) SignalKinds() []provider.SignalKind {
    return []provider.SignalKind{provider.SignalLog}
}

func (a *Adapter) ProcessBatch(ctx context.Context, records []provider.TelemetryRecord) ([]provider.Decision, error) {
    return a.bp.Process(ctx, records), nil
}

func (a *Adapter) Filter(ctx context.Context, records []MyRecord) []MyRecord {
    telRecords := make([]provider.TelemetryRecord, len(records))
    for i, r := range records {
        telRecords[i] = provider.TelemetryRecord{
            Signal:  provider.SignalLog,
            OrgID:   a.orgID,
            Service: r.Service,
            Tags:    r.Data,
        }
    }
    decisions := a.bp.Process(ctx, telRecords)
    out := make([]MyRecord, 0, len(records))
    for i, d := range decisions {
        if d.Allowed {
            out = append(out, records[i])
        }
    }
    return out
}

var _ provider.Adapter = (*Adapter)(nil)

Provider Registry

The registry is a thread-safe store for dynamic adapter lookup at runtime:

import "github.com/suresh-p26/RLAAS/pkg/provider"

// Register adapters at startup
registry := provider.NewRegistry()
registry.MustRegister(otelAdapter)
registry.MustRegister(datadogAdapter)
registry.MustRegister(envoyAdapter)
registry.MustRegister(kafkaAdapter)

// Lookup by name at runtime
adapter, ok := registry.Get("datadog")
if ok {
    decisions, _ := adapter.ProcessBatch(ctx, records)
}

// List all registered providers
names := registry.List() // ["otel", "datadog", "envoy", "kafka"]

There's also a global provider.DefaultRegistry for simple use cases where providers self-register via init().
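The init() self-registration pattern can be sketched as follows. The Registry here is a minimal local stand-in for the one in pkg/provider, shown only to illustrate the pattern; the real types live in that package:

```go
package main

import (
	"fmt"
	"sync"
)

// Adapter is reduced to its name for this sketch.
type Adapter interface{ Name() string }

// Registry is a thread-safe name -> adapter map, mirroring the
// shape of the SPI registry described above.
type Registry struct {
	mu       sync.RWMutex
	adapters map[string]Adapter
}

func (r *Registry) MustRegister(a Adapter) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.adapters == nil {
		r.adapters = map[string]Adapter{}
	}
	if _, dup := r.adapters[a.Name()]; dup {
		panic("duplicate provider: " + a.Name())
	}
	r.adapters[a.Name()] = a
}

func (r *Registry) Get(name string) (Adapter, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	a, ok := r.adapters[name]
	return a, ok
}

var DefaultRegistry = &Registry{}

// An adapter package registers itself when imported for side effects.
type myAdapter struct{}

func (myAdapter) Name() string { return "myprovider" }

func init() { DefaultRegistry.MustRegister(myAdapter{}) }

func main() {
	_, ok := DefaultRegistry.Get("myprovider")
	fmt.Println("registered:", ok)
}
```

With this pattern, wiring a provider in is just a blank import of its package (`import _ "…/myprovider"`) in the binary's main package.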

Enterprise Example: Northwind

Here's how a Northwind-class enterprise would deploy RLAAS across multiple providers with per-service policies:

┌─────────────────────────────────────────────────────────────────┐
│ Northwind Microservices                                         │
│  payments │ auth │ trading │ market-data │ risk-api │ orders    │
└─────┬──────┬──────┬──────────┬────────────┬──────────┬──────────┘
      │OTLP  │OTLP  │DD Logs   │DD Metrics  │HTTP      │Kafka
      ▼      ▼      ▼          ▼            ▼          ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ OTEL     │ │ Datadog  │ │ Fluent   │ │ Envoy    │ │ Kafka    │
│Collector │ │ Agent    │ │ Bit      │ │ Proxy    │ │Consumer  │
│Processor │ │ Adapter  │ │ Adapter  │ │ ext_authz│ │Intercept │
└─────┬────┘ └─────┬────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘
      │            │           │             │            │
      └────────────┴───────────┴──────┬──────┴────────────┘
                                      │
                              ┌───────▼────────┐
                              │ RLAAS Server   │
                              │ (Policy Engine)│
                              │ ┌────────────┐ │
                              │ │ 12 Policies│ │
                              │ │ per-service│ │
                              │ │ per-signal │ │
                              │ └────────────┘ │
                              │ Redis counters │
                              │ Postgres store │
                              └────────────────┘

Policy Matrix

| Service        | Provider   | Signal      | Algorithm      | Limit              | Action            |
|----------------|------------|-------------|----------------|--------------------|-------------------|
| payments       | OTEL       | log         | Token Bucket   | 10K/min + 2K burst | drop              |
| payments       | OTEL       | log (ERROR) | Token Bucket   | 50K/min            | allow             |
| auth           | OTEL       | log         | Sliding Window | 5K/min             | drop              |
| trading-engine | OTEL       | span        | Fixed Window   | 50K/min            | drop              |
| market-data    | Fluent Bit | log         | Leaky Bucket   | 200/s (smooth)     | drop              |
| market-data    | Datadog    | metric      | Quota          | 1M points/hour     | drop_low_priority |
| risk-api       | Envoy      | http        | Token Bucket   | 500 req/s          | deny              |
| risk-api       | Envoy      | http        | Concurrency    | 50 in-flight       | deny              |
| orders         | Kafka      | event       | Sliding Log    | 100K/min           | delay             |
| notifications  | Fluent Bit | log         | Fixed Window   | 2K/min             | drop              |
| all others     | any        | any         | Fixed Window   | 5K/min             | drop              |
See it live: The full Northwind policy set is available at examples/northwind-policies.json — 12 production-ready policies covering all providers and use cases.