blpd.bees
Bee flight model – the "b" in blpd
.
Based on the Lévy flight random walk model as implemented in Fuentes et al. (2016) but with some enhancements.
1""" 2Bee flight model – the "b" in ``blpd``. 3 4Based on the Lévy flight random walk model as implemented in 5[Fuentes et al. (2016)](https://doi.org/10.1016/j.atmosenv.2016.07.002) 6but with some enhancements. 7""" 8 9import math 10 11import numpy as np 12from scipy import stats 13 14__all__ = ( 15 "draw_step_length", 16 "flight", 17) 18 19 20PI = math.pi 21 22 23def draw_step_length(*, l_0=1.0, mu=2.0, q=None): 24 r""" 25 Draw a step length `l` from the power law distribution 26 27 .. math:: 28 P(l) = (l/l_0)^{-\mu} 29 30 Parameters 31 ---------- 32 l_0 : float 33 The minimum step size. 34 mu : float 35 Distribution shape parameter. 36 37 $3 \implies$ Brownian motion 38 39 $2 \implies$ "super diffusive Lévy walk" 40 q : float, array, optional 41 Random number in $[0, 1)$. By default, we draw a single value from uniform. 42 """ 43 if mu <= 1 or mu > 3: 44 raise ValueError(f"`mu` should be in (1, 3] but is {mu!r}") 45 46 if q is None: 47 q = np.random.rand() # draw from [0, 1) (uniform) 48 49 l = l_0 * (1 - q) ** (1 / (1 - mu)) # noqa: E741 50 # ^ note 1-mu not mu, which comes from the inverse of the CDF 51 52 return l 53 54 55def flight( 56 n, 57 *, 58 x0=(0, 0), 59 l_0=1.0, 60 mu=2.0, 61 l_max=None, 62 heading0=PI / 2, 63 heading_model="uniform", 64 heading_model_kwargs=None, 65): 66 r""" 67 Parameters 68 ---------- 69 n : int 70 Number of steps taken in the simulated flight. 71 The final trajectory will have `n`+1 points. 72 x0 : array_like 73 xy-coordinates of the starting location. 74 l_0, mu : float 75 Parameters of the step length power law distribution. 76 l_max : float, optional 77 Used to clip the step size if provided. 78 heading0 : float 79 Initial heading of the flight. 80 Default: $\pi/2$ 81 (in standard polar coordinates, so, north). 82 heading_model : {'uniform', 'truncnorm'} 83 Relative heading model. 84 By default, directions for each step are drawn from a uniform random distribution. 85 With the truncnorm model, there is a preference for a certain relative change in direction 86 (by default, a preference for continuing in the current direction). 87 heading_model_kwargs : dict 88 Additional parameters for the relative heading model. 89 90 For the 91 [truncnorm](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.truncnorm.html) 92 heading model (`heading_model='truncnorm'`): 93 - `'std'` (default `1.5`) 94 - `'mean'` (default `0`). 95 """ 96 assert np.asarray(x0).size == 2, "xy-coordinates" 97 98 if heading_model_kwargs is None: 99 heading_model_kwargs = {} 100 101 # Draw steps 102 q = np.random.rand(n) 103 steps = draw_step_length(l_0=l_0, mu=mu, q=q) 104 105 # Clip steps if desired 106 if l_max is not None: 107 np.clip(steps, None, l_max, out=steps) 108 109 # Draw (relative) headings 110 if heading_model == "uniform": 111 headings = np.random.uniform(-PI, PI, size=n - 1) 112 elif heading_model == "truncnorm": 113 std = heading_model_kwargs.get("std", 1.5) 114 mean = heading_model_kwargs.get("mean", 0) 115 assert -PI < mean < PI, "`mean` for truncnorm must be in (-pi, pi)" 116 clip_a, clip_b = -PI, PI 117 a, b = (clip_a - mean) / std, (clip_b - mean) / std 118 dist = stats.truncnorm(a, b, scale=PI / b) 119 # print(f"99.99th: {dist.ppf(0.9999)}") 120 headings = dist.rvs(n - 1) 121 else: 122 raise NotImplementedError 123 124 # Convert heading -> direction 125 # (Heading is relative to current direction) 126 angles = np.full((n,), heading0) 127 for i, heading in enumerate(headings): 128 angles[i + 1] = angles[i] + heading 129 130 # Note: the above loop can be replaced by 131 # `angles[1:] = np.cumsum(headings) + heading0` 132 # (similarly for below loop) 133 134 # Convert angles to [0, 2pi) range 135 min_angle = angles.min() 136 # Find the multiple of 2pi needed to make all values positive 137 n_ = -math.floor(angles.min() / (2 * PI)) if min_angle < 0 else 0 138 angles_mod = np.mod(angles + n_ * 2 * PI, 2 * PI) 139 assert np.allclose(np.cos(angles), np.cos(angles_mod)) 140 angles = angles_mod 141 142 # # Investigate the angles 143 # import matplotlib.pyplot as plt 144 # fig, [ax1, ax2, ax3] = plt.subplots(1, 3, figsize=(9, 3.5)) 145 # ax1.hist(headings); ax1.set_title("headings") 146 # ax2.hist(angles); ax2.set_title("angles") 147 # ax3.hist(angles_mod); ax3.set_title("angles % 2pi") 148 # fig.tight_layout() 149 150 # Walk 151 x = np.full((n + 1,), x0[0], dtype=float) 152 y = np.full((n + 1,), x0[1], dtype=float) 153 for i, (step, angle) in enumerate(zip(steps, angles)): 154 dx = step * math.cos(angle) 155 dy = step * math.sin(angle) 156 x[i + 1] = x[i] + dx 157 y[i + 1] = y[i] + dy 158 159 return x, y 160 161 162# Note: `scipy.stats.powerlaw` doesn't let its param `a` be negative, so can't use it 163 164 165class _step_length_dist_gen(stats.rv_continuous): 166 """Step length distribution class using the `scipy.stats` machinery.""" 167 168 def _pdf(self, x, l_0, mu): 169 c = -l_0 / (1 - mu) 170 p = (x / l_0) ** (-mu) 171 return p / c 172 173 def _cdf(self, x, l_0, mu): 174 # https://www.wolframalpha.com/input/?i=anti-derivative+%28l%2Fc%29%5E-mu+dl 175 # https://www.wolframalpha.com/input/?i=anti-derivative+%28l%2Fc%29%5E-mu+dl%2C+from+l%3Dc+to+x 176 c = -l_0 / (1 - mu) 177 # F = x * (x/l_0)**(-mu) / (1-mu) + c 178 F = l_0**mu * x ** (1 - mu) / (1 - mu) + c 179 # F[x < l_0] = 0 180 return F / c 181 182 def _ppf(self, q, l_0, mu): 183 c = -l_0 / (1 - mu) 184 x = ((q * c * (1 - mu) + l_0) * l_0**-mu) ** (1.0 / (1.0 - mu)) 185 186 return x 187 188 def _argcheck(self, *args): 189 l_0, mu = args # unpack 190 191 # Minimum step length `l_0` must be positive 192 cond_l_0 = np.asarray(l_0) > 0 193 194 # Shape parameter `mu` is supposed to be in range [1, 3] 195 # not sure why it can't just be positive tho... 196 arr_mu = np.asarray(mu) 197 cond_mu = arr_mu > 1 and arr_mu <= 3 198 199 arr_l_0 = np.asarray(l_0) 200 cond_mu_2 = 1 * (1 - arr_mu) + arr_l_0 >= 0 201 202 cond = 1 203 for cond_ in [cond_l_0, cond_mu, cond_mu_2]: 204 cond = np.logical_and(cond, cond_) 205 206 return cond 207 208 def _get_support(self, *args, **kwargs): 209 return args[0], np.inf 210 211 212step_length_dist = _step_length_dist_gen(name="step_length_dist") 213 214 215# scaling with `scale` not working for pdf/cdf 216# need to specify them differently I guess 217class _step_length_dist2_gen(stats.rv_continuous): 218 """Step length distribution class using the `scipy.stats` machinery.""" 219 220 # Instead of including l_0 as a param, 221 # we just let scipy.stats do that via the `scale` param 222 223 def _pdf(self, x, mu): 224 c = -1.0 / (1 - mu) # normalization const 225 return (x**-mu) / c 226 227 def _cdf(self, x, mu): 228 c = -1.0 / (1 - mu) 229 F = (x ** (1 - mu) - 1) / (1 - mu) 230 return F / c 231 232 def _ppf(self, q, mu): 233 c = -1.0 / (1 - mu) 234 x = (q * c * (1 - mu) + 1) ** (1.0 / (1.0 - mu)) 235 return x 236 237 def _argcheck(self, *args, **kwargs): 238 mu = args # unpack 239 240 arr_mu = np.asarray(mu) 241 cond_mu = arr_mu > 1 and arr_mu <= 3 242 243 return cond_mu 244 245 246step_length_dist2 = _step_length_dist2_gen(a=1.0, name="step_length_dist2") 247 248 249def _compare_step_dists(mu=2.0, l_0=1.0): 250 """Compare the different representations of the step length dist in this module.""" 251 import matplotlib.pyplot as plt 252 253 # Set dist parameters 254 # Note that all 3 agree for mu=2.0, l_0=1.0 (the values used in the paper) 255 # But the analytical PDF/CDFs do not agree with the hist for mu < 2 256 # Since the rvs results agree, this means the `_cdf` and `_pdf` methods are still off 257 # though the `_ppf`s are fine. 258 assert 1 - mu + l_0 >= 0 259 n_steps = int(5e5) 260 x_stop = 1000 # rightmost bin edge 261 x = np.linspace(0, 100, 1000) # for the line (analytical) plots 262 bins = np.arange(0, x_stop, 0.1) # for the histograms 263 264 # Draw using original method 265 steps = [draw_step_length(l_0=l_0, mu=mu) for _ in range(n_steps)] 266 267 fig, [ax1, ax2] = plt.subplots(1, 2, figsize=(8, 4)) 268 269 # Histogram of steps using original method 270 ax1.hist( 271 steps, 272 bins=bins, 273 density=True, 274 histtype="stepfilled", 275 alpha=0.7, 276 label="orig drawing method", 277 ) 278 279 # Create corresponding distribution objects 280 dist = step_length_dist(mu=mu, l_0=l_0) 281 dist2 = step_length_dist2(mu=mu, scale=l_0, loc=0) 282 steps2 = dist.rvs(n_steps) 283 steps3 = dist2.rvs(n_steps) 284 285 # PDF 286 kwargs = dict( # shared by the other two 287 bins=bins, 288 density=True, 289 histtype="stepfilled", 290 alpha=0.25, 291 ) 292 ax1.hist( 293 steps2, 294 **kwargs, 295 color="orange", 296 label="using dist.rvs", 297 ) 298 ax1.hist( 299 steps3, 300 **kwargs, 301 color="magenta", 302 label="using dist2.rvs", 303 ) 304 ax1.plot(x, dist.pdf(x), lw=3, label="pdf - analytical", color="orange") 305 ax1.plot(x, dist2.pdf(x), ":", lw=1, label="pdf - analytical 2", color="magenta") 306 307 # CDF 308 kwargs.update(cumulative=True) 309 ax2.hist( 310 steps, 311 **kwargs, 312 label="orig drawing method", 313 ) 314 ax2.hist( 315 steps2, 316 **kwargs, 317 color="orange", 318 label="using dist.rvs", 319 ) 320 ax2.hist( 321 steps3, 322 **kwargs, 323 color="magenta", 324 label="using dist2.rvs", 325 ) 326 ax2.plot(x, dist.cdf(x), lw=3, label="cdf - analytical", color="orange") 327 ax2.plot(x, dist2.cdf(x), ":", lw=1, label="cdf - analytical 2", color="magenta") 328 329 ax1.set_title(rf"$\mu={mu:.3g}, l_0 = {l_0:.3g}$") 330 ax1.set_xlim((0, l_0 * 10)) 331 ax1.legend() 332 ax2.set_xlim((0, l_0 * 10)) 333 ax2.legend() 334 335 fig.tight_layout() 336 337 338if __name__ == "__main__": 339 import matplotlib.pyplot as plt 340 341 plt.close("all") 342 343 _compare_step_dists()
def
draw_step_length(*, l_0=1.0, mu=2.0, q=None):
24def draw_step_length(*, l_0=1.0, mu=2.0, q=None): 25 r""" 26 Draw a step length `l` from the power law distribution 27 28 .. math:: 29 P(l) = (l/l_0)^{-\mu} 30 31 Parameters 32 ---------- 33 l_0 : float 34 The minimum step size. 35 mu : float 36 Distribution shape parameter. 37 38 $3 \implies$ Brownian motion 39 40 $2 \implies$ "super diffusive Lévy walk" 41 q : float, array, optional 42 Random number in $[0, 1)$. By default, we draw a single value from uniform. 43 """ 44 if mu <= 1 or mu > 3: 45 raise ValueError(f"`mu` should be in (1, 3] but is {mu!r}") 46 47 if q is None: 48 q = np.random.rand() # draw from [0, 1) (uniform) 49 50 l = l_0 * (1 - q) ** (1 / (1 - mu)) # noqa: E741 51 # ^ note 1-mu not mu, which comes from the inverse of the CDF 52 53 return l
Draw a step length l
from the power law distribution
$$P(l) = (l/l_0)^{-\mu}$$
Parameters
- l_0 (float): The minimum step size.
mu (float): Distribution shape parameter.
$3 \implies$ Brownian motion
$2 \implies$ "super diffusive Lévy walk"
- q (float, array, optional): Random number in $[0, 1)$. By default, we draw a single value from uniform.
def
flight( n, *, x0=(0, 0), l_0=1.0, mu=2.0, l_max=None, heading0=1.5707963267948966, heading_model='uniform', heading_model_kwargs=None):
56def flight( 57 n, 58 *, 59 x0=(0, 0), 60 l_0=1.0, 61 mu=2.0, 62 l_max=None, 63 heading0=PI / 2, 64 heading_model="uniform", 65 heading_model_kwargs=None, 66): 67 r""" 68 Parameters 69 ---------- 70 n : int 71 Number of steps taken in the simulated flight. 72 The final trajectory will have `n`+1 points. 73 x0 : array_like 74 xy-coordinates of the starting location. 75 l_0, mu : float 76 Parameters of the step length power law distribution. 77 l_max : float, optional 78 Used to clip the step size if provided. 79 heading0 : float 80 Initial heading of the flight. 81 Default: $\pi/2$ 82 (in standard polar coordinates, so, north). 83 heading_model : {'uniform', 'truncnorm'} 84 Relative heading model. 85 By default, directions for each step are drawn from a uniform random distribution. 86 With the truncnorm model, there is a preference for a certain relative change in direction 87 (by default, a preference for continuing in the current direction). 88 heading_model_kwargs : dict 89 Additional parameters for the relative heading model. 90 91 For the 92 [truncnorm](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.truncnorm.html) 93 heading model (`heading_model='truncnorm'`): 94 - `'std'` (default `1.5`) 95 - `'mean'` (default `0`). 96 """ 97 assert np.asarray(x0).size == 2, "xy-coordinates" 98 99 if heading_model_kwargs is None: 100 heading_model_kwargs = {} 101 102 # Draw steps 103 q = np.random.rand(n) 104 steps = draw_step_length(l_0=l_0, mu=mu, q=q) 105 106 # Clip steps if desired 107 if l_max is not None: 108 np.clip(steps, None, l_max, out=steps) 109 110 # Draw (relative) headings 111 if heading_model == "uniform": 112 headings = np.random.uniform(-PI, PI, size=n - 1) 113 elif heading_model == "truncnorm": 114 std = heading_model_kwargs.get("std", 1.5) 115 mean = heading_model_kwargs.get("mean", 0) 116 assert -PI < mean < PI, "`mean` for truncnorm must be in (-pi, pi)" 117 clip_a, clip_b = -PI, PI 118 a, b = (clip_a - mean) / std, (clip_b - mean) / std 119 dist = stats.truncnorm(a, b, scale=PI / b) 120 # print(f"99.99th: {dist.ppf(0.9999)}") 121 headings = dist.rvs(n - 1) 122 else: 123 raise NotImplementedError 124 125 # Convert heading -> direction 126 # (Heading is relative to current direction) 127 angles = np.full((n,), heading0) 128 for i, heading in enumerate(headings): 129 angles[i + 1] = angles[i] + heading 130 131 # Note: the above loop can be replaced by 132 # `angles[1:] = np.cumsum(headings) + heading0` 133 # (similarly for below loop) 134 135 # Convert angles to [0, 2pi) range 136 min_angle = angles.min() 137 # Find the multiple of 2pi needed to make all values positive 138 n_ = -math.floor(angles.min() / (2 * PI)) if min_angle < 0 else 0 139 angles_mod = np.mod(angles + n_ * 2 * PI, 2 * PI) 140 assert np.allclose(np.cos(angles), np.cos(angles_mod)) 141 angles = angles_mod 142 143 # # Investigate the angles 144 # import matplotlib.pyplot as plt 145 # fig, [ax1, ax2, ax3] = plt.subplots(1, 3, figsize=(9, 3.5)) 146 # ax1.hist(headings); ax1.set_title("headings") 147 # ax2.hist(angles); ax2.set_title("angles") 148 # ax3.hist(angles_mod); ax3.set_title("angles % 2pi") 149 # fig.tight_layout() 150 151 # Walk 152 x = np.full((n + 1,), x0[0], dtype=float) 153 y = np.full((n + 1,), x0[1], dtype=float) 154 for i, (step, angle) in enumerate(zip(steps, angles)): 155 dx = step * math.cos(angle) 156 dy = step * math.sin(angle) 157 x[i + 1] = x[i] + dx 158 y[i + 1] = y[i] + dy 159 160 return x, y
Parameters
- n (int):
Number of steps taken in the simulated flight.
The final trajectory will have
n
+1 points. - x0 (array_like): xy-coordinates of the starting location.
- l_0, mu (float): Parameters of the step length power law distribution.
- l_max (float, optional): Used to clip the step size if provided.
- heading0 (float): Initial heading of the flight. Default: $\pi/2$ (in standard polar coordinates, so, north).
- heading_model ({'uniform', 'truncnorm'}): Relative heading model. By default, directions for each step are drawn from a uniform random distribution. With the truncnorm model, there is a preference for a certain relative change in direction (by default, a preference for continuing in the current direction).
heading_model_kwargs (dict): Additional parameters for the relative heading model.
For the truncnorm heading model (
heading_model='truncnorm'
):'std'
(default1.5
)'mean'
(default0
).