81 Commits

Author SHA1 Message Date
d1a00f71b4 wip 2026-04-17 19:39:54 +02:00
ca0508d96a More on PES 2026-04-15 21:14:47 +02:00
349877b5e3 wip 2026-04-15 12:23:29 +02:00
dd0f7eaf05 Prep configs, added Package Create Modal 2026-04-14 21:14:21 +02:00
7d828e2be0 Adjusted View for Package creation 2026-04-14 21:14:21 +02:00
8ae4f36174 Adding secret flags to group, add secret pools to packages 2026-04-14 21:14:21 +02:00
451986082a Adjusted Dockerfile for Bayer 2026-04-14 21:14:21 +02:00
ca5a9a12be Adjusted docker compose to bayer specifics 2026-04-14 21:14:21 +02:00
21181c80ec Show Pack Classification 2026-04-14 21:14:21 +02:00
5aa39637dc Initial bayer app 2026-04-14 21:14:21 +02:00
22179f0d90 adjusted migration 2026-04-14 21:14:21 +02:00
b508511cd6 Implement basic group listing and re-enabled group creation 2026-04-14 20:58:12 +02:00
877804c0ff [Feature] Path prefixes (#369)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#369
2026-04-14 21:59:29 +12:00
964574c700 [Feature] Biotransformer in enviPath (#364)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#364
2026-04-10 00:00:13 +12:00
5029a8cda5 [Feature] Dockerized Setup (#366)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#366
2026-04-07 20:06:46 +12:00
d06bd0d4fd [Feature] Minimal IUCLID export (#338)
This is an initial implementation that creates a working minimal .i6z document.
It passes schema validation and can be imported into IUCLID.

Caveat:
IUCLID files target individual compounds.
Pathway is not actually covered by the format.

It can be added in either soil or water and soil OECD endpoints.
**I currently only implemented the soil endpoint for all data.**

This sort of works, and I can report all degradation products in a pathway (not a nice view, but we can report many transformation products and add a diagram attachment in the future).

Adding additional information is an absolute pain, as we need to explicitly map each type of information to the relevant OECD field.
I use the XSD scheme for validation, but unfortunately the IUCLID parser is not fully compliant and requires a specific order, etc.

The workflow is: finding the AI structure from the XSD scheme -> make the scheme validation pass -> upload to IUCLID to get obscure error messages -> guess what could be wrong -> repeat 💣

New specifications get released once per year, so we will have to update accordingly.
I believe that this should be a more expensive feature, as it requires significant effort to uphold.

Currently implemented for root compound only in SOIL:

- Soil Texture 2
- Soil Texture 1
- pH value
- Half-life per soil sample / scenario (mapped to disappearance; not sure about that).
- CEC
- Organic Matter (only Carbon)
- Moisture content
- Humidity

<img width="2123" alt="image.png" src="attachments/d29830e1-65ef-4136-8939-1825e0959c62">
<img width="2124" alt="image.png" src="attachments/ac9de2ac-bf68-4ba4-b40b-82f810a9de93">
<img width="2139" alt="image.png" src="attachments/5674c7e6-865e-420e-974a-6b825b331e6c">

Reviewed-on: enviPath/enviPy#338
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-04-07 19:46:12 +12:00
f7c45b8015 [Feature] Add legacy api endpoint to mimic ReferringScenarios (#362)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#362
2026-03-17 19:44:47 +13:00
68aea97013 [Feature] Simple template extension mechanism (#361)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#361
2026-03-16 21:06:20 +13:00
3cc7fa9e8b [Fix] Add Captcha vars to Template (#359)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#359
2026-03-13 11:46:34 +13:00
21f3390a43 [Feature] Add Captchas to avoid spam registrations (#358)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#358
2026-03-13 11:36:48 +13:00
8cdf91c8fb [Fix] Broken Model Creation (#356)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#356
2026-03-12 11:34:14 +13:00
bafbf11322 [Fix] Broken Enzyme Links (#353)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#353
2026-03-12 10:25:47 +13:00
f1a9456d1d [Fix] enviFormer prediction (#352)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#352
2026-03-12 08:49:44 +13:00
e0764126e3 [Fix] Scenario Review Status + Depth issues (#351)
https://envipath.org/api/legacy/package/32de3cf4-e3e6-4168-956e-32fa5ddb0ce1/pathway/1d537657-298c-496b-9e6f-2bec0cbe0678

-> Node.depth can be float for Dummynodes
-> Scenarios in Edge.d3_json were lacking a reviewed flag

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#351
2026-03-12 08:28:20 +13:00
ef0c45b203 [Fix] Pepper display probability calculation (#349)
Probability of persistent is now calculated to include very persistent.

Reviewed-on: enviPath/enviPy#349
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2026-03-11 19:12:55 +13:00
b737fc93eb [Feature] Search for Permissions, Prep Compound / Structure to be extended, Prep Template overwrites (#347)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#347
2026-03-11 11:27:15 +13:00
d4295c9349 [Fix] bootstrap command now reflects new Scenario/AdditionalInformation structure (#346)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#346
2026-03-07 03:14:28 +13:00
c6ff97694d [Feature] PEPPER in enviPath (#332)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#332
2026-03-06 22:11:22 +13:00
6e00926371 [Feature] Scenario and Additional Information creation via enviPath-python, Add Half Lifes to API Output, Fix source/target ids in legacy API (#340)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#340
2026-03-06 07:20:18 +13:00
81cc612e69 [Feature] Populate Batch Predict Table by CSV (#339)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#339
2026-03-06 03:15:44 +13:00
cc9598775c [Fix] Fix Perm for creating entities (#341)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#341
2026-02-27 03:56:33 +13:00
d2c2e643cb [Fix] Compound Grouping, Identity prediction of enviFormer, Setting params (#337)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#337
2026-02-20 10:14:28 +13:00
0ff046363c [Fix] Fixed failing frontend tests due to renaming (#335)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#335
2026-02-17 03:09:32 +13:00
5150027f0d [Fix] Login via email, prevent Usernames with certain chars 2026-02-16 13:58:06 +01:00
58ab5b33e3 [Fix] Filter Active Users (#314) (#329)
Adding users to a group or setting permissions on a package now filter for active users. Also any inactive members of group/package get marked as such.

<img width="490" alt="{3B906C71-F3AE-41E4-A61C-B8377D79F685}.png" src="attachments/09cf149a-9d7a-4560-8ce7-9f3487527ee2">

Reviewed-on: enviPath/enviPy#329
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2026-02-12 20:20:16 +13:00
73f0202267 [Fix] Filter Scenarios with Parent (#311) (#323)
The scenarios lists both in /scenarios and /package/<id>/scenario no longer show related scenarios (children).
All related scenarios are shown on the scenario page under Related Scenarios if there are any.
<img width="500" alt="{C2D38DED-A402-4A27-A241-BC2302C62A50}.png" src="attachments/1371c177-220c-42d5-94ff-56f9fbab761f">

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#323
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2026-02-11 23:19:20 +13:00
27c5bad9c5 [Fix] Upgraded ai-lib, temporarily ignore additional validation errors (#328)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#328
2026-02-11 03:49:20 +13:00
5789f20e7f [Feature] Create API Key Authenticaton for v1 API (#327)
Add API key authentication to v1 API
Also includes:
- management command to create keys for users
- Improvements to API tests

Minor:
- more robust way to start docker dev container.

Reviewed-on: enviPath/enviPy#327
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-02-11 02:29:54 +13:00
c0cfdb9255 [Style] Adds custom name display for timeseries (#320)
<img width="1336" alt="image.png" src="attachments/58e49257-976e-469f-a19e-069c8915c437">

Co-authored-by: jebus <lorsbach@envipath.com>
Reviewed-on: enviPath/enviPy#320
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-02-04 08:15:16 +13:00
5da8dbc191 [Feature] Timeseries Pathway view (#319)
**Warning depends on Timeseries feature to be merged**

Implements a way to display OECD 301F data on the pathway view.
This is mostly a PoC and needs to be improved once the pathway rendering is updated.

![image.png](/attachments/053965d7-78f7-487a-b5d0-898612708fa3)

Co-authored-by: jebus <lorsbach@envipath.com>
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#319
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-02-04 05:19:25 +13:00
dc18b73e08 [Feature] Adds timeseries display (#313)
Adds a way to input/display timeseries data to the additional information

Reviewed-on: enviPath/enviPy#313
Reviewed-by: jebus <lorsbach@envipath.com>
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-02-04 01:01:06 +13:00
d80dfb5ee3 [Feature] Dynamic additional information rendering in frontend (#282)
This implements a version of #274, relying on Pydantics built in JSON schema and JSON rendering.
Requires additional UI tagging in the ai model repo but will remove HTML tags.

Example scenario with filled information: 5882df9c-dae1-4d80-a40e-db4724271456/scenario/3a4d395a-6a6d-4154-8ce3-ced667fceec0

Reviewed-on: enviPath/enviPy#282
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-01-31 00:44:03 +13:00
9f63a9d4de [Fix] Fixed ObjectDoesNotExist for send_registration_mail, fixed duplicate mail sending (#312)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#312
2026-01-29 20:24:11 +13:00
5565b9cb9e [Fix] UI bugs, Registrations Mail, BTRules Popup, Legacy API fixes (#309)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#309
2026-01-29 11:13:34 +13:00
ab0b5a5186 [Feature] Leftovers after Release (#303)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#303
2026-01-22 10:26:38 +13:00
f905bf21cf [Feature] Update ToS to be more legally safe and sensible (#301)
- Improved ToS content
- Add ToS pointer and academic use note at signup
- Remove legal collection page (unnecessary)

Reviewed-on: enviPath/enviPy#301
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-01-20 03:18:40 +13:00
1fd993927c [Feature] Check os.environ for ENV_PATH (#300)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#300
2026-01-19 23:41:43 +13:00
2a2fe4f147 Static pages update (#299)
I have updated the static pages following @wicker comments on #275

Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Reviewed-on: enviPath/enviPy#299
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2026-01-19 23:37:08 +13:00
5f5ae76182 [Fix] Fix Prediction Spinner, ensure proper pathway status is set
Fixes Spinner and status message display on pathway page

Reviewed-on: enviPath/enviPy#291
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-01-15 23:09:12 +13:00
1c2f70b3b9 [Feature] Add batch-predict to site-map (#285)
Adds batch predict to the site-map but does not give it prominence.
This is to avoid non-experts "accidentally" flooding the system.

Happy to move it to the main menu if better, @jebus?

Reviewed-on: enviPath/enviPy#285
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2026-01-15 22:30:31 +13:00
54f8302104 [FIX] Fixed Search Output, Legacy API Model Endpoint, Handle ObjectsDoesNotExists in views (#297)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#297
2026-01-15 20:39:54 +13:00
6499a0c659 [Feature] Prediction settings list on User page (#276)
Some checks failed
Build CI Docker Image / build-and-push (push) Failing after 23s
I have added a list of other prediction settings to the User page and a way to change a setting to the default.
<img width="500" alt="{4EFA1273-E53A-4333-948B-8AE3597821A8}.png" src="attachments/048fdc83-1c3e-41d2-a59b-44b0337a05bf">

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#276
Reviewed-by: jebus <lorsbach@envipath.com>
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-12-20 03:19:31 +13:00
7c60a28801 [Feature] Threshold Warning + Cosmetics (#277)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#277
2025-12-20 02:11:47 +13:00
a4a4179261 [Fix] Added libglib2.0 (#280)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#280
2025-12-20 01:26:25 +13:00
6ee4ac535a [Fix] Added libfreetype6 and libcairo2 (#279)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#279
2025-12-20 00:26:23 +13:00
d6065ee888 [Fix] Add libxrender, libxext6 and libfontconfig1 libs to envipy image (#278)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#278
2025-12-19 23:03:02 +13:00
9db4806d75 [Chore] Add custom CI Docker image with Node.js 24, pnpm 10, and uv (#268)
This is meant to drastically speed up CI because it skips the lengthy installation.

Co-authored-by: jebus <lorsbach@envipath.com>
Reviewed-on: enviPath/enviPy#268
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-12-17 23:06:58 +13:00
4bf20e62ef [Fix] UI Fixes (#266)
Rather than have a bunch of pull-requests that @jebus will have to merge shall we collect some of the fixes for the UI issues I found in here.

- [x] #259
- [x] #260
- [x] #261
- [x] #262
- [x] #263
- [x] #264
- [x] #265

Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Reviewed-on: enviPath/enviPy#266
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-12-15 21:28:43 +13:00
8adb93012a [Feature] Server pagination implementation (#243)
## Major Changes
- Implement a REST style API app in epapi
- Currently implements a GET method for all entity types in the browse menu (both package level and global)
- Provides paginated results per default with query style filtering for reviewed vs unreviewed.
- Provides new paginated templates with thin wrappers per entity types for easier maintainability
- Implements e2e tests for the API

## Minor changes
- Added more comprehensive gitignore to cover coverage reports and other test/node.js etc. data.
- Add additional CI file for API tests that only gets triggered on API relevant changes.

## ⚠️ Currently only works with session-based authentication. Token based will be added in new PR.

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Co-authored-by: jebus <lorsbach@envipath.com>
Reviewed-on: enviPath/enviPy#243
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-12-15 11:34:53 +13:00
d2d475b990 [Feature] Show Multi Gen Eval + Batch Prediction (#267)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#267
2025-12-15 08:48:28 +13:00
648ec150a9 [Feature] Engineer Pathway (#256)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#256
2025-12-10 07:35:42 +13:00
46b0f1c124 [Fix] Remove bootsrap code from AD (#257)
Closes #251

Reviewed-on: enviPath/enviPy#257
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-12-09 01:02:12 +13:00
d5af898053 [Fix] Create ML model command fixed (#255) (#258)
Removed the eval packages being incorrectly passed to the create method of enviFormer and MLRelativeReasoning.

Reviewed-on: enviPath/enviPy#258
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-12-09 00:57:35 +13:00
b7379b3337 [Fix] Remove double action menu on pathway (#254)
Reviewed-on: enviPath/enviPy#254
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-12-03 21:38:36 +13:00
d6440f416c [Fix] Frontend Testing Fixtures (#249)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Reviewed-on: enviPath/enviPy#249
2025-12-03 10:49:23 +13:00
901de4640c [Fix] Stereochemistry prediction handling (#228 and #238) (#250)
**This pull request will need a separate migration pull-request**

I have added an alert box in two places when the user tries to predict with stereo chemistry.

When a user predicts a pathway with stereo chemistry an alert box is shown in that node's hover.
To do this I added two new fields. Pathway now has a "predicted" BooleanField indicating whether it was predicted or not. It is set to True if the pathway mode for prediction is "predict" or "incremental" and False if it is "build". I think it is a flag that could be useful in the future, perhaps for analysing how many predicted pathways are in enviPath?
Node now has a `stereo_removed` BooleanField which is set to True if the Node's parent Pathways has "predicted" as true and the node SMILES has stereochemistry.
<img width="500" alt="{927AC9FF-DBC9-4A19-9E6E-0EDD3B08C7AC}.png" src="attachments/69ea29bc-c2d2-4cd2-8e98-aae5c5737f69">

When a user does a prediction on a model's page it shows at the top of the list. This did not require any new fields as the entered SMILES does not get saved anywhere.
<img width="500" alt="{BED66F12-5F07-419E-AAA6-FE1FE5B4F266}.png" src="attachments/5fcc3a9b-4d1a-4e48-acac-76b7571f6507">

I think the alert box is an alright solution but if you have a great idea for something that looks/fits better please change it or let me know.

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#250
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-12-03 10:19:34 +13:00
69df139256 [Fix] Registration (#247)
Fixes:

- Register now works again with the html form action pointing to `register` instead of `login`

Since this is a major issue the above change should probably be merged soon. However, I will open another issue (#248) suggesting we add better help for password creation as currently we give password requirements but do not check them.

Reviewed-on: enviPath/enviPy#247
Reviewed-by: Tobias O <tobias.olenyi@envipath.com>
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-12-01 20:47:04 +13:00
e8ae494c16 [Feature] Implemented SMARTS filtering for Rules (#246)
Reactant Filter SMARTS as well as Product Filter SMARTS are now reflected when applying rules.

Fixes #245

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#246
2025-11-28 23:28:41 +13:00
fd2e2c2534 [Fix] Post Modern UI deploy Bugfixes (#240)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#240
2025-11-27 10:28:04 +13:00
1a2c9bb543 [Feature] Modern UI roll out (#236)
This PR moves all the collection pages into the new UI in a rough push.
I did not put the same amount of care into these as into search, index, and predict.

## Major changes

- All modals are now migrated to a state based alpine.js implementation.
- jQuery is no longer present in the base layout; ajax is replace by native fetch api
- most of the pps.js is now obsolte (as I understand it; the code is not referenced any more @jebus  please double check)
- in-memory pagination for large result lists (set to 50; we can make that configurable later; performance degrades at around 1k) stukk a bit rough tracked in #235

## Minor things

- Sarch and index also use alpine now
- The loading spinner is now CSS animated (not sure if it currently gets correctly called)

## Not done

- Ihave not even cheked the admin pages. Not sure If these need migrations
- The temporary migration pages still use the old template. Not sure what is supposed to happen with those? @jebus

## What I did to test

- opend all pages in browse, and user ; plus all pages reachable from there.
- Interacted and tested the functionality of each modal superfically with exception of the API key modal (no functional test).

---
This PR is massive sorry for that; just did not want to push half-brokenn state.
@jebus @liambrydon I would be glad if you could click around and try to break it :)

Finally closes #133

Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#236
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-26 23:16:44 +13:00
7f6f209b4a [Feature] Frontend Testing #140 (#218)
I added playwright for frontend testing and got a couple simple test cases working.
I have updated pyproject.toml but it can also be installed with `pip install pytest-playwright` followed by `playwright install`

With the django server running you can do `playwright codegen http://localhost:8000/` which will generate test code based on the actions you take on the webpage it opens. Be sure to change the target to pytest in the code pop up.

I will add more test cases but @jebus and @t03i feel free to add more. Especially once we are done with the full front-end redesign.

I have put the tests under `tests/frontend/` but I am not sure how to add them to the CI. They give steps for CI integration but maybe we want to somehow include them in our exisiting CI yaml? https://playwright.dev/python/docs/ci-intro

Reviewed-on: enviPath/enviPy#218
Reviewed-by: Tobias O <tobias.olenyi@envipath.com>
Co-authored-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
Co-committed-by: Liam Brydon <lbry121@aucklanduni.ac.nz>
2025-11-26 19:44:35 +13:00
b6c35fea76 [Feature] Search API Endpoint (#227)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#227
2025-11-20 09:56:11 +13:00
fa8a191383 [Fix] Show the User who ran the Job for Admins (#226)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#226
2025-11-20 08:05:15 +13:00
67b1baa5b0 [Feature] Legacy API (#224)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#224
2025-11-19 20:45:16 +13:00
89c194dcca [Enhancement] Restyle Discourse Cards for title only (#220)
Excerpts are only delivered for pinned posts. So all cards apart from pinned look empty.
Changed to only display (more of) the title now.

closes  #214

Reviewed-on: enviPath/enviPy#220
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-14 21:43:52 +13:00
a8554c903c [Enhancement] Swappable Packages (#216)
Co-authored-by: Tim Lorsbach <tim@lorsba.ch>
Reviewed-on: enviPath/enviPy#216
Reviewed-by: liambrydon <lbry121@aucklanduni.ac.nz>
Reviewed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-14 21:42:39 +13:00
d584791ee8 [Fix] Ketcher submission now recognized (#213)
This will hack the ketcher submission to work again (see #207).
The problem seems to be that the iframe loads slower than the script tag so the reference is not available on page load.

Registering from within the code to poll until ketcher is ready is a bit messy.
Tracked the introduced dept in #212.

Reviewed-on: enviPath/enviPy#213
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-13 23:27:29 +13:00
e60052b05c [Fix] Remove Search from Old Framework Navbar (#211)
fixes #204

Reviewed-on: enviPath/enviPy#211
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-13 23:16:52 +13:00
3ff8d938d6 [Fix] Advanced now redirects to predict_pathway. (#210)
fixes #208

Reviewed-on: enviPath/enviPy#210
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-13 23:15:50 +13:00
a7f48c2cf9 [Fix] Predict page scrolls to submit button (#209)
Autofocus on form is automatically placed on cancel button. Now it is on Name.

fixes #205

Reviewed-on: enviPath/enviPy#209
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-13 23:15:08 +13:00
39faab3d11 [Fix] Add extra styles to make show login form (#203)
FIx display on the login page

Reviewed-on: enviPath/enviPy#203
Co-authored-by: Tobias O <tobias.olenyi@envipath.com>
Co-committed-by: Tobias O <tobias.olenyi@envipath.com>
2025-11-13 09:11:32 +13:00
361 changed files with 98766 additions and 14841 deletions

93
.dockerignore Normal file
View File

@ -0,0 +1,93 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# uv / virtual environments
.venv/
venv/
env/
ENV/
# uv cache
.uv/
uv.lock.bak
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Test / coverage
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache/
pytest_cache/
nosetests.xml
coverage.xml
*.cover
*.py,cover
# Type checkers
.mypy_cache/
.pyre/
.pytype/
# Jupyter Notebook
.ipynb_checkpoints
# Environment variables
.env
.env.*
*.env
# IDEs / editors
.vscode/
.idea/
*.swp
*.swo
*~
# OS files
.DS_Store
Thumbs.db
# Git
.git/
.gitignore
.gitea/
# Docker
Dockerfile
docker-compose.yml
docker-compose.yaml
# Logs
*.log
# Temporary files
tmp/
temp/
# Frontend stuff
node_modules/

View File

@ -20,3 +20,16 @@ LOG_LEVEL='INFO'
SERVER_URL='http://localhost:8000'
PLUGINS_ENABLED=True
EP_DATA_DIR='data'
EMAIL_HOST_USER='admin@envipath.com'
EMAIL_HOST_PASSWORD='dummy-password'
DEFAULT_FROM_EMAIL="test@test.com"
SERVER_EMAIL='test@test.com'
# Testing settings VScode
DJANGO_SETTINGS_MODULE='envipath.settings'
MANAGE_PY_PATH='./manage.py'
APPLICABILITY_DOMAIN_ENABLED=True
ENVIFORMER_PRESENT=True
MODEL_BUILDING_ENABLED=True

View File

@ -3,10 +3,20 @@ EP_DATA_DIR=
ALLOWED_HOSTS=
DEBUG=
LOG_LEVEL=
MODEL_BUILDING_ENABLED=
APPLICABILITY_DOMAIN_ENABLED=
ENVIFORMER_PRESENT=
FLAG_CELERY_PRESENT=
SERVER_URL=
ENVIFORMER_DEVICE=
PLUGINS_ENABLED=
SERVER_URL=
SERVER_PATH=
ADMIN_APPROVAL_REQUIRED=
REGISTRATION_MANDATORY=
LOG_DIR=
# Celery
FLAG_CELERY_PRESENT=
CELERY_BROKER_URL=
CELERY_RESULT_BACKEND=
# DB
POSTGRES_SERVICE_NAME=
POSTGRES_DB=
@ -16,5 +26,30 @@ POSTGRES_PORT=
# MAIL
EMAIL_HOST_USER=
EMAIL_HOST_PASSWORD=
# MATOMO
MATOMO_SITE_ID
DEFAULT_FROM_EMAIL=
SERVER_EMAIL=
# SENTRY
SENTRY_ENABLED=
SENTRY_DSN=
SENTRY_ENVIRONMENT=
# MS ENTRA
MS_ENTRA_ENABLED=
MS_CLIENT_ID=
MS_CLIENT_SECRET=
MS_TENANT_ID=
MS_REDIRECT_URI=
MS_SCOPES=
# Tenant
TENANT=
EPDB_PACKAGE_MODEL=
# Captcha
CAP_ENABLED=
CAP_API_BASE=
CAP_SITE_KEY=
CAP_SECRET_KEY=
# QUARKUS (JAVA)
ENVIRULE_ENABLED=
FINGERPRINT_URL=
# Biotransformer
BIOTRANSFORMER_ENABLED=
BIOTRANSFORMER_URL=

View File

@ -0,0 +1,67 @@
name: 'Setup enviPy Environment'
description: 'Shared setup for enviPy CI - installs dependencies and prepares environment'
inputs:
skip-frontend:
description: 'Skip frontend build steps (pnpm, tailwind)'
required: false
default: 'false'
skip-playwright:
description: 'Skip Playwright installation'
required: false
default: 'false'
ssh-private-key:
description: 'SSH private key for git access'
required: true
run-migrations:
description: 'Run Django migrations after setup'
required: false
default: 'true'
runs:
using: "composite"
steps:
- name: Setup ssh
shell: bash
run: |
mkdir -p ~/.ssh
echo "${{ inputs.ssh-private-key }}" > ~/.ssh/id_ed25519
chmod 600 ~/.ssh/id_ed25519
ssh-keyscan git.envipath.com >> ~/.ssh/known_hosts
eval $(ssh-agent -s)
ssh-add ~/.ssh/id_ed25519
- name: Setup Python venv
shell: bash
run: |
uv sync --locked --all-extras --dev
- name: Install Playwright
if: inputs.skip-playwright == 'false'
shell: bash
run: |
source .venv/bin/activate
playwright install --with-deps
- name: Build Frontend
if: inputs.skip-frontend == 'false'
shell: bash
run: |
uv run python scripts/pnpm_wrapper.py install
uv run python scripts/pnpm_wrapper.py run build
- name: Wait for Postgres
shell: bash
run: |
until pg_isready -h postgres -U ${{ env.POSTGRES_USER }}; do
echo "Waiting for postgres..."
sleep 2
done
echo "Postgres is ready!"
- name: Run Django Migrations
if: inputs.run-migrations == 'true'
shell: bash
run: |
source .venv/bin/activate
python manage.py migrate --noinput

View File

@ -0,0 +1,53 @@
# Custom CI Docker image for Gitea runners
# Pre-installs Node.js 24, pnpm 10, uv, and system dependencies
# to eliminate setup time in CI workflows
FROM ubuntu:24.04
# Prevent interactive prompts during package installation
ENV DEBIAN_FRONTEND=noninteractive
# Install system dependencies
RUN apt-get update && \
apt-get install -y \
postgresql-client \
redis-tools \
openjdk-11-jre-headless \
curl \
ca-certificates \
gnupg \
lsb-release \
git \
ssh \
libxrender1 \
libxext6 \
libfontconfig1 \
libfreetype6 \
libcairo2 \
libglib2.0-0t64 \
&& rm -rf /var/lib/apt/lists/*
# Install Node.js 24 via NodeSource
RUN curl -fsSL https://deb.nodesource.com/setup_24.x | bash - && \
apt-get install -y nodejs && \
rm -rf /var/lib/apt/lists/*
# Enable corepack and install pnpm 10
RUN corepack enable && \
corepack prepare pnpm@10 --activate
# Install uv https://docs.astral.sh/uv/guides/integration/docker/#available-images
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
ENV PATH="/root/.cargo/bin:${PATH}"
# Verify installations
RUN node --version && \
npm --version && \
pnpm --version && \
uv --version && \
pg_isready --version && \
redis-cli --version && \
java -version
# Set working directory
WORKDIR /workspace

View File

@ -0,0 +1,86 @@
name: API CI
on:
pull_request:
branches:
- develop
paths:
- 'epapi/**'
- 'epdb/models.py' # API depends on models
- 'epdb/logic.py' # API depends on business logic
- 'tests/fixtures/**' # API tests use fixtures
workflow_dispatch:
jobs:
api-tests:
if: ${{ !contains(gitea.event.pull_request.title, 'WIP') }}
runs-on: ubuntu-latest
container:
image: git.envipath.com/envipath/envipy-ci:latest
services:
postgres:
image: postgres:16
env:
POSTGRES_USER: ${{ vars.POSTGRES_USER }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
POSTGRES_DB: ${{ vars.POSTGRES_DB }}
ports:
- ${{ vars.POSTGRES_PORT}}:5432
options: >-
--health-cmd="pg_isready -U postgres"
--health-interval=10s
--health-timeout=5s
--health-retries=5
env:
RUNNER_TOOL_CACHE: /toolcache
EP_DATA_DIR: /opt/enviPy/
ALLOWED_HOSTS: 127.0.0.1,localhost
DEBUG: True
LOG_LEVEL: INFO
MODEL_BUILDING_ENABLED: True
APPLICABILITY_DOMAIN_ENABLED: True
ENVIFORMER_PRESENT: True
ENVIFORMER_DEVICE: cpu
FLAG_CELERY_PRESENT: False
PLUGINS_ENABLED: True
SERVER_URL: http://localhost:8000
ADMIN_APPROVAL_REQUIRED: True
REGISTRATION_MANDATORY: True
LOG_DIR: ''
# DB
POSTGRES_SERVICE_NAME: postgres
POSTGRES_DB: ${{ vars.POSTGRES_DB }}
POSTGRES_USER: ${{ vars.POSTGRES_USER }}
POSTGRES_PASSWORD: ${{ secrets.POSTGRES_PASSWORD }}
POSTGRES_PORT: 5432
# SENTRY
SENTRY_ENABLED: False
# MS ENTRA
MS_ENTRA_ENABLED: False
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Use shared setup action - skips frontend builds for API-only tests
- name: Setup enviPy Environment
uses: ./.gitea/actions/setup-envipy
with:
skip-frontend: 'true'
skip-playwright: 'false'
ssh-private-key: ${{ secrets.ENVIPY_CI_PRIVATE_KEY }}
run-migrations: 'true'
- name: Run API tests
run: |
.venv/bin/python manage.py test epapi -v 2
- name: Test API endpoints availability
run: |
.venv/bin/python manage.py runserver 0.0.0.0:8000 &
SERVER_PID=$!
sleep 5
curl -f http://localhost:8000/api/v1/docs || echo "API docs not available"
kill $SERVER_PID

View File

@ -0,0 +1,48 @@
name: Build CI Docker Image
on:
workflow_dispatch:
push:
branches:
- develop
- main
paths:
- '.gitea/docker/Dockerfile.ci'
- '.gitea/workflows/build-ci-image.yaml'
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to container registry
uses: docker/login-action@v3
with:
registry: git.envipath.com
username: ${{ secrets.CI_REGISTRY_USER }}
password: ${{ secrets.CI_REGISTRY_PASSWORD }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: git.envipath.com/envipath/envipy-ci
tags: |
type=raw,value=latest
type=sha,prefix={{branch}}-
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
file: .gitea/docker/Dockerfile.ci
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=registry,ref=git.envipath.com/envipath/envipy-ci:latest
cache-to: type=inline

View File

@ -8,7 +8,10 @@ on:
jobs:
test:
if: ${{ !contains(gitea.event.pull_request.title, 'WIP') }}
runs-on: ubuntu-latest
container:
image: git.envipath.com/envipath/envipy-ci:latest
services:
postgres:
@ -40,7 +43,7 @@ jobs:
EP_DATA_DIR: /opt/enviPy/
ALLOWED_HOSTS: 127.0.0.1,localhost
DEBUG: True
LOG_LEVEL: DEBUG
LOG_LEVEL: INFO
MODEL_BUILDING_ENABLED: True
APPLICABILITY_DOMAIN_ENABLED: True
ENVIFORMER_PRESENT: True
@ -63,54 +66,22 @@ jobs:
MS_ENTRA_ENABLED: False
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Install system tools via apt
run: |
sudo apt-get update
sudo apt-get install -y postgresql-client redis-tools openjdk-11-jre-headless
- name: Setup ssh
run: |
echo "${{ secrets.ENVIPY_CI_PRIVATE_KEY }}" > ~/.ssh/id_ed25519
chmod 600 ~/.ssh/id_ed25519
ssh-keyscan git.envipath.com >> ~/.ssh/known_hosts
eval $(ssh-agent -s)
ssh-add ~/.ssh/id_ed25519
- name: Install pnpm
uses: pnpm/action-setup@v4
# Use shared setup action - includes all dependencies and migrations
- name: Setup enviPy Environment
uses: ./.gitea/actions/setup-envipy
with:
version: 10
skip-frontend: 'false'
skip-playwright: 'false'
ssh-private-key: ${{ secrets.ENVIPY_CI_PRIVATE_KEY }}
run-migrations: 'true'
- name: Use Node.js
uses: actions/setup-node@v4
with:
node-version: 20
cache: "pnpm"
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
enable-cache: true
- name: Setup venv
- name: Run frontend tests
run: |
uv sync --locked --all-extras --dev
- name: Wait for services
run: |
until pg_isready -h postgres -U postgres; do sleep 2; done
# until redis-cli -h redis ping; do sleep 2; done
- name: Run Django Migrations
run: |
source .venv/bin/activate
python manage.py migrate --noinput
.venv/bin/python manage.py test --tag frontend
- name: Run Django tests
run: |
source .venv/bin/activate
python manage.py test tests --exclude-tag slow
.venv/bin/python manage.py test tests --exclude-tag slow --exclude-tag frontend

372
.gitignore vendored
View File

@ -1,17 +1,375 @@
*.pyc
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
.idea/
db.sqlite3-journal
static/admin/
static/django_extensions/
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
.vscode/
*.code-workspace
# Ruff stuff:
.ruff_cache/
# UV cache
.uv-cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml
### Agents ###
.claude/
.codex/
.cursor/
.github/prompts/
.junie/
.windsurf/
AGENTS.md
CLAUDE.md
GEMINI.md
.aider.*
### Node.js ###
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
lerna-debug.log*
# Diagnostic reports (https://nodejs.org/api/report.html)
report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
# Runtime data
pids
*.pid
*.seed
*.pid.lock
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
*.lcov
# nyc test coverage
.nyc_output
# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# Bower dependency directory (https://bower.io/)
bower_components
# node-waf configuration
.lock-wscript
# Compiled binary addons (https://nodejs.org/api/addons.html)
build/Release
# Dependency directories
node_modules/
jspm_packages/
# Snowpack dependency directory (https://snowpack.dev/)
web_modules/
# TypeScript cache
*.tsbuildinfo
# Optional npm cache directory
.npm
# Optional eslint cache
.eslintcache
# Optional stylelint cache
.stylelintcache
# Optional REPL history
.node_repl_history
# Output of 'npm pack'
*.tgz
# Yarn Integrity file
.yarn-integrity
# dotenv environment variable files
.env
.env.*
!.env.example
# parcel-bundler cache (https://parceljs.org/)
.cache
.parcel-cache
# Next.js build output
.next
out
# Nuxt.js build / generate output
.nuxt
dist
.output
# Gatsby files
.cache/
# Comment in the public line in if your project uses Gatsby and not Next.js
# https://nextjs.org/blog/next-9-1#public-directory-support
# public
# vuepress build output
.vuepress/dist
# vuepress v2.x temp and cache directory
.temp
.cache
# Sveltekit cache directory
.svelte-kit/
# vitepress build output
**/.vitepress/dist
# vitepress cache directory
**/.vitepress/cache
# Docusaurus cache and generated files
.docusaurus
# Serverless directories
.serverless/
# FuseBox cache
.fusebox/
# DynamoDB Local files
.dynamodb/
# Firebase cache directory
.firebase/
# TernJS port file
.tern-port
# Stores VSCode versions used for testing VSCode extensions
.vscode-test
# yarn v3
.pnp.*
.yarn/*
!.yarn/patches
!.yarn/plugins
!.yarn/releases
!.yarn/sdks
!.yarn/versions
# Vite files
vite.config.js.timestamp-*
vite.config.ts.timestamp-*
.vite/
### Custom ###
debug.log
scratches/
test-results/
data/
*.arff
.DS_Store
node_modules/
# Auto generated
static/css/output.css
*.code-workspace
# macOS system files
.DS_Store
.Trashes
._*

View File

@ -5,10 +5,12 @@ repos:
rev: v3.2.0
hooks:
- id: trailing-whitespace
exclude: epiuclid/schemas/
- id: end-of-file-fixer
exclude: epiuclid/schemas/
- id: check-yaml
- id: check-added-large-files
exclude: ^static/images/
exclude: ^static/images/|^epiuclid/schemas/|^fixtures/
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.13.3

98
Dockerfile Normal file
View File

@ -0,0 +1,98 @@
FROM python:3.12-slim AS builder
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
libpq-dev \
curl \
openssh-client \
git \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
# Install pnpm
RUN npm install -g pnpm
RUN curl -LsSf https://astral.sh/uv/install.sh | sh
ENV PATH="/root/.local/bin:${PATH}"
# Install dependencies first (cached layer — only invalidated when lockfile changes)
COPY pyproject.toml uv.lock ./
# Add key from git.envipath.com to known_hosts
RUN mkdir -p -m 0700 /root/.ssh \
&& ssh-keyscan git.envipath.com >> /root/.ssh/known_hosts
# We'll need access to private repos, let docker make use of host ssh agent and use it like:
# docker build --ssh default -t envipath/envipy:1.0 .
RUN --mount=type=ssh \
uv sync --locked --extra ms-login --extra pepper-plugin
# Now copy source and do a final sync to install the project itself
# Ensure .dockerignore is reasonable
COPY bayer bayer
COPY bridge bridge
COPY biotransformer biotransformer
COPY envipath envipath
COPY epapi epapi
COPY epauth epauth
COPY epdb epdb
COPY fixtures fixtures
COPY migration migration
COPY pepper pepper
COPY scripts scripts
COPY static static
COPY templates templates
COPY tests tests
COPY utilities utilities
COPY manage.py .
# Install frontend deps
COPY package.json pnpm-lock.yaml pnpm-workspace.yaml ./
# Build frontend assets
RUN uv run python scripts/pnpm_wrapper.py install
RUN uv run python scripts/pnpm_wrapper.py run build
FROM python:3.12-slim AS production
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PATH="/app/.venv/bin:$PATH"
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
libpq5 \
libxrender1 \
libxext6 \
libfontconfig1 \
nano \
&& rm -rf /var/lib/apt/lists/*
RUN useradd -ms /bin/bash django
# Create directories in /opt and set ownership
RUN mkdir -p /opt/enviPy \
&& mkdir -p /opt/enviPy/celery \
&& mkdir -p /opt/enviPy/log \
&& mkdir -p /opt/enviPy/models \
&& mkdir -p /opt/enviPy/plugins \
&& mkdir -p /opt/enviPy/static \
&& chown -R django:django /opt/enviPy
COPY --from=builder --chown=django:django /app /app
RUN touch /app/.env && chown -R django:django /app/.env
USER django
EXPOSE 8000
CMD ["gunicorn", "envipath.wsgi:application", "--bind", "0.0.0.0:8000", "--workers", "3"]

View File

@ -8,13 +8,12 @@ These instructions will guide you through setting up the project for local devel
- Python 3.11 or later
- [uv](https://github.com/astral-sh/uv) - Python package manager
- **Docker and Docker Compose** - Required for running PostgreSQL database
- **Docker and Docker Compose** - Required for running PostgreSQL database and Redis (for async Celery tasks)
- Git
- Make
> **Note:** This application requires PostgreSQL (uses `ArrayField`). Docker is the easiest way to run PostgreSQL locally.
### 1. Install Dependencies
This project uses `uv` to manage dependencies and `poe-the-poet` for task running. First, [install `uv` if you don't have it yet](https://docs.astral.sh/uv/guides/install-python/).
@ -79,25 +78,48 @@ uv run poe bootstrap # Bootstrap data only
uv run poe shell # Open the Django shell
uv run poe build # Build frontend assets and collect static files
uv run poe clean # Remove database volumes (WARNING: destroys all data)
uv run poe celery # Start Celery worker for async task processing
uv run poe celery-dev # Start database and Celery worker
```
### 4. Async Celery Setup (Optional)
By default, Celery tasks run synchronously (`CELERY_TASK_ALWAYS_EAGER = True`), which means prediction tasks block the HTTP request until completion. To enable asynchronous task processing with live status updates on pathway pages:
1. **Set the Celery flag in your `.env` file:**
```bash
FLAG_CELERY_PRESENT=True
```
2. **Start Redis and Celery worker:**
```bash
uv run poe celery-dev
```
3. **Start the development server** (in another terminal):
```bash
uv run poe dev
```
### Troubleshooting
* **Docker Connection Error:** If you see an error like `open //./pipe/dockerDesktopLinuxEngine: The system cannot find the file specified` (on Windows), it likely means your Docker Desktop application is not running. Please start Docker Desktop and try the command again.
- **Docker Connection Error:** If you see an error like `open //./pipe/dockerDesktopLinuxEngine: The system cannot find the file specified` (on Windows), it likely means your Docker Desktop application is not running. Please start Docker Desktop and try the command again.
* **SSH Keys for Git Dependencies:** Some dependencies are installed from private git repositories and require SSH authentication. Ensure your SSH keys are configured correctly for Git.
* For a general guide, see [GitHub's official documentation](https://docs.github.com/en/authentication/connecting-to-github-with-ssh/generating-a-new-ssh-key-and-adding-it-to-the-ssh-agent).
* **Windows Users:** If `uv sync` hangs while fetching git dependencies, you may need to explicitly configure Git to use the Windows OpenSSH client and use the `ssh-agent` to manage your key's passphrase.
- **SSH Keys for Git Dependencies:** Some dependencies are installed from private git repositories and require SSH authentication. Ensure your SSH keys are configured correctly for Git.
- For a general guide, see [GitHub's official documentation](https://docs.github.com/en/authentication/connecting-to-github-with-ssh/generating-a-new-ssh-key-and-adding-it-to-the-ssh-agent).
- **Windows Users:** If `uv sync` hangs while fetching git dependencies, you may need to explicitly configure Git to use the Windows OpenSSH client and use the `ssh-agent` to manage your key's passphrase.
1. **Point Git to the correct SSH executable:**
```powershell
git config --global core.sshCommand "C:/Windows/System32/OpenSSH/ssh.exe"
```
2. **Enable and use the SSH agent:**
1. **Point Git to the correct SSH executable:**
```powershell
git config --global core.sshCommand "C:/Windows/System32/OpenSSH/ssh.exe"
```
2. **Enable and use the SSH agent:**
```powershell
# Run these commands in an administrator PowerShell
Get-Service ssh-agent | Set-Service -StartupType Automatic -PassThru | Start-Service
```powershell
# Run these commands in an administrator PowerShell
Get-Service ssh-agent | Set-Service -StartupType Automatic -PassThru | Start-Service
# Add your key to the agent. It will prompt for the passphrase once.
ssh-add
```
# Add your key to the agent. It will prompt for the passphrase once.
ssh-add
```

0
bayer/__init__.py Normal file
View File

19
bayer/admin.py Normal file
View File

@ -0,0 +1,19 @@
from django.contrib import admin
# Register your models here.
from .models import (
PESCompound,
PESStructure
)
class PESCompoundAdmin(admin.ModelAdmin):
pass
class PESStructureAdmin(admin.ModelAdmin):
pass
admin.site.register(PESCompound, PESCompoundAdmin)
admin.site.register(PESStructure, PESStructureAdmin)

6
bayer/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class BayerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'bayer'

31
bayer/epdb_hooks.py Normal file
View File

@ -0,0 +1,31 @@
import logging
from epdb.template_registry import register_template
logger = logging.getLogger(__name__)
# PES Create
register_template(
"epdb.actions.collections.compound",
"actions/collections/new_pes.html",
)
register_template(
"modals.collections.compound",
"modals/collections/new_pes_modal.html",
)
# PES Viz
register_template(
"epdb.objects.compound.viz",
"objects/compound_viz.html",
)
register_template(
"epdb.objects.compound_structure.viz",
"objects/compound_structure_viz.html",
)
register_template(
"epdb.objects.node.viz",
"objects/node_viz.html",
)

View File

@ -0,0 +1,35 @@
# Generated by Django 5.2.7 on 2026-03-06 10:51
import django.utils.timezone
import model_utils.fields
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Package',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('uuid', models.UUIDField(default=uuid.uuid4, unique=True, verbose_name='UUID of this object')),
('name', models.TextField(default='no name', verbose_name='Name')),
('description', models.TextField(default='no description', verbose_name='Descriptions')),
('url', models.TextField(null=True, unique=True, verbose_name='URL')),
('kv', models.JSONField(blank=True, default=dict, null=True)),
('reviewed', models.BooleanField(default=False, verbose_name='Reviewstatus')),
('classification_level', models.IntegerField(choices=[(0, 'Internal'), (10, 'Restricted'), (20, 'Secret')], default=10)),
],
options={
'db_table': 'epdb_package',
},
),
]

View File

@ -0,0 +1,22 @@
# Generated by Django 5.2.7 on 2026-03-06 10:51
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('bayer', '0001_initial'),
('epdb', '0019_remove_scenario_additional_information_and_more'),
]
operations = [
migrations.AddField(
model_name='package',
name='license',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='epdb.license', verbose_name='License'),
),
]

View File

@ -0,0 +1,20 @@
# Generated by Django 6.0.3 on 2026-04-14 19:07
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('bayer', '0002_initial'),
('epdb', '0023_alter_compoundstructure_options_and_more'),
]
operations = [
migrations.AddField(
model_name='package',
name='data_pool',
field=models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, to='epdb.group', verbose_name='Data pool'),
),
]

View File

@ -0,0 +1,35 @@
# Generated by Django 6.0.3 on 2026-04-15 20:03
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('bayer', '0003_package_data_pool'),
('epdb', '0023_alter_compoundstructure_options_and_more'),
]
operations = [
migrations.CreateModel(
name='PESCompound',
fields=[
('compound_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='epdb.compound')),
],
options={
'abstract': False,
},
bases=('epdb.compound',),
),
migrations.CreateModel(
name='PESStructure',
fields=[
('compoundstructure_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='epdb.compoundstructure')),
],
options={
'abstract': False,
},
bases=('epdb.compoundstructure',),
),
]

View File

@ -0,0 +1,19 @@
# Generated by Django 6.0.3 on 2026-04-16 08:43
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('bayer', '0004_pescompound_pesstructure'),
]
operations = [
migrations.AddField(
model_name='pesstructure',
name='pes_link',
field=models.URLField(default=None, verbose_name='PES Link'),
preserve_default=False,
),
]

View File

168
bayer/models.py Normal file
View File

@ -0,0 +1,168 @@
from typing import List
import urllib.parse
import nh3
from django.conf import settings as s
from django.db import models, transaction
from django.db.models import QuerySet
from django.urls import reverse
from epdb.models import (
EnviPathModel,
Compound,
CompoundStructure,
ParallelRule,
SequentialRule,
SimpleAmbitRule,
SimpleRDKitRule,
)
class Package(EnviPathModel):
reviewed = models.BooleanField(verbose_name="Reviewstatus", default=False)
license = models.ForeignKey(
"epdb.License", on_delete=models.SET_NULL, blank=True, null=True, verbose_name="License"
)
class Classification(models.IntegerChoices):
INTERNAL = 0, "Internal"
RESTRICTED = 10 , "Restricted"
SECRET = 20, "Secret"
classification_level = models.IntegerField(
choices=Classification,
default=Classification.RESTRICTED,
)
data_pool = models.ForeignKey("epdb.Group", on_delete=models.SET_NULL, blank=True, null=True,
verbose_name="Data pool", default=None)
def delete(self, *args, **kwargs):
# explicitly handle related Rules
for r in self.rules.all():
r.delete()
super().delete(*args, **kwargs)
def __str__(self):
return f"{self.name} (pk={self.pk})"
@property
def compounds(self) -> QuerySet:
return self.compound_set.all()
@property
def rules(self) -> QuerySet:
return self.rule_set.all()
@property
def reactions(self) -> QuerySet:
return self.reaction_set.all()
@property
def pathways(self) -> QuerySet:
return self.pathway_set.all()
@property
def scenarios(self) -> QuerySet:
return self.scenario_set.all()
@property
def models(self) -> QuerySet:
return self.epmodel_set.all()
def _url(self):
return "{}/package/{}".format(s.SERVER_URL, self.uuid)
def get_applicable_rules(self) -> List["Rule"]:
"""
Returns a ordered set of rules where the following applies:
1. All Composite will be added to result
2. All SimpleRules will be added if theres no CompositeRule present using the SimpleRule
Ordering is based on "url" field.
"""
rules = []
rule_qs = self.rules
reflected_simple_rules = set()
for r in rule_qs:
if isinstance(r, ParallelRule) or isinstance(r, SequentialRule):
rules.append(r)
for sr in r.simple_rules.all():
reflected_simple_rules.add(sr)
for r in rule_qs:
if isinstance(r, SimpleAmbitRule) or isinstance(r, SimpleRDKitRule):
if r not in reflected_simple_rules:
rules.append(r)
rules = sorted(rules, key=lambda x: x.url)
return rules
class Meta:
db_table = "epdb_package"
class PESCompound(Compound):
@staticmethod
@transaction.atomic
def create(
package: "Package", pes_data: dict, name: str = None, description: str = None, *args, **kwargs
) -> "Compound":
pes_url = pes_data["pes_url"]
# Check if we find a direct match for a given pes_link
if PESStructure.objects.filter(pes_link=pes_url, compound__package=package).exists():
return PESStructure.objects.get(pes_link=pes_url, compound__package=package).compound
# Generate Compound
c = PESCompound()
c.package = package
if name is not None:
# Clean for potential XSS
name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
if name is None or name == "":
name = f"Compound {Compound.objects.filter(package=package).count() + 1}"
c.name = name
# We have a default here only set the value if it carries some payload
if description is not None and description.strip() != "":
c.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
c.save()
is_standardized = standardized_smiles == smiles
if not is_standardized:
_ = CompoundStructure.create(
c,
standardized_smiles,
name="Normalized structure of {}".format(name),
description="{} (in its normalized form)".format(description),
normalized_structure=True,
)
cs = CompoundStructure.create(
c, smiles, name=name, description=description, normalized_structure=is_standardized
)
c.default_structure = cs
c.save()
return c
class PESStructure(CompoundStructure):
pes_link = models.URLField(blank=False, null=False, verbose_name="PES Link")
def d3_json(self):
return {
"is_pes": True,
"pes_link": self.pes_link,
# Will overwrite image from Node
"image": f"{reverse("depict_pes")}?pesLink={urllib.parse.quote(self.pes_link)}"
}

View File

@ -0,0 +1,9 @@
{% if meta.can_edit %}
<button
type="button"
class="btn btn-primary btn-sm"
onclick="document.getElementById('new_pes_modal').showModal(); return false;"
>
New PES
</button>
{% endif %}

View File

@ -0,0 +1,175 @@
{% load static %}
<dialog
id="new_package_modal"
class="modal"
x-data="{
isSubmitting: false,
packageClassification: null,
reset() {
this.isSubmitting = false;
this.packageClassification = null;
},
setFormData(data) {
this.formData = data;
},
get isSecret() {
return this.packageClassification === '20';
},
submit(formId) {
const form = document.getElementById(formId);
// Remove previously injected inputs
form.querySelectorAll('.dynamic-param').forEach(el => el.remove());
// Add values from dynamic form into the html form
if (this.formData) {
Object.entries(this.formData).forEach(([key, value]) => {
const input = document.createElement('input');
input.type = 'hidden';
input.name = key;
input.value = value;
input.classList.add('dynamic-param');
form.appendChild(input);
});
}
if (form && form.checkValidity()) {
this.isSubmitting = true;
form.submit();
} else if (form) {
form.reportValidity();
}
}
}"
@close="reset()"
>
<div class="modal-box max-w-3xl">
<!-- Header -->
<h3 class="text-lg font-bold">New Package</h3>
<!-- Close button (X) -->
<form method="dialog">
<button
class="btn btn-sm btn-circle btn-ghost absolute top-2 right-2"
:disabled="isSubmitting"
>
</button>
</form>
<!-- Body -->
<div class="py-4">
<form
id="new_package_form"
accept-charset="UTF-8"
action=""
method="post"
>
{% csrf_token %}
<!-- Name -->
<div class="form-control mb-3">
<label class="label" for="package-name">
<span class="label-text">Name</span>
</label>
<input
id="package-name"
class="input input-bordered w-full"
name="package-name"
placeholder="Name"
required
/>
</div>
<!-- Description -->
<div class="form-control mb-3">
<label class="label" for="package-description">
<span class="label-text">Description</span>
</label>
<input
id="package-description"
type="text"
class="input input-bordered w-full"
placeholder="Description..."
name="package-description"
/>
</div>
<!-- Classification Level -->
<div class="form-control mb-3">
<label class="label" for="package-classification">
<span class="label-text">Package Classification</span>
</label>
<select
id="package-classification"
name="package-classification"
class="select select-bordered w-full"
x-model="packageClassification"
required
>
<option value="null" disabled selected>Select Classification</option>
<option value="0">Internal</option>
<option value="10">Restricted</option>
<option value="20">Secret</option>
</select>
</div>
<!-- Secret Groups -->
<div class="form-control mb-3" x-show="isSecret" x-cloak>
<label class="label" for="package-data-pool">
<span class="label-text">Data Pool for SECRET Package</span>
</label>
<p>Only users with this role can be granted access to this package</p>
<select
id="package-data-pool"
name="package-data-pool"
class="select select-bordered w-full"
>
<option value="" disabled selected>Select Data Pool</option>
{% for obj in meta.secret_groups %}
<option value="{{ obj.url }}">{{ obj.name|safe }}</option>
{% endfor %}
</select>
</div>
</form>
</div>
<!-- Footer -->
<div class="modal-action">
<button
type="button"
class="btn"
onclick="this.closest('dialog').close()"
:disabled="isSubmitting"
>
Cancel
</button>
<button
type="button"
class="btn btn-primary"
@click="submit('new_package_form')"
:disabled="isSubmitting || !selectedType || loadingSchemas"
>
<span x-show="!isSubmitting">Submit</span>
<span
x-show="isSubmitting"
class="loading loading-spinner loading-sm"
></span>
<span x-show="isSubmitting">Creating...</span>
</button>
</div>
</div>
<!-- Backdrop -->
<form method="dialog" class="modal-backdrop">
<button :disabled="isSubmitting">close</button>
</form>
</dialog>

View File

@ -0,0 +1,174 @@
{% load static %}
<dialog
id="new_pes_modal"
class="modal"
x-data="{
isSubmitting: false,
pesLink: null,
pesVizHtml: '',
reset() {
this.isSubmitting = false;
},
get isPESSet() {
console.log(this.pesLink);
return this.pesLink !== null;
},
updatePesViz() {
if (!this.isPESSet) {
this.pesVizHtml = '';
return;
}
const img = new Image();
img.src = '{% url 'depict_pes' %}?pesLink=' + encodeURIComponent(this.pesLink);
img.style.width = '100%';
img.style.height = '100%';
img.style.objectFit = 'cover';
img.onload = () => {
this.pesVizHtml = img.outerHTML;
};
img.onerror = () => {
this.pesVizHtml = `
<div class='alert alert-error' role='alert'>
<h4 class='alert-heading'>Could not render PES!</h4>
<p>Could not render PES - Do you have access?</p>
</div>`;
};
},
submit(formId) {
const form = document.getElementById(formId);
// Remove previously injected inputs
form.querySelectorAll('.dynamic-param').forEach(el => el.remove());
// Add values from dynamic form into the html form
if (this.formData) {
Object.entries(this.formData).forEach(([key, value]) => {
const input = document.createElement('input');
input.type = 'hidden';
input.name = key;
input.value = value;
input.classList.add('dynamic-param');
form.appendChild(input);
});
}
if (form && form.checkValidity()) {
this.isSubmitting = true;
form.submit();
} else if (form) {
form.reportValidity();
}
}
}"
@close="reset()"
>
<div class="modal-box max-w-3xl">
<!-- Header -->
<h3 class="text-lg font-bold">New PES</h3>
<!-- Close button (X) -->
<form method="dialog">
<button
class="btn btn-sm btn-circle btn-ghost absolute top-2 right-2"
:disabled="isSubmitting"
>
</button>
</form>
<!-- Body -->
<div class="py-4">
<form
id="new-pes-modal-form"
accept-charset="UTF-8"
action="{% url 'create pes' meta.current_package.uuid %}"
method="post"
>
{% csrf_token %}
<div class="form-control mb-3">
<label class="label" for="compound-name">
<span class="label-text">Name</span>
</label>
<input
id="compound-name"
class="input input-bordered w-full"
name="compound-name"
placeholder="Name"
required
/>
</div>
<div class="form-control mb-3">
<label class="label" for="compound-description">
<span class="label-text">Description</span>
</label>
<input
id="compound-description"
class="input input-bordered w-full"
name="compound-description"
placeholder="Description"
/>
</div>
<div class="form-control mb-3">
<label class="label" for="pes-link">
<span class="label-text">Link to PES</span>
</label>
<input
id="pes-link"
name="pes-link"
type="text"
class="input input-bordered w-full"
placeholder="Link to PES e.g. https://pesregapp-test.cropkey-np.ag/entities/PES-000126"
x-model="pesLink"
@input="updatePesViz()"
required
/>
</div>
<div id="pes-viz" class="mb-3" x-html="pesVizHtml"></div>
</form>
</div>
<!-- Footer -->
<div class="modal-action">
<button
type="button"
class="btn"
onclick="this.closest('dialog').close()"
:disabled="isSubmitting"
>
Close
</button>
<button
type="button"
class="btn btn-primary"
@click="submit('new-compound-modal-form')"
:disabled="isSubmitting"
>
<span x-show="!isSubmitting">Submit</span>
<span
x-show="isSubmitting"
class="loading loading-spinner loading-sm"
></span>
<span x-show="isSubmitting">Creating...</span>
</button>
</div>
</div>
<!-- Backdrop -->
<form method="dialog" class="modal-backdrop">
<button :disabled="isSubmitting">close</button>
</form>
</dialog>

View File

@ -0,0 +1,12 @@
{% if compound_structure.pes_link %}
<!-- Image Representation -->
<div class="collapse-arrow bg-base-200 collapse">
<input type="checkbox" checked />
<div class="collapse-title text-xl font-medium">PES Image Representation</div>
<div class="collapse-content">
<div class="flex justify-center">
<img src='{% url 'depict_pes' %}?pesLink={{ compound_structure.pes_link|urlencode }}'/>
</div>
</div>
</div>
{% endif %}

View File

@ -0,0 +1,12 @@
{% if compound.default_structure.pes_link %}
<!-- Image Representation -->
<div class="collapse-arrow bg-base-200 collapse">
<input type="checkbox" checked />
<div class="collapse-title text-xl font-medium">PES Image Representation</div>
<div class="collapse-content">
<div class="flex justify-center">
<img src='{% url 'depict_pes' %}?pesLink={{ compound.default_structure.pes_link|urlencode }}'/>
</div>
</div>
</div>
{% endif %}

View File

@ -0,0 +1,12 @@
{% if node.default_node_label.pes_link %}
<!-- Image Representation -->
<div class="collapse-arrow bg-base-200 collapse">
<input type="checkbox" checked />
<div class="collapse-title text-xl font-medium">PES Image Representation</div>
<div class="collapse-content">
<div class="flex justify-center">
<img src='{% url 'depict_pes' %}?pesLink={{ node.default_node_label.pes_link|urlencode }}'/>
</div>
</div>
</div>
{% endif %}

View File

@ -0,0 +1,97 @@
{% extends "framework_modern.html" %}
{% block content %}
{% block action_modals %}
{% include "modals/objects/edit_package_modal.html" %}
{% include "modals/objects/edit_package_permissions_modal.html" %}
{% include "modals/objects/publish_package_modal.html" %}
{% include "modals/objects/set_license_modal.html" %}
{% include "modals/objects/export_package_modal.html" %}
{% include "modals/objects/generic_delete_modal.html" %}
{% endblock action_modals %}
<div class="space-y-2 p-4">
<!-- Header Section -->
<div class="card bg-base-100">
<div class="card-body">
<div class="flex items-center justify-between">
<h2 class="card-title text-2xl">{{ package.name }} - ({{ package.get_classification_level_display }})</h2>
<div id="actionsButton" class="dropdown dropdown-e nd hidden">
<div tabindex="0" role="button" class="btn btn-ghost btn-sm">
<svg
xmlns="http://www.w3.org/2000/svg"
width="16"
height="16"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="2"
stroke-linecap="round"
stroke-linejoin="round"
class="lucide lucide-wrench"
>
<path
d="M14.7 6.3a1 1 0 0 0 0 1.4l1.6 1.6a1 1 0 0 0 1.4 0l3.77-3.77a6 6 0 0 1-7.94 7.94l-6.91 6.91a2.12 2.12 0 0 1-3-3l6.91-6.91a6 6 0 0 1 7.94-7.94l-3.76 3.76z"
/>
</svg>
Actions
</div>
<ul
tabindex="-1"
class="dropdown-content menu bg-base-100 rounded-box z-50 w-52 p-2"
>
{% block actions %}
{% include "actions/objects/package.html" %}
{% endblock %}
</ul>
</div>
</div>
<p class="mt-2">{{ package.description|safe }}</p>
<ul class="menu bg-base-200 rounded-box mt-4 w-full">
<li>
<a href="{{ package.url }}/pathway" class="hover:bg-base-300"
>Pathways ({{ package.pathways.count }})</a
>
</li>
<li>
<a href="{{ package.url }}/rule" class="hover:bg-base-300"
>Rules ({{ package.rules.count }})</a
>
</li>
<li>
<a href="{{ package.url }}/compound" class="hover:bg-base-300"
>Compounds ({{ package.compounds.count }})</a
>
</li>
<li>
<a href="{{ package.url }}/reaction" class="hover:bg-base-300"
>Reactions ({{ package.reactions.count }})</a
>
</li>
<li>
<a href="{{ package.url }}/model" class="hover:bg-base-300"
>Models ({{ package.models.count }})</a
>
</li>
<li>
<a href="{{ package.url }}/scenario" class="hover:bg-base-300"
>Scenarios ({{ package.scenarios.count }})</a
>
</li>
</ul>
</div>
</div>
</div>
<script>
// Show actions button if there are actions
document.addEventListener("DOMContentLoaded", function () {
const actionsButton = document.getElementById("actionsButton");
const actionsList = actionsButton?.querySelector("ul");
if (actionsList && actionsList.children.length > 0) {
actionsButton?.classList.remove("hidden");
}
});
</script>
{% endblock content %}

View File

@ -0,0 +1,149 @@
{% extends "static/login_base.html" %}
{% block title %}enviPath - Sign In{% endblock %}
{% block extra_styles %}
<style>
/* Tab styling */
.tab-content {
display: none;
}
.tab-content.active {
display: block;
}
input[type="radio"].tab-radio {
display: none;
}
.tab-label {
cursor: pointer;
padding: 0.75rem 1.5rem;
border-bottom: 2px solid transparent;
transition: all 0.3s ease;
}
.tab-label:hover {
background-color: rgba(0, 0, 0, 0.05);
}
input[type="radio"].tab-radio:checked + .tab-label {
border-bottom-color: #3b82f6;
font-weight: 600;
}
</style>
{% endblock %}
{% block content %}
<div class="card bg-base-200 mb-6 ">
<div class="card-body">
<h3 class="card-title">Welcome to the new enviPath!</h3>
</div>
</div>
<!-- Tab Navigation -->
<div class="border-base-300 mb-6 border-b">
<div class="flex justify-start">
<input
type="radio"
name="auth-tab"
id="tab-sso"
class="tab-radio"
checked
/>
<label for="tab-sso" class="tab-label">SSO</label>
<input
type="radio"
name="auth-tab"
id="tab-signin"
class="tab-radio"
/>
<label for="tab-signin" class="tab-label">Local User</label>
</div>
</div>
<!-- SSO Tab -->
<div id="content-sso" class="tab-content active">
<button role="link" onclick="window.location.href='/entra/login'" name="sso" class="btn btn-primary w-full">
Login with Microsoft
</button>
</div>
<!-- Sign In Tab -->
<div id="content-signin" class="tab-content">
<form method="post" action="{% url 'login' %}" class="space-y-4">
{% csrf_token %}
<input type="hidden" name="login" value="true" />
<div class="form-control">
<label class="label" for="username">
<span class="label-text">Account</span>
</label>
<input
type="text"
id="username"
name="username"
placeholder="Username or Email"
class="input input-bordered w-full"
required
autocomplete="username"
/>
</div>
<div class="form-control">
<label class="label" for="passwordinput">
<span class="label-text">Password</span>
</label>
<input
type="password"
id="passwordinput"
name="password"
placeholder="••••••••"
class="input input-bordered w-full"
required
autocomplete="current-password"
/>
</div>
<div class="text-right">
<a href="{% url 'password_reset' %}" class="link link-primary text-sm"
>Forgot password?</a
>
</div>
<input type="hidden" name="next" value="{{ next }}" />
<button type="submit" name="signin" class="btn btn-primary w-full">
Sign In
</button>
</form>
</div>
{% endblock %}
{% block extra_scripts %}
<script>
// Tab switching functionality
document.querySelectorAll('input[name="auth-tab"]').forEach((radio) => {
radio.addEventListener("change", function () {
// Hide all content
document.querySelectorAll(".tab-content").forEach((content) => {
content.classList.remove("active");
});
// Show selected content
const contentId = "content-" + this.id.replace("tab-", "");
document.getElementById(contentId).classList.add("active");
});
});
// Check for hash in URL to auto-select tab
window.addEventListener("DOMContentLoaded", function () {
const hash = window.location.hash.substring(1); // Remove the # symbol
if (hash === "signup" || hash === "signin") {
const tabRadio = document.getElementById("tab-" + hash);
if (tabRadio) {
tabRadio.checked = true;
// Trigger change event to show correct content
tabRadio.dispatchEvent(new Event("change"));
}
}
});
</script>
{% endblock %}

3
bayer/tests.py Normal file
View File

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

14
bayer/urls.py Normal file
View File

@ -0,0 +1,14 @@
from django.urls import re_path
from . import views as v
UUID = "[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
urlpatterns = [
re_path(r"^depict_pes$", v.visualize_pes, name="depict_pes"),
re_path(
rf"^package/(?P<package_uuid>{UUID})/compound$",
v.create_pes,
name="create pes",
),
]

96
bayer/views.py Normal file
View File

@ -0,0 +1,96 @@
import base64
import requests
from django.conf import settings as s
from django.core.exceptions import BadRequest
from django.http import HttpResponse
from django.shortcuts import redirect
from bayer.models import PESCompound
from epdb.logic import PackageManager
from epdb.views import _anonymous_or_real
from utilities.decorators import package_permission_required
@package_permission_required()
def create_pes(request, package_uuid):
current_user = _anonymous_or_real(request)
current_package = PackageManager.get_package_by_id(current_user, package_uuid)
if request.method == "POST":
compound_name = request.POST.get('compound-name')
compound_description = request.POST.get('compound-description')
pes_link = request.POST.get('pes-link')
if pes_link:
try:
pes_data = fetch_pes(request, pes_link)
except ValueError as e:
return BadRequest(f"Could not fetch PES data for {pes_link}")
pes = PESCompound.create(current_package, pes_data, compound_name, compound_description)
return redirect(pes.url)
else:
return BadRequest("Please provide a PES link.")
else:
pass
def fetch_pes(request, pes_url) -> dict:
proxies = {
"http": "http://10.185.190.100:8080",
"https": "http://10.185.190.100:8080",
}
from epauth.views import get_access_token_from_request
token = get_access_token_from_request(request)
if token or True:
for k, v in s.PES_API_MAPPING.items():
if pes_url.startswith(k):
pes_id = pes_url.split('/')[-1]
if pes_id == 'dummy' or True:
import json
res_data = json.load(open(s.BASE_DIR / "fixtures/pes.json"))
res_data["pes_url"] = pes_url
return res_data
else:
headers = {"Authorization": f"Bearer {token['access_token']}"}
params = {"pes_reg_entity_corporate_id": pes_id}
res = requests.get(v, headers=headers, params=params, proxies=proxies)
try:
res.raise_for_status()
pes_data = res.json()
if len(pes_data) == 0:
raise ValueError(f"PES with id {pes_id} not found")
res_data = pes_data[0]
res_data["pes_url"] = pes_url
return res_data
except requests.exceptions.HTTPError as e:
raise ValueError(f"Error fetching PES with id {pes_id}: {e}")
else:
raise ValueError(f"Unknown URL {pes_url}")
else:
raise ValueError("Could not fetch access token from request.")
def visualize_pes(request):
pes_link = request.GET.get('pesLink')
if pes_link:
pes_data = fetch_pes(request, pes_link)
representations = pes_data.get('representations')
for rep in representations:
if rep.get('type') == 'color':
image_data = base64.b64decode(rep.get('base64').replace("data:image/png;base64,", ""))
return HttpResponse(image_data, content_type="image/png")

112
biotransformer/__init__.py Normal file
View File

@ -0,0 +1,112 @@
import enum
from datetime import datetime
from typing import List
import requests
from django.conf import settings as s
# Once stable these will be exposed by enviPy-plugins lib
from envipy_additional_information import EnviPyModel, UIConfig, WidgetType # noqa: I001
from envipy_additional_information import register # noqa: I001
from bridge.contracts import Classifier # noqa: I001
from bridge.dto import (
BuildResult,
EnviPyDTO,
EvaluationResult,
RunResult,
TransformationProductPrediction,
) # noqa: I001
class BiotransformerEnvType(enum.Enum):
CYP450 = "CYP450"
ALLHUMAN = "ALLHUMAN"
ECBASED = "ECBASED"
HGUT = "HGUT"
PHASEII = "PHASEII"
ENV = "ENV"
@register("biotransformerconfig")
class BiotransformerConfig(EnviPyModel):
env_type: BiotransformerEnvType
class UI:
title = "Biotransformer Type"
env_type = UIConfig(widget=WidgetType.SELECT, label="Biotransformer Type", order=1)
class Biotransformer(Classifier):
Config = BiotransformerConfig
def __init__(self, config: BiotransformerConfig | None = None):
super().__init__(config)
self.url = f"{s.BIOTRANSFORMER_URL}/biotransformer"
@classmethod
def requires_rule_packages(cls) -> bool:
return False
@classmethod
def requires_data_packages(cls) -> bool:
return False
@classmethod
def identifier(cls) -> str:
return "biotransformer3"
@classmethod
def name(cls) -> str:
return "Biotransformer 3.0"
@classmethod
def display(cls) -> str:
return "Biotransformer 3.0"
def build(self, eP: EnviPyDTO, *args, **kwargs) -> BuildResult | None:
return
def run(self, eP: EnviPyDTO, *args, **kwargs) -> RunResult:
smiles = [c.smiles for c in eP.get_compounds()]
preds = self._post(smiles)
results = []
for substrate in preds.keys():
results.append(
TransformationProductPrediction(
substrate=substrate,
products=preds[substrate],
)
)
return RunResult(
producer=eP.get_context().url,
description=f"Generated at {datetime.now()}",
result=results,
)
def evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult:
pass
def build_and_evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult:
pass
def _post(self, smiles: List[str]) -> dict[str, dict[str, float]]:
data = {"substrates": smiles, "mode": self.config.env_type.value}
res = requests.post(self.url, json=data)
res.raise_for_status()
# Example Response JSON:
# {
# 'products': {
# 'CN1C=NC2=C1C(=O)N(C(=O)N2C)C': {
# 'CN1C2=C(C(=O)N(C)C1=O)NC=N2': 0.5,
# 'CN1C=NC2=C1C(=O)N(C)C(=O)N2.CN1C=NC2=C1C(=O)NC(=O)N2C.CO': 0.5
# }
# }
# }
return res.json()["products"]

0
bridge/__init__.py Normal file
View File

400
bridge/contracts.py Normal file
View File

@ -0,0 +1,400 @@
import enum
from abc import ABC, abstractmethod
from envipy_additional_information import EnviPyModel
from .dto import BuildResult, EnviPyDTO, EvaluationResult, RunResult
class PropertyType(enum.Enum):
"""
Enumeration representing different types of properties.
PropertyType is an Enum class that defines categories or types of properties
based on their weight or nature. It can typically be used when classifying
objects or entities by their weight classification, such as lightweight or heavy.
"""
LIGHTWEIGHT = "lightweight"
HEAVY = "heavy"
class Plugin(ABC):
"""
Defines an abstract base class Plugin to serve as a blueprint for plugins.
This class establishes the structure that all plugin implementations must
follow. It enforces the presence of required methods to ensure consistent
functionality across all derived classes.
"""
@classmethod
@abstractmethod
def identifier(cls) -> str:
pass
@classmethod
@abstractmethod
def name(cls) -> str:
"""
Represents an abstract method that provides a contract for implementing a method
to return a name as a string. Must be implemented in subclasses.
Name must be unique across all plugins.
Methods
-------
name() -> str
Abstract method to be defined in subclasses, which returns a string
representing a name.
"""
pass
@classmethod
@abstractmethod
def display(cls) -> str:
"""
An abstract method that must be implemented by subclasses to display
specific information or behavior. The method ensures that all subclasses
provide their own implementation of the display functionality.
Raises:
NotImplementedError: Raises this error when the method is not implemented
in a subclass.
Returns:
str: A string used in dropdown menus or other user interfaces to display
"""
pass
class Property(Plugin):
@classmethod
@abstractmethod
def requires_rule_packages(cls) -> bool:
"""
Defines an abstract method to determine whether rule packages are required.
This method should be implemented by subclasses to specify if they depend
on rule packages for their functioning.
Raises:
NotImplementedError: If the subclass has not implemented this method.
@return: A boolean indicating if rule packages are required.
"""
pass
@classmethod
@abstractmethod
def requires_data_packages(cls) -> bool:
"""
Defines an abstract method to determine whether data packages are required.
This method should be implemented by subclasses to specify if they depend
on data packages for their functioning.
Raises:
NotImplementedError: If the subclass has not implemented this method.
Returns:
bool: True if the service requires data packages, False otherwise.
"""
pass
@abstractmethod
def get_type(self) -> PropertyType:
"""
An abstract method that provides the type of property. This method must
be implemented by subclasses to specify the appropriate property type.
Raises:
NotImplementedError: If the method is not implemented by a subclass.
Returns:
PropertyType: The type of the property associated with the implementation.
"""
pass
def is_heavy(self):
"""
Determines if the current property type is heavy.
This method evaluates whether the property type returned from the `get_type()`
method is classified as `HEAVY`. It utilizes the `PropertyType.HEAVY` constant
for this comparison.
Raises:
AttributeError: If the `get_type()` method is not defined or does not return
a valid value.
Returns:
bool: True if the property type is `HEAVY`, otherwise False.
"""
return self.get_type() == PropertyType.HEAVY
@abstractmethod
def build(self, eP: EnviPyDTO, *args, **kwargs) -> BuildResult | None:
"""
Abstract method to prepare and construct a specific build process based on the provided
environment data transfer object (EnviPyDTO). This method should be implemented by
subclasses to handle the particular requirements of the environment.
Parameters:
eP : EnviPyDTO
The data transfer object containing environment details for the build process.
*args :
Additional positional arguments required for the build.
**kwargs :
Additional keyword arguments to offer flexibility and customization for
the build process.
Returns:
BuildResult | None
Returns a BuildResult instance if the build operation succeeds, else returns None.
Raises:
NotImplementedError
If the method is not implemented in a subclass.
"""
pass
@abstractmethod
def run(self, eP: EnviPyDTO, *args, **kwargs) -> RunResult:
"""
Represents an abstract base class for executing a generic process with
provided parameters and returning a standardized result.
Attributes:
None.
Methods:
run(eP, *args, **kwargs):
Executes a task with specified input parameters and optional
arguments, returning the outcome in the form of a RunResult object.
This is an abstract method and must be implemented in subclasses.
Raises:
NotImplementedError: If the subclass does not implement the abstract
method.
Parameters:
eP (EnviPyDTO): The primary object containing information or data required
for processing. Mandatory.
*args: Variable length argument list for additional positional arguments.
**kwargs: Arbitrary keyword arguments for additional options or settings.
Returns:
RunResult: A result object encapsulating the status, output, or details
of the process execution.
"""
pass
@abstractmethod
def evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult:
"""
Abstract method for evaluating data based on the given input and additional arguments.
This method is intended to be implemented by subclasses and provides
a mechanism to perform an evaluation procedure based on input encapsulated
in an EnviPyDTO object.
Parameters:
eP : EnviPyDTO
The data transfer object containing necessary input for evaluation.
*args : tuple
Additional positional arguments for the evaluation process.
**kwargs : dict
Additional keyword arguments for the evaluation process.
Returns:
EvaluationResult
The result of the evaluation performed by the method.
Raises:
NotImplementedError
If the method is not implemented in the subclass.
"""
pass
@abstractmethod
def build_and_evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult:
"""
An abstract method designed to build and evaluate a model or system using the provided
environmental parameters and additional optional arguments.
Args:
eP (EnviPyDTO): The environmental parameters required for building and evaluating.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
EvaluationResult: The result of the evaluation process.
Raises:
NotImplementedError: If the method is not implemented by a subclass.
"""
pass
class Classifier(Plugin):
Config: type[EnviPyModel] | None = None
def __init__(self, config: EnviPyModel | None = None):
self.config = config
@classmethod
def has_config(cls) -> bool:
return cls.Config is not None
@classmethod
def parse_config(cls, data: dict | None = None) -> EnviPyModel | None:
if cls.Config is None:
return None
return cls.Config(**(data or {}))
@classmethod
def create(cls, data: dict | None = None):
return cls(cls.parse_config(data))
@classmethod
@abstractmethod
def requires_rule_packages(cls) -> bool:
"""
Defines an abstract method to determine whether rule packages are required.
This method should be implemented by subclasses to specify if they depend
on rule packages for their functioning.
Raises:
NotImplementedError: If the subclass has not implemented this method.
@return: A boolean indicating if rule packages are required.
"""
pass
@classmethod
@abstractmethod
def requires_data_packages(cls) -> bool:
"""
Defines an abstract method to determine whether data packages are required.
This method should be implemented by subclasses to specify if they depend
on data packages for their functioning.
Raises:
NotImplementedError: If the subclass has not implemented this method.
Returns:
bool: True if the service requires data packages, False otherwise.
"""
pass
@abstractmethod
def build(self, eP: EnviPyDTO, *args, **kwargs) -> BuildResult | None:
"""
Abstract method to prepare and construct a specific build process based on the provided
environment data transfer object (EnviPyDTO). This method should be implemented by
subclasses to handle the particular requirements of the environment.
Parameters:
eP : EnviPyDTO
The data transfer object containing environment details for the build process.
*args :
Additional positional arguments required for the build.
**kwargs :
Additional keyword arguments to offer flexibility and customization for
the build process.
Returns:
BuildResult | None
Returns a BuildResult instance if the build operation succeeds, else returns None.
Raises:
NotImplementedError
If the method is not implemented in a subclass.
"""
pass
@abstractmethod
def run(self, eP: EnviPyDTO, *args, **kwargs) -> RunResult:
"""
Represents an abstract base class for executing a generic process with
provided parameters and returning a standardized result.
Attributes:
None.
Methods:
run(eP, *args, **kwargs):
Executes a task with specified input parameters and optional
arguments, returning the outcome in the form of a RunResult object.
This is an abstract method and must be implemented in subclasses.
Raises:
NotImplementedError: If the subclass does not implement the abstract
method.
Parameters:
eP (EnviPyDTO): The primary object containing information or data required
for processing. Mandatory.
*args: Variable length argument list for additional positional arguments.
**kwargs: Arbitrary keyword arguments for additional options or settings.
Returns:
RunResult: A result object encapsulating the status, output, or details
of the process execution.
"""
pass
@abstractmethod
def evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult | None:
"""
Abstract method for evaluating data based on the given input and additional arguments.
This method is intended to be implemented by subclasses and provides
a mechanism to perform an evaluation procedure based on input encapsulated
in an EnviPyDTO object.
Parameters:
eP : EnviPyDTO
The data transfer object containing necessary input for evaluation.
*args : tuple
Additional positional arguments for the evaluation process.
**kwargs : dict
Additional keyword arguments for the evaluation process.
Returns:
EvaluationResult
The result of the evaluation performed by the method.
Raises:
NotImplementedError
If the method is not implemented in the subclass.
"""
pass
@abstractmethod
def build_and_evaluate(self, eP: EnviPyDTO, *args, **kwargs) -> EvaluationResult | None:
"""
An abstract method designed to build and evaluate a model or system using the provided
environmental parameters and additional optional arguments.
Args:
eP (EnviPyDTO): The environmental parameters required for building and evaluating.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
EvaluationResult: The result of the evaluation process.
Raises:
NotImplementedError: If the method is not implemented by a subclass.
"""
pass

149
bridge/dto.py Normal file
View File

@ -0,0 +1,149 @@
from dataclasses import dataclass
from typing import Any, List, Optional, Protocol
from envipy_additional_information import EnviPyModel, register
from pydantic import HttpUrl
from utilities.chem import FormatConverter, ProductSet
@dataclass(frozen=True, slots=True)
class Context:
uuid: str
url: str
work_dir: str
class CompoundProto(Protocol):
url: str | None
name: str | None
smiles: str
class RuleProto(Protocol):
url: str
name: str
def apply(self, smiles, *args, **kwargs): ...
class ReactionProto(Protocol):
url: str
name: str
rules: List[RuleProto]
class EnviPyDTO(Protocol):
def get_context(self) -> Context: ...
def get_compounds(self) -> List[CompoundProto]: ...
def get_reactions(self) -> List[ReactionProto]: ...
def get_rules(self) -> List[RuleProto]: ...
@staticmethod
def standardize(smiles, remove_stereo=False, canonicalize_tautomers=False): ...
@staticmethod
def apply(
smiles: str,
smirks: str,
preprocess_smiles: bool = True,
bracketize: bool = True,
standardize: bool = True,
kekulize: bool = True,
remove_stereo: bool = True,
reactant_filter_smarts: str | None = None,
product_filter_smarts: str | None = None,
) -> List["ProductSet"]: ...
class EnviPyPrediction(EnviPyModel):
pass
class PropertyPrediction(EnviPyPrediction):
pass
class TransformationProductPrediction(EnviPyPrediction):
substrate: str
products: dict[str, float]
@register("buildresult")
class BuildResult(EnviPyModel):
data: dict[str, Any] | List[dict[str, Any]] | None
@register("runresult")
class RunResult(EnviPyModel):
producer: HttpUrl
description: Optional[str] = None
result: EnviPyPrediction | List[EnviPyPrediction]
@register("evaluationresult")
class EvaluationResult(EnviPyModel):
data: dict[str, Any] | List[dict[str, Any]] | None
class BaseDTO(EnviPyDTO):
def __init__(
self,
uuid: str,
url: str,
work_dir: str,
compounds: List[CompoundProto],
reactions: List[ReactionProto],
rules: List[RuleProto],
):
self.uuid = uuid
self.url = url
self.work_dir = work_dir
self.compounds = compounds
self.reactions = reactions
self.rules = rules
def get_context(self) -> Context:
return Context(uuid=self.uuid, url=self.url, work_dir=self.work_dir)
def get_compounds(self) -> List[CompoundProto]:
return self.compounds
def get_reactions(self) -> List[ReactionProto]:
return self.reactions
def get_rules(self) -> List[RuleProto]:
return self.rules
@staticmethod
def standardize(smiles, remove_stereo=False, canonicalize_tautomers=False):
return FormatConverter.standardize(
smiles, remove_stereo=remove_stereo, canonicalize_tautomers=canonicalize_tautomers
)
@staticmethod
def apply(
smiles: str,
smirks: str,
preprocess_smiles: bool = True,
bracketize: bool = True,
standardize: bool = True,
kekulize: bool = True,
remove_stereo: bool = True,
reactant_filter_smarts: str | None = None,
product_filter_smarts: str | None = None,
) -> List["ProductSet"]:
return FormatConverter.apply(
smiles,
smirks,
preprocess_smiles,
bracketize,
standardize,
kekulize,
remove_stereo,
reactant_filter_smarts,
product_filter_smarts,
)

View File

@ -1,20 +1,54 @@
services:
db:
image: postgres:15
container_name: envipath-postgres
image: postgres:18
container_name: eppostgres
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: envipath
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
- ep_bayer_postgres_data:/var/lib/postgresql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
container_name: epredis
ports:
- "6379:6379"
volumes:
- ep_bayer_redis_data:/data
biotransformer3:
image: envipath/biotransformer3:1.0
container_name: epbiotransformer3
# web:
# image: envipath/envipy-bayer:1.0
# container_name: epdjango
# ports:
# - "127.0.0.1:8000:8000"
# env_file:
# - .env
# command: gunicorn envipath.wsgi:application --bind 0.0.0.0:8000 --workers 3
# volumes:
# - ep_bayer_data:/opt/enviPy/
celery_worker:
image: envipath/envipy-bayer:1.0
container_name: epcelery
env_file:
- .env.dev
command: celery -A envipath worker --concurrency=6 -Q model,predict,background --pool threads
volumes:
- ep_bayer_data:/opt/enviPy/
volumes:
postgres_data:
ep_bayer_postgres_data:
ep_bayer_redis_data:
ep_bayer_data:

50
docker-compose.yml Normal file
View File

@ -0,0 +1,50 @@
services:
db:
image: postgres:18
container_name: eppostgres
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
volumes:
- ep_bayer_postgres_data:/var/lib/postgresql
healthcheck:
test: [ "CMD-SHELL", "pg_isready -U postgres" ]
interval: 5s
timeout: 5s
retries: 5
redis:
image: redis:7-alpine
container_name: epredis
volumes:
- ep_bayer_redis_data:/data
biotransformer3:
image: envipath/biotransformer3:1.0
container_name: epbiotransformer3
web:
image: envipath/envipy-bayer:1.0
container_name: epdjango
ports:
- "127.0.0.1:8000:8000"
env_file:
- .env
command: gunicorn envipath.wsgi:application --bind 0.0.0.0:8000 --workers 3
volumes:
- ep_bayer_data:/opt/enviPy/
celery_worker:
image: envipath/envipy-bayer:1.0
container_name: epcelery
env_file:
- .env
command: celery -A envipath worker --concurrency=6 -Q model,predict,background --pool threads
volumes:
- ep_bayer_data:/opt/enviPy/
volumes:
ep_bayer_postgres_data:
ep_bayer_redis_data:
ep_bayer_data:

View File

@ -1,4 +1,4 @@
from epdb.api import router as epdb_app_router
from epapi.v1.router import router as v1_router # Refactored API from epdb.api_v2
from epdb.legacy_api import router as epdb_legacy_app_router
from ninja import NinjaAPI
@ -8,5 +8,5 @@ api_v1 = NinjaAPI(title="API V1 Docs", urls_namespace="api-v1")
api_legacy = NinjaAPI(title="Legacy API Docs", urls_namespace="api-legacy")
# Add routers
api_v1.add_router("/", epdb_app_router)
api_v1.add_router("/", v1_router)
api_legacy.add_router("/", epdb_legacy_app_router)

View File

@ -9,19 +9,20 @@ https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from envipy_plugins import Classifier, Property, Descriptor
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
load_dotenv(BASE_DIR / ".env", override=False)
ENV_PATH = os.environ.get("ENV_PATH", BASE_DIR / ".env.dev")
print(f"Loading env from {ENV_PATH}")
load_dotenv(ENV_PATH, override=False)
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
@ -36,7 +37,6 @@ ALLOWED_HOSTS = os.environ["ALLOWED_HOSTS"].split(",")
# Application definition
INSTALLED_APPS = [
"django.contrib.admin",
"django.contrib.auth",
@ -44,20 +44,37 @@ INSTALLED_APPS = [
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"django.contrib.postgres",
# 3rd party
"django_extensions",
"oauth2_provider",
# Custom
"epapi", # API endpoints (v1, etc.)
"epdb",
"migration",
]
TENANT = os.environ.get("TENANT", "public")
if TENANT != "public":
INSTALLED_APPS.append(TENANT)
EPDB_PACKAGE_MODEL = os.environ.get("EPDB_PACKAGE_MODEL", "epdb.Package")
def GET_PACKAGE_MODEL():
from django.apps import apps
return apps.get_model(EPDB_PACKAGE_MODEL)
AUTHENTICATION_BACKENDS = [
"django.contrib.auth.backends.ModelBackend",
]
MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware",
"whitenoise.middleware.WhiteNoiseMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
@ -76,10 +93,19 @@ if os.environ.get("REGISTRATION_MANDATORY", False) == "True":
ROOT_URLCONF = "envipath.urls"
TEMPLATE_DIRS = [
os.path.join(BASE_DIR, "templates"),
]
# If we have a non-public tenant, we might need to overwrite some templates
# search TENANT folder first...
if TENANT != "public":
TEMPLATE_DIRS.insert(0, os.path.join(BASE_DIR, TENANT, "templates"))
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": (os.path.join(BASE_DIR, "templates"),),
"DIRS": TEMPLATE_DIRS,
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
@ -111,6 +137,13 @@ DATABASES = {
}
}
if os.environ.get("USE_TEMPLATE_DB", False) == "True":
DATABASES["default"]["TEST"] = {
"NAME": f"test_{os.environ['TEMPLATE_DB']}",
"TEMPLATE": os.environ["TEMPLATE_DB"],
}
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
@ -158,11 +191,21 @@ ADMIN_APPROVAL_REQUIRED = os.environ.get("ADMIN_APPROVAL_REQUIRED", "False") ==
# SESAME_MAX_AGE = 300
# # TODO set to "home"
# LOGIN_REDIRECT_URL = "/"
SERVER_HOST = os.environ.get("SERVER_URL", "http://localhost:8000")
SERVER_PATH = os.environ.get("SERVER_PATH", "")
SERVER_URL = SERVER_HOST
if SERVER_PATH:
SERVER_URL = os.path.join(SERVER_HOST, SERVER_PATH)
LOGIN_URL = "/login/"
if SERVER_PATH:
LOGIN_URL = f"/{SERVER_PATH}/login/"
SERVER_URL = os.environ.get("SERVER_URL", "http://localhost:8000")
CSRF_TRUSTED_ORIGINS = [SERVER_URL]
CSRF_TRUSTED_ORIGINS = [SERVER_HOST]
AMBIT_URL = "http://localhost:9001"
DEFAULT_VALUES = {"description": "no description"}
@ -187,10 +230,17 @@ PLUGIN_DIR = os.path.join(EP_DATA_DIR, "plugins")
if not os.path.exists(PLUGIN_DIR):
os.mkdir(PLUGIN_DIR)
API_PAGINATION_DEFAULT_PAGE_SIZE = int(os.environ.get("API_PAGINATION_DEFAULT_PAGE_SIZE", 50))
PAGINATION_MAX_PER_PAGE_SIZE = int(
os.environ.get("API_PAGINATION_MAX_PAGE_SIZE", 100)
) # Ninja override
# Set this as our static root dir
STATIC_ROOT = STATIC_DIR
STATIC_URL = "/static/"
if SERVER_PATH:
STATIC_URL = f"/{SERVER_PATH}/static/"
# Where the sources are stored...
STATICFILES_DIRS = (BASE_DIR / "static",)
@ -254,9 +304,8 @@ if not FLAG_CELERY_PRESENT:
# Celery Configuration Options
CELERY_TIMEZONE = "Europe/Berlin"
# Celery Configuration
CELERY_BROKER_URL = "redis://localhost:6379/0" # Use Redis as message broker
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/1")
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
@ -288,22 +337,21 @@ DEFAULT_MODEL_PARAMS = {
"num_chains": 10,
}
DEFAULT_MAX_NUMBER_OF_NODES = 30
DEFAULT_MAX_DEPTH = 5
DEFAULT_MAX_NUMBER_OF_NODES = 50
DEFAULT_MAX_DEPTH = 8
DEFAULT_MODEL_THRESHOLD = 0.25
# Loading Plugins
PLUGINS_ENABLED = os.environ.get("PLUGINS_ENABLED", "False") == "True"
if PLUGINS_ENABLED:
from utilities.plugin import discover_plugins
CLASSIFIER_PLUGINS = discover_plugins(_cls=Classifier)
PROPERTY_PLUGINS = discover_plugins(_cls=Property)
DESCRIPTOR_PLUGINS = discover_plugins(_cls=Descriptor)
BASE_PLUGINS = os.environ.get("BASE_PLUGINS", None)
if BASE_PLUGINS:
BASE_PLUGINS = BASE_PLUGINS.split(",")
else:
CLASSIFIER_PLUGINS = {}
PROPERTY_PLUGINS = {}
DESCRIPTOR_PLUGINS = {}
BASE_PLUGINS = []
CLASSIFIER_PLUGINS = {}
PROPERTY_PLUGINS = {}
DESCRIPTOR_PLUGINS = {}
SENTRY_ENABLED = os.environ.get("SENTRY_ENABLED", "False") == "True"
if SENTRY_ENABLED:
@ -327,6 +375,10 @@ if SENTRY_ENABLED:
before_send=before_send,
)
IUCLID_EXPORT_ENABLED = os.environ.get("IUCLID_EXPORT_ENABLED", "False") == "True"
if IUCLID_EXPORT_ENABLED:
INSTALLED_APPS.append("epiuclid")
# compile into digestible flags
FLAGS = {
"MODEL_BUILDING": MODEL_BUILDING_ENABLED,
@ -335,28 +387,34 @@ FLAGS = {
"SENTRY": SENTRY_ENABLED,
"ENVIFORMER": ENVIFORMER_PRESENT,
"APPLICABILITY_DOMAIN": APPLICABILITY_DOMAIN_ENABLED,
"IUCLID_EXPORT": IUCLID_EXPORT_ENABLED,
}
# path of the URL are checked via "startswith"
# -> /password_reset/done is covered as well
LOGIN_EXEMPT_URLS = [
"/register",
"/api/v1/", # Let API handle its own authentication
"/api/legacy/",
"/o/token/",
"/o/userinfo/",
"/password_reset/",
"/reset/",
"/microsoft/",
"/terms",
"/privacy",
"/cookie-policy",
"/about",
"/contact",
"/jobs",
"/careers",
"/cite",
"/legal",
"/entra/",
"/auth/",
]
if SERVER_PATH:
LOGIN_EXEMPT_URLS = [f"/{SERVER_PATH}{x}" for x in LOGIN_EXEMPT_URLS]
# MS AD/Entra
MS_ENTRA_ENABLED = os.environ.get("MS_ENTRA_ENABLED", "False") == "True"
if MS_ENTRA_ENABLED:
@ -372,3 +430,47 @@ if MS_ENTRA_ENABLED:
# Site ID 10 -> beta.envipath.org
MATOMO_SITE_ID = os.environ.get("MATOMO_SITE_ID", "10")
# CAP
CAP_ENABLED = os.environ.get("CAP_ENABLED", "False") == "True"
CAP_API_BASE = os.environ.get("CAP_API_BASE", None)
CAP_SITE_KEY = os.environ.get("CAP_SITE_KEY", None)
CAP_SECRET_KEY = os.environ.get("CAP_SECRET_KEY", None)
# Biotransformer
BIOTRANSFORMER_ENABLED = os.environ.get("BIOTRANSFORMER_ENABLED", "False") == "True"
FLAGS["BIOTRANSFORMER"] = BIOTRANSFORMER_ENABLED
if BIOTRANSFORMER_ENABLED:
BIOTRANSFORMER_URL = os.environ.get("BIOTRANSFORMER_URL", None)
# PES
PES_API_MAPPING = os.environ.get("PES_API_MAPPING", None)
if PES_API_MAPPING:
import json
PES_API_MAPPING = json.loads(PES_API_MAPPING)
else:
PES_API_MAPPING = {}
# Entra Groups
ENTRA_GROUPS = os.environ.get("ENTRA_GROUPS", None)
if ENTRA_GROUPS:
import json
ENTRA_GROUPS = json.loads(ENTRA_GROUPS)
else:
ENTRA_GROUPS = {}
ENTRA_SECRET_GROUPS = os.environ.get("ENTRA_SECRET_GROUPS", None)
if ENTRA_SECRET_GROUPS:
import json
ENTRA_SECRET_GROUPS = json.loads(ENTRA_SECRET_GROUPS)
else:
ENTRA_SECRET_GROUPS = {}
# PES Data Pools vs Entra Mapping
DATA_POOL_MAPPING = os.environ.get("DATA_POOL_MAPPING", None)
if DATA_POOL_MAPPING:
import json
DATA_POOL_MAPPING = json.loads(DATA_POOL_MAPPING)
else:
DATA_POOL_MAPPING = {}

View File

@ -21,14 +21,32 @@ from django.urls import include, path
from .api import api_v1, api_legacy
PATH_PREFIX = s.SERVER_PATH
if PATH_PREFIX and not PATH_PREFIX.endswith("/"):
PATH_PREFIX += "/"
urlpatterns = [
path("", include("epdb.urls")),
path("", include("migration.urls")),
path("admin/", admin.site.urls),
path("api/v1/", api_v1.urls),
path("api/legacy/", api_legacy.urls),
path("o/", include("oauth2_provider.urls", namespace="oauth2_provider")),
path(f"{PATH_PREFIX}", include("epdb.urls")),
path(f"{PATH_PREFIX}admin/", admin.site.urls),
path(f"{PATH_PREFIX}api/v1/", api_v1.urls),
path(f"{PATH_PREFIX}api/legacy/", api_legacy.urls),
path(f"{PATH_PREFIX}o/", include("oauth2_provider.urls", namespace="oauth2_provider")),
]
if "migration" in s.INSTALLED_APPS:
urlpatterns.append(path(f"{PATH_PREFIX}", include("migration.urls")))
if s.MS_ENTRA_ENABLED:
urlpatterns.append(path("", include("epauth.urls")))
urlpatterns.append(path(f"{PATH_PREFIX}", include("epauth.urls")))
if s.TENANT != "public":
urlpatterns.append(
path(f"{PATH_PREFIX}", include(f"{s.TENANT}.urls"))
)
# Custom error handlers
handler400 = "epdb.views.handler400"
handler403 = "epdb.views.handler403"
handler404 = "epdb.views.handler404"
handler500 = "epdb.views.handler500"

0
epapi/__init__.py Normal file
View File

6
epapi/apps.py Normal file
View File

@ -0,0 +1,6 @@
from django.apps import AppConfig
class EpapiConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "epapi"

View File

1
epapi/tests/__init__.py Normal file
View File

@ -0,0 +1 @@
# Tests for epapi app

View File

@ -0,0 +1 @@
"""Tests for epapi utility modules."""

View File

@ -0,0 +1,218 @@
"""
Tests for validation error utilities.
Tests the format_validation_error() and handle_validation_error() functions
that transform Pydantic validation errors into user-friendly messages.
"""
from django.test import TestCase, tag
import json
from pydantic import BaseModel, ValidationError, field_validator
from typing import Literal
from ninja.errors import HttpError
from epapi.utils.validation_errors import format_validation_error, handle_validation_error
@tag("api", "utils")
class ValidationErrorUtilityTests(TestCase):
"""Test validation error utility functions."""
def test_format_missing_field_error(self):
"""Test formatting of missing required field error."""
# Create a model with required field
class TestModel(BaseModel):
required_field: str
# Trigger validation error
try:
TestModel()
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "This field is required")
def test_format_enum_error(self):
"""Test formatting of enum validation error."""
class TestModel(BaseModel):
status: Literal["active", "inactive"]
try:
TestModel(status="invalid")
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
# Literal errors get formatted as "Please enter ..." with the valid options
self.assertIn("Please enter", formatted)
self.assertIn("active", formatted)
self.assertIn("inactive", formatted)
def test_format_type_errors(self):
"""Test formatting of type validation errors (string, int, float)."""
test_cases = [
# (field_type, invalid_value, expected_message)
# Note: We don't check exact error_type as Pydantic may use different types
# (e.g., int_type vs int_parsing) but we verify the formatted message is correct
(str, 123, "Please enter a valid string"),
(int, "not_a_number", "Please enter a valid int"),
(float, "not_a_float", "Please enter a valid float"),
]
for field_type, invalid_value, expected_message in test_cases:
with self.subTest(field_type=field_type.__name__):
class TestModel(BaseModel):
field: field_type
try:
TestModel(field=invalid_value)
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, expected_message)
def test_format_value_error(self):
"""Test formatting of value error from custom validator."""
class TestModel(BaseModel):
age: int
@field_validator("age")
@classmethod
def validate_age(cls, v):
if v < 0:
raise ValueError("Age must be positive")
return v
try:
TestModel(age=-5)
except ValidationError as e:
errors = e.errors()
self.assertEqual(len(errors), 1)
formatted = format_validation_error(errors[0])
self.assertEqual(formatted, "Age must be positive")
def test_format_unknown_error_type_fallback(self):
"""Test that unknown error types fall back to default formatting."""
# Mock an error with an unknown type
mock_error = {
"type": "unknown_custom_type",
"msg": "Input should be a valid email address",
"ctx": {},
}
formatted = format_validation_error(mock_error)
# Should use the else branch which does replacements on the message
self.assertEqual(formatted, "Please enter a valid email address")
def test_handle_validation_error_structure(self):
"""Test that handle_validation_error raises HttpError with correct structure."""
class TestModel(BaseModel):
name: str
count: int
try:
TestModel(name=123, count="invalid")
except ValidationError as e:
# handle_validation_error should raise HttpError
with self.assertRaises(HttpError) as context:
handle_validation_error(e)
http_error = context.exception
self.assertEqual(http_error.status_code, 400)
# Parse the JSON from the error message
error_data = json.loads(http_error.message)
# Check structure
self.assertEqual(error_data["type"], "validation_error")
self.assertIn("field_errors", error_data)
self.assertIn("message", error_data)
self.assertEqual(error_data["message"], "Please correct the errors below")
# Check that both fields have errors
self.assertIn("name", error_data["field_errors"])
self.assertIn("count", error_data["field_errors"])
def test_handle_validation_error_no_pydantic_internals(self):
"""Test that handle_validation_error doesn't expose Pydantic internals."""
class TestModel(BaseModel):
email: str
try:
TestModel(email=123)
except ValidationError as e:
with self.assertRaises(HttpError) as context:
handle_validation_error(e)
http_error = context.exception
error_data = json.loads(http_error.message)
error_str = json.dumps(error_data)
# Ensure no Pydantic internals are exposed
self.assertNotIn("pydantic", error_str.lower())
self.assertNotIn("https://errors.pydantic.dev", error_str)
self.assertNotIn("loc", error_str)
def test_handle_validation_error_user_friendly_messages(self):
"""Test that all error messages are user-friendly."""
class TestModel(BaseModel):
name: str
age: int
status: Literal["active", "inactive"]
try:
TestModel(name=123, status="invalid") # Multiple errors
except ValidationError as e:
with self.assertRaises(HttpError) as context:
handle_validation_error(e)
http_error = context.exception
error_data = json.loads(http_error.message)
# All messages should be user-friendly (contain "Please" or "This field")
for field, messages in error_data["field_errors"].items():
for message in messages:
# User-friendly messages start with "Please" or "This field"
self.assertTrue(
message.startswith("Please") or message.startswith("This field"),
f"Message '{message}' is not user-friendly",
)
def test_handle_validation_error_multiple_errors_same_field(self):
"""Test handling multiple validation errors for the same field."""
class TestModel(BaseModel):
value: int
@field_validator("value")
@classmethod
def validate_range(cls, v):
if v < 0:
raise ValueError("Must be non-negative")
if v > 100:
raise ValueError("Must be at most 100")
return v
# Test with string (type error) - this will fail before the validator runs
try:
TestModel(value="invalid")
except ValidationError as e:
with self.assertRaises(HttpError) as context:
handle_validation_error(e)
http_error = context.exception
error_data = json.loads(http_error.message)
# Should have error for 'value' field
self.assertIn("value", error_data["field_errors"])
self.assertIsInstance(error_data["field_errors"]["value"], list)
self.assertGreater(len(error_data["field_errors"]["value"]), 0)

View File

@ -0,0 +1 @@
# Tests for epapi v1 API

View File

@ -0,0 +1,446 @@
"""
Tests for Additional Information API endpoints.
Tests CRUD operations on scenario additional information including the new PATCH endpoint.
"""
from django.test import TestCase, tag
import json
from uuid import uuid4
from epdb.logic import PackageManager, UserManager
from epdb.models import Scenario
@tag("api", "additional_information")
class AdditionalInformationAPITests(TestCase):
"""Test additional information API endpoints."""
@classmethod
def setUpTestData(cls):
"""Set up test data: user, package, and scenario."""
cls.user = UserManager.create_user(
"ai-test-user",
"ai-test@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.other_user = UserManager.create_user(
"ai-other-user",
"ai-other@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.package = PackageManager.create_package(
cls.user, "AI Test Package", "Test package for additional information"
)
# Package owned by other_user (no access for cls.user)
cls.other_package = PackageManager.create_package(
cls.other_user, "Other Package", "Package without access"
)
# Create a scenario for testing
cls.scenario = Scenario.objects.create(
package=cls.package,
name="Test Scenario",
description="Test scenario for additional information tests",
scenario_type="biodegradation",
scenario_date="2024-01-01",
)
cls.other_scenario = Scenario.objects.create(
package=cls.other_package,
name="Other Scenario",
description="Scenario in package without access",
scenario_type="biodegradation",
scenario_date="2024-01-01",
)
def test_list_all_schemas(self):
"""Test GET /api/v1/information/schema/ returns all schemas."""
self.client.force_login(self.user)
response = self.client.get("/api/v1/information/schema/")
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIsInstance(data, dict)
# Should have multiple schemas
self.assertGreater(len(data), 0)
# Each schema should have RJSF format
for name, schema in data.items():
self.assertIn("schema", schema)
self.assertIn("uiSchema", schema)
self.assertIn("formData", schema)
self.assertIn("groups", schema)
def test_get_specific_schema(self):
"""Test GET /api/v1/information/schema/{model_name}/ returns specific schema."""
self.client.force_login(self.user)
# Assuming 'temperature' is a valid model
response = self.client.get("/api/v1/information/schema/temperature/")
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIn("schema", data)
self.assertIn("uiSchema", data)
def test_get_nonexistent_schema_returns_404(self):
"""Test GET for non-existent schema returns 404."""
self.client.force_login(self.user)
response = self.client.get("/api/v1/information/schema/nonexistent/")
self.assertEqual(response.status_code, 404)
def test_list_scenario_information_empty(self):
"""Test GET /api/v1/scenario/{uuid}/information/ returns empty list initially."""
self.client.force_login(self.user)
response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIsInstance(data, list)
self.assertEqual(len(data), 0)
def test_create_additional_information(self):
"""Test POST creates additional information."""
self.client.force_login(self.user)
# Create temperature information (assuming temperature model exists)
payload = {"interval": {"start": 20, "end": 25}}
response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["status"], "created")
self.assertIn("uuid", data)
self.assertIsNotNone(data["uuid"])
def test_create_with_invalid_data_returns_400(self):
"""Test POST with invalid data returns 400 with validation errors."""
self.client.force_login(self.user)
# Invalid data (missing required fields or wrong types)
payload = {"invalid_field": "value"}
response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 400)
data = response.json()
# Should have validation error details in 'detail' field
self.assertIn("detail", data)
def test_validation_errors_are_user_friendly(self):
"""Test that validation errors are user-friendly and field-specific."""
self.client.force_login(self.user)
# Invalid data - wrong type (string instead of number in interval)
payload = {"interval": {"start": "not_a_number", "end": 25}}
response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 400)
data = response.json()
# Parse the error response - Django Ninja wraps errors in 'detail'
error_str = data.get("detail") or data.get("error")
self.assertIsNotNone(error_str, "Response should contain error details")
# Parse the JSON error string
error_data = json.loads(error_str)
# Check structure
self.assertEqual(error_data.get("type"), "validation_error")
self.assertIn("field_errors", error_data)
self.assertIn("message", error_data)
# Ensure error messages are user-friendly (no Pydantic URLs or technical jargon)
error_str = json.dumps(error_data)
self.assertNotIn("pydantic", error_str.lower())
self.assertNotIn("https://errors.pydantic.dev", error_str)
self.assertNotIn("loc", error_str) # No technical field like 'loc'
# Check that error message is helpful
self.assertIn("Please", error_data["message"]) # User-friendly language
def test_patch_additional_information(self):
"""Test PATCH updates existing additional information."""
self.client.force_login(self.user)
# First create an item
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Then update it with PATCH
update_payload = {"interval": {"start": 30, "end": 35}}
patch_response = self.client.patch(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/",
data=json.dumps(update_payload),
content_type="application/json",
)
self.assertEqual(patch_response.status_code, 200)
data = patch_response.json()
self.assertEqual(data["status"], "updated")
self.assertEqual(data["uuid"], item_uuid) # UUID preserved
# Verify the data was updated
list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
items = list_response.json()
self.assertEqual(len(items), 1)
updated_item = items[0]
self.assertEqual(updated_item["uuid"], item_uuid)
self.assertEqual(updated_item["data"]["interval"]["start"], 30)
self.assertEqual(updated_item["data"]["interval"]["end"], 35)
def test_patch_nonexistent_item_returns_404(self):
"""Test PATCH on non-existent item returns 404."""
self.client.force_login(self.user)
fake_uuid = str(uuid4())
payload = {"interval": {"start": 30, "end": 35}}
response = self.client.patch(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{fake_uuid}/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 404)
def test_patch_with_invalid_data_returns_400(self):
"""Test PATCH with invalid data returns 400."""
self.client.force_login(self.user)
# First create an item
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Try to update with invalid data
invalid_payload = {"invalid_field": "value"}
patch_response = self.client.patch(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/",
data=json.dumps(invalid_payload),
content_type="application/json",
)
self.assertEqual(patch_response.status_code, 400)
def test_patch_validation_errors_are_user_friendly(self):
"""Test that PATCH validation errors are user-friendly and field-specific."""
self.client.force_login(self.user)
# First create an item
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Update with invalid data - wrong type (string instead of number in interval)
invalid_payload = {"interval": {"start": "not_a_number", "end": 25}}
patch_response = self.client.patch(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/",
data=json.dumps(invalid_payload),
content_type="application/json",
)
self.assertEqual(patch_response.status_code, 400)
data = patch_response.json()
# Parse the error response - Django Ninja wraps errors in 'detail'
error_str = data.get("detail") or data.get("error")
self.assertIsNotNone(error_str, "Response should contain error details")
# Parse the JSON error string
error_data = json.loads(error_str)
# Check structure
self.assertEqual(error_data.get("type"), "validation_error")
self.assertIn("field_errors", error_data)
self.assertIn("message", error_data)
# Ensure error messages are user-friendly (no Pydantic URLs or technical jargon)
error_str = json.dumps(error_data)
self.assertNotIn("pydantic", error_str.lower())
self.assertNotIn("https://errors.pydantic.dev", error_str)
self.assertNotIn("loc", error_str) # No technical field like 'loc'
# Check that error message is helpful
self.assertIn("Please", error_data["message"]) # User-friendly language
def test_delete_additional_information(self):
"""Test DELETE removes additional information."""
self.client.force_login(self.user)
# Create an item
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Delete it
delete_response = self.client.delete(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item_uuid}/"
)
self.assertEqual(delete_response.status_code, 200)
data = delete_response.json()
self.assertEqual(data["status"], "deleted")
# Verify deletion
list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
items = list_response.json()
self.assertEqual(len(items), 0)
def test_delete_nonexistent_item_returns_404(self):
"""Test DELETE on non-existent item returns 404."""
self.client.force_login(self.user)
fake_uuid = str(uuid4())
response = self.client.delete(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{fake_uuid}/"
)
self.assertEqual(response.status_code, 404)
def test_multiple_items_crud(self):
"""Test creating, updating, and deleting multiple items."""
self.client.force_login(self.user)
# Create first item
item1_payload = {"interval": {"start": 20, "end": 25}}
response1 = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(item1_payload),
content_type="application/json",
)
item1_uuid = response1.json()["uuid"]
# Create second item (different type if available, or same type)
item2_payload = {"interval": {"start": 30, "end": 35}}
response2 = self.client.post(
f"/api/v1/scenario/{self.scenario.uuid}/information/temperature/",
data=json.dumps(item2_payload),
content_type="application/json",
)
item2_uuid = response2.json()["uuid"]
# Verify both exist
list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
items = list_response.json()
self.assertEqual(len(items), 2)
# Update first item
update_payload = {"interval": {"start": 15, "end": 20}}
self.client.patch(
f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item1_uuid}/",
data=json.dumps(update_payload),
content_type="application/json",
)
# Delete second item
self.client.delete(f"/api/v1/scenario/{self.scenario.uuid}/information/item/{item2_uuid}/")
# Verify final state: one item with updated data
list_response = self.client.get(f"/api/v1/scenario/{self.scenario.uuid}/information/")
items = list_response.json()
self.assertEqual(len(items), 1)
self.assertEqual(items[0]["uuid"], item1_uuid)
self.assertEqual(items[0]["data"]["interval"]["start"], 15)
def test_list_info_denied_without_permission(self):
"""User cannot list info for scenario in package they don't have access to"""
self.client.force_login(self.user)
response = self.client.get(f"/api/v1/scenario/{self.other_scenario.uuid}/information/")
self.assertEqual(response.status_code, 403)
def test_add_info_denied_without_permission(self):
"""User cannot add info to scenario in package they don't have access to"""
self.client.force_login(self.user)
payload = {"interval": {"start": 25, "end": 30}}
response = self.client.post(
f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/",
json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 403)
def test_update_info_denied_without_permission(self):
"""User cannot update info in scenario they don't have access to"""
self.client.force_login(self.other_user)
# First create an item as other_user
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Try to update as user (who doesn't have access)
self.client.force_login(self.user)
update_payload = {"interval": {"start": 30, "end": 35}}
response = self.client.patch(
f"/api/v1/scenario/{self.other_scenario.uuid}/information/item/{item_uuid}/",
data=json.dumps(update_payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 403)
def test_delete_info_denied_without_permission(self):
"""User cannot delete info from scenario they don't have access to"""
self.client.force_login(self.other_user)
# First create an item as other_user
create_payload = {"interval": {"start": 20, "end": 25}}
create_response = self.client.post(
f"/api/v1/scenario/{self.other_scenario.uuid}/information/temperature/",
data=json.dumps(create_payload),
content_type="application/json",
)
item_uuid = create_response.json()["uuid"]
# Try to delete as user (who doesn't have access)
self.client.force_login(self.user)
response = self.client.delete(
f"/api/v1/scenario/{self.other_scenario.uuid}/information/item/{item_uuid}/"
)
self.assertEqual(response.status_code, 403)
def test_nonexistent_scenario_returns_404(self):
"""Test operations on non-existent scenario return 404."""
self.client.force_login(self.user)
fake_uuid = uuid4()
response = self.client.get(f"/api/v1/scenario/{fake_uuid}/information/")
self.assertEqual(response.status_code, 404)

View File

@ -0,0 +1,477 @@
from django.test import TestCase, tag
from epdb.logic import GroupManager, PackageManager, UserManager
from epdb.models import (
Compound,
GroupPackagePermission,
Permission,
UserPackagePermission,
)
@tag("api", "end2end")
class APIPermissionTestBase(TestCase):
"""
Base class for API permission tests.
Sets up common test data:
- user1: Owner of packages
- user2: User with various permissions
- user3: User with no permissions
- reviewed_package: Public package (reviewed=True)
- unreviewed_package_owned: Unreviewed package owned by user1
- unreviewed_package_read: Unreviewed package with READ permission for user2
- unreviewed_package_write: Unreviewed package with WRITE permission for user2
- unreviewed_package_all: Unreviewed package with ALL permission for user2
- unreviewed_package_no_access: Unreviewed package with no permissions for user2/user3
- group_package: Unreviewed package accessible via group permission
- test_group: Group containing user2
"""
@classmethod
def setUpTestData(cls):
# Create users
cls.user1 = UserManager.create_user(
"permission-user1",
"permission-user1@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.user2 = UserManager.create_user(
"permission-user2",
"permission-user2@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.user3 = UserManager.create_user(
"permission-user3",
"permission-user3@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
# Delete default packages to ensure clean test data
for user in [cls.user1, cls.user2, cls.user3]:
default_pkg = user.default_package
user.default_package = None
user.save()
if default_pkg:
default_pkg.delete()
# Create reviewed package (public)
cls.reviewed_package = PackageManager.create_package(
cls.user1, "Reviewed Package", "Public package"
)
cls.reviewed_package.reviewed = True
cls.reviewed_package.save()
# Create unreviewed packages with various permissions
cls.unreviewed_package_owned = PackageManager.create_package(
cls.user1, "User1 Owned Package", "Owned by user1"
)
cls.unreviewed_package_read = PackageManager.create_package(
cls.user1, "User2 Read Package", "User2 has READ permission"
)
UserPackagePermission.objects.create(
user=cls.user2, package=cls.unreviewed_package_read, permission=Permission.READ[0]
)
cls.unreviewed_package_write = PackageManager.create_package(
cls.user1, "User2 Write Package", "User2 has WRITE permission"
)
UserPackagePermission.objects.create(
user=cls.user2, package=cls.unreviewed_package_write, permission=Permission.WRITE[0]
)
cls.unreviewed_package_all = PackageManager.create_package(
cls.user1, "User2 All Package", "User2 has ALL permission"
)
UserPackagePermission.objects.create(
user=cls.user2, package=cls.unreviewed_package_all, permission=Permission.ALL[0]
)
cls.unreviewed_package_no_access = PackageManager.create_package(
cls.user1, "No Access Package", "No permissions for user2/user3"
)
# Create group and group package
cls.test_group = GroupManager.create_group(
cls.user1, "Test Group", "Group for permission testing"
)
cls.test_group.user_member.add(cls.user2)
cls.test_group.save()
cls.group_package = PackageManager.create_package(
cls.user1, "Group Package", "Accessible via group permission"
)
GroupPackagePermission.objects.create(
group=cls.test_group, package=cls.group_package, permission=Permission.READ[0]
)
# Create test compounds in each package
cls.reviewed_compound = Compound.create(
cls.reviewed_package, "C", "Reviewed Compound", "Test compound"
)
cls.owned_compound = Compound.create(
cls.unreviewed_package_owned, "CC", "Owned Compound", "Test compound"
)
cls.read_compound = Compound.create(
cls.unreviewed_package_read, "CCC", "Read Compound", "Test compound"
)
cls.write_compound = Compound.create(
cls.unreviewed_package_write, "CCCC", "Write Compound", "Test compound"
)
cls.all_compound = Compound.create(
cls.unreviewed_package_all, "CCCCC", "All Compound", "Test compound"
)
cls.no_access_compound = Compound.create(
cls.unreviewed_package_no_access, "CCCCCC", "No Access Compound", "Test compound"
)
cls.group_compound = Compound.create(
cls.group_package, "CCCCCCC", "Group Compound", "Test compound"
)
@tag("api", "end2end")
class PackageListPermissionTest(APIPermissionTestBase):
"""
Test permissions for /api/v1/packages/ endpoint.
Special case: This endpoint allows anonymous access (auth=None)
"""
ENDPOINT = "/api/v1/packages/"
def test_anonymous_user_sees_only_reviewed_packages(self):
"""Anonymous users should only see reviewed packages."""
self.client.logout()
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Should only see reviewed package
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.reviewed_package.uuid))
self.assertEqual(payload["items"][0]["review_status"], "reviewed")
def test_authenticated_user_sees_all_readable_packages(self):
"""Authenticated users see reviewed + packages they have access to."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2 should see:
# - reviewed_package (public)
# - unreviewed_package_read (READ permission)
# - unreviewed_package_write (WRITE permission)
# - unreviewed_package_all (ALL permission)
# - group_package (via group membership)
# Total: 5 packages
self.assertEqual(payload["total_items"], 5)
visible_uuids = {item["uuid"] for item in payload["items"]}
expected_uuids = {
str(self.reviewed_package.uuid),
str(self.unreviewed_package_read.uuid),
str(self.unreviewed_package_write.uuid),
str(self.unreviewed_package_all.uuid),
str(self.group_package.uuid),
}
self.assertEqual(visible_uuids, expected_uuids)
def test_owner_sees_all_owned_packages(self):
"""Package owner sees all packages they created."""
self.client.force_login(self.user1)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# user1 owns all packages
# Total: 7 packages (all packages created in setUpTestData)
self.assertEqual(payload["total_items"], 7)
def test_filter_by_review_status_true(self):
"""Filter to show only reviewed packages."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT, {"review_status": True})
self.assertEqual(response.status_code, 200)
payload = response.json()
# Only reviewed_package
self.assertEqual(payload["total_items"], 1)
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_filter_by_review_status_false(self):
"""Filter to show only unreviewed packages."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT, {"review_status": False})
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2's accessible unreviewed packages: 4
self.assertEqual(payload["total_items"], 4)
self.assertTrue(all(item["review_status"] == "unreviewed" for item in payload["items"]))
def test_anonymous_filter_unreviewed_returns_empty(self):
"""Anonymous users get no results when filtering for unreviewed."""
self.client.logout()
response = self.client.get(self.ENDPOINT, {"review_status": False})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 0)
@tag("api", "end2end")
class GlobalCompoundListPermissionTest(APIPermissionTestBase):
"""
Test permissions for /api/v1/compounds/ endpoint.
This endpoint requires authentication.
"""
ENDPOINT = "/api/v1/compounds/"
def test_anonymous_user_cannot_access(self):
"""Anonymous users should get 401 Unauthorized."""
self.client.logout()
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 401)
def test_authenticated_user_sees_compounds_from_readable_packages(self):
"""Authenticated users see compounds from packages they can read."""
self.client.force_login(self.user2)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 5)
visible_uuids = {item["uuid"] for item in payload["items"]}
expected_uuids = {
str(self.reviewed_compound.uuid),
str(self.read_compound.uuid),
str(self.write_compound.uuid),
str(self.all_compound.uuid),
str(self.group_compound.uuid),
}
self.assertEqual(visible_uuids, expected_uuids)
def test_user_without_permission_cannot_see_compound(self):
"""User without permission to package cannot see its compounds."""
self.client.force_login(self.user3)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# user3 should only see compounds from reviewed_package
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.reviewed_compound.uuid))
def test_owner_sees_all_compounds(self):
"""Package owner sees all compounds they created."""
self.client.force_login(self.user1)
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# user1 owns all packages, so sees all compounds
self.assertEqual(payload["total_items"], 7)
@tag("api", "end2end")
class PackageScopedCompoundListPermissionTest(APIPermissionTestBase):
"""
Test permissions for /api/v1/package/{uuid}/compound/ endpoint.
This endpoint requires authentication AND package access.
"""
def test_anonymous_user_cannot_access_reviewed_package(self):
"""Anonymous users should get 401 even for reviewed packages."""
self.client.logout()
endpoint = f"/api/v1/package/{self.reviewed_package.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 401)
def test_authenticated_user_can_access_reviewed_package(self):
"""Authenticated users can access reviewed packages."""
self.client.force_login(self.user3)
endpoint = f"/api/v1/package/{self.reviewed_package.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.reviewed_compound.uuid))
def test_user_can_access_package_with_read_permission(self):
"""User with READ permission can access package-scoped endpoint."""
self.client.force_login(self.user2)
endpoint = f"/api/v1/package/{self.unreviewed_package_read.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.read_compound.uuid))
def test_user_can_access_package_with_write_permission(self):
"""User with WRITE permission can access package-scoped endpoint."""
self.client.force_login(self.user2)
endpoint = f"/api/v1/package/{self.unreviewed_package_write.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.write_compound.uuid))
def test_user_can_access_package_with_all_permission(self):
"""User with ALL permission can access package-scoped endpoint."""
self.client.force_login(self.user2)
endpoint = f"/api/v1/package/{self.unreviewed_package_all.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.all_compound.uuid))
def test_user_cannot_access_package_without_permission(self):
"""User without permission gets 403 Forbidden."""
self.client.force_login(self.user2)
endpoint = f"/api/v1/package/{self.unreviewed_package_no_access.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 403)
def test_nonexistent_package_returns_404(self):
"""Request for non-existent package returns 404."""
self.client.force_login(self.user2)
fake_uuid = "00000000-0000-0000-0000-000000000000"
endpoint = f"/api/v1/package/{fake_uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 404)
def test_owner_can_access_owned_package(self):
"""Package owner can access their package."""
self.client.force_login(self.user1)
endpoint = f"/api/v1/package/{self.unreviewed_package_owned.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.owned_compound.uuid))
def test_group_member_can_access_group_package(self):
"""Group member can access package via group permission."""
self.client.force_login(self.user2)
endpoint = f"/api/v1/package/{self.group_package.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], 1)
self.assertEqual(payload["items"][0]["uuid"], str(self.group_compound.uuid))
def test_non_group_member_cannot_access_group_package(self):
"""Non-group member cannot access package with only group permission."""
self.client.force_login(self.user3)
endpoint = f"/api/v1/package/{self.group_package.uuid}/compound/"
response = self.client.get(endpoint)
self.assertEqual(response.status_code, 403)
@tag("api", "end2end")
class MultiResourcePermissionTest(APIPermissionTestBase):
"""
Test that permission system works consistently across all resource types.
Tests a sample of other endpoints to ensure permission logic is consistent.
"""
def test_rules_endpoint_respects_permissions(self):
"""Rules endpoint uses same permission logic."""
from epdb.models import SimpleAmbitRule
# Create rule in no-access package
rule = SimpleAmbitRule.create(
self.unreviewed_package_no_access, "Test Rule", "Test", "[C:1]>>[C:1]O"
)
self.client.force_login(self.user2)
response = self.client.get("/api/v1/rules/")
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2 should not see the rule from no_access_package
rule_uuids = [item["uuid"] for item in payload["items"]]
self.assertNotIn(str(rule.uuid), rule_uuids)
def test_reactions_endpoint_respects_permissions(self):
"""Reactions endpoint uses same permission logic."""
from epdb.models import Reaction
# Create reaction in no-access package
reaction = Reaction.create(
self.unreviewed_package_no_access, "Test Reaction", "Test", ["C"], ["CO"]
)
self.client.force_login(self.user2)
response = self.client.get("/api/v1/reactions/")
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2 should not see the reaction from no_access_package
reaction_uuids = [item["uuid"] for item in payload["items"]]
self.assertNotIn(str(reaction.uuid), reaction_uuids)
def test_pathways_endpoint_respects_permissions(self):
"""Pathways endpoint uses same permission logic."""
from epdb.models import Pathway
# Create pathway in no-access package
pathway = Pathway.objects.create(
package=self.unreviewed_package_no_access, name="Test Pathway", description="Test"
)
self.client.force_login(self.user2)
response = self.client.get("/api/v1/pathways/")
self.assertEqual(response.status_code, 200)
payload = response.json()
# user2 should not see the pathway from no_access_package
pathway_uuids = [item["uuid"] for item in payload["items"]]
self.assertNotIn(str(pathway.uuid), pathway_uuids)

View File

@ -0,0 +1,477 @@
from django.test import TestCase, tag
from epdb.logic import PackageManager, UserManager
from epdb.models import Compound, Reaction, Pathway, EPModel, SimpleAmbitRule, Scenario
class BaseTestAPIGetPaginated:
"""
Mixin class for API pagination tests.
Subclasses must inherit from both this class and TestCase, e.g.:
class MyTest(BaseTestAPIGetPaginated, TestCase):
...
Subclasses must define:
- resource_name: Singular name (e.g., "compound")
- resource_name_plural: Plural name (e.g., "compounds")
- global_endpoint: Global listing endpoint (e.g., "/api/v1/compounds/")
- package_endpoint_template: Template for package-scoped endpoint or None
- total_reviewed: Number of reviewed items to create
- total_unreviewed: Number of unreviewed items to create
- create_reviewed_resource(cls, package, idx): Factory method
- create_unreviewed_resource(cls, package, idx): Factory method
"""
# Configuration to be overridden by subclasses
resource_name = None
resource_name_plural = None
global_endpoint = None
package_endpoint_template = None
total_reviewed = 50
total_unreviewed = 20
default_page_size = 50
max_page_size = 100
@classmethod
def setUpTestData(cls):
# Create test user
cls.user = UserManager.create_user(
f"{cls.resource_name}-user",
f"{cls.resource_name}-user@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
# Delete the auto-created default package to ensure clean test data
default_pkg = cls.user.default_package
cls.user.default_package = None
cls.user.save()
default_pkg.delete()
# Create reviewed package
cls.reviewed_package = PackageManager.create_package(
cls.user, "Reviewed Package", f"Reviewed package for {cls.resource_name} tests"
)
cls.reviewed_package.reviewed = True
cls.reviewed_package.save()
# Create unreviewed package
cls.unreviewed_package = PackageManager.create_package(
cls.user, "Draft Package", f"Unreviewed package for {cls.resource_name} tests"
)
# Create reviewed resources
for idx in range(cls.total_reviewed):
cls.create_reviewed_resource(cls.reviewed_package, idx)
# Create unreviewed resources
for idx in range(cls.total_unreviewed):
cls.create_unreviewed_resource(cls.unreviewed_package, idx)
# Set up package-scoped endpoints if applicable
if cls.package_endpoint_template:
cls.reviewed_package_endpoint = cls.package_endpoint_template.format(
uuid=cls.reviewed_package.uuid
)
cls.unreviewed_package_endpoint = cls.package_endpoint_template.format(
uuid=cls.unreviewed_package.uuid
)
@classmethod
def create_reviewed_resource(cls, package, idx):
"""
Create a single reviewed resource.
Must be implemented by subclass.
Args:
package: The package to create the resource in
idx: Index of the resource (0-based)
"""
raise NotImplementedError(f"{cls.__name__} must implement create_reviewed_resource()")
@classmethod
def create_unreviewed_resource(cls, package, idx):
"""
Create a single unreviewed resource.
Must be implemented by subclass.
Args:
package: The package to create the resource in
idx: Index of the resource (0-based)
"""
raise NotImplementedError(f"{cls.__name__} must implement create_unreviewed_resource()")
def setUp(self):
self.client.force_login(self.user)
def test_requires_session_authentication(self):
"""Test that the global endpoint requires authentication."""
self.client.logout()
response = self.client.get(self.global_endpoint)
self.assertEqual(response.status_code, 401)
def test_global_listing_uses_default_page_size(self):
"""Test that the global endpoint uses default pagination settings."""
response = self.client.get(self.global_endpoint, {"review_status": True})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["page"], 1)
self.assertEqual(payload["page_size"], self.default_page_size)
self.assertEqual(payload["total_items"], self.total_reviewed)
# Verify only reviewed items are returned
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_can_request_later_page(self):
"""Test that pagination works for later pages."""
if self.total_reviewed <= self.default_page_size:
self.skipTest(
f"Not enough items to test pagination "
f"({self.total_reviewed} <= {self.default_page_size})"
)
response = self.client.get(self.global_endpoint, {"page": 2, "review_status": True})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["page"], 2)
# Calculate expected items on page 2
expected_items = min(self.default_page_size, self.total_reviewed - self.default_page_size)
self.assertEqual(len(payload["items"]), expected_items)
# Verify only reviewed items are returned
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_page_size_is_capped(self):
"""Test that page size is capped at the maximum."""
if self.total_reviewed <= self.max_page_size:
self.skipTest(
f"Not enough items to test page size cap "
f"({self.total_reviewed} <= {self.max_page_size})"
)
response = self.client.get(self.global_endpoint, {"page_size": 150})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["page_size"], self.max_page_size)
self.assertEqual(len(payload["items"]), self.max_page_size)
def test_package_endpoint_for_reviewed_package(self):
"""Test the package-scoped endpoint for reviewed packages."""
if not self.package_endpoint_template:
self.skipTest("No package endpoint for this resource")
response = self.client.get(self.reviewed_package_endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], self.total_reviewed)
# Verify only reviewed items are returned
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_package_endpoint_for_unreviewed_package(self):
"""Test the package-scoped endpoint for unreviewed packages."""
if not self.package_endpoint_template:
self.skipTest("No package endpoint for this resource")
response = self.client.get(self.unreviewed_package_endpoint)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], self.total_unreviewed)
# Verify only unreviewed items are returned
self.assertTrue(all(item["review_status"] == "unreviewed" for item in payload["items"]))
@tag("api", "end2end")
class PackagePaginationAPITest(TestCase):
ENDPOINT = "/api/v1/packages/"
@classmethod
def setUpTestData(cls):
cls.user = UserManager.create_user(
"package-user",
"package-user@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
# Delete the auto-created default package to ensure clean test data
default_pkg = cls.user.default_package
cls.user.default_package = None
cls.user.save()
default_pkg.delete()
# Create reviewed packages
cls.total_reviewed = 25
for idx in range(cls.total_reviewed):
package = PackageManager.create_package(
cls.user, f"Reviewed Package {idx:03d}", "Reviewed package for tests"
)
package.reviewed = True
package.save()
# Create unreviewed packages
cls.total_unreviewed = 15
for idx in range(cls.total_unreviewed):
PackageManager.create_package(
cls.user, f"Draft Package {idx:03d}", "Unreviewed package for tests"
)
def setUp(self):
self.client.force_login(self.user)
def test_anonymous_can_access_reviewed_packages(self):
self.client.logout()
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
# Anonymous users can only see reviewed packages
self.assertEqual(payload["total_items"], self.total_reviewed)
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_listing_uses_default_page_size(self):
response = self.client.get(self.ENDPOINT)
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["page"], 1)
self.assertEqual(payload["page_size"], 50)
self.assertEqual(payload["total_items"], self.total_reviewed + self.total_unreviewed)
def test_reviewed_filter_true(self):
response = self.client.get(self.ENDPOINT, {"review_status": True})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], self.total_reviewed)
self.assertTrue(all(item["review_status"] == "reviewed" for item in payload["items"]))
def test_reviewed_filter_false(self):
response = self.client.get(self.ENDPOINT, {"review_status": False})
self.assertEqual(response.status_code, 200)
payload = response.json()
self.assertEqual(payload["total_items"], self.total_unreviewed)
self.assertTrue(all(item["review_status"] == "unreviewed" for item in payload["items"]))
def test_reviewed_filter_false_anonymous(self):
self.client.logout()
response = self.client.get(self.ENDPOINT, {"review_status": False})
self.assertEqual(response.status_code, 200)
payload = response.json()
# Anonymous users cannot access unreviewed packages
self.assertEqual(payload["total_items"], 0)
@tag("api", "end2end")
class CompoundPaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Compound pagination tests using base class."""
resource_name = "compound"
resource_name_plural = "compounds"
global_endpoint = "/api/v1/compounds/"
package_endpoint_template = "/api/v1/package/{uuid}/compound/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
simple_smiles = ["C", "CC", "CCC", "CCCC", "CCCCC"]
smiles = simple_smiles[idx % len(simple_smiles)] + ("O" * (idx // len(simple_smiles)))
return Compound.create(
package,
smiles,
f"Reviewed Compound {idx:03d}",
"Compound for pagination tests",
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
simple_smiles = ["C", "CC", "CCC", "CCCC", "CCCCC"]
smiles = simple_smiles[idx % len(simple_smiles)] + ("N" * (idx // len(simple_smiles)))
return Compound.create(
package,
smiles,
f"Draft Compound {idx:03d}",
"Compound for pagination tests",
)
@tag("api", "end2end")
class RulePaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Rule pagination tests using base class."""
resource_name = "rule"
resource_name_plural = "rules"
global_endpoint = "/api/v1/rules/"
package_endpoint_template = "/api/v1/package/{uuid}/rule/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
# Create unique SMIRKS by combining chain length and functional group variations
# This ensures each idx gets a truly unique SMIRKS pattern
carbon_chain = "C" * (idx + 1) # C, CC, CCC, ... (grows with idx)
smirks = f"[{carbon_chain}:1]>>[{carbon_chain}:1]O"
return SimpleAmbitRule.create(
package,
f"Reviewed Rule {idx:03d}",
f"Rule {idx} for pagination tests",
smirks,
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
# Create unique SMIRKS by varying the carbon chain length
carbon_chain = "C" * (idx + 1) # C, CC, CCC, ... (grows with idx)
smirks = f"[{carbon_chain}:1]>>[{carbon_chain}:1]N"
return SimpleAmbitRule.create(
package,
f"Draft Rule {idx:03d}",
f"Rule {idx} for pagination tests",
smirks,
)
@tag("api", "end2end")
class ReactionPaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Reaction pagination tests using base class."""
resource_name = "reaction"
resource_name_plural = "reactions"
global_endpoint = "/api/v1/reactions/"
package_endpoint_template = "/api/v1/package/{uuid}/reaction/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
# Generate unique SMILES with growing chain lengths to avoid duplicates
# Each idx gets a unique chain length
educt_smiles = "C" * (idx + 1) # C, CC, CCC, ... (grows with idx)
product_smiles = educt_smiles + "O"
return Reaction.create(
package=package,
name=f"Reviewed Reaction {idx:03d}",
description="Reaction for pagination tests",
educts=[educt_smiles],
products=[product_smiles],
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
# Generate unique SMILES with growing chain lengths to avoid duplicates
# Each idx gets a unique chain length
educt_smiles = "C" * (idx + 1) # C, CC, CCC, ... (grows with idx)
product_smiles = educt_smiles + "N"
return Reaction.create(
package=package,
name=f"Draft Reaction {idx:03d}",
description="Reaction for pagination tests",
educts=[educt_smiles],
products=[product_smiles],
)
@tag("api", "end2end")
class PathwayPaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Pathway pagination tests using base class."""
resource_name = "pathway"
resource_name_plural = "pathways"
global_endpoint = "/api/v1/pathways/"
package_endpoint_template = "/api/v1/package/{uuid}/pathway/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
return Pathway.objects.create(
package=package,
name=f"Reviewed Pathway {idx:03d}",
description="Pathway for pagination tests",
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
return Pathway.objects.create(
package=package,
name=f"Draft Pathway {idx:03d}",
description="Pathway for pagination tests",
)
@tag("api", "end2end")
class ModelPaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Model pagination tests using base class."""
resource_name = "model"
resource_name_plural = "models"
global_endpoint = "/api/v1/models/"
package_endpoint_template = "/api/v1/package/{uuid}/model/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
return EPModel.objects.create(
package=package,
name=f"Reviewed Model {idx:03d}",
description="Model for pagination tests",
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
return EPModel.objects.create(
package=package,
name=f"Draft Model {idx:03d}",
description="Model for pagination tests",
)
@tag("api", "end2end")
class ScenarioPaginationAPITest(BaseTestAPIGetPaginated, TestCase):
"""Scenario pagination tests using base class."""
resource_name = "scenario"
resource_name_plural = "scenarios"
global_endpoint = "/api/v1/scenarios/"
package_endpoint_template = "/api/v1/package/{uuid}/scenario/"
total_reviewed = 125
total_unreviewed = 35
@classmethod
def create_reviewed_resource(cls, package, idx):
return Scenario.create(
package,
f"Reviewed Scenario {idx:03d}",
"Scenario for pagination tests",
"2025-01-01",
"lab",
[],
)
@classmethod
def create_unreviewed_resource(cls, package, idx):
return Scenario.create(
package,
f"Draft Scenario {idx:03d}",
"Scenario for pagination tests",
"2025-01-01",
"field",
[],
)

View File

@ -0,0 +1,301 @@
"""
Tests for Scenario Creation Endpoint Error Handling.
Tests comprehensive error handling for POST /api/v1/package/{uuid}/scenario/
including package not found, permission denied, validation errors, and database errors.
"""
from django.test import TestCase, tag
import json
from uuid import uuid4
from epdb.logic import PackageManager, UserManager
from epdb.models import Scenario
@tag("api", "scenario_creation")
class ScenarioCreationAPITests(TestCase):
"""Test scenario creation endpoint error handling."""
@classmethod
def setUpTestData(cls):
"""Set up test data: users and packages."""
cls.user = UserManager.create_user(
"scenario-test-user",
"scenario-test@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.other_user = UserManager.create_user(
"other-user",
"other@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
cls.package = PackageManager.create_package(
cls.user, "Test Package", "Test package for scenario creation"
)
def test_create_scenario_package_not_found(self):
"""Test that non-existent package UUID returns 404."""
self.client.force_login(self.user)
fake_uuid = uuid4()
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{fake_uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 404)
self.assertIn(f"Package with UUID {fake_uuid} not found", response.json()["detail"])
def test_create_scenario_insufficient_permissions(self):
"""Test that unauthorized access returns 403."""
self.client.force_login(self.other_user)
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 403)
self.assertIn("permission", response.json()["detail"].lower())
def test_create_scenario_invalid_ai_type(self):
"""Test that unknown additional information type returns 400."""
self.client.force_login(self.user)
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [
{"type": "invalid_type_that_does_not_exist", "data": {"some_field": "some_value"}}
],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 400)
response_data = response.json()
self.assertIn("Validation errors", response_data["detail"])
def test_create_scenario_validation_error(self):
"""Test that invalid additional information data returns 400."""
self.client.force_login(self.user)
# Use malformed data structure for an actual AI type
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [
{
"type": "invalid_type_name",
"data": None, # This should cause a validation error
}
],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
# Should return 422 for validation errors
self.assertEqual(response.status_code, 422)
def test_create_scenario_success(self):
"""Test that valid scenario creation returns 200."""
self.client.force_login(self.user)
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["name"], "Test Scenario")
self.assertEqual(data["description"], "Test description")
# Verify scenario was actually created
scenario = Scenario.objects.get(name="Test Scenario")
self.assertEqual(scenario.package, self.package)
self.assertEqual(scenario.scenario_type, "biodegradation")
def test_create_scenario_auto_name(self):
"""Test that empty name triggers auto-generation."""
self.client.force_login(self.user)
payload = {
"name": "", # Empty name should be auto-generated
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
# Auto-generated name should follow pattern "Scenario N"
self.assertTrue(data["name"].startswith("Scenario "))
def test_create_scenario_xss_protection(self):
"""Test that XSS attempts are sanitized."""
self.client.force_login(self.user)
payload = {
"name": "<script>alert('xss')</script>Clean Name",
"description": "<img src=x onerror=alert('xss')>Description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
# XSS should be cleaned out
self.assertNotIn("<script>", data["name"])
self.assertNotIn("onerror", data["description"])
def test_create_scenario_missing_required_field(self):
"""Test that missing required fields returns validation error."""
self.client.force_login(self.user)
# Missing 'name' field entirely
payload = {
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
# Should return 422 for schema validation errors
self.assertEqual(response.status_code, 422)
def test_create_scenario_type_error_in_ai(self):
"""Test that TypeError in AI instantiation returns 400."""
self.client.force_login(self.user)
payload = {
"name": "Test Scenario",
"description": "Test description",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [
{
"type": "invalid_type_name",
"data": "string instead of dict", # Wrong type
}
],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
# Should return 422 for validation errors
self.assertEqual(response.status_code, 422)
def test_create_scenario_default_values(self):
"""Test that default values are applied correctly."""
self.client.force_login(self.user)
# Minimal payload with only name
payload = {"name": "Minimal Scenario"}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertEqual(data["name"], "Minimal Scenario")
# Check defaults are applied
scenario = Scenario.objects.get(name="Minimal Scenario")
# Default description from model is "no description"
self.assertIn(scenario.description.lower(), ["", "no description"])
def test_create_scenario_unicode_characters(self):
"""Test that unicode characters are handled properly."""
self.client.force_login(self.user)
payload = {
"name": "Test Scenario 测试 🧪",
"description": "Description with émojis and spëcial çhars",
"scenario_date": "2024-01-01",
"scenario_type": "biodegradation",
"additional_information": [],
}
response = self.client.post(
f"/api/v1/package/{self.package.uuid}/scenario/",
data=json.dumps(payload),
content_type="application/json",
)
self.assertEqual(response.status_code, 200)
data = response.json()
self.assertIn("测试", data["name"])
self.assertIn("émojis", data["description"])

View File

@ -0,0 +1,113 @@
"""
Property-based tests for schema generation.
Tests that verify schema generation works correctly for all models,
regardless of their structure.
"""
import pytest
from typing import Type
from pydantic import BaseModel
from envipy_additional_information import registry, EnviPyModel
from epapi.utils.schema_transformers import build_rjsf_output
class TestSchemaGeneration:
"""Test that all models can generate valid RJSF schemas."""
@pytest.mark.parametrize("model_name,model_cls", list(registry.list_models().items()))
def test_all_models_generate_rjsf(self, model_name: str, model_cls: Type[BaseModel]):
"""Every model in the registry should generate valid RJSF format."""
# Skip non-EnviPyModel classes (parsers, etc.)
if not issubclass(model_cls, EnviPyModel):
pytest.skip(f"{model_name} is not an EnviPyModel")
# Should not raise exception
result = build_rjsf_output(model_cls)
# Verify structure
assert isinstance(result, dict), f"{model_name}: Result should be a dict"
assert "schema" in result, f"{model_name}: Missing 'schema' key"
assert "uiSchema" in result, f"{model_name}: Missing 'uiSchema' key"
assert "formData" in result, f"{model_name}: Missing 'formData' key"
assert "groups" in result, f"{model_name}: Missing 'groups' key"
# Verify types
assert isinstance(result["schema"], dict), f"{model_name}: schema should be dict"
assert isinstance(result["uiSchema"], dict), f"{model_name}: uiSchema should be dict"
assert isinstance(result["formData"], dict), f"{model_name}: formData should be dict"
assert isinstance(result["groups"], list), f"{model_name}: groups should be list"
# Verify schema has properties
assert "properties" in result["schema"], f"{model_name}: schema should have 'properties'"
assert isinstance(result["schema"]["properties"], dict), (
f"{model_name}: properties should be dict"
)
@pytest.mark.parametrize("model_name,model_cls", list(registry.list_models().items()))
def test_ui_schema_matches_schema_fields(self, model_name: str, model_cls: Type[BaseModel]):
"""uiSchema keys should match schema properties (or be nested for intervals)."""
if not issubclass(model_cls, EnviPyModel):
pytest.skip(f"{model_name} is not an EnviPyModel")
result = build_rjsf_output(model_cls)
schema_props = set(result["schema"]["properties"].keys())
ui_schema_keys = set(result["uiSchema"].keys())
# uiSchema should have entries for all top-level properties
# (intervals may have nested start/end, but the main field should be present)
assert ui_schema_keys.issubset(schema_props), (
f"{model_name}: uiSchema has keys not in schema: {ui_schema_keys - schema_props}"
)
@pytest.mark.parametrize("model_name,model_cls", list(registry.list_models().items()))
def test_groups_is_list_of_strings(self, model_name: str, model_cls: Type[BaseModel]):
"""Groups should be a list of strings."""
if not issubclass(model_cls, EnviPyModel):
pytest.skip(f"{model_name} is not an EnviPyModel")
result = build_rjsf_output(model_cls)
groups = result["groups"]
assert isinstance(groups, list), f"{model_name}: groups should be list"
assert all(isinstance(g, str) for g in groups), (
f"{model_name}: all groups should be strings, got {groups}"
)
@pytest.mark.parametrize("model_name,model_cls", list(registry.list_models().items()))
def test_form_data_matches_schema(self, model_name: str, model_cls: Type[BaseModel]):
"""formData keys should match schema properties."""
if not issubclass(model_cls, EnviPyModel):
pytest.skip(f"{model_name} is not an EnviPyModel")
result = build_rjsf_output(model_cls)
schema_props = set(result["schema"]["properties"].keys())
form_data_keys = set(result["formData"].keys())
# formData should only contain keys that are in schema
assert form_data_keys.issubset(schema_props), (
f"{model_name}: formData has keys not in schema: {form_data_keys - schema_props}"
)
class TestWidgetTypes:
"""Test that widget types are valid."""
@pytest.mark.parametrize("model_name,model_cls", list(registry.list_models().items()))
def test_widget_types_are_valid(self, model_name: str, model_cls: Type[BaseModel]):
"""All widget types in uiSchema should be valid WidgetType values."""
from envipy_additional_information.ui_config import WidgetType
if not issubclass(model_cls, EnviPyModel):
pytest.skip(f"{model_name} is not an EnviPyModel")
result = build_rjsf_output(model_cls)
valid_widgets = {wt.value for wt in WidgetType}
for field_name, ui_config in result["uiSchema"].items():
widget = ui_config.get("ui:widget")
if widget:
assert widget in valid_widgets, (
f"{model_name}.{field_name}: Invalid widget '{widget}'. Valid: {valid_widgets}"
)

View File

@ -0,0 +1,94 @@
from datetime import timedelta
from django.test import TestCase, tag
from django.utils import timezone
from epdb.logic import PackageManager, UserManager
from epdb.models import APIToken
@tag("api", "auth")
class BearerTokenAuthTests(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = UserManager.create_user(
"token-user",
"token-user@envipath.com",
"SuperSafe",
set_setting=False,
add_to_group=False,
is_active=True,
)
default_pkg = cls.user.default_package
cls.user.default_package = None
cls.user.save()
if default_pkg:
default_pkg.delete()
cls.unreviewed_package = PackageManager.create_package(
cls.user, "Token Auth Package", "Package for token auth tests"
)
def _auth_header(self, raw_token):
return {"HTTP_AUTHORIZATION": f"Bearer {raw_token}"}
def test_valid_token_allows_access(self):
_, raw_token = APIToken.create_token(self.user, name="Valid Token", expires_days=1)
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 200)
def test_expired_token_rejected(self):
token, raw_token = APIToken.create_token(self.user, name="Expired Token", expires_days=1)
token.expires_at = timezone.now() - timedelta(days=1)
token.save(update_fields=["expires_at"])
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 401)
def test_inactive_token_rejected(self):
token, raw_token = APIToken.create_token(self.user, name="Inactive Token", expires_days=1)
token.is_active = False
token.save(update_fields=["is_active"])
response = self.client.get("/api/v1/compounds/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 401)
def test_invalid_token_rejected(self):
response = self.client.get("/api/v1/compounds/", HTTP_AUTHORIZATION="Bearer invalid-token")
self.assertEqual(response.status_code, 401)
def test_no_token_rejected(self):
self.client.logout()
response = self.client.get("/api/v1/compounds/")
self.assertEqual(response.status_code, 401)
def test_bearer_populates_request_user_for_packages(self):
response = self.client.get("/api/v1/packages/")
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertNotIn(str(self.unreviewed_package.uuid), uuids)
_, raw_token = APIToken.create_token(self.user, name="Package Token", expires_days=1)
response = self.client.get("/api/v1/packages/", **self._auth_header(raw_token))
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertIn(str(self.unreviewed_package.uuid), uuids)
def test_session_auth_still_works_without_bearer(self):
self.client.force_login(self.user)
response = self.client.get("/api/v1/packages/")
self.assertEqual(response.status_code, 200)
payload = response.json()
uuids = {item["uuid"] for item in payload["items"]}
self.assertIn(str(self.unreviewed_package.uuid), uuids)

0
epapi/utils/__init__.py Normal file
View File

View File

@ -0,0 +1,181 @@
"""
Schema transformation utilities for converting Pydantic models to RJSF format.
This module provides functions to extract UI configuration from Pydantic models
and transform them into React JSON Schema Form (RJSF) compatible format.
"""
from typing import Type, Optional, Any
import jsonref
from pydantic import BaseModel
from envipy_additional_information.ui_config import UIConfig
from envipy_additional_information import registry
def extract_groups(model_cls: Type[BaseModel]) -> list[str]:
"""
Extract groups from registry-stored group information.
Args:
model_cls: The model class
Returns:
List of group names the model belongs to
"""
return registry.get_groups(model_cls)
def extract_ui_metadata(model_cls: Type[BaseModel]) -> dict[str, Any]:
"""
Extract model-level UI metadata from UI class.
Returns metadata attributes that are NOT UIConfig instances.
Common metadata includes: unit, description, title.
"""
metadata: dict[str, Any] = {}
if not hasattr(model_cls, "UI"):
return metadata
ui_class = getattr(model_cls, "UI")
# Iterate over all attributes in the UI class
for attr_name in dir(ui_class):
# Skip private attributes
if attr_name.startswith("_"):
continue
# Get the attribute value
try:
attr_value = getattr(ui_class, attr_name)
except AttributeError:
continue
# Skip callables but keep types/classes
if callable(attr_value) and not isinstance(attr_value, type):
continue
# Skip UIConfig instances (these are field-level configs, not metadata)
# This includes both UIConfig and IntervalConfig
if isinstance(attr_value, UIConfig):
continue
metadata[attr_name] = attr_value
return metadata
def extract_ui_config_from_model(model_cls: Type[BaseModel]) -> dict[str, Any]:
"""
Extract UI configuration from model's UI class.
Returns a dictionary mapping field names to their UI schema configurations.
Trusts the config classes to handle their own transformation logic.
"""
ui_configs: dict[str, Any] = {}
if not hasattr(model_cls, "UI"):
return ui_configs
ui_class = getattr(model_cls, "UI")
schema = model_cls.model_json_schema()
field_names = schema.get("properties", {}).keys()
# Extract config for each field
for field_name in field_names:
# Skip if UI config doesn't exist for this field (field may be hidden from UI)
if not hasattr(ui_class, field_name):
continue
ui_config = getattr(ui_class, field_name)
if isinstance(ui_config, UIConfig):
ui_configs[field_name] = ui_config.to_ui_schema_field()
return ui_configs
def build_ui_schema(model_cls: Type[BaseModel]) -> dict:
"""Generate RJSF uiSchema from model's UI class."""
ui_schema = {}
# Extract field-level UI configs
field_configs = extract_ui_config_from_model(model_cls)
for field_name, config in field_configs.items():
ui_schema[field_name] = config
return ui_schema
def build_schema(model_cls: Type[BaseModel]) -> dict[str, Any]:
"""
Build JSON schema from Pydantic model, applying UI metadata.
Dereferences all $ref pointers to produce fully inlined schema.
This ensures the frontend receives schemas with enum values and nested
properties fully resolved, without needing client-side ref resolution.
Extracts model-level metadata from UI class (title, unit, etc.) and applies
it to the generated schema. This ensures UI metadata is the single source of truth.
"""
schema = model_cls.model_json_schema()
# Dereference $ref pointers (inlines $defs) using jsonref
# This ensures the frontend receives schemas with enum values and nested
# properties fully resolved, currently necessary for client-side rendering.
# FIXME: This is a hack to get the schema to work with alpine schema-form.js replace once we migrate to client-side framework.
schema = jsonref.replace_refs(schema, proxies=False)
# Remove $defs section as all refs are now inlined
if "$defs" in schema:
del schema["$defs"]
# Extract and apply UI metadata (title, unit, description, etc.)
ui_metadata = extract_ui_metadata(model_cls)
# Apply all metadata consistently as custom properties with x- prefix
# This ensures consistency and avoids conflicts with standard JSON Schema properties
for key, value in ui_metadata.items():
if value is not None:
schema[f"x-{key}"] = value
# Set standard title property from UI metadata for JSON Schema compliance
if "title" in ui_metadata:
schema["title"] = ui_metadata["title"]
elif "label" in ui_metadata:
schema["title"] = ui_metadata["label"]
return schema
def build_rjsf_output(model_cls: Type[BaseModel], initial_data: Optional[dict] = None) -> dict:
"""
Main function that returns complete RJSF format.
Trusts the config classes to handle their own transformation logic.
No special-case handling - if a config knows how to transform itself, it will.
Returns:
dict with keys: schema, uiSchema, formData, groups
"""
# Build schema with UI metadata applied
schema = build_schema(model_cls)
# Build UI schema - config classes handle their own transformation
ui_schema = build_ui_schema(model_cls)
# Extract groups from marker interfaces
groups = extract_groups(model_cls)
# Use provided initial_data or empty dict
form_data = initial_data if initial_data is not None else {}
return {
"schema": schema,
"uiSchema": ui_schema,
"formData": form_data,
"groups": groups,
}

View File

@ -0,0 +1,82 @@
"""Shared utilities for handling Pydantic validation errors."""
import json
from pydantic import ValidationError
from pydantic_core import ErrorDetails
from ninja.errors import HttpError
def format_validation_error(error: ErrorDetails) -> str:
"""Format a Pydantic validation error into a user-friendly message.
Args:
error: A Pydantic error details dictionary containing 'msg', 'type', 'ctx', etc.
Returns:
A user-friendly error message string.
"""
msg = error.get("msg") or "Invalid value"
error_type = error.get("type") or ""
# Handle common validation types with friendly messages
if error_type == "enum":
ctx = error.get("ctx", {})
expected = ctx.get("expected", "") if ctx else ""
return f"Please select a valid option{': ' + expected if expected else ''}"
elif error_type == "literal_error":
# Literal errors (like Literal["active", "inactive"])
return msg.replace("Input should be ", "Please enter ")
elif error_type == "missing":
return "This field is required"
elif error_type == "string_type":
return "Please enter a valid string"
elif error_type == "int_type":
return "Please enter a valid int"
elif error_type == "int_parsing":
return "Please enter a valid int"
elif error_type == "float_type":
return "Please enter a valid float"
elif error_type == "float_parsing":
return "Please enter a valid float"
elif error_type == "value_error":
# Strip "Value error, " prefix from custom validator messages
return msg.replace("Value error, ", "")
else:
# Default: use the message from Pydantic but clean it up
return msg.replace("Input should be ", "Please enter ").replace("Value error, ", "")
def handle_validation_error(e: ValidationError) -> None:
"""Convert a Pydantic ValidationError into a structured HttpError.
This function transforms Pydantic validation errors into a JSON structure
that the frontend expects for displaying field-level errors.
Args:
e: The Pydantic ValidationError to handle.
Raises:
HttpError: Always raises a 400 error with structured JSON containing
type, field_errors, and message fields.
"""
# Transform Pydantic validation errors into user-friendly format
field_errors: dict[str, list[str]] = {}
for error in e.errors():
# Get the field name from location tuple
loc = error.get("loc", ())
field = str(loc[-1]) if loc else "root"
# Format the error message
friendly_msg = format_validation_error(error)
if field not in field_errors:
field_errors[field] = []
field_errors[field].append(friendly_msg)
# Return structured error for frontend parsing
error_response = {
"type": "validation_error",
"field_errors": field_errors,
"message": "Please correct the errors below",
}
raise HttpError(400, json.dumps(error_response))

0
epapi/v1/__init__.py Normal file
View File

34
epapi/v1/auth.py Normal file
View File

@ -0,0 +1,34 @@
import hashlib
from ninja.security import HttpBearer
from ninja.errors import HttpError
from epdb.models import APIToken
class BearerTokenAuth(HttpBearer):
def authenticate(self, request, token):
if token is None:
return None
hashed_token = hashlib.sha256(token.encode()).hexdigest()
user = APIToken.authenticate(hashed_token, hashed=True)
if not user:
raise HttpError(401, "Invalid or expired token")
request.user = user
return user
class OptionalBearerTokenAuth:
"""Bearer auth that allows unauthenticated access.
Validates the Bearer token if present (401 on invalid token),
otherwise lets the request through for anonymous/session access.
"""
def __init__(self):
self._bearer = BearerTokenAuth()
def __call__(self, request):
return self._bearer(request) or request.user

141
epapi/v1/dal.py Normal file
View File

@ -0,0 +1,141 @@
from uuid import UUID
from django.conf import settings as s
from django.db.models import Model
from epdb.logic import PackageManager
from epdb.models import CompoundStructure, User, Compound, Scenario
from .errors import EPAPINotFoundError, EPAPIPermissionDeniedError
Package = s.GET_PACKAGE_MODEL()
def get_compound_for_read(user, compound_uuid: UUID):
"""
Get compound by UUID with permission check.
"""
try:
compound = Compound.objects.get(uuid=compound_uuid)
package = compound.package
except Compound.DoesNotExist:
raise EPAPINotFoundError(f"Compound with UUID {compound_uuid} not found")
# FIXME: optimize package manager to exclusively work with UUIDs
if not user or user.is_anonymous or not PackageManager.readable(user, package):
raise EPAPIPermissionDeniedError("Insufficient permissions to access this compound.")
return compound
def get_package_for_read(user, package_uuid: UUID):
"""
Get package by UUID with permission check.
"""
# FIXME: update package manager with custom exceptions to avoid manual checks here
try:
package = Package.objects.get(uuid=package_uuid)
except Package.DoesNotExist:
raise EPAPINotFoundError(f"Package with UUID {package_uuid} not found")
# FIXME: optimize package manager to exclusively work with UUIDs
if not user or user.is_anonymous or not PackageManager.readable(user, package):
raise EPAPIPermissionDeniedError("Insufficient permissions to access this package.")
return package
def get_package_for_write(user, package_uuid: UUID):
"""
Get package by UUID with permission check.
"""
# FIXME: update package manager with custom exceptions to avoid manual checks here
try:
package = Package.objects.get(uuid=package_uuid)
except Package.DoesNotExist:
raise EPAPINotFoundError(f"Package with UUID {package_uuid} not found")
# FIXME: optimize package manager to exclusively work with UUIDs
if not user or user.is_anonymous or not PackageManager.writable(user, package):
raise EPAPIPermissionDeniedError("Insufficient permissions to access this package.")
return package
def get_scenario_for_read(user, scenario_uuid: UUID):
"""Get scenario by UUID with read permission check."""
try:
scenario = Scenario.objects.select_related("package").get(uuid=scenario_uuid)
except Scenario.DoesNotExist:
raise EPAPINotFoundError(f"Scenario with UUID {scenario_uuid} not found")
if not user or user.is_anonymous or not PackageManager.readable(user, scenario.package):
raise EPAPIPermissionDeniedError("Insufficient permissions to access this scenario.")
return scenario
def get_scenario_for_write(user, scenario_uuid: UUID):
"""Get scenario by UUID with write permission check."""
try:
scenario = Scenario.objects.select_related("package").get(uuid=scenario_uuid)
except Scenario.DoesNotExist:
raise EPAPINotFoundError(f"Scenario with UUID {scenario_uuid} not found")
if not user or user.is_anonymous or not PackageManager.writable(user, scenario.package):
raise EPAPIPermissionDeniedError("Insufficient permissions to modify this scenario.")
return scenario
def get_user_packages_for_read(user: User | None):
"""Get all packages readable by the user."""
if not user or user.is_anonymous:
return PackageManager.get_reviewed_packages()
return PackageManager.get_all_readable_packages(user, include_reviewed=True)
def get_user_entities_for_read(model_class: Model, user: User | None):
"""Build queryset for reviewed package entities."""
if not user or user.is_anonymous:
return model_class.objects.filter(package__reviewed=True).select_related("package")
qs = model_class.objects.filter(
package__in=PackageManager.get_all_readable_packages(user, include_reviewed=True)
).select_related("package")
return qs
def get_package_entities_for_read(model_class: Model, package_uuid: UUID, user: User | None = None):
"""Build queryset for specific package entities."""
package = get_package_for_read(user, package_uuid)
qs = model_class.objects.filter(package=package).select_related("package")
return qs
def get_user_structure_for_read(user: User | None):
"""Build queryset for structures accessible to the user (via compound->package)."""
if not user or user.is_anonymous:
return CompoundStructure.objects.filter(compound__package__reviewed=True).select_related(
"compound__package"
)
qs = CompoundStructure.objects.filter(
compound__package__in=PackageManager.get_all_readable_packages(user, include_reviewed=True)
).select_related("compound__package")
return qs
def get_package_compound_structure_for_read(
package_uuid: UUID, compound_uuid: UUID, user: User | None = None
):
"""Build queryset for specific package compound structures."""
get_package_for_read(user, package_uuid)
compound = get_compound_for_read(user, compound_uuid)
qs = CompoundStructure.objects.filter(compound=compound).select_related("compound__package")
return qs

View File

View File

@ -0,0 +1,174 @@
from ninja import Router, Body
from ninja.errors import HttpError
from uuid import UUID
from pydantic import ValidationError
from typing import Dict, Any
import logging
from envipy_additional_information import registry
from envipy_additional_information.groups import GroupEnum
from epapi.utils.schema_transformers import build_rjsf_output
from epapi.utils.validation_errors import handle_validation_error
from epdb.models import AdditionalInformation
from ..dal import get_scenario_for_read, get_scenario_for_write
logger = logging.getLogger(__name__)
router = Router(tags=["Additional Information"])
@router.get("/information/schema/")
def list_all_schemas(request):
"""Return all schemas in RJSF format with lowercase class names as keys."""
result = {}
for name, cls in registry.list_models().items():
try:
result[name] = build_rjsf_output(cls)
except Exception as e:
logger.warning(f"Failed to generate schema for {name}: {e}")
continue
return result
@router.get("/information/schema/{model_name}/")
def get_model_schema(request, model_name: str):
"""Return RJSF schema for specific model."""
cls = registry.get_model(model_name.lower())
if not cls:
raise HttpError(404, f"Unknown model: {model_name}")
return build_rjsf_output(cls)
@router.get("/scenario/{uuid:scenario_uuid}/information/")
def list_scenario_info(request, scenario_uuid: UUID):
"""List all additional information for a scenario"""
scenario = get_scenario_for_read(request.user, scenario_uuid)
result = []
for ai in AdditionalInformation.objects.filter(scenario=scenario):
result.append(
{
"type": ai.get().__class__.__name__,
"uuid": getattr(ai, "uuid", None),
"data": ai.data,
"attach_object": ai.content_object.simple_json() if ai.content_object else None,
}
)
return result
@router.post("/scenario/{uuid:scenario_uuid}/information/{model_name}/")
def add_scenario_info(
request, scenario_uuid: UUID, model_name: str, payload: Dict[str, Any] = Body(...)
):
"""Add new additional information to scenario"""
cls = registry.get_model(model_name.lower())
if not cls:
raise HttpError(404, f"Unknown model: {model_name}")
try:
instance = cls(**payload) # Pydantic validates
except ValidationError as e:
handle_validation_error(e)
scenario = get_scenario_for_write(request.user, scenario_uuid)
# Model method now returns the UUID
created_uuid = scenario.add_additional_information(instance)
return {"status": "created", "uuid": created_uuid}
@router.patch("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
def update_scenario_info(
request, scenario_uuid: UUID, ai_uuid: UUID, payload: Dict[str, Any] = Body(...)
):
"""Update existing additional information for a scenario"""
scenario = get_scenario_for_write(request.user, scenario_uuid)
ai_uuid_str = str(ai_uuid)
ai = AdditionalInformation.objects.filter(uuid=ai_uuid_str, scenario=scenario)
if not ai.exists():
raise HttpError(404, f"Additional information with UUID {ai_uuid} not found")
ai = ai.first()
# Get the model class for validation
cls = registry.get_model(ai.type.lower())
if not cls:
raise HttpError(500, f"Unknown model type in data: {ai.type}")
# Validate the payload against the model
try:
instance = cls(**payload)
except ValidationError as e:
handle_validation_error(e)
# Use model method for update
try:
scenario.update_additional_information(ai_uuid_str, instance)
except ValueError as e:
raise HttpError(404, str(e))
return {"status": "updated", "uuid": ai_uuid_str}
@router.delete("/scenario/{uuid:scenario_uuid}/information/item/{uuid:ai_uuid}/")
def delete_scenario_info(request, scenario_uuid: UUID, ai_uuid: UUID):
"""Delete additional information from scenario"""
scenario = get_scenario_for_write(request.user, scenario_uuid)
try:
scenario.remove_additional_information(str(ai_uuid))
except ValueError as e:
raise HttpError(404, str(e))
return {"status": "deleted"}
@router.get("/information/groups/")
def list_groups(request):
"""Return list of available group names."""
return {"groups": GroupEnum.values()}
@router.get("/information/groups/{group_name}/")
def get_group_models(request, group_name: str):
"""
Return models for a specific group organized by subcategory.
Args:
group_name: One of "sludge", "soil", or "sediment" (string)
Returns:
Dictionary with subcategories (exp, spike, comp, misc, or group name)
as keys and lists of model info as values
"""
# Convert string to enum (raises ValueError if invalid)
try:
group_enum = GroupEnum(group_name)
except ValueError:
valid = ", ".join(GroupEnum.values())
raise HttpError(400, f"Invalid group '{group_name}'. Valid: {valid}")
try:
group_data = registry.collect_group(group_enum)
except (ValueError, TypeError) as e:
raise HttpError(400, str(e))
result = {}
for subcategory, models in group_data.items():
result[subcategory] = [
{
"name": cls.__name__.lower(),
"class": cls.__name__,
"title": getattr(cls.UI, "title", cls.__name__)
if hasattr(cls, "UI")
else cls.__name__,
}
for cls in models
]
return result

View File

@ -0,0 +1,41 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from epdb.models import Compound
from ..pagination import EnhancedPageNumberPagination
from ..schemas import CompoundOutSchema, ReviewStatusFilter
from ..dal import get_user_entities_for_read, get_package_entities_for_read
router = Router()
@router.get("/compounds/", response=EnhancedPageNumberPagination.Output[CompoundOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_compounds(request):
"""
List all compounds from reviewed packages.
"""
return get_user_entities_for_read(Compound, request.user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/compound/",
response=EnhancedPageNumberPagination.Output[CompoundOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_compounds(request, package_uuid: UUID):
"""
List all compounds for a specific package.
"""
user = request.user
return get_package_entities_for_read(Compound, package_uuid, user).order_by("name").all()

View File

@ -0,0 +1,23 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from epdb.logic import GroupManager
from ..pagination import EnhancedPageNumberPagination
from ..schemas import GroupOutSchema
router = Router()
@router.get("/groups/", response=EnhancedPageNumberPagination.Output[GroupOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
)
def list_all_groups(request):
"""
List all groups the user has access to.
"""
user = request.user
return GroupManager.get_groups(user)

View File

@ -0,0 +1,41 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from epdb.models import EPModel
from ..pagination import EnhancedPageNumberPagination
from ..schemas import ModelOutSchema, ReviewStatusFilter
from ..dal import get_user_entities_for_read, get_package_entities_for_read
router = Router()
@router.get("/models/", response=EnhancedPageNumberPagination.Output[ModelOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_models(request):
"""
List all models from reviewed packages.
"""
return get_user_entities_for_read(EPModel, request.user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/model/",
response=EnhancedPageNumberPagination.Output[ModelOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_models(request, package_uuid: UUID):
"""
List all models for a specific package.
"""
user = request.user
return get_package_entities_for_read(EPModel, package_uuid, user).order_by("name").all()

View File

@ -0,0 +1,32 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
import logging
from ..auth import OptionalBearerTokenAuth
from ..dal import get_user_packages_for_read
from ..pagination import EnhancedPageNumberPagination
from ..schemas import PackageOutSchema, SelfReviewStatusFilter
router = Router()
logger = logging.getLogger(__name__)
@router.get(
"/packages/",
response=EnhancedPageNumberPagination.Output[PackageOutSchema],
auth=OptionalBearerTokenAuth(),
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=SelfReviewStatusFilter,
)
def list_all_packages(request):
"""
List packages accessible to the user.
"""
user = request.user
qs = get_user_packages_for_read(user)
return qs.order_by("name").all()

View File

@ -0,0 +1,42 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from epdb.models import Pathway
from ..pagination import EnhancedPageNumberPagination
from ..schemas import PathwayOutSchema, ReviewStatusFilter
from ..dal import get_user_entities_for_read, get_package_entities_for_read
router = Router()
@router.get("/pathways/", response=EnhancedPageNumberPagination.Output[PathwayOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_pathways(request):
"""
List all pathways from reviewed packages.
"""
user = request.user
return get_user_entities_for_read(Pathway, user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/pathway/",
response=EnhancedPageNumberPagination.Output[PathwayOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_pathways(request, package_uuid: UUID):
"""
List all pathways for a specific package.
"""
user = request.user
return get_package_entities_for_read(Pathway, package_uuid, user).order_by("name").all()

View File

@ -0,0 +1,42 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from epdb.models import Reaction
from ..pagination import EnhancedPageNumberPagination
from ..schemas import ReactionOutSchema, ReviewStatusFilter
from ..dal import get_user_entities_for_read, get_package_entities_for_read
router = Router()
@router.get("/reactions/", response=EnhancedPageNumberPagination.Output[ReactionOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_reactions(request):
"""
List all reactions from reviewed packages.
"""
user = request.user
return get_user_entities_for_read(Reaction, user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/reaction/",
response=EnhancedPageNumberPagination.Output[ReactionOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_reactions(request, package_uuid: UUID):
"""
List all reactions for a specific package.
"""
user = request.user
return get_package_entities_for_read(Reaction, package_uuid, user).order_by("name").all()

View File

@ -0,0 +1,42 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from epdb.models import Rule
from ..pagination import EnhancedPageNumberPagination
from ..schemas import ReviewStatusFilter, RuleOutSchema
from ..dal import get_user_entities_for_read, get_package_entities_for_read
router = Router()
@router.get("/rules/", response=EnhancedPageNumberPagination.Output[RuleOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_rules(request):
"""
List all rules from reviewed packages.
"""
user = request.user
return get_user_entities_for_read(Rule, user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/rule/",
response=EnhancedPageNumberPagination.Output[RuleOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_rules(request, package_uuid: UUID):
"""
List all rules for a specific package.
"""
user = request.user
return get_package_entities_for_read(Rule, package_uuid, user).order_by("name").all()

View File

@ -0,0 +1,129 @@
from django.conf import settings as s
from django.db import IntegrityError, OperationalError, DatabaseError
from ninja import Router, Body
from ninja.errors import HttpError
from ninja_extra.pagination import paginate
from uuid import UUID
from pydantic import ValidationError
import logging
import json
from epdb.models import Scenario
from epdb.views import _anonymous_or_real
from ..pagination import EnhancedPageNumberPagination
from ..schemas import (
ReviewStatusFilter,
ScenarioOutSchema,
ScenarioCreateSchema,
)
from ..dal import get_user_entities_for_read, get_package_entities_for_read, get_package_for_write
from envipy_additional_information import registry
logger = logging.getLogger(__name__)
router = Router()
@router.get("/scenarios/", response=EnhancedPageNumberPagination.Output[ScenarioOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_all_scenarios(request):
user = request.user
items = get_user_entities_for_read(Scenario, user)
return items.order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/scenario/",
response=EnhancedPageNumberPagination.Output[ScenarioOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=ReviewStatusFilter,
)
def list_package_scenarios(request, package_uuid: UUID):
user = request.user
items = get_package_entities_for_read(Scenario, package_uuid, user)
return items.order_by("name").all()
@router.post("/package/{uuid:package_uuid}/scenario/", response=ScenarioOutSchema)
def create_scenario(request, package_uuid: UUID, payload: ScenarioCreateSchema = Body(...)):
"""Create a new scenario with optional additional information."""
user = _anonymous_or_real(request)
try:
current_package = get_package_for_write(user, package_uuid)
except ValueError as e:
error_msg = str(e)
if "does not exist" in error_msg:
raise HttpError(404, f"Package not found: {package_uuid}")
elif "Insufficient permissions" in error_msg:
raise HttpError(403, "You do not have permission to access this package")
else:
logger.error(f"Unexpected ValueError from get_package_by_id: {error_msg}")
raise HttpError(400, "Invalid package request")
# Build additional information models from payload
additional_information_models = []
validation_errors = []
for ai_item in payload.additional_information:
# Get model class from registry
model_cls = registry.get_model(ai_item.type.lower())
if not model_cls:
validation_errors.append(f"Unknown additional information type: {ai_item.type}")
continue
try:
# Validate and create model instance
instance = model_cls(**ai_item.data)
additional_information_models.append(instance)
except ValidationError as e:
# Collect validation errors to return to user
error_messages = [err.get("msg", "Validation error") for err in e.errors()]
validation_errors.append(f"{ai_item.type}: {', '.join(error_messages)}")
except (TypeError, AttributeError, KeyError) as e:
logger.warning(f"Failed to instantiate {ai_item.type} model: {str(e)}")
validation_errors.append(f"{ai_item.type}: Invalid data structure - {str(e)}")
except Exception as e:
logger.error(f"Unexpected error instantiating {ai_item.type}: {str(e)}")
validation_errors.append(f"{ai_item.type}: Failed to process - please check your data")
# If there are validation errors, return them
if validation_errors:
raise HttpError(
400,
json.dumps(
{
"error": "Validation errors in additional information",
"details": validation_errors,
}
),
)
# Create scenario using the existing Scenario.create method
try:
new_scenario = Scenario.create(
package=current_package,
name=payload.name,
description=payload.description,
scenario_date=payload.scenario_date,
scenario_type=payload.scenario_type,
additional_information=additional_information_models,
)
except IntegrityError as e:
logger.error(f"Database integrity error creating scenario: {str(e)}")
raise HttpError(400, "Scenario creation failed - data constraint violation")
except OperationalError as e:
logger.error(f"Database operational error creating scenario: {str(e)}")
raise HttpError(503, "Database temporarily unavailable - please try again")
except (DatabaseError, AttributeError) as e:
logger.error(f"Error creating scenario: {str(e)}")
raise HttpError(500, "Failed to create scenario due to database error")
return new_scenario

View File

@ -0,0 +1,23 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from epdb.logic import SettingManager
from ..pagination import EnhancedPageNumberPagination
from ..schemas import SettingOutSchema
router = Router()
@router.get("/settings/", response=EnhancedPageNumberPagination.Output[SettingOutSchema])
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
)
def list_all_settings(request):
"""
List all settings the user has access to.
"""
user = request.user
return SettingManager.get_all_settings(user)

View File

@ -0,0 +1,50 @@
from django.conf import settings as s
from ninja import Router
from ninja_extra.pagination import paginate
from uuid import UUID
from ..pagination import EnhancedPageNumberPagination
from ..schemas import CompoundStructureOutSchema, StructureReviewStatusFilter
from ..dal import (
get_user_structure_for_read,
get_package_compound_structure_for_read,
)
router = Router()
@router.get(
"/structures/", response=EnhancedPageNumberPagination.Output[CompoundStructureOutSchema]
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=StructureReviewStatusFilter,
)
def list_all_structures(request):
"""
List all structures from all packages.
"""
user = request.user
return get_user_structure_for_read(user).order_by("name").all()
@router.get(
"/package/{uuid:package_uuid}/compound/{uuid:compound_uuid}/structure/",
response=EnhancedPageNumberPagination.Output[CompoundStructureOutSchema],
)
@paginate(
EnhancedPageNumberPagination,
page_size=s.API_PAGINATION_DEFAULT_PAGE_SIZE,
filter_schema=StructureReviewStatusFilter,
)
def list_package_structures(request, package_uuid: UUID, compound_uuid: UUID):
"""
List all structures for a specific package and compound.
"""
user = request.user
return (
get_package_compound_structure_for_read(package_uuid, compound_uuid, user)
.order_by("name")
.all()
)

28
epapi/v1/errors.py Normal file
View File

@ -0,0 +1,28 @@
from ninja.errors import HttpError
class EPAPIError(HttpError):
status_code: int = 500
def __init__(self, message: str) -> None:
super().__init__(status_code=self.status_code, message=message)
@classmethod
def from_exception(cls, exc: Exception):
return cls(message=str(exc))
class EPAPIUnauthorizedError(EPAPIError):
status_code = 401
class EPAPIPermissionDeniedError(EPAPIError):
status_code = 403
class EPAPINotFoundError(EPAPIError):
status_code = 404
class EPAPIValidationError(EPAPIError):
status_code = 422

View File

@ -0,0 +1,3 @@
"""
Service interfaces: each subdirectory defines the full boundary contract between enviPy and feature-flagged apps. DTOs and projections are shared concerns to avoid direct ORM access.
"""

View File

View File

@ -0,0 +1,58 @@
from dataclasses import dataclass, field
from uuid import UUID
@dataclass(frozen=True)
class PathwayCompoundDTO:
pk: int
name: str
smiles: str | None = None
cas_number: str | None = None
ec_number: str | None = None
@dataclass(frozen=True)
class PathwayScenarioDTO:
scenario_uuid: UUID
name: str
additional_info: list = field(default_factory=list) # EnviPyModel instances
@dataclass(frozen=True)
class PathwayNodeDTO:
node_uuid: UUID
compound_pk: int
name: str
depth: int
smiles: str | None = None
cas_number: str | None = None
ec_number: str | None = None
additional_info: list = field(default_factory=list) # EnviPyModel instances
scenarios: list[PathwayScenarioDTO] = field(default_factory=list)
@dataclass(frozen=True)
class PathwayEdgeDTO:
edge_uuid: UUID
start_compound_pks: list[int] = field(default_factory=list)
end_compound_pks: list[int] = field(default_factory=list)
probability: float | None = None
@dataclass(frozen=True)
class PathwayModelInfoDTO:
model_name: str | None = None
model_uuid: UUID | None = None
software_name: str | None = None
software_version: str | None = None
@dataclass(frozen=True)
class PathwayExportDTO:
pathway_uuid: UUID
pathway_name: str
compounds: list[PathwayCompoundDTO] = field(default_factory=list)
nodes: list[PathwayNodeDTO] = field(default_factory=list)
edges: list[PathwayEdgeDTO] = field(default_factory=list)
root_compound_pks: list[int] = field(default_factory=list)
model_info: PathwayModelInfoDTO | None = None

View File

@ -0,0 +1,142 @@
from uuid import UUID
from epdb.logic import PackageManager
from epdb.models import Pathway
from epapi.v1.errors import EPAPINotFoundError, EPAPIPermissionDeniedError
from .dto import (
PathwayCompoundDTO,
PathwayEdgeDTO,
PathwayExportDTO,
PathwayModelInfoDTO,
PathwayNodeDTO,
PathwayScenarioDTO,
)
def get_pathway_for_iuclid_export(user, pathway_uuid: UUID) -> PathwayExportDTO:
"""Return pathway data projected into DTOs for the IUCLID export consumer."""
try:
pathway = (
Pathway.objects.select_related("package", "setting", "setting__model")
.prefetch_related(
"node_set__default_node_label__compound__external_identifiers__database",
"node_set__scenarios",
"edge_set__start_nodes__default_node_label__compound",
"edge_set__end_nodes__default_node_label__compound",
)
.get(uuid=pathway_uuid)
)
except Pathway.DoesNotExist:
raise EPAPINotFoundError(f"Pathway with UUID {pathway_uuid} not found")
if not user or user.is_anonymous or not PackageManager.readable(user, pathway.package):
raise EPAPIPermissionDeniedError("Insufficient permissions to access this pathway.")
nodes: list[PathwayNodeDTO] = []
edges: list[PathwayEdgeDTO] = []
compounds_by_pk: dict[int, PathwayCompoundDTO] = {}
root_compound_pks: list[int] = []
for node in pathway.node_set.all().order_by("depth", "pk"):
cs = node.default_node_label
if cs is None:
continue
compound = cs.compound
cas_number = None
ec_number = None
for ext_id in compound.external_identifiers.all():
db_name = ext_id.database.name if ext_id.database else None
if db_name == "CAS" and cas_number is None:
cas_number = ext_id.identifier_value
elif db_name == "EC" and ec_number is None:
ec_number = ext_id.identifier_value
ai_for_node = []
scenario_entries: list[PathwayScenarioDTO] = []
for scenario in sorted(node.scenarios.all(), key=lambda item: item.pk):
ai_for_scenario = list(scenario.get_additional_information(direct_only=True))
ai_for_node.extend(ai_for_scenario)
scenario_entries.append(
PathwayScenarioDTO(
scenario_uuid=scenario.uuid,
name=scenario.name,
additional_info=ai_for_scenario,
)
)
nodes.append(
PathwayNodeDTO(
node_uuid=node.uuid,
compound_pk=compound.pk,
name=compound.name,
depth=node.depth,
smiles=cs.smiles,
cas_number=cas_number,
ec_number=ec_number,
additional_info=ai_for_node,
scenarios=scenario_entries,
)
)
if node.depth == 0 and compound.pk not in root_compound_pks:
root_compound_pks.append(compound.pk)
if compound.pk not in compounds_by_pk:
compounds_by_pk[compound.pk] = PathwayCompoundDTO(
pk=compound.pk,
name=compound.name,
smiles=cs.smiles,
cas_number=cas_number,
ec_number=ec_number,
)
for edge in pathway.edge_set.all():
start_compounds = {
n.default_node_label.compound.pk
for n in edge.start_nodes.all()
if n.default_node_label is not None
}
end_compounds = {
n.default_node_label.compound.pk
for n in edge.end_nodes.all()
if n.default_node_label is not None
}
probability = None
if edge.kv and edge.kv.get("probability") is not None:
try:
probability = float(edge.kv.get("probability"))
except (TypeError, ValueError):
probability = None
edges.append(
PathwayEdgeDTO(
edge_uuid=edge.uuid,
start_compound_pks=sorted(start_compounds),
end_compound_pks=sorted(end_compounds),
probability=probability,
)
)
model_info = None
if pathway.setting and pathway.setting.model:
model = pathway.setting.model
model_info = PathwayModelInfoDTO(
model_name=model.get_name(),
model_uuid=model.uuid,
software_name="enviPath",
software_version=None,
)
return PathwayExportDTO(
pathway_uuid=pathway.uuid,
pathway_name=pathway.get_name(),
compounds=list(compounds_by_pk.values()),
nodes=nodes,
edges=edges,
root_compound_pks=root_compound_pks,
model_info=model_info,
)

60
epapi/v1/pagination.py Normal file
View File

@ -0,0 +1,60 @@
import math
from typing import Any, Generic, List, TypeVar
from django.db.models import QuerySet
from ninja import Schema
from ninja.pagination import PageNumberPagination
T = TypeVar("T")
class EnhancedPageNumberPagination(PageNumberPagination):
class Output(Schema, Generic[T]):
items: List[T]
page: int
page_size: int
total_items: int
total_pages: int
def paginate_queryset(
self,
queryset: QuerySet,
pagination: PageNumberPagination.Input,
**params: Any,
) -> Any:
page_size = self._get_page_size(pagination.page_size)
offset = (pagination.page - 1) * page_size
total_items = self._items_count(queryset)
total_pages = math.ceil(total_items / page_size) if page_size > 0 else 0
return {
"items": queryset[offset : offset + page_size],
"page": pagination.page,
"page_size": page_size,
"total_items": total_items,
"total_pages": total_pages,
}
async def apaginate_queryset(
self,
queryset: QuerySet,
pagination: PageNumberPagination.Input,
**params: Any,
) -> Any:
page_size = self._get_page_size(pagination.page_size)
offset = (pagination.page - 1) * page_size
total_items = await self._aitems_count(queryset)
total_pages = math.ceil(total_items / page_size) if page_size > 0 else 0
if isinstance(queryset, QuerySet):
items = [obj async for obj in queryset[offset : offset + page_size]]
else:
items = queryset[offset : offset + page_size]
return {
"items": items,
"page": pagination.page,
"page_size": page_size,
"total_items": total_items,
"total_pages": total_pages,
}

44
epapi/v1/router.py Normal file
View File

@ -0,0 +1,44 @@
from ninja import Router
from ninja.security import SessionAuth
from envipath import settings as s
from .auth import BearerTokenAuth
from .endpoints import (
packages,
scenarios,
compounds,
rules,
reactions,
pathways,
models,
structure,
additional_information,
settings,
groups,
)
# Main router with authentication
router = Router(
auth=[
SessionAuth(),
BearerTokenAuth(),
]
)
# Include all endpoint routers
router.add_router("", packages.router)
router.add_router("", scenarios.router)
router.add_router("", compounds.router)
router.add_router("", rules.router)
router.add_router("", reactions.router)
router.add_router("", pathways.router)
router.add_router("", models.router)
router.add_router("", structure.router)
router.add_router("", additional_information.router)
router.add_router("", settings.router)
router.add_router("", groups.router)
if s.IUCLID_EXPORT_ENABLED:
from epiuclid.api import router as iuclid_router
router.add_router("", iuclid_router)

135
epapi/v1/schemas.py Normal file
View File

@ -0,0 +1,135 @@
from ninja import FilterSchema, FilterLookup, Schema
from typing import Annotated, Optional, List, Dict, Any
from uuid import UUID
# Filter schema for query parameters
class ReviewStatusFilter(FilterSchema):
"""Filter schema for review_status query parameter."""
review_status: Annotated[Optional[bool], FilterLookup("package__reviewed")] = None
class SelfReviewStatusFilter(FilterSchema):
"""Filter schema for review_status query parameter on self-reviewed entities."""
review_status: Annotated[Optional[bool], FilterLookup("reviewed")] = None
class StructureReviewStatusFilter(FilterSchema):
"""Filter schema for review_status on structures (via compound->package)."""
review_status: Annotated[Optional[bool], FilterLookup("compound__package__reviewed")] = None
# Base schema for all package-scoped entities
class PackageEntityOutSchema(Schema):
"""Base schema for entities belonging to a package."""
uuid: UUID
url: str = ""
name: str
description: str
review_status: str = ""
package: str = ""
@staticmethod
def resolve_url(obj):
return obj.url
@staticmethod
def resolve_package(obj):
return obj.package.url
@staticmethod
def resolve_review_status(obj):
return "reviewed" if obj.package.reviewed else "unreviewed"
# All package-scoped entities inherit from base
class ScenarioOutSchema(PackageEntityOutSchema):
pass
class AdditionalInformationItemSchema(Schema):
"""Schema for additional information item in scenario creation."""
type: str
data: Dict[str, Any]
class ScenarioCreateSchema(Schema):
"""Schema for creating a new scenario."""
name: str
description: str = ""
scenario_date: str = "No date"
scenario_type: str = "Not specified"
additional_information: List[AdditionalInformationItemSchema] = []
class CompoundOutSchema(PackageEntityOutSchema):
pass
class RuleOutSchema(PackageEntityOutSchema):
pass
class ReactionOutSchema(PackageEntityOutSchema):
pass
class PathwayOutSchema(PackageEntityOutSchema):
pass
class ModelOutSchema(PackageEntityOutSchema):
pass
class CompoundStructureOutSchema(PackageEntityOutSchema):
compound: str = ""
@staticmethod
def resolve_compound(obj):
return obj.compound.url
@staticmethod
def resolve_package(obj):
return obj.compound.package.url
@staticmethod
def resolve_review_status(obj):
return "reviewed" if obj.compound.package.reviewed else "unreviewed"
# Package is special (no package FK)
class PackageOutSchema(Schema):
uuid: UUID
url: str = ""
name: str
description: str
review_status: str = ""
@staticmethod
def resolve_url(obj):
return obj.url
@staticmethod
def resolve_review_status(obj):
return "reviewed" if obj.reviewed else "unreviewed"
class SettingOutSchema(Schema):
uuid: UUID
url: str = ""
name: str
description: str
class GroupOutSchema(Schema):
uuid: UUID
url: str = ""
name: str
description: str

View File

@ -3,6 +3,6 @@ from django.urls import path
from . import views
urlpatterns = [
path("microsoft/login/", views.microsoft_login, name="microsoft_login"),
path("microsoft/callback/", views.microsoft_callback, name="microsoft_callback"),
path("entra/login/", views.entra_login, name="entra_login"),
path("auth/redirect/", views.entra_callback, name="entra_callback"),
]

View File

@ -1,34 +1,50 @@
import msal
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.shortcuts import redirect
from django.contrib.auth import get_user_model
from epdb.logic import UserManager
from epdb.logic import UserManager, GroupManager
from epdb.models import Group
def microsoft_login(request):
def get_msal_app_with_cache(request):
"""
Create MSAL app with session-based token cache.
"""
cache = msal.SerializableTokenCache()
# Load cache from session if it exists
if request.session.get("msal_token_cache"):
cache.deserialize(request.session["msal_token_cache"])
msal_app = msal.ConfidentialClientApplication(
client_id=s.MS_ENTRA_CLIENT_ID,
client_credential=s.MS_ENTRA_CLIENT_SECRET,
authority=s.MS_ENTRA_AUTHORITY
authority=s.MS_ENTRA_AUTHORITY,
token_cache=cache
)
return msal_app, cache
def entra_login(request):
msal_app = msal.ConfidentialClientApplication(
client_id=s.MS_ENTRA_CLIENT_ID,
client_credential=s.MS_ENTRA_CLIENT_SECRET,
authority=s.MS_ENTRA_AUTHORITY,
)
flow = msal_app.initiate_auth_code_flow(
scopes=s.MS_ENTRA_SCOPES,
redirect_uri=s.MS_ENTRA_REDIRECT_URI
scopes=s.MS_ENTRA_SCOPES, redirect_uri=s.MS_ENTRA_REDIRECT_URI
)
request.session["msal_auth_flow"] = flow
return redirect(flow["auth_uri"])
def microsoft_callback(request):
msal_app = msal.ConfidentialClientApplication(
client_id=s.MS_ENTRA_CLIENT_ID,
client_credential=s.MS_ENTRA_CLIENT_SECRET,
authority=s.MS_ENTRA_AUTHORITY
)
def entra_callback(request):
msal_app, cache = get_msal_app_with_cache(request)
flow = request.session.pop("msal_auth_flow", None)
if not flow:
@ -37,30 +53,105 @@ def microsoft_callback(request):
# Acquire token using the flow and callback request
result = msal_app.acquire_token_by_auth_code_flow(flow, request.GET)
if "access_token" in result:
# Optional: Fetch user info from Microsoft Graph
import requests
resp = requests.get(
"https://graph.microsoft.com/v1.0/me",
headers={"Authorization": f"Bearer {result['access_token']}"}
)
user_info = resp.json()
# Save the token cache to session
if cache.has_state_changed:
request.session["msal_token_cache"] = cache.serialize()
user_name = user_info["displayName"]
user_email = user_info["mail"]
user_oid = user_info["id"]
claims = result["id_token_claims"]
# Get implementing class
User = get_user_model()
user_name = claims["name"]
user_email = claims.get("emailaddress", claims["email"])
user_oid = claims["oid"]
if User.objects.filter(uuid=user_oid).exists():
login(request, User.objects.get(uuid=user_oid))
# Get implementing class
User = get_user_model()
if User.objects.filter(uuid=user_oid).exists():
u = User.objects.get(uuid=user_oid)
if u.username != user_name:
u.username = user_name
u.save()
else:
u = UserManager.create_user(user_name, user_email, None, uuid=user_oid, is_active=True)
login(request, u)
# EDIT START
# Ensure groups exists in eP
for id, name in s.ENTRA_SECRET_GROUPS.items():
if not Group.objects.filter(uuid=id).exists():
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ",
uuid=id)
else:
u = UserManager.create_user(user_name, user_email, None, uuid=user_oid, is_active=True)
login(request, u)
g = Group.objects.get(uuid=id)
# Ensure its secret
g.secret = True
g.save()
# TODO Group Sync
for id, name in s.ENTRA_GROUPS.items():
if not Group.objects.filter(uuid=id).exists():
g = GroupManager.create_group(User.objects.get(username="admin"), name, f"Synced Entra Group {name} ",
uuid=id)
else:
g = Group.objects.get(uuid=id)
return redirect("/")
for group_uuid in claims.get("groups", []):
if Group.objects.filter(uuid=group_uuid).exists():
g = Group.objects.get(uuid=group_uuid)
g.user_member.add(u)
return redirect("/") # Handle errors
# EDIT END
return redirect(s.SERVER_URL) # Handle errors
def get_access_token_from_request(request, scopes=None):
"""
Get an access token from the request using MSAL token cache.
"""
if scopes is None:
scopes = s.MS_ENTRA_SCOPES
# Get user from request (must be authenticated)
if not request.user.is_authenticated:
return None
# Create MSAL app with persistent cache
msal_app, cache = get_msal_app_with_cache(request)
# Try to get accounts from cache
accounts = msal_app.get_accounts()
if not accounts:
return None
# Find the account that matches the current user
user_account = None
for account in accounts:
if account.get("local_account_id") == str(request.user.uuid):
user_account = account
break
# If no matching account found, use the first available account
if not user_account and accounts:
user_account = accounts[0]
if not user_account:
return None
# Try to acquire token silently from cache
result = msal_app.acquire_token_silent(
scopes=scopes,
account=user_account
)
# Save cache changes back to session
if cache.has_state_changed:
request.session["msal_token_cache"] = cache.serialize()
if result and "access_token" in result:
return result
return None

View File

@ -1,32 +1,49 @@
from django.conf import settings as s
from django.contrib import admin
from .models import (
User,
UserPackagePermission,
Group,
GroupPackagePermission,
Package,
MLRelativeReasoning,
EnviFormer,
AdditionalInformation,
ClassifierPluginModel,
Compound,
CompoundStructure,
SimpleAmbitRule,
ParallelRule,
Reaction,
Pathway,
Node,
Edge,
Scenario,
Setting,
EnviFormer,
ExternalDatabase,
ExternalIdentifier,
Group,
GroupPackagePermission,
JobLog,
License,
MLRelativeReasoning,
Node,
ParallelRule,
Pathway,
PropertyPluginModel,
Reaction,
Scenario,
Setting,
SimpleAmbitRule,
User,
UserPackagePermission,
)
Package = s.GET_PACKAGE_MODEL()
class AdditionalInformationAdmin(admin.ModelAdmin):
pass
class UserAdmin(admin.ModelAdmin):
list_display = ["username", "email", "is_active"]
list_display = [
"username",
"email",
"is_active",
"is_staff",
"is_superuser",
"last_login",
"date_joined",
]
class UserPackagePermissionAdmin(admin.ModelAdmin):
@ -46,7 +63,7 @@ class JobLogAdmin(admin.ModelAdmin):
class EPAdmin(admin.ModelAdmin):
search_fields = ["name", "description"]
search_fields = ["name", "description", "url", "uuid"]
list_display = ["name", "url", "created"]
ordering = ["-created"]
@ -63,6 +80,14 @@ class EnviFormerAdmin(EPAdmin):
pass
class PropertyPluginModelAdmin(admin.ModelAdmin):
pass
class ClassifierPluginModelAdmin(admin.ModelAdmin):
pass
class LicenseAdmin(admin.ModelAdmin):
list_display = ["cc_string", "link", "image_link"]
@ -115,6 +140,7 @@ class ExternalIdentifierAdmin(admin.ModelAdmin):
pass
admin.site.register(AdditionalInformation, AdditionalInformationAdmin)
admin.site.register(User, UserAdmin)
admin.site.register(UserPackagePermission, UserPackagePermissionAdmin)
admin.site.register(Group, GroupAdmin)
@ -123,7 +149,9 @@ admin.site.register(JobLog, JobLogAdmin)
admin.site.register(Package, PackageAdmin)
admin.site.register(MLRelativeReasoning, MLRelativeReasoningAdmin)
admin.site.register(EnviFormer, EnviFormerAdmin)
admin.site.register(PropertyPluginModel, PropertyPluginModelAdmin)
admin.site.register(License, LicenseAdmin)
admin.site.register(ClassifierPluginModel, ClassifierPluginModelAdmin)
admin.site.register(Compound, CompoundAdmin)
admin.site.register(CompoundStructure, CompoundStructureAdmin)
admin.site.register(SimpleAmbitRule, SimpleAmbitRuleAdmin)

View File

@ -2,20 +2,12 @@ from typing import List
from django.contrib.auth import get_user_model
from ninja import Router, Schema, Field
from ninja.errors import HttpError
from ninja.pagination import paginate
from ninja.security import HttpBearer
from epapi.v1.auth import BearerTokenAuth
from .logic import PackageManager
from .models import User, Compound, APIToken
class BearerTokenAuth(HttpBearer):
def authenticate(self, request, token):
for token_obj in APIToken.objects.select_related("user").all():
if token_obj.check_token(token) and token_obj.is_valid():
return token_obj.user
raise HttpError(401, "Invalid or expired token")
from .models import User, Compound
def _anonymous_or_real(request):

View File

@ -1,4 +1,9 @@
import logging
from django.apps import AppConfig
from django.conf import settings
logger = logging.getLogger(__name__)
class EPDBConfig(AppConfig):
@ -7,3 +12,17 @@ class EPDBConfig(AppConfig):
def ready(self):
import epdb.signals # noqa: F401
model_name = getattr(settings, "EPDB_PACKAGE_MODEL", "epdb.Package")
logger.info(f"Using Package model: {model_name}")
from .autodiscovery import autodiscover
autodiscover()
if settings.PLUGINS_ENABLED:
from bridge.contracts import Property, Classifier
from utilities.plugin import discover_plugins
settings.PROPERTY_PLUGINS.update(**discover_plugins(_cls=Property))
settings.CLASSIFIER_PLUGINS.update(**discover_plugins(_cls=Classifier))

5
epdb/autodiscovery.py Normal file
View File

@ -0,0 +1,5 @@
from django.utils.module_loading import autodiscover_modules
def autodiscover():
autodiscover_modules("epdb_hooks")

View File

@ -5,7 +5,7 @@ Context processors automatically make variables available to all templates.
"""
from .logic import PackageManager
from .models import Package
from django.conf import settings as s
def package_context(request):
@ -20,7 +20,7 @@ def package_context(request):
reviewed_package_qs = PackageManager.get_reviewed_packages()
unreviewed_package_qs = Package.objects.none()
unreviewed_package_qs = s.GET_PACKAGE_MODEL().objects.none()
# Only get user-specific packages if user is authenticated
if current_user.is_authenticated:

File diff suppressed because it is too large Load Diff

View File

@ -1,39 +1,42 @@
import re
import logging
import json
from typing import Union, List, Optional, Set, Dict, Any
import re
from typing import Any, Dict, List, Optional, Set, Union, Tuple
from uuid import UUID
import nh3
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.db import transaction
from django.conf import settings as s
from pydantic import ValidationError
from epdb.models import (
User,
Package,
UserPackagePermission,
GroupPackagePermission,
Permission,
Group,
Setting,
EPModel,
UserSettingPermission,
Rule,
Pathway,
Node,
Edge,
AdditionalInformation,
Compound,
Reaction,
CompoundStructure,
Edge,
EnzymeLink,
EPModel,
ExpansionSchemeChoice,
Group,
GroupPackagePermission,
Node,
Pathway,
Permission,
PropertyPluginModel,
Reaction,
Rule,
Setting,
User,
UserPackagePermission,
UserSettingPermission,
)
from utilities.chem import FormatConverter
from utilities.misc import PackageImporter, PackageExporter
from utilities.misc import PackageExporter, PackageImporter
logger = logging.getLogger(__name__)
Package = s.GET_PACKAGE_MODEL()
class EPDBURLParser:
UUID_PATTERN = r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
@ -192,8 +195,6 @@ class UserManager(object):
if clean_username != username or clean_email != email:
# This will be caught by the try in view.py/register
raise ValueError("Invalid username or password")
# avoid circular import :S
from .tasks import send_registration_mail
extra_fields = {"is_active": not s.ADMIN_APPROVAL_REQUIRED}
@ -212,10 +213,6 @@ class UserManager(object):
u.default_package = p
u.save()
if not u.is_active:
# send email for verification
send_registration_mail.delay(u.pk)
if set_setting:
u.default_setting = Setting.objects.get(global_default=True)
u.save()
@ -267,8 +264,12 @@ class GroupManager(object):
return bool(re.findall(GroupManager.group_pattern, url))
@staticmethod
def create_group(current_user, name, description):
def create_group(current_user, name, description, *args, **kwargs):
g = Group()
if "uuid" in kwargs:
g.uuid = kwargs["uuid"]
# Clean for potential XSS
g.name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
g.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
@ -442,6 +443,7 @@ class PackageManager(object):
if PackageManager.readable(user, p):
return p
else:
# FIXME: use custom exception to be translatable to 403 in API
raise ValueError(
"Insufficient permissions to access Package with ID {}".format(package_id)
)
@ -578,30 +580,39 @@ class PackageManager(object):
else:
_ = perm_cls.objects.update_or_create(defaults={"permission": new_perm}, **data)
@staticmethod
def grant_read(caller: User, package: Package, grantee: Union[User, Group]):
PackageManager.update_permissions(caller, package, grantee, Permission.READ[0])
@staticmethod
def grant_write(caller: User, package: Package, grantee: Union[User, Group]):
PackageManager.update_permissions(caller, package, grantee, Permission.WRITE[0])
@staticmethod
@transaction.atomic
def import_legacy_package(
data: dict, owner: User, keep_ids=False, add_import_timestamp=True, trust_reviewed=False
):
from uuid import UUID, uuid4
from datetime import datetime
from collections import defaultdict
from datetime import datetime
from uuid import UUID, uuid4
from envipy_additional_information import AdditionalInformationConverter
from .models import (
Package,
Compound,
CompoundStructure,
SimpleRule,
SimpleAmbitRule,
Edge,
Node,
ParallelRule,
Pathway,
Reaction,
Scenario,
SequentialRule,
SequentialRuleOrdering,
Reaction,
Pathway,
Node,
Edge,
Scenario,
SimpleAmbitRule,
SimpleRule,
)
from envipy_additional_information import AdditionalInformationConverter
pack = Package()
pack.uuid = UUID(data["id"].split("/")[-1]) if keep_ids else uuid4()
@ -616,6 +627,25 @@ class PackageManager(object):
else:
pack.reviewed = False
# EDIT START
if data.get("classification"):
if data["classification"] == "INTERNAL":
pack.classification = Package.Classification.RESTRICTED
elif data["classification"] == "RESTRICTED":
pack.classification = Package.Classification.RESTRICTED
elif data["classification"] == "SECRET":
pack.classification = Package.Classification.SECRET
if not "datapool" in data:
raise ValueError("Missing datapool in package")
g = Group.objects.get(uuid=data["datapool"].split('/')[-1])
pack.data_pool = g
else:
raise ValueError(f"Invalid classification {data['classification']}")
# EDIT END
pack.description = data["description"]
pack.save()
@ -627,15 +657,30 @@ class PackageManager(object):
# Stores old_id to new_id
mapping = {}
# Stores new_scen_id to old_parent_scen_id
parent_mapping = {}
# Mapping old scen_id to old_obj_id
scen_mapping = defaultdict(list)
# Enzymelink Mapping rule_id to enzymelink objects
enzyme_mapping = defaultdict(list)
# old_parent_id to child
postponed_scens = defaultdict(list)
# Store Scenarios
for scenario in data["scenarios"]:
skip_scen = False
# Check if parent exists and park this Scenario to convert it later into an
# AdditionalInformation object
for ex in scenario.get("additionalInformationCollection", {}).get(
"additionalInformation", []
):
if ex["name"] == "referringscenario":
postponed_scens[ex["data"]].append(scenario)
skip_scen = True
break
if skip_scen:
continue
scen = Scenario()
scen.package = pack
scen.uuid = UUID(scenario["id"].split("/")[-1]) if keep_ids else uuid4()
@ -648,19 +693,12 @@ class PackageManager(object):
mapping[scenario["id"]] = scen.uuid
new_add_inf = defaultdict(list)
# TODO Store AI...
for ex in scenario.get("additionalInformationCollection", {}).get(
"additionalInformation", []
):
name = ex["name"]
addinf_data = ex["data"]
# park the parent scen id for now and link it later
if name == "referringscenario":
parent_mapping[scen.uuid] = addinf_data
continue
# Broken eP Data
if name == "initialmasssediment" and addinf_data == "missing data":
continue
@ -668,17 +706,11 @@ class PackageManager(object):
continue
try:
res = AdditionalInformationConverter.convert(name, addinf_data)
res_cls_name = res.__class__.__name__
ai_data = json.loads(res.model_dump_json())
ai_data["uuid"] = f"{uuid4()}"
new_add_inf[res_cls_name].append(ai_data)
except ValidationError:
ai = AdditionalInformationConverter.convert(name, addinf_data)
AdditionalInformation.create(pack, ai, scenario=scen)
except (ValidationError, ValueError):
logger.error(f"Failed to convert {name} with {addinf_data}")
scen.additional_information = new_add_inf
scen.save()
print("Scenarios imported...")
# Store compounds and its structures
@ -699,7 +731,13 @@ class PackageManager(object):
default_structure = None
for structure in compound["structures"]:
struc = CompoundStructure()
if structure.get("pesLink"):
from bayer.models import PESStructure
struc = PESStructure()
struc.pes_link = structure["pesLink"]
else:
struc = CompoundStructure()
# struc.object_url = Command.get_id(structure, keep_ids)
struc.compound = comp
struc.uuid = UUID(structure["id"].split("/")[-1]) if keep_ids else uuid4()
@ -707,6 +745,10 @@ class PackageManager(object):
struc.description = structure["description"]
struc.aliases = structure.get("aliases", [])
struc.smiles = structure["smiles"]
if structure.get("molfile"):
struc.molfile = structure["molfile"]
struc.save()
for scen in structure["scenarios"]:
@ -918,14 +960,46 @@ class PackageManager(object):
print("Pathways imported...")
# Linking Phase
for child, parent in parent_mapping.items():
child_obj = Scenario.objects.get(uuid=child)
parent_obj = Scenario.objects.get(uuid=mapping[parent])
child_obj.parent = parent_obj
child_obj.save()
for parent, children in postponed_scens.items():
for child in children:
for ex in child.get("additionalInformationCollection", {}).get(
"additionalInformation", []
):
child_id = child["id"]
name = ex["name"]
addinf_data = ex["data"]
if name == "referringscenario":
continue
# Broken eP Data
if name == "initialmasssediment" and addinf_data == "missing data":
continue
if name == "columnheight" and addinf_data == "(2)-(2.5);(6)-(8)":
continue
ai = AdditionalInformationConverter.convert(name, addinf_data)
if child_id not in scen_mapping:
logger.info(
f"{child_id} not found in scen_mapping. Seems like its not attached to any object"
)
print(
f"{child_id} not found in scen_mapping. Seems like its not attached to any object"
)
scen = Scenario.objects.get(uuid=mapping[parent])
mapping[child_id] = scen.uuid
for obj in scen_mapping[child_id]:
_ = AdditionalInformation.create(pack, ai, scen, content_object=obj)
for scen_id, objects in scen_mapping.items():
new_id = mapping.get(scen_id)
if new_id is None:
logger.warning(f"Could not find mapping for {scen_id}")
print(f"Could not find mapping for {scen_id}")
continue
scen = Scenario.objects.get(uuid=mapping[scen_id])
for o in objects:
o.scenarios.add(scen)
@ -958,6 +1032,7 @@ class PackageManager(object):
matches = re.findall(r">(R[0-9]+)<", evidence["evidence"])
if not matches or len(matches) != 1:
logger.warning(f"Could not find reaction id in {evidence['evidence']}")
print(f"Could not find reaction id in {evidence['evidence']}")
continue
e.add_kegg_reaction_id(matches[0])
@ -977,7 +1052,6 @@ class PackageManager(object):
print("Fixing Node depths...")
total_pws = Pathway.objects.filter(package=pack).count()
for p, pw in enumerate(Pathway.objects.filter(package=pack)):
print(pw.url)
in_count = defaultdict(lambda: 0)
out_count = defaultdict(lambda: 0)
@ -1013,7 +1087,6 @@ class PackageManager(object):
if str(prod.uuid) not in seen:
old_depth = prod.depth
if old_depth != i + 1:
print(f"updating depth from {old_depth} to {i + 1}")
prod.depth = i + 1
prod.save()
@ -1024,7 +1097,7 @@ class PackageManager(object):
if new_level:
levels.append(new_level)
print(f"{p + 1}/{total_pws} fixed.")
print(f"{p + 1}/{total_pws} fixed.", end="\r")
return pack
@ -1103,18 +1176,23 @@ class SettingManager(object):
description: str = None,
max_nodes: int = None,
max_depth: int = None,
rule_packages: List[Package] = None,
rule_packages: List[Package] | None = None,
model: EPModel = None,
model_threshold: float = None,
expansion_scheme: ExpansionSchemeChoice = ExpansionSchemeChoice.BFS,
property_models: List["PropertyPluginModel"] | None = None,
):
new_s = Setting()
# Clean for potential XSS
new_s.name = nh3.clean(name, tags=s.ALLOWED_HTML_TAGS).strip()
new_s.description = nh3.clean(description, tags=s.ALLOWED_HTML_TAGS).strip()
new_s.max_nodes = max_nodes
new_s.max_depth = max_depth
new_s.model = model
new_s.model_threshold = model_threshold
new_s.expansion_scheme = expansion_scheme
new_s.save()
@ -1123,6 +1201,11 @@ class SettingManager(object):
new_s.rule_packages.add(r)
new_s.save()
if property_models is not None:
for pm in property_models:
new_s.property_models.add(pm)
new_s.save()
usp = UserSettingPermission()
usp.user = user
usp.setting = new_s
@ -1388,6 +1471,9 @@ class SEdge(object):
self.rule = rule
self.probability = probability
def product_smiles(self):
return [p.smiles for p in self.products]
def __hash__(self):
full_hash = 0
@ -1473,6 +1559,7 @@ class SPathway(object):
self.smiles_to_node: Dict[str, SNode] = dict(**{n.smiles: n for n in self.root_nodes})
self.edges: Set["SEdge"] = set()
self.done = False
self.empty_due_to_threshold = False
@staticmethod
def from_pathway(pw: "Pathway", persist: bool = True):
@ -1537,6 +1624,207 @@ class SPathway(object):
return sorted(res, key=lambda x: hash(x))
def _expand(self, substrates: List[SNode]) -> Tuple[List[SNode], List[SEdge]]:
"""
Expands the given substrates by generating new nodes and edges based on prediction settings.
This method processes a list of substrates and expands them into new nodes and edges using defined
rules and settings. It evaluates each substrate to determine its applicability domain, persists
domain assessments, and generates candidates for further processing. Newly created nodes and edges
are returned, and any applicable information is stored or updated internally during the process.
Parameters:
substrates (List[SNode]): A list of substrate nodes to be expanded.
Returns:
Tuple[List[SNode], List[SEdge]]:
A tuple containing:
- A list of new nodes generated during the expansion.
- A list of new edges representing connections between nodes based on candidate reactions.
Raises:
ValueError: If a node does not have an ID when it should have been saved already.
"""
new_nodes: List[SNode] = []
new_edges: List[SEdge] = []
for sub in substrates:
# For App Domain we have to ensure that each Node is evaluated
if sub.app_domain_assessment is None:
if self.prediction_setting.model:
if self.prediction_setting.model.app_domain:
app_domain_assessment = self.prediction_setting.model.app_domain.assess(
sub.smiles
)
if self.persist is not None:
n = self.snode_persist_lookup[sub]
if n.id is None:
raise ValueError(f"Node {n} has no ID... aborting!")
node_data = n.simple_json()
node_data["image"] = f"{n.url}?image=svg"
app_domain_assessment["assessment"]["node"] = node_data
n.kv["app_domain_assessment"] = app_domain_assessment
n.save()
sub.app_domain_assessment = app_domain_assessment
expansion_result = self.prediction_setting.expand(self, sub)
# We don't have any substrate, but technically we have at least one rule that triggered.
# If our substrate is a root node a.k.a. depth == 0 store that info in SPathway
if (
len(expansion_result["transformations"]) == 0
and expansion_result["rule_triggered"]
and sub.depth == 0
):
self.empty_due_to_threshold = True
# Emit directly
if self.persist is not None:
self.persist.kv["empty_due_to_threshold"] = True
self.persist.save()
# candidates is a List of PredictionResult. The length of the List is equal to the number of rules
for cand_set in expansion_result["transformations"]:
if cand_set:
# cand_set is a PredictionResult object that can consist of multiple candidate reactions
for cand in cand_set:
cand_nodes = []
# candidate reactions can have multiple fragments
for c in cand:
if c not in self.smiles_to_node:
# For new nodes do an AppDomain Assessment if an AppDomain is attached
app_domain_assessment = None
if self.prediction_setting.model:
if self.prediction_setting.model.app_domain:
app_domain_assessment = (
self.prediction_setting.model.app_domain.assess(c)
)
snode = SNode(c, sub.depth + 1, app_domain_assessment)
self.smiles_to_node[c] = snode
new_nodes.append(snode)
node = self.smiles_to_node[c]
cand_nodes.append(node)
edge = SEdge(
sub,
cand_nodes,
rule=cand_set.rule,
probability=cand_set.probability,
)
self.edges.add(edge)
new_edges.append(edge)
return new_nodes, new_edges
def predict(self):
"""
Predicts outcomes based on a graph traversal algorithm using the specified expansion schema.
This method iteratively explores the nodes of a graph starting from the root nodes, propagating
probabilities through edges, and updating the probabilities of the connected nodes. The traversal
can follow one of three predefined expansion schemas: Depth-First Search (DFS), Breadth-First Search
(BFS), or a Greedy approach based on node probabilities. The methodology ensures that all reachable
nodes are processed systematically according to the specified schema.
Errors will be raised if the expansion schema is undefined or invalid. Additionally, this method
supports persisting changes by writing back data to the database when configured to do so.
Attributes
----------
done : bool
A flag indicating whether the prediction process is completed.
persist : Any
An optional object that manages persistence operations for saving modifications.
root_nodes : List[SNode]
A collection of initial nodes in the graph from which traversal begins.
prediction_setting : Any
Configuration object specifying settings for graph traversal, such as the choice of
expansion schema.
Raises
------
ValueError
If an invalid or unknown expansion schema is provided in `prediction_setting`.
"""
# populate initial queue
queue = list(self.root_nodes)
processed = set()
# initial nodes have prob 1.0
node_probs: Dict[SNode, float] = {}
node_probs.update({n: 1.0 for n in queue})
while queue:
current = queue.pop(0)
if current in processed:
continue
processed.add(current)
new_nodes, new_edges = self._expand([current])
if new_nodes or new_edges:
# Check if we need to write back data to the database
if self.persist:
self._sync_to_pathway()
# call save to update the internal modified field
self.persist.save()
if new_nodes:
for edge in new_edges:
# All edge have `current` as educt
# Use `current` and adjust probs
current_prob = node_probs[current]
for prod in edge.products:
# Either is a new product or a product and we found a path with a higher prob
if (
prod not in node_probs
or current_prob * edge.probability > node_probs[prod]
):
node_probs[prod] = current_prob * edge.probability
# Update Queue to proceed
if self.prediction_setting.expansion_scheme == "DFS":
for n in new_nodes:
if n not in processed:
# We want to follow this path -> prepend queue
queue.insert(0, n)
elif self.prediction_setting.expansion_scheme == "BFS":
for n in new_nodes:
if n not in processed:
# Add at the end, everything queued before will be processed
# before new_nodese
queue.append(n)
elif self.prediction_setting.expansion_scheme == "GREEDY":
# Simply add them, as we will re-order the queue later
for n in new_nodes:
if n not in processed:
queue.append(n)
node_and_probs = []
for queued_val in queue:
node_and_probs.append((queued_val, node_probs[queued_val]))
# re-order the queue and only pick smiles
queue = [
n[0] for n in sorted(node_and_probs, key=lambda x: x[1], reverse=True)
]
else:
raise ValueError(
f"Unknown expansion schema: {self.prediction_setting.expansion_scheme}"
)
# Queue exhausted, we're done
self.done = True
def predict_step(self, from_depth: int = None, from_node: "Node" = None):
substrates: List[SNode] = []
@ -1547,67 +1835,15 @@ class SPathway(object):
if from_node == v:
substrates = [k]
break
else:
raise ValueError(f"Node {from_node} not found in SPathway!")
else:
raise ValueError("Neither from_depth nor from_node_url specified")
new_tp = False
if substrates:
for sub in substrates:
if sub.app_domain_assessment is None:
if self.prediction_setting.model:
if self.prediction_setting.model.app_domain:
app_domain_assessment = self.prediction_setting.model.app_domain.assess(
sub.smiles
)
if self.persist is not None:
n = self.snode_persist_lookup[sub]
assert n.id is not None, (
"Node has no id! Should have been saved already... aborting!"
)
node_data = n.simple_json()
node_data["image"] = f"{n.url}?image=svg"
app_domain_assessment["assessment"]["node"] = node_data
n.kv["app_domain_assessment"] = app_domain_assessment
n.save()
sub.app_domain_assessment = app_domain_assessment
candidates = self.prediction_setting.expand(self, sub)
# candidates is a List of PredictionResult. The length of the List is equal to the number of rules
for cand_set in candidates:
if cand_set:
new_tp = True
# cand_set is a PredictionResult object that can consist of multiple candidate reactions
for cand in cand_set:
cand_nodes = []
# candidate reactions can have multiple fragments
for c in cand:
if c not in self.smiles_to_node:
# For new nodes do an AppDomain Assessment if an AppDomain is attached
app_domain_assessment = None
if self.prediction_setting.model:
if self.prediction_setting.model.app_domain:
app_domain_assessment = (
self.prediction_setting.model.app_domain.assess(c)
)
self.smiles_to_node[c] = SNode(
c, sub.depth + 1, app_domain_assessment
)
node = self.smiles_to_node[c]
cand_nodes.append(node)
edge = SEdge(
sub,
cand_nodes,
rule=cand_set.rule,
probability=cand_set.probability,
)
self.edges.add(edge)
new_nodes, _ = self._expand(substrates)
new_tp = len(new_nodes) > 0
# In case no substrates are found, we're done.
# For "predict from node" we're always done
@ -1620,6 +1856,14 @@ class SPathway(object):
# call save to update the internal modified field
self.persist.save()
def get_edge_for_educt_smiles(self, smiles: str) -> List[SEdge]:
res = []
for e in self.edges:
for n in e.educts:
if n.smiles == smiles:
res.append(e)
return res
def _sync_to_pathway(self) -> None:
logger.info("Updating Pathway with SPathway")
@ -1683,11 +1927,6 @@ class SPathway(object):
"to": to_indices,
}
# if edge.rule:
# e['rule'] = {
# 'name': edge.rule.name,
# 'id': edge.rule.url,
# }
edges.append(e)
return {

View File

@ -8,7 +8,6 @@ from epdb.logic import UserManager, GroupManager, PackageManager, SettingManager
from epdb.models import (
UserSettingPermission,
MLRelativeReasoning,
EnviFormer,
Permission,
User,
ExternalDatabase,
@ -231,7 +230,6 @@ class Command(BaseCommand):
package=pack,
rule_packages=[mapping["EAWAG-BBD"]],
data_packages=[mapping["EAWAG-BBD"]],
eval_packages=[],
threshold=0.5,
name="ECC - BBD - T0.5",
description="ML Relative Reasoning",
@ -239,7 +237,3 @@ class Command(BaseCommand):
ml_model.build_dataset()
ml_model.build_model()
# If available, create EnviFormerModel
if s.ENVIFORMER_PRESENT:
EnviFormer.create(pack, "EnviFormer - T0.5", "EnviFormer Model with Threshold 0.5", 0.5)

View File

@ -0,0 +1,92 @@
from django.conf import settings as s
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from epdb.models import APIToken
class Command(BaseCommand):
help = "Create an API token for a user"
def add_arguments(self, parser):
parser.add_argument(
"--username",
required=True,
help="Username of the user who will own the token",
)
parser.add_argument(
"--name",
required=True,
help="Descriptive name for the token",
)
parser.add_argument(
"--expires-days",
type=int,
default=90,
help="Days until expiration (0 for no expiration)",
)
parser.add_argument(
"--inactive",
action="store_true",
help="Create the token as inactive",
)
parser.add_argument(
"--curl",
action="store_true",
help="Print a curl example using the token",
)
parser.add_argument(
"--base-url",
default=None,
help="Base URL for curl example (default SERVER_URL or http://localhost:8000)",
)
parser.add_argument(
"--endpoint",
default="/api/v1/compounds/",
help="Endpoint path for curl example",
)
def handle(self, *args, **options):
username = options["username"]
name = options["name"]
expires_days = options["expires_days"]
if expires_days < 0:
raise CommandError("--expires-days must be >= 0")
if expires_days == 0:
expires_days = None
user_model = get_user_model()
try:
user = user_model.objects.get(username=username)
except user_model.DoesNotExist as exc:
raise CommandError(f"User not found for username '{username}'") from exc
token, raw_token = APIToken.create_token(user, name=name, expires_days=expires_days)
if options["inactive"]:
token.is_active = False
token.save(update_fields=["is_active"])
self.stdout.write(f"User: {user.username} ({user.email})")
self.stdout.write(f"Token name: {token.name}")
self.stdout.write(f"Token id: {token.id}")
if token.expires_at:
self.stdout.write(f"Expires at: {token.expires_at.isoformat()}")
else:
self.stdout.write("Expires at: never")
self.stdout.write(f"Active: {token.is_active}")
self.stdout.write("Raw token:")
self.stdout.write(raw_token)
if options["curl"]:
base_url = (
options["base_url"] or getattr(s, "SERVER_URL", None) or "http://localhost:8000"
)
endpoint = options["endpoint"]
endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}"
url = f"{base_url.rstrip('/')}{endpoint}"
curl_cmd = f'curl -H "Authorization: Bearer {raw_token}" "{url}"'
self.stdout.write("Curl:")
self.stdout.write(curl_cmd)

View File

@ -2,7 +2,9 @@ from django.conf import settings as s
from django.core.management.base import BaseCommand
from django.db import transaction
from epdb.models import MLRelativeReasoning, EnviFormer, Package
from epdb.models import EnviFormer, MLRelativeReasoning
Package = s.GET_PACKAGE_MODEL()
class Command(BaseCommand):
@ -75,11 +77,13 @@ class Command(BaseCommand):
return packages
# Iteratively create models in options["model_names"]
print(f"Creating models: {options['model_names']}\n"
f"Data packages: {options['data_packages']}\n"
f"Rule Packages (only for MLRR): {options['rule_packages']}\n"
f"Eval Packages: {options['eval_packages']}\n"
f"Threshold: {options['threshold']:.2f}")
print(
f"Creating models: {options['model_names']}\n"
f"Data packages: {options['data_packages']}\n"
f"Rule Packages (only for MLRR): {options['rule_packages']}\n"
f"Eval Packages: {options['eval_packages']}\n"
f"Threshold: {options['threshold']:.2f}"
)
data_packages = decode_packages(options["data_packages"])
eval_packages = decode_packages(options["eval_packages"])
rule_packages = decode_packages(options["rule_packages"])
@ -89,22 +93,20 @@ class Command(BaseCommand):
model = EnviFormer.create(
pack,
data_packages=data_packages,
eval_packages=eval_packages,
threshold=options['threshold'],
threshold=options["threshold"],
name=f"EnviFormer - {', '.join(options['data_packages'])} - T{options['threshold']:.2f}",
description=f"EnviFormer transformer trained on {options['data_packages']} "
f"evaluated on {options['eval_packages']}.",
f"evaluated on {options['eval_packages']}.",
)
elif model_name == "mlrr":
model = MLRelativeReasoning.create(
package=pack,
rule_packages=rule_packages,
data_packages=data_packages,
eval_packages=eval_packages,
threshold=options['threshold'],
threshold=options["threshold"],
name=f"ECC - {', '.join(options['data_packages'])} - T{options['threshold']:.2f}",
description=f"ML Relative Reasoning trained on {options['data_packages']} with rules from "
f"{options['rule_packages']} and evaluated on {options['eval_packages']}.",
f"{options['rule_packages']} and evaluated on {options['eval_packages']}.",
)
else:
raise ValueError(f"Cannot create model of type {model_name}, unknown model type")

View File

@ -47,7 +47,7 @@ class Command(BaseCommand):
"description": model.description,
"kv": model.kv,
"data_packages_uuids": [str(p.uuid) for p in model.data_packages.all()],
"eval_packages_uuids": [str(p.uuid) for p in model.data_packages.all()],
"eval_packages_uuids": [str(p.uuid) for p in model.eval_packages.all()],
"threshold": model.threshold,
"eval_results": model.eval_results,
"multigen_eval": model.multigen_eval,

View File

@ -8,7 +8,9 @@ from django.conf import settings as s
from django.core.management.base import BaseCommand
from django.db import transaction
from epdb.models import EnviFormer, Package
from epdb.models import EnviFormer
Package = s.GET_PACKAGE_MODEL()
class Command(BaseCommand):

View File

@ -1,8 +1,8 @@
from django.apps import apps
from django.conf import settings as s
from django.core.management.base import BaseCommand
from django.db.models import F, Value, TextField, JSONField
from django.db.models.functions import Replace, Cast
from django.db.models import F, JSONField, TextField, Value
from django.db.models.functions import Cast, Replace
from epdb.models import EnviPathModel
@ -23,10 +23,12 @@ class Command(BaseCommand):
)
def handle(self, *args, **options):
Package = s.GET_PACKAGE_MODEL()
Package.objects.update(url=Replace(F("url"), Value(options["old"]), Value(options["new"])))
MODELS = [
"User",
"Group",
"Package",
"Compound",
"CompoundStructure",
"Pathway",
@ -39,15 +41,12 @@ class Command(BaseCommand):
"SequentialRule",
"Scenario",
"Setting",
"MLRelativeReasoning",
"RuleBasedRelativeReasoning",
"EnviFormer",
"EPModel",
"ApplicabilityDomain",
"EnzymeLink",
]
for model in MODELS:
obj_cls = apps.get_model("epdb", model)
print(f"Localizing urls for {model}")
obj_cls.objects.update(
url=Replace(F("url"), Value(options["old"]), Value(options["new"]))
)

View File

@ -0,0 +1,83 @@
import os
import subprocess
from django.conf import settings
from django.core.management import call_command
from django.core.management.base import BaseCommand
class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-n",
"--name",
type=str,
help="Name of the database to recreate. Default is 'appdb'",
default="appdb",
)
parser.add_argument(
"-d",
"--dump",
type=str,
help="Path to the dump file",
default="./fixtures/db.dump",
)
parser.add_argument(
"-ou",
"--oldurl",
type=str,
help="Old URL, e.g. https://envipath.org/",
default="https://envipath.org/",
)
parser.add_argument(
"-nu",
"--newurl",
type=str,
help="New URL, e.g. http://localhost:8000/",
default="http://localhost:8000/",
)
def handle(self, *args, **options):
dump_file = options["dump"]
if not os.path.exists(dump_file):
raise ValueError(f"Dump file {dump_file} does not exist")
db_name = options["name"]
print(f"Dropping database {db_name} y/n: ", end="")
if input() in "yY":
result = subprocess.run(
["dropdb", db_name],
capture_output=True,
text=True,
)
print(result.stdout)
else:
raise ValueError("Aborted")
print(f"Creating database {db_name}")
result = subprocess.run(
["createdb", db_name],
capture_output=True,
text=True,
)
print(result.stdout)
print(f"Restoring database {db_name} from {dump_file}")
result = subprocess.run(
["pg_restore", "-d", db_name, dump_file, "--no-owner"],
capture_output=True,
text=True,
)
print(result.stdout)
if db_name == settings.DATABASES["default"]["NAME"]:
call_command("localize_urls", "--old", options["oldurl"], "--new", options["newurl"])
else:
print("Skipping localize_urls as database is not the default one.")

View File

@ -1,128 +0,0 @@
# Generated by Django 5.2.1 on 2025-08-25 18:07
import django.db.models.deletion
import django.utils.timezone
import model_utils.fields
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('contenttypes', '0002_remove_content_type_name'),
('epdb', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='ExternalDatabase',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
('name', models.CharField(max_length=100, unique=True, verbose_name='Database Name')),
('full_name', models.CharField(blank=True, max_length=255, verbose_name='Full Database Name')),
('description', models.TextField(blank=True, verbose_name='Description')),
('base_url', models.URLField(blank=True, null=True, verbose_name='Base URL')),
('url_pattern', models.CharField(blank=True, help_text="URL pattern with {id} placeholder, e.g., 'https://pubchem.ncbi.nlm.nih.gov/compound/{id}'", max_length=500, verbose_name='URL Pattern')),
('is_active', models.BooleanField(default=True, verbose_name='Is Active')),
],
options={
'verbose_name': 'External Database',
'verbose_name_plural': 'External Databases',
'db_table': 'epdb_external_database',
'ordering': ['name'],
},
),
migrations.AlterModelOptions(
name='apitoken',
options={'ordering': ['-created'], 'verbose_name': 'API Token', 'verbose_name_plural': 'API Tokens'},
),
migrations.AlterModelOptions(
name='edge',
options={},
),
migrations.RemoveField(
model_name='edge',
name='polymorphic_ctype',
),
migrations.AddField(
model_name='apitoken',
name='is_active',
field=models.BooleanField(default=True, help_text='Whether this token is active'),
),
migrations.AddField(
model_name='apitoken',
name='modified',
field=model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified'),
),
migrations.AddField(
model_name='applicabilitydomain',
name='functional_groups',
field=models.JSONField(blank=True, default=dict, null=True),
),
migrations.AddField(
model_name='mlrelativereasoning',
name='app_domain',
field=models.ForeignKey(blank=True, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, to='epdb.applicabilitydomain'),
),
migrations.AlterField(
model_name='apitoken',
name='created',
field=model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created'),
),
migrations.AlterField(
model_name='apitoken',
name='expires_at',
field=models.DateTimeField(blank=True, help_text='Token expiration time (null for no expiration)', null=True),
),
migrations.AlterField(
model_name='apitoken',
name='hashed_key',
field=models.CharField(help_text='SHA-256 hash of the token key', max_length=128, unique=True),
),
migrations.AlterField(
model_name='apitoken',
name='name',
field=models.CharField(help_text='Descriptive name for this token', max_length=100),
),
migrations.AlterField(
model_name='apitoken',
name='user',
field=models.ForeignKey(help_text='User who owns this token', on_delete=django.db.models.deletion.CASCADE, related_name='api_tokens', to=settings.AUTH_USER_MODEL),
),
migrations.AlterField(
model_name='applicabilitydomain',
name='num_neighbours',
field=models.IntegerField(default=5),
),
migrations.AlterModelTable(
name='apitoken',
table='epdb_api_token',
),
migrations.CreateModel(
name='ExternalIdentifier',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')),
('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')),
('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)),
('object_id', models.IntegerField()),
('identifier_value', models.CharField(max_length=255, verbose_name='Identifier Value')),
('url', models.URLField(blank=True, null=True, verbose_name='Direct URL')),
('is_primary', models.BooleanField(default=False, help_text='Mark this as the primary identifier for this database', verbose_name='Is Primary')),
('content_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='contenttypes.contenttype')),
('database', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='epdb.externaldatabase', verbose_name='External Database')),
],
options={
'verbose_name': 'External Identifier',
'verbose_name_plural': 'External Identifiers',
'db_table': 'epdb_external_identifier',
'indexes': [models.Index(fields=['content_type', 'object_id'], name='epdb_extern_content_b76813_idx'), models.Index(fields=['database', 'identifier_value'], name='epdb_extern_databas_486422_idx')],
'unique_together': {('content_type', 'object_id', 'database', 'identifier_value')},
},
),
]

Some files were not shown because too many files have changed in this diff Show More