Source code for MCintegration.utils

# utils.py
# Utility functions and classes for Monte Carlo integration.
# This file includes the RAvg class for running averages and error estimation,
# along with various utility functions.

import torch
from torch import nn
import numpy as np
import gvar
import sys

# Constants for numerical stability
# Small but safe non-zero value
MINVAL = 10 ** (sys.float_info.min_10_exp + 50)
MAXVAL = 10 ** (sys.float_info.max_10_exp - 50)  # Large but safe value
_VECTOR_TYPES = [np.ndarray, list]


[docs] class RAvg(gvar.GVar): """ Running Average class that extends gvar.GVar. This class maintains a running average of measurements and keeps track of errors, providing statistical analysis of the integration results. """ def __init__(self, weighted=True, itn_results=None, sum_neval=0): """ Initialize a Running Average object. Args: weighted (bool): Whether to use weighted averaging itn_results (list): Initial list of iteration results sum_neval (int): Initial sum of function evaluations """ if weighted: self._wlist = [] self.weighted = True else: self._sum = 0.0 self._varsum = 0.0 self.weighted = False self._count = 0 self._mlist = [] self.itn_results = [] if itn_results is None: super(RAvg, self).__init__( *gvar.gvar(0.0, 0.0).internaldata, ) else: if isinstance(itn_results, bytes): itn_results = gvar.loads(itn_results) for r in itn_results: self.add(r) self.sum_neval = sum_neval
[docs] def update(self, mean, var, last_neval=None): """ Update the running average with a new mean and variance. Args: mean (float): Mean value to add var (float): Variance to add last_neval (int, optional): Number of evaluations for this update """ self.add(gvar.gvar(mean, var**0.5)) if last_neval is not None: self.sum_neval += last_neval
[docs] def add(self, res): """ Add a new result to the running average. Args: res (gvar.GVar): Result to add to the running average """ self.itn_results.append(res) if isinstance(res, gvar.GVarRef): return self._mlist.append(res.mean) if self.weighted: # Weighted average with weights proportional to 1/variance self._wlist.append(1 / (res.var if res.var > MINVAL else MINVAL)) var = 1.0 / np.sum(self._wlist) sdev = np.sqrt(var) mean = np.sum( [w * m for w, m in zip(self._wlist, self._mlist)]) * var super(RAvg, self).__init__(*gvar.gvar(mean, sdev).internaldata) else: # Simple average self._sum += res.mean self._varsum += res.var self._count += 1 mean = self._sum / self._count var = self._varsum / self._count**2 super(RAvg, self).__init__( *gvar.gvar(mean, np.sqrt(var)).internaldata)
[docs] def extend(self, ravg): """ Merge results from another RAvg object after results currently in self. Args: ravg (RAvg): Another RAvg object to merge with this one """ for r in ravg.itn_results: self.add(r) self.sum_neval += ravg.sum_neval
[docs] def __reduce_ex__(self, protocol): """ Support for pickling RAvg objects. Args: protocol (int): The protocol version Returns: tuple: Data for reconstruction """ return ( RAvg, ( self.weighted, gvar.dumps(self.itn_results, protocol=protocol), self.sum_neval, ), )
def _remove_gvars(self, gvlist): """ Create a copy with references to gvars in gvlist removed. Args: gvlist (list): List of gvars to remove Returns: RAvg: New RAvg instance with gvars removed """ tmp = RAvg( weighted=self.weighted, itn_results=self.itn_results, sum_neval=self.sum_neval, ) tmp.itn_results = gvar.remove_gvars(tmp.itn_results, gvlist) tgvar = gvar.gvar_factory() # small cov matrix super(RAvg, tmp).__init__(*tgvar(0, 0).internaldata) return tmp def _distribute_gvars(self, gvlist): """ Create a copy with references to gvars in gvlist. Args: gvlist (list): List of gvars to distribute Returns: RAvg: New RAvg instance with distributed gvars """ return RAvg( weighted=self.weighted, itn_results=gvar.distribute_gvars(self.itn_results, gvlist), sum_neval=self.sum_neval, ) def _chi2(self): """ Calculate chi-squared of the weighted average. Returns: float: chi-squared value """ if len(self.itn_results) <= 1: return 0.0 if self.weighted: wavg = self.mean ans = 0.0 for m, w in zip(self._mlist, self._wlist): ans += (wavg - m) ** 2 * w return ans else: wavg = self.mean ans = np.sum([(m - wavg) ** 2 for m in self._mlist]) / ( self._varsum / self._count ) return ans chi2 = property(_chi2, None, None, "*chi**2* of weighted average.") def _dof(self): """ Calculate degrees of freedom. Returns: int: Degrees of freedom (number of iterations - 1) """ return len(self.itn_results) - 1 dof = property( _dof, None, None, "Number of degrees of freedom in weighted average." ) def _nitn(self): """ Get number of iterations. Returns: int: Number of iterations """ return len(self.itn_results) nitn = property(_nitn, None, None, "Number of iterations.") def _Q(self): """ Calculate Q value (p-value) of the chi-squared. Returns: float: Q value """ return ( gvar.gammaQ(self.dof / 2.0, self.chi2 / 2.0) if self.dof > 0 and self.chi2 >= 0 else float("nan") ) Q = property( _Q, None, None, "*Q* or *p-value* of weighted average's *chi**2*.", ) def _avg_neval(self): """ Calculate average number of evaluations per iteration. Returns: float: Average number of evaluations """ return self.sum_neval / self.nitn if self.nitn > 0 else 0 avg_neval = property( _avg_neval, None, None, "Average number of function evaluations per iteration." )
[docs] def summary(self, weighted=None): """ Produce a summary of the running average statistics. Args: weighted (bool, optional): Whether to use weighted averaging Returns: str: Summary string with statistics """ if weighted is None: weighted = self.weighted acc = RAvg(weighted=weighted) linedata = [] for i, res in enumerate(self.itn_results): acc.add(res) if i > 0: chi2_dof = acc.chi2 / acc.dof Q = acc.Q else: chi2_dof = 0.0 Q = 1.0 itn = "%3d" % (i + 1) integral = "%-15s" % res wgtavg = "%-15s" % acc chi2dof = "%8.2f" % (acc.chi2 / acc.dof if i != 0 else 0.0) Q = "%8.2f" % (acc.Q if i != 0 else 1.0) linedata.append((itn, integral, wgtavg, chi2dof, Q)) nchar = 5 * [0] for data in linedata: for i, d in enumerate(data): if len(d) > nchar[i]: nchar[i] = len(d) fmt = "%%%ds %%-%ds %%-%ds %%%ds %%%ds\n" % tuple(nchar) if weighted: ans = fmt % ("itn", "integral", "wgt average", "chi2/dof", "Q") else: ans = fmt % ("itn", "integral", "average", "chi2/dof", "Q") ans += len(ans[:-1]) * "-" + "\n" for data in linedata: ans += fmt % data return ans
[docs] def converged(self, rtol, atol): """ Check if the running average has converged within tolerance. Args: rtol (float): Relative tolerance atol (float): Absolute tolerance Returns: bool: True if converged, False otherwise """ return self.sdev < atol + rtol * abs(self.mean)
def __mul__(xx, yy): if type(yy) in _VECTOR_TYPES: return NotImplemented # let ndarray handle it elif isinstance(xx, RAvg): resx = gvar.gvar(xx.mean, xx.sdev) if isinstance(yy, RAvg): resy = gvar.gvar(yy.mean, yy.sdev) return RAvg( weighted=xx.weighted, itn_results=[resx * resy], sum_neval=xx.sum_neval, ) else: return RAvg( weighted=xx.weighted, itn_results=[resx * yy], sum_neval=xx.sum_neval, ) elif isinstance(yy, RAvg): resy = gvar.gvar(yy.mean, yy.sdev) return RAvg( weighted=yy.weighted, itn_results=[xx * resy], sum_neval=yy.sum_neval, ) else: return NotImplemented def __truediv__(xx, yy): if type(yy) in _VECTOR_TYPES: return NotImplemented # let ndarray handle it elif isinstance(xx, RAvg): resx = gvar.gvar(xx.mean, xx.sdev) if isinstance(yy, RAvg): resy = gvar.gvar(yy.mean, yy.sdev) return RAvg( weighted=xx.weighted, itn_results=[resx / resy], sum_neval=xx.sum_neval, ) else: return RAvg( weighted=xx.weighted, itn_results=[resx / yy], sum_neval=xx.sum_neval, ) elif isinstance(yy, RAvg): resy = gvar.gvar(yy.mean, yy.sdev) return RAvg( weighted=yy.weighted, itn_results=[xx / resy], sum_neval=yy.sum_neval, ) else: return NotImplemented
[docs] def set_seed(seed): """ Set random seed for reproducibility. Args: seed (int): Random seed to set """ np.random.seed(seed) torch.manual_seed(seed)
[docs] def get_device(): """ Get the best available device (CUDA GPU if available, otherwise CPU). Returns: torch.device: The selected device """ return torch.device("cuda" if torch.cuda.is_available() else "cpu")