---
title: AsyncAPI Broker Mocking
description: "Mock Kafka and MQTT brokers using AsyncAPI 2.x/3.x specs: publish examples, subscribe to record messages, validate against JSON Schema, all via REST or Java API."
shortTitle: AsyncAPI Messaging
layout: page
pageOrder: 11.5
section: 'Mock Server'
subsection: true
sitemap:
  priority: 0.7
  changefreq: 'monthly'
  lastmod: 2026-06-01T14:00:00+00:00
---

MockServer's AsyncAPI broker mocking publishes realistic example messages to Kafka and MQTT brokers, derived directly from an AsyncAPI 2.x or 3.x specification. MockServer can also subscribe to broker channels and record incoming messages for verification, mirroring how HTTP requests are recorded.

 

How it works

  1. Parse: AsyncApiParser reads the AsyncAPI document (JSON or YAML, auto-detected) and builds an in-memory model of channels and their message schemas.
  2. Generate: MessageExampleGenerator produces a JSON example payload for each channel. It uses explicit examples from the spec when present; otherwise it synthesises a schema-aware example (respecting enum, default, format, minimum/maximum, minLength, and other JSON Schema constraints).
  3. Publish: AsyncApiMockOrchestrator sends each payload to the broker via a MessagePublisher adapter. Kafka publishing supports record keys and headers; MQTT supports configurable QoS (0/1/2) and binary payloads.
  4. Subscribe (optional): MessageSubscriber implementations subscribe to broker channels and record incoming messages, including keys, headers, and schema validation results.
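
To make step 2 more concrete, the following is a minimal sketch of schema-aware example synthesis. It is not MockServer's MessageExampleGenerator: the class ExampleSketch, its method name, and the fallback values (0, false, the placeholder "string") are assumptions chosen for illustration, and the schema is held as plain nested maps rather than a parsed AsyncAPI model.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a simplified stand-in, not MockServer's generator. */
class ExampleSketch {

    /** Builds an example value for a simplified JSON Schema held as nested maps. */
    static Object exampleFor(Map<String, Object> schema) {
        // An explicit example or default wins over anything synthesised.
        if (schema.containsKey("example")) return schema.get("example");
        if (schema.containsKey("default")) return schema.get("default");

        // enum: pick the first allowed value.
        Object allowed = schema.get("enum");
        if (allowed instanceof List && !((List<?>) allowed).isEmpty()) {
            return ((List<?>) allowed).get(0);
        }

        String type = String.valueOf(schema.get("type"));
        switch (type) {
            case "integer":
            case "number": {
                // Respect minimum when present, otherwise fall back to 0.
                Object min = schema.get("minimum");
                return min instanceof Number ? min : Integer.valueOf(0);
            }
            case "boolean":
                return Boolean.FALSE;
            case "object": {
                // Recurse into each declared property.
                Map<String, Object> result = new LinkedHashMap<>();
                Object props = schema.get("properties");
                if (props instanceof Map) {
                    for (Map.Entry<?, ?> e : ((Map<?, ?>) props).entrySet()) {
                        @SuppressWarnings("unchecked")
                        Map<String, Object> child = (Map<String, Object>) e.getValue();
                        result.put(String.valueOf(e.getKey()), exampleFor(child));
                    }
                }
                return result;
            }
            default: {
                // Treat everything else as a string; pad up to minLength if given.
                Object minLen = schema.get("minLength");
                int n = minLen instanceof Number ? ((Number) minLen).intValue() : 0;
                StringBuilder s = new StringBuilder("string");
                while (s.length() < n) s.append('x');
                return s.toString();
            }
        }
    }
}
```

Under these assumptions, a schema with an integer orderId and a status enum of pending and shipped yields an object whose orderId is 0 and whose status is pending. Constraints such as format, maximum, and pattern are deliberately left out of the sketch.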
 

REST control-plane

Load an AsyncAPI spec and start mocking via the REST API:

Load a spec: PUT /mockserver/asyncapi

The request body can be either a plain AsyncAPI spec (JSON or YAML) or a JSON wrapper with broker configuration:

{
  "spec": {
    "asyncapi": "2.6.0",
    "info": { "title": "Orders API", "version": "1.0.0" },
    "channels": {
      "orders": {
        "publish": {
          "message": {
            "payload": {
              "type": "object",
              "properties": {
                "orderId": { "type": "integer" },
                "status": { "type": "string", "enum": ["pending", "shipped"] }
              },
              "required": ["orderId"]
            }
          }
        }
      }
    }
  },
  "brokerConfig": {
    "kafkaBootstrapServers": "localhost:9092",
    "publishOnLoad": true,
    "consume": true
  }
}
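
The wrapper above can be sent with any HTTP client. The sketch below uses the JDK's java.net.http.HttpClient. The base URL http://localhost:1080, the file name load-orders.json, and the Content-Type header are assumptions for illustration; only the PUT /mockserver/asyncapi path comes from this page.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

class LoadAsyncApiSketch {

    /** Builds the PUT request; baseUrl is an assumption, for example http://localhost:1080. */
    static HttpRequest loadRequest(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mockserver/asyncapi"))
                .header("Content-Type", "application/json") // assumed for the JSON wrapper form
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // load-orders.json is a hypothetical file holding the JSON wrapper shown above.
        String body = Files.readString(Path.of("load-orders.json"));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(loadRequest("http://localhost:1080", body), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```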

Broker configuration options

Field                  Type     Default                    Description
kafkaBootstrapServers  string   null                       Kafka bootstrap servers (e.g. localhost:9092)
kafkaGroupId           string   mockserver-async-consumer  Consumer group ID for Kafka subscribers
mqttBrokerUrl          string   null                       MQTT broker URL (e.g. tcp://localhost:1883)
mqttClientId           string   auto-generated             MQTT client ID prefix
mqttQos                int      1                          MQTT QoS level (0, 1, or 2)
publishOnLoad          boolean  true                       Publish example messages immediately when the spec is loaded
publishIntervalMillis  long     0 (disabled)               Publish examples periodically at this interval, in milliseconds
consume                boolean  false                      Subscribe to channels and record incoming messages
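
The only wrapper shown so far targets Kafka, so here is a hedged sketch of assembling the equivalent wrapper for MQTT using the documented mqttBrokerUrl, mqttQos, publishIntervalMillis, and consume fields. The helper class BrokerConfigSketch and its method are hypothetical, and the string concatenation assumes the broker URL needs no JSON escaping; a JSON library would be a safer choice in real tests.

```java
class BrokerConfigSketch {

    /**
     * Wraps an AsyncAPI spec (already serialised as JSON) with an MQTT brokerConfig.
     * Hypothetical helper: field names come from the table above, the method does not.
     */
    static String mqttWrapper(String specJson, String brokerUrl, int qos, long intervalMillis) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException("MQTT QoS must be 0, 1, or 2");
        }
        return "{\"spec\":" + specJson + ",\"brokerConfig\":{"
                + "\"mqttBrokerUrl\":\"" + brokerUrl + "\","
                + "\"mqttQos\":" + qos + ","
                + "\"publishIntervalMillis\":" + intervalMillis + ","
                + "\"consume\":true}}";
    }

    public static void main(String[] args) {
        // Publish examples every 500 ms at QoS 2 and record incoming messages.
        System.out.println(mqttWrapper("{\"asyncapi\":\"2.6.0\"}", "tcp://localhost:1883", 2, 500));
    }
}
```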

Check status: GET /mockserver/asyncapi

Returns the loaded spec info, active channels, publisher/subscriber counts, and recorded messages (including per-message schema validation).

Verify recorded messages: PUT /mockserver/asyncapi/verify

Verify that messages recorded by subscribers match given criteria. This mirrors the semantics of PUT /mockserver/verify for HTTP requests.

The request body is a JSON object with the following fields:

Field             Type    Required  Description
channel           string  yes       The channel or topic to check for messages
payloadSubstring  string  no        The message payload must contain this substring
payloadJsonPath   string  no        Dot-notation JSON path to extract from the payload (e.g. user.name)
expectedValue     string  no        Expected value at the JSON path (used together with payloadJsonPath)
count             object  no        Count constraints: {atLeast, atMost, exactly}. Default: {atLeast: 1}
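
One way to read these fields is sketched below: walk the dot-notation path through the payload, count the recorded messages whose value matches, then apply the count constraints. This illustrates the documented semantics under stated assumptions and is not MockServer's implementation; in particular, comparing the extracted value as a string is an assumption, and the class VerifySketch is hypothetical.

```java
import java.util.List;
import java.util.Map;

class VerifySketch {

    /** Walks a dot-notation path such as "user.name" through nested maps; null if absent. */
    static Object valueAt(Map<String, Object> payload, String path) {
        Object current = payload;
        for (String key : path.split("\\.")) {
            if (!(current instanceof Map)) return null;
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    /** Applies the count constraints; a null bound means "not set". */
    static boolean countOk(long matches, Integer atLeast, Integer atMost, Integer exactly) {
        if (atLeast == null && atMost == null && exactly == null) {
            atLeast = 1; // documented default: {atLeast: 1}
        }
        if (exactly != null && matches != exactly) return false;
        if (atLeast != null && matches < atLeast) return false;
        return atMost == null || matches <= atMost;
    }

    /** Counts payloads whose value at the path equals expected (compared as text), then checks the count. */
    static boolean verify(List<Map<String, Object>> recorded, String path, String expected,
                          Integer atLeast, Integer atMost, Integer exactly) {
        long matches = recorded.stream()
                .filter(p -> expected.equals(String.valueOf(valueAt(p, path))))
                .count();
        return countOk(matches, atLeast, atMost, exactly);
    }
}
```

With this reading, a negative assertion is simply a count of exactly 0, and omitting count altogether requires at least one match.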

Responses

Status               Meaning
202 Accepted         Verification passed
406 Not Acceptable   Verification failed (body contains a human-readable failure reason)
400 Bad Request      Malformed request (missing channel, invalid JSON)
501 Not Implemented  The mockserver-async module is not on the classpath

Example: verify a message with a specific field value

PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "orders",
  "payloadJsonPath": "user.name",
  "expectedValue": "Alice",
  "count": { "atLeast": 1 }
}

Example: verify no error messages were published (negative assertion)

PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "errors",
  "count": { "exactly": 0 }
}
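
From a test, the requests above could be issued and interpreted roughly as follows. This is a sketch using the JDK HTTP client: the base URL http://localhost:1080 and the class VerifyCallSketch are assumptions, and the choice of exception type for each status code is one reasonable mapping of the documented responses rather than anything MockServer prescribes.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class VerifyCallSketch {

    /** Builds PUT /mockserver/asyncapi/verify; baseUrl is an assumption. */
    static HttpRequest verifyRequest(String baseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/mockserver/asyncapi/verify"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    /** Maps the documented status codes onto a test outcome. */
    static void assertVerified(int status, String body) {
        switch (status) {
            case 202:
                return; // verification passed
            case 406:
                throw new AssertionError("Verification failed: " + body);
            case 400:
                throw new IllegalArgumentException("Malformed verify request: " + body);
            case 501:
                throw new IllegalStateException("mockserver-async module is not on the classpath");
            default:
                throw new IllegalStateException("Unexpected status " + status + ": " + body);
        }
    }

    public static void main(String[] args) throws Exception {
        // Negative assertion: no messages may have been recorded on "errors".
        String body = "{\"channel\":\"errors\",\"count\":{\"exactly\":0}}";
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(verifyRequest("http://localhost:1080", body), HttpResponse.BodyHandlers.ofString());
        assertVerified(response.statusCode(), response.body());
    }
}
```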

Reset

All async mocking state (publishers, subscribers, and recorded messages) is cleared when you call PUT /mockserver/reset.

 

Supported AsyncAPI versions

Version       Channel structure                                  Example resolution
AsyncAPI 2.x  channels.<name>.publish|subscribe.message.payload  Inline payload.example or message.examples[].payload
AsyncAPI 3.x  channels.<name>.messages.<msgName>.payload         examples[].payload; basic $ref to #/components/messages/<name>

Both JSON and YAML spec formats are accepted. Missing or incomplete structures are tolerated gracefully.
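
The difference between the two channel layouts can be sketched as a lookup over a parsed document held as nested maps. This is an illustration under stated assumptions, not AsyncApiParser itself: the class PayloadLocatorSketch is hypothetical, $ref resolution is omitted, and "tolerated gracefully" is modelled simply as returning an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PayloadLocatorSketch {

    /** Returns the payload schemas found under one channel definition. */
    static List<Object> payloads(String asyncapiVersion, Map<String, Object> channel) {
        List<Object> found = new ArrayList<>();
        if (asyncapiVersion.startsWith("2.")) {
            // 2.x: channels.<name>.publish|subscribe.message.payload
            for (String operation : List.of("publish", "subscribe")) {
                Object payload = dig(channel, operation, "message", "payload");
                if (payload != null) found.add(payload);
            }
        } else if (asyncapiVersion.startsWith("3.")) {
            // 3.x: channels.<name>.messages.<msgName>.payload
            Object messages = channel.get("messages");
            if (messages instanceof Map) {
                for (Object message : ((Map<?, ?>) messages).values()) {
                    Object payload = dig(message, "payload");
                    if (payload != null) found.add(payload);
                }
            }
        }
        return found; // empty when the structure is missing or incomplete
    }

    /** Follows a chain of keys through nested maps; null as soon as a level is missing. */
    private static Object dig(Object node, String... keys) {
        for (String key : keys) {
            if (!(node instanceof Map)) return null;
            node = ((Map<?, ?>) node).get(key);
        }
        return node;
    }
}
```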

 

Schema validation

When a channel's message definition includes a JSON Schema (payload), MockServer validates each recorded message against it and reports the result per message in the status response.

Schema validation supports JSON Schema Draft 4 through Draft 2019-09, including constraints like required, enum, minimum/maximum, pattern, and format.
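
To illustrate what two of these constraints mean for a recorded payload, the sketch below checks only required and enum against a schema held as nested maps. It is a deliberately tiny stand-in, not a JSON Schema validator and not how MockServer validates; the class ValidationSketch and its error strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ValidationSketch {

    /** Returns human-readable violations of `required` and `enum`; empty means valid. */
    static List<String> validate(Map<String, Object> schema, Map<String, Object> payload) {
        List<String> errors = new ArrayList<>();

        // required: every listed property must be present in the payload.
        Object required = schema.get("required");
        if (required instanceof List) {
            for (Object name : (List<?>) required) {
                if (!payload.containsKey(name)) {
                    errors.add("missing required property: " + name);
                }
            }
        }

        // enum: a present property must be one of the allowed values.
        Object props = schema.get("properties");
        if (props instanceof Map) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) props).entrySet()) {
                Object value = payload.get(e.getKey());
                Object allowed = e.getValue() instanceof Map
                        ? ((Map<?, ?>) e.getValue()).get("enum")
                        : null;
                if (value != null && allowed instanceof List && !((List<?>) allowed).contains(value)) {
                    errors.add(e.getKey() + " is not one of " + allowed);
                }
            }
        }
        return errors;
    }
}
```

Against the orders schema used earlier on this page, a payload containing only a status of "lost" would produce two violations: the missing orderId and the disallowed status value.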

 

Java usage

Add the mockserver-async dependency to your project, then wire the components together:

import java.nio.file.Files;
import java.nio.file.Path;

import org.mockserver.async.AsyncApiMockOrchestrator;
import org.mockserver.async.asyncapi.AsyncApiParser;
import org.mockserver.async.asyncapi.AsyncApiSpec;
import org.mockserver.async.publish.KafkaMessagePublisher;

// 1. Parse your AsyncAPI spec (from a string, file, or resource)
String specYaml = Files.readString(Path.of("asyncapi.yaml"));
AsyncApiSpec spec = new AsyncApiParser().parse(specYaml);

// 2. Create a publisher pointed at your test broker
KafkaMessagePublisher publisher = new KafkaMessagePublisher("localhost:9092");

// 3. Create the orchestrator and publish once
AsyncApiMockOrchestrator orchestrator = new AsyncApiMockOrchestrator(spec, publisher);
orchestrator.publishAll();

// Or publish repeatedly on a schedule (e.g. every 500 ms)
orchestrator.startPublishing(500);
// ... run your consumer tests ...
orchestrator.stop();

// Always close the publisher to release broker connections
publisher.close();

For subscribing to record messages:

import java.util.List;

import org.mockserver.async.subscribe.KafkaMessageSubscriber;
import org.mockserver.async.subscribe.RecordedMessage;

KafkaMessageSubscriber subscriber = new KafkaMessageSubscriber("localhost:9092", "test-group");
subscriber.subscribe("orders");

// ... wait for messages ...

List<RecordedMessage> messages = subscriber.getRecordedMessages("orders");
for (RecordedMessage msg : messages) {
    System.out.println("Key: " + msg.getKey() + ", Payload: " + msg.getPayload());
}

subscriber.close();
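
The "wait for messages" step is usually better expressed as a bounded poll than a fixed sleep. The helper below is a generic sketch using only the JDK; the class AwaitSketch, the method awaitCount, and the 50 ms poll interval are assumptions and not part of MockServer.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

class AwaitSketch {

    /** Polls until the supplier reports at least `expected`, or the timeout elapses. */
    static boolean awaitCount(IntSupplier count, int expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (count.getAsInt() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before enough messages arrived
            }
            try {
                Thread.sleep(50); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

With the subscriber from the example above, a call might look like awaitCount(() -> subscriber.getRecordedMessages("orders").size(), 1, Duration.ofSeconds(5)), followed by an assertion on the returned boolean.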
 

Supported brokers

Broker  Publisher              Subscriber              Features
Kafka   KafkaMessagePublisher  KafkaMessageSubscriber  Record keys, headers, consumer group
MQTT    MqttMessagePublisher   MqttMessageSubscriber   QoS 0/1/2, binary payloads
 

Configuration properties

These properties provide server-wide defaults for async messaging. Per-request brokerConfig values override them.

Property                                   Env Variable                                   Type    Default     Description
mockserver.asyncKafkaBootstrapServers      MOCKSERVER_ASYNC_KAFKA_BOOTSTRAP_SERVERS       string  "" (unset)  Default Kafka bootstrap servers used when the per-request brokerConfig does not include kafkaBootstrapServers.
mockserver.asyncMqttBrokerUrl              MOCKSERVER_ASYNC_MQTT_BROKER_URL               string  "" (unset)  Default MQTT broker URL used when the per-request brokerConfig does not include mqttBrokerUrl.
mockserver.asyncRecordedMessageMaxEntries  MOCKSERVER_ASYNC_RECORDED_MESSAGE_MAX_ENTRIES  int     1000        Maximum number of recorded messages retained per channel. When the cap is reached, the oldest messages are evicted (FIFO).
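
The FIFO eviction described for asyncRecordedMessageMaxEntries behaves like a bounded queue per channel. The sketch below shows that behaviour with a JDK ArrayDeque; the class BoundedRecorderSketch is hypothetical and says nothing about the data structure MockServer actually uses.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative bounded recorder for one channel: oldest entries are dropped first. */
class BoundedRecorderSketch<T> {
    private final int maxEntries;
    private final Deque<T> messages = new ArrayDeque<>();

    BoundedRecorderSketch(int maxEntries) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        this.maxEntries = maxEntries;
    }

    /** Records a message, evicting the oldest one once the cap has been reached. */
    synchronized void record(T message) {
        if (messages.size() == maxEntries) {
            messages.removeFirst();
        }
        messages.addLast(message);
    }

    /** Returns the retained messages, oldest first. */
    synchronized List<T> snapshot() {
        return new ArrayList<>(messages);
    }
}
```

A practical consequence for verification is that, on a busy channel, a count assertion can only ever see the most recent messages up to the cap, so long-running tests may want a larger value or an earlier verify call.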

See the Configuration Properties page for the full four-form reference (Java code, system property, environment variable, property file).

 

Java client helpers

The MockServerClient class provides three convenience methods for async messaging:

MockServerClient client = new MockServerClient("localhost", 1080);

// 1. Load an AsyncAPI spec and start mocking
String status = client.loadAsyncApi("{\"spec\":{\"asyncapi\":\"2.6.0\", ...}, \"brokerConfig\":{\"kafkaBootstrapServers\":\"localhost:9092\", \"consume\":true}}");

// 2. Check current async mocking status
String currentStatus = client.asyncApiStatus();

// 3. Verify recorded messages
client.verifyAsyncMessage("{\"channel\":\"orders\", \"payloadJsonPath\":\"user.name\", \"expectedValue\":\"Alice\", \"count\":{\"atLeast\":1}}");
// throws AssertionError if verification fails
 

Not yet supported