Source code for fynance.features.filters

#!/usr/bin/env python3
# coding: utf-8

""" Filter functions for time-series financial data.

Numba-accelerated implementations of the linear Kalman filter and the
Rauch-Tung-Striebel (RTS) smoother, plus a maximum-likelihood routine
to fit the noise covariances on observed data.

Useful for denoising price series, estimating latent states (e.g. a
slowly varying mean or trend), and extracting smoothed signals for
downstream ML models. Causal output (:func:`kalman_filter`) is
appropriate for live signals; the RTS smoother is non-causal and
intended for offline analysis.

Main entry points
-----------------
- :func:`kalman_filter` — forward causal pass, returning filtered
  states and covariances.
- :func:`rts_smoother` — backward pass producing smoothed estimates.
- :func:`kalman_loglikelihood` — log-likelihood of observations under
  given parameters.
- :func:`fit_kalman` — maximum-likelihood fit of process and
  observation noise covariances.

References
----------
.. [1] Kalman, R. E. (1960). A new approach to linear filtering and
       prediction problems.

"""

# Built-in
import math

# Third-party
import numba as nb
import numpy as np
from numpy.typing import NDArray
from scipy.optimize import minimize

# Local

__all__ = ['kalman_filter', 'rts_smoother', 'kalman_loglikelihood', 'fit_kalman']


@nb.njit(cache=True)
def _kalman_filter_core(y, G, F, W, V, m0, C0):
    """Numba-compiled Kalman filter forward pass loop.

    Parameters
    ----------
    y, G, F, W, V, m0, C0 : float arrays
        Inputs as created in kalman_filter (already float64 dtype).

    Returns
    -------
    m, C, a, R, e, S : float arrays
        Outputs (same shapes as in kalman_filter).
    """
    T, n = y.shape

    m = np.zeros((T, n))
    C = np.zeros((T, n, n))
    a = np.zeros((T, n))
    R = np.zeros((T, n, n))
    e = np.zeros((T, n))
    S = np.zeros((T, n, n))

    m_prev = m0.copy()
    C_prev = C0.copy()

    for t in range(T):
        # Predict
        a[t] = G @ m_prev
        R[t] = G @ C_prev @ G.T + W

        # Innovation
        e[t] = y[t] - F @ a[t]
        S[t] = F @ R[t] @ F.T + V

        # Update
        K = R[t] @ F.T @ np.linalg.inv(S[t])
        m[t] = a[t] + K @ e[t]
        C[t] = (np.eye(n) - K @ F) @ R[t]

        m_prev = m[t]
        C_prev = C[t]

    return m, C, a, R, e, S


@nb.njit(cache=True)
def _rts_smoother_core(m, C, a, R, G):
    """Numba-compiled RTS smoother backward pass loop.

    Parameters
    ----------
    m, C, a, R, G : float arrays
        Inputs from kalman_filter and the transition matrix.

    Returns
    -------
    ms, Cs : float arrays
        Smoothed means and covariances.
    """
    T, n = m.shape
    ms = m.copy()
    Cs = C.copy()

    for t in range(T - 2, -1, -1):
        L = C[t] @ G.T @ np.linalg.pinv(R[t + 1])
        ms[t] = m[t] + L @ (ms[t + 1] - a[t + 1])
        Cs[t] = C[t] + L @ (Cs[t + 1] - R[t + 1]) @ L.T

    return ms, Cs


[docs] def kalman_filter( y: NDArray, G: NDArray, F: NDArray, W: NDArray, V: NDArray, m0: NDArray | None = None, C0: NDArray | None = None, ) -> tuple[NDArray, NDArray, NDArray, NDArray, NDArray, NDArray]: """ Linear Gaussian Kalman filter. Recursive Bayesian estimator for the latent state ``x_t`` of a linear Gaussian state-space model. At each step, the prior obtained from the dynamics ``G`` is updated with the new observation through the Kalman gain, yielding the posterior. The output is causal — only past and present observations are used — making the filter suitable for live signal extraction. For an offline smoothed estimate that also conditions on future observations, run :func:`rts_smoother` on the returned ``m, C, a, R``. Implementation is JIT-compiled with Numba; the first call incurs a one-off compilation cost. State-space model:: x_t = G @ x_{t-1} + w_t, w_t ~ N(0, W) y_t = F @ x_t + v_t, v_t ~ N(0, V) Alternates between a *predict* step (prior ``a``, ``R``) and an *update* step (posterior ``m``, ``C``). Parameters ---------- y : np.ndarray of shape (T, n) Observed data. G : np.ndarray of shape (n, n) State transition matrix. F : np.ndarray of shape (n, n) Observation matrix. W : np.ndarray of shape (n, n) Process noise covariance. V : np.ndarray of shape (n, n) Observation noise covariance. m0 : np.ndarray of shape (n,), optional Initial state mean (default: zeros). C0 : np.ndarray of shape (n, n), optional Initial state covariance (default: identity). Returns ------- m : np.ndarray of shape (T, n) Filtered state means (posterior). C : np.ndarray of shape (T, n, n) Filtered state covariances (posterior). a : np.ndarray of shape (T, n) Prior state means (predict step). R : np.ndarray of shape (T, n, n) Prior state covariances (predict step). e : np.ndarray of shape (T, n) Innovations ``y_t - F @ a_t``. S : np.ndarray of shape (T, n, n) Innovation covariances ``F @ R_t @ F.T + V``. References ---------- R. E. Kalman, "A New Approach to Linear Filtering and Prediction Problems", Journal of Basic Engineering, 1960. Examples -------- >>> import numpy as np >>> rng = np.random.default_rng(0) >>> y = rng.standard_normal((5, 1)) >>> G = F = W = V = np.eye(1) >>> m, C, a, R, e, S = kalman_filter(y, G, F, W, V) >>> m.shape (5, 1) >>> C.shape (5, 1, 1) >>> e.shape (5, 1) """ y = np.asarray(y, dtype=np.float64) G = np.asarray(G, dtype=np.float64) F = np.asarray(F, dtype=np.float64) W = np.asarray(W, dtype=np.float64) V = np.asarray(V, dtype=np.float64) T, n = y.shape m0_init = np.zeros(n, dtype=np.float64) if m0 is None else np.asarray(m0, dtype=np.float64) C0_init = np.eye(n, dtype=np.float64) if C0 is None else np.asarray(C0, dtype=np.float64) return _kalman_filter_core(y, G, F, W, V, m0_init, C0_init)
[docs] def rts_smoother( m: NDArray, C: NDArray, a: NDArray, R: NDArray, G: NDArray, ) -> tuple[NDArray, NDArray]: """ Rauch-Tung-Striebel (RTS) smoother. Backward pass over the output of :func:`kalman_filter`. Produces smoothed estimates that use all observations, not just past ones. Parameters ---------- m : np.ndarray of shape (T, n) Filtered means from :func:`kalman_filter`. C : np.ndarray of shape (T, n, n) Filtered covariances from :func:`kalman_filter`. a : np.ndarray of shape (T, n) Prior means from :func:`kalman_filter`. R : np.ndarray of shape (T, n, n) Prior covariances from :func:`kalman_filter`. G : np.ndarray of shape (n, n) State transition matrix (same as passed to :func:`kalman_filter`). Returns ------- ms : np.ndarray of shape (T, n) Smoothed state means. Cs : np.ndarray of shape (T, n, n) Smoothed state covariances. Examples -------- >>> import numpy as np >>> rng = np.random.default_rng(0) >>> y = rng.standard_normal((5, 1)) >>> G = F = W = V = np.eye(1) >>> m, C, a, R, e, S = kalman_filter(y, G, F, W, V) >>> ms, Cs = rts_smoother(m, C, a, R, G) >>> ms.shape (5, 1) >>> bool((np.abs(ms) <= np.abs(y) * 2).all()) True """ m = np.asarray(m, dtype=np.float64) C = np.asarray(C, dtype=np.float64) a = np.asarray(a, dtype=np.float64) R = np.asarray(R, dtype=np.float64) G = np.asarray(G, dtype=np.float64) return _rts_smoother_core(m, C, a, R, G)
[docs] def kalman_loglikelihood(e: NDArray, S: NDArray) -> float: """ Prediction-error decomposition log-likelihood. Computes:: L = -0.5 * sum_t [ log|S_t| + e_t^T S_t^{-1} e_t + n log(2π) ] Parameters ---------- e : np.ndarray of shape (T, n) Innovations from :func:`kalman_filter`. S : np.ndarray of shape (T, n, n) Innovation covariances from :func:`kalman_filter`. Returns ------- float Log-likelihood value (higher is better). Examples -------- >>> import numpy as np >>> rng = np.random.default_rng(0) >>> y = rng.standard_normal((20, 1)) >>> G = F = W = V = np.eye(1) >>> m, C, a, R, e, S = kalman_filter(y, G, F, W, V) >>> ll = kalman_loglikelihood(e, S) >>> isinstance(ll, float) True >>> ll < 0 True """ T, n = e.shape ll = 0.0 log2pi = math.log(2 * math.pi) for t in range(T): sign, logdet = np.linalg.slogdet(S[t]) if sign <= 0: return -np.inf quad = float(e[t] @ np.linalg.inv(S[t]) @ e[t]) ll -= 0.5 * (logdet + quad + n * log2pi) return ll
[docs] def fit_kalman( y: NDArray, G: NDArray, F: NDArray, *, fit_W: bool = True, fit_V: bool = True, W0: NDArray | None = None, V0: NDArray | None = None, m0: NDArray | None = None, C0: NDArray | None = None, ) -> dict: """ Estimate Kalman filter noise covariances by maximum likelihood. Optimises W and/or V via the prediction-error log-likelihood computed by :func:`kalman_loglikelihood`. Parameters are searched in log-space to guarantee positive-definiteness for diagonal matrices. Only diagonal W and V are supported for estimation. Pass fixed non-diagonal matrices through ``W0`` / ``V0`` with ``fit_W=False`` / ``fit_V=False``. Parameters ---------- y : np.ndarray of shape (T, n) Observed data. G : np.ndarray of shape (n, n) State transition matrix (fixed). F : np.ndarray of shape (n, n) Observation matrix (fixed). fit_W : bool, optional Whether to estimate W (default True). fit_V : bool, optional Whether to estimate V (default True). W0 : np.ndarray of shape (n, n), optional Initial / fixed W. If ``fit_W=False`` this value is used as-is. Defaults to identity. V0 : np.ndarray of shape (n, n), optional Initial / fixed V. If ``fit_V=False`` this value is used as-is. Defaults to identity. m0 : np.ndarray of shape (n,), optional Initial state mean (default: zeros). C0 : np.ndarray of shape (n, n), optional Initial state covariance (default: identity). Returns ------- dict with keys: - ``'W'`` : np.ndarray — estimated (or fixed) process noise covariance. - ``'V'`` : np.ndarray — estimated (or fixed) observation noise covariance. - ``'loglik'`` : float — log-likelihood at the optimum. - ``'success'`` : bool — whether the optimizer converged. Examples -------- >>> import numpy as np >>> rng = np.random.default_rng(42) >>> n, T = 1, 200 >>> W_true = np.eye(n) * 0.5 >>> V_true = np.eye(n) * 1.0 >>> x = np.cumsum(rng.multivariate_normal(np.zeros(n), W_true, T), axis=0) >>> y = x + rng.multivariate_normal(np.zeros(n), V_true, T) >>> G = F = np.eye(n) >>> result = fit_kalman(y, G, F) >>> result['success'] True >>> result['W'].shape (1, 1) """ y = np.asarray(y, dtype=float) G = np.asarray(G, dtype=float) F = np.asarray(F, dtype=float) n = y.shape[1] W_fixed = (np.eye(n) if W0 is None else np.asarray(W0, dtype=float)) V_fixed = (np.eye(n) if V0 is None else np.asarray(V0, dtype=float)) # Build initial log-space parameter vector x0_parts = [] if fit_W: x0_parts.append(np.log(np.diag(W_fixed))) if fit_V: x0_parts.append(np.log(np.diag(V_fixed))) if not x0_parts: _, _, _, _, e, S = kalman_filter(y, G, F, W_fixed, V_fixed, m0, C0) return { 'W': W_fixed, 'V': V_fixed, 'loglik': kalman_loglikelihood(e, S), 'success': True, } x0 = np.concatenate(x0_parts) def _neg_loglik(params): idx = 0 if fit_W: W = np.diag(np.exp(params[idx: idx + n])) idx += n else: W = W_fixed V = np.diag(np.exp(params[idx: idx + n])) if fit_V else V_fixed _, _, _, _, e, S = kalman_filter(y, G, F, W, V, m0, C0) return -kalman_loglikelihood(e, S) res = minimize(_neg_loglik, x0, method='L-BFGS-B') idx = 0 if fit_W: W_hat = np.diag(np.exp(res.x[idx: idx + n])) idx += n else: W_hat = W_fixed V_hat = np.diag(np.exp(res.x[idx: idx + n])) if fit_V else V_fixed _, _, _, _, e, S = kalman_filter(y, G, F, W_hat, V_hat, m0, C0) return { 'W': W_hat, 'V': V_hat, 'loglik': kalman_loglikelihood(e, S), 'success': bool(res.success), }