import emcee import numpy as np from scipy.stats import lognorm, norm class Bayesian: def __init__(self, y, comment_list=None): if comment_list is None: comment_list = [] self.y = y self.comment_list = comment_list # LOQ default settings self.LOQ_lower = -1 # (2.4 hours) self.LOQ_upper = 3 # 1000 days # prior default settings self.prior_mu_mean = 1.5 self.prior_mu_std = 2 self.prior_sigma_mean = 0.4 self.prior_sigma_std = 0.4 self.lower_limit_sigma = 0.2 # EMCEE defaults self.nwalkers = 10 self.iterations = 2000 self.burn_in = 100 ndim = 2 # number of dimensions (mean, std) # backend = emcee.backends.HDFBackend("backend.h5") # backend.reset(self.nwalkers, ndim) self.sampler = emcee.EnsembleSampler(self.nwalkers, ndim, self.logPosterior) self.posterior_mu = None self.posterior_sigma = None def get_censored_values_only(self): censored_values = [] for i, comment in enumerate(self.comment_list): if comment in ["<", ">"]: censored_values.append(self.y[i]) elif self.y[i] > self.LOQ_upper or self.y[i] < self.LOQ_lower: censored_values.append(self.y[i]) return censored_values # Class functions def determine_LOQ(self): """ Determines if the LOQ is upper or lower, and the value (if not default) :return: upper_LOQ , lower_LOQ """ censored_values = self.get_censored_values_only() # Find upper LOQ upper_LOQ = np.nan # bigger than global LOQ if max(self.y) >= self.LOQ_upper: upper_LOQ = self.LOQ_upper # case if exactly 365 days elif max(self.y) == 2.562: # 365 days upper_LOQ = 2.562 self.LOQ_upper = upper_LOQ # case if "bigger than" indication in comments elif ">" in self.comment_list: i = 0 while i < len(self.y): if self.y[i] == min(censored_values) and self.comment_list[i] == ">": self.LOQ_upper = self.y[i] break i += 1 # Find lower LOQ lower_LOQ = np.nan # smaller than global LOQ if min(self.y) <= self.LOQ_lower: lower_LOQ = self.LOQ_lower # case if exactly 1 day elif min(self.y) == 0: # 1 day lower_LOQ = 0 self.LOQ_lower = 0 # case if "smaller than" indication in comments elif "<" in self.comment_list: i = 0 while i < len(self.y): if self.y[i] == max(censored_values) and self.comment_list[i] == "<": self.LOQ_lower = self.y[i] break i += 1 return upper_LOQ, lower_LOQ def logLikelihood(self, theta, sigma): """ Likelihood function (the probability of a dataset (mean, std) given the model parameters) Convert not censored observations into type ’numeric’ :param theta: mean half-life value to be evaluated :param sigma: std half-life value to be evaluated :return: log_likelihood """ upper_LOQ, lower_LOQ = self.determine_LOQ() n_censored_upper = 0 n_censored_lower = 0 y_not_cen = [] if np.isnan(upper_LOQ) and np.isnan(lower_LOQ): y_not_cen = self.y else: for i in self.y: if np.isnan(upper_LOQ) and i >= upper_LOQ: # censor above threshold n_censored_upper += 1 if np.isnan(lower_LOQ) and i <= lower_LOQ: # censor below threshold n_censored_lower += 1 else: # do not censor y_not_cen.append(i) LL_left_cen = 0 LL_right_cen = 0 LL_not_cen = 0 # likelihood for not censored observations if n_censored_lower > 0: # loglikelihood for left censored observations LL_left_cen = n_censored_lower * norm.logcdf( lower_LOQ, loc=theta, scale=sigma ) # cumulative distribution function CDF if n_censored_upper > 0: # loglikelihood for right censored observations LL_right_cen = n_censored_upper * norm.logsf( upper_LOQ, loc=theta, scale=sigma ) # survival function (1-CDF) if len(y_not_cen) > 0: # loglikelihood for uncensored values LL_not_cen = sum( norm.logpdf(y_not_cen, loc=theta, scale=sigma) ) # probability density function PDF return LL_left_cen + LL_not_cen + LL_right_cen def get_prior_probability_sigma(self, sigma): # convert mean and sd to logspace parameters, to see this formula check # https://en.wikipedia.org/wiki/Log-normal_distribution under Method of moments section temp = 1 + (self.prior_sigma_std / self.prior_sigma_mean) ** 2 meanlog = self.prior_sigma_mean / np.sqrt(temp) sdlog = np.sqrt(np.log(temp)) # calculate of logpdf of sigma norm_pdf_sigma = lognorm.logpdf(sigma, s=sdlog, loc=self.lower_limit_sigma, scale=meanlog) return norm_pdf_sigma def get_prior_probability_theta(self, theta): norm_pdf_theta = norm.logpdf(theta, loc=self.prior_mu_mean, scale=self.prior_mu_std) return norm_pdf_theta def logPrior(self, par): """ Obtain prior loglikelihood of [theta, sigma] :param par: par = [theta,sigma] :return: loglikelihood """ # calculate the mean and standard deviation in the log-space norm_pdf_mean = self.get_prior_probability_theta(par[0]) norm_pdf_std = self.get_prior_probability_sigma(par[1]) log_norm_pdf = [norm_pdf_mean, norm_pdf_std] return sum(log_norm_pdf) def logPosterior(self, par): """ Obtain posterior loglikelihood :param par: [theta, sigma] :return: posterior loglikelihood """ logpri = self.logPrior(par) if not np.isfinite(logpri): return -np.inf loglikelihood = self.logLikelihood(par[0], par[1]) return logpri + loglikelihood def get_posterior_distribution(self): """ Sample posterior distribution and get median of mean and std samples :return: posterior half-life mean and std """ if self.posterior_mu: return self.posterior_mu, self.posterior_sigma # Sampler parameters ndim = 2 # number of dimensions (mean,std) p0 = abs(np.random.randn(self.nwalkers, ndim)) # only positive starting numbers (for std) # Sample distribution self.sampler.run_mcmc(p0, self.iterations) # get chain and log_prob in one-dimensional array (merged chains with burn-in) samples = self.sampler.get_chain(flat=True, discard=100) # get median mean and std self.posterior_mu = np.median(samples[:, 0]) self.posterior_sigma = np.median(samples[:, 1]) return self.posterior_mu, self.posterior_sigma # Utility functions def get_normal_distribution(x, mu, sig): return np.exp(-np.power(x - mu, 2.0) / (2 * np.power(sig, 2.0)))