A Model Context Protocol (MCP) server for Apache Kafka implemented in Go, leveraging franz-go and mcp-go.
This server provides an implementation for interacting with Kafka via MCP, enabling LLM models to perform common Kafka operations through a standardized interface.
The Kafka MCP Server bridges the gap between LLM models and Apache Kafka, allowing them to produce and consume messages, inspect topics, consumer groups, and configuration, and monitor cluster health, all through the standardized Model Context Protocol (MCP).
The easiest way to install kafka-mcp-server is using Homebrew:
# Add the tap repository
brew tap tuannvm/mcp
# Install kafka-mcp-server
brew install kafka-mcp-server
To update to the latest version:
brew update && brew upgrade kafka-mcp-server
# Clone the repository
git clone https://github.com/tuannvm/kafka-mcp-server.git
cd kafka-mcp-server
# Build the server
go build -o kafka-mcp-server ./cmd/server
This MCP server can be integrated with several AI applications:
To integrate with MCP-compatible clients, add this configuration to your client’s settings:
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-server",
      "env": {
        "KAFKA_BROKERS": "localhost:9092",
        "KAFKA_CLIENT_ID": "kafka-mcp-server",
        "MCP_TRANSPORT": "stdio",
        "KAFKA_SASL_MECHANISM": "",
        "KAFKA_SASL_USER": "",
        "KAFKA_SASL_PASSWORD": "",
        "KAFKA_TLS_ENABLE": "false",
        "KAFKA_TLS_INSECURE_SKIP_VERIFY": "false"
      }
    }
  }
}
For secured environments:
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-server",
      "env": {
        "KAFKA_BROKERS": "kafka-broker-1:9092,kafka-broker-2:9092",
        "KAFKA_CLIENT_ID": "kafka-mcp-server",
        "MCP_TRANSPORT": "stdio",
        "KAFKA_SASL_MECHANISM": "scram-sha-512",
        "KAFKA_SASL_USER": "kafka-user",
        "KAFKA_SASL_PASSWORD": "kafka-password",
        "KAFKA_TLS_ENABLE": "true",
        "KAFKA_TLS_INSECURE_SKIP_VERIFY": "false"
      }
    }
  }
}
To use the Docker image instead of a local binary:
{
  "mcpServers": {
    "kafka": {
      "command": "docker",
      "args": [
        "run",
        "--rm",
        "-i",
        "-e", "KAFKA_BROKERS=kafka-broker:9092",
        "-e", "KAFKA_CLIENT_ID=kafka-mcp-server",
        "-e", "MCP_TRANSPORT=stdio",
        "-e", "KAFKA_SASL_MECHANISM=",
        "-e", "KAFKA_SASL_USER=",
        "-e", "KAFKA_SASL_PASSWORD=",
        "-e", "KAFKA_TLS_ENABLE=false",
        "ghcr.io/tuannvm/kafka-mcp-server:latest"
      ],
      "env": {}
    }
  }
}
Note: If connecting to Kafka running on your host machine from Docker, use `host.docker.internal` as the broker address on macOS and Windows. On Linux, use `--network=host` in your `docker run` command or the host’s actual IP address.
To use with Cursor, create or edit `~/.cursor/mcp.json`:
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-server",
      "args": [],
      "env": {
        "KAFKA_BROKERS": "localhost:9092",
        "KAFKA_CLIENT_ID": "kafka-mcp-server",
        "MCP_TRANSPORT": "stdio",
        "KAFKA_SASL_MECHANISM": "",
        "KAFKA_SASL_USER": "",
        "KAFKA_SASL_PASSWORD": "",
        "KAFKA_TLS_ENABLE": "false"
      }
    }
  }
}
For HTTP+SSE transport mode (if supported):
{
  "mcpServers": {
    "kafka-http": {
      "url": "http://localhost:9097/sse"
    }
  }
}
Then start the server in a separate terminal with:
MCP_TRANSPORT=http KAFKA_BROKERS=localhost:9092 kafka-mcp-server
To use with Claude Desktop, edit your Claude configuration file:
- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Windows: `%APPDATA%\Claude\claude_desktop_config.json`
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-server",
      "args": [],
      "env": {
        "KAFKA_BROKERS": "localhost:9092",
        "KAFKA_CLIENT_ID": "kafka-mcp-server",
        "MCP_TRANSPORT": "stdio",
        "KAFKA_SASL_MECHANISM": "",
        "KAFKA_SASL_USER": "",
        "KAFKA_SASL_PASSWORD": "",
        "KAFKA_TLS_ENABLE": "false"
      }
    }
  }
}
After updating the configuration, restart Claude Desktop. You should see the Kafka MCP tools available in the tools menu.
To use with Windsurf, create or edit your `mcp_config.json`:
{
  "mcpServers": {
    "kafka": {
      "command": "kafka-mcp-server",
      "args": [],
      "env": {
        "KAFKA_BROKERS": "localhost:9092",
        "KAFKA_CLIENT_ID": "kafka-mcp-server",
        "MCP_TRANSPORT": "stdio",
        "KAFKA_SASL_MECHANISM": "",
        "KAFKA_SASL_USER": "",
        "KAFKA_SASL_PASSWORD": "",
        "KAFKA_TLS_ENABLE": "false"
      }
    }
  }
}
Restart Windsurf to apply the changes. The Kafka MCP tools will be available to the AI assistant.
To use with ChatWise, follow these steps:
1. Set the tool name to `kafka` (or any name you prefer)
2. Set the command to `kafka-mcp-server`
3. Add the following environment variables:
KAFKA_BROKERS=localhost:9092
KAFKA_CLIENT_ID=kafka-mcp-server
MCP_TRANSPORT=stdio
KAFKA_SASL_MECHANISM=
KAFKA_SASL_USER=
KAFKA_SASL_PASSWORD=
KAFKA_TLS_ENABLE=false
Alternatively, you can import the configuration using the import option.
Managing MCP server configurations across multiple clients can become challenging. mcpenetes is a dedicated tool that makes this process significantly easier:
# Install mcpenetes
go install github.com/tuannvm/mcpenetes@latest
# Search for available MCP servers including kafka-mcp-server
mcpenetes search
# Apply kafka-mcp-server configuration to all your clients at once
mcpenetes apply
# Load a configuration from your clipboard
mcpenetes load
With mcpenetes, you can maintain multiple Kafka configurations (development, production, etc.) and switch between them instantly across all your clients (Cursor, Claude Desktop, Windsurf, ChatWise) without manually editing each client’s configuration files.
The server exposes tools for Kafka interaction:
Produces a single message to a Kafka topic, with an optional key.
Sample Prompt:
“Send a new order update to the orders topic with order ID 12345.”
Example:
{
  "topic": "orders",
  "key": "12345",
  "value": "{\"order_id\":\"12345\",\"status\":\"shipped\"}"
}
Response:
"Message produced successfully to topic orders"
Consumes a batch of messages from one or more Kafka topics.
Sample Prompt:
“Retrieve the latest messages from the customer-events topic so I can see recent customer activity.”
Example:
{
  "topics": ["customer-events"],
  "max_messages": 5
}
Response:
[
  {
    "topic": "customer-events",
    "partition": 0,
    "offset": 1042,
    "timestamp": 1650123456789,
    "key": "customer-123",
    "value": "{\"customer_id\":\"123\",\"action\":\"login\",\"timestamp\":\"2023-04-16T12:34:56Z\"}"
  },
  // Additional messages...
]
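The consume path can be sketched the same way with franz-go’s polling API. This is an illustration, not the server’s implementation; the broker address and consumer group name are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed broker and group ID, for illustration only.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("example-group"),
		kgo.ConsumeTopics("customer-events"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Poll one batch and print the same fields the tool returns.
	fetches := client.PollFetches(context.Background())
	if errs := fetches.Errors(); len(errs) > 0 {
		log.Fatalf("fetch errors: %v", errs)
	}
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Printf("topic=%s partition=%d offset=%d key=%s value=%s\n",
			r.Topic, r.Partition, r.Offset, r.Key, r.Value)
	})
}
```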
Lists the configured Kafka broker addresses the server is connected to.
Sample Prompt:
“What Kafka brokers do we have available in our cluster?”
Example:
{}
Response:
[
  "kafka-broker-1:9092",
  "kafka-broker-2:9092",
  "kafka-broker-3:9092"
]
Provides detailed metadata for a specific Kafka topic.
Sample Prompt:
“Show me the configuration and partition details for our orders topic.”
Example:
{
  "topic_name": "orders"
}
Response:
{
  "name": "orders",
  "partitions": [
    {
      "partitionID": 0,
      "leader": 1,
      "replicas": [1, 2, 3],
      "ISR": [1, 2, 3],
      "errorCode": 0
    },
    {
      "partitionID": 1,
      "leader": 2,
      "replicas": [2, 3, 1],
      "ISR": [2, 3, 1],
      "errorCode": 0
    }
  ],
  "isInternal": false
}
Enumerates active consumer groups known to the Kafka cluster.
Sample Prompt:
“What consumer groups are currently active in our Kafka cluster?”
Example:
{}
Response:
[
  {
    "groupID": "order-processor",
    "state": "Stable",
    "errorCode": 0
  },
  {
    "groupID": "analytics-pipeline",
    "state": "Stable",
    "errorCode": 0
  }
]
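Group listing maps naturally onto franz-go’s kadm admin package. The sketch below is illustrative (not necessarily how the server implements this tool) and assumes a local broker:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed local broker.
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// List consumer groups and print their IDs and states.
	groups, err := kadm.NewClient(client).ListGroups(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	for _, g := range groups {
		fmt.Printf("groupID=%s state=%s\n", g.Group, g.State)
	}
}
```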
Shows details for a specific consumer group, including state, members, and partition offsets.
Sample Prompt:
“Tell me about the order-processor consumer group. Are there any lagging consumers?”
Example:
{
  "group_id": "order-processor",
  "include_offsets": true
}
Response:
{
  "groupID": "order-processor",
  "state": "Stable",
  "members": [
    {
      "memberID": "consumer-1-uuid",
      "clientID": "consumer-1",
      "clientHost": "10.0.0.101",
      "assignments": [
        {"topic": "orders", "partitions": [0, 2, 4]}
      ]
    },
    {
      "memberID": "consumer-2-uuid",
      "clientID": "consumer-2",
      "clientHost": "10.0.0.102",
      "assignments": [
        {"topic": "orders", "partitions": [1, 3, 5]}
      ]
    }
  ],
  "offsets": [
    {
      "topic": "orders",
      "partition": 0,
      "commitOffset": 10045,
      "lag": 5
    },
    // More partitions...
  ],
  "errorCode": 0
}
Fetches configuration entries for a specific resource (topic or broker).
Sample Prompt:
“What’s the retention configuration for our clickstream topic?”
Example:
{
  "resource_type": "topic",
  "resource_name": "clickstream",
  "config_keys": ["retention.ms", "retention.bytes"]
}
Response:
{
  "configs": [
    {
      "name": "retention.ms",
      "value": "604800000",
      "source": "DYNAMIC_TOPIC_CONFIG",
      "isSensitive": false,
      "isReadOnly": false
    },
    {
      "name": "retention.bytes",
      "value": "1073741824",
      "source": "DYNAMIC_TOPIC_CONFIG",
      "isSensitive": false,
      "isReadOnly": false
    }
  ]
}
Aggregates high-level cluster health data, including controller, brokers, topics, and partition status.
Sample Prompt:
“Give me an overview of our Kafka cluster health.”
Example:
{}
Response:
{
  "brokerCount": 3,
  "controllerID": 1,
  "topicCount": 24,
  "partitionCount": 120,
  "underReplicatedPartitionsCount": 0,
  "offlinePartitionsCount": 0,
  "offlineBrokerIDs": []
}
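The overview fields correspond to ordinary cluster metadata. As a rough sketch (illustrative only, assuming a local broker and franz-go’s kadm package), similar counts can be derived like this:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed local broker.
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Cluster metadata carries brokers, the controller ID, topics, and partitions.
	meta, err := kadm.NewClient(client).Metadata(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	partitionCount := 0
	for _, t := range meta.Topics {
		partitionCount += len(t.Partitions)
	}
	fmt.Printf("brokerCount=%d controllerID=%d topicCount=%d partitionCount=%d\n",
		len(meta.Brokers), meta.Controller, len(meta.Topics), partitionCount)
}
```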
The server provides the following resources that can be accessed through the MCP protocol:
Provides a summary of Kafka cluster health and metrics.
Example Response:
{
  "timestamp": "2023-08-15T12:34:56Z",
  "broker_count": 3,
  "controller_id": 1,
  "topic_count": 24,
  "partition_count": 120,
  "under_replicated_partitions": 0,
  "offline_partitions": 0,
  "offline_broker_ids": [],
  "health_status": "healthy"
}
Performs a comprehensive health assessment of the Kafka cluster.
Example Response:
{
  "timestamp": "2023-08-15T12:34:56Z",
  "broker_status": {
    "total_brokers": 3,
    "offline_brokers": 0,
    "offline_broker_ids": [],
    "status": "healthy"
  },
  "controller_status": {
    "controller_id": 1,
    "status": "healthy"
  },
  "partition_status": {
    "total_partitions": 120,
    "under_replicated_partitions": 0,
    "offline_partitions": 0,
    "status": "healthy"
  },
  "consumer_status": {
    "total_groups": 5,
    "groups_with_high_lag": 0,
    "status": "healthy",
    "error": ""
  },
  "overall_status": "healthy"
}
Provides a detailed report of under-replicated partitions in the cluster.
Example Response:
{
  "timestamp": "2023-08-15T12:34:56Z",
  "under_replicated_partition_count": 2,
  "details": [
    {
      "topic": "orders",
      "partition": 3,
      "leader": 1,
      "replica_count": 3,
      "isr_count": 2,
      "replicas": [1, 2, 3],
      "isr": [1, 2],
      "missing_replicas": [3]
    },
    {
      "topic": "clickstream",
      "partition": 5,
      "leader": 2,
      "replica_count": 3,
      "isr_count": 2,
      "replicas": [2, 3, 1],
      "isr": [2, 1],
      "missing_replicas": [3]
    }
  ],
  "recommendations": [
    "Check broker health for any offline or struggling brokers",
    "Verify network connectivity between brokers",
    "Monitor disk space on broker nodes",
    "Review broker logs for detailed error messages",
    "Consider increasing replication timeouts if network is slow"
  ]
}
Analyzes consumer group lag across the cluster. Accepts an optional “threshold” query parameter to set the lag threshold.
Example Response:
{
  "timestamp": "2023-08-15T12:34:56Z",
  "lag_threshold": 1000,
  "group_count": 3,
  "group_summary": [
    {
      "group_id": "order-processor",
      "state": "Stable",
      "member_count": 2,
      "topic_count": 1,
      "total_lag": 15420,
      "has_high_lag": true
    },
    {
      "group_id": "analytics-pipeline",
      "state": "Stable",
      "member_count": 3,
      "topic_count": 2,
      "total_lag": 520,
      "has_high_lag": false
    }
  ],
  "high_lag_details": [
    {
      "group_id": "order-processor",
      "topic": "orders",
      "partition": 2,
      "current_offset": 1045822,
      "log_end_offset": 1061242,
      "lag": 15420
    }
  ],
  "recommendations": [
    "Check consumer instances for errors or slowdowns",
    "Consider scaling up consumer groups with high lag",
    "Review consumer configuration settings",
    "Examine processing bottlenecks in consumer application logic"
  ]
}
The server includes the following pre-configured prompts for Kafka operations and diagnostics:
Provides a summary of Kafka cluster health and metrics.
Arguments:
`cluster` (required): The Kafka cluster name
Example Response:
# Kafka Cluster Overview
**Time**: 2023-08-15T12:34:56Z
- **Broker Count**: 3
- **Active Controller ID**: 1
- **Total Topics**: 24
- **Total Partitions**: 120
- **Under-Replicated Partitions**: 0
- **Offline Partitions**: 0
**Overall Status**: ✅ Healthy
Runs a comprehensive health check on the Kafka cluster.
Arguments:
`cluster` (required): The Kafka cluster name
Example Response:
# Kafka Cluster Health Check Report
**Time**: 2023-08-15T12:34:56Z
## Broker Status
- ✅ **All 3 brokers are online**
## Controller Status
- ✅ **Active controller**: Broker 1
## Partition Health
- ✅ **All 120 partitions are online**
- ✅ **No under-replicated partitions detected**
## Consumer Group Health
- ✅ **5 consumer groups are active**
- ✅ **No consumer groups with significant lag detected**
## Overall Health Assessment
✅ **HEALTHY**: All systems are operating normally.
Lists topics and partitions where the ISR count is less than the replication factor.
Arguments:
`cluster` (required): The Kafka cluster name
Example Response:
# Under-Replicated Partitions Report
**Time**: 2023-08-15T12:34:56Z
⚠️ **Found 2 under-replicated partitions**
| Topic | Partition | Leader | Replica Count | ISR Count | Missing Replicas |
|:------|----------:|-------:|--------------:|----------:|:-----------------|
| orders | 3 | 1 | 3 | 2 | 3 |
| clickstream | 5 | 2 | 3 | 2 | 3 |
## Possible Causes
Under-replicated partitions occur when one or more replicas are not in sync with the leader. Common causes include:
- **Broker failure or network partition**
- **High load on brokers**
- **Insufficient disk space**
- **Network bandwidth limitations**
- **Misconfigured topic replication factor**
## Recommendations
1. **Check broker health** for any offline or struggling brokers
2. **Verify network connectivity** between brokers
3. **Monitor disk space** on broker nodes
4. **Review broker logs** for detailed error messages
5. **Consider increasing replication timeouts** if network is slow
Provides a detailed report on consumer lag across all consumer groups.
Arguments:
`cluster` (required): The Kafka cluster name
`threshold` (optional): Lag threshold for highlighting high lag (default: 1000)
Example Response:
# Kafka Consumer Lag Report
**Time**: 2023-08-15T12:34:56Z
**Lag Threshold**: 1000 messages
Found 3 consumer group(s)
## Consumer Group Summary
| Group ID | State | Members | Topics | Total Lag | High Lag |
|:---------|:------|--------:|-------:|----------:|:---------|
| order-processor | Stable | 2 | 1 | 15,420 | ⚠️ Yes |
| analytics-pipeline | Stable | 3 | 2 | 520 | No |
| monitoring | Stable | 1 | 3 | 0 | No |
## High Lag Details
### Group: order-processor
| Topic | Partition | Current Offset | Log End Offset | Lag |
|:------|----------:|--------------:|--------------:|----:|
| orders | 2 | 1,045,822 | 1,061,242 | 15,420 |
## Recommendations
1. **Check consumer instances** for errors or slowdowns
2. **Scale up consumer groups** with high lag
3. **Review consumer configuration** settings
4. **Examine processing bottlenecks** in consumer application logic
Here’s an interaction example showing how an AI assistant might use these tools to help a user:
User Query: “We need to investigate why our order processing pipeline is falling behind. Can you help me check the consumer lag?”
AI Assistant’s workflow:
> Using list_consumer_groups tool
> Discovers "order-processor" group
> Using describe_consumer_group tool with group_id "order-processor" and include_offsets=true
> Discovers lag on several partitions
> Using cluster_overview tool
> Confirms no under-replicated partitions or offline brokers
> Using consumer_lag_report prompt with threshold=1000
> Provides detailed analysis of lag across consumer groups
Based on my analysis, your order-processor consumer group has significant lag on topic orders, partitions 2 and 3, with lag values of 15,420 and 12,305 messages respectively.
The cluster itself is healthy with no under-replicated partitions, suggesting this is an issue with the consumer application rather than Kafka infrastructure.
I recommend:
1. Check the consumer instances processing those partitions for slowdowns or errors
2. Consider scaling up your consumer group by adding more instances
3. Review the consumer configuration, particularly max.poll.records and max.poll.interval.ms
4. Examine any processing bottlenecks in the consumer logic
This seamless workflow demonstrates how the Kafka MCP tools enable LLM models to perform sophisticated diagnostics and provide actionable insights.
The server can be configured using the following environment variables:
| Variable | Description | Default |
|:---------|:------------|:--------|
| `KAFKA_BROKERS` | Comma-separated list of Kafka broker addresses | `localhost:9092` |
| `KAFKA_CLIENT_ID` | Kafka client ID used for connections | `kafka-mcp-server` |
| `MCP_TRANSPORT` | MCP transport method (`stdio`/`http`) | `stdio` |
| `KAFKA_SASL_MECHANISM` | SASL mechanism: `plain`, `scram-sha-256`, `scram-sha-512`, or `""` (disabled) | `""` |
| `KAFKA_SASL_USER` | Username for SASL authentication | `""` |
| `KAFKA_SASL_PASSWORD` | Password for SASL authentication | `""` |
| `KAFKA_TLS_ENABLE` | Enable TLS for the Kafka connection (`true` or `false`) | `false` |
| `KAFKA_TLS_INSECURE_SKIP_VERIFY` | Skip TLS certificate verification (`true` or `false`) | `false` |
Security Note: When `KAFKA_TLS_INSECURE_SKIP_VERIFY=true`, the server skips TLS certificate verification. Use this only in development or testing environments, or when using self-signed certificates.
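For context, here is how these SASL and TLS settings map onto a franz-go client. This is a sketch with placeholder broker and credentials, mirroring the secured configuration shown earlier:

```go
package main

import (
	"crypto/tls"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/scram"
)

func main() {
	// Placeholder broker and credentials; substitute your own values.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("kafka-broker-1:9092"),
		// Equivalent of KAFKA_SASL_MECHANISM=scram-sha-512 with user/password.
		kgo.SASL(scram.Auth{
			User: "kafka-user",
			Pass: "kafka-password",
		}.AsSha512Mechanism()),
		// Equivalent of KAFKA_TLS_ENABLE=true; InsecureSkipVerify mirrors
		// KAFKA_TLS_INSECURE_SKIP_VERIFY and should stay false outside dev/test.
		kgo.DialTLSConfig(&tls.Config{InsecureSkipVerify: false}),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}
```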
The server is designed with enterprise-grade security in mind.
Comprehensive test coverage ensures reliability:
# Run all tests (requires Docker for integration tests)
go test ./...
# Run tests excluding integration tests
go test -short ./...
# Run integration tests with specific Kafka brokers
export KAFKA_BROKERS="your-broker:9092"
export SKIP_KAFKA_TESTS="false"
go test ./kafka -v -run Test
Contributions are welcome! Please feel free to submit a Pull Request.
This project is licensed under the MIT License - see the LICENSE file for details.