🦀 Actix 配对队列 Redis 化:崩溃不丢单,多实例秒级共享
Actix 游戏服务端进阶:Matchmaking 等待队列的 Redis 持久化实战指南
在上篇《添加配对匹配(Matchmaking)逻辑》中,我们使用内存中的 VecDeque<Addr<PlayerSession>> 实现了简单的 FIFO 等待队列。这种方式简单高效,但在以下场景下会丢失数据:
- 服务重启或崩溃 → 正在排队的玩家全部丢失,需要重新排队
- 多实例部署(负载均衡) → 每个实例的队列独立,无法跨实例共享匹配
- 节点故障切换 → 队列数据丢失
为了解决这些问题,我们引入 Redis 作为持久化后端,将等待队列从内存迁移到 Redis,实现持久化、分布式共享和高可用。
本指南提供完整、可直接集成的 Redis 持久化方案,基于上篇代码结构。采用简单可靠的方式:Redis List(LPUSH / RPOP)作为 FIFO 队列,结合 Redis Hash 存储玩家元数据(player_id → Addr 序列化或引用)。
为什么选择 Redis 实现持久化队列?
- 持久化:RDB + AOF 支持数据恢复
- 高性能:内存操作,10 万 QPS 轻松
- 原子性:LPUSH/RPOP 原子操作,避免竞态
- 分布式:多 Actix 实例共享同一 Redis
- Rust 生态:
redis+deadpool-redis/bb8-redis连接池集成优秀
推荐连接池:deadpool-redis 或 bb8-redis(Actix 社区常用)
1. 依赖添加
更新 Cargo.toml:
[dependencies]
# 原有依赖...
redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
deadpool-redis = "0.15" # 连接池,推荐
serde_json = "1.0"
2. Redis 初始化与连接池
在 main.rs 中创建 Redis 连接池,并作为 AppData 共享:
use deadpool_redis::{Config, Pool, Runtime};
use std::env;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let cfg = Config::from_url(redis_url);
let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");
let matchmaker = Matchmaker::new(redis_pool.clone()).start();
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(redis_pool.clone()))
.app_data(web::Data::new(matchmaker.clone()))
.route("/ws", web::get().to(ws_route))
})
.bind(("0.0.0.0", 8080))?
.run()
.await
}
3. 更新 Matchmaker Actor:使用 Redis 持久化队列
我们将 waiting 从 VecDeque 改为 Redis 操作。
// actors/matchmaker.rs
use actix::prelude::*;
use deadpool_redis::{Pool, redis::{AsyncCommands, RedisError}};
use uuid::Uuid;
use crate::actors::player::PlayerSession;
use crate::messages::{EnqueuePlayer, JoinedRoom, AddPlayer};
const WAITING_QUEUE_KEY: &str = "matchmaking:waiting_queue";
pub struct Matchmaker {
redis_pool: Pool,
}
impl Matchmaker {
pub fn new(redis_pool: Pool) -> Self {
Self { redis_pool }
}
}
impl Actor for Matchmaker {
type Context = Context<Self>;
}
impl Handler<EnqueuePlayer> for Matchmaker {
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: EnqueuePlayer, ctx: &mut Context<Self>) -> Self::Result {
let player_addr = msg.0;
let redis_pool = self.redis_pool.clone();
ctx.spawn(async move {
let mut conn = match redis_pool.get().await {
Ok(c) => c,
Err(e) => {
tracing::error!("Redis connection error: {}", e);
return;
}
};
// 将 player_id 入队(我们只存 player_id,Addr 在 PlayerSession 中保留)
let player_id = Uuid::new_v4(); // 实际应从 PlayerSession 获取
// 假设 PlayerSession 在 started 中生成并存储自己的 id
// 这里简化:存 player_id 字符串
let _: () = conn.lpush(WAITING_QUEUE_KEY, player_id.to_string()).await.unwrap_or(());
tracing::info!("Player {} enqueued, queue len: {}", player_id, "unknown");
// 尝试匹配
Self::try_match(&mut conn).await;
}.into_actor(self));
fut::ok(())
}
}
impl Matchmaker {
async fn try_match(conn: &mut deadpool_redis::Connection) {
let len: i64 = conn.llen(WAITING_QUEUE_KEY).await.unwrap_or(0);
if len >= 2 {
// 原子弹出两个玩家
let p1_str: Option<String> = conn.rpop(WAITING_QUEUE_KEY).await.unwrap();
let p2_str: Option<String> = conn.rpop(WAITING_QUEUE_KEY).await.unwrap();
if let (Some(p1), Some(p2)) = (p1_str, p2_str) {
let p1_id = Uuid::parse_str(&p1).unwrap();
let p2_id = Uuid::parse_str(&p2).unwrap();
let room_id = Uuid::new_v4();
let room = GameRoom::new(room_id).start();
// 通知两个玩家(需要 PlayerSession Addr)
// 问题:我们只有 player_id,如何找到 Addr?
// 解决方案:用 Redis Hash 维护 player_id -> session_info 或直接广播通知
// 这里简化:假设 Matchmaker 持有 player_id -> Addr 的 HashMap(内存缓存)
// 或用 Redis Pub/Sub 通知 PlayerSession 自己加入房间
}
}
}
}
关键挑战与解决方案
问题:Redis 只存 player_id,但我们需要 Addr<PlayerSession> 来发送 JoinedRoom 消息。
推荐方案(生产级):
- 方案 A:内存 + Redis 双写(推荐简单场景)
- Matchmaker 保留
HashMap<Uuid, Addr<PlayerSession>>作为缓存 - 入队时:Redis LPUSH + HashMap 插入
- 出队时:Redis RPOP + 从 HashMap 取 Addr
- 玩家断开时:清理 HashMap + Redis(可选移除)
- 优点:简单,延迟低
- 缺点:重启后 HashMap 丢失 → 需从 Redis 恢复(启动时扫描)
- 方案 B:Redis Pub/Sub 通知(更分布式)
- 入队:LPUSH player_id
- 匹配成功:创建房间,Redis Publish 到频道
match:room:{room_id} - 每个 PlayerSession 订阅自己 player_id 频道
- 收到匹配消息后,自己 do_send 加入房间
- 优点:完全无状态,完美分布式
- 缺点:稍复杂
- 方案 C:只存 player_id + 外部服务发现
- 匹配后,房间 actor 通过 player_id 查找对应 WebSocket 服务器(需服务注册)
最实用方案:A(内存缓存 + Redis 持久化)
4. 实现方案 A:内存缓存 + Redis 持久化
更新 Matchmaker:
pub struct Matchmaker {
redis_pool: Pool,
player_sessions: HashMap<Uuid, Addr<PlayerSession>>, // 内存缓存
}
impl Handler<EnqueuePlayer> for Matchmaker {
// ...
let player_id = /* 从消息或上下文获取 */;
let addr = msg.0;
self.player_sessions.insert(player_id, addr.clone());
let mut conn = self.redis_pool.get().await?;
conn.lpush(WAITING_QUEUE_KEY, player_id.to_string()).await?;
// ...
}
在 try_match 中:
if let Some(p1_addr) = self.player_sessions.remove(&p1_id) {
// 同理 p2
p1_addr.do_send(JoinedRoom(room.clone()));
}
断线清理:PlayerSession stopping 中通知 Matchmaker 删除缓存:
// 新消息 RemovePlayer(Uuid)
impl Handler<RemovePlayer> for Matchmaker {
fn handle(&mut self, msg: RemovePlayer, _: &mut Context<Self>) {
self.player_sessions.remove(&msg.0);
// 可选:从 Redis 移除(但 List 不易删中间元素,可忽略或用 Set)
}
}
5. Redis 配置建议(生产)
- 持久化:开启 AOF(appendonly yes)+ RDB
- 高可用:Redis Sentinel 或 Redis Cluster
- 过期:设置 player 会话 key 过期(EXPIRE),防内存泄漏
- 队列清理:启动时扫描队列,验证 player 是否在线
6. 总结:内存 vs Redis 对比
| 特性 | 内存 VecDeque | Redis List + 缓存 |
|---|---|---|
| 性能 | 极高 | 高(~100μs 延迟) |
| 持久化 | 无 | 有(RDB/AOF) |
| 多实例共享 | 否 | 是 |
| 服务重启 | 丢失 | 恢复(需重建缓存) |
| 复杂度 | 低 | 中 |
| 推荐场景 | 单实例、小型游戏 | 生产级、多节点 |
推荐:从小项目开始用内存,规模化后迁移到 Redis + 缓存。
版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)