Source code for ADFWI.model.base

import numpy as np
import torch
from torch import Tensor
from abc import abstractmethod
from typing import Optional,Tuple,Union
from ADFWI.utils import gpu2cpu,numpy2tensor


units = {
    "vp"    : "m/s",
    "vs"    : "m/s",
    "rho"   : "kg/m^3",
    "lam"   : "Pa",
    "mu"    : "Pa",
    "eps"   : "none",
    "gamma" : "none",
    "delta" : "none",
    "vs_vp" : "none",
}
eps = 1e-7

[docs]class AbstractModel(torch.nn.Module): """ Abstract model class for FWI models Parameters ---------- ox : float Origin of the model in x-direction (m) oz : float Origin of the model in z-direction (m) dx : float Grid size in x-direction (m) dz : float Grid size in z-direction (m) nx : int Number of grid points in x-direction nz : int Number of grid points in z-direction free_surface : bool, optional Flag for free surface, default is False abc_type : str, optional The type of absorbing boundary condition: PML, Jerjan, and other abc_jerjan_alpha : float, optional The attenuation factor for Jerjan boundary condition nabc : int, optional Number of absorbing boundary cells, default is 20 device : str, optional The running device dtype : dtype, optional The dtypes for PyTorch variable, default is torch.float32 """ def __init__(self, ox:float,oz:float, nx:int ,nz:int, dx:float,dz:float, free_surface:Optional[bool] = False, abc_type:Optional[str] = 'PML', abc_jerjan_alpha:Optional[float]= 0.0053, nabc:Optional[int] = 20, device = 'cpu', dtype = torch.float32 )->None: super().__init__() # initialize the common model parameters self.ox = ox self.oz = oz self.dx = dx self.dz = dz self.nx = nx self.nz = nz self.free_surface = free_surface self.abc_type = abc_type self.abc_jerjan_alpha = abc_jerjan_alpha self.nabc = nabc self.device = device self.dtype = dtype assert self.dx == self.dz, "Model grid size dx and dz must be the same" # set derived model parameters self.x = np.arange(nx)*self.dx + self.ox self.z = np.arange(nz)*self.dz + self.oz # initialize some empty model and associated parameters self.pars = [] self.requires_grad = {} self.lower_bound = {} self.upper_bound = {} def __repr__(self) -> str: """Representation of the model object """ info = f"model with parameters {self.pars}:\n" for par in self.pars: par_min = self.get_model(par).min() par_max = self.get_model(par).max() requires_grad = self.requires_grad[par] lower_bound = self.lower_bound[par] upper_bound = self.upper_bound[par] info += ( f" Model {par:4s}: {par_min:8.2f} - {par_max:8.2f} {units[par]:6s}, " f"requires_grad = {requires_grad}, " f"constrain bound: {lower_bound} - {upper_bound}\n" ) info += f" Model orig: ox = {self.ox:6.2f}, oz = {self.oz:6.2f} m\n" info += f" Model grid: dx = {self.dx:6.2f}, dz = {self.dz:6.2f} m\n" info += f" Model dims: nx = {self.nx:6d}, nz = {self.nz:6d}\n" info += f" Model size: {self.nx * self.nz * len(self.pars)}\n" info += f" Free surface: {self.free_surface}\n" info += f" Absorbing layers: {self.nabc}\n" return info
[docs] @abstractmethod def forward(self, *args, **kwargs) -> Tuple[Tensor, Tensor, Tensor]: """ Forward method of the model class that outputs the elastic parameters of lambda, mu, and buoyancy required for the wave equation propagator. """ raise NotImplementedError("Forward method must be implemented by the subclass")
def _check_bounds(self): """Check the provided model bounds are legal """ for par in self.pars: if self.lower_bound[par] is not None and self.upper_bound[par] is not None: assert ( self.lower_bound[par] < self.upper_bound[par] ), "Lower bound must be smaller than upper bound" # if self.lower_bound[par] is not None: # if self.lower_bound[par] + eps > self.get_model(par).min(): # Warning(f"Lower bound must be larger than minimum value, set to {self.get_model(par).min()}") # self.lower_bound[par] = self.get_model(par).min() - eps # if self.upper_bound[par] is not None: # if self.upper_bound[par] - eps < self.get_model(par).max(): # Warning(f"Upper bound must be smaller than maximum value, set to {self.get_model(par).max()}") # self.upper_bound[par] = self.get_model(par).max() + eps
[docs] def check_dims(self) -> None: """Check the provided model dimensions are legal """ for par in self.pars: assert ( self.get_model(par).shape == (self.nz, self.nx) ), "Model dimensions must be (nz, nx)" return
[docs] def get_model(self, par: str) -> np.ndarray: """Return the model as numpy array """ if par not in self.pars: raise ValueError(f"Parameter {par} not in model") data = getattr(self, par) data = gpu2cpu(data.clone()) return data
[docs] def set_model(self, par: str, model:Optional[Union[np.array,Tensor]]) -> None: """Set the model as nn.Parameter """ if par not in self.pars: raise ValueError(f"Parameter {par} not in model") if model.shape != (self.nz, self.nx): raise ValueError("Model dimensions must be (nz, nx)") model = gpu2cpu(model) model = numpy2tensor(model) setattr(self, par, torch.nn.Parameter(model.to(torch.float32), requires_grad=self.requires_grad[par]))
[docs] def get_bound(self, par: str) -> Tuple[float, float]: """Return the bound of the model """ if par not in self.pars: raise ValueError("Parameter {} not in model".format(par)) m_min = self.lower_bound[par] m_max = self.upper_bound[par] if m_min is None or m_max is None: return [None, None] return [m_min, m_max]
[docs] def get_requires_grad(self, par: str) -> bool: """Return the gradient of the model """ if par not in self.pars: raise ValueError("Parameter {} not in model".format(par)) return self.requires_grad[par]
[docs] def get_grad(self, par: str) -> np.ndarray: """Return the gradient of the model as numpy array """ if par not in self.pars: raise ValueError("Parameter {} not in model".format(par)) # access the model parameter m = getattr(self, par) if m.grad is None: return np.zeros(m.shape) else: return m.grad.cpu().detach().numpy()
[docs] def get_clone_data(self) -> Tuple: """Return the data required for cloning the model """ kwargs = {} for par in self.pars: kwargs[par] = self.get_model(par) kwargs[par + "_bound"] = self.get_bound(par) kwargs[par + "_grad"] = self.get_requires_grad(par) kwargs['ox'] = self.ox kwargs['oz'] = self.oz kwargs['dx'] = self.dx kwargs['dz'] = self.dz kwargs['nx'] = self.nx kwargs['nz'] = self.nz kwargs["free_surface"] = self.free_surface kwargs["nabc"] = self.nabc return kwargs
[docs] def save(self, filename: str) -> None: """Save the model object to a file """ kwargs = self.get_clone_data() # save the model to npz file np.savez(filename, **kwargs) return
[docs] def clip_params(self)->None: """Clip the model parameters to the given bounds """ for par in self.pars: if self.lower_bound[par] is not None and self.upper_bound[par] is not None: m = getattr(self,par) min_value = self.lower_bound[par] max_value = self.upper_bound[par] # clip the model parametrs m.data.clamp_(min_value,max_value) return
[docs] def constrain_range(self, value, min_value, max_value) -> Tensor: """Constrain the value to the range [min_value, max_value] by using the torch.clamp function """ if min_value is not None and max_value is not None: # return torch.clamp(value, min_value, max_value) if torch.isnan(value).any(): # replace nan with the mean value value[torch.isnan(value)] = max_value # value_const[~torch.isnan(value_const)].mean() value = torch.clamp(value, min_value, max_value) if torch.isinf(value).any(): raise ValueError("Value is inf") if (value < 0.0).any(): raise ValueError("Value is negative") if torch.isnan(value).any(): raise ValueError("Value is nan") return value else: return value