""".. Ignore pydocstyle D400.
=============
Resolwe Query
=============
.. autoclass:: resdk.ResolweQuery
:members:
"""
import collections
import copy
import logging
import operator
import tqdm
from resdk.resources import AnnotationField, DescriptorSchema, Process
from resdk.resources.base import BaseResource
[docs]class ResolweQuery:
"""Query resource endpoints.
A Resolwe instance (for example "res") has several endpoints:
- res.data
- res.collection
- res.sample
- res.process
- ...
Each such endpoint is an instance of the ResolweQuery class. ResolweQuery
supports queries on corresponding objects, for example:
.. code-block:: python
res.data.get(42) # return Data object with ID 42.
res.sample.filter(contributor=1) # return all samples made by contributor 1
This object is lazy loaded which means that actual request is made only
when needed. This enables composing multiple filters, for example:
.. code-block:: python
res.data.filter(contributor=1).filter(name='My object')
is the same as:
.. code-block:: python
res.data.filter(contributor=1, name='My object')
This is especially useful, because all endpoints at Resolwe instance
are such queries and can be filtered further before transferring
any data.
To get a list of all supported query parameters, use one that does
not exist and you will et a helpful error message with a list of
allowed ones.
.. code-block:: python
res.data.filter(foo="bar")
"""
_cache = None
_count = (
None # number of objects in current query (without applied limit and offset)
)
_limit = None
_offset = None
_filters = None
resolwe = None
resource = None
slug_field = None
endpoint = None
api = None
logger = None
def __init__(self, resolwe, resource, slug_field="slug"):
"""Initialize attributes."""
self.resolwe = resolwe
self.resource = resource
self.slug_field = slug_field
self.endpoint = resource.query_endpoint or resource.endpoint
self.api = operator.attrgetter(self.endpoint)(resolwe.api)
self._filters = collections.defaultdict(list)
self.logger = logging.getLogger(__name__)
def _non_string_iterable(self, item) -> bool:
"""Return True when item is iterable but not string."""
return isinstance(item, collections.abc.Iterable) and not isinstance(item, str)
def __getitem__(self, index):
"""Retrieve an item or slice from the set of results."""
if not isinstance(index, (slice, int)):
raise TypeError
if (
(not isinstance(index, slice) and index < 0)
or (
isinstance(index, slice) and index.start is not None and index.start < 0
)
or (isinstance(index, slice) and index.stop is not None and index.stop < 0)
):
raise ValueError("Negative indexing is not supported.")
if isinstance(index, slice) and index.step is not None:
raise ValueError("`step` parameter in slice is not supported")
if self._cache is not None:
return self._cache[index]
new_query = self._clone()
if isinstance(index, slice):
if self._offset or self._limit:
raise NotImplementedError("You cannot slice already sliced query.")
start = 0 if index.start is None else int(index.start)
stop = (
1000000 if index.stop is None else int(index.stop)
) # default to something big
new_query._offset = start
new_query._limit = stop - start
return new_query
new_query._offset = self._offset + index if self._offset else index
new_query._limit = 1
query_list = list(new_query)
if not query_list:
raise IndexError("list index out of range")
return query_list[0]
def __iter__(self):
"""Return iterator over the current object."""
self._fetch()
return iter(self._cache)
def __repr__(self):
"""Return string representation of the current object."""
self._fetch()
rep = "[{}]".format(",\n ".join(str(obj) for obj in self._cache))
return rep
def __len__(self):
"""Return length of results of current query."""
return self.count()
def _clone(self):
"""Return copy of current object with empty cache."""
new_obj = self.__class__(self.resolwe, self.resource)
new_obj._filters = copy.deepcopy(self._filters)
new_obj._limit = self._limit
new_obj._offset = self._offset
return new_obj
def _dehydrate_resources(self, obj):
"""Iterate through object and replace all objects with their ids."""
if isinstance(obj, BaseResource):
return obj.id
if isinstance(obj, dict):
return {key: self._dehydrate_resources(value) for key, value in obj.items()}
if self._non_string_iterable(obj):
return [self._dehydrate_resources(element) for element in obj]
return obj
def _add_filter(self, filter_):
"""Add filtering parameters."""
for key, value in filter_.items():
# 'sample' is called 'entity' in the backend.
key = key.replace("sample", "entity")
value = self._dehydrate_resources(value)
if self._non_string_iterable(value):
value = ",".join(map(str, value))
if self.resource.query_method == "GET":
self._filters[key].append(value)
elif self.resource.query_method == "POST":
self._filters[key] = value
else:
raise NotImplementedError(
"Unsupported query_method: {}".format(self.resource.query_method)
)
def _compose_filters(self):
"""Convert filters to dict and add pagination filters."""
filters = self._filters
if self._limit is not None:
filters["limit"] = self._limit
if self._offset is not None:
filters["offset"] = self._offset
return dict(filters)
def _populate_resource(self, data):
"""Populate resource with given data."""
return self.resource(resolwe=self.resolwe, **data)
def _fetch(self):
"""Make request to the server and populate cache."""
if self._cache is not None:
# Already fetched.
return
filters = self._compose_filters()
if self.resource.query_method == "GET":
items = self.api.get(**filters)
elif self.resource.query_method == "POST":
items = self.api.post(filters)
else:
raise NotImplementedError(
"Unsupported query_method: {}".format(self.resource.query_method)
)
# Extract data from paginated response
if isinstance(items, dict) and "results" in items:
self._count = items["count"]
items = items["results"]
# Store count when list of objects is received without limit.
if isinstance(items, list) and self._limit is None:
self._count = len(items)
self._cache = [self._populate_resource(data) for data in items]
[docs] def clear_cache(self):
"""Clear cache."""
self._cache = None
self._count = None
[docs] def count(self):
"""Return number of objects in current query."""
if self._count is None:
count_query = self._clone()
count_query._offset = 0
count_query._limit = 1
count_query._fetch()
self._count = count_query._count
if self._limit is None:
return self._count
remaining = self._count - self._offset
return max(0, min(self._limit, remaining))
[docs] def get(self, *args, **kwargs):
"""Get object that matches given parameters.
If only one non-keyworded argument is given, it is considered
as id if it is number and as slug otherwise.
:param uid: unique identifier - ID or slug
:type uid: int for ID or string for slug
:rtype: object of type self.resource
:raises ValueError: if non-keyworded and keyworded arguments
are combined or if more than one non-keyworded argument is
given
:raises LookupError: if none or more than one objects are
returned
"""
if args:
if len(args) > 1:
raise ValueError("Only one non-keyworded argument can be given")
if kwargs:
raise ValueError(
"Non-keyworded arguments cannot be combined with keyworded ones."
)
arg = args[0]
kwargs = {"id": arg} if isinstance(arg, int) else {self.slug_field: arg}
if self.slug_field in kwargs:
if issubclass(self.resource, (Process, DescriptorSchema)):
kwargs["ordering"] = kwargs.get("ordering", "-version")
kwargs["limit"] = kwargs.get("limit", 1)
new_query = self._clone()
new_query._add_filter(kwargs)
response = list(new_query)
if not response:
raise LookupError("Matching object does not exist.")
if len(response) > 1:
raise LookupError("get() returned more than one object.")
return response[0]
[docs] def create(self, **model_data):
"""Return new instance of current resource."""
resource = self.resource(self.resolwe, **model_data)
resource.save()
return resource
[docs] def filter(self, **filters):
"""Return clone of current query with added given filters."""
new_query = self._clone()
new_query._add_filter(filters)
return new_query
[docs] def delete(self, force=False):
"""Delete objects in current query.
:param bool force: Do not trigger confirmation prompt. WARNING: Be
sure that you really know what you are doing as deleted objects
are not recoverable.
"""
if force is not True:
user_input = input(self.resource.delete_warning_bulk.format(self.count()))
if user_input.strip().lower() != "y":
return
for obj in self:
obj.delete(force=True)
self.clear_cache()
[docs] def all(self):
"""Return copy of the current queryset.
This is handy function to get newly created query without any
filters.
"""
return self._clone()
[docs] def search(self, text):
"""Full text search."""
if not self.resource.full_search_paramater:
raise NotImplementedError()
new_query = self._clone()
new_query._add_filter({self.resource.full_search_paramater: text})
return new_query
[docs] def iterate(self, chunk_size=100, show_progress=False):
"""
Iterate through query.
This can come handy when one wishes to iterate through hundreds or
thousands of objects and would otherwise get "504 Gateway-timeout".
The method cannot be used together with the following filters:
limit, offset and ordering, and will raise a ``ValueError``.
"""
# For simplicity, let's assume that this method will only be used when
# limit and offset are not used as query parameters. We can relax
# these limitations at some later point. Also, ordering is
# prohibited for now.
if self._limit is not None:
raise ValueError(
"Parameter 'limit' should not be used in combination with method iterate."
)
if self._offset is not None:
raise ValueError(
"Parameter 'offset' should not be used in combination with method iterate."
)
if "ordering" in self._filters:
raise ValueError(
"Specifying order in combination with method iterate is not allowed."
)
count = self.count()
iterate_query = self._clone()
min_id = 0
obj_count = 0
with tqdm.tqdm(total=count, disable=not show_progress) as pbar:
while obj_count < count:
for obj in iterate_query.filter(
id__gt=min_id, limit=chunk_size, ordering="id"
):
obj_count += 1
min_id = obj.id
pbar.update(1)
yield obj
class AnnotationFieldQuery(ResolweQuery):
"""Add additional method to the annotation field query."""
def from_path(self, full_path: str) -> "AnnotationField":
"""Get the AnnotationField from full path.
:raises LookupError: when field at the specified path does not exist.
"""
group_name, field_name = full_path.split(".", maxsplit=1)
return self.get(name=field_name, group__name=group_name)
class AnnotationValueQuery(ResolweQuery):
"""Populate Annotation fields with a single query."""
def _fetch(self):
"""Make request to the server and populate cache.
Fetch all values and their fields with 2 queries.
"""
# Execute the query in a single request.
super()._fetch()
missing = collections.defaultdict(list)
for value in self._cache:
if value._field is None:
missing[value.field_id].append(value)
if missing:
# Get corresponding annotation field details in a single query and attach it to
# the values.
for field in self.resolwe.annotation_field.filter(id__in=missing.keys()):
for value in missing[field.id]:
value._field = field
value._original_values["field"] = field._original_values