Source code for fynance.features.momentums

#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-02-20 19:57:13
# @Last modified by: ArthurBernard
# @Last modified time: 2019-11-05 15:55:19

""" Statistical momentum functions.

Rolling moving averages and moving standard deviations in three flavors
— simple, weighted and exponential — used as building blocks for
technical indicators, scaling and signal generation.

Implementations are vectorized with NumPy and accept 1-D or 2-D inputs.
The window size ``w`` defaults to the full length of the series; the
``axis`` keyword controls the time axis on 2-D arrays.

Main entry points
-----------------
- :func:`sma`, :func:`wma`, :func:`ema` — moving averages (simple,
  weighted, exponential).
- :func:`smstd`, :func:`wmstd`, :func:`emstd` — moving standard
  deviations.

"""

from __future__ import annotations

# Built-in packages
# External packages
import numpy as np

# Local packages
from numba import njit
from numpy.typing import NDArray

from fynance._wrappers import WrapperArray

__all__ = [
    'sma', 'wma', 'ema', 'smstd', 'wmstd', 'emstd',
]


# --------------------------------------------------------------------------- #
#   numba kernels (ported 1:1 from the former Cython implementation)           #
# --------------------------------------------------------------------------- #


@njit(cache=True)
def _sma_1d(X, w):
    T = X.shape[0]
    ma = np.empty(T, dtype=np.float64)
    S = 0.0
    for t in range(T):
        if t < w:
            S += X[t]
            ma[t] = S / (t + 1)
        else:
            S += X[t] - X[t - w]
            ma[t] = S / w
    return ma


@njit(cache=True)
def _sma_2d(X, w):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _sma_1d(X[:, n].copy(), w)
    return out


@njit(cache=True)
def _wma_1d(X, w):
    T = X.shape[0]
    ma = np.empty(T, dtype=np.float64)
    m = 1.0
    for t in range(T):
        S = 0.0
        if t < w:
            mm = float(t + 1)
            m = mm * (mm + 1.0) / 2.0
            i = 0
            while i <= t:
                S += (i + 1) * X[i]
                i += 1
        else:
            i = 0
            while i < w:
                S += (w - i) * X[t - i]
                i += 1
        ma[t] = S / m
    return ma


@njit(cache=True)
def _wma_2d(X, w):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _wma_1d(X[:, n].copy(), w)
    return out


@njit(cache=True)
def _ema_1d(X, alpha):
    T = X.shape[0]
    ma = np.empty(T, dtype=np.float64)
    ma[0] = X[0]
    for t in range(1, T):
        ma[t] = alpha * ma[t - 1] + (1.0 - alpha) * X[t]
    return ma


@njit(cache=True)
def _ema_2d(X, alpha):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _ema_1d(X[:, n].copy(), alpha)
    return out


@njit(cache=True)
def _smstd_1d(X, w, d):
    T = X.shape[0]
    sd = np.empty(T, dtype=np.float64)
    S = 0.0
    S2 = 0.0
    _w = 1.0
    _w_d = 1.0
    for t in range(T):
        if t < w:
            _w = float(t + 1)
            _w_d = float(t + 1 - d)
            sub_X = 0.0
        else:
            sub_X = X[t - w]
        S += X[t] - sub_X
        S2 += X[t] * X[t] - sub_X * sub_X
        if t < d:
            sd[t] = 0.0
        else:
            sd[t] = np.sqrt((S2 - (S / _w) * S) / _w_d)
    return sd


@njit(cache=True)
def _smstd_2d(X, w, d):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _smstd_1d(X[:, n].copy(), w, d)
    return out


@njit(cache=True)
def _wmstd_1d(X, w):
    T = X.shape[0]
    sd = np.empty(T, dtype=np.float64)
    m = 1.0
    for t in range(T):
        S = 0.0
        S2 = 0.0
        if t < w:
            mm = float(t + 1)
            m = mm * (mm + 1.0) / 2.0
            i = 0
            while i <= t:
                S += (i + 1) * X[i]
                S2 += (i + 1) * X[i] * X[i]
                i += 1
        else:
            i = 0
            while i < w:
                S += (w - i) * X[t - i]
                S2 += (w - i) * X[t - i] * X[t - i]
                i += 1
        sd[t] = np.sqrt(S2 / m - (S / m) * (S / m))
    return sd


@njit(cache=True)
def _wmstd_2d(X, w):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _wmstd_1d(X[:, n].copy(), w)
    return out


@njit(cache=True)
def _emstd_1d(X, alpha):
    T = X.shape[0]
    sd = np.empty(T, dtype=np.float64)
    m = X[0]
    sd[0] = 0.0
    for t in range(1, T):
        m = alpha * m + (1.0 - alpha) * X[t]
        m2 = (1.0 - alpha) * (X[t] - m) * (X[t] - m)
        sd[t] = np.sqrt(alpha * sd[t - 1] * sd[t - 1] + m2)
    return sd


@njit(cache=True)
def _emstd_2d(X, alpha):
    T, N = X.shape
    out = np.empty((T, N), dtype=np.float64)
    for n in range(N):
        out[:, n] = _emstd_1d(X[:, n].copy(), alpha)
    return out

# =========================================================================== #
#                               Moving Averages                               #
# =========================================================================== #


[docs] @WrapperArray('dtype', 'axis', 'window') def sma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray: r""" Compute simple moving average(s) of size `w` for each `X`' series. Equally weighted average over a sliding window of length ``w``. Reacts slowly to new information but is robust to noise and is the most common smoother in technical analysis. For a smoother that reacts faster to recent observations, see :func:`ema` (exponential weighting) or :func:`wma` (linear weighting). The first ``w-1`` values use a shrinking window (i.e. ``sma_t`` for ``t < w-1`` averages only the available observations, never NaN). .. math:: sma^w_t(X) = \frac{1}{w} \sum^{w-1}_{i=0} X_{t-i} Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving average. w : int, optional Size of the lagged window of the moving average, must be positive. If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Simple moving average of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> sma(X, w=3, dtype=np.float64, axis=0) array([ 60., 80., 80., 100., 120., 120.]) >>> X = np.array([[60, 60], [100, 100], [80, 80], ... [120, 120], [160, 160], [80, 80]]) >>> sma(X, w=3, dtype=np.float64, axis=0) array([[ 60., 60.], [ 80., 80.], [ 80., 80.], [100., 100.], [120., 120.], [120., 120.]]) >>> sma(X, w=3, dtype=np.float64, axis=1) array([[ 60., 60.], [100., 100.], [ 80., 80.], [120., 120.], [160., 160.], [ 80., 80.]]) See Also -------- wma, ema, smstd """ return _sma(X, w)
def _sma(X, w): if len(X.shape) == 2: return _sma_2d(X, w) return _sma_1d(X, w)
[docs] @WrapperArray('dtype', 'axis', 'window') def wma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray: r""" Compute weighted moving average(s) of size `w` for each `X`' series. .. math:: wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i} Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving average. w : int, optional Size of the lagged window of the moving average, must be positive. If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Weighted moving average of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> wma(X, w=3, dtype=np.float64) array([ 60. , 86.66666667, 83.33333333, 103.33333333, 133.33333333, 113.33333333]) >>> X = X.reshape([6, 1]) >>> wma(X, w=3, dtype=np.float64).flatten() array([ 60. , 86.66666667, 83.33333333, 103.33333333, 133.33333333, 113.33333333]) See Also -------- sma, ema, wmstd """ return _wma(X, w)
def _wma(X, w): if len(X.shape) == 2: return _wma_2d(X, w) return _wma_1d(X, w)
[docs] @WrapperArray('dtype', 'axis') def ema(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray: r""" Compute exponential moving average(s) for each `X`' series. Geometrically decaying weighted average that gives more importance to recent observations. Reacts faster than :func:`sma` to regime changes; smaller ``alpha`` (or smaller equivalent window ``w``) increases reactivity at the cost of more noise. The recursive formulation makes computation O(T) per series, with no need to store the full window. Either ``alpha`` (smoothing factor in ``[0, 1]``) or ``w`` (window size mapped to ``alpha = 1 - 2 / (1 + w)``) can be specified. .. math:: ema^{\apha}_t(X) = \alpha \times ema^{\alpha}_{t-1} + (1-\alpha) \times X_t Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving average. alpha : float, optional These coefficient represents the degree of weighting decrease, default is 0.94 corresponding at 20 lags memory. w : int, optional Size of the lagged window of the moving average, must be strictly positive. If ``w is None`` the window is ignored and the parameter `alpha` is used. Default is None. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Exponential moving average of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> ema(X, w=3, dtype=np.float64) array([ 60., 80., 80., 100., 130., 105.]) >>> ema(X, alpha=0.5, dtype=np.float64) array([ 60., 80., 80., 100., 130., 105.]) Notes ----- If the lagged window `w` is specified :math:`\alpha` is overwritten by :math:`\alpha = 1 - \frac{2}{1 + w}` See Also -------- sma, wma, emstd """ if w is None: pass elif w <= 0: raise ValueError('lagged window of size {} is not available, \ must be greater than 0.'.format(w)) else: alpha = 1 - 2 / (1 + w) return _ema(X, alpha)
def _ema(X, alpha): if len(X.shape) == 2: return _ema_2d(X, float(alpha)) return _ema_1d(X, float(alpha)) # =========================================================================== # # Moving Standard Deviation # # =========================================================================== #
[docs] @WrapperArray('dtype', 'axis', 'window', 'ddof') def smstd(X: NDArray, w: int | None = None, ddof: int = 0, axis: int = 0, dtype=None) -> NDArray: r""" Compute simple moving standard deviation(s) for each `X`' series'. .. math:: smstd^w_t(X) = \sqrt{\frac{1}{w}\sum^{w-1}_{i=0} (X_{t-i} - sma^w_t)^2} Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving standard deviation. w : int, optional Size of the lagged window of the moving average, must be positive. If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None. ddof : int, optional Means Delta Degrees of Freedom, the divisor used in calculations is ``w - ddof`` (must be strictly positive), where ``w`` represents the number of elements in time axis. Default is 0. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Simple moving standard deviation of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> smstd(X, w=3, dtype=np.float64) array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324, 32.65986324]) >>> smstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten() array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324, 32.65986324]) See Also -------- sma, wmstd, emstd """ if ddof >= w: # type: ignore[operator] raise ValueError( 'size of the lagged window (w={}) must be strictly greater than ' 'degree of freedom (ddof={})'.format(w, ddof) ) return _smstd(X, w, ddof=ddof)
def _smstd(X, w, ddof=0): if len(X.shape) == 2: return _smstd_2d(X, w, ddof) return _smstd_1d(X, w, ddof)
[docs] @WrapperArray('dtype', 'axis', 'window') def wmstd(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray: r""" Compute weighted moving standard(s) deviation for each `X`' series'. .. math:: wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i} \\ wmstd^w_t(X) = \sqrt{\frac{2}{w(w-1)} \sum^{w-1}_{i=0} (w-i) \times (X_{t-i} - wma^w_t(X))^2} Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving standard deviation. w : int, optional Size of the lagged window of the moving average, must be positive. If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Weighted moving standard deviation of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> wmstd(X, w=3, dtype=np.float64) array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 , 35.90109871]) >>> wmstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten() array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 , 35.90109871]) See Also -------- wma, smstd, emstd """ return _wmstd(X, w)
def _wmstd(X, w): if len(X.shape) == 2: return _wmstd_2d(X, w) return _wmstd_1d(X, w)
[docs] @WrapperArray('dtype', 'axis') def emstd(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray: r""" Compute exponential moving standard deviation(s) for each `X`' series. .. math:: emstd^{\alpha}_t(X) = \sqrt{\alpha\times emstd^{\alpha}_{t-1}^2 + (1-\alpha) \times X_t^2} Parameters ---------- X : np.ndarray[dtype, ndim=1 or 2] Elements to compute the moving standard deviation. alpha : float, optional These coefficient represents the degree of weighting decrease, default is 0.94 corresponding at 20 lags memory. w : int, optional Size of the lagged window of the moving average, must be strictly positive. If ``w is None`` the window is ignored and the parameter `alpha` is used. Default is None. axis : {0, 1}, optional Axis along wich the computation is done. Default is 0. dtype : np.dtype, optional The type of the output array. If `dtype` is not given, infer the data type from `X` input. Returns ------- np.ndarray[dtype, ndim=1 or 2] Exponential moving standard deviation of each series. Examples -------- >>> X = np.array([60, 100, 80, 120, 160, 80]) >>> emstd(X, w=3, dtype=np.float64) array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762, 24.49489743]) >>> emstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten() array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762, 24.49489743]) >>> emstd(X, alpha=0.5, dtype=np.float64) array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762, 24.49489743]) Notes ----- If the lagged window `w` is specified :math:`\alpha` is overwritten by :math:`\alpha = 1 - \frac{2}{1 + w}` See Also -------- ema, smstd, wmstd """ if w is None: pass elif w <= 0: raise ValueError('lagged window of size {} is not available, \ must be greater than 0.'.format(w)) else: alpha = 1 - 2 / (1 + w) return _emstd(X, alpha)
def _emstd(X, alpha): if len(X.shape) == 2: return _emstd_2d(X, float(alpha)) return _emstd_1d(X, float(alpha)) if __name__ == '__main__': import doctest doctest.testmod()