nautilus_demo/
websocket_server.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16// Under development
17#![allow(dead_code)]
18
19use std::net::SocketAddr;
20
21use futures::{SinkExt, StreamExt};
22use tokio::{task, time::Duration};
23
24pub struct NegativeStreamServer {
25    task: tokio::task::JoinHandle<()>,
26    port: u16,
27    pub address: SocketAddr,
28}
29
30impl NegativeStreamServer {
31    pub async fn setup() -> Self {
32        let server = tokio::net::TcpListener::bind("127.0.0.1:0".to_string())
33            .await
34            .unwrap();
35        let port = server.local_addr().unwrap().port();
36        let address = server.local_addr().unwrap();
37
38        let task = task::spawn(async move {
39            let (conn, _) = server.accept().await.unwrap();
40            let websocket = tokio_tungstenite::accept_async(conn).await.unwrap();
41            let (mut sender, mut receiver) = websocket.split();
42
43            // Create a counter for negative values
44            let counter = std::sync::Arc::new(std::sync::atomic::AtomicI32::new(0));
45            let counter_clone = counter.clone();
46            let counter_clone_2 = counter;
47
48            // Task to send negative numbers every second
49            let sender_task = task::spawn(async move {
50                loop {
51                    let value = counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
52                    let message = tokio_tungstenite::tungstenite::protocol::Message::Text(
53                        format!("{}", -value).into(),
54                    );
55
56                    if let Err(err) = sender.send(message).await {
57                        eprintln!("Error sending message: {err}");
58                        break;
59                    }
60
61                    tokio::time::sleep(Duration::from_secs(1)).await;
62                }
63            });
64
65            // Task to handle incoming messages
66            task::spawn(async move {
67                while let Some(Ok(msg)) = receiver.next().await {
68                    if let tokio_tungstenite::tungstenite::protocol::Message::Text(txt) = msg {
69                        if txt == "SKIP" {
70                            counter_clone_2.fetch_add(5, std::sync::atomic::Ordering::SeqCst);
71                        } else if txt == "STOP" {
72                            break;
73                        }
74                    }
75                }
76
77                // Cancel the sender task when we're done
78                sender_task.abort();
79            });
80        });
81
82        Self {
83            task,
84            port,
85            address,
86        }
87    }
88}
89
90impl Drop for NegativeStreamServer {
91    fn drop(&mut self) {
92        self.task.abort();
93    }
94}