[Feature] PEPPER in enviPath (#332)

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#332
This commit is contained in:
2026-03-06 22:11:22 +13:00
parent 6e00926371
commit c6ff97694d
43 changed files with 3793 additions and 371 deletions

196
pepper/impl/bayesian.py Normal file
View File

@ -0,0 +1,196 @@
import emcee
import numpy as np
from scipy.stats import lognorm, norm
class Bayesian:
def __init__(self, y, comment_list=None):
if comment_list is None:
comment_list = []
self.y = y
self.comment_list = comment_list
# LOQ default settings
self.LOQ_lower = -1 # (2.4 hours)
self.LOQ_upper = 3 # 1000 days
# prior default settings
self.prior_mu_mean = 1.5
self.prior_mu_std = 2
self.prior_sigma_mean = 0.4
self.prior_sigma_std = 0.4
self.lower_limit_sigma = 0.2
# EMCEE defaults
self.nwalkers = 10
self.iterations = 2000
self.burn_in = 100
ndim = 2 # number of dimensions (mean, std)
# backend = emcee.backends.HDFBackend("backend.h5")
# backend.reset(self.nwalkers, ndim)
self.sampler = emcee.EnsembleSampler(self.nwalkers, ndim, self.logPosterior)
self.posterior_mu = None
self.posterior_sigma = None
def get_censored_values_only(self):
censored_values = []
for i, comment in enumerate(self.comment_list):
if comment in ["<", ">"]:
censored_values.append(self.y[i])
elif self.y[i] > self.LOQ_upper or self.y[i] < self.LOQ_lower:
censored_values.append(self.y[i])
return censored_values
# Class functions
def determine_LOQ(self):
"""
Determines if the LOQ is upper or lower, and the value (if not default)
:return: upper_LOQ , lower_LOQ
"""
censored_values = self.get_censored_values_only()
# Find upper LOQ
upper_LOQ = np.nan
# bigger than global LOQ
if max(self.y) >= self.LOQ_upper:
upper_LOQ = self.LOQ_upper
# case if exactly 365 days
elif max(self.y) == 2.562: # 365 days
upper_LOQ = 2.562
self.LOQ_upper = upper_LOQ
# case if "bigger than" indication in comments
elif ">" in self.comment_list:
i = 0
while i < len(self.y):
if self.y[i] == min(censored_values) and self.comment_list[i] == ">":
self.LOQ_upper = self.y[i]
break
i += 1
# Find lower LOQ
lower_LOQ = np.nan
# smaller than global LOQ
if min(self.y) <= self.LOQ_lower:
lower_LOQ = self.LOQ_lower
# case if exactly 1 day
elif min(self.y) == 0: # 1 day
lower_LOQ = 0
self.LOQ_lower = 0
# case if "smaller than" indication in comments
elif "<" in self.comment_list:
i = 0
while i < len(self.y):
if self.y[i] == max(censored_values) and self.comment_list[i] == "<":
self.LOQ_lower = self.y[i]
break
i += 1
return upper_LOQ, lower_LOQ
def logLikelihood(self, theta, sigma):
"""
Likelihood function (the probability of a dataset (mean, std) given the model parameters)
Convert not censored observations into type numeric
:param theta: mean half-life value to be evaluated
:param sigma: std half-life value to be evaluated
:return: log_likelihood
"""
upper_LOQ, lower_LOQ = self.determine_LOQ()
n_censored_upper = 0
n_censored_lower = 0
y_not_cen = []
if np.isnan(upper_LOQ) and np.isnan(lower_LOQ):
y_not_cen = self.y
else:
for i in self.y:
if np.isnan(upper_LOQ) and i >= upper_LOQ: # censor above threshold
n_censored_upper += 1
if np.isnan(lower_LOQ) and i <= lower_LOQ: # censor below threshold
n_censored_lower += 1
else: # do not censor
y_not_cen.append(i)
LL_left_cen = 0
LL_right_cen = 0
LL_not_cen = 0
# likelihood for not censored observations
if n_censored_lower > 0: # loglikelihood for left censored observations
LL_left_cen = n_censored_lower * norm.logcdf(
lower_LOQ, loc=theta, scale=sigma
) # cumulative distribution function CDF
if n_censored_upper > 0: # loglikelihood for right censored observations
LL_right_cen = n_censored_upper * norm.logsf(
upper_LOQ, loc=theta, scale=sigma
) # survival function (1-CDF)
if len(y_not_cen) > 0: # loglikelihood for uncensored values
LL_not_cen = sum(
norm.logpdf(y_not_cen, loc=theta, scale=sigma)
) # probability density function PDF
return LL_left_cen + LL_not_cen + LL_right_cen
def get_prior_probability_sigma(self, sigma):
# convert mean and sd to logspace parameters, to see this formula check
# https://en.wikipedia.org/wiki/Log-normal_distribution under Method of moments section
temp = 1 + (self.prior_sigma_std / self.prior_sigma_mean) ** 2
meanlog = self.prior_sigma_mean / np.sqrt(temp)
sdlog = np.sqrt(np.log(temp))
# calculate of logpdf of sigma
norm_pdf_sigma = lognorm.logpdf(sigma, s=sdlog, loc=self.lower_limit_sigma, scale=meanlog)
return norm_pdf_sigma
def get_prior_probability_theta(self, theta):
norm_pdf_theta = norm.logpdf(theta, loc=self.prior_mu_mean, scale=self.prior_mu_std)
return norm_pdf_theta
def logPrior(self, par):
"""
Obtain prior loglikelihood of [theta, sigma]
:param par: par = [theta,sigma]
:return: loglikelihood
"""
# calculate the mean and standard deviation in the log-space
norm_pdf_mean = self.get_prior_probability_theta(par[0])
norm_pdf_std = self.get_prior_probability_sigma(par[1])
log_norm_pdf = [norm_pdf_mean, norm_pdf_std]
return sum(log_norm_pdf)
def logPosterior(self, par):
"""
Obtain posterior loglikelihood
:param par: [theta, sigma]
:return: posterior loglikelihood
"""
logpri = self.logPrior(par)
if not np.isfinite(logpri):
return -np.inf
loglikelihood = self.logLikelihood(par[0], par[1])
return logpri + loglikelihood
def get_posterior_distribution(self):
"""
Sample posterior distribution and get median of mean and std samples
:return: posterior half-life mean and std
"""
if self.posterior_mu:
return self.posterior_mu, self.posterior_sigma
# Sampler parameters
ndim = 2 # number of dimensions (mean,std)
p0 = abs(np.random.randn(self.nwalkers, ndim)) # only positive starting numbers (for std)
# Sample distribution
self.sampler.run_mcmc(p0, self.iterations)
# get chain and log_prob in one-dimensional array (merged chains with burn-in)
samples = self.sampler.get_chain(flat=True, discard=100)
# get median mean and std
self.posterior_mu = np.median(samples[:, 0])
self.posterior_sigma = np.median(samples[:, 1])
return self.posterior_mu, self.posterior_sigma
# Utility functions
def get_normal_distribution(x, mu, sig):
return np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0)))