Datapipes

pdearena.data.datapipes_common

RandomTimeStepConditionedPDETrainData

Bases: IterDataPipe

Randomized data for training conditioned PDEs.

Parameters:

    dp (IterDataPipe): Data pipe that returns individual PDE trajectories. Required.
    n_input_scalar_components (int): Number of input scalar components. Required.
    n_input_vector_components (int): Number of input vector components. Required.
    n_output_scalar_components (int): Number of output scalar components. Required.
    n_output_vector_components (int): Number of output vector components. Required.
    trajlen (int): Length of a trajectory in the dataset. Required.
    reweigh (bool): Whether to rebalance the dataset so that longer-horizon predictions get equal weight despite there being fewer such datapoints in a trajectory. Defaults to True.
Source code in pdearena/data/datapipes_common.py
class RandomTimeStepConditionedPDETrainData(dp.iter.IterDataPipe):
    """Randomized data for training conditioned PDEs.

    Args:
        dp (IterDataPipe): Data pipe that returns individual PDE trajectories.
        n_input_scalar_components (int): Number of input scalar components.
        n_input_vector_components (int): Number of input vector components.
        n_output_scalar_components (int): Number of output scalar components.
        n_output_vector_components (int): Number of output vector components.
        trajlen (int): Length of a trajectory in the dataset.
        reweigh (bool, optional): Whether to rebalance the dataset so that longer-horizon predictions get equal weight despite there being fewer such datapoints in a trajectory. Defaults to True.
    """

    def __init__(
        self,
        dp,
        n_input_scalar_components: int,
        n_input_vector_components: int,
        n_output_scalar_components: int,
        n_output_vector_components: int,
        trajlen: int,
        reweigh=True,
    ) -> None:
        super().__init__()
        self.dp = dp
        self.n_input_scalar_components = n_input_scalar_components
        self.n_input_vector_components = n_input_vector_components
        self.n_output_scalar_components = n_output_scalar_components
        self.n_output_vector_components = n_output_vector_components

        self.trajlen = trajlen
        self.reweigh = reweigh

    def __iter__(self):
        time_resolution = self.trajlen

        for u, v, cond, grid in self.dp:
            if self.reweigh:
                end_time = random.choices(range(1, time_resolution), k=1)[0]
                start_time = random.choices(range(0, end_time), weights=1 / np.arange(1, end_time + 1), k=1)[0]
            else:
                end_time = torch.randint(low=1, high=time_resolution, size=(1,), dtype=torch.long).item()
                start_time = torch.randint(low=0, high=end_time, size=(1,), dtype=torch.long).item()

            delta_t = end_time - start_time
            yield (
                *datautils.create_time_conditioned_data(
                    self.n_input_scalar_components,
                    self.n_input_vector_components,
                    self.n_output_scalar_components,
                    self.n_output_vector_components,
                    u,
                    v,
                    grid,
                    start_time,
                    end_time,
                    torch.tensor([delta_t]),
                ),
                cond,
            )
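
To see what the reweigh option does to the sampled horizons, here is a minimal, self-contained sketch (not part of the library; trajlen and the sample count are hypothetical values) that mirrors the sampling logic above:

import random
from collections import Counter

import numpy as np

trajlen = 10  # hypothetical trajectory length
counts = Counter()
for _ in range(100_000):
    # Uniform end time, as in the reweigh branch above.
    end_time = random.choices(range(1, trajlen), k=1)[0]
    # Earlier start times get higher weight, so long horizons (which admit
    # fewer (start, end) pairs) are sampled more often than under uniform
    # start-time sampling.
    start_time = random.choices(range(0, end_time), weights=1 / np.arange(1, end_time + 1), k=1)[0]
    counts[end_time - start_time] += 1
print(sorted(counts.items()))  # histogram of delta_t = end_time - start_time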

RandomizedPDETrainData

Bases: IterDataPipe

Randomized data for training PDEs.

Parameters:

    dp (IterDataPipe): Data pipe that returns individual PDE trajectories. Required.
    n_input_scalar_components (int): Number of input scalar components. Required.
    n_input_vector_components (int): Number of input vector components. Required.
    n_output_scalar_components (int): Number of output scalar components. Required.
    n_output_vector_components (int): Number of output vector components. Required.
    trajlen (int): Length of a trajectory in the dataset. Required.
    time_history (int): Number of time steps of inputs. Required.
    time_future (int): Number of time steps of outputs. Required.
    time_gap (int): Number of time steps between inputs and outputs. Required.
Source code in pdearena/data/datapipes_common.py
class RandomizedPDETrainData(dp.iter.IterDataPipe):
    """Randomized data for training PDEs.

    Args:
        dp (IterDataPipe): Data pipe that returns individual PDE trajectories.
        n_input_scalar_components (int): Number of input scalar components.
        n_input_vector_components (int): Number of input vector components.
        n_output_scalar_components (int): Number of output scalar components.
        n_output_vector_components (int): Number of output vector components.
        trajlen (int): Length of a trajectory in the dataset.
        time_history (int): Number of time steps of inputs.
        time_future (int): Number of time steps of outputs.
        time_gap (int): Number of time steps between inputs and outputs.
    """

    def __init__(
        self,
        dp,
        n_input_scalar_components: int,
        n_input_vector_components: int,
        n_output_scalar_components: int,
        n_output_vector_components: int,
        trajlen: int,
        time_history: int,
        time_future: int,
        time_gap: int,
    ) -> None:
        super().__init__()
        self.dp = dp
        self.n_input_scalar_components = n_input_scalar_components
        self.n_input_vector_components = n_input_vector_components
        self.n_output_scalar_components = n_output_scalar_components
        self.n_output_vector_components = n_output_vector_components
        self.trajlen = trajlen
        self.time_history = time_history
        self.time_future = time_future
        self.time_gap = time_gap

    def __iter__(self):
        for batch in self.dp:
            if len(batch) == 3:
                (u, v, grid) = batch
                cond = None
            elif len(batch) == 4:
                (u, v, cond, grid) = batch
            else:
                raise ValueError(f"Unknown batch length of {len(batch)}.")

            # Usable length of the trajectory
            time_resolution = min(u.shape[0], self.trajlen)
            # Steps remaining once the input history is accounted for
            reduced_time_resolution = time_resolution - self.time_history
            # Latest start time such that history, gap, and future all fit in the trajectory
            max_start_time = reduced_time_resolution - self.time_future - self.time_gap

            # Choose a random start time along the trajectory
            start_time = random.choices([t for t in range(max_start_time + 1)], k=1)
            data, targets = datautils.create_data2D(
                self.n_input_scalar_components,
                self.n_input_vector_components,
                self.n_output_scalar_components,
                self.n_output_vector_components,
                u,
                v,
                grid,
                start_time[0],
                self.time_history,
                self.time_future,
                self.time_gap,
            )
            if cond is None and grid is None:
                yield data, targets
            elif cond is not None and grid is None:
                yield data, targets, cond
            else:
                yield data, targets, cond, grid
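
The three index computations in __iter__ determine the window of admissible start times. A small worked example with hypothetical values:

# Hypothetical settings, not library defaults.
trajlen, time_history, time_future, time_gap = 20, 4, 2, 1
time_resolution = trajlen                                          # assume u has >= 20 steps
reduced_time_resolution = time_resolution - time_history           # 20 - 4 = 16
max_start_time = reduced_time_resolution - time_future - time_gap  # 16 - 2 - 1 = 13
# A start time s in 0..13 is valid: inputs occupy steps [s, s + 4),
# the gap occupies [s + 4, s + 5), and targets occupy [s + 5, s + 7),
# so even s = 13 ends at step 20, just inside the trajectory.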

TimestepConditionedPDEEvalData

Bases: IterDataPipe

Data for evaluation of time-conditioned PDEs.

Parameters:

    dp (IterDataPipe): Data pipe that returns individual PDE trajectories. Required.
    trajlen (int): Length of a trajectory in the dataset. Required.
    delta_t (int): Time step difference at which conditioned predictions are evaluated. Required.
Tip

Make sure delta_t is less than half of trajlen.

Source code in pdearena/data/datapipes_common.py
class TimestepConditionedPDEEvalData(dp.iter.IterDataPipe):
    """Data for evaluation of time conditioned PDEs

    Args:
        dp (torchdata.datapipes.iter.IterDataPipe): Data pipe that returns individual PDE trajectories.
        trajlen (int): Length of a trajectory in the dataset.
        delta_t (int): Time step difference at which conditioned predictions are evaluated.

    Tip:
        Make sure `delta_t` is less than half of `trajlen`.
    """

    def __init__(self, dp: dp.iter.IterDataPipe, trajlen: int, delta_t: int) -> None:
        super().__init__()
        self.dp = dp
        self.trajlen = trajlen
        if 2 * delta_t >= self.trajlen:
            raise ValueError("delta_t should be less than half the trajectory length")

        self.delta_t = delta_t

    def __iter__(self):
        for begin in range(self.trajlen - self.delta_t):
            for u, v, cond, grid in self.dp:
                newu = u[begin :: self.delta_t, ...]
                newv = v[begin :: self.delta_t, ...]
                max_start_time = newu.size(0)
                for start in range(max_start_time - 1):
                    end = start + 1
                    data = torch.cat((newu[start : start + 1], newv[start : start + 1]), dim=1).unsqueeze(0)
                    if grid is not None:
                        data = torch.cat((data, grid), dim=1)
                    label = torch.cat((newu[end : end + 1], newv[end : end + 1]), dim=1).unsqueeze(0)
                    if data.size(1) == 0:
                        raise ValueError("Data is empty. Likely indexing issue.")
                    if label.size(1) == 0:
                        raise ValueError("Label is empty. Likely indexing issue.")
                    yield data, label, torch.tensor([self.delta_t]), cond
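
A toy illustration of the striding logic (values are hypothetical): with a stride of delta_t, consecutive entries of the subsampled trajectory are exactly delta_t steps apart, so each adjacent pair forms a one-step evaluation sample at that horizon.

import torch

trajlen, delta_t = 10, 3              # hypothetical; note 2 * delta_t < trajlen
u = torch.arange(trajlen)             # stand-in for a trajectory's time axis
for begin in range(trajlen - delta_t):
    newu = u[begin::delta_t]          # e.g. begin=1 -> time steps 1, 4, 7
    pairs = [(newu[s].item(), newu[s + 1].item()) for s in range(newu.size(0) - 1)]
    print(begin, pairs)               # every pair is delta_t steps apart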

ZarrLister

Bases: IterDataPipe

Customized lister for zarr files.

Parameters:

    root (Union[str, Sequence[str], IterDataPipe]): Root directory. Defaults to ".".

Yields:

    (str): Path to the zarr file.

Source code in pdearena/data/datapipes_common.py
class ZarrLister(dp.iter.IterDataPipe):
    """Customized lister for zarr files.

    Args:
        root (Union[str, Sequence[str], dp.iter.IterDataPipe], optional): Root directory. Defaults to ".".

    Yields:
        (str): Path to the zarr file.
    """

    def __init__(
        self,
        root: Union[str, Sequence[str], dp.iter.IterDataPipe] = ".",
    ) -> None:
        super().__init__()

        if isinstance(root, str):
            root = [root]
        if not isinstance(root, dp.iter.IterDataPipe):
            root = dp.iter.IterableWrapper(root)

        self.datapipe: dp.iter.IterDataPipe = root

    def __iter__(self):
        for path in self.datapipe:
            for dirname in os.listdir(path):
                if dirname.endswith(".zarr"):
                    yield os.path.join(path, dirname)
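
Typical usage, assuming a directory that directly contains one or more .zarr stores (the path below is a placeholder):

lister = ZarrLister(root="/data/shallowwater")
for zarr_path in lister:
    print(zarr_path)  # e.g. "/data/shallowwater/run0.zarr" (hypothetical store name)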

build_datapipes(pde, data_path, limit_trajectories, usegrid, dataset_opener, lister, sharder, filter_fn, mode, time_history=1, time_future=1, time_gap=0, onestep=False, conditioned=False, delta_t=None, conditioned_reweigh=True)

Build datapipes for training and evaluation.

Parameters:

    pde (PDEDataConfig): PDE configuration. Required.
    data_path (str): Path to the data. Required.
    limit_trajectories (int): Number of trajectories to use. Required.
    usegrid (bool): Whether to use spatial grid as input. Required.
    dataset_opener (Callable[..., IterDataPipe]): Dataset opener. Required.
    lister (Callable[..., IterDataPipe]): List files. Required.
    sharder (Callable[..., IterDataPipe]): Shard files. Required.
    filter_fn (Callable[..., IterDataPipe]): Filter files. Required.
    mode (str): Mode of the data. ["train", "valid", "test"]. Required.
    time_history (int): Number of time steps in the past. Defaults to 1.
    time_future (int): Number of time steps in the future. Defaults to 1.
    time_gap (int): Number of time steps between the past and the future to be skipped. Defaults to 0.
    onestep (bool): Whether to use one-step prediction. Defaults to False.
    conditioned (bool): Whether to use conditioned data. Defaults to False.
    delta_t (Optional[int]): Time step size. Only used for conditioned data. Defaults to None.
    conditioned_reweigh (bool): Whether to reweight conditioned data. Defaults to True.

Returns:

    dpipe (IterDataPipe): IterDataPipe for training and evaluation.

Source code in pdearena/data/datapipes_common.py
def build_datapipes(
    pde: PDEDataConfig,
    data_path,
    limit_trajectories,
    usegrid: bool,
    dataset_opener: Callable[..., dp.iter.IterDataPipe],
    lister: Callable[..., dp.iter.IterDataPipe],
    sharder: Callable[..., dp.iter.IterDataPipe],
    filter_fn: Callable[..., dp.iter.IterDataPipe],
    mode: str,
    time_history=1,
    time_future=1,
    time_gap=0,
    onestep=False,
    conditioned=False,
    delta_t: Optional[int] = None,
    conditioned_reweigh: bool = True,
):
    """Build datapipes for training and evaluation.

    Args:
        pde (PDEDataConfig): PDE configuration.
        data_path (str): Path to the data.
        limit_trajectories (int): Number of trajectories to use.
        usegrid (bool): Whether to use spatial grid as input.
        dataset_opener (Callable[..., dp.iter.IterDataPipe]): Dataset opener.
        lister (Callable[..., dp.iter.IterDataPipe]): List files.
        sharder (Callable[..., dp.iter.IterDataPipe]): Shard files.
        filter_fn (Callable[..., dp.iter.IterDataPipe]): Filter files.
        mode (str): Mode of the data. ["train", "valid", "test"]
        time_history (int, optional): Number of time steps in the past. Defaults to 1.
        time_future (int, optional): Number of time steps in the future. Defaults to 1.
        time_gap (int, optional): Number of time steps between the past and the future to be skipped. Defaults to 0.
        onestep (bool, optional): Whether to use one-step prediction. Defaults to False.
        conditioned (bool, optional): Whether to use conditioned data. Defaults to False.
        delta_t (Optional[int], optional): Time step size. Defaults to None. Only used for conditioned data.
        conditioned_reweigh (bool, optional): Whether to reweight conditioned data. Defaults to True.

    Returns:
        dpipe (IterDataPipe): IterDataPipe for training and evaluation.
    """
    dpipe = lister(
        data_path,
    ).filter(filter_fn=filter_fn)
    if mode == "train":
        dpipe = dpipe.shuffle()

    dpipe = dataset_opener(
        sharder(dpipe),
        mode=mode,
        limit_trajectories=limit_trajectories,
        usegrid=usegrid,
    )
    if mode == "train":
        # Make sure that in expectation we have seen all the data despite randomization
        dpipe = dpipe.cycle(pde.trajlen)

    if mode == "train":
        # Training data is randomized
        if conditioned:
            dpipe = RandomTimeStepConditionedPDETrainData(
                dpipe,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.trajlen,
                conditioned_reweigh,
            )
        else:
            dpipe = RandomizedPDETrainData(
                dpipe,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.trajlen,
                time_history,
                time_future,
                time_gap,
            )
    else:
        # Evaluation data is not randomized.
        if conditioned and onestep:
            assert delta_t is not None
            dpipe = TimestepConditionedPDEEvalData(dpipe, pde.trajlen, delta_t)
        elif onestep:
            dpipe = PDEEvalTimeStepData(
                dpipe,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.n_scalar_components,
                pde.n_vector_components,
                pde.trajlen,
                time_history,
                time_future,
                time_gap,
            )
        # For multi-step prediction, the original data pipe can be used without change.

    return dpipe
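
A sketch of how the pieces might be wired together for training; the config object, path, filter, and parameter values below are illustrative assumptions, not library defaults:

import torchdata.datapipes as dp

# pde_config is assumed to be a PDEDataConfig instance built elsewhere.
train_dp = build_datapipes(
    pde=pde_config,
    data_path="/data/navierstokes",        # placeholder path
    limit_trajectories=-1,                 # use all trajectories
    usegrid=False,
    dataset_opener=NavierStokesDatasetOpener,     # defined below
    lister=dp.iter.FileLister,
    sharder=dp.iter.ShardingFilter,
    filter_fn=lambda path: path.endswith(".h5"),  # hypothetical filter
    mode="train",
    time_history=4,
    time_future=1,
    time_gap=0,
)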

pdearena.data.oned.datapipes.kuramotosivashinsky1d

KuramotoSivashinskyDatasetOpener

Bases: IterDataPipe

DataPipe to load the Kuramoto-Sivashinsky dataset.

Parameters:

    dp (IterDataPipe): List of hdf5 files containing Kuramoto-Sivashinsky data. Required.
    mode (str): Mode to load data from. Can be one of train, val, test. Required.
    preload (bool): Whether to preload all data into memory. Defaults to True.
    allow_shuffle (bool): Whether to shuffle the data; recommended when preloading data. Defaults to True.
    resolution (int): Which resolution to load. Defaults to -1, the full data resolution.
    usegrid (bool): Whether to output spatial grid or not. Defaults to False.
    time_step (int): Temporal subsampling stride applied to each trajectory. Defaults to 4.

Yields:

    (Tuple[Tensor, Tensor, Tensor, Optional[Tensor]]): Tuple containing the scalar field, an empty placeholder tensor, the conditioning parameters (time step, spatial step, and, if present, viscosity), and optionally the spatial grid.

Source code in pdearena/data/oned/datapipes/kuramotosivashinsky1d.py
class KuramotoSivashinskyDatasetOpener(dp.iter.IterDataPipe):
    """DataPipe to load the Kuramoto-Sivashinsky dataset.

    Args:
        dp (dp.iter.IterDataPipe): List of `hdf5` files containing Kuramoto-Sivashinsky data.
        mode (str): Mode to load data from. Can be one of `train`, `val`, `test`.
        preload (bool, optional): Whether to preload all data into memory. Defaults to True.
        allow_shuffle (bool, optional): Whether to shuffle the data; recommended when preloading data. Defaults to True.
        resolution (int, optional): Which resolution to load. Defaults to full data resolution.
        usegrid (bool, optional): Whether to output spatial grid or not. Defaults to False.
        time_step (int, optional): Temporal subsampling stride applied to each trajectory. Defaults to 4.

    Yields:
        (Tuple[torch.Tensor, torch.Tensor, torch.Tensor, Optional[torch.Tensor]]): Tuple containing the scalar field, an empty placeholder tensor, the conditioning parameters (time step, spatial step, and, if present, viscosity), and optionally the spatial grid.
    """

    def __init__(
        self,
        dp,
        mode: str,
        preload: bool = True,
        allow_shuffle: bool = True,
        resolution: int = -1,
        usegrid: bool = False,
        time_step: int = 4,
        **kwargs,
    ) -> None:
        super().__init__()
        self.dp = dp
        self.mode = mode
        self.allow_shuffle = allow_shuffle
        self.dtype = np.float32
        self.resolution = resolution
        self.usegrid = usegrid
        self.time_step = time_step
        print(f"Loading {mode} data from {len([p for p in dp])} files.")
        self.storage = {}
        if preload:
            for path in self.dp:
                self.storage[path] = self._load_data(path)

    def _load_data(self, path):
        if path in self.storage:
            return self.storage[path]
        else:
            with h5py.File(path, "r") as f:
                data_h5 = f[self.mode]
                data_key = [k for k in data_h5.keys() if k.startswith("pde_")][0]
                data = {
                    "u": torch.tensor(data_h5[data_key][:].astype(self.dtype)),
                    "dt": torch.tensor(data_h5["dt"][:].astype(self.dtype)),
                    "dx": torch.tensor(data_h5["dx"][:].astype(self.dtype)),
                }
                if "v" in data_h5:
                    data["v"] = torch.tensor(data_h5["v"][:].astype(self.dtype))

                data["orig_dt"] = data["dt"].clone()
                if data["u"].ndim == 3:
                    data["u"] = data["u"].unsqueeze(dim=-2)  # Add channel dimension
                # The KS equation is parameterized by [1] the time step between observations
                # (measured in seconds, usually around 0.2), [2] the spatial step between
                # data points in the spatial domain (measured in meters, usually around 0.2),
                # and finally [3] the viscosity parameter (measured in m^2/s, usually between 0.5 and 1.5).
                # We scale these parameters to roughly the range [0, 10] so that changes are visible in the Fourier embeddings.
                # This accelerates learning and makes it easier for the models to learn the conditional dynamics.
                # Scaling time step.
                if data["dt"].min() > 0.15 and data["dt"].max() < 0.25:
                    data["dt"] = (data["dt"] - 0.15) * 100.0
                else:
                    print(
                        f"WARNING: dt is not in the expected range (min {data['dt'].min()}, max {data['dt'].max()}, mean {data['dt'].mean()}) - scaling may be incorrect."
                    )
                # Scaling spatial step.
                if data["dx"].min() > 0.2 and data["dx"].max() < 0.3:
                    data["dx"] = (data["dx"] - 0.2) * 100.0
                else:
                    print(
                        f"WARNING: dx is not in the expected range (min {data['dx'].min()}, max {data['dx'].max()}, mean {data['dx'].mean()}) - scaling may be incorrect."
                    )
                # Scaling viscosity.
                if "v" in data:
                    if data["v"].min() >= 0.5 and data["v"].max() <= 1.5:
                        data["v"] = (data["v"] - 0.5) * 100.0
                    else:
                        print(
                            f"WARNING: v is not in the expected range (min {data['v'].min()}, max {data['v'].max()}, mean {data['v'].mean()}) - scaling may be incorrect."
                        )

            return data

    def __iter__(self):
        for path in self.dp:
            data = self._load_data(path)
            u = data["u"]
            if u.ndim == 3:
                u = u.unsqueeze(0)
            if self.resolution > 0 and u.shape[-1] > self.resolution:
                step_size = u.shape[-1] // self.resolution
                start_idx = 0 if not self.mode == "train" else np.random.randint(0, step_size)
                u = u[..., start_idx::step_size]
            idxs = np.arange(u.shape[0])
            if self.mode == "train" and self.allow_shuffle:
                np.random.shuffle(idxs)
            for i in range(idxs.shape[0]):
                idx = idxs[i]
                cond = [data["dt"][idx], data["dx"][idx]]
                if "v" in data:
                    cond.append(data["v"][idx])
                if self.usegrid:
                    grid = np.linspace(0, 1, u.shape[-1])
                else:
                    grid = None
                u_sample = u[idx]
                if self.time_step > 1:
                    if self.mode == "train":
                        start_idx = np.random.randint(0, self.time_step)
                    else:
                        start_idx = 0
                    u_sample = u_sample[start_idx :: self.time_step]
                yield u_sample, torch.zeros_like(u_sample[:, 0:0]), torch.tensor(cond)[None], grid
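
The scaling above maps each parameter's expected range onto a larger interval so the conditioning embeddings can resolve the differences; a standalone check with hypothetical values:

import torch

dt = torch.tensor([0.18, 0.22])   # expected range (0.15, 0.25)
dx = torch.tensor([0.22, 0.28])   # expected range (0.2, 0.3)
v = torch.tensor([0.6, 1.4])      # expected range [0.5, 1.5]

print((dt - 0.15) * 100.0)  # ~[3, 7], inside (0, 10)
print((dx - 0.2) * 100.0)   # ~[2, 8], inside (0, 10)
print((v - 0.5) * 100.0)    # ~[10, 90]; the 1.0-wide viscosity range maps into [0, 100]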

pdearena.data.twod.datapipes.navierstokes2d

NavierStokesDatasetOpener

Bases: IterDataPipe

DataPipe to load Navier-Stokes dataset.

Parameters:

    dp (IterDataPipe): List of hdf5 files containing Navier-Stokes data. Required.
    mode (str): Mode to load data from. Can be one of train, val, test. Required.
    limit_trajectories (int): Limit the number of trajectories to load from each hdf5 file. Defaults to None.
    usegrid (bool): Whether to output spatial grid or not. Defaults to False.
    conditioned (bool): Whether to also return the buoyancy force parameter for conditioning. Defaults to False.

Yields:

    (Tuple[Tensor, Tensor, Optional[Tensor], Optional[Tensor]]): Tuple containing particle scalar field, velocity vector field, and optionally buoyancy force parameter value and spatial grid.

Source code in pdearena/data/twod/datapipes/navierstokes2d.py
class NavierStokesDatasetOpener(dp.iter.IterDataPipe):
    """DataPipe to load Navier-Stokes dataset.

    Args:
        dp (dp.iter.IterDataPipe): List of `hdf5` files containing Navier-Stokes data.
        mode (str): Mode to load data from. Can be one of `train`, `val`, `test`.
        limit_trajectories (int, optional): Limit the number of trajectories to load from each `hdf5` file. Defaults to None.
        usegrid (bool, optional): Whether to output spatial grid or not. Defaults to False.
        conditioned (bool, optional): Whether to also return the buoyancy force parameter for conditioning. Defaults to False.

    Yields:
        (Tuple[torch.Tensor, torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor]]): Tuple containing particle scalar field, velocity vector field, and optionally buoyancy force parameter value and spatial grid.
    """

    def __init__(self, dp, mode: str, limit_trajectories: Optional[int] = None, usegrid: bool = False, conditioned: bool = False) -> None:
        super().__init__()
        self.dp = dp
        self.mode = mode
        self.limit_trajectories = limit_trajectories
        self.usegrid = usegrid
        self.conditioned = conditioned

    def __iter__(self):
        for path in self.dp:
            with h5py.File(path, "r") as f:
                data = f[self.mode]
                if self.limit_trajectories is None or self.limit_trajectories == -1:
                    num = data["u"].shape[0]
                else:
                    num = self.limit_trajectories

                iter_start = 0
                iter_end = num

                for idx in range(iter_start, iter_end):
                    u = torch.tensor(data["u"][idx])
                    vx = torch.tensor(data["vx"][idx])
                    vy = torch.tensor(data["vy"][idx])
                    if "buo_y" in data and self.conditioned:
                        cond = torch.tensor(data["buo_y"][idx]).unsqueeze(0).float()
                    else:
                        cond = None

                    v = torch.cat((vx[:, None], vy[:, None]), dim=1)

                    if self.usegrid:
                        gridx = torch.linspace(0, 1, data["x"][idx].shape[0])
                        gridy = torch.linspace(0, 1, data["y"][idx].shape[0])
                        gridx = gridx.reshape(
                            1,
                            gridx.size(0),
                            1,
                        ).repeat(
                            1,
                            1,
                            gridy.size(0),
                        )
                        gridy = gridy.reshape(
                            1,
                            1,
                            gridy.size(0),
                        ).repeat(
                            1,
                            gridx.size(1),
                            1,
                        )
                        grid = torch.cat((gridx[:, None], gridy[:, None]), dim=1)
                    else:
                        grid = None
                    yield u.unsqueeze(1).float(), v.float(), cond, grid
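
When usegrid is set, the grid branch builds two normalized coordinate channels; a shape check with assumed resolutions:

import torch

nx, ny = 4, 3  # hypothetical spatial resolution
gridx = torch.linspace(0, 1, nx).reshape(1, nx, 1).repeat(1, 1, ny)
gridy = torch.linspace(0, 1, ny).reshape(1, 1, ny).repeat(1, nx, 1)
grid = torch.cat((gridx[:, None], gridy[:, None]), dim=1)
print(grid.shape)  # torch.Size([1, 2, 4, 3]): an x-coordinate channel and a y-coordinate channel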

pdearena.data.twod.datapipes.shallowwater2d

ShallowWaterDatasetOpener

Bases: IterDataPipe

DataPipe for loading the shallow water dataset.

Parameters:

    dp (IterDataPipe): Datapipe with paths to load the dataset from. Required.
    mode (str): "train", "valid", or "test". Required.
    limit_trajectories (Optional[int]): Number of trajectories to load from the dataset. Defaults to None.
    usevort (bool): Whether to use vorticity or velocity. If False, velocity is returned. Defaults to False.
    usegrid (bool): Whether to use the grid or not. If False, no grid is returned. Defaults to False.
    sample_rate (int): Sample rate for the data. Defaults to 1, which means no sub-sampling.
Note

We manually manage the data distribution across workers and processes. So make sure not to use torchdata's dp.iter.Sharder with this data pipe.

Source code in pdearena/data/twod/datapipes/shallowwater2d.py
class ShallowWaterDatasetOpener(dp.iter.IterDataPipe):
    """DataPipe for loading the shallow water dataset

    Args:
        dp: datapipe with paths to load the dataset from.
        mode (str): "train" or "valid" or "test"
        limit_trajectories: number of trajectories to load from the dataset
        usevort (bool): whether to use vorticity or velocity. If False, velocity is returned.
        usegrid (bool): whether to use grid or not. If False, no grid is returned.
        sample_rate: sample rate for the data. Default is 1, which means no sub-sampling.

    Note:
        We manually manage the data distribution across workers and processes. So make sure not to use `torchdata`'s [dp.iter.Sharder][torchdata.datapipes.iter.ShardingFilter] with this data pipe.
    """

    def __init__(
        self,
        dp: dp.iter.IterDataPipe,
        mode: str,
        limit_trajectories: Optional[int] = None,
        usevort: bool = False,
        usegrid: bool = False,
        sample_rate: int = 1,
    ) -> None:
        super().__init__()
        self.dp = dp
        self.mode = mode
        self.limit_trajectories = limit_trajectories
        self.usevort = usevort
        self.usegrid = usegrid
        self.sample_rate = sample_rate

    def __iter__(self):
        for path in self.dp:
            if "zarr" in path:
                data = xr.open_zarr(path)
            else:
                # Note that this is much slower
                data = xr.open_mfdataset(
                    os.path.join(path, "seed=*", "run*", "output.nc"),
                    concat_dim="b",
                    combine="nested",
                    parallel=True,
                )

            normstat = torch.load(os.path.join(path, "..", "normstats.pt"))
            if self.limit_trajectories is None or self.limit_trajectories == -1:
                num = data["u"].shape[0]
            else:
                num = self.limit_trajectories

            if dist.is_initialized():
                rank = dist.get_rank()
                world_size = dist.get_world_size()
            else:
                rank = 0
                world_size = 1

            # Different workers should be using different trajectory batches
            worker_info = torch.utils.data.get_worker_info()
            if worker_info is not None:
                num_workers_per_dist = min(worker_info.num_workers, num)
                num_shards = num_workers_per_dist * world_size
                per_worker = int(math.floor(num / float(num_shards)))
                wid = rank * num_workers_per_dist + worker_info.id
                iter_start = wid * per_worker
                iter_end = iter_start + per_worker
            else:
                per_dist = int(math.floor(num / float(world_size)))
                iter_start = rank * per_dist
                iter_end = iter_start + per_dist

            for idx in range(iter_start, iter_end):
                if self.usevort:
                    vort = torch.tensor(data["vor"][idx].to_numpy())
                    vort = (vort - normstat["vor"]["mean"]) / normstat["vor"]["std"]
                else:
                    u = torch.tensor(data["u"][idx].to_numpy())
                    v = torch.tensor(data["v"][idx].to_numpy())
                    vecf = torch.cat((u, v), dim=1)

                pres = torch.tensor(data["pres"][idx].to_numpy())

                pres = (pres - normstat["pres"]["mean"]) / normstat["pres"]["std"]
                pres = pres.unsqueeze(1)

                if self.sample_rate > 1:
                    # TODO: hardcoded skip_nt=4
                    pres = pres[4 :: self.sample_rate]
                    if self.usevort:
                        vort = vort[4 :: self.sample_rate]
                    else:
                        vecf = vecf[4 :: self.sample_rate]
                if self.usegrid:
                    raise NotImplementedError("Grid not implemented for weather data")
                else:
                    if self.usevort:
                        yield torch.cat((pres, vort), dim=1).float(), None, None, None
                    else:
                        yield pres.float(), vecf.float(), None, None
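
The manual sharding assigns each (rank, worker) pair a contiguous slice of trajectories; a worked example with assumed numbers:

import math

num = 32                       # trajectories in the file (hypothetical)
world_size, rank = 2, 1        # two distributed processes; this is the second
num_workers, worker_id = 4, 0  # four dataloader workers per process

num_workers_per_dist = min(num_workers, num)           # 4
num_shards = num_workers_per_dist * world_size         # 8
per_worker = int(math.floor(num / float(num_shards)))  # 4
wid = rank * num_workers_per_dist + worker_id          # 4
iter_start = wid * per_worker                          # 16
iter_end = iter_start + per_worker                     # 20
# So (rank=1, worker=0) reads trajectories 16..19; all 8 shards
# together cover 0..31 without overlap.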

pdearena.data.threed.datapipes

build_maxwell_datapipes(pde, data_path, limit_trajectories, usegrid, dataset_opener, lister, sharder, filter_fn, mode, time_history=1, time_future=1, time_gap=0, onestep=False)

Build datapipes for training and evaluation.

Parameters:

Name Type Description Default
pde PDEDataConfig

PDE configuration.

required
data_path str

Path to the data.

required
limit_trajectories int

Number of trajectories to use.

required
usegrid bool

Whether to use spatial grid as input.

required
dataset_opener Callable[..., IterDataPipe]

Dataset opener.

required
lister Callable[..., IterDataPipe]

List files.

required
sharder Callable[..., IterDataPipe]

Shard files.

required
filter_fn Callable[..., IterDataPipe]

Filter files.

required
mode str

Mode of the data. ["train", "valid", "test"]

required
time_history int

Number of time steps in the past. Defaults to 1.

1
time_future int

Number of time steps in the future. Defaults to 1.

1
time_gap int

Number of time steps between the past and the future to be skipped. Defaults to 0.

0
onestep bool

Whether to use one-step prediction. Defaults to False.

False

Returns:

Name Type Description
dpipe IterDataPipe

IterDataPipe for training and evaluation.

Source code in pdearena/data/threed/datapipes.py
def build_maxwell_datapipes(
    pde: PDEDataConfig,
    data_path,
    limit_trajectories,
    usegrid: bool,
    dataset_opener: Callable[..., dp.iter.IterDataPipe],
    lister: Callable[..., dp.iter.IterDataPipe],
    sharder: Callable[..., dp.iter.IterDataPipe],
    filter_fn: Callable[..., dp.iter.IterDataPipe],
    mode: str,
    time_history=1,
    time_future=1,
    time_gap=0,
    onestep=False,
):
    """Build datapipes for training and evaluation.

    Args:
        pde (PDEDataConfig): PDE configuration.
        data_path (str): Path to the data.
        limit_trajectories (int): Number of trajectories to use.
        usegrid (bool): Whether to use spatial grid as input.
        dataset_opener (Callable[..., dp.iter.IterDataPipe]): Dataset opener.
        lister (Callable[..., dp.iter.IterDataPipe]): List files.
        sharder (Callable[..., dp.iter.IterDataPipe]): Shard files.
        filter_fn (Callable[..., dp.iter.IterDataPipe]): Filter files.
        mode (str): Mode of the data. ["train", "valid", "test"]
        time_history (int, optional): Number of time steps in the past. Defaults to 1.
        time_future (int, optional): Number of time steps in the future. Defaults to 1.
        time_gap (int, optional): Number of time steps between the past and the future to be skipped. Defaults to 0.
        onestep (bool, optional): Whether to use one-step prediction. Defaults to False.

    Returns:
        dpipe (IterDataPipe): IterDataPipe for training and evaluation.
    """
    dpipe = lister(
        data_path,
    ).filter(filter_fn=filter_fn)
    if mode == "train":
        dpipe = dpipe.shuffle()

    dpipe = dataset_opener(
        sharder(dpipe),
        mode=mode,
        limit_trajectories=limit_trajectories,
        usegrid=usegrid,
    )
    if mode == "train":
        # Make sure that in expectation we have seen all the data despite randomization
        dpipe = dpipe.cycle(pde.trajlen)

    if mode == "train":
        # Training data is randomized
        dpipe = RandomizedPDETrainData3D(
            dpipe,
            pde,
            time_history,
            time_future,
            time_gap,
        )
    else:
        # Evaluation data is not randomized.
        if onestep:
            dpipe = PDEEvalTimeStepData3D(
                dpipe,
                pde,
                time_history,
                time_future,
                time_gap,
            )

    return dpipe