Skip to main content

Overview

The MCP Gateway pattern implements AARM as a proxy server that intercepts all Model Context Protocol traffic. Agents connect to the gateway instead of directly to MCP servers. This architecture follows the established pattern of security gateways and service meshes — a trusted intermediary that interposes on communication channels to enforce policy. The approach has proven effective in API security (API gateways), service-to-service communication (Envoy, Istio), and database access (SQL proxies).
Agent → AARM Gateway → MCP Server (Database)
                    → MCP Server (Email)
                    → MCP Server (Filesystem)
Key property: Enforcement happens at the network level. If network configuration ensures all tool traffic routes through the gateway, enforcement cannot be bypassed by agent-side code.

When to Use

Good fit:
  • MCP-based tool ecosystem
  • Network-level enforcement required
  • Minimal agent modification desired
  • You control the network between agents and tool servers
  • Multiple agents share the same tool backends
Not ideal:
  • Non-MCP tools (use SDK pattern)
  • Need rich agent context for policies (agent reasoning, chain-of-thought)
  • Can’t control network routing
  • Latency-sensitive workloads that can’t tolerate an extra network hop

Architecture Properties

PropertyGateway RatingNotes
Bypass ResistanceHighNetwork-level enforcement — agents cannot route around the gateway
Context RichnessLimitedOnly sees wire-level data: tool names, parameters, metadata headers
Defer SupportPartialCan hold execution, but lacks agent reasoning to resolve autonomously
AARM-ConformantYes (standalone)Satisfies R1–R6 when combined with proper identity and receipt config
Latency ImpactMediumOne additional network hop per tool call

Comparison with Other Architectures

The gateway trades context richness for bypass resistance. Unlike the SDK pattern, which can see agent internals (reasoning traces, chain-of-thought, embedding state), the gateway only sees what crosses the wire. However, the SDK pattern depends on correct integration — malicious code, bugs, or misconfiguration could invoke tools directly without passing through AARM. The gateway eliminates this class of bypass entirely. For maximum security, deploy the gateway alongside an SDK or kernel-level (eBPF) layer for defense-in-depth.

Implementation

Basic Gateway

# gateway/server.py
from mcp import MCPServer, ToolCallRequest, ToolCallResponse
from aarm import PolicyEngine, ReceiptGenerator

class AARMGateway(MCPServer):
    def __init__(self, backend_url: str, policy_path: str):
        self.backend = MCPClient(backend_url)
        self.policy = PolicyEngine(policy_path)
        self.receipts = ReceiptGenerator()

    async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
        # Build action from MCP request
        action = self.build_action(request)

        # Evaluate policy
        decision = self.policy.evaluate(action)

        # Enforce
        if decision.result == "DENY":
            self.receipts.emit(action, decision, None)
            return ToolCallResponse(
                error=f"Policy denied: {decision.reason}"
            )

        if decision.result == "STEP_UP":
            approval = await self.request_approval(action)
            if not approval.granted:
                self.receipts.emit(action, decision, None)
                return ToolCallResponse(error="Approval denied")

        # Forward to backend
        result = await self.backend.call(request)

        # Record receipt
        self.receipts.emit(action, decision, result)

        return result

    def build_action(self, request: ToolCallRequest) -> dict:
        return {
            "tool": request.tool_name,
            "operation": request.method,
            "parameters": request.params,
            "identity": self.extract_identity(request),
            "timestamp": datetime.utcnow().isoformat()
        }

Context Accumulator

The gateway must track session state across multiple tool calls. This is the foundation for detecting compositional threats — where individual actions are permitted but their combination constitutes a breach.
# gateway/context.py
from aarm import ContextAccumulator, DataClassification

class GatewayContextAccumulator:
    """
    Append-only, hash-chained session context.
    Tracks prior actions, data classifications, and tool outputs
    to enable context-dependent policy evaluation.
    """
    def __init__(self):
        self.sessions: dict[str, ContextAccumulator] = {}

    def get_or_create(self, session_id: str) -> ContextAccumulator:
        if session_id not in self.sessions:
            self.sessions[session_id] = ContextAccumulator(session_id)
        return self.sessions[session_id]

    def record_action(self, session_id: str, action: dict, decision: dict, result: dict):
        ctx = self.get_or_create(session_id)

        # Append action to hash-chained log
        ctx.append(action=action, decision=decision, result=result)

        # Track data classifications from tool outputs
        if result and result.get("data"):
            classifications = self.classify_data(result["data"])
            ctx.update_data_accessed(classifications)

    def get_context(self, session_id: str) -> dict:
        ctx = self.get_or_create(session_id)
        return {
            "prior_actions": ctx.action_history,
            "data_accessed": ctx.data_classifications,
            "original_request": ctx.original_request,
            "action_count": ctx.count,
            "session_start": ctx.created_at,
        }

    def classify_data(self, data: dict) -> list[DataClassification]:
        """Classify data sensitivity from tool outputs."""
        # Implementation depends on your data classification system.
        # Examples: PII detection, sensitivity labels, compliance tags.
        ...
Integrate the accumulator into the gateway:
class AARMGateway(MCPServer):
    def __init__(self, backend_url: str, policy_path: str):
        self.backend = MCPClient(backend_url)
        self.policy = PolicyEngine(policy_path)
        self.receipts = ReceiptGenerator()
        self.context = GatewayContextAccumulator()

    async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
        action = self.build_action(request)
        session_id = action["identity"]["session"]

        # Attach accumulated context to the action
        action["context"] = self.context.get_context(session_id)

        decision = self.policy.evaluate(action)

        if decision.result == "DENY":
            self.context.record_action(session_id, action, decision, None)
            self.receipts.emit(action, decision, None)
            return ToolCallResponse(error=f"Policy denied: {decision.reason}")

        if decision.result == "DEFER":
            return await self.handle_deferral(action, decision)

        if decision.result == "STEP_UP":
            approval = await self.request_approval(action)
            if not approval.granted:
                self.context.record_action(session_id, action, decision, None)
                self.receipts.emit(action, decision, None)
                return ToolCallResponse(error="Approval denied")

        if decision.result == "MODIFY":
            request = self.apply_modifications(request, decision.modifications)

        result = await self.backend.call(request)

        # Record in context and receipts
        self.context.record_action(session_id, action, decision, result)
        self.receipts.emit(action, decision, result)

        return result

Handling All Five Decisions

AARM requires five authorization decisions. Here is how each maps to gateway behavior:
async def handle_deferral(self, action: dict, decision: dict) -> ToolCallResponse:
    """
    DEFER: Hold execution until additional context resolves ambiguity.

    The gateway can collect context from:
    - Downstream verification services
    - User confirmation via the approval service
    - Session metadata updates

    Limitation: Without visibility into agent reasoning, the gateway
    may need to escalate to human review if it cannot resolve
    the deferral autonomously.
    """
    deferral = await self.deferral_service.create(
        action=action,
        reason=decision.reason,
        timeout=decision.timeout or 300,  # default 5 min
        context_needed=decision.context_needed,
    )

    # Wait for resolution (blocks the tool call)
    resolution = await deferral.wait()

    if resolution.resolved:
        # Re-evaluate with new context
        action["context"]["deferral_resolution"] = resolution.data
        new_decision = self.policy.evaluate(action)
        if new_decision.result == "ALLOW":
            result = await self.backend.call(action["original_request"])
            self.receipts.emit(action, new_decision, result)
            return result

    # Deferral expired or denied
    self.receipts.emit(action, decision, None)
    return ToolCallResponse(error=f"Deferred action not resolved: {decision.reason}")


def apply_modifications(self, request: ToolCallRequest, modifications: dict) -> ToolCallRequest:
    """
    MODIFY: Adjust parameters before forwarding.

    Examples:
    - Redact sensitive fields from query parameters
    - Add row-level security filters to database queries
    - Scope file access to a specific directory
    - Reduce batch sizes to limit blast radius
    """
    modified = request.copy()

    for field, value in modifications.items():
        if field == "params":
            modified.params = {**modified.params, **value}
        elif field == "redact":
            for key in value:
                if key in modified.params:
                    modified.params[key] = "[REDACTED]"
        elif field == "add_filter":
            modified.params["_aarm_filter"] = value

    return modified

Network Configuration

Ensure agents can only reach MCP servers through the gateway:
# kubernetes network policy
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: aarm-gateway-only
spec:
  podSelector:
    matchLabels:
      app: ai-agent
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: aarm-gateway
      ports:
        - port: 443
This NetworkPolicy is what gives the gateway its high bypass resistance. Without it, agents could connect directly to MCP backends, bypassing all policy enforcement. Verify this policy is active before claiming AARM conformance.
For non-Kubernetes environments, enforce equivalent network isolation using firewall rules, VPC security groups, or service mesh policies (e.g., Istio AuthorizationPolicy).

Multi-Backend Routing

In production, the gateway routes to multiple MCP backends based on the tool being invoked:
# gateway/router.py

class BackendRouter:
    """Route tool calls to the correct MCP backend."""

    def __init__(self, config: dict):
        self.backends = {}
        self.tool_map = {}

        for backend in config["backends"]:
            client = MCPClient(backend["url"])
            self.backends[backend["name"]] = client

            # Map tools to backends (discovered or configured)
            for tool in backend.get("tools", []):
                self.tool_map[tool] = backend["name"]

    async def discover_tools(self):
        """Query each backend for its available tools."""
        for name, client in self.backends.items():
            tools = await client.list_tools()
            for tool in tools:
                self.tool_map[tool.name] = name

    def route(self, tool_name: str) -> MCPClient:
        backend_name = self.tool_map.get(tool_name)
        if not backend_name:
            raise ToolNotFoundError(f"No backend registered for tool: {tool_name}")
        return self.backends[backend_name]

Configuration

Gateway Config

# gateway/config.yaml
server:
  port: 8443
  tls:
    cert: /etc/aarm/tls.crt
    key: /etc/aarm/tls.key

backends:
  - name: database
    url: mcp://db-server:8080
    tools: ["sql_query", "sql_execute", "list_tables"]
  - name: email
    url: mcp://email-server:8080
    tools: ["send_email", "read_inbox", "search_email"]
  - name: filesystem
    url: mcp://fs-server:8080
    tools: ["read_file", "write_file", "list_directory"]

policy:
  path: /etc/aarm/policies/
  reload_interval: 60s

context:
  session_ttl: 3600s          # Expire session context after 1 hour
  max_actions_per_session: 500 # Limit to prevent unbounded accumulation
  hash_algorithm: sha256       # For hash-chained context log

receipts:
  store: postgresql://receipts-db/aarm
  signing_key: /etc/aarm/signing.key
  signing_algorithm: ed25519

deferral:
  default_timeout: 300s
  max_timeout: 3600s
  store: redis://deferral-store:6379

approval:
  service_url: https://approval-service.internal
  channels:
    - type: slack
      webhook: https://hooks.slack.com/...
    - type: email
      smtp: smtp://mail.internal:587

telemetry:
  exporter: otlp
  endpoint: https://otel-collector.internal:4317
  service_name: aarm-gateway

Policy Examples

Policies define the rules the gateway enforces. Here are examples covering AARM’s four action categories:
# policies/forbidden.yaml
# Forbidden: Always blocked regardless of context
rules:
  - name: block-destructive-sql
    match:
      tool: sql_execute
      parameters:
        query:
          regex: "(?i)(DROP|TRUNCATE|DELETE FROM)\\s+(DATABASE|TABLE|\\*)"
    decision: DENY
    reason: "Destructive SQL operations are forbidden"

  - name: block-known-malicious-domains
    match:
      tool: send_email
      parameters:
        to:
          domain_in: ["malicious.example.com", "exfil.attacker.net"]
    decision: DENY
    reason: "Recipient domain is on blocklist"
# policies/context-dependent.yaml
# Context-Dependent Deny: Allowed by policy, blocked when context reveals risk
rules:
  - name: deny-email-after-sensitive-read
    match:
      tool: send_email
      parameters:
        to:
          external: true
    condition:
      context:
        data_accessed:
          contains_classification: ["PII", "CONFIDENTIAL", "FINANCIAL"]
    decision: DENY
    reason: "Cannot send external email after accessing sensitive data in this session"

  - name: stepup-bulk-operations
    match:
      tool: sql_execute
      parameters:
        query:
          regex: "(?i)(UPDATE|DELETE).*WHERE"
        affected_rows_estimate:
          greater_than: 100
    decision: STEP_UP
    reason: "Bulk data modification requires human approval"

  - name: defer-ambiguous-credential-rotation
    match:
      tool: rotate_credentials
    condition:
      context:
        original_request:
          not_contains: ["rotate", "credentials", "security"]
    decision: DEFER
    timeout: 300
    reason: "Credential rotation not clearly aligned with user request"
# policies/modify.yaml
# Modify: Adjust parameters before forwarding
rules:
  - name: scope-file-access
    match:
      tool: read_file
    condition:
      identity:
        role: "analyst"
    decision: MODIFY
    modifications:
      add_filter:
        allowed_paths: ["/data/reports/", "/data/public/"]
    reason: "Analysts restricted to reports and public data"

  - name: redact-ssn-in-queries
    match:
      tool: sql_query
    decision: MODIFY
    modifications:
      params:
        query_transform: "MASK(ssn) AS ssn"
    reason: "SSN fields must be masked in query results"

Extracting Identity

The gateway must extract identity from MCP requests to satisfy conformance requirement R6 (identity binding):
def extract_identity(self, request: ToolCallRequest) -> dict:
    # From MCP headers/metadata
    identity = {
        "human": request.metadata.get("x-user-id"),
        "service": request.metadata.get("x-service-id"),
        "session": request.metadata.get("x-session-id"),
        "agent": request.metadata.get("x-agent-id"),
        "role": request.metadata.get("x-user-role"),
    }

    # Validate identity is present — AARM requires identity binding
    if not identity["human"] and not identity["service"]:
        raise AuthenticationError(
            "MCP request missing required identity headers. "
            "Set x-user-id or x-service-id in MCP client metadata."
        )

    return identity
Configure your agent to include identity headers:
# Agent configuration
mcp_client = MCPClient(
    url="https://aarm-gateway.internal",
    headers={
        "x-user-id": current_user.id,
        "x-service-id": "agent-service",
        "x-session-id": session.id,
        "x-agent-id": "research-agent-v2",
        "x-user-role": current_user.role,
    }
)

Identity Propagation in Multi-Agent Systems

When Agent A delegates to Agent B, identity must propagate to maintain the authorization chain:
# Multi-agent identity propagation
mcp_client = MCPClient(
    url="https://aarm-gateway.internal",
    headers={
        "x-user-id": original_user.id,           # Preserved from original request
        "x-service-id": "orchestrator-agent",
        "x-session-id": session.id,
        "x-agent-id": "sub-agent-email",
        "x-delegation-chain": "orchestrator->sub-agent-email",  # Track delegation
        "x-user-role": original_user.role,
    }
)

Threat Coverage

The gateway pattern addresses AARM’s threat model with varying effectiveness:
ThreatCoverageHow
Prompt InjectionStrongEnforces policy regardless of what caused the agent to act
Confused DeputyStrongIdentity binding + context prevents misuse of legitimate credentials
Over-Privileged CredentialsStrongMODIFY decisions can scope down permissions per-request
Data ExfiltrationStrongContext accumulator detects read-then-exfiltrate patterns
Goal HijackingModerateDetects forbidden actions, but cannot see agent reasoning
Intent DriftLimitedLimited to wire-level signals; no access to embedding similarity
Memory PoisoningLimitedCan detect behavioral drift across sessions via receipt analysis
Malicious Tool OutputsNoneGateway cannot inspect or sanitize tool responses before the agent processes them
Cross-Agent PropagationModerateIdentity propagation + delegation chain tracking
Side-Channel LeakageLimitedCan redact via MODIFY, but cannot control agent-side logging
Environmental ManipulationNoneOutside gateway’s observation scope
For threats rated “Limited” or “None”, deploy complementary layers (SDK instrumentation, eBPF backstop) for defense-in-depth.

High Availability

Deploy multiple gateway instances behind a load balancer:
# kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: aarm-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: aarm-gateway
  template:
    metadata:
      labels:
        app: aarm-gateway
    spec:
      containers:
        - name: gateway
          image: aarm/gateway:latest
          ports:
            - containerPort: 8443
          readinessProbe:
            httpGet:
              path: /health
              port: 8443
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /health
              port: 8443
            periodSeconds: 30
          resources:
            requests:
              cpu: "500m"
              memory: "256Mi"
            limits:
              cpu: "2000m"
              memory: "1Gi"
          volumeMounts:
            - name: tls
              mountPath: /etc/aarm/tls
              readOnly: true
            - name: policies
              mountPath: /etc/aarm/policies
              readOnly: true
            - name: signing-key
              mountPath: /etc/aarm/signing.key
              subPath: signing.key
              readOnly: true
      volumes:
        - name: tls
          secret:
            secretName: aarm-gateway-tls
        - name: policies
          configMap:
            name: aarm-policies
        - name: signing-key
          secret:
            secretName: aarm-signing-key

Session Affinity

Because the context accumulator tracks session state, you need either sticky sessions or shared state:
# Option A: Sticky sessions via load balancer
apiVersion: v1
kind: Service
metadata:
  name: aarm-gateway
spec:
  type: ClusterIP
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 3600
  ports:
    - port: 443
      targetPort: 8443
  selector:
    app: aarm-gateway
# Option B: Shared context store (preferred for HA)
# In gateway/config.yaml:
context:
  store: redis://context-store:6379
  session_ttl: 3600s
Option B (shared store) is preferred because it survives pod restarts and allows any gateway replica to serve any session.

Fail-Closed Behavior

The gateway must fail closed. If the policy engine, context store, or receipt store is unavailable, the gateway must deny all actions rather than allow them to pass through unmediated.
async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
    try:
        action = self.build_action(request)
        action["context"] = self.context.get_context(action["identity"]["session"])
        decision = self.policy.evaluate(action)
    except PolicyEngineUnavailable:
        # FAIL CLOSED — do not forward to backend
        return ToolCallResponse(
            error="AARM gateway: policy engine unavailable. Action denied."
        )
    except ContextStoreUnavailable:
        # FAIL CLOSED — context is required for conformance (R2)
        return ToolCallResponse(
            error="AARM gateway: context store unavailable. Action denied."
        )

    # ... normal flow

Observability

Export telemetry in OpenTelemetry format (conformance requirement R8):
# gateway/telemetry.py
from opentelemetry import trace, metrics

tracer = trace.get_tracer("aarm.gateway")
meter = metrics.get_meter("aarm.gateway")

# Metrics
actions_total = meter.create_counter(
    "aarm.actions.total",
    description="Total actions evaluated"
)
actions_denied = meter.create_counter(
    "aarm.actions.denied",
    description="Actions denied by policy"
)
actions_deferred = meter.create_counter(
    "aarm.actions.deferred",
    description="Actions deferred"
)
actions_stepup = meter.create_counter(
    "aarm.actions.stepup",
    description="Actions requiring approval"
)
evaluation_latency = meter.create_histogram(
    "aarm.evaluation.latency_ms",
    description="Policy evaluation latency"
)

async def handle_tool_call_with_telemetry(self, request: ToolCallRequest):
    with tracer.start_as_current_span("aarm.evaluate") as span:
        action = self.build_action(request)
        span.set_attribute("aarm.tool", action["tool"])
        span.set_attribute("aarm.identity.human", action["identity"].get("human", ""))
        span.set_attribute("aarm.session", action["identity"].get("session", ""))

        start = time.monotonic()
        decision = self.policy.evaluate(action)
        elapsed = (time.monotonic() - start) * 1000

        evaluation_latency.record(elapsed)
        actions_total.add(1, {"tool": action["tool"]})

        span.set_attribute("aarm.decision", decision.result)

        if decision.result == "DENY":
            actions_denied.add(1, {"tool": action["tool"], "reason": decision.reason})
        elif decision.result == "DEFER":
            actions_deferred.add(1, {"tool": action["tool"]})
        elif decision.result == "STEP_UP":
            actions_stepup.add(1, {"tool": action["tool"]})

        # ... continue with enforcement

Conformance Checklist

Use this checklist to verify your gateway implementation satisfies AARM requirements:
ReqLevelRequirementGateway Implementation
R1MUSTPre-execution interceptionAll tool calls pass through gateway before reaching backends
R2MUSTContext accumulationGatewayContextAccumulator tracks session state in hash-chained log
R3MUSTPolicy evaluation with intent alignmentPolicyEngine evaluates static policy + context-dependent rules
R4MUSTFive authorization decisionsALLOW, DENY, MODIFY, STEP_UP, DEFER all implemented
R5MUSTTamper-evident receiptsReceiptGenerator signs with Ed25519 key, stores to PostgreSQL
R6MUSTIdentity bindingIdentity extracted from MCP headers, validated on every request
R7SHOULDSemantic distance trackingLimited at gateway level — consider SDK layer for embedding similarity
R8SHOULDTelemetry exportOTel traces + metrics exported to collector
R9SHOULDLeast privilege enforcementMODIFY decisions scope credentials; combine with JIT credential issuer
AARM Core (R1-R6): Achievable with the gateway pattern alone.AARM Extended (R7-R9): R7 (semantic distance) is difficult at the gateway level since you lack access to agent embeddings. Pair with an SDK layer for full Extended compliance.

Next Steps

Approval Flows

Add human approval for high-risk actions via Slack, email, or custom approval UIs

Deferral Flows

Implement deferral workflows with progressive context collection and timeout handling

Receipt Signing

Cryptographic receipt generation, offline verification, and forensic reconstruction

OpenTelemetry Ingestion

Structured telemetry export for SIEM/SOAR integration and operational dashboards

Layered Deployment

Combine the gateway with SDK instrumentation and eBPF for defense-in-depth