Source code for MCintegration.integrators

# integrators.py
# This file contains Monte Carlo integration methods and related utilities.
# It implements various integration techniques including plain Monte Carlo and MCMC.

from typing import Callable
import torch
from MCintegration.utils import RAvg, get_device
from MCintegration.maps import Configuration, CompositeMap
from MCintegration.base import Uniform, EPSILON, LinearMap
import numpy as np
from warnings import warn

import os
import torch.distributed as dist
import socket


[docs] def get_ip() -> str: """ Get the current machine's IP address. Used for distributed computing setup. Returns: str: IP address of the current machine """ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) # Doesn't need to be reachable return s.getsockname()[0]
[docs] def get_open_port() -> int: """ Find an available open port on the current machine. Used for distributed computing setup. Returns: int: Available port number """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1]
[docs] def setup(backend="gloo"): """ Set up distributed processing environment for multi-GPU/CPU computation. Args: backend (str): Backend to use for distributed computing ('gloo', 'nccl', etc.) """ # get IDs of reserved GPU distributed_init_method = f"tcp://{get_ip()}:{get_open_port()}" dist.init_process_group( backend=backend ) # , init_method=distributed_init_method, self.world_size = int(os.environ["self.world_size"]), self.rank = int(os.environ["self.rank"])) # init_method='env://', # self.world_size=int(os.environ["self.world_size"]), # self.rank=int(os.environ['SLURM_PROCID'])) torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
[docs] def cleanup(): """ Clean up distributed processing environment. Should be called after distributed computing is no longer needed. """ dist.destroy_process_group()
[docs] class Integrator: """ Base class for all integrators. This class is designed to handle integration tasks over a specified domain (bounds) using a sampling method (q0) and optional transformation maps. """ def __init__( self, bounds, f: Callable, f_dim=1, maps=None, q0=None, batch_size=1000, device=None, dtype=None, ): """ Initialize the Integrator. Args: bounds (list): Integration bounds as list of (min, max) tuples for each dimension f (Callable): Integrand function to evaluate f_dim (int): Dimension of the output of f maps (Map, optional): Transformation maps for importance sampling q0 (BaseDistribution, optional): Base sampling distribution (default: Uniform) batch_size (int): Number of points to sample at once device (str or torch.device): Device for computation dtype (torch.dtype): Data type for computation """ self.f = f self.f_dim = f_dim if maps: if dtype is None or dtype == maps.dtype: self.dtype = maps.dtype else: raise ValueError( "Data type of the variables of integrator should be same as maps." ) if device is None: self.device = maps.device else: self.device = device maps.to(self.device) maps.device = self.device else: if dtype is None: self.dtype = torch.float32 else: self.dtype = dtype if device is None: self.device = get_device() else: self.device = device if isinstance(bounds, (list, np.ndarray)): self.bounds = torch.tensor(bounds, dtype=self.dtype, device=self.device) elif isinstance(bounds, torch.Tensor): self.bounds = bounds.to(dtype=self.dtype, device=self.device) else: raise TypeError("bounds must be a list, NumPy array, or torch.Tensor.") assert self.bounds.shape[1] == 2, "bounds must be a 2D array" linear_map = LinearMap( self.bounds[:, 1] - self.bounds[:, 0], self.bounds[:, 0], device=self.device, dtype=self.dtype, ) if maps: self.maps = CompositeMap( [maps, linear_map], device=self.device, dtype=self.dtype ) else: self.maps = linear_map self.dim = self.bounds.shape[0] if not q0: q0 = Uniform(self.dim, device=self.device, dtype=self.dtype) self.q0 = q0 self.batch_size = batch_size self.f = f self.f_dim = f_dim if dist.is_initialized(): self.rank = dist.get_rank() self.world_size = dist.get_world_size() else: # Fallback defaults when distributed is not initialized self.rank = 0 self.world_size = 1
[docs] def __call__(self, **kwargs): """ Call the integrator to perform the integration. This should be implemented by subclasses. Returns: Integration result """ raise NotImplementedError("Subclasses must implement this method.")
[docs] def sample(self, config, **kwargs): """ Generate samples for integration. Args: config (Configuration): The configuration object to store samples **kwargs: Additional parameters """ config.u, config.detJ = self.q0.sample_with_detJ(config.batch_size) if not self.maps: config.x[:] = config.u else: config.x[:], detj = self.maps.forward_with_detJ(config.u) config.detJ *= detj self.f(config.x, config.fx)
[docs] def statistics(self, means, vars, neval=None): """ Calculate statistics of the integration results. Args: means (torch.Tensor): Mean values from integration vars (torch.Tensor): Variance values from integration neval (int, optional): Number of function evaluations Returns: tuple: Statistics of the integration including mean, error, and chi-squared """ nblock = means.shape[0] f_dim = means.shape[1] nblock_total = nblock * self.world_size weighted = nblock_total < 32 if self.world_size > 1: # Gather mean and variance statistics to self.rank 0 if self.rank == 0: gathered_means = [ torch.zeros_like(means) for _ in range(self.world_size) ] if weighted: gathered_vars = [ torch.zeros_like(vars) for _ in range(self.world_size) ] dist.gather(means, gathered_means if self.rank == 0 else None, dst=0) if weighted: dist.gather(vars, gathered_vars if self.rank == 0 else None, dst=0) if self.rank == 0: results = np.array([RAvg() for _ in range(f_dim)]) if weighted: for i in range(f_dim): for iblock in range(nblock): for igpu, (_mean, _var) in enumerate( zip(gathered_means, gathered_vars) ): results[i].update( _mean[iblock, i].item(), _var[iblock, i].item(), neval, ) else: for i in range(f_dim): _means = torch.empty( nblock_total, dtype=self.dtype, device=self.device ) for igpu in range(self.world_size): _means[igpu * nblock : (igpu + 1) * nblock] = ( gathered_means[igpu][:, i] ) results[i].update( _means.mean().item(), _means.var().item() / nblock_total, neval * nblock, ) else: return None else: results = np.array([RAvg() for _ in range(f_dim)]) if weighted: for i in range(f_dim): for iblock in range(nblock): results[i].update( means[iblock, i].item(), vars[iblock, i].item(), neval, ) else: for i in range(f_dim): results[i].update( means[:, i].mean().item(), means[:, i].var().item() / nblock_total, neval * nblock, ) return results
[docs] class MonteCarlo(Integrator): """ Plain Monte Carlo integration method. Uses uniform or importance sampling to estimate integrals. """ def __init__( self, bounds, f: Callable, f_dim=1, maps=None, q0=None, batch_size: int = 1000, device=None, dtype=None, ): """ Initialize the Monte Carlo integrator. Args: bounds (list): Integration bounds as list of (min, max) tuples for each dimension f (Callable): Integrand function to evaluate f_dim (int): Dimension of the output of f maps (Map, optional): Transformation maps for importance sampling q0 (BaseDistribution, optional): Base sampling distribution (default: Uniform) batch_size (int): Number of points to sample at once device (str or torch.device): Device for computation dtype (torch.dtype): Data type for computation """ super(MonteCarlo, self).__init__( bounds, f, f_dim, maps, q0, batch_size, device, dtype ) self._rangebounds = self.bounds[:, 1] - self.bounds[:, 0]
[docs] def __call__(self, neval, nblock=32, verbose=-1, **kwargs): """ Perform Monte Carlo integration. Args: neval (int): Number of function evaluations nblock (int): Number of blocks for error estimation verbose (int): Verbosity level (-1: silent, 0: minimal, 1+: more details) **kwargs: Additional parameters passed to the sample method Returns: RAvg or list of RAvg: Integration results with error estimates """ neval = neval // self.world_size neval = -(-neval // self.batch_size) * self.batch_size epoch = neval // self.batch_size epoch_perblock = epoch // nblock if epoch_perblock == 0: warn( f"neval is too small to be divided into {nblock} blocks. Reset nblock to {epoch}." ) epoch_perblock = 1 nblock = epoch else: nblock = epoch // epoch_perblock if verbose > 0: print( f"nblock = {nblock}, n_steps_perblock = {epoch_perblock}, batch_size = {self.batch_size}, actual neval = {self.batch_size * epoch_perblock * nblock}" ) config = Configuration( self.batch_size, self.dim, self.f_dim, self.device, self.dtype ) epoch = neval // self.batch_size integ_values = torch.zeros( (self.batch_size, self.f_dim), dtype=self.dtype, device=self.device ) means = torch.zeros((nblock, self.f_dim), dtype=self.dtype, device=self.device) vars = torch.zeros_like(means) for iblock in range(nblock): for _ in range(epoch_perblock): self.sample(config) config.fx.mul_(config.detJ.unsqueeze_(1)) integ_values += config.fx / epoch_perblock means[iblock, :] = integ_values.mean(dim=0) vars[iblock, :] = integ_values.var(dim=0) / self.batch_size integ_values.zero_() results = self.statistics(means, vars, epoch_perblock * self.batch_size) if self.rank == 0: if self.f_dim == 1: # return results[0] / self._rangebounds.prod() return results[0] else: # return results / self._rangebounds.prod().item() return results
[docs] def random_walk(dim, device, dtype, u, **kwargs): """ Random walk proposal distribution for MCMC. Args: dim (int): Dimensionality of the space device (str or torch.device): Computation device dtype (torch.dtype): Data type u (torch.Tensor): Current position **kwargs: Additional parameters Returns: torch.Tensor: Proposed new position """ step_size = kwargs.get("step_size", 0.2) step_sizes = torch.ones(dim, device=device) * step_size step = torch.empty(dim, device=device, dtype=dtype).uniform_(-1, 1) * step_sizes new_u = (u + step) % 1.0 return new_u
[docs] def uniform(dim, device, dtype, u, **kwargs): """ Uniform proposal distribution for MCMC. Args: dim (int): Dimensionality of the space device (str or torch.device): Computation device dtype (torch.dtype): Data type u (torch.Tensor): Current position (ignored) **kwargs: Additional parameters Returns: torch.Tensor: Uniformly sampled new position """ return torch.rand_like(u)
[docs] def gaussian(dim, device, dtype, u, **kwargs): """ Gaussian proposal distribution for MCMC. Args: dim (int): Dimensionality of the space device (str or torch.device): Computation device dtype (torch.dtype): Data type u (torch.Tensor): Current position **kwargs: Additional parameters Returns: torch.Tensor: Proposed new position from Gaussian distribution centered at u """ mean = kwargs.get("mean", torch.zeros_like(u)) std = kwargs.get("std", torch.ones_like(u)) return torch.normal(mean, std)
[docs] class MarkovChainMonteCarlo(Integrator): """ Markov Chain Monte Carlo (MCMC) integration method. Uses Metropolis-Hastings algorithm to generate samples from the target distribution. """ def __init__( self, bounds, f: Callable, f_dim: int = 1, maps=None, q0=None, proposal_dist=None, batch_size: int = 1000, nburnin: int = 10, device=None, dtype=None, ): """ Initialize the MCMC integrator. Args: bounds (list): Integration bounds as list of (min, max) tuples for each dimension f (Callable): Integrand function to evaluate f_dim (int): Dimension of the output of f maps (Map, optional): Transformation maps for importance sampling q0 (BaseDistribution, optional): Base sampling distribution (default: Uniform) proposal_dist (Callable, optional): Proposal distribution for MCMC (default: random_walk) batch_size (int): Number of points to sample at once nburnin (int): Number of burn-in steps before sampling device (str or torch.device): Device for computation dtype (torch.dtype): Data type for computation """ super(MarkovChainMonteCarlo, self).__init__( bounds, f, f_dim, maps, q0, batch_size, device, dtype ) self.nburnin = nburnin if not proposal_dist: self.proposal_dist = uniform else: if not isinstance(proposal_dist, Callable): raise TypeError("proposal_dist must be a callable function.") self.proposal_dist = proposal_dist self._rangebounds = self.bounds[:, 1] - self.bounds[:, 0]
[docs] def sample(self, config, nsteps=1, mix_rate=0.5, **kwargs): """ Generate samples using MCMC. Args: config (Configuration): Configuration object to store samples nsteps (int): Number of MCMC steps mix_rate (float): Mixing rate between previous and new samples (0-1) **kwargs: Additional parameters Returns: Configuration: Updated configuration with new samples """ for _ in range(nsteps): proposed_y = self.proposal_dist( self.dim, self.device, self.dtype, config.u, **kwargs ) proposed_x, new_detJ = self.maps.forward_with_detJ(proposed_y) new_weight = ( mix_rate / new_detJ + (1 - mix_rate) * self.f(proposed_x, config.fx).abs() ) new_weight.masked_fill_(new_weight < EPSILON, EPSILON) acceptance_probs = new_weight / config.weight * new_detJ / config.detJ accept = ( torch.rand(self.batch_size, dtype=self.dtype, device=self.device) <= acceptance_probs ) accept_expanded = accept.unsqueeze(1) config.u.mul_(~accept_expanded).add_(proposed_y * accept_expanded) config.x.mul_(~accept_expanded).add_(proposed_x * accept_expanded) config.weight.mul_(~accept).add_(new_weight * accept) config.detJ.mul_(~accept).add_(new_detJ * accept)
[docs] def __call__( self, neval, mix_rate=0.5, nblock=32, meas_freq: int = 1, verbose=-1, **kwargs, ): """ Perform MCMC integration. Args: neval (int): Number of function evaluations mix_rate (float): Mixing rate between previous and new samples (0-1) nblock (int): Number of blocks for error estimation meas_freq (int): Measurement frequency verbose (int): Verbosity level (-1: silent, 0: minimal, 1+: more details) **kwargs: Additional parameters passed to the sample method Returns: RAvg or list of RAvg: Integration results with error estimates """ neval = neval // self.world_size neval = -(-neval // self.batch_size) * self.batch_size epoch = neval // self.batch_size nsteps_perblock = epoch // nblock if nsteps_perblock == 0: warn( f"neval is too small to be divided into {nblock} blocks. Reset nblock to {epoch}." ) nsteps_perblock = 1 nblock = epoch else: nblock = epoch // nsteps_perblock n_meas_perblock = nsteps_perblock // meas_freq assert n_meas_perblock > 0, ( f"neval ({neval}) should be larger than batch_size * nblock * meas_freq ({self.batch_size} * {nblock} * {meas_freq})" ) if verbose > 0: print( f"nblock = {nblock}, n_meas_perblock = {n_meas_perblock}, meas_freq = {meas_freq}, batch_size = {self.batch_size}, actual neval = {self.batch_size * nsteps_perblock * nblock}" ) config = Configuration( self.batch_size, self.dim, self.f_dim, self.device, self.dtype ) config.u, config.detJ = self.q0.sample_with_detJ(self.batch_size) config.x, detj = self.maps.forward_with_detJ(config.u) config.detJ *= detj config.weight = ( mix_rate / config.detJ + (1 - mix_rate) * self.f(config.x, config.fx).abs_() ) config.weight.masked_fill_(config.weight < EPSILON, EPSILON) for _ in range(self.nburnin): self.sample(config, mix_rate=mix_rate, **kwargs) values = torch.zeros( (self.batch_size, self.f_dim), dtype=self.dtype, device=self.device ) refvalues = torch.zeros(self.batch_size, dtype=self.dtype, device=self.device) means = torch.zeros((nblock, self.f_dim), dtype=self.dtype, device=self.device) vars = torch.zeros_like(means) means_ref = torch.zeros((nblock, 1), dtype=self.dtype, device=self.device) vars_ref = torch.zeros_like(means_ref) for iblock in range(nblock): for _ in range(n_meas_perblock): self.sample(config, meas_freq, mix_rate, **kwargs) self.f(config.x, config.fx) config.fx.div_(config.weight.unsqueeze(1)) values += config.fx / n_meas_perblock refvalues += 1 / (config.detJ * config.weight) / n_meas_perblock means[iblock, :] = values.mean(dim=0) vars[iblock, :] = values.var(dim=0) / self.batch_size means_ref[iblock, 0] = refvalues.mean() vars_ref[iblock, 0] = refvalues.var() / self.batch_size values.zero_() refvalues.zero_() results_unnorm = self.statistics(means, vars, nsteps_perblock * self.batch_size) results_ref = self.statistics( means_ref, vars_ref, nsteps_perblock * self.batch_size ) if self.rank == 0: if self.f_dim == 1: return results_unnorm[0] / results_ref[0] else: return results_unnorm / results_ref