nautilus_common/msgbus/
database.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Posei Systems Pty Ltd. All rights reserved.
3//  https://poseitrader.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use bytes::Bytes;
17use nautilus_core::UUID4;
18use nautilus_model::identifiers::TraderId;
19use serde::{Deserialize, Serialize};
20use ustr::Ustr;
21
22use crate::enums::SerializationEncoding;
23
24/// Configuration for database connections.
25///
26/// # Notes
27///
28/// If `database_type` is `"redis"`, it requires Redis version 6.2 or higher for correct operation.
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
30#[serde(default)]
31pub struct DatabaseConfig {
32    /// The database type.
33    #[serde(alias = "type")]
34    pub database_type: String,
35    /// The database host address. If `None`, the typical default should be used.
36    pub host: Option<String>,
37    /// The database port. If `None`, the typical default should be used.
38    pub port: Option<u16>,
39    /// The account username for the database connection.
40    pub username: Option<String>,
41    /// The account password for the database connection.
42    pub password: Option<String>,
43    /// If the database should use an SSL-enabled connection.
44    pub ssl: bool,
45    /// The timeout (in seconds) to wait for a new connection.
46    pub connection_timeout: u16,
47    /// The timeout (in seconds) to wait for a response.
48    pub response_timeout: u16,
49    /// The number of retry attempts with exponential backoff for connection attempts.
50    pub number_of_retries: usize,
51    /// The base value for exponential backoff calculation.
52    pub exponent_base: u64,
53    /// The maximum delay between retry attempts (in seconds).
54    pub max_delay: u64,
55    /// The multiplication factor for retry delay calculation.
56    pub factor: u64,
57}
58
59impl Default for DatabaseConfig {
60    /// Creates a new default [`DatabaseConfig`] instance.
61    fn default() -> Self {
62        Self {
63            database_type: "redis".to_string(),
64            host: None,
65            port: None,
66            username: None,
67            password: None,
68            ssl: false,
69            connection_timeout: 20,
70            response_timeout: 20,
71            number_of_retries: 100,
72            exponent_base: 2,
73            max_delay: 1000,
74            factor: 2,
75        }
76    }
77}
78
79/// Configuration for `MessageBus` instances.
80#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
81#[serde(default)]
82pub struct MessageBusConfig {
83    /// The configuration for the message bus backing database.
84    pub database: Option<DatabaseConfig>,
85    /// The encoding for database operations, controls the type of serializer used.
86    pub encoding: SerializationEncoding,
87    /// If timestamps should be persisted as ISO 8601 strings.
88    /// If `false`, then timestamps will be persisted as UNIX nanoseconds.
89    pub timestamps_as_iso8601: bool,
90    /// The buffer interval (milliseconds) between pipelined/batched transactions.
91    /// The recommended range if using buffered pipelining is [10, 1000] milliseconds,
92    /// with a good compromise being 100 milliseconds.
93    pub buffer_interval_ms: Option<u32>,
94    /// The lookback window in minutes for automatic stream trimming.
95    /// The actual window may extend up to one minute beyond the specified value since streams are trimmed at most once every minute.
96    /// This feature requires Redis version 6.2 or higher; otherwise, it will result in a command syntax error.
97    pub autotrim_mins: Option<u32>,
98    /// If a 'trader-' prefix is used for stream names.
99    pub use_trader_prefix: bool,
100    /// If the trader's ID is used for stream names.
101    pub use_trader_id: bool,
102    /// If the trader's instance ID is used for stream names. Default is `false`.
103    pub use_instance_id: bool,
104    /// The prefix for externally published stream names. Must have a `database` config.
105    pub streams_prefix: String,
106    /// If `true`, messages will be written to separate streams per topic.
107    /// If `false`, all messages will be written to the same stream.
108    pub stream_per_topic: bool,
109    /// The external stream keys the message bus will listen to for publishing deserialized message payloads internally.
110    pub external_streams: Option<Vec<String>>,
111    /// A list of serializable types **not** to publish externally.
112    pub types_filter: Option<Vec<String>>,
113    /// The heartbeat interval (seconds).
114    pub heartbeat_interval_secs: Option<u16>,
115}
116
117impl Default for MessageBusConfig {
118    /// Creates a new default [`MessageBusConfig`] instance.
119    fn default() -> Self {
120        Self {
121            database: None,
122            encoding: SerializationEncoding::MsgPack,
123            timestamps_as_iso8601: false,
124            buffer_interval_ms: None,
125            autotrim_mins: None,
126            use_trader_prefix: true,
127            use_trader_id: true,
128            use_instance_id: false,
129            streams_prefix: "stream".to_string(),
130            stream_per_topic: true,
131            external_streams: None,
132            types_filter: None,
133            heartbeat_interval_secs: None,
134        }
135    }
136}
137
138/// A generic message bus database facade.
139///
140/// The main operations take a consistent `key` and `payload` which should provide enough
141/// information to implement the message bus database in many different technologies.
142///
143/// Delete operations may need a `payload` to target specific values.
144pub trait MessageBusDatabaseAdapter {
145    type DatabaseType;
146
147    /// # Errors
148    ///
149    /// Returns an error if initializing the database connection fails.
150    fn new(
151        trader_id: TraderId,
152        instance_id: UUID4,
153        config: MessageBusConfig,
154    ) -> anyhow::Result<Self::DatabaseType>;
155    fn is_closed(&self) -> bool;
156    fn publish(&self, topic: Ustr, payload: Bytes);
157    fn close(&mut self);
158}
159
160////////////////////////////////////////////////////////////////////////////////
161// Tests
162////////////////////////////////////////////////////////////////////////////////
163#[cfg(test)]
164mod tests {
165    use rstest::*;
166    use serde_json::json;
167
168    use super::*;
169
170    #[rstest]
171    fn test_default_database_config() {
172        let config = DatabaseConfig::default();
173        assert_eq!(config.database_type, "redis");
174        assert_eq!(config.host, None);
175        assert_eq!(config.port, None);
176        assert_eq!(config.username, None);
177        assert_eq!(config.password, None);
178        assert!(!config.ssl);
179        assert_eq!(config.connection_timeout, 20);
180        assert_eq!(config.response_timeout, 20);
181        assert_eq!(config.number_of_retries, 100);
182        assert_eq!(config.exponent_base, 2);
183        assert_eq!(config.max_delay, 1000);
184        assert_eq!(config.factor, 2);
185    }
186
187    #[rstest]
188    fn test_deserialize_database_config() {
189        let config_json = json!({
190            "type": "redis",
191            "host": "localhost",
192            "port": 6379,
193            "username": "user",
194            "password": "pass",
195            "ssl": true,
196            "connection_timeout": 30,
197            "response_timeout": 10,
198            "number_of_retries": 3,
199            "exponent_base": 2,
200            "max_delay": 10,
201            "factor": 2
202        });
203        let config: DatabaseConfig = serde_json::from_value(config_json).unwrap();
204        assert_eq!(config.database_type, "redis");
205        assert_eq!(config.host, Some("localhost".to_string()));
206        assert_eq!(config.port, Some(6379));
207        assert_eq!(config.username, Some("user".to_string()));
208        assert_eq!(config.password, Some("pass".to_string()));
209        assert!(config.ssl);
210        assert_eq!(config.connection_timeout, 30);
211        assert_eq!(config.response_timeout, 10);
212        assert_eq!(config.number_of_retries, 3);
213        assert_eq!(config.exponent_base, 2);
214        assert_eq!(config.max_delay, 10);
215        assert_eq!(config.factor, 2);
216    }
217
218    #[rstest]
219    fn test_default_message_bus_config() {
220        let config = MessageBusConfig::default();
221        assert_eq!(config.encoding, SerializationEncoding::MsgPack);
222        assert!(!config.timestamps_as_iso8601);
223        assert_eq!(config.buffer_interval_ms, None);
224        assert_eq!(config.autotrim_mins, None);
225        assert!(config.use_trader_prefix);
226        assert!(config.use_trader_id);
227        assert!(!config.use_instance_id);
228        assert_eq!(config.streams_prefix, "stream");
229        assert!(config.stream_per_topic);
230        assert_eq!(config.external_streams, None);
231        assert_eq!(config.types_filter, None);
232    }
233
234    #[rstest]
235    fn test_deserialize_message_bus_config() {
236        let config_json = json!({
237            "database": {
238                "type": "redis",
239                "host": "localhost",
240                "port": 6379,
241                "username": "user",
242                "password": "pass",
243                "ssl": true,
244                "connection_timeout": 30,
245                "response_timeout": 10,
246                "number_of_retries": 3,
247                "exponent_base": 2,
248                "max_delay": 10,
249                "factor": 2
250            },
251            "encoding": "json",
252            "timestamps_as_iso8601": true,
253            "buffer_interval_ms": 100,
254            "autotrim_mins": 60,
255            "use_trader_prefix": false,
256            "use_trader_id": false,
257            "use_instance_id": true,
258            "streams_prefix": "data_streams",
259            "stream_per_topic": false,
260            "external_streams": ["stream1", "stream2"],
261            "types_filter": ["type1", "type2"]
262        });
263        let config: MessageBusConfig = serde_json::from_value(config_json).unwrap();
264        assert_eq!(config.encoding, SerializationEncoding::Json);
265        assert!(config.timestamps_as_iso8601);
266        assert_eq!(config.buffer_interval_ms, Some(100));
267        assert_eq!(config.autotrim_mins, Some(60));
268        assert!(!config.use_trader_prefix);
269        assert!(!config.use_trader_id);
270        assert!(config.use_instance_id);
271        assert_eq!(config.streams_prefix, "data_streams");
272        assert!(!config.stream_per_topic);
273        assert_eq!(
274            config.external_streams,
275            Some(vec!["stream1".to_string(), "stream2".to_string()])
276        );
277        assert_eq!(
278            config.types_filter,
279            Some(vec!["type1".to_string(), "type2".to_string()])
280        );
281    }
282}