Distributed Consensus and Leader Election

Type: Software Reference Confidence: 0.92 Sources: 7 Verified: 2026-02-23 Freshness: stable

TL;DR

Constraints

Quick Reference

Algorithm | Origin | Leader-Based | Leader Election | Message Types | Understandability | Key Implementations
Raft | Stanford, 2014 | Yes | Randomized timeout + RequestVote RPC | 4 | High | etcd, Consul, CockroachDB, TiKV
Paxos | Lamport, 1989 | Optional (Multi-Paxos) | Any proposer can propose | 4+ | Low | Google Chubby, Spanner, Cassandra LWT
ZAB | Yahoo, 2011 | Yes | Round-robin + epoch number | 10 | Medium | Apache ZooKeeper (exclusive)
Viewstamped Replication | MIT, 1988 | Yes | Round-robin (next in config) | 10 | Medium | Research implementations
Property | Raft | Paxos | ZAB | Viewstamped Replication
Fault tolerance | f failures with 2f+1 nodes | f failures with 2f+1 nodes | f failures with 2f+1 nodes | f failures with 2f+1 nodes
Ordering guarantee | Total order (per-term log) | Per-slot (Multi-Paxos for total) | Total order (FIFO + causal) | Total order
Stable storage | Required | Required | Required | Not required
Reconfiguration | Joint consensus (online) | Separate protocol | Stop-the-world | Stop-the-world
Latency (LAN) | 1-2ms | 2-4ms | 1-2ms | 1-2ms
Best for | General purpose, new projects | Existing Google/academic systems | ZooKeeper coordination | Research / education

Decision Tree

START -- What do you need consensus for?
|-- Leader election / distributed lock only?
|   |-- YES --> Use etcd lease-based election or ZooKeeper ephemeral znodes
|   +-- NO (need replicated state machine) |
|-- Building a new distributed system from scratch?
|   |-- YES --> Use Raft (most implementations, best docs, easiest to understand)
|   |   |-- Go: hashicorp/raft or etcd/raft
|   |   |-- Java: Apache Ratis, microraft
|   |   +-- Rust: tikv/raft-rs
|   +-- NO (integrating with existing system) |
|-- Already using ZooKeeper for coordination?
|   |-- YES --> Stay with ZAB -- do not add a second consensus system
|   +-- NO |
|-- Need multi-datacenter / WAN consensus?
|   |-- YES --> etcd with tuned timeouts or CockroachDB (multi-region Raft)
|   +-- NO |
+-- DEFAULT --> etcd (Raft) for coordination, Consul (Raft) for service discovery

Step-by-Step Guide

1. Choose cluster size based on failure tolerance

The fundamental tradeoff is between fault tolerance and write latency. Every write must reach a quorum (majority), so more nodes means higher latency. [src1]

Nodes  |  Quorum  |  Tolerates  |  Write latency (LAN)
  3    |    2     |   1 failure |  ~1-2ms
  5    |    3     |   2 failures|  ~2-3ms
  7    |    4     |   3 failures|  ~3-5ms (diminishing returns)

Verify: For most production systems, 5 nodes is the sweet spot -- tolerates 2 simultaneous failures (rolling upgrade + 1 unexpected failure).
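The quorum arithmetic behind this table can be sketched in a few lines (a minimal illustration; the function names `quorum` and `tolerated_failures` are ours, not from any library):

```python
# Quorum size and fault tolerance for a consensus cluster of n nodes.

def quorum(n: int) -> int:
    """Majority quorum: floor(n/2) + 1 nodes must acknowledge each write."""
    return n // 2 + 1

def tolerated_failures(n: int) -> int:
    """Failures survivable while a quorum of nodes remains reachable."""
    return n - quorum(n)

for n in (3, 4, 5, 7):
    print(f"{n} nodes: quorum={quorum(n)}, tolerates={tolerated_failures(n)} failures")
```

Note that 4 nodes tolerate only 1 failure, the same as 3 nodes -- the basis of the odd-cluster-size rule in the Anti-Patterns section.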

2. Deploy an etcd cluster with proper configuration

etcd is the most widely deployed Raft implementation (used by Kubernetes). Configure with explicit peer URLs and tuned timeouts. [src3]

# Node 1 (repeat for each node with appropriate names and URLs)
etcd --name node1 \
  --initial-advertise-peer-urls http://10.0.0.1:2380 \
  --listen-peer-urls http://10.0.0.1:2380 \
  --listen-client-urls http://10.0.0.1:2379,http://127.0.0.1:2379 \
  --advertise-client-urls http://10.0.0.1:2379 \
  --initial-cluster "node1=http://10.0.0.1:2380,node2=http://10.0.0.2:2380,node3=http://10.0.0.3:2380" \
  --initial-cluster-token my-cluster \
  --initial-cluster-state new \
  --heartbeat-interval 100 \
  --election-timeout 1000

Verify: etcdctl endpoint status --cluster -w table -- exactly one node should show IS LEADER as true, and RAFT TERM should be identical across all nodes.

3. Implement leader election using etcd leases

Leader election in etcd uses leases (TTL-based locks). A node acquires a lease, campaigns for election on a key, and keeps the lease alive. If the leader crashes, the lease expires and another node wins. [src3]

# pip install etcd3==0.12.0
import etcd3
import time
import socket

client = etcd3.client(host='10.0.0.1', port=2379)
hostname = socket.gethostname()
lease = client.lease(ttl=10)

# Campaign: put_if_absent via transaction
success, _ = client.transaction(
    compare=[client.transactions.create('/leader/my-service') == 0],
    success=[client.transactions.put('/leader/my-service', hostname, lease)],
    failure=[]
)

if success:
    print(f"{hostname} is the leader")
    while True:
        lease.refresh()
        time.sleep(3)
else:
    print(f"{hostname} is a follower, watching for leader changes")
    events_iterator, cancel = client.watch('/leader/my-service')
    for event in events_iterator:
        if isinstance(event, etcd3.events.DeleteEvent):
            print("Leader lost, attempting to become leader...")
            cancel()  # stop the watch before re-campaigning
            break

Verify: Kill the leader process, observe that within ~10 seconds (lease TTL) another node takes over leadership.

4. Understand the Raft log replication flow

Every state change goes through the leader and is replicated to a majority before being committed. This is the core of Raft's safety guarantee. [src1]

Client           Leader           Follower-1       Follower-2
  |                |                  |                |
  |-- write(x=5) ->|                  |                |
  |                |-- AppendEntries ->|                |
  |                |-- AppendEntries ------------------>|
  |                |<-- ack -----------|                |
  |                |  (quorum: leader + follower-1 = 2/3)
  |                |  COMMIT (apply to state machine)  |
  |<-- ok ---------|                  |                |
  |                |-- AppendEntries(commit_index) ---->|
  |                |  (follower-2 catches up)          |

Verify: Use etcdctl put foo bar on the leader, then etcdctl get foo on any follower -- the value should be consistent within milliseconds.
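The commit rule in the diagram -- an entry is committed once a majority of the cluster, the leader included, holds it -- can be sketched as a toy model (illustrative only; `Node` and `replicate` are our names, not part of any Raft library):

```python
# Toy model of the Raft commit rule: an entry commits once it is stored
# on a majority of nodes (the leader's own copy counts toward the quorum).

class Node:
    def __init__(self, name, reachable=True):
        self.name = name
        self.reachable = reachable
        self.log = []

def replicate(leader, followers, entry):
    """Append on the leader, send AppendEntries to each follower,
    and report whether the entry reached a majority (i.e., committed)."""
    leader.log.append(entry)
    acks = 1  # the leader's own copy
    for f in followers:
        if f.reachable:        # an unreachable follower sends no ack
            f.log.append(entry)
            acks += 1
    cluster_size = 1 + len(followers)
    return acks >= cluster_size // 2 + 1

leader = Node("leader")
f1 = Node("follower-1")
f2 = Node("follower-2", reachable=False)  # lagging, as in the diagram
print(replicate(leader, [f1, f2], "x=5"))  # True: 2/3 nodes hold the entry
```

As in the diagram, the write commits with only follower-1's ack; follower-2 catches up later via subsequent AppendEntries messages.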

5. Configure split-brain prevention with fencing tokens

Fencing tokens (epoch numbers) prevent stale leaders from corrupting state after a network partition heals. Every write includes the leader's term/epoch; followers reject writes from old terms. [src7]

class StaleLeaderError(Exception):
    """Raised when another node has taken over leadership."""

def write_with_fencing(client, key, value, expected_epoch):
    """Write only succeeds if our epoch matches the current leader epoch."""
    success, _ = client.transaction(
        compare=[
            client.transactions.mod('/leader/epoch') == expected_epoch
        ],
        success=[client.transactions.put(key, value)],
        failure=[]
    )
    if not success:
        raise StaleLeaderError("Epoch changed -- another leader took over")
    return True

Verify: Simulate a network partition (iptables drop), observe that the old leader's writes are rejected when the partition heals.

Code Examples

Python: Leader Election with etcd

Full script: python-leader-election-with-etcd.py (55 lines)

# Input:  etcd cluster endpoint, service name
# Output: Elected leader performs work; followers watch and take over on failure
# Deps:   pip install etcd3==0.12.0
import etcd3
import time
# ... (see full script)

Go: Raft-Based Configuration Store

Full script: go-raft-based-configuration-store.go (50 lines)

// Input:  Raft cluster with 3+ nodes
// Output: Strongly consistent key-value store
// Deps:   go get go.etcd.io/etcd/client/[email protected]
package main
import (
// ... (see full script)

Java: ZooKeeper Leader Election

Full script: java-zookeeper-leader-election.java (50 lines)

// Input:  ZooKeeper cluster connection string
// Output: Leader election using ephemeral sequential znodes
// Deps:   org.apache.zookeeper:zookeeper:3.9.2
import org.apache.zookeeper.*;
import java.util.Collections;
// ... (see full script)

Anti-Patterns

Wrong: Even number of consensus nodes

# BAD -- 4 nodes requires quorum of 3.
# Tolerates only 1 failure (same as 3 nodes), but costs an extra node.
etcd_cluster:
  nodes: [node1, node2, node3, node4]  # 4 nodes = waste of money
  # Quorum: floor(4/2) + 1 = 3
  # Fault tolerance: 4 - 3 = 1 (same as 3-node cluster!)

Correct: Always use odd cluster sizes

# GOOD -- 3 or 5 nodes. Every node contributes to fault tolerance.
etcd_cluster:
  nodes: [node1, node2, node3, node4, node5]  # 5 nodes
  # Quorum: 3
  # Fault tolerance: 2 (one more than 4-node cluster, worth the extra node)

Wrong: Using wall-clock time for leader election timeout

# BAD -- a fixed, identical timeout means every node times out at the same
# moment after a leader failure, causing split votes and extended unavailability.
# time.time() can also jump backward under NTP adjustment, making matters worse.
import time
ELECTION_TIMEOUT = 1.0  # Fixed 1 second for all nodes
if time.time() - last_heartbeat > ELECTION_TIMEOUT:
    start_election()

Correct: Randomized election timeouts to prevent split votes

# GOOD -- Raft uses randomized timeouts (e.g. a 150-300ms range).
# Nodes time out at different moments, so one candidate usually wins outright.
import random
import time

ELECTION_TIMEOUT_MIN = 150  # ms
ELECTION_TIMEOUT_MAX = 300  # ms

def get_election_timeout():
    return random.randint(ELECTION_TIMEOUT_MIN, ELECTION_TIMEOUT_MAX) / 1000.0

# time.monotonic() is immune to NTP clock adjustments.
if time.monotonic() - last_heartbeat > get_election_timeout():
    start_election()

Wrong: Reading from followers without understanding consistency

# BAD -- follower may be behind the leader.
# Client reads stale data, makes a decision, writes back -- lost update.
value = follower_client.get("/config/feature-flag")  # might be stale!

Correct: Use linearizable reads for consistency-critical operations

# GOOD -- linearizable read goes through the leader.
# Guarantees freshest value.
value = client.get("/config/feature-flag",
                   serializable=False)  # default: linearizable via leader
# For read-heavy workloads where slight staleness is OK:
cached_value = client.get("/config/feature-flag",
                          serializable=True)  # fast, may be slightly stale

Wrong: No fencing after leader failover

# BAD -- old leader (zombie) wakes up after GC pause,
# still thinks it is leader, writes conflicting data.
def process_as_leader():
    while self.is_leader:  # stale boolean!
        result = compute_something()
        db.write(result)   # DANGER: another leader may already exist

Correct: Fencing tokens / epoch validation on every write

# GOOD -- every write includes the leader's epoch/term number.
# Storage layer rejects writes from old epochs.
def process_as_leader():
    while True:
        current_epoch, _ = etcd.get("/leader/epoch")  # (value, metadata)
        result = compute_something()
        success, _ = etcd.transaction(
            compare=[etcd.transactions.value("/leader/epoch") == current_epoch],
            success=[etcd.transactions.put("/data/result", result)],
            failure=[]
        )
        if not success:  # epoch changed -- another leader took over
            demote_to_follower()
            break

Common Pitfalls

Diagnostic Commands

# Check etcd cluster health
etcdctl endpoint health --cluster -w table

# Verify leader and Raft term consistency
etcdctl endpoint status --cluster -w table

# Check etcd member list and IDs
etcdctl member list -w table

# Monitor leader elections in real-time
etcdctl watch --prefix /leader/

# Check Raft log compaction and snapshot status
etcdctl alarm list

# ZooKeeper: check cluster status
echo stat | nc localhost 2181

# ZooKeeper: check if node is leader or follower
echo srvr | nc localhost 2181 | grep Mode

# Check etcd disk latency (must be <10ms for consensus performance)
etcdctl check perf --load="s"

Version History & Compatibility

Implementation | Current Version | Key Changes | Notes
etcd | 3.5.x (stable) | v3.4: gRPC gateway; v3.5: improved compaction, tracing | Kubernetes default; use v3.5+ for production
ZooKeeper | 3.9.x | v3.8: TLS, JWT auth; v3.9: performance improvements | Mature but heavier than etcd; mainly for existing Kafka/Hadoop ecosystems
Consul | 1.19.x | v1.16: sameness groups; v1.18: catalog v2 | Best for service discovery + consensus combo
hashicorp/raft (Go) | 1.7.x | v1.6: pre-vote protocol; v1.7: improved snapshots | For building custom Raft-based systems in Go
Apache Ratis (Java) | 3.1.x | v3.0: streaming API; v3.1: async support | For building custom Raft-based systems in Java

When to Use / When Not to Use

Use When | Don't Use When | Use Instead
Need strong consistency across nodes (linearizable reads/writes) | Eventual consistency is acceptable (DNS caches, social media likes) | Gossip protocol, CRDTs, DynamoDB-style quorum reads
Coordinating leader election for stateful services | Stateless services that can run on any node independently | Load balancer round-robin (no coordination needed)
Distributed configuration store (feature flags, routing tables) | Configuration rarely changes and can be deployed via config files | Static config files + CI/CD deployment
Distributed locking (exactly-once processing, mutex) | Lock contention is low and operations are idempotent | Optimistic concurrency control (compare-and-swap)
Cluster membership and failure detection | Monitoring external services for health | Health check systems (Prometheus, Nagios)
Metadata management for distributed databases | Storing actual application data at high throughput | Purpose-built databases (PostgreSQL, Cassandra)

Important Caveats

Related Units