businys.dev

Documentation

@businys/ops

Production MCP middleware. Observer Mode, stdio Bridge, OpenTelemetry, Agent Lineage, rate limiting, metering, audit logging, and agent reputation — extracted from the production stack at businys.app. MIT licensed. Zero external dependencies.

Installation

npm install @businys/ops
# or
pnpm add @businys/ops

Observer Mode

The fastest way to see what your MCP server is doing. One import, no account, no config. A local dashboard opens at localhost:3100 and streams every tool call in real time.

import { observe } from "@businys/ops"

const ops = await observe()
// → Dashboard running at http://localhost:3100

// Add ops.middleware to your MCP server pipeline
// Every tool call is now visible in the dashboard

Options

const ops = await observe({
  port: 3200,           // default: 3100
  hostname: "0.0.0.0",  // default: "localhost"
  maxCalls: 500,        // default: 200 (ring buffer)
})

console.log(ops.url)   // "http://0.0.0.0:3200"

// Programmatic access
const calls = await ops.storage.getCalls({ limit: 20 })
const stats = await ops.storage.getStats()

// Shut down
await ops.close()

Dashboard API

The Observer dashboard exposes a small HTTP API you can query directly:

GET /api/calls          # paginated call history
GET /api/stats          # aggregated stats
GET /api/reputation/:id # agent reputation record
GET /events             # SSE stream of new calls

CLI

The businys-ops CLI wraps Observer Mode and the stdio Bridge as standalone processes.

# Observer dashboard
npx @businys/ops observe
npx @businys/ops observe --port 3200

# Check stats from a running instance
npx @businys/ops status

# Wrap any stdio MCP server as managed HTTP
npx @businys/ops bridge node ./my-server.js
npx @businys/ops bridge python server.py --port 3200 --rate-limit 50
npx @businys/ops bridge npx some-mcp-server --name my-tools

stdio Bridge

Wrap any stdio MCP server as a managed Streamable HTTP endpoint with the full middleware pipeline. Works with any server — Node, Python, npx.

import { createBridge } from "@businys/ops"

const bridge = await createBridge(["node", "my-server.js"], {
  port: 3100,
  // proxy: createMCPProxy({ rateLimit: { globalMax: 100 } }),
})

console.log(bridge.url)  // http://localhost:3100
// GET  /health → { status: "ok", serverInfo: { ... } }
// POST /        → JSON-RPC with full middleware pipeline

await bridge.close()

OpenTelemetry

Pass any OTel-compatible Tracer — zero new dependencies. The middleware uses structural typing so it works with any OTel SDK version.

import { createMCPProxy } from "@businys/ops"
import { trace } from "@opentelemetry/api"

const proxy = createMCPProxy({
  telemetry: {
    tracer: trace.getTracer("my-mcp-server"),
    recordInput: true,    // attach input JSON as span attribute
    inputMaxLength: 500,  // truncate long inputs
    attributePrefix: "mcp", // default
  },
})

Span name format: serverName/toolName. Standard attributes include mcp.tool.name, mcp.agent.id, mcp.duration_ms, mcp.is_error, and mcp.tool.destructive.

Agent Lineage

A causal DAG that traces every tool call back to the originating human intent — through every agent delegation, with cryptographic integrity. Built for the EU AI Act, SOC 2 auditors, and multi-agent workflows.

import { createMCPProxy, MemoryLineageStore, verifyLineage } from "@businys/ops"

const store = new MemoryLineageStore()

const proxy = createMCPProxy({
  lineage: { store },
})

// After calls complete, verify the chain is intact
const result = await verifyLineage(rootId, store)
console.log(result.valid)     // true iff all hashes check out
console.log(result.maxDepth)  // delegation depth reached
console.log(result.errors)    // [] if valid

Header propagation

When using the Bridge, lineage context flows automatically via HTTP headers. Downstream agents receive X-Lineage-Id in the response and can pass it forward as X-Lineage-Parent to chain delegations.

// Inbound (set by calling agent)
X-Lineage-Root:   <rootId>   // originating human prompt
X-Lineage-Parent: <parentId> // immediate parent node
X-Lineage-Depth:  <number>   // delegation depth (0 = human)

// Outbound (returned by bridge)
X-Lineage-Id:     <nodeId>   // id of the node created for this call

Custom LineageStore

Swap MemoryLineageStore for any persistent backend by implementing the LineageStore interface:

import type { LineageStore, LineageNode } from "@businys/ops"

const myStore: LineageStore = {
  async recordNode(node: LineageNode) { /* persist to DB */ },
  async getChain(rootId: string) { /* return nodes sorted by timestamp */ },
  async getNode(id: string) { /* lookup single node */ },
}

createMCPProxy

The batteries-included factory. Pipeline order: lineage → telemetry → reputation → rate limit → confirmation → metering → audit → custom.

import { createMCPProxy, MemoryLineageStore } from "@businys/ops"

const proxy = createMCPProxy({
  rateLimit: {
    globalMax: 1000,  // calls per window across all agents
    groupMax: 100,    // calls per window per agent
    windowMs: 3600000 // 1 hour
  },
  lineage: { store: new MemoryLineageStore() },
  // telemetry: { tracer },
  auditLog: (entry) => console.log(entry), // default: stderr
  disable: {
    confirmation: true, // disable specific layers
  },
  middleware: [myCustomMiddleware], // append custom layers
})

// proxy.middleware — array of assembled middleware
// proxy.storage    — the MemoryAdapter
// proxy.run(ctx, handler) — run pipeline directly

Writing middleware

import type { Middleware } from "@businys/ops"

const myMiddleware: Middleware = {
  name: "my-middleware",
  async execute(ctx, next) {
    console.log("Before:", ctx.toolName)
    const result = await next()
    console.log("After:", result.isError ? "error" : "ok")
    return result
  },
}

MiddlewareContext

interface MiddlewareContext {
  toolName:   string
  toolGroup:  string
  toolTier:   "core" | "craft" | "kit"
  method:     string
  path:       string
  input:      Record<string, unknown>
  agentId:    string
  serverName: string
  startedAt:  number    // Date.now()
  destructive: boolean  // true = confirmation required
}

Custom storage

The default MemoryAdapter is a ring buffer. For persistent storage, implement StorageAdapter:

import type { StorageAdapter } from "@businys/ops"

const myAdapter: StorageAdapter = {
  async recordCall(record) { /* persist to DB */ },
  async getCalls(opts) { /* paginated query */ },
  async getStats() { /* aggregations */ },
  async getReputation(agentId) { /* fetch rep record */ },
  async updateReputation(agentId, signal) { /* update score */ },
  async checkLoop(agentId, hash) { /* ring buffer check */ },
  subscribe(listener) { /* SSE push, return unsub fn */ },
}

Roadmap

Observer Mode — zero-config dashboard
Rate limiting — per-agent, per-group, sliding window
Metering — call counting, cost attribution
Audit log — structured JSON, append-only
Agent reputation — score-based throttling + loop detection
OpenTelemetry — zero-dep structural typing, bring your own Tracer
stdio Bridge — wrap any stdio MCP server as managed HTTP
Agent Lineage — causal DAG, SHA-256 hash chain, EU AI Act ready
soonMCP Revenue — per-call billing + Stripe Connect