# Source code for fynance.data.base

#!/usr/bin/env python3
# coding: utf-8

""" Data-source port: base class, adapter registry and the ``load`` dispatcher.

:class:`DataSource` (in :mod:`fynance.core.protocols`) is the only I/O boundary
of the library. Concrete adapters (CSV, Parquet, ...) register here and the
:func:`load` dispatcher picks one by file extension.

"""

from __future__ import annotations

# Built-in packages
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Callable

# Local packages
from fynance.core import PriceSeries

__all__ = ['BaseDataSource', 'register', 'get_source', 'load']

# Adapter classes keyed by the name they were registered under.
_REGISTRY: dict[str, type["BaseDataSource"]] = {}

# Map file extensions to registered source names; each adapter name is
# listed once together with every suffix that resolves to it.
_EXT = {
    suffix: source_name
    for source_name, suffixes in (
        ("csv", (".csv", ".txt")),
        ("parquet", (".parquet", ".pq")),
    )
    for suffix in suffixes
}


class BaseDataSource(ABC):
    """ Abstract I/O port that every data-source adapter derives from.

    The class cannot be instantiated directly: subclasses must override
    :meth:`load`.

    """

    @abstractmethod
    def load(self, path: str | Path, **kwargs: Any) -> Any:
        """ Read ``path`` and return a :class:`PriceSeries` or a mapping of them.

        Parameters
        ----------
        path : str or pathlib.Path
            Location of the data to read.
        **kwargs
            Extra options, interpreted by the concrete adapter.

        """


def register(name: str) -> Callable[[type[BaseDataSource]], type[BaseDataSource]]:
    """ Build a class decorator that files an adapter under ``name``.

    The decorated class is stored in the module registry (replacing any
    entry already stored under ``name``) and handed back unchanged.

    """
    def _store(adapter_cls: type[BaseDataSource]) -> type[BaseDataSource]:
        _REGISTRY[name] = adapter_cls

        return adapter_cls

    return _store


def get_source(name: str) -> BaseDataSource:
    """ Return a fresh instance of the adapter registered as ``name``.

    Raises
    ------
    ValueError
        If nothing is registered under ``name``; the message lists the
        registered names in sorted order.

    """
    if name in _REGISTRY:
        adapter_cls = _REGISTRY[name]

        # Adapters are constructed without arguments.
        return adapter_cls()

    raise ValueError(
        f"unknown data source {name!r}; registered: {sorted(_REGISTRY)}"
    )


def load(
    path: str | Path,
    source: str = "auto",
    **kwargs: Any,
) -> PriceSeries | dict[str, PriceSeries]:
    """ Load a file into a :class:`PriceSeries` (or mapping for multi-column).

    Parameters
    ----------
    path : str or pathlib.Path
        File to read.
    source : str
        Adapter name, or ``"auto"`` to select by file extension.
    **kwargs
        Forwarded to the adapter's ``load``.

    Returns
    -------
    PriceSeries or dict of str to PriceSeries
        Whatever the selected adapter's ``load`` returns.

    Raises
    ------
    ValueError
        If ``source`` is ``"auto"`` and the extension of ``path`` is not
        mapped to an adapter name.

    """
    if source == "auto":
        # The suffix is lower-cased before lookup, so ``.CSV`` and ``.csv``
        # resolve to the same adapter.
        ext = Path(path).suffix.lower()
        if ext not in _EXT:

            raise ValueError(
                f"cannot infer data source from extension {ext!r}; "
                f"pass source= explicitly"
            )

        source = _EXT[ext]

    return get_source(source).load(path, **kwargs)
def frame_to_series(
    df: Any,
    value_col: str | None = None,
    index_col: str | None = None,
    freq: str | None = None,
) -> PriceSeries | dict[str, PriceSeries]:
    """ Convert a polars DataFrame to a :class:`PriceSeries` or a mapping.

    Shared by the CSV and Parquet adapters: resolves the temporal index
    column and the value column(s).

    Parameters
    ----------
    df : polars.DataFrame
        Frame to convert.
    value_col : str, optional
        Column holding the values. When omitted, every column other than
        the index column is treated as a value column.
    index_col : str, optional
        Column holding the index. When omitted, the first column whose
        dtype is ``pl.Date`` or ``pl.Datetime`` is used; if there is none,
        the series are built with ``index=None``.
    freq : str, optional
        Passed through to :class:`PriceSeries` as ``freq``.

    Returns
    -------
    PriceSeries or dict of str to PriceSeries
        A single series when ``value_col`` is given or exactly one value
        column exists; otherwise a mapping from column name to series
        (empty when there is no value column).

    """
    import polars as pl

    if index_col is None:
        # Pick the first temporal column, if any, as the index.
        index_col = next(
            (
                col for col, dtype in zip(df.columns, df.dtypes)
                if dtype in (pl.Date, pl.Datetime)
            ),
            None,
        )

    index = None if index_col is None else df[index_col].to_numpy()

    def _series(col: str) -> PriceSeries:
        # Every output series shares the same index and frequency.
        return PriceSeries(
            df[col].to_numpy(), index=index, name=col, freq=freq,
        )

    if value_col is not None:

        return _series(value_col)

    value_cols = [col for col in df.columns if col != index_col]
    if len(value_cols) == 1:

        return _series(value_cols[0])

    return {col: _series(col) for col in value_cols}