This repository has been archived on 2025-08-25. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
Alexandria/src/routes/websocket.rs

120 lines
3.8 KiB
Rust

use std::{net::SocketAddr, ops::ControlFlow, sync::Arc};
use axum::{
body::Bytes,
extract::{
ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade},
ConnectInfo,
State
},
response::IntoResponse
};
use crate::{utils::events, AppState};
use futures_util::{sink::SinkExt, stream::StreamExt};
#[axum::debug_handler]
pub async fn ws_handler(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<Arc<AppState>>
) -> impl IntoResponse {
log::debug!(target: "websocket", "{addr} connected.");
ws.on_upgrade(move |socket| handle_socket(socket, addr, state))
}
async fn handle_socket(mut socket: WebSocket, who: SocketAddr, state: Arc<AppState>) {
if socket
.send(Message::Ping(Bytes::from_static(&[4, 2])))
.await
.is_ok()
{
log::debug!(target: "websocket", "Pinged {who}...");
} else {
log::debug!(target: "websocket", "Could not send ping to {who}!");
return;
}
if let Some(msg) = socket.recv().await {
if let Ok(msg) = msg {
if process_message(msg, who).is_break() {
return;
}
} else {
log::debug!(target: "websocket", "Client {who} abruptly disconnected");
return;
}
}
let (mut sender, mut receiver) = socket.split();
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
if process_message(msg, who).is_break() {
break;
}
}
});
let mut send_task = tokio::spawn(async move {
let mut event_listener = state.event_bus.subscribe();
loop {
match event_listener.recv().await {
Err(_) => (),
Ok(event) => {
match event {
events::Event::WebsocketBroadcast(message) => {
let _ = sender.send(Message::Text(Utf8Bytes::from(message.to_json().to_string()))).await;
}
}
}
}
}
});
tokio::select! {
rv_a = (&mut send_task) => {
match rv_a {
Ok(()) => log::debug!(target: "websocket", "Sender connection with {who} gracefully shut down"),
Err(a) => log::debug!(target: "websocket", "Error sending messages {a:?}")
}
recv_task.abort();
},
rv_b = (&mut recv_task) => {
match rv_b {
Ok(()) => log::debug!(target: "websocket", "Receiver connection with {who} gracefully shut down"),
Err(b) => log::debug!(target: "websocket", "Error receiving messages {b:?}")
}
send_task.abort();
}
}
}
fn process_message(msg: Message, who: SocketAddr) -> ControlFlow<(), ()> {
match msg {
Message::Text(t) => {
log::debug!(target: "websocket", "{who} sent str: {t:?}");
}
Message::Binary(d) => {
log::debug!(target: "websocket", "{who} sent {} bytes: {d:?}", d.len());
}
Message::Close(c) => {
if let Some(cf) = c {
log::debug!(target: "websocket",
"{who} sent close with code {} and reason `{}`",
cf.code, cf.reason
);
} else {
log::debug!(target: "websocket", "{who} somehow sent close message without CloseFrame");
}
return ControlFlow::Break(());
}
Message::Pong(_v) => {
//log::debug!(target: "websocket", "{who} sent pong with {v:?}");
}
Message::Ping(_v) => {
//log::debug!(target: "websocket", "{who} sent ping with {v:?}");
}
}
ControlFlow::Continue(())
}