Source code for fynance.features.regime

#!/usr/bin/env python3
# coding: utf-8

""" Market-regime detection.

Unsupervised labelling of market regimes (e.g. calm / volatile, trending /
mean-reverting) by clustering rolling volatility and return features. Intended
for *analysis* and architecture conditioning (a model per regime, or a regime
embedding) — see the notes on causality.
"""

from __future__ import annotations

# Third-party packages
import numpy as np
from numpy.typing import NDArray
from scipy.cluster.vq import kmeans2

# Local packages
from fynance.features.indicators import realized_volatility

__all__ = ['detect_regimes', 'regime_features', 'RegimeDetector']


def regime_features(X: NDArray, w: int = 21, period: int = 252) -> NDArray:
    """ Assemble the two-column regime feature matrix (volatility, mean return).

    Column 0 is whatever :func:`realized_volatility` returns for the series
    (assumed here to be a trailing, past-only estimate with one value per
    observation — confirm in ``fynance.features.indicators``). Column 1 is the
    mean of the last ``w`` log returns up to and including ``t``; near the
    start the window simply shrinks to the observations available. The log
    return of the first observation is defined as 0, and it takes part in the
    early means.

    Parameters
    ----------
    X : np.ndarray
        Price/level series. It is cast to ``float64`` and flattened to one
        dimension.
    w : int
        Length of the rolling window, in observations.
    period : int
        Annualization factor forwarded to :func:`realized_volatility`.

    Returns
    -------
    np.ndarray
        Two-column array ``[realized_volatility, rolling_mean_log_return]``
        with one row per observation.

    """
    prices = np.asarray(X, dtype=np.float64).reshape(-1)
    n_obs = prices.shape[0]

    volatility = np.asarray(realized_volatility(prices, w=w, period=period))

    # One-step log returns; the first entry has no predecessor and stays 0.
    log_ret = np.zeros_like(prices)
    log_ret[1:] = np.log(prices[1:] / prices[:-1])

    # ``end`` is the exclusive upper bound of the window, so the window that
    # closes on observation t is log_ret[max(0, t - w + 1):t + 1].
    trailing_mean = np.fromiter(
        (log_ret[max(0, end - w):end].mean() for end in range(1, n_obs + 1)),
        dtype=np.float64,
        count=n_obs,
    )

    return np.column_stack([volatility, trailing_mean])
class RegimeDetector:
    """ Causal market-regime detector (fit on the past, assign online).

    Unlike :func:`detect_regimes` (which clusters in-sample and therefore
    peeks at the whole series), this fits k-means on a **training** slice only
    and assigns any later point to the nearest training centroid. The scaling
    constants, centroids and label order all come from the training slice, so
    the labels are usable as a causal feature in a backtest as long as
    :func:`regime_features` itself only looks backwards.

    Labels are ordered by increasing mean volatility of the training clusters
    (0 = calmest), like :func:`detect_regimes`.

    Parameters
    ----------
    n_regimes : int
        Number of regimes (clusters).
    w : int
        Rolling window for the features.
    period : int
        Annualization factor for the volatility feature.
    seed : int
        Seed for the k-means initialization.

    Examples
    --------
    >>> import numpy as np
    >>> from fynance.features import RegimeDetector
    >>> rng = np.random.default_rng(0)
    >>> p = 100 * np.exp(np.cumsum(rng.standard_normal(400) * 0.01))
    >>> det = RegimeDetector(n_regimes=2, w=10).fit(p[:300])
    >>> labels = det.predict(p)
    >>> labels.shape, set(np.unique(labels)) <= {0, 1}
    ((400,), True)

    """

    def __init__(self, n_regimes: int = 3, w: int = 21, period: int = 252,
                 seed: int = 0):
        self.n_regimes = n_regimes
        self.w = w
        self.period = period
        self.seed = seed

    def _check_is_fitted(self) -> None:
        """ Raise a descriptive ``AttributeError`` if :meth:`fit` has not run. """
        # ``_remap`` is published by ``fit`` together with the rest of the
        # fitted state, so its presence means the whole state is available.
        if not hasattr(self, '_remap'):
            raise AttributeError(
                f"{type(self).__name__} is not fitted yet; "
                "call fit() before predict()."
            )

    def fit(self, X: NDArray) -> RegimeDetector:
        """ Fit k-means on the training series ``X`` (past only).

        Parameters
        ----------
        X : np.ndarray
            One-dimensional training price/level series.

        Returns
        -------
        RegimeDetector
            ``self``, to allow chaining.

        Notes
        -----
        Any exception raised while building the features or clustering (for
        instance by ``kmeans2`` with ``missing='raise'`` when a cluster ends
        up empty) propagates unchanged. In that case the detector keeps the
        state it had before the call: nothing is stored until every step has
        succeeded, so a failed refit cannot mix new scaling constants with
        old centroids.

        """
        feats = regime_features(X, self.w, self.period)

        # Column-wise standardization; a constant column is left unscaled.
        mu = feats.mean(axis=0)
        sd = feats.std(axis=0)
        sd[sd == 0] = 1.0
        z = (feats - mu) / sd

        centroids, labels = kmeans2(z, self.n_regimes, seed=self.seed,
                                    minit='++', missing='raise')

        # Order clusters by mean (unstandardized) volatility for stable labels.
        vol = feats[:, 0]
        order = np.argsort([vol[labels == k].mean()
                            for k in range(self.n_regimes)])
        remap = np.empty(self.n_regimes, dtype=int)
        remap[order] = np.arange(self.n_regimes)

        # Publish the fitted state in one go, only after everything succeeded.
        self._mu, self._sd = mu, sd
        self._centroids, self._remap = centroids, remap

        return self

    def predict(self, X: NDArray) -> NDArray:
        """ Assign each point of ``X`` to the nearest training centroid.

        Parameters
        ----------
        X : np.ndarray
            One-dimensional price/level series to label.

        Returns
        -------
        np.ndarray
            Integer regime label per observation, ordered by increasing mean
            volatility of the training clusters.

        Raises
        ------
        AttributeError
            If the detector has not been fitted.

        """
        # Fail fast, before spending time on the feature computation.
        self._check_is_fitted()

        feats = regime_features(X, self.w, self.period)
        # Reuse the training mean/std so that no statistic of ``X`` leaks in.
        z = (feats - self._mu) / self._sd

        # Squared Euclidean distance of every point to every centroid.
        dist = ((z[:, None, :] - self._centroids[None, :, :]) ** 2).sum(axis=-1)

        return self._remap[dist.argmin(axis=1)]

    def fit_predict(self, X: NDArray) -> NDArray:
        """ Convenience: :meth:`fit` then :meth:`predict` on the same series.

        The resulting labels are in-sample for ``X``.

        """
        return self.fit(X).predict(X)
def detect_regimes(
    X: NDArray,
    n_regimes: int = 3,
    w: int = 21,
    period: int = 252,
    seed: int = 0,
) -> NDArray:
    r""" Cluster rolling vol / return features into volatility-ordered regimes.

    The features come from :func:`regime_features` (a volatility column and a
    rolling mean log-return column). Each column is standardized over the
    whole series, then ``kmeans2`` with ``'++'`` initialization splits the
    observations into ``n_regimes`` clusters. Cluster ids are finally renamed
    so that 0 is the cluster with the lowest average volatility and
    ``n_regimes - 1`` the one with the highest, which keeps labels comparable
    across runs.

    .. note::
       Standardization and clustering both use the **entire** series, so the
       labels are in-sample: suitable for analysis and for studying regime
       conditioning, not as a causal online feature. For a fit-on-the-past
       assignment use :class:`RegimeDetector`.

    Parameters
    ----------
    X : np.ndarray
        One-dimensional price/level series.
    n_regimes : int, optional
        Number of regimes (clusters). Default 3.
    w : int, optional
        Rolling window for the features. Default 21.
    period : int, optional
        Annualization factor for the volatility feature. Default 252.
    seed : int, optional
        Seed for the k-means initialization. Default 0.

    Returns
    -------
    np.ndarray
        Integer regime label per observation, in ``[0, n_regimes)`` and
        ordered by increasing average volatility.

    """
    features = regime_features(X, w=w, period=period)

    # Standardize column-wise; a zero-variance column is divided by 1 instead.
    center = features.mean(axis=0)
    spread = features.std(axis=0)
    scale = np.where(spread == 0, 1.0, spread)
    standardized = (features - center) / scale

    # ``missing='raise'`` makes an empty cluster an error rather than a warning.
    _, raw_labels = kmeans2(standardized, n_regimes, seed=seed, minit='++',
                            missing='raise')

    # Rank the clusters by their average (unstandardized) volatility and turn
    # that ranking into a lookup table: raw cluster id -> ordered label.
    volatility = features[:, 0]
    cluster_vol = [volatility[raw_labels == k].mean() for k in range(n_regimes)]
    by_vol = np.argsort(cluster_vol)
    rank_of = np.empty(n_regimes, dtype=int)
    rank_of[by_vol] = np.arange(n_regimes)

    return rank_of[raw_labels]