January 27, 2023
Let's say you want to compute the distribution of a set of samples, but the number of samples is too large to fit into memory. Specifically, we want to calculate the CDF of the distribution, and use the inverse CDF to find the sample value at a given percentile. One common use case is latency monitoring: we often want to look at percentiles like p90, but have too many requests to compute an exact answer.
The first way I thought of to tackle this problem is bucketing. We can define buckets over the range where our samples lie, creating new buckets if we see a sample that doesn't fall within any existing bucket. Then we can calculate any percentile by constructing a CDF (via a prefix sum over the bucket counts), as in the sketch below.
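Here's a rough sketch of that approach (bucketed_percentile is just an illustrative helper, not from any library; it's shown on in-memory samples for brevity, though a streaming version would update the counts as samples arrive):

import numpy as np

def bucketed_percentile(samples: np.ndarray, num_buckets: int, p: float) -> float:
    # Equal-width buckets over the observed range of the samples.
    counts, edges = np.histogram(samples, bins=num_buckets)
    cdf = np.cumsum(counts) / counts.sum()      # prefix sum -> CDF
    idx = np.searchsorted(cdf, p)               # first bucket whose CDF reaches p
    return edges[idx + 1]                       # its upper edge approximates the percentile

samples = np.random.uniform(0, 10000, 10000)
print(bucketed_percentile(samples, 100, 0.9))   # roughly 9000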
There are a few issues with this approach. First, towards the edges of our histogram we want smaller buckets, so that we don't average outliers together within one bucket [1]. Second, our histogram buckets are fixed once created. As an example, say we have a bucket centered at $x=1$ with a width of 6. If we feed in a stream containing only 3s, our histogram will report the center of those values as 1, but the true center is at 3. Is there a way to determine both the size and the location of a histogram bucket while ingesting a stream of requests?
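To see that second problem concretely, here it is in NumPy, with a single fixed bucket spanning $[-2, 4]$ (so its center is 1):

import numpy as np

# 100 samples, all equal to 3, land in one fixed bucket covering [-2, 4]:
counts, edges = np.histogram([3] * 100, bins=[-2, 4])
centers = (edges[:-1] + edges[1:]) / 2
print(counts[0], centers[0])  # 100 1.0 -- the bucket says "1", the data says 3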
From this question came $t$-digest, which I like to think of as a dynamic histogram bucketing technique. Assume we have a histogram of values, where each bucket is a cluster of points, and say we're adding a new sample. We define a bound function that checks whether the cluster we're adding to would become too big. If so, we split the cluster into two. Notice that this already solves the problem of choosing the location of a histogram bucket, since we don't predefine cluster locations.
If we define our bound function carefully, we can also solve the problem of having smaller buckets near the edges. One such definition is $B(x) = \text{Compression} \cdot \text{TotalMass} \cdot \text{Quantile}(x)\,(1 - \text{Quantile}(x))$. In the middle of the distribution, the quantile is around 0.5, which maximizes the bound, so central clusters are allowed to grow large; near the edges the quantile approaches 0 or 1 and the bound shrinks, keeping edge clusters small. Additionally, as we add more samples to our histogram, we allow all bounds to grow (via the TotalMass factor). The Compression factor is an adjustable constant.
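To get a feel for the shape of this bound, here's a quick evaluation (the numbers assume 1000 samples and a compression of 3):

def bound(q: float, compression: float, total_mass: float) -> float:
    return compression * total_mass * q * (1 - q)

# A cluster near the median may hold far more mass than one near the edges:
print(bound(0.5, 3, 1000))    # 750.0
print(bound(0.01, 3, 1000))   # 29.7
print(bound(0.001, 3, 1000))  # ~3.0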
Putting it all together, here's the sample code:
from collections import namedtuple
from typing import List
import random
import numpy as np
import matplotlib.pyplot as plt

Cluster = namedtuple('Cluster', ['x', 'mass'])

def weighted_mean(x1: float, m1: float, x2: float, m2: float) -> float:
    if m1 + m2 == 0:
        return 0
    return (x1 * m1 + x2 * m2) / (m1 + m2)

class TDigest:
    def __init__(self, clusters: List[Cluster] = None, compression: float = 3) -> None:
        if clusters is None:
            clusters = []
        self.clusters = clusters
        self.compression = compression
        self.total_mass = sum(cluster.mass for cluster in clusters)

    def _get_closest_idx(self, x: float) -> int:
        # Linear scan for the cluster whose center is closest to x.
        closest_idx = 0
        closest = float('inf')
        for i, cluster in enumerate(self.clusters):
            if abs(cluster.x - x) < closest:
                closest = abs(cluster.x - x)
                closest_idx = i
        return closest_idx

    def quantile(self, x: float) -> float:
        # All mass strictly below the closest cluster, plus half of that
        # cluster's own mass, as a fraction of the total mass.
        smaller_mass = 0
        closest_cluster = self.clusters[self._get_closest_idx(x)]
        for cluster in self.clusters:
            if cluster.x < closest_cluster.x:
                smaller_mass += cluster.mass
            elif cluster.x == closest_cluster.x:
                smaller_mass += cluster.mass / 2
        return smaller_mass / self.total_mass

    def bound(self, x: float) -> float:
        q = self.quantile(x)
        return self.compression * self.total_mass * q * (1 - q)

    def update(self, x: float, mass: float = 1) -> None:
        if not self.clusters:
            self.clusters.append(Cluster(x, mass))
            self.total_mass += mass
            return
        closest_idx = self._get_closest_idx(x)
        cluster = self.clusters[closest_idx]
        bound = self.bound(cluster.x)
        if mass + cluster.mass > bound:
            # Split: fill the closest cluster up to its bound and spill the
            # remaining mass into a new cluster at x.
            self.clusters[closest_idx] = Cluster(
                weighted_mean(cluster.x, cluster.mass, x, bound - cluster.mass), bound)
            self.clusters.append(Cluster(x, mass + cluster.mass - bound))
        else:
            # Don't split: fold the new sample into the closest cluster.
            self.clusters[closest_idx] = Cluster(
                weighted_mean(cluster.x, cluster.mass, x, mass), mass + cluster.mass)
        self.total_mass += mass
Notice that our quantile will never be exactly 0 or 1, as we always snap to a cluster and count half of its mass.
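To make that half-mass rule concrete, here's a tiny hand-built digest:

td = TDigest(clusters=[Cluster(1, 4), Cluster(5, 2), Cluster(9, 4)])
print(td.quantile(5))  # (4 + 2/2) / 10 = 0.5
print(td.quantile(0))  # (4/2) / 10 = 0.2, not 0: half of the closest cluster counts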
Let's build the T-Digest of the uniform distribution from 0 to 10000, sampling 10000 times. As a sanity check, the quantile at 9000 should be around 90%.
n = 10000

def generate_tdigest(distribution: np.ndarray) -> TDigest:
    tdigest = TDigest()
    for sample in distribution:
        tdigest.update(sample)
    return tdigest

tdigest = generate_tdigest(np.random.uniform(0, n, n))
print(tdigest.quantile(n * .9))
0.9111327199841893
Nice! To generalize the testing, we can compare the CDF of our T-Digest with that of the uniform distribution.
def plot_cdf(tdigests: List[TDigest], n: int) -> None:
    x = []
    for tdigest in tdigests:
        x.append([tdigest.quantile(i) for i in range(n)])
    fig, ax = plt.subplots(figsize=(8, 4))
    # plot the cumulative histograms
    for i in range(len(x)):
        ax.hist(x[i], n, histtype='step', cumulative=True, label=f'TDigest {i}')
    ax.hist(np.linspace(0, 1, n), n, histtype='step', cumulative=True, label='Ground Truth')
    # tidy up the figure
    ax.grid(True)
    ax.legend(loc='right')
    ax.set_title('CDF')
    ax.set_xlabel('Quantile')
    ax.set_ylabel('Total Mass')
    plt.show()

plot_cdf([tdigest], n)
This is exactly what we want: our T-Digest approximates the uniform distribution, with closer estimates at the edges of the data (which are the points we care about). We can also visualize the cluster sizes, which look like a bell curve:
cluster_x = lambda cluster: cluster.x
tdigest.clusters.sort(key=cluster_x)

num_clusters = len(tdigest.clusters)
ind = np.arange(num_clusters)
width = 0.35
plt.bar(ind, [cluster.mass for cluster in tdigest.clusters], width, label='Cluster Mass')
plt.ylabel('Size')
plt.xlabel('Cluster')
plt.title('Cluster Size')
plt.legend(loc='best')
plt.show()
Oftentimes, we have multiple servers, each with its own T-Digest, but we want to aggregate them into one T-Digest (e.g. to get the overall latency of a system). We can already do a merge with our update(x, mass) function: simply go through all the clusters of one T-Digest and add them to another.
However, note that the order in which we call self.update matters: calling update(1), update(2), update(3), ... update(n) will produce a different result than a random permutation of 1 to n.
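A quick way to see this order dependence, reusing the TDigest class from above:

td_inorder = TDigest()
for v in range(100):
    td_inorder.update(v)

td_shuffled = TDigest()
vals = list(range(100))
random.shuffle(vals)
for v in vals:
    td_shuffled.update(v)

# The two digests hold the same data but (typically) different clusters:
print(len(td_inorder.clusters), len(td_shuffled.clusters))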
This is important when merging T-Digests, where we want the result to be as accurate as possible. A small example of the difference between random merging and merging in sorted order is shown here:
def merge(tdigest: TDigest, merge_tdigest: TDigest) -> None:
    # Random-order merge: feed clusters in whatever order they're stored.
    for cluster in merge_tdigest.clusters:
        tdigest.update(cluster.x, cluster.mass)

def merge_sorted(tdigest: TDigest, merge_tdigest: TDigest) -> TDigest:
    # Sorted merge: feed clusters from both digests into a fresh digest
    # in globally increasing order of cluster center.
    new_tdigest = TDigest()
    tdigest.clusters.sort(key=cluster_x)
    merge_tdigest.clusters.sort(key=cluster_x)
    tdigest_idx = 0
    merge_tdigest_idx = 0
    while tdigest_idx < len(tdigest.clusters) and merge_tdigest_idx < len(merge_tdigest.clusters):
        if tdigest.clusters[tdigest_idx].x <= merge_tdigest.clusters[merge_tdigest_idx].x:
            new_tdigest.update(tdigest.clusters[tdigest_idx].x, tdigest.clusters[tdigest_idx].mass)
            tdigest_idx += 1
        else:
            new_tdigest.update(merge_tdigest.clusters[merge_tdigest_idx].x, merge_tdigest.clusters[merge_tdigest_idx].mass)
            merge_tdigest_idx += 1
    while tdigest_idx < len(tdigest.clusters):
        new_tdigest.update(tdigest.clusters[tdigest_idx].x, tdigest.clusters[tdigest_idx].mass)
        tdigest_idx += 1
    while merge_tdigest_idx < len(merge_tdigest.clusters):
        new_tdigest.update(merge_tdigest.clusters[merge_tdigest_idx].x, merge_tdigest.clusters[merge_tdigest_idx].mass)
        merge_tdigest_idx += 1
    return new_tdigest
tdigest = generate_tdigest(np.random.uniform(0,n,n))
merge(tdigest, generate_tdigest(np.random.uniform(0,n,n)))
tdigest_sorted = generate_tdigest(np.random.uniform(0,n,n))
tdigest_sorted = merge_sorted(tdigest_sorted, generate_tdigest(np.random.uniform(0,n,n)))
plot_cdf([tdigest, tdigest_sorted], n)
It isn't really clear from this plot what's going on, so let's look at the numbers:
x1 = [tdigest.quantile(i) for i in range(n)]
x2 = [tdigest_sorted.quantile(i) for i in range(n)]
print('random', np.sum((np.array(x1) - np.linspace(0,1,n)) ** 2), len(tdigest.clusters))
print('sorted', np.sum((np.array(x2) - np.linspace(0,1,n)) ** 2), len(tdigest_sorted.clusters))
random 11.74210879447901 34
sorted 9.55337922247902 22
Sorted merging achieves a lower error with fewer clusters, but this is one data point, so it isn't too conclusive. I recommend looking at this talk [2] for a more thorough analysis.
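That said, one quick way to gather more data points is to average the squared CDF error over several independent trials. Here's a sketch (cdf_error and trial are hypothetical helpers built on the functions above):

def cdf_error(td: TDigest, n: int) -> float:
    # Sum of squared deviations from the true uniform CDF.
    q = np.array([td.quantile(i) for i in range(n)])
    return float(np.sum((q - np.linspace(0, 1, n)) ** 2))

def trial(n: int) -> tuple:
    td_rand = generate_tdigest(np.random.uniform(0, n, n))
    merge(td_rand, generate_tdigest(np.random.uniform(0, n, n)))
    td_sort = merge_sorted(generate_tdigest(np.random.uniform(0, n, n)),
                           generate_tdigest(np.random.uniform(0, n, n)))
    return cdf_error(td_rand, n), cdf_error(td_sort, n)

errors = [trial(1000) for _ in range(10)]
print('random', np.mean([e[0] for e in errors]))
print('sorted', np.mean([e[1] for e in errors]))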
You might've noticed that update is $O(n)$. For simplicity's sake, I made the backing data structure of self.clusters an unsorted array. If the backing data structure were instead a balanced BST, we would be able to compute update in $O(\log n)$.
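Python doesn't ship a balanced BST, but to sketch the idea: if the cluster centers were kept sorted, the closest-cluster search alone drops to $O(\log n)$ with bisect (list insertion is still $O(n)$; a real balanced BST, augmented with subtree mass sums for quantile, would make the whole update logarithmic):

import bisect

def get_closest_idx_sorted(centers: List[float], x: float) -> int:
    # Binary search over sorted cluster centers instead of a linear scan.
    i = bisect.bisect_left(centers, x)
    if i == 0:
        return 0
    if i == len(centers):
        return len(centers) - 1
    # x sits between centers[i - 1] and centers[i]; pick the nearer one.
    return i if centers[i] - x < x - centers[i - 1] else i - 1

print(get_closest_idx_sorted([1.0, 4.0, 9.0], 5.5))  # 1, the index of 4.0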