blpd.utils

Miscellaneous utility functions, mostly for internal use.

  1"""
  2Miscellaneous utility functions,
  3mostly for internal use.
  4"""
  5
  6import inspect
  7import sys
  8from collections import namedtuple
  9from functools import partial
 10
 11import matplotlib as mpl
 12import matplotlib.pyplot as plt
 13import numpy as np
 14
 15if sys.version_info >= (3, 9):
 16    from functools import cache
 17else:
 18    from functools import lru_cache
 19
 20    cache = lru_cache(maxsize=None)
 21
 22# Potentially publically useful
 23__all__ = (
 24    "auto_bins",
 25    "bin_ds",
 26    "bin_values_xy",
 27    "moving_average",
 28    "numbify",
 29    "unnumbify",
 30)
 31
_SNIPPETS = {
    "bins_params": """
bins
    If `bins='auto'`, `blpd.utils.auto_bins` is used to construct the bins.
    Alternatively, you can pass bin specifications in any of the forms described
    [here](https://numpy.org/doc/stable/reference/generated/numpy.histogramdd.html).
    """.strip(),
    "sdim_params": """
sdim : str
    String representation of the set of dimensions.
    Valid options: `["xy", "xz", "yz", "xyz", "3d", "3-d"]`.
    """.strip(),
}


def _add_snippets(func):
    """Add docstring snippets from the `_SNIPPETS` dict."""
    # ref: https://github.com/lukelbd/proplot/blob/master/proplot/internals/docstring.py
    func.__doc__ = inspect.getdoc(func)
    if func.__doc__:
        func.__doc__ %= {key: value.strip() for key, value in _SNIPPETS.items()}
    return func


def get_open_fig_labels():
    """Return the labels of all currently open Matplotlib figures."""
    return [plt.figure(num).get_label() for num in plt.get_fignums()]


def check_fig_num(label, n=0):
    """Create a figure `num` (label) that is not already taken,
    appending `_{n}` if a figure with `label` already exists.
    """
    current_labels = get_open_fig_labels()
    if n == 0:
        labeln = label
    else:
        labeln = f"{label}_{n}"
    if labeln in current_labels:
        return check_fig_num(label, n=n + 1)
    else:
        return labeln


def to_sci_not(f):
    """Convert float `f` to a scientific notation string
    (not including the `$`)
    using the string formats `e` and `g`.
    """
    s_e = f"{f:.4e}"
    s_dec, s_pow_ = s_e.split("e")
    s_dec = f"{float(s_dec):.4g}"  # reformat, dropping trailing zeros
    pow_ = int(s_pow_)
    return f"{s_dec} \\times 10^{{ {pow_} }}"


def sec_to_str(total_seconds):
    """Choose the best format to display the run time.

    Note: can't use `strftime` with `timedelta`s.
    """
    hours, remainder = divmod(total_seconds, 60 * 60)
    minutes, seconds = divmod(remainder, 60)

    hours = int(hours)
    minutes = int(minutes)
    seconds = int(seconds)  # most likely wouldn't run for fractions of a second, only multiples

    h = "\u2095"  # unicode subscript letters
    m = "\u2098"
    s = "\u209B"
    sep = "\u2009"  # thin space

    if total_seconds <= 60:
        s_out = f"{seconds}{s}"
    elif total_seconds <= 60 * 60:
        s_out = f"{minutes}{m}{sep}{seconds}{s}"
    else:
        s_out = f"{hours}{h}{sep}{minutes}{m}{sep}{seconds}{s}"
    return s_out
    # TODO: probably a better way to do this, checking for zeros and not displaying those,
    #       or programmatically (iteratively or recursively) building the necessary fmt string


def moving_average(a, n=3, axis=0):
    """Moving average of `a` with window size `n`.

    Implementation taken from: <https://stackoverflow.com/a/14314054>.
    For now only `axis=0` works.
    """
    if axis != 0:
        raise NotImplementedError
    ret = np.cumsum(a, axis=axis, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1 :] / n


def s_t_info(p):
    """Create a string displaying info about run time and time step
    using model parameters `p`.
    """
    t_tot = p["t_tot"]
    dt = p["dt"]
    N_t = p["N_t"]
    if N_t >= 1000:
        s_N_t = to_sci_not(N_t)
    else:
        s_N_t = str(N_t)
    s_t_tot = sec_to_str(t_tot)
    return f"$t = ${s_t_tot}, $\\delta t = {dt}$ s, $N_t = {s_N_t}$"


def s_sample_size(p, *, N_p_only=False):
    """Create a string with sample size info (number of particles `Np` and `N = Np*Nt`)."""
    Np, Nt = p["Np_tot"], p["N_t"]
    N = Np * Nt
    s_N_p = f"$N_p = {to_sci_not(Np)}$"
    s_N = f"$N = {to_sci_not(N)}$"
    if N_p_only:
        return s_N_p
    else:
        return "\n".join([s_N_p, s_N])


# TODO: fn to pre-process state for plots, removing data outside certain limits or with too-high vel components?


def _auto_bins_1d(x, *, nx_max, std_mult, method, pos_only=False):
    x_bar, x_std = x.mean(), x.std()
    if std_mult is not None:
        a = 0 if pos_only else x_bar - std_mult * x_std
        b = x_bar + std_mult * x_std
        x_range = a, b
    else:
        x_range = None

    x_edges = np.histogram_bin_edges(x, bins=method, range=x_range)

    if x_edges.size > nx_max + 1:
        if x_range is None:  # `std_mult=None`: fall back to the full data range
            x_range = x.min(), x.max()
        x_edges = np.linspace(*x_range, nx_max + 1)

    return x_edges


@_add_snippets
def auto_bins(
    positions,
    sdim="xy",
    *,
    nbins_max_1d: int = 100,
    std_mult: float = 2.0,
    method: str = "auto",
):
    """Determine bin edges for a 2-d (horizontal) or 3-d grid
    that lets us focus on where most of the data is.

    Parameters
    ----------
    positions
        Container of 1-D arrays (in *x*,*y*[,*z*] order)
        OR a single NumPy array whose columns are *x*,*y*[,*z*].
    %(sdim_params)s
    nbins_max_1d
        Maximum number of bins in any direction/dim.
    std_mult
        Standard deviation multiplier.
        Increase to capture more of the domain,
        or set to `None` to capture all of it.
    method
        Passed to :func:`numpy.histogram_bin_edges`.
        Usually use `'auto'` or `'sqrt'`.
    """
    pos = np.asarray(positions)
    assert pos.ndim == 2
    if pos.shape[0] in [2, 3]:
        # ^ need to flip (or we only have 2 or 3 data points, but that is unlikely!)
        pos = pos.T

    dims = dims_from_sdim(sdim)
    idims = ["xyz".index(dim1) for dim1 in dims]

    # TODO: optional `z_range=` arg for including a certain range of heights, with default np.inf or None
    # TODO: optional centering of cells over a certain x, y value (like 0, 0)

    kwargs_1d = dict(nx_max=nbins_max_1d, std_mult=std_mult, method=method)
    bins = [_auto_bins_1d(pos[:, idim1], pos_only=(idim1 == 2), **kwargs_1d) for idim1 in idims]

    return bins


auto_bins_xy = partial(auto_bins, sdim="xy")
auto_bins_xy.__doc__ = ":func:`utils.auto_bins` with ``sdim='xy'``"


_Binned_values_xy = namedtuple("Binned_values_xy", "x y xe ye v")


@_add_snippets
def bin_values_xy(x, y, values, *, bins="auto", stat="median"):
    """Using the `x` and `y` positions of the particles, bin `values`,
    and calculate a representative value in each bin.

    Parameters
    ----------
    %(bins_params)s
    stat : str
        Statistic to calculate on the binned values.
    """
    from scipy import stats

    if bins == "auto":
        bins = auto_bins_xy((x, y))

    # 1. Concentration of LPD particles
    H, xe0, ye0 = np.histogram2d(x, y, bins=bins)  # H is the binned particle count
    conc_p_rel = (H / H.max()).T  # TODO: really should divide by level at source (closest bin?)

    # 2. In-particle values in each bin
    ret = stats.binned_statistic_2d(x, y, values, statistic=stat, bins=bins)
    v0 = ret.statistic.T  # it is returned with dims (nx, ny); we need y to be rows (dim 0)
    xe = ret.x_edge
    ye = ret.y_edge
    # ^ these are cell edges
    xc = xe[:-1] + 0.5 * np.diff(xe)
    yc = ye[:-1] + 0.5 * np.diff(ye)
    # ^ these are cell centers
    assert np.allclose(xe, xe0) and np.allclose(ye, ye0)  # confirm the bins are the same

    # 3. Multiply the in-particle stat by the particle conc. to get the final values
    v = conc_p_rel * v0

    return _Binned_values_xy(xc, yc, xe, ye, v)


@_add_snippets
def bin_ds(ds, sdim="xy", *, variables="all", bins="auto"):
    """Bin a particles dataset. For example,
    the `blpd.model.Model.to_xarray` dataset
    or the `blpd.chem.calc_relative_levels_fixed_oxidants` dataset.

    Parameters
    ----------
    %(sdim_params)s
    variables : str, list(str)
        `'all'` to bin all variables, or pass a list of variable names.
    %(bins_params)s
    """
    import xarray as xr
    from scipy import stats

    if tuple(ds.dims) != ("ip",):
        raise NotImplementedError("Must have only particle dim for now.")

    if variables == "all":
        vns = [vn for vn in ds.variables if vn not in list("xyz") + list(ds.dims) + list(ds.coords)]
    else:
        vns = variables

    # Deal with dims
    dims = dims_from_sdim(sdim)
    dims_e = tuple(f"{dim1}e" for dim1 in dims)  # bin edges
    dims_c = tuple(f"{dim1}" for dim1 in dims)  # bin centers
    pos0 = tuple(ds[dim1].values for dim1 in "xyz")  # must pass to `auto_bins` in xyz order
    pos = tuple(ds[dim1].values for dim1 in dims)  # for the stats
    if bins == "auto":
        bins = auto_bins(pos0, sdim)

    # Compute statistics, using `binned_statistic_dd` so we can pass the previous
    # result for efficiency.
    res = {}  # {vn: {stat: ...}, ...}
    ret = None  # for the first run we don't have a result yet
    stats_to_calc = ["mean", "median", "std", "count"]
    # ^ note that sum can be recovered from mean and particle count
    for vn in vns:
        values = ds[vn].values
        rets = {}
        for stat in stats_to_calc:
            ret = stats.binned_statistic_dd(
                pos,
                values,
                statistic=stat,
                bins=bins,
                binned_statistic_result=ret,
            )
            rets[stat] = ret
        res[vn] = rets

        if vn == vns[0]:
            stats_to_calc.remove("count")  # only need it once

    # Construct the coordinates dict
    coords = {}
    for dim1, edges, dim_e, dim_c in zip(dims, ret.bin_edges, dims_e, dims_c):
        xe = edges
        xc = xe[:-1] + 0.5 * np.diff(xe)
        coords[dim_c] = (dim_c, xc, {"units": "m", "long_name": f"{dim1} (bin center)"})
        coords[dim_e] = (dim_e, xe, {"units": "m", "long_name": f"{dim1} (bin edge)"})

    # Create a dataset of the binned statistics on the variables
    ds = xr.Dataset(
        coords=coords,
        data_vars={
            f"{vn}_{stat}": (
                dims_c,
                ret.statistic,
                {
                    "units": ds[vn].attrs.get("units", ""),
                    "long_name": ds[vn].attrs.get("long_name", ""),
                },
            )
            for vn, d in res.items()
            for stat, ret in d.items()
        },
    )

    # Add particle count
    ds["Np"] = (dims_c, res[vns[0]]["count"].statistic, {"long_name": "Lagrangian particle count"})

    return ds.transpose()  # y first so 2-d xarray plots just work


bin_ds_xy = partial(bin_ds, sdim="xy")
bin_ds_xy.__doc__ = ":func:`utils.bin_ds` with ``sdim='xy'``"


def calc_t_out(p):
    """Calculate time-since-release for each particle at the end of the simulation.

    This works for a simulation with a constant particle release rate
    (number of particles released per time step per source).

    Parameters
    ----------
    p : dict
        The model parameters dict.

    Returns
    -------
    np.array
        Time-since-release in seconds.
    """
    # Unpack needed model options/params
    Np_tot = p["Np_tot"]
    dt = p["dt"]
    N_t = p["N_t"]  # number of time steps
    t_tot = p["t_tot"]
    dNp_dt_ds = p["dNp_per_dt_per_source"]
    N_s = p["N_sources"]

    # Calculate time-since-release for every particle.
    # ! The method here is based on time as the outer loop
    # ! and will be incorrect if that changes.
    # Each release time is repeated once per particle released in that
    # time step (`dNp_dt_ds * N_s` particles per step), then flattened.
    t_out = np.ravel(np.tile(np.arange(dt, N_t * dt + dt, dt)[:, np.newaxis], dNp_dt_ds * N_s))
    # ^ need to find the best way to do this!
    # note: apparently `(N_t+1)*dt` does not always give the same stop as `N_t*dt+dt` (precision thing?)

    t_out = t_out[::-1]  # earliest-released particles have been out the longest

    # Sanity checks
    assert np.isclose(t_tot, t_out[0])  # the first particle has been out for the full time
    assert t_out.size == Np_tot

    return t_out


def load_p(ds):
    """Load the model parameters/info `dict` from JSON stored in `ds` :class:`xarray.Dataset`."""
    import json

    return json.loads(ds.attrs["p_json"])


def maybe_log_cnorm(log_cnorm=True, levels=30, vmin=None, vmax=100):
    """Return a color norm (logarithmic if `log_cnorm=True`) and a corresponding tick locator."""
    if log_cnorm:
        norm = mpl.colors.LogNorm(vmin=vmin, vmax=vmax)
        if levels is not None:
            # https://matplotlib.org/3.1.3/gallery/images_contours_and_fields/contourf_log.html
            # https://matplotlib.org/3.1.3/api/ticker_api.html#matplotlib.ticker.LogLocator
            # locator = mpl.ticker.LogLocator(subs=(0.25, 0.5, 1.0))  # another way to get more levels in between powers of 10
            nlevels = levels if isinstance(levels, int) else np.asarray(levels).size
            locator = mpl.ticker.LogLocator(subs="all", numticks=nlevels)
            # TODO: although this^ works, the ticks are not all getting labeled; need to fix
        else:
            locator = None
    else:
        norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax)
        locator = None

    return norm, locator  # TODO: a named tuple would be nicer


def maybe_new_figure(try_num: str, ax=None, subplot_kw=None):
    """Create a new figure labeled `try_num` (modified if taken) when `ax` is None;
    otherwise return the figure that `ax` belongs to.
    """
    if subplot_kw is None:
        subplot_kw = {}

    if ax is None:
        num = check_fig_num(try_num)
        fig, ax = plt.subplots(num=num, subplot_kw=subplot_kw)
    else:
        fig = ax.get_figure()

    return fig, ax


def check_sdim(sdim: str):
    """Validate `sdim`, raising `ValueError` if invalid."""
    valid_sdim = ["xy", "xz", "yz", "xyz", "3d", "3-d"]
    if sdim not in valid_sdim:
        raise ValueError(
            "For `sdim`, pick 2 or 3 from 'x', 'y', and 'z'. For example, 'xy'. "
            f"The full set of valid options is {valid_sdim}. "
            "The last two are aliases for 'xyz'."
        )


def dims_from_sdim(sdim: str):
    """Return the list of dims corresponding to `sdim`."""
    check_sdim(sdim)  # first validate
    if sdim in ("xyz", "3d", "3-d"):
        dims = list("xyz")
    else:
        dims = list(sdim)

    return dims


# TODO: float precision option
def numbify(d, zerod_only=False):
    """Convert dict `d` to a Numba-suitable format.

    Dict values should be NumPy arrays or individual floats.

    References
    ----------
    * <https://numba.pydata.org/numba-doc/dev/reference/pysupported.html#typed-dict>
    """
    import numba

    float64_array = numba.types.float64[:]  # maybe would be more efficient to pass size
    float64 = numba.types.float64
    d_nb = numba.typed.Dict.empty(
        key_type=numba.types.unicode_type,
        value_type=float64_array if not zerod_only else float64,
    )
    for k, v in d.items():
        if not zerod_only:
            if np.asarray(v).shape == ():
                # `np.asarray` leaves a float/int with size=1 but shape=(),
                # which Numba can't use, so store scalars as 1-element arrays
                d_nb[k] = np.full(1, v, dtype=np.float64)
            else:
                d_nb[k] = np.asarray(v).astype(np.float64)  # also converts bool to 0/1
        else:
            d_nb[k] = np.float64(v)

    # `numba.typed.Dict.empty()` creates a normal dict if it thinks jit is disabled
    # ref: https://github.com/numba/numba/blob/868b8e3e8d034dac0440b75ca31595e07f632d27/numba/typed/typeddict.py#L95
    assert isinstance(d_nb, numba.typed.Dict)

    return d_nb


def unnumbify(d_nb):
    """Convert Numba dict `d_nb` to a normal dict of NumPy arrays, etc."""
    import numba

    if not isinstance(d_nb, numba.typed.Dict):
        raise TypeError("this fn is for dicts of type `numba.typed.Dict`")

    d = {}
    for k, v in d_nb.items():
        if v.shape == (1,):
            # `numbify` stores scalars as 1-element arrays; unwrap them here
            d[k] = v[0]
        else:
            d[k] = v

    return d


def disable_numba():
    """Tell numba to disable JIT.

    References
    ----------
    * disabling JIT: https://numba.pydata.org/numba-doc/latest/user/troubleshoot.html#disabling-jit-compilation
    * reloading config: https://github.com/numba/numba/blob/868b8e3e8d034dac0440b75ca31595e07f632d27/numba/core/config.py#L369
    """
    import os

    import numba

    os.environ.update({"NUMBA_DISABLE_JIT": str(1)})
    numba.config.reload_config()
    assert numba.config.DISABLE_JIT == 1  # pylint: disable=no-member


def enable_numba():
    """Tell numba to enable JIT.
    See references in `disable_numba`.
    """
    import os

    import numba

    # By default (when numba is loaded normally), this env var is not set, so remove it
    os.environ.pop("NUMBA_DISABLE_JIT", default=None)  # avoid error
    numba.config.reload_config()
    assert numba.config.DISABLE_JIT == 0  # pylint: disable=no-member
def auto_bins(positions, sdim='xy', *, nbins_max_1d: int = 100, std_mult: float = 2.0, method: str = 'auto'):

Determine bin edges for a 2-d (horizontal) or 3-d grid that lets us focus on where most of the data is.

Parameters
  • positions: Container of 1-D arrays (in x,y[,z] order) OR single NumPy array where columns are x,y[,z].
  • sdim (str): String representation of the set of dimensions. Valid options: ["xy", "xz", "yz", "xyz", "3d", "3-d"].
  • nbins_max_1d: Maximum number of bins in any direction/dim.
  • std_mult: Standard deviation multiplier. Increase to capture more of the domain. Or set to None to capture all of it.
  • method: Passed to numpy.histogram_bin_edges(). Usually use 'auto' or 'sqrt'.
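
A minimal usage sketch (the positions are synthetic, invented for illustration; assumes the package is importable as blpd):

    import numpy as np
    from blpd import utils

    rng = np.random.default_rng(42)
    pos = rng.normal(size=(1000, 3)) * [50.0, 30.0, 5.0] + [100.0, 0.0, 10.0]  # columns: x, y, z

    xe, ye = utils.auto_bins(pos, "xy")  # list of 1-D bin-edge arrays, one per dim
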
def bin_ds(ds, sdim='xy', *, variables='all', bins='auto'):

Bin a particles dataset. For example, the blpd.model.Model.to_xarray dataset or the blpd.chem.calc_relative_levels_fixed_oxidants dataset.

Parameters
  • sdim (str): String representation of the set of dimensions. Valid options: ["xy", "xz", "yz", "xyz", "3d", "3-d"].
  • variables (str, list(str)): 'all' to bin all variables, or pass a list of variable names.
  • bins: If bins='auto', auto_bins is used to construct the bins. Alternatively, you can pass bin specifications in any of the forms described in the numpy.histogramdd documentation (https://numpy.org/doc/stable/reference/generated/numpy.histogramdd.html).
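
A minimal sketch with a synthetic single-variable dataset (the particle positions and the conc variable are invented for illustration; requires xarray and scipy):

    import numpy as np
    import xarray as xr
    from blpd import utils

    rng = np.random.default_rng(0)
    n = 500
    ds = xr.Dataset(
        data_vars=dict(
            x=("ip", rng.normal(100, 50, n)),
            y=("ip", rng.normal(0, 30, n)),
            z=("ip", rng.uniform(0, 10, n)),
            conc=("ip", rng.lognormal(size=n), {"units": "ppbv", "long_name": "concentration"}),
        ),
    )
    ds_binned = utils.bin_ds(ds, "xy")
    # -> data variables conc_mean, conc_median, conc_std, and Np,
    #    on bin-center coords x, y, with bin-edge coords xe, ye
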
def bin_values_xy(x, y, values, *, bins='auto', stat='median'):

Using the x and y positions of the particles, bin values, and calculate a representative value in each bin.

Parameters
  • bins: If bins='auto', auto_bins is used to construct the bins. Alternatively, you can pass bin specifications in any of the forms described in the numpy.histogramdd documentation (https://numpy.org/doc/stable/reference/generated/numpy.histogramdd.html).
  • stat (str): Statistic to calculate on the binned values.
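
For example (synthetic positions and values, invented for illustration):

    import numpy as np
    from blpd import utils

    rng = np.random.default_rng(1)
    x = rng.normal(0, 50, 2000)
    y = rng.normal(0, 30, 2000)
    values = np.exp(-(x**2 + y**2) / 1e4)  # a made-up in-particle quantity

    b = utils.bin_values_xy(x, y, values)
    # b is a namedtuple: b.x, b.y (bin centers), b.xe, b.ye (bin edges),
    # and b.v (2-d array of representative values, y as rows)
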
def moving_average(a, n=3, axis=0):

Moving average of a with window size n.

Implementation taken from: https://stackoverflow.com/a/14314054. For now only axis=0 works.
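
For example (the output has len(a) - n + 1 points):

    import numpy as np
    from blpd.utils import moving_average

    moving_average(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), n=3)
    # -> array([2., 3., 4.])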

def numbify(d, zerod_only=False):

Convert dict d to a Numba-suitable format.

Dict values should be NumPy arrays or individual floats.

References
  • https://numba.pydata.org/numba-doc/dev/reference/pysupported.html#typed-dict
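
A minimal sketch of the intended use, passing the converted dict into a jitted function (requires Numba; total is a hypothetical example function):

    import numba
    import numpy as np
    from blpd.utils import numbify

    p_nb = numbify({"ustar": 0.25, "z": np.linspace(0.0, 10.0, 5)})

    @numba.njit
    def total(d):
        return d["ustar"][0] + d["z"].sum()  # scalars are stored as 1-element arrays

    total(p_nb)  # -> 25.25
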
def unnumbify(d_nb):

Convert Numba dict d_nb to a normal dict of NumPy arrays, etc.
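
Round-trip sketch (requires Numba):

    import numpy as np
    from blpd.utils import numbify, unnumbify

    d = unnumbify(numbify({"a": 1.0, "b": np.arange(3.0)}))
    # d["a"] is a scalar again (1-element arrays are unwrapped); d["b"] is a float64 array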