nautilus_network/
fix.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Simple FIX message buffer processor.
17
18use std::sync::Arc;
19
20use memchr::memchr;
21
22use crate::socket::TcpMessageHandler;
23
/// Minimum number of unread bytes required before a parse attempt is made
/// (a lower bound covering the "8=FIX" header plus the "10=xxx|" trailer).
const MIN_MESSAGE_SIZE: usize = 10;

/// Largest message that will be dispatched; longer ones are dropped to prevent buffer bloat.
const MAX_MESSAGE_SIZE: usize = 8 * 1024;

/// Field delimiter (SOH), written as `|` in the documentation of this module.
const DELIMITER: u8 = 0x01;

/// Byte sequence every supported FIX message begins with.
const START_PATTERN: &[u8] = b"8=FIX";

/// First byte of [`START_PATTERN`], used for the fast candidate scan.
const START_CHAR: u8 = START_PATTERN[0];

/// Tag prefix of the checksum field.
const CHECKSUM_TAG: &[u8] = b"10=";

/// Length of "10=xxx|": the tag, three checksum digits, and the trailing delimiter.
const CHECKSUM_LEN: usize = CHECKSUM_TAG.len() + 3 + 1;
31
32/// Processes a mutable byte buffer containing FIX protocol messages.
33///
34/// Extracts complete messages starting with "8=FIX" (supporting various FIX versions)
35/// and ending with "10=xxx|" (where xxx is a three-digit checksum), passes them to the
36/// provided handler, and removes them from the buffer, leaving incomplete data for
37/// future processing.
38///
39/// # Assumptions
40///
41/// - Fields are delimited by SOH (`\x01`).
42/// - The checksum field is "10=xxx|" where xxx is a three-digit ASCII number.
43/// - Messages are ASCII-encoded.
44///
45/// # Behavior
46///
47/// - Uses `memchr` for efficient message start detection.
48/// - Discards malformed data up to the next potential message start.
49/// - Retains incomplete messages in the buffer for additional data.
50/// - Enforces a maximum message size to prevent buffer overflow.
51///
52/// # Warning
53///
54/// This parser is designed for basic FIX message processing and does not support all features
55/// of the FIX protocol. Notably, it lacks handling for repeating groups and other advanced
56/// structures, which may be required for full protocol compliance in complex scenarios.
57pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &Arc<TcpMessageHandler>) {
58    let mut processed_to = 0;
59
60    while processed_to < buf.len() {
61        if buf.len() - processed_to < MIN_MESSAGE_SIZE {
62            break;
63        }
64
65        // Find the potential start of a FIX message
66        let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
67        if let Some(idx) = start_idx {
68            if idx + START_PATTERN.len() <= buf.len()
69                && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
70            {
71                // Search for message end
72                if let Some(end_pos) = find_message_end(&buf[idx..]) {
73                    let message_end = idx + end_pos;
74                    let message_len = message_end - idx;
75
76                    // Check if message exceeds max size
77                    if message_len > MAX_MESSAGE_SIZE {
78                        // Message exceeds max size, discard up to this point
79                        processed_to = idx + 1;
80                        continue;
81                    }
82
83                    let message = &buf[idx..message_end];
84                    handler(message); // Pass complete message to handler
85                    processed_to = message_end; // Update processed position
86                } else {
87                    // Incomplete message, wait for more data
88                    break;
89                }
90            } else {
91                // Invalid start pattern, discard data up to this point
92                processed_to = idx + 1;
93            }
94        } else {
95            // No message start found in the remaining buffer, clear it to avoid garbage buildup
96            buf.clear();
97            return;
98        }
99    }
100
101    // Remove all processed data from the buffer
102    if processed_to > 0 {
103        buf.drain(0..processed_to);
104    }
105}
106
107/// Locate the end of a FIX message. Searches for "10=xxx|" where xxx is a three-digit ASCII number.
108#[inline(always)]
109fn find_message_end(buf: &[u8]) -> Option<usize> {
110    let mut idx = 0;
111    while idx + CHECKSUM_LEN <= buf.len() {
112        if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
113            && buf[idx + 3].is_ascii_digit()
114            && buf[idx + 4].is_ascii_digit()
115            && buf[idx + 5].is_ascii_digit()
116            && buf[idx + 6] == DELIMITER
117        {
118            return Some(idx + CHECKSUM_LEN);
119        }
120        idx += 1;
121    }
122    None
123}
124
#[cfg(test)]
mod process_fix_buffer_tests {
    use std::sync::{Arc, Mutex};

    use rstest::rstest;

    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};

    /// Runs `process_fix_buffer` once over `buffer` and returns a copy of every message
    /// passed to the handler, in the order received.
    fn collect_messages(buffer: &mut Vec<u8>) -> Vec<Vec<u8>> {
        let received = Arc::new(Mutex::new(Vec::new()));
        let received_clone = Arc::clone(&received);

        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
            received_clone.lock().unwrap().push(data.to_vec());
        });

        process_fix_buffer(buffer, &handler);

        let messages = received.lock().unwrap().clone();
        messages
    }

    #[rstest]
    fn test_process_empty_buffer() {
        let mut buffer = Vec::new();

        let messages = collect_messages(&mut buffer);

        // Buffer was empty, so no messages should be processed
        assert!(messages.is_empty());
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_incomplete_message() {
        // A partial FIX message without end
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec();

        let messages = collect_messages(&mut buffer);

        // No complete message, nothing should be processed
        assert!(messages.is_empty());
        // Buffer should be preserved for more data
        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x01".to_vec());
    }

    #[rstest]
    fn test_process_complete_message() {
        // A complete FIX message
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();

        let messages = collect_messages(&mut buffer);

        // Exactly the one message is dispatched and the buffer is fully consumed
        assert_eq!(
            messages,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_garbage_prefix() {
        // Message with garbage before the FIX header
        let mut buffer = b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec();

        let messages = collect_messages(&mut buffer);

        // The garbage is stripped: only the message itself reaches the handler
        assert_eq!(
            messages,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_partial_checksum() {
        // Message with partial checksum (missing the SOH)
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec();

        let messages = collect_messages(&mut buffer);

        // No complete message, nothing should be processed
        assert!(messages.is_empty());
        // Buffer should be preserved
        assert_eq!(buffer, b"8=FIXT.1.1\x019=100\x0135=D\x0110=123".to_vec());
    }

    #[rstest]
    fn test_process_multiple_messages_single_call() {
        // Two complete messages
        let mut buffer =
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01"
                .to_vec();

        let messages = collect_messages(&mut buffer);

        assert_eq!(messages.len(), 2);
        assert_eq!(
            messages[0],
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()
        );
        assert_eq!(
            messages[1],
            b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec()
        );
        assert!(buffer.is_empty());
    }

    #[rstest]
    fn test_process_message_with_invalid_checksum() {
        // Message with invalid checksum format (not 3 digits)
        let mut buffer = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec();

        let messages = collect_messages(&mut buffer);

        // No message should be processed due to invalid checksum format
        assert!(messages.is_empty());
        // Buffer should be preserved
        assert_eq!(
            buffer,
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01".to_vec()
        );
    }

    #[rstest]
    fn test_process_message_with_multiple_checksums() {
        let mut buffer = b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01".to_vec();

        let messages = collect_messages(&mut buffer);

        // One message processed, extra data retained
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0], b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec());
        assert_eq!(buffer, b"10=456\x01".to_vec());
    }

    #[rstest]
    fn test_process_large_buffer() {
        let mut buffer = Vec::new();
        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
        for _ in 0..1000 {
            buffer.extend_from_slice(message);
        }

        let messages = collect_messages(&mut buffer);

        // 1000 identical messages processed, buffer empty
        assert_eq!(messages.len(), 1000);
        assert!(messages.iter().all(|m| m.as_slice() == &message[..]));
        assert!(buffer.is_empty());
    }
}