Kafka DSL

Architecture

The Kafka DSL is built around injecting a Kafka proxy client into the main test code with just a few annotations, together with a set of predefined actions on the proxy client that are optimised for testing needs.

To have the Kafka proxy client injected into the test, the only required step is to add an argument of type KafkaApplication to the test's argument list and annotate it with the minimum required annotations. The object is initialised automatically and is ready to use in the test.

Example:


```java
class TestClass {

    @Test
    public void sampleTest(
            @Capability(key = "browserName", value = "pn5-driver8")
            @Capability(key = "pn5:kafkaUrl", value = "config.kafka.url", type = ValueType.STRING_PROPERTY)
            KafkaApplication kafka) {

        kafka
                .listTopics()
                .andThen()                   // back to the KafkaApplication to continue the flow
                .queryTopic("topic1")
                .forLatestPerPartitionCount(10)
                .getMessages()
                .filterMessagesHavingSubstring("000")
                .assertThatMessageCountIsAtLeast(1);
    }
}
```

In the background, Pumpo#5 starts a session on the testing farm and transfers the specified capabilities to the proxy client. These are mainly the credentials required to connect to the Kafka broker(s).

The capabilities that can be passed to the Kafka proxy client are documented in the section on configuring connections to systems under test.

Wrapping in custom objects

Instead of using the predefined methods of the KafkaApplication interface directly, it is possible to define a custom interface that extends KafkaApplication and wraps the predefined actions into more business-oriented methods.

Example:


```java
class TestClass {

    @Test
    public void sampleTest(MyCustomKafkaApplication kafka) {
        kafka.checkThat000MessageWasRecentlyReceived();
    }
}
```


```java
@Capability(key = "browserName", value = "pn5-driver8")
@Capability(key = "pn5:kafkaUrl", value = "config.kafka.url", type = ValueType.STRING_PROPERTY)
public interface MyCustomKafkaApplication extends KafkaApplication {

    default MyCustomKafkaApplication checkThat000MessageWasRecentlyReceived() {
        queryTopic("topic1")
                .forLatestPerPartitionCount(10)
                .getMessages()
                .filterMessagesHavingSubstring("000")
                .assertThatMessageCountIsAtLeast(1);
        return this; // return the application itself so further calls can be chained
    }
}
```

This pattern is common to all domains handled by Pumpo#5 and is the recommended coding style: it keeps tests readable even for people without deeper knowledge of the implementation details.

Method reference

KafkaApplication::createTopic

Parameter | Type | Description
---|---|---
newTopic | String | The name of the new topic to create.

Creates a topic with the specified name and returns a KafkaResponse. If the topic could not be created, an exception is thrown.

KafkaApplication::listTopics

Lists the available topics and returns a KafkaListResponse. Only topics that are visible to the current user will be returned.

KafkaApplication::queryTopic

Parameter | Type | Description
---|---|---
topic | String | The name of the topic to query.

Creates a ReadQuery builder for the specified topic. The builder then allows setting various filters that the Kafka proxy client will use when querying Kafka. Those filters allow defining:

  • A specific partition
  • A specific offset
  • A specific message count per partition from the tail of the partition
  • A specific range of timestamps, defined either explicitly in milliseconds or as an age from now in seconds
  • A maximum total count of messages to be returned

Some combinations of filters do not make sense, and an IllegalArgumentException will be thrown in such cases. A detailed description is available for each builder method.

Once the filters are set, the method getMessages is used to actually send the request to the Kafka proxy client.
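To illustrate, the sketch below combines a timestamp-range filter with a message count limit using the builder methods documented further down; the topic name and time bounds are placeholders.

```java
// Illustrative sketch: topic name and time bounds are placeholders.
long now = System.currentTimeMillis();

kafka
        .queryTopic("orders")                       // build a ReadQuery for the topic
        .forTimestampBetween(now - 60_000, now)     // messages from the last minute
        .limitedToMaxMessageCountOf(100)            // cap the number of transferred messages
        .getMessages()                              // send the query to the Kafka proxy client
        .assertThatMessageCountIsAtLeast(1);
```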

KafkaApplication::withPartition

Parameter | Type | Description
---|---|---
partitionId | int | The partition identifier to which the message should be written in a topic.

Before writing a message to a topic, you can set the partition identifier to which the message will be written within the specified topic.

KafkaApplication::writeMessage

Parameter | Type | Mandatory | Description
---|---|---|---
topic | String | mandatory | The name of the topic to which to publish the new message (record).
message | String | mandatory | The explicit content of the value of the record to create.
key | String | optional | The key that determines the partition within a log to which the message should be appended.
headers | Map | optional | A map containing headers to be included with the message.

Writes a message with the specified value to the specified topic and returns a KafkaWriteResponse. If the record could not be written, an exception is thrown.
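A minimal sketch of a write, assuming withPartition returns the application for further chaining and writeMessage accepts its parameters in the order listed above; the topic, payload, key, and headers are placeholders.

```java
import java.util.Map;

// Placeholder topic, payload, key, and headers; the parameter order follows
// the table above and is an assumption, not a confirmed overload.
KafkaWriteResponse response = kafka
        .withPartition(0)                             // optional: pin the target partition
        .writeMessage("orders",                       // topic
                "{\"id\":\"000\"}",                   // record value
                "order-000",                          // key (optional)
                Map.of("source", "pn5-test"));        // headers (optional)

// The response exposes where and when the record was written.
System.out.println(response.partition + "@" + response.offset + " ts=" + response.timestamp);
```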

ReadQuery::limitedToMaxMessageCountOf

Parameter | Type | Description
---|---|---
maxCount | int | The max message count to be returned.

Sets the limit on the number of messages transferred by the read query. The limit is applied after the messages have been fetched from Kafka and sorted by timestamp on the Kafka proxy client side. If tail messages were requested (i.e. no end date has been set by any of the ReadQuery builder methods), the list is truncated from the end, so it will indeed contain the tail messages. If, however, the filter requests a range of messages that are not necessarily tail messages, the truncation is applied to the records in their natural timestamp order. Typically, when requesting messages between a lower and an upper date bound and the total number of messages exceeds the limit, the returned list will contain the messages close to the lower bound, while messages close to the upper bound may be missing.

The default limit is 1000 messages and can be increased up to 10000 using this method.

ReadQuery::forLatestPerPartitionCount

Parameter | Type | Description
---|---|---
tailOffsets | int | The number of offsets backward from the last offset; 1 means one message will be retrieved.

Sets the primary filter to the given number of tail messages per partition. The retrieved messages are consolidated, sorted by timestamp, and then truncated to the maximum message count.

ReadQuery::forAgedBetween

Parameter | Type | Description
---|---|---
minAgeInSeconds | int | The minimum age in seconds inclusive.
maxAgeInSeconds | int | The maximum age in seconds inclusive.

Sets the primary filter to an age range. The age is calculated from now and is expressed in seconds.

Age is converted to a timestamp based on the current instant. The minAgeInSeconds can be negative, in which case the driver will wait for new messages until the respective instant. The maximum future outlook is 60s.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.
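For example, a negative minimum age can be used to wait for messages that are just about to arrive; the topic name is a placeholder.

```java
// Messages between 30 seconds old and 10 seconds in the future: the driver
// waits until the future instant before returning. Placeholder topic name.
kafka
        .queryTopic("orders")
        .forAgedBetween(-10, 30)    // minAgeInSeconds = -10 (future), maxAgeInSeconds = 30
        .getMessages()
        .assertThatMessageCountIsAtLeast(1);
```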

ReadQuery::onPartition

Parameter | Type | Description
---|---|---
partition | int | The id of the partition to which to limit the query.

Sets the primary filter to a specific partition. Only one partition can be specified.

ReadQuery::forTimestampBetween

Parameter | Type | Description
---|---|---
dateStart | long | The start date of the range to retrieve, inclusive, in milliseconds from Unix Epoch.
dateEnd | long | The end date of the range to retrieve, inclusive, in milliseconds from Unix Epoch.

Sets the primary filter to a timestamp range. It is allowed to specify up to 60s in the future for the upper limit.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.

ReadQuery::forTimestampAfter

Parameter | Type | Description
---|---|---
milliStart | long | The start date of the range to retrieve, inclusive, in milliseconds from Unix Epoch.

Sets a lower timestamp limit on the primary filter.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.

ReadQuery::forTimestampBefore

Parameter | Type | Description
---|---|---
millisEnd | long | The end date of the range to retrieve, inclusive, in milliseconds from Unix Epoch.

Sets an upper timestamp limit on the primary filter. Timestamp should be passed in milliseconds from Unix Epoch. In case the upper timestamp limit is not set by this method or any other method, it will be automatically set to 2s in the future. It is allowed to specify up to 60s in the future.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.

ReadQuery::getMessages

Sends the read query to the Kafka proxy and returns a KafkaReadResponse.

ReadQuery::atOffset

Parameter | Type | Description
---|---|---
offset | int | The exact offset to look for on each partition included.

Sets the primary filter to target a specific offset on each partition. If no specific partition is selected, the offset is applied to every partition. This can result in an exception, because some partitions might be well behind in their number of messages.
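A sketch combining atOffset with onPartition so the offset is resolved on a single partition; the topic, partition id, and offset are placeholders.

```java
// Placeholder topic, partition id, and offset.
kafka
        .queryTopic("orders")
        .onPartition(2)     // limit the query to one partition...
        .atOffset(41)       // ...so the offset is only looked up there
        .getMessages()
        .assertThatMessageCountIsAtLeast(1);
```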

ReadQuery::forOlderThan

Parameter | Type | Description
---|---|---
ageInSeconds | int | The minimum age inclusive in seconds. Age is calculated from now.

Sets a lower age limit on the primary filter.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.

ReadQuery::forYoungerThan

Parameter | Type | Description
---|---|---
ageInSeconds | int | The maximum age inclusive in seconds. Age is calculated from now.

Sets an upper age limit on the primary filter. The ageInSeconds can be negative, in which case the upper timestamp limit will be in the future. If the upper limit is not specified using this method or any other method, it is automatically set to 2s in the future. It is allowed to specify up to 60s in the future.

Be aware of some internal clock difference and latency between the test runtime and the Kafka brokers.

KafkaReadResponse::assertThatContainsMessageValue

Parameter | Type | Description
---|---|---
value | String | The exact value the message needs to contain to stay in the filtered list.

Asserts that the list of messages contains a message with the given exact value. The assertion uses String comparison, so the exact value needs to be provided.

KafkaReadResponse::assertThatMessageCountIsAtLeast

Parameter | Type | Description
---|---|---
minCount | int | The minimum count of messages required to pass the assertion.

Asserts that the list of messages contains at least the specified number of messages.

KafkaReadResponse::filterMessagesHaving

Parameter | Type | Description
---|---|---
poJoClass | Class | The class to which message values should be deserialised from JSON before applying the predicate.
assertion | Function<PoJoClass,Boolean> | A function (a lambda is fine) to run on each deserialised object: if true is returned, then the message stays in the filtered list.

Filters the returned list of messages based on a custom callback over a custom PoJo class to which all message values are deserialised from JSON. The PoJo class can be declared as an inner class or externally, and its structure can contain nested classes. The deserialisation ignores JSON fields not present in the PoJo structure and only considers PoJo members that are public or have public setters.

All secondary filters, including this one, can be applied any number of times and in any order, and should produce the same result.

When a filter is applied, the KafkaReadResponse still maintains the full list of messages in a hidden attribute, and this full list can be restored using the method unfilter().
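A minimal sketch, assuming a hypothetical Order PoJo that matches the JSON payloads of the topic; the class, its field names, and the topic are illustrative.

```java
// Hypothetical PoJo mirroring the JSON structure of the message values; only
// public members (or members with public setters) are considered.
public static class Order {
    public String id;
    public int amount;
}

kafka
        .queryTopic("orders")    // placeholder topic name
        .forLatestPerPartitionCount(10)
        .getMessages()
        .filterMessagesHaving(Order.class, order -> order.amount > 100)
        .assertThatMessageCountIsAtLeast(1);
```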

KafkaReadResponse::filterMessagesHavingSubstring

Parameter | Type | Description
---|---|---
substring | String | A substring that the message value needs to contain to stay in the filtered list.

Filters the returned list of messages based on whether they contain the provided substring. Simple string comparison is used to apply the filter.

All secondary filters, including this one, can be applied any number of times and in any order, and should produce the same result.

When a filter is applied, the KafkaReadResponse still maintains the full list of messages in a hidden attribute, and this full list can be restored using the method unfilter().

KafkaReadResponse::messagesAsCollectionOf

Parameter | Type | Description
---|---|---
poJoClass | Class | The class to which values of messages should be deserialised from JSON.

Returns the filtered messages as a Collection<PoJo>, where PoJo is the provided class used to deserialise the message values from JSON. The same deserialisation rules apply as for the method filterMessagesHaving; typically the same PoJo class can be used in both methods.
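Reusing the hypothetical Order PoJo from the earlier sketch, the deserialised objects can then be processed directly:

```java
import java.util.Collection;

// Placeholder topic; Order is the hypothetical PoJo from the earlier sketch.
Collection<Order> orders = kafka
        .queryTopic("orders")
        .forLatestPerPartitionCount(10)
        .getMessages()
        .messagesAsCollectionOf(Order.class);

orders.forEach(order -> System.out.println(order.id + " -> " + order.amount));
```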

KafkaReadResponse::unfilter

Cancels all applied secondary filters, allowing filtering to be restarted from the full list of received messages.

KafkaReadResponse.messages

The list of messages as a Collection<KafkaMessage>. KafkaMessage has public members that your IDE's autocomplete will propose.

KafkaWriteResponse.offset

Offset where the record was written.

KafkaWriteResponse.partition

Partition on which the record was written.

KafkaWriteResponse.serializedKeySize

The size of the serialised key. String serialisation is used, so no transformation is applied by the Kafka client proxy.

KafkaWriteResponse.serializedValueSize

The size of the serialised value. String serialisation is used, so no transformation is applied by the Kafka client proxy.

KafkaWriteResponse.timestamp

Timestamp of the record. This will be the timestamp as per the Kafka broker that processed the record.

ListTopicsResponse::assertThatContainsTopicNamed

Parameter | Type | Description
---|---|---
topic | String | The name of the topic that needs to be present in order to pass the assertion.

Asserts that the list of topics returned contains the specified topic name. Basic string comparison is used.

ListTopicsResponse::assertThatDoesNotContainTopicNamed

Parameter | Type | Description
---|---|---
topic | String | The name of the topic that needs to be absent in order to pass the assertion.

Asserts that the list of topics returned does not contain the specified topic name. Basic string comparison is used.

ListTopicsResponse.topics

List of topics as Collection<String> for custom processing.

Any KafkaResponse::andThen

Returns the initial KafkaApplication to be able to continue the flow. Typically, after a set of assertions this method can be used to re-query Kafka for something else to run additional assertions.
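For instance, after asserting on one topic the flow can return to the application and run a different check; the topic names are placeholders, and the assertion methods are assumed to return their response so the chain can continue.

```java
// Placeholder topic names.
kafka
        .queryTopic("orders")
        .forYoungerThan(60)
        .getMessages()
        .assertThatMessageCountIsAtLeast(1)
        .andThen()                                    // back to the KafkaApplication
        .listTopics()
        .assertThatContainsTopicNamed("invoices");
```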