Overview
Every AARM action generates a signed receipt—a cryptographic record that proves:
- What action was requested
- What policy decision was made
- What the outcome was
- That the record hasn’t been tampered with
Receipt Schema
Copy
{
"receipt_id": "rct_7f8a9b2c3d4e",
"version": "1.0",
"action": {
"action_id": "act_1a2b3c4d",
"timestamp": "2025-01-15T10:30:00.000Z",
"tool": "email",
"operation": "send",
"parameters": {
"to": "external@partner.com",
"subject": "Report"
},
"identity": {
"human": "alice@company.com",
"service": "agent-svc",
"session": "sess_xyz"
}
},
"decision": {
"result": "DENY",
"policy_id": "block-pii-external",
"reason": "Cannot send PII to external recipients"
},
"approval": null,
"execution": null,
"signature": {
"algorithm": "Ed25519",
"key_id": "aarm-signing-2025-01",
"value": "base64-encoded-signature"
}
}
Implementation
Receipt Generator
Copy
# aarm/receipts.py
import base64
import hashlib
import json

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
class ReceiptGenerator:
def __init__(self, private_key_path: str, key_id: str):
with open(private_key_path, "rb") as f:
self.private_key = Ed25519PrivateKey.from_private_bytes(f.read())
self.key_id = key_id
def generate(
self,
action: dict,
decision: dict,
approval: dict | None,
execution: dict | None
) -> dict:
receipt = {
"receipt_id": f"rct_{generate_id()}",
"version": "1.0",
"action": action,
"decision": decision,
"approval": approval,
"execution": execution,
}
# Sign the receipt
receipt["signature"] = self.sign(receipt)
return receipt
def sign(self, receipt: dict) -> dict:
# Canonical serialization (sorted keys, no whitespace)
content = receipt.copy()
content.pop("signature", None)
canonical = json.dumps(content, sort_keys=True, separators=(",", ":"))
# Sign
signature = self.private_key.sign(canonical.encode())
return {
"algorithm": "Ed25519",
"key_id": self.key_id,
"value": base64.b64encode(signature).decode()
}
Receipt Verifier
Copy
# aarm/verify.py
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
class ReceiptVerifier:
    """Check receipt signatures against a set of trusted Ed25519 public keys."""

    def __init__(self, public_keys: dict[str, "Ed25519PublicKey"]):
        """Store the trusted keys.

        Args:
            public_keys: Mapping of ``key_id`` to the public key that
                verifies signatures carrying that id.
        """
        self.public_keys = public_keys  # key_id -> public_key

    def verify(self, receipt: dict) -> bool:
        """Return True only if ``receipt`` carries a valid signature.

        Malformed input (missing or non-dict signature, unknown key id,
        undecodable signature value, content that cannot be serialized)
        is reported as ``False`` rather than raised. Programming errors
        are not swallowed.

        Args:
            receipt: The receipt dict, including its ``signature`` entry.

        Returns:
            True if the signature matches the canonical receipt content,
            False otherwise.
        """
        import base64
        import json

        sig = receipt.get("signature")
        if not isinstance(sig, dict):
            return False

        # Get public key
        key_id = sig.get("key_id")
        if not isinstance(key_id, str):
            return False
        public_key = self.public_keys.get(key_id)
        if public_key is None:
            return False

        # Reconstruct canonical content; must match the signer's
        # serialization byte for byte.
        content = receipt.copy()
        content.pop("signature")
        try:
            canonical = json.dumps(
                content, sort_keys=True, separators=(",", ":")
            )
        except (TypeError, ValueError):
            return False

        # validate=True rejects characters outside the base64 alphabet
        # instead of silently discarding them.
        try:
            signature = base64.b64decode(sig["value"], validate=True)
        except (KeyError, TypeError, ValueError):
            return False

        # Imported late so the structural checks above need no crypto code.
        from cryptography.exceptions import InvalidSignature

        # Verify signature
        try:
            public_key.verify(signature, canonical.encode())
        except InvalidSignature:
            return False
        return True
Key Management
Generate Keys
Copy
# Generate Ed25519 key pair
# Step 1: create the private key and write it to aarm-signing.key.
openssl genpkey -algorithm ED25519 -out aarm-signing.key
# Step 2: read that private key and write its public half to aarm-signing.pub.
openssl pkey -in aarm-signing.key -pubout -out aarm-signing.pub
Key Rotation
Rotate keys periodically without invalidating old receipts:
Copy
class KeyRotator:
    """Registry of signing keys by id, with one id marked as current.

    Keys registered earlier stay in the registry after a rotation, so
    they remain available by id.
    """

    def __init__(self):
        """Load the built-in keys and mark the newest one as current."""
        known_keys = {}
        known_keys["aarm-signing-2024-01"] = load_key("keys/2024-01.key")
        known_keys["aarm-signing-2025-01"] = load_key("keys/2025-01.key")
        self.keys = known_keys
        self.current_key_id = "aarm-signing-2025-01"

    def get_signing_key(self):
        """Return the key registered under the current key id."""
        active_id = self.current_key_id
        return self.keys[active_id]

    def rotate(self, new_key_id: str, new_key_path: str):
        """Register the key at ``new_key_path`` and make it current.

        Args:
            new_key_id: Id under which the new key is stored.
            new_key_path: Path passed to ``load_key`` to load the key.
        """
        loaded = load_key(new_key_path)
        self.keys[new_key_id] = loaded
        self.current_key_id = new_key_id
Storage
PostgreSQL
Copy
-- Receipt store: the full signed receipt in receipt_json, plus scalar
-- columns (presumably copied from the receipt) for filtering.
CREATE TABLE receipts (
    receipt_id VARCHAR(255) PRIMARY KEY,
    action_id VARCHAR(255) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    tool VARCHAR(255) NOT NULL,
    operation VARCHAR(255) NOT NULL,
    decision VARCHAR(50) NOT NULL,
    policy_id VARCHAR(255),
    human_principal VARCHAR(255),
    session_id VARCHAR(255),
    receipt_json JSONB NOT NULL,
    signature BYTEA NOT NULL
);

-- PostgreSQL does not accept inline INDEX clauses inside CREATE TABLE
-- (that is MySQL syntax); secondary indexes need their own statements.
CREATE INDEX idx_receipts_timestamp ON receipts (timestamp);
CREATE INDEX idx_receipts_human ON receipts (human_principal);
CREATE INDEX idx_receipts_session ON receipts (session_id);
CREATE INDEX idx_receipts_decision ON receipts (decision);
Immutable Storage
For compliance, consider append-only storage:
- Amazon QLDB (Quantum Ledger Database)
- Azure Immutable Blob Storage
- Timestamped S3 with Object Lock
Verification Workflow
Copy
# Forensic investigation
def investigate_session(session_id: str):
    """Print a timeline of the receipts stored for one session.

    Each receipt fetched from ``receipt_store`` is checked with
    ``verifier``. Receipts that fail verification are reported as
    tampered and left out of the timeline.

    Args:
        session_id: Session whose receipts are fetched.
    """
    for receipt in receipt_store.get_by_session(session_id):
        if verifier.verify(receipt):
            # Reconstruct timeline
            action = receipt['action']
            print(f"{action['timestamp']}: "
                  f"{action['tool']}.{action['operation']} "
                  f"→ {receipt['decision']['result']}")
        else:
            # Integrity check failed
            print(f"⚠️ TAMPERED: {receipt['receipt_id']}")