import gzip
import json
import os.path

from django.conf import settings as s
from django.http import HttpResponseNotAllowed
from django.shortcuts import render

from epdb.logic import PackageManager
from epdb.models import Rule
from epdb.views import get_base_context, _anonymous_or_real
from utilities.chem import FormatConverter


def _fixture_path(filename):
    """Return the path of ``filename`` inside ``<BASE_DIR>/fixtures``."""
    return s.BASE_DIR / 'fixtures' / filename


def _load_ambit_rules():
    """Parse the gzipped ambit rule fixture and return its JSON content.

    The file is opened in a ``with`` block so the handle is closed as soon
    as parsing finishes instead of being left to the garbage collector.
    """
    with gzip.open(_fixture_path('ambit_rules.json.gz'), 'rb') as fh:
        return json.load(fh)


def _compare_products(comp, ambit_prod, smirks):
    """Compute both product sets for one compound of a rule.

    Args:
        comp: compound entry from the fixture; only ``comp['smiles']`` is
            read here.
        ambit_prod: the products recorded in the fixture for this compound.
        smirks: the SMIRKS string of the rule being checked.

    Returns:
        A 4-tuple ``(ambit_smiles, rdkit_smiles, ambit_errors, rdkit_errors)``
        as produced by ``FormatConverter.sanitize_smiles`` for the fixture
        products and for the locally generated products respectively.
    """
    products = FormatConverter.apply(comp['smiles'], smirks, preprocess_smiles=True, bracketize=False)

    # ``products`` is iterated as a collection of collections; flatten it and
    # drop duplicates before sanitizing.
    unique_rdkit_prods = list({p for ps in products for p in ps})

    ambit_smiles, ambit_errors = FormatConverter.sanitize_smiles(ambit_prod)
    rdkit_smiles, rdkit_errors = FormatConverter.sanitize_smiles(unique_rdkit_prods)

    return ambit_smiles, rdkit_smiles, ambit_errors, rdkit_errors


def _products_match(ambit_smiles, rdkit_smiles):
    """Return True when both product collections contain the same SMILES.

    This is comparison mode "equality". The original notes record
    "FAILED (failures=69)" for it, and list two looser modes that were tried:

    * TODO mode "intersection" -- FAILED (failures=37):
      ``len(set(ambit_smiles).intersection(set(rdkit_smiles))) > 0 or len(ambit_smiles) == 0``
    * TODO mode "full ambit" -- FAILED (failures=46):
      ``len(set(ambit_smiles).intersection(set(rdkit_smiles))) == len(ambit_smiles)``
    """
    return set(ambit_smiles) == set(rdkit_smiles)


def _compute_migration_status():
    """Check every rule in the ambit fixture and summarize the outcome.

    Returns:
        A dict with the keys ``results`` (one entry per rule, sorted by
        ``(status, name)`` so failing rules come first), ``success``,
        ``error`` and ``total``.
    """
    data = _load_ambit_rules()
    num_keys = len(data)
    results = []

    for i, (bt_rule_name, bt_rule) in enumerate(data.items(), start=1):
        # Progress output; a full run touches every compound of every rule.
        print(f"{i}/{num_keys}")
        smirks = bt_rule['smirks']

        # A rule passes only if every one of its compounds matches. ``&=`` is
        # used instead of ``all()`` so that every compound is still evaluated.
        res = True
        for comp, ambit_prod in zip(bt_rule['compounds'], bt_rule['products']):
            ambit_smiles, rdkit_smiles, _, _ = _compare_products(comp, ambit_prod, smirks)
            res &= _products_match(ambit_smiles, rdkit_smiles)

        results.append(
            {
                'name': bt_rule_name,
                'id': bt_rule['id'].split('/')[-1],
                'url': bt_rule['id'],
                'status': res,
                'detail_url': s.SERVER_URL + '/migration/' + bt_rule['id'].replace('https://envipath.org/', ''),
            }
        )

    results.sort(key=lambda x: (x['status'], x['name']))

    total = len(results)
    success = sum(1 for r in results if r['status'])

    return {
        'results': results,
        'success': success,
        'error': total - success,
        'total': total,
    }


def _mismatch_detail(bt_rule_name, bt_rule, comp, ambit_smiles, rdkit_smiles, ambit_errors, rdkit_errors):
    """Build the multi-line diagnostic text shown for a mismatching compound."""
    ambit_set = set(ambit_smiles)
    rdkit_set = set(rdkit_smiles)

    detail = f"""
    BT: {bt_rule_name}
    SMIRKS: {bt_rule['smirks']}
    Compound: {comp['smiles']}
    Compound URL: {comp['id']}
    Num ambit: {len(ambit_set)}
    Num rdkit: {len(rdkit_set)}
    Num Intersection A: {len(ambit_set.intersection(rdkit_set))}
    Num Intersection B: {len(rdkit_set.intersection(ambit_set))}
    Difference A: {ambit_set.difference(rdkit_set)}
    Difference B: {rdkit_set.difference(ambit_set)}
    ambit products: {ambit_smiles}
    rdkit products: {rdkit_smiles}
    ambit_errors: {ambit_errors}
    rdkit_errors: {rdkit_errors}
    """
    # Strip the source-code indentation from every line of the literal.
    return '\n'.join([x.strip() for x in detail.split('\n')])


def migration(request):
    """Render the per-rule migration overview.

    The summary is read from ``fixtures/migration_status_per_rule.json`` when
    that file exists and no ``force`` query parameter is given; otherwise it
    is recomputed from the ambit fixture and written back to that file.

    Args:
        request: the incoming Django request.

    Returns:
        The rendered ``migration.html`` response for GET requests, or an
        ``HttpResponseNotAllowed`` (405) for any other method. Previously
        non-GET requests fell through and returned ``None``, which Django
        rejects with a server error.
    """
    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    context = get_base_context(request)
    status_path = _fixture_path('migration_status_per_rule.json')

    if os.path.exists(status_path) and request.GET.get("force") is None:
        with open(status_path, encoding='utf-8') as fh:
            migration_status = json.load(fh)
    else:
        migration_status = _compute_migration_status()
        # Close the handle explicitly so the cache is fully flushed to disk
        # before the next request tries to read it.
        with open(status_path, 'w', encoding='utf-8') as fh:
            json.dump(migration_status, fh)

    # A cached file may contain links generated on a development host;
    # point them at the currently configured server.
    for r in migration_status['results']:
        r['detail_url'] = r['detail_url'].replace('http://localhost:8000', s.SERVER_URL)

    context.update(**migration_status)

    return render(request, 'migration.html', context)


def migration_detail(request, package_uuid, rule_uuid):
    """Render the per-compound comparison for a single rule.

    Args:
        request: the incoming Django request.
        package_uuid: identifier passed to ``PackageManager.get_package_by_id``.
        rule_uuid: ``uuid`` of the ``Rule`` to look up inside that package.

    Returns:
        The rendered ``migration_detail.html`` response for GET requests, or
        an ``HttpResponseNotAllowed`` (405) for any other method.

    Raises:
        KeyError: if the rule's name has no entry in the ambit fixture.
        Whatever ``PackageManager.get_package_by_id`` and ``Rule.objects.get``
        raise for unknown or inaccessible objects is propagated unchanged.
    """
    current_user = _anonymous_or_real(request)

    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    context = get_base_context(request)

    package = PackageManager.get_package_by_id(current_user, package_uuid)
    rule = Rule.objects.get(package=package, uuid=rule_uuid)
    bt_rule_name = rule.name

    # The fixture is keyed by rule name.
    bt_rule = _load_ambit_rules()[bt_rule_name]
    smirks = bt_rule['smirks']

    results = []
    res = True
    for comp, ambit_prod in zip(bt_rule['compounds'], bt_rule['products']):
        ambit_smiles, rdkit_smiles, ambit_errors, rdkit_errors = _compare_products(comp, ambit_prod, smirks)
        matches = _products_match(ambit_smiles, rdkit_smiles)

        entry = {
            'url': comp['id'],
            'id': comp['id'].split('/')[-1],
            'name': comp['name'],
            'initial_smiles': comp['smiles'],
            'ambit_smiles': sorted(ambit_smiles),
            'rdkit_smiles': sorted(rdkit_smiles),
            'status': matches,
        }

        # Only mismatching compounds carry the verbose diagnostic text.
        if not matches:
            entry['detail'] = _mismatch_detail(
                bt_rule_name, bt_rule, comp, ambit_smiles, rdkit_smiles, ambit_errors, rdkit_errors
            )

        results.append(entry)
        res &= matches

    # False sorts before True, so mismatching compounds are listed first.
    results.sort(key=lambda x: x['status'])

    context['results'] = results
    context['res'] = res
    context['bt_rule_name'] = bt_rule_name

    return render(request, 'migration_detail.html', context)