Simulate diffusion on a grid using Python¶
Updated on January 2, 2024
This notebook is released under the BSD 3-Clause license.
References¶
This Jupyter Notebook uses the Python 3 programming language, the numpy library for n-dimensional array-programming and linear algebra, the numba Python compiler, and the matplotlib library for plotting.
The mean square displacement (MSD) of Brownian particles in n-dimensions Euclidean space is:
$$ MSD = \left<\left(x_1(t) - x_1(0)\right)^2 + \left(x_2(t) - x_2(0)\right)^2 + ... + \left(x_n(t) - x_n(0)\right)^2\right> = 2nDt $$
where $t$ is the time, $D$ the diffusion coefficient, $n$ the dimension, $x_n(t)$ the cartesian coordinate of a particle in dimension $n$ at time $t$, and $\left<\right>$ the ensemble average over all particles.
Some of the functions used in the first part of the simulation code:
range(stop)
returns a sequence of integers from 0 to stop (excluding).print(objects)
prints objects to the text stream file.numpy.zeros(shape, dtype)
returns an array of specified shape and data type, initialized with zeros.numpy.random.randint(high, size)
returns an array of shape "size", initialized with random integers between 0 and high (excluding).numpy.take(array, indices, axis)
returns elements at indices from an array along an axis.numpy.cumsum(array, axis)
returns the cumulative sum of an array along a specified axis. For example,numpy.cumsum([[1, 2, 3], [4, 5, 6]], axis=1)
is[[1, 3, 6], [4, 9, 15]]
.numpy.mean(array, axis)
returns the average of the elements along axis.numpy.arange(stop)
returns evenly spaced values up to stop.numpy.linalg.lstsq(a, b)
returns the least-squares solution to a linear matrix equation, i.e. it computes the vectorx
that approximately solves the equationa @ x = b
.array[..., numpy.newaxis]
appends a new dimension of size 1 to the array, e.g. to turn a vector into a matrix.array[index]
returns the value(s) of the array at the position(s) specified by index.
Diffusion on a one-dimensional grid¶
# import the numpy array-computing library
import numpy
# Define some variables to control the simulation:
# number of particles
particles = 1000
# duration of the simulation, i.e. the number of sampling periods
duration = 1000
# the sampling period in arbitrary time units
sampling_period = 1000
# probability that a particle moves in a certain direction during the sampling
# period. Given in number of time units of the sampling period
diffusion_speed = 10
# Allocate a two-dimensional array of integers, which can store
# the positions of all particles during the simulation.
positions = numpy.zeros((particles, duration), dtype=numpy.int32)
# Create a look-up-table of directions to move.
# This table will be indexed by random numbers in range 0 to `sampling_period`.
# TODO: the table can be extended by another axis to include movements
# in y and z directions.
directions = numpy.zeros(sampling_period, dtype=numpy.int32)
# move the particle in the positive direction
directions[0:diffusion_speed] = 1
# move the particle in the negative direction
directions[diffusion_speed : diffusion_speed * 2] = -1
# Run the simulation separately for all particles.
# TODO: this loop could be vectorized, i.e. below calculations could be done
# for all particles at once.
for particle in range(particles):
# Get a random number between 0 and `sampling_period`
# for all sampling periods in the duration of the simulation.
random_numbers = numpy.random.randint(sampling_period, size=duration)
# Index the first axis in the `directions` look-up-table with the random
# numbers to obtain the relative moves of the particle for all sampling
# periods.
moves = numpy.take(directions, random_numbers, axis=0)
# Set the first position of the particle to the origin
# TODO: the initial position could be randomized.
moves[0] = 0
# Calculate all positions of the particle for the duration of the
# simulation by cumulatively summing the relative moves.
# The result is stored in the positions array.
# TODO: to include obstacles in the simulation, the numpy.cumsum function
# could be replaced by a custom function restricting movement in and
# out of obstacles.
positions[particle] = numpy.cumsum(moves, axis=0)
# Calculate the mean square displacement (MSD) at each sampling period
# by squaring all positions and averaging them over particles.
msd = numpy.mean(numpy.square(positions), axis=0)
# Calculate the diffusion coefficient D from the slope of the MSD values vs
# time. The slope is fitted by solving a linear equation system.
time = numpy.arange(duration)[..., numpy.newaxis]
slope = numpy.linalg.lstsq(time, msd, rcond=None)[0][0]
D = slope / 2
print(D * sampling_period)
10.030524875050135
Plot the positions of some particles over time¶
from matplotlib import pyplot
pyplot.figure()
pyplot.title('Selected particle positions')
pyplot.xlabel('time')
pyplot.ylabel('position')
for i in range(5):
pyplot.plot(positions[i])
pyplot.show()
Plot all particle positions as a color-coded image¶
pyplot.figure()
pyplot.title('Particle positions')
pyplot.xlabel('time')
pyplot.ylabel('particle')
minmax = numpy.max(numpy.abs(positions))
pyplot.imshow(positions, cmap='seismic', vmin=-minmax, vmax=minmax)
pyplot.colorbar()
pyplot.show()
Plot a histogram of particle positions at the end of the simulation¶
pyplot.figure()
pyplot.title('Histogram of last particle position')
pyplot.xlabel('position')
pyplot.ylabel('frequency')
minmax = numpy.max(numpy.abs(positions))
pyplot.hist(positions[:, -1], numpy.arange(-minmax - 0.5, minmax + 0.5))
pyplot.show()
Plot MSD and line fit¶
pyplot.figure()
pyplot.title('MSD and linear fit')
pyplot.xlabel('time')
pyplot.ylabel('MSD')
pyplot.plot(time, msd, '.', label='simulation')
pyplot.plot(time, slope * time, '-', lw=3, label='fit')
pyplot.legend()
pyplot.show()
Remove all names defined above from the global namespace. Global names might interfere with following code.
%reset -f
Import libraries and modules used in this document:
import copy
import math
import ipywidgets
import numba
import numpy
import matplotlib
from matplotlib import pyplot
from mpl_toolkits.mplot3d import Axes3D
Diffusion on a n-dimensional grid¶
To make the code more modular, manageable, extensible, and reusable, it is refactored into small functions for simulation, particle counting, analysis, and plotting. The simulation model is extended to handle many dimensions. Hook functions allow customizing the initialization and restriction of particle movements. A simple detector "particle counter box" and methods to analyze and plot particle counts are added.
Diffusion in one, two, and three dimensions are simulated and compared.
def simulate_diffusion(
dimensions,
duration,
sampling_period,
number_particles,
diffusion_speed,
diffusion_model,
diffusion_model_args,
positions_init,
positions_init_args,
):
"""Return nD positions of all particles for duration of simulation."""
assert 0 < dimensions < 8
assert sampling_period > diffusion_speed * (dimensions + 1) * 2
# generate a look-up-table of directions to move.
# this table will be indexed by random numbers in range 0 to
# `sampling_period`
directions = numpy.zeros((sampling_period, dimensions), dtype=numpy.int32)
# generate combinations of all possible relative moves in all dimensions
all_possible_directions = numpy.stack(
numpy.meshgrid(*([-1, 0, 1],) * dimensions), -1
).reshape(-1, dimensions)
index = 0
for direction in all_possible_directions:
if numpy.sum(numpy.abs(direction)) != 1:
# particles can move only in one dimension per sampling_period
continue
# move the particle in the specified direction if random number is
# between `index` and `index + diffusion_speed * dimensions`
directions[index : index + diffusion_speed * dimensions] = direction
index += diffusion_speed * dimensions
# get a random number between 0 and `sampling_period`
# for all particles and sampling periods in the duration of the simulation
random_numbers = numpy.random.randint(
sampling_period, size=(number_particles, duration)
)
# index the first axis in the `directions` look-up-table with the random
# numbers to obtain the relative moves of all particles for all sampling
# periods
random_moves = numpy.take(directions, random_numbers, axis=0)
# set the initial positions of particles using a hook function
positions_init(random_moves, **positions_init_args)
if diffusion_model is None:
return random_moves
# calculate the positions of particles from the random moves using a
# hook function
positions = diffusion_model(random_moves, **diffusion_model_args)
return positions
def positions_init_origin(random_moves):
"""Set in-place initial position of particles to origin."""
random_moves[:, 0] = 0
def diffusion_model_unconstrained(random_moves, **kwargs):
"""Diffusion with no constraints."""
return numpy.cumsum(random_moves, axis=1)
def particle_counter_box(positions, counter_shape=None, counter_position=None):
"""Return number of particles in observation box over time.
Also return the indices of particles that were counted.
"""
dimensions = positions.shape[-1]
if counter_shape is None:
counter_shape = (1,) * dimensions # one element
if counter_position is None:
counter_position = (0,) * dimensions # center
lower = tuple(p - s // 2 for p, s in zip(counter_position, counter_shape))
upper = tuple(
p + s // 2 + s % 2 for p, s in zip(counter_position, counter_shape)
)
in_box = numpy.all((positions >= lower) & (positions < upper), axis=-1)
particle_counts = numpy.sum(in_box, axis=0)
particles_counted = numpy.nonzero(numpy.any(in_box, axis=1))
return particle_counts, particles_counted
def calculate_msd_d(positions):
"""Return mean square displacement and D of simulated positions."""
number_particles, duration, dimensions = positions.shape
msd = numpy.mean(
numpy.square(positions - positions[:, 0:1, :]), axis=(0, -1)
)
time = numpy.arange(duration)[..., numpy.newaxis]
slope = numpy.linalg.lstsq(time, msd, rcond=None)[0][0]
d = slope / (2 * dimensions)
return msd, d
def plot_positions(positions, selection=None, ax=None, title=None, label=None):
"""Plot positions of selected particles over duration of simulation."""
number_particles, duration, dimensions = positions.shape
if selection is None:
selection = slice(1) # first particle
threed = dimensions > 2 and not isinstance(selection, int)
ax_ = ax
if ax is None:
fig = pyplot.figure(figsize=(7.0, 7.0) if threed else None)
ax = fig.add_subplot(111, projection='3d' if threed else None)
if title is None:
title = 'Selected particle positions'
ax.set_title(title)
if isinstance(selection, int):
time = numpy.arange(duration)
ax.set_xlabel('time')
ax.set_ylabel('position')
label = '' if label is None else label + ' '
for i, dim in zip(range(dimensions - 1, -1, -1), 'xyzwvuts'):
ax.plot(time, positions[selection, :, i], label=label + dim)
elif dimensions == 1:
time = numpy.arange(duration)
ax.set_xlabel('time')
ax.set_ylabel('x')
for pos in positions[selection]:
ax.plot(time, pos, label=label)
elif dimensions == 2:
ax.set_aspect('equal')
ax.set_xlabel('x')
ax.set_ylabel('y')
for pos in positions[selection]:
ax.plot(pos[:, 1], pos[:, 0], label=label)
else:
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
for pos in positions[selection]:
ax.plot(pos[:, 2], pos[:, 1], pos[:, 0], label=label)
if label is not None:
ax.legend()
if ax_ is None:
pyplot.show()
def plot_msd(msd, d, dimensions=None, ax=None, labels=('simulation', 'fit')):
"""Plot MSD and line fit."""
duration = msd.shape[0]
time = numpy.arange(duration)
ax_ = ax
if ax is None:
fig = pyplot.figure()
ax = fig.add_subplot(111)
ax.set_title('MSD and line fit')
ax.set_xlabel('time')
ax.set_ylabel('MSD')
try:
label0, label1 = labels
except Exception:
label0, label1 = None, None
if dimensions:
ax.plot(time, msd, '.', label=label0)
ax.plot(time, d * 2 * dimensions * time, '-', lw=3, label=label1)
else:
ax.plot(time, msd, label=label0)
if label0 or label1:
ax.legend()
if ax_ is None:
pyplot.show()
def plot_particle_counts(particle_counts, ax=None, label=None):
"""Plot number of detected particles over time."""
duration = particle_counts.shape[0]
time = numpy.arange(duration)
ax_ = ax
if ax is None:
fig = pyplot.figure()
ax = fig.add_subplot(111)
ax.set_title('Detected particles')
ax.set_xlabel('time')
ax.set_ylabel('count')
ax.plot(time, particle_counts, label=label)
if label:
ax.legend()
if ax_ is None:
pyplot.show()
def example_nd_simulations():
"""Compare diffusion in 1, 2, and 3 dimensions."""
# create three empty plots
plots = []
for _ in range(3):
fig = pyplot.figure()
plots.append(fig.add_subplot(111))
# iterate over dimensions 1 to 3
for dimensions in range(1, 4):
# define simulation parameters
simulation_args = {
'dimensions': dimensions,
'duration': 2500,
'sampling_period': 1000,
'number_particles': 1000,
'diffusion_speed': 10,
'positions_init': positions_init_origin,
'positions_init_args': {},
'diffusion_model': diffusion_model_unconstrained,
'diffusion_model_args': {},
}
particle_counter_args = {
'counter_position': (0,) * dimensions,
'counter_shape': (10,) * dimensions,
}
# run simulation of model
positions = simulate_diffusion(**simulation_args)
# count particles
particle_counts, particles_counted = particle_counter_box(
positions, **particle_counter_args
)
# analyze positions and counted particles
msd, D = calculate_msd_d(positions)
# plot results of simulation and analysis
label = f'{dimensions}D'
plot_positions(positions, 0, ax=plots[0], label=label)
plot_particle_counts(particle_counts, ax=plots[1], label=label)
plot_msd(
msd,
D,
ax=plots[2],
labels=(
f'{label} D={D * simulation_args["sampling_period"]:.3f}',
None,
),
)
pyplot.show()
%time example_nd_simulations()
CPU times: total: 297 ms Wall time: 409 ms
Constrained diffusion in a box¶
To demonstrate diffusion under restrictions, particles are placed in a box. Particles can diffuse unconstrained within the box. Three different cases are explored when particles hit a box boundary:
- particles cannot leave the box.
- particles leaving the box immediately enter on the opposite side.
- particles leaving the box cannot re-enter (they are "absorbed" by the boundaries).
Hook functions are defined for each case and passed to the simulation function.
def diffusion_model_box_closed(random_moves, box_shape):
"""Diffusion in a box. Particles cannot leave box."""
positions = random_moves.copy() # modify in-place
number_particles, duration, dimensions = positions.shape
lower = tuple(-s // 2 for s in box_shape)
upper = tuple(s // 2 + s % 2 - 1 for s in box_shape)
for time in range(1, duration):
temp = positions[:, time]
temp += positions[:, time - 1] # cumsum axis=1
numpy.clip(temp, lower, upper, out=temp)
return positions
def diffusion_model_box_cyclic(random_moves, box_shape):
"""Diffusion in a box. Particles leaving box enter on opposite side."""
positions = random_moves.copy()
number_particles, duration, dimensions = positions.shape
lower = tuple(-s // 2 for s in box_shape)
for time in range(1, duration):
temp = positions[:, time]
temp += positions[:, time - 1] # cumsum axis=1
temp -= lower
temp %= box_shape
temp += lower
return positions
def diffusion_model_box_absorbing(random_moves, box_shape):
"""Diffusion in a box. Particles leaving box never re-enter."""
number_particles, duration, dimensions = random_moves.shape
positions = numpy.cumsum(random_moves, axis=1)
lower = tuple(-s // 2 for s in box_shape)
upper = tuple(s // 2 + s % 2 for s in box_shape)
leaving_box = numpy.argmax(
numpy.any((positions < lower) | (positions >= upper), axis=-1), axis=-1
)
for particle in range(number_particles):
index = leaving_box[particle]
if 0 < index < duration - 1:
positions[particle, index:] = positions[particle, index + 1]
return positions
def example_box_model_simulations(dimensions=3):
"""Compare box diffusion models."""
# define simulation parameters
box_model_args = {'box_shape': (20,) * dimensions}
simulation_args = {
'dimensions': dimensions,
'duration': 2500,
'sampling_period': 1000,
'number_particles': 1000,
'diffusion_speed': 10,
# 'diffusion_model': will be set later
'diffusion_model_args': box_model_args,
'positions_init': positions_init_origin,
'positions_init_args': {},
}
particle_counter_args = {
'counter_position': (0,) * dimensions,
'counter_shape': (10,) * dimensions,
}
# create empty plots
fig = pyplot.figure(figsize=(7.0, 7.0) if dimensions > 2 else None)
plot0 = fig.add_subplot(111, projection='3d' if dimensions > 2 else None)
fig = pyplot.figure()
plot1 = fig.add_subplot(111)
fig = pyplot.figure()
plot2 = fig.add_subplot(111)
# iterate over diffusion models
for diffusion_model in (
diffusion_model_unconstrained,
diffusion_model_box_closed,
diffusion_model_box_cyclic,
diffusion_model_box_absorbing,
):
# run simulation of model
positions = simulate_diffusion(
diffusion_model=diffusion_model, **simulation_args
)
# count particles
particle_counts, particles_counted = particle_counter_box(
positions, **particle_counter_args
)
# analyze positions and counted particles
msd, D = calculate_msd_d(positions)
# plot results of simulation and analysis
model = diffusion_model.__name__[16:]
plot_positions(positions, ax=plot0, label=model)
plot_particle_counts(particle_counts, ax=plot1, label=model)
plot_msd(
msd,
D,
ax=plot2,
labels=(
f'{model} D={D * simulation_args["sampling_period"]:.3f}',
None,
),
)
pyplot.show()
%time example_box_model_simulations()
CPU times: total: 531 ms Wall time: 798 ms
Membrane raft diffusion model¶
A membrane raft is approximated by a cylinder of a certain radius (in y, x dimensions) and thickness (in z dimension). It is placed at the origin. Particles moving into the cylinder are retained for some sampling periods and then released on the negative z side. A particle counter detector, elongated in the positive z direction, is placed at a distance in the y dimension (x=0, z=0).
The average trajectory of all detected particles, the number of particles in the observation volume over time and the MSD over time are plotted.
@numba.jit(nopython=True)
def diffusion_model_raft(random_moves, raft_shape, raft_delay):
"""Membrane raft diffusion model."""
positions = random_moves.copy()
number_particles, duration, dimensions = positions.shape
assert dimensions == 3
assert raft_delay >= 0
length, radius = raft_shape
radius *= radius
for particle in range(number_particles):
zyx = positions[particle, 0].copy()
t = 1
while t < duration:
zyx += positions[particle, t]
if (
zyx[0] >= 0
and zyx[0] < length
and zyx[1] * zyx[1] + zyx[2] * zyx[2] < radius
):
# particle entered raft, glue it for `raft_delay`
zyx[0] = 0
positions[particle, t : t + raft_delay] = zyx
t += raft_delay
if t