Distributed Consensus and Leader Election
How do distributed consensus and leader election work?
TL;DR
- Bottom line: Distributed consensus algorithms (Raft, Paxos, ZAB) allow a cluster of nodes to agree on a single value or sequence of values despite failures, by electing a leader and replicating a log to a quorum (majority) of nodes before committing.
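- Key tool/command: etcdctl elect for leader election; etcdctl put/get for consensus-backed key-value operations
- Watch out for: Split brain -- a misconfigured quorum, or a stale leader that keeps acting after it has been replaced, lets two nodes both behave as leader, causing divergent state. Even-sized clusters do not cause split brain under a correct majority quorum, but they add cost without adding fault tolerance.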
- Works with: etcd (Raft), ZooKeeper (ZAB), Consul (Raft), CockroachDB (Raft), TiKV (Raft). Language-agnostic via gRPC/HTTP client libraries.
Constraints
- A consensus cluster should use an odd number of nodes (3, 5, 7): 2f+1 nodes tolerate f failures, and making the cluster even-sized raises the quorum without tolerating any extra failure, so the added node is wasted
- No deterministic algorithm can guarantee that consensus terminates in a purely asynchronous system with even one crash failure (FLP impossibility) -- practical algorithms never compromise safety and rely on timeouts (partial synchrony) only to make progress
- A quorum (majority) of nodes must be alive and reachable for the cluster to make progress -- losing majority means the cluster halts
- Leader election timeout must be significantly longer than heartbeat interval (typically 10x) to avoid unnecessary elections under normal network jitter
- Never run consensus across WAN without tuning election timeouts to account for cross-region latency (150-300ms RTT) -- default timeouts assume LAN
- Consensus is not a distributed transaction -- it replicates a log, not multi-shard ACID commits
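The timeout rules above can be checked mechanically before deploying a cluster. Below is a minimal sketch with a hypothetical check_raft_timeouts helper. It encodes the 10x heartbeat rule of thumb from this list and takes the required multiple of the maximum RTT as a parameter. The example uses 10x, which is an assumption to verify against your implementation's tuning guide.

```python
def check_raft_timeouts(heartbeat_ms, election_ms, max_rtt_ms, rtt_multiple):
    """Return warnings for a heartbeat / election-timeout pair (hypothetical helper).

    Encodes the ~10x heartbeat rule of thumb; rtt_multiple is a caller-chosen
    safety factor, not an etcd default.
    """
    warnings = []
    if election_ms < 10 * heartbeat_ms:
        warnings.append(
            f"election timeout {election_ms}ms is less than 10x heartbeat ({heartbeat_ms}ms)"
        )
    if election_ms < rtt_multiple * max_rtt_ms:
        warnings.append(
            f"election timeout {election_ms}ms is less than {rtt_multiple}x max RTT ({max_rtt_ms}ms)"
        )
    return warnings


# etcd defaults (100ms heartbeat, 1000ms election timeout) on a LAN with ~1ms RTT
print(check_raft_timeouts(100, 1000, max_rtt_ms=1, rtt_multiple=10))    # []
# The same defaults stretched across regions with 300ms RTT
print(check_raft_timeouts(100, 1000, max_rtt_ms=300, rtt_multiple=10))
```

The second call reports one warning: the defaults satisfy the heartbeat ratio but are too short for the cross-region RTT.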
Quick Reference
| Algorithm | Origin | Leader-Based | Leader Election | Message Types | Understandability | Key Implementations |
|---|---|---|---|---|---|---|
| Raft | Stanford, 2014 | Yes | Randomized timeout + RequestVote RPC | 4 | High | etcd, Consul, CockroachDB, TiKV |
| Paxos | Lamport, 1989 | Optional (Multi-Paxos) | Any proposer can propose | 4+ | Low | Google Chubby, Spanner, Cassandra LWT |
| ZAB | Yahoo, 2011 | Yes | Highest (epoch, zxid) wins; server ID breaks ties | 10 | Medium | Apache ZooKeeper (exclusive) |
| Viewstamped Replication | MIT, 1988 | Yes | Round-robin (next in config) | 10 | Medium | Research implementations |
| Property | Raft | Paxos | ZAB | Viewstamped Replication |
|---|---|---|---|---|
| Fault tolerance | f failures with 2f+1 nodes | f failures with 2f+1 nodes | f failures with 2f+1 nodes | f failures with 2f+1 nodes |
| Ordering guarantee | Total order (per-term log) | Per-slot (Multi-Paxos for total) | Total order (FIFO + causal) | Total order |
| Stable storage | Required | Required | Required | Not required |
| Reconfiguration | Joint consensus (online) | Separate protocol | Dynamic reconfig (ZooKeeper 3.5+) | Stop-the-world |
| Latency (LAN) | 1-2ms | 2-4ms | 1-2ms | 1-2ms |
| Best for | General purpose, new projects | Existing Google/academic systems | ZooKeeper coordination | Research / education |
Decision Tree
START -- What do you need consensus for?
|-- Leader election / distributed lock only?
| |-- YES --> Use etcd lease-based election or ZooKeeper ephemeral znodes
| +-- NO (need replicated state machine) |
|-- Building a new distributed system from scratch?
| |-- YES --> Use Raft (most implementations, best docs, easiest to understand)
| | |-- Go: hashicorp/raft or etcd/raft
| | |-- Java: Apache Ratis, microraft
| | +-- Rust: tikv/raft-rs
| +-- NO (integrating with existing system) |
|-- Already using ZooKeeper for coordination?
| |-- YES --> Stay with ZAB -- do not add a second consensus system
| +-- NO |
|-- Need multi-datacenter / WAN consensus?
| |-- YES --> etcd with tuned timeouts or CockroachDB (multi-region Raft)
| +-- NO |
+-- DEFAULT --> etcd (Raft) for coordination, Consul (Raft) for service discovery
Step-by-Step Guide
1. Choose cluster size based on failure tolerance
The fundamental tradeoff is between fault tolerance and write latency. Every write must reach a quorum (majority), so more nodes means higher latency. [src1]
| Nodes | Quorum | Tolerates | Write latency (LAN) |
|---|---|---|---|
| 3 | 2 | 1 failure | ~1-2ms |
| 5 | 3 | 2 failures | ~2-3ms |
| 7 | 4 | 3 failures | ~3-5ms (diminishing returns) |
Verify: For most production systems, 5 nodes is the sweet spot -- tolerates 2 simultaneous failures (rolling upgrade + 1 unexpected failure).
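The quorum arithmetic behind this table, and behind the odd-size rule, fits in two small functions. The sketch below assumes a strict majority quorum of floor(n/2) + 1; the function names are illustrative.

```python
def quorum(n):
    """Majority quorum for an n-node cluster: floor(n/2) + 1."""
    return n // 2 + 1


def fault_tolerance(n):
    """How many nodes can fail while a quorum is still reachable."""
    return n - quorum(n)


for n in range(2, 8):
    print(f"{n} nodes: quorum {quorum(n)}, tolerates {fault_tolerance(n)}")
```

The printed rows show that 4 nodes tolerate 1 failure, the same as 3, and 6 nodes tolerate 2, the same as 5. Adding a node to an odd-sized cluster only raises the quorum.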
2. Deploy an etcd cluster with proper configuration
etcd is the most widely deployed Raft implementation (used by Kubernetes). Configure with explicit peer URLs and tuned timeouts. [src3]
# Node 1 (repeat for each node with appropriate names and URLs)
etcd --name node1 \
--initial-advertise-peer-urls http://10.0.0.1:2380 \
--listen-peer-urls http://10.0.0.1:2380 \
--listen-client-urls http://10.0.0.1:2379,http://127.0.0.1:2379 \
--advertise-client-urls http://10.0.0.1:2379 \
--initial-cluster "node1=http://10.0.0.1:2380,node2=http://10.0.0.2:2380,node3=http://10.0.0.3:2380" \
--initial-cluster-token my-cluster \
--initial-cluster-state new \
--heartbeat-interval 100 \
--election-timeout 1000
Verify: Run etcdctl endpoint status --cluster -w table -- exactly one node should show IS LEADER = true, and RAFT TERM should be identical on every node.
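Rather than editing the command by hand for each node, the per-member flags can be generated from one member list so that --initial-cluster is guaranteed to match everywhere. The sketch below uses a hypothetical etcd_flags helper. Like the command above, it assumes plain HTTP and the default ports 2379 (client) and 2380 (peer).

```python
import shlex


def etcd_flags(name, ip, members, token="my-cluster",
               heartbeat_ms=100, election_ms=1000):
    """Build the etcd command line for one member (hypothetical helper).

    members maps member name -> IP and must be the same dict for every node,
    so that --initial-cluster is identical across the cluster.
    """
    initial_cluster = ",".join(
        f"{m}=http://{addr}:2380" for m, addr in members.items()
    )
    return [
        "etcd", "--name", name,
        "--initial-advertise-peer-urls", f"http://{ip}:2380",
        "--listen-peer-urls", f"http://{ip}:2380",
        "--listen-client-urls", f"http://{ip}:2379,http://127.0.0.1:2379",
        "--advertise-client-urls", f"http://{ip}:2379",
        "--initial-cluster", initial_cluster,
        "--initial-cluster-token", token,
        "--initial-cluster-state", "new",
        "--heartbeat-interval", str(heartbeat_ms),
        "--election-timeout", str(election_ms),
    ]


MEMBERS = {"node1": "10.0.0.1", "node2": "10.0.0.2", "node3": "10.0.0.3"}
for name, ip in MEMBERS.items():
    print(shlex.join(etcd_flags(name, ip, MEMBERS)))  # one shell command per node
```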
3. Implement leader election using etcd leases
Leader election in etcd builds on leases (TTL grants that keys can be attached to). A node grants a lease, tries to create the leader key attached to that lease, and keeps the lease alive. If the leader crashes, the lease expires, the key is deleted, and another node can win the next campaign. [src3]
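# pip install etcd3==0.12.0
import socket
import time
import etcd3
import etcd3.events
LEADER_KEY = '/leader/my-service'
client = etcd3.client(host='10.0.0.1', port=2379)
hostname = socket.gethostname()
while True:
    lease = client.lease(ttl=10)
    # Campaign: create the key only if it does not exist yet (create_revision == 0)
    success, _ = client.transaction(
        compare=[client.transactions.create(LEADER_KEY) == 0],
        success=[client.transactions.put(LEADER_KEY, hostname, lease)],
        failure=[]
    )
    if success:
        print(f"{hostname} is the leader")
        while True:
            lease.refresh()  # keep-alive: the key lives as long as the lease
            time.sleep(3)    # refresh well within the 10 s TTL
    else:
        lease.revoke()  # lost the campaign; release the unused lease
        print(f"{hostname} is a follower, watching for leader changes")
        # Note: a delete that happens before the watch starts is missed;
        # production code should watch from the revision it last read.
        events_iterator, cancel = client.watch(LEADER_KEY)
        for event in events_iterator:
            if isinstance(event, etcd3.events.DeleteEvent):
                print("Leader lost, attempting to become leader...")
                break
        cancel()  # stop watching, then loop back and campaign again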
Verify: Kill the leader process and observe that another node takes over leadership within about one lease TTL (~10 seconds).
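The failover behavior can also be modeled without a running cluster. Below is a toy sketch, assuming a hypothetical in-process LeaseTable driven by a fake clock; it is not etcd's API. It mirrors the create-if-absent campaign and the keep-alive refresh from the script above.

```python
class LeaseTable:
    """Toy single-process model of a leader key guarded by a TTL lease."""

    def __init__(self, clock):
        self.clock = clock      # callable returning the current time in seconds
        self.holder = None      # current leader, or None when the key is absent
        self.expires_at = 0.0

    def _expire(self):
        # A lease that was not refreshed before its deadline deletes the key
        if self.holder is not None and self.clock() >= self.expires_at:
            self.holder = None

    def campaign(self, name, ttl):
        """Create-if-absent: succeeds only when no live lease holds the key."""
        self._expire()
        if self.holder is not None:
            return False
        self.holder = name
        self.expires_at = self.clock() + ttl
        return True

    def refresh(self, name, ttl):
        """Keep-alive: only the current holder can extend its lease."""
        self._expire()
        if self.holder != name:
            return False
        self.expires_at = self.clock() + ttl
        return True


now = [0.0]  # fake clock so the timeline is deterministic
table = LeaseTable(clock=lambda: now[0])
print(table.campaign("node1", ttl=10))   # True: node1 becomes leader
print(table.campaign("node2", ttl=10))   # False: key is held, node2 follows
now[0] = 8.0
print(table.refresh("node1", ttl=10))    # True: keep-alive extends the lease to t=18
now[0] = 20.0                            # node1 stopped refreshing (crash or pause)
print(table.campaign("node2", ttl=10))   # True: lease expired, node2 takes over
print(table.refresh("node1", ttl=10))    # False: node1 has lost leadership
```

The last call shows why a resumed leader must check before acting: its lease is gone even though it never observed a failure.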
4. Understand the Raft log replication flow
Every state change goes through the leader and is replicated to a majority before being committed. This is the core of Raft's safety guarantee. [src1]
Client Leader Follower-1 Follower-2
| | | |
|-- write(x=5) ->| | |
| |-- AppendEntries ->| |
| |-- AppendEntries ------------------>|
| |<-- ack -----------| |
| | (quorum: leader + follower-1 = 2/3)
| | COMMIT (apply to state machine) |
|<-- ok ---------| | |
| |-- AppendEntries(commit_index) ---->|
| | (follower-2 catches up) |
Verify: Use etcdctl put foo bar on the leader, then etcdctl get foo on any follower -- the value should be consistent within milliseconds.
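The diagram shows the leader committing once a quorum has acknowledged an entry. The commit rule from the Raft paper is: advance commitIndex to the largest N such that a majority of matchIndex values are at least N and the entry at N is from the current term. The sketch below implements that rule; the function and parameter names are illustrative.

```python
def advance_commit_index(commit_index, match_index, log_terms, current_term):
    """Return the leader's new commit index under Raft's commit rule.

    match_index: highest replicated log index per server, including the
                 leader's own last index.
    log_terms:   log_terms[i] is the term of entry i (1-based; index 0 unused).
    """
    # Sorted descending, the value at position len // 2 is held by a majority
    ranked = sorted(match_index, reverse=True)
    majority_index = ranked[len(match_index) // 2]
    # Only a current-term entry can be committed by counting replicas;
    # earlier entries are committed indirectly along with it.
    for n in range(majority_index, commit_index, -1):
        if log_terms[n] == current_term:
            return n
    return commit_index


# Diagram above: leader and follower-1 hold entry 5, follower-2 only entry 4
print(advance_commit_index(4, [5, 5, 4], [0, 1, 1, 2, 2, 2], current_term=2))  # 5
# Entry 3 is on a majority but from an older term, so it is not committed yet
print(advance_commit_index(1, [3, 3, 1], [0, 1, 1, 2], current_term=3))        # 1
```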
5. Configure split-brain prevention with fencing tokens
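Fencing tokens (epoch numbers) prevent stale leaders from corrupting state after a network partition heals or a long pause ends. Every write carries the epoch the leader held when it was elected, and the storage layer (here, an etcd transaction) rejects writes whose epoch is no longer current. [src7]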
class StaleLeaderError(Exception):
    """Raised when the leader epoch has moved on since this node was elected."""
def write_with_fencing(client, key, value, expected_epoch):
    """Write only if /leader/epoch is unchanged since we became leader.

    expected_epoch: the mod_revision of /leader/epoch recorded at election time.
    """
    success, _ = client.transaction(
        compare=[
            client.transactions.mod('/leader/epoch') == expected_epoch
        ],
        success=[client.transactions.put(key, value)],
        failure=[]
    )
    if not success:
        raise StaleLeaderError("Epoch changed -- another leader took over")
    return True
Verify: Simulate a network partition (for example, drop peer traffic with iptables) and confirm that the old leader's writes are rejected after the partition heals.
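When the protected resource is a separate storage service rather than etcd itself, the same check can live in that service. Below is a minimal sketch with a hypothetical FencedStore that remembers the highest fencing token it has accepted and rejects anything lower. Equal tokens are allowed so that the current leader can keep writing.

```python
class StaleTokenError(Exception):
    """Raised when a write carries a fencing token older than one already seen."""


class FencedStore:
    """Toy storage service that enforces fencing tokens on every write."""

    def __init__(self):
        self.data = {}
        self.highest_token = -1  # largest token accepted so far

    def write(self, key, value, token):
        # A lower token means the writer was superseded by a newer leader
        if token < self.highest_token:
            raise StaleTokenError(f"token {token} < {self.highest_token}")
        self.highest_token = token
        self.data[key] = value


store = FencedStore()
store.write("config", "v1", token=33)          # old leader, epoch 33
store.write("config", "v2", token=34)          # new leader, epoch 34
try:
    store.write("config", "stale", token=33)   # old leader resumes after a pause
except StaleTokenError as err:
    print("rejected:", err)                    # rejected: token 33 < 34
print(store.data["config"])                    # v2
```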
Code Examples
Python: Leader Election with etcd
Full script: python-leader-election-with-etcd.py (55 lines)
# Input: etcd cluster endpoint, service name
# Output: Elected leader performs work; followers watch and take over on failure
# Deps: pip install etcd3==0.12.0
import etcd3
import time
# ... (see full script)
Go: Raft-Based Configuration Store
Full script: go-raft-based-configuration-store.go (50 lines)
// Input: Raft cluster with 3+ nodes
// Output: Strongly consistent key-value store
// Deps: go get go.etcd.io/etcd/client/v3
package main
import (
// ... (see full script)
Java: ZooKeeper Leader Election
Full script: java-zookeeper-leader-election.java (50 lines)
// Input: ZooKeeper cluster connection string
// Output: Leader election using ephemeral sequential znodes
// Deps: org.apache.zookeeper:zookeeper:3.9.2
import org.apache.zookeeper.*;
import java.util.Collections;
// ... (see full script)
Anti-Patterns
Wrong: Even number of consensus nodes
# BAD -- 4 nodes requires quorum of 3.
# Tolerates only 1 failure (same as 3 nodes), but costs an extra node.
etcd_cluster:
nodes: [node1, node2, node3, node4] # 4 nodes = waste of money
# Quorum: floor(4/2) + 1 = 3
# Fault tolerance: 4 - 3 = 1 (same as 3-node cluster!)
Correct: Always use odd cluster sizes
# GOOD -- 3 or 5 nodes. Every node contributes to fault tolerance.
etcd_cluster:
nodes: [node1, node2, node3, node4, node5] # 5 nodes
# Quorum: 3
# Fault tolerance: 2 (one more than 4-node cluster, worth the extra node)
Wrong: Fixed, identical election timeouts (measured with the wall clock)
# BAD -- every node waits the same fixed timeout, so after a leader failure all
# followers start elections at nearly the same moment, split the vote, and retry
# in lockstep. time.time() can also jump (NTP adjustments); use a monotonic clock.
import time
ELECTION_TIMEOUT = 1.0  # Fixed 1 second for all nodes
if time.time() - last_heartbeat > ELECTION_TIMEOUT:
    start_election()
Correct: Randomized election timeouts to prevent split votes
# GOOD -- Raft uses randomized timeouts (150-300ms range).
# Different nodes timeout at different times, so one wins first.
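import random
import time
ELECTION_TIMEOUT_MIN = 150  # ms
ELECTION_TIMEOUT_MAX = 300  # ms
def new_election_timeout():
    """Draw a fresh random timeout in seconds whenever the election timer resets."""
    return random.uniform(ELECTION_TIMEOUT_MIN, ELECTION_TIMEOUT_MAX) / 1000.0
# On every valid heartbeat: restart the timer with a monotonic timestamp and a new draw
last_heartbeat = time.monotonic()
election_timeout = new_election_timeout()
if time.monotonic() - last_heartbeat > election_timeout:
    start_election()  # start_election() is assumed to reset the timer with a fresh draw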
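The benefit of randomization can be estimated with a rough Monte Carlo sketch. It assumes a split vote happens whenever the two earliest timeouts fire within a small window. The 10ms window is a made-up stand-in for RequestVote propagation time, and collision_rate is a hypothetical helper.

```python
import random


def collision_rate(n_followers, timeout_fn, window_ms, trials=10_000, seed=0):
    """Fraction of trials where the two earliest election timeouts fire within
    window_ms of each other, a rough proxy for a split vote."""
    rng = random.Random(seed)
    collisions = 0
    for _ in range(trials):
        timeouts = sorted(timeout_fn(rng) for _ in range(n_followers))
        if timeouts[1] - timeouts[0] < window_ms:
            collisions += 1
    return collisions / trials


def fixed_timeout(rng):
    return 1000.0  # every node waits exactly 1 s


def randomized_timeout(rng):
    return rng.uniform(150, 300)  # Raft-style 150-300 ms range


# 4 surviving followers of a 5-node cluster whose leader just failed
print(collision_rate(4, fixed_timeout, window_ms=10))       # 1.0
print(collision_rate(4, randomized_timeout, window_ms=10))  # about 0.24
```

With fixed timeouts every election collides. With a 150ms random spread, the chance that the two fastest followers fire within 10ms of each other is (14/15)^4 subtracted from 1, roughly 0.24. Widening the range or shrinking the window lowers it further.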
Wrong: Reading from followers without understanding consistency
# BAD -- a serializable read is answered from this member's local copy,
# which may lag the leader. Client reads stale data, makes a decision,
# writes back -- lost update.
value, _ = follower_client.get("/config/feature-flag", serializable=True)  # might be stale!
Correct: Use linearizable reads for consistency-critical operations
# GOOD -- a linearizable read (etcd's default) is confirmed with the leader
# before returning, so it reflects the latest committed write.
# python-etcd3's get() returns a (value, metadata) tuple.
value, _ = client.get("/config/feature-flag",
                      serializable=False)  # default: linearizable
# For read-heavy workloads where slight staleness is OK:
cached_value, _ = client.get("/config/feature-flag",
                             serializable=True)  # fast, may be slightly stale
Wrong: No fencing after leader failover
# BAD -- old leader (zombie) wakes up after GC pause,
# still thinks it is leader, writes conflicting data.
def process_as_leader(self):
    while self.is_leader:  # stale boolean!
        result = compute_something()
        db.write(result)  # DANGER: another leader may already exist
Correct: Fencing tokens / epoch validation on every write
# GOOD -- every write is conditioned on the epoch this node held when it was
# elected; etcd rejects the write once a newer leader has bumped /leader/epoch.
def process_as_leader(etcd, my_epoch):
    # my_epoch: the /leader/epoch value this node wrote when it won the election.
    # Do not re-read the epoch inside the loop, or a zombie would adopt the new one.
    while True:
        result = compute_something()
        succeeded, _ = etcd.transaction(
            compare=[etcd.transactions.value("/leader/epoch") == my_epoch],
            success=[etcd.transactions.put("/data/result", result)],
            failure=[],
        )
        if not succeeded:
            # The epoch changed: another leader took over, so step down
            demote_to_follower()
            break
Common Pitfalls
- Unavailability with even cluster sizes: Running 2 or 4 nodes raises the quorum size without adding fault tolerance, and an even split (1-1 or 2-2) leaves neither partition with a majority, so the whole cluster stops accepting writes. Fix: always deploy odd-numbered clusters (3, 5, 7). [src1]
- Election storms from aggressive timeouts: If election timeout is too close to heartbeat interval, transient network delays trigger unnecessary elections, causing cascading failures. Fix: set election timeout to 10x heartbeat interval minimum. [src1]
- WAN latency causing constant re-elections: The default etcd election timeout (1000ms) is too short for cross-datacenter deployments with 150-300ms RTT. Fix: tune --heartbeat-interval to roughly the RTT and --election-timeout to at least 10x the RTT. [src3]
- Split brain from zombie leaders: A leader pauses (GC, swap, CPU starvation), loses its lease, another leader is elected, then the old leader resumes and writes conflicting data. Fix: use fencing tokens (epoch numbers) validated on every write. [src2] [src7]
- Thundering herd on leader failure: All followers detect leader loss simultaneously and campaign at the same time, causing split votes and extended unavailability. Fix: Raft's randomized election timeout solves this -- ensure your implementation uses randomization. [src1]
- Data loss from non-durable writes: Writing to the Raft log without fsync means a power failure can lose committed entries. Fix: ensure WAL (Write-Ahead Log) is synced to disk before acknowledging writes. [src1]
- Quorum loss from correlated failures: Placing all consensus nodes in the same rack/AZ means a single power or network failure takes out the quorum. Fix: distribute nodes across failure domains. [src2]
- Confusing consensus with distributed transactions: Consensus replicates a single log across nodes; distributed transactions coordinate writes across multiple independent data stores. Fix: use consensus for state machine replication, 2PC/Saga for cross-service transactions. [src2]
Diagnostic Commands
# Check etcd cluster health
etcdctl endpoint health --cluster -w table
# Verify leader and Raft term consistency
etcdctl endpoint status --cluster -w table
# Check etcd member list and IDs
etcdctl member list -w table
# Monitor leader elections in real-time
etcdctl watch --prefix /leader/
# List active alarms (e.g. NOSPACE when the storage quota is exceeded, CORRUPT)
etcdctl alarm list
# ZooKeeper: check cluster status (3.5.3+ enables only srvr by default; add stat to 4lw.commands.whitelist)
echo stat | nc localhost 2181
# ZooKeeper: check if node is leader or follower
echo srvr | nc localhost 2181 | grep Mode
# Benchmark cluster performance with a small load profile (etcd recommends p99 WAL fsync latency under 10ms)
etcdctl check perf --load="s"
Version History & Compatibility
| Implementation | Current Version | Key Changes | Notes |
|---|---|---|---|
| etcd | 3.5.x (stable) | v3.4: gRPC gateway; v3.5: improved compaction, tracing | Kubernetes default; use v3.5+ for production |
| ZooKeeper | 3.9.x | v3.8: TLS, JWT auth; v3.9: performance improvements | Mature but heavier than etcd; mainly for existing Kafka/Hadoop ecosystems |
| Consul | 1.19.x | v1.16: sameness groups; v1.18: catalog v2 | Best for service discovery + consensus combo |
| hashicorp/raft (Go) | 1.7.x | v1.6: pre-vote protocol; v1.7: improved snapshots | For building custom Raft-based systems in Go |
| Apache Ratis (Java) | 3.1.x | v3.0: streaming API; v3.1: async support | For building custom Raft-based systems in Java |
When to Use / When Not to Use
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Need strong consistency across nodes (linearizable reads/writes) | Eventual consistency is acceptable (DNS caches, social media likes) | Gossip protocol, CRDTs, DynamoDB-style quorum reads |
| Coordinating leader election for stateful services | Stateless services that can run on any node independently | Load balancer round-robin (no coordination needed) |
| Distributed configuration store (feature flags, routing tables) | Configuration rarely changes and can be deployed via config files | Static config files + CI/CD deployment |
| Distributed locking (exactly-once processing, mutex) | Lock contention is low and operations are idempotent | Optimistic concurrency control (compare-and-swap) |
| Cluster membership and failure detection | Monitoring external services for health | Health check systems (Prometheus, Nagios) |
| Metadata management for distributed databases | Storing actual application data at high throughput | Purpose-built databases (PostgreSQL, Cassandra) |
Important Caveats
- Consensus is a coordination mechanism, not a data store -- do not use etcd/ZooKeeper as a general-purpose database; they are optimized for small amounts of critical metadata (typically <1GB)
- Write throughput is fundamentally limited by disk fsync latency and quorum round-trip time -- expect 10K-50K writes/sec for etcd, not millions
- Network partitions are more common than total node failures in cloud environments -- design for partition tolerance, not just crash tolerance
- Raft's single-leader architecture means all writes go through one node -- this is a throughput bottleneck by design, traded for simplicity and consistency
- Consensus does not solve Byzantine fault tolerance (malicious nodes) -- Raft/Paxos/ZAB assume nodes may crash but never lie; use PBFT or blockchain consensus for adversarial environments
- Adding more nodes beyond 7 provides diminishing returns in fault tolerance while linearly increasing write latency -- 5 nodes is the production sweet spot