🦀 Actix 配对队列 Redis 化:崩溃不丢单,多实例秒级共享

Photos provided by Unsplash OR Pexels

Actix 游戏服务端进阶:Matchmaking 等待队列的 Redis 持久化实战指南

在上篇《添加配对匹配(Matchmaking)逻辑》中,我们使用内存中的 VecDeque<Addr<PlayerSession>> 实现了简单的 FIFO 等待队列。这种方式简单高效,但在以下场景下会丢失数据:

  • 服务重启或崩溃 → 正在排队的玩家全部丢失,需要重新排队
  • 多实例部署(负载均衡) → 每个实例的队列独立,无法跨实例共享匹配
  • 节点故障切换 → 队列数据丢失

为了解决这些问题,我们引入 Redis 作为持久化后端,将等待队列从内存迁移到 Redis,实现持久化分布式共享和高可用。

本指南提供完整、可直接集成的 Redis 持久化方案,基于上篇代码结构。采用简单可靠的方式:Redis List(LPUSH / RPOP)作为 FIFO 队列,结合 Redis Hash 存储玩家元数据(player_id → Addr 序列化或引用)。

为什么选择 Redis 实现持久化队列?

  • 持久化:RDB + AOF 支持数据恢复
  • 高性能:内存操作,10 万 QPS 轻松
  • 原子性:LPUSH/RPOP 原子操作,避免竞态
  • 分布式:多 Actix 实例共享同一 Redis
  • Rust 生态redis + deadpool-redis / bb8-redis 连接池集成优秀

推荐连接池deadpool-redisbb8-redis(Actix 社区常用)

1. 依赖添加

更新 Cargo.toml

[dependencies]
# 原有依赖...
redis = { version = "0.27", features = ["tokio-comp", "connection-manager"] }
deadpool-redis = "0.15"  # 连接池,推荐
serde_json = "1.0"

2. Redis 初始化与连接池

main.rs 中创建 Redis 连接池,并作为 AppData 共享:

use deadpool_redis::{Config, Pool, Runtime};
use std::env;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

    let cfg = Config::from_url(redis_url);
    let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");

    let matchmaker = Matchmaker::new(redis_pool.clone()).start();

    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(redis_pool.clone()))
            .app_data(web::Data::new(matchmaker.clone()))
            .route("/ws", web::get().to(ws_route))
    })
    .bind(("0.0.0.0", 8080))?
    .run()
    .await
}

3. 更新 Matchmaker Actor:使用 Redis 持久化队列

我们将 waitingVecDeque 改为 Redis 操作。

// actors/matchmaker.rs
use actix::prelude::*;
use deadpool_redis::{Pool, redis::{AsyncCommands, RedisError}};
use uuid::Uuid;
use crate::actors::player::PlayerSession;
use crate::messages::{EnqueuePlayer, JoinedRoom, AddPlayer};

const WAITING_QUEUE_KEY: &str = "matchmaking:waiting_queue";

pub struct Matchmaker {
    redis_pool: Pool,
}

impl Matchmaker {
    pub fn new(redis_pool: Pool) -> Self {
        Self { redis_pool }
    }
}

impl Actor for Matchmaker {
    type Context = Context<Self>;
}

impl Handler<EnqueuePlayer> for Matchmaker {
    type Result = ResponseActFuture<Self, ()>;

    fn handle(&mut self, msg: EnqueuePlayer, ctx: &mut Context<Self>) -> Self::Result {
        let player_addr = msg.0;
        let redis_pool = self.redis_pool.clone();

        ctx.spawn(async move {
            let mut conn = match redis_pool.get().await {
                Ok(c) => c,
                Err(e) => {
                    tracing::error!("Redis connection error: {}", e);
                    return;
                }
            };

            // 将 player_id 入队(我们只存 player_id,Addr 在 PlayerSession 中保留)
            let player_id = Uuid::new_v4();  // 实际应从 PlayerSession 获取
            // 假设 PlayerSession 在 started 中生成并存储自己的 id

            // 这里简化:存 player_id 字符串
            let _: () = conn.lpush(WAITING_QUEUE_KEY, player_id.to_string()).await.unwrap_or(());

            tracing::info!("Player {} enqueued, queue len: {}", player_id, "unknown");

            // 尝试匹配
            Self::try_match(&mut conn).await;
        }.into_actor(self));

        fut::ok(())
    }
}

impl Matchmaker {
    async fn try_match(conn: &mut deadpool_redis::Connection) {
        let len: i64 = conn.llen(WAITING_QUEUE_KEY).await.unwrap_or(0);

        if len >= 2 {
            // 原子弹出两个玩家
            let p1_str: Option<String> = conn.rpop(WAITING_QUEUE_KEY).await.unwrap();
            let p2_str: Option<String> = conn.rpop(WAITING_QUEUE_KEY).await.unwrap();

            if let (Some(p1), Some(p2)) = (p1_str, p2_str) {
                let p1_id = Uuid::parse_str(&p1).unwrap();
                let p2_id = Uuid::parse_str(&p2).unwrap();

                let room_id = Uuid::new_v4();
                let room = GameRoom::new(room_id).start();

                // 通知两个玩家(需要 PlayerSession Addr)
                // 问题:我们只有 player_id,如何找到 Addr?
                // 解决方案:用 Redis Hash 维护 player_id -> session_info 或直接广播通知
                // 这里简化:假设 Matchmaker 持有 player_id -> Addr 的 HashMap(内存缓存)
                // 或用 Redis Pub/Sub 通知 PlayerSession 自己加入房间
            }
        }
    }
}

关键挑战与解决方案

问题:Redis 只存 player_id,但我们需要 Addr<PlayerSession> 来发送 JoinedRoom 消息。

推荐方案(生产级)

  1. 方案 A:内存 + Redis 双写(推荐简单场景)
  • Matchmaker 保留 HashMap<Uuid, Addr<PlayerSession>> 作为缓存
  • 入队时:Redis LPUSH + HashMap 插入
  • 出队时:Redis RPOP + 从 HashMap 取 Addr
  • 玩家断开时:清理 HashMap + Redis(可选移除)
  • 优点:简单,延迟低
  • 缺点:重启后 HashMap 丢失 → 需从 Redis 恢复(启动时扫描)
  1. 方案 B:Redis Pub/Sub 通知(更分布式)
  • 入队:LPUSH player_id
  • 匹配成功:创建房间,Redis Publish 到频道 match:room:{room_id}
  • 每个 PlayerSession 订阅自己 player_id 频道
  • 收到匹配消息后,自己 do_send 加入房间
  • 优点:完全无状态,完美分布式
  • 缺点:稍复杂
  1. 方案 C:只存 player_id + 外部服务发现
  • 匹配后,房间 actor 通过 player_id 查找对应 WebSocket 服务器(需服务注册)

最实用方案A(内存缓存 + Redis 持久化)

4. 实现方案 A:内存缓存 + Redis 持久化

更新 Matchmaker:

pub struct Matchmaker {
    redis_pool: Pool,
    player_sessions: HashMap<Uuid, Addr<PlayerSession>>,  // 内存缓存
}

impl Handler<EnqueuePlayer> for Matchmaker {
    // ...
    let player_id = /* 从消息或上下文获取 */;
    let addr = msg.0;

    self.player_sessions.insert(player_id, addr.clone());

    let mut conn = self.redis_pool.get().await?;
    conn.lpush(WAITING_QUEUE_KEY, player_id.to_string()).await?;
    // ...
}

try_match 中:

if let Some(p1_addr) = self.player_sessions.remove(&p1_id) {
    // 同理 p2
    p1_addr.do_send(JoinedRoom(room.clone()));
}

断线清理:PlayerSession stopping 中通知 Matchmaker 删除缓存:

// 新消息 RemovePlayer(Uuid)
impl Handler<RemovePlayer> for Matchmaker {
    fn handle(&mut self, msg: RemovePlayer, _: &mut Context<Self>) {
        self.player_sessions.remove(&msg.0);
        // 可选:从 Redis 移除(但 List 不易删中间元素,可忽略或用 Set)
    }
}

5. Redis 配置建议(生产)

  • 持久化:开启 AOF(appendonly yes)+ RDB
  • 高可用:Redis Sentinel 或 Redis Cluster
  • 过期:设置 player 会话 key 过期(EXPIRE),防内存泄漏
  • 队列清理:启动时扫描队列,验证 player 是否在线

6. 总结:内存 vs Redis 对比

特性内存 VecDequeRedis List + 缓存
性能极高高(~100μs 延迟)
持久化有(RDB/AOF)
多实例共享
服务重启丢失恢复(需重建缓存)
复杂度
推荐场景单实例、小型游戏生产级、多节点

推荐:从小项目开始用内存,规模化后迁移到 Redis + 缓存。

版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)