How to Increase Replication Factor for a Kafka topic

Have you noticed that some topics in your Kafka cluster have a suboptimal replication factor? This could be due to initial setup, low traffic, or the product being in its early stages.

As your data grows, it’s important to ensure the durability and scalability of your Kafka cluster. The aim is to store every message on at least N brokers (for example, 3) for better resiliency and availability.

This blog post will guide you through increasing the replication factor for pre-existing Kafka topics. Let’s take a look at an example where we need to increase the replication factor for some existing topics.

Topic Name            Current Replication Factor    New Replication Factor
__consumer_offsets    1                             3
alerts                1                             3
bank-balance          1                             3

Increasing the replication factor for a topic is a four-step process:

  • Check existing partition assignment
  • Create a custom reassignment plan (a JSON file)
  • Execute the reassignment
  • Verify the assignments

Approach

  We will create a script, increase_replication.py, that wraps the existing Kafka command-line tools to generate, execute, and verify the reassignment. It relies on the following Kafka scripts:

  • kafka-reassign-partitions.sh
  • kafka-topics.sh

The script accepts the following options:

Option           Description
--kafka          Kafka installation directory
--zookeeper      The connection string for ZooKeeper
--topic          The topic name for which replicas need to be altered
--operation      The operation to perform. Possible values:
                   * currentconfig
                   * newconfig
                   * increasereplica
                   * verify
--brokers        The new set of brokers
--replicafile    Optional. File that provides the new replica configuration; can only be used with the increasereplica and verify operations. If omitted, the script expects a file at /tmp/<<TOPIC NAME>>_replica_change.json
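
To make the option table concrete, here is a minimal sketch of how such a command line could be parsed with argparse. It is not the original script: the function names, the rule that --brokers is required for newconfig, and the conversion of broker ids to integers are assumptions made for illustration.

```python
import argparse

OPERATIONS = ("currentconfig", "newconfig", "increasereplica", "verify")


def build_parser():
    """Declare the options listed in the table above."""
    parser = argparse.ArgumentParser(
        description="Increase the replication factor of a Kafka topic")
    parser.add_argument("--kafka", required=True,
                        help="Kafka installation directory")
    parser.add_argument("--zookeeper", required=True,
                        help="ZooKeeper connection string")
    parser.add_argument("--topic", required=True,
                        help="topic whose replicas need to be altered")
    parser.add_argument("--operation", required=True, choices=OPERATIONS,
                        help="operation to perform")
    parser.add_argument("--brokers",
                        help="comma-separated broker ids, e.g. 1,2,3")
    parser.add_argument("--replicafile",
                        help="file holding the new replica configuration")
    return parser


def parse_args(argv=None):
    parser = build_parser()
    args = parser.parse_args(argv)
    # Assumption: a broker list only makes sense when generating a new plan.
    if args.operation == "newconfig" and not args.brokers:
        parser.error("--brokers is required with --operation=newconfig")
    # From the table: --replicafile is limited to increasereplica and verify.
    if args.replicafile and args.operation not in ("increasereplica", "verify"):
        parser.error("--replicafile can only be used with "
                     "increasereplica and verify")
    # From the table: fall back to the per-topic file in /tmp.
    if args.replicafile is None:
        args.replicafile = "/tmp/{}_replica_change.json".format(args.topic)
    args.brokers = [int(b) for b in args.brokers.split(",")] if args.brokers else []
    return args
```

Calling parse_args() with no arguments reads sys.argv, so the same function serves both the command line and tests.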

Check Existing Configuration (--operation=currentconfig)

Input         
[python] python ./increase_replication.py --kafka=/opt/kafka --zookeeper=localhost:2181 --topic=alerts --operation=currentconfig [/python]
Output
[java]{"alerts":{"Topic":"alerts","PartitionCount":"9","ReplicationFactor":"1","Configs":{"cleanup.policy":"compact"},"partitions":[{"topic":"alerts","partition":0,"replicas":[3]},{"topic":"alerts","partition":1,"replicas":[2]},{"topic":"alerts","partition":2,"replicas":[1]},{"topic":"alerts","partition":3,"replicas":[3]},{"topic":"alerts","partition":4,"replicas":[2]},{"topic":"alerts","partition":5,"replicas":[1]},{"topic":"alerts","partition":6,"replicas":[3]},{"topic":"alerts","partition":7,"replicas":[2]},{"topic":"alerts","partition":8,"replicas":[1]}]}}[/java]
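
The current configuration can be obtained from kafka-topics.sh --describe. The sketch below shows one way to turn that text into the JSON structure shown above. It assumes the tab-separated "Key:value" layout printed by Kafka 1.x, and parse_describe and SAMPLE are hypothetical names, not taken from the original script. Config values that themselves contain commas are not handled.

```python
def parse_describe(text):
    """Convert `kafka-topics.sh --describe` text into a dict keyed by topic."""
    result = {}
    for line in text.splitlines():
        # Each line is a run of tab-separated "Key:value" or "Key: value" pairs.
        fields = {}
        for part in line.strip().split("\t"):
            key, sep, value = part.partition(":")
            if sep:
                fields[key.strip()] = value.strip()
        if "PartitionCount" in fields:
            # Summary line: one per topic.
            configs = {}
            for item in filter(None, fields.get("Configs", "").split(",")):
                name, _, val = item.partition("=")
                configs[name] = val
            result[fields["Topic"]] = {
                "Topic": fields["Topic"],
                "PartitionCount": fields["PartitionCount"],
                "ReplicationFactor": fields["ReplicationFactor"],
                "Configs": configs,
                "partitions": [],
            }
        elif "Partition" in fields and fields.get("Topic") in result:
            # Detail line: one per partition.
            result[fields["Topic"]]["partitions"].append({
                "topic": fields["Topic"],
                "partition": int(fields["Partition"]),
                "replicas": [int(b) for b in fields["Replicas"].split(",")],
            })
    return result


# Hand-written sample in the assumed Kafka 1.x layout (two partitions only).
SAMPLE = (
    "Topic:alerts\tPartitionCount:2\tReplicationFactor:1\t"
    "Configs:cleanup.policy=compact\n"
    "\tTopic: alerts\tPartition: 0\tLeader: 3\tReplicas: 3\tIsr: 3\n"
    "\tTopic: alerts\tPartition: 1\tLeader: 2\tReplicas: 2\tIsr: 2\n"
)
```

Passing the result through json.dumps gives output in the same shape as the sample above.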

Generate New Configuration (--operation=newconfig)

Input     
[python] python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=newconfig --brokers=1,2,3 [/python]
Output     
[java]{"version":1,"partitions":[{"topic":"alerts","partition":0,"replicas":[3,1,2]},{"topic":"alerts","partition":1,"replicas":[2,1,3]},{"topic":"alerts","partition":2,"replicas":[1,2,3]},{"topic":"alerts","partition":3,"replicas":[3,1,2]},{"topic":"alerts","partition":4,"replicas":[2,1,3]},{"topic":"alerts","partition":5,"replicas":[1,2,3]},{"topic":"alerts","partition":6,"replicas":[3,1,2]},{"topic":"alerts","partition":7,"replicas":[2,1,3]},{"topic":"alerts","partition":8,"replicas":[1,2,3]}]}[/java]

This step also saves the output above to a JSON file, which is the input file for the next step (increasing the replicas).
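
The sample output suggests a simple rule: each partition keeps its existing replica first (the first replica in the list is Kafka's preferred leader), and the remaining brokers are appended in ascending order. The sketch below implements that rule. The rule is inferred from the output, and build_reassignment and target_rf are hypothetical names rather than parts of the original script.

```python
def build_reassignment(partitions, brokers, target_rf=None):
    """Return a reassignment plan that extends each partition's replica list.

    partitions: list of {"topic", "partition", "replicas"} dicts
    brokers:    broker ids that may host replicas
    target_rf:  desired replication factor (defaults to len(brokers))
    """
    brokers = sorted(brokers)
    if target_rf is None:
        target_rf = len(brokers)
    if target_rf > len(brokers):
        raise ValueError("replication factor cannot exceed the broker count")
    plan = []
    for p in partitions:
        current = list(p["replicas"])
        # Keep the existing replicas first so the preferred leader is unchanged,
        # then add brokers that do not hold this partition yet.
        extra = [b for b in brokers if b not in current]
        plan.append({
            "topic": p["topic"],
            "partition": p["partition"],
            "replicas": (current + extra)[:target_rf],
        })
    return {"version": 1, "partitions": plan}
```

For example, a partition currently on broker 3 with brokers 1, 2, 3 available becomes [3, 1, 2], matching partition 0 in the output above. Appending in ascending order is easy to follow but concentrates follower replicas on the lower broker ids when the target replication factor is smaller than the broker count, so a rotating order may be preferable on larger clusters.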

Execute New Configuration (--operation=increasereplica)

Input
[python] python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=increasereplica --replicafile=/tmp/new_config.json [/python]
Output
[java] {"version":1,"partitions":[{"topic":"alerts","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"alerts","partition":8,"replicas":[1],"log_dirs":["any"]},{"topic":"alerts","partition":6,"replicas":[3],"log_dirs":["any"]},{"topic":"alerts","partition":5,"replicas":[1],"log_dirs":["any"]}]} [/java]

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
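
Under the hood, this step amounts to calling kafka-reassign-partitions.sh with the --execute flag. Here is a minimal sketch of how a wrapper script might do that. The function names are hypothetical, and it assumes the Kafka scripts live in the bin directory of the installation. The --zookeeper flag matches the Kafka version used in this post; newer Kafka releases use --bootstrap-server instead.

```python
import os
import subprocess


def reassign_command(kafka_dir, zookeeper, replica_file, action):
    """Build the kafka-reassign-partitions.sh command line as a list."""
    if action not in ("execute", "verify"):
        raise ValueError("action must be 'execute' or 'verify'")
    return [
        # Assumption: standard layout with scripts under <kafka_dir>/bin.
        os.path.join(kafka_dir, "bin", "kafka-reassign-partitions.sh"),
        "--zookeeper", zookeeper,
        "--reassignment-json-file", replica_file,
        "--" + action,
    ]


def run_reassignment(kafka_dir, zookeeper, replica_file, action="execute"):
    """Run the tool and return everything it printed."""
    completed = subprocess.run(
        reassign_command(kafka_dir, zookeeper, replica_file, action),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout
```

Passing the arguments as a list avoids shell quoting problems with connection strings that contain commas.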

Verify Configuration (--operation=verify)

Input
[python] python ./increase_replication.py --kafka=/opt/kafka_2.11-1.1.0 --zookeeper=localhost:12181,localhost:22181,localhost:32181 --topic=alerts --operation=verify --replicafile=/tmp/new_config.json [/python]
Output
[java]
Reassignment of partition alerts-1 completed successfully
Reassignment of partition alerts-0 completed successfully
Reassignment of partition alerts-2 completed successfully
Reassignment of partition alerts-3 completed successfully
Reassignment of partition alerts-7 completed successfully
Reassignment of partition alerts-4 completed successfully
Reassignment of partition alerts-8 completed successfully
Reassignment of partition alerts-6 completed successfully
Reassignment of partition alerts-5 completed successfully
[/java]
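
Reassignment runs in the background, so the verify step may need to be repeated until every partition reports completion. Here is a small sketch of checking the verify output programmatically. It relies only on the "completed successfully" wording shown above and treats any other status text as not finished yet. The function names are hypothetical.

```python
import re

# Captures the partition name and, if present, the success suffix.
STATUS = re.compile(r"Reassignment of partition (\S+)( completed successfully)?")


def reassignment_status(output):
    """Map each partition named in the verify output to True when it is done."""
    return {partition: bool(done) for partition, done in STATUS.findall(output)}


def all_completed(output):
    """True only if at least one partition is listed and all are done."""
    status = reassignment_status(output)
    return bool(status) and all(status.values())
```

A wrapper could call the verify operation in a loop, sleeping between attempts, until all_completed returns True.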

Testing

We can confirm the new replication factor using Kafka Manager.

[Image: Kafka Increasing Replication Factor]

References

Code 

The code is available in GitHub:
