import warnings
import numpy as np
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.exceptions import ConvergenceWarning
from scipy.optimize import minimize, OptimizeWarning
from skgstat.util import shannon_entropy
def even_width_lags(distances, n, maxlag):
"""Even lag edges
    Calculate the lag edges for a given number of bins using the same lag
    step width for all bins.
    .. versionchanged:: 0.3.8
        Function returns `None` as second value to indicate that
        the number of lag classes was not changed.
Parameters
----------
distances : numpy.array
Flat numpy array representing the upper triangle of
the distance matrix.
n : integer
        Number of lag classes to find
maxlag : integer, float
Limit the last lag class to this separating distance.
Returns
-------
bin_edges : numpy.ndarray
The **upper** bin edges of the lag classes
"""
# maxlags larger than the maximum separating distance will be ignored
if maxlag is None or maxlag > np.nanmax(distances):
maxlag = np.nanmax(distances)
return np.linspace(0, maxlag, n + 1)[1:], None
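
# Illustrative usage sketch (not part of the module API): how even_width_lags
# splits the distance range into equally wide lag classes. The sample values
# below are made up for demonstration.
def _example_even_width_lags():  # pragma: no cover
    distances = np.array([0.5, 1.2, 2.3, 3.1, 4.0])
    # maxlag=None falls back to the maximum distance (4.0), so the upper
    # edges are np.linspace(0, 4.0, 5)[1:] -> [1., 2., 3., 4.]
    edges, n_classes = even_width_lags(distances, n=4, maxlag=None)
    return edges, n_classes
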
def auto_derived_lags(distances, method_name, maxlag):
"""Derive bins automatically
.. versionadded:: 0.3.8
Uses `histogram_bin_edges <numpy.histogram_bin_edges>` to derive the
lag classes automatically. Supports any method supported by
`histogram_bin_edges <numpy.histogram_bin_edges>`. It is recommended
to use `'sturges'`, `'doane'` or `'fd'`.
Parameters
----------
distances : numpy.array
Flat numpy array representing the upper triangle of
the distance matrix.
    method_name : str
        Any method supported by
        `histogram_bin_edges <numpy.histogram_bin_edges>`
    maxlag : integer, float
        Limit the last lag class to this separating distance.
Returns
-------
bin_edges : numpy.ndarray
The **upper** bin edges of the lag classes
See Also
--------
numpy.histogram_bin_edges
"""
# maxlags larger than maximum separating distance will be ignored
if maxlag is None or maxlag > np.nanmax(distances):
maxlag = np.nanmax(distances)
    # filter for distances <= maxlag
d = distances[np.where(distances <= maxlag)]
# calculate the edges
edges = np.histogram_bin_edges(d, bins=method_name)[1:]
return edges, len(edges)
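
# Illustrative usage sketch (not part of the module API): auto_derived_lags
# lets numpy's histogram_bin_edges pick the number of classes. For roughly
# 100 uniformly sampled distances, Sturges' rule yields about
# log2(100) + 1 ~ 8 classes; the exact count is returned as the second value.
def _example_auto_derived_lags():  # pragma: no cover
    rng = np.random.default_rng(42)
    distances = rng.uniform(0, 100, size=100)
    edges, n_classes = auto_derived_lags(distances, method_name='sturges', maxlag=None)
    # n_classes == len(edges), because the number of classes was derived here
    return edges, n_classes
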
def kmeans(distances, n, maxlag, binning_random_state=42, **kwargs):
"""
.. versionadded:: 0.3.9
    Clustering of pairwise separating distances between locations up to
    maxlag. The lag class edges are formed equidistant from each cluster
    center. Note: this does not necessarily result in equidistant lag classes.
Parameters
----------
distances : numpy.array
Flat numpy array representing the upper triangle of
the distance matrix.
n : integer
        Number of lag classes to find
maxlag : integer, float
Limit the last lag class to this separating distance.
Returns
-------
bin_edges : numpy.ndarray
The **upper** bin edges of the lag classes
See Also
--------
sklearn.cluster.KMeans
Note
----
    The :class:`KMeans <sklearn.cluster.KMeans>` that is used under the hood is not
    a deterministic algorithm, as the starting cluster centroids are seeded
    randomly. This can yield slightly different results on each run.
Thus, for this application, the random_state on KMeans is fixed to a
specific value. You can change the seed by passing another seed to
:class:`Variogram <skgstat.Variogram>` as `binning_random_state`.
    .. versionchanged:: 1.0.9
        KMeans is now initialized as ``KMeans(n_init=10)``, as this default value
        will change in scikit-learn 1.4.
"""
# maxlags larger than maximum separating distance will be ignored
if maxlag is None or maxlag > np.nanmax(distances):
maxlag = np.nanmax(distances)
    # filter for distances <= maxlag
    d = distances[np.where(distances <= maxlag)]
    # filter the sklearn convergence warning, because working with an
    # undefined state in binning does not make any sense
with warnings.catch_warnings():
warnings.filterwarnings('error')
# cluster the filtered distances
try:
km = KMeans(n_clusters=n, random_state=binning_random_state, n_init=10).fit(d.reshape(-1, 1))
except ConvergenceWarning:
raise ValueError("KMeans failed to converge. Maybe you need to use a different n_lags.")
# get the centers
_centers = np.sort(km.cluster_centers_.flatten())
# build the upper edges
bounds = zip([0] + list(_centers)[:-1], _centers)
edges = np.fromiter(((low + up) / 2 for low, up in bounds), dtype=float)
return edges, None
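
# Illustrative usage sketch (not part of the module API): k-means based
# binning places each upper edge halfway between two adjacent (sorted)
# cluster centers, so the resulting lag classes are generally not equally
# wide. The sample data and seed below are arbitrary.
def _example_kmeans_binning():  # pragma: no cover
    rng = np.random.default_rng(0)
    distances = rng.exponential(scale=25.0, size=500)
    edges, _ = kmeans(distances, n=6, maxlag=None, binning_random_state=42)
    # edges is sorted and holds one upper edge per lag class (6 values here)
    return edges
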
def ward(distances, n, maxlag, **kwargs):
"""
.. versionadded:: 0.3.9
Clustering of pairwise separating distances between locations up to
maxlag. The lag class edges are formed equidistant from each cluster
    center. Note: this does not necessarily result in equidistant lag classes.
The clustering is done by merging pairs of clusters that minimize the
variance for the merged clusters, until `n` clusters are found.
Parameters
----------
distances : numpy.array
Flat numpy array representing the upper triangle of
the distance matrix.
n : integer
        Number of lag classes to find
maxlag : integer, float
        Limit the last lag class to this separating distance.
    Keyword Arguments
    -----------------
    binning_agg_func : str
        Aggregation function used to derive the cluster centers from the
        members of each cluster. If set to ``'median'``, the median is
        used; otherwise the mean is used.
Returns
-------
bin_edges : numpy.ndarray
The **upper** bin edges of the lag classes
See Also
--------
    sklearn.cluster.AgglomerativeClustering
"""
# maxlags larger than maximum separating distance will be ignored
if maxlag is None or maxlag > np.nanmax(distances):
maxlag = np.nanmax(distances)
    # filter for distances <= maxlag
d = distances[np.where(distances <= maxlag)]
# cluster the filtered distances
w = AgglomerativeClustering(linkage='ward', n_clusters=n).fit(d.reshape(-1, 1))
# get the aggregation function
if kwargs.get('binning_agg_func', False) == 'median':
agg = np.median
else:
agg = np.mean
# get the centers
_centers = np.sort([agg(d[np.where(w.labels_ == i)[0]]) for i in np.unique(w.labels_)])
# build the upper edges
bounds = zip([0] + list(_centers)[:-1], _centers)
edges = np.fromiter(((low + up) / 2 for low, up in bounds), dtype=float)
return edges, None
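
# Illustrative usage sketch (not part of the module API): hierarchical (Ward)
# binning with the optional binning_agg_func keyword, which switches the
# cluster center from the mean to the median of each cluster's distances.
# The sample data below is arbitrary.
def _example_ward_binning():  # pragma: no cover
    rng = np.random.default_rng(1)
    distances = rng.gamma(shape=2.0, scale=10.0, size=300)
    edges_mean, _ = ward(distances, n=5, maxlag=None)
    edges_median, _ = ward(distances, n=5, maxlag=None, binning_agg_func='median')
    # both calls return 5 upper edges; they can differ slightly because the
    # cluster centers are aggregated differently
    return edges_mean, edges_median
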
def stable_entropy_lags(distances, n, maxlag, **kwargs):
"""
    .. versionadded:: 0.4.0
    Optimizes the lag class edges for `n` lag classes.
    The algorithm minimizes the differences in Shannon entropy
    between the lag classes. Consequently, the final
    lag classes should be of comparable uncertainty.
Parameters
----------
distances : numpy.array
Flat numpy array representing the upper triangle of
the distance matrix.
n : integer
        Number of lag classes to find
maxlag : integer, float
Limit the last lag class to this separating distance.
Keyword Arguments
-----------------
    binning_maxiter : int
        Maximum number of iterations before the optimization is stopped
        if the lag edges do not converge.
    binning_entropy_bins : int, str
        Binning method for calculating the Shannon entropy
        on each iteration.
Returns
-------
bin_edges : numpy.ndarray
The **upper** bin edges of the lag classes
"""
# maxlags larger than maximum separating distance will be ignored
if maxlag is None or maxlag > np.nanmax(distances):
maxlag = np.nanmax(distances)
    # filter for distances <= maxlag
d = distances[np.where(distances <= maxlag)]
# create a global binning and initial guess
bins = np.histogram_bin_edges(d, bins=kwargs.get('binning_entropy_bins', 'sqrt'))
initial_guess = np.linspace(0, np.nanmax(d), n + 1)[1:]
# define the loss function
def loss(edges):
# get the shannon entropy for the current binning
h = np.ones(len(edges) - 1) * 9999
for i, bnd in enumerate(zip(edges, edges[1:])):
l, u = bnd
x = d[np.where((d >= l) & (d < u))[0]]
if len(x) == 0:
continue
else:
h[i] = shannon_entropy(x, bins)
# return the absolute differences between the bins
return np.sum(np.abs(np.diff(h)))
# minimize the loss function
opt = dict(maxiter=kwargs.get('binning_maxiter', 5000))
res = minimize(loss, initial_guess, method='Nelder-Mead', options=opt)
if res.success:
return res.x, None
else: # pragma: no cover
raise OptimizeWarning("Failed to find optimal lag classes.")
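
# Illustrative usage sketch (not part of the module API): entropy-stable
# binning with its two supported keyword arguments. The sample data and
# settings below are arbitrary; a larger binning_maxiter gives the
# Nelder-Mead optimizer more room to converge.
def _example_stable_entropy_lags():  # pragma: no cover
    rng = np.random.default_rng(2)
    distances = rng.uniform(0, 50, size=400)
    edges, _ = stable_entropy_lags(
        distances,
        n=8,
        maxlag=40,
        binning_maxiter=10000,
        binning_entropy_bins='sqrt'
    )
    # edges holds the 8 optimized upper lag class edges
    return edges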