# Queue Pilot
MCP server for message queue development — combines message inspection with JSON Schema validation. Supports RabbitMQ and Kafka.
Designed for integration projects where multiple teams communicate via message brokers: inspect queues/topics, view messages, and validate payloads against agreed-upon schemas — all from your AI assistant.
<p align="center"> <img src="docs/queue-pilot-diagram.svg" alt="Queue Pilot architecture: MCP clients connect to Queue Pilot, which interfaces with RabbitMQ and Kafka" width="720"> </p>

## Features

- **Multi-broker support** — RabbitMQ and Apache Kafka via a unified adapter interface
- **Message inspection** — browse queues/topics and peek at messages without consuming them
- **Schema validation** — validate message payloads against JSON Schema definitions
- **Combined inspection** — `inspect_queue` peeks messages AND validates each against its schema
- **Validated publishing** — `publish_message` validates against a schema before sending, so invalid messages never hit the broker
- **Queue management** — create queues/topics and bindings, and purge messages for dev/test workflows
- **Broker info** — list exchanges, bindings, consumer groups, and partition details
## Prerequisites

- **Node.js >= 22** — required runtime (check with `node --version`)
- **A message broker:**
  - RabbitMQ with the management plugin enabled (HTTP API on port 15672), or
  - Apache Kafka (requires `@confluentinc/kafka-javascript` as a peer dependency)
- **An MCP-compatible client** — Claude Code, Claude Desktop, Cursor, VS Code (Copilot), Windsurf, etc.
## Quick Start

### 1. Define your schemas

Create JSON Schema files in a directory.

`schemas/order.created.json`:
```json
{
  "$id": "order.created",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order Created",
  "description": "Emitted when a new order is placed",
  "version": "1.0.0",
  "type": "object",
  "required": ["orderId", "amount"],
  "properties": {
    "orderId": { "type": "string" },
    "amount": { "type": "number" }
  }
}
```
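For illustration, here is a hypothetical message that satisfies this schema, with a minimal hand-rolled check of its `required` and `type` constraints (Queue Pilot itself uses Ajv for full JSON Schema validation; the sample values are made up):

```typescript
// Illustration only: a sample payload and a minimal check mirroring
// the order.created schema's "required" and "type" constraints.
// (Queue Pilot performs real validation with Ajv.)
const message: Record<string, unknown> = {
  type: "order.created", // matches the schema's $id
  orderId: "A-1001",     // required, string
  amount: 49.95,         // required, number
};

const errors: string[] = [];
for (const field of ["orderId", "amount"]) {
  if (!(field in message)) errors.push(`missing required field: ${field}`);
}
if (typeof message.orderId !== "string") errors.push("orderId must be a string");
if (typeof message.amount !== "number") errors.push("amount must be a number");

console.log(errors.length === 0 ? "valid" : `invalid: ${errors.join("; ")}`);
// prints "valid"
```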
### 2. Add to your MCP client

Generate the config for your client with `queue-pilot init`:

```shell
npx queue-pilot init --schemas /absolute/path/to/your/schemas --client <name>
```
Supported clients: `claude-code`, `claude-desktop`, `vscode`, `cursor`, `windsurf`. Omit `--client` for generic JSON.

For Kafka, add `--broker kafka`. The generated config automatically includes the required `@confluentinc/kafka-javascript` peer dependency.
Non-default credentials are included as environment variables to avoid exposing secrets in `ps` output:

```shell
npx queue-pilot init --schemas ./schemas --rabbitmq-user admin --rabbitmq-pass secret
```

Run `npx queue-pilot init --help` for all options, including Kafka SASL authentication.
> **Windows note:** If `npx` fails to resolve the package, try `cmd /c npx queue-pilot init ...`.

#### Manual configuration (without `init`)

Add the following server configuration to your MCP client.

RabbitMQ:
```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas"
      ]
    }
  }
}
```
Kafka:
```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "-y",
        "--package=@confluentinc/kafka-javascript",
        "--package=queue-pilot",
        "queue-pilot",
        "--schemas", "/absolute/path/to/your/schemas",
        "--broker", "kafka"
      ],
      "env": {
        "KAFKA_BROKERS": "localhost:9092"
      }
    }
  }
}
```
> **Schema path tip:** Use an absolute path for `--schemas`. Relative paths resolve from the MCP client's working directory, which may not be your project root.
| Client | Config file |
|---|---|
| Claude Code | `.mcp.json` (project) or `~/.claude.json` (user) |
| Claude Desktop | `claude_desktop_config.json` |
| Cursor | `.cursor/mcp.json` |
| VS Code (Copilot) | `.vscode/mcp.json` (uses `"servers"` instead of `"mcpServers"`) |
| Windsurf | `~/.codeium/windsurf/mcp_config.json` |
Running from a local checkout during development (uses `tsx` to execute the TypeScript sources directly):

```json
{
  "mcpServers": {
    "queue-pilot": {
      "command": "npx",
      "args": [
        "tsx",
        "src/index.ts",
        "--schemas", "./schemas"
      ],
      "cwd": "/path/to/queue-pilot"
    }
  }
}
```
### 3. Use it

Ask your assistant things like:
- "Which queues are there and how many messages do they have?"
- "Show me the messages in the orders queue"
- "Inspect the registration queue and check if all messages are valid"
- "What schemas are available?"
- "Validate this message against the order.created schema"
- "Publish an order.created event to the events exchange"
- "Create a queue called dead-letters and bind it to the events exchange"
- "Purge all messages from the orders queue"
- "List all consumer groups" (Kafka)
- "Show me the partition details for the orders topic" (Kafka)
## Universal tools (all brokers)

| Tool | Description |
|---|---|
| `list_schemas` | List all loaded message schemas |
| `get_schema` | Get the full definition of a specific schema |
| `validate_message` | Validate a JSON message against a schema |
| `list_queues` | List all queues/topics with message counts |
| `peek_messages` | View messages in a queue/topic without consuming them |
| `inspect_queue` | Peek messages + validate each against its schema |
| `get_overview` | Get broker cluster overview |
| `check_health` | Check broker health status |
| `get_queue` | Get detailed information about a specific queue/topic |
| `list_consumers` | List consumers (RabbitMQ) or consumer groups (Kafka) |
| `publish_message` | Publish a message with an optional schema validation gate |
| `purge_queue` | Remove all messages from a queue/topic |
| `create_queue` | Create a new queue/topic |
| `delete_queue` | Delete a queue/topic |
## RabbitMQ-specific tools

| Tool | Description |
|---|---|
| `list_exchanges` | List all RabbitMQ exchanges |
| `create_exchange` | Create a new exchange |
| `delete_exchange` | Delete an exchange |
| `list_bindings` | List bindings between exchanges and queues |
| `create_binding` | Bind a queue to an exchange with a routing key |
| `delete_binding` | Delete a binding |
| `list_connections` | List all client connections to the broker |
## Kafka-specific tools

| Tool | Description |
|---|---|
| `list_consumer_groups` | List all consumer groups with their state |
| `describe_consumer_group` | Show members, assignments, and state of a consumer group |
| `list_partitions` | Show partition details for a topic (leader, replicas, ISR) |
| `get_offsets` | Show earliest/latest offsets per partition |
## Prompts

Pre-built workflow templates that guide your AI assistant through multi-step operations.

| Prompt | Parameters | Description |
|---|---|---|
| `debug-flow` | `exchange`, `queue` | Trace bindings from exchange to queue, peek messages, and validate each against its schema |
| `health-report` | (none) | Check broker health, get the cluster overview, and flag queues with backed-up messages |
| `schema-compliance` | `queue` (optional) | Peek messages and validate each against its schema, for one queue or all queues |

Usage example (in any MCP-compatible client):

> "Use the debug-flow prompt for exchange 'events' and queue 'orders'"
## Resources

Each loaded schema is exposed as a readable MCP resource at `schema:///<schema-name>`.

Clients that support MCP resources can read schema definitions directly without calling tools. For example, a schema loaded from `order.created.json` is available at `schema:///order.created`.
## Schema Format

Schemas follow JSON Schema draft-07 with a few conventions:

- `$id` — message type identifier (matches the `type` property on messages)
- `version` — schema version (custom field, not validated by JSON Schema)
- Standard JSON Schema validation, including `required`, `properties`, `format`, etc.

Schema matching: when inspecting a queue, the message's `type` property is used to find the corresponding schema by `$id`.
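The matching convention above can be sketched as follows (the names here are illustrative, not Queue Pilot's actual internals): schemas are indexed by `$id`, and a message's `type` selects the schema to validate against.

```typescript
// Sketch of schema matching by $id (illustrative names only).
interface Schema {
  $id: string;
  required?: string[];
}

// Index of schemas as they might be loaded from the --schemas directory.
const schemaIndex = new Map<string, Schema>([
  ["order.created", { $id: "order.created", required: ["orderId", "amount"] }],
]);

// A message's "type" property is the lookup key.
function schemaFor(message: { type?: string }): Schema | undefined {
  return message.type ? schemaIndex.get(message.type) : undefined;
}

console.log(schemaFor({ type: "order.created" })?.$id); // "order.created"
console.log(schemaFor({ type: "unknown.event" }));      // undefined
```

Messages without a `type`, or with a `type` that matches no loaded `$id`, have no schema to validate against.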
## Configuration
CLI arguments take priority over environment variables, which take priority over defaults.
| Setting | CLI flag | Env var | Default |
|---|---|---|---|
| Schema directory | `--schemas` | — | (required) |
| Broker type | `--broker` | — | `rabbitmq` |
| RabbitMQ URL | `--rabbitmq-url` | `RABBITMQ_URL` | `http://localhost:15672` |
| RabbitMQ user | `--rabbitmq-user` | `RABBITMQ_USER` | `guest` |
| RabbitMQ password | `--rabbitmq-pass` | `RABBITMQ_PASS` | `guest` |
| Kafka brokers | `--kafka-brokers` | `KAFKA_BROKERS` | `localhost:9092` |
| Kafka client ID | `--kafka-client-id` | `KAFKA_CLIENT_ID` | `queue-pilot` |
| SASL mechanism | `--kafka-sasl-mechanism` | `KAFKA_SASL_MECHANISM` | (none) |
| SASL username | `--kafka-sasl-username` | `KAFKA_SASL_USERNAME` | (none) |
| SASL password | `--kafka-sasl-password` | `KAFKA_SASL_PASSWORD` | (none) |
Use environment variables in MCP client `env` blocks to avoid exposing credentials in `ps` output.
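The documented precedence (CLI flag over env var over default) can be sketched as a single resolution step; this is a simplified illustration, not the server's actual config loader:

```typescript
// CLI flag beats env var, which beats the built-in default.
function resolveSetting(
  cliValue: string | undefined,
  envValue: string | undefined,
  defaultValue: string,
): string {
  return cliValue ?? envValue ?? defaultValue;
}

// Example: the RabbitMQ URL when only the env var is set.
console.log(
  resolveSetting(undefined, "http://rabbit:15672", "http://localhost:15672"),
);
// → "http://rabbit:15672"
```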
## Development

```shell
npm install
npm test                  # Unit tests
npm run test:coverage     # Coverage report
npm run build             # TypeScript compilation
npm run typecheck         # Type check

# Integration tests (requires RabbitMQ)
docker compose up -d --wait
npm run test:integration
```
## Tech Stack
- TypeScript (strict mode, ESM)
- MCP SDK v1.26.0
- Ajv for JSON Schema validation
- Zod for MCP tool parameter definitions
- Vitest for testing
- RabbitMQ Management HTTP API
- Confluent Kafka JavaScript (optional, for Kafka support)
## License
MIT