Source code for fynance.models.rolling

#!/usr/bin/env python3
# coding: utf-8

""" Walk-forward training wrappers for time-series models.

Iterator-based base class :class:`_RollingBasis` that re-fits a model
on a sliding training window and predicts on the next out-of-sample
window. The pattern enforces strict temporal ordering, eliminating
lookahead bias and matching how a strategy would actually be retrained
in production.

A concrete :class:`RollMultiLayerPerceptron` combines this iterator
with :class:`fynance.models.mlp.MultiLayerPerceptron`. The
same pattern is applied to portfolio allocation in
:func:`fynance.algorithms.allocation.rolling_allocation`.

Main entry points
-----------------
- :class:`_RollingBasis` — iterator that yields ``(eval_set,
  test_set)`` slices and tracks training/evaluation/test losses.
- :class:`RollMultiLayerPerceptron` — walk-forward MLP, ready to use
  via :meth:`RollMultiLayerPerceptron.set_roll_period` and
  :meth:`RollMultiLayerPerceptron.run`.

"""

from __future__ import annotations

# Built-in packages
from dataclasses import dataclass
from multiprocessing import Process
from typing import Callable

# External packages
import numpy as np
import pandas as pd
import torch
from matplotlib import pyplot as plt
from numpy.typing import NDArray

from fynance.backtest.dynamic_plot_backtest import BacktestNeuralNet

# Local packages
from fynance.models.mlp import MultiLayerPerceptron

plt.style.use('seaborn-v0_8')


__all__ = ['CVResult', '_RollingBasis', 'RollMultiLayerPerceptron']


[docs] @dataclass class CVResult: """ Results from :meth:`_RollingBasis.cross_validate`. Attributes ---------- oof_predictions : np.ndarray Out-of-fold predictions, shape ``(T, n_out)``. Positions that fall before the first test fold are filled with ``NaN``. fold_metrics : list of float Per-fold metric values (empty list when ``metric_fn`` is None). mean_metric : float or None Mean of ``fold_metrics``, or None when no metric was provided. std_metric : float or None Standard deviation of ``fold_metrics``, or None when no metric was provided. """ oof_predictions: np.ndarray fold_metrics: list mean_metric: float | None std_metric: float | None
[docs] class _RollingBasis: r""" Base object to roll a model over a time axis. At each step the model trains on ``X[t-n:t]`` and predicts on ``X[t:t+s]``. Call :meth:`set_roll_period` (or :meth:`__call__`) to configure the window sizes, then iterate with :func:`run`. The leading underscore signals that this class is **internal** — its ``_train``, ``_get_loss_on`` and ``sub_predict`` hooks are expected to be overridden by a concrete subclass mixed with a :class:`~fynance.models._base.BaseNeuralNet` descendant. The public, stable entry point is :class:`RollMultiLayerPerceptron`. Other ready-to-use combinations can be added by following the same pattern (multiple inheritance + ``set_roll_period`` instead of ``__call__``, since the latter is captured by ``torch.nn.Module``). Parameters ---------- X, y : array_like Respectively input and output data, shaped ``(T, N)`` and ``(T, M)``. Strict temporal ordering is required — no shuffling, no future leakage. f : callable, optional Function to transform target, e.g. ``torch.sign``. index : array_like, optional Time index of data. Attributes ---------- n, s, r : int Respectively size of training, testing and rolling period. b, e, T : int Respectively batch size, number of epochs and size of entire dataset. t, _e, i : int Respectively the current time period, the current epoch and the current iteration. n_iter : int Total number of iterations. y_eval, y_test : np.ndarray Respectively evaluating and testing predictions. log : list of dict Per-step record of ``{step, train_loss, eval_loss, test_loss}``, populated by :meth:`run`. Use :meth:`get_stats` to get a DataFrame. """ def __init__( self, X: NDArray | torch.Tensor, y: NDArray | torch.Tensor, f: Callable | None = None, index: NDArray | None = None, ): self.T = X.shape[0] self.y_shape = y.shape self.f = (lambda x: x) if f is None else f self.idx = np.arange(self.T) if index is None else index self.log: list[str] = []
[docs] def __call__( self, train_period: int, test_period: int, start: int = 0, end: int | None = None, roll_period: int | None = None, eval_period: int | None = None, batch_size: int = 64, epochs: int = 1, ) -> _RollingBasis: """ Configure rolling window parameters. Parameters ---------- train_period, test_period : int Size of respectively training and testing sub-periods. start : int, optional Starting observation, default is first observation. end : int, optional Ending observation, default is last observation. roll_period : int, optional Size of the rolling period, default equals ``test_period``. eval_period : int, optional Size of the evaluating period (unused, kept for API compat). batch_size : int, optional Training batch size, default is 64. epochs : int, optional Number of epochs per sub-period, default is 1. Returns ------- _RollingBasis """ self.n = train_period self.s = test_period self.r = test_period if roll_period is None else roll_period self.b = batch_size self.e = epochs self.T = self.T if end is None else min(self.T, end) self.t0 = max(self.n - self.r, min(start, self.T - self.n - self.s)) self.n_iter = (self.T - self.t0 - self.s) // self.r * self.e self.log = [] return self
def __iter__(self): self.y_eval = np.zeros(self.y_shape, dtype=np.float64) self.y_test = np.zeros(self.y_shape, dtype=np.float64) self.loss_eval = np.zeros([self.n_iter], dtype=np.float64) self.loss_test = np.zeros([self.n_iter], dtype=np.float64) self.loss_train = np.zeros([self.n_iter], dtype=np.float64) self._e = self.e self.t = self.t0 self.i = -1 return self def __next__(self): self._e += 1 self.i += 1 if self._e > self.e: self._e = 1 if self.t + self.r + self.s > self.T: raise StopIteration self.t += self.r self.t_idx = np.arange(self.t - self.n, self.t) eval_set = slice(self.t - self.r, self.t) test_set = slice(self.t, self.t + self.s) return eval_set, test_set def _fold_slices(self): """ Yield ``(train_slice, test_slice)`` for every walk-forward fold. Unlike :meth:`__next__`, this generator has no epoch loop and allocates nothing — it is a pure windowing helper shared by :meth:`cross_validate` and any future CV utilities. Yields ------ train_slice : slice Window ``[t - n, t)`` used for training. test_slice : slice Window ``[t, t + s)`` used for out-of-sample evaluation. """ t = self.t0 + self.r while t + self.s <= self.T: yield slice(t - self.n, t), slice(t, t + self.s) t += self.r
[docs] def cross_validate(self, model_factory, X, y, metric_fn=None, epochs=1): """ Walk-forward cross-validation with out-of-fold predictions. At each fold a **fresh** model is created via ``model_factory()``, trained on the rolling training window, and used to predict the next out-of-sample window. Results are accumulated across all folds without any state leaking between them. Call :meth:`__call__` (or :meth:`set_roll_period` for ``RollMultiLayerPerceptron``) to configure ``train_period``, ``test_period``, and ``roll_period`` before calling this method. Parameters ---------- model_factory : callable Called with no arguments before every fold. Must return an object that exposes ``train_on(X, y)`` and ``predict(X) -> NDArray | Tensor`` (the :class:`~fynance.models._base.BaseNeuralNet` interface). X, y : array_like Input and target arrays shaped ``(T, N)`` and ``(T, M)``. metric_fn : callable, optional ``metric_fn(y_true, y_pred) -> float`` evaluated on the test window of each fold. If None, :attr:`CVResult.fold_metrics` is an empty list and the mean/std fields are None. epochs : int, optional Number of full training passes per fold, default 1. Returns ------- CVResult Examples -------- >>> import numpy as np >>> import torch, torch.nn as nn >>> from fynance.models.mlp import MultiLayerPerceptron >>> from fynance.models.rolling import _RollingBasis >>> rng = np.random.default_rng(0) >>> X = rng.standard_normal((80, 4)).astype(np.float32) >>> y = rng.standard_normal((80, 1)).astype(np.float32) >>> Xt, yt = torch.from_numpy(X), torch.from_numpy(y) >>> def factory(): ... m = MultiLayerPerceptron(4, 1, layers=[8]) ... m.set_optimizer(nn.MSELoss, torch.optim.Adam, lr=1e-3) ... return m >>> rb = _RollingBasis(Xt, yt) >>> _ = rb(train_period=40, test_period=10, roll_period=10) >>> result = rb.cross_validate(factory, Xt, yt) >>> result.oof_predictions.shape (80, 1) """ n_out = y.shape[1] if y.ndim > 1 else 1 oof = np.full((X.shape[0], n_out), np.nan) fold_metrics = [] for train_sl, test_sl in self._fold_slices(): model = model_factory() X_tr, y_tr = X[train_sl], y[train_sl] for _ in range(epochs): model.train_on(X_tr, y_tr) pred = model.predict(X[test_sl]) if hasattr(pred, 'numpy'): pred = pred.numpy() oof[test_sl] = pred.reshape(-1, n_out) if metric_fn is not None: fold_metrics.append(float(metric_fn(y[test_sl], pred))) mean_m = float(np.mean(fold_metrics)) if fold_metrics else None std_m = float(np.std(fold_metrics)) if fold_metrics else None return CVResult(oof, fold_metrics, mean_m, std_m)
[docs] def get_stats(self): """ Return per-step loss history as a DataFrame. Returns ------- pd.DataFrame Columns: ``step``, ``train_loss``, ``eval_loss``, ``test_loss``. """ if not self.log: return pd.DataFrame( columns=['step', 'train_loss', 'eval_loss', 'test_loss'] ) return pd.DataFrame(self.log)
[docs] def plot_loss(self, figsize=(9, 4)): """ Plot train / eval / test loss curves. Parameters ---------- figsize : tuple of int, optional Returns ------- matplotlib.figure.Figure """ df = self.get_stats() if df.empty: raise RuntimeError('No log data — run the model first.') fig, ax = plt.subplots(figsize=figsize) ax.plot(df['step'], df['train_loss'], label='Train') ax.plot(df['step'], df['eval_loss'], label='Eval') ax.plot(df['step'], df['test_loss'], label='Test') ax.set_xlabel('Step') ax.set_ylabel('Loss') ax.legend() fig.tight_layout() return fig
def _training(self): loss_epoch = 0. np.random.shuffle(self.t_idx) for t in range(0, self.n, self.b): s = min(t + self.b, self.n) train_slice = self.t_idx[t: s] try: lo = self._train( X=self.X[train_slice], y=self.f(self.y[train_slice]), ) except Exception as e: print(train_slice) print(self.X[train_slice]) print(self.f(self.y[train_slice])) raise e loss_epoch += lo.item() self.loss_train[self.i] = loss_epoch / s
[docs] def run(self, backtest_plot=True, backtest_kpi=True, figsize=(9, 6), func=np.sign): """ Run the rolling model and collect backtest predictions. Parameters ---------- backtest_plot : bool, optional If True, display a live backtest performance plot. backtest_kpi : bool, optional If True, print KPIs to stdout at each step. figsize : tuple of int, optional Figure size. func : callable, optional Function applied to predictions before computing returns. """ y = self.y.numpy() r = np.exp(y) - 1 y_perf = np.exp(np.cumsum(y, axis=0)) y_perf = 100. * y_perf / y_perf[self.t0] self.perf_eval = 100. * np.ones(y.shape, dtype=np.float64) self.perf_test = 100. * np.ones(y.shape, dtype=np.float64) self.bnn = BacktestNeuralNet(figsize) self.log = [] p_print = None for eval_set, test_set in self: self._training() self.y_eval[eval_set] = self.sub_predict(self.X[eval_set]) self.y_test[test_set] = self.sub_predict(self.X[test_set]) self.loss_eval[self.i] = self._get_loss_on(self.y_eval, eval_set) self.loss_test[self.i] = self._get_loss_on(self.y_test, test_set) self.log.append({ 'step': self.i, 'train_loss': self.loss_train[self.i], 'eval_loss': self.loss_eval[self.i], 'test_loss': self.loss_test[self.i], }) if self._e == self.e: v0 = self.perf_eval[self.t - self.r - 1] self.perf_eval[eval_set] = get_perf2( r[eval_set], func(self.y_eval[eval_set]), v0=v0 ) v0 = self.perf_test[self.t - 1] self.perf_test[test_set] = get_perf2( r[test_set], func(self.y_test[test_set]), v0=v0 ) if self.t > self.t0 + self.r: if p_print is None or not p_print.is_alive(): p_print = Process( target=self._print, args=(self.t, self.i, r, y_perf, func, backtest_plot, backtest_kpi) ) p_print.start() self._print(self.t, self.i, r, y_perf, func, backtest_plot, backtest_kpi) return self
def _print(self, t, i, r, y_perf, func, backtest_plot, backtest_kpi): if backtest_kpi: self._display_kpi(t) if backtest_plot: self._display_plot_loss(self.bnn, i) self._display_plot_perf( self.bnn, self.perf_test, self.perf_eval, y_perf, t ) self.bnn.f.canvas.draw() def _get_loss_on(self, y, _slice): lo = self.criterion( torch.from_numpy(y[_slice]).to(torch.float32), self.y[_slice].to(torch.float32) ) return lo.item() def _display_kpi(self, t): pct = t - self.n - self.s pct = pct / (self.T - self.n - self.T % self.s) txt = '{:5.2%} is done | '.format(pct) txt += 'Eval loss is {:5.2} | '.format(self.loss_eval[-1]) txt += 'Test loss is {:5.2} | '.format(self.loss_test[-1]) print(txt, end='\r') def _display_plot_loss(self, bnn, i): bnn.plot_loss(self.loss_test[: i], self.loss_eval[: i], self.loss_train[: i]) def _display_plot_perf(self, bnn, perf_test, perf_eval, y_perf, t): bnn.plot_perf(perf_test[self.t0: t + self.s], perf_eval[self.t0 - self.s: t], y_perf[self.t0 - self.s: t], self.idx[self.t0 - self.s: t + self.s])
def get_perf2(ret, signal, v0=100): return v0 * np.cumprod(ret * signal + 1, axis=0) def get_perf(signal, underlying, v0=100): return v0 * np.exp(np.cumsum(signal * underlying, axis=0))
[docs] class RollMultiLayerPerceptron(MultiLayerPerceptron, _RollingBasis): """ Rolling version of the multi-layer perceptron model. End-to-end walk-forward training pipeline for an MLP: at each step the model is fitted on a sliding window of length ``n``, evaluated on the previous out-of-sample slice, then used to predict the next slice. Losses (train, eval, test) and out-of-sample predictions are accumulated step by step in ``self.log``, ``self.y_eval`` and ``self.y_test`` for downstream analysis. Use :meth:`set_roll_period` to configure window sizes and batch options, then :meth:`run` to execute the loop. ``run`` can also drive a live :class:`fynance.backtest.dynamic_plot_backtest.BacktestNeuralNet` figure to monitor convergence. Combines :class:`MultiLayerPerceptron` with the walk-forward iterator from :class:`_RollingBasis`. Use :meth:`set_roll_period` instead of calling the object directly (``__call__`` is captured by ``torch.nn.Module``). Methods ------- set_roll_period sub_predict """ def __init__( self, X: NDArray | torch.Tensor, y: NDArray | torch.Tensor, layers: list[int] = [], activation: type[torch.nn.Module] | None = None, drop: float | None = None, bias: bool = True, x_type=None, y_type=None, activation_kwargs: dict = {}, **kwargs, ): _RollingBasis.__init__(self, X, y, **kwargs) MultiLayerPerceptron.__init__(self, X, y, layers=layers, bias=bias, activation=activation, drop=drop, x_type=x_type, y_type=y_type, activation_kwargs=activation_kwargs)
[docs] def set_roll_period( self, train_period: int, test_period: int, start: int = 0, end: int | None = None, roll_period: int | None = None, eval_period: int | None = None, batch_size: int = 64, epochs: int = 1, ) -> _RollingBasis: """ Configure rolling window parameters. This is the preferred entry-point for ``RollMultiLayerPerceptron`` because ``__call__`` is captured by ``torch.nn.Module``. Parameters ---------- train_period, test_period : int Size of respectively training and testing sub-periods. start : int, optional end : int, optional roll_period : int, optional eval_period : int, optional batch_size : int, optional epochs : int, optional Returns ------- _RollingBasis """ return _RollingBasis.__call__( self, train_period=train_period, test_period=test_period, start=start, end=end, roll_period=roll_period, eval_period=eval_period, batch_size=batch_size, epochs=epochs )
def _train(self, X, y): return self.train_on(X=X, y=y)
[docs] def sub_predict(self, X: torch.Tensor) -> NDArray: """ Return predictions as a numpy array. """ return self.predict(X=X).numpy()
[docs] def save(self, path): """ Save the model weights. Parameters ---------- path : str Destination path. """ torch.save(self.state_dict(), path)