Leader Election in Zookeeper

I recently ran into a problem in production: three distributed Docker containers were all performing a task that only one of them was supposed to perform.

The job pushed the contents of a CSV file into a database. A third party regularly wrote this CSV file to a directory, and every container picked up the file and updated the database.

As a result, the database ended up with duplicate data.

I used the Apache Curator leader election recipe to solve this.

How Leader Election happens

  1. Each contender creates an ephemeral sequential node in ZooKeeper
  2. The contender whose node has the lowest sequence number becomes the leader
  3. When the leader dies, its node is removed and the contender with the next lowest sequence number is elected
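
The three steps above can be sketched in plain Java without a ZooKeeper connection. This is an illustration only, not how Curator is implemented: `SequenceElection` and the node names are hypothetical, and the only ZooKeeper behaviour relied on is that sequential node names end in a 10-digit, zero-padded counter.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Curator or ZooKeeper.
final class SequenceElection {

    // Parses the 10-digit counter that ZooKeeper appends to sequential node names.
    // Assumes every name is at least 10 characters long.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    // The leader is the child with the lowest sequence number, if any child exists.
    static Optional<String> leaderOf(List<String> children) {
        return children.stream()
                .min(Comparator.comparingLong(SequenceElection::sequenceOf));
    }
}
```

Given the children `c-0000000007`, `a-0000000003` and `b-0000000005`, `leaderOf` picks `a-0000000003`. Once that node disappears, the next call picks `b-0000000005`.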

Code

Two classes make up the leader election code:

  • Leader.java: Takes care of registering the contender for leadership
  • LeaderListener.java: Gets called whenever leadership is granted, or revoked due to connection issues

import java.util.function.Consumer;

import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Leader {

	private static final Logger log = LoggerFactory.getLogger(Leader.class);

	// The leader election takes place at this path.
	// An ephemeral sequential node is created under it for each contender.
	private static final String LEADER_SELECTOR_PATH = "/WBLeaderSelect";

	private LeaderListener leaderListener;

	public void init(String name, Consumer<Boolean> consumer) throws Exception {
		log.info("Leader Selection initiated");
		// Curator is this project's own helper class (not shown here);
		// it is assumed to return a started CuratorFramework client
		CuratorFramework client = Curator.curatorFramework();
		// Initialize the leadership listener and join the election
		leaderListener = new LeaderListener(name, consumer, client, LEADER_SELECTOR_PATH);
		leaderListener.start();
	}
	
	public boolean isLeader() {
		return leaderListener.isLeader();
	}
}

The second class is the listener, which is notified when leadership is granted or revoked

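import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderListener extends LeaderSelectorListenerAdapter implements Closeable {
	private static final Logger log = LoggerFactory.getLogger(LeaderListener.class);

	private final AtomicInteger leaderCount = new AtomicInteger(1);
	private final LeaderSelector leaderSelector;
	// Reassigned in stateChanged and awaited in takeLeadership, which run on different threads
	private volatile CountDownLatch countDownLatch = new CountDownLatch(1);
	private final Consumer<Boolean> consumer;
	private final String name;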

	public LeaderListener(String name, Consumer<Boolean> consumer, CuratorFramework client, String path) {
		this.consumer = consumer;
		this.name = name;
		leaderSelector = new LeaderSelector(client, path, this);
		// Rejoin the election automatically whenever takeLeadership returns
		leaderSelector.autoRequeue();
	}

	public void start() throws IOException {
		leaderSelector.start();
	}

	@Override
	public void close() throws IOException {
		leaderSelector.close();
	}
	
	public boolean isLeader() {
		if(leaderSelector != null) {
			return leaderSelector.hasLeadership();
		}
		
		return false;
	}

	@Override
	public void stateChanged(CuratorFramework client, ConnectionState newState) {
		if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
			// Connection trouble: release takeLeadership and tell the application to stop acting as leader
			log.info("Connection {}. Leadership revoked from {}", newState, this.name);
			countDownLatch.countDown();
			consumer.accept(false);
		} else if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
			// Connection is up again: arm a fresh latch for the next term
			log.info("Connection {}. {} is contending for leadership", newState, this.name);
			countDownLatch = new CountDownLatch(1);
		}
	}
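
	@Override
	public void takeLeadership(CuratorFramework client) throws Exception {
		log.info("{} is leader, elected {} time(s)", this.name, this.leaderCount.getAndIncrement());
		try {
			// Notify the application on a separate thread so this method can block below
			new Thread(() -> {
				consumer.accept(true);
			}, "Leader-Child-Thread").start();
		} catch (Exception e) {
			// Pass the exception itself so SLF4J logs the stack trace
			log.error("Error starting thread", e);
		}
		// Leadership is held until this method returns, so wait here until
		// stateChanged releases the latch on a suspended or lost connection
		countDownLatch.await();
		log.debug("{} is not the leader anymore", this.name);
	}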
}
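
One way to act on the `Consumer<Boolean>` callback is to store the latest signal and check it before each run of the job. Below is a minimal sketch, assuming a hypothetical `LeadershipGate` class that is not part of the original code or of Curator:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper: remembers the latest leadership signal and gates a job on it.
final class LeadershipGate implements Consumer<Boolean> {

    private final AtomicBoolean leader = new AtomicBoolean(false);

    // Receives true when leadership is granted and false when it is revoked.
    @Override
    public void accept(Boolean isLeader) {
        leader.set(Boolean.TRUE.equals(isLeader));
    }

    // Runs the job only while this instance believes it is the leader.
    // Returns whether the job was run.
    boolean runIfLeader(Runnable job) {
        if (!leader.get()) {
            return false;
        }
        job.run();
        return true;
    }
}
```

An instance could be passed as the `consumer` argument of `Leader.init`, with the CSV job wrapped as `gate.runIfLeader(job)`. The flag only reflects the last signal received, so a container can briefly believe it is still the leader until `stateChanged` fires.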

The complete code is available on GitHub.
