#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-02-20 19:57:13
# @Last modified by: ArthurBernard
# @Last modified time: 2019-11-05 15:55:19
""" Statistical momentum functions.
Rolling moving averages and moving standard deviations in three flavors
— simple, weighted and exponential — used as building blocks for
technical indicators, scaling and signal generation.
Implementations are vectorized with NumPy and accept 1-D or 2-D inputs.
The window size ``w`` defaults to the full length of the series; the
``axis`` keyword controls the time axis on 2-D arrays.
Main entry points
-----------------
- :func:`sma`, :func:`wma`, :func:`ema` — moving averages (simple,
weighted, exponential).
- :func:`smstd`, :func:`wmstd`, :func:`emstd` — moving standard
deviations.
"""
from __future__ import annotations
# Built-in packages
# External packages
import numpy as np
from numpy.typing import NDArray
from fynance._wrappers import WrapperArray
# Local packages
from fynance.features.momentums_cy import *
__all__ = [
'sma', 'wma', 'ema', 'smstd', 'wmstd', 'emstd',
]
# TODO : - Momentums of order 3
# - Momentums of order 4
# - Momentums of order w
# =========================================================================== #
# Moving Averages #
# =========================================================================== #
[docs]
@WrapperArray('dtype', 'axis', 'window')
def sma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute simple moving average(s) of size `w` for each `X`' series.
Equally weighted average over a sliding window of length ``w``.
Reacts slowly to new information but is robust to noise and is the
most common smoother in technical analysis. For a smoother that
reacts faster to recent observations, see :func:`ema` (exponential
weighting) or :func:`wma` (linear weighting).
The first ``w-1`` values use a shrinking window (i.e. ``sma_t`` for
``t < w-1`` averages only the available observations, never NaN).
.. math::
sma^w_t(X) = \frac{1}{w} \sum^{w-1}_{i=0} X_{t-i}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Simple moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> sma(X, w=3, dtype=np.float64, axis=0)
array([ 60., 80., 80., 100., 120., 120.])
>>> X = np.array([[60, 60], [100, 100], [80, 80],
... [120, 120], [160, 160], [80, 80]])
>>> sma(X, w=3, dtype=np.float64, axis=0)
array([[ 60., 60.],
[ 80., 80.],
[ 80., 80.],
[100., 100.],
[120., 120.],
[120., 120.]])
>>> sma(X, w=3, dtype=np.float64, axis=1)
array([[ 60., 60.],
[100., 100.],
[ 80., 80.],
[120., 120.],
[160., 160.],
[ 80., 80.]])
See Also
--------
wma, ema, smstd
"""
return _sma(X, w)
def _sma(X, w):
if len(X.shape) == 2:
return np.asarray(sma_cy_2d(X, w))
return np.asarray(sma_cy_1d(X, w))
[docs]
@WrapperArray('dtype', 'axis', 'window')
def wma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute weighted moving average(s) of size `w` for each `X`' series.
.. math::
wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Weighted moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> wma(X, w=3, dtype=np.float64)
array([ 60. , 86.66666667, 83.33333333, 103.33333333,
133.33333333, 113.33333333])
>>> X = X.reshape([6, 1])
>>> wma(X, w=3, dtype=np.float64).flatten()
array([ 60. , 86.66666667, 83.33333333, 103.33333333,
133.33333333, 113.33333333])
See Also
--------
sma, ema, wmstd
"""
return _wma(X, w)
def _wma(X, w):
if len(X.shape) == 2:
return np.asarray(wma_cy_2d(X, w))
return np.asarray(wma_cy_1d(X, w))
[docs]
@WrapperArray('dtype', 'axis')
def ema(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute exponential moving average(s) for each `X`' series.
Geometrically decaying weighted average that gives more importance
to recent observations. Reacts faster than :func:`sma` to regime
changes; smaller ``alpha`` (or smaller equivalent window ``w``)
increases reactivity at the cost of more noise. The recursive
formulation makes computation O(T) per series, with no need to
store the full window.
Either ``alpha`` (smoothing factor in ``[0, 1]``) or ``w`` (window
size mapped to ``alpha = 1 - 2 / (1 + w)``) can be specified.
.. math::
ema^{\apha}_t(X) = \alpha \times ema^{\alpha}_{t-1} + (1-\alpha)
\times X_t
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
alpha : float, optional
These coefficient represents the degree of weighting decrease, default
is 0.94 corresponding at 20 lags memory.
w : int, optional
Size of the lagged window of the moving average, must be strictly
positive. If ``w is None`` the window is ignored and the parameter
`alpha` is used. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Exponential moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> ema(X, w=3, dtype=np.float64)
array([ 60., 80., 80., 100., 130., 105.])
>>> ema(X, alpha=0.5, dtype=np.float64)
array([ 60., 80., 80., 100., 130., 105.])
Notes
-----
If the lagged window `w` is specified :math:`\alpha` is overwritten by
:math:`\alpha = 1 - \frac{2}{1 + w}`
See Also
--------
sma, wma, emstd
"""
if w is None:
pass
elif w <= 0:
raise ValueError('lagged window of size {} is not available, \
must be greater than 0.'.format(w))
else:
alpha = 1 - 2 / (1 + w)
return _ema(X, alpha)
def _ema(X, alpha):
if len(X.shape) == 2:
return np.asarray(ema_cy_2d(X, float(alpha)))
return np.asarray(ema_cy_1d(X, float(alpha)))
# =========================================================================== #
# Moving Standard Deviation #
# =========================================================================== #
[docs]
@WrapperArray('dtype', 'axis', 'window', 'ddof')
def smstd(X: NDArray, w: int | None = None, ddof: int = 0, axis: int = 0, dtype=None) -> NDArray:
r""" Compute simple moving standard deviation(s) for each `X`' series'.
.. math::
smstd^w_t(X) = \sqrt{\frac{1}{w}\sum^{w-1}_{i=0} (X_{t-i} - sma^w_t)^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
ddof : int, optional
Means Delta Degrees of Freedom, the divisor used in calculations is
``w - ddof`` (must be strictly positive), where ``w`` represents the
number of elements in time axis. Default is 0.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Simple moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> smstd(X, w=3, dtype=np.float64)
array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324,
32.65986324])
>>> smstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324,
32.65986324])
See Also
--------
sma, wmstd, emstd
"""
if ddof >= w:
raise ValueError(
'size of the lagged window (w={}) must be strictly greater than '
'degree of freedom (ddof={})'.format(w, ddof)
)
return _smstd(X, w, ddof=ddof)
def _smstd(X, w, ddof=0):
if len(X.shape) == 2:
return np.asarray(smstd_cy_2d(X, w, ddof))
return np.asarray(smstd_cy_1d(X, w, ddof))
[docs]
@WrapperArray('dtype', 'axis', 'window')
def wmstd(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute weighted moving standard(s) deviation for each `X`' series'.
.. math::
wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i} \\
wmstd^w_t(X) = \sqrt{\frac{2}{w(w-1)} \sum^{w-1}_{i=0}
(w-i) \times (X_{t-i} - wma^w_t(X))^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Weighted moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> wmstd(X, w=3, dtype=np.float64)
array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
35.90109871])
>>> wmstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
35.90109871])
See Also
--------
wma, smstd, emstd
"""
return _wmstd(X, w)
def _wmstd(X, w):
if len(X.shape) == 2:
return np.asarray(wmstd_cy_2d(X, w))
return np.asarray(wmstd_cy_1d(X, w))
[docs]
@WrapperArray('dtype', 'axis')
def emstd(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute exponential moving standard deviation(s) for each `X`' series.
.. math::
emstd^{\alpha}_t(X) = \sqrt{\alpha\times emstd^{\alpha}_{t-1}^2 +
(1-\alpha) \times X_t^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
alpha : float, optional
These coefficient represents the degree of weighting decrease, default
is 0.94 corresponding at 20 lags memory.
w : int, optional
Size of the lagged window of the moving average, must be strictly
positive. If ``w is None`` the window is ignored and the parameter
`alpha` is used. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Exponential moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> emstd(X, w=3, dtype=np.float64)
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
>>> emstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
>>> emstd(X, alpha=0.5, dtype=np.float64)
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
Notes
-----
If the lagged window `w` is specified :math:`\alpha` is overwritten by
:math:`\alpha = 1 - \frac{2}{1 + w}`
See Also
--------
ema, smstd, wmstd
"""
if w is None:
pass
elif w <= 0:
raise ValueError('lagged window of size {} is not available, \
must be greater than 0.'.format(w))
else:
alpha = 1 - 2 / (1 + w)
return _emstd(X, alpha)
def _emstd(X, alpha):
if len(X.shape) == 2:
return np.asarray(emstd_cy_2d(X, float(alpha)))
return np.asarray(emstd_cy_1d(X, float(alpha)))
if __name__ == '__main__':
import doctest
doctest.testmod()