# @businys/ops
Production MCP middleware. Observer Mode, stdio Bridge, OpenTelemetry, Agent Lineage, rate limiting, metering, audit logging, and agent reputation — extracted from the production stack at businys.app. MIT licensed. Zero external dependencies.
## Installation
```bash
npm install @businys/ops
# or
pnpm add @businys/ops
```
## Observer Mode

The fastest way to see what your MCP server is doing. One import, no account, no config. A local dashboard opens at `localhost:3100` and streams every tool call in real time.
```typescript
import { observe } from "@businys/ops"

const ops = await observe()
// → Dashboard running at http://localhost:3100

// Add ops.middleware to your MCP server pipeline —
// every tool call is now visible in the dashboard
```
### Options

```typescript
const ops = await observe({
  port: 3200,          // default: 3100
  hostname: "0.0.0.0", // default: "localhost"
  maxCalls: 500,       // default: 200 (ring buffer)
})

console.log(ops.url) // "http://0.0.0.0:3200"

// Programmatic access
const calls = await ops.storage.getCalls({ limit: 20 })
const stats = await ops.storage.getStats()

// Shut down
await ops.close()
```
### Dashboard API

The Observer dashboard exposes a small HTTP API you can query directly:
```text
GET /api/calls            # paginated call history
GET /api/stats            # aggregated stats
GET /api/reputation/:id   # agent reputation record
GET /events               # SSE stream of new calls
```
## CLI

The `businys-ops` CLI wraps Observer Mode and the stdio Bridge as standalone processes.
```bash
# Observer dashboard
npx @businys/ops observe
npx @businys/ops observe --port 3200

# Check stats from a running instance
npx @businys/ops status

# Wrap any stdio MCP server as managed HTTP
npx @businys/ops bridge node ./my-server.js
npx @businys/ops bridge python server.py --port 3200 --rate-limit 50
npx @businys/ops bridge npx some-mcp-server --name my-tools
```
## stdio Bridge

Wrap any stdio MCP server as a managed Streamable HTTP endpoint with the full middleware pipeline. Works with any server — Node, Python, npx.
```typescript
import { createBridge } from "@businys/ops"

const bridge = await createBridge(["node", "my-server.js"], {
  port: 3100,
  // proxy: createMCPProxy({ rateLimit: { globalMax: 100 } }),
})

console.log(bridge.url) // http://localhost:3100
// GET /health → { status: "ok", serverInfo: { ... } }
// POST /      → JSON-RPC with full middleware pipeline

await bridge.close()
```
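The `/health` endpoint makes a bridged server easy to probe from a supervisor. A sketch of a readiness poll — the polling helper is hypothetical, and the response shape beyond `status` is whatever the wrapped server reports:

```typescript
// Poll the bridge's health endpoint until it reports { status: "ok" },
// or give up after `attempts` tries (helper name and defaults are illustrative)
async function waitHealthy(url: string, attempts = 10): Promise<boolean> {
  for (let i = 0; i < attempts; i++) {
    try {
      const res = await fetch(`${url}/health`)
      if (res.ok) {
        const body = await res.json()
        if (body.status === "ok") return true
      }
    } catch {
      // server not accepting connections yet
    }
    await new Promise((r) => setTimeout(r, 250))
  }
  return false
}
```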
## OpenTelemetry

Pass any OTel-compatible Tracer — zero new dependencies. The middleware uses structural typing so it works with any OTel SDK version.
```typescript
import { createMCPProxy } from "@businys/ops"
import { trace } from "@opentelemetry/api"

const proxy = createMCPProxy({
  telemetry: {
    tracer: trace.getTracer("my-mcp-server"),
    recordInput: true,      // attach input JSON as span attribute
    inputMaxLength: 500,    // truncate long inputs
    attributePrefix: "mcp", // default
  },
})
```

Span name format: `serverName/toolName`. Standard attributes include `mcp.tool.name`, `mcp.agent.id`, `mcp.duration_ms`, `mcp.is_error`, and `mcp.tool.destructive`.
## Agent Lineage
A causal DAG that traces every tool call back to the originating human intent — through every agent delegation, with cryptographic integrity. Built for the EU AI Act, SOC 2 auditors, and multi-agent workflows.
```typescript
import { createMCPProxy, MemoryLineageStore, verifyLineage } from "@businys/ops"

const store = new MemoryLineageStore()
const proxy = createMCPProxy({
  lineage: { store },
})

// After calls complete, verify the chain is intact
const result = await verifyLineage(rootId, store)
console.log(result.valid)    // true iff all hashes check out
console.log(result.maxDepth) // delegation depth reached
console.log(result.errors)   // [] if valid
```
### Header propagation

When using the Bridge, lineage context flows automatically via HTTP headers. Downstream agents receive `X-Lineage-Id` in the response and can pass it forward as `X-Lineage-Parent` to chain delegations.
```text
// Inbound (set by calling agent)
X-Lineage-Root: <rootId>       // originating human prompt
X-Lineage-Parent: <parentId>   // immediate parent node
X-Lineage-Depth: <number>      // delegation depth (0 = human)

// Outbound (returned by bridge)
X-Lineage-Id: <nodeId>         // id of the node created for this call
```
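To chain a delegation, an agent reads `X-Lineage-Id` from one response and echoes it back as `X-Lineage-Parent` on the next call. A sketch of the forwarding step — the helper function, bridge URL, and request bodies are illustrative, not part of the package API:

```typescript
// Build headers for a delegated call, given the X-Lineage-Id
// returned by the previous hop (hypothetical helper)
function delegationHeaders(parentId: string, depth: number): Record<string, string> {
  return {
    "Content-Type": "application/json",
    "X-Lineage-Parent": parentId,
    "X-Lineage-Depth": String(depth),
  }
}

// Against a running bridge (illustrative):
// const first = await fetch(bridgeUrl, { method: "POST", body: firstCall })
// const nodeId = first.headers.get("X-Lineage-Id")
// await fetch(bridgeUrl, {
//   method: "POST",
//   headers: delegationHeaders(nodeId!, 1),
//   body: nextCall,
// })
```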
### Custom LineageStore

Swap `MemoryLineageStore` for any persistent backend by implementing the `LineageStore` interface:
```typescript
import type { LineageStore, LineageNode } from "@businys/ops"

const myStore: LineageStore = {
  async recordNode(node: LineageNode) { /* persist to DB */ },
  async getChain(rootId: string) { /* return nodes sorted by timestamp */ },
  async getNode(id: string) { /* lookup single node */ },
}
```
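As a toy illustration, the same three methods can be satisfied with a plain `Map`. The `rootId` and `timestamp` fields here are assumptions inferred from the `getChain` contract (nodes grouped by root, sorted by time), not the package's actual `LineageNode` shape:

```typescript
// Minimal node shape assumed for this sketch only
interface ToyNode { id: string; rootId: string; timestamp: number }

const nodes = new Map<string, ToyNode>()

const mapStore = {
  // Persist a node keyed by id
  async recordNode(node: ToyNode) { nodes.set(node.id, node) },
  // Return all nodes for a root, sorted by timestamp
  async getChain(rootId: string) {
    return Array.from(nodes.values())
      .filter((n) => n.rootId === rootId)
      .sort((a, b) => a.timestamp - b.timestamp)
  },
  // Lookup a single node
  async getNode(id: string) { return nodes.get(id) },
}
```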
## createMCPProxy

The batteries-included factory. Pipeline order: lineage → telemetry → reputation → rate limit → confirmation → metering → audit → custom.
```typescript
import { createMCPProxy, MemoryLineageStore } from "@businys/ops"

const proxy = createMCPProxy({
  rateLimit: {
    globalMax: 1000,   // calls per window across all agents
    groupMax: 100,     // calls per window per agent
    windowMs: 3600000, // 1 hour
  },
  lineage: { store: new MemoryLineageStore() },
  // telemetry: { tracer },
  auditLog: (entry) => console.log(entry), // default: stderr
  disable: {
    confirmation: true, // disable specific layers
  },
  middleware: [myCustomMiddleware], // append custom layers
})

// proxy.middleware — array of assembled middleware
// proxy.storage    — the MemoryAdapter
// proxy.run(ctx, handler) — run the pipeline directly
```
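Because `proxy.run(ctx, handler)` drives the pipeline directly, middleware can be exercised in tests by building a context by hand. A sketch — the builder function and its field values are illustrative, and the stub handler's return shape is an assumption based on the `result.isError` check used by middleware:

```typescript
// Hand-built context covering the documented MiddlewareContext fields
// (values are placeholders for a test scenario)
function makeCtx(toolName: string) {
  return {
    toolName,
    toolGroup: "default",
    toolTier: "core" as const,
    method: "tools/call",
    path: "/",
    input: {},
    agentId: "agent-1",
    serverName: "my-server",
    startedAt: Date.now(),
    destructive: false,
  }
}

// Run the assembled pipeline with a stub handler (illustrative):
// const result = await proxy.run(makeCtx("search"), async () => ({ isError: false }))
```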
### Writing middleware

```typescript
import type { Middleware } from "@businys/ops"

const myMiddleware: Middleware = {
  name: "my-middleware",
  async execute(ctx, next) {
    console.log("Before:", ctx.toolName)
    const result = await next()
    console.log("After:", result.isError ? "error" : "ok")
    return result
  },
}
```
### MiddlewareContext

```typescript
interface MiddlewareContext {
  toolName: string
  toolGroup: string
  toolTier: "core" | "craft" | "kit"
  method: string
  path: string
  input: Record<string, unknown>
  agentId: string
  serverName: string
  startedAt: number    // Date.now()
  destructive: boolean // true = confirmation required
}
```
## Custom storage

The default `MemoryAdapter` is a ring buffer. For persistent storage, implement `StorageAdapter`:
```typescript
import type { StorageAdapter } from "@businys/ops"

const myAdapter: StorageAdapter = {
  async recordCall(record) { /* persist to DB */ },
  async getCalls(opts) { /* paginated query */ },
  async getStats() { /* aggregations */ },
  async getReputation(agentId) { /* fetch rep record */ },
  async updateReputation(agentId, signal) { /* update score */ },
  async checkLoop(agentId, hash) { /* ring buffer check */ },
  subscribe(listener) { /* SSE push, return unsub fn */ },
}
```