Persistence
The persistence subpackage handles data storage and retrieval, mainly to support backtesting.
class BaseDataCatalog
Bases: ABC
Provides an abstract base class for a queryable data catalog.
abstractmethod classmethod from_env() → BaseDataCatalog
abstractmethod classmethod from_uri(uri: str, storage_options: dict[str, str] | None = None) → BaseDataCatalog
instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Instrument]
instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentStatus]
instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentClose]
order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) → list[OrderBookDelta] | list[OrderBookDeltas]
order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) → list[OrderBookDepth10]
quote_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[QuoteTick]
trade_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[TradeTick]
bars(bar_types: list[str] | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Bar]
custom_data(cls: type, instrument_ids: list[str] | None = None, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) → list[CustomData]
abstractmethod query(data_cls: type, identifiers: list[str] | None = None, **kwargs: Any) → list[Data]
abstractmethod query_last_timestamp(data_cls: type, identifier: str | None = None) → Timestamp | None
abstractmethod list_data_types() → list[str]
list_generic_data_types() → list[str]
abstractmethod list_backtest_runs() → list[str]
abstractmethod list_live_runs() → list[str]
abstractmethod read_live_run(instance_id: str, **kwargs: Any) → list[str]
abstractmethod read_backtest(instance_id: str, **kwargs: Any) → list[str]
class FeatherFile
Bases: NamedTuple
FeatherFile(path, class_name)
path : str
Alias for field number 0
class_name : str
Alias for field number 1
count(value, /)
Return number of occurrences of value.
index(value, start=0, stop=9223372036854775807, /)
Return first index of value.
Raises ValueError if the value is not present.
class ParquetDataCatalog
Bases: BaseDataCatalog
Provides a queryable data catalog persisted to files in Parquet (Arrow) format.
- Parameters:
- path (PathLike[str] | str) – The root path for this data catalog. Must exist and must be an absolute path.
- fs_protocol (str , default 'file') – The filesystem protocol used by fsspec to handle file operations. This determines how the data catalog interacts with storage, be it local filesystem, cloud storage, or others. Common protocols include ‘file’ for local storage, ‘s3’ for Amazon S3, and ‘gcs’ for Google Cloud Storage. If not provided, it defaults to ‘file’, meaning the catalog operates on the local filesystem.
- fs_storage_options (dict , optional) – The filesystem storage options passed through to fsspec.
- max_rows_per_group (int , default 5000) – The maximum number of rows per group. If the value is greater than 0, then the dataset writer may split up large incoming batches into multiple row groups.
- show_query_paths (bool , default False) – If globbed query paths should be printed to stdout.
WARNING
The data catalog is not threadsafe. Using it in a multithreaded environment can lead to unexpected behavior.
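For example, a local catalog might be opened and inspected as follows; this is a minimal sketch in which the path is illustrative and the import path assumes the standard nautilus_trader package layout.

```python
# NOTE: import path and catalog path are illustrative assumptions.
from nautilus_trader.persistence.catalog import ParquetDataCatalog

# The root path must exist and must be an absolute path.
catalog = ParquetDataCatalog("/data/catalog")

# Inspect what is already stored.
print(catalog.list_data_types())
print(len(catalog.instruments()))
```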
classmethod from_env() → ParquetDataCatalog
Create a data catalog instance by accessing the ‘NAUTILUS_PATH’ environment variable.
- Return type: ParquetDataCatalog
- Raises: OSError – If the ‘NAUTILUS_PATH’ environment variable is not set.
classmethod from_uri(uri: str, storage_options: dict[str, str] | None = None) → ParquetDataCatalog
Create a data catalog instance from the given uri with optional storage options.
- Parameters:
- uri (str) – The URI string for the backing path.
- storage_options (dict[str, str] , optional) – Storage-specific configuration options. For S3: endpoint_url, region, access_key_id, secret_access_key, session_token, etc. For GCS: service_account_path, service_account_key, project_id, etc. For Azure: account_name, account_key, sas_token, etc.
- Return type: ParquetDataCatalog
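A sketch of creating a catalog backed by S3; the bucket name and credential placeholders are assumptions, and the storage option keys follow the parameter description above.

```python
# NOTE: the bucket/prefix and credentials below are placeholders.
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog.from_uri(
    "s3://my-bucket/catalog",  # hypothetical bucket and prefix
    storage_options={
        "region": "us-east-1",
        "access_key_id": "<ACCESS_KEY_ID>",
        "secret_access_key": "<SECRET_ACCESS_KEY>",
    },
)
```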
write_data(data: list[Data | Event] | list[OrderBookDelta | OrderBookDepth10 | QuoteTick | TradeTick | Bar], start: int | None = None, end: int | None = None, skip_disjoint_check: bool = False) → None
Write the given data to the catalog.
The function categorizes the data based on their class name and, when applicable, their associated instrument ID. It then delegates the actual writing process to the write_chunk method.
- Parameters:
- data (list [Data | Event ]) – The data or event objects to be written to the catalog.
- start (int , optional) – The start timestamp for the data chunk.
- end (int , optional) – The end timestamp for the data chunk.
- skip_disjoint_check (bool , default False) – If True, skip the disjoint intervals check.
WARNING
Any existing data under the same filename will be overwritten.
- Raises: ValueError – If data of the same type is not monotonically increasing (or non-decreasing) based on ts_init.
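As an illustration, an instrument definition can be written to the catalog as below; TestInstrumentProvider comes from the test kit and is used here only to keep the sketch self-contained.

```python
# NOTE: import paths and the catalog path are illustrative assumptions.
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.test_kit.providers import TestInstrumentProvider

catalog = ParquetDataCatalog("/data/catalog")  # illustrative path

# Write a single instrument definition to the catalog.
instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
catalog.write_data([instrument])
```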
extend_file_name(data_cls: type[Data], identifier: str | None = None, start: int | None = None, end: int | None = None)
Extend the timestamp range of an existing parquet file by renaming it.
This method looks for parquet files that are adjacent to the specified timestamp range and renames them to include the new range. It’s useful for extending existing files without having to rewrite them when a query returns an empty list.
- Parameters:
- data_cls (type [Data ]) – The data class type to extend files for.
- identifier (str , optional) – The instrument ID to filter files by. If None, applies to all instruments.
- start (int , optional) – The start timestamp (nanoseconds) of the new range.
- end (int , optional) – The end timestamp (nanoseconds) of the new range.
reset_catalog_file_names() → None
Reset the filenames of all parquet files in the catalog to match their actual content timestamps.
This method identifies all leaf directories in the catalog that contain parquet files and resets their filenames to accurately reflect the minimum and maximum timestamps of the data they contain. It does this by examining the parquet metadata for each file and renaming the file to follow the pattern ‘{first_timestamp}-{last_timestamp}.parquet’.
This is useful when file names may have become inconsistent with their content, for example after manual file operations or data corruption. It ensures that query operations that rely on filename-based filtering will work correctly.
reset_data_file_names(data_cls: type, identifier: str | None = None) → None
Reset the filenames of parquet files for a specific data class and instrument ID.
This method resets the filenames of parquet files for the specified data class and instrument ID to accurately reflect the minimum and maximum timestamps of the data they contain. It examines the parquet metadata for each file and renames the file to follow the pattern ‘{first_timestamp}-{last_timestamp}.parquet’.
- Parameters:
- data_cls (type) – The data class type to reset filenames for (e.g., QuoteTick, TradeTick, Bar).
- identifier (str , optional) – The specific instrument ID to reset filenames for. If None, resets filenames for all instruments of the specified data class.
consolidate_catalog(start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) → None
Consolidate all parquet files across the entire catalog within the specified time range.
This method identifies all leaf directories in the catalog that contain parquet files and consolidates them. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data for all data types and instrument IDs in the catalog.
- Parameters:
- start (TimestampLike , optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
- end (TimestampLike , optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
- ensure_contiguous_files (bool , default True) – If True, ensures that files have contiguous timestamps before consolidation.
consolidate_data(data_cls: type, identifier: str | None = None, start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) → None
Consolidate multiple parquet files for a specific data class and instrument ID into a single file.
This method identifies all parquet files within the specified time range for the given data class and instrument ID, then combines them into a single parquet file. This helps improve query performance and reduces storage overhead by eliminating small fragmented files.
- Parameters:
- data_cls (type) – The data class type to consolidate (e.g., QuoteTick, TradeTick, Bar).
- identifier (str , optional) – The specific instrument ID to consolidate data for. If None, consolidates data for all instruments of the specified data class.
- start (TimestampLike , optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
- end (TimestampLike , optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
- ensure_contiguous_files (bool , default True) – If True, ensures that files have contiguous timestamps before consolidation.
consolidate_catalog_by_period(period: Timedelta = Timedelta('1 days 00:00:00'), start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) → None
Consolidate all parquet files across the entire catalog by splitting them into fixed time periods.
This method identifies all leaf directories in the catalog that contain parquet files and consolidates them by period. A leaf directory is one that contains files but no subdirectories. This is a convenience method that effectively calls consolidate_data_by_period for all data types and instrument IDs in the catalog.
- Parameters:
- period (pd.Timedelta , default pd.Timedelta(days=1)) – The period duration for consolidation. Default is 1 day. Examples: pd.Timedelta(hours=1), pd.Timedelta(days=7), pd.Timedelta(minutes=30).
- start (TimestampLike , optional) – The start timestamp for the consolidation range. Only files with timestamps greater than or equal to this value will be consolidated. If None, all files from the beginning of time will be considered.
- end (TimestampLike , optional) – The end timestamp for the consolidation range. Only files with timestamps less than or equal to this value will be consolidated. If None, all files up to the end of time will be considered.
- ensure_contiguous_files (bool , default True) – If True, uses period boundaries for file naming. If False, uses actual data timestamps for file naming.
consolidate_data_by_period(data_cls: type, identifier: str | None = None, period: Timedelta = Timedelta('1 days 00:00:00'), start: int | str | float | None = None, end: int | str | float | None = None, ensure_contiguous_files: bool = True) → None
Consolidate data files by splitting them into fixed time periods.
This method queries data by period and writes consolidated files immediately, using the skip_disjoint_check parameter to avoid interval conflicts during the consolidation process. When start/end boundaries intersect existing files, the function automatically splits those files to preserve all data.
- Parameters:
- data_cls (type) – The data class type to consolidate.
- identifier (str , optional) – The instrument ID to consolidate. If None, consolidates all instruments.
- period (pd.Timedelta , default pd.Timedelta(days=1)) – The period duration for consolidation. Default is 1 day. Examples: pd.Timedelta(hours=1), pd.Timedelta(days=7), pd.Timedelta(minutes=30).
- start (TimestampLike , optional) – The start timestamp for consolidation range. If None, uses earliest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
- end (TimestampLike , optional) – The end timestamp for consolidation range. If None, uses latest available data. If specified and intersects existing files, those files will be split to preserve data outside the consolidation range.
- ensure_contiguous_files (bool , default True) – If True, uses period boundaries for file naming. If False, uses actual data timestamps for file naming.
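For example, fragmented quote-tick files might be consolidated into one file per day over a given window; the instrument ID, dates, and catalog path in this sketch are illustrative.

```python
# NOTE: identifier, dates, and paths are illustrative assumptions.
import pandas as pd

from nautilus_trader.model.data import QuoteTick
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog("/data/catalog")  # illustrative path

catalog.consolidate_data_by_period(
    data_cls=QuoteTick,
    identifier="EUR/USD.SIM",     # hypothetical instrument ID
    period=pd.Timedelta(days=1),  # one output file per day
    start="2024-01-01",
    end="2024-02-01",
)
```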
query(data_cls: type, identifiers: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, files: list[str] | None = None, **kwargs: Any) → list[Data | CustomData]
Query the catalog for data matching the specified criteria.
This method retrieves data from the catalog based on the provided filters. It automatically selects the appropriate query implementation (Rust or PyArrow) based on the data class and filesystem protocol.
- Parameters:
- data_cls (type) – The data class type to query for.
- identifiers (list[str] , optional) – A list of instrument IDs to filter by. If None, all instruments are included.
- start (TimestampLike , optional) – The start timestamp for the query range. If None, no lower bound is applied.
- end (TimestampLike , optional) – The end timestamp for the query range. If None, no upper bound is applied.
- where (str , optional) – An additional SQL WHERE clause to filter the data (used in Rust queries).
- files (list[str] , optional) – A specific list of files to query from. If provided, these files are used instead of discovering files through the normal process.
- **kwargs (Any) – Additional keyword arguments passed to the underlying query implementation.
- Returns: A list of data objects matching the query criteria.
- Return type: list[Data | CustomData]
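A minimal query sketch; the instrument ID, date range, and catalog path are illustrative, and the typed convenience methods such as quote_ticks() are thin wrappers over this call.

```python
# NOTE: identifier, dates, and paths are illustrative assumptions.
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog("/data/catalog")  # illustrative path

ticks = catalog.query(
    data_cls=QuoteTick,
    identifiers=["EUR/USD.SIM"],  # hypothetical instrument ID
    start="2024-01-01",
    end="2024-01-02",
)
```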
backend_session(data_cls: type, identifiers: list[str] | None = None, start: int | str | float | None = None, end: int | str | float | None = None, where: str | None = None, session: DataBackendSession | None = None, files: list[str] | None = None, **kwargs: Any) → DataBackendSession
Create or update a DataBackendSession for querying data using the Rust backend.
This method is used internally by the query_rust method to set up the query session. It identifies the relevant parquet files and adds them to the session with appropriate SQL queries.
- Parameters:
- data_cls (type) – The data class type to query for.
- identifiers (list[str] , optional) – A list of instrument IDs to filter by. If None, all instruments are included.
- start (TimestampLike , optional) – The start timestamp for the query range. If None, no lower bound is applied.
- end (TimestampLike , optional) – The end timestamp for the query range. If None, no upper bound is applied.
- where (str , optional) – An additional SQL WHERE clause to filter the data.
- session (DataBackendSession , optional) – An existing session to update. If None, a new session is created.
- files (list[str] , optional) – A specific list of files to query from. If provided, these files are used instead of discovering files through the normal process.
- **kwargs (Any) – Additional keyword arguments.
- Returns: The updated or newly created session.
- Return type: DataBackendSession
- Raises: RuntimeError – If the data class is not supported by the Rust backend.
query_last_timestamp(data_cls: type, identifier: str | None = None) → Timestamp | None
get_missing_intervals_for_request(start: int, end: int, data_cls: type, identifier: str | None = None) → list[tuple[int, int]]
Find the missing time intervals for a specific data class and instrument ID.
This method identifies the gaps in the data between the specified start and end timestamps. It’s useful for determining what data needs to be fetched or generated to complete a time series.
- Parameters:
- start (int) – The start timestamp (nanoseconds) of the request range.
- end (int) – The end timestamp (nanoseconds) of the request range.
- data_cls (type) – The data class type to check for.
- identifier (str , optional) – The instrument ID to check for. If None, checks across all instruments.
- Returns: A list of (start, end) timestamp tuples representing the missing intervals. Each tuple represents a continuous range of missing data.
- Return type: list[tuple[int, int]]
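The sketch below shows how the missing intervals might be computed before requesting data from an external source; the timestamps, instrument ID, and catalog path are illustrative.

```python
# NOTE: identifier, dates, and paths are illustrative assumptions.
import pandas as pd

from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog("/data/catalog")  # illustrative path

start_ns = dt_to_unix_nanos(pd.Timestamp("2024-01-01", tz="UTC"))
end_ns = dt_to_unix_nanos(pd.Timestamp("2024-01-31", tz="UTC"))

gaps = catalog.get_missing_intervals_for_request(
    start=start_ns,
    end=end_ns,
    data_cls=QuoteTick,
    identifier="EUR/USD.SIM",  # hypothetical instrument ID
)
for gap_start, gap_end in gaps:
    print(f"Missing data from {gap_start} to {gap_end} (UNIX nanoseconds)")
```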
get_intervals(data_cls: type, identifier: str | None = None) → list[tuple[int, int]]
Get the time intervals covered by parquet files for a specific data class and instrument ID.
This method retrieves the timestamp ranges of all parquet files for the specified data class and instrument ID. Each parquet file in the catalog follows a naming convention of ‘{start_timestamp}-{end_timestamp}.parquet’, which this method parses to determine the available data intervals.
- Parameters:
- data_cls (type) – The data class type to get intervals for.
- identifier (str , optional) – The instrument ID to get intervals for. If None, gets intervals across all instruments for the specified data class.
- Returns: A list of (start, end) timestamp tuples representing the available data intervals. Each tuple contains the start and end timestamps (in nanoseconds) of a continuous range of data. The intervals are sorted by start time.
- Return type: list[tuple[int, int]]
list_data_types() → list[str]
List all data types available in the catalog.
- Returns: A list of data type names (as directory stems) in the catalog.
- Return type: list[str]
list_backtest_runs() → list[str]
List all backtest run IDs available in the catalog.
- Returns: A list of backtest run IDs (as directory stems) in the catalog.
- Return type: list[str]
list_live_runs() → list[str]
List all live run IDs available in the catalog.
- Returns: A list of live run IDs (as directory stems) in the catalog.
- Return type: list[str]
read_live_run(instance_id: str, **kwargs: Any) → list[Data]
Read data from a live run.
This method reads all data associated with a specific live run instance from feather files.
- Parameters:
- instance_id (str) – The ID of the live run instance.
- **kwargs (Any) – Additional keyword arguments passed to the underlying _read_feather method.
- Returns: A list of data objects from the live run, sorted by timestamp.
- Return type: list[Data]
read_backtest(instance_id: str, **kwargs: Any) → list[Data]
Read data from a backtest run.
This method reads all data associated with a specific backtest run instance from feather files.
- Parameters:
- instance_id (str) – The ID of the backtest run instance.
- **kwargs (Any) – Additional keyword arguments passed to the underlying _read_feather method.
- Returns: A list of data objects from the backtest run, sorted by timestamp.
- Return type: list[Data]
convert_stream_to_data(instance_id: str, data_cls: type, other_catalog: ParquetDataCatalog | None = None, subdirectory: str = 'backtest') → None
Convert stream data from feather files to parquet files.
This method reads data from feather files generated during a backtest or live run and writes it to the catalog in parquet format. It’s useful for converting temporary stream data into a more permanent and queryable format.
- Parameters:
- instance_id (str) – The ID of the backtest or live run instance.
- data_cls (type) – The data class type to convert.
- other_catalog (ParquetDataCatalog , optional) – An alternative catalog to write the data to. If None, writes to this catalog.
- subdirectory (str , default "backtest") – The subdirectory containing the feather files. Either “backtest” or “live”.
bars(bar_types: list[str] | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Bar]
custom_data(cls: type, instrument_ids: list[str] | None = None, as_nautilus: bool = False, metadata: dict | None = None, **kwargs: Any) → list[CustomData]
instrument_closes(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentClose]
instrument_status(instrument_ids: list[str] | None = None, **kwargs: Any) → list[InstrumentStatus]
instruments(instrument_type: type | None = None, instrument_ids: list[str] | None = None, **kwargs: Any) → list[Instrument]
list_generic_data_types() → list[str]
order_book_deltas(instrument_ids: list[str] | None = None, batched: bool = False, **kwargs: Any) → list[OrderBookDelta] | list[OrderBookDeltas]
order_book_depth10(instrument_ids: list[str] | None = None, **kwargs: Any) → list[OrderBookDepth10]
quote_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[QuoteTick]
trade_ticks(instrument_ids: list[str] | None = None, **kwargs: Any) → list[TradeTick]
class BarDataWrangler
Bases: object
BarDataWrangler(BarType bar_type, Instrument instrument)
Provides a means of building lists of Posei Bar objects.
- Parameters:
- bar_type (BarType) – The bar type for the wrangler.
- instrument (Instrument) – The instrument for the wrangler.
bar_type
instrument
process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)
Process the given bar dataset into Posei Bar objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if it is not present, the default_volume will be used.
- Parameters:
- data (pd.DataFrame) – The data to process.
- default_volume (float) – The default volume for each bar (if not provided).
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system.
- Return type: list[Bar]
- Raises: ValueError – If data is empty.
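A sketch of wrangling an OHLCV DataFrame into Bar objects and persisting them; the CSV path, bar type string, test instrument, and catalog path are all illustrative assumptions.

```python
# NOTE: file names, bar type, instrument, and paths are illustrative assumptions.
import pandas as pd

from nautilus_trader.model.data import BarType
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.persistence.wranglers import BarDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
bar_type = BarType.from_str("EUR/USD.SIM-1-MINUTE-BID-EXTERNAL")

# Expects 'open', 'high', 'low', 'close' (and optionally 'volume') columns
# with a 'timestamp' index.
df = pd.read_csv("eurusd_1min.csv", index_col="timestamp", parse_dates=True)

wrangler = BarDataWrangler(bar_type=bar_type, instrument=instrument)
bars = wrangler.process(df)

catalog = ParquetDataCatalog("/data/catalog")  # illustrative path
catalog.write_data(bars)
```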
class OrderBookDeltaDataWrangler
Bases: object
OrderBookDeltaDataWrangler(Instrument instrument)
Provides a means of building lists of Posei OrderBookDelta objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)
Process the given order book dataset into Posei OrderBookDelta objects.
- Parameters:
- data (pd.DataFrame) – The data to process.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system.
- is_raw (bool , default False) – If the data is scaled to Posei fixed-point values.
- Raises: ValueError – If data is empty.
class QuoteTickDataWrangler
Bases: object
QuoteTickDataWrangler(Instrument instrument)
Provides a means of building lists of Posei QuoteTick objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0)
Process the given tick dataset into Posei QuoteTick objects.
Expects columns [‘bid_price’, ‘ask_price’] with ‘timestamp’ index. Note: The ‘bid_size’ and ‘ask_size’ columns are optional; if not present, the default_volume will be used.
- Parameters:
- data (pd.DataFrame) – The tick data to process.
- default_volume (float) – The default volume for each tick (if not provided).
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system. Cannot be negative.
- Return type: list[QuoteTick]
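A sketch of building quote ticks from a top-of-book DataFrame; the CSV path and test instrument are illustrative.

```python
# NOTE: file names, instrument, and import paths are illustrative assumptions.
import pandas as pd

from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")

# Expects 'bid_price' and 'ask_price' columns with a 'timestamp' index;
# 'bid_size'/'ask_size' are optional.
df = pd.read_csv("eurusd_quotes.csv", index_col="timestamp", parse_dates=True)

wrangler = QuoteTickDataWrangler(instrument=instrument)
ticks = wrangler.process(df, default_volume=1_000_000)
```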
process_bar_data(self, bid_data: pd.DataFrame, ask_data: pd.DataFrame, double default_volume: float = 1000000.0, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, int random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)
Process the given bar datasets into Posei QuoteTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if not present, the default_volume will be used.
- Parameters:
- bid_data (pd.DataFrame) – The bid bar data.
- ask_data (pd.DataFrame) – The ask bar data.
- default_volume (float) – The volume per tick if not available from the data.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system.
- offset_interval_ms (int , default 100) – The number of milliseconds to offset each tick from the bar timestamps. If timestamp_is_close is True, negative offsets are used, otherwise positive offsets are used (see also timestamp_is_close).
- random_seed (int , optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None then the ticks will not be shuffled.
- is_raw (bool , default False) – If the data is scaled to Posei fixed-point values.
- timestamp_is_close (bool , default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
- sort_data (bool , default True) – If the data should be sorted by timestamp.
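The following sketch synthesizes quote ticks from separate bid and ask bar DataFrames; the file names and test instrument are illustrative.

```python
# NOTE: file names, instrument, and import paths are illustrative assumptions.
import pandas as pd

from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
wrangler = QuoteTickDataWrangler(instrument=instrument)

# Both frames expect OHLC(V) columns with a 'timestamp' index.
bid_df = pd.read_csv("eurusd_bid_bars.csv", index_col="timestamp", parse_dates=True)
ask_df = pd.read_csv("eurusd_ask_bars.csv", index_col="timestamp", parse_dates=True)

ticks = wrangler.process_bar_data(
    bid_data=bid_df,
    ask_data=ask_df,
    offset_interval_ms=100,
    timestamp_is_close=True,
)
```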
class TradeTickDataWrangler
Bases: object
TradeTickDataWrangler(Instrument instrument)
Provides a means of building lists of Posei TradeTick objects.
- Parameters: instrument (Instrument) – The instrument for the data wrangler.
instrument
process(self, data: pd.DataFrame, int ts_init_delta: int = 0, bool is_raw=False)
Process the given trade tick dataset into Posei TradeTick objects.
- Parameters:
- data (pd.DataFrame) – The data to process.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system.
- is_raw (bool , default False) – If the data is scaled to Posei fixed-point values.
- Return type: list[TradeTick]
- Raises: ValueError – If data is empty.
process_bar_data(self, data: pd.DataFrame, int ts_init_delta: int = 0, int offset_interval_ms: int = 100, bool timestamp_is_close: bool = True, int random_seed: int | None = None, bool is_raw: bool = False, bool sort_data: bool = True)
Process the given bar datasets into Posei TradeTick objects.
Expects columns [‘open’, ‘high’, ‘low’, ‘close’, ‘volume’] with ‘timestamp’ index. Note: The ‘volume’ column is optional; if not present, the default_volume will be used.
- Parameters:
- data (pd.DataFrame) – The trade bar data.
- ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Posei system.
- offset_interval_ms (int , default 100) – The number of milliseconds to offset each tick from the bar timestamps. If timestamp_is_close is True, negative offsets are used, otherwise positive offsets are used (see also timestamp_is_close).
- random_seed (int , optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None then the ticks will not be shuffled.
- is_raw (bool , default False) – If the data is scaled to Posei fixed-point values.
- timestamp_is_close (bool , default True) – If bar timestamps are at the close. If True, then open, high, low timestamps are offset before the close timestamp. If False, then high, low, close timestamps are offset after the open timestamp.
- sort_data (bool , default True) – If the data should be sorted by timestamp.
- Return type: list[TradeTick]
- Raises: ValueError – If data is empty.
processed_data
align_bid_ask_bar_data(bid_data: pd.DataFrame, ask_data: pd.DataFrame)
Merge bid and ask data into a single DataFrame with prefixed column names.
- Parameters:
- bid_data (pd.DataFrame) – The DataFrame containing bid data.
- ask_data (pd.DataFrame) – The DataFrame containing ask data.
- Returns: A merged DataFrame with columns prefixed by ‘bid_’ for bid data and ‘ask_’ for ask data, joined on their indexes.
- Return type: pd.DataFrame
calculate_bar_price_offsets(num_records, timestamp_is_close: bool, int offset_interval_ms: int, random_seed=None)
Calculate and potentially randomize the time offsets for bar prices based on the closeness of the timestamp.
- Parameters:
- num_records (int) – The number of records for which offsets are to be generated.
- timestamp_is_close (bool) – A flag indicating whether the bar timestamps are at the close of the bar.
- offset_interval_ms (int) – The offset interval in milliseconds to be applied.
- random_seed (int , optional) – The seed for random number generation to ensure reproducibility.
- Returns: A dictionary with arrays of offsets for the open, high, low, and close prices. If random_seed is provided, the high and low offsets are randomized.
- Return type: dict
calculate_volume_quarter(volume: np.ndarray, int precision: int, double size_increment: float)
Convert raw volume data to quarter precision.
- Parameters:
- volume (np.ndarray) – An array of volume data to be processed.
- precision (int) – The decimal precision to which the volume data is rounded.
- Returns: The volume data adjusted to quarter precision.
- Return type: np.ndarray
prepare_event_and_init_timestamps(index: pd.DatetimeIndex, int ts_init_delta: int)
preprocess_bar_data(data: pd.DataFrame, is_raw: bool)
Preprocess financial bar data to a standardized format.
Ensures the DataFrame index is labeled as “timestamp”, converts the index to UTC, removes time zone awareness, drops rows with NaN values in critical columns, and optionally scales the data.
- Parameters:
- data (pd.DataFrame) – The input DataFrame containing financial bar data.
- is_raw (bool) – A flag to determine whether the data should be scaled. If True, scales the data back by FIXED_SCALAR.
- Returns: The preprocessed DataFrame with a cleaned and standardized structure.
- Return type: pd.DataFrame
class RotationMode
Bases: Enum
SIZE = 0
INTERVAL = 1
SCHEDULED_DATES = 2
NO_ROTATION = 3
class StreamingFeatherWriter
Bases: object
Provides a stream writer of Posei objects into feather files with rotation capabilities.
- Parameters:
- path (str) – The path to persist the stream to. Must be a directory.
- cache (Cache) – The cache for the query info.
- clock (Clock) – The clock to use for time-related operations.
- fs_protocol (str , default 'file') – The fsspec file system protocol.
- flush_interval_ms (int , optional) – The flush interval (milliseconds) for writing chunks.
- replace (bool , default False) – If existing files at the given path should be replaced.
- include_types (list[type] , optional) – A list of Arrow serializable types to write. If this is specified then only the included types will be written.
- rotation_mode (RotationMode, default RotationMode.NO_ROTATION) – The mode for file rotation.
- max_file_size (int , default 1GB) – The maximum file size in bytes before rotation (for SIZE mode).
- rotation_interval (pd.Timedelta , default 1 day) – The time interval for file rotation (for INTERVAL mode and SCHEDULED_DATES mode).
- rotation_time (datetime.time , default 00:00) – The time of day for file rotation (for SCHEDULED_DATES mode).
- rotation_timezone (str , default 'UTC') – The timezone for rotation calculations (for SCHEDULED_DATES mode).
write(obj: object) → None
Write the object to the stream.
- Parameters: obj (object) – The object to write.
- Raises: ValueError – If obj is None.
check_flush() → None
Flush all stream writers if the current time is greater than the next flush interval.
flush() → None
Flush all stream writers.
close() → None
Flush and close all stream writers.
get_current_file_info() → dict[str | tuple[str, str], dict[str, Any]]
Get information about the current files being written.
- Returns: A dictionary containing file information for each table.
- Return type: dict[str | tuple[str, str], dict[str, Any]]
get_next_rotation_time(table_name: str | tuple[str, str]) → Timestamp | None
Get the expected time for the next file rotation.
- Parameters: table_name (str | tuple[str, str]) – The specific table name to get the next rotation time for.
- Returns: The next rotation time for the specified table, or None if not set.
- Return type: pd.Timestamp | None
property is_closed : bool
Return whether all file streams are closed.
- Returns: True if all streams are closed, False otherwise.
- Return type: bool