Source code for skgstat.SpaceTimeVariogram

"""

"""
import numpy as np
from scipy.spatial.distance import pdist
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt
import inspect

from skgstat import binning, estimators, Variogram, stmodels, plotting


[docs] class SpaceTimeVariogram: """ """
[docs] def __init__(self, coordinates, values, xdist_func='euclidean', tdist_func='euclidean', x_lags=10, t_lags='max', maxlag=None, xbins='even', tbins='even', estimator='matheron', use_nugget=False, model='product-sum', verbose=False ): # set coordinates array self._X = np.asarray(coordinates) # combined pairwise differences self._diff = None # set verbosity, not implemented yet self.verbose = verbose # set attributes to be fulled during calculation self.cov = None self.cof = None self.XMarginal = None self.TMarginal = None # set values self._values = None self.set_values(values=values) # distance matrix for space and time self._xdist = None self._tdist = None # set distance calculation functions self._xdist_func = None self._tdist_func = None self.set_xdist_func(func_name=xdist_func) self.set_tdist_func(func_name=tdist_func) # lags and max lag self._x_lags = None self.x_lags = x_lags self._t_lags = None self.t_lags = t_lags self._maxlag = None self.maxlag = maxlag # estimator settings self._estimator = None self.set_estimator(estimator_name=estimator) # initialize binning arrays # space self._xbin_func = None self._xbin_func_name = None self._xgroups = None self._xbins = None self.set_bin_func(bin_func=xbins, axis='space') # time self._tbin_func = None self._tbin_func_name = None self._tgroups = None self._tbins = None self.set_bin_func(bin_func=tbins, axis='time') # set nugget self._use_nugget = None self.use_nugget = use_nugget # set the model self._model = model self.set_model(model_name=model) self._model_params = {} # _x and values are set, build the marginal Variogram objects # marginal space variogram self.create_XMarginal() # marginal time variogram self.create_TMarginal()
# fit the model with forced preprocessing #self.fit(force=True) # ----------------------------------------------------------------------- # # ATTRIBUTE SETTING # # ----------------------------------------------------------------------- # @property def values(self): """Values The SpaceTimeVariogram stores (and needs) the observations as a two dimensional array. The first axis (rows) need to match the coordinate array, but instead of containing one value for each location, the values shall contain a time series per location. Returns ------- values : numpy.array Returns a two dimensional array of all observations. The first dimension (rows) matches the coordinate array and the second axis contains the time series for each observation point. """ return self._values
[docs] def set_values(self, values): """Set new values The values should be an (m, n) array with m matching the size of coordinates first dimension and n is the time dimension. Raises ------ ValueError : in case n <= 1 or values are not an array of correct dimensionality AttributeError : in case values cannot be converted to a numpy.array """ values = np.asarray(values) # check dtype if not isinstance(values, np.ndarray) or \ (values.dtype is not np.dtype(float) and values.dtype is not np.dtype(int)): raise AttributeError('values cannot be converted to a proper ' '(m,n) shaped array.') # check shape try: m, n = values.shape if m != self._X.shape[0]: raise ValueError except ValueError: raise ValueError('The values shape do not match coordinates.') if n <= 1: raise ValueError('A SpaceTimeVariogram needs more than one ' 'observation on the time axis.') # save new values self._values = values # dismiss the pairwise differences, and lags self._diff = None # recreate the space marginal variogram if self.XMarginal is not None: self.create_XMarginal() if self.TMarginal is not None: self.create_TMarginal()
@values.setter def values(self, new_values): self.set_values(values=new_values) @property def xdist_func(self): return self._xdist_func @xdist_func.setter def xdist_func(self, func): self.set_xdist_func(func_name=func)
[docs] def set_xdist_func(self, func_name): """Set new space distance function Set a new function for calculating the distance matrix in the space dimension. At the moment only strings are supported. Will be passed to scipy.spatical.distance.pdist as 'metric' attribute. Parameters ---------- func_name : str The name of the function used to calculate the pairwise distances. Will be passed to scipy.spatial.distance.pdist as the 'metric' attribute. Raises ------ ValueError : in case a non-string argument is passed. """ if isinstance(func_name, str): self._xdist_func_name = func_name self._xdist_func = lambda x: pdist(x, metric=func_name) else: raise ValueError('For now only str arguments are supported.') # reset the distances self._xdist = None # update marignal self._set_xmarg_params()
@property def tdist_func(self): return self._tdist_func @tdist_func.setter def tdist_func(self, func): self.set_tdist_func(func_name=func)
[docs] def set_tdist_func(self, func_name): """Set new space distance function Set a new function for calculating the distance matrix in the space dimension. At the moment only strings are supported. Will be passed to scipy.spatical.distance.pdist as 'metric' attribute. Parameters ---------- func_name : str The name of the function used to calculate the pairwise distances. Will be passed to scipy.spatial.distance.pdist as the 'metric' attribute. Raises ------ ValueError : in case a non-string argument is passed. """ if isinstance(func_name, str): self._tdist_func_name = func_name self._tdist_func = lambda t: pdist(t, metric=func_name) else: raise ValueError('For now only str arguments are supported.') # reset the distances self._tdist = None # update marignal self._set_tmarg_params()
@property def distance(self): """Distance matrices Returns both the space and time distance matrix. This property is equivalent to two separate calls of :func:`xdistance <skgstat.SpaceTimeVariogram.xdistance>` and :func:`tdistance <skgstat.SpaceTimeVariogram.tdistance>`. Returns ------- distance matrices : (numpy.array, numpy.array) Returns a tuple of the two distance matrices in space and time. Each distance matrix is a flattened upper triangle of the distance matrix squareform in row orientation. """ return self.xdistance, self.tdistance @property def xdistance(self): """Distance matrix (space) Return the upper triangle of the squareform pairwise distance matrix. Returns ------- xdistance : numpy.array 1D-array of the upper triangle of a squareform representation of the distance matrix. """ self.__calc_xdist(force=False) return self._xdist @property def tdistance(self): """Time distance Returns a distance matrix containing the distance of all observation points in time. The time 'coordiantes' are created from the values multidimensional array, where the second dimension is assumed to be time. The unit will be time steps. Returns ------- tdistance : numpy.array 1D-array of the upper triangle of a squareform representation of the distance matrix. """ self.__calc_tdist(force=False) return self._tdist @property def x_lags(self): if self._x_lags is None: self._x_lags = len(self.xbins) return self._x_lags @x_lags.setter def x_lags(self, lags): if not isinstance(lags, int): raise ValueError('Only integers are supported as lag counts.') # set new value self._x_lags = lags # reset bins and groups self._xbins = None self._xgroups = None # update marignal self._set_xmarg_params() @property def t_lags(self): if isinstance(self._t_lags, str): if self._t_lags.lower() == 'max': return self.values.shape[1] - 1 else: raise ValueError("Only 'max' supported as string argument.") elif self._t_lags is None: self._t_lags = len(self.tbins) return self._t_lags @t_lags.setter def t_lags(self, lags): # set new value self._t_lags = lags # reset bins self._tbins = None self._tgroups = None # update marignal self._set_tmarg_params() @property def maxlag(self): return self._maxlag @maxlag.setter def maxlag(self, value): # reset fitting self.cov, self.cof = None, None # remove binning self._xbins = None self._xgroups = None # set the new value if value is None: self._maxlag = None elif isinstance(value, str): if value == 'median': self._maxlag = np.median(self.xdistance) elif value == 'mean': self._maxlag = np.mean(self.xdistance) elif value < 1: self._maxlag = value * np.max(self.xdistance) else: self._maxlag = value # update marignal self._set_xmarg_params()
[docs] def set_bin_func(self, bin_func, axis): """Set binning function Set a new binning function to either the space or time axis. Both axes support the methods: ['even', 'uniform']: * **'even'**, create even width bins * **'uniform'**, create bins of uniform distribution Parameters ---------- bin_func : str Specifies the function to be loaded. Can be either 'even' or 'uniform'. axis : str Specifies the axis to be used for binning. Can be either 'space' or 'time', or one of the two shortcuts 's' and 't' See Also -------- skgstat.binning.even_width_lags skgstat.binning.uniform_count_lags """ adjust_n_lags = False # switch the function if bin_func.lower() == 'even': f = binning.even_width_lags elif bin_func.lower() == 'uniform': f = binning.uniform_count_lags elif isinstance(bin_func, str): # define a wrapper to pass the name def wrapper(distances, n, maxlag): return binning.auto_derived_lags(distances, bin_func.lower(), maxlag) f = wrapper adjust_n_lags = True else: raise ValueError('%s binning method is not known' % bin_func) # switch the axis if axis.lower() == 'space' or axis.lower() == 's': self._xbin_func = f self._xbin_func_name = bin_func if adjust_n_lags: self._x_lags = None # update marginal self._set_xmarg_params() # reset self._xgroups = None self._xbins = None elif axis.lower() == 'time' or axis.lower() == 't': self._tbin_func = f self._tbin_func_name = bin_func if adjust_n_lags: self._t_lags = None # update marignal self._set_tmarg_params() # reset self._tgroups = None self._tbins = None else: raise ValueError('%s is not a valid axis' % axis) # reset fitting params self.cof, self.cof = None, None
@property def xbins(self): """Spatial binning Returns the bin edges over the spatial axis. These can be used to align the spatial lag class grouping to actual distance lags. The length of the array matches the number of spatial lag classes. Returns ------- bins : numpy.array Returns the edges of the current spatial binning. """ # check if cached if self._xbins is None: self._xbins, n = self._xbin_func(self.xdistance, self._x_lags, self.maxlag) # if n is not None, the binning func overwrote it if n is not None: self._x_lags = n return self._xbins @xbins.setter def xbins(self, bins): if isinstance(bins, int): self._xbins = None self._x_lags = bins elif isinstance(bins, (list, tuple, np.ndarray)): self._xbins = np.asarray(bins) self._x_lags = len(self._xbins) elif isinstance(bins, str): self.set_bin_func(bin_func=bins, axis='space') else: raise AttributeError('bin value cannot be parsed.') # reset the groups self._xgroups = None # update marignal self._set_xmarg_params() @property def tbins(self): """Temporal binning Returns the bin edges over the temporal axis. These can be used to align the temporal lag class grouping to actual time lags. The length of the array matches the number of temporal lag classes. Returns ------- bins : numpy.array Returns the edges of the current temporal binning. """ if self._tbins is None: # this is a bit dumb, but we cannot pass a string as n param tn = self._t_lags if self._t_lags != 'max' else self.t_lags self._tbins, n = self._tbin_func(self.tdistance, tn, None) # if n is not None, the binning func overwote it if n is not None: self._t_lags = n return self._tbins @tbins.setter def tbins(self, bins): if isinstance(bins, int): self._tbins = None self._t_lags = bins elif isinstance(bins, (list, tuple, np.ndarray)): self._tbins = np.asarray(bins) self._t_lags = len(self._tbins) elif isinstance(bins, str): self.set_bin_func(bin_func=bins, axis='time') else: raise AttributeError('bin value cannot be parsed.') # reset the groups self._tgroups = None # update marignal self._set_tmarg_params() @property def meshbins(self): return np.meshgrid(self.xbins, self.tbins) @property def use_nugget(self): return self._use_nugget @use_nugget.setter def use_nugget(self, nugget): if not isinstance(nugget, bool): raise ValueError('use_nugget has to be a boolean value.') self._use_nugget = nugget # update marginals self._set_xmarg_params() self._set_tmarg_params() @property def estimator(self): return self._estimator @estimator.setter def estimator(self, value): self.set_estimator(estimator_name=value)
[docs] def set_estimator(self, estimator_name): # reset the fitting self.cof, self.cov = None, None if isinstance(estimator_name, str): if estimator_name.lower() == 'matheron': self._estimator = estimators.matheron elif estimator_name.lower() == 'cressie': self._estimator = estimators.cressie elif estimator_name.lower() == 'dowd': self._estimator = estimators.dowd elif estimator_name.lower() == 'genton': self._estimator = estimators.genton elif estimator_name.lower() == 'minmax': self._estimator = estimators.minmax elif estimator_name.lower() == 'percentile': self._estimator = estimators.percentile elif estimator_name.lower() == 'entropy': self._estimator = estimators.entropy else: raise ValueError( ('Variogram estimator %s is not understood, please' + 'provide the function.') % estimator_name ) elif callable(estimator_name): self._estimator = estimator_name else: raise ValueError('The estimator has to be a string or callable.') # update marignal self._set_xmarg_params() self._set_tmarg_params()
@property def model(self): return self._model @model.setter def model(self, value): self.set_model(model_name=value)
[docs] def set_model(self, model_name): """Set space-time model Set a new space-time model. It has to be either a callable of correct signature or a string identifying one of the predefined models Parameters ---------- model_name : str, callable Either a callable of correct signature or a valid model name. Valid names are: * sum * product * product-sum """ # reset fitting self.cof, self.cov = None, None if isinstance(model_name, str): name = model_name.lower() if name == 'sum': self._model = stmodels.sum elif name == 'product': self._model = stmodels.product elif name == 'product-sum' or name == 'product_sum': self._model = stmodels.product_sum elif callable(model_name): self._model = model_name else: raise ValueError('model_name has to be a string or callable.')
[docs] def create_XMarginal(self): """ Create an instance of skgstat.Variogram for the space marginal variogram by arranging the coordinates and values and infer parameters from this SpaceTimeVariogram instance. """ self.XMarginal = Variogram( np.vstack([self._X] * self._values.shape[1]), self._values.T.flatten() ) self._set_xmarg_params()
[docs] def create_TMarginal(self): """ Create an instance of skgstat.Variogram for the time marginal variogram by arranging the coordinates and values and infer parameters from this SpaceTimeVariogram instance. """ coords = np.stack(( np.arange(self._values.shape[1]), [0] * self._values.shape[1] ), axis=1) self.TMarginal = Variogram( np.vstack([coords] * self._values.shape[0]), self._values.flatten() ) self._set_tmarg_params()
def _set_xmarg_params(self): """ Update the parameters for the space marginal variogram with any parameter that can be inferred from the current SpaceTimeVariogram instance. """ # if not marginal variogram is set, return if self.XMarginal is None: return # distance # FIXME: Handle xdist_func_name vs xdist_func better (like in Variogra.py) self.XMarginal.dist_function = self._xdist_func_name self.XMarginal.n_lags = self.x_lags # binning self.XMarginal.bin_func = self._xbin_func_name self.XMarginal.maxlag = self.maxlag # nugget self.XMarginal.use_nugget = self.use_nugget # estimator self.XMarginal.estimator = self.estimator.__name__ def _set_tmarg_params(self): """ Update the parameters for the time marginal variogram with any parameter that can be inferred from the current SpaceTimeVariogram instance. """ # if no marginal variogram is set, return if self.TMarginal is None: return # distance self.TMarginal.dist_function = self._tdist_func_name self.TMarginal.n_lags = self.t_lags # binning self.TMarginal.bin_func = self._tbin_func_name # nugget self.TMarginal.use_nugget = self.use_nugget # estimator self.TMarginal.estimator = self.estimator.__name__ # ------------------------------------------------------------------------ # # PRE-PROCESSING # # ------------------------------------------------------------------------ #
[docs] def lag_groups(self, axis): """Lag class group mask array Returns a mask array for the given axis (either 'space' or 'time'). It will have as amany elements as the respective distance matrices. **Unlike the base Variogram class, it does not mask the array of pairwise differences.**. It will mask the distance matrix of the respective axis. Parameters ---------- axis : str Can either be 'space' or 'time'. Specifies the axis the mask array shall be returned for. Returns ------- masK_array : numpy.array mask array that identifies the lag class group index for each pair of points on the given axis. """ if not isinstance(axis, str): raise AttributeError('axis has to be of type string.') # space axis if axis.lower() == 'space' or axis.lower() == 's': if self._xgroups is None: self._calc_group(axis=axis, force=False) return self._xgroups # time axis elif axis.lower() == 'time' or axis.lower() == 't': if self._tgroups is None: self._calc_group(axis=axis, force=False) return self._tgroups else: raise ValueError("axis has to be one of 'space', 'time'.")
[docs] def lag_classes(self): """Iterator over all lag classes Returns an iterator over all lag classes by aligning all time lags over all space lags. This means that it will yield all time lag groups for a space lag of index 0 at first and then iterate the space lags. Returns ------- iterator """ # are differences already calculated if self._diff is None: self._calc_diff(force=False) # get the group masking arrays xgrp = self.lag_groups(axis='space') tgrp = self.lag_groups(axis='time') def diff_select(x, t): return self._diff[np.where(xgrp == x)[0]][:, np.where(tgrp == t)[0]] # iterate for x in range(self.x_lags): for t in range(self.t_lags): yield diff_select(x, t).flatten()
def _get_experimental(self): # TODO: fix this if self.estimator.__name__ == 'entropy': raise NotImplementedError # this might z = np.fromiter( (self.estimator(vals) for vals in self.lag_classes()), dtype=float ) return z.copy() @property def experimental(self): """Experimental Variogram Returns an experimental variogram for the given data. The semivariances are arranged over the spatial binning as defined in SpaceTimeVariogram.xbins and temporal binning defined in SpaceTimeVariogram.tbins. Returns ------- variogram : numpy.ndarray Returns an two dimensional array of semivariances over space on the first axis and time over the second axis. """ return self._get_experimental() def __calc_xdist(self, force=False): """Calculate distance in space Use :func:`xdist_func <skgstat.SpaceTimeVariogram.xdist_func>` to calculate the pairwise space distance matrix. The calculation will be cached and not recalculated. The recalculation can be forced setting ``force=True``. Parameters ---------- force : bool If True, an eventually cached version of the distance matrix will be deleted. """ if self._xdist is None or force: self._xdist = self.xdist_func(self._X) def __calc_tdist(self, force=False): """Calculate distance in time Use :func:`tdist_func <skgstat.SpaceTimeVariogram.tdist_func>` to calculate the pairwise time distance matrix. The calculation will be cached and not recalculated. The recalculation can be forced setting ``force=True``. Parameters ---------- force : bool If True, an eventually cached version of the distance matrix will be deleted. """ if self._tdist is None or force: # extract the timestamps t = np.stack(( np.arange(self.values.shape[1]), [0] * self.values.shape[1] ), axis=1) self._tdist = self.tdist_func(t) def _calc_diff(self, force=False): """Calculate pairwise differences Calculate the the pairwise differences for all space lag and time lag class combinations. The result is stored in the SpaceTimeVariogram._diff matrix, which has the form (m, n) with m the size of the space distance array and n the size of the time distance array. Parameters ---------- force : bool If True, any calculated and cached result will be deleted and a clean calculation will be performed. Notes ----- This is a Python only implementation that can get quite slow as any added observation on the space or time axis will increase the matrix dimension by one. It is also slow as 4 loops are needed to loop the matrix. I am evaluating at the moment if the function performs better using numpys vectorizations or by implementing a Cython, Fortran, Rust lib that can do the heavy stuff. """ # check the force if not force and self._diff is not None: return # get size of distance matrices xn = self.xdistance.size tn = self.tdistance.size # get outer and inner iterators outer, inner = self.values.shape v = self.values # prepare TODO: here the Fortran, Rust, whatever calc self._diff = np.zeros((xn, tn)) * np.nan xidx = 0 for xi in range(outer): for xj in range(outer): if xi < xj: tidx = 0 for ti in range(inner): for tj in range(inner): if ti < tj: self._diff[xidx][tidx] = np.abs(v[xi, ti] - v[xj, tj]) tidx += 1 xidx += 1 def _calc_group(self, axis, force=False): """Calculate lag class grouping Calculate a lag class grouping mask array for the given axis. The axis can be either 'space' or 'time'. The result will be cached either in the _sgroups (space) or _tgroups (time) array will match the respective distance matrix. The group value indicates the lag class index for the matching point pair. If force is False (default) and the groups have been calculated, no new calculation will be started. Parameters ---------- axis : str Can be either 'space' for the space lag grouping or 'time' for the temporal lag grouping. force : bool If True, any present cached grouping array will be overwritten. Returns ------- void """ # switch the axis if axis.lower() == 'space' or axis.lower() == 's': grp = self._xgroups fmt = 'x' elif axis.lower() == 'time' or axis.lower() == 't': grp = self._tgroups fmt = 't' else: raise ValueError('Axis %s is not supported' % axis) # check the force if not force and grp is not None: return # copy the arrays bins = getattr(self, '%sbins' % fmt) d = getattr(self, '%sdistance' % fmt) # set all groups to -1 grp = np.ones(len(d), dtype=int) * -1 # go for the classification for i, bounds in enumerate(zip([0] + list(bins), bins)): grp[np.where((d > bounds[0]) & (d <= bounds[1]))] = i # save setattr(self, '_%sgroups' % fmt, grp)
[docs] def preprocessing(self, force=False): """Preprocessing Start all necessary calculation jobs needed to derive an experimental variogram. This hasto be present before the model fitting can be done. The force parameter will make all calculation functions to delete all cached intermediate results and make a clean calculation. Parameters ---------- force : bool If True, all cached intermediate results will be deleted and a clean calculation will be done. """ # recalculate distances self.__calc_xdist(force=force) self.__calc_tdist(force=force) self._calc_diff(force=force) self._calc_group(axis='space', force=force) self._calc_group(axis='time', force=force)
# ------------------------------------------------------------------------ # # FITTING # # ------------------------------------------------------------------------ #
[docs] def fit(self, force=False): # delete the last cov and cof self.cof = None self.cov = None # if force, force a clean preprocessing self.preprocessing(force=force) # load the fitting data xx, yy = self.meshbins z = self.experimental # remove NaN values ydata = z[np.where(~np.isnan(z))] _xx = xx.flatten()[np.where(~np.isnan(z))[0]] _yy = yy.flatten()[np.where(~np.isnan(z))[0]] xdata = np.vstack((_xx, _yy)) # get the marginal variogram functions Vx = self.XMarginal.fitted_model Vt = self.TMarginal.fitted_model # get the params of the model _code_obj = self._model.__wrapped__.__code__ model_args = inspect.getargs(_code_obj).args self._model_params = dict() # fix the sills? fix_sills = True # TODO: Make this a param in __init__ if fix_sills and 'Cx' in model_args: self._model_params['Cx'] = self.XMarginal.describe()['sill'] if fix_sills and 'Ct' in model_args: self._model_params['Ct'] = self.TMarginal.describe()['sill'] # are there parameters left to fit? free_args = len(model_args) - 3 - len(self._model_params.keys()) if free_args == 0: # no params left self.cof = [] self.cov = [] return # wrap the model def _model(lags, *args): return self._model(lags, Vx, Vt, *args, **self._model_params) self.cof, self.cov = curve_fit( _model, xdata.T, ydata, bounds=[0, np.inf], p0=[1.] * free_args ) return
@property def fitted_model(self): """ Returns ------- """ # if not model not fitted, fit it if self.cof is None or self.cov is None: self.fit(force=False) # get the model func func = self._model # get the marginal Variograms Vx = self.XMarginal.fitted_model Vt = self.TMarginal.fitted_model cof = self.cof if self.cof is not None else [] params = self._model_params if self._model_params is not None else {} # define the function def model(lags): return func(lags, Vx, Vt, *cof, **params) return model # ------------------------------------------------------------------------ # # RESULTS # # ------------------------------------------------------------------------ #
[docs] def get_marginal(self, axis, lag=0): """Marginal Variogram Returns the marginal experimental variogram of axis for the given lag on the other axis. Axis can either be 'space' or 'time'. The parameter lag specifies the index of the desired lag class on the other axis. Parameters ---------- axis : str The axis a marginal variogram shall be calculated for. Can either be ' space' or 'time'. lag : int Index of the lag class group on the other axis to be used. In case this is 0, this is often considered to be *the* marginal variogram of the axis. Returns ------- variogram : numpy.array Marginal variogram of the given axis """ # check the axis if not isinstance(axis, str): raise AttributeError('axis has to be of type string.') if axis.lower() == 'space' or axis.lower() == 's': return np.fromiter( (self.estimator(self._get_member(i, lag)) for i in range(self.x_lags)), dtype=float ) elif axis.lower() == 'time' or axis.lower() == 't': return np.fromiter( (self.estimator(self._get_member(lag, j)) for j in range(self.t_lags)), dtype=float ) else: raise ValueError("axis can either be 'space' or 'time'.")
def _get_member(self, xlag, tlag): x_idxs = self._xgroups == xlag t_idxs = self._tgroups == tlag return self._diff[np.where(x_idxs)[0]][:, np.where(t_idxs)[0]].flatten() # ------------------------------------------------------------------------ # # PLOTTING # # ------------------------------------------------------------------------ #
[docs] def plot(self, kind='scatter', ax=None, **kwargs): # pragma: no cover """Plot the experimental variogram At the current version the SpaceTimeVariogram class is not capable of modeling a spe-time variogram function, therefore all plots will only show the experimental variogram. As the experimental space-time semivariance is depending on a space and a time lag, one would basically need a 3D scatter plot, which is the default plot. However, 3D plots can be, especially for scientific usage, a bit problematic. Therefore the plot function can plot a variety of 3D and 2D plots. Parameters ---------- kind : str Has to be one of: * **scatter** * **surface** * **contour** * **contourf** * **matrix** * **marginals** ax : matplotlib.AxesSubplot, mpl_toolkits.mplot3d.Axes3D, None If None, the function will create a new figure and suitable Axes. Else, the Axes object can be passed to plot the variogram into an existing figure. In this case, one has to pass the correct type of Axes, whether it's a 3D or 2D kind of a plot. kwargs : dict All keyword arguments are passed down to the actual plotting function. Refer to their documentation for a more detailed description. Returns ------- fig : matplotlib.Figure See Also -------- SpaceTimeVariogram.scatter SpaceTimeVariogram.surface SpaceTimeVariogram.marginals """ # switch the plot kind if not isinstance(kind, str): raise ValueError('kind has to be of type string.') if kind.lower() == 'scatter': return self.scatter(ax=ax, **kwargs) elif kind.lower() == 'surf' or kind.lower() == 'surface': return self.surface(ax=ax, **kwargs) elif kind.lower() == 'contour': return self.contour(ax=ax) elif kind.lower() == 'contourf': return self.contourf(ax=ax) elif kind.lower() == 'matrix' or kind.lower() == 'mat': raise NotImplementedError elif kind.lower() == 'marginals': return self.marginals(plot=True, axes=ax, **kwargs) else: raise ValueError('kind %s is not a valid value.')
[docs] def scatter(self, ax=None, elev=30, azim=220, c='blue', depthshade=True, **kwargs): # pragma: no cover """3D Scatter Variogram Plot the experimental variogram into a 3D matplotlib.Figure. The two variogram axis (space, time) will span a meshgrid over the x and y axis and the semivariance will be plotted as z value over the respective space and time lag coordinate. Parameters ---------- ax : mpl_toolkits.mplot3d.Axes3D, None If ax is None (default), a new Figure and Axes instance will be created. If ax is given, this instance will be used for the plot. elev : int The elevation of the 3D plot, which is a rotation over the xy-plane. azim : int The azimuth of the 3D plot, which is a rotation over the z-axis. c : str Color of the scatter points, will be passed to the matplotlib ``c`` argument. The function also accepts ``color`` as an alias. depthshade : bool If True, the scatter points will change their color according to the distance from the viewport for illustration reasons. kwargs : dict Other kwargs accepted are only ``color`` as an alias for ``c`` and ``figsize``, if ax is None. Anything else will be ignored. Returns ------- fig : matplotlib.Figure Examples -------- In case an ax shall be passed to the function, note that this plot requires an AxesSubplot, that is capable of creating a 3D plot. This can be done like: .. code-block:: python from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt fig = plt.figure() ax = fig.add_subplot(111, projection='3d') # STV is an instance of SpaceTimeVariogram STV.scatter(ax=ax) See Also -------- SpaceTimeVariogram.surface """ return self._plot3d(kind='scatter', ax=ax, elev=elev, azim=azim, c=c, depthshade=depthshade, **kwargs)
[docs] def surface(self, ax=None, elev=30, azim=220, color='blue', alpha=0.5, **kwargs): # pragma: no cover """3D Scatter Variogram Plot the experimental variogram into a 3D matplotlib.Figure. The two variogram axis (space, time) will span a meshgrid over the x and y axis and the semivariance will be plotted as z value over the respective space and time lag coordinate. Unlike :func:`scatter <skgstat.SpaceTimeVariogram.scatter>` the semivariance will not be scattered as points but rather as a surface plot. The surface is approximated by (Delauney) triangulation of the z-axis. Parameters ---------- ax : mpl_toolkits.mplot3d.Axes3D, None If ax is None (default), a new Figure and Axes instance will be created. If ax is given, this instance will be used for the plot. elev : int The elevation of the 3D plot, which is a rotation over the xy-plane. azim : int The azimuth of the 3D plot, which is a rotation over the z-axis. color : str Color of the scatter points, will be passed to the matplotlib ``color`` argument. The function also accepts ``c`` as an alias. alpha : float Sets the transparency of the surface as 0 <= alpha <= 1, with 0 being completely transparent. kwargs : dict Other kwargs accepted are only ``color`` as an alias for ``c`` and ``figsize``, if ax is None. Anything else will be ignored. Returns ------- fig : matplotlib.Figure Notes ----- In case an ax shall be passed to the function, note that this plot requires an AxesSubplot, that is capable of creating a 3D plot. This can be done like: .. code-block:: python from mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as plt fig = plt.figure() ax = fig.add_subplot(111, projection='3d') # STV is an instance of SpaceTimeVariogram STV.surface(ax=ax) See Also -------- SpaceTimeVariogram.scatter """ return self._plot3d(kind='surf', ax=ax, elev=elev, azim=azim, color=color, alpha=alpha, **kwargs)
def _plot3d(self, kind='scatter', ax=None, elev=30, azim=220, **kwargs): # pragma: no cover # get the backend used_backend = plotting.backend() if used_backend == 'matplotlib': return plotting.matplotlib_plot_3d(self, kind=kind, ax=ax, elev=elev, azim=azim, **kwargs) elif used_backend == 'plotly': return plotting.plotly_plot_3d(self, kind=kind, fig=ax, **kwargs) # if we reach this line, somethings wrong with plotting backend raise ValueError('The plotting backend has an undefined state.')
[docs] def contour(self, ax=None, zoom_factor=100., levels=10, colors='k', linewidths=0.3, method="fast", **kwargs): # pragma: no cover """Variogram 2D contour plot Plot a 2D contour plot of the experimental variogram. The experimental semi-variance values are spanned over a space - time lag meshgrid. This grid is (linear) interpolated onto the given resolution for visual reasons. Then, contour lines are calculated from the denser grid. Their number can be specified by *levels*. Parameters ---------- ax : matplotlib.AxesSubplot, None If None a new matplotlib.Figure will be created, otherwise the plot will be rendered into the given subplot. zoom_factor : float The experimental variogram will be interpolated onto a regular grid for visual reasons. The density of this plot can be set by zoom_factor. A factor of 10 will enlarge each of the axes by 10. Higher zoom_factors result in smoother contours, but are expansive in calculation time. levels : int Number of levels to be formed for finding contour lines. More levels result in more detailed plots, but are expansive in terms of calculation time. colors : str, list Will be passed down to matplotlib.pyplot.contour as *c* parameter. linewidths : float, list Will be passed down to matplotlib.pyplot.contour as *linewidths* parameter. method : str The method used for densifying the meshgrid. Can be one of 'fast' or 'precise'. Fast will use the scipy.ndimage.zoom method to incresae the node density. This is fast, but cannot interpolate *behind* any NaN occurrence. 'Precise' performs an actual linear interpolation between the nodes using scipy.interpolate.griddata. This takes more time, but the result is less smoothed out. kwargs : dict Other arguments that can be specific to *contour* or *contourf* type. Accepts *xlabel*, *ylabel*, *xlim* and *ylim* as of this writing. Returns ------- fig : matplotlib.Figure The Figure object used for rendering the contour plot. See Also -------- SpaceTimeVariogram.contourf """ return self._plot2d(kind='contour', ax=ax, zoom_factor=zoom_factor, levels=levels, colors=colors, method=method, linewidths=linewidths, **kwargs)
[docs] def contourf(self, ax=None, zoom_factor=100., levels=10, cmap='RdYlBu_r', method="fast", **kwargs): # pragma: no cover """Variogram 2D filled contour plot Plot a 2D filled contour plot of the experimental variogram. The experimental semi-variance values are spanned over a space - time lag meshgrid. This grid is (linear) interpolated onto the given resolution for visual reasons. Then, contour lines are calculated from the denser grid. Their number can be specified by *levels*. Finally, each contour region is filled with a color supplied by the specified *cmap*. Parameters ---------- ax : matplotlib.AxesSubplot, None If None a new matplotlib.Figure will be created, otherwise the plot will be rendered into the given subplot. zoom_factor : float The experimental variogram will be interpolated onto a regular grid for visual reasons. The density of this plot can be set by zoom_factor. A factor of 10 will enlarge each of the axes by 10. Higher zoom_factors result in smoother contours, but are expansive in calculation time. levels : int Number of levels to be formed for finding contour lines. More levels result in more detailed plots, but are expansive in terms of calculation time. cmap : str Will be passed down to matplotlib.pyplot.contourf as *cmap* parameter. Can be any valid color range supported by matplotlib. method : str The method used for densifying the meshgrid. Can be one of 'fast' or 'precise'. Fast will use the scipy.ndimage.zoom method to incresae the node density. This is fast, but cannot interpolate *behind* any NaN occurrence. 'Precise' performs an actual linear interpolation between the nodes using scipy.interpolate.griddata. This takes more time, but the result is less smoothed out. kwargs : dict Other arguments that can be specific to *contour* or *contourf* type. Accepts *xlabel*, *ylabel*, *xlim* and *ylim* as of this writing. Returns ------- fig : matplotlib.Figure The Figure object used for rendering the contour plot. See Also -------- SpaceTimeVariogram.contour """ return self._plot2d(kind='contourf', ax=ax, zoom_factor=zoom_factor, levels=levels, cmap=cmap, method=method, **kwargs)
def _plot2d(self, kind='contour', ax=None, zoom_factor=100., levels=10, method="fast", **kwargs): # pragma: no cover # get the backend used_backend = plotting.backend() if used_backend == 'matplotlib': return plotting.matplotlib_plot_2d(self, kind=kind, ax=ax, zoom_factor=zoom_factor, level=10, method=method, **kwargs) elif used_backend == 'plotly': return plotting.plotly_plot_2d(self, kind=kind, fig=ax, **kwargs) # if we reach this line, somethings wrong with plotting backend raise ValueError('The plotting backend has an undefined state.')
[docs] def marginals(self, plot=True, axes=None, sharey=True, include_model=False, **kwargs): # pragma: no cover """Plot marginal variograms Plots the two marginal variograms into a new or existing figure. The space marginal variogram is defined to be the variogram of temporal lag class 0, while the time marginal variogram uses only spatial lag class 0. In case the expected variability is not of same magnitude, the sharey parameter should be set to ``False`` in order to use separated y-axes. Parameters ---------- plot : bool .. deprecated:: 0.4 With version 0.4, this parameter will be removed If set to False, no matplotlib.Figure will be returned. Instead a tuple of the two marginal experimental variogram values is returned. axes : list Is either ``None`` to create a new matplotlib.Figure. Otherwise it has to be a list of two matplotlib.AxesSubplot instances, which will then be used for plotting. sharey : bool If True (default), the two marginal variograms will share their y-axis to increase comparability. Should be set to False in the variances are of different magnitude. include_model : bool If True, the marginal variogram models fitted to the respective axis are included into the plot. kwargs : dict Only kwargs accepted is ``figsize``, if ax is None. Anything else will be ignored. Returns ------- variograms : tuple If plot is False, a tuple of numpy.arrays are returned. These are the two experimental marginal variograms. plots : matplotlib.Figure If plot is True, the matplotlib.Figure will be returned. """ # handle plot if not plot: raise DeprecationWarning('The plot parameter will be removed.') return ( self.XMarginal.experimental, self.TMarginal.experimental ) # backend used_backend = plotting.backend() if used_backend == 'matplotlib': return plotting.matplotlib_marginal(self, axes=axes, sharey=sharey, include_model=include_model, **kwargs) elif used_backend == 'plotly': return plotting.plotly_marginal(self, fig=axes, include_model=include_model, **kwargs) # if we reach this line, somethings wrong with plotting backend raise ValueError('The plotting backend has an undefined state.')