Source code for fynance.features.momentums

#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-02-20 19:57:13
# @Last modified by: ArthurBernard
# @Last modified time: 2019-11-05 15:55:19

""" Statistical momentum functions.

Rolling moving averages and moving standard deviations in three flavors
— simple, weighted and exponential — used as building blocks for
technical indicators, scaling and signal generation.

Implementations are vectorized with NumPy and accept 1-D or 2-D inputs.
The window size ``w`` defaults to the full length of the series; the
``axis`` keyword controls the time axis on 2-D arrays.

Main entry points
-----------------
- :func:`sma`, :func:`wma`, :func:`ema` — moving averages (simple,
  weighted, exponential).
- :func:`smstd`, :func:`wmstd`, :func:`emstd` — moving standard
  deviations.

"""

from __future__ import annotations

# Built-in packages
# External packages
import numpy as np
from numpy.typing import NDArray

from fynance._wrappers import WrapperArray

# Local packages
from fynance.features.momentums_cy import *

__all__ = [
    'sma', 'wma', 'ema', 'smstd', 'wmstd', 'emstd',
]

# TODO : - Momentums of order 3
#        - Momentums of order 4
#        - Momentums of order w

# =========================================================================== #
#                               Moving Averages                               #
# =========================================================================== #


@WrapperArray('dtype', 'axis', 'window')
def sma(X: NDArray, w: int | None = None, axis: int = 0,
        dtype=None) -> NDArray:
    r""" Compute simple moving average(s) of size `w` along each series of `X`.

    Every observation inside the sliding window of length ``w`` receives
    the same weight. See :func:`wma` for linearly decreasing weights and
    :func:`ema` for exponentially decreasing weights.

    While fewer than ``w`` observations are available (``t < w - 1``) the
    average is taken over the observations seen so far, so the output does
    not start with NaN (see Examples).

    .. math::

        sma^w_t(X) = \frac{1}{w} \sum^{w-1}_{i=0} X_{t-i}

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving average.
    w : int, optional
        Size of the lagged window of the moving average, must be positive.
        If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is
        None.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Simple moving average of each series.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> sma(X, w=3, dtype=np.float64, axis=0)
    array([ 60.,  80.,  80., 100., 120., 120.])
    >>> X = np.array([[60, 60], [100, 100], [80, 80],
    ...               [120, 120], [160, 160], [80, 80]])
    >>> sma(X, w=3, dtype=np.float64, axis=0)
    array([[ 60.,  60.],
           [ 80.,  80.],
           [ 80.,  80.],
           [100., 100.],
           [120., 120.],
           [120., 120.]])
    >>> sma(X, w=3, dtype=np.float64, axis=1)
    array([[ 60.,  60.],
           [100., 100.],
           [ 80.,  80.],
           [120., 120.],
           [160., 160.],
           [ 80.,  80.]])

    See Also
    --------
    wma, ema, smstd

    """
    # The dimension dispatch (1-D vs 2-D) is delegated to the private helper.
    averaged = _sma(X, w)

    return averaged
def _sma(X, w):
    """ Dispatch `X` to the 1-D or 2-D simple moving average kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = sma_cy_2d if len(X.shape) == 2 else sma_cy_1d

    return np.asarray(kernel(X, w))
@WrapperArray('dtype', 'axis', 'window')
def wma(X: NDArray, w: int | None = None, axis: int = 0,
        dtype=None) -> NDArray:
    r""" Compute weighted moving average(s) of size `w` for each `X`' series.

    Observations of the window are weighted linearly: the most recent one
    receives the weight ``w`` and the oldest one the weight ``1``. The
    weights sum to ``w (w + 1) / 2``, which gives the normalization factor
    below. While fewer than ``w`` observations are available, the average
    is computed on the observations seen so far (see Examples).

    .. math::

        wma^w_t(X) = \frac{2}{w (w+1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i}

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving average.
    w : int, optional
        Size of the lagged window of the moving average, must be positive.
        If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is
        None.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Weighted moving average of each series.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> wma(X, w=3, dtype=np.float64)
    array([ 60.        ,  86.66666667,  83.33333333, 103.33333333,
           133.33333333, 113.33333333])
    >>> X = X.reshape([6, 1])
    >>> wma(X, w=3, dtype=np.float64).flatten()
    array([ 60.        ,  86.66666667,  83.33333333, 103.33333333,
           133.33333333, 113.33333333])

    See Also
    --------
    sma, ema, wmstd

    """
    # The dimension dispatch (1-D vs 2-D) is delegated to the private helper.
    return _wma(X, w)
def _wma(X, w):
    """ Dispatch `X` to the 1-D or 2-D weighted moving average kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = wma_cy_2d if len(X.shape) == 2 else wma_cy_1d

    return np.asarray(kernel(X, w))
@WrapperArray('dtype', 'axis')
def ema(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0,
        dtype=None) -> NDArray:
    r""" Compute exponential moving average(s) for each `X`' series.

    Geometrically decaying weighted average that gives more importance to
    recent observations. A smaller ``alpha`` (or a smaller equivalent
    window ``w``) puts more weight on the latest observation, hence more
    reactivity at the cost of more noise.

    Either ``alpha`` (smoothing factor in ``[0, 1]``) or ``w`` (window
    size mapped to ``alpha = 1 - 2 / (1 + w)``) can be specified.

    .. math::

        ema^{\alpha}_t(X) = \alpha \times ema^{\alpha}_{t-1}
        + (1-\alpha) \times X_t

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving average.
    alpha : float, optional
        Coefficient representing the degree of weighting decrease. It is
        ignored when `w` is given. Default is 0.94.
    w : int, optional
        Size of the lagged window of the moving average, must be strictly
        positive. If ``w is None`` the window is ignored and the parameter
        `alpha` is used. Default is None.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Exponential moving average of each series.

    Raises
    ------
    ValueError
        If `w` is given and is lower than or equal to 0.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> ema(X, w=3, dtype=np.float64)
    array([ 60.,  80.,  80., 100., 130., 105.])
    >>> ema(X, alpha=0.5, dtype=np.float64)
    array([ 60.,  80.,  80., 100., 130., 105.])

    Notes
    -----
    If the lagged window `w` is specified :math:`\alpha` is overwritten by
    :math:`\alpha = 1 - \frac{2}{1 + w}`

    See Also
    --------
    sma, wma, emstd

    """
    if w is not None:
        if w <= 0:
            # Adjacent literals are concatenated at compile time, so the
            # message no longer embeds source indentation as a backslash
            # continuation inside the literal did.
            raise ValueError(
                'lagged window of size {} is not available, '
                'must be greater than 0.'.format(w)
            )

        # A window, when given, takes precedence over the `alpha` argument.
        alpha = 1 - 2 / (1 + w)

    return _ema(X, alpha)
def _ema(X, alpha):
    """ Dispatch `X` to the 1-D or 2-D exponential moving average kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = ema_cy_2d if len(X.shape) == 2 else ema_cy_1d

    # `alpha` is cast to a built-in float before being handed to the kernel.
    return np.asarray(kernel(X, float(alpha)))


# =========================================================================== #
#                          Moving Standard Deviation                          #
# =========================================================================== #
@WrapperArray('dtype', 'axis', 'window', 'ddof')
def smstd(X: NDArray, w: int | None = None, ddof: int = 0, axis: int = 0,
          dtype=None) -> NDArray:
    r""" Compute simple moving standard deviation(s) for each `X`' series.

    The formula below is written for ``ddof=0``; in general the divisor is
    ``w - ddof``.

    .. math::

        smstd^w_t(X) = \sqrt{\frac{1}{w}\sum^{w-1}_{i=0}
        (X_{t-i} - sma^w_t)^2}

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving standard deviation.
    w : int, optional
        Size of the lagged window of the moving average, must be positive.
        If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is
        None.
    ddof : int, optional
        Means Delta Degrees of Freedom, the divisor used in calculations is
        ``w - ddof`` (must be strictly positive), where ``w`` represents
        the number of elements in time axis. Default is 0.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Simple moving standard deviation of each series.

    Raises
    ------
    ValueError
        If `ddof` is greater than or equal to `w`.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> smstd(X, w=3, dtype=np.float64)
    array([ 0.        , 20.        , 16.32993162, 16.32993162, 32.65986324,
           32.65986324])
    >>> smstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
    array([ 0.        , 20.        , 16.32993162, 16.32993162, 32.65986324,
           32.65986324])

    See Also
    --------
    sma, wmstd, emstd

    """
    # NOTE(review): presumably the decorator has already replaced ``w=None``
    # by an integer before this comparison runs - confirm in WrapperArray.
    if ddof >= w:
        template = (
            'size of the lagged window (w={}) must be strictly greater than '
            'degree of freedom (ddof={})'
        )

        raise ValueError(template.format(w, ddof))

    return _smstd(X, w, ddof=ddof)
def _smstd(X, w, ddof=0):
    """ Dispatch `X` to the 1-D or 2-D simple moving std kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = smstd_cy_2d if len(X.shape) == 2 else smstd_cy_1d

    return np.asarray(kernel(X, w, ddof))
@WrapperArray('dtype', 'axis', 'window')
def wmstd(X: NDArray, w: int | None = None, axis: int = 0,
          dtype=None) -> NDArray:
    r""" Compute weighted moving standard deviation(s) for each `X`' series.

    Squared deviations from the weighted moving average are weighted
    linearly: the most recent observation receives the weight ``w`` and the
    oldest one the weight ``1``. The weights sum to ``w (w + 1) / 2``,
    which gives the normalization factor below.

    .. math::

        wma^w_t(X) = \frac{2}{w (w+1)} \sum^{w-1}_{i=0}
        (w-i) \times X_{t-i} \\
        wmstd^w_t(X) = \sqrt{\frac{2}{w (w+1)} \sum^{w-1}_{i=0}
        (w-i) \times (X_{t-i} - wma^w_t(X))^2}

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving standard deviation.
    w : int, optional
        Size of the lagged window of the moving average, must be positive.
        If ``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is
        None.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Weighted moving standard deviation of each series.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> wmstd(X, w=3, dtype=np.float64)
    array([ 0.        , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
           35.90109871])
    >>> wmstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
    array([ 0.        , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
           35.90109871])

    See Also
    --------
    wma, smstd, emstd

    """
    # The dimension dispatch (1-D vs 2-D) is delegated to the private helper.
    return _wmstd(X, w)
def _wmstd(X, w):
    """ Dispatch `X` to the 1-D or 2-D weighted moving std kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = wmstd_cy_2d if len(X.shape) == 2 else wmstd_cy_1d

    return np.asarray(kernel(X, w))
@WrapperArray('dtype', 'axis')
def emstd(X: NDArray, alpha: float = 0.94, w: int | None = None,
          axis: int = 0, dtype=None) -> NDArray:
    r""" Compute exponential moving standard deviation(s) for each `X`' series.

    The squared deviations of each observation from the exponential moving
    average are smoothed with the same coefficient ``alpha``. The recursion
    below, started from zero, reproduces the values listed in Examples.

    .. math::

        emstd^{\alpha}_t(X) = \sqrt{\alpha \times (emstd^{\alpha}_{t-1}(X))^2
        + (1-\alpha) \times (X_t - ema^{\alpha}_t(X))^2}

    Parameters
    ----------
    X : np.ndarray[dtype, ndim=1 or 2]
        Elements to compute the moving standard deviation.
    alpha : float, optional
        Coefficient representing the degree of weighting decrease. It is
        ignored when `w` is given. Default is 0.94.
    w : int, optional
        Size of the lagged window of the moving average, must be strictly
        positive. If ``w is None`` the window is ignored and the parameter
        `alpha` is used. Default is None.
    axis : {0, 1}, optional
        Axis along which the computation is done. Default is 0.
    dtype : np.dtype, optional
        The type of the output array. If `dtype` is not given, infer the
        data type from `X` input.

    Returns
    -------
    np.ndarray[dtype, ndim=1 or 2]
        Exponential moving standard deviation of each series.

    Raises
    ------
    ValueError
        If `w` is given and is lower than or equal to 0.

    Examples
    --------
    >>> X = np.array([60, 100, 80, 120, 160, 80])
    >>> emstd(X, w=3, dtype=np.float64)
    array([ 0.        , 14.14213562, 10.        , 15.8113883 , 23.97915762,
           24.49489743])
    >>> emstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
    array([ 0.        , 14.14213562, 10.        , 15.8113883 , 23.97915762,
           24.49489743])
    >>> emstd(X, alpha=0.5, dtype=np.float64)
    array([ 0.        , 14.14213562, 10.        , 15.8113883 , 23.97915762,
           24.49489743])

    Notes
    -----
    If the lagged window `w` is specified :math:`\alpha` is overwritten by
    :math:`\alpha = 1 - \frac{2}{1 + w}`

    See Also
    --------
    ema, smstd, wmstd

    """
    if w is not None:
        if w <= 0:
            # Adjacent literals are concatenated at compile time, so the
            # message no longer embeds source indentation as a backslash
            # continuation inside the literal did.
            raise ValueError(
                'lagged window of size {} is not available, '
                'must be greater than 0.'.format(w)
            )

        # A window, when given, takes precedence over the `alpha` argument.
        alpha = 1 - 2 / (1 + w)

    return _emstd(X, alpha)
def _emstd(X, alpha):
    """ Dispatch `X` to the 1-D or 2-D exponential moving std kernel. """
    # Two-dimensional inputs go to the 2-D kernel, anything else to the 1-D.
    kernel = emstd_cy_2d if len(X.shape) == 2 else emstd_cy_1d

    # `alpha` is cast to a built-in float before being handed to the kernel.
    return np.asarray(kernel(X, float(alpha)))


if __name__ == '__main__':
    # Run the docstring examples of this module as tests.
    import doctest

    doctest.testmod()