Source code for fynance.algorithms.allocation

#!/usr/bin/env python3
# coding: utf-8
# @Author: ArthurBernard
# @Email: arthur.bernard.92@gmail.com
# @Date: 2019-09-12 14:52:08
# @Last modified by: ArthurBernard
# @Last modified time: 2019-11-05 17:20:52

""" Algorithms of portfolio allocation.

Risk-based and mean-variance methods that compute portfolio weights
from a sample of asset returns. Each function takes a 2-D array where
columns are asset price/return series and returns the optimal weights
as a 1-D array summing to one.

A walk-forward wrapper, :func:`rolling_allocation`, applies any of
these methods on a rolling training window — useful for backtesting
allocation strategies without lookahead bias.

Main entry points
-----------------
- :func:`ERC` — Equal Risk Contribution (risk-parity).
- :func:`HRP` — Hierarchical Risk Parity.
- :func:`IVP` — Inverse Variance Portfolio.
- :func:`MDP` — Maximum Diversified Portfolio.
- :func:`MVP`, :func:`MVP_uc` — Minimum Variance Portfolio (constrained
  / unconstrained).
- :func:`rolling_allocation` — walk-forward wrapper.

"""

from __future__ import annotations

# Built-in packages
from typing import Callable

# Third-party
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as sch
from numpy.typing import NDArray
from scipy.optimize import Bounds, LinearConstraint, minimize

# Local packages
from fynance.features.metrics import diversified_ratio

from .rolling import _RollingMechanism

# TODO : cython

__all__ = ['ERC', 'HRP', 'IVP', 'MDP', 'MVP', 'MVP_uc', 'rolling_allocation']


# =========================================================================== #
#                         Equal Risk Contribution                             #
# =========================================================================== #


[docs] def ERC( X: NDArray[np.float64], w0: NDArray[np.float64] | None = None, up_bound: float = 1., low_bound: float = 0., ) -> NDArray[np.float64]: r""" Get weights of Equal Risk Contribution portfolio allocation. Risk-parity allocation: each asset contributes the same amount of total portfolio variance. ERC sits between the naive 1/N and the minimum-variance portfolio — it requires only the covariance matrix and is robust to noisy expected returns, which makes it a common choice when return forecasts are unreliable. The optimizer (SLSQP) minimizes a smooth surrogate of the risk-contribution dispersion under sum-to-one and box constraints. Notes ----- Weights of Equal Risk Contribution, as described by S. Maillard, T. Roncalli and J. Teiletche [1]_, verify the following problem: .. math:: w = \text{arg min } f(w) \\ u.c. \begin{cases}w'e = 1 \\ 0 \leq w_i \leq 1 \\ \end{cases} With: .. math:: f(w) = N \sum_{i=1}^{N}w_i^2 (\Omega w)_i^2 - \sum_{i,j=1}^{N} w_i w_j (\Omega w)_i (\Omega w)_j Where :math:`\Omega` is the variance-covariance matrix of `X` and :math:`N` the number of assets. Parameters ---------- X : array_like Each column is a series of price or return's asset. w0 : array_like, optional Initial weights to maximize. up_bound, low_bound : float, optional Respectively maximum and minimum values of weights, such that low_bound :math:`\leq w_i \leq` up_bound :math:`\forall i`. Default is 0 and 1. Returns ------- array_like Weights that minimize the Equal Risk Contribution portfolio. References ---------- .. [1] http://thierry-roncalli.com/download/erc-slides.pdf """ T, N = X.shape SIGMA = np.cov(X, rowvar=False) up_bound = max(up_bound, 1 / N) def f_ERC(w): w = w.reshape([N, 1]) arg = N * np.sum(w ** 2 * (SIGMA @ w) ** 2) return arg - np.sum(w * (SIGMA @ w) * np.sum(w * (SIGMA @ w))) # Set inital weights if w0 is None: w0 = np.ones([N]) / N const_sum = LinearConstraint(np.ones([1, N]), [1], [1]) const_ind = Bounds(low_bound * np.ones([N]), up_bound * np.ones([N])) result = minimize( f_ERC, w0, method='SLSQP', constraints=[const_sum], bounds=const_ind ) return result.x.reshape([N, 1])
# =========================================================================== # # HRP developed by Marcos Lopez de Prado # # =========================================================================== # def _get_quasi_diag(link: NDArray[np.float64]) -> list[int]: """ Compute quasi diagonal matrix. TODO : verify the efficiency Parameter --------- link: list of N lists Linkage matrix, N list (cluster) of 4-tuple such that the two first elements are the costituents, third report the distance between the two first, and fourth is the number of element (<= N) in this cluster. Returns ------- sortIx: list Sorted list of items. """ link = link.astype(int) numItems = link[-1, 3] # number of original leaf items # Use a plain list to avoid pandas dtype coercion issues items = [int(link[-1, 0]), int(link[-1, 1])] while max(items) >= numItems: expanded = [] for item in items: if item >= numItems: cluster_idx = item - numItems expanded.append(int(link[cluster_idx, 0])) expanded.append(int(link[cluster_idx, 1])) else: expanded.append(item) items = expanded return items def _get_rec_bisec(mat_cov: NDArray[np.float64], sortIx: list[int]) -> NDArray[np.float64]: """ Compute weights via recursive bisection. Parameters ---------- mat_cov: np.ndarray Matrix variance-covariance (N x N). sortIx: list or np.ndarray of int Sorted list of asset indices (0..N-1). Returns ------- np.ndarray Weight vector of shape (N,) indexed by sortIx order. """ n = len(sortIx) w = np.ones(n) cItems = [list(range(n))] while len(cItems) > 0: cItems = [i[j: k] for i in cItems for j, k in ( (0, int(len(i) / 2)), (int(len(i) / 2), len(i)) ) if len(i) > 1] for i in range(0, len(cItems), 2): cItems0_idx = cItems[i] cItems1_idx = cItems[i + 1] cItems0 = [sortIx[j] for j in cItems0_idx] cItems1 = [sortIx[j] for j in cItems1_idx] cVar0 = _get_cluster(mat_cov, cItems0) cVar1 = _get_cluster(mat_cov, cItems1) alpha = 1 - cVar0 / (cVar0 + cVar1) w[cItems0_idx] *= alpha w[cItems1_idx] *= 1 - alpha return w def _get_cluster(mat_cov: NDArray[np.float64], cItems: list[int]) -> float: """ Compute cluster for variance. Parameters ---------- mat_cov: np.ndarray Covariance matrix. cItems: list or np.ndarray of int Cluster asset indices. Returns ------- cVar: float Cluster variance """ cov_ = mat_cov[np.ix_(cItems, cItems)] w_ = _get_IVP(cov_).reshape(-1, 1) cVar = (w_.T @ cov_) @ w_ return float(cVar.item()) def _get_IVP(mat_cov: NDArray[np.float64]) -> NDArray[np.float64]: """ Compute the inverse-variance matrix. Parameters ---------- mat_cov : NDArray[np.float64] Variance-covariance matrix. Returns ------- NDArray[np.float64] Inverse-variance weights. """ ivp = 1. / np.diag(mat_cov) ivp /= np.sum(ivp) return ivp
[docs] def HRP( X: NDArray[np.float64], method: str = 'single', metric: str = 'euclidean', low_bound: float = 0., up_bound: float = 1.0, ) -> NDArray[np.float64]: r""" Get weights of the Hierarchical Risk Parity allocation. Cluster-based allocation that avoids inverting the full covariance matrix. Compared with classical Markowitz solutions, HRP is far more stable when ``N`` is large or assets are highly correlated, because it groups similar assets first and only allocates risk *within* and *between* clusters. Three steps: (1) build a correlation distance and run hierarchical linkage, (2) reorder the covariance matrix into quasi-diagonal form, (3) recursively bisect the ordered tree, allocating weights by inverse variance to each branch. Notes ----- Hierarchical Risk Parity algorithm is developed by Marco Lopez de Prado [2]_. First step is clustering and second step is allocating weights. Parameters ---------- X : array_like Each column is a price or return's asset series. Some errors will happen if one or more series are constant. method, metric: str Parameters for linkage algorithm, default ``method='single'`` and ``metric='euclidean'``. low_bound, up_bound : float Respectively minimum and maximum value of weights, such that low_bound :math:`\leq w_i \leq` up_bound :math:`\forall i`. Default is 0 and 1. Returns ------- np.ndarray Vector of weights computed by HRP algorithm. References ---------- .. [2] https://ssrn.com/abstract=2708678 """ X = np.asarray(X, dtype=np.float64) T, N = X.shape up_bound = max(up_bound, 1.0 / N) low_bound = min(low_bound, 1.0 / N) mat_cov = np.cov(X, rowvar=False) diag_cov = np.sqrt(np.diag(mat_cov)) outer_diag = np.outer(diag_cov, diag_cov) with np.errstate(invalid='ignore', divide='ignore'): mat_corr = np.divide(mat_cov, outer_diag) mat_corr = np.clip(np.nan_to_num(mat_corr, nan=0.0, posinf=0.0, neginf=0.0), -1.0, 1.0) mat_dist = ((1.0 - mat_corr) / 2.0) ** 0.5 mat_dist_upper = mat_dist[np.triu_indices(N, k=1)] link = sch.linkage(mat_dist_upper, method=method, metric=metric) sortIx = _get_quasi_diag(link) w_sorted = _get_rec_bisec(mat_cov, sortIx) w = np.empty(N) for i, col_idx in enumerate(sortIx): w[col_idx] = w_sorted[i] return _normalize(w.reshape([N, 1]), up_bound=up_bound, low_bound=low_bound)
# =========================================================================== # # Inverse Variance Portfolio # # =========================================================================== #
[docs] def IVP( X: NDArray[np.float64], normalize: bool = False, low_bound: float = 0., up_bound: float = 1.0, ) -> NDArray[np.float64]: r""" Get weights of the Inverse Variance Portfolio allocation. Notes ----- w are computed by the inverse of the asset's variance [3]_ such that: .. math:: w_i = \frac{1}{\sigma_k^2} (\sum_{i} \frac{1}{\sigma_i^2})^{-1} With :math:`\sigma_i^2` is the variance of asset i. Parameters ---------- X : array_like Each column is a price or return's asset series. normalize : bool, optional If True normalize the weights such that :math:`\sum_{i=1}^{N} w_i = 1` and :math:`0 \leq w_i \leq 1`. Default is False. low_bound, up_bound : float, optional Respectively minimum and maximum values of weights, such that low_bound :math:`\leq w_i \leq` up_bound :math:`\forall i`. Default is 0 and 1. Returns ------- np.ndarray Vector of weights computed by the IVP algorithm. References ---------- .. [3] https://en.wikipedia.org/wiki/Inverse-variance_weighting """ mat_cov = np.cov(X, rowvar=False) w = _get_IVP(mat_cov) up_bound = max(up_bound, 1 / X.shape[1]) low_bound = min(low_bound, 1 / X.shape[1]) if normalize: w = w - np.min(w) w = w / np.sum(w) # return w.reshape([mat_cov.shape[0], 1]) w = _normalize(w, up_bound=up_bound, low_bound=low_bound) # w = w * (up_bound - low_bound) + low_bound return w.reshape([mat_cov.shape[0], 1])
# =========================================================================== # # Minimum Variance Portfolio # # =========================================================================== #
[docs] def MVP( X: NDArray[np.float64], normalize: bool = False, ) -> NDArray[np.float64]: r""" Get weights of the Minimum Variance Portfolio allocation. Closed-form Markowitz allocation that minimizes the portfolio variance subject only to a sum-to-one constraint. Weights are not constrained to be positive — short positions are allowed and the weights returned by the analytical formula can be negative or larger than one. Use :func:`MVP_uc` for a constrained variant. The covariance matrix must be invertible; pseudo-inverse is used as a fallback when ``X`` has linearly dependent columns. Notes ----- The vector of weights noted :math:`w` that minimize the portfolio variance [4]_ is define as below: .. math:: w = \frac{\Omega^{-1} e}{e' \Omega^{-1} e} \\ .. math:: \text{With } \sum_{i=1}^{N} w_i = 1 Where :math:`\Omega` is the asset's variance-covariance matrix and :math:`e` is a vector of ones. Parameters ---------- X : array_like Each column is a time-series of price or return's asset. normalize : boolean, optional If True normalize the weigths such that :math:`0 \leq w_i \leq 1` and :math:`\sum_{i=1}^{N} w_i = 1`, :math:`\forall i`. Default is False. Returns ------- array_like Vector of weights to apply to the assets. References ---------- .. [4] https://breakingdownfinance.com/finance-topics/modern-portfolio-theory/minimum-variance-portfolio/ See Also -------- HRP """ mat_cov = np.cov(X, rowvar=False) # Inverse variance matrix try: iv = np.linalg.inv(mat_cov) except np.linalg.LinAlgError: try: iv = np.linalg.pinv(mat_cov) except np.linalg.LinAlgError: print(mat_cov) raise np.linalg.LinAlgError e = np.ones([iv.shape[0], 1]) w = (iv @ e) / (e.T @ iv @ e) if normalize: w = w - np.min(w) return w / np.sum(w) return w
[docs] def MVP_uc( X: NDArray[np.float64], w0: NDArray[np.float64] | None = None, up_bound: float = 1., low_bound: float = 0., ) -> NDArray[np.float64]: r""" Get weights of the Minimum Variance Portfolio under constraints. Numerical (SLSQP) Markowitz allocation that minimizes portfolio variance subject to box constraints on each weight, in addition to the sum-to-one constraint. Use this variant whenever short selling must be excluded or per-asset caps need to be enforced; use :func:`MVP` for the unconstrained closed-form solution. Notes ----- Weights of Minimum Variance Portfolio verify the following problem: .. math:: w = \text{arg min } w' \Omega w \\ u.c. \begin{cases}w'e = 1 \\ 0 \leq w_i \leq 1 \\ \end{cases} Where :math:`\Omega` is the variance-covariance matrix of `X` and :math:`e` a vector of ones. Parameters ---------- X : array_like Each column is a series of price or return's asset. w0 : array_like, optional Initial weights to maximize. up_bound, low_bound : float, optional Respectively maximum and minimum values of weights, such that low_bound :math:`\leq w_i \leq` up_bound :math:`\forall i`. Default is 0 and 1. Returns ------- array_like Weights that minimize the variance of the portfolio. """ mat_cov = np.cov(X, rowvar=False) N = X.shape[1] up_bound = max(up_bound, 1 / N) def f_MVP(w): w = w.reshape([N, 1]) return w.T @ mat_cov @ w # Set inital weights if w0 is None: w0 = np.ones([N]) / N # Set constraints and minimze const_sum = LinearConstraint(np.ones([1, N]), [1], [1]) const_ind = Bounds(low_bound * np.ones([N]), up_bound * np.ones([N])) result = minimize( f_MVP, w0, method='SLSQP', constraints=[const_sum], bounds=const_ind ) return result.x.reshape([N, 1])
# =========================================================================== # # Maximum Diversification Portfolio developed by Choueifaty and Coignard # # =========================================================================== #
[docs] def MDP( X: NDArray[np.float64], w0: NDArray[np.float64] | None = None, up_bound: float = 1., low_bound: float = 0., ) -> NDArray[np.float64]: r""" Get weights of Maximum Diversified Portfolio allocation. Notes ----- Weights of Maximum Diversification Portfolio, as described by Y. Choueifaty and Y. Coignard [5]_, verify the following problem: .. math:: w = \text{arg max } D(w) \\ u.c. \begin{cases}w'e = 1 \\ 0 \leq w_i \leq 1 \\ \end{cases} Where :math:`D(w)` is the diversified ratio of portfolio weighted by `w`. Parameters ---------- X : array_like Each column is a series of price or return's asset. w0 : array_like, optional Initial weights to maximize. up_bound, low_bound : float, optional Respectively maximum and minimum values of weights, such that low_bound :math:`\leq w_i \leq` up_bound :math:`\forall i`. Default is 0 and 1. Returns ------- array_like Weights that maximize the diversified ratio of the portfolio. See Also -------- diversified_ratio References ---------- .. [5] `Choueifaty, Y., and Coignard, Y., 2008, Toward Maximum Diversification. <https://www.tobam.fr/wp-content/uploads/2014/12/ TOBAM-JoPM-Maximum-Div-2008.pdf>`_ """ T, N = X.shape up_bound = max(up_bound, 1 / N) # Set function to minimze def f_max_divers_weights(w): return - diversified_ratio(X, W=w).flatten() # Set inital weights if w0 is None: w0 = np.ones([N]) / N # Set constraints and minimze const_sum = LinearConstraint(np.ones([1, N]), [1], [1]) const_ind = Bounds(low_bound * np.ones([N]), up_bound * np.ones([N])) result = minimize( f_max_divers_weights, w0, method='SLSQP', constraints=[const_sum], bounds=const_ind ) return result.x.reshape([N, 1])
# =========================================================================== # # Rolling allocation # # =========================================================================== #
[docs] def rolling_allocation( f: Callable[..., NDArray[np.float64]], X: NDArray[np.float64], n: int = 252, s: int = 63, ret: bool = True, drift: bool = True, **kwargs, ) -> NDArray[np.float64]: r""" Roll an algorithm of portfolio allocation. Generic walk-forward backtester for any allocation function ``f`` (e.g. :func:`ERC`, :func:`HRP`, :func:`MVP`). At each step the weights are estimated on a training window of length ``n`` and held for the next ``s`` periods. By construction this respects strict temporal ordering — no future data leaks into the weights — which is the same no-lookahead pattern used in :class:`fynance.models.rolling._RollingBasis` for ML models. Assets that are constant on more than 50% of the training window are dropped from that step's allocation; the remaining weight mass is redistributed across the active assets. Notes ----- Weights are computed on the past data from ``t - n`` to ``t`` and are applied to backtest on data from ``t`` to ``t + s``. .. math:: \forall t \in [n, T], w_{t:t+s} = f(X_{t-n:t}) Parameters ---------- f : callable Allocation algorithm that take as parameters a subarray of ``X`` and ``**kwargs``, and return a vector (as ``np.ndarray``) of weights. X : array_like Data matrix, each columns is a series of prices, indexes or performances, each row is a observation at time ``t``. n, s : int Respectively the number of observations to compute weights and the number of observations to roll. Default is ``n=252`` and ``s=63``. ret : bool, optional If True (default) pass to ``f`` the returns of ``X``. Otherwise pass ``X`` to ``f``. drift : bool, optional If False performance of the portfolio is computed as if we rebalance the weights of asset at each timeframe. Otherwise we let to drift the weights. Default is True. **kwargs Any keyword arguments to pass to ``f``. Returns ------- pd.Series Performance of the portfolio allocated following ``f`` algorithm. pd.DataFrame Weights of the portfolio allocated following ``f`` algorithm. """ X = pd.DataFrame(X).ffill() idx = X.index w_mat = pd.DataFrame(index=idx, columns=X.columns) portfolio = pd.Series(100., index=idx, name='portfolio') if ret: X_ = X.pct_change() else: X_ = X roll = _RollingMechanism(idx, n=n, s=s) def process(series): # True if less than 50% of obs. are constant return series.value_counts(dropna=False).max() < 0.5 * n for slice_n, slice_s in roll(): # Select X sub_X = X_.loc[slice_n].copy() assets = list(X.columns[sub_X.apply(process)]) sub_X = sub_X.bfill() # Compute weights if len(assets) == 1: w = np.array([[1.]]) else: w = f(sub_X.loc[:, assets].values, **kwargs) w_mat.loc[roll.d, assets] = w.flatten() w_mat.loc[roll.d, :] = w_mat.loc[roll.d, :].fillna(0.) # Compute portfolio performance perf = _perf_alloc( X.loc[slice_s, assets].bfill().values, w=w, drift=drift ) portfolio.loc[slice_s] = portfolio.loc[roll.d] * perf.flatten() w_mat = w_mat.ffill().fillna(0.) return portfolio, w_mat
# =========================================================================== # # Tools # # =========================================================================== # def _perf_alloc(X: NDArray[np.float64], w: NDArray[np.float64], drift: bool = True) -> NDArray[np.float64]: # Compute portfolio performance following specified weights if w.ndim == 1 and not isinstance(w, pd.Series): w = w.reshape([w.size, 1]) if drift: return (X / X[0, :]) @ w perf = np.zeros(X.shape) perf[1:] = (X[1:] / X[:-1] - 1) return np.cumprod(perf @ w + 1) def _normalize(w: NDArray[np.float64], low_bound: float = 0., up_bound: float = 1., sum_w: float = 1., max_iter: int = 1000) -> NDArray[np.float64]: # Iterative algorithm to set bounds if up_bound < sum_w / w.size or low_bound > sum_w / w.size: raise ValueError('Low or up bound exceeded sum weight constraint.') j = 0 while (min(w) < low_bound or max(w) > up_bound) and j < max_iter: for i in range(w.size): w[i] = min(w[i], up_bound) w[i] = max(w[i], low_bound) w = sum_w * (w / sum(w)) j += 1 if j >= max_iter: print('Iterative normalize algorithm exceeded max iterations') return w