🦀 Actix 事件溯源:Redis Streams 持久化,匹配全程可追溯

Photos provided by Unsplash OR Pexels

Actix 游戏服务端进阶:使用 Redis Streams 实现事件溯源(Event Sourcing)

在上篇使用 Redis Pub/Sub 实现分布式 Matchmaking 的方案中,我们解决了跨实例通知问题,但 Pub/Sub 是火并忘记(fire-and-forget)的模型:

  • 消息不持久化
  • 断线后历史事件丢失
  • 无法可靠地重放事件(replay)
  • 不适合需要审计、回滚或状态重建的场景

Redis Streams(Redis 5.0+ 引入)是专为持久化、可靠、可重放的事件日志设计的结构,非常适合事件溯源(Event Sourcing)可靠的消息队列。它结合了 append-only log + consumer groups 的能力,完美替代 Pub/Sub 在需要持久化和可靠投递的场景。

在本篇中,我们将把Matchmaking 流程迁移到 Redis Streams + Consumer Groups,实现:

  • 事件持久化(玩家入队、匹配成功、加入房间等事件永久保存)
  • 可靠投递 + 至少一次语义(at-least-once)
  • 支持事件重放(replay)重建状态
  • 分布式消费(多 Actix 实例并行处理)

1. Redis Streams vs Redis Pub/Sub 对比(针对事件溯源)

特性Redis Pub/SubRedis Streams适合场景
消息持久化无(内存中转瞬即逝)有(append-only log)Streams 胜出
历史回放(replay)不支持支持(XREAD 从任意 ID 开始)Streams 胜出
消费确认(ack)支持(XACK)Streams 胜出
Consumer Groups无(广播式)支持(负载均衡、pending 消息管理)Streams 胜出
至少一次投递可能丢失支持(结合 XACK + pending claim)Streams 胜出
实时广播极低延迟稍高延迟(但仍毫秒级)Pub/Sub 胜出
典型用途实时通知、聊天广播事件溯源、可靠队列、工作流

结论:如果你的游戏需要审计日志断线重连后恢复状态匹配历史分析分布式可靠处理Redis Streams 是更合适的选择。

2. 核心概念(Event Sourcing 视角)

  • 事件:不可变的事实记录,如 PlayerEnqueuedPlayersMatchedPlayerJoinedRoom
  • Stream:每个聚合根(aggregate)一个 stream,例如 matchmaking:events
  • Consumer Group:允许多个消费者(Actix 实例)并行消费,自动负载均衡
  • XACK:确认消费成功,防止重复投递
  • Pending Entries List (PEL):未确认的消息列表,可被 claim 重新分配

3. 实现方案

3.1 依赖

[dependencies]
redis = { version = "0.27", features = ["tokio-comp", "aio", "connection-manager"] }
deadpool-redis = "0.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.7", features = ["v4"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"

3.2 事件定义

use serde::{Deserialize, Serialize};
use uuid::Uuid;
use chrono::{DateTime, Utc};

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum MatchmakingEvent {
    PlayerEnqueued {
        player_id: Uuid,
        timestamp: DateTime<Utc>,
    },
    PlayersMatched {
        player1_id: Uuid,
        player2_id: Uuid,
        room_id: Uuid,
        timestamp: DateTime<Utc>,
    },
    PlayerJoinedRoom {
        player_id: Uuid,
        room_id: Uuid,
        timestamp: DateTime<Utc>,
    },
    // 可扩展:PlayerLeft, GameStarted, MatchCancelled...
}

3.3 写入事件(Producer – PlayerSession)

PlayerSession::started 中:

async fn enqueue_player(&self) {
    let event = MatchmakingEvent::PlayerEnqueued {
        player_id: self.id,
        timestamp: Utc::now(),
    };

    let mut conn = self.redis_pool.get().await.unwrap();
    let id: String = conn
        .xadd(
            "matchmaking:events",
            "*",
            &[("event", serde_json::to_string(&event).unwrap())],
        )
        .await
        .unwrap();

    tracing::info!("Enqueued player {} → event id {}", self.id, id);
}

3.4 Matchmaking Worker (Consumer Group)

我们启动一个独立的 Actix Actortokio task 作为消费者组成员:

pub struct MatchmakingConsumer {
    redis_pool: Pool,
    group_name: String,     // e.g. "matchmaking-workers"
    consumer_name: String,  // 唯一标识,如 "instance-1"
}

impl MatchmakingConsumer {
    pub fn new(pool: Pool, instance_id: &str) -> Self {
        Self {
            redis_pool: pool,
            group_name: "matchmaking-workers".to_string(),
            consumer_name: format!("consumer-{}", instance_id),
        }
    }

    pub async fn run(&self) {
        // 首次创建 consumer group(幂等)
        let mut conn = self.redis_pool.get().await.unwrap();
        let _: RedisResult<()> = redis::cmd("XGROUP")
            .arg("CREATE")
            .arg("matchmaking:events")
            .arg(&self.group_name)
            .arg("$")
            .arg("MKSTREAM")
            .query_async(&mut *conn)
            .await;

        loop {
            let mut conn = self.redis_pool.get().await.unwrap();

            // XREADGROUP 读取新消息
            let events: RedisResult<Vec<(String, Vec<(String, HashMap<String, String>)>)>> = redis::cmd("XREADGROUP")
                .arg("GROUP").arg(&self.group_name).arg(&self.consumer_name)
                .arg("COUNT").arg("10")
                .arg("BLOCK").arg("5000")  // 阻塞 5s
                .arg("STREAMS").arg("matchmaking:events").arg(">")
                .query_async(&mut *conn)
                .await;

            if let Ok(streams) = events {
                for (_stream, messages) in streams {
                    for (id, fields) in messages {
                        if let Some(event_json) = fields.get("event") {
                            if let Ok(event) = serde_json::from_str::<MatchmakingEvent>(event_json) {
                                self.process_event(&event, &id).await;
                            }
                        }

                        // 确认消费
                        let _: () = conn
                            .xack("matchmaking:events", &self.group_name, &[&id])
                            .await
                            .unwrap();
                    }
                }
            }
        }
    }

    async fn process_event(&self, event: &MatchmakingEvent, event_id: &str) {
        match event {
            MatchmakingEvent::PlayerEnqueued { player_id, .. } => {
                // 这里可以统计队列长度或触发匹配逻辑
                // 实际中可维护一个计数器或使用 ZSET 排序匹配
                tracing::info!("Processing enqueue: {}", player_id);
            }
            MatchmakingEvent::PlayersMatched { player1_id, player2_id, room_id, .. } => {
                // 通知玩家(通过 Redis Pub/Sub 或直接写另一个 stream)
                // 或者让 PlayerSession 订阅自己的 player stream
                tracing::info!("Match created: {} vs {} → room {}", player1_id, player2_id, room_id);
            }
            _ => {}
        }
    }
}

启动消费者(在 main 中):

let consumer = MatchmakingConsumer::new(redis_pool.clone(), "node-1");
tokio::spawn(async move {
    consumer.run().await;
});

4. 匹配逻辑迁移到 Streams

匹配逻辑可以作为单独的 周期性任务事件驱动

// 周期性扫描 pending 或维护队列长度
async fn matchmaking_loop(&self) {
    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;

        // 1. 获取当前等待玩家(可以用另一个 stream 或 ZSET)
        // 2. 当 >=2 时,生成匹配事件
        let event = MatchmakingEvent::PlayersMatched {
            player1_id: /* ... */,
            player2_id: /* ... */,
            room_id: Uuid::new_v4(),
            timestamp: Utc::now(),
        };

        let mut conn = self.redis_pool.get().await.unwrap();
        conn.xadd(
            "matchmaking:events",
            "*",
            &[("event", serde_json::to_string(&event).unwrap())],
        ).await.unwrap();
    }
}

5. 事件重放(Replay)支持

async fn replay_events(&self, from_id: Option<String>) {
    let mut conn = self.redis_pool.get().await.unwrap();

    let start = from_id.unwrap_or("0-0".to_string());

    let events: Vec<(String, Vec<(String, HashMap<String, String>)>)> = redis::cmd("XREAD")
        .arg("STREAMS").arg("matchmaking:events").arg(start)
        .query_async(&mut *conn)
        .await
        .unwrap();

    // 按顺序处理所有事件,重建状态
}

6. 最佳实践

  • Stream 修剪XTRIM matchmaking:events MAXLEN ~100000 限制大小
  • Pending 消息处理:启动时 + 周期性 Xpending + Xclaim
  • Exactly-once:结合唯一事件 ID + 幂等处理器
  • 多 Stream:按聚合根拆分(player:{id}:events, room:{id}:events
  • 监控XLEN, XINFO GROUPS, XINFO CONSUMERS
  • 高可用:Redis Sentinel 或 Redis Cluster

7. 总结

目标Pub/Sub 方案Streams 方案
分布式通知优秀好(需额外机制)
事件持久化 & 重放优秀
可靠投递可能丢失支持(ack + pending)
事件溯源不适合非常适合
复杂度较低中等

推荐

  • 纯实时通知 → 继续用 Pub/Sub
  • 需要可靠日志、审计、状态重建、断线恢复 → 迁移到 Redis Streams

如果需要完整代码(包括 pending 消息 reclaim、事件幂等处理、房间状态重建),或者想结合 Actix 做更细粒度的 actor-per-aggregate 模式,告诉我,我可以继续深入!

版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)