Source code for fynance.features.regime

#!/usr/bin/env python3
# coding: utf-8

""" Market-regime detection.

Unsupervised labelling of market regimes (e.g. calm / volatile, trending /
mean-reverting) by clustering rolling volatility and return features. Intended
for *analysis* and architecture conditioning (a model per regime, or a regime
embedding) — see the notes on causality.
"""

from __future__ import annotations

# Third-party packages
import numpy as np
from numpy.typing import NDArray
from scipy.cluster.vq import kmeans2

# Local packages
from fynance.features.indicators import realized_volatility

__all__ = ['detect_regimes']


def detect_regimes(
    X: NDArray,
    n_regimes: int = 3,
    w: int = 21,
    period: int = 252,
    seed: int = 0,
) -> NDArray:
    r""" Label market regimes by k-means on rolling vol / return features.

    Builds two features per date — trailing realized volatility and trailing
    mean log-return — standardizes them, and clusters with k-means into
    ``n_regimes`` groups. Labels are **ordered by mean volatility**
    (0 = calmest, ``n_regimes - 1`` = most volatile) so they are comparable
    across runs.

    .. note::
        The clustering is fit **in-sample** (it sees the whole series), so
        the labels are appropriate for *analysis* and for studying regime
        conditioning — not as a strictly-causal online feature. A causal
        online assignment (fit on the past only) is a separate extension.

    Parameters
    ----------
    X : np.ndarray
        One-dimensional price/level series. The input is converted to
        ``float64`` and flattened before use.
    n_regimes : int, optional
        Number of regimes (clusters). Default 3.
    w : int, optional
        Rolling window for the features, at least 1. The first ``w - 1``
        rolling mean returns are computed on the shorter window available.
        Default 21.
    period : int, optional
        Annualization factor for the volatility feature. Default 252.
    seed : int, optional
        Seed for the k-means initialization. Default 0.

    Returns
    -------
    np.ndarray
        Integer regime label per observation (one per element of the
        flattened ``X``), in ``[0, n_regimes)`` and ordered by increasing
        average volatility.

    Raises
    ------
    ValueError
        If ``w < 1``, or if the volatility / rolling-return features contain
        NaN or infinite values (e.g. NaN in ``X``, or a zero or sign-changing
        level that makes a log-return undefined).
    scipy.cluster.vq.ClusterError
        If k-means produces an empty cluster (``missing='raise'``).

    """
    X = np.asarray(X, dtype=np.float64).reshape(-1)
    n_obs = X.shape[0]

    vol = np.asarray(realized_volatility(X, w=w, period=period))

    # Checked after the helper call so that any validation it performs on
    # ``w`` keeps precedence. With ``w < 1`` every rolling slice below would
    # be empty and the features would be all-NaN.
    if w < 1:
        raise ValueError(f"w must be a positive window length, got {w!r}")

    # Log-returns; the first observation has no predecessor and is set to 0.
    ret = np.zeros_like(X)
    # Undefined ratios are reported once, explicitly, by the finiteness check
    # below rather than through NumPy RuntimeWarnings.
    with np.errstate(divide='ignore', invalid='ignore'):
        ret[1:] = np.log(X[1:] / X[:-1])

    # Trailing mean return over at most ``w`` observations. The head of the
    # series uses the partial window ``ret[0:t + 1]``.
    roll_ret = np.array(
        [ret[max(0, t - w + 1):t + 1].mean() for t in range(n_obs)],
        dtype=np.float64,
    )

    feats = np.column_stack([vol, roll_ret])

    # Fail early with an actionable message: a single NaN would otherwise
    # poison the column statistics and surface as a generic SciPy error.
    if not np.isfinite(feats).all():
        raise ValueError(
            "regime features contain NaN or infinite values; check that X "
            "has no NaN/inf and that consecutive levels have a strictly "
            "positive ratio"
        )

    # Standardize each feature so both weigh equally in the Euclidean metric.
    mu = feats.mean(axis=0)
    sd = feats.std(axis=0)
    sd[sd == 0] = 1.0  # constant feature: avoid 0 / 0, keep it at zero
    feats = (feats - mu) / sd

    _, labels = kmeans2(
        feats, n_regimes, seed=seed, minit='++', missing='raise',
    )

    # Re-order labels by increasing mean volatility for stable interpretation:
    # ``order[i]`` is the raw label with the i-th smallest mean volatility, so
    # ``remap`` sends that raw label to ``i``.
    mean_vol = [vol[labels == k].mean() for k in range(n_regimes)]
    order = np.argsort(mean_vol)
    remap = np.empty(n_regimes, dtype=int)
    remap[order] = np.arange(n_regimes)

    return remap[labels]