'''This file is part of AeoLiS.
AeoLiS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
AeoLiS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with AeoLiS. If not, see <http://www.gnu.org/licenses/>.
AeoLiS Copyright (C) 2015 Bas Hoonhout
bas.hoonhout@deltares.nl b.m.hoonhout@tudelft.nl
Deltares Delft University of Technology
Unit of Hydraulic Engineering Faculty of Civil Engineering and Geosciences
Boussinesqweg 1 Stevinweg 1
2629 HVDelft 2628CN Delft
The Netherlands The Netherlands
'''
import re
import numpy as np
#import numba
#import numba_scipy
import scipy
[docs]def isiterable(x):
'''Check if variable is iterable'''
if isinstance(x, str):
return False
try:
_ = [i for i in x]
except:
return False
return True
[docs]def makeiterable(x):
'''Ensure that variable is iterable'''
if not isiterable(x):
if x is None:
x = np.asarray([])
else:
x = np.asarray([x])
return np.asarray(x)
[docs]def isarray(x):
'''Check if variable is an array'''
if isinstance(x, str):
return False
if hasattr(x, '__getitem__'):
return True
else:
return False
[docs]def interp_array(x, xp, fp, circular=False, **kwargs):
'''Interpolate multiple time series at once
Parameters
----------
x : array_like
The x-coordinates of the interpolated values.
xp : 1-D sequence of floats
The x-coordinates of the data points, must be increasing.
fp : 2-D sequence of floats
The y-coordinates of the data points, same length as ``xp``.
circular : bool
Use the :func:`interp_circular` function rather than the
:func:`numpy.interp` function.
kwargs : dict
Keyword options to the :func:`numpy.interp` function
Returns
-------
ndarray
The interpolated values, same length as second dimension of ``fp``.
'''
f = np.zeros((fp.shape[1],))
for i in range(fp.shape[1]):
if circular:
f[i] = interp_circular(x, xp, fp[:,i], **kwargs)
else:
f[i] = np.interp(x, xp, fp[:,i], **kwargs)
return f
[docs]def interp_circular(x, xp, fp, **kwargs):
'''One-dimensional linear interpolation.
Returns the one-dimensional piecewise linear interpolant to a
function with given values at discrete data-points. Values beyond
the limits of ``x`` are interpolated in circular manner. For
example, a value of ``x > x.max()`` evaluates as ``f(x-x.max())``
assuming that ``x.max() - x < x.max()``.
Parameters
----------
x : array_like
The x-coordinates of the interpolated values.
xp : 1-D sequence of floats
The x-coordinates of the data points, must be increasing.
fp : 1-D sequence of floats
The y-coordinates of the data points, same length as ``xp``.
kwargs : dict
Keyword options to the :func:`numpy.interp` function
Returns
-------
y : {float, ndarray}
The interpolated values, same shape as ``x``.
Raises
------
ValueError
If ``xp`` and ``fp`` have different length
'''
xmin = xp.min()
xmax = xp.max()
xrng = xmax - xmin
x = xmin + np.mod(x - xmax - 1., xrng + 1.)
return np.interp(x, xp, fp, **kwargs)
[docs]def normalize(x, ref=None, axis=0, fill=0.):
'''Normalize array
Normalizes an array to make it sum to unity over a specific
axis. The procedure is safe for dimensions that sum to zero. These
dimensions return the ``fill`` value instead.
Parameters
----------
x : array_like
The array to be normalized
ref : array_like, optional
Alternative normalization reference, if not specified, the sum of x is used
axis : int, optional
The normalization axis (default: 0)
fill : float, optional
The return value for all-zero dimensions (default: 0.)
'''
x = makeiterable(x)
if ref is None:
ref = np.sum(x, axis=axis, keepdims=True).repeat(x.shape[axis], axis=axis)
ix = ref != 0.
y = np.zeros(x.shape) + fill
y[ix] = x[ix] / ref[ix]
return y
[docs]def prevent_tiny_negatives(x, max_error=1e-10, replacement=0.):
'''Replace tiny negative values in array
Parameters
----------
x : np.ndarray
Array with potential tiny negative values
max_error : float
Maximum absolute value to be replaced
replacement : float
Replacement value
Returns
-------
np.ndarray
Array with tiny negative values removed
'''
ix = (x < 0.) & (x > -max_error)
x[ix] = replacement
return x
[docs]def print_value(val, fill='<novalue>'):
'''Construct a string representation from an arbitrary value
Parameters
----------
val : misc
Value to be represented as string
fill : str, optional
String representation used in case no value is given
Returns
-------
str
String representation of value
'''
if isiterable(val):
return ' '.join([print_value(x) for x in val])
elif val is None:
return fill
elif isinstance(val, bool):
return 'T' if val else 'F'
elif isinstance(val, int):
return '%d' % val
elif isinstance(val, float):
if val < 1.:
return '%0.6f' % val
else:
return '%0.2f' % val
else:
return str(val)
[docs]def apply_mask(arr, mask):
'''Apply complex mask
The real part of the complex mask is multiplied with the input
array. Subsequently the imaginary part is added and the result
returned.
The shape of the mask is assumed to match the first few dimensions
of the input array. If the input array is larger than the mask,
the mask is repeated for any additional dimensions.
Parameters
----------
arr : numpy.ndarray
Array or matrix to which the mask needs to be applied
mask : numpy.ndarray
Array or matrix with complex mask values
Returns
-------
arr : numpy.ndarray
Array or matrix to which the mask is applied
'''
# repeat mask to match shape of input array
mask = np.asarray(mask)
shp = arr.shape[mask.ndim:]
for d in shp:
mask = mask.reshape(mask.shape + tuple([1])).repeat(d, axis=-1)
# apply mask
arr *= np.real(mask)
arr += np.imag(mask)
return arr
[docs]def rotate(x, y, alpha, origin=(0,0)):
'''Rotate a matrix over given angle around given origin'''
xr = x - origin[0]
yr = y - origin[1]
a = alpha / 180. * np.pi
R = np.asmatrix([[np.cos(a), -np.sin(a)],
[np.sin(a), np.cos(a)]])
xy = np.concatenate((xr.reshape((-1,1)),
yr.reshape((-1,1))), axis=1) * R
return (np.asarray(xy[:,0].reshape(x.shape) + origin[0]),
np.asarray(xy[:,1].reshape(y.shape) + origin[1]))
# @numba.njit
def sc_kv(v, z):
return scipy.special.kv(v, z)
[docs]def calc_grain_size(p, s, percent):
'''Calculate grain size characteristics based on mass in each fraction
Calculate grain size distribution for each cell based on weight
distribution over the fractions. Interpolates to the requested percentage
in the grain size distribution. For example, percent=50 will result
in calculation of the D50. Calculation is only executed for the top layer
Parameters
----------
s : dict
Spatial grids
p : dict
Model configuration parameters
percent : float
Requested percentage in grain size dsitribution
Returns
-------
array
grain size per grid cell
'''
from scipy.interpolate import interp1d
mass = s['mass'][:,:,0,:] # only select upper, surface layer because that is the relevant layer for transport
D = np.zeros((mass.shape[0], mass.shape[1]))
for yi in range(mass.shape[0]):
for xi in range(mass.shape[1]):
diameters = np.insert(p['grain_size'], 0, 0)
cummulative_weights = np.cumsum(np.insert(mass[yi, xi,:], 0, 0))
percentages = 100*cummulative_weights/np.max(cummulative_weights)
f_linear = interp1d(list(percentages), diameters, fill_value='extrapolate') # get interpolation function
# Retrieve grain size characteristics based on interpolation
D[yi, xi] = f_linear(percent)
return D
[docs]def calc_mean_grain_size(p, s):
'''Calculate mean grain size based on mass in each fraction
Calculate mean grain size for each cell based on weight distribution
over the fractions. Calculation is only executed for the top layer.
Parameters
----------
s : dict
Spatial grids
p : dict
Model configuration parameters
percent : float
Requested percentage in grain size dsitribution
Returns
-------
array
mean grain size per grid cell
'''
mass = s['mass'][:,:,0,:] # only select upper, surface layer because that is the relevant layer for transport
D_mean = np.zeros((mass.shape[0], mass.shape[1]))
for yi in range(mass.shape[0]):
for xi in range(mass.shape[1]):
diameters = p['grain_size']
weights = mass[yi, xi,:]/ np.sum(mass[yi, xi,:])
# Retrieve mean grain size based on weight of mass
D_mean[yi, xi] = np.sum(diameters*weights)
return D_mean