Kafka Testing Guide

Overview

Cuppet Core now includes comprehensive Apache Kafka testing support, allowing you to test event-driven architectures, microservices communication, and streaming data pipelines using Cucumber BDD scenarios.

Built on KafkaJS, this integration provides a robust and developer-friendly way to test Kafka producers, consumers, and message flows in your applications.

Features

  • Hook-based Connection Management - Automatic connection/cleanup with @kafka tag
  • Topic Subscriptions - Subscribe to single or multiple Kafka topics
  • Message Publishing - Send text and JSON messages with optional keys
  • Message Keys - Support for keyed messages for partitioning
  • JSON Message Validation - Assert message properties, nested values, and types
  • Variable Storage - Save and reuse message data across scenarios
  • SASL Authentication - Support for PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512
  • SSL/TLS Support - Secure connections to Kafka brokers

Setup

Install Dependencies

KafkaJS is included as a dependency in Cuppet Core. No additional installation required.

Configure Kafka Broker

Add your Kafka broker configuration to config/default.json:

{
    "kafka": {
        "brokers": ["localhost:9092"],
        "clientId": "cuppet-test",
        "connectionTimeout": 5000,
        "requestTimeout": 30000,
        "logLevel": 1
    }
}

Configuration Options:

| Option | Description | Default |
|---|---|---|
| brokers | Array of Kafka broker addresses | ["localhost:9092"] |
| clientId | Client identifier for Kafka | Auto-generated |
| connectionTimeout | Connection timeout in milliseconds | 5000 |
| requestTimeout | Request timeout in milliseconds | 30000 |
| logLevel | KafkaJS log level (0=NOTHING, 1=ERROR, 2=WARN, 4=INFO, 5=DEBUG) | 1 (ERROR) |
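
To make the defaults concrete, here is a minimal sketch of how a partial kafka config could be merged with the defaults listed above. The helper name withKafkaDefaults and the format of the generated clientId are assumptions made for illustration; this is not Cuppet Core's actual implementation.

```javascript
// Defaults taken from the configuration options listed above.
const KAFKA_DEFAULTS = {
    brokers: ['localhost:9092'],
    connectionTimeout: 5000,
    requestTimeout: 30000,
    logLevel: 1, // ERROR
};

// Hypothetical helper: fills in every option the user left out.
function withKafkaDefaults(userConfig = {}) {
    return {
        ...KAFKA_DEFAULTS,
        // Assumed format for the auto-generated client id.
        clientId: `cuppet-${Date.now()}`,
        // User-supplied values win over the defaults.
        ...userConfig,
    };
}

const resolved = withKafkaDefaults({ brokers: ['kafka.internal:9092'], logLevel: 2 });
console.log(resolved);
```

Only brokers and logLevel are overridden in this call, so the two timeouts keep their default values.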

SASL Authentication Configuration:

{
    "kafka": {
        "brokers": ["kafka.example.com:9093"],
        "sasl": {
            "mechanism": "plain",
            "username": "your-username",
            "password": "your-password"
        },
        "ssl": true
    }
}

Supported SASL Mechanisms:

  • plain - SASL/PLAIN authentication
  • scram-sha-256 - SASL/SCRAM-SHA-256
  • scram-sha-512 - SASL/SCRAM-SHA-512
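
A quick pre-flight check can catch a mistyped mechanism before the test run reaches the broker. The sketch below is a hypothetical helper (checkSaslConfig is not part of Cuppet Core) that only knows the three mechanism names listed above.

```javascript
// Mechanism names listed in this guide.
const SUPPORTED_SASL_MECHANISMS = ['plain', 'scram-sha-256', 'scram-sha-512'];

// Hypothetical helper: returns a list of problems, empty when the config looks fine.
function checkSaslConfig(kafkaConfig) {
    const problems = [];
    const { sasl } = kafkaConfig;
    if (!sasl) {
        return problems; // unauthenticated connection, nothing to check
    }
    if (!SUPPORTED_SASL_MECHANISMS.includes(sasl.mechanism)) {
        problems.push(`unsupported SASL mechanism: ${sasl.mechanism}`);
    }
    if (!sasl.username || !sasl.password) {
        problems.push('SASL username and password are both required');
    }
    return problems;
}

console.log(checkSaslConfig({ sasl: { mechanism: 'gssapi', username: 'u', password: 'p' } }));
```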

Usage

Basic Scenario Structure

Tag your scenarios with @kafka to automatically connect to the broker:

@kafka
Feature: Kafka Message Testing

  Scenario: Publish and consume a simple message
    Given I subscribe to Kafka topic "test-topic"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "test-topic" with value "Hello Kafka"
    Then I should receive a kafka message with value "Hello Kafka"
    And I unsubscribe from all Kafka topics

Important: Always subscribe to topics and start listening BEFORE publishing messages to ensure messages are captured.
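
The ordering rule is easier to see with a toy model. The sketch below is not KafkaJS code: TopicLog is a hypothetical in-memory stand-in for a topic, and it assumes the consumer starts from the latest offset (KafkaJS's default when fromBeginning is not set; whether Cuppet relies on that default is an assumption here).

```javascript
// Minimal in-memory model of a topic log, for illustration only.
class TopicLog {
    constructor() {
        this.messages = [];
    }

    publish(value) {
        this.messages.push(value);
    }

    // A consumer that starts at "latest" only sees messages appended after it subscribes.
    subscribeFromLatest() {
        const startOffset = this.messages.length;
        return { poll: () => this.messages.slice(startOffset) };
    }
}

const log = new TopicLog();
log.publish('sent before subscribing'); // the consumer below never sees this one
const consumer = log.subscribeFromLatest();
log.publish('Hello Kafka');

const seen = consumer.poll();
console.log(seen);
```

In this model the first message is still stored in the log, but the late subscriber starts reading after it, which is the situation the rule above is meant to avoid.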

Available Step Definitions

Subscription Steps

# Subscribe to a single topic
Given I subscribe to Kafka topic "orders"

# Subscribe to multiple topics (comma-separated)
Given I subscribe to Kafka topics "orders,payments,shipments"

# Start listening for messages
Then I listen for a kafka message on the subscribed topics

# Unsubscribe from all topics
Given I unsubscribe from all Kafka topics

Publishing Steps

# Send a simple text message
When I send a kafka message to topic "test-topic" with value "Hello World"

# Send a message with a key
When I send a kafka message to topic "orders" with value "Order data" and key "order-123"

# Send a JSON message
When I send a kafka message to topic "events" with JSON value
  """
  {
    "eventType": "user.registered",
    "userId": "12345",
    "timestamp": 1234567890
  }
  """

# Send a JSON message with a key
When I send a kafka message to topic "users" with key "user-456" and JSON value
  """
  {
    "name": "John Doe",
    "email": "john@example.com"
  }
  """

# Use saved variables in messages
When I send a kafka message to topic "notifications" with value "%savedMessage%"
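
The %name% placeholders can be thought of as a simple substitution pass over the step text. Here is a minimal sketch of that idea; the interpolate helper and the choice to leave unknown names untouched are assumptions, not a description of how Cuppet resolves variables internally.

```javascript
// Hypothetical helper: replaces each %name% token with the matching saved value.
// Unknown names are left untouched so a typo stays visible in the output.
function interpolate(template, saved) {
    return template.replace(/%([A-Za-z0-9_]+)%/g, (token, name) =>
        Object.prototype.hasOwnProperty.call(saved, name) ? String(saved[name]) : token
    );
}

const saved = { productId: 'PROD-42' };
console.log(interpolate('{"productId": "%productId%"}', saved)); // {"productId": "PROD-42"}
console.log(interpolate('%missing%', saved));                    // %missing%
```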

Message Validation Steps

# Validate simple message value
Then I should receive a kafka message with value "Expected message"

# Validate message with key and value
Then I should receive a kafka message with key "order-123" and value "Order confirmed"

# Validate JSON property value
Then I should receive a kafka message with property "eventType" and value "user.registered"

# Validate JSON property with key
Then I should receive a kafka message with property "status" and value "success" and key "order-789"

# Validate property does NOT match a value
Then I should receive a kafka message with property "status" which value does not match "failed"

# Validate property does NOT match with key
Then I should receive a kafka message with property "status" and key "order-123" which value does not match "pending"

Nested Property Access:

Use dot notation to access nested JSON properties:

Then I should receive a kafka message with property "user.profile.email" and value "test@example.com"
Then I should receive a kafka message with property "metadata.timestamp" and value "1234567890"
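
Dot notation can be resolved by walking the parsed JSON one segment at a time. The sketch below shows the idea with two hypothetical helpers; it assumes expected values are compared as strings (Gherkin passes them as quoted text), which may differ from Cuppet's exact comparison rules.

```javascript
// Hypothetical helper: walks an object along a dot-separated path.
// Returns undefined as soon as a segment is missing.
function getByPath(obj, path) {
    return path.split('.').reduce(
        (current, segment) => (current == null ? undefined : current[segment]),
        obj
    );
}

// Assumption: expected values arrive as strings, so the actual value is stringified first.
function propertyMatches(message, path, expected) {
    const actual = getByPath(message, path);
    return actual !== undefined && String(actual) === expected;
}

const message = {
    user: { profile: { email: 'test@example.com' } },
    metadata: { timestamp: 1234567890 },
};
console.log(propertyMatches(message, 'user.profile.email', 'test@example.com')); // true
console.log(propertyMatches(message, 'metadata.timestamp', '1234567890'));       // true
console.log(propertyMatches(message, 'user.address.city', 'Sofia'));             // false
```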

Examples

Example 1: Simple Publish and Subscribe

@kafka
Feature: Basic Kafka Testing

  Scenario: Simple message publish and receive
    Given I subscribe to Kafka topic "test-topic"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "test-topic" with value "Hello Kafka"
    Then I should receive a kafka message with value "Hello Kafka"
    And I unsubscribe from all Kafka topics

Example 2: JSON Message Validation

@kafka
Feature: Kafka JSON Message Testing

  Scenario: Validate JSON event structure
    Given I subscribe to Kafka topic "events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "events" with JSON value
      """
      {
        "eventType": "order.created",
        "orderId": "ORD-12345",
        "amount": 99.99,
        "currency": "USD"
      }
      """
    Then I should receive a kafka message with property "eventType" and value "order.created"
    And I should receive a kafka message with property "orderId" and value "ORD-12345"
    And I should receive a kafka message with property "amount" and value "99.99"
    And I unsubscribe from all Kafka topics

Example 3: Messages with Keys

@kafka
Feature: Kafka Keyed Messages

  Scenario: Send and validate keyed messages
    Given I subscribe to Kafka topic "user-events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "user-events" with key "user-123" and JSON value
      """
      {
        "action": "login",
        "timestamp": 1234567890
      }
      """
    Then I should receive a kafka message with property "action" and value "login" and key "user-123"
    And I unsubscribe from all Kafka topics

Example 4: Complex Nested JSON Validation

@kafka
Feature: Complex Kafka Message Validation

  Scenario: Validate nested JSON properties
    Given I subscribe to Kafka topic "orders"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "orders" with JSON value
      """
      {
        "orderId": "ORD-789",
        "customer": {
          "id": "CUST-456",
          "name": "Jane Smith",
          "contact": {
            "email": "jane@example.com",
            "phone": "+1234567890"
          }
        },
        "items": [
          {
            "productId": "PROD-001",
            "quantity": 2,
            "price": 29.99
          }
        ],
        "total": 59.98
      }
      """
    Then I should receive a kafka message with property "orderId" and value "ORD-789"
    And I should receive a kafka message with property "customer.id" and value "CUST-456"
    And I should receive a kafka message with property "customer.name" and value "Jane Smith"
    And I should receive a kafka message with property "customer.contact.email" and value "jane@example.com"
    And I should receive a kafka message with property "total" and value "59.98"
    And I unsubscribe from all Kafka topics

Example 5: Multiple Topics

@kafka
Feature: Multiple Kafka Topics

  Scenario: Subscribe to multiple topics
    Given I subscribe to Kafka topics "orders,payments,notifications"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "orders" with value "New order"
    When I send a kafka message to topic "payments" with value "Payment received"
    When I send a kafka message to topic "notifications" with value "Order confirmed"
    And I unsubscribe from all Kafka topics

Example 6: Using Variables

@kafka @api
Feature: Kafka with Variable Storage

  Scenario: Use API response in Kafka message
    # Make API call and save response
    Given I set the request body to '{"productName": "Widget"}'
    When I send a "POST" request to "/api/products"
    Then the response code should be "201"
    And I remember the value of the "id" property as "productId"

    # Use saved variable in Kafka message
    Given I subscribe to Kafka topic "product-events"
    Then I listen for a kafka message on the subscribed topics
    When I send a kafka message to topic "product-events" with JSON value
      """
      {
        "eventType": "product.created",
        "productId": "%productId%"
      }
      """
    Then I should receive a kafka message with property "productId" and value "%productId%"
    And I unsubscribe from all Kafka topics

Testing Different Kafka Brokers

Local Kafka (Docker)

# Start Kafka with Docker Compose
docker-compose up -d

# Or use Confluent Platform
docker run -d \
  --name kafka \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  confluentinc/cp-kafka:latest

{
    "kafka": {
        "brokers": ["localhost:9092"],
        "clientId": "cuppet-test"
    }
}

Confluent Cloud

{
    "kafka": {
        "brokers": ["pkc-xxxxx.us-east-1.aws.confluent.cloud:9092"],
        "clientId": "cuppet-test",
        "sasl": {
            "mechanism": "plain",
            "username": "YOUR_API_KEY",
            "password": "YOUR_API_SECRET"
        },
        "ssl": true
    }
}

AWS MSK (Managed Streaming for Kafka)

{
    "kafka": {
        "brokers": [
            "b-1.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9094",
            "b-2.mycluster.xxxxx.kafka.us-east-1.amazonaws.com:9094"
        ],
        "clientId": "cuppet-test",
        "ssl": true
    }
}

Azure Event Hubs (Kafka-compatible)

{
    "kafka": {
        "brokers": ["your-namespace.servicebus.windows.net:9093"],
        "clientId": "cuppet-test",
        "sasl": {
            "mechanism": "plain",
            "username": "$ConnectionString",
            "password": "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
        },
        "ssl": true
    }
}
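
For hosted brokers you may prefer not to commit API keys or connection strings to a config file. One option is to build the same config shape from environment variables, as sketched below. The helper name and the variable names (KAFKA_BROKERS, KAFKA_USERNAME, KAFKA_PASSWORD) are hypothetical, and how the resulting object is handed to Cuppet depends on your setup.

```javascript
// Hypothetical helper: builds the SASL-over-TLS config shape used in the
// Confluent Cloud and Event Hubs examples from environment variables.
function kafkaConfigFromEnv(env = process.env) {
    const required = ['KAFKA_BROKERS', 'KAFKA_USERNAME', 'KAFKA_PASSWORD'];
    const missing = required.filter((name) => !env[name]);
    if (missing.length > 0) {
        throw new Error(`Missing environment variables: ${missing.join(', ')}`);
    }
    return {
        kafka: {
            // Comma-separated list, e.g. "host1:9092,host2:9092"
            brokers: env.KAFKA_BROKERS.split(',').map((broker) => broker.trim()),
            clientId: 'cuppet-test',
            sasl: {
                mechanism: 'plain',
                username: env.KAFKA_USERNAME,
                password: env.KAFKA_PASSWORD,
            },
            ssl: true,
        },
    };
}

const example = kafkaConfigFromEnv({
    KAFKA_BROKERS: 'host1:9092, host2:9092',
    KAFKA_USERNAME: 'YOUR_API_KEY',
    KAFKA_PASSWORD: 'YOUR_API_SECRET',
});
console.log(example.kafka.brokers);
```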

Programmatic Usage

You can also use Kafka functions programmatically in your code:

const { kafkaFunctions, KafkaManager } = require('@cuppet/core');

// Top-level await is not available in CommonJS modules, so wrap the flow in an async function
async function main() {
    // Create and initialize Kafka manager
    const kafkaManager = new KafkaManager({
        brokers: ['localhost:9092'],
        clientId: 'my-test-client',
    });
    await kafkaManager.initialize();

    try {
        // Subscribe to topics
        await kafkaFunctions.subscribeToTopics(kafkaManager, 'test-topic');

        // Start listening before sending so the message is not missed
        const messagePromise = kafkaFunctions.listenForMessage(kafkaManager);

        // Send a message
        await kafkaFunctions.sendMessage(kafkaManager, 'test-topic', '{"test": "data"}');

        // Wait for the message, then validate it
        await messagePromise;
        await kafkaFunctions.validateSimpleMessage('{"test": "data"}');
    } finally {
        // Cleanup, even if a step above throws
        await kafkaManager.stop();
    }
}

main().catch((error) => {
    console.error(error);
    process.exitCode = 1;
});

Troubleshooting

Connection Issues

  • Verify Kafka broker is running and accessible
  • Check broker addresses and ports in configuration
  • Ensure firewall/network allows connections to Kafka ports
  • Verify SASL credentials if using authentication
  • Check SSL/TLS configuration for secure connections

Messages Not Received

  • Always subscribe and start listening BEFORE publishing messages
  • Check topic names (case-sensitive)
  • Verify consumer group is not blocked by another consumer
  • Check Kafka broker logs for errors
  • Ensure topics exist (auto-creation may be disabled)
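
When you drive Kafka from your own code, a message that never arrives can leave a promise pending until the test runner gives up. A small timeout wrapper makes that failure explicit. The withTimeout helper below is a generic sketch, not part of Cuppet Core, and the never-resolving promise simply stands in for a missing message.

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms, label = 'operation') {
    let timer;
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
    });
    // Clear the timer either way so it does not keep the process alive.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Usage sketch: a promise that never resolves stands in for a message that never arrives.
const neverArrives = new Promise(() => {});
withTimeout(neverArrives, 50, 'waiting for Kafka message')
    .catch((error) => console.error(error.message));
```

The error message names the step that stalled, which is usually a better starting point than a generic runner timeout.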

Authentication Errors

  • Verify SASL mechanism matches broker configuration
  • Check username and password are correct
  • Ensure SSL is enabled when required by broker
  • Verify client has permissions for topics

Reducing Log Verbosity

To suppress KafkaJS INFO logs, set the log level in your configuration:

{
    "kafka": {
        "logLevel": 1
    }
}

Log levels: 0=NOTHING, 1=ERROR, 2=WARN, 4=INFO, 5=DEBUG
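
Because the numeric levels skip 3, it is easy to pick a value that does not correspond to any level. If you prefer names in your own tooling, a lookup like the following can translate them; resolveLogLevel is a hypothetical helper, and only the name-to-number mapping comes from the list above.

```javascript
// Numeric levels as listed above (note that 3 is unused).
const LOG_LEVELS = { NOTHING: 0, ERROR: 1, WARN: 2, INFO: 4, DEBUG: 5 };

// Hypothetical helper: accepts a level name (any case) or a number and returns the number.
function resolveLogLevel(level) {
    if (typeof level === 'number') {
        if (!Object.values(LOG_LEVELS).includes(level)) {
            throw new Error(`Unknown log level: ${level}`);
        }
        return level;
    }
    const value = LOG_LEVELS[String(level).toUpperCase()];
    if (value === undefined) {
        throw new Error(`Unknown log level: ${level}`);
    }
    return value;
}

console.log(resolveLogLevel('warn')); // 2
console.log(resolveLogLevel(4));      // 4
```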

Best Practices

  1. Use @kafka tag: Let hooks manage connections automatically
  2. Subscribe before publishing: Always set up subscriptions and listeners before sending messages
  3. Use message keys: Messages with the same key are routed to the same partition, which preserves their relative order
  4. Meaningful topic names: Follow naming conventions (e.g., domain.entity.event)
  5. Clean up connections: Always unsubscribe from topics after tests
  6. Use unique consumer groups: Auto-generated group IDs prevent conflicts
  7. JSON for complex data: Use JSON format for structured messages
  8. Variable storage: Save dynamic IDs and reuse them across steps
  9. Test isolation: Use unique topic names per test to avoid interference
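
For test isolation, one simple approach is to append a random suffix to a base topic name. The uniqueTopic helper below is a hypothetical sketch; it assumes the broker auto-creates topics or that you create the topic yourself before the test runs.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical helper: appends a short random suffix so parallel or repeated
// runs do not read each other's messages.
function uniqueTopic(base) {
    const suffix = randomUUID().slice(0, 8); // 8 hex characters
    return `${base}.${suffix}`;
}

console.log(uniqueTopic('orders.test'));
```

The suffix uses only hexadecimal characters, so the result stays within the characters Kafka allows in topic names (letters, digits, dots, underscores, and hyphens).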

Architecture

The Kafka implementation follows Cuppet's existing patterns:

  • features/app/managers/kafkaManager.js: Connection lifecycle management (like BrowserManager and MqttManager)
  • src/kafkaFunctions.js: Core Kafka operations (like apiFunctions and mqttFunctions)
  • features/app/stepDefinitions/kafkaSteps.js: Cucumber step definitions for Kafka testing
  • features/app/hooks.js: Automatic connection via @kafka tag

Key Components

KafkaManager

Manages the Kafka client lifecycle, including:

  • Client initialization with configuration
  • Producer and consumer creation
  • Connection management
  • Graceful shutdown and cleanup
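
The lifecycle responsibilities above can be pictured with a small wrapper class. This is an illustrative sketch only, not the actual KafkaManager source: SketchKafkaManager and makeFakeKafka are hypothetical names, and the injected client is assumed to look like a KafkaJS client, exposing producer() and consumer({ groupId }) whose results have connect() and disconnect().

```javascript
// Illustrative lifecycle wrapper; NOT the actual KafkaManager source.
class SketchKafkaManager {
    constructor(kafka, groupId) {
        this.kafka = kafka;
        this.groupId = groupId;
        this.producer = null;
        this.consumer = null;
    }

    // Create the producer and consumer, then connect both.
    async initialize() {
        this.producer = this.kafka.producer();
        this.consumer = this.kafka.consumer({ groupId: this.groupId });
        await Promise.all([this.producer.connect(), this.consumer.connect()]);
    }

    // Disconnect both sides even if one of them fails.
    async stop() {
        const clients = [this.consumer, this.producer].filter(Boolean);
        await Promise.allSettled(clients.map((client) => client.disconnect()));
        this.producer = null;
        this.consumer = null;
    }
}

// Fake client so the sketch runs without a broker; it records each call.
function makeFakeKafka(calls) {
    const fakeClient = (name) => ({
        connect: async () => { calls.push(`${name}.connect`); },
        disconnect: async () => { calls.push(`${name}.disconnect`); },
    });
    return {
        producer: () => fakeClient('producer'),
        consumer: () => fakeClient('consumer'),
    };
}

const calls = [];
const manager = new SketchKafkaManager(makeFakeKafka(calls), 'sketch-group');
manager.initialize()
    .then(() => manager.stop())
    .then(() => console.log(calls));
```

Injecting the client keeps the wrapper testable without a running broker; with KafkaJS you would pass a real client instance instead of the fake.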

kafkaFunctions

Provides high-level operations:

  • Topic subscription management
  • Message publishing with keys
  • Message consumption and listening
  • JSON and text message validation
  • Nested property validation

kafkaSteps

Cucumber step definitions that bridge Gherkin syntax to kafkaFunctions:

  • Natural language step definitions
  • DocString support for JSON messages
  • Variable interpolation
  • Integration with Cuppet's data storage

This layout mirrors Cuppet's existing browser, API, and MQTT support, so Kafka tests follow the same conventions as the rest of your suite.
