# JustinX - Full Tool Reference

> Real-time data streaming infrastructure for AI agents. Connect any data source
> (MQTT, Webhook, Kafka) and stream it through Redis into WebSockets, with
> server-side TypeScript watchers for alerts, anomaly detection, and aggregation.

This is the detailed reference for all JustinX MCP tools. For a summary, see /llms.txt.

## Architecture

Data flows through JustinX in a single pipeline:

`Source (MQTT | Webhook | Kafka) -> Redis Stream -> WebSocket relay -> Watchers`

- **Connections** are the entry point. Each connection subscribes to a data source and writes to a Redis stream.
- **WebSocket** relay fans out stream entries to any connected client. The WS URL is returned by create_connection.
- **Watchers** are server-side TypeScript processes that read from the Redis stream. They run as child processes with access to environment variables: WATCHER_ID, WATCHER_CONFIG, WATCHER_CONNECTION_ID.

## Authentication

All API and MCP calls are scoped to an organization via API key (Bearer token) or JWT.

- API keys are managed at https://justinx.ai/dashboard/settings
- MCP config is available at GET /mcp/config (authenticated)

## WebSocket Protocol

Connect to the WebSocket URL returned by create_connection or list_connections. Messages follow this protocol:

- **Backfill message**: `{ "type": "backfill", "entries": [{ "id": "...", "fields": { "topic": "...", "payload": "..." }, "ts": 1234567890 }] }`
- **Live entry**: `{ "type": "entry", "id": "...", "fields": { "topic": "...", "payload": "..." }, "ts": 1234567890 }`
- **Topic filtering**: Append `?topics=topic1,topic2` to the WS URL to filter by topic.

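
The message shapes above can be handled on the client with a small parser. What follows is a minimal TypeScript sketch, not an official client: the helper names `withTopics` and `entriesFromMessage` are hypothetical, and percent-encoding each topic in the query string is an assumption that this reference does not confirm.

```typescript
// Shapes taken from the protocol description above.
interface StreamEntry {
  id: string;
  fields: { topic: string; payload: string };
  ts: number;
}
type BackfillMessage = { type: "backfill"; entries: StreamEntry[] };
type LiveMessage = { type: "entry" } & StreamEntry;
type WsMessage = BackfillMessage | LiveMessage;

// Hypothetical helper: append the documented ?topics= filter to a WS URL.
// Percent-encoding each topic is an assumption (MQTT filters may contain "#" or "+").
function withTopics(wsUrl: string, topics: string[]): string {
  if (topics.length === 0) return wsUrl;
  const separator = wsUrl.indexOf("?") >= 0 ? "&" : "?";
  return `${wsUrl}${separator}topics=${topics.map(encodeURIComponent).join(",")}`;
}

// Hypothetical helper: normalize either message type into a list of entries.
function entriesFromMessage(raw: string): StreamEntry[] {
  const msg = JSON.parse(raw) as WsMessage;
  if (msg.type === "backfill") return msg.entries;
  if (msg.type === "entry") return [{ id: msg.id, fields: msg.fields, ts: msg.ts }];
  return []; // unknown message types are ignored
}
```

In a browser or Node script, the data string of each WebSocket `message` event would be passed to `entriesFromMessage`, so backfill and live entries share one code path.
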
## Instructions for AI Agents

- Always call read_stream after creating a connection to verify data is flowing
- Use the WebSocket URL for real-time consumers (dashboards, apps, scripts)
- Use watchers for background processing (alerts, aggregation, anomaly detection)
- Prefer update_watcher_config over delete + recreate when tuning watcher parameters
- Watcher scripts are TypeScript; they can import from npm packages available in the runtime
- Use get_watcher_logs to debug issues before deleting and recreating watchers

## MCP Tools Reference

### create_connection

Create a data connection. For MQTT: connects to a broker and subscribes to topics. For Webhook: creates an HTTP ingest endpoint. For Kafka: connects to a Kafka broker via a consumer group and consumes from topics. All stream through Redis into a WebSocket.

**Parameters:**

- `type` (string, optional, default: "mqtt", one of: "mqtt" | "webhook" | "kafka"): Source type: "mqtt" (default), "webhook", or "kafka"
- `name` (string, optional): Display name for the connection
- `broker` (string, optional): MQTT broker hostname, e.g. "broker.emqx.io" (required for mqtt)
- `port` (number, optional, default: 8883): Broker port (default 8883, mqtt only)
- `tls` (boolean, optional, default: true): Use TLS (default true, mqtt only)
- `username` (string, optional, default: ""): MQTT username
- `password` (string, optional, default: ""): MQTT password
- `topics` (array, optional): MQTT topic filters to subscribe to (required for mqtt)
- `brokers` (array, optional): Kafka bootstrap servers, e.g. ["localhost:9092"] (required for kafka)
- `groupId` (string, optional): Kafka consumer group ID (auto-generated if omitted)
- `kafkaTopics` (array, optional): Kafka topics to consume from (required for kafka)
- `saslUsername` (string, optional): Kafka SASL/PLAIN username
- `saslPassword` (string, optional): Kafka SASL/PLAIN password
- `ssl` (boolean, optional): Kafka TLS (defaults to true when SASL is provided)

### list_connections

List all active connections and their status.

**Parameters:** _(none)_

### get_connection

Get status of a specific connection (message count, WebSocket clients, stream key).

**Parameters:**

- `connectionId` (string, required): The connection ID (e.g. "conn_a1b2c3d4")

### destroy_connection

Destroy a connection: stops source, cleans up Redis stream, kills watchers.

**Parameters:**

- `connectionId` (string, required): The connection ID to destroy

### read_stream

Read entries from a connection's live stream. Reads backfill + live entries directly from Redis, then returns them. Use this to inspect what data is flowing through a connection.

**Parameters:**

- `connectionId` (string, required): The connection ID to read from
- `backfillSeconds` (number, optional, default: 300): Include backfill from the last N seconds (0 = skip backfill, default 300 = 5 min)
- `liveSeconds` (number, optional, default: 3): Listen for live entries for N seconds (0 = skip live, default 3)
- `maxEntries` (number, optional, default: 50): Max entries to collect before stopping (default 50)

### create_watcher

Deploy a watcher script on a connection. The script runs as a child process with access to the Redis stream. It receives WATCHER_ID, WATCHER_CONFIG, and WATCHER_CONNECTION_ID as environment variables.

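
As an illustration of how a watcher script might consume those environment variables, here is a minimal TypeScript sketch. It only covers configuration loading; how the script actually reads from the Redis stream is not specified in this reference and is left out. The helper `loadWatcherContext`, the config keys `threshold` and `windowSeconds`, and the watcher ID `"w1"` are hypothetical and chosen only for the example.

```typescript
interface WatcherContext<C> {
  watcherId: string;
  connectionId: string;
  config: C;
}

// Hypothetical helper: read the three documented environment variables and
// merge WATCHER_CONFIG (a JSON string) over caller-supplied defaults.
function loadWatcherContext<C extends object>(
  env: Record<string, string | undefined>,
  defaults: C,
): WatcherContext<C> {
  const parsed = JSON.parse(env["WATCHER_CONFIG"] ?? "{}") as Partial<C>;
  return {
    watcherId: env["WATCHER_ID"] ?? "",
    connectionId: env["WATCHER_CONNECTION_ID"] ?? "",
    config: { ...defaults, ...parsed },
  };
}

// Example with made-up values; inside a real watcher the first argument
// would be process.env.
const ctx = loadWatcherContext(
  { WATCHER_ID: "w1", WATCHER_CONNECTION_ID: "conn_a1b2c3d4", WATCHER_CONFIG: '{"threshold":80}' },
  { threshold: 50, windowSeconds: 60 },
);
console.log(ctx.config); // { threshold: 80, windowSeconds: 60 }
```

Merging over defaults means a later update_watcher_config call only needs to carry the keys that change, under the assumption that the script reads its config once at startup.
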
**Parameters:**

- `connectionId` (string, required): The connection ID to attach the watcher to
- `scriptContent` (string, required): TypeScript source code for the watcher
- `config` (string, optional, default: "{}"): JSON config string passed to the watcher as WATCHER_CONFIG env var

### list_watchers

List all watchers for a connection with their status, PID, and restart count.

**Parameters:**

- `connectionId` (string, required): The connection ID

### get_watcher

Get details of a specific watcher including its source code, status, config, and restart count.

**Parameters:**

- `connectionId` (string, required): The connection ID
- `watcherId` (string, required): The watcher ID

### get_watcher_logs

Get stdout/stderr logs from a running or crashed watcher.

**Parameters:**

- `connectionId` (string, required): The connection ID
- `watcherId` (string, required): The watcher ID

### update_watcher_config

Update a watcher's config. The watcher process will be restarted with the new config.

**Parameters:**

- `connectionId` (string, required): The connection ID
- `watcherId` (string, required): The watcher ID
- `config` (string, required): New JSON config string

### restart_watcher

Restart a stopped or crashed watcher. Sets the watcher status back to running so it will be re-spawned by the reconciliation loop.

**Parameters:**

- `connectionId` (string, required): The connection ID
- `watcherId` (string, required): The watcher ID to restart

### delete_watcher

Stop and remove a watcher from a connection.

**Parameters:**

- `connectionId` (string, required): The connection ID
- `watcherId` (string, required): The watcher ID to delete

## REST API Endpoints

For clients that prefer HTTP over MCP, the same operations are available via REST:

- `POST /connections`: create connection (MQTT, Webhook, or Kafka)
- `GET /connections`: list connections
- `GET /connections/:id`: connection status
- `DELETE /connections/:id`: destroy connection
- `WS /connections/:id/ws`: live WebSocket stream
- `POST /connections/:id/ingest`: webhook data ingest (no auth, by connection ID)
- `GET /connections/:id/stream/sample?count=5`: sample recent entries
- `POST /connections/:id/watchers`: create watcher
- `GET /connections/:id/watchers`: list watchers
- `GET /connections/:id/watchers/:wid`: get watcher details
- `DELETE /connections/:id/watchers/:wid`: delete watcher
- `PUT /connections/:id/watchers/:wid/config`: update watcher config
- `PUT /connections/:id/watchers/:wid/restart`: restart watcher
- `GET /connections/:id/watchers/:wid/logs`: get watcher logs
- `POST /mcp`: MCP HTTP transport (Streamable HTTP)
- `GET /mcp/config`: MCP config snippet for Claude Code / Cursor
- `GET /health`: health check (no auth)

Base URL: https://api.justinx.ai
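
To make the REST endpoints concrete, the following TypeScript sketch builds two requests against the base URL: creating an MQTT connection and sampling its stream. It rests on assumptions this reference does not confirm: that the POST body uses the same field names as the create_connection tool, and that it is sent as JSON with a `Content-Type: application/json` header. The names `HttpRequest`, `createConnectionRequest`, and `sampleStreamRequest` are hypothetical.

```typescript
const BASE_URL = "https://api.justinx.ai";

// Plain description of a request, so it can be handed to any HTTP client.
interface HttpRequest {
  url: string;
  method: string;
  headers: Record<string, string>;
  body?: string;
}

// Assumption: the REST body mirrors the create_connection tool parameters.
interface MqttConnectionParams {
  type: "mqtt";
  name?: string;
  broker: string;
  port?: number;
  tls?: boolean;
  topics: string[];
}

// POST /connections with the API key as a Bearer token.
function createConnectionRequest(apiKey: string, params: MqttConnectionParams): HttpRequest {
  return {
    url: `${BASE_URL}/connections`,
    method: "POST",
    headers: { Authorization: `Bearer ${apiKey}`, "Content-Type": "application/json" },
    body: JSON.stringify(params),
  };
}

// GET /connections/:id/stream/sample?count=N
function sampleStreamRequest(apiKey: string, connectionId: string, count = 5): HttpRequest {
  return {
    url: `${BASE_URL}/connections/${encodeURIComponent(connectionId)}/stream/sample?count=${count}`,
    method: "GET",
    headers: { Authorization: `Bearer ${apiKey}` },
  };
}
```

Each returned object can be passed to `fetch` as `fetch(req.url, { method: req.method, headers: req.headers, body: req.body })`. Keeping request construction separate from sending makes the calls easy to inspect or log before they reach the network.
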