import importlib.resources
import logging
import math
import os
import pickle
from collections import defaultdict
from typing import List

import numpy as np
import polars as pl
import yaml
from joblib import Parallel, delayed
from scipy.cluster import hierarchy
from scipy.spatial.distance import squareform
from scipy.stats import spearmanr
from sklearn.feature_selection import VarianceThreshold
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler

from .bayesian import Bayesian
from .descriptors import Mordred


class Pepper:
    """End-to-end DT50 (half-life) regression pipeline.

    Combines Mordred molecular descriptors, min-max scaling, a two-stage
    feature preselection (variance threshold + correlation clustering) and a
    scikit-learn regressor configured from a YAML settings file.
    """

    def __init__(self, config_path=None, random_state=42):
        """Load regressor settings and build the preprocessing pipeline.

        :param config_path: path to a YAML regressor-settings file; when None,
            the packaged GPR config shipped with ``pepper.impl.config`` is used.
        :param random_state: seed forwarded to the regressor for reproducibility.
        """
        self.random_state = random_state
        if config_path is None:
            config_path = importlib.resources.files("pepper.impl.config").joinpath(
                "regressor_settings_singlevalue_soil_paper_GPR_optimized.yml"
            )
        with open(config_path, "r") as file:
            regressor_settings = yaml.safe_load(file)
        if len(regressor_settings) > 1:
            # Lazy %-formatting so the message is only built when emitted
            logging.warning(
                "More than one regressor config found in %s, using the first one",
                config_path,
            )
        # The YAML maps one (arbitrary) top-level key to the settings dict
        self.regressor_settings = next(iter(regressor_settings.values()))

        # Guard with .get(): "regressor_params" is optional (see set_params below)
        if "kernel" in self.regressor_settings.get("regressor_params", {}):
            from sklearn.gaussian_process.kernels import ConstantKernel, Matern  # noqa: F401

            # SECURITY: eval() executes arbitrary code from the config file.
            # Only load configs from trusted sources; a whitelist of kernel
            # constructors would be safer than eval.
            self.regressor_settings["regressor_params"]["kernel"] = eval(
                self.regressor_settings["regressor_params"]["kernel"]
            )

        # The YAML must provide the key "regressor" containing a regressor name
        self.regressor = self.get_regressor_by_name(self.regressor_settings["regressor"])
        if "regressor_params" in self.regressor_settings:
            # Apply any additional hyper-parameters from the config
            self.regressor.set_params(**self.regressor_settings["regressor_params"])

        # TODO we could make the descriptor backend configurable
        self.descriptors = Mordred()
        # Set during train_model(); the exact feature subset prediction must use
        self.descriptor_subset = None
        self.min_max_scaler = MinMaxScaler().set_output(transform="polars")
        self.feature_preselector = Pipeline(
            [
                (
                    # Drop near-constant features first
                    "variance_threshold",
                    VarianceThreshold(threshold=0.02).set_output(transform="polars"),
                ),
                (
                    # Then keep one representative per cluster of correlated features
                    "custom_feature_selection",
                    FunctionTransformer(
                        func=self.remove_highly_correlated_features,
                        validate=False,
                        kw_args={"corr_method": "spearman", "cluster_threshold": 0.01},
                    ).set_output(transform="polars"),
                ),
            ]
        )

    def get_regressor_by_name(self, regressor_string):
        """Instantiate a regressor from its config-file name.

        :param regressor_string: regressor name as defined in the config file
        :return: unfitted scikit-learn regressor instance
        :raises NotImplementedError: for names without a registered branch
        """
        # Further regressor types (RandomForestRegressor, SVR, MLPRegressor,
        # KernelRidge, ...) can be added here as additional branches.
        if regressor_string == "GaussianProcessRegressor":
            return GaussianProcessRegressor(random_state=self.random_state)
        raise NotImplementedError(
            f"No regressor type defined for regressor_string = {regressor_string}"
        )

    def train_model(self, train_data, preprocess=True):
        """Fit ``self.regressor`` and all preprocessors.

        :param train_data: pl.DataFrame; with ``preprocess=True`` it needs
            "structure_id", "smiles" and "dt50_log" columns, otherwise it must
            already contain the columns produced by :meth:`preprocess_data`
            ("dt50_bayesian_mean", "dt50_bayesian_std" and all descriptors).
        :param preprocess: when True, run :meth:`preprocess_data` first.
        :return: tuple of (self, the possibly-preprocessed train_data)
        """
        if preprocess:
            # Adds per-structure Bayesian mean/std of half-lives plus the
            # flattened descriptor columns
            train_data = self.preprocess_data(train_data)

        # Only the descriptor columns feed the feature preselector
        df = train_data[self.descriptors.get_descriptor_names()]
        # Drop every column that contains any null, NaN or +-inf value
        df = Pepper.keep_clean_columns(df)

        # Scale, then remove low-variance and highly correlated features
        x_train_normal = self.min_max_scaler.fit_transform(df)
        x_train_normal = self.feature_preselector.fit_transform(x_train_normal)
        # Remember the surviving columns: prediction must use the same subset
        self.descriptor_subset = x_train_normal.columns

        y_train = train_data["dt50_bayesian_mean"].to_numpy()
        y_train_std = train_data["dt50_bayesian_std"].to_numpy()
        # Per-sample noise level on the GP kernel diagonal.
        # NOTE(review): sklearn documents alpha as a noise *variance*; passing
        # the std (not std**2) appears deliberate here — confirm with authors.
        self.regressor.set_params(alpha=y_train_std)
        self.regressor.fit(x_train_normal, y_train)
        return self, train_data

    @staticmethod
    def keep_clean_columns(df: pl.DataFrame) -> pl.DataFrame:
        """Return only the columns free of null, NaN and infinite values.

        Non-numeric columns are kept unless they contain nulls, since NaN/inf
        checks only apply to numeric dtypes.

        Parameters:
            df (polars.DataFrame): The input DataFrame to be cleaned.

        Returns:
            polars.DataFrame: A DataFrame containing only valid columns.
        """
        valid_cols = []
        for col in df.columns:
            s = df[col]
            has_null = s.null_count() > 0
            if s.dtype.is_numeric():
                has_nan = s.is_nan().any()
                has_inf = s.is_infinite().any()
            else:
                # NaN/inf are meaningless for non-numeric dtypes
                has_nan = False
                has_inf = False
            if not (has_null or has_nan or has_inf):
                valid_cols.append(col)
        return df.select(valid_cols)

    def preprocess_data(self, dataset):
        """Add Bayesian DT50 statistics and molecular descriptors to ``dataset``.

        :param dataset: pl.DataFrame with "structure_id", "smiles" and
            "dt50_log" columns; may contain several rows per structure.
        :return: pl.DataFrame with one row per structure, enriched with
            "dt50_bayesian_mean", "dt50_bayesian_std" and descriptor columns.
        """
        groups = list(dataset.group_by("structure_id"))
        # Environment variables are strings — convert before comparing,
        # otherwise `str > int` raises TypeError. Default is serial execution.
        n_threads = int(os.environ.get("N_PEPPER_THREADS", 1))
        if n_threads > 1:
            results = Parallel(n_jobs=n_threads)(
                delayed(compute_bayes_per_group)(group[1]) for group in groups
            )
        else:
            results = [compute_bayes_per_group(group[1]) for group in groups]
        bayes_stats = pl.concat(results, how="vertical")
        dataset = dataset.join(bayes_stats, on="structure_id", how="left")
        # One row per structure is enough once mean/std are attached
        dataset = dataset.unique(subset="structure_id")

        # Compute descriptors per SMILES into a temporary list column "desc"
        dataset = dataset.with_columns(
            pl.col("smiles")
            .map_elements(
                self.descriptors.get_molecule_descriptors, return_dtype=pl.List(pl.Float64)
            )
            .alias("desc")
        )
        # Drop structures whose SMILES failed descriptor calculation
        dataset = dataset.filter(pl.col("desc").is_not_null() & (pl.col("desc").list.len() > 0))
        # Flatten the descriptor list into one named column per descriptor
        dataset = dataset.with_columns(
            pl.col("desc").list.to_struct(fields=self.descriptors.get_descriptor_names())
        ).unnest("desc")
        return dataset

    def predict_batch(self, batch: List[str], is_smiles: bool = True) -> List[List[float | None]]:
        """Predict DT50 mean and std for a batch of molecules.

        :param batch: list of SMILES strings, or pre-computed descriptor rows
            when ``is_smiles`` is False.
        :param is_smiles: whether ``batch`` contains SMILES strings.
        :return: [means, stds]; entries are None where the input was invalid.
        """
        if is_smiles:
            n_desc = len(self.descriptors.get_descriptor_names())
            rows = []
            for smiles in batch:
                desc = self.descriptors.get_molecule_descriptors(smiles)
                # A failed SMILES yields an all-NaN row so it is padded with
                # None in the output instead of crashing DataFrame creation.
                if not desc:
                    desc = [float("nan")] * n_desc
                rows.append(desc)
        else:
            rows = batch
        # DataFrame with all descriptors, one row per input molecule
        initial_desc_rows_df = pl.DataFrame(
            data=rows, schema=self.descriptors.get_descriptor_names(), orient="row"
        )
        # Restrict to the columns the scaler was fitted on before validity checks
        initial_desc_rows_df = initial_desc_rows_df.select(
            list(self.min_max_scaler.feature_names_in_)
        )

        to_pad = []
        adjusted_rows = []
        for i, row in enumerate(initial_desc_rows_df.rows()):
            # A row is usable only if every value is a finite number
            # (math.isfinite rejects NaN and +-inf; None is rejected explicitly)
            if row and all(x is not None and math.isfinite(x) for x in row):
                adjusted_rows.append(row)
            else:
                to_pad.append(i)

        if adjusted_rows:
            desc_rows_df = pl.DataFrame(
                data=adjusted_rows, schema=list(self.min_max_scaler.feature_names_in_), orient="row"
            )
            x_normal = self.min_max_scaler.transform(desc_rows_df)
            x_normal = x_normal[self.descriptor_subset]
            res = self.regressor.predict(x_normal, return_std=True)
            res = [list(res[0]), list(res[1])]
            # Re-insert None at the positions of invalid rows; to_pad is
            # ascending, so sequential insert restores the original order.
            for i in to_pad:
                res[0].insert(i, None)
                res[1].insert(i, None)
            return res
        # Nothing valid to predict on
        return [[None] * len(batch), [None] * len(batch)]

    @staticmethod
    def remove_highly_correlated_features(
        X_train,
        corr_method: str = "spearman",
        cluster_threshold: float = 0.01,
        ignore=False,
    ):
        """Keep one representative feature per cluster of correlated features.

        Features are clustered hierarchically (Ward linkage) on the distance
        matrix ``1 - |spearman corr|``; the first feature of each cluster is kept.

        :param X_train: feature matrix (polars DataFrame or 2D array)
        :param corr_method: kept for interface compatibility; spearman is
            always used (scipy.stats.spearmanr).
        :param cluster_threshold: fcluster distance cut-off
        :param ignore: when True, return ``X_train`` unchanged
        :return: ``X_train`` restricted to the selected feature columns
        """
        if ignore:
            return X_train
        # spearmanr stands in for pandas.corr(method="spearman") on polars input
        corr = spearmanr(X_train, axis=0).statistic
        # With exactly two features scipy returns a 0-d scalar; rebuild the
        # 2x2 correlation matrix so the matrix operations below work.
        if np.ndim(corr) == 0:
            corr = np.array([[1.0, float(corr)], [float(corr), 1.0]])
        # Force symmetry, a clean unit diagonal and finite values
        corr = (corr + corr.T) / 2
        np.fill_diagonal(corr, 1)
        corr = np.nan_to_num(corr)
        # Adapted from scikit-learn's permutation-importance multicollinearity
        # example: convert correlation to distance, then Ward clustering.
        distance_matrix = 1 - np.abs(corr)
        dist_linkage = hierarchy.ward(squareform(distance_matrix))
        cluster_ids = hierarchy.fcluster(dist_linkage, cluster_threshold, criterion="distance")
        cluster_id_to_feature_ids = defaultdict(list)
        for idx, cluster_id in enumerate(cluster_ids):
            cluster_id_to_feature_ids[cluster_id].append(idx)
        # First member of each cluster represents the whole cluster
        selected_features = [members[0] for members in cluster_id_to_feature_ids.values()]
        return X_train[:, selected_features]

    def save_model(self, path):
        """Pickle this fitted Pepper instance to ``path``."""
        with open(path, "wb") as save_file:
            pickle.dump(self, save_file, protocol=5)

    @staticmethod
    def load_model(path) -> "Pepper":
        """Unpickle a Pepper instance from ``path``.

        SECURITY: pickle.load executes arbitrary code — only load model files
        from trusted sources.
        """
        with open(path, "rb") as load_file:
            return pickle.load(load_file)


def compute_bayes_per_group(group):
    """Compute the Bayesian posterior mean and std of "dt50_log" for one group.

    :param group: pl.DataFrame holding all rows of a single structure_id
    :return: single-row pl.DataFrame with structure_id, mean and std
    """
    mean, std = Bayesian(group["dt50_log"]).get_posterior_distribution()
    return pl.DataFrame(
        {
            "structure_id": [group["structure_id"][0]],
            "dt50_bayesian_mean": [mean],
            "dt50_bayesian_std": [std],
        }
    )