"""Sample resource."""
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional
from resdk.exceptions import ResolweServerError
from resdk.shortcuts.sample import SampleUtilsMixin
from ..utils.decorators import assert_object_exists
from .background_task import BackgroundTask
from .collection import BaseCollection, Collection
if TYPE_CHECKING:
from .annotations import AnnotationValue
[docs]class Sample(SampleUtilsMixin, BaseCollection):
"""Resolwe Sample resource.
:param resolwe: Resolwe instance
:type resolwe: Resolwe object
:param model_data: Resource model data
"""
endpoint = "sample"
WRITABLE_FIELDS = BaseCollection.WRITABLE_FIELDS + ("collection",)
def __init__(self, resolwe, **model_data):
"""Initialize attributes."""
self.logger = logging.getLogger(__name__)
#: ``Collection``s that contains the ``Sample`` (lazy loaded)
self._collection = None
#: list of ``Relation`` objects in ``Collection`` (lazy loaded)
self._relations = None
#: background ``Sample`` of the current ``Sample``
self._background = None
#: is this sample background to any other sample?
self._is_background = None
super().__init__(resolwe, **model_data)
[docs] def update(self):
"""Clear cache and update resource fields from the server."""
self._collection = None
self._relations = None
self._background = None
self._is_background = None
super().update()
@property
@assert_object_exists
def data(self):
"""Get data."""
if self._data is None:
self._data = self.resolwe.data.filter(entity=self.id)
return self._data
@property
def collection(self):
"""Get collection."""
return self._collection
@collection.setter
def collection(self, payload):
"""Set collection."""
self._resource_setter(payload, Collection, "_collection")
@property
@assert_object_exists
def relations(self):
"""Get ``Relation`` objects for this sample."""
if self._relations is None:
self._relations = self.resolwe.relation.filter(entity=self.id)
return self._relations
@property
def background(self):
"""Get background sample of the current one."""
if self._background is None:
background_relation = list(
self.resolwe.relation.filter(
type="background",
entity=self.id,
label="case",
)
)
if len(background_relation) > 1:
raise LookupError(
"Multiple backgrounds defined for sample '{}'".format(self.name)
)
elif not background_relation:
raise LookupError(
"No background is defined for sample '{}'".format(self.name)
)
for partition in background_relation[0].partitions:
if (
partition["label"] == "background"
and partition["entity"] != self.id
):
self._background = self.resolwe.sample.get(id=partition["entity"])
if self._background is None:
# Cache the result = no background is found.
self._background = False
if self._background:
return self._background
@background.setter
def background(self, bground):
"""Set sample background."""
def count_cases(entity, label):
"""Get a tuple (relation, number_of_cases) in a specified relation.
Relation is specified by collection, type-background'entity and label.
"""
relation = list(
self.resolwe.relation.filter(
collection=collection.id,
type="background",
entity=entity.id,
label=label,
)
)
if len(relation) > 1:
raise ValueError(
'Multiple relations of type "background" for sample {} in '
"collection {} "
"with label {}.".format(entity, collection, label)
)
elif len(relation) == 1:
cases = len(
[
prt
for prt in relation[0].partitions
if prt.get("label") == "case"
]
)
else:
cases = 0
return (relation[0] if relation else None, cases)
if self.background == bground:
return
assert isinstance(bground, Sample)
# Relations are always defined on collections: it is necessary
# to check that both, background and case are defined in only
# one common collection. Actions are done on this collection.
if self.collection.id != bground.collection.id:
raise ValueError(
"Background and case sample are not in the same collection."
)
collection = self.collection
# One cannot simply assign a background to sample but needs to
# account also for already existing background relations they
# are part of. By this, 3 x 3 scenarios are possible. One
# dimension of scenarios is determined by the relation in which
# *sample* is. It can be in no background relation (0), it can
# be in background relation where only one sample is the case
# sample (1) or it can be in background relation where many
# case samples are involved (2). Similarly, (future, to-be)
# background relation can be without any existing background
# relation (0), in background relation with one (1) or more (2)
# case samples.
# Get background relation for this sample and count cases in it.
# If no relation is found set to 0.
self_relation, self_cases = count_cases(self, "case")
# Get background relation of to-be background sample and count
# cases in it. If no relation is found set to 0.
bground_relation, bground_cases = count_cases(bground, "background")
# 3 x 3 options reduce to 5, since some of them can be treated equally:
if self_cases == bground_cases == 0:
# Neither case nor background is in background relation.
# Make a new background relation.
collection.create_background_relation("Background", bground, [self])
elif self_cases == 0 and bground_cases > 0:
# Sample is not part of any existing background relation,
# but background sample is. In this cae, just add sample to
# alread existing background relation
bground_relation.add_sample(self, label="case")
elif self_cases == 1 and bground_cases == 0:
# Sample si part od already existing background relation
# where there is one sample and one background. New,
# to-be-background sample is not part of any background
# relation yet. Modify sample relation and replace background.
for partition in self_relation.partitions:
if partition["label"] == "background":
partition["entity"] = bground.id
break
elif self_cases == 1 and bground_cases > 0:
# Sample si part od already existing background relation
# where there is one sample and one background. New,
# to-be-background sample is is similar two-member relation.
# Remove relaton of case sample and add it to existing
# relation of the background smaple.
self_relation.delete(force=True)
bground_relation.add_sample(self, label="case")
elif self_cases > 1:
raise ValueError(
"This sample is a case in a background relation with also other samples as cases. "
"If you would like to change background sample for all of them please delete "
"current relation and create new one with desired background."
)
self.save()
self._relations = None
self._background = None
bground._is_background = True
@property
def is_background(self):
"""Return ``True`` if given sample is background to any other and ``False`` otherwise."""
if self._is_background is None:
background_relations = self.resolwe.relation.filter(
type="background",
entity=self.id,
label="background",
)
# we need to iterate ``background_relations`` (using len) to
# evaluate ResolweQuery:
self._is_background = len(background_relations) > 0
return self._is_background
[docs] @assert_object_exists
def duplicate(self):
"""Duplicate (make copy of) ``sample`` object.
:return: Duplicated sample
"""
task_data = self.api().duplicate.post({"ids": [self.id]})
background_task = BackgroundTask(resolwe=self.resolwe, **task_data)
return self.resolwe.sample.get(id__in=background_task.result())
@property
def annotations(self):
"""Get the annotations for the given sample."""
return self.resolwe.annotation_value.filter(entity=self.id)
[docs] def get_annotation(self, full_path: str) -> "AnnotationValue":
"""Get the AnnotationValue from full path.
:raises LookupError: when field at the specified path does not exist.
"""
group_name, field_name = full_path.split(".", maxsplit=1)
return self.annotations.get(
field__name=field_name, field__group__name=group_name
)
[docs] def set_annotation(
self, full_path: str, value, force=False
) -> Optional["AnnotationValue"]:
"""Create/update annotation value.
If value is None the annotation is deleted and None is returned. If force is
set to True no explicit confirmation is required to delete the annotation.
"""
try:
annotation_value = self.get_annotation(full_path)
if value is None:
annotation_value.delete(force=force)
return None
annotation_value.value = value
annotation_value.save()
except LookupError:
if value is None:
return None
try:
field = self.resolwe.annotation_field.from_path(full_path)
except LookupError:
raise ResolweServerError(f"Field '{full_path}' does not exist.")
annotation_value = self.resolwe.annotation_value.create(
sample=self.id, field=field.id, value=value
)
return annotation_value
[docs] def get_annotations(self) -> Dict[str, Any]:
"""Get all annotations for the given sample in a dictionary."""
return {
str(annotation.field): annotation.value
for annotation in self.annotations.all()
}
[docs] def set_annotations(self, annotations: Dict[str, Any]):
"""Bulk set annotations on the sample."""
payload = [
{"field_path": key, "value": value} for key, value in annotations.items()
]
self.api(self.id).set_annotations.post(payload)