The MCP Gateway pattern implements AARM as a proxy server that intercepts all Model Context Protocol traffic. Agents connect to the gateway instead of directly to MCP servers. This architecture follows the established pattern of security gateways and service meshes — a trusted intermediary that interposes on communication channels to enforce policy. The approach has proven effective in API security (API gateways), service-to-service communication (Envoy, Istio), and database access (SQL proxies).
Agent → AARM Gateway → MCP Server (Database)
                     → MCP Server (Email)
                     → MCP Server (Filesystem)
Key property: Enforcement happens at the network level. If network configuration ensures all tool traffic routes through the gateway, enforcement cannot be bypassed by agent-side code.
The gateway trades context richness for bypass resistance. Unlike the SDK pattern, which can see agent internals (reasoning traces, chain-of-thought, embedding state), the gateway only sees what crosses the wire. However, the SDK pattern depends on correct integration: malicious code, bugs, or misconfiguration could invoke tools directly without passing through AARM. As long as the network isolation described above holds, the gateway closes off this class of bypass.

For maximum security, deploy the gateway alongside an SDK or kernel-level (eBPF) layer for defense-in-depth.
The gateway must track session state across multiple tool calls. This is the foundation for detecting compositional threats — where individual actions are permitted but their combination constitutes a breach.
# gateway/context.py
from aarm import ContextAccumulator, DataClassification


class GatewayContextAccumulator:
    """
    Append-only, hash-chained session context.

    Tracks prior actions, data classifications, and tool outputs
    to enable context-dependent policy evaluation.
    """

    def __init__(self):
        self.sessions: dict[str, ContextAccumulator] = {}

    def get_or_create(self, session_id: str) -> ContextAccumulator:
        if session_id not in self.sessions:
            self.sessions[session_id] = ContextAccumulator(session_id)
        return self.sessions[session_id]

    def record_action(self, session_id: str, action: dict, decision: dict, result: dict):
        ctx = self.get_or_create(session_id)

        # Append action to hash-chained log
        ctx.append(action=action, decision=decision, result=result)

        # Track data classifications from tool outputs
        if result and result.get("data"):
            classifications = self.classify_data(result["data"])
            ctx.update_data_accessed(classifications)

    def get_context(self, session_id: str) -> dict:
        ctx = self.get_or_create(session_id)
        return {
            "prior_actions": ctx.action_history,
            "data_accessed": ctx.data_classifications,
            "original_request": ctx.original_request,
            "action_count": ctx.count,
            "session_start": ctx.created_at,
        }

    def classify_data(self, data: dict) -> list[DataClassification]:
        """Classify data sensitivity from tool outputs."""
        # Implementation depends on your data classification system.
        # Examples: PII detection, sensitivity labels, compliance tags.
        ...
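The accumulator above delegates the "append-only, hash-chained" property to `ContextAccumulator`, whose internals are not shown here. As a rough illustration of what such chaining could look like, the sketch below links each entry to the SHA-256 hash of its predecessor, so editing any earlier entry invalidates the chain. `HashChainedLog`, `GENESIS_HASH`, and the entry layout are hypothetical names chosen for this example and are not part of the `aarm` package.

```python
import hashlib
import json
from dataclasses import dataclass, field

GENESIS_HASH = "0" * 64  # assumed placeholder "previous hash" for the first entry


@dataclass
class HashChainedLog:
    """Hypothetical stand-in for the chaining done inside ContextAccumulator."""

    entries: list[dict] = field(default_factory=list)

    @staticmethod
    def _digest(prev_hash: str, payload: dict) -> str:
        # Canonical JSON (sorted keys) so equal payloads always hash identically.
        body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()

    def append(self, payload: dict) -> str:
        """Add an entry whose hash covers the previous entry's hash."""
        prev_hash = self.entries[-1]["hash"] if self.entries else GENESIS_HASH
        entry_hash = self._digest(prev_hash, payload)
        self.entries.append({"prev": prev_hash, "payload": payload, "hash": entry_hash})
        return entry_hash

    def verify(self) -> bool:
        """Recompute every hash; any edited or reordered entry breaks the chain."""
        prev_hash = GENESIS_HASH
        for entry in self.entries:
            if entry["prev"] != prev_hash:
                return False
            if entry["hash"] != self._digest(prev_hash, entry["payload"]):
                return False
            prev_hash = entry["hash"]
        return True
```

A chain like this only detects tampering by someone who cannot also rewrite every later hash; anchoring the latest hash in signed receipts is one way to cover that gap.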
Integrate the accumulator into the gateway:
class AARMGateway(MCPServer):
    def __init__(self, backend_url: str, policy_path: str):
        self.backend = MCPClient(backend_url)
        self.policy = PolicyEngine(policy_path)
        self.receipts = ReceiptGenerator()
        self.context = GatewayContextAccumulator()

    async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
        action = self.build_action(request)
        session_id = action["identity"]["session"]

        # Attach accumulated context to the action
        action["context"] = self.context.get_context(session_id)

        decision = self.policy.evaluate(action)

        if decision.result == "DENY":
            self.context.record_action(session_id, action, decision, None)
            self.receipts.emit(action, decision, None)
            return ToolCallResponse(error=f"Policy denied: {decision.reason}")

        if decision.result == "DEFER":
            return await self.handle_deferral(action, decision)

        if decision.result == "STEP_UP":
            approval = await self.request_approval(action)
            if not approval.granted:
                self.context.record_action(session_id, action, decision, None)
                self.receipts.emit(action, decision, None)
                return ToolCallResponse(error="Approval denied")

        if decision.result == "MODIFY":
            request = self.apply_modifications(request, decision.modifications)

        result = await self.backend.call(request)

        # Record in context and receipts
        self.context.record_action(session_id, action, decision, result)
        self.receipts.emit(action, decision, result)

        return result
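To make the compositional case concrete, here is a minimal sketch of a rule that a policy engine might apply to the accumulated context: each call is harmless on its own, but an outbound tool is denied once the session has read sensitive data. The tool names, classification labels, and the flat `action["tool"]` field are assumptions made for this example; the real `PolicyEngine` rule format is not shown in this section.

```python
EXTERNAL_TOOLS = {"email.send", "http.post"}  # hypothetical outbound tool names
SENSITIVE_LABELS = {"PII", "CREDENTIALS"}     # hypothetical classification labels


def evaluate_compositional(action: dict) -> dict:
    """Deny outbound tools once the session has accessed sensitive data."""
    context = action.get("context", {})
    accessed = set(context.get("data_accessed", []))
    tainted = accessed & SENSITIVE_LABELS

    if action["tool"] in EXTERNAL_TOOLS and tainted:
        reason = f"session accessed {sorted(tainted)} before calling {action['tool']}"
        return {"result": "DENY", "reason": reason}

    return {"result": "ALLOW", "reason": "no compositional rule matched"}
```

The same `email.send` call is allowed in a fresh session and denied after a query that returned PII, which is exactly the distinction a stateless per-call check cannot make.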
AARM requires five authorization decisions. Here is how each maps to gateway behavior:
async def handle_deferral(self, action: dict, decision: dict) -> ToolCallResponse:
    """
    DEFER: Hold execution until additional context resolves ambiguity.

    The gateway can collect context from:
    - Downstream verification services
    - User confirmation via the approval service
    - Session metadata updates

    Limitation: Without visibility into agent reasoning, the gateway
    may need to escalate to human review if it cannot resolve the
    deferral autonomously.
    """
    deferral = await self.deferral_service.create(
        action=action,
        reason=decision.reason,
        timeout=decision.timeout or 300,  # default 5 min
        context_needed=decision.context_needed,
    )

    # Wait for resolution (blocks the tool call)
    resolution = await deferral.wait()

    if resolution.resolved:
        # Re-evaluate with new context
        action["context"]["deferral_resolution"] = resolution.data
        new_decision = self.policy.evaluate(action)
        if new_decision.result == "ALLOW":
            result = await self.backend.call(action["original_request"])
            self.receipts.emit(action, new_decision, result)
            return result

    # Deferral expired or denied
    self.receipts.emit(action, decision, None)
    return ToolCallResponse(error=f"Deferred action not resolved: {decision.reason}")


def apply_modifications(self, request: ToolCallRequest, modifications: dict) -> ToolCallRequest:
    """
    MODIFY: Adjust parameters before forwarding.

    Examples:
    - Redact sensitive fields from query parameters
    - Add row-level security filters to database queries
    - Scope file access to a specific directory
    - Reduce batch sizes to limit blast radius
    """
    modified = request.copy()
    for field, value in modifications.items():
        if field == "params":
            modified.params = {**modified.params, **value}
        elif field == "redact":
            for key in value:
                if key in modified.params:
                    modified.params[key] = "[REDACTED]"
        elif field == "add_filter":
            modified.params["_aarm_filter"] = value
    return modified
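The deferral handler above relies on `deferral.wait()` to honor the timeout, but how the deferral service does that is not shown. One possible way to bound the wait, sketched here with the standard `asyncio.wait_for`, is to treat a timeout as an unresolved deferral so the caller falls through to the denial path. `wait_for_resolution` and the returned dictionary shape are hypothetical and only mirror the `resolved` and `data` fields used above.

```python
import asyncio


async def wait_for_resolution(resolution: "asyncio.Future[dict]", timeout_s: float) -> dict:
    """Wait for a deferral to resolve; report a timeout as unresolved."""
    try:
        data = await asyncio.wait_for(resolution, timeout=timeout_s)
    except asyncio.TimeoutError:
        # No answer in time: the caller should deny rather than forward the call.
        return {"resolved": False, "data": None}
    return {"resolved": True, "data": data}
```

Returning an explicit unresolved result, instead of letting the exception propagate, keeps the timeout on the same code path as an explicit rejection.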
This NetworkPolicy is what gives the gateway its high bypass resistance. Without it, agents could connect directly to MCP backends, bypassing all policy enforcement. Verify this policy is active before claiming AARM conformance.
For non-Kubernetes environments, enforce equivalent network isolation using firewall rules, VPC security groups, or service mesh policies (e.g., Istio AuthorizationPolicy).
In production, the gateway routes to multiple MCP backends based on the tool being invoked:
# gateway/router.py
class BackendRouter:
    """Route tool calls to the correct MCP backend."""

    def __init__(self, config: dict):
        self.backends = {}
        self.tool_map = {}

        for backend in config["backends"]:
            client = MCPClient(backend["url"])
            self.backends[backend["name"]] = client

            # Map tools to backends (discovered or configured)
            for tool in backend.get("tools", []):
                self.tool_map[tool] = backend["name"]

    async def discover_tools(self):
        """Query each backend for its available tools."""
        for name, client in self.backends.items():
            tools = await client.list_tools()
            for tool in tools:
                self.tool_map[tool.name] = name

    def route(self, tool_name: str) -> MCPClient:
        backend_name = self.tool_map.get(tool_name)
        if not backend_name:
            raise ToolNotFoundError(f"No backend registered for tool: {tool_name}")
        return self.backends[backend_name]
The gateway must fail closed. If the policy engine, context store, or receipt store is unavailable, the gateway must deny all actions rather than allow them to pass through unmediated.
async def handle_tool_call(self, request: ToolCallRequest) -> ToolCallResponse:
    try:
        action = self.build_action(request)
        action["context"] = self.context.get_context(action["identity"]["session"])
        decision = self.policy.evaluate(action)
    except PolicyEngineUnavailable:
        # FAIL CLOSED: do not forward to backend
        return ToolCallResponse(
            error="AARM gateway: policy engine unavailable. Action denied."
        )
    except ContextStoreUnavailable:
        # FAIL CLOSED: context is required for conformance (R2)
        return ToolCallResponse(
            error="AARM gateway: context store unavailable. Action denied."
        )

    # ... normal flow
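The handler above covers the policy engine and the context store; the receipt store is the third dependency named in the fail-closed rule. One way to cover it, sketched below under stated assumptions, is to write a receipt before forwarding so that an unavailable receipt store blocks the call instead of letting it run unrecorded. `DependencyUnavailable`, `mediate`, and the callable parameters are hypothetical names for this example, and the pre-execution receipt is a design choice of the sketch, not something the gateway code above is known to do.

```python
class DependencyUnavailable(Exception):
    """Hypothetical base class for policy, context, and receipt store outages."""


def mediate(evaluate, forward, emit_receipt, action: dict) -> dict:
    """Forward the action only if evaluation and receipt emission both succeed."""
    try:
        decision = evaluate(action)
        if decision != "ALLOW":
            return {"error": f"Policy decision: {decision}"}
        # Record the decision before execution so a receipt outage blocks the call.
        emit_receipt(action, decision)
    except DependencyUnavailable as exc:
        # FAIL CLOSED: never reach forward() when a dependency is down.
        return {"error": f"AARM gateway: {exc}. Action denied."}
    return forward(action)
```

Keeping `forward` outside the `try` block makes the ordering explicit: the backend is only reached after every mediation step has succeeded.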
ALLOW, DENY, MODIFY, STEP_UP, DEFER all implemented
R5 | MUST | Tamper-evident receipts | ReceiptGenerator signs with Ed25519 key, stores to PostgreSQL
R6 | MUST | Identity binding | Identity extracted from MCP headers, validated on every request
R7 | SHOULD | Semantic distance tracking | Limited at gateway level; consider SDK layer for embedding similarity
R8 | SHOULD | Telemetry export | OTel traces + metrics exported to collector
R9 | SHOULD | Least privilege enforcement | MODIFY decisions scope credentials; combine with JIT credential issuer
AARM Core (R1-R6): Achievable with the gateway pattern alone.

AARM Extended (R7-R9): R7 (semantic distance) is difficult at the gateway level since you lack access to agent embeddings. Pair with an SDK layer for full Extended compliance.