-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpredictor.py
612 lines (500 loc) · 26 KB
/
predictor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
"""Resources that represent collections of predictors."""
from functools import partial
from typing import Any, Iterable, Optional, Union, List
from uuid import UUID
from gemd.enumeration.base_enumeration import BaseEnumeration
from citrine._rest.collection import Collection
from citrine._rest.resource import Resource
from citrine._rest.paginator import Paginator
from citrine._serialization import properties
from citrine._session import Session
from citrine.informatics.data_sources import DataSource
from citrine.informatics.design_candidate import HierarchicalDesignMaterial
from citrine.informatics.predictors import GraphPredictor
from citrine.resources.status_detail import StatusDetail
# Version alias for the most recently edited predictor version. Could be a draft.
MOST_RECENT_VER = "most_recent"
LATEST_VER = "latest"  # Version alias for the highest saved predictor version.
class AsyncDefaultPredictor(Resource["AsyncDefaultPredictor"]):
    """Result object for asynchronous default-predictor generation and retrieval."""

    uid = properties.UUID('id', serializable=False)
    """:UUID: Citrine Platform unique identifier for this task."""
    predictor = properties.Optional(
        properties.Object(GraphPredictor), 'data', serializable=False
    )
    """:Optional[GraphPredictor]:"""
    status = properties.String('metadata.status', serializable=False)
    """:str: short description of the resource's status"""
    status_detail = properties.List(
        properties.Object(StatusDetail),
        'metadata.status_detail',
        default=[],
        serializable=False,
    )
    """:List[StatusDetail]: a list of structured status info, containing the message and level"""

    @classmethod
    def _pre_build(cls, data: dict) -> dict:
        """Wrap the raw predictor instance, if any, before the resource is built.

        A truthy ``data["data"]`` entry is replaced in place by the result of
        ``GraphPredictor.wrap_instance`` applied to its ``"instance"`` value.
        The same ``data`` dict is returned.
        """
        raw = data.get("data")
        if raw:
            data["data"] = GraphPredictor.wrap_instance(raw["instance"])
        return data
class AutoConfigureMode(BaseEnumeration):
    """Pattern used when building auto-configured assets.

    * PLAIN: a single-row GEM table and a plain predictor
    * FORMULATION: a multi-row GEM table and a formulations predictor
    * INFER: the GEM table and predictor type are detected automatically

    """

    PLAIN = "PLAIN"
    FORMULATION = "FORMULATION"
    INFER = "INFER"
class _PredictorVersionPaginator(Paginator):
    """Paginator for predictor versions, compared by their (uid, version) pair."""

    def _comparison_fields(self, entity: GraphPredictor) -> Any:
        """Return the (uid, version) pair used to compare predictor versions."""
        identity = (entity.uid, entity.version)
        return identity

    def paginate(self, *args, **kwargs) -> Iterable[GraphPredictor]:
        """Paginate with deduplication always switched off.

        All versions of a predictor share one uid, and the paginate method dedups on
        uid alone, so any caller-supplied ``deduplicate`` value is overridden.
        """
        forced = dict(kwargs, deduplicate=False)
        return super().paginate(*args, **forced)
class _PredictorVersionCollection(Collection[GraphPredictor]):
    """Collection of the versions of individual predictors within one project.

    Every method takes the predictor ``uid``. Methods that act on a single version also
    take a ``version``, which must be a positive integer, ``LATEST_VER`` or
    ``MOST_RECENT_VER``.
    """

    _api_version = 'v3'
    _path_template = '/projects/{project_id}/predictors/{uid}/versions'
    _individual_key = None
    _resource = GraphPredictor
    _collection_key = 'response'
    _paginator: Paginator = _PredictorVersionPaginator()

    _SPECIAL_VERSIONS = [LATEST_VER, MOST_RECENT_VER]

    def __init__(self, project_id: UUID, session: Session):
        self.project_id = project_id
        self.session: Session = session

    def _construct_path(self,
                        uid: Union[UUID, str],
                        version: Optional[Union[int, str]] = None,
                        action: Optional[str] = None) -> str:
        """Build the URL path for a predictor's versions, one version, or a version action.

        Parameters
        ----------
        uid: Union[UUID, str]
            Unique identifier of the predictor.
        version: Optional[Union[int, str]]
            If given, appended to the path after validation.
        action: Optional[str]
            If non-empty, appended as the final path segment(s).

        Raises
        ------
        ValueError
            If ``version`` is neither a positive integer nor one of the special versions.

        """
        path = self._path_template.format(project_id=self.project_id, uid=str(uid))
        if version is not None:
            version_str = str(version)
            if version_str not in self._SPECIAL_VERSIONS:
                # str.isdecimal() on its own also accepts non-ASCII decimal digits (for
                # example Arabic-Indic numerals). int() parses those, so they would pass
                # validation and land verbatim in the URL. Require plain ASCII digits.
                is_positive_int = (version_str.isascii()
                                   and version_str.isdecimal()
                                   and int(version_str) > 0)
                if not is_positive_int:
                    raise ValueError("A predictor version must either be a positive integer, "
                                     f"\"{LATEST_VER}\", or \"{MOST_RECENT_VER}\".")
            path += f"/{version_str}"
        path += f"/{action}" if action else ""
        return path

    def _page_fetcher(self, *, uid: Union[UUID, str], **additional_params):
        """Return ``_fetch_page`` bound to this predictor's versions path and extra params."""
        fetcher_params = {
            "path": self._construct_path(uid),
            "additional_params": additional_params
        }
        return partial(self._fetch_page, **fetcher_params)

    def build(self, data: dict) -> GraphPredictor:
        """Build an individual Predictor."""
        predictor: GraphPredictor = GraphPredictor.build(data)
        # Attach the session and project so the predictor can make its own requests.
        predictor._session = self.session
        predictor._project_id = self.project_id
        return predictor

    def get(self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Fetch one version of a predictor (the most recent one by default)."""
        path = self._construct_path(uid, version)
        entity = self.session.get_resource(path, version=self._api_version)
        return self.build(entity)

    def get_featurized_training_data(
            self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str] = MOST_RECENT_VER
    ) -> List[HierarchicalDesignMaterial]:
        """Fetch the featurized training data of a predictor version as design materials."""
        version_path = self._construct_path(uid, version)
        full_path = f"{version_path}/featurized-training-data"
        payload = self.session.get_resource(full_path, version=self._api_version)
        return [HierarchicalDesignMaterial.build(x) for x in payload]

    def list(self,
             uid: Union[UUID, str],
             *,
             per_page: int = 100) -> Iterable[GraphPredictor]:
        """List non-archived versions of the given predictor."""
        page_fetcher = self._page_fetcher(uid=uid)
        return self._paginator.paginate(page_fetcher=page_fetcher,
                                        collection_builder=self._build_collection_elements,
                                        per_page=per_page)

    def list_archived(self,
                      uid: Union[UUID, str],
                      *,
                      per_page: int = 20) -> Iterable[GraphPredictor]:
        """List archived versions of the given predictor."""
        page_fetcher = self._page_fetcher(uid=uid, filter="archived eq 'true'")
        return self._paginator.paginate(page_fetcher=page_fetcher,
                                        collection_builder=self._build_collection_elements,
                                        per_page=per_page)

    def archive(self,
                uid: Union[UUID, str],
                *,
                version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Archive a predictor version and return the predictor built from the response."""
        url = self._construct_path(uid, version, "archive")
        entity = self.session.put_resource(url, {}, version=self._api_version)
        return self.build(entity)

    def restore(self,
                uid: Union[UUID, str],
                *,
                version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Restore a predictor version and return the predictor built from the response."""
        url = self._construct_path(uid, version, "restore")
        entity = self.session.put_resource(url, {}, version=self._api_version)
        return self.build(entity)

    def is_stale(self,
                 uid: Union[UUID, str],
                 *,
                 version: Union[int, str] = MOST_RECENT_VER) -> bool:
        """Return the ``is_stale`` flag reported for a predictor version."""
        path = self._construct_path(uid, version, "is-stale")
        response = self.session.get_resource(path, version=self._api_version)
        return response["is_stale"]

    def retrain_stale(self,
                      uid: Union[UUID, str],
                      *,
                      version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Request retraining of a stale predictor version and return the built response."""
        path = self._construct_path(uid, version, "retrain-stale")
        entity = self.session.put_resource(path, {}, version=self._api_version)
        return self.build(entity)

    def rename(self,
               uid: Union[UUID, str],
               *,
               version: Union[int, str],
               name: Optional[str] = None,
               description: Optional[str] = None
               ) -> GraphPredictor:
        """Send a new name and description for a predictor version; return the result."""
        path = self._construct_path(uid, version, "rename")
        json = {"name": name, "description": description}
        entity = self.session.put_resource(path, json, version=self._api_version)
        return self.build(entity)

    def generate_feature_effects(self,
                                 uid: Union[UUID, str],
                                 *,
                                 version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Request feature-effects generation, then re-fetch and return the predictor."""
        path = self._construct_path(uid, version, "shapley/generate")
        # The PUT response is discarded; the predictor is fetched again afterwards.
        self.session.put_resource(path, {}, version=self._api_version)
        return self.get(uid, version=version)

    def delete(self, uid: Union[UUID, str], *, version: Union[int, str] = MOST_RECENT_VER):
        """Predictor versions cannot be deleted at this time."""
        msg = "Predictor versions cannot be deleted. Use 'archive_version' instead."
        raise NotImplementedError(msg)
class PredictorCollection(Collection[GraphPredictor]):
    """Represents the collection of all predictors for a project.

    Parameters
    ----------
    project_id: UUID
        the UUID of the project
    session: Session
        the session used to make requests

    """

    _api_version = 'v3'
    _path_template = '/projects/{project_id}/predictors'
    _individual_key = None
    _resource = GraphPredictor
    _collection_key = 'response'

    def __init__(self, project_id: UUID, session: Session):
        self.project_id = project_id
        self.session: Session = session
        # Version-specific operations are delegated to this helper collection.
        self._versions_collection = _PredictorVersionCollection(project_id, session)

    def build(self, data: dict) -> GraphPredictor:
        """Build an individual Predictor."""
        predictor: GraphPredictor = GraphPredictor.build(data)
        predictor._session = self.session
        predictor._project_id = self.project_id
        return predictor

    def get(self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str] = MOST_RECENT_VER) -> GraphPredictor:
        """Get a predictor by ID and (optionally) version.

        If version is omitted, the most recent version will be retrieved.

        Raises
        ------
        ValueError
            If ``uid`` is None.

        """
        if uid is None:
            raise ValueError("Cannot get when uid=None.  Are you using a registered resource?")
        return self._versions_collection.get(uid=uid, version=version)

    def get_featurized_training_data(
            self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str] = MOST_RECENT_VER
    ) -> List[HierarchicalDesignMaterial]:
        """Retrieve a list of featurized materials for a trained predictor.

        Featurized materials contain the input variables found in the training data source
        along with any internal features generated by the predictor while training.
        If not available, retraining the predictor will generate new featurized data.

        Parameters
        ----------
        uid: UUID
            the UUID of the predictor
        version: str
            the version of the predictor (if omitted, the most recent will be used)

        Returns
        -------
        A list of featurized materials, formatted as design materials

        """
        return self._versions_collection.get_featurized_training_data(uid=uid, version=version)

    def register(self, predictor: GraphPredictor, *, train: bool = True) -> GraphPredictor:
        """Register and optionally train a Predictor.

        This predictor will be version 1, and its `draft` flag will be `True`. If train is True
        and training completes successfully, the `draft` flag will be set to `False`. Otherwise,
        it will remain `True`.
        """
        created_predictor = super().register(predictor)

        # Skip training when it was not requested or registration already failed.
        if not train or created_predictor.failed():
            return created_predictor
        else:
            return self.train(created_predictor.uid)

    def update(self, predictor: GraphPredictor, *, train: bool = True) -> GraphPredictor:
        """Update and optionally train a Predictor.

        If the predictor is a draft, this will overwrite its contents. If it's not a draft, a new
        version will be created with the update.

        In either case, training will begin after the update if train is `True`. And if training
        completes successfully, the Predictor will no longer be a draft.
        """
        updated_predictor = super().update(predictor)

        # Skip training when it was not requested or the update already failed.
        if not train or updated_predictor.failed():
            return updated_predictor
        else:
            return self.train(updated_predictor.uid)

    def train(self, uid: Union[UUID, str]) -> GraphPredictor:
        """Train a predictor.

        If the predictor is not a draft, a new version will be created which is a copy of the
        current predictor version as a draft, which will be trained. Either way, if training
        completes successfully, the Predictor will no longer be a draft.
        """
        path = self._get_path(uid, action="train")
        params = {"create_version": True}
        entity = self.session.put_resource(path, {}, params=params, version=self._api_version)
        return self.build(entity)

    def archive_version(
            self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str]
    ) -> GraphPredictor:
        """Archive a predictor version."""
        return self._versions_collection.archive(uid, version=version)

    def restore_version(
            self,
            uid: Union[UUID, str],
            *,
            version: Union[int, str]
    ) -> GraphPredictor:
        """Restore a predictor version."""
        return self._versions_collection.restore(uid, version=version)

    def archive_root(self, uid: Union[UUID, str]):
        """Archive a root predictor.

        Parameters
        ----------
        uid: Union[UUID, str]
            Unique identifier of the predictor to archive.

        """
        path = self._get_path(uid=uid, action="archive")
        self.session.put_resource(path, {}, version=self._api_version)

    def restore_root(self, uid: Union[UUID, str]):
        """Restore an archived root predictor.

        Parameters
        ----------
        uid: Union[UUID, str]
            Unique identifier of the predictor to restore.

        """
        path = self._get_path(uid, action="restore")
        self.session.put_resource(path, {}, version=self._api_version)

    def root_is_archived(self, uid: Union[UUID, str]) -> bool:
        """Determine if the predictor root is archived.

        Parameters
        ----------
        uid: Union[UUID, str]
            Unique identifier of the predictor to check.

        """
        # Compare as strings so UUID and str identifiers match each other.
        uid = str(uid)
        return any(uid == str(archived_pred.uid) for archived_pred in self.list_archived())

    def archive(self, uid: Union[UUID, str]):
        """[UNSUPPORTED] Use archive_root or archive_version instead."""
        raise NotImplementedError("The archive() method is no longer supported. You most likely "
                                  "want archive_root(), or possibly archive_version().")

    def restore(self, uid: Union[UUID, str]):
        """[UNSUPPORTED] Use restore_root or restore_version instead."""
        raise NotImplementedError("The restore() method is no longer supported. You most likely "
                                  "want restore_root(), or possibly restore_version().")

    def _list_base(self, *, per_page: int = 100, archived: Optional[bool] = None):
        """Paginate predictors, filtering on ``archived`` only when it is not None."""
        filters = {}
        if archived is not None:
            filters["archived"] = archived

        # Listing uses API version "v4" rather than this collection's default version.
        fetcher = partial(self._fetch_page,
                          additional_params=filters,
                          version="v4")
        return self._paginator.paginate(page_fetcher=fetcher,
                                        collection_builder=self._build_collection_elements,
                                        per_page=per_page)

    def list_all(self, *, per_page: int = 20) -> Iterable[GraphPredictor]:
        """List the most recent version of all predictors."""
        return self._list_base(per_page=per_page)

    def list(self, *, per_page: int = 20) -> Iterable[GraphPredictor]:
        """List the most recent version of all non-archived predictors."""
        return self._list_base(per_page=per_page, archived=False)

    def list_archived(self, *, per_page: int = 20) -> Iterable[GraphPredictor]:
        """List the most recent version of all archived predictors."""
        return self._list_base(per_page=per_page, archived=True)

    def list_versions(self,
                      uid: Union[UUID, str] = None,
                      *,
                      per_page: int = 100) -> Iterable[GraphPredictor]:
        """List all non-archived versions of the given Predictor.

        Raises
        ------
        ValueError
            If ``uid`` is None.

        """
        # uid defaults to None for signature compatibility, but a None uid would be
        # formatted into the request path as the literal "None"; fail early instead.
        if uid is None:
            raise ValueError("Cannot list versions when uid=None.  "
                             "Are you using a registered resource?")
        return self._versions_collection.list(uid, per_page=per_page)

    def list_archived_versions(self,
                               uid: Union[UUID, str] = None,
                               *,
                               per_page: int = 20) -> Iterable[GraphPredictor]:
        """List all archived versions of the given Predictor.

        Raises
        ------
        ValueError
            If ``uid`` is None.

        """
        # See list_versions: a None uid would be sent as the literal "None".
        if uid is None:
            raise ValueError("Cannot list archived versions when uid=None.  "
                             "Are you using a registered resource?")
        return self._versions_collection.list_archived(uid, per_page=per_page)

    def check_for_update(self, uid: Union[UUID, str]) -> Optional[GraphPredictor]:
        """
        Check if there are updates available for a predictor.

        Typically these are updates to the training data. For example, a GEM Table may have
        been re-built to include additional rows.

        This check does not update the predictor; it just returns the update that is available.
        To perform the update, the response should then be used to call PredictorCollection.update

        Parameters
        ----------
        uid: Union[UUID, str]
            Unique identifier of the predictor to check

        Returns
        -------
        Optional[Predictor]
            The update, if an update is available; None otherwise.

        """
        path = self._get_path(uid, action="update-check")
        update_data = self.session.get_resource(path, version=self._api_version)
        if update_data["updatable"]:
            built = GraphPredictor.build(update_data)
            # Assign the uid that was checked to the built update.
            built.uid = uid
            return built
        else:
            return None

    def create_default(self,
                       *,
                       training_data: DataSource,
                       pattern: Union[str, AutoConfigureMode] = AutoConfigureMode.INFER,
                       prefer_valid: bool = True) -> GraphPredictor:
        """Create a default predictor for some training data.

        This method will return an unregistered predictor generated by inspecting the
        training data and attempting to automatically configure the predictor.

        The configuration generated while using the `AutoConfigureMode.PLAIN` pattern
        includes featurizers for chemical formulas/molecular structures,
        and `AutoMLPredictor`s for any variables identified as responses in the training data.
        The configuration generated while using the `AutoConfigureMode.FORMULATION` pattern
        includes these same components, as well as a `SimpleMixturePredictor`,
        `LabelFractionsPredictor`, `IngredientFractionsPredictor`, and a series of
        `MeanPropertyPredictor`s to handle featurization of formulation quantities
        and ingredient properties.
        The `AutoConfigureMode.INFER` pattern chooses an appropriate mode based on whether
        the data source contains formulations data or not.

        Parameters
        ----------
        training_data: DataSource
            The data to configure the predictor to model.
        pattern: AutoConfigureMode or str
            The predictor pattern to use, either "PLAIN", "FORMULATION", or "INFER".
            The "INFER" pattern auto-detects whether the `DataSource` contains formulations
            data or not.
            If it does, then a formulation predictor is created.
            If not, then a plain predictor is created.
        prefer_valid: Boolean
            If True, enables filtering of sparse descriptors and trimming of
            excess graph components in attempt to return a default configuration
            that will pass validation.
            Default: True.

        Returns
        -------
        GraphPredictor
            Automatically configured predictor for the training data

        """
        payload = PredictorCollection._create_default_payload(training_data, pattern, prefer_valid)
        path = self._get_path(action="default")
        data = self.session.post_resource(path, json=payload, version=self._api_version)
        return self.build(GraphPredictor.wrap_instance(data["instance"]))

    def create_default_async(self,
                             *,
                             training_data: DataSource,
                             pattern: Union[str, AutoConfigureMode] = AutoConfigureMode.INFER,
                             prefer_valid: bool = True) -> AsyncDefaultPredictor:
        """Similar to PredictorCollection.create_default, except asynchronous.

        This begins a long-running task to generate the predictor. The returned object contains an
        ID which can be used to track its status and get the resulting predictor once complete.
        PredictorCollection.get_default_async is intended for that purpose.

        See PredictorCollection.create_default for more details on the generation process and
        parameter specifics.

        Parameters
        ----------
        training_data: DataSource
            The data to configure the predictor to model.
        pattern: AutoConfigureMode or str
            The predictor pattern to use, either "PLAIN", "FORMULATION", or "INFER".
            The "INFER" pattern auto-detects whether the `DataSource` contains formulations
            data or not.
            If it does, then a formulation predictor is created.
            If not, then a plain predictor is created.
        prefer_valid: Boolean
            If True, enables filtering of sparse descriptors and trimming of
            excess graph components in attempt to return a default configuration
            that will pass validation.
            Default: True.

        Returns
        -------
        AsyncDefaultPredictor
            Information on the long-running default predictor generation task.

        """
        payload = PredictorCollection._create_default_payload(training_data, pattern, prefer_valid)
        path = self._get_path(action="default-async")
        data = self.session.post_resource(path, json=payload, version=self._api_version)
        return AsyncDefaultPredictor.build(data)

    @staticmethod
    def _create_default_payload(training_data: DataSource,
                                pattern: Union[str, AutoConfigureMode] = AutoConfigureMode.INFER,
                                prefer_valid: bool = True) -> dict:
        """Assemble the request payload shared by the default-predictor endpoints."""
        # Continue handling string pattern inputs
        pattern = AutoConfigureMode.from_str(pattern, exception=True)

        return {"data_source": training_data.dump(), "pattern": pattern,
                "prefer_valid": prefer_valid}

    def get_default_async(self, *, task_id: Union[UUID, str]) -> AsyncDefaultPredictor:
        """Get the current async default predictor generation result.

        The status field will indicate if it's INPROGRESS, SUCCEEDED, or FAILED. While INPROGRESS,
        the predictor will also be None. Once it's SUCCEEDED, it will be populated with a
        GraphPredictor, which can then be registered to the platform. If it's FAILED, look to the
        status_detail field for more information on what went wrong.
        """
        path = self._get_path(action=["default-async", task_id])
        data = self.session.get_resource(path, version=self._api_version)
        return AsyncDefaultPredictor.build(data)

    def is_stale(self, uid: Union[UUID, str], *, version: Union[int, str]) -> bool:
        """Returns True if a predictor is stale, False otherwise.

        A predictor is stale if it's in the READY state, but the platform cannot load the
        previously trained object.
        """
        return self._versions_collection.is_stale(uid, version=version)

    def retrain_stale(self, uid: Union[UUID, str], *, version: Union[int, str]) -> GraphPredictor:
        """Begins retraining a stale predictor.

        This can only be used on a stale predictor, which is when it's in the READY state, but the
        platform cannot load the previously trained object. Using it on a non-stale predictor will
        result in an error.
        """
        return self._versions_collection.retrain_stale(uid, version=version)

    def rename(self,
               uid: Union[UUID, str],
               *,
               version: Union[int, str],
               name: Optional[str] = None,
               description: Optional[str] = None) -> GraphPredictor:
        """Rename an existing predictor.

        Both the name and description can be changed. This does not trigger retraining.
        Any existing version of the predictor can be renamed, or "most_recent".
        """
        return self._versions_collection.rename(
            uid, version=version, name=name, description=description
        )

    def generate_feature_effects_async(self,
                                       uid: Union[UUID, str],
                                       *,
                                       version: Union[int, str]) -> GraphPredictor:
        """Begin generation of feature effects.

        version can be any numerical version (which exists), "latest", or "most_recent". Although
        note that this will fail if the predictor is not already trained.

        Feature effects are automatically generated for all new predictors after a successful
        training as of the end of 2024. This call allows either regenerating those values, or
        generating them for older predictors.

        This call just begins the process; generation usually takes a few minutes, but can take
        much longer. As soon as the call completes, the old values will be inaccessible. To wait
        for the generation to complete, and to retrieve the new values once they're ready, use
        GraphPredictor.feature_effects.
        """
        return self._versions_collection.generate_feature_effects(uid, version=version)

    def delete(self, uid: Union[UUID, str]):
        """Predictors cannot be deleted at this time."""
        msg = "Predictors cannot be deleted. Use 'archive_version' or 'archive_root' instead."
        raise NotImplementedError(msg)