"""Constants and abstract classes."""
import copy
import logging
import operator
from ..constants import ALL_PERMISSIONS
from ..utils.decorators import assert_object_exists
from .permissions import PermissionsManager
from .utils import parse_resolwe_datetime
[docs]class BaseResource:
"""Abstract resource.
One and only one of the identifiers (slug, id or model_data)
should be given.
:param resolwe: Resolwe instance
:type resolwe: Resolwe object
:param model_data: Resource model data
"""
endpoint = None
query_endpoint = None
query_method = "GET"
full_search_paramater = None
delete_warning_single = "Do you really want to delete {}?[yN]"
delete_warning_bulk = "Do you really want to delete {} objects?[yN]"
READ_ONLY_FIELDS = ("id",)
UPDATE_PROTECTED_FIELDS = ()
WRITABLE_FIELDS = ()
all_permissions = [] # override this in subclass
def __init__(self, resolwe, **model_data):
"""Initialize attributes."""
self._original_values = {}
self.api = operator.attrgetter(self.endpoint)(resolwe.api)
self.resolwe = resolwe
self.logger = logging.getLogger(__name__)
#: unique identifier of an object
self.id = None
if model_data:
self._update_fields(model_data)
[docs] @classmethod
def fetch_object(cls, resolwe, id=None, slug=None):
"""Return resource instance that is uniquely defined by identifier."""
if (id is None and slug is None) or (id and slug):
raise ValueError("One and only one of id or slug must be given")
query = resolwe.get_query_by_resource(cls)
if id:
return query.get(id=id)
return query.get(slug=slug)
[docs] def fields(self):
"""Resource fields."""
return (
self.READ_ONLY_FIELDS + self.UPDATE_PROTECTED_FIELDS + self.WRITABLE_FIELDS
)
def _update_fields(self, payload):
"""Update fields of the local resource based on the server values."""
self._original_values = copy.deepcopy(payload)
for field_name in self.fields():
setattr(self, field_name, payload.get(field_name, None))
[docs] def update(self):
"""Update resource fields from the server."""
response = self.api(self.id).get()
self._update_fields(response)
def _dehydrate_resources(self, obj):
"""Iterate through object and replace all objects with their ids."""
# Prevent circular imports:
from .descriptor import DescriptorSchema
from .process import Process
if isinstance(obj, DescriptorSchema) or isinstance(obj, Process):
# Slug can only be given at create requests (id not present yet)
if not self.id:
return {"slug": obj.slug}
return {"id": obj.id}
if isinstance(obj, BaseResource):
return {"id": obj.id}
if isinstance(obj, list):
return [self._dehydrate_resources(element) for element in obj]
if isinstance(obj, dict):
return {key: self._dehydrate_resources(value) for key, value in obj.items()}
return obj
[docs] def save(self):
"""Save resource to the server."""
def field_changed(field_name):
"""Check if local field value is different from the server."""
current_value = getattr(self, field_name, None)
original_value = self._original_values.get(field_name, None)
if isinstance(current_value, BaseResource) and original_value:
# TODO: Check that current and original are instances of the same resource class
return current_value.id != original_value.get("id", None)
else:
return current_value != original_value
def assert_fields_unchanged(field_names):
"""Assert that fields in ``field_names`` were not changed."""
changed_fields = [name for name in field_names if field_changed(name)]
if changed_fields:
msg = "Not allowed to change read only fields {}".format(
", ".join(changed_fields)
)
raise ValueError(msg)
if self.id: # update resource
assert_fields_unchanged(
self.READ_ONLY_FIELDS + self.UPDATE_PROTECTED_FIELDS
)
payload = {}
for field_name in self.WRITABLE_FIELDS:
if field_changed(field_name):
payload[field_name] = self._dehydrate_resources(
getattr(self, field_name)
)
if "sample" in payload:
payload["entity"] = payload.pop("sample")
if payload:
response = self.api(self.id).patch(payload)
self._update_fields(response)
else: # create resource
assert_fields_unchanged(self.READ_ONLY_FIELDS)
field_names = self.WRITABLE_FIELDS + self.UPDATE_PROTECTED_FIELDS
payload = {
field_name: self._dehydrate_resources(getattr(self, field_name))
for field_name in field_names
if getattr(self, field_name) is not None
}
if "sample" in payload:
payload["entity"] = payload.pop("sample")
from .annotations import AnnotationValue
# Annotation models have primarykey serializer.
if isinstance(self, AnnotationValue):
payload["field"] = payload["field"]["id"]
payload["entity"] = payload["entity"]["id"]
response = self.api.post(payload)
self._update_fields(response)
[docs] def delete(self, force=False):
"""Delete the resource object from the server.
:param bool force: Do not trigger confirmation prompt. WARNING: Be
sure that you really know what you are doing as deleted objects
are not recoverable.
"""
if force is not True:
user_input = input(self.delete_warning_single.format(self))
if user_input.strip().lower() != "y":
return
response = self.api(self.id).delete()
# This could either be True or a background task data.
if response is not True:
# Resolve circular import
from .background_task import BackgroundTask
BackgroundTask(resolwe=self.resolwe, **response).wait()
return True
def __setattr__(self, name, value):
"""Detect changes of read only fields.
This method detects changes of scalar fields and references. A
more comprehensive check is called before save.
"""
if (
hasattr(self, "_original_values")
and name in self._original_values
and name in self.READ_ONLY_FIELDS
and value != self._original_values[name]
):
raise ValueError("Can not change read only field {}".format(name))
super().__setattr__(name, value)
def __eq__(self, obj):
"""Evaluate if objects are the same."""
if (
self.__class__ == obj.__class__
and self.resolwe.url == obj.resolwe.url
and self.id == obj.id
):
return True
else:
return False
def _resource_setter(self, payload, resource, field):
"""Set ``resource`` with ``payload`` on ``field``."""
if isinstance(payload, resource):
setattr(self, field, payload)
elif isinstance(payload, dict):
setattr(self, field, resource(resolwe=self.resolwe, **payload))
elif isinstance(payload, int):
setattr(self, field, resource.fetch_object(self.resolwe, id=payload))
elif isinstance(payload, str):
setattr(self, field, resource.fetch_object(self.resolwe, slug=payload))
else:
setattr(self, field, payload)
[docs]class BaseResolweResource(BaseResource):
"""Base class for Resolwe resources.
One and only one of the identifiers (slug, id or model_data)
should be given.
:param resolwe: Resolwe instance
:type resolwe: Resolwe object
:param model_data: Resource model data
"""
_permissions = None
READ_ONLY_FIELDS = BaseResource.READ_ONLY_FIELDS + (
"current_user_permissions",
"id",
"version",
)
WRITABLE_FIELDS = BaseResource.WRITABLE_FIELDS + (
"name",
"slug",
)
all_permissions = ALL_PERMISSIONS
def __init__(self, resolwe, **model_data):
"""Initialize attributes."""
self.logger = logging.getLogger(__name__)
#: User object of the contributor (lazy loaded)
self._contributor = None
#: current user permissions
self.current_user_permissions = None
#: name of resource
self.name = None
#: human-readable unique identifier
self.slug = None
#: resource version
self.version = None
BaseResource.__init__(self, resolwe, **model_data)
@property
@assert_object_exists
def permissions(self):
"""Permissions."""
if not self._permissions:
self._permissions = PermissionsManager(
self.all_permissions, self.api(self.id), self.resolwe
)
return self._permissions
@property
@assert_object_exists
def contributor(self):
"""Contributor."""
if self._contributor is None:
contributor_data = self._original_values.get("contributor", {})
try:
self._contributor = self.resolwe.user.get(id=contributor_data.get("id"))
except LookupError:
from . import User
# Normal user has only access to his user instance on user
# endpoint. Instead of returning None for all other
# contributors, data that is received in response is used to
# populate User resource.
self._contributor = User(
self.resolwe,
id=contributor_data.get("id"),
username=contributor_data.get("username"),
first_name=contributor_data.get("first_name"),
last_name=contributor_data.get("last_name"),
)
return self._contributor
@property
@assert_object_exists
def created(self):
"""Creation time."""
return parse_resolwe_datetime(self._original_values["created"])
@property
@assert_object_exists
def modified(self):
"""Modification time."""
return parse_resolwe_datetime(self._original_values["modified"])
[docs] def update(self):
"""Clear permissions cache and update the object."""
self.permissions.clear_cache()
super().update()
def __repr__(self):
"""Format resource name."""
return "{} <id: {}, slug: '{}', name: '{}'>".format(
self.__class__.__name__, self.id, self.slug, self.name
)