#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-02-20 19:57:13
# @Last modified by: ArthurBernard
# @Last modified time: 2019-11-05 15:55:19
""" Statistical momentum functions.
Rolling moving averages and moving standard deviations in three flavors
— simple, weighted and exponential — used as building blocks for
technical indicators, scaling and signal generation.
Implementations are vectorized with NumPy and accept 1-D or 2-D inputs.
The window size ``w`` defaults to the full length of the series; the
``axis`` keyword controls the time axis on 2-D arrays.
Main entry points
-----------------
- :func:`sma`, :func:`wma`, :func:`ema` — moving averages (simple,
weighted, exponential).
- :func:`smstd`, :func:`wmstd`, :func:`emstd` — moving standard
deviations.
"""
from __future__ import annotations
# Built-in packages
# External packages
import numpy as np
# Local packages
from numba import njit
from numpy.typing import NDArray
from fynance._wrappers import WrapperArray
__all__ = [
'sma', 'wma', 'ema', 'smstd', 'wmstd', 'emstd',
]
# --------------------------------------------------------------------------- #
# numba kernels (ported 1:1 from the former Cython implementation) #
# --------------------------------------------------------------------------- #
@njit(cache=True)
def _sma_1d(X, w):
T = X.shape[0]
ma = np.empty(T, dtype=np.float64)
S = 0.0
for t in range(T):
if t < w:
S += X[t]
ma[t] = S / (t + 1)
else:
S += X[t] - X[t - w]
ma[t] = S / w
return ma
@njit(cache=True)
def _sma_2d(X, w):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _sma_1d(X[:, n].copy(), w)
return out
@njit(cache=True)
def _wma_1d(X, w):
T = X.shape[0]
ma = np.empty(T, dtype=np.float64)
m = 1.0
for t in range(T):
S = 0.0
if t < w:
mm = float(t + 1)
m = mm * (mm + 1.0) / 2.0
i = 0
while i <= t:
S += (i + 1) * X[i]
i += 1
else:
i = 0
while i < w:
S += (w - i) * X[t - i]
i += 1
ma[t] = S / m
return ma
@njit(cache=True)
def _wma_2d(X, w):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _wma_1d(X[:, n].copy(), w)
return out
@njit(cache=True)
def _ema_1d(X, alpha):
T = X.shape[0]
ma = np.empty(T, dtype=np.float64)
ma[0] = X[0]
for t in range(1, T):
ma[t] = alpha * ma[t - 1] + (1.0 - alpha) * X[t]
return ma
@njit(cache=True)
def _ema_2d(X, alpha):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _ema_1d(X[:, n].copy(), alpha)
return out
@njit(cache=True)
def _smstd_1d(X, w, d):
T = X.shape[0]
sd = np.empty(T, dtype=np.float64)
S = 0.0
S2 = 0.0
_w = 1.0
_w_d = 1.0
for t in range(T):
if t < w:
_w = float(t + 1)
_w_d = float(t + 1 - d)
sub_X = 0.0
else:
sub_X = X[t - w]
S += X[t] - sub_X
S2 += X[t] * X[t] - sub_X * sub_X
if t < d:
sd[t] = 0.0
else:
sd[t] = np.sqrt((S2 - (S / _w) * S) / _w_d)
return sd
@njit(cache=True)
def _smstd_2d(X, w, d):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _smstd_1d(X[:, n].copy(), w, d)
return out
@njit(cache=True)
def _wmstd_1d(X, w):
T = X.shape[0]
sd = np.empty(T, dtype=np.float64)
m = 1.0
for t in range(T):
S = 0.0
S2 = 0.0
if t < w:
mm = float(t + 1)
m = mm * (mm + 1.0) / 2.0
i = 0
while i <= t:
S += (i + 1) * X[i]
S2 += (i + 1) * X[i] * X[i]
i += 1
else:
i = 0
while i < w:
S += (w - i) * X[t - i]
S2 += (w - i) * X[t - i] * X[t - i]
i += 1
sd[t] = np.sqrt(S2 / m - (S / m) * (S / m))
return sd
@njit(cache=True)
def _wmstd_2d(X, w):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _wmstd_1d(X[:, n].copy(), w)
return out
@njit(cache=True)
def _emstd_1d(X, alpha):
T = X.shape[0]
sd = np.empty(T, dtype=np.float64)
m = X[0]
sd[0] = 0.0
for t in range(1, T):
m = alpha * m + (1.0 - alpha) * X[t]
m2 = (1.0 - alpha) * (X[t] - m) * (X[t] - m)
sd[t] = np.sqrt(alpha * sd[t - 1] * sd[t - 1] + m2)
return sd
@njit(cache=True)
def _emstd_2d(X, alpha):
T, N = X.shape
out = np.empty((T, N), dtype=np.float64)
for n in range(N):
out[:, n] = _emstd_1d(X[:, n].copy(), alpha)
return out
# =========================================================================== #
# Moving Averages #
# =========================================================================== #
[docs]
@WrapperArray('dtype', 'axis', 'window')
def sma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute simple moving average(s) of size `w` for each `X`' series.
Equally weighted average over a sliding window of length ``w``.
Reacts slowly to new information but is robust to noise and is the
most common smoother in technical analysis. For a smoother that
reacts faster to recent observations, see :func:`ema` (exponential
weighting) or :func:`wma` (linear weighting).
The first ``w-1`` values use a shrinking window (i.e. ``sma_t`` for
``t < w-1`` averages only the available observations, never NaN).
.. math::
sma^w_t(X) = \frac{1}{w} \sum^{w-1}_{i=0} X_{t-i}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Simple moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> sma(X, w=3, dtype=np.float64, axis=0)
array([ 60., 80., 80., 100., 120., 120.])
>>> X = np.array([[60, 60], [100, 100], [80, 80],
... [120, 120], [160, 160], [80, 80]])
>>> sma(X, w=3, dtype=np.float64, axis=0)
array([[ 60., 60.],
[ 80., 80.],
[ 80., 80.],
[100., 100.],
[120., 120.],
[120., 120.]])
>>> sma(X, w=3, dtype=np.float64, axis=1)
array([[ 60., 60.],
[100., 100.],
[ 80., 80.],
[120., 120.],
[160., 160.],
[ 80., 80.]])
See Also
--------
wma, ema, smstd
"""
return _sma(X, w)
def _sma(X, w):
if len(X.shape) == 2:
return _sma_2d(X, w)
return _sma_1d(X, w)
[docs]
@WrapperArray('dtype', 'axis', 'window')
def wma(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute weighted moving average(s) of size `w` for each `X`' series.
.. math::
wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Weighted moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> wma(X, w=3, dtype=np.float64)
array([ 60. , 86.66666667, 83.33333333, 103.33333333,
133.33333333, 113.33333333])
>>> X = X.reshape([6, 1])
>>> wma(X, w=3, dtype=np.float64).flatten()
array([ 60. , 86.66666667, 83.33333333, 103.33333333,
133.33333333, 113.33333333])
See Also
--------
sma, ema, wmstd
"""
return _wma(X, w)
def _wma(X, w):
if len(X.shape) == 2:
return _wma_2d(X, w)
return _wma_1d(X, w)
[docs]
@WrapperArray('dtype', 'axis')
def ema(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute exponential moving average(s) for each `X`' series.
Geometrically decaying weighted average that gives more importance
to recent observations. Reacts faster than :func:`sma` to regime
changes; smaller ``alpha`` (or smaller equivalent window ``w``)
increases reactivity at the cost of more noise. The recursive
formulation makes computation O(T) per series, with no need to
store the full window.
Either ``alpha`` (smoothing factor in ``[0, 1]``) or ``w`` (window
size mapped to ``alpha = 1 - 2 / (1 + w)``) can be specified.
.. math::
ema^{\apha}_t(X) = \alpha \times ema^{\alpha}_{t-1} + (1-\alpha)
\times X_t
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving average.
alpha : float, optional
These coefficient represents the degree of weighting decrease, default
is 0.94 corresponding at 20 lags memory.
w : int, optional
Size of the lagged window of the moving average, must be strictly
positive. If ``w is None`` the window is ignored and the parameter
`alpha` is used. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Exponential moving average of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> ema(X, w=3, dtype=np.float64)
array([ 60., 80., 80., 100., 130., 105.])
>>> ema(X, alpha=0.5, dtype=np.float64)
array([ 60., 80., 80., 100., 130., 105.])
Notes
-----
If the lagged window `w` is specified :math:`\alpha` is overwritten by
:math:`\alpha = 1 - \frac{2}{1 + w}`
See Also
--------
sma, wma, emstd
"""
if w is None:
pass
elif w <= 0:
raise ValueError('lagged window of size {} is not available, \
must be greater than 0.'.format(w))
else:
alpha = 1 - 2 / (1 + w)
return _ema(X, alpha)
def _ema(X, alpha):
if len(X.shape) == 2:
return _ema_2d(X, float(alpha))
return _ema_1d(X, float(alpha))
# =========================================================================== #
# Moving Standard Deviation #
# =========================================================================== #
[docs]
@WrapperArray('dtype', 'axis', 'window', 'ddof')
def smstd(X: NDArray, w: int | None = None, ddof: int = 0, axis: int = 0, dtype=None) -> NDArray:
r""" Compute simple moving standard deviation(s) for each `X`' series'.
.. math::
smstd^w_t(X) = \sqrt{\frac{1}{w}\sum^{w-1}_{i=0} (X_{t-i} - sma^w_t)^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
ddof : int, optional
Means Delta Degrees of Freedom, the divisor used in calculations is
``w - ddof`` (must be strictly positive), where ``w`` represents the
number of elements in time axis. Default is 0.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Simple moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> smstd(X, w=3, dtype=np.float64)
array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324,
32.65986324])
>>> smstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 20. , 16.32993162, 16.32993162, 32.65986324,
32.65986324])
See Also
--------
sma, wmstd, emstd
"""
if ddof >= w: # type: ignore[operator]
raise ValueError(
'size of the lagged window (w={}) must be strictly greater than '
'degree of freedom (ddof={})'.format(w, ddof)
)
return _smstd(X, w, ddof=ddof)
def _smstd(X, w, ddof=0):
if len(X.shape) == 2:
return _smstd_2d(X, w, ddof)
return _smstd_1d(X, w, ddof)
[docs]
@WrapperArray('dtype', 'axis', 'window')
def wmstd(X: NDArray, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute weighted moving standard(s) deviation for each `X`' series'.
.. math::
wma^w_t(X) = \frac{2}{w (w-1)} \sum^{w-1}_{i=0} (w-i) \times X_{t-i} \\
wmstd^w_t(X) = \sqrt{\frac{2}{w(w-1)} \sum^{w-1}_{i=0}
(w-i) \times (X_{t-i} - wma^w_t(X))^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
w : int, optional
Size of the lagged window of the moving average, must be positive. If
``w is None`` or ``w=0``, then ``w=X.shape[axis]``. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Weighted moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> wmstd(X, w=3, dtype=np.float64)
array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
35.90109871])
>>> wmstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 18.85618083, 13.74368542, 17.95054936, 29.8142397 ,
35.90109871])
See Also
--------
wma, smstd, emstd
"""
return _wmstd(X, w)
def _wmstd(X, w):
if len(X.shape) == 2:
return _wmstd_2d(X, w)
return _wmstd_1d(X, w)
[docs]
@WrapperArray('dtype', 'axis')
def emstd(X: NDArray, alpha: float = 0.94, w: int | None = None, axis: int = 0, dtype=None) -> NDArray:
r""" Compute exponential moving standard deviation(s) for each `X`' series.
.. math::
emstd^{\alpha}_t(X) = \sqrt{\alpha\times emstd^{\alpha}_{t-1}^2 +
(1-\alpha) \times X_t^2}
Parameters
----------
X : np.ndarray[dtype, ndim=1 or 2]
Elements to compute the moving standard deviation.
alpha : float, optional
These coefficient represents the degree of weighting decrease, default
is 0.94 corresponding at 20 lags memory.
w : int, optional
Size of the lagged window of the moving average, must be strictly
positive. If ``w is None`` the window is ignored and the parameter
`alpha` is used. Default is None.
axis : {0, 1}, optional
Axis along wich the computation is done. Default is 0.
dtype : np.dtype, optional
The type of the output array. If `dtype` is not given, infer the data
type from `X` input.
Returns
-------
np.ndarray[dtype, ndim=1 or 2]
Exponential moving standard deviation of each series.
Examples
--------
>>> X = np.array([60, 100, 80, 120, 160, 80])
>>> emstd(X, w=3, dtype=np.float64)
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
>>> emstd(X.reshape([6, 1]), w=3, dtype=np.float64).flatten()
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
>>> emstd(X, alpha=0.5, dtype=np.float64)
array([ 0. , 14.14213562, 10. , 15.8113883 , 23.97915762,
24.49489743])
Notes
-----
If the lagged window `w` is specified :math:`\alpha` is overwritten by
:math:`\alpha = 1 - \frac{2}{1 + w}`
See Also
--------
ema, smstd, wmstd
"""
if w is None:
pass
elif w <= 0:
raise ValueError('lagged window of size {} is not available, \
must be greater than 0.'.format(w))
else:
alpha = 1 - 2 / (1 + w)
return _emstd(X, alpha)
def _emstd(X, alpha):
if len(X.shape) == 2:
return _emstd_2d(X, float(alpha))
return _emstd_1d(X, float(alpha))
if __name__ == '__main__':
import doctest
doctest.testmod()