In this article, I will take you through 17 useful Apache Kafka examples on Linux (RedHat/CentOS 7/8). For many decades, organizations have used databases to store objects and entities, in other words, object state. But there is also a school of thought that focuses on object events rather than object state. Events can be stored in a log, which can then be managed by Apache Kafka. In Kafka architecture, these logs are called topics.
Apache Kafka is a fast and fault-tolerant distributed messaging system. It was initially developed at LinkedIn but later became an open source project under the Apache Software Foundation.
Some Important Terminology
Topics : An ordered, persistent sequence of events stored in a durable way.
Producer : Responsible for writing data to Kafka topics (see the quick console example after this list).
Consumer : Responsible for reading data from the topics it subscribes to. You can find more about consumers in Explaining Apache Kafka Architecture in 3 Popular Steps.
Kafka Streams : A client library for building applications and microservices where the input and output data are stored in Kafka clusters.
Kafka Broker : Stores and manages messages from producers and allows consumers to read messages from topics.
Partitions : A partition is the basic unit of a topic. Each topic consists of one or more partitions. Each partition holds messages and can be distributed across the cluster so that multiple consumers can read messages in parallel.
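To see how producers, consumers, and topics fit together, here is a minimal sketch using the console tools that ship with Kafka. The topic name demoTopic is only an example; any existing topic works. The first command writes whatever you type to the topic, and the second reads it back from the beginning.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demoTopic
[root@localhost kafka_2.13-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demoTopic --from-beginning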
Apache Kafka Examples
Also Read: Apache Kafka Tutorial for Beginners with 3 Best Examples
Example 1: Check Kafka Version
To check the Kafka version, you need to use the --version option as shown below. As you can see from the output below, the current Kafka version is 2.4.1.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --version
2.4.1 (Commit:c57222ae8cd7866b)
--version : Display Kafka version.
NOTE: All commands in this article are run as the root user. You can use any user with sudo access or with appropriate permissions to run these scripts. You can check How to add User to Sudoers to know more about providing sudo access to a user.
Example 2: Create an Apache Kafka Topic
In this Apache Kafka example, you will learn how to create a Kafka topic. Here we are creating a topic testTopic1 by using the --create option with the kafka-topics.sh script, with 1 partition and a replication factor of 1.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic1
Created topic testTopic1.
--create : Create a new topic.
--zookeeper : The connection string for the ZooKeeper connection in the form host:port.
--replication-factor : The replication factor for each partition of the topic being created.
--partitions : The number of partitions of the topic being created or altered.
--topic : The topic or topics to be altered, created, deleted, listed, or described.
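From Kafka 2.2 onwards, kafka-topics.sh can also talk to the brokers directly instead of ZooKeeper. Assuming a broker is listening on localhost:9092, the same topic could be created with the --bootstrap-server option, for example:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testTopic1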
Example 3: List All Apache Kafka Topics
If you want to check the list of all Apache Kafka topics, then you need to use the --list option as shown below. As you can see from the output below, 3 topics are currently available in the system.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
sampleTopic
testTopic1
--list : List all available topics.
Example 4: Describe a Kafka Topic
If you want to see all the information about a Kafka topic, then you need to use the --describe option as shown in the command below. Here we are checking the partition details, replicas, replication factor, and other information about the topic testTopic1.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --describe --topic testTopic1 --zookeeper localhost:2181
Topic: testTopic1    PartitionCount: 1    ReplicationFactor: 1    Configs:
    Topic: testTopic1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
Example 5: Delete a Topic
If you want to delete a Kafka topic, then you need to use the --delete option as shown below. In this Apache Kafka example, we are deleting the testTopic1 topic using the kafka-topics.sh script.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testTopic1
Topic testTopic1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
--delete : Delete a topic.
Example 6: Start Kafka Server
To start the Kafka server, you need to run the kafka-server-start.sh script with the configuration properties file passed as an argument, as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-server-start.sh config/server.properties
[2020-04-03 17:28:46,764] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-04-03 17:28:47,519] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-04-03 17:28:47,520] INFO starting (kafka.server.KafkaServer)
[2020-04-03 17:28:47,521] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-04-03 17:28:47,557] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-04-03 17:28:47,566] INFO Client environment:zookeeper.version=3.5.7-f0fdd52973d373ffd9c86b81d99842dc2c7f660e, built on 02/10/2020 11:30 GMT (org.apache.zookeeper.ZooKeeper)
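If you prefer not to keep the terminal attached, kafka-server-start.sh also accepts a -daemon flag to run the broker in the background; the startup log then goes to the logs directory instead of the console. A sketch assuming the same server.properties:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties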
Example 7: Stop Kafka Server
If you want to stop the Kafka server, then you need to use the kafka-server-stop.sh script as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-server-stop.sh
Example 8: Check Configuration of All Topics
If you want to check the configuration of all topics, then you need to use the --entity-type and --describe options with the kafka-configs.sh script as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --describe
Configs for topic 'sampleTopic' are
Configs for topic 'testTopic1' are
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
--entity-type : Type of entity (topics/clients/users/brokers/broker-loggers).
--describe : List configs for the given entity.
Example 9: Dump Kafka Logs
Sometimes you may want to debug an error you are facing in your Kafka cluster. To do that, you can dump the Kafka log segments using the kafka-run-class.sh script as shown below. This will help you analyze the various Kafka segments for any possible error.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --files /tmp/kafka-logs/testTopic1-0/00000000000000000000.log
Dumping /tmp/kafka-logs/testTopic1-0/00000000000000000000.log
Starting offset: 0
--deep-iteration : If set, uses deep instead of shallow iteration.
--files : The comma-separated list of data and index log files to be dumped.
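If you also want to see the message payloads while dumping, DumpLogSegments provides a --print-data-log flag. A sketch against the same segment file (the output will depend on your data):
[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --deep-iteration --files /tmp/kafka-logs/testTopic1-0/00000000000000000000.log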
Example 10: Display API Versions of the Cluster Node
If you want to display the API versions of the cluster nodes, then you need to use the kafka-broker-api-versions.sh script as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
localhost:9092 (id: 0 rack: null) -> ( Produce(0): 0 to 8 [usable: 8], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4], StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 6 [usable: 6], FindCoordinator(10): 0 to 3 [usable: 3], JoinGroup(11): 0 to 6 [usable: 6], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 4 [usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4 [usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 2 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable: 0] )
--bootstrap-server : The server to connect to.
Example 11: Increase Partition for a Topic
Sometimes you might want to increase the number of partitions for a topic. You can alter the topic by using the --alter option with the kafka-topics.sh script and increase the number of partitions using the --partitions option as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic testTopic1 --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
--alter : Alter the number of partitions, replica assignment, and/or configuration for the topic.
--partitions : The number of partitions for the topic being created or altered.
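To confirm that the new partitions were actually added, you can describe the topic again as in Example 4; the PartitionCount should now show 4. For example:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --describe --topic testTopic1 --zookeeper localhost:2181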
Example 12: Update Topic Config
If you want to update any topic configuration, for example the testTopic1 configuration in this case, then you need to use the --add-config option with the kafka-configs.sh script as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name testTopic1 --entity-type topics --add-config cleanup.policy=compact
Completed Updating config for entity: topic 'testTopic1'.
--entity-name : Name of entity (topic name/client id/user principal name/broker id).
--entity-type : Type of entity (topics/clients/users/brokers/broker-loggers).
--add-config : Key-value pairs of configs to add.
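If you later want to remove a configuration override added this way, kafka-configs.sh also provides a --delete-config option. A sketch removing the cleanup.policy override set above:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --alter --zookeeper localhost:2181 --entity-name testTopic1 --entity-type topics --delete-config cleanup.policy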
Example 13: Leader Election
If you want to rebalance partition leadership for a set of topics using a JSON file, then you can use the kafka-leader-election.sh script and pass the JSON file as an argument as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-leader-election.sh --path-to-json-file elect-leader.json --election-type preferred --bootstrap-server localhost:9092
Valid replica already elected for partitions
--path-to-json-file : The JSON file with the list of partitions for which leader election should be performed.
--election-type : Type of election to attempt. Possible values are "preferred" for preferred leader election or "unclean" for unclean leader election.
You can find the elect-leader.json file below.
[root@localhost kafka_2.13-2.4.1]# vi elect-leader.json
{
  "partitions": [
    { "topic": "testTopic3", "partition": 1 },
    { "topic": "testTopic4", "partition": 2 }
  ]
}
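If you want to run the election across every topic partition rather than a hand-picked list, kafka-leader-election.sh also accepts --all-topic-partitions in place of --path-to-json-file, for example:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-leader-election.sh --all-topic-partitions --election-type preferred --bootstrap-server localhost:9092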
Example 14: Push Messages through a File
If you want to push messages from a file, then you can use the kafka-console-producer.sh script to read the messages from a file, say pushMessage.log, specify the topic to push them to using the --topic option, and push the messages.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic4 < pushMessage.log
>>
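To verify that the messages from the file actually landed on the topic, you can read them back with the console consumer. A sketch assuming the same broker and topic (press Ctrl+C to stop):
[root@localhost kafka_2.13-2.4.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic4 --from-beginning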
Example 15: Set Message Retention Time in a Topic
If you want to set the retention time of messages in a topic, then you can alter the topic configuration using the kafka-configs.sh script and pass the retention time in the retention.ms parameter as shown below. Please note that the value is in milliseconds.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name testTopic4 --add-config retention.ms=800
Completed Updating config for entity: topic 'testTopic4'.
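To double-check the retention override, you can describe just this topic's configuration with the same script; retention.ms=800 should appear in the output. For example:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name testTopic4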
Example 16: Get the Earliest Offset in a Topic
To get the earliest offset in a topic, you need to use the GetOffsetShell tool with the kafka-run-class.sh script as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic3 --time -2
testTopic3:0:0
testTopic3:1:0
testTopic3:2:0
--time : Timestamp in milliseconds of the offsets to fetch; -1 for the latest and -2 for the earliest. Offsets returned will come before this timestamp, as in getOffsetsBefore.
--offsets : Number of offsets returned (default: 1).
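The same tool returns the latest offset of each partition when you pass --time -1 instead of -2, which is handy for checking how many messages a topic currently holds. For example:
[root@localhost kafka_2.13-2.4.1]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testTopic3 --time -1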
Example 17: Check Other Options with --help
If you want to check all the options that can be used with the kafka-topics.sh script, then you need to use the --help option as shown below.
[root@localhost kafka_2.13-2.4.1]# bin/kafka-topics.sh --help
This tool helps to create, delete, describe, or change a topic.
Option                                     Description
------                                     -----------
--alter                                    Alter the number of partitions, replica assignment, and/or configuration for the topic.
--at-min-isr-partitions                    If set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.
--bootstrap-server <String: server to connect to>    REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required.
--command-config <String: command config property file>    Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.