Queue Pilot

Platform & Services

by larscowe

An MCP server for inspecting RabbitMQ and Kafka messages, with support for validation against JSON Schema.


README

Queue Pilot


MCP server for message queue development — combines message inspection with JSON Schema validation. Supports RabbitMQ and Kafka.

Designed for integration projects where multiple teams communicate via message brokers: inspect queues/topics, view messages, and validate payloads against agreed-upon schemas — all from your AI assistant.

<p align="center"> <img src="docs/queue-pilot-diagram.svg" alt="Queue Pilot architecture: MCP clients connect to Queue Pilot, which interfaces with RabbitMQ and Kafka" width="720"> </p>

Features

  • Multi-broker support — RabbitMQ and Apache Kafka via a unified adapter interface
  • Message Inspection — Browse queues/topics, peek at messages without consuming them
  • Schema Validation — Validate message payloads against JSON Schema definitions
  • Combined Inspection — `inspect_queue` peeks messages AND validates each against its schema
  • Validated Publishing — `publish_message` validates against a schema before sending — invalid messages never hit the broker
  • Queue Management — Create queues/topics, bindings, and purge messages for dev/test workflows
  • Broker Info — List exchanges, bindings, consumer groups, and partition details
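
The README does not document the adapter interface itself; as an illustrative sketch only (all names here are hypothetical, not Queue Pilot's actual internal API), a unified broker adapter might look like this, with an in-memory stand-in showing how the same tools can target either broker:

```typescript
// Hypothetical sketch of a unified broker adapter interface.
// Names and shapes are invented for illustration — not Queue Pilot's real API.
interface QueueInfo {
  name: string;
  messageCount: number;
}

interface BrokerAdapter {
  listQueues(): Promise<QueueInfo[]>;
  // Peek returns payloads without acknowledging or consuming them.
  peekMessages(queue: string, limit: number): Promise<unknown[]>;
}

// In-memory stand-in: the MCP tools only depend on the interface,
// so RabbitMQ and Kafka implementations are interchangeable behind it.
class InMemoryAdapter implements BrokerAdapter {
  private queues = new Map<string, unknown[]>([
    ["orders", [{ type: "order.created", orderId: "o-1", amount: 9.5 }]],
  ]);

  async listQueues(): Promise<QueueInfo[]> {
    return Array.from(this.queues.entries()).map(([name, msgs]) => ({
      name,
      messageCount: msgs.length,
    }));
  }

  async peekMessages(queue: string, limit: number): Promise<unknown[]> {
    return (this.queues.get(queue) ?? []).slice(0, limit);
  }
}
```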

Prerequisites

  • Node.js >= 22 — Required runtime (check with node --version)
  • A message broker:
    • RabbitMQ with the management plugin enabled (HTTP API on port 15672), or
    • Apache Kafka (requires @confluentinc/kafka-javascript as peer dependency)
  • An MCP-compatible client — Claude Code, Claude Desktop, Cursor, VS Code (Copilot), Windsurf, etc.

Quick Start

1. Define your schemas

Create JSON Schema files in a directory:

schemas/order.created.json:

```json
{
  "$id": "order.created",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order Created",
  "description": "Emitted when a new order is placed",
  "version": "1.0.0",
  "type": "object",
  "required": ["orderId", "amount"],
  "properties": {
    "orderId": { "type": "string" },
    "amount": { "type": "number" }
  }
}
```

2. Add to your MCP client

Generate the config for your client with queue-pilot init:

```bash
npx queue-pilot init --schemas /absolute/path/to/your/schemas --client <name>
```

Supported clients: claude-code, claude-desktop, vscode, cursor, windsurf. Omit --client for generic JSON.

For Kafka, add --broker kafka. The generated config automatically includes the required @confluentinc/kafka-javascript peer dependency.

Non-default credentials are included as environment variables to avoid exposing secrets in ps output:

```bash
npx queue-pilot init --schemas ./schemas --rabbitmq-user admin --rabbitmq-pass secret
```

Run npx queue-pilot init --help for all options including Kafka SASL authentication.

Windows note: If npx fails to resolve the package, try cmd /c npx queue-pilot init ....

<details> <summary>Manual configuration (without init)</summary>

Add the following server configuration to your MCP client:

RabbitMQ:

```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas"
      ]
    }
  }
}
```

Kafka:

```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "--package=@confluentinc/kafka-javascript",
        "--package=queue-pilot",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas",
        "--broker", "kafka"
      ],
      "env": {
        "KAFKA_BROKERS": "localhost:9092"
      }
    }
  }
}
```

Schema path tip: Use an absolute path for --schemas. Relative paths resolve from the MCP client's working directory, which may not be your project root.

| Client | Config file |
| --- | --- |
| Claude Code | `.mcp.json` (project) or `~/.claude.json` (user) |
| Claude Desktop | `claude_desktop_config.json` |
| Cursor | `.cursor/mcp.json` |
| VS Code (Copilot) | `.vscode/mcp.json` (uses `"servers"` instead of `"mcpServers"`) |
| Windsurf | `~/.codeium/windsurf/mcp_config.json` |
</details> <details> <summary>Development (running from source)</summary>
```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "tsx",
        "src/index.ts",
        "--schemas", "./schemas"
      ],
      "cwd": "/path/to/queue-pilot"
    }
  }
}
```
</details>

3. Use it

Ask your assistant things like:

  • "Which queues are there and how many messages do they have?"
  • "Show me the messages in the orders queue"
  • "Inspect the registration queue and check if all messages are valid"
  • "What schemas are available?"
  • "Validate this message against the order.created schema"
  • "Publish an order.created event to the events exchange"
  • "Create a queue called dead-letters and bind it to the events exchange"
  • "Purge all messages from the orders queue"
  • "List all consumer groups" (Kafka)
  • "Show me the partition details for the orders topic" (Kafka)
<details> <summary>MCP Tools</summary>

Universal tools (all brokers)

| Tool | Description |
| --- | --- |
| `list_schemas` | List all loaded message schemas |
| `get_schema` | Get the full definition of a specific schema |
| `validate_message` | Validate a JSON message against a schema |
| `list_queues` | List all queues/topics with message counts |
| `peek_messages` | View messages in a queue/topic without consuming them |
| `inspect_queue` | Peek messages + validate each against its schema |
| `get_overview` | Get broker cluster overview |
| `check_health` | Check broker health status |
| `get_queue` | Get detailed information about a specific queue/topic |
| `list_consumers` | List consumers (RabbitMQ) or consumer groups (Kafka) |
| `publish_message` | Publish a message with optional schema validation gate |
| `purge_queue` | Remove all messages from a queue/topic |
| `create_queue` | Create a new queue/topic |
| `delete_queue` | Delete a queue/topic |

RabbitMQ-specific tools

| Tool | Description |
| --- | --- |
| `list_exchanges` | List all RabbitMQ exchanges |
| `create_exchange` | Create a new exchange |
| `delete_exchange` | Delete an exchange |
| `list_bindings` | List bindings between exchanges and queues |
| `create_binding` | Bind a queue to an exchange with a routing key |
| `delete_binding` | Delete a binding |
| `list_connections` | List all client connections to the broker |

Kafka-specific tools

| Tool | Description |
| --- | --- |
| `list_consumer_groups` | List all consumer groups with their state |
| `describe_consumer_group` | Show members, assignments, and state of a consumer group |
| `list_partitions` | Show partition details for a topic (leader, replicas, ISR) |
| `get_offsets` | Show earliest/latest offsets per partition |
</details> <details> <summary>MCP Prompts & Resources</summary>

Prompts

Pre-built workflow templates that guide your AI assistant through multi-step operations.

| Prompt | Parameters | Description |
| --- | --- | --- |
| `debug-flow` | `exchange`, `queue` | Trace bindings from exchange to queue, peek messages, and validate each against its schema |
| `health-report` | (none) | Check broker health, get cluster overview, flag queues with backed-up messages |
| `schema-compliance` | `queue` (optional) | Peek messages and validate each against its schema — for one queue or all queues |

Usage example (in any MCP-compatible client):

"Use the debug-flow prompt for exchange 'events' and queue 'orders'"

Resources

Each loaded schema is exposed as a readable MCP resource at schema:///<schema-name>.

Clients that support MCP resources can read schema definitions directly without calling tools. For example, a schema loaded from order.created.json is available at schema:///order.created.

</details>

Schema Format

Schemas follow JSON Schema draft-07 with a few conventions:

  • `$id` — Message type identifier (matches the `type` property on messages)
  • `version` — Schema version (custom field, not validated by JSON Schema)
  • Standard JSON Schema validation, including `required`, `properties`, `format`, etc.

Schema matching: when inspecting a queue, the message's `type` property is used to find the corresponding schema by `$id`.
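
As an illustration (the payload values are invented), a message shaped like this would be matched to the `order.created` schema from the Quick Start via its `type` property, and would pass validation since `orderId` and `amount` have the required types:

```json
{
  "type": "order.created",
  "orderId": "o-1042",
  "amount": 19.99
}
```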

Configuration

CLI arguments take priority over environment variables, which take priority over defaults.
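
For example (host names and paths here are placeholders), an explicit CLI flag overrides the corresponding environment variable:

```bash
# RABBITMQ_URL would normally apply, but --rabbitmq-url takes priority:
RABBITMQ_URL=http://rabbit.internal:15672 \
npx queue-pilot --schemas /absolute/path/to/your/schemas \
  --rabbitmq-url http://localhost:15672   # this URL wins
```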

| Setting | CLI flag | Env var | Default |
| --- | --- | --- | --- |
| Schema directory | `--schemas` | — | (required) |
| Broker type | `--broker` | — | `rabbitmq` |
| RabbitMQ URL | `--rabbitmq-url` | `RABBITMQ_URL` | `http://localhost:15672` |
| RabbitMQ user | `--rabbitmq-user` | `RABBITMQ_USER` | `guest` |
| RabbitMQ password | `--rabbitmq-pass` | `RABBITMQ_PASS` | `guest` |
| Kafka brokers | `--kafka-brokers` | `KAFKA_BROKERS` | `localhost:9092` |
| Kafka client ID | `--kafka-client-id` | `KAFKA_CLIENT_ID` | `queue-pilot` |
| SASL mechanism | `--kafka-sasl-mechanism` | `KAFKA_SASL_MECHANISM` | (none) |
| SASL username | `--kafka-sasl-username` | `KAFKA_SASL_USERNAME` | (none) |
| SASL password | `--kafka-sasl-password` | `KAFKA_SASL_PASSWORD` | (none) |

Use environment variables in MCP client env blocks to avoid exposing credentials in ps output.
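
As a sketch (the credential values are placeholders), the RabbitMQ config from the manual-configuration section with credentials moved into the `env` block:

```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": ["-y", "queue-pilot", "--schemas", "/absolute/path/to/your/schemas"],
      "env": {
        "RABBITMQ_USER": "admin",
        "RABBITMQ_PASS": "secret"
      }
    }
  }
}
```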

Development

```bash
npm install
npm test                    # Unit tests
npm run test:coverage       # Coverage report
npm run build               # TypeScript compilation
npm run typecheck           # Type check

# Integration tests (requires RabbitMQ)
docker compose up -d --wait
npm run test:integration
```

Tech Stack

License

MIT

FAQ

What is Queue Pilot?

An MCP server for inspecting RabbitMQ and Kafka messages, with support for validation against JSON Schema.
