Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures = "0.3"
md5 = "0.7"
rand = "0.8"
socket2 = "0.5"
arc-swap = "1"

[profile.release]
debug = true
Expand Down
12 changes: 12 additions & 0 deletions default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
slowlog_log_slower_than = 10000
# 慢查询日志最大保留条数;默认 128
slowlog_max_len = 128
# 热点 Key 分析参数
hotkey_sample_every = 32
hotkey_sketch_width = 4096
hotkey_sketch_depth = 4
hotkey_capacity = 512
hotkey_decay = 0.925

[[clusters]]
name = "test-cluster"
Expand Down Expand Up @@ -58,3 +64,9 @@
slowlog_log_slower_than = 10000
# 慢查询日志最大保留条数;默认 128
slowlog_max_len = 128
# 热点 Key 分析参数
hotkey_sample_every = 32
hotkey_sketch_width = 4096
hotkey_sketch_depth = 4
hotkey_capacity = 512
hotkey_decay = 0.925
6 changes: 6 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ cargo build --release
- `read_from_slave`:Cluster 模式下允许从 replica 读取。
- `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。
- `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。
- `hotkey_sample_every`:热点 Key 采样间隔(默认 `32`,越大代表对请求采样越稀疏)。
- `hotkey_sketch_width` / `hotkey_sketch_depth`:热点 Key 频率估算所用 Count-Min Sketch 宽度与深度,决定误差与内存占用。
- `hotkey_capacity`:HeavyKeeper 桶容量,用于保留候选热点 Key 数量上限。
- `hotkey_decay`:HeavyKeeper 衰减系数,取值 `(0, 1]`,越接近 `1` 越倾向保留历史数据。
- `auth` / `password`:前端 ACL,详见下文。
- `backend_auth` / `backend_password`:后端 ACL 认证,详见下文。

> 提示:代理原生支持 `SLOWLOG GET/LEN/RESET`,并按集群维度汇总慢查询;配置上述阈值和长度即可控制记录行为。
>
> 热点 Key 分析可通过 `HOTKEY ENABLE|DISABLE|GET|RESET` 控制,相关采样参数可在配置文件或 `CONFIG SET cluster.<name>.hotkey-*` 中动态调整。

示例参见仓库根目录的 `default.toml`。

Expand Down
29 changes: 29 additions & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
use crate::backend::client::{ClientId, FrontConnectionGuard};
use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand};
use crate::config::{ClusterConfig, ClusterRuntime, ConfigManager};
use crate::hotkey::Hotkey;
use crate::info::{InfoContext, ProxyMode};
use crate::metrics;
use crate::protocol::redis::{
Expand Down Expand Up @@ -54,6 +55,7 @@ pub struct ClusterProxy {
runtime: Arc<ClusterRuntime>,
config_manager: Arc<ConfigManager>,
slowlog: Arc<Slowlog>,
hotkey: Arc<Hotkey>,
listen_port: u16,
seed_nodes: usize,
}
Expand Down Expand Up @@ -88,6 +90,9 @@ impl ClusterProxy {
let slowlog = config_manager
.slowlog_for(&config.name)
.ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?;
let hotkey = config_manager
.hotkey_for(&config.name)
.ok_or_else(|| anyhow!("missing hotkey state for cluster {}", config.name))?;
let proxy = Self {
cluster: cluster.clone(),
hash_tag,
Expand All @@ -100,6 +105,7 @@ impl ClusterProxy {
runtime,
config_manager,
slowlog,
hotkey,
listen_port,
seed_nodes: config.servers.len(),
};
Expand Down Expand Up @@ -301,6 +307,18 @@ impl ClusterProxy {
inflight += 1;
continue;
}
if let Some(response) = self.try_handle_hotkey(&cmd) {
let success = !response.is_error();
metrics::front_command(
self.cluster.as_ref(),
cmd.kind_label(),
success,
);
let fut = async move { response };
pending.push_back(Box::pin(fut));
inflight += 1;
continue;
}
if let Some(response) = self.try_handle_slowlog(&cmd) {
let success = !response.is_error();
metrics::front_command(
Expand Down Expand Up @@ -367,6 +385,13 @@ impl ClusterProxy {
))
}

fn try_handle_hotkey(&self, command: &RedisCommand) -> Option<RespValue> {
if !command.command_name().eq_ignore_ascii_case(b"HOTKEY") {
return None;
}
Some(crate::hotkey::handle_command(&self.hotkey, command.args()))
}

fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
if !command.command_name().eq_ignore_ascii_case(b"INFO") {
return None;
Expand Down Expand Up @@ -675,6 +700,7 @@ impl ClusterProxy {
let fetch_trigger = self.fetch_trigger.clone();
let cluster = self.cluster.clone();
let slowlog = self.slowlog.clone();
let hotkey = self.hotkey.clone();
let kind_label = command.kind_label();
Box::pin(async move {
match dispatch_with_context(
Expand All @@ -685,6 +711,7 @@ impl ClusterProxy {
fetch_trigger,
client_id,
slowlog,
hotkey,
command,
)
.await
Expand Down Expand Up @@ -1263,6 +1290,7 @@ async fn dispatch_with_context(
fetch_trigger: mpsc::UnboundedSender<()>,
client_id: ClientId,
slowlog: Arc<Slowlog>,
hotkey: Arc<Hotkey>,
command: RedisCommand,
) -> Result<RespValue> {
let command_snapshot = command.clone();
Expand Down Expand Up @@ -1292,6 +1320,7 @@ async fn dispatch_with_context(
.await
};
slowlog.maybe_record(&command_snapshot, started.elapsed());
hotkey.record_command(&command_snapshot);
result
}

Expand Down
Loading