1use std::sync::Arc;
19
20use memchr::memchr;
21
22use crate::socket::TcpMessageHandler;
23
/// Fewest unprocessed bytes for which a scan is attempted; shorter input is left buffered.
const MIN_MESSAGE_SIZE: usize = 10;
/// Framed messages longer than this many bytes are never passed to the handler.
const MAX_MESSAGE_SIZE: usize = 8192;
/// Tag prefix of the checksum field that terminates a message.
const CHECKSUM_TAG: &[u8] = b"10=";
/// Byte length of a complete checksum field: the tag, three digits and the closing delimiter.
const CHECKSUM_LEN: usize = CHECKSUM_TAG.len() + 3 + 1;
/// Byte sequence every message must begin with.
const START_PATTERN: &[u8] = b"8=FIX";
/// First byte of [`START_PATTERN`]; used for the single-byte pre-scan.
const START_CHAR: u8 = START_PATTERN[0];
/// Field separator byte (ASCII SOH, `0x01`).
const DELIMITER: u8 = 0x01;
31
32pub(crate) fn process_fix_buffer(buf: &mut Vec<u8>, handler: &Arc<TcpMessageHandler>) {
58 let mut processed_to = 0;
59
60 while processed_to < buf.len() {
61 if buf.len() - processed_to < MIN_MESSAGE_SIZE {
62 break;
63 }
64
65 let start_idx = memchr(START_CHAR, &buf[processed_to..]).map(|i| processed_to + i);
67 if let Some(idx) = start_idx {
68 if idx + START_PATTERN.len() <= buf.len()
69 && &buf[idx..idx + START_PATTERN.len()] == START_PATTERN
70 {
71 if let Some(end_pos) = find_message_end(&buf[idx..]) {
73 let message_end = idx + end_pos;
74 let message_len = message_end - idx;
75
76 if message_len > MAX_MESSAGE_SIZE {
78 processed_to = idx + 1;
80 continue;
81 }
82
83 let message = &buf[idx..message_end];
84 handler(message); processed_to = message_end; } else {
87 break;
89 }
90 } else {
91 processed_to = idx + 1;
93 }
94 } else {
95 buf.clear();
97 return;
98 }
99 }
100
101 if processed_to > 0 {
103 buf.drain(0..processed_to);
104 }
105}
106
107#[inline(always)]
109fn find_message_end(buf: &[u8]) -> Option<usize> {
110 let mut idx = 0;
111 while idx + CHECKSUM_LEN <= buf.len() {
112 if buf[idx..idx + CHECKSUM_LEN].starts_with(CHECKSUM_TAG)
113 && buf[idx + 3].is_ascii_digit()
114 && buf[idx + 4].is_ascii_digit()
115 && buf[idx + 5].is_ascii_digit()
116 && buf[idx + 6] == DELIMITER
117 {
118 return Some(idx + CHECKSUM_LEN);
119 }
120 idx += 1;
121 }
122 None
123}
124
#[cfg(test)]
mod process_fix_buffer_tests {
    use std::sync::{Arc, Mutex};

    use rstest::rstest;

    use crate::{fix::process_fix_buffer, socket::TcpMessageHandler};

    /// Runs `process_fix_buffer` once over a copy of `input` with a recording handler.
    ///
    /// Returns the messages passed to the handler, in order, and the bytes left in the
    /// buffer afterwards.
    fn run(input: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
        let mut buffer = input.to_vec();
        let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&received);

        let handler: Arc<TcpMessageHandler> = Arc::new(move |data: &[u8]| {
            sink.lock().unwrap().push(data.to_vec());
        });

        process_fix_buffer(&mut buffer, &handler);

        let delivered = received.lock().unwrap().clone();
        (delivered, buffer)
    }

    #[rstest]
    fn test_process_empty_buffer() {
        let (delivered, remaining) = run(b"");

        assert!(delivered.is_empty());
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_incomplete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x01";
        let (delivered, remaining) = run(input);

        assert!(delivered.is_empty());
        // Nothing is consumed while the message is still incomplete.
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_complete_message() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01";
        let (delivered, remaining) = run(input);

        // Both conditions must hold: the message is delivered AND consumed.
        assert_eq!(delivered, vec![input.to_vec()]);
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_message_with_garbage_prefix() {
        let (delivered, remaining) = run(b"GARBAGE8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01");

        // The garbage prefix is stripped; only the message itself is delivered.
        assert_eq!(
            delivered,
            vec![b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec()]
        );
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_partial_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=123";
        let (delivered, remaining) = run(input);

        assert!(delivered.is_empty());
        // The checksum field lacks its closing delimiter, so the bytes stay buffered.
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_multiple_messages_single_call() {
        let (delivered, remaining) = run(
            b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x018=FIXT.1.1\x019=200\x0135=D\x0110=456\x01",
        );

        assert_eq!(
            delivered,
            vec![
                b"8=FIXT.1.1\x019=100\x0135=D\x0110=123\x01".to_vec(),
                b"8=FIXT.1.1\x019=200\x0135=D\x0110=456\x01".to_vec(),
            ]
        );
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_message_with_invalid_checksum() {
        let input = b"8=FIXT.1.1\x019=100\x0135=D\x0110=1X3\x01";
        let (delivered, remaining) = run(input);

        assert!(delivered.is_empty());
        // A non-numeric checksum is not a terminator, so the bytes stay buffered.
        assert_eq!(remaining, input.to_vec());
    }

    #[rstest]
    fn test_process_message_with_multiple_checksums() {
        let (delivered, remaining) = run(b"8=FIX.4.4\x019=100\x0110=123\x0110=456\x01");

        // The message ends at the first checksum field.
        assert_eq!(delivered, vec![b"8=FIX.4.4\x019=100\x0110=123\x01".to_vec()]);
        assert_eq!(remaining, b"10=456\x01".to_vec());
    }

    #[rstest]
    fn test_process_large_buffer() {
        let message = b"8=FIX.4.4\x019=100\x0135=D\x0110=123\x01";
        let mut stream = Vec::new();
        for _ in 0..1000 {
            stream.extend_from_slice(message);
        }

        let (delivered, remaining) = run(&stream);

        assert_eq!(delivered.len(), 1000);
        assert!(delivered.iter().all(|m| m.as_slice() == &message[..]));
        assert!(remaining.is_empty());
    }

    #[rstest]
    fn test_process_oversized_message_is_dropped() {
        // 9000 filler bytes push the framed length past the 8192-byte limit.
        let mut stream = b"8=FIX".to_vec();
        stream.extend(std::iter::repeat(b'A').take(9000));
        stream.extend_from_slice(b"\x0110=123\x01");

        let (delivered, remaining) = run(&stream);

        assert!(delivered.is_empty());
        // No further start byte follows, so the whole buffer is discarded.
        assert!(remaining.is_empty());
    }
}