blpd.bees

Bee flight model – the "b" in blpd.

Based on the Lévy flight random walk model as implemented in Fuentes et al. (2016) but with some enhancements.

  1"""
  2Bee flight model – the "b" in ``blpd``.
  3
  4Based on the Lévy flight random walk model as implemented in
  5[Fuentes et al. (2016)](https://doi.org/10.1016/j.atmosenv.2016.07.002)
  6but with some enhancements.
  7"""
  8
  9import math
 10
 11import numpy as np
 12from scipy import stats
 13
 14__all__ = (
 15    "draw_step_length",
 16    "flight",
 17)
 18
 19
 20PI = math.pi
 21
 22
 23def draw_step_length(*, l_0=1.0, mu=2.0, q=None):
 24    r"""
 25    Draw a step length `l` from the power law distribution
 26
 27    .. math::
 28       P(l) = (l/l_0)^{-\mu}
 29
 30    Parameters
 31    ----------
 32    l_0 : float
 33        The minimum step size.
 34    mu : float
 35        Distribution shape parameter.
 36
 37        $3 \implies$ Brownian motion
 38
 39        $2 \implies$ "super diffusive Lévy walk"
 40    q : float, array, optional
 41        Random number in $[0, 1)$. By default, we draw a single value from uniform.
 42    """
 43    if mu <= 1 or mu > 3:
 44        raise ValueError(f"`mu` should be in (1, 3] but is {mu!r}")
 45
 46    if q is None:
 47        q = np.random.rand()  # draw from [0, 1) (uniform)
 48
 49    l = l_0 * (1 - q) ** (1 / (1 - mu))  # noqa: E741
 50    # ^ note 1-mu not mu, which comes from the inverse of the CDF
 51
 52    return l
 53
 54
 55def flight(
 56    n,
 57    *,
 58    x0=(0, 0),
 59    l_0=1.0,
 60    mu=2.0,
 61    l_max=None,
 62    heading0=PI / 2,
 63    heading_model="uniform",
 64    heading_model_kwargs=None,
 65):
 66    r"""
 67    Parameters
 68    ----------
 69    n : int
 70        Number of steps taken in the simulated flight.
 71        The final trajectory will have `n`+1 points.
 72    x0 : array_like
 73        xy-coordinates of the starting location.
 74    l_0, mu : float
 75        Parameters of the step length power law distribution.
 76    l_max : float, optional
 77        Used to clip the step size if provided.
 78    heading0 : float
 79        Initial heading of the flight.
 80        Default: $\pi/2$
 81        (in standard polar coordinates, so, north).
 82    heading_model : {'uniform', 'truncnorm'}
 83        Relative heading model.
 84        By default, directions for each step are drawn from a uniform random distribution.
 85        With the truncnorm model, there is a preference for a certain relative change in direction
 86        (by default, a preference for continuing in the current direction).
 87    heading_model_kwargs : dict
 88        Additional parameters for the relative heading model.
 89
 90        For the
 91        [truncnorm](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.truncnorm.html)
 92        heading model (`heading_model='truncnorm'`):
 93        - `'std'` (default `1.5`)
 94        - `'mean'` (default `0`).
 95    """
 96    assert np.asarray(x0).size == 2, "xy-coordinates"
 97
 98    if heading_model_kwargs is None:
 99        heading_model_kwargs = {}
100
101    # Draw steps
102    q = np.random.rand(n)
103    steps = draw_step_length(l_0=l_0, mu=mu, q=q)
104
105    # Clip steps if desired
106    if l_max is not None:
107        np.clip(steps, None, l_max, out=steps)
108
109    # Draw (relative) headings
110    if heading_model == "uniform":
111        headings = np.random.uniform(-PI, PI, size=n - 1)
112    elif heading_model == "truncnorm":
113        std = heading_model_kwargs.get("std", 1.5)
114        mean = heading_model_kwargs.get("mean", 0)
115        assert -PI < mean < PI, "`mean` for truncnorm must be in (-pi, pi)"
116        clip_a, clip_b = -PI, PI
117        a, b = (clip_a - mean) / std, (clip_b - mean) / std
118        dist = stats.truncnorm(a, b, scale=PI / b)
119        # print(f"99.99th: {dist.ppf(0.9999)}")
120        headings = dist.rvs(n - 1)
121    else:
122        raise NotImplementedError
123
124    # Convert heading -> direction
125    # (Heading is relative to current direction)
126    angles = np.full((n,), heading0)
127    for i, heading in enumerate(headings):
128        angles[i + 1] = angles[i] + heading
129
130    # Note: the above loop can be replaced by
131    # `angles[1:] = np.cumsum(headings) + heading0`
132    # (similarly for below loop)
133
134    # Convert angles to [0, 2pi) range
135    min_angle = angles.min()
136    # Find the multiple of 2pi needed to make all values positive
137    n_ = -math.floor(angles.min() / (2 * PI)) if min_angle < 0 else 0
138    angles_mod = np.mod(angles + n_ * 2 * PI, 2 * PI)
139    assert np.allclose(np.cos(angles), np.cos(angles_mod))
140    angles = angles_mod
141
142    # # Investigate the angles
143    # import matplotlib.pyplot as plt
144    # fig, [ax1, ax2, ax3] = plt.subplots(1, 3, figsize=(9, 3.5))
145    # ax1.hist(headings); ax1.set_title("headings")
146    # ax2.hist(angles); ax2.set_title("angles")
147    # ax3.hist(angles_mod); ax3.set_title("angles % 2pi")
148    # fig.tight_layout()
149
150    # Walk
151    x = np.full((n + 1,), x0[0], dtype=float)
152    y = np.full((n + 1,), x0[1], dtype=float)
153    for i, (step, angle) in enumerate(zip(steps, angles)):
154        dx = step * math.cos(angle)
155        dy = step * math.sin(angle)
156        x[i + 1] = x[i] + dx
157        y[i + 1] = y[i] + dy
158
159    return x, y
160
161
162# Note: `scipy.stats.powerlaw` doesn't let its param `a` be negative, so can't use it
163
164
165class _step_length_dist_gen(stats.rv_continuous):
166    """Step length distribution class using the `scipy.stats` machinery."""
167
168    def _pdf(self, x, l_0, mu):
169        c = -l_0 / (1 - mu)
170        p = (x / l_0) ** (-mu)
171        return p / c
172
173    def _cdf(self, x, l_0, mu):
174        # https://www.wolframalpha.com/input/?i=anti-derivative+%28l%2Fc%29%5E-mu+dl
175        # https://www.wolframalpha.com/input/?i=anti-derivative+%28l%2Fc%29%5E-mu+dl%2C+from+l%3Dc+to+x
176        c = -l_0 / (1 - mu)
177        # F = x * (x/l_0)**(-mu) / (1-mu) + c
178        F = l_0**mu * x ** (1 - mu) / (1 - mu) + c
179        # F[x < l_0] = 0
180        return F / c
181
182    def _ppf(self, q, l_0, mu):
183        c = -l_0 / (1 - mu)
184        x = ((q * c * (1 - mu) + l_0) * l_0**-mu) ** (1.0 / (1.0 - mu))
185
186        return x
187
188    def _argcheck(self, *args):
189        l_0, mu = args  # unpack
190
191        # Minimum step length `l_0` must be positive
192        cond_l_0 = np.asarray(l_0) > 0
193
194        # Shape parameter `mu` is supposed to be in range [1, 3]
195        # not sure why it can't just be positive tho...
196        arr_mu = np.asarray(mu)
197        cond_mu = arr_mu > 1 and arr_mu <= 3
198
199        arr_l_0 = np.asarray(l_0)
200        cond_mu_2 = 1 * (1 - arr_mu) + arr_l_0 >= 0
201
202        cond = 1
203        for cond_ in [cond_l_0, cond_mu, cond_mu_2]:
204            cond = np.logical_and(cond, cond_)
205
206        return cond
207
208    def _get_support(self, *args, **kwargs):
209        return args[0], np.inf
210
211
212step_length_dist = _step_length_dist_gen(name="step_length_dist")
213
214
215# scaling with `scale` not working for pdf/cdf
216# need to specify them differently I guess
217class _step_length_dist2_gen(stats.rv_continuous):
218    """Step length distribution class using the `scipy.stats` machinery."""
219
220    # Instead of including l_0 as a param,
221    # we just let scipy.stats do that via the `scale` param
222
223    def _pdf(self, x, mu):
224        c = -1.0 / (1 - mu)  # normalization const
225        return (x**-mu) / c
226
227    def _cdf(self, x, mu):
228        c = -1.0 / (1 - mu)
229        F = (x ** (1 - mu) - 1) / (1 - mu)
230        return F / c
231
232    def _ppf(self, q, mu):
233        c = -1.0 / (1 - mu)
234        x = (q * c * (1 - mu) + 1) ** (1.0 / (1.0 - mu))
235        return x
236
237    def _argcheck(self, *args, **kwargs):
238        mu = args  # unpack
239
240        arr_mu = np.asarray(mu)
241        cond_mu = arr_mu > 1 and arr_mu <= 3
242
243        return cond_mu
244
245
246step_length_dist2 = _step_length_dist2_gen(a=1.0, name="step_length_dist2")
247
248
249def _compare_step_dists(mu=2.0, l_0=1.0):
250    """Compare the different representations of the step length dist in this module."""
251    import matplotlib.pyplot as plt
252
253    # Set dist parameters
254    # Note that all 3 agree for mu=2.0, l_0=1.0 (the values used in the paper)
255    # But the analytical PDF/CDFs do not agree with the hist for mu < 2
256    # Since the rvs results agree, this means the `_cdf` and `_pdf` methods are still off
257    # though the `_ppf`s are fine.
258    assert 1 - mu + l_0 >= 0
259    n_steps = int(5e5)
260    x_stop = 1000  # rightmost bin edge
261    x = np.linspace(0, 100, 1000)  # for the line (analytical) plots
262    bins = np.arange(0, x_stop, 0.1)  # for the histograms
263
264    # Draw using original method
265    steps = [draw_step_length(l_0=l_0, mu=mu) for _ in range(n_steps)]
266
267    fig, [ax1, ax2] = plt.subplots(1, 2, figsize=(8, 4))
268
269    # Histogram of steps using original method
270    ax1.hist(
271        steps,
272        bins=bins,
273        density=True,
274        histtype="stepfilled",
275        alpha=0.7,
276        label="orig drawing method",
277    )
278
279    # Create corresponding distribution objects
280    dist = step_length_dist(mu=mu, l_0=l_0)
281    dist2 = step_length_dist2(mu=mu, scale=l_0, loc=0)
282    steps2 = dist.rvs(n_steps)
283    steps3 = dist2.rvs(n_steps)
284
285    # PDF
286    kwargs = dict(  # shared by the other two
287        bins=bins,
288        density=True,
289        histtype="stepfilled",
290        alpha=0.25,
291    )
292    ax1.hist(
293        steps2,
294        **kwargs,
295        color="orange",
296        label="using dist.rvs",
297    )
298    ax1.hist(
299        steps3,
300        **kwargs,
301        color="magenta",
302        label="using dist2.rvs",
303    )
304    ax1.plot(x, dist.pdf(x), lw=3, label="pdf - analytical", color="orange")
305    ax1.plot(x, dist2.pdf(x), ":", lw=1, label="pdf - analytical 2", color="magenta")
306
307    # CDF
308    kwargs.update(cumulative=True)
309    ax2.hist(
310        steps,
311        **kwargs,
312        label="orig drawing method",
313    )
314    ax2.hist(
315        steps2,
316        **kwargs,
317        color="orange",
318        label="using dist.rvs",
319    )
320    ax2.hist(
321        steps3,
322        **kwargs,
323        color="magenta",
324        label="using dist2.rvs",
325    )
326    ax2.plot(x, dist.cdf(x), lw=3, label="cdf - analytical", color="orange")
327    ax2.plot(x, dist2.cdf(x), ":", lw=1, label="cdf - analytical 2", color="magenta")
328
329    ax1.set_title(rf"$\mu={mu:.3g}, l_0 = {l_0:.3g}$")
330    ax1.set_xlim((0, l_0 * 10))
331    ax1.legend()
332    ax2.set_xlim((0, l_0 * 10))
333    ax2.legend()
334
335    fig.tight_layout()
336
337
338if __name__ == "__main__":
339    import matplotlib.pyplot as plt
340
341    plt.close("all")
342
343    _compare_step_dists()
def draw_step_length(*, l_0=1.0, mu=2.0, q=None):
24def draw_step_length(*, l_0=1.0, mu=2.0, q=None):
25    r"""
26    Draw a step length `l` from the power law distribution
27
28    .. math::
29       P(l) = (l/l_0)^{-\mu}
30
31    Parameters
32    ----------
33    l_0 : float
34        The minimum step size.
35    mu : float
36        Distribution shape parameter.
37
38        $3 \implies$ Brownian motion
39
40        $2 \implies$ "super diffusive Lévy walk"
41    q : float, array, optional
42        Random number in $[0, 1)$. By default, we draw a single value from uniform.
43    """
44    if mu <= 1 or mu > 3:
45        raise ValueError(f"`mu` should be in (1, 3] but is {mu!r}")
46
47    if q is None:
48        q = np.random.rand()  # draw from [0, 1) (uniform)
49
50    l = l_0 * (1 - q) ** (1 / (1 - mu))  # noqa: E741
51    # ^ note 1-mu not mu, which comes from the inverse of the CDF
52
53    return l

Draw a step length l from the power law distribution

$$P(l) = (l/l_0)^{-\mu}$$

Parameters
  • l_0 (float): The minimum step size.
  • mu (float): Distribution shape parameter.

    $3 \implies$ Brownian motion

    $2 \implies$ "super diffusive Lévy walk"

  • q (float, array, optional): Random number in $[0, 1)$. By default, we draw a single value from uniform.
def flight( n, *, x0=(0, 0), l_0=1.0, mu=2.0, l_max=None, heading0=1.5707963267948966, heading_model='uniform', heading_model_kwargs=None):
 56def flight(
 57    n,
 58    *,
 59    x0=(0, 0),
 60    l_0=1.0,
 61    mu=2.0,
 62    l_max=None,
 63    heading0=PI / 2,
 64    heading_model="uniform",
 65    heading_model_kwargs=None,
 66):
 67    r"""
 68    Parameters
 69    ----------
 70    n : int
 71        Number of steps taken in the simulated flight.
 72        The final trajectory will have `n`+1 points.
 73    x0 : array_like
 74        xy-coordinates of the starting location.
 75    l_0, mu : float
 76        Parameters of the step length power law distribution.
 77    l_max : float, optional
 78        Used to clip the step size if provided.
 79    heading0 : float
 80        Initial heading of the flight.
 81        Default: $\pi/2$
 82        (in standard polar coordinates, so, north).
 83    heading_model : {'uniform', 'truncnorm'}
 84        Relative heading model.
 85        By default, directions for each step are drawn from a uniform random distribution.
 86        With the truncnorm model, there is a preference for a certain relative change in direction
 87        (by default, a preference for continuing in the current direction).
 88    heading_model_kwargs : dict
 89        Additional parameters for the relative heading model.
 90
 91        For the
 92        [truncnorm](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.truncnorm.html)
 93        heading model (`heading_model='truncnorm'`):
 94        - `'std'` (default `1.5`)
 95        - `'mean'` (default `0`).
 96    """
 97    assert np.asarray(x0).size == 2, "xy-coordinates"
 98
 99    if heading_model_kwargs is None:
100        heading_model_kwargs = {}
101
102    # Draw steps
103    q = np.random.rand(n)
104    steps = draw_step_length(l_0=l_0, mu=mu, q=q)
105
106    # Clip steps if desired
107    if l_max is not None:
108        np.clip(steps, None, l_max, out=steps)
109
110    # Draw (relative) headings
111    if heading_model == "uniform":
112        headings = np.random.uniform(-PI, PI, size=n - 1)
113    elif heading_model == "truncnorm":
114        std = heading_model_kwargs.get("std", 1.5)
115        mean = heading_model_kwargs.get("mean", 0)
116        assert -PI < mean < PI, "`mean` for truncnorm must be in (-pi, pi)"
117        clip_a, clip_b = -PI, PI
118        a, b = (clip_a - mean) / std, (clip_b - mean) / std
119        dist = stats.truncnorm(a, b, scale=PI / b)
120        # print(f"99.99th: {dist.ppf(0.9999)}")
121        headings = dist.rvs(n - 1)
122    else:
123        raise NotImplementedError
124
125    # Convert heading -> direction
126    # (Heading is relative to current direction)
127    angles = np.full((n,), heading0)
128    for i, heading in enumerate(headings):
129        angles[i + 1] = angles[i] + heading
130
131    # Note: the above loop can be replaced by
132    # `angles[1:] = np.cumsum(headings) + heading0`
133    # (similarly for below loop)
134
135    # Convert angles to [0, 2pi) range
136    min_angle = angles.min()
137    # Find the multiple of 2pi needed to make all values positive
138    n_ = -math.floor(angles.min() / (2 * PI)) if min_angle < 0 else 0
139    angles_mod = np.mod(angles + n_ * 2 * PI, 2 * PI)
140    assert np.allclose(np.cos(angles), np.cos(angles_mod))
141    angles = angles_mod
142
143    # # Investigate the angles
144    # import matplotlib.pyplot as plt
145    # fig, [ax1, ax2, ax3] = plt.subplots(1, 3, figsize=(9, 3.5))
146    # ax1.hist(headings); ax1.set_title("headings")
147    # ax2.hist(angles); ax2.set_title("angles")
148    # ax3.hist(angles_mod); ax3.set_title("angles % 2pi")
149    # fig.tight_layout()
150
151    # Walk
152    x = np.full((n + 1,), x0[0], dtype=float)
153    y = np.full((n + 1,), x0[1], dtype=float)
154    for i, (step, angle) in enumerate(zip(steps, angles)):
155        dx = step * math.cos(angle)
156        dy = step * math.sin(angle)
157        x[i + 1] = x[i] + dx
158        y[i + 1] = y[i] + dy
159
160    return x, y
Parameters
  • n (int): Number of steps taken in the simulated flight. The final trajectory will have n+1 points.
  • x0 (array_like): xy-coordinates of the starting location.
  • l_0, mu (float): Parameters of the step length power law distribution.
  • l_max (float, optional): Used to clip the step size if provided.
  • heading0 (float): Initial heading of the flight. Default: $\pi/2$ (in standard polar coordinates, so, north).
  • heading_model ({'uniform', 'truncnorm'}): Relative heading model. By default, directions for each step are drawn from a uniform random distribution. With the truncnorm model, there is a preference for a certain relative change in direction (by default, a preference for continuing in the current direction).
  • heading_model_kwargs (dict): Additional parameters for the relative heading model.

    For the truncnorm heading model (heading_model='truncnorm'):

    • 'std' (default 1.5)
    • 'mean' (default 0).