How to Increase Replication Factor for a Kafka topic
Sometimes we find that the replication factor of some topics is not set to its ideal value. This may be a leftover from the initial Kafka setup, low traffic, or the product being in its early stages.
In the future, as our data grows, our aim shifts towards durability and scalability, and we want to make sure that the Kafka cluster is more resilient and available.
The aim is for every message to be stored on at least N (say 3) brokers. This blog captures how to increase the replication factor for pre-existing Kafka topics.
Let’s take an example where we have some existing topics and want to increase their replication factor.
Topic Name | Current Replication Factor | New Replication Factor |
__consumer_offsets | 1 | 3 |
alerts | 1 | 3 |
bank-balance | 1 | 3 |
Increasing the replication factor for a topic is a four-step process:
- Check the existing partition assignment
- Create a custom reassignment plan (a JSON file)
- Run the reassignment
- Verify the assignments
Approach
We will create a script that uses the existing Kafka scripts to generate, execute, and verify the reassignment. The scripts it relies on are:
- kafka-reassign-partitions.sh
- kafka-topics.sh
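As a rough illustration of how a wrapper script could call these two tools, here is a minimal sketch that only builds the command lines. The function names `describe_command` and `reassign_command` are hypothetical and are not taken from the original script. The `--zookeeper` flag matches the Kafka 1.1.0 setup used in this post; newer Kafka releases use `--bootstrap-server` instead.

```python
def describe_command(kafka_home, zookeeper, topic):
    """Build the command that prints a topic's current partition assignment."""
    return [
        "{}/bin/kafka-topics.sh".format(kafka_home),
        "--zookeeper", zookeeper,
        "--describe",
        "--topic", topic,
    ]


def reassign_command(kafka_home, zookeeper, plan_file, action):
    """Build the command that executes or verifies a reassignment plan."""
    if action not in ("execute", "verify"):
        raise ValueError("action must be 'execute' or 'verify'")
    return [
        "{}/bin/kafka-reassign-partitions.sh".format(kafka_home),
        "--zookeeper", zookeeper,
        "--reassignment-json-file", plan_file,
        "--" + action,
    ]
```

Either list can then be passed to `subprocess.check_output` to run the tool and capture its output.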
The script accepts the following options:
Option | Description |
--kafka | Kafka installation directory |
--zookeeper | The connection string for ZooKeeper |
--topic | The name of the topic whose replicas need to be altered |
--operation | The operation to perform. Possible values: currentconfig, newconfig, increasereplica, verify |
--brokers | The new set of brokers |
--replicafile | Optional. File that provides the new replica configuration; can only be used with the increasereplica and verify operations. If omitted, the script expects the file at /tmp/<<TOPIC NAME>>_replica_change.json |
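The options above could be declared with `argparse` roughly as follows. This is a sketch of one possible command-line interface, not the original script's code; the helper names `parse_brokers`, `build_parser` and `replica_file_for` are made up for illustration.

```python
import argparse

OPERATIONS = ("currentconfig", "newconfig", "increasereplica", "verify")


def parse_brokers(value):
    """Turn a string such as '1,2,3' into a list of broker ids."""
    return [int(broker) for broker in value.split(",") if broker.strip()]


def build_parser():
    parser = argparse.ArgumentParser(description="Increase a topic's replication factor")
    parser.add_argument("--kafka", required=True, help="Kafka installation directory")
    parser.add_argument("--zookeeper", required=True, help="ZooKeeper connection string")
    parser.add_argument("--topic", required=True, help="Topic whose replicas are altered")
    parser.add_argument("--operation", required=True, choices=OPERATIONS)
    parser.add_argument("--brokers", type=parse_brokers, help="New set of brokers, e.g. 1,2,3")
    parser.add_argument("--replicafile", help="Optional file with the new replica configuration")
    return parser


def replica_file_for(args):
    """Use --replicafile if given, otherwise the default path in /tmp."""
    return args.replicafile or "/tmp/{}_replica_change.json".format(args.topic)
```

Using `choices` for `--operation` makes the parser reject any value outside the four listed operations.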
Check Existing Configuration (--operation=currentconfig)
Input
python ./increase_replication.py --kafka=/opt/kafka --zookeeper=localhost:2181 --topic=alerts --operation=currentconfig
Output
{"alerts":{"Topic":"alerts","PartitionCount":"9","ReplicationFactor":"1","Configs":{"cleanup.policy":"compact"},"partitions":[{"topic":"alerts","partition":0,"replicas":[3]},{"topic":"alerts","partition":1,"replicas":[2]},{"topic":"alerts","partition":2,"replicas":[1]},{"topic":"alerts","partition":3,"replicas":[3]},{"topic":"alerts","partition":4,"replicas":[2]},{"topic":"alerts","partition":5,"replicas":[1]},{"topic":"alerts","partition":6,"replicas":[3]},{"topic":"alerts","partition":7,"replicas":[2]},{"topic":"alerts","partition":8,"replicas":[1]}]}}
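The partition list in this output can be derived from what `kafka-topics.sh --describe` prints. The sketch below shows one way to do that. It assumes each partition line contains `Partition: <n>` and `Replicas: <ids>` fields, as Kafka 1.x prints them. The `SAMPLE` text is an illustrative stand-in rather than captured output, and `parse_describe` is a hypothetical helper name.

```python
import re

# Matches partition lines only; the header line has "PartitionCount:" instead.
PARTITION_LINE = re.compile(r"Partition:\s*(\d+).*?Replicas:\s*(\d+(?:,\d+)*)")


def parse_describe(topic, describe_output):
    """Extract the per-partition replica lists from --describe output."""
    partitions = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if match:
            partitions.append({
                "topic": topic,
                "partition": int(match.group(1)),
                "replicas": [int(b) for b in match.group(2).split(",")],
            })
    return partitions


# Assumed shape of the tool's output for a 3-partition topic.
SAMPLE = (
    "Topic:alerts\tPartitionCount:3\tReplicationFactor:1\tConfigs:cleanup.policy=compact\n"
    "\tTopic: alerts\tPartition: 0\tLeader: 3\tReplicas: 3\tIsr: 3\n"
    "\tTopic: alerts\tPartition: 1\tLeader: 2\tReplicas: 2\tIsr: 2\n"
    "\tTopic: alerts\tPartition: 2\tLeader: 1\tReplicas: 1\tIsr: 1\n"
)
```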
Generate New Configuration (--operation=newconfig)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=bank-balance-application-bank-balance-agg-changelog --operation=newconfig --brokers=1,2,3
Output
{"version":1,"partitions":[{"topic":"alerts","partition":0,"replicas":[3,1,2]},{"topic":"alerts","partition":1,"replicas":[2,1,3]},{"topic":"alerts","partition":2,"replicas":[1,2,3]},{"topic":"alerts","partition":3,"replicas":[3,1,2]},{"topic":"alerts","partition":4,"replicas":[2,1,3]},{"topic":"alerts","partition":5,"replicas":[1,2,3]},{"topic":"alerts","partition":6,"replicas":[3,1,2]},{"topic":"alerts","partition":7,"replicas":[2,1,3]},{"topic":"alerts","partition":8,"replicas":[1,2,3]}]}
This step writes the output above to a JSON file, which is the input file for the next step (adding the new replicas).
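The sample output suggests a simple rule: keep each partition's existing replicas first, then append the remaining brokers in ascending order. The sketch below implements that rule. It is inferred from the sample output only, so the original script may build the plan differently, and `build_plan` and `write_plan` are hypothetical names.

```python
import json


def build_plan(partitions, brokers):
    """Extend every partition's replica list to cover all given brokers."""
    new_partitions = []
    for entry in partitions:
        current = list(entry["replicas"])
        # Append only brokers that do not already hold a replica.
        extra = [b for b in sorted(brokers) if b not in current]
        new_partitions.append({
            "topic": entry["topic"],
            "partition": entry["partition"],
            "replicas": current + extra,
        })
    return {"version": 1, "partitions": new_partitions}


def write_plan(plan, path):
    """Save the plan as the JSON file passed to the reassignment step."""
    with open(path, "w") as handle:
        json.dump(plan, handle)
```

Keeping the existing replica at the front of each list leaves the preferred leader of every partition unchanged, since Kafka treats the first replica in the list as the preferred leader.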
Execute New Configuration (--operation=increasereplica)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=bank-balance-application-bank-balance-agg-changelog --operation=increasereplica --replicafile=/tmp/new_config.json
Output
{"version":1,"partitions":[{"topic":"alerts","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":8,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":6,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":5,"replicas":[1],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
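Since the tool prints the previous assignment for rollback purposes, it is worth saving it to a file. Here is a minimal sketch, assuming the JSON appears on a single line of the output as shown above. The helper names `extract_rollback_plan` and `save_rollback_plan` are hypothetical.

```python
import json


def extract_rollback_plan(execute_output):
    """Return the previous assignment printed by the execute step."""
    for line in execute_output.splitlines():
        line = line.strip()
        if line.startswith("{"):
            return json.loads(line)
    raise ValueError("no assignment JSON found in the output")


def save_rollback_plan(execute_output, path):
    """Write the previous assignment to a file usable for rollback."""
    plan = extract_rollback_plan(execute_output)
    with open(path, "w") as handle:
        json.dump(plan, handle)
    return plan
```

The saved file can later be passed as the `--reassignment-json-file` option to restore the old assignment.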
Verify Configuration (--operation=verify)
Input
python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=bank-balance-application-bank-balance-agg-changelog --operation=verify --replicafile=/tmp/new_config.json
Output
Reassignment of partition alerts-1 completed successfully
Reassignment of partition alerts-0 completed successfully
Reassignment of partition alerts-2 completed successfully
Reassignment of partition alerts-3 completed successfully
Reassignment of partition alerts-7 completed successfully
Reassignment of partition alerts-4 completed successfully
Reassignment of partition alerts-8 completed successfully
Reassignment of partition alerts-6 completed successfully
Reassignment of partition alerts-5 completed successfully
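To check this output programmatically, we can compare the partitions reported as completed against the partitions in the plan file. The sketch below assumes the message wording shown above; other Kafka versions may phrase it differently. The helper names `completed_partitions` and `pending_partitions` are hypothetical.

```python
import re

# Topic names may contain hyphens, so the partition id is the last "-<digits>".
COMPLETED = re.compile(r"Reassignment of partition (\S+)-(\d+) completed successfully")


def completed_partitions(verify_output):
    """Return the set of (topic, partition) pairs reported as completed."""
    return {(topic, int(part)) for topic, part in COMPLETED.findall(verify_output)}


def pending_partitions(plan, verify_output):
    """Return the plan's partitions that are not yet reported as completed."""
    done = completed_partitions(verify_output)
    expected = {(p["topic"], p["partition"]) for p in plan["partitions"]}
    return sorted(expected - done)
```

An empty list from `pending_partitions` means every partition in the plan was reported as completed.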
Testing
We can verify the new replica assignment using Kafka Manager.

References
- https://stackoverflow.com/questions/37960767/how-to-change-the-replicas-of-kafka-topic
- https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor
- https://www.ibm.com/support/knowledgecenter/en/SSCVHB_1.2.0/admin/tnpi_reassign_partitions.html
Code
The code is available in GitHub: