From f389d21f2021e9179f34bac07f18315d970a9104 Mon Sep 17 00:00:00 2001 From: trafficlunar Date: Mon, 22 Sep 2025 12:02:44 +0100 Subject: [PATCH] feat: ping/pong messages and migrate to tokio-tungstenite --- Cargo.lock | 251 ++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 4 +- README.md | 2 +- src/computer.rs | 100 +++++++++++++------ src/main.rs | 8 +- src/websocket.rs | 23 +++-- 6 files changed, 343 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88a1b5b..14ab81f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "async-broadcast" version = "0.7.2" @@ -145,6 +160,21 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if 1.0.1", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets 0.52.6", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -242,10 +272,12 @@ name = "computer" version = "0.1.0" dependencies = [ "dotenvy", + "futures-util", "inputbot", "notify-rust", "sysinfo", - "tungstenite", + "tokio", + "tokio-tungstenite", "url", ] @@ -488,6 +520,44 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-sink", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gcc" version = "0.3.55" @@ -513,9 +583,15 @@ dependencies = [ "cfg-if 1.0.1", "libc", "r-efi", - "wasi", + "wasi 0.14.2+wasi-0.2.4", ] +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + [[package]] name = "hashbrown" version = "0.15.4" @@ -710,6 +786,17 @@ dependencies = [ "x11", ] +[[package]] +name = "io-uring" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +dependencies = [ + "bitflags 2.9.1", + "cfg-if 1.0.1", + "libc", +] + [[package]] name = "ioctl-sys" version = "0.5.2" @@ -759,6 +846,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956" +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -801,6 +898,26 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -935,6 +1052,15 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1001,6 +1127,29 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70d58bf43669b5795d1576d0641cfb6fbb2057bf629506267a92807158584a13" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" +dependencies = [ + "cfg-if 1.0.1", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1013,6 +1162,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "piper" version = "0.2.4" @@ -1139,6 +1294,21 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" +dependencies = [ + "bitflags 2.9.1", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" + [[package]] name = "rustix" version = "1.0.8" @@ -1167,6 +1337,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "2.11.1" @@ -1259,6 +1435,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -1403,6 +1589,61 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tokio" +version = "1.47.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" +dependencies = [ + "backtrace", + "bytes 1.10.1", + "io-uring", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "slab", + "socket2", + "tokio-macros", + "windows-sys 0.59.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite", +] + [[package]] name = "toml_datetime" version = "0.6.11" @@ -1567,6 +1808,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + [[package]] name = "wasi" version = "0.14.2+wasi-0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 883169c..cf087cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,10 @@ edition = "2021" [dependencies] dotenvy = "0.15.7" +futures-util = "0.3.31" inputbot = "0.6.0" notify-rust = "4.11.3" sysinfo = "0.36.1" -tungstenite = { version = "0.27.0", features = ["native-tls"] } +tokio = { version = "1.47.1", features = ["full"] } +tokio-tungstenite = { version = "0.27.0", features = ["native-tls"] } url = "2.5.4" diff --git a/README.md b/README.md index a4bd485..3138a37 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ ExecStart=%h/Projects/computer-statistics/target/release/computer Environment="RUST_LOG=info" [Install] -WantedBy=multi-user.target +WantedBy=default.target ``` ```bash diff --git a/src/computer.rs b/src/computer.rs index c058a22..2c531d1 100644 --- a/src/computer.rs +++ b/src/computer.rs @@ -1,20 +1,24 @@ use std::{ - net::TcpStream, sync::{ atomic::{AtomicU16, Ordering}, Arc, }, thread, - time::Duration, + time::{Duration, Instant}, }; +use futures_util::{SinkExt, StreamExt}; use inputbot::{KeybdKey, MouseButton}; use sysinfo::System; -use tungstenite::{stream::MaybeTlsStream, WebSocket}; +use tokio::{net::TcpStream, time::timeout}; +use tokio_tungstenite::{ + tungstenite::{Bytes, Message}, + MaybeTlsStream, WebSocketStream, +}; use crate::websocket; -pub fn start_sending(socket: &mut WebSocket>) { +pub async fn start_sending(socket: &mut WebSocketStream>) { let mut sys = System::new(); let key_counter = Arc::new(AtomicU16::new(0)); @@ -24,7 +28,7 @@ pub fn start_sending(socket: &mut WebSocket>) { let click_counter_clone = Arc::clone(&click_counter); // Keys and clicks handler - thread::spawn(move || { + tokio::task::spawn_blocking(move || { KeybdKey::bind_all(move |_| { key_counter_clone.fetch_add(1, Ordering::SeqCst); }); @@ -39,37 +43,75 @@ pub fn start_sending(socket: &mut WebSocket>) { inputbot::handle_input_events(); }); - // Send to WebSocket every 60 seconds + let mut last_ping_sent = Instant::now() - Duration::from_secs(30); + let mut last_stats = Instant::now() - Duration::from_secs(60); + loop { - sys.refresh_cpu_usage(); - sys.refresh_memory(); + let now = Instant::now(); - let cpu_usage = sys.global_cpu_usage().floor() as u8; + // Send ping every 30 seconds + if now.duration_since(last_ping_sent) >= Duration::from_secs(30) { + last_ping_sent = now; - let total_memory = sys.total_memory(); - let used_memory = sys.used_memory(); - let memory_usage = ((used_memory as f64) / (total_memory as f64) * 100.0).floor() as u8; - - let keys = key_counter.load(Ordering::SeqCst); - let clicks = click_counter.load(Ordering::SeqCst); - - match websocket::send(socket, cpu_usage, memory_usage, keys, clicks) { - Ok(_) => { - // Reset counters after sending - key_counter.store(0, Ordering::SeqCst); - click_counter.store(0, Ordering::SeqCst); + println!("Sending ping..."); + if let Err(e) = socket.send(Message::Ping(Bytes::new())).await { + eprintln!("Failed to send ping: {}", e); + break; } - Err(e) => { - eprintln!("Failed to send WebSocket message: {}", e); - // Avoid resetting counters because we'll try to resend them after reconnection - break; // triggers reconnection in main.rs + + // Read incoming messages + match timeout(Duration::from_secs(10), socket.next()).await { + Ok(Some(msg)) => match msg { + Ok(Message::Pong(_)) => { + println!("Received pong"); + } + Ok(Message::Close(_)) => { + eprintln!("Received close, reconnecting..."); + break; + } + Ok(_) => {} // other messages + Err(e) => { + eprintln!("Error receiving message: {}, reconnecting...", e); + break; + } + }, + Ok(None) => { + // Stream ended + eprintln!("WebSocket stream ended, reconnecting..."); + break; + } + Err(_) => { + // Timed out waiting for message + eprintln!("No response received in 10 seconds, reconnecting..."); + break; + } } } - // Reset counters after sending - key_counter.store(0, Ordering::SeqCst); - click_counter.store(0, Ordering::SeqCst); + // Send stats every 60 seconds + if now.duration_since(last_stats) >= Duration::from_secs(60) { + last_stats = now; - thread::sleep(Duration::from_secs(60)); + sys.refresh_cpu_usage(); + sys.refresh_memory(); + + let cpu_usage = sys.global_cpu_usage().floor() as u8; + let total_memory = sys.total_memory(); + let used_memory = sys.used_memory(); + let memory_usage = ((used_memory as f64) / (total_memory as f64) * 100.0).floor() as u8; + + let keys = key_counter.load(Ordering::SeqCst); + let clicks = click_counter.load(Ordering::SeqCst); + + if let Err(e) = websocket::send(socket, cpu_usage, memory_usage, keys, clicks).await { + eprintln!("Failed to send statistics: {}", e); + break; + } + + key_counter.store(0, Ordering::SeqCst); + click_counter.store(0, Ordering::SeqCst); + } + + thread::sleep(Duration::from_secs(1)); } } diff --git a/src/main.rs b/src/main.rs index 92e3af2..0250680 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,19 +4,21 @@ mod computer; mod notifications; mod websocket; -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { dotenvy::dotenv()?; loop { - match websocket::connect() { + match websocket::connect().await { Ok(mut socket) => { println!("WebSocket connected successfully"); // This will block until connection fails - computer::start_sending(&mut socket); + computer::start_sending(&mut socket).await; // The connection failed if code has reached here println!("WebSocket connection lost, attempting to reconnect in 10 seconds..."); + notifications::send_error_notification("Connection lost! Is server down?"); } Err(_) => { println!("Retrying connection in 10 seconds..."); diff --git a/src/websocket.rs b/src/websocket.rs index a68c095..4e034ca 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,13 +1,18 @@ -use std::{env, net::TcpStream}; +use std::env; -use tungstenite::{ - handshake::client::generate_key, http::Request, stream::MaybeTlsStream, Message, WebSocket, +use futures_util::SinkExt; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + connect_async, + tungstenite::{handshake::client::generate_key, http::Request, Message}, + MaybeTlsStream, WebSocketStream, }; use url::Url; use crate::notifications; -pub fn connect() -> Result>, tungstenite::Error> { +pub async fn connect( +) -> Result>, tokio_tungstenite::tungstenite::Error> { let websocket_url = Url::parse(&env::var("WEBSOCKET_URL").unwrap()).expect("Invalid WebSocket URL"); let host = websocket_url.host_str().expect("Host not found in URL"); @@ -29,7 +34,7 @@ pub fn connect() -> Result>, tungstenite::Er .body(()) .unwrap(); - let (socket, _) = match tungstenite::connect(request) { + let (socket, _) = match connect_async(request).await { Ok(ws) => ws, Err(err) => { eprintln!("Unable to connect to WebSocket: {}", err); @@ -43,13 +48,13 @@ pub fn connect() -> Result>, tungstenite::Er Ok(socket) } -pub fn send( - socket: &mut WebSocket>, +pub async fn send( + socket: &mut WebSocketStream>, cpu: u8, ram: u8, keys: u16, clicks: u16, -) -> Result<(), tungstenite::Error> { +) -> Result<(), tokio_tungstenite::tungstenite::Error> { let message = format!( "{{ \"cpu\": {}, \"ram\": {}, \"keys\": {}, \"clicks\": {} }}", cpu, ram, keys, clicks @@ -57,5 +62,5 @@ pub fn send( println!("Sending to WebSocket: {}", message); - socket.send(Message::Text(message.into())) + socket.send(Message::Text(message.into())).await }