diff --git a/Cargo.lock b/Cargo.lock index fd82c7a..0bb2460 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -88,11 +88,18 @@ version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "aster-proxy" version = "1.3.4" dependencies = [ "anyhow", + "arc-swap", "async-trait", "axum", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 30118b4..e10e543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ futures = "0.3" md5 = "0.7" rand = "0.8" socket2 = "0.5" +arc-swap = "1" [profile.release] debug = true diff --git a/default.toml b/default.toml index 63c2ad8..f8a42e1 100644 --- a/default.toml +++ b/default.toml @@ -26,6 +26,12 @@ slowlog_log_slower_than = 10000 # 慢查询日志最大保留条数;默认 128 slowlog_max_len = 128 + # 热点 Key 分析参数 + hotkey_sample_every = 32 + hotkey_sketch_width = 4096 + hotkey_sketch_depth = 4 + hotkey_capacity = 512 + hotkey_decay = 0.925 [[clusters]] name = "test-cluster" @@ -58,3 +64,9 @@ slowlog_log_slower_than = 10000 # 慢查询日志最大保留条数;默认 128 slowlog_max_len = 128 + # 热点 Key 分析参数 + hotkey_sample_every = 32 + hotkey_sketch_width = 4096 + hotkey_sketch_depth = 4 + hotkey_capacity = 512 + hotkey_decay = 0.925 diff --git a/docs/usage.md b/docs/usage.md index b63119c..651d747 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -28,10 +28,16 @@ cargo build --release - `read_from_slave`:Cluster 模式下允许从 replica 读取。 - `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。 - `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。 +- `hotkey_sample_every`:热点 Key 采样间隔(默认 `32`,越大代表对请求采样越稀疏)。 +- `hotkey_sketch_width` / `hotkey_sketch_depth`:热点 Key 频率估算所用 Count-Min Sketch 宽度与深度,决定误差与内存占用。 +- `hotkey_capacity`:HeavyKeeper 桶容量,用于保留候选热点 Key 数量上限。 +- 
`hotkey_decay`:HeavyKeeper 衰减系数,取值 `(0, 1]`,越接近 `1` 越倾向保留历史数据。 - `auth` / `password`:前端 ACL,详见下文。 - `backend_auth` / `backend_password`:后端 ACL 认证,详见下文。 > 提示:代理原生支持 `SLOWLOG GET/LEN/RESET`,并按集群维度汇总慢查询;配置上述阈值和长度即可控制记录行为。 +> +> 热点 Key 分析可通过 `HOTKEY ENABLE|DISABLE|GET|RESET` 控制,相关采样参数可在配置文件或 `CONFIG SET cluster.<name>.hotkey-*` 中动态调整。 示例参见仓库根目录的 `default.toml`。 diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index f4aa6c0..01b7170 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator}; use crate::backend::client::{ClientId, FrontConnectionGuard}; use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand}; use crate::config::{ClusterConfig, ClusterRuntime, ConfigManager}; +use crate::hotkey::Hotkey; use crate::info::{InfoContext, ProxyMode}; use crate::metrics; use crate::protocol::redis::{ @@ -54,6 +55,7 @@ pub struct ClusterProxy { runtime: Arc, config_manager: Arc, slowlog: Arc, + hotkey: Arc, listen_port: u16, seed_nodes: usize, } @@ -88,6 +90,9 @@ impl ClusterProxy { let slowlog = config_manager .slowlog_for(&config.name) .ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?; + let hotkey = config_manager + .hotkey_for(&config.name) + .ok_or_else(|| anyhow!("missing hotkey state for cluster {}", config.name))?; let proxy = Self { cluster: cluster.clone(), hash_tag, @@ -100,6 +105,7 @@ impl ClusterProxy { runtime, config_manager, slowlog, + hotkey, listen_port, seed_nodes: config.servers.len(), }; @@ -301,6 +307,18 @@ impl ClusterProxy { inflight += 1; continue; } + if let Some(response) = self.try_handle_hotkey(&cmd) { + let success = !response.is_error(); + metrics::front_command( + self.cluster.as_ref(), + cmd.kind_label(), + success, + ); + let fut = async move { response }; + pending.push_back(Box::pin(fut)); + inflight += 1; + continue; + } if let Some(response) = self.try_handle_slowlog(&cmd) { let success = 
!response.is_error(); metrics::front_command( @@ -367,6 +385,13 @@ impl ClusterProxy { )) } + fn try_handle_hotkey(&self, command: &RedisCommand) -> Option { + if !command.command_name().eq_ignore_ascii_case(b"HOTKEY") { + return None; + } + Some(crate::hotkey::handle_command(&self.hotkey, command.args())) + } + fn try_handle_info(&self, command: &RedisCommand) -> Option { if !command.command_name().eq_ignore_ascii_case(b"INFO") { return None; @@ -675,6 +700,7 @@ impl ClusterProxy { let fetch_trigger = self.fetch_trigger.clone(); let cluster = self.cluster.clone(); let slowlog = self.slowlog.clone(); + let hotkey = self.hotkey.clone(); let kind_label = command.kind_label(); Box::pin(async move { match dispatch_with_context( @@ -685,6 +711,7 @@ impl ClusterProxy { fetch_trigger, client_id, slowlog, + hotkey, command, ) .await @@ -1263,6 +1290,7 @@ async fn dispatch_with_context( fetch_trigger: mpsc::UnboundedSender<()>, client_id: ClientId, slowlog: Arc, + hotkey: Arc, command: RedisCommand, ) -> Result { let command_snapshot = command.clone(); @@ -1292,6 +1320,7 @@ async fn dispatch_with_context( .await }; slowlog.maybe_record(&command_snapshot, started.elapsed()); + hotkey.record_command(&command_snapshot); result } diff --git a/src/config/mod.rs b/src/config/mod.rs index 908f8f8..69b3179 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -11,6 +11,10 @@ use tokio::fs; use tracing::{info, warn}; use crate::auth::{AuthUserConfig, BackendAuthConfig, FrontendAuthConfig}; +use crate::hotkey::{ + Hotkey, HotkeyConfig, DEFAULT_DECAY, DEFAULT_HOTKEY_CAPACITY, DEFAULT_SAMPLE_EVERY, + DEFAULT_SKETCH_DEPTH, DEFAULT_SKETCH_WIDTH, +}; use crate::protocol::redis::{RedisCommand, RespValue}; use crate::slowlog::Slowlog; @@ -27,6 +31,26 @@ fn default_slowlog_max_len() -> usize { 128 } +fn default_hotkey_sample_every() -> u64 { + DEFAULT_SAMPLE_EVERY +} + +fn default_hotkey_sketch_width() -> usize { + DEFAULT_SKETCH_WIDTH +} + +fn default_hotkey_sketch_depth() -> usize { + 
DEFAULT_SKETCH_DEPTH +} + +fn default_hotkey_capacity() -> usize { + DEFAULT_HOTKEY_CAPACITY +} + +fn default_hotkey_decay() -> f64 { + DEFAULT_DECAY +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Config { #[serde(default)] @@ -143,6 +167,16 @@ pub struct ClusterConfig { pub slowlog_log_slower_than: i64, #[serde(default = "default_slowlog_max_len")] pub slowlog_max_len: usize, + #[serde(default = "default_hotkey_sample_every")] + pub hotkey_sample_every: u64, + #[serde(default = "default_hotkey_sketch_width")] + pub hotkey_sketch_width: usize, + #[serde(default = "default_hotkey_sketch_depth")] + pub hotkey_sketch_depth: usize, + #[serde(default = "default_hotkey_capacity")] + pub hotkey_capacity: usize, + #[serde(default = "default_hotkey_decay")] + pub hotkey_decay: f64, } impl ClusterConfig { @@ -180,6 +214,21 @@ impl ClusterConfig { self.name ); } + if self.hotkey_sample_every == 0 { + bail!("cluster {} hotkey_sample_every must be > 0", self.name); + } + if self.hotkey_sketch_width == 0 { + bail!("cluster {} hotkey_sketch_width must be > 0", self.name); + } + if self.hotkey_sketch_depth == 0 { + bail!("cluster {} hotkey_sketch_depth must be > 0", self.name); + } + if self.hotkey_capacity == 0 { + bail!("cluster {} hotkey_capacity must be > 0", self.name); + } + if !(self.hotkey_decay > 0.0 && self.hotkey_decay <= 1.0) { + bail!("cluster {} hotkey_decay must be in (0, 1]", self.name); + } Ok(()) } @@ -310,6 +359,7 @@ struct ClusterEntry { index: usize, runtime: Arc, slowlog: Arc, + hotkey: Arc, } #[derive(Debug)] @@ -332,12 +382,21 @@ impl ConfigManager { cluster.slowlog_log_slower_than, cluster.slowlog_max_len, )); + let hotkey_config = HotkeyConfig { + sample_every: cluster.hotkey_sample_every, + sketch_width: cluster.hotkey_sketch_width, + sketch_depth: cluster.hotkey_sketch_depth, + capacity: cluster.hotkey_capacity, + decay: cluster.hotkey_decay, + }; + let hotkey = Arc::new(Hotkey::new(hotkey_config)); clusters.insert( key, ClusterEntry { 
index, runtime, slowlog, + hotkey, }, ); } @@ -361,6 +420,12 @@ impl ConfigManager { .map(|entry| entry.slowlog.clone()) } + pub fn hotkey_for(&self, name: &str) -> Option> { + self.clusters + .get(&name.to_ascii_lowercase()) + .map(|entry| entry.hotkey.clone()) + } + pub async fn handle_command(&self, command: &RedisCommand) -> Option { if !command.command_name().eq_ignore_ascii_case(b"CONFIG") { return None; @@ -483,6 +548,61 @@ impl ConfigManager { "cluster slowlog_max_len updated via CONFIG SET" ); } + ClusterField::HotkeySampleEvery => { + let parsed = parse_hotkey_sample_every(value)?; + entry.hotkey.update_config(|cfg| cfg.sample_every = parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].hotkey_sample_every = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster hotkey_sample_every updated via CONFIG SET" + ); + } + ClusterField::HotkeySketchWidth => { + let parsed = parse_hotkey_sketch_width(value)?; + entry.hotkey.update_config(|cfg| cfg.sketch_width = parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].hotkey_sketch_width = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster hotkey_sketch_width updated via CONFIG SET" + ); + } + ClusterField::HotkeySketchDepth => { + let parsed = parse_hotkey_sketch_depth(value)?; + entry.hotkey.update_config(|cfg| cfg.sketch_depth = parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].hotkey_sketch_depth = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster hotkey_sketch_depth updated via CONFIG SET" + ); + } + ClusterField::HotkeyCapacity => { + let parsed = parse_hotkey_capacity(value)?; + entry.hotkey.update_config(|cfg| cfg.capacity = parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].hotkey_capacity = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster hotkey_capacity updated via CONFIG SET" + ); + } + 
ClusterField::HotkeyDecay => { + let parsed = parse_hotkey_decay(value)?; + entry.hotkey.update_config(|cfg| cfg.decay = parsed); + let mut guard = self.config.write(); + guard.clusters_mut()[entry.index].hotkey_decay = parsed; + info!( + cluster = cluster_name, + value = value, + "cluster hotkey_decay updated via CONFIG SET" + ); + } } Ok(()) } @@ -519,6 +639,27 @@ impl ConfigManager { format!("cluster.{}.slowlog-max-len", name), entry.slowlog.max_len().to_string(), )); + let hotkey_cfg = entry.hotkey.config(); + entries.push(( + format!("cluster.{}.hotkey-sample-every", name), + hotkey_cfg.sample_every.to_string(), + )); + entries.push(( + format!("cluster.{}.hotkey-sketch-width", name), + hotkey_cfg.sketch_width.to_string(), + )); + entries.push(( + format!("cluster.{}.hotkey-sketch-depth", name), + hotkey_cfg.sketch_depth.to_string(), + )); + entries.push(( + format!("cluster.{}.hotkey-capacity", name), + hotkey_cfg.capacity.to_string(), + )); + entries.push(( + format!("cluster.{}.hotkey-decay", name), + hotkey_cfg.decay.to_string(), + )); } } entries.sort_by(|a, b| a.0.cmp(&b.0)); @@ -555,6 +696,11 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> { "write-timeout" => ClusterField::WriteTimeout, "slowlog-log-slower-than" => ClusterField::SlowlogLogSlowerThan, "slowlog-max-len" => ClusterField::SlowlogMaxLen, + "hotkey-sample-every" => ClusterField::HotkeySampleEvery, + "hotkey-sketch-width" => ClusterField::HotkeySketchWidth, + "hotkey-sketch-depth" => ClusterField::HotkeySketchDepth, + "hotkey-capacity" => ClusterField::HotkeyCapacity, + "hotkey-decay" => ClusterField::HotkeyDecay, unknown => bail!("unknown cluster field '{}'", unknown), }; Ok((cluster.to_string(), field)) @@ -593,6 +739,61 @@ fn parse_slowlog_len(value: &str) -> Result { usize::try_from(parsed).map_err(|_| anyhow!("slowlog-max-len is too large")) } +fn parse_hotkey_sample_every(value: &str) -> Result { + let parsed: u64 = value + .trim() + .parse() + .with_context(|| 
format!("invalid hotkey-sample-every value '{}'", value))?; + if parsed == 0 { + bail!("hotkey-sample-every must be > 0"); + } + Ok(parsed) +} + +fn parse_hotkey_sketch_width(value: &str) -> Result { + let parsed: usize = value + .trim() + .parse() + .with_context(|| format!("invalid hotkey-sketch-width value '{}'", value))?; + if parsed == 0 { + bail!("hotkey-sketch-width must be > 0"); + } + Ok(parsed) +} + +fn parse_hotkey_sketch_depth(value: &str) -> Result { + let parsed: usize = value + .trim() + .parse() + .with_context(|| format!("invalid hotkey-sketch-depth value '{}'", value))?; + if parsed == 0 { + bail!("hotkey-sketch-depth must be > 0"); + } + Ok(parsed) +} + +fn parse_hotkey_capacity(value: &str) -> Result { + let parsed: usize = value + .trim() + .parse() + .with_context(|| format!("invalid hotkey-capacity value '{}'", value))?; + if parsed == 0 { + bail!("hotkey-capacity must be > 0"); + } + Ok(parsed) +} + +fn parse_hotkey_decay(value: &str) -> Result { + let parsed: f64 = value + .trim() + .parse() + .with_context(|| format!("invalid hotkey-decay value '{}'", value))?; + if !(parsed > 0.0 && parsed <= 1.0) { + bail!("hotkey-decay must be in (0, 1]"); + } + Ok(parsed) +} + fn option_to_string(value: Option) -> String { value .map(|v| v.to_string()) @@ -618,6 +819,11 @@ enum ClusterField { WriteTimeout, SlowlogLogSlowerThan, SlowlogMaxLen, + HotkeySampleEvery, + HotkeySketchWidth, + HotkeySketchDepth, + HotkeyCapacity, + HotkeyDecay, } fn wildcard_match(pattern: &str, target: &str) -> bool { diff --git a/src/hotkey.rs b/src/hotkey.rs new file mode 100644 index 0000000..193c469 --- /dev/null +++ b/src/hotkey.rs @@ -0,0 +1,641 @@ +use std::cmp::Ordering as CmpOrdering; +use std::hash::{Hash, Hasher}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; + +use arc_swap::{ArcSwap, ArcSwapOption}; +use bytes::Bytes; +use hashbrown::HashMap; + +use crate::protocol::redis::{RedisCommand, RespValue}; + +pub const 
DEFAULT_SAMPLE_EVERY: u64 = 32; +pub const DEFAULT_SKETCH_WIDTH: usize = 4096; +pub const DEFAULT_SKETCH_DEPTH: usize = 4; +pub const DEFAULT_HOTKEY_CAPACITY: usize = 512; +pub const DEFAULT_DECAY: f64 = 0.925; +const HEAVY_BUCKET_SIZE: usize = 8; +const RNG_SEED: u64 = 0x9E37_79B9_7F4A_7C15; + +#[derive(Clone, Copy, Debug)] +pub struct HotkeyConfig { + pub sample_every: u64, + pub sketch_width: usize, + pub sketch_depth: usize, + pub capacity: usize, + pub decay: f64, +} + +impl Default for HotkeyConfig { + fn default() -> Self { + Self { + sample_every: DEFAULT_SAMPLE_EVERY, + sketch_width: DEFAULT_SKETCH_WIDTH, + sketch_depth: DEFAULT_SKETCH_DEPTH, + capacity: DEFAULT_HOTKEY_CAPACITY, + decay: DEFAULT_DECAY, + } + } +} + +#[derive(Clone, Debug)] +pub struct HotkeySample { + pub key: Bytes, + pub estimated_hits: u64, + pub error: u64, +} + +pub struct Hotkey { + enabled: AtomicBool, + core: ArcSwap, +} + +impl std::fmt::Debug for Hotkey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let core = self.core.load(); + f.debug_struct("Hotkey") + .field("enabled", &self.enabled.load(Ordering::Relaxed)) + .field("config", &core.config()) + .finish() + } +} + +impl Hotkey { + pub fn new(config: HotkeyConfig) -> Self { + let core = Arc::new(HotkeyCore::new(config)); + Self { + enabled: AtomicBool::new(false), + core: ArcSwap::from(core), + } + } + + pub fn new_default() -> Self { + Self::new(HotkeyConfig::default()) + } + + pub fn enable(&self) { + self.enabled.store(true, Ordering::Relaxed); + } + + pub fn disable(&self) { + self.enabled.store(false, Ordering::Relaxed); + } + + pub fn is_enabled(&self) -> bool { + self.enabled.load(Ordering::Relaxed) + } + + pub fn reset(&self) { + self.core.load().reset(); + } + + pub fn config(&self) -> HotkeyConfig { + self.core.load().config() + } + + pub fn reconfigure(&self, config: HotkeyConfig) { + let enabled = self.is_enabled(); + let new_core = Arc::new(HotkeyCore::new(config)); + 
self.core.store(new_core); + if !enabled { + self.disable(); + } + } + + pub fn update_config(&self, mutate: F) + where + F: FnOnce(&mut HotkeyConfig), + { + let mut config = self.config(); + mutate(&mut config); + self.reconfigure(config); + } + + pub fn record_command(&self, command: &RedisCommand) { + if !self.is_enabled() { + return; + } + let core = self.core.load(); + core.record_command(command); + } + + pub fn snapshot(&self, limit: Option) -> Vec { + self.core.load().snapshot(limit) + } +} + +struct HotkeyCore { + config: HotkeyConfig, + sample_counter: AtomicU64, + sketch: CountMinSketch, + heavy: Vec, + decay: f64, + rng_state: AtomicU64, +} + +impl HotkeyCore { + /// Builds the Count-Min sketch and the heavy-hitter buckets described by `config`. + /// + /// `sample_every` is clamped to at least 1 so the modulo in `should_sample` can never divide + /// by zero when a `HotkeyConfig` is built directly instead of going through config validation. + fn new(config: HotkeyConfig) -> Self { + let config = HotkeyConfig { + sample_every: config.sample_every.max(1), + ..config + }; + let capacity = config.capacity.max(HEAVY_BUCKET_SIZE).next_power_of_two(); + let bucket_count = (capacity / HEAVY_BUCKET_SIZE).max(1); + let heavy = (0..bucket_count).map(|_| HeavyBucket::new()).collect(); + Self { + config, + sample_counter: AtomicU64::new(0), + sketch: CountMinSketch::new(config.sketch_width, config.sketch_depth), + heavy, + decay: config.decay, + rng_state: AtomicU64::new(splitmix64(RNG_SEED)), + } + } + + /// Returns a copy of the configuration this core was built with. + fn config(&self) -> HotkeyConfig { + self.config + } + + /// Zeroes the sketch and every heavy-hitter slot, and restarts the sample counter and RNG state. + fn reset(&self) { + self.sketch.reset(); + for bucket in &self.heavy { + for entry in &bucket.entries { + entry.fingerprint.store(0, Ordering::Relaxed); + entry.score.store(0, Ordering::Relaxed); + entry.key.store(None); + } + } + self.sample_counter.store(0, Ordering::Relaxed); + self.rng_state + .store(splitmix64(RNG_SEED), Ordering::Relaxed); + } + + /// Feeds the key argument(s) of a sampled command into the sketch and heavy-hitter table. + /// + /// Commands that are not selected by `should_sample`, that carry no argument after the + /// command name, or that are `HOTKEY` itself are ignored. + fn record_command(&self, command: &RedisCommand) { + if !self.should_sample() { + return; + } + + let args = command.args(); + if args.len() < 2 { + return; + } + if command.command_name().eq_ignore_ascii_case(b"HOTKEY") { + return; + } + + let name = command.command_name(); + if name.eq_ignore_ascii_case(b"MGET") + || name.eq_ignore_ascii_case(b"DEL") + || name.eq_ignore_ascii_case(b"UNLINK") + ||
name.eq_ignore_ascii_case(b"EXISTS") + || name.eq_ignore_ascii_case(b"TOUCH") + { + // Every argument after the command name is a key. + for key in args.iter().skip(1) { + self.observe(key); + } + } else if name.eq_ignore_ascii_case(b"MSET") || name.eq_ignore_ascii_case(b"MSETNX") { + // Key/value pairs: every other argument after the command name is a key. + for key in args.iter().skip(1).step_by(2) { + self.observe(key); + } + } else { + // Single-key commands. HMSET (`HMSET key field value ...`) belongs here: treating it + // like MSET would count its hash values as keys. + self.observe(&args[1]); + } + } + + fn snapshot(&self, limit: Option) -> Vec { + let mut candidates: HashMap = + HashMap::with_capacity(self.heavy.len() * HEAVY_BUCKET_SIZE); + let error_base = self + .sketch + .error_bound() + .saturating_mul(self.config.sample_every); + + for bucket in &self.heavy { + for entry in &bucket.entries { + let score = entry.score.load(Ordering::Relaxed); + if score == 0 { + continue; + } + if let Some(key_arc) = entry.key.load_full() { + let key = (*key_arc).clone(); + let estimate = self + .sketch + .estimate(key.as_ref()) + .saturating_mul(self.config.sample_every); + candidates + .entry(key) + .and_modify(|existing| { + if estimate > *existing { + *existing = estimate; + } + }) + .or_insert(estimate); + } + } + } + + let mut entries = candidates + .into_iter() + .map(|(key, estimated)| HotkeySample { + key, + estimated_hits: estimated, + error: error_base, + }) + .collect::>(); + + entries.sort_by(|a, b| match b.estimated_hits.cmp(&a.estimated_hits) { + CmpOrdering::Equal => a.key.cmp(&b.key), + other => other, + }); + + if let Some(limit) = limit { + entries.truncate(limit); + } + entries + } + + fn observe(&self, key: &Bytes) { + let fingerprint = hash_with_seed(key.as_ref(), RNG_SEED); + let estimate = self.sketch.increment(key.as_ref(), 1); + self.update_heavy(key, fingerprint, estimate); + } + + fn update_heavy(&self, key: &Bytes, fingerprint: u64, estimate: u64) { + if self.heavy.is_empty() { + return; + } + let bucket_idx = (fingerprint as usize) % self.heavy.len(); + let bucket = &self.heavy[bucket_idx]; + + for entry in &bucket.entries { + if 
entry.fingerprint.load(Ordering::Relaxed) != fingerprint { + continue; + } + if let Some(existing) = entry.key.load_full() { + if existing.as_ref() == key { + self.bump_entry(entry, estimate); + return; + } + } + } + + for entry in &bucket.entries { + if entry.score.load(Ordering::Relaxed) == 0 { + if entry + .score + .compare_exchange(0, estimate, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + entry.fingerprint.store(fingerprint, Ordering::Relaxed); + entry.key.store(Some(Arc::new(key.clone()))); + return; + } + } + } + + let mut victim_idx = 0usize; + let mut victim_score = u64::MAX; + for (idx, entry) in bucket.entries.iter().enumerate() { + let current = entry.score.load(Ordering::Relaxed); + if current < victim_score { + victim_score = current; + victim_idx = idx; + } + let chance = self.decay.powf(current as f64).clamp(0.0, 1.0); + if self.random_unit() < chance { + if current > 0 + && entry + .score + .compare_exchange( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) + .is_ok() + { + if current == 1 { + entry.fingerprint.store(fingerprint, Ordering::Relaxed); + entry.key.store(Some(Arc::new(key.clone()))); + entry.score.store(estimate, Ordering::Relaxed); + return; + } + } + } + } + + if victim_score < estimate { + let entry = &bucket.entries[victim_idx]; + if entry + .score + .compare_exchange(victim_score, estimate, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + entry.fingerprint.store(fingerprint, Ordering::Relaxed); + entry.key.store(Some(Arc::new(key.clone()))); + } + } + } + + fn bump_entry(&self, entry: &HeavyEntry, estimate: u64) { + let mut current = entry.score.load(Ordering::Relaxed); + loop { + let target = current.max(estimate).saturating_add(1); + match entry.score.compare_exchange( + current, + target, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => return, + Err(observed) => current = observed, + } + } + } + + fn should_sample(&self) -> bool { + let prev = self.sample_counter.fetch_add(1, 
Ordering::Relaxed); + prev % self.config.sample_every == 0 + } + + fn random_unit(&self) -> f64 { + let value = self.next_random(); + (value as f64) / (u64::MAX as f64) + } + + fn next_random(&self) -> u64 { + let mut state = self.rng_state.load(Ordering::Relaxed); + loop { + let next = xorshift64(state); + match self + .rng_state + .compare_exchange(state, next, Ordering::Relaxed, Ordering::Relaxed) + { + Ok(_) => return next, + Err(observed) => state = observed, + } + } + } +} + +#[derive(Default)] +struct HeavyEntry { + fingerprint: AtomicU64, + score: AtomicU64, + key: ArcSwapOption, +} + +struct HeavyBucket { + entries: [HeavyEntry; HEAVY_BUCKET_SIZE], +} + +impl HeavyBucket { + fn new() -> Self { + Self { + entries: std::array::from_fn(|_| HeavyEntry::default()), + } + } +} + +struct CountMinSketch { + width: usize, + depth: usize, + counters: Vec, + seeds: Vec, + total: AtomicU64, +} + +impl CountMinSketch { + fn new(width: usize, depth: usize) -> Self { + let width = width.max(1); + let depth = depth.max(1); + let mut seeds = Vec::with_capacity(depth); + let mut state = RNG_SEED; + for _ in 0..depth { + state = splitmix64(state); + seeds.push(state); + } + let counters = (0..width * depth) + .map(|_| AtomicU64::new(0)) + .collect::>(); + Self { + width, + depth, + counters, + seeds, + total: AtomicU64::new(0), + } + } + + fn increment(&self, key: &[u8], weight: u64) -> u64 { + let mut min = u64::MAX; + for depth in 0..self.depth { + let idx = self.hash_into_index(key, depth); + let cell = &self.counters[idx]; + let prev = cell.fetch_add(weight, Ordering::Relaxed); + let current = prev.saturating_add(weight); + min = min.min(current); + } + self.total.fetch_add(weight, Ordering::Relaxed); + min + } + + fn estimate(&self, key: &[u8]) -> u64 { + let mut min = u64::MAX; + for depth in 0..self.depth { + let idx = self.hash_into_index(key, depth); + let value = self.counters[idx].load(Ordering::Relaxed); + min = min.min(value); + } + min + } + + fn 
error_bound(&self) -> u64 { + if self.width == 0 { + return 0; + } + self.total.load(Ordering::Relaxed) / self.width as u64 + } + + fn reset(&self) { + for cell in &self.counters { + cell.store(0, Ordering::Relaxed); + } + self.total.store(0, Ordering::Relaxed); + } + + fn hash_into_index(&self, key: &[u8], depth: usize) -> usize { + let seed = self.seeds[depth]; + let hashed = hash_with_seed(key, seed) % self.width as u64; + depth * self.width + hashed as usize + } +} + +pub fn handle_command(hotkey: &Hotkey, args: &[Bytes]) -> RespValue { + if args.len() < 2 { + return hotkey_error("wrong number of arguments for 'hotkey' command"); + } + let sub = args[1].to_vec().to_ascii_uppercase(); + match sub.as_slice() { + b"ENABLE" => handle_enable(hotkey, args), + b"DISABLE" => handle_disable(hotkey, args), + b"GET" => handle_get(hotkey, args), + b"RESET" => handle_reset(hotkey, args), + _ => hotkey_error("unknown hotkey subcommand"), + } +} + +fn handle_enable(hotkey: &Hotkey, args: &[Bytes]) -> RespValue { + if args.len() != 2 { + return hotkey_error("wrong number of arguments for 'hotkey enable' command"); + } + hotkey.enable(); + RespValue::simple("OK") +} + +fn handle_disable(hotkey: &Hotkey, args: &[Bytes]) -> RespValue { + if args.len() != 2 { + return hotkey_error("wrong number of arguments for 'hotkey disable' command"); + } + hotkey.disable(); + RespValue::simple("OK") +} + +fn handle_get(hotkey: &Hotkey, args: &[Bytes]) -> RespValue { + if args.len() > 3 { + return hotkey_error("wrong number of arguments for 'hotkey get' command"); + } + let count = if args.len() == 3 { + match parse_non_negative(&args[2]) { + Ok(value) => Some(value), + Err(err) => return err, + } + } else { + None + }; + + let entries = hotkey.snapshot(count); + let payload = entries + .into_iter() + .map(|entry| { + RespValue::Array(vec![ + RespValue::BulkString(entry.key), + RespValue::Integer(entry.estimated_hits.min(i64::MAX as u64) as i64), + RespValue::Integer(entry.error.min(i64::MAX 
as u64) as i64), + ]) + }) + .collect(); + RespValue::Array(payload) +} + +fn handle_reset(hotkey: &Hotkey, args: &[Bytes]) -> RespValue { + if args.len() != 2 { + return hotkey_error("wrong number of arguments for 'hotkey reset' command"); + } + hotkey.reset(); + RespValue::simple("OK") +} + +fn parse_non_negative(arg: &Bytes) -> Result { + let text = std::str::from_utf8(arg).map_err(|_| hotkey_value_error())?; + let value: i64 = text.parse().map_err(|_| hotkey_value_error())?; + if value < 0 { + return Err(hotkey_value_error()); + } + usize::try_from(value).map_err(|_| hotkey_value_error()) +} + +fn hotkey_error(message: &str) -> RespValue { + RespValue::Error(Bytes::from(format!("ERR {message}"))) +} + +fn hotkey_value_error() -> RespValue { + RespValue::Error(Bytes::from_static( + b"ERR value is not an integer or out of range", + )) +} + +fn hash_with_seed(data: &[u8], seed: u64) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + seed.hash(&mut hasher); + data.hash(&mut hasher); + hasher.finish() +} + +fn splitmix64(mut state: u64) -> u64 { + state = state.wrapping_add(0x9E37_79B9_7F4A_7C15); + let mut z = state; + z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9); + z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB); + z ^ (z >> 31) +} + +fn xorshift64(mut x: u64) -> u64 { + x ^= x << 7; + x ^= x >> 9; + x ^= x << 8; + x +} + +#[cfg(test)] +mod tests { + use super::*; + + use bytes::Bytes; + + fn build_command(parts: &[&[u8]]) -> RedisCommand { + let args = parts + .iter() + .map(|p| Bytes::copy_from_slice(p)) + .collect::>(); + RedisCommand::new(args).unwrap() + } + + #[test] + fn record_and_snapshot() { + let mut config = HotkeyConfig::default(); + config.sample_every = 1; + let hotkey = Hotkey::new(config); + hotkey.enable(); + for _ in 0..100 { + let cmd = build_command(&[b"GET", b"a"]); + hotkey.record_command(&cmd); + } + for _ in 0..10 { + let cmd = build_command(&[b"GET", b"b"]); + hotkey.record_command(&cmd); + 
} + let entries = hotkey.snapshot(Some(2)); + assert_eq!(entries[0].key, Bytes::from_static(b"a")); + assert!(entries[0].estimated_hits >= entries[1].estimated_hits); + } + + #[test] + fn sampling_skips_when_disabled() { + let hotkey = Hotkey::new_default(); + let cmd = build_command(&[b"GET", b"key"]); + hotkey.record_command(&cmd); + assert!(hotkey.snapshot(None).is_empty()); + } + + #[test] + fn reset_clears_entries() { + let mut config = HotkeyConfig::default(); + config.sample_every = 1; + let hotkey = Hotkey::new(config); + hotkey.enable(); + let cmd = build_command(&[b"GET", b"key"]); + hotkey.record_command(&cmd); + assert!(!hotkey.snapshot(None).is_empty()); + hotkey.reset(); + assert!(hotkey.snapshot(None).is_empty()); + } + + #[test] + fn reconfigure_updates_settings() { + let hotkey = Hotkey::new_default(); + let mut cfg = hotkey.config(); + cfg.sample_every = 4; + hotkey.reconfigure(cfg); + assert_eq!(hotkey.config().sample_every, 4); + } +} diff --git a/src/lib.rs b/src/lib.rs index 5f72a1a..55fb3b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub mod auth; pub mod backend; pub mod cluster; pub mod config; +pub mod hotkey; pub mod info; pub mod meta; pub mod metrics; diff --git a/src/standalone/mod.rs b/src/standalone/mod.rs index 3bf4b75..5e427da 100644 --- a/src/standalone/mod.rs +++ b/src/standalone/mod.rs @@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator}; use crate::backend::client::{ClientId, FrontConnectionGuard}; use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand}; use crate::config::{ClusterConfig, ClusterRuntime, ConfigManager}; +use crate::hotkey::Hotkey; use crate::info::{InfoContext, ProxyMode}; use crate::metrics; use crate::protocol::redis::{ @@ -51,6 +52,7 @@ pub struct StandaloneProxy { runtime: Arc, config_manager: Arc, slowlog: Arc, + hotkey: Arc, listen_port: u16, backend_nodes: usize, } @@ -90,6 +92,9 @@ impl StandaloneProxy { let slowlog = 
config_manager .slowlog_for(&config.name) .ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?; + let hotkey = config_manager + .hotkey_for(&config.name) + .ok_or_else(|| anyhow!("missing hotkey state for cluster {}", config.name))?; Ok(Self { cluster, @@ -101,6 +106,7 @@ impl StandaloneProxy { runtime, config_manager, slowlog, + hotkey, listen_port, backend_nodes, }) @@ -135,6 +141,7 @@ impl StandaloneProxy { }; self.slowlog .maybe_record(&command_snapshot, started.elapsed()); + self.hotkey.record_command(&command_snapshot); result } @@ -446,6 +453,13 @@ impl StandaloneProxy { continue; } + if let Some(response) = self.try_handle_hotkey(&command) { + let success = !response.is_error(); + metrics::front_command(self.cluster.as_ref(), kind_label, success); + framed.send(response).await?; + continue; + } + if let Some(response) = self.try_handle_slowlog(&command) { let success = !response.is_error(); metrics::front_command(self.cluster.as_ref(), kind_label, success); @@ -482,6 +496,14 @@ impl StandaloneProxy { )) } + fn try_handle_hotkey(&self, command: &RedisCommand) -> Option { + if !command.command_name().eq_ignore_ascii_case(b"HOTKEY") { + return None; + } + + Some(crate::hotkey::handle_command(&self.hotkey, command.args())) + } + async fn try_handle_config(&self, command: &RedisCommand) -> Option { self.config_manager.handle_command(command).await }