feat: ping/pong messages and migrate to tokio-tungstenite

This commit is contained in:
trafficlunar 2025-09-22 12:02:44 +01:00
parent 5d2bd380ec
commit f389d21f20
6 changed files with 343 additions and 45 deletions

View file

@ -1,20 +1,24 @@
use std::{
net::TcpStream,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
},
thread,
time::Duration,
time::{Duration, Instant},
};
use futures_util::{SinkExt, StreamExt};
use inputbot::{KeybdKey, MouseButton};
use sysinfo::System;
use tungstenite::{stream::MaybeTlsStream, WebSocket};
use tokio::{net::TcpStream, time::timeout};
use tokio_tungstenite::{
tungstenite::{Bytes, Message},
MaybeTlsStream, WebSocketStream,
};
use crate::websocket;
pub fn start_sending(socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
pub async fn start_sending(socket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>) {
let mut sys = System::new();
let key_counter = Arc::new(AtomicU16::new(0));
@ -24,7 +28,7 @@ pub fn start_sending(socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
let click_counter_clone = Arc::clone(&click_counter);
// Keys and clicks handler
thread::spawn(move || {
tokio::task::spawn_blocking(move || {
KeybdKey::bind_all(move |_| {
key_counter_clone.fetch_add(1, Ordering::SeqCst);
});
@ -39,37 +43,75 @@ pub fn start_sending(socket: &mut WebSocket<MaybeTlsStream<TcpStream>>) {
inputbot::handle_input_events();
});
// Send to WebSocket every 60 seconds
let mut last_ping_sent = Instant::now() - Duration::from_secs(30);
let mut last_stats = Instant::now() - Duration::from_secs(60);
loop {
sys.refresh_cpu_usage();
sys.refresh_memory();
let now = Instant::now();
let cpu_usage = sys.global_cpu_usage().floor() as u8;
// Send ping every 30 seconds
if now.duration_since(last_ping_sent) >= Duration::from_secs(30) {
last_ping_sent = now;
let total_memory = sys.total_memory();
let used_memory = sys.used_memory();
let memory_usage = ((used_memory as f64) / (total_memory as f64) * 100.0).floor() as u8;
let keys = key_counter.load(Ordering::SeqCst);
let clicks = click_counter.load(Ordering::SeqCst);
match websocket::send(socket, cpu_usage, memory_usage, keys, clicks) {
Ok(_) => {
// Reset counters after sending
key_counter.store(0, Ordering::SeqCst);
click_counter.store(0, Ordering::SeqCst);
println!("Sending ping...");
if let Err(e) = socket.send(Message::Ping(Bytes::new())).await {
eprintln!("Failed to send ping: {}", e);
break;
}
Err(e) => {
eprintln!("Failed to send WebSocket message: {}", e);
// Avoid resetting counters because we'll try to resend them after reconnection
break; // triggers reconnection in main.rs
// Read incoming messages
match timeout(Duration::from_secs(10), socket.next()).await {
Ok(Some(msg)) => match msg {
Ok(Message::Pong(_)) => {
println!("Received pong");
}
Ok(Message::Close(_)) => {
eprintln!("Received close, reconnecting...");
break;
}
Ok(_) => {} // other messages
Err(e) => {
eprintln!("Error receiving message: {}, reconnecting...", e);
break;
}
},
Ok(None) => {
// Stream ended
eprintln!("WebSocket stream ended, reconnecting...");
break;
}
Err(_) => {
// Timed out waiting for message
eprintln!("No response received in 10 seconds, reconnecting...");
break;
}
}
}
// Reset counters after sending
key_counter.store(0, Ordering::SeqCst);
click_counter.store(0, Ordering::SeqCst);
// Send stats every 60 seconds
if now.duration_since(last_stats) >= Duration::from_secs(60) {
last_stats = now;
thread::sleep(Duration::from_secs(60));
sys.refresh_cpu_usage();
sys.refresh_memory();
let cpu_usage = sys.global_cpu_usage().floor() as u8;
let total_memory = sys.total_memory();
let used_memory = sys.used_memory();
let memory_usage = ((used_memory as f64) / (total_memory as f64) * 100.0).floor() as u8;
let keys = key_counter.load(Ordering::SeqCst);
let clicks = click_counter.load(Ordering::SeqCst);
if let Err(e) = websocket::send(socket, cpu_usage, memory_usage, keys, clicks).await {
eprintln!("Failed to send statistics: {}", e);
break;
}
key_counter.store(0, Ordering::SeqCst);
click_counter.store(0, Ordering::SeqCst);
}
thread::sleep(Duration::from_secs(1));
}
}

View file

@ -4,19 +4,21 @@ mod computer;
mod notifications;
mod websocket;
fn main() -> Result<(), Box<dyn Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
dotenvy::dotenv()?;
loop {
match websocket::connect() {
match websocket::connect().await {
Ok(mut socket) => {
println!("WebSocket connected successfully");
// This will block until connection fails
computer::start_sending(&mut socket);
computer::start_sending(&mut socket).await;
// The connection failed if code has reached here
println!("WebSocket connection lost, attempting to reconnect in 10 seconds...");
notifications::send_error_notification("Connection lost! Is server down?");
}
Err(_) => {
println!("Retrying connection in 10 seconds...");

View file

@ -1,13 +1,18 @@
use std::{env, net::TcpStream};
use std::env;
use tungstenite::{
handshake::client::generate_key, http::Request, stream::MaybeTlsStream, Message, WebSocket,
use futures_util::SinkExt;
use tokio::net::TcpStream;
use tokio_tungstenite::{
connect_async,
tungstenite::{handshake::client::generate_key, http::Request, Message},
MaybeTlsStream, WebSocketStream,
};
use url::Url;
use crate::notifications;
pub fn connect() -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Error> {
pub async fn connect(
) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, tokio_tungstenite::tungstenite::Error> {
let websocket_url =
Url::parse(&env::var("WEBSOCKET_URL").unwrap()).expect("Invalid WebSocket URL");
let host = websocket_url.host_str().expect("Host not found in URL");
@ -29,7 +34,7 @@ pub fn connect() -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Er
.body(())
.unwrap();
let (socket, _) = match tungstenite::connect(request) {
let (socket, _) = match connect_async(request).await {
Ok(ws) => ws,
Err(err) => {
eprintln!("Unable to connect to WebSocket: {}", err);
@ -43,13 +48,13 @@ pub fn connect() -> Result<WebSocket<MaybeTlsStream<TcpStream>>, tungstenite::Er
Ok(socket)
}
pub fn send(
socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
pub async fn send(
socket: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
cpu: u8,
ram: u8,
keys: u16,
clicks: u16,
) -> Result<(), tungstenite::Error> {
) -> Result<(), tokio_tungstenite::tungstenite::Error> {
let message = format!(
"{{ \"cpu\": {}, \"ram\": {}, \"keys\": {}, \"clicks\": {} }}",
cpu, ram, keys, clicks
@ -57,5 +62,5 @@ pub fn send(
println!("Sending to WebSocket: {}", message);
socket.send(Message::Text(message.into()))
socket.send(Message::Text(message.into())).await
}