Architecture Overview
RLAAS uses a provider-agnostic core with a thin adapter layer for each telemetry or traffic provider. The core engine deals in a single contract: it takes a model.RequestContext and returns a model.Decision. Each provider adapter only has to convert its native format into that contract.
Provider Adapters (thin mapping)        Core Engine (unchanged)
┌──────────────────────┐                ┌────────────────────────┐
│ OpenTelemetry        │──┐             │                        │
│ (logs, traces)       │  │   Provider  │    TelemetryRecord     │
├──────────────────────┤  │     SPI     │           ↓            │
│ Datadog Agent        │──┼────────────→│  model.RequestContext  │
│ (logs, metrics)      │  │  Interface  │           ↓            │
├──────────────────────┤  │             │     Policy Matcher     │
│ Fluent Bit           │──┤             │           ↓            │
│ (logs)               │  │             │    Algorithm Engine    │
├──────────────────────┤  │             │           ↓            │
│ Envoy / Istio        │──┤             │     model.Decision     │
│ (HTTP rate limit)    │  │             │                        │
├──────────────────────┤  │             └────────────────────────┘
│ Kafka                │──┤
│ (events)             │  │
├──────────────────────┤  │
│ Your Custom Provider │──┘
└──────────────────────┘
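The Policy Matcher stage can be made concrete with a small sketch of most-specific-match selection. The RequestContext and Policy types below are hypothetical, trimmed-down stand-ins rather than the real model package. Two rules are assumptions made for illustration: an empty selector acts as a wildcard, and the matching policy that pins down the most selectors wins.

```go
package main

import "fmt"

// RequestContext is a hypothetical, trimmed-down stand-in for model.RequestContext.
type RequestContext struct {
	Service  string
	Signal   string
	Severity string
}

// Policy is a hypothetical policy; an empty selector field matches any value.
type Policy struct {
	Name     string
	Service  string
	Signal   string
	Severity string
}

// matches reports whether every non-empty selector equals the request's value.
func (p Policy) matches(rc RequestContext) bool {
	return (p.Service == "" || p.Service == rc.Service) &&
		(p.Signal == "" || p.Signal == rc.Signal) &&
		(p.Severity == "" || p.Severity == rc.Severity)
}

// specificity counts how many selectors the policy pins down.
func (p Policy) specificity() int {
	n := 0
	for _, s := range []string{p.Service, p.Signal, p.Severity} {
		if s != "" {
			n++
		}
	}
	return n
}

// Match returns the most specific matching policy; on a tie the earlier policy wins.
func Match(policies []Policy, rc RequestContext) (Policy, bool) {
	best, found := Policy{}, false
	for _, p := range policies {
		if p.matches(rc) && (!found || p.specificity() > best.specificity()) {
			best, found = p, true
		}
	}
	return best, found
}

func main() {
	policies := []Policy{
		{Name: "default"},
		{Name: "payments-logs", Service: "payments", Signal: "log"},
		{Name: "payments-errors", Service: "payments", Signal: "log", Severity: "ERROR"},
	}
	p, _ := Match(policies, RequestContext{Service: "payments", Signal: "log", Severity: "ERROR"})
	fmt.Println(p.Name) // payments-errors
	p, _ = Match(policies, RequestContext{Service: "auth", Signal: "log", Severity: "INFO"})
	fmt.Println(p.Name) // default
}
```

Preferring the most specific match lets a narrow override, such as a separate ERROR-level limit, coexist with a service-wide policy and a catch-all default.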
Provider SPI
The Service Provider Interface lives in pkg/provider/ and defines three core types:
TelemetryRecord
The universal input format. Any provider converts its native signals into this struct:
type TelemetryRecord struct {
	Signal      SignalKind        // log, span, metric, event, http
	OrgID       string            // organization
	Service     string            // originating service
	Operation   string            // span name, endpoint, topic
	Severity    string            // INFO, WARN, ERROR
	Environment string            // prod, staging, dev
	Region      string            // us-east-1, eu-west-1
	Endpoint    string            // HTTP path
	Method      string            // HTTP method
	Tags        map[string]string // arbitrary metadata
}
Adapter Interface
Every provider implements this:
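type Adapter interface {
	// Name returns the registry key, e.g. "otel" or "datadog".
	Name() string
	// SignalKinds lists the signals this adapter handles.
	SignalKinds() []SignalKind
	// ProcessBatch evaluates a batch and returns one Decision per record.
	ProcessBatch(ctx context.Context, records []TelemetryRecord) ([]Decision, error)
}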
BatchProcessor
A reusable concurrent evaluator that any adapter can embed:
bp := provider.BatchProcessor{
	Eval:     rlaasClient, // any Evaluator
	Workers:  4,           // concurrent goroutines
	FailOpen: true,        // allow on engine error
}
decisions := bp.Process(ctx, records)
OpenTelemetry
Package: internal/adapter/otel
The OTEL adapter processes log and span batches with concurrent workers. It's the original adapter and implements both the legacy API and the Provider SPI.
Usage — Direct Processor
import "github.com/suresh-p26/RLAAS/internal/adapter/otel"
proc := otel.NewProcessor(rlaasClient, 4, true) // 4 workers, fail-open
keptLogs := proc.ProcessLogs(ctx, logs)
keptSpans := proc.ProcessSpans(ctx, spans)
stats := proc.Stats() // {Allowed: 9500, Dropped: 500, Errors: 0}
Usage — Via Provider SPI
adapter := otel.NewProviderAdapter(rlaasClient, 4, true)
registry.Register(adapter) // registers as "otel"
decisions, _ := adapter.ProcessBatch(ctx, []provider.TelemetryRecord{
	{Signal: provider.SignalLog, OrgID: "northwind", Service: "payments", Severity: "INFO"},
	{Signal: provider.SignalSpan, OrgID: "northwind", Service: "trading", Operation: "execute_trade"},
})
Collector Pipeline Integration
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
processors:
  rlaas:
    policy_file: /etc/rlaas/policies.json
    workers: 4
    fail_open: true
    org_id: "northwind"
    service_attribute: "service.name"
exporters:
  otlp:
    endpoint: tempo:4317
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [rlaas]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [rlaas]
      exporters: [otlp]  # Tempo stores traces only; point logs at a log backend in practice
Datadog
Package: internal/adapter/datadog
The Datadog adapter converts Datadog-native LogEntry and MetricSample records into RLAAS evaluations. Datadog tags in key:value form are parsed automatically.
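Parsing key:value tags is a small step that is easy to get wrong, because values can themselves contain colons. The sketch below makes three assumptions: the key ends at the first colon, a bare tag maps to an empty value, and a repeated key keeps its last value. parseTags is a hypothetical helper, not the datadog package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTags turns Datadog-style "key:value" tags into a map.
// Assumptions: split on the first colon only, so "url:http://x" keeps its colon;
// a tag without a colon maps to ""; a repeated key keeps the last value seen.
func parseTags(tags []string) map[string]string {
	out := make(map[string]string, len(tags))
	for _, t := range tags {
		key, value, _ := strings.Cut(t, ":")
		out[key] = value
	}
	return out
}

func main() {
	m := parseTags([]string{"env:prod", "team:pay", "url:http://x", "canary"})
	fmt.Println(m["env"], m["team"], m["url"], m["canary"] == "") // prod pay http://x true
}
```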
Filter Logs
import "github.com/suresh-p26/RLAAS/internal/adapter/datadog"
adapter := datadog.NewAdapter(rlaasClient, "northwind", 4, true)
logs := []datadog.LogEntry{
	{Service: "payments", Status: "info", Tags: []string{"env:prod", "team:pay"}},
	{Service: "auth", Status: "warn", Tags: []string{"env:prod"}},
}
kept := adapter.FilterLogs(ctx, logs)
// Only logs within rate limits are kept
Filter Metrics (cost control)
metrics := []datadog.MetricSample{
	{Metric: "cpu.user", Service: "market-data", Type: "gauge"},
	{Metric: "orders.count", Service: "trading", Type: "count"},
}
keptMetrics := adapter.FilterMetrics(ctx, metrics)
// Enforce Datadog metric submission quotas
Fluent Bit / Fluentd
Package: internal/adapter/fluentbit
The Fluent Bit adapter handles the standard Fluent Bit record format: a routing tag plus a key-value field map. The service name is read from the configured field and, if that field is missing, falls back to the tag.
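The field-then-tag fallback can be sketched as follows. resolveService is a hypothetical helper. Taking the text after the tag's last dot (so kube.payments yields payments) is an assumed rule for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveService prefers the configured field and falls back to the routing tag.
// Deriving the service from the tag's last dot-separated segment is an assumption,
// not necessarily how the fluentbit adapter parses tags.
func resolveService(fields map[string]string, serviceField, tag string) string {
	if v := fields[serviceField]; v != "" {
		return v
	}
	if i := strings.LastIndex(tag, "."); i >= 0 {
		return tag[i+1:]
	}
	return tag
}

func main() {
	fields := map[string]string{"kubernetes.container_name": "payments"}
	fmt.Println(resolveService(fields, "kubernetes.container_name", "kube.other"))               // payments
	fmt.Println(resolveService(map[string]string{}, "kubernetes.container_name", "kube.auth")) // auth
}
```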
Usage
import "github.com/suresh-p26/RLAAS/internal/adapter/fluentbit"
adapter := fluentbit.NewAdapter(rlaasClient, "northwind", 4, true)
// Optionally customize field mapping:
adapter.ServiceField = "kubernetes.container_name"
adapter.SeverityField = "log_level"
records := []fluentbit.Record{
	{Tag: "kube.payments", Fields: map[string]string{
		"kubernetes.container_name": "payments",
		"log_level":                 "info",
		"message":                   "transaction processed",
	}},
}
kept := adapter.FilterRecords(ctx, records)
Fluent Bit Pipeline Config
[INPUT]
    Name    tail
    Path    /var/log/containers/*.log
    Parser  docker
    Tag     kube.*
[FILTER]
    Name    kubernetes
    Match   kube.*
    # RLAAS rate limiting is applied here via a Go plugin or sidecar
[OUTPUT]
    Name          splunk
    Match         *
    Host          splunk.internal
    Splunk_Token  ${SPLUNK_TOKEN}
Envoy / Istio
Package: internal/adapter/envoy
The Envoy adapter supports two integration modes: Envoy's native rate limit service protocol (gRPC descriptors) and the ext_authz external authorization check.
Rate Limit Service
import "github.com/suresh-p26/RLAAS/internal/adapter/envoy"
adapter := envoy.NewAdapter(rlaasClient, "northwind", 4, true)
// Envoy sends descriptors like:
// [{"entries": [{"key": "service", "value": "risk-api"}, {"key": "path", "value": "/v1/risk/assess"}]}]
descriptors := []envoy.Descriptor{
	{Entries: []envoy.DescriptorEntry{
		{Key: "service", Value: "risk-api"},
		{Key: "path", Value: "/v1/risk/assess"},
		{Key: "method", Value: "POST"},
	}},
}
response := adapter.CheckRateLimit(ctx, "northwind", descriptors)
// response.OverallCode == RateLimitOK or RateLimitOverLimit
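Envoy's rate limit protocol reports one overall code for the whole request, and that code is OVER_LIMIT if any single descriptor is over its limit. The sketch below shows that aggregation. The local types mirror the snippet above but are stand-ins. Flattening each descriptor into a key=value|... counter key is an assumption. The numeric codes follow Envoy's RateLimitResponse proto (OK = 1, OVER_LIMIT = 2).

```go
package main

import (
	"fmt"
	"strings"
)

// Code values follow Envoy's RateLimitResponse.Code (UNKNOWN = 0, OK = 1, OVER_LIMIT = 2).
type Code int

const (
	RateLimitOK Code = iota + 1
	RateLimitOverLimit
)

// Hypothetical stand-ins for envoy.DescriptorEntry and envoy.Descriptor.
type DescriptorEntry struct{ Key, Value string }
type Descriptor struct{ Entries []DescriptorEntry }

// descriptorKey flattens entries, in order, into a single counter key (assumed format).
func descriptorKey(d Descriptor) string {
	parts := make([]string, len(d.Entries))
	for i, e := range d.Entries {
		parts[i] = e.Key + "=" + e.Value
	}
	return strings.Join(parts, "|")
}

// overallCode is OVER_LIMIT if any descriptor is over limit. Every descriptor is
// still checked, so each one's counter is consulted even after the first denial.
func overallCode(ds []Descriptor, allow func(key string) bool) Code {
	code := RateLimitOK
	for _, d := range ds {
		if !allow(descriptorKey(d)) {
			code = RateLimitOverLimit
		}
	}
	return code
}

func main() {
	ds := []Descriptor{
		{Entries: []DescriptorEntry{{"service", "risk-api"}, {"path", "/v1/risk/assess"}, {"method", "POST"}}},
		{Entries: []DescriptorEntry{{"service", "risk-api"}}},
	}
	fmt.Println(descriptorKey(ds[0])) // service=risk-api|path=/v1/risk/assess|method=POST
	// Hypothetical limiter state: only the service-wide descriptor is exhausted.
	exhausted := map[string]bool{"service=risk-api": true}
	fmt.Println(overallCode(ds, func(k string) bool { return !exhausted[k] }) == RateLimitOverLimit) // true
}
```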
ext_authz Mode
authResp := adapter.CheckAuth(ctx, envoy.AuthRequest{
	Method:  "POST",
	Path:    "/v1/risk/assess",
	Service: "risk-api",
	Headers: map[string]string{"x-api-key": "key-123"},
})
// authResp.Allowed == true/false
// authResp.StatusCode == 200 or 429
Envoy Config
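# envoy.yaml
http_filters:
- name: envoy.filters.http.ratelimit
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.http.ratelimit.v3.RateLimit
    domain: northwind
    rate_limit_service:
      grpc_service:
        envoy_grpc:
          cluster_name: rlaas
      transport_api_version: V3
# The router filter must stay last in the HTTP filter chain.
- name: envoy.filters.http.router
  typed_config:
    "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: rlaas
  type: STRICT_DNS
  lb_policy: ROUND_ROBIN
  # The rate limit service speaks gRPC, which needs HTTP/2 to the upstream.
  typed_extension_protocol_options:
    envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
      "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
      explicit_http_config:
        http2_protocol_options: {}
  load_assignment:
    cluster_name: rlaas
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: rlaas-server
              port_value: 9090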
Kafka
Package: internal/adapter/kafka
The Kafka adapter enforces per-topic, per-consumer-group rate limits on event streams. Use it as a consumer interceptor to protect downstream systems from burst traffic.
Usage
import "github.com/suresh-p26/RLAAS/internal/adapter/kafka"
adapter := kafka.NewAdapter(rlaasClient, "northwind", 4, true)
messages := []kafka.Message{
	{Topic: "orders.placed", Service: "order-service", Key: "o-123",
		ConsumerGroup: "order-processor"},
	{Topic: "orders.placed", Service: "order-service", Key: "o-456",
		ConsumerGroup: "order-processor"},
}
kept := adapter.FilterMessages(ctx, messages)
// Process only the messages that pass rate limiting
Use the delay action instead of drop to slow consumers down rather than lose events. The retry_after field tells the consumer when to retry.
Building a Custom Provider
Adding a new provider takes well under 100 lines of Go. Here's the full pattern:
package myprovider
import (
	"context"
	"github.com/suresh-p26/RLAAS/pkg/provider"
)
// MyRecord is your provider's native format.
type MyRecord struct {
	Service string
	Data    map[string]string
}
// Adapter wraps a BatchProcessor and maps MyRecord into TelemetryRecords.
type Adapter struct {
	bp    provider.BatchProcessor
	orgID string
}
func NewAdapter(eval provider.Evaluator, orgID string) *Adapter {
	return &Adapter{
		bp:    provider.BatchProcessor{Eval: eval, Workers: 4, FailOpen: true},
		orgID: orgID,
	}
}
func (a *Adapter) Name() string { return "myprovider" }
func (a *Adapter) SignalKinds() []provider.SignalKind {
	return []provider.SignalKind{provider.SignalLog}
}
func (a *Adapter) ProcessBatch(ctx context.Context, records []provider.TelemetryRecord) ([]provider.Decision, error) {
	return a.bp.Process(ctx, records), nil
}
// Filter converts native records, evaluates them, and keeps only the allowed ones.
func (a *Adapter) Filter(ctx context.Context, records []MyRecord) []MyRecord {
	telRecords := make([]provider.TelemetryRecord, len(records))
	for i, r := range records {
		telRecords[i] = provider.TelemetryRecord{
			Signal:  provider.SignalLog,
			OrgID:   a.orgID,
			Service: r.Service,
			Tags:    r.Data,
		}
	}
	decisions := a.bp.Process(ctx, telRecords)
	out := make([]MyRecord, 0, len(records))
	for i, d := range decisions {
		// decisions[i] corresponds to records[i].
		if d.Allowed {
			out = append(out, records[i])
		}
	}
	return out
}
// Compile-time check that *Adapter satisfies provider.Adapter.
var _ provider.Adapter = (*Adapter)(nil)
Provider Registry
The registry is a thread-safe store for dynamic adapter lookup at runtime:
import (
	"log"
	"github.com/suresh-p26/RLAAS/pkg/provider"
)
// Register adapters at startup
registry := provider.NewRegistry()
registry.MustRegister(otelAdapter)
registry.MustRegister(datadogAdapter)
registry.MustRegister(envoyAdapter)
registry.MustRegister(kafkaAdapter)
// Lookup by name at runtime
if adapter, ok := registry.Get("datadog"); ok {
	decisions, err := adapter.ProcessBatch(ctx, records)
	if err != nil {
		log.Printf("datadog batch failed: %v", err)
	}
	log.Printf("datadog returned %d decisions", len(decisions))
}
// List all registered providers
names := registry.List() // ["otel", "datadog", "envoy", "kafka"]
There's also a global provider.DefaultRegistry for simple use cases where providers self-register via init().
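A registry with these properties can be sketched as a map guarded by sync.RWMutex. This is an illustration rather than the provider package's code: Adapter is trimmed to Name(), and the named type is hypothetical. This sketch also sorts List() output because Go map iteration order is random, so the real registry's ordering may differ.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Adapter is trimmed to Name() for this sketch; the real interface also has
// SignalKinds and ProcessBatch.
type Adapter interface{ Name() string }

type Registry struct {
	mu       sync.RWMutex
	adapters map[string]Adapter
}

func NewRegistry() *Registry { return &Registry{adapters: map[string]Adapter{}} }

// Register rejects duplicate names so two adapters cannot silently shadow each other.
func (r *Registry) Register(a Adapter) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.adapters[a.Name()]; dup {
		return fmt.Errorf("provider %q already registered", a.Name())
	}
	r.adapters[a.Name()] = a
	return nil
}

// MustRegister panics on error, which suits startup code and init() functions.
func (r *Registry) MustRegister(a Adapter) {
	if err := r.Register(a); err != nil {
		panic(err)
	}
}

func (r *Registry) Get(name string) (Adapter, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	a, ok := r.adapters[name]
	return a, ok
}

// List returns registered names in sorted order.
func (r *Registry) List() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.adapters))
	for n := range r.adapters {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// DefaultRegistry lets packages self-register from init().
var DefaultRegistry = NewRegistry()

// named is a hypothetical adapter used only for this example.
type named string

func (n named) Name() string { return string(n) }

func init() { DefaultRegistry.MustRegister(named("otel")) }

func main() {
	DefaultRegistry.MustRegister(named("kafka"))
	fmt.Println(DefaultRegistry.List())                   // [kafka otel]
	fmt.Println(DefaultRegistry.Register(named("otel"))) // provider "otel" already registered
}
```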
Enterprise Example: Northwind
Here's how an enterprise like Northwind might deploy RLAAS across multiple providers with per-service policies:
┌──────────────────────────────────────────────────────────────────┐
│                     Northwind Microservices                      │
│   payments │ auth │ trading │ market-data │ risk-api │ orders    │
└────┬─────────────┬─────────────┬─────────────┬─────────────┬─────┘
     │ OTLP        │ DD metrics  │ logs        │ HTTP        │ Kafka
     ▼             ▼             ▼             ▼             ▼
┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐
│   OTEL   │  │ Datadog  │  │  Fluent  │  │  Envoy   │  │  Kafka   │
│Collector │  │  Agent   │  │   Bit    │  │  Proxy   │  │ Consumer │
│Processor │  │ Adapter  │  │ Adapter  │  │ ext_authz│  │Intercept │
└────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘
     │             │             │             │             │
     └─────────────┴─────────────┼─────────────┴─────────────┘
                                 │
                         ┌───────▼────────┐
                         │  RLAAS Server  │
                         │ (Policy Engine)│
                         │ ┌────────────┐ │
                         │ │ 12 Policies│ │
                         │ │ per-service│ │
                         │ │ per-signal │ │
                         │ └────────────┘ │
                         │ Redis counters │
                         │ Postgres store │
                         └────────────────┘
Policy Matrix
| Service | Provider | Signal | Algorithm | Limit | Action |
|---|---|---|---|---|---|
| payments | OTEL | log | Token Bucket | 10K/min + 2K burst | drop |
| payments | OTEL | log (ERROR) | Token Bucket | 50K/min | allow |
| auth | OTEL | log | Sliding Window | 5K/min | drop |
| trading-engine | OTEL | span | Fixed Window | 50K/min | drop |
| market-data | Fluent Bit | log | Leaky Bucket | 200/s (smooth) | drop |
| market-data | Datadog | metric | Quota | 1M points/hour | drop_low_priority |
| risk-api | Envoy | http | Token Bucket | 500 req/s | deny |
| risk-api | Envoy | http | Concurrency | 50 in-flight | deny |
| orders | Kafka | event | Sliding Log | 100K/min | delay |
| notifications | Fluent Bit | log | Fixed Window | 2K/min | drop |
| all others | any | any | Fixed Window | 5K/min | drop |
examples/northwind-policies.json contains the full set of 12 production-ready policies, covering every provider and use case above.