JustinX

Documentation

Connect your real-time data sources to AI coding tools. This guide covers architecture, setup, and integration.

How data flows through JustinX

JustinX sits between your data sources and your AI tools. Every message flows through a single pipeline — no message transformation, no schema changes. We relay what your source sends, byte-for-byte.

Sources (MQTT, Webhook, Kafka)  →  Redis stream (buffer & order)  →  Consumers (dashboards, AI agents, apps)

Watchers (server-side TypeScript) read from the same stream.

Connections

Entry point. Each connection subscribes to a data source and writes to a Redis stream. Supports MQTT, Webhook, and Kafka.

WebSocket Relay

Fans out stream entries to every connected client in real time. One buffer, many readers. Clients don't need to know about your broker.

Watchers

Server-side TypeScript processes that read from the Redis stream. Alerts, aggregation, anomaly detection — deployed via MCP.

Connect to live data in 5 steps

This walkthrough uses a public MQTT broker with IoT sensor data. You'll see real telemetry flowing within a few minutes.

1. Sign up and get your API key

Create an account at justinx.ai/signup. Navigate to Dashboard → Settings and copy your API key.

2. Add JustinX to your AI tool

Add the MCP server config to your AI tool. See the MCP Integration section below for Claude Code and Lovable configs.

3. Create an MQTT connection

Ask your AI tool to create a connection to a public IoT demo broker:

Prompt to your AI tool
Connect to the public MQTT broker at broker.emqx.io
and subscribe to the topic "justinx/demo/#"

Under the hood, this calls create_connection with the broker, port, and topic filter.
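If your client surfaces raw tool calls, the arguments will look roughly like this. The exact parameter names are defined by the tool's schema in the reference below; this sketch assumes broker/port/topic-style fields:

```json
{
  "name": "create_connection",
  "arguments": {
    "type": "mqtt",
    "broker": "broker.emqx.io",
    "port": 1883,
    "topic": "justinx/demo/#"
  }
}
```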

4. Read the stream

Verify data is flowing by sampling recent messages:

Prompt
Read the latest messages from the connection you just created

This calls read_stream. You'll see temperature, humidity, and air quality telemetry from IoT sensors.

5. Build something

Now ask your AI tool to build on the live data. The WebSocket URL returned by create_connection can be passed directly to any frontend:

Prompt
Build me a live dashboard showing temperature readings
from the sensors. Use the WebSocket URL from the connection.

Connect your AI tool

JustinX uses the Model Context Protocol (MCP) — an open standard that lets AI tools read external data sources. Add the config below to your AI tool and it can immediately create connections, read streams, and deploy watchers.

Claude Code

Add to .mcp.json at your project root (Claude Code reads project-scoped MCP servers from this file):

.mcp.json
{
  "mcpServers": {
    "justinx": {
      "type": "http",
      "url": "https://api.justinx.ai/mcp",
      "headers": {
        "Authorization": "Bearer YOUR_API_KEY"
      }
    }
  }
}

Lovable

In your Lovable project, go to Integrations → MCP and add a new server:

Lovable MCP config
Server URL:  https://api.justinx.ai/mcp
Header:      Authorization: Bearer YOUR_API_KEY

OpenClaw

Install the JustinX skill from ClawHub, then add your API key:

Terminal
clawhub install justinx
~/.openclaw/openclaw.json
{
  "skills": {
    "entries": {
      "justinx": {
        "enabled": true,
        "apiKey": "YOUR_API_KEY"
      }
    }
  }
}

Any MCP client

The MCP endpoint supports Streamable HTTP transport. Point any compatible client at:

POST https://api.justinx.ai/mcp
Authorization: Bearer YOUR_API_KEY
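If you are wiring up a client by hand, the first JSON-RPC message a client sends over Streamable HTTP is initialize. A sketch of the request body per the MCP specification (the protocol version and client info values here are illustrative):

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-03-26",
    "capabilities": {},
    "clientInfo": { "name": "my-client", "version": "1.0.0" }
  }
}
```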

Consume live data directly

Every connection returns a WebSocket URL. Connect to it from any client — a React app, a Node script, a mobile app — to receive live data. No polling. Sub-second latency.

Message format

On connect, you receive a backfill of recent messages, then live entries as they arrive:

Backfill (sent once on connect)
{
  "type": "backfill",
  "entries": [
    {
      "id": "1771600700156-0",
      "fields": { "topic": "sensors/zone-a", "payload": "{...}" },
      "ts": 1771600700156
    }
  ]
}
Live entry (streamed in real time)
{
  "type": "entry",
  "id": "1771600701308-0",
  "fields": { "topic": "sensors/zone-c", "payload": "{...}" },
  "ts": 1771600701308
}
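A client only needs to distinguish these two message types: a backfill replaces local history, a live entry appends to it. A minimal TypeScript sketch of that logic (the types mirror the shapes above; `applyMessage` is a name of our choosing):

```typescript
interface StreamEntry {
  id: string;
  fields: { topic: string; payload: string };
  ts: number;
}

type ServerMessage =
  | { type: "backfill"; entries: StreamEntry[] }
  | ({ type: "entry" } & StreamEntry);

// Apply one server message to the client's local history:
// a backfill replaces it, a live entry appends to it.
function applyMessage(history: StreamEntry[], msg: ServerMessage): StreamEntry[] {
  if (msg.type === "backfill") {
    return msg.entries;
  }
  const { type, ...entry } = msg;
  return [...history, entry];
}
```

Keeping this a pure function makes it easy to unit-test independently of the WebSocket connection.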

React example

Connect to the WebSocket URL returned by create_connection or list_connections:

useJustinXStream.ts
import { useEffect, useState, useRef } from "react";

interface StreamEntry {
  id: string;
  fields: { topic: string; payload: string };
  ts: number;
}

export function useJustinXStream(wsUrl: string) {
  const [entries, setEntries] = useState<StreamEntry[]>([]);
  const wsRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    const ws = new WebSocket(wsUrl);
    wsRef.current = ws;

    ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      if (msg.type === "backfill") {
        setEntries(msg.entries);
      } else if (msg.type === "entry") {
        setEntries((prev) => [...prev.slice(-99), msg]);
      }
    };

    return () => ws.close();
  }, [wsUrl]);

  return entries;
}

Topic filtering: Append ?topics=topic1,topic2 to the WebSocket URL to receive only matching entries.
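For example, a small helper can append the filter to a connection's URL (the helper name and example URL are ours; the query format is the one documented above):

```typescript
// Append a topics filter to a JustinX WebSocket URL.
// The server expects a comma-separated list, e.g. ?topics=topic1,topic2.
function withTopics(wsUrl: string, topics: string[]): string {
  const sep = wsUrl.includes("?") ? "&" : "?";
  return `${wsUrl}${sep}topics=${topics.join(",")}`;
}
```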

Server-side processing on your stream

A watcher is a TypeScript process that runs on JustinX and reads every message from your connection's Redis stream. Use watchers for threshold alerts, data aggregation, anomaly detection, or any background logic.

How watchers work

  1. Your AI tool calls create_watcher with TypeScript source code
  2. JustinX spawns an isolated process with access to the connection's stream
  3. The watcher reads every message via blocking Redis XREAD — zero polling
  4. When conditions match, it fires — writes alerts to the stream, calls webhooks, etc.

Example: Temperature alert

This watcher fires an alert whenever a sensor's temperature exceeds the configured threshold:

Temperature alert watcher
import Redis from "ioredis";

const config = JSON.parse(process.env.WATCHER_CONFIG || "{}");
const threshold = config.threshold ?? 23;
const connId = process.env.WATCHER_CONNECTION_ID || "";
const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const streamKey = `${connId}:stream`;

let lastId = "$";
console.log(`Watching for temperature > ${threshold}°C`);

while (true) {
  const result = await redis.xread("BLOCK", 5000, "STREAMS", streamKey, lastId);
  if (!result) continue;

  for (const [, entries] of result) {
    for (const [id, rawFields] of entries) {
      lastId = id;
      // ioredis returns entry fields as a flat [key, value, key, value, ...] array
      const fields: Record<string, string> = {};
      for (let i = 0; i < rawFields.length; i += 2) {
        fields[rawFields[i]] = rawFields[i + 1];
      }
      let payload: any = {};
      try { payload = JSON.parse(fields.payload || "{}"); } catch { continue; } // skip non-JSON payloads
      const temp = payload.sensors?.environmental?.temperature?.value;
      const device = payload.metadata?.device_name;

      if (temp !== undefined && temp > threshold) {
        console.log(`ALERT: ${device} temperature ${temp.toFixed(1)}°C`);
        await redis.xadd(streamKey, "*",
          "type", "alert",
          "topic", "watcher/temperature-alert",
          "payload", JSON.stringify({ device, metric: "temperature", value: temp, threshold })
        );
      }
    }
  }
}

Environment variables

Variable                 Description
WATCHER_ID               Unique ID for this watcher instance
WATCHER_CONFIG           JSON config string passed at creation
WATCHER_CONNECTION_ID    The connection this watcher is attached to
REDIS_URL                Redis connection string for stream access

Tips: Use update_watcher_config to change thresholds without redeploying. Use get_watcher_logs to debug issues before deleting and recreating.

Full tool and REST reference

JustinX exposes 12 MCP tools for AI agents and a matching REST API for direct HTTP integration. The full reference with parameter schemas is auto-generated from the source code.

REST API base URL

https://api.justinx.ai

All endpoints require a Bearer token in the Authorization header (except GET /health and webhook ingest).

Self-hosting

Self-hosting JustinX is available for teams that need on-premise deployment. Contact us for details.

Ready to connect your data?

Free to start. No credit card required.

Start for free