Module vorts.model

Model classes that act as drivers for the integration routines, directing input and collecting output, etc.


def fort_bool(b: bool)

Convert Python boolean to a string of the corresponding Fortran logical (.true./.false.).


class ModelBase (vortons: Vortons = None, tracers: Tracers = None, *, dt=0.1, nt=1000)

Abstract base class for the models.

Provides concrete methods and ModelBase.plot, which work properly if the inheriting class defines a _run method that integrates the system and updates ModelBase.hist, populating it with the run's output data.

  • vortons default to equilateral triangle (_add_to_vortons..f() with 3, G=1)
  • tracers default to None (no tracers used)
Expand source code Browse git
class ModelBase(abc.ABC):
    """Abstract base class for the models.

    Provides concrete methods `` and `ModelBase.plot`, which work properly if the
    inheriting class defines a `_run` method that integrates the system and updates `ModelBase.hist`,
    populating it with the run's output data.

    def __init__(
        vortons: Vortons = None,
        tracers: Tracers = None,
        * `vortons` default to equilateral triangle (`vorts.vortons.Vortons.regular_polygon` with `3, G=1`)
        * `tracers` default to `None` (no tracers used)
        # vortons (default to equilateral triangle)
        if vortons is None:
            vortons = Vortons.regular_polygon(3, G=1)
        self.vortons = vortons
        """`vorts.vortons.Vortons` for the model instance."""
        self.vortons0 = copy.deepcopy(vortons)  # store initial state (in hist as well)
        """A copy of the initial `vorts.vortons.Vortons`, corresponding to the model input."""

        # tracers (leave if `None`)
        self.tracers = tracers
        """`vorts.vortons.Tracers` for the model instance, or `None` if no tracers."""
        self.tracers0 = copy.deepcopy(self.tracers)
        """A copy of the initial `vorts.vortons.Tracers` for the model instance, or `None` if no tracers."""

        # sim settings/parameters for every model
        self.dt = float(dt)
        r"""Time step $\delta t$ for the model output."""
        self.nt = int(nt)
        """The number of time steps to run for, such that `t=nt*dt` is the last time simulated
        and we have a total of `nt+1` times when including the initial state.
        self.n_tracers = self.tracers0.n if self.tracers0 is not None else 0
        """The number of tracers."""
        self.n_vortons = self.vortons0.n
        """The number of vortons (tracers not included!)."""
        self.n_points = self.n_vortons + self.n_tracers
        """The number of vortons + tracers."""
        self.n_timesteps = self.nt
        """Alias for `ModelBase.nt`."""

        # combine vortons and tracers (used by some models to feed combined initial states to integrators)
        self._vt0 = self.vortons0._maybe_add_tracers(self.tracers0)

        # initially no history
        self.hist = None
        """An [`xr.Dataset`](
        with coordinates `'t'` (time) and `'v'` (vorton index, including any tracers).
        Equal to `None` before the model has run.

        # initially, the model hasn't been run
        self._has_run = False

        # attach _PlotMethods instance
        self.plot = _PlotMethods(self)

    def _run(self):
        """The `_run` method in concrete classes should:

        1. Integrate the system from t=0 -> t=(nt*dt).

        2. Update `hist`.

    def run(self):
        """Integrate and update the history dataset `hist`."""
        if self._has_run:
            warnings.warn("Note that the model has already been run.")
        self._has_run = True
        # TODO: with hist having been updated (presumably), update vortons?

        return self

    def _res_to_xr(self, xhist, yhist):
        """Take full trajectory histories `xhist` and `yhist`
        in $(t, v)$ dim order and create a model results `xr.dataset`.
        vt0 = self._vt0

        G = vt0.G
        t = np.arange(0, self.nt + 1) * self.dt
        v = np.arange(0, vt0.n)

        is_t = G == 0

        ds = xr.Dataset(
                "t": ("t", t, {"long_name": "Unitless elapsed time"}),
                "v": (
                        "long_name": "Vorton index",
                        "description": (
                            r"This includes true vortons (with nonzero $\Gamma$, coming first) "
                            r"and may also include tracers ($\Gamma = 0$, coming after)."
                "G": (("v",), G, {"long_name": r"Vorton strength $\Gamma$ (circulation)"}),
                "x": (("t", "v"), xhist, {"long_name": "Vorton $x$ position"}),
                "y": (("t", "v"), yhist, {"long_name": "Vorton $y$ position"}),
                "is_t": (
                        "long_name": "Tracer mask",
                        "description": (
                            r"Vortons have nonzero $\Gamma$ values. Tracers have no $\Gamma$."
                "model_class": self.__class__.__name__,
                "int_scheme_name": getattr(self, "int_scheme_name", "?"),
        return ds


  • abc.ABC


Instance variables

var dt

Time step $\delta t$ for the model output.

var hist

An xr.Dataset with coordinates 't' (time) and 'v' (vorton index, including any tracers). Equal to None before the model has run.

var n_points

The number of vortons + tracers.

var n_timesteps

Alias for ModelBase.nt.

var n_tracers

The number of tracers.

var n_vortons

The number of vortons (tracers not included!).

var nt

The number of time steps to run for, such that t=nt*dt is the last time simulated and we have a total of nt+1 times when including the initial state.

var tracers

Tracers for the model instance, or None if no tracers.

var tracers0

A copy of the initial Tracers for the model instance, or None if no tracers.

var vortons

Vortons for the model instance.

var vortons0

A copy of the initial Vortons, corresponding to the model input.


def run(self)

Integrate and update the history dataset hist.

class Model_f (vortons: Vortons = None, tracers: Tracers = None, *, dt=0.1, nt=1000, int_scheme_name='RK4', write_vortons=True, write_tracers=False, write_ps=False)

Thin wrapper for functionality of the Fortran model, whose source code is in vorts/f/src/.


In this implementation we communicate with the Fortran program via text files.


vortons : Vortons
default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.
tracers : Tracers
default: no tracers
dt : float
Time step $\delta t$ for the output. Additionally, for the integrators, dt is used as the constant or maximum integration time step depending on the integration scheme.
nt : int
Number of time steps to run (not including $t=0$).
int_scheme_name : str

Time integration scheme name.

options: 'RK4' (standard RK4; default), 'FT' (1st-order forward Euler)

Expand source code Browse git
class Model_f(ModelBase):
    """Thin wrapper for functionality of the Fortran model, whose source code is in `vorts/f/src/`.

    .. note::
       In this implementation we communicate with the Fortran program via text files.

    # _allowed_int_scheme_names = ("FT", "RK4")

    def __init__(
        vortons: Vortons = None,
        tracers: Tracers = None,
        # above are passed to base
        write_vortons=True,  # maybe should put `flag` or something in these names

        vortons : vorts.vortons.Vortons
            default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.

        tracers : vorts.vortons.Tracers
            default: no tracers

        dt : float
            Time step $\delta t$ for the output.
            Additionally, for the integrators, `dt` is used as the constant or maximum integration time step
            depending on the integration scheme.
        nt : int
            Number of time steps to run (not including $t=0$).

        int_scheme_name : str
            Time integration scheme name.

            options: `'RK4'` (standard RK4; default), `'FT'` (1st-order forward Euler)

        # call base initialization
        super().__init__(vortons, tracers, dt=dt, nt=nt)

        # other inputs
        self.int_scheme_name = int_scheme_name  # {'FT', 'RK4'}

        # output option flags
        self.f_write_out_vortons = write_vortons
        self.f_write_out_tracers = write_tracers
        self.f_write_out_ps = write_ps

        # executing the model
        assert (FORT_BASE_DIR / "src").exists(), "Fortran source directory not found!"
        exe_name = "vorts.exe" if platform.system() == "Windows" else "vorts"
        self.vorts_exe_path = FORT_BASE_DIR / "bin" / exe_name
        self.oe = ""  # we will store standard output and error here

        # create the needed subdirs if they don't exist (pip install)
        for sub in ("bin", "in", "out"):
            p = FORT_BASE_DIR / sub
            if not p.exists():

        # write the text input files to directory `vorts/f/in`

    def create_inputs(self):
        Create input files for the Fortran model
          describing the initial condition
          and the simulation settings.
        # # write vorton system initial state
        # mat = self.vortons0.state_mat_full()  # needs to be rows of G, xi, yi
        # np.savetxt(FORT_BASE_DIR / 'in/vorts_in.txt', mat,
        #            delimiter=' ', fmt='%.16f', header='Gamma xi yi')

        # # write tracers initial state (positions only)
        # mat = self.tracers0.state_mat
        # np.savetxt(FORT_BASE_DIR / 'in/tracers_in.txt', mat,
        #            delimiter=' ', fmt='%.16f', header='xi yi')

        # Write combined vortons + tracers, with vortons first
        mat = self._vt0.state_mat_full()
            FORT_BASE_DIR / "in/vortons.txt", mat, delimiter=" ", fmt="%.16f", header="Gamma xi yi"

        # write model options
        mat = [
        np.savetxt(FORT_BASE_DIR / "in/settings.txt", mat, delimiter=" ", fmt="%s")

    def _maybe_try_compile(self):
        """Try to run `make` if the executable is missing."""
        if not self.vorts_exe_path.exists():
            with _out_and_back(FORT_BASE_DIR / "src"):
                print(f"{self.vorts_exe_path!r} doesn't exist, attempting to `make`.\n")
                except Exception as e:
                    raise Exception(
                        "Attempted `make` failed with exception (see above). "
                        "The Fortran code must be compiled before running!"
                    ) from e

    # implement abstract method `_run`
    def _run(self):
        """Invoke the Fortran model's executable and load the results."""

        # Invoke the Fortran model's executable
        # Note: could instead pass FORT_BASE_DIR into the Fortran program so could run from anywhere
        cmd = str(self.vorts_exe_path.relative_to(FORT_BASE_DIR))
        with _out_and_back(FORT_BASE_DIR):
            for f in glob.glob("./out/*"):  # non-hidden files
            self.oe = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
            # Note: could instead use `error stop` in the Fortran to get non-zero exit code and `CalledProcessError`
            s_oe = str(self.oe, "utf-8")
            if s_oe.startswith("STOP"):
                raise Exception(f"the model stopped early, with message:\n{str(s_oe)}")

        # Load results from the text file outputs into `self.hist` (an `xr.Dataset`)

    def _load_results(self):
        """Load results from a run of the Fortran model."""
        sf = 0  # there may be blank line at end of the files?

        nt = self.nt  # time steps taken
        nv = self._vt0.n
        G = self._vt0.G
        is_t = G == 0
        is_v = ~is_t

        # TODO: was trying avoid pre-allocating empty array, but for now gonna do it...
        self.hist = self._res_to_xr(np.empty((nt + 1, nv)), np.empty((nt + 1, nv)))

        if self.f_write_out_vortons:
            vortons_file = FORT_BASE_DIR / "out/vortons.csv"
            data = np.genfromtxt(vortons_file, delimiter=",", skip_header=1, skip_footer=sf)
            nrows = data.shape[0]
            i1 = np.arange(0, nrows - 1, 2)
            i2 = np.arange(1, nrows, 2)
            # need to swap dims because t is first in hist
            self.hist["x"].loc[dict(v=is_v)] = data[i1, :].T
            self.hist["y"].loc[dict(v=is_v)] = data[i2, :].T

        if self.f_write_out_tracers:
            tracers_file = FORT_BASE_DIR / "out/tracers.csv"
            data = np.genfromtxt(tracers_file, delimiter=",", skip_header=1, skip_footer=sf)
            nrows = data.shape[0]
            i1 = np.arange(0, nrows - 1, 2)
            i2 = np.arange(1, nrows, 2)
            self.hist["x"].loc[dict(v=is_t)] = data[i1, :].T
            self.hist["y"].loc[dict(v=is_t)] = data[i2, :].T

        # note: the ps code of the Fortran model only works for a specific case
        # (initial equi tri with the second point having x=0 and y>0)
        if self.f_write_out_ps:
            ps_file = FORT_BASE_DIR / "out/ps.txt"
            data = np.genfromtxt(ps_file, skip_header=1, skip_footer=sf)
   = data



def create_inputs(self)

Create input files for the Fortran model describing the initial condition and the simulation settings.

Inherited members

class Model_jl (vortons: Vortons = None, tracers: Tracers = None, *, dt=0.1, nt=1000, int_scheme_name='Tsit5', use_sysimage=False, run_method='pyjulia')

Interface to the Julia model, whose source code is in vorts/jl/vorts.jl.


In this implementation we communicate with Julia using pyjulia


vortons : Vortons
default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.
tracers : Tracers
default: no tracers
dt : float
Time step $\delta t$ for the output. Additionally, for the integrators, dt is used as the constant or maximum integration time step depending on the integration scheme.
nt : int
Number of time steps to run (not including $t=0$).
int_scheme_name : str
Time integration scheme name. Any from the ODE solvers list are valid.
use_sysimage : bool
Whether to (try to) use the created by the compile.jl script.
run_method : str, {'pyjulia', 'diffeqpy'}
Currently, we can run using pyjulia alone or using diffeqpy. Both use the tendency function defined in vorts.jl.
Expand source code Browse git
class Model_jl(ModelBase):
    """Interface to the Julia model, whose source code is in `vorts/jl/vorts.jl`.

    .. note::
       In this implementation we communicate with Julia using

    def __init__(
        vortons: Vortons = None,
        tracers: Tracers = None,
        # above are passed to base

        vortons : vorts.vortons.Vortons
            default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.

        tracers : vorts.vortons.Tracers
            default: no tracers

        dt : float
            Time step $\delta t$ for the output.
            Additionally, for the integrators, `dt` is used as the constant or maximum integration time step
            depending on the integration scheme.
        nt : int
            Number of time steps to run (not including $t=0$).

        int_scheme_name : str
            Time integration scheme name. Any from the
            [ODE solvers list](
            are valid.
        use_sysimage : bool
            Whether to (try to) use the `` created by the `compile.jl` script.
        run_method : str, {'pyjulia', 'diffeqpy'}
            Currently, we can run using pyjulia alone or using diffeqpy.
            Both use the tendency function defined in `vorts.jl`.
        # Call base initialization
        super().__init__(vortons, tracers, dt=dt, nt=nt)

        # Other inputs
        self.int_scheme_name = int_scheme_name
        self.use_sysimage = use_sysimage
        self.run_method = run_method

    def _load_jl_sysimage():
        # Use our sysimage with DifferentialEquations (and our tend fn) pre-compiled
        # (has to be done before importing any Julia modules)
        from julia import Julia

        jl = Julia(sysimage=str(JL_BASE_DIR / ""))
        # TODO: might want to also support `Julia(compiled_modules=False)` in some way
        return jl

    # pyjulia alone
    def _run_pyjulia(self):
        from julia.core import JuliaError

        if self.use_sysimage:

        # For some reason it doesn't work the first time on Windows (can't find PyCall)
        # but will work if we just do it again...
            from julia import Base, Main, Pkg
        except JuliaError as e:
            warnings.warn(f"Julia imports failed the first time with message:\n{e}")
            from julia import Base, Main, Pkg

        # Load our code
        # Base.println("Initial env status:")
        # Pkg.status()
        Main.include((JL_BASE_DIR / "vorts.jl").as_posix())
        Base.println("vorts.jl has been loaded!")

        # Run by calling `integrate` from `vorts.jl`
        x, y = Main.integrate(

        # Set output dataset
        self.hist = self._res_to_xr(x, y)

    # pyjulia + diffeqpy
    def _run_diffeqpy(self):
        from julia.core import JuliaError

        if self.use_sysimage:

            from julia import Base, Main, Pkg
        except JuliaError as e:
            warnings.warn(f"Julia imports failed the first time with message:\n{e}")
            from julia import Base, Main, Pkg


        # `ode` from the readme doesn't work yet in stable (would have to install from GH)
        # This one would also take two tries in Win if we hadn't already done the above import
        from diffeqpy import de

        # Add the code with the tendency function
        Main.include((JL_BASE_DIR / "vorts.jl").as_posix())
        Base.println("vorts.jl has been loaded!")

        # Set up our inputs
        nt = self.nt
        dt = self.dt
        vt0 = self._vt0
        r0 = vt0.state_mat.T  # note transpose!
        tspan = (0, dt * nt)
        p = Main.TendParams(vt0.G)
        solver = getattr(de, self.int_scheme_name)()

        prob = de.ODEProblem(Main.tend_b, r0, tspan, p)
        sol = de.solve(prob, solver, saveat=np.arange(0, nt + 1) * dt)

        # Exract data and set the output dataset
        x = np.empty((self.nt + 1, vt0.n))
        y = np.empty_like(x)
        assert len(sol.u) == nt + 1
        assert sol.u[0].shape == (2, vt0.n)
        for i in range(len(sol.u)):
            x[i, :] = sol.u[i][0, :]
            y[i, :] = sol.u[i][1, :]

        self.hist = self._res_to_xr(x, y)

    def _run(self):
        if self.run_method == "pyjulia":
        elif self.run_method == "diffeqpy":
            raise ValueError("`run_method` must be 'pyjulia' or 'diffeqpy'.")


Inherited members

class Model_py (vortons: Vortons = None, tracers: Tracers = None, *, dt=0.1, nt=1000, int_scheme_name='RK4', **int_scheme_kwargs)

Model in Python.


vortons : Vortons
default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.
tracers : Tracers
default: no tracers
dt : float
Time step $\delta t$ for the output. Additionally, for the integrators, dt is used as the constant or maximum integration time step depending on the integration scheme.
nt : int
Number of time steps to run (not including $t=0$).
int_scheme_name : str

Time integration scheme name.

default: 'RK4' – handwritten standard RK4, using Numba to calculate the tendencies

Other currently valid options are:

  • 'scipy_*' methods – use scipy.integrate.solve_ivp

    where the * can be RK45, DOP854, Radau, BDF, LSODA

  • 'FT' – handwritten 1st-order forward Euler

Passed on to integrate_manual() or integrate_scipy().
Expand source code Browse git
class Model_py(ModelBase):
    """Model in Python."""

    _manual_steppers = MANUAL_STEPPERS
    _scipy_methods = SCIPY_METHODS
    _allowed_int_scheme_names = list(_manual_steppers) + list(_scipy_methods)
    # _allowed_int_scheme_names = list(_scipy_methods)  # temporarily disable manual steppers methods

    def __init__(
        vortons: Vortons = None,
        tracers: Tracers = None,
        # above are passed to base

        vortons : vorts.vortons.Vortons
            default: equilateral triangle with inscribing circle radius of $1$ and all $G=1$.

        tracers : vorts.vortons.Tracers
            default: no tracers

        dt : float
            Time step $\delta t$ for the output.
            Additionally, for the integrators, `dt` is used as the constant or maximum integration time step
            depending on the integration scheme.
        nt : int
            Number of time steps to run (not including $t=0$).

        int_scheme_name : str
            Time integration scheme name.

            default: `'RK4'` -- handwritten standard RK4, using Numba to calculate the tendencies

            Other currently valid options are:

            * `'scipy_*'` methods -- use [`scipy.integrate.solve_ivp`](

                where the `*` can be `RK45`, `DOP854`, `Radau`, `BDF`, `LSODA`

            * `'FT'` -- handwritten 1st-order forward Euler

            Passed on to `` or ``.

        # call base initialization
        super().__init__(vortons, tracers, dt=dt, nt=nt)

        # other inputs
        self.int_scheme_name = int_scheme_name
        self.int_scheme_kwargs = int_scheme_kwargs

        # check `int_scheme_name`
        if self.int_scheme_name not in self._allowed_int_scheme_names:
            raise ValueError(
                f"{self.int_scheme_name!r} is not one of the allowed options for `int_scheme_name`:\n"

        # calculate initial C, used for adaptive time stepping tolerance checks
        self.C_0 = self.vortons0.C()

    # implement abstract method `_run`
    def _run(self):
        dt, nt = self.dt, self.nt
        # t_eval = np.arange(dt, (nt+1)*dt, dt)
        t_eval = np.arange(0, nt + 1) * dt
        v0 = self._vt0

        # manual (handwritten) integrators
        if "scipy" not in self.int_scheme_name:
            x0 = v0.x
            y0 = v0.y
            G = v0.G
            xhist, yhist = integrate_manual(
            # returned data have shape (nv, nt)
            self.hist = self._res_to_xr(xhist.T, yhist.T)

        # integration using SciPy
            y0 = v0.xy.T.flatten()  # needs to be a 1-d array!
            G_col = v0.G_col
            data = integrate_scipy(
            # returned data has shape (2nv, nt), where n is number of vortons and nt number of time steps
            nv = v0.n
            self.hist = self._res_to_xr(data[:nv, :].T, data[nv:, :].T)


Inherited members