# forked from enviPath/enviPy
import importlib.resources
|
|
import logging
|
|
import math
|
|
import os
|
|
import pickle
|
|
from collections import defaultdict
|
|
from typing import List
|
|
|
|
import numpy as np
|
|
import polars as pl
|
|
import yaml
|
|
from joblib import Parallel, delayed
|
|
from scipy.cluster import hierarchy
|
|
from scipy.spatial.distance import squareform
|
|
from scipy.stats import spearmanr
|
|
from sklearn.feature_selection import VarianceThreshold
|
|
from sklearn.gaussian_process import GaussianProcessRegressor
|
|
from sklearn.pipeline import Pipeline
|
|
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler
|
|
|
|
from .bayesian import Bayesian
|
|
from .descriptors import Mordred
|
|
|
|
|
|
class Pepper:
    """Half-life (DT50) regression model.

    Wraps a configurable scikit-learn regressor (currently only
    GaussianProcessRegressor) together with Mordred descriptor computation,
    min-max scaling and a variance/correlation-based feature pre-selection
    pipeline. The fitted column subset is stored so prediction uses exactly
    the features seen during training.
    """

    def __init__(self, config_path=None, random_state=42):
        """Load the regressor configuration and build the preprocessing pipeline.

        :param config_path: path to a YAML regressor-settings file; when None,
            the packaged soil-paper GPR configuration is used.
        :param random_state: seed forwarded to the underlying regressor.
        """
        self.random_state = random_state
        if config_path is None:
            config_path = importlib.resources.files("pepper.impl.config").joinpath(
                "regressor_settings_singlevalue_soil_paper_GPR_optimized.yml"
            )

        with open(config_path, "r") as file:
            regressor_settings = yaml.safe_load(file)

        if len(regressor_settings) > 1:
            logging.warning(
                f"More than one regressor config found in {config_path}, using the first one"
            )
        # The YAML maps a single top-level name to its settings; take the first entry.
        self.regressor_settings = regressor_settings[list(regressor_settings.keys())[0]]

        if "kernel" in self.regressor_settings["regressor_params"]:
            from sklearn.gaussian_process.kernels import ConstantKernel, Matern  # noqa: F401

            # SECURITY: eval() executes arbitrary code from the config file.
            # Only load configs from trusted sources; hard-coding the known
            # kernel expressions would be safer than eval.
            self.regressor_settings["regressor_params"]["kernel"] = eval(
                self.regressor_settings["regressor_params"]["kernel"]
            )

        # We assume the YAML has the key "regressor" containing a regressor name
        self.regressor = self.get_regressor_by_name(self.regressor_settings["regressor"])
        if "regressor_params" in self.regressor_settings:  # Set params if any are given
            self.regressor.set_params(**self.regressor_settings["regressor_params"])

        # TODO we could make the descriptor set configurable
        self.descriptors = Mordred()
        # Column subset fed to the regressor; populated by train_model().
        self.descriptor_subset = None

        self.min_max_scaler = MinMaxScaler().set_output(transform="polars")
        self.feature_preselector = Pipeline(
            [
                # Feature selection based on variance threshold
                (
                    "variance_threshold",
                    VarianceThreshold(threshold=0.02).set_output(transform="polars"),
                ),
                # Drop features that cluster together under Spearman correlation
                (
                    "custom_feature_selection",
                    FunctionTransformer(
                        func=self.remove_highly_correlated_features,
                        validate=False,
                        kw_args={"corr_method": "spearman", "cluster_threshold": 0.01},
                    ).set_output(transform="polars"),
                ),
            ]
        )

    def get_regressor_by_name(self, regressor_string):
        """
        Load a regressor instance from a regressor name.

        Historically many regressor types were supported (RandomForest, SVR,
        MLP, ...); only GaussianProcessRegressor is currently wired up.

        :param regressor_string: name of regressor as defined in the config file
        :return: Regressor object
        :raises NotImplementedError: for any unsupported regressor name
        """
        if regressor_string == "GaussianProcessRegressor":
            return GaussianProcessRegressor(random_state=self.random_state)
        raise NotImplementedError(
            f"No regressor type defined for regressor_string = {regressor_string}"
        )

    def train_model(self, train_data, preprocess=True):
        """
        Fit self.regressor and preprocessors.

        :param train_data: pl.DataFrame with half-life data (and descriptor
            columns once preprocessed)
        :param preprocess: when True, run preprocess_data() on train_data first
        :return: tuple of (self, the possibly preprocessed training frame)
        """
        if preprocess:
            # Compute the mean and std of half-lives per structure
            train_data = self.preprocess_data(train_data)

        # train_data columns after preprocessing:
        #   structure_id, smiles, dt50_log, dt50_bayesian_mean,
        #   dt50_bayesian_std, plus one column per Mordred descriptor

        # only select descriptor features for the feature preselector
        df = train_data[self.descriptors.get_descriptor_names()]

        # Drop columns containing any None, NaN or inf value
        df = Pepper.keep_clean_columns(df)

        # Scale, then remove highly correlated and low-variance features
        x_train_normal = self.min_max_scaler.fit_transform(df)
        x_train_normal = self.feature_preselector.fit_transform(x_train_normal)

        # Store the surviving subset, as this is the input used for prediction
        self.descriptor_subset = x_train_normal.columns

        y_train = train_data["dt50_bayesian_mean"].to_numpy()
        y_train_std = train_data["dt50_bayesian_std"].to_numpy()

        # NOTE(review): GPR's `alpha` is added to the kernel diagonal and is
        # interpreted as per-sample noise *variance*; passing the std here
        # (rather than std**2) looks deliberate but is worth confirming.
        self.regressor.set_params(alpha=y_train_std)
        self.regressor.fit(x_train_normal, y_train)

        return self, train_data

    @staticmethod
    def keep_clean_columns(df: pl.DataFrame) -> pl.DataFrame:
        """
        Filter out columns that contain null values, NaN, or infinite values.

        Each column is scanned once; any column with at least one invalid
        value is dropped. NaN/inf checks only apply to numeric columns, since
        polars defines them only there.

        Parameters:
            df (polars.DataFrame): The input DataFrame to be cleaned.

        Returns:
            polars.DataFrame: A DataFrame containing only columns without
            null, NaN, or infinite values.
        """
        valid_cols = []

        for col in df.columns:
            s = df[col]

            # Check nulls
            has_null = s.null_count() > 0

            # Check NaN and inf only for numeric columns
            if s.dtype.is_numeric():
                has_nan = s.is_nan().any()
                has_inf = s.is_infinite().any()
            else:
                has_nan = False
                has_inf = False

            if not (has_null or has_nan or has_inf):
                valid_cols.append(col)

        return df.select(valid_cols)

    def preprocess_data(self, dataset):
        """
        Aggregate per-structure half-lives and attach molecular descriptors.

        :param dataset: pl.DataFrame with at least structure_id, smiles and
            dt50_log columns
        :return: deduplicated pl.DataFrame with dt50_bayesian_mean/std and one
            column per descriptor
        """
        groups = [group for group in dataset.group_by("structure_id")]

        # Unless explicitly requested via N_PEPPER_THREADS, compute serially.
        # Environment variables are strings, so convert before comparing
        # (the previous str > int comparison raised TypeError when set).
        n_threads = int(os.environ.get("N_PEPPER_THREADS", 1))
        if n_threads > 1:
            results = Parallel(n_jobs=n_threads)(
                delayed(compute_bayes_per_group)(group[1]) for group in groups
            )
        else:
            results = [compute_bayes_per_group(group[1]) for group in groups]

        bayes_stats = pl.concat(results, how="vertical")
        dataset = dataset.join(bayes_stats, on="structure_id", how="left")

        # Remove duplicates after calculating mean, std
        dataset = dataset.unique(subset="structure_id")

        # Calculate features, collect them into a "desc" list column
        dataset = dataset.with_columns(
            pl.col("smiles")
            .map_elements(
                self.descriptors.get_molecule_descriptors, return_dtype=pl.List(pl.Float64)
            )
            .alias("desc")
        )

        # If a SMILES fails to produce descriptors its row is removed
        dataset = dataset.filter(pl.col("desc").is_not_null() & (pl.col("desc").list.len() > 0))

        # Flatten the features into one column per descriptor
        dataset = dataset.with_columns(
            pl.col("desc").list.to_struct(fields=self.descriptors.get_descriptor_names())
        ).unnest("desc")

        return dataset

    def predict_batch(self, batch: List[str], is_smiles: bool = True) -> List[List[float | None]]:
        """
        Predict DT50 mean and std for a batch of molecules.

        :param batch: list of SMILES strings, or pre-computed descriptor rows
            when is_smiles is False
        :param is_smiles: whether batch entries are SMILES strings
        :return: [means, stds]; entries are None for rows with invalid
            (missing or non-finite) descriptor values
        """
        if is_smiles:
            rows = [self.descriptors.get_molecule_descriptors(smiles) for smiles in batch]
        else:
            rows = batch

        # Create a DataFrame with all descriptors
        initial_desc_rows_df = pl.DataFrame(
            data=rows, schema=self.descriptors.get_descriptor_names(), orient="row"
        )

        # Before checking for invalid values per row, select only the columns
        # the fitted scaler expects
        initial_desc_rows_df = initial_desc_rows_df.select(
            list(self.min_max_scaler.feature_names_in_)
        )

        to_pad = []
        adjusted_rows = []
        for i, row in enumerate(initial_desc_rows_df.rows()):
            # A row is valid only if non-empty and every value is a finite
            # number. The None guard also covers null descriptor entries,
            # which a bare math.isinf/isnan check would crash on.
            if row and all(x is not None and math.isfinite(x) for x in row):
                adjusted_rows.append(row)
            else:
                to_pad.append(i)

        if adjusted_rows:
            desc_rows_df = pl.DataFrame(
                data=adjusted_rows, schema=list(self.min_max_scaler.feature_names_in_), orient="row"
            )
            x_normal = self.min_max_scaler.transform(desc_rows_df)
            x_normal = x_normal[self.descriptor_subset]

            res = self.regressor.predict(x_normal, return_std=True)

            # Convert to lists
            res = [list(res[0]), list(res[1])]

            # If we had rows containing bad input (inf, nan, null) insert
            # Nones at the correct position
            if to_pad:
                for i in to_pad:
                    res[0].insert(i, None)
                    res[1].insert(i, None)

            return res

        else:
            return [[None] * len(batch), [None] * len(batch)]

    @staticmethod
    def remove_highly_correlated_features(
        X_train,
        corr_method: str = "spearman",
        cluster_threshold: float = 0.01,
        ignore=False,
    ):
        """
        Keep one representative feature per cluster of correlated features.

        :param X_train: feature matrix (polars DataFrame or numpy array)
        :param corr_method: accepted for interface compatibility; only
            Spearman correlation is currently implemented
        :param cluster_threshold: fcluster distance cutoff on the Ward linkage
        :param ignore: when True, return X_train unchanged
        :return: X_train restricted to the selected feature columns
        """
        if ignore:
            return X_train
        else:
            # Using spearmanr from scipy to achieve pandas.corr in polars
            corr = spearmanr(X_train, axis=0).statistic

            # Ensure the correlation matrix is symmetric
            corr = (corr + corr.T) / 2
            np.fill_diagonal(corr, 1)
            corr = np.nan_to_num(corr)

            # code from https://scikit-learn.org/stable/auto_examples/inspection/
            # plot_permutation_importance_multicollinear.html
            # We convert the correlation matrix to a distance matrix before
            # performing hierarchical clustering using Ward's linkage.
            distance_matrix = 1 - np.abs(corr)
            dist_linkage = hierarchy.ward(squareform(distance_matrix))

            cluster_ids = hierarchy.fcluster(dist_linkage, cluster_threshold, criterion="distance")
            cluster_id_to_feature_ids = defaultdict(list)

            for idx, cluster_id in enumerate(cluster_ids):
                cluster_id_to_feature_ids[cluster_id].append(idx)

            # First feature of each cluster acts as the representative
            my_selected_features = [v[0] for v in cluster_id_to_feature_ids.values()]
            X_train_sel = X_train[:, my_selected_features]

            return X_train_sel

    def save_model(self, path):
        """Pickle this fitted model (protocol 5) to the given path."""
        with open(path, "wb") as save_file:
            pickle.dump(self, save_file, protocol=5)

    @staticmethod
    def load_model(path) -> "Pepper":
        """Unpickle a model saved with save_model().

        SECURITY: pickle executes arbitrary code on load; only load files
        from trusted sources.
        """
        with open(path, "rb") as load_file:
            return pickle.load(load_file)
|
|
|
|
|
|
def compute_bayes_per_group(group):
    """Summarise one structure's half-lives with a Bayesian posterior.

    :param group: polars DataFrame holding all rows of a single structure_id
    :return: single-row polars DataFrame with the posterior mean and std of dt50_log
    """
    posterior_mean, posterior_std = Bayesian(group["dt50_log"]).get_posterior_distribution()
    stats = {
        "structure_id": [group["structure_id"][0]],
        "dt50_bayesian_mean": [posterior_mean],
        "dt50_bayesian_std": [posterior_std],
    }
    return pl.DataFrame(stats)
|