---
title: AsyncAPI Broker Mocking
description: Mock Kafka and MQTT brokers using AsyncAPI 2.x/3.x specs — publish examples, subscribe to record messages, validate against JSON Schema, all via REST or Java API.
shortTitle: AsyncAPI Messaging
layout: page
pageOrder: 11.5
section: 'Mock Server'
subsection: true
sitemap:
  priority: 0.7
  changefreq: 'monthly'
  lastmod: 2026-06-01T14:00:00+00:00
---
MockServer's AsyncAPI broker mocking publishes realistic example messages to Kafka and MQTT brokers, deriving them directly from an AsyncAPI 2.x or 3.x specification. MockServer can also subscribe to broker channels and record incoming messages for verification, mirroring how it records HTTP requests.
Load an AsyncAPI spec and start mocking via the REST API:
The request body can be either a plain AsyncAPI spec (JSON or YAML) or a JSON wrapper with broker configuration:
{
  "spec": {
    "asyncapi": "2.6.0",
    "info": { "title": "Orders API", "version": "1.0.0" },
    "channels": {
      "orders": {
        "publish": {
          "message": {
            "payload": {
              "type": "object",
              "properties": {
                "orderId": { "type": "integer" },
                "status": { "type": "string", "enum": ["pending", "shipped"] }
              },
              "required": ["orderId"]
            }
          }
        }
      }
    }
  },
  "brokerConfig": {
    "kafkaBootstrapServers": "localhost:9092",
    "publishOnLoad": true,
    "consume": true
  }
}
| Field | Type | Default | Description |
|---|---|---|---|
| kafkaBootstrapServers | string | null | Kafka bootstrap servers (e.g. localhost:9092) |
| kafkaGroupId | string | mockserver-async-consumer | Consumer group ID for Kafka subscribers |
| mqttBrokerUrl | string | null | MQTT broker URL (e.g. tcp://localhost:1883) |
| mqttClientId | string | auto-generated | MQTT client ID prefix |
| mqttQos | int | 1 | MQTT QoS level (0, 1, or 2) |
| publishOnLoad | boolean | true | Publish example messages immediately when spec is loaded |
| publishIntervalMillis | long | 0 (disabled) | Publish examples periodically at this interval |
| consume | boolean | false | Subscribe to channels and record incoming messages |
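The fields in the table above can also be assembled into a wrapper body programmatically. The following is a minimal sketch, assuming the spec is already available as a JSON string. The class and method names (`BrokerConfigBody`, `quote`, `wrap`) are hypothetical and not part of MockServer; only the field names come from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of MockServer): builds the JSON wrapper
// {"spec": ..., "brokerConfig": {...}} from a spec and a map of options.
final class BrokerConfigBody {

    // Escapes backslashes and double quotes, which is enough for simple
    // values such as host names and URLs.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // String values are quoted; booleans and numbers are written as-is.
    static String wrap(String specJson, Map<String, Object> brokerConfig) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : brokerConfig.entrySet()) {
            Object v = e.getValue();
            String rendered = (v instanceof String) ? quote((String) v) : String.valueOf(v);
            fields.add(quote(e.getKey()) + ":" + rendered);
        }
        return "{\"spec\":" + specJson + ",\"brokerConfig\":" + fields + "}";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion order.
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("kafkaBootstrapServers", "localhost:9092");
        config.put("publishIntervalMillis", 5000L);
        config.put("consume", true);
        System.out.println(wrap("{\"asyncapi\":\"2.6.0\"}", config));
    }
}
```

Fields left out of the map fall back to the defaults listed in the table, so only the values that differ need to be set.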
The status response includes the loaded spec info, the active channels, publisher and subscriber counts, and the recorded messages (each with its schema validation result).
Verify that messages recorded by subscribers match given criteria. This mirrors the semantics of PUT /mockserver/verify for HTTP requests.
The request body is a JSON object with the following fields:
| Field | Type | Required | Description |
|---|---|---|---|
| channel | string | yes | The channel or topic to check for messages |
| payloadSubstring | string | no | The message payload must contain this substring |
| payloadJsonPath | string | no | Dot-notation JSON path to extract from the payload (e.g. user.name) |
| expectedValue | string | no | Expected value at the JSON path (used together with payloadJsonPath) |
| count | object | no | Count constraints: {atLeast, atMost, exactly}. Default: {atLeast: 1} |
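To make the field semantics concrete, here is a small sketch of how a dot-notation path and the count constraints could be evaluated against payloads that have already been parsed. It is an illustration only and not MockServer's implementation: `VerifySketch`, `valueAt`, `matching`, and `countOk` are hypothetical names, and payloads are modelled as nested `Map`s.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the verify fields; not MockServer code.
final class VerifySketch {

    // Walks a dot-notation path such as "user.name" through nested maps.
    // Returns null when a segment is missing or the parent is not an object.
    static Object valueAt(Map<String, Object> payload, String path) {
        Object current = payload;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    // Counts the payloads whose value at the path equals the expected string.
    static long matching(List<Map<String, Object>> payloads, String path, String expected) {
        return payloads.stream()
                .filter(p -> {
                    Object v = valueAt(p, path);
                    return v != null && expected.equals(String.valueOf(v));
                })
                .count();
    }

    // Applies {atLeast, atMost, exactly}; null means "not set".
    // With nothing set, falls back to the documented default of atLeast 1.
    static boolean countOk(long observed, Integer atLeast, Integer atMost, Integer exactly) {
        if (atLeast == null && atMost == null && exactly == null) {
            return observed >= 1;
        }
        return (exactly == null || observed == exactly)
                && (atLeast == null || observed >= atLeast)
                && (atMost == null || observed <= atMost);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> recorded = List.of(
                Map.of("user", Map.of("name", "Alice")),
                Map.of("user", Map.of("name", "Bob")));
        long hits = matching(recorded, "user.name", "Alice");
        System.out.println("matches=" + hits + ", passes=" + countOk(hits, 1, null, null));
    }
}
```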
| Status | Meaning |
|---|---|
| 202 Accepted | Verification passed |
| 406 Not Acceptable | Verification failed (body contains a human-readable failure reason) |
| 400 Bad Request | Malformed request (missing channel, invalid JSON) |
| 501 Not Implemented | The mockserver-async module is not on the classpath |
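A client calling the endpoint directly needs to branch on these status codes. The sketch below maps each documented code to an outcome; the `VerifyStatus` class and its `Outcome` enum are hypothetical names introduced for illustration, and any code not listed in the table is treated as unexpected.

```java
// Hypothetical mapping of the documented verify status codes to outcomes.
final class VerifyStatus {

    enum Outcome { PASSED, FAILED, BAD_REQUEST, MODULE_MISSING, UNEXPECTED }

    static Outcome interpret(int status) {
        switch (status) {
            case 202: return Outcome.PASSED;         // verification passed
            case 406: return Outcome.FAILED;         // body holds the failure reason
            case 400: return Outcome.BAD_REQUEST;    // missing channel or invalid JSON
            case 501: return Outcome.MODULE_MISSING; // mockserver-async not on the classpath
            default:  return Outcome.UNEXPECTED;     // not covered by the table
        }
    }

    public static void main(String[] args) {
        System.out.println(interpret(406));
    }
}
```

Keeping 406 distinct from 400 lets a test report a failed verification separately from a malformed request.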
PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "orders",
  "payloadJsonPath": "user.name",
  "expectedValue": "Alice",
  "count": { "atLeast": 1 }
}

PUT /mockserver/asyncapi/verify HTTP/1.1
Content-Type: application/json

{
  "channel": "errors",
  "count": { "exactly": 0 }
}
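The same requests can be sent from Java without the MockServer client. This is a minimal sketch using the JDK's `java.net.http` API (Java 11 or later); it assumes MockServer is listening on `http://localhost:1080`, and `VerifyRequestSketch` and `build` are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper that issues PUT /mockserver/asyncapi/verify.
final class VerifyRequestSketch {

    // Builds the request without sending it, so it can be inspected or reused.
    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/mockserver/asyncapi/verify"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a MockServer instance is running on localhost:1080.
        HttpRequest request = build("http://localhost:1080",
                "{\"channel\":\"errors\",\"count\":{\"exactly\":0}}");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // 202 means the verification passed; 406 carries the failure reason.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```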
All async mocking state (publishers, subscribers, and recorded messages) is cleared when you call PUT /mockserver/reset.
| Version | Channel structure | Example resolution |
|---|---|---|
| AsyncAPI 2.x | channels.<name>.publish\|subscribe.message.payload | Inline payload.example or message.examples[].payload |
| AsyncAPI 3.x | channels.<name>.messages.<msgName>.payload | examples[].payload; basic $ref to #/components/messages/<name> |
Both JSON and YAML spec formats are accepted. Missing or incomplete structures are tolerated gracefully.
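The two layouts differ only by major version, which can be read from the spec's `asyncapi` field. The sketch below shows one way to pick the payload location from that field; `AsyncApiVersionSketch` and its methods are hypothetical and do not reflect how MockServer's parser is implemented.

```java
// Hypothetical version switch based on the spec's "asyncapi" field.
final class AsyncApiVersionSketch {

    // Extracts the major version, for example "2.6.0" gives 2.
    static int majorVersion(String asyncapiField) {
        String trimmed = asyncapiField.trim();
        int dot = trimmed.indexOf('.');
        String major = dot < 0 ? trimmed : trimmed.substring(0, dot);
        return Integer.parseInt(major);
    }

    // Returns the payload location from the table for the given major version.
    static String payloadPathTemplate(int major) {
        switch (major) {
            case 2: return "channels.<name>.publish|subscribe.message.payload";
            case 3: return "channels.<name>.messages.<msgName>.payload";
            default: throw new IllegalArgumentException(
                    "Unsupported AsyncAPI major version: " + major);
        }
    }

    public static void main(String[] args) {
        System.out.println(payloadPathTemplate(majorVersion("2.6.0")));
    }
}
```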
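When a channel's message definition includes a JSON Schema (payload), MockServer validates messages against it; the validation result for each recorded message is included in the status response.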
Schema validation supports JSON Schema Draft 4 through Draft 2019-09, including constraints like required, enum, minimum/maximum, pattern, and format.
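As an illustration of what those constraints mean for the Orders API schema shown earlier, the sketch below hand-codes the `required`, type, and `enum` checks for that one schema. It is not a JSON Schema validator and not what MockServer runs internally; `OrderPayloadCheck` and `violations` are hypothetical names, and payloads are modelled as a `Map`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hand-written checks for the orders payload schema; illustration only.
final class OrderPayloadCheck {

    private static final Set<String> STATUS_VALUES = Set.of("pending", "shipped");

    // Returns one message per violated constraint; an empty list means valid.
    static List<String> violations(Map<String, Object> payload) {
        List<String> errors = new ArrayList<>();

        // "required": ["orderId"] and "type": "integer"
        Object orderId = payload.get("orderId");
        if (orderId == null) {
            errors.add("orderId is required");
        } else if (!(orderId instanceof Integer || orderId instanceof Long)) {
            errors.add("orderId must be an integer");
        }

        // status is optional, but when present must be one of the enum values
        Object status = payload.get("status");
        if (status != null) {
            if (!(status instanceof String)) {
                errors.add("status must be a string");
            } else if (!STATUS_VALUES.contains(status)) {
                errors.add("status must be one of " + STATUS_VALUES);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(violations(Map.of("status", "lost")));
    }
}
```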
Add the mockserver-async dependency to your project, then wire the components together:
import java.nio.file.Files;
import java.nio.file.Path;

import org.mockserver.async.AsyncApiMockOrchestrator;
import org.mockserver.async.asyncapi.AsyncApiParser;
import org.mockserver.async.asyncapi.AsyncApiSpec;
import org.mockserver.async.publish.KafkaMessagePublisher;

// 1. Parse your AsyncAPI spec (from a string, file, or resource)
String specYaml = Files.readString(Path.of("asyncapi.yaml"));
AsyncApiSpec spec = new AsyncApiParser().parse(specYaml);

// 2. Create a publisher pointed at your test broker
KafkaMessagePublisher publisher = new KafkaMessagePublisher("localhost:9092");

// 3. Create the orchestrator and publish once
AsyncApiMockOrchestrator orchestrator = new AsyncApiMockOrchestrator(spec, publisher);
orchestrator.publishAll();

// Or publish repeatedly on a schedule (e.g. every 500 ms)
orchestrator.startPublishing(500);

// ... run your consumer tests ...

// Stop scheduled publishing once the tests are done
orchestrator.stop();

// Always close the publisher to release broker connections
publisher.close();
For subscribing to record messages:
import java.util.List;

import org.mockserver.async.subscribe.KafkaMessageSubscriber;
import org.mockserver.async.subscribe.RecordedMessage;

// Connect to the broker with a dedicated consumer group
KafkaMessageSubscriber subscriber = new KafkaMessageSubscriber("localhost:9092", "test-group");
subscriber.subscribe("orders");

// ... wait for messages ...

// Read back everything recorded on the channel so far
List<RecordedMessage> messages = subscriber.getRecordedMessages("orders");
for (RecordedMessage msg : messages) {
    System.out.println("Key: " + msg.getKey() + ", Payload: " + msg.getPayload());
}

// Close the subscriber to release broker connections
subscriber.close();
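The "wait for messages" step is usually a bounded poll rather than a fixed sleep. The following is a minimal sketch of such a helper; `AwaitSketch` and `awaitCount` are hypothetical names, not part of MockServer, and the 50 ms poll interval is an arbitrary choice.

```java
import java.util.function.IntSupplier;

// Hypothetical polling helper for tests that wait on recorded messages.
final class AwaitSketch {

    // Polls until the supplier reports at least `expected`, or the timeout
    // elapses. Returns true on success, false on timeout or interruption.
    static boolean awaitCount(IntSupplier currentCount, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (currentCount.getAsInt() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in counter; in a real test the supplier would read the subscriber.
        System.out.println(awaitCount(() -> 1, 1, 1000));
    }
}
```

With the subscriber from the example above, the call might look like `awaitCount(() -> subscriber.getRecordedMessages("orders").size(), 1, 5000)`.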
| Broker | Publisher | Subscriber | Features |
|---|---|---|---|
| Kafka | KafkaMessagePublisher | KafkaMessageSubscriber | Record keys, headers, consumer group |
| MQTT | MqttMessagePublisher | MqttMessageSubscriber | QoS 0/1/2, binary payloads |
These properties provide server-wide defaults for async messaging. Per-request brokerConfig values override them.
| Property | Env Variable | Type | Default | Description |
|---|---|---|---|---|
| mockserver.asyncKafkaBootstrapServers | MOCKSERVER_ASYNC_KAFKA_BOOTSTRAP_SERVERS | string | "" (unset) | Default Kafka bootstrap servers used when the per-request brokerConfig does not include kafkaBootstrapServers. |
| mockserver.asyncMqttBrokerUrl | MOCKSERVER_ASYNC_MQTT_BROKER_URL | string | "" (unset) | Default MQTT broker URL used when the per-request brokerConfig does not include mqttBrokerUrl. |
| mockserver.asyncRecordedMessageMaxEntries | MOCKSERVER_ASYNC_RECORDED_MESSAGE_MAX_ENTRIES | int | 1000 | Maximum number of recorded messages retained per channel. When the cap is reached, the oldest messages are evicted (FIFO). |
See the Configuration Properties page for the full four-form reference (Java code, system property, environment variable, property file).
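The FIFO eviction described for mockserver.asyncRecordedMessageMaxEntries can be pictured as a bounded queue per channel. The sketch below shows that behaviour with a plain `ArrayDeque`; `BoundedMessageBuffer` is a hypothetical class written for illustration and is not MockServer's actual storage.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-channel buffer that evicts the oldest entry when full.
final class BoundedMessageBuffer {

    private final int maxEntries;
    private final Deque<String> messages = new ArrayDeque<>();

    BoundedMessageBuffer(int maxEntries) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        this.maxEntries = maxEntries;
    }

    // Adds a message, dropping the oldest one first if the cap is reached.
    synchronized void record(String payload) {
        if (messages.size() == maxEntries) {
            messages.removeFirst();
        }
        messages.addLast(payload);
    }

    // Returns a copy of the retained messages, oldest first.
    synchronized List<String> snapshot() {
        return new ArrayList<>(messages);
    }

    public static void main(String[] args) {
        BoundedMessageBuffer buffer = new BoundedMessageBuffer(2);
        buffer.record("a");
        buffer.record("b");
        buffer.record("c"); // evicts "a"
        System.out.println(buffer.snapshot());
    }
}
```

One practical consequence is that a verification with an `exactly` count can under-report on a busy channel once older messages have been evicted.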
The MockServerClient class provides three convenience methods for async messaging:
MockServerClient client = new MockServerClient("localhost", 1080);
// 1. Load an AsyncAPI spec and start mocking
String status = client.loadAsyncApi("{\"spec\":{\"asyncapi\":\"2.6.0\", ...}, \"brokerConfig\":{\"kafkaBootstrapServers\":\"localhost:9092\", \"consume\":true}}");
// 2. Check current async mocking status
String currentStatus = client.asyncApiStatus();
// 3. Verify recorded messages
client.verifyAsyncMessage("{\"channel\":\"orders\", \"payloadJsonPath\":\"user.name\", \"expectedValue\":\"Alice\", \"count\":{\"atLeast\":1}}");
// throws AssertionError if verification fails