# Kafka Testing Guide

## Overview

Cuppet Core now includes comprehensive Apache Kafka testing support, allowing you to test event-driven architectures, microservices communication, and streaming data pipelines using Cucumber BDD scenarios.

Built on KafkaJS, this integration provides a robust and developer-friendly way to test Kafka producers, consumers, and message flows in your applications.

## Features

- **Hook-based Connection Management** - Automatic connection and cleanup with the `@kafka` tag
- **Topic Subscriptions** - Subscribe to single or multiple Kafka topics
- **Message Publishing** - Send text and JSON messages with optional keys
- **Message Keys** - Support for keyed messages for partitioning
- **JSON Message Validation** - Assert message properties, nested values, and types
- **Variable Storage** - Save and reuse message data across scenarios
- **SASL Authentication** - Support for PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512
- **SSL/TLS Support** - Secure connections to Kafka brokers
## Setup

### Install Dependencies

KafkaJS is included as a dependency in Cuppet Core. No additional installation is required.

### Configure Kafka Broker

Add your Kafka broker configuration to `config/default.json`:

```json
{
    "kafka": {
        "brokers": ["localhost:9092"],
        "clientId": "cuppet-test",
        "connectionTimeout": 5000,
        "requestTimeout": 30000,
        "logLevel": 1
    }
}
```
**Configuration Options:**

| Option | Description | Default |
|---|---|---|
| `brokers` | Array of Kafka broker addresses | `["localhost:9092"]` |
| `clientId` | Client identifier for Kafka | Auto-generated |
| `connectionTimeout` | Connection timeout in milliseconds | `5000` |
| `requestTimeout` | Request timeout in milliseconds | `30000` |
| `logLevel` | KafkaJS log level (0=NOTHING, 1=ERROR, 2=WARN, 4=INFO, 5=DEBUG) | `1` (ERROR) |
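The options above combine as a simple defaults merge. A minimal sketch of that behavior, assuming the documented defaults (the `loadKafkaConfig` helper is illustrative, not part of Cuppet Core's public API):

```javascript
// Illustrative sketch: merge user-supplied Kafka options over the documented
// defaults. `loadKafkaConfig` is hypothetical, not Cuppet Core's actual code.
const KAFKA_DEFAULTS = {
    brokers: ['localhost:9092'],
    connectionTimeout: 5000, // ms
    requestTimeout: 30000,   // ms
    logLevel: 1,             // 1 = ERROR in KafkaJS
};

function loadKafkaConfig(userConfig = {}) {
    // clientId has no fixed default; auto-generate one when absent
    const clientId = userConfig.clientId || `cuppet-test-${Date.now()}`;
    return { ...KAFKA_DEFAULTS, ...userConfig, clientId };
}

const cfg = loadKafkaConfig({ brokers: ['kafka.example.com:9093'], logLevel: 2 });
console.log(cfg.brokers[0]);     // → kafka.example.com:9093 (user override)
console.log(cfg.requestTimeout); // → 30000 (default retained)
```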
**SASL Authentication Configuration:**

```json
{
    "kafka": {
        "brokers": ["kafka.example.com:9093"],
        "sasl": {
            "mechanism": "plain",
            "username": "your-username",
            "password": "your-password"
        },
        "ssl": true
    }
}
```
**Supported SASL Mechanisms:**

- `plain` - SASL/PLAIN authentication
- `scram-sha-256` - SASL/SCRAM-SHA-256
- `scram-sha-512` - SASL/SCRAM-SHA-512
## Usage

### Basic Scenario Structure

Tag your scenarios with `@kafka` to automatically connect to the broker:

```gherkin
@kafka
Feature: Kafka Message Testing

  Scenario: Publish and consume a simple message
    Given I subscribe to Kafka topic "test-topic"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "test-topic" with value "Hello Kafka"
    Then I should receive a kafka message with value "Hello Kafka"
    And I unsubscribe from all Kafka topics
```
### Available Step Definitions

#### Subscription Steps

```gherkin
# Subscribe to a single topic
Given I subscribe to Kafka topic "orders"

# Subscribe to multiple topics (comma-separated)
Given I subscribe to Kafka topic/topics "orders,payments,shipments"

# Start listening for messages
Then I listen for a kafka message on the subscribed topics

# Unsubscribe from all topics
Given I unsubscribe from all Kafka topics
```
#### Publishing Steps

```gherkin
# Send a simple text message
When I send a kafka message to topic "test-topic" with value "Hello World"

# Send a message with a key
When I send a kafka message to topic "orders" with value "Order data" and key "order-123"

# Send a JSON message
When I send a kafka message to topic "events" with JSON value
  """
  {
    "eventType": "user.registered",
    "userId": "12345",
    "timestamp": 1234567890
  }
  """

# Send a JSON message with a key
When I send a kafka message to topic "users" with key "user-456" and JSON value
  """
  {
    "name": "John Doe",
    "email": "john@example.com"
  }
  """

# Use saved variables in messages
When I send a kafka message to topic "notifications" with value "%savedMessage%"
```
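A `%savedMessage%` placeholder is resolved by variable interpolation before the message is produced. A minimal sketch of that substitution, assuming a flat variable store (the `interpolate` helper is illustrative, not Cuppet Core's actual internals):

```javascript
// Illustrative sketch of %placeholder% substitution; `interpolate` is a
// hypothetical helper, not Cuppet Core's actual implementation.
function interpolate(template, store) {
    return template.replace(/%([\w.]+)%/g, (match, name) =>
        // Leave unknown placeholders untouched so failures stay visible
        Object.prototype.hasOwnProperty.call(store, name) ? String(store[name]) : match
    );
}

const store = { savedMessage: 'Order confirmed', productId: 'PROD-42' };
console.log(interpolate('value "%savedMessage%" for %productId%', store));
// → value "Order confirmed" for PROD-42
console.log(interpolate('%unknown% stays as-is', store));
// → %unknown% stays as-is
```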
#### Message Validation Steps

```gherkin
# Validate simple message value
Then I should receive a kafka message with value "Expected message"

# Validate message with key and value
Then I should receive a kafka message with key "order-123" and value "Order confirmed"

# Validate JSON property value
Then I should receive a kafka message with property "eventType" and value "user.registered"

# Validate JSON property with key
Then I should receive a kafka message with property "status" and value "success" and key "order-789"

# Validate property does NOT match a value
Then I should receive a kafka message with property "status" which value does not match "failed"

# Validate property does NOT match with key
Then I should receive a kafka message with property "status" and key "order-123" which value does not match "pending"
```

**Nested Property Access:**

Use dot notation to access nested JSON properties:

```gherkin
Then I should receive a kafka message with property "user.profile.email" and value "test@example.com"
Then I should receive a kafka message with property "metadata.timestamp" and value "1234567890"
```
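Dot-notation lookup amounts to walking the parsed JSON payload one path segment at a time. A minimal sketch (the `getNestedProperty` helper is illustrative, not necessarily how Cuppet implements it):

```javascript
// Illustrative dot-notation lookup over a parsed JSON message.
// `getNestedProperty` is a hypothetical helper, not Cuppet Core's API.
function getNestedProperty(obj, path) {
    return path.split('.').reduce(
        (current, key) => (current == null ? undefined : current[key]),
        obj
    );
}

const message = JSON.parse(
    '{"user": {"profile": {"email": "test@example.com"}}, "metadata": {"timestamp": 1234567890}}'
);
console.log(getNestedProperty(message, 'user.profile.email')); // → test@example.com
console.log(getNestedProperty(message, 'metadata.timestamp')); // → 1234567890
console.log(getNestedProperty(message, 'user.missing.key'));   // → undefined (no throw)
```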
## Examples

### Example 1: Simple Publish and Subscribe

```gherkin
@kafka
Feature: Basic Kafka Testing

  Scenario: Simple message publish and receive
    Given I subscribe to Kafka topic "test-topic"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "test-topic" with value "Hello Kafka"
    Then I should receive a kafka message with value "Hello Kafka"
    And I unsubscribe from all Kafka topics
```
### Example 2: JSON Message Validation

```gherkin
@kafka
Feature: Kafka JSON Message Testing

  Scenario: Validate JSON event structure
    Given I subscribe to Kafka topic "events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "events" with JSON value
      """
      {
        "eventType": "order.created",
        "orderId": "ORD-12345",
        "amount": 99.99,
        "currency": "USD"
      }
      """
    Then I should receive a kafka message with property "eventType" and value "order.created"
    And I should receive a kafka message with property "orderId" and value "ORD-12345"
    And I should receive a kafka message with property "amount" and value "99.99"
    And I unsubscribe from all Kafka topics
```
### Example 3: Messages with Keys

```gherkin
@kafka
Feature: Kafka Keyed Messages

  Scenario: Send and validate keyed messages
    Given I subscribe to Kafka topic "user-events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "user-events" with key "user-123" and JSON value
      """
      {
        "action": "login",
        "timestamp": 1234567890
      }
      """
    Then I should receive a kafka message with property "action" and value "login" and key "user-123"
    And I unsubscribe from all Kafka topics
```
### Example 4: Complex Nested JSON Validation

```gherkin
@kafka
Feature: Complex Kafka Message Validation

  Scenario: Validate nested JSON properties
    Given I subscribe to Kafka topic "orders"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "orders" with JSON value
      """
      {
        "orderId": "ORD-789",
        "customer": {
          "id": "CUST-456",
          "name": "Jane Smith",
          "contact": {
            "email": "jane@example.com",
            "phone": "+1234567890"
          }
        },
        "items": [
          {
            "productId": "PROD-001",
            "quantity": 2,
            "price": 29.99
          }
        ],
        "total": 59.98
      }
      """
    Then I should receive a kafka message with property "orderId" and value "ORD-789"
    And I should receive a kafka message with property "customer.id" and value "CUST-456"
    And I should receive a kafka message with property "customer.name" and value "Jane Smith"
    And I should receive a kafka message with property "customer.contact.email" and value "jane@example.com"
    And I should receive a kafka message with property "total" and value "59.98"
    And I unsubscribe from all Kafka topics
```
### Example 5: Multiple Topics

```gherkin
@kafka
Feature: Multiple Kafka Topics

  Scenario: Subscribe to multiple topics
    Given I subscribe to Kafka topic/topics "orders,payments,notifications"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "orders" with value "New order"
    And I send a kafka message to topic "payments" with value "Payment received"
    And I send a kafka message to topic "notifications" with value "Order confirmed"
    And I unsubscribe from all Kafka topics
```
### Example 6: Using Variables

```gherkin
@kafka @api
Feature: Kafka with Variable Storage

  Scenario: Use API response in Kafka message
    # Make API call and save response
    Given I set the request body to '{"productName": "Widget"}'
    When I send a "POST" request to "/api/products"
    Then the response code should be "201"
    And I remember the value of the "id" property as "productId"

    # Use saved variable in Kafka message
    Given I subscribe to Kafka topic "product-events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "product-events" with JSON value
      """
      {
        "eventType": "product.created",
        "productId": "%productId%"
      }
      """
    Then I should receive a kafka message with property "productId" and value "%productId%"
    And I unsubscribe from all Kafka topics
```
## Testing Different Kafka Brokers

### Local Kafka (Docker)

```bash
# Start Kafka with Docker Compose
docker-compose up -d

# Or use Confluent Platform
docker run -d \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  confluentinc/cp-kafka:latest
```

```json
{
    "kafka": {
        "brokers": ["localhost:9092"],
        "clientId": "cuppet-test"
    }
}
```
### Confluent Cloud

```json
{
    "kafka": {
        "brokers": ["pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"],
        "clientId": "cuppet-test",
        "sasl": {
            "mechanism": "plain",
            "username": "YOUR_API_KEY",
            "password": "YOUR_API_SECRET"
        },
        "ssl": true
    }
}
```
### AWS MSK (Managed Streaming for Kafka)

```json
{
    "kafka": {
        "brokers": [
            "b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9092",
            "b-2.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9092"
        ],
        "clientId": "cuppet-test",
        "ssl": true
    }
}
```
### Azure Event Hubs (Kafka-compatible)

```json
{
    "kafka": {
        "brokers": ["your-namespace.servicebus.windows.net:9093"],
        "clientId": "cuppet-test",
        "sasl": {
            "mechanism": "plain",
            "username": "$ConnectionString",
            "password": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
        },
        "ssl": true
    }
}
```
## Programmatic Usage

You can also use the Kafka functions programmatically in your code:

```javascript
const { kafkaFunctions, KafkaManager } = require('@cuppet/core');

// Create and initialize the Kafka manager
const kafkaManager = new KafkaManager({
    brokers: ['localhost:9092'],
    clientId: 'my-test-client'
});
await kafkaManager.initialize();

// Subscribe to topics
await kafkaFunctions.subscribeToTopics(kafkaManager, 'test-topic');

// Listen for messages
const messagePromise = kafkaFunctions.listenForMessage(kafkaManager);

// Send a message
await kafkaFunctions.sendMessage(kafkaManager, 'test-topic', '{"test": "data"}');

// Wait for and validate the message
const message = await messagePromise;
await kafkaFunctions.validateSimpleMessage('{"test": "data"}');

// Clean up
await kafkaManager.stop();
```
## Troubleshooting

### Connection Issues

- Verify the Kafka broker is running and accessible
- Check the broker addresses and ports in your configuration
- Ensure the firewall/network allows connections to the Kafka ports
- Verify SASL credentials if using authentication
- Check the SSL/TLS configuration for secure connections

### Messages Not Received

- Always subscribe and start listening BEFORE publishing messages
- Check topic names (they are case-sensitive)
- Verify the consumer group is not blocked by another consumer
- Check the Kafka broker logs for errors
- Ensure the topics exist (auto-creation may be disabled)

### Authentication Errors

- Verify the SASL mechanism matches the broker configuration
- Check that the username and password are correct
- Ensure SSL is enabled when the broker requires it
- Verify the client has permissions for the topics
### Reducing Log Verbosity

To suppress KafkaJS INFO logs, set the log level in your configuration:

```json
{
    "kafka": {
        "logLevel": 1
    }
}
```

Log levels: 0=NOTHING, 1=ERROR, 2=WARN, 4=INFO, 5=DEBUG.
## Best Practices

- **Use the `@kafka` tag**: Let hooks manage connections automatically
- **Subscribe before publishing**: Always set up subscriptions and listeners before sending messages
- **Use message keys**: For partitioning and ordering guarantees
- **Meaningful topic names**: Follow naming conventions (e.g., `domain.entity.event`)
- **Clean up connections**: Always unsubscribe from topics after tests
- **Use unique consumer groups**: Auto-generated group IDs prevent conflicts
- **JSON for complex data**: Use JSON format for structured messages
- **Variable storage**: Save dynamic IDs and reuse them across steps
- **Test isolation**: Use unique topic names per test to avoid interference
## Architecture

The Kafka implementation follows Cuppet's existing patterns:

- `features/app/managers/kafkaManager.js`: Connection lifecycle management (like BrowserManager and MqttManager)
- `src/kafkaFunctions.js`: Core Kafka operations (like apiFunctions and mqttFunctions)
- `features/app/stepDefinitions/kafkaSteps.js`: Cucumber step definitions for Kafka testing
- `features/app/hooks.js`: Automatic connection via the `@kafka` tag
### Key Components

#### KafkaManager

Manages the Kafka client lifecycle, including:

- Client initialization with configuration
- Producer and consumer creation
- Connection management
- Graceful shutdown and cleanup

#### kafkaFunctions

Provides high-level operations:

- Topic subscription management
- Message publishing with keys
- Message consumption and listening
- JSON and text message validation
- Nested property validation
#### kafkaSteps

Cucumber step definitions that bridge Gherkin syntax to kafkaFunctions:

- Natural language step definitions
- DocString support for JSON messages
- Variable interpolation
- Integration with Cuppet's data storage

This design ensures consistency with your existing testing framework!