Have you noticed that some topics in your Kafka cluster have a suboptimal replication factor? This could be due to initial setup, low traffic, or the product being in its early stages.
As your data grows, it’s important to ensure the durability and scalability of your Kafka cluster. Our aim is to store every message on at least N (e.g. 3) brokers for better resiliency and availability.
This blog post will guide you through increasing the replication factor for pre-existing Kafka topics. Let’s take a look at an example where we need to increase the replication factor for some existing topics.
Topic Name | Current Replication Factor | New Replication Factor
__consumer_offsets | 1 | 3
alerts | 1 | 3
bank-balance | 1 | 3
Increasing the replication factor for a topic is a four-step process:
- Check the existing partition assignment
- Create a custom reassignment plan (a JSON file)
- Execute the reassignment
- Verify the new assignment
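The plan from step 2 is a JSON document with a `version` field and a `partitions` list, the same layout that appears in the outputs later in this post. A minimal sketch of serialising such a plan; `build_reassignment_plan` is a hypothetical helper, not a function from the original script.

```python
import json
from typing import Dict, List


def build_reassignment_plan(topic: str, assignment: Dict[int, List[int]]) -> str:
    """Serialise {partition: [broker ids]} into a reassignment JSON document.

    build_reassignment_plan is a hypothetical helper; the layout mirrors the
    {"version": 1, "partitions": [...]} plans used by kafka-reassign-partitions.sh.
    """
    partitions = [
        {"topic": topic, "partition": partition, "replicas": replicas}
        # Sort by partition number so the file is stable and easy to diff.
        for partition, replicas in sorted(assignment.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions})


if __name__ == "__main__":
    # Two partitions of "alerts", each moved to three replicas.
    print(build_reassignment_plan("alerts", {1: [2, 1, 3], 0: [3, 1, 2]}))
```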
Approach
We will write a Python script, increase_replication.py, that wraps the existing Kafka command-line tools to generate, execute and verify the reassignment. It relies on these scripts, which ship with Kafka:
- kafka-reassign-partitions.sh
- kafka-topics.sh
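Both tools live in the `bin` directory of a Kafka installation, which is why the script takes the installation directory as an option. The sketch below, using hypothetical helper names, shows how the describe call might be assembled for the Kafka 1.1.0 release used in this post; newer Kafka releases replace `--zookeeper` with `--bootstrap-server` for these tools, so treat the flags as version-specific.

```python
from pathlib import Path
from typing import List


def kafka_script(kafka_home: str, name: str) -> Path:
    """Path of a CLI tool shipped in <kafka_home>/bin (e.g. kafka-topics.sh)."""
    return Path(kafka_home) / "bin" / name


def describe_topic_cmd(kafka_home: str, zookeeper: str, topic: str) -> List[str]:
    """Argument list for kafka-topics.sh --describe (ZooKeeper-based, Kafka 1.x)."""
    return [
        str(kafka_script(kafka_home, "kafka-topics.sh")),
        "--zookeeper", zookeeper,
        "--describe",
        "--topic", topic,
    ]
```

The list can be passed straight to `subprocess.run(cmd, capture_output=True, text=True, check=True)`; using an argument list rather than a shell string avoids quoting problems with comma-separated ZooKeeper connection strings.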
Option | Description
--kafka | Kafka installation directory
--zookeeper | ZooKeeper connection string
--topic | Name of the topic whose replicas need to be altered
--operation | Operation to perform; one of currentconfig, newconfig, increasereplica, verify
--brokers | Comma-separated list of broker IDs for the new replica set (e.g. 1,2,3)
--replicafile | Optional. File containing the new replica configuration; only used with the increasereplica and verify operations. If omitted, the script expects the file at /tmp/<<TOPIC NAME>>_replica_change.json
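The option table maps naturally onto `argparse`. The original script's argument handling is not shown, so this is only a sketch of how those options could be declared and validated, including the default `--replicafile` location from the table.

```python
import argparse
from typing import List, Optional

OPERATIONS = ("currentconfig", "newconfig", "increasereplica", "verify")


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Increase a topic's replication factor")
    parser.add_argument("--kafka", required=True, help="Kafka installation directory")
    parser.add_argument("--zookeeper", required=True, help="ZooKeeper connection string")
    parser.add_argument("--topic", required=True, help="Topic whose replicas change")
    parser.add_argument("--operation", required=True, choices=OPERATIONS)
    parser.add_argument("--brokers", default="", help="Comma-separated broker IDs, e.g. 1,2,3")
    parser.add_argument("--replicafile", help="New replica configuration (JSON)")
    args = parser.parse_args(argv)

    # --brokers arrives as "1,2,3"; turn it into [1, 2, 3].
    args.brokers = [int(b) for b in args.brokers.split(",") if b.strip()]
    if args.operation == "newconfig" and not args.brokers:
        parser.error("--brokers is required for newconfig")
    # Fall back to the default file location when no plan file is given.
    if args.operation in ("increasereplica", "verify") and not args.replicafile:
        args.replicafile = f"/tmp/{args.topic}_replica_change.json"
    return args
```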
Check Existing Configuration (--operation=currentconfig)
Input
python ./increase_replication.py --kafka=/opt/kafka --zookeeper=localhost:2181 --topic=alerts --operation=currentconfig
Output
{"alerts":{"Topic":"alerts","PartitionCount":"9","ReplicationFactor":"1","Configs":{"cleanup.policy":"compact"},"partitions":[{"topic":"alerts","partition":0,"replicas":[3]},{"topic":"alerts","partition":1,"replicas":[2]},{"topic":"alerts","partition":2,"replicas":[1]},{"topic":"alerts","partition":3,"replicas":[3]},{"topic":"alerts","partition":4,"replicas":[2]},{"topic":"alerts","partition":5,"replicas":[1]},{"topic":"alerts","partition":6,"replicas":[3]},{"topic":"alerts","partition":7,"replicas":[2]},{"topic":"alerts","partition":8,"replicas":[1]}]}}
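The currentconfig output looks like a JSON view of what `kafka-topics.sh --describe` reports. A minimal sketch of that parsing step, assuming the tab-separated `Key: value` layout printed by Kafka 1.x (one summary line per topic, then one indented line per partition); other Kafka versions format this text differently, so the parser is an assumption rather than the original script's code.

```python
from typing import Dict


def parse_describe(output: str) -> Dict[str, dict]:
    """Turn `kafka-topics.sh --describe` text into the dict shape shown above.

    Assumes Kafka 1.x style output: one tab-separated summary line per topic
    (Topic, PartitionCount, ReplicationFactor, Configs) followed by one
    tab-indented line per partition (Topic, Partition, Leader, Replicas, Isr).
    """
    topics: Dict[str, dict] = {}
    for line in output.splitlines():
        if not line.strip():
            continue
        # Split "Key: value" pairs; partition(":") keeps any later colons in the value.
        fields = {}
        for chunk in line.strip().split("\t"):
            key, _, value = chunk.partition(":")
            fields[key.strip()] = value.strip()
        name = fields.get("Topic")
        if name is None:
            continue
        if "PartitionCount" in fields:
            # Summary line: counts are kept as strings, as in the output above.
            configs = dict(
                item.split("=", 1)
                for item in fields.get("Configs", "").split(",")
                if "=" in item
            )
            topics[name] = {
                "Topic": name,
                "PartitionCount": fields["PartitionCount"],
                "ReplicationFactor": fields["ReplicationFactor"],
                "Configs": configs,
                "partitions": [],
            }
        elif "Partition" in fields and name in topics:
            # Per-partition line: "Replicas" is a comma-separated broker list.
            topics[name]["partitions"].append({
                "topic": name,
                "partition": int(fields["Partition"]),
                "replicas": [int(r) for r in fields["Replicas"].split(",")],
            })
    return topics
```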
Generate New Configuration (--operation=newconfig)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=newconfig --brokers=1,2,3
Output
{"version":1,"partitions":[{"topic":"alerts","partition":0,"replicas":[3,1,2]},{"topic":"alerts","partition":1,"replicas":[2,1,3]},{"topic":"alerts","partition":2,"replicas":[1,2,3]},{"topic":"alerts","partition":3,"replicas":[3,1,2]},{"topic":"alerts","partition":4,"replicas":[2,1,3]},{"topic":"alerts","partition":5,"replicas":[1,2,3]},{"topic":"alerts","partition":6,"replicas":[3,1,2]},{"topic":"alerts","partition":7,"replicas":[2,1,3]},{"topic":"alerts","partition":8,"replicas":[1,2,3]}]}
The script also writes this output to a JSON file, which serves as the input (--replicafile) for the next step.
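The generated plan follows a simple rule: each partition keeps its current replica first and then gains the remaining brokers from `--brokers` in the order given (partition 0 goes from `[3]` to `[3,1,2]`). A sketch that reproduces that rule, with hypothetical function names; keeping the existing replica first avoids changing the partition's preferred leader, since Kafka treats the first replica in the list as preferred.

```python
from typing import Dict, List


def expand_replicas(current: List[int], brokers: List[int], factor: int) -> List[int]:
    """Keep the existing replicas first, then append unused brokers in order."""
    available = set(current) | set(brokers)
    if factor > len(available):
        raise ValueError(f"need {factor} distinct brokers, got {sorted(available)}")
    new = list(current)
    for broker in brokers:
        if len(new) >= factor:
            break
        if broker not in new:
            new.append(broker)
    return new


def new_assignment(current: Dict[int, List[int]], brokers: List[int], factor: int) -> Dict[int, List[int]]:
    """Apply expand_replicas to every partition of a topic."""
    return {p: expand_replicas(r, brokers, factor) for p, r in current.items()}


if __name__ == "__main__":
    # Current single-replica layout of the first three "alerts" partitions.
    print(new_assignment({0: [3], 1: [2], 2: [1]}, [1, 2, 3], 3))
```

One trade-off of this fixed ordering: broker 1 is the second replica for every partition led by brokers 2 and 3, so as far as we understand Kafka's leader election it would pick up most of the leadership if one of those brokers failed. Rotating the broker list per partition would spread that failover load more evenly.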
Execute New Configuration (--operation=increasereplica)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=increasereplica --replicafile=/tmp/new_config.json
Output
{"version":1,"partitions":[{"topic":"alerts","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":8,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":6,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":5,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
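The increasereplica and verify operations come down to `kafka-reassign-partitions.sh` with `--reassignment-json-file` plus either `--execute` or `--verify`. A minimal sketch under stated assumptions: the helper names are hypothetical, the flags are the ZooKeeper-based ones from Kafka 1.x, and the rollback helper assumes, as in the output above, that the current assignment is printed on a single JSON line.

```python
import json
from pathlib import Path
from typing import List, Optional


def reassign_cmd(kafka_home: str, zookeeper: str, plan_file: str, action: str) -> List[str]:
    """kafka-reassign-partitions.sh call for action "execute" or "verify" (Kafka 1.x flags)."""
    if action not in ("execute", "verify"):
        raise ValueError(f"unsupported action: {action}")
    return [
        str(Path(kafka_home) / "bin" / "kafka-reassign-partitions.sh"),
        "--zookeeper", zookeeper,
        "--reassignment-json-file", plan_file,
        f"--{action}",
    ]


def extract_rollback_plan(execute_output: str) -> Optional[dict]:
    """Return the current assignment that --execute prints as one JSON line."""
    for line in execute_output.splitlines():
        line = line.strip()
        if not line.startswith("{"):
            continue
        try:
            return json.loads(line)
        except json.JSONDecodeError:
            continue
    return None
```

Writing the extracted plan to its own file (for example a hypothetical `/tmp/alerts_rollback.json`) and passing that file to `reassign_cmd(..., "execute")` would restore the original single-replica layout.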
Verify Configuration (--operation=verify)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=verify --replicafile=/tmp/new_config.json
Output
Reassignment of partition alerts-1 completed successfully
Reassignment of partition alerts-0 completed successfully
Reassignment of partition alerts-2 completed successfully
Reassignment of partition alerts-3 completed successfully
Reassignment of partition alerts-7 completed successfully
Reassignment of partition alerts-4 completed successfully
Reassignment of partition alerts-8 completed successfully
Reassignment of partition alerts-6 completed successfully
Reassignment of partition alerts-5 completed successfully
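To check the verify output in a script rather than by eye, the lines can be matched against the `Reassignment of partition <topic>-<partition> <status>` pattern shown above. This sketch (hypothetical helpers) treats any status other than `completed successfully` as not finished, so a reassignment still copying data would make `all_completed` return `False` until verify is run again later.

```python
import re
from typing import Dict

# Matches lines such as "Reassignment of partition alerts-1 completed successfully".
_STATUS_LINE = re.compile(r"^Reassignment of partition (\S+) (.+)$")


def reassignment_status(verify_output: str) -> Dict[str, str]:
    """Map "<topic>-<partition>" to the status text reported by --verify."""
    status: Dict[str, str] = {}
    for line in verify_output.splitlines():
        match = _STATUS_LINE.match(line.strip())
        if match:
            status[match.group(1)] = match.group(2)
    return status


def all_completed(verify_output: str) -> bool:
    """True only if at least one partition was reported and all of them completed."""
    status = reassignment_status(verify_output)
    return bool(status) and all(s == "completed successfully" for s in status.values())
```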
Testing
We can also confirm the new assignment in Kafka Manager, or by running the currentconfig operation again.
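Re-running currentconfig gives the same JSON shape shown earlier, so the final check can be automated: after a successful reassignment of `alerts`, no partition should list fewer than three distinct brokers. A small sketch; `under_replicated` is a hypothetical helper that takes the value stored under the topic name in that output.

```python
from typing import List


def under_replicated(topic_info: dict, factor: int) -> List[int]:
    """Partition numbers whose replica list has fewer than `factor` distinct brokers.

    topic_info has the shape of the value under the topic name in the
    currentconfig output, e.g. {"Topic": "alerts", "partitions": [...]}.
    """
    return [
        p["partition"]
        for p in topic_info["partitions"]
        if len(set(p["replicas"])) < factor
    ]
```

An empty list means every partition has reached the target replication factor.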
Code
The code is available on GitHub: