from typing import Tuple, List
from scipy.spatial.distance import pdist, cdist, squareform
from scipy.spatial import cKDTree
from scipy import sparse
import numpy as np
import multiprocessing as mp
def _sparse_dok_get(m, fill_value=np.nan):
"""Like m.toarray(), but setting empty values to `fill_value`, by
default `np.nan`, rather than 0.0.
Parameters
----------
m : scipy.sparse.dok_matrix
fill_value : float
"""
mm = np.full(m.shape, fill_value)
for (x, y), value in m.items():
mm[x, y] = value
return mm
class DistanceMethods(object):
def find_closest(self, idx, max_dist=None, N=None):
"""find neighbors
Find the (N) closest points (in the right set) to the point with
index idx (in the left set).
Parameters
----------
idx : int
Index of the point that the N closest neighbors
are searched for.
max_dist : float
Maximum distance at which other points are searched
N : int
Number of points searched.
Returns
-------
ridx : numpy.ndarray
Indices of the N closeset points to idx
"""
if max_dist is None:
max_dist = self.max_dist
else:
if self.max_dist is not None and max_dist != self.max_dist:
raise AttributeError("max_dist specified and max_dist != self.max_dist")
if isinstance(self.dists, sparse.spmatrix):
dists = self.dists.getrow(idx)
else:
dists = self.dists[idx, :]
if isinstance(dists, sparse.spmatrix):
ridx = np.array([k[1] for k in dists.todok().keys()])
elif max_dist is not None:
ridx = np.where(dists <= max_dist)[0]
else:
ridx = np.arange(len(dists))
if ridx.size > N:
if isinstance(dists, sparse.spmatrix):
selected_dists = dists[0, ridx].toarray()[0, :]
else:
selected_dists = dists[ridx]
sorted_ridx = np.argsort(selected_dists, kind="stable")
ridx = ridx[sorted_ridx][:N]
return ridx
[docs]
class MetricSpace(DistanceMethods):
"""
A MetricSpace represents a point cloud together with a distance
metric and possibly a maximum distance. It efficiently provides
the distances between each point pair (when shorter than the
maximum distance).
Note: If a max_dist is specified a sparse matrix representation is
used for the distances, which saves space and calculation time for
large datasets, especially where max_dist << the size of the point
cloud in space. However, it slows things down for small datasets.
"""
[docs]
def __init__(self, coords, dist_metric="euclidean", max_dist=None, dist_metric_kwargs={}):
"""ProbabalisticMetricSpace class
Parameters
----------
coords : numpy.ndarray
Coordinate array of shape (Npoints, Ndim)
dist_metric : str
Distance metric names as used by scipy.spatial.distance.pdist
max_dist : float
Maximum distance between points after which the distance
is considered infinite and not calculated.
"""
self.coords = coords.copy()
self.dist_metric = dist_metric
self.max_dist = max_dist
self._tree = None
self._dists = None
self.dist_metric_kwargs = dist_metric_kwargs
# Check if self.dist_metric is valid
try:
if self.dist_metric == "mahalanobis":
_ = pdist(
self.coords[: self.coords.shape[1] + 1, :], metric=self.dist_metric
)
else:
pdist(self.coords[:1, :], metric=self.dist_metric, **self.dist_metric_kwargs)
except ValueError as e:
raise e
@property
def tree(self):
"""If `self.dist_metric` is `euclidean`, a `scipy.spatial.cKDTree`
instance of `self.coords`. Undefined otherwise."""
# only Euclidean supported
if self.dist_metric != "euclidean":
raise ValueError(
("A coordinate tree can only be constructed " "for an euclidean space")
)
# if not cached - calculate
if self._tree is None:
self._tree = cKDTree(self.coords)
# return
return self._tree
@property
def dists(self):
"""A distance matrix of all point pairs. If `self.max_dist` is
not `None` and `self.dist_metric` is set to `euclidean`, a
`scipy.sparse.csr_matrix` sparse matrix is returned.
"""
# calculate if not cached
if self._dists is None:
# check if max dist is given
if self.max_dist is not None and self.dist_metric == "euclidean":
self._dists = self.tree.sparse_distance_matrix(
self.tree, self.max_dist, output_type="coo_matrix"
).tocsr()
# otherwise use pdist
else:
self._dists = squareform(
pdist(self.coords, metric=self.dist_metric, **self.dist_metric_kwargs)
)
# return
return self._dists
[docs]
def diagonal(self, idx=None):
"""
Return a diagonal matrix (as per
:func:`squareform <scipy.spatial.distance.squareform>`),
optionally for a subset of the points
Parameters
----------
idx : list
list of indices that the diagonal matrix is calculated for.
Returns
-------
diagonal : numpy.ndarray
squareform matrix of the subset of coordinates
"""
# get the dists
dist_mat = self.dists
# subset dists if requested
if idx is not None:
dist_mat = dist_mat[idx, :][:, idx]
# handle sparse matrix
if isinstance(self.dists, sparse.spmatrix):
dist_mat = _sparse_dok_get(dist_mat.todok(), np.inf)
np.fill_diagonal(dist_mat, 0) # Normally set to inf
return squareform(dist_mat)
def __len__(self):
return len(self.coords)
[docs]
class MetricSpacePair(DistanceMethods):
"""
A MetricSpacePair represents a set of point clouds (MetricSpaces).
It efficiently provides the distances between each point in one
point cloud and each point in the other point cloud (when shorter
than the maximum distance). The two point clouds are required to
have the same distance metric as well as maximum distance.
"""
[docs]
def __init__(self, ms1, ms2):
"""
Parameters
----------
ms1 : MetricSpace
ms2 : MetricSpace
Note: `ms1` and `ms2` need to have the same `max_dist` and
`distance_metric`.
"""
# check input data
# same distance metrix
if ms1.dist_metric != ms2.dist_metric:
raise ValueError("Both MetricSpaces need to have the same distance metric")
# same max_dist setting
if ms1.max_dist != ms2.max_dist:
raise ValueError("Both MetricSpaces need to have the same max_dist")
self.ms1 = ms1
self.ms2 = ms2
self._dists = None
@property
def dist_metric(self):
return self.ms1.dist_metric
@property
def max_dist(self):
return self.ms1.max_dist
@property
def dists(self):
"""A distance matrix of all point pairs. If `self.max_dist` is
not `None` and `self.dist_metric` is set to `euclidean`, a
`scipy.sparse.csr_matrix` sparse matrix is returned.
"""
# if not cached, calculate
if self._dists is None:
# handle euclidean with max_dist with Tree
if self.max_dist is not None and self.dist_metric == "euclidean":
self._dists = self.ms1.tree.sparse_distance_matrix(
self.ms2.tree, self.max_dist, output_type="coo_matrix"
).tocsr()
# otherwise Tree not possible
else:
self._dists = cdist(
self.ms1.coords,
self.ms2.coords,
metric=self.ms1.dist_metric,
**self.ms1.dist_metric_kwargs
)
# return
return self._dists
class ProbabalisticMetricSpace(MetricSpace):
"""Like MetricSpace but samples the distance pairs only returning a
`samples` sized subset. `samples` can either be a fraction of
the total number of pairs (float < 1), or an integer count.
"""
def __init__(
self, coords, dist_metric="euclidean", max_dist=None, samples=0.5, rnd=None
):
"""ProbabalisticMetricSpace class
Parameters
----------
coords : numpy.ndarray
Coordinate array of shape (Npoints, Ndim)
dist_metric : str
Distance metric names as used by scipy.spatial.distance.pdist
max_dist : float
Maximum distance between points after which the distance
is considered infinite and not calculated.
samples : float, int
Number of samples (int) or fraction of coords to sample (float < 1).
rnd : numpy.random.RandomState, int
Random state to use for the sampling.
"""
self.coords = coords.copy()
self.dist_metric = dist_metric
self.max_dist = max_dist
self.samples = samples
if rnd is None:
self.rnd = np.random
elif isinstance(rnd, np.random.RandomState):
self.rnd = rnd
else:
self.rnd = np.random.RandomState(
np.random.MT19937(np.random.SeedSequence(rnd))
)
self._lidx = None
self._ridx = None
self._ltree = None
self._rtree = None
self._dists = None
# Do a very quick check to see throw exceptions
# if self.dist_metric is invalid...
pdist(self.coords[:1, :], metric=self.dist_metric)
@property
def sample_count(self):
if isinstance(self.samples, int):
return self.samples
return int(self.samples * len(self.coords))
@property
def lidx(self):
"""The sampled indices into `self.coords` for the left sample."""
if self._lidx is None:
self._lidx = self.rnd.choice(
len(self.coords), size=self.sample_count, replace=False
)
return self._lidx
@property
def ridx(self):
"""The sampled indices into `self.coords` for the right sample."""
if self._ridx is None:
self._ridx = self.rnd.choice(
len(self.coords), size=self.sample_count, replace=False
)
return self._ridx
@property
def ltree(self):
"""If `self.dist_metric` is `euclidean`, a `scipy.spatial.cKDTree`
instance of the left sample of `self.coords`. Undefined otherwise."""
# only Euclidean supported
if self.dist_metric != "euclidean":
raise ValueError(
("A coordinate tree can only be constructed " "for an euclidean space")
)
if self._ltree is None:
self._ltree = cKDTree(self.coords[self.lidx, :])
return self._ltree
@property
def rtree(self):
"""If `self.dist_metric` is `euclidean`, a `scipy.spatial.cKDTree`
instance of the right sample of `self.coords`. Undefined otherwise."""
# only Euclidean supported
if self.dist_metric != "euclidean":
raise ValueError(
("A coordinate tree can only be constructed " "for an euclidean space")
)
if self._rtree is None:
self._rtree = cKDTree(self.coords[self.ridx, :])
return self._rtree
@property
def dists(self):
"""A distance matrix of the sampled point pairs as a
`scipy.sparse.csr_matrix` sparse matrix."""
if self._dists is None:
max_dist = self.max_dist
if max_dist is None:
max_dist = np.finfo(float).max
dists = self.ltree.sparse_distance_matrix(
self.rtree, max_dist, output_type="coo_matrix"
).tocsr()
dists.resize((len(self.coords), len(self.coords)))
dists.indices = self.ridx[dists.indices]
dists = dists.tocsc()
dists.indices = self.lidx[dists.indices]
dists = dists.tocsr()
self._dists = dists
return self._dists
# Subfunctions used in RasterEquidistantMetricSpace
# (outside class so that they can be pickled by multiprocessing)
def _get_disk_sample(
coords: np.ndarray,
center: Tuple[float, float],
center_radius: float,
rnd_func: np.random.RandomState,
sample_count: int,
):
"""
Subfunction for RasterEquidistantMetricSpace.
Calculates the indexes of a subsample in a disk "center sample".
Same parameters as in the class.
"""
# First index: preselect samples in a disk of certain radius
dist_center = np.sqrt(
(coords[:, 0] - center[0]) ** 2 + (coords[:, 1] - center[1]) ** 2
)
idx1 = dist_center < center_radius
count = np.count_nonzero(idx1)
indices1 = np.argwhere(idx1)
# Second index: randomly select half of the valid pixels,
# so that the other half can be used by the equidist
# sample for low distances
indices2 = rnd_func.choice(count, size=min(count, sample_count), replace=False)
if count != 1:
return indices1[indices2].squeeze()
else:
return indices1[indices2][0]
def _get_successive_ring_samples(
coords: np.ndarray,
center: Tuple[float, float],
equidistant_radii: List[float],
rnd_func: np.random.RandomState,
sample_count: int,
):
"""
Subfunction for RasterEquidistantMetricSpace.
Calculates the indexes of several subsamples within disks,
"equidistant sample". Same parameters as in the class.
"""
# First index: preselect samples in a ring of certain inside radius and outside radius
dist_center = np.sqrt(
(coords[:, 0] - center[0]) ** 2 + (coords[:, 1] - center[1]) ** 2
)
idx = np.logical_and(
dist_center[None, :] >= np.array(equidistant_radii[:-1])[:, None],
dist_center[None, :] < np.array(equidistant_radii[1:])[:, None],
)
# Loop over an iterative sampling in rings
list_idx = []
for i in range(len(equidistant_radii) - 1):
idx1 = idx[i, :]
count = np.count_nonzero(idx1)
indices1 = np.argwhere(idx1)
# Second index: randomly select half of the valid pixels, so that the other half can be used by the equidist
# sample for low distances
indices2 = rnd_func.choice(count, size=min(count, sample_count), replace=False)
sub_idx = indices1[indices2]
if count > 1:
list_idx.append(sub_idx.squeeze())
elif count == 1:
list_idx.append(sub_idx[0])
return np.concatenate(list_idx)
def _get_idx_dists(
coords: np.ndarray,
center: Tuple[float, float],
center_radius: float,
equidistant_radii: List[float],
rnd_func: np.random.RandomState,
sample_count: int,
max_dist: float,
i: int,
imax: int,
verbose: bool,
):
"""
Subfunction for RasterEquidistantMetricSpace.
Calculates the pairwise distances between a list of pairs of "center" and "equidistant" ensembles.
Same parameters as in the class.
"""
if verbose:
print("Working on subsample " + str(i + 1) + " out of " + str(imax))
cidx = _get_disk_sample(
coords=coords,
center=center,
center_radius=center_radius,
rnd_func=rnd_func,
sample_count=sample_count,
)
eqidx = _get_successive_ring_samples(
coords=coords,
center=center,
equidistant_radii=equidistant_radii,
rnd_func=rnd_func,
sample_count=sample_count,
)
ctree = cKDTree(coords[cidx, :])
eqtree = cKDTree(coords[eqidx, :])
dists = ctree.sparse_distance_matrix(eqtree, max_dist, output_type="coo_matrix")
return dists.data, cidx[dists.row], eqidx[dists.col]
def _mp_wrapper_get_idx_dists(argdict: dict):
"""
Multiprocessing wrapper for get_idx_dists.
"""
return _get_idx_dists(**argdict)
class RasterEquidistantMetricSpace(MetricSpace):
"""Like ProbabilisticMetricSpace but only applies to Raster data (2D gridded data) and
samples iteratively an `equidistant` subset within distances to a 'center' subset.
Subsets can either be a fraction of the total number of pairs (float < 1), or an integer count.
The 'center' subset corresponds to a disk centered on a point of the grid for which the location
randomly varies and can be redrawn and aggregated for several runs. The corresponding 'equidistant'
subset consists of a concatenation of subsets drawn from rings with radius gradually increasing
until the maximum extent of the grid is reached.
To define the subsampling, several parameters are available:
- The raw number of samples corresponds to the samples that will be drawn in each central disk.
Along with the ratio of samples drawn (see below), it will automatically define the radius
of the disk and rings for subsampling.
Note that the number of samples drawn will be repeatedly drawn for each equidistant rings
at a given radius, resulting in a several-fold amount of total samples for the equidistant
subset.
- The ratio of subsample defines the density of point sampled within each subset. It
defaults to 20%.
- The number of runs corresponds to the number of random center points repeated during the
subsampling. It defaults to a sampling of 1% of the grid with center subsets.
Alternatively, one can supply:
- The multiplicative factor to derive increasing rings radii, set as squareroot of 2 by
default in order to conserve a similar area for each ring and verify the sampling ratio.
Or directly:
- The radius of the central disk subset.
- A list of radii for the equidistant ring subsets.
When providing those spatial parameters, all other sampling parameters will be ignored
except for the raw number of samples to draw in each subset.
"""
def __init__(
self,
coords,
shape,
extent,
samples=100,
ratio_subsample=0.2,
runs=None,
n_jobs=1,
exp_increase_fac=np.sqrt(2),
center_radius=None,
equidistant_radii=None,
max_dist=None,
dist_metric="euclidean",
rnd=None,
verbose=False,
):
"""RasterEquidistantMetricSpace class
Parameters
----------
coords : numpy.ndarray
Coordinate array of shape (Npoints, 2)
shape : tuple[int, int]
Shape of raster (X, Y)
extent : tuple[float, float, float, float]
Extent of raster (Xmin, Xmax, Ymin, Ymax)
samples : float, int
Number of samples (int) or fraction of coords to sample (float < 1).
ratio_subsample:
Ratio of samples drawn within each subsample.
runs : int
Number of subsamplings based on a random center point
n_jobs : int
Number of jobs to use in multiprocessing for the subsamplings.
exp_increase_fac : float
Multiplicative factor of increasing radius for ring subsets
center_radius: float
Radius of center subset, overrides other sampling parameters.
equidistant_radii: list
List of radii of ring subset, overrides other sampling parameters.
dist_metric : str
Distance metric names as used by scipy.spatial.distance.pdist
max_dist : float
Maximum distance between points after which the distance
is considered infinite and not calculated.
verbose : bool
Whether to print statements in the console
rnd : numpy.random.RandomState, int
Random state to use for the sampling.
"""
if dist_metric != "euclidean":
raise ValueError(
(
"A RasterEquidistantMetricSpace class can only be constructed "
"for an euclidean space"
)
)
self.coords = coords.copy()
self.dist_metric = dist_metric
self.shape = shape
self.extent = extent
self.res = np.mean(
[
(extent[1] - extent[0]) / (shape[0] - 1),
(extent[3] - extent[2]) / (shape[1] - 1),
]
)
# if the maximum distance is not specified, find the maximum possible distance from the extent
if max_dist is None:
max_dist = np.sqrt(
(extent[1] - extent[0]) ** 2 + (extent[3] - extent[2]) ** 2
)
self.max_dist = max_dist
self.samples = samples
if runs is None:
# If None is provided, try to sample center samples for about one percent of the area
runs = int((self.shape[0] * self.shape[1]) / self.samples * 1 / 100.0)
self.runs = runs
self.n_jobs = n_jobs
if rnd is None:
self.rnd = np.random.default_rng()
elif isinstance(rnd, np.random.RandomState):
self.rnd = rnd
else:
self.rnd = np.random.RandomState(
np.random.MT19937(np.random.SeedSequence(rnd))
)
# Radius of center subsample, based on sample count
# If None is provided, the disk is defined with the exact size to hold the number of percentage of samples
# defined by the user
if center_radius is None:
center_radius = (
np.sqrt(1.0 / ratio_subsample * self.sample_count / np.pi) * self.res
)
if verbose:
print(
"Radius of center disk sample for sample count of "
+ str(self.sample_count)
+ " and subsampling ratio"
" of " + str(ratio_subsample) + ": " + str(center_radius)
)
self._center_radius = center_radius
# Radii of equidistant ring subsamples
# If None is provided, the rings are defined with exponentially increasing radii with a factor sqrt(2), which
# means each ring will have just enough area to sample at least the number of samples desired, and same
# for each of the following, due to:
# (sqrt(2)R)**2 - R**2 = R**2
if equidistant_radii is None:
equidistant_radii = [0.0]
increasing_rad = self._center_radius
while increasing_rad < self.max_dist:
equidistant_radii.append(increasing_rad)
increasing_rad *= exp_increase_fac
equidistant_radii.append(self.max_dist)
if verbose:
print(
"Radii of equidistant ring samples for increasing factor of "
+ str(exp_increase_fac)
+ ": "
)
print(equidistant_radii)
self._equidistant_radii = equidistant_radii
self.verbose = verbose
# Index and KDTree of center sample
self._cidx = None
self._ctree = None
# Index and KDTree of equidistant sample
self._eqidx = None
self._eqtree = None
self._centers = None
self._dists = None
# Do a very quick check to see throw exceptions
# if self.dist_metric is invalid...
pdist(self.coords[:1, :], metric=self.dist_metric)
@property
def sample_count(self):
if isinstance(self.samples, int):
return self.samples
return int(self.samples * len(self.coords))
@property
def cidx(self):
"""The sampled indices into `self.coords` for the center sample."""
return self._cidx
@property
def ctree(self):
"""If `self.dist_metric` is `euclidean`, a `scipy.spatial.cKDTree`
instance of the center sample of `self.coords`. Undefined otherwise."""
# only Euclidean supported
if self.dist_metric != "euclidean":
raise ValueError(
("A coordinate tree can only be constructed " "for an euclidean space")
)
if self._ctree is None:
self._ctree = [
cKDTree(self.coords[self.cidx[i], :]) for i in range(len(self.cidx))
]
return self._ctree
@property
def eqidx(self):
"""The sampled indices into `self.coords` for the equidistant sample."""
return self._eqidx
@property
def eqtree(self):
"""If `self.dist_metric` is `euclidean`, a `scipy.spatial.cKDTree`
instance of the equidistant sample of `self.coords`. Undefined otherwise."""
# only Euclidean supported
if self._eqtree is None:
self._eqtree = [
cKDTree(self.coords[self.eqidx[i], :]) for i in range(len(self.eqidx))
]
return self._eqtree
@property
def dists(self):
"""A distance matrix of the sampled point pairs as a
`scipy.sparse.csr_matrix` sparse matrix."""
# Derive distances
if self._dists is None:
idx_center = self.rnd.choice(
len(self.coords), size=min(self.runs, len(self.coords)), replace=False
)
# Each run has a different center
centers = self.coords[idx_center]
# Running on a single core: for loop
if self.n_jobs == 1:
list_dists, list_cidx, list_eqidx = ([] for i in range(3))
for i in range(self.runs):
center = centers[i]
dists, cidx, eqidx = _get_idx_dists(
self.coords,
center=center,
center_radius=self._center_radius,
equidistant_radii=self._equidistant_radii,
rnd_func=self.rnd,
sample_count=self.sample_count,
max_dist=self.max_dist,
i=i,
imax=self.runs,
verbose=self.verbose,
)
list_dists.append(dists)
list_cidx.append(cidx)
list_eqidx.append(eqidx)
# Running on several cores: multiprocessing
else:
# Arguments to pass: only centers and loop index for verbose are changing
argsin = [
{
"center": centers[i],
"coords": self.coords,
"center_radius": self._center_radius,
"equidistant_radii": self._equidistant_radii,
"rnd_func": self.rnd,
"sample_count": self.sample_count,
"max_dist": self.max_dist,
"i": i,
"imax": self.runs,
"verbose": self.verbose,
}
for i in range(self.runs)
]
# Process in parallel
pool = mp.Pool(self.n_jobs, maxtasksperchild=1)
outputs = pool.map(_mp_wrapper_get_idx_dists, argsin, chunksize=1)
pool.close()
pool.join()
# Get lists of outputs
list_dists, list_cidx, list_eqidx = list(zip(*outputs))
# Define class objects
self._centers = centers
self._cidx = list_cidx
self._eqidx = list_eqidx
# concatenate the coo matrixes
d = np.concatenate(list_dists)
c = np.concatenate(list_cidx)
eq = np.concatenate(list_eqidx)
# remove possible duplicates (that would be summed by default)
# from https://stackoverflow.com/questions/28677162/ignoring-duplicate-entries-in-sparse-matrix
# Stable solution but a bit slow
c, eq, d = zip(*set(zip(c, eq, d)))
dists = sparse.csr_matrix(
(d, (c, eq)), shape=(len(self.coords), len(self.coords))
)
# comment mmaelicke: We need to fall back to the slower solution as dok._update is not supported in scipy 1.0 anymore
#
# Solution 5+ times faster than the preceding, but relies on _update() which might change in scipy (which
# only has an implemented method for summing duplicates, and not ignoring them yet)
# dok = sparse.dok_matrix((len(self.coords), len(self.coords)))
# dok._update(zip(zip(c, eq), d))
# dists = dok.tocsr()
self._dists = dists
return self._dists