疾风 Hyper + Axum 双剑合璧:WebSocket 风暴席卷,异步王者之争——从零实战到高负载征服

疾风 Hyper + Axum 双剑合璧:WebSocket 风暴席卷,异步王者之争——从零实战到高负载征服

Photos provided by Unsplash OR Pexels

Hyper 是 Rust 异步 HTTP 的底层引擎Axum 是其上的优雅框架。本篇全面升级

  • Hyper 原生 WebSocket 实战(低级掌控)
  • Axum 一行实现 WebSocket(高级优雅)
  • Axum vs Hyper 深度对比
  • 全代码增强注释(每行必注,适合学习/生产)
  • 高负载配置兼容 WebSocket

一、Hyper 原生 WebSocket:底层掌控,极致性能

1.1 依赖升级

# Cargo.toml
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.23"  # WebSocket 协议实现
futures-util = "0.3"

1.2 Hyper + WebSocket 完整服务器(带详细注释)

// src/main.rs
use std::convert::Infallible;
use std::net::SocketAddr;

use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode, Upgrade};
use hyper::body::{Bytes, Incoming};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::protocol::Message;
use futures_util::{SinkExt, StreamExt};

// === 服务处理函数:HTTP + WebSocket 升级 ===
// 每个请求进入这里,判断路径并处理
async fn ws_handler(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    // 只处理 /ws 路径的 WebSocket 升级请求
    if req.uri().path() == "/ws" {
        // 检查是否为 WebSocket 升级请求(必须有 upgrade: websocket 头)
        if hyper::header::HeaderValue::from_str("websocket")
            .map(|v| req.headers().get(hyper::header::UPGRADE) == Some(&v))
            .unwrap_or(false)
        {
            // 异步处理升级(spawn 新任务,避免阻塞)
            tokio::task::spawn(async move {
                match hyper::upgrade::on(req).await {
                    Ok(upgraded) => {
                        // 升级成功:将 Hyper 的 Upgraded 转为 tokio-tungstenite 的 WebSocketStream
                        let ws_stream = tokio_tungstenite::WebSocketStream::from_raw_socket(
                            TokioIo::new(upgraded),
                            tokio_tungstenite::tungstenite::protocol::Role::Server,
                            None,
                        ).await;

                        let (mut ws_sender, mut ws_receiver) = ws_stream.split();

                        // 模拟广播:收到消息后回显 + 广播给所有客户端(生产用 Channel)
                        while let Some(Ok(msg)) = ws_receiver.next().await {
                            match msg {
                                Message::Text(text) => {
                                    println!("收到:{}", text);
                                    // 回显给发送者
                                    let _ = ws_sender.send(Message::Text(format!("Echo: {}", text))).await;
                                }
                                Message::Ping(data) => {
                                    let _ = ws_sender.send(Message::Pong(data)).await;
                                }
                                Message::Close(_) => break,
                                _ => {}
                            }
                        }
                    }
                    Err(e) => eprintln!("WebSocket 升级失败:{}", e),
                }
            });

            // 返回 101 Switching Protocols
            let mut resp = Response::new(Full::new(Bytes::from("")));
            *resp.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
            resp.headers_mut().append(
                hyper::header::UPGRADE,
                hyper::header::HeaderValue::from_static("websocket"),
            );
            resp.headers_mut().append(
                hyper::header::CONNECTION,
                hyper::header::HeaderValue::from_static("Upgrade"),
            );
            Ok(resp)
        } else {
            // 非 WebSocket 请求,返回 400
            Ok(Response::builder()
                .status(StatusCode::BAD_REQUEST)
                .body(Full::new(Bytes::from("WebSocket 升级失败"))).unwrap())
        }
    } else {
        // 普通 HTTP 响应
        Ok(Response::new(Full::new(Bytes::from("Hyper HTTP Server 🌪️\n访问 /ws 连接 WebSocket"))))
    }
}

// === 主函数:启动服务器 ===
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    let listener = TcpListener::bind(&addr).await?;
    println!("Hyper WebSocket 服务器启动:http://{}", addr);
    println!("  - HTTP: curl http://{}");
    println!("  - WS : wscat -c ws://{}/ws", addr, addr);

    loop {
        let (stream, _) = listener.accept().await?;
        let io = TokioIo::new(stream);

        tokio::task::spawn(async move {
            // 使用 http1 协议(Hyper 支持 HTTP/2 需额外配置)
            if let Err(err) = http1::Builder::new()
                .serve_connection(io, service_fn(ws_handler))
                .await
            {
                eprintln!("连接错误:{:?}", err);
            }
        });
    }
}

1.3 测试 WebSocket

# 安装 wscat
npm install -g wscat

# 连接
wscat -c ws://127.0.0.1:3000/ws
> hello
< Echo: hello

二、Axum 框架:一行 WebSocket,优雅至极

2.1 Axum 依赖(推荐生产)

axum = { version = "0.7", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.5", features = ["trace"] }

2.2 Axum WebSocket 一行实现(带完整注释)

// src/main_axum.rs
use axum::{
    routing::get,
    Router,
    extract::WebSocketUpgrade,
    response::IntoResponse,
    ws::{WebSocket, Message},
};
use std::net::SocketAddr;
use tower_http::trace::TraceLayer;
use futures_util::{SinkExt, StreamExt};

// === WebSocket 处理函数:一行升级,优雅流式处理 ===
async fn ws_handler(
    ws: WebSocketUpgrade,  // Axum 自动提取升级请求
) -> impl IntoResponse {
    // 升级并处理
    ws.on_upgrade(|socket| async move {
        let (mut sender, mut receiver) = socket.split();

        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    println!("Axum 收到:{}", text);
                    let _ = sender.send(Message::Text(format!("Axum Echo: {}", text))).await;
                }
                Message::Ping(data) => {
                    let _ = sender.send(Message::Pong(data)).await;
                }
                Message::Close(_) => break,
                _ => {}
            }
        }
    })
}

// === HTTP 路由:根路径提示 ===
async fn root() -> &'static str {
    "Axum WebSocket 服务器\n访问 /ws 连接 WebSocket"
}

#[tokio::main]
async fn main() {
    // 构建路由
    let app = Router::new()
        .route("/", get(root))
        .route("/ws", get(ws_handler))
        .layer(TraceLayer::new_for_http());  // 自动日志

    let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
    println!("Axum WebSocket 启动:http://{}", addr);

    // Axum 自动使用 Tokio runtime
    let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

三、Axum vs Hyper 深度对比

维度Hyper(原生)Axum(框架)
学习曲线陡峭(需手动处理 upgrade、IO)极简(一行升级,自动路由)
代码量50+ 行 WebSocket5 行 搞定
灵活性完全掌控(自定义协议、连接池)受限但足够(扩展用 Extractor)
性能最高(零抽象)接近 Hyper(<5% 开销)
生态底层库,需搭配 tungstenite完整框架(路由、中间件、JSON)
生产推荐极致性能 / 自定义协议99% 场景首选
调试需手动日志内置 TraceLayer、自动错误处理

结论Hyper = 火箭引擎Axum = 智能驾驶
生产选 Axum,研究选 Hyper


四、高负载配置升级:支持 WebSocket + 多协议

4.1 使用 hyper-utilauto 模式(HTTP/1 + HTTP/2 + WebSocket)

hyper-util = { version = "0.1", features = ["full", "server-auto"] }

4.2 终极高负载 WebSocket 服务器(Axum + Hyper 底层)

// 使用 Axum + hyper-util auto(支持 HTTP/2 + WebSocket)
use hyper_util::server::conn::auto::Builder as AutoConn;
use hyper_util::rt::TokioExecutor;

let executor = TokioExecutor::new();
let make_service = app.into_make_service();

// 在 loop 中
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
    let _ = AutoConn::new(executor.clone())
        .serve_connection(io, make_service)
        .await;
});

五、诗意收尾

Hyper 如疾风,Axum 似流云
WebSocket 穿梭,异步舞乾坤
一行代码起,十万连接存
Rust 铸不朽,网络任我尊


六、完整项目结构

hyper-axum-ws/
├── Cargo.toml
├── config.toml
├── src/
│   ├── main_hyper.rs       # Hyper 原生 WebSocket
│   ├── main_axum.rs        # Axum 优雅实现
│   └── main_highload.rs    # 高负载配置版
└── test_ws.html            # 前端测试页面

下一步

  • 加入 Redis Pub/Sub 实现广播
  • 使用 SQLx 持久化消息
  • 部署 Kubernetes + HPA 自动扩容

你已掌握 Rust Web 异步双雄下一站:百万 QPS

版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)