from dataclasses import dataclass
import math
import sys
from typing import Optional, List, Callable
from astropy.units import Quantity
import cupy as cp
import numpy as np
from disco._axes import Axes
from disco._dimensionalization import (
dim_time,
undim_time,
dim_space,
dim_momentum,
dim_magnetic_moment,
)
from disco._field_model import FieldModel
from disco._field_model_loader import FieldModelLoader, DynamicFieldModelLoader, StaticFieldModelLoader
from disco._particle_history import ParticleHistory, ParticleHistoryBuffer
from disco._kernels import do_step_kernel, rhs_kernel
from disco.constants import BLOCK_SIZE, NSTATE, RK45Coeffs
from astropy import units
__all__ = [
"Axes",
"FieldModel",
"DynamicFieldModelLoader",
"ParticleHistory",
"ParticleState",
"TraceConfig",
"trace_trajectory",
]
[docs]
class TraceConfig:
"""Configuration for running the tracing code."""
def __init__(
self,
t_final,
output_freq=None,
stopping_conditions=None,
t_initial=0 * units.s,
h_initial=1 * units.ms,
rtol=1e-2,
integrate_backwards=False,
iters_max=None,
reorder_freq=25,
):
"""Initialize a `TraceConfig` instance.
Parameters
----------
t_initial: scalar with time units
Start time of integration
t_final: scalar with time units
end time of integration (set to inf seconds if you want to stop
purely based on stopping conditions)
output_freq: int or None
How frequently (in iterations) to store output. Setting this
to non-None means memory will accumulate with time.
stopping_conditions: list of callables
List of callables (functions) that return bools. Arguments are
y, t, and field_model.
h_initial: scalar with time units
Initial step size in time (leave as positive even if integrating
backwards)
rtol: float
Relative tolerance for adaptive integration
integrate_backwards: bool
Set to True to integrate backwards in time
Examples
--------
Integrate between 0 and 10 seconds.
>>> config = disco.TraceConfig(t_final=10 * units.s)
Integrate backwards between 0 and -30 seconds.
>>> from astropy import units
>>> config = disco.TraceConfig(
t_final=-30 * units.s,
integrate_backwards=True
)
"""
self.t_final = t_final.to(units.s)
self.output_freq = output_freq
self.stopping_conditions = stopping_conditions or []
self.t_initial = t_initial.to(units.s)
self.h_initial = h_initial.to(units.ms)
self.rtol = rtol
self.integrate_backwards = integrate_backwards
self.iters_max = iters_max
self.reorder_freq = reorder_freq
[docs]
class ParticleState:
"""Initial conditions of particles."""
def __init__(self, x, y, z, ppar, magnetic_moment, mass, charge):
"""Initialize a `ParticleState` instance that is dimensionalized
and stored on the GPU.
All inputs are arrays except for mass and charge, which are
single values.
Parameters
----------
x: array with units
Starting X coordinate of particles
y: array with units
Starting Y coordinate of particles
z: array with units
Starting Z coordinate of particles
ppar: array with units
Starting parallel momentum of particles
magnetic_moment: array with units
First adiabatic invariant
mass: scalar with units
Mass of particles (one for all particles)
charge: scalar with units
Charge of particles (one for all particles)
"""
self.x = cp.array(dim_space(x))
self.y = cp.array(dim_space(y))
self.z = cp.array(dim_space(z))
self.ppar = cp.array(dim_momentum(ppar, mass))
self.magnetic_moment = cp.array(dim_magnetic_moment(magnetic_moment, charge))
self.mass = mass.to(units.kg)
self.charge = charge.to(units.coulomb)
[docs]
def trace_trajectory(config, particle_state, field_model, verbose=1):
"""Calculate particle trajectories.
Parameters
----------
config : `disco.TraceConfig`
Configuration for performing the trace
particle_state : `disco.ParticleState`
Initial conditions of the particles
field_model : `disco.FieldModel` or `disco.FieldModelLoader`
Magnetic and Electric field context
verbose : int
Set to zero to supress print statements
Returns
-------
History of the trajectories as a `ParticleHistory` object. If `config.output_freq` is `None`, contains
only the first and last step.
Examples
--------
>>> history = disco.trace_trajectory(
config, particle_state, field_model
)
>>> history.save("trajectories.h5")
"""
# If passing a field model, make a static field model loader
if isinstance(field_model, FieldModel):
field_model = field_model.dimensionalize(particle_state.mass, particle_state.charge)
field_model_loader = StaticFieldModelLoader(field_model)
elif isinstance(field_model, FieldModelLoader):
field_model_loader = field_model
else:
raise TypeError("field_model argument must be FieldModel or FieldModelLoader")
# This implements the RK45 adaptive integration algorithm, with
# absolute/relative tolerance and minimum/maximum step sizes
npart = particle_state.x.size
t_initial = dim_time(config.t_initial)
t = cp.zeros(npart) + t_initial
y = cp.zeros((npart, NSTATE))
y[:, 0] = particle_state.x
y[:, 1] = particle_state.y
y[:, 2] = particle_state.z
y[:, 3] = particle_state.ppar
y[:, 4] = particle_state.magnetic_moment
h = cp.zeros(npart) + dim_time(config.h_initial)
if config.integrate_backwards:
h *= -1
t_final = dim_time(config.t_final)
all_complete = False
stopped = cp.zeros(npart, dtype=bool)
total_reorder = cp.arange(npart, dtype=int)
stopped_cutoff = npart
iter_count = 0
R = RK45Coeffs
hist_buff = ParticleHistoryBuffer()
# Iteration
# ---------------------------
while True:
# Stop iterating when all_complete flag is set
if all_complete:
break
# Stop iterating if exceeding the maximum iterations
if config.iters_max and iter_count >= config.iters_max:
break
# Reorder batches based on stopped flag to group stopped particles
# on same warp (avoid blocking)
if config.reorder_freq is not None and (iter_count % config.reorder_freq == 0):
if verbose > 0:
print("Reordering to reduce GPU load...")
cur_reorder = cp.argsort(stopped)
y = y[cur_reorder]
t = t[cur_reorder]
h = h[cur_reorder]
stopped = stopped[cur_reorder]
total_reorder = total_reorder[cur_reorder]
stopped_cutoff = int(cp.searchsorted(stopped, cp.ones(1))[0])
# Cupy broadcasting workaround (implicit broading doesn't work)
h_ = cp.zeros((h.size, NSTATE))
for i in range(NSTATE):
h_[:, i] = h
# Call _rhs() function to implement multiple function evaluations of
# right hand side.
k1, B, extra_fields, paused1 = _rhs(t, y, field_model_loader, stopped_cutoff)
B = B.copy()
extra_fields = extra_fields.copy()
k2, _, _, paused2 = _rhs(
t + h * R.a2, y + h_ * R.b21 * k1, field_model_loader, stopped_cutoff
)
k3, _, _, paused3 = _rhs(
t + h * R.a3, y + h_ * (R.b31 * k1 + R.b32 * k2), field_model_loader, stopped_cutoff
)
k4, _, _, paused4 = _rhs(
t + h * R.a4,
y + h_ * (R.b41 * k1 + R.b42 * k2 + R.b43 * k3),
field_model_loader,
stopped_cutoff,
)
k5, _, _, paused5 = _rhs(
t + h * R.a5,
y + h_ * (R.b51 * k1 + R.b52 * k2 + R.b53 * k3 + R.b54 * k4),
field_model_loader,
stopped_cutoff,
)
k6, _, _, paused6 = _rhs(
t + h * R.a6,
y + h_ * (R.b61 * k1 + R.b62 * k2 + R.b63 * k3 + R.b64 * k4 + R.b65 * k5),
field_model_loader,
stopped_cutoff,
)
k1 *= h_
k2 *= h_
k3 *= h_
k4 *= h_
k5 *= h_
k6 *= h_
paused = paused1 | paused2 | paused3 | paused4 | paused5 | paused6
# Save incremented particles to history
save_history = iter_count == 0 or (
config.output_freq and (iter_count % config.output_freq == 0)
)
if save_history:
hist_buff.append(t, y, B, h, stopped, extra_fields, total_reorder)
# Do runge-kutta step, check to change stopping state, and change step size
# if step is performed
num_iterated = _do_step(
k1,
k2,
k3,
k4,
k5,
k6,
y,
h,
t,
t_final,
field_model_loader,
stopped,
paused,
config,
stopped_cutoff,
)
all_complete = cp.all(stopped)
iter_count += 1
# Print message to console if verbose enabled
if verbose > 0:
r_mean = cp.sqrt(y[:, 0] ** 2 + y[:, 1] ** 2 + y[:, 2] ** 2).mean()
h_step = undim_time(float(h.mean())).to(units.ms).value
t_progress = min((t.min() - t_initial) / (t_final - t_initial), 1)
print(
f"Time Complete: {100 * t_progress:.3f}% "
f"Stopped: {100 * stopped.sum() / stopped.size:.3f}% "
f"(iter {iter_count}, {num_iterated} iterated, h mean "
f"{h_step:.2f} ms, r mean {r_mean:.2f})"
)
sys.stdout.flush()
if verbose > 0:
print(f"Took {iter_count} iterations")
# Sanity Check to ensure order is preserved
total_reorder_rev = np.argsort(total_reorder)
assert cp.all(total_reorder[total_reorder_rev] == cp.arange(npart, dtype=int))
# Always save last step of each, even if not recording full history
hist_buff.append(t, y, B, h, stopped, extra_fields, total_reorder)
# Convert history buffer to ParticleHistory
hist = hist_buff.to_particle_history(particle_state, field_model)
return hist
def _rhs(t, y, field_model_loader, stopped_cutoff):
"""RIght hand side of the guiding center equation differential equation.
Parameters
-----------
t: cupy array (nparticles,)
Dimensionalized time variable
y: cupy array (nparticles,)
ODE state variable
field_model_loader: FieldModelLoader
Used to E and B fields and particles
config: TraceConfig
Tracing configuration
Returns
-------
dydt: cupy array (nparticles, NSTATE)
First three columns are position, fourth is parallel momentum,
fifth is relativistic magnetic moment. This variable is
dimensionalized. Filled up to `stopped_cutoff`.
B: cupy array (nparticles,)
Value of B at the particle positions, filled up to
`stopped_cutoff`.
extra_fields: cupy array (NEXTRA, nparticles)
Values of extra interpolated parameters at the particle positions, filled up to
`stopped_cutoff`.
"""
# Get B, E Values and partials
interp_result, paused = field_model_loader.multi_interp(t, y, stopped_cutoff)
# Launch Kernel to handle rest of RHS
arr_size = y.shape[0]
grid_size = int(math.ceil(stopped_cutoff / BLOCK_SIZE))
r_inner = cp.zeros(arr_size) + field_model_loader.axes.r_inner
dydt = cp.zeros((arr_size, NSTATE))
rhs_kernel[grid_size, BLOCK_SIZE](
dydt,
y[:stopped_cutoff],
t,
paused,
interp_result.Bx,
interp_result.By,
interp_result.Bz,
interp_result.B,
interp_result.Ex,
interp_result.Ey,
interp_result.Ez,
interp_result.dBdx,
interp_result.dBdy,
interp_result.dBdz,
interp_result.dBxdy,
interp_result.dBxdz,
interp_result.dBydx,
interp_result.dBydz,
interp_result.dBzdx,
interp_result.dBzdy,
field_model_loader.axes.x,
field_model_loader.axes.y,
field_model_loader.axes.z,
field_model_loader.axes.t,
field_model_loader.axes.x.size,
field_model_loader.axes.y.size,
field_model_loader.axes.z.size,
field_model_loader.axes.t.size,
r_inner,
)
return dydt, interp_result.B, interp_result.extra_fields, paused
def _do_step(
k1,
k2,
k3,
k4,
k5,
k6,
y,
h,
t,
t_final,
field_model_loader,
stopped,
paused,
config,
stopped_cutoff,
):
"""Do a Runge-Kutta Step.
Paramters
---------
k1-k6: K values for Runge-Kutta
y: current state vector
h: current vector of step sizes
t: current vector of particle times
t_final: final time (dimensionalized)
stopped: boolean array of whether integration has fully stopped
paused: boolean array of whether interpolation was skipped due to delayed loadeding
field_model: instance of FieldModelLoader
stopped: boolean array of whether integration has stopped
Returns
--------
num_iterated: int
Number of particles iterated
"""
# Evaluate Stopping Conditions
if config.stopping_conditions:
for stop_cond in config.stopping_conditions:
stopped |= stop_cond(y, t, field_model_loader)
# Nan signals some major problem in the code, better to stop immediately
stopped |= cp.isnan(h)
# Call Kernel to do the rest of the work
grid_size = int(math.ceil(stopped_cutoff / BLOCK_SIZE))
arr_size = y.shape[0]
y_next = cp.zeros(y.shape)
z_next = cp.zeros(y.shape)
rtol_arr = cp.zeros(arr_size) + config.rtol
t_final_arr = cp.zeros(arr_size) + t_final
r_inner = cp.zeros(arr_size) + field_model_loader.axes.r_inner
mask = cp.zeros(arr_size, dtype=bool)
do_step_kernel[grid_size, BLOCK_SIZE](
k1,
k2,
k3,
k4,
k5,
k6,
y[:stopped_cutoff],
y_next,
z_next,
h,
t,
rtol_arr,
t_final_arr,
mask,
stopped,
paused,
field_model_loader.axes.x,
field_model_loader.axes.x.size,
field_model_loader.axes.y,
field_model_loader.axes.y.size,
field_model_loader.axes.z,
field_model_loader.axes.z.size,
field_model_loader.axes.t,
field_model_loader.axes.t.size,
r_inner,
config.integrate_backwards,
)
num_iterated = mask.sum()
return num_iterated