Basic System (#31)

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#31
This commit is contained in:
2025-07-23 06:47:07 +12:00
parent 49e02ed97d
commit df896878f1
75 changed files with 3821 additions and 1429 deletions

View File

@ -7,13 +7,21 @@ from django.db import transaction
from django.conf import settings as s
from epdb.models import User, Package, UserPackagePermission, GroupPackagePermission, Permission, Group, Setting, \
EPModel, UserSettingPermission, Rule, Pathway, Node, Edge
EPModel, UserSettingPermission, Rule, Pathway, Node, Edge, Compound, Reaction, CompoundStructure
from utilities.chem import FormatConverter
logger = logging.getLogger(__name__)
class UserManager(object):
user_pattern = re.compile(r".*/user/[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
@staticmethod
def create_user(username, email, password, *args, **kwargs):
def is_user_url(url: str):
return bool(re.findall(UserManager.user_pattern, url))
@staticmethod
def create_user(username, email, password, set_setting=True, add_to_group=True, *args, **kwargs):
# avoid circular import :S
from .tasks import send_registration_mail
@ -34,6 +42,17 @@ class UserManager(object):
# send email for verification
send_registration_mail.delay(u.pk)
if set_setting:
u.default_setting = Setting.objects.get(global_default=True)
u.save()
if add_to_group:
g = Group.objects.get(public=True, name='enviPath Users')
g.user_member.add(u)
g.save()
u.default_group = g
u.save()
return u
@staticmethod
@ -54,7 +73,16 @@ class UserManager(object):
uuid = user_url.strip().split('/')[-1]
return get_user_model().objects.get(uuid=uuid)
@staticmethod
def writable(current_user, user):
return (current_user == user) or user.is_superuser
class GroupManager(object):
group_pattern = re.compile(r".*/group/[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
@staticmethod
def is_group_url(url: str):
return bool(re.findall(GroupManager.group_pattern, url))
@staticmethod
def create_group(current_user, name, description):
@ -110,9 +138,17 @@ class GroupManager(object):
group.save()
@staticmethod
def writable(user, group):
return (user == group.owner) or user.is_superuser
class PackageManager(object):
package_pattern = re.compile(r".*/package/[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$")
package_pattern = re.compile(r".*/package/[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}")
@staticmethod
def is_package_url(url: str):
return bool(re.findall(PackageManager.package_pattern, url))
@staticmethod
def get_reviewed_packages():
@ -120,7 +156,6 @@ class PackageManager(object):
@staticmethod
def readable(user, package):
# TODO Owner!
if UserPackagePermission.objects.filter(package=package, user=user).exists() or \
GroupPackagePermission.objects.filter(package=package, group__in=GroupManager.get_groups(user)) or \
package.reviewed is True or \
@ -131,14 +166,21 @@ class PackageManager(object):
@staticmethod
def writable(user, package):
# TODO Owner!
if UserPackagePermission.objects.filter(package=package, user=user, permission=Permission.WRITE).exists() or \
GroupPackagePermission.objects.filter(package=package, group__in=GroupManager.get_groups(user),
permission=Permission.WRITE) or \
if UserPackagePermission.objects.filter(package=package, user=user, permission=Permission.WRITE[0]).exists() or \
GroupPackagePermission.objects.filter(package=package, group__in=GroupManager.get_groups(user), permission=Permission.WRITE[0]).exists() or \
UserPackagePermission.objects.filter(package=package, user=user, permission=Permission.ALL[0]).exists() or \
user.is_superuser:
return True
return False
@staticmethod
def get_package_lp(package_url):
match = re.findall(PackageManager.package_pattern, package_url)
if match:
package_id = match[0].split('/')[-1]
return Package.objects.get(uuid=package_id)
return None
@staticmethod
def get_package_by_url(user, package_url):
match = re.findall(PackageManager.package_pattern, package_url)
@ -229,8 +271,12 @@ class PackageManager(object):
@staticmethod
@transaction.atomic
def update_permissions(caller: User, package: Package, grantee: Union[User, Group], new_perm: Optional[str]):
if not PackageManager.writable(caller, package):
raise ValueError(f"User {caller} is not allowed to modify permissions on {package}")
caller_perm = None
if not caller.is_superuser:
caller_perm = UserPackagePermission.objects.get(user=caller, package=package).permission
if caller_perm != Permission.ALL[0] and not caller.is_superuser:
raise ValueError(f"Only owner are allowed to modify permissions")
data = {
'package': package,
@ -629,8 +675,6 @@ class PackageManager(object):
return pack
class SettingManager(object):
setting_pattern = re.compile(r".*/setting/[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$")
@ -648,7 +692,8 @@ class SettingManager(object):
def get_setting_by_id(user, setting_id):
s = Setting.objects.get(uuid=setting_id)
if s.global_default or s.public or s.owner == user or user.is_superuser:
if s.global_default or s.public or user.is_superuser or \
UserSettingPermission.objects.filter(user=user, setting=s).exists():
return s
raise ValueError(
@ -697,6 +742,116 @@ class SettingManager(object):
def set_default_setting(user: User, setting: Setting):
pass
class SearchManager(object):
@staticmethod
def search(packages: Union[Package, List[Package]], searchterm: str, mode: str):
match mode:
case 'text':
return SearchManager._search_text(packages, searchterm)
case 'default':
return SearchManager._search_default_smiles(packages, searchterm)
case 'exact':
return SearchManager._search_exact_smiles(packages, searchterm)
case 'canonical':
return SearchManager._search_canonical_smiles(packages, searchterm)
case 'inchikey':
return SearchManager._search_inchikey(packages, searchterm)
case _:
raise ValueError(f"Unknown search mode {mode}!")
@staticmethod
def _search_inchikey(packages: Union[Package, List[Package]], searchterm: str):
from django.db.models import Q
search_cond = Q(inchikey=searchterm)
compound_qs = Compound.objects.filter(Q(package__in=packages) & Q(compoundstructure__inchikey=searchterm)).distinct()
compound_structure_qs = CompoundStructure.objects.filter(Q(compound__package__in=packages) & search_cond)
reactions_qs = Reaction.objects.filter(Q(package__in=packages) & (Q(educts__inchikey=searchterm) | Q(products__inchikey=searchterm))).distinct()
pathway_qs = Pathway.objects.filter(Q(package__in=packages) & (Q(edge__edge_label__educts__inchikey=searchterm) | Q(edge__edge_label__products__inchikey=searchterm))).distinct()
return {
'Compounds': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_qs],
'Compound Structures': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_structure_qs],
'Reactions': [{'name': r.name, 'description': r.description, 'url': r.url} for r in reactions_qs],
'Pathways': [{'name': p.name, 'description': p.description, 'url': p.url} for p in pathway_qs],
}
@staticmethod
def _search_exact_smiles(packages: Union[Package, List[Package]], searchterm: str):
from django.db.models import Q
search_cond = Q(smiles=searchterm)
compound_qs = Compound.objects.filter(Q(package__in=packages) & Q(compoundstructure__smiles=searchterm)).distinct()
compound_structure_qs = CompoundStructure.objects.filter(Q(compound__package__in=packages) & search_cond)
reactions_qs = Reaction.objects.filter(Q(package__in=packages) & (Q(educts__smiles=searchterm) | Q(products__smiles=searchterm))).distinct()
pathway_qs = Pathway.objects.filter(Q(package__in=packages) & (Q(edge__edge_label__educts__smiles=searchterm) | Q(edge__edge_label__products__smiles=searchterm))).distinct()
return {
'Compounds': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_qs],
'Compound Structures': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_structure_qs],
'Reactions': [{'name': r.name, 'description': r.description, 'url': r.url} for r in reactions_qs],
'Pathways': [{'name': p.name, 'description': p.description, 'url': p.url} for p in pathway_qs],
}
@staticmethod
def _search_default_smiles(packages: Union[Package, List[Package]], searchterm: str):
from django.db.models import Q
inchi_front = FormatConverter.InChIKey(searchterm)[:14]
search_cond = Q(inchikey__startswith=inchi_front)
compound_qs = Compound.objects.filter(Q(package__in=packages) & Q(compoundstructure__inchikey__startswith=inchi_front)).distinct()
compound_structure_qs = CompoundStructure.objects.filter(Q(compound__package__in=packages) & search_cond)
reactions_qs = Reaction.objects.filter(Q(package__in=packages) & (Q(educts__inchikey__startswith=inchi_front) | Q(products__inchikey__startswith=inchi_front))).distinct()
pathway_qs = Pathway.objects.filter(Q(package__in=packages) & (Q(edge__edge_label__educts__inchikey__startswith=inchi_front) | Q(edge__edge_label__products__inchikey__startswith=inchi_front))).distinct()
return {
'Compounds': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_qs],
'Compound Structures': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_structure_qs],
'Reactions': [{'name': r.name, 'description': r.description, 'url': r.url} for r in reactions_qs],
'Pathways': [{'name': p.name, 'description': p.description, 'url': p.url} for p in pathway_qs],
}
@staticmethod
def _search_canonical_smiles(packages: Union[Package, List[Package]], searchterm: str):
from django.db.models import Q
search_cond = Q(canonical_smiles=searchterm)
compound_qs = Compound.objects.filter(Q(package__in=packages) & Q(compoundstructure__canonical_smiles=searchterm)).distinct()
compound_structure_qs = CompoundStructure.objects.filter(Q(compound__package__in=packages) & search_cond)
reactions_qs = Reaction.objects.filter(Q(package__in=packages) & (Q(educts__canonical_smiles=searchterm) | Q(products__canonical_smiles=searchterm))).distinct()
pathway_qs = Pathway.objects.filter(Q(package__in=packages) & (Q(edge__edge_label__educts__canonical_smiles=searchterm) | Q(edge__edge_label__products__canonical_smiles=searchterm))).distinct()
return {
'Compounds': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_qs],
'Compound Structures': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_structure_qs],
'Reactions': [{'name': r.name, 'description': r.description, 'url': r.url} for r in reactions_qs],
'Pathways': [{'name': p.name, 'description': p.description, 'url': p.url} for p in pathway_qs],
}
@staticmethod
def _search_text(packages: Union[Package, List[Package]], searchterm: str):
from django.db.models import Q
search_cond = (Q(name__icontains=searchterm) | Q(description__icontains=searchterm))
cond = Q(package__in=packages) & search_cond
compound_qs = Compound.objects.filter(cond)
compound_structure_qs = CompoundStructure.objects.filter(Q(compound__package__in=packages) & search_cond)
rule_qs = Rule.objects.filter(cond)
reactions_qs = Reaction.objects.filter(cond)
pathway_qs = Pathway.objects.filter(cond)
res = {
'Compounds': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_qs],
'Compound Structures': [{'name': c.name, 'description': c.description, 'url': c.url} for c in compound_structure_qs],
'Rules': [{'name': r.name, 'description': r.description, 'url': r.url} for r in rule_qs],
'Reactions': [{'name': r.name, 'description': r.description, 'url': r.url} for r in reactions_qs],
'Pathways': [{'name': p.name, 'description': p.description, 'url': p.url} for p in pathway_qs],
}
return res
class SNode(object):
@ -719,7 +874,7 @@ class SNode(object):
class SEdge(object):
def __init__(self, educts: Union[SNode, List[SNode]], products: Union[SNode | List[SNode]],
rule: Optional['Rule'] = None):
rule: Optional['Rule'] = None, probability: Optional[float] = None):
if not isinstance(educts, list):
educts = [educts]
@ -727,6 +882,7 @@ class SEdge(object):
self.educts = educts
self.products = products
self.rule = rule
self.probability = probability
def __hash__(self):
full_hash = 0
@ -799,11 +955,45 @@ class SPathway(object):
elif isinstance(n, SNode):
self.root_nodes.append(n)
self.queue = list()
self.smiles_to_node: Dict[str, SNode] = dict(**{n.smiles: n for n in self.root_nodes})
self.edges: Set['SEdge'] = set()
self.done = False
@staticmethod
def from_pathway(pw: 'Pathway', persist: bool = True):
""" Initializes a SPathway with a state given by a Pathway """
spw = SPathway(root_nodes=pw.root_nodes, persist=pw if persist else None, prediction_setting=pw.setting)
# root_nodes are already added in __init__, add remaining nodes
for n in pw.nodes:
snode = SNode(n.default_node_label.smiles, n.depth)
if snode.smiles not in spw.smiles_to_node:
spw.smiles_to_node[snode.smiles] = snode
spw.snode_persist_lookup[snode] = n
for e in pw.edges:
sub = []
prod = []
for n in e.start_nodes.all():
sub.append(spw.smiles_to_node[n.default_node_label.smiles])
for n in e.end_nodes.all():
prod.append(spw.smiles_to_node[n.default_node_label.smiles])
rule = None
if e.edge_label.rules.all():
rule = e.edge_label.rules.all().first()
prob = None
if e.kv.get('probability'):
prob = float(e.kv['probability'])
sedge = SEdge(sub, prod, rule=rule, probability=prob)
spw.edges.add(sedge)
spw.sedge_persist_lookup[sedge] = e
return spw
def num_nodes(self):
return len(self.smiles_to_node.keys())
@ -830,8 +1020,18 @@ class SPathway(object):
return sorted(res, key=lambda x: hash(x))
def predict_step(self, from_depth: int = 0):
substrates = self._get_nodes_for_depth(from_depth)
def predict_step(self, from_depth: int = None, from_node: 'Node' = None):
substrates: List[SNode] = []
if from_depth is not None:
substrates = self._get_nodes_for_depth(from_depth)
elif from_node is not None:
for k,v in self.snode_persist_lookup.items():
if from_node == v:
substrates = [k]
break
else:
raise ValueError("Neither from_depth nor from_node_url specified")
new_tp = False
if substrates:
@ -849,13 +1049,19 @@ class SPathway(object):
node = self.smiles_to_node[c]
cand_nodes.append(node)
edge = SEdge(sub, cand_nodes, cand_set.rule)
edge = SEdge(sub, cand_nodes, rule=cand_set.rule, probability=cand_set.probability)
self.edges.add(edge)
else:
# In case no substrates are found, we're done.
# For "predict from node" we're always done
if len(substrates) == 0 or from_node is not None:
self.done = True
# Check if we need to write back data to database
if new_tp and self.persist:
self._sync_to_pathway()
# call save to update internal modified field
self.persist.save()
def _sync_to_pathway(self):
logger.info("Updating Pathway with SPathway")
@ -876,6 +1082,11 @@ class SPathway(object):
product_nodes.append(self.snode_persist_lookup[snode])
e = Edge.create(self.persist, educt_nodes, product_nodes, sedge.rule)
if sedge.probability:
e.kv.update({'probability': sedge.probability})
e.save()
self.sedge_persist_lookup[sedge] = e
logger.info("Update done!")