# Source code for fynance.data.align

#!/usr/bin/env python3
# coding: utf-8

""" Multi-asset alignment and frequency resampling.

All operations are causal: forward-fill uses only past values, and downsampling
aggregations never look past a period's right edge.

"""

from __future__ import annotations

# Built-in packages
# Third-party packages
import numpy as np

# Local packages
from fynance.core import PriceSeries

__all__ = ['align', 'resample']


def _ffill(values: np.ndarray) -> np.ndarray:
    """ Forward-fill NaNs using past values only. """
    out = values.copy()
    mask = np.isnan(out)
    idx = np.where(~mask, np.arange(out.size), 0)
    np.maximum.accumulate(idx, out=idx)
    out = out[idx]
    # Leading NaNs (before the first valid value) stay NaN.
    first_valid = np.argmax(~mask) if mask.any() and (~mask).any() else 0
    out[:first_valid] = np.nan

    return out


def align(
    series: dict[str, PriceSeries],
    how: str = "outer",
    fill: str | None = "ffill",
) -> dict[str, PriceSeries]:
    """ Align several series onto a common index.

    Parameters
    ----------
    series : dict of str to PriceSeries
        Named series to align.
    how : {"outer", "inner"}
        ``outer`` uses the union of indices, ``inner`` the intersection.
    fill : {"ffill", None}
        Forward-fill (past-only) missing values after an outer align. Any
        other value leaves the gaps as NaN.

    Returns
    -------
    dict of str to PriceSeries
        Series sharing a common, sorted, duplicate-free index. The mapping is
        empty when ``series`` is empty.

    Raises
    ------
    ValueError
        If ``how`` is neither ``"outer"`` nor ``"inner"``.

    """
    if how not in ("outer", "inner"):
        raise ValueError(f"unknown how: {how!r}")

    # Nothing to align; the inner branch below needs at least one index.
    if not series:
        return {}

    # Stay in numpy instead of round-tripping through Python objects so the
    # index dtype survives: ``datetime64[ns].tolist()`` yields plain integers,
    # which would silently turn a datetime index into an int64 one.
    indices = [np.asarray(ps.index) for ps in series.values()]
    if how == "outer":
        common = np.unique(np.concatenate(indices))
    else:
        common = np.unique(indices[0])
        for other in indices[1:]:
            common = np.intersect1d(common, other)

    out: dict[str, PriceSeries] = {}
    for (name, ps), idx in zip(series.items(), indices):
        src = np.asarray(ps.values, dtype=float)
        vals = np.full(common.size, np.nan)
        # Scatter each observation to its slot in the common index. Labels
        # absent from ``common`` (inner join) are dropped; when a label is
        # duplicated in the source, the last observation wins.
        keep = np.isin(idx, common)
        vals[np.searchsorted(common, idx[keep])] = src[keep]
        if fill == "ffill" and how == "outer":
            vals = _ffill(vals)
        out[name] = PriceSeries(vals, index=common, name=name, freq=ps.freq)

    return out
def resample(
    ps: PriceSeries,
    freq: str,
    agg: str = "last",
) -> PriceSeries | dict[str, PriceSeries]:
    """ Downsample a series to a coarser frequency.

    The index and values are loaded into a polars frame, sorted by timestamp
    and bucketed with ``group_by_dynamic`` using ``every=freq``; each bucket is
    then reduced with the requested aggregation.

    Parameters
    ----------
    ps : PriceSeries
        Series with a datetime index.
    freq : str
        Target polars frequency (e.g. ``"1w"``, ``"1mo"``).
    agg : {"last", "mean", "ohlc"}
        Aggregation. ``ohlc`` returns a mapping with open/high/low/close.

    Returns
    -------
    PriceSeries or dict of str to PriceSeries
        For ``last`` and ``mean``, a single series named after ``ps``. For
        ``ohlc``, one series per key ``"open"``, ``"high"``, ``"low"`` and
        ``"close"``, all sharing the same index array.

    Raises
    ------
    ValueError
        If ``agg`` is not one of the supported aggregations.

    """
    # polars is imported lazily, only when resampling is actually requested.
    import polars as pl

    frame = pl.DataFrame({"_t": ps.index, "_v": ps.values}).sort("_t")
    buckets = frame.group_by_dynamic("_t", every=freq)
    value = pl.col("_v")

    def _build(table, column, index, name):
        # Wrap one aggregated column as a PriceSeries at the new frequency.
        return PriceSeries(
            table[column].to_numpy(), index=index, name=name, freq=freq,
        )

    if agg == "last" or agg == "mean":
        reducer = value.last() if agg == "last" else value.mean()
        table = buckets.agg(reducer)
        return _build(table, "_v", table["_t"].to_numpy(), ps.name)

    if agg != "ohlc":
        raise ValueError(f"unknown agg: {agg!r}")

    bars = buckets.agg(
        value.first().alias("open"),
        value.max().alias("high"),
        value.min().alias("low"),
        value.last().alias("close"),
    )
    # One index array, shared by the four resulting series.
    shared_index = bars["_t"].to_numpy()
    result = {}
    for field in ("open", "high", "low", "close"):
        result[field] = _build(bars, field, shared_index, field)

    return result