// nautilus_demo/websocket_server.rs
#![allow(dead_code)]
18
19use std::net::SocketAddr;
20
21use futures::{SinkExt, StreamExt};
22use tokio::{task, time::Duration};
23
24pub struct NegativeStreamServer {
25 task: tokio::task::JoinHandle<()>,
26 port: u16,
27 pub address: SocketAddr,
28}
29
30impl NegativeStreamServer {
31 pub async fn setup() -> Self {
32 let server = tokio::net::TcpListener::bind("127.0.0.1:0".to_string())
33 .await
34 .unwrap();
35 let port = server.local_addr().unwrap().port();
36 let address = server.local_addr().unwrap();
37
38 let task = task::spawn(async move {
39 let (conn, _) = server.accept().await.unwrap();
40 let websocket = tokio_tungstenite::accept_async(conn).await.unwrap();
41 let (mut sender, mut receiver) = websocket.split();
42
43 let counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
45 let counter_clone = counter.clone();
46 let counter_clone_2 = counter;
47
48 let sender_task = task::spawn(async move {
50 loop {
51 let value = counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
52 let message = tokio_tungstenite::tungstenite::protocol::Message::Text(
53 format!("{}", -value).into(),
54 );
55
56 if let Err(err) = sender.send(message).await {
57 eprintln!("Error sending message: {err}");
58 break;
59 }
60
61 tokio::time::sleep(Duration::from_secs(1)).await;
62 }
63 });
64
65 task::spawn(async move {
67 while let Some(Ok(msg)) = receiver.next().await {
68 if let tokio_tungstenite::tungstenite::protocol::Message::Text(txt) = msg {
69 if txt == "SKIP" {
70 counter_clone_2.fetch_add(5, std::sync::atomic::Ordering::SeqCst);
71 } else if txt == "STOP" {
72 break;
73 }
74 }
75 }
76
77 sender_task.abort();
79 });
80 });
81
82 Self {
83 task,
84 port,
85 address,
86 }
87 }
88}
89
90impl Drop for NegativeStreamServer {
91 fn drop(&mut self) {
92 self.task.abort();
93 }
94}