1use bytes::Bytes;
17use nautilus_core::UUID4;
18use nautilus_model::identifiers::TraderId;
19use serde::{Deserialize, Serialize};
20use ustr::Ustr;
21
22use crate::enums::SerializationEncoding;
23
24#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(default)]
31pub struct DatabaseConfig {
32 #[serde(alias = "type")]
34 pub database_type: String,
35 pub host: Option<String>,
37 pub port: Option<u16>,
39 pub username: Option<String>,
41 pub password: Option<String>,
43 pub ssl: bool,
45 pub connection_timeout: u16,
47 pub response_timeout: u16,
49 pub number_of_retries: usize,
51 pub exponent_base: u64,
53 pub max_delay: u64,
55 pub factor: u64,
57}
58
59impl Default for DatabaseConfig {
60 fn default() -> Self {
62 Self {
63 database_type: "redis".to_string(),
64 host: None,
65 port: None,
66 username: None,
67 password: None,
68 ssl: false,
69 connection_timeout: 20,
70 response_timeout: 20,
71 number_of_retries: 100,
72 exponent_base: 2,
73 max_delay: 1000,
74 factor: 2,
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct MessageBusConfig {
83 pub database: Option<DatabaseConfig>,
85 pub encoding: SerializationEncoding,
87 pub timestamps_as_iso8601: bool,
90 pub buffer_interval_ms: Option<u32>,
94 pub autotrim_mins: Option<u32>,
98 pub use_trader_prefix: bool,
100 pub use_trader_id: bool,
102 pub use_instance_id: bool,
104 pub streams_prefix: String,
106 pub stream_per_topic: bool,
109 pub external_streams: Option<Vec<String>>,
111 pub types_filter: Option<Vec<String>>,
113 pub heartbeat_interval_secs: Option<u16>,
115}
116
117impl Default for MessageBusConfig {
118 fn default() -> Self {
120 Self {
121 database: None,
122 encoding: SerializationEncoding::MsgPack,
123 timestamps_as_iso8601: false,
124 buffer_interval_ms: None,
125 autotrim_mins: None,
126 use_trader_prefix: true,
127 use_trader_id: true,
128 use_instance_id: false,
129 streams_prefix: "stream".to_string(),
130 stream_per_topic: true,
131 external_streams: None,
132 types_filter: None,
133 heartbeat_interval_secs: None,
134 }
135 }
136}
137
138pub trait MessageBusDatabaseAdapter {
145 type DatabaseType;
146
147 fn new(
151 trader_id: TraderId,
152 instance_id: UUID4,
153 config: MessageBusConfig,
154 ) -> anyhow::Result<Self::DatabaseType>;
155 fn is_closed(&self) -> bool;
156 fn publish(&self, topic: Ustr, payload: Bytes);
157 fn close(&mut self);
158}
159
160#[cfg(test)]
164mod tests {
165 use rstest::*;
166 use serde_json::json;
167
168 use super::*;
169
170 #[rstest]
171 fn test_default_database_config() {
172 let config = DatabaseConfig::default();
173 assert_eq!(config.database_type, "redis");
174 assert_eq!(config.host, None);
175 assert_eq!(config.port, None);
176 assert_eq!(config.username, None);
177 assert_eq!(config.password, None);
178 assert!(!config.ssl);
179 assert_eq!(config.connection_timeout, 20);
180 assert_eq!(config.response_timeout, 20);
181 assert_eq!(config.number_of_retries, 100);
182 assert_eq!(config.exponent_base, 2);
183 assert_eq!(config.max_delay, 1000);
184 assert_eq!(config.factor, 2);
185 }
186
187 #[rstest]
188 fn test_deserialize_database_config() {
189 let config_json = json!({
190 "type": "redis",
191 "host": "localhost",
192 "port": 6379,
193 "username": "user",
194 "password": "pass",
195 "ssl": true,
196 "connection_timeout": 30,
197 "response_timeout": 10,
198 "number_of_retries": 3,
199 "exponent_base": 2,
200 "max_delay": 10,
201 "factor": 2
202 });
203 let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
204 assert_eq!(config.database_type, "redis");
205 assert_eq!(config.host, Some("localhost".to_string()));
206 assert_eq!(config.port, Some(6379));
207 assert_eq!(config.username, Some("user".to_string()));
208 assert_eq!(config.password, Some("pass".to_string()));
209 assert!(config.ssl);
210 assert_eq!(config.connection_timeout, 30);
211 assert_eq!(config.response_timeout, 10);
212 assert_eq!(config.number_of_retries, 3);
213 assert_eq!(config.exponent_base, 2);
214 assert_eq!(config.max_delay, 10);
215 assert_eq!(config.factor, 2);
216 }
217
218 #[rstest]
219 fn test_default_message_bus_config() {
220 let config = MessageBusConfig::default();
221 assert_eq!(config.encoding, SerializationEncoding::MsgPack);
222 assert!(!config.timestamps_as_iso8601);
223 assert_eq!(config.buffer_interval_ms, None);
224 assert_eq!(config.autotrim_mins, None);
225 assert!(config.use_trader_prefix);
226 assert!(config.use_trader_id);
227 assert!(!config.use_instance_id);
228 assert_eq!(config.streams_prefix, "stream");
229 assert!(config.stream_per_topic);
230 assert_eq!(config.external_streams, None);
231 assert_eq!(config.types_filter, None);
232 }
233
234 #[rstest]
235 fn test_deserialize_message_bus_config() {
236 let config_json = json!({
237 "database": {
238 "type": "redis",
239 "host": "localhost",
240 "port": 6379,
241 "username": "user",
242 "password": "pass",
243 "ssl": true,
244 "connection_timeout": 30,
245 "response_timeout": 10,
246 "number_of_retries": 3,
247 "exponent_base": 2,
248 "max_delay": 10,
249 "factor": 2
250 },
251 "encoding": "json",
252 "timestamps_as_iso8601": true,
253 "buffer_interval_ms": 100,
254 "autotrim_mins": 60,
255 "use_trader_prefix": false,
256 "use_trader_id": false,
257 "use_instance_id": true,
258 "streams_prefix": "data_streams",
259 "stream_per_topic": false,
260 "external_streams": ["stream1", "stream2"],
261 "types_filter": ["type1", "type2"]
262 });
263 let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
264 assert_eq!(config.encoding, SerializationEncoding::Json);
265 assert!(config.timestamps_as_iso8601);
266 assert_eq!(config.buffer_interval_ms, Some(100));
267 assert_eq!(config.autotrim_mins, Some(60));
268 assert!(!config.use_trader_prefix);
269 assert!(!config.use_trader_id);
270 assert!(config.use_instance_id);
271 assert_eq!(config.streams_prefix, "data_streams");
272 assert!(!config.stream_per_topic);
273 assert_eq!(
274 config.external_streams,
275 Some(vec!["stream1".to_string(), "stream2".to_string()])
276 );
277 assert_eq!(
278 config.types_filter,
279 Some(vec!["type1".to_string(), "type2".to_string()])
280 );
281 }
282}