system-design.io

Sharding Strategy and Hotspot Mitigation

“Scaling is not just about adding more machines — it’s about being smart about who talks to which machine.”


The Supermarket Analogy: Before We Write a Single Query

Imagine you manage a supermarket with one checkout lane. For the first few months, business is slow, and that single lane handles every customer just fine. Then summer arrives. Lines snake through the snack aisle. Customers are abandoning full carts and leaving angry. You hire more cashiers — but you only have one lane to funnel everyone through. Adding cashiers doesn’t help if the bottleneck is the queue itself.

So you build ten checkout lanes. You assign customers based on their last name: A–C go to Lane 1, D–F to Lane 2, and so on. Problem solved! Except… your town has an unusually high population of people whose last names start with “S.” Lane 8 has a forty-person line. Lanes 1, 3, and 6 are empty. You’ve distributed the load, but you haven’t balanced it. One lane is on fire while others are sleeping.

This, in a nutshell, is the story of database sharding — and the persistent villain that haunts every sharding strategy: the hotspot.


Part 1 — The Single Database: Where Every Story Begins

The Atomic Unit

Before sharding exists, there is a single database. One machine. One data store. Every read, every write, every join — it all goes through the same box.

svgbob
+---------------------------+
|      Application          |
|      Server               |
+---------------------------+
            |
            | ALL reads + writes
            v
+---------------------------+
|    Single Database        |
|    (PostgreSQL / MySQL)   |
|    - Users Table          |
|    - Orders Table         |
|    - Products Table       |
+---------------------------+

In Figure 1.1, notice that every operation — reads, writes, and complex joins — flows into a single machine. This simplicity is a genuine engineering virtue. Transactions are ACID-compliant by default. Debugging is trivial. You can JOIN across any two tables in microseconds because all the data lives on the same disk.

This model works right up until the moment it doesn’t. And when it breaks, it breaks fast.

The Pain Point: The Four Walls of a Single Node

A single database node has four hard limits that sharding is designed to break through:

Constraint What Happens When You Hit It
Storage Disk fills up; data can’t grow beyond one machine’s capacity
Read Throughput Too many concurrent reads overwhelm the CPU and I/O
Write Throughput A single write lock becomes a serial bottleneck
Latency Users far from the single DC experience slow queries

We can throw vertical scaling at this for a while — buy a bigger machine, add more RAM, get faster SSDs. This worked well in the 1990s and early 2000s. But eventually, vertical scaling hits a physical and economic ceiling. A machine with 128 cores and 4 TB of RAM costs an extraordinary amount. You’re paying exponentially for linear gains.

The industry’s answer was horizontal scaling: instead of making one machine bigger, we spread the data across many machines. This is the core idea behind sharding.


Part 2 — What is Sharding?

The Core Idea

Sharding is the process of horizontally partitioning a database into smaller, independent pieces called shards. Each shard holds a distinct subset of the total data and operates as an independent database node. Together, all shards represent the complete dataset.

svgbob
+------------------+
|   Application    |
|   (Shard Router) |
+------------------+
    |       |       |
    v       v       v
+-------+ +-------+ +-------+
|Shard 0| |Shard 1| |Shard 2|
|Users  | |Users  | |Users  |
|0-33k  | |33-66k | |66-99k |
+-------+ +-------+ +-------+

In Figure 2.1, we have split a single “Users” table across three shards based on user_id ranges. A shard router (also called a query router or proxy) sits in front of all shards and directs each query to the correct shard based on the query’s key.

Key Terminology


Part 3 — The Three Core Sharding Strategies

We group sharding strategies by how they decide which shard owns which data. There are three fundamental approaches, and each one is a direct trade-off.

Strategy 1: Range-Based Sharding

The Idea: Assign records to shards based on a continuous range of the shard key’s value.

def get_shard_range(user_id: int, shard_count: int, max_id: int) -> int:
    shard_size = max_id // shard_count
    return user_id // shard_size

# Example: 3 shards, max user_id = 99999
print(get_shard_range(15000, 3, 99999))   # -> Shard 0
print(get_shard_range(50000, 3, 99999))   # -> Shard 1
print(get_shard_range(82000, 3, 99999))   # -> Shard 2

Where range-based sharding shines: Range queries. If your application frequently asks “give me all orders placed between Jan 1 and Jan 31,” range sharding lets you hit a single shard — because sequential timestamps live together. This is exactly why time-series databases like InfluxDB use this approach.

Where it fails — the temporal hotspot:

svgbob
                  Shard 0     Shard 1     Shard 2
                  (Jan)       (Feb)       (Mar)
                 +---------+ +---------+ +---------+
New Year's Day   | ████████| |         | |         |
Traffic spike -> | ████████| |         | |         |
                 | ████████| |         | |         |
                 +---------+ +---------+ +---------+
                  OVERLOADED    idle        idle

In Figure 3.1, a New Year’s Day traffic spike means every new write in January hits Shard 0 exclusively. Shards 1 and 2 are completely idle. This is the temporal hotspot — one of the most common failure modes in production systems using range sharding with timestamps.


Strategy 2: Hash-Based Sharding

The Idea: Apply a deterministic hash function to the shard key and use the result (modulo the shard count) to determine which shard owns the record.

import hashlib

def get_shard_hash(key: str, shard_count: int) -> int:
    hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
    return hash_value % shard_count

users = ["alice", "bob", "charlie", "diana", "eve"]
for user in users:
    shard = get_shard_hash(user, 3)
    print(f"User '{user}' -> Shard {shard}")

The distribution is now pseudo-random. Sequential user IDs no longer map to the same shard. Temporal spikes are scattered across all shards uniformly.

svgbob
User IDs:  1    2    3    4    5    6    7    8    9
           |    |    |    |    |    |    |    |    |
hash(id) % 3:
           v    v    v    v    v    v    v    v    v
           [^2]  [^0]    [^2]  [^0]    [^2]  [^0]
          |    |    |    |    |    |    |    |    |
          v    v    v    v    v    v    v    v    v
        Shard0    Shard1    Shard2   <- even spread

Figure 3.2 shows how hash-based routing scatters consecutive IDs across all three shards.

The Fatal Flaw — Resharding:

# BEFORE: 3 shards
get_shard_hash("alice", 3)    # -> Shard 2

# AFTER adding a 4th shard
get_shard_hash("alice", 4)    # -> Shard 1  <- DIFFERENT SHARD!

When we change shard_count from 3 to 4, almost every record maps to a different shard. For a petabyte-scale system, this is catastrophically expensive. This limitation drove the distributed systems community to invent Consistent Hashing.


Strategy 3: Directory-Based Sharding

The Idea: Maintain an explicit lookup table that maps each key to a specific shard. No mathematical formula — just a direct lookup.

class ShardDirectory:
    def __init__(self):
        self.directory = {}

    def assign(self, key: str, shard_id: int):
        self.directory[key] = shard_id

    def lookup(self, key: str) -> int:
        return self.directory.get(key, hash(key) % 3)

shard_dir = ShardDirectory()
shard_dir.assign("tenant_netflix", 0)
shard_dir.assign("tenant_spotify", 1)
shard_dir.assign("tenant_airbnb", 2)

print(shard_dir.lookup("tenant_netflix"))  # -> 0

Power: Full manual control. A “whale” tenant generating 40% of your traffic can be isolated to its own dedicated shard. Weakness: The directory itself becomes a single point of failure and a performance bottleneck — every query must consult it first.

Strategy Comparison

Strategy Even Distribution Range Queries Easy Resharding Hotspot Control
Range-Based ⚠️ Risky ✅ Excellent ✅ Easy ❌ Temporal hotspots
Hash-Based ✅ Great ❌ Scatter-gather ❌ Costly ✅ Good
Directory-Based ✅ Manual ✅ Possible ✅ Flexible ✅ Full control

Part 4 — Consistent Hashing: The Elegant Fix

The Hash Ring

Imagine a circular ring with positions numbered from 0 to 2^32. We place both nodes and keys onto this ring by hashing them. A key is “owned” by the first node encountered moving clockwise from the key’s position on the ring. Consistent Hashing was introduced by Karger et al. at MIT in 1997.

svgbob
                  0 / 2^32
                     *
              *             *
         Node_A               Node_C
       (hash=10)             (hash=80)
          *                     *
              
          Key_1                Key_3
        (hash=25)             (hash=70)
              
              *             *
               Node_B
              (hash=50)
                     *
                  (180)

In Figure 4.1, moving clockwise: Key_1 (hash=25) → first node clockwise is Node_B (hash=50). Key_3 (hash=70) → first node clockwise is Node_C (hash=80). When we add a new Node_D at hash=60, only the keys between Node_B and Node_D need to move. Key_1 is completely unaffected — we’ve reduced data movement from nearly everything to just a fraction.

Python Implementation

import hashlib
import bisect
from typing import Optional

class ConsistentHashRing:
    def __init__(self, virtual_nodes: int = 150):
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode:{i}"
            position = self._hash(virtual_key)
            self.ring[position] = node
            bisect.insort(self.sorted_keys, position)

    def remove_node(self, node: str):
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode:{i}"
            position = self._hash(virtual_key)
            if position in self.ring:
                del self.ring[position]
                self.sorted_keys.remove(position)

    def get_node(self, key: str) -> Optional[str]:
        if not self.ring:
            return None
        key_hash = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, key_hash)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around the ring
        return self.ring[self.sorted_keys[idx]]

# Usage
ring = ConsistentHashRing(virtual_nodes=150)
ring.add_node("shard-0")
ring.add_node("shard-1")
ring.add_node("shard-2")

for key in [f"user:{i}" for i in range(5)]:
    print(f"{key:15} -> {ring.get_node(key)}")

Notice the virtual_nodes parameter. Each physical shard gets virtual_nodes phantom positions scattered around the ring, averaging out the hash distribution and preventing any one physical node from owning a disproportionately large arc.


Part 5 — Hotspots: A Taxonomy of the Enemy

Before we can mitigate hotspots, we need to precisely understand what kind we’re dealing with. Not all hotspots are created equal, and the wrong treatment can make things worse.

Type 1: Temporal Hotspots (Write-Heavy)

Cause: The shard key is time-based (e.g., created_at timestamp). All new writes go to the “most recent” shard. The “current” shard absorbs 100% of writes while all historical shards are read-only artifacts.

Signature: One shard has CPU/IO spiking at the same time others flatline. The hot shard is always the “newest” one.

Type 2: Popularity Hotspots (Read-Heavy)

Cause: A small set of keys receives a vastly disproportionate number of reads. Think: a viral tweet, a celebrity’s profile page, a trending product. This is driven by Zipf’s Law — real-world data access is inherently skewed.

svgbob
User Activity Distribution (Zipf's Law)
                  
 Reads/sec
    ^
    |  *
    |   *
    |    *
    |      *
    |         *
    |              *
    |                    *  *  *  *  *
    +-----------------------------------------> User Rank
       Top 1%              Long Tail
       (Celebrity)         (Normal Users)

Figure 5.1 shows the classic Zipf distribution. The top 1% of keys account for the majority of reads. Any sharding strategy mapping these to a single shard without mitigation creates a hotspot.

Type 3: Skewed Key Distribution (Structural Hotspot)

Cause: The shard key itself has a non-uniform distribution — sharding by last_name when 40% of your users have names starting with “S,” or sharding by country_code when 70% of users are from the US.

Signature: One shard permanently holds 3x–10x more data than peers. The hotspot is baked into the data structure itself.


Part 6 — Hotspot Mitigation Techniques

We cover five practical techniques, ordered from “easy to add to an existing system” to “requires architectural changes.”

Technique 1: Virtual Nodes — Fix for Structural Imbalance

Without virtual nodes, each physical shard owns one contiguous arc of the hash ring. Uneven hash distribution means some shards own larger arcs — and thus more keys.

svgbob
WITHOUT Virtual Nodes (3 physical nodes, uneven arcs):

   *----*-----------*
   |                |    <- Node_A owns 45% of ring
   *   ring         *
   |                |
   *-*--------------*
  Node_B  Node_C
  (15%)   (40%)

WITH Virtual Nodes (each node has 3 vnodes, balanced arcs):

   A1  B1  C1  A2  B2  C2  A3  B3  C3  ...
   *---*---*---*---*---*---*---*---*---
   ^           ^           ^
 Node_A      Node_B      Node_C  (each ~33%)

Figure 6.1 contrasts the two approaches. With virtual nodes, each physical shard is represented by many small, evenly distributed positions around the ring. The typical production sweet spot is 100–200 virtual nodes per physical node.


Technique 2: Shard Key Salting — Fix for Temporal Hotspots

The Problem: A viral event causes millions of writes per second all targeting the same key. All those writes hit the same shard.

The Fix: Add a random salt suffix to the shard key, spreading writes across multiple virtual copies of that key.

import random
import hashlib

def get_salted_shard(user_id: int, salt_factor: int, shard_count: int) -> str:
    """
    salt_factor: Number of virtual copies to spread writes across.
    Reads must query ALL virtual copies and merge (fan-out read).
    """
    salt = random.randint(0, salt_factor - 1)
    salted_key = f"{user_id}_{salt}"
    hash_val = int(hashlib.md5(salted_key.encode()).hexdigest(), 16)
    shard = hash_val % shard_count
    return f"shard-{shard} (key: {salted_key})"

# Same celebrity_id, written 5 times — distributed across shards
celebrity_id = 42
for _ in range(5):
    print(get_salted_shard(celebrity_id, salt_factor=8, shard_count=4))
# Output: different shards each time

The trade-off: Writes are beautifully distributed, but reads become expensive. To read all data for user_id=42, we must query all salt variants (42_0 through 42_7) across potentially different shards, then merge — a fan-out read. This makes sense for write-heavy hot keys, not for frequently read small records.


Technique 3: Hot Key Isolation — Dedicated Shards

The Problem: Your top 50 celebrity accounts generate 30% of all traffic. Read-heavy celebrities still punish a single shard even after salting.

The Fix: Move hot keys to dedicated shards — often called whale shards. You explicitly over-provision resources for these keys using the directory-based approach applied surgically.

class AdaptiveShardRouter:
    HOT_THRESHOLD = 10_000  # reads/sec considered "hot"

    def __init__(self, default_shard_count: int):
        self.default_shard_count = default_shard_count
        self.hot_key_registry = {}
        self.access_counter = {}

    def record_access(self, key: str):
        self.access_counter[key] = self.access_counter.get(key, 0) + 1
        if self.access_counter[key] > self.HOT_THRESHOLD:
            self._promote_to_hot_shard(key)

    def _promote_to_hot_shard(self, key: str):
        if key not in self.hot_key_registry:
            dedicated_shard = f"hot-shard-{len(self.hot_key_registry)}"
            self.hot_key_registry[key] = dedicated_shard
            print(f"[HotKey] '{key}' promoted to: {dedicated_shard}")

    def route(self, key: str) -> str:
        if key in self.hot_key_registry:
            return self.hot_key_registry[key]
        return f"shard-{hash(key) % self.default_shard_count}"

This pattern mirrors Amazon DynamoDB’s Adaptive Capacity, which automatically detects hot partitions and reallocates throughput capacity to them in real time.


Technique 4: Caching Layer — The Read Hotspot Firewall

Even with perfect write distribution, read hotspots persist. If a celebrity’s profile page is fetched 500,000 times per second, no number of shards will be enough if each fetch hits the database. The fix is a distributed cache (Redis, Memcached) between the application and the shard layer.

svgbob
Request Flow with Cache Layer:

Client Request
      |
      v
+-------------+    HIT     +--------------------+
|  App Server |----------->|   Redis Cache       |
|             |<-----------| (in-memory, ~1ms)   |
+-------------+  CACHED    +--------------------+
      |
      | MISS (first request or TTL expired)
      v
+----------------+
|  Shard Router  |
+----------------+
    |    |    |
    v    v    v
  [S0] [S1] [S2]  <- only cache misses reach shards

In Figure 6.2, the cache acts as a firewall. In steady state for a celebrity account, 99.9% of read traffic is absorbed by Redis. The shard only serves the initial population and periodic TTL refreshes — exactly how Twitter, Instagram, and Facebook handle celebrity traffic.

Beware the Cache Stampede (Thundering Herd): When a cached key expires and thousands of simultaneous cache misses all rush to the database at once. Mitigate this with the XFetch probabilistic early-expiry algorithm:

import random

def should_early_refresh(ttl: float, current_age: float, beta: float = 1.0) -> bool:
    """
    Returns True if we should proactively refresh the cache now,
    before the actual TTL expires. Avoids stampede on expiry.
    beta > 1 refreshes more aggressively; beta < 1 waits longer.
    """
    remaining = ttl - current_age
    gap = -beta * ttl * (random.random() ** 0.5)
    return gap > remaining

# 200s TTL, 180s have passed -> likely True (refresh proactively)
print(should_early_refresh(ttl=200, current_age=180, beta=1.0))

Technique 5: Read Replicas + Fan-Out — Scaling Reads Independently

Sometimes hotspots are unavoidable by design — a product page during a flash sale receives millions of reads per minute for the same product_id. Writes go to the primary (leader); reads are distributed across replicas via round-robin or least-connection routing.

svgbob
Hot Shard Architecture with Read Replicas:

              Writes
                |
                v
        +--------------+
        | Primary Shard |  <- only write target
        +--------------+
         /     |      \
        /      |       \   async replication
       v       v        v
  +--------+ +--------+ +--------+
  |Replica | |Replica | |Replica |
  |   0    | |   1    | |   2    |
  +--------+ +--------+ +--------+
      ^           ^           ^
      |  Reads distributed    |
      +-------+-------+-------+
                  |
             Read Router
             (Round Robin)

Figure 6.3 shows how read throughput scales linearly with the number of replicas added. The trade-off is replication lag — in asynchronous replication, replicas may be milliseconds to seconds behind the primary. Applications requiring strong consistency (read-your-writes) must route those specific operations to the primary.


Part 7 — Choosing a Shard Key: The Art Behind the Science

Everything hinges on shard key selection. A bad shard key will create hotspots that no downstream mitigation can fully contain.

The Four Properties of a Great Shard Key

The Composite Shard Key Pattern

When a single column can’t satisfy all four properties, we combine columns into a composite shard key. This is exactly how Apache Cassandra’s data model works:

def composite_shard_key(user_id: int, content_type: str) -> str:
    """
    Combines user_id + content_type so all content for a user
    is co-located on the same shard — enabling efficient feed queries.
    """
    return f"{user_id}:{content_type}"

key1 = composite_shard_key(user_id=1001, content_type="post")
key2 = composite_shard_key(user_id=1001, content_type="comment")
# key1 and key2 hash to the same shard -> single-shard user feed query

The partition key (first part) determines the shard; the clustering key (remaining columns) determines the order within the shard. This gives you both uniform distribution and efficient range queries within each partition.


Part 8 — A Decision Framework

When you encounter a sharding problem in a system design interview, work through this decision tree:

svgbob
Start: Does your dataset require sharding?
              |
              v
     Is access pattern read-heavy?
      /                  \
    YES                   NO (write-heavy)
     |                    |
     v                    v
  Add Read Replicas    Is the write pattern
  + Cache Layer        time-series/sequential?
     |                  /         \
     |                YES          NO
     |                 |            |
     |                 v            v
     |          Range Sharding  Hash-Based Sharding
     |          + Key Salting   (Consistent Hash)
     |                 \           /
     |                  v         v
     +--------------> Monitor for Hotspots
                             |
                             v
                   Hotspot Detected?
                    /            \
                  YES             NO
                   |               |
                   v               v
            Isolate Hot Key    Keep monitoring.
            to Dedicated Shard

In Figure 8.1, notice that monitoring is never optional — it’s a permanent step in the loop. Hotspots are dynamic: a key that’s cold today can become viral tomorrow.

The Three Metrics to Watch

def detect_hotspot(shard_metrics: dict, threshold_ratio: float = 2.0) -> list:
    """shard_metrics: { shard_id: requests_per_second }"""
    if not shard_metrics:
        return []
    avg_load = sum(shard_metrics.values()) / len(shard_metrics)
    return [
        shard_id
        for shard_id, rps in shard_metrics.items()
        if rps > avg_load * threshold_ratio
    ]

metrics = {
    "shard-0": 1200,
    "shard-1": 1100,
    "shard-2": 8900,   # <- hot!
    "shard-3": 950,
}
print(detect_hotspot(metrics, threshold_ratio=2.0))
# Output: ['shard-2']

Part 9 — Real-World Case Study: Discord’s Message Storage

In 2017, Discord published a case study about migrating their message storage from MongoDB to Apache Cassandra. It’s a masterclass in sharding gone wrong and then done right.

Phase 1 (MongoDB, sharded by channel_id): Discord sharded by channel ID because almost all queries were “get recent messages in channel X.” This gave perfect query locality. But popular channels — large public servers — became brutal hotspots. Millions of users reading the same channel at the same time overwhelmed those shards.

Phase 2 (Cassandra, composite key (channel_id, message_id)): By moving to Cassandra with a composite key, they got:

The critical insight: sharding by channel_id alone was correct for query patterns but wrong for load distribution. The composite key plus caching handled both problems simultaneously.


Part 10 — Interview Cheat Sheet

For your next system design interview, always hit these points when sharding comes up:

State the trade-offs explicitly:

Always address shard key selection criteria: cardinality, uniformity, stability, query locality.

Winning answers to common follow-up questions:


Closing Thoughts: Balance Is the Goal

Sharding is fundamentally an act of diplomacy — you’re negotiating between conflicting demands: query performance wants co-location; load balance wants distribution; operational simplicity wants fewer moving parts; fault tolerance wants redundancy.

There is no universally correct sharding strategy. The right answer always depends on your access patterns, your write-to-read ratio, your data distribution, and your growth trajectory. The engineer who understands why each strategy exists — and what each one sacrifices — will always outperform the one who has merely memorized the techniques.

And the next time you’re standing in a grocery checkout line watching one lane stack up while others sit empty, you’ll know exactly what’s happening: someone chose the wrong shard key.