Source code for resdk.resources.data

"""Data resource."""

import json
import logging
from urllib.parse import urljoin

from resdk.constants import CHUNK_SIZE

from ..utils.decorators import assert_object_exists
from .background_task import BackgroundTask
from .base import BaseResolweResource
from .collection import Collection
from .descriptor import DescriptorSchema
from .process import Process
from .sample import Sample
from .utils import flatten_field, parse_resolwe_datetime


class Data(BaseResolweResource):
    """Resolwe Data resource.

    :param resolwe: Resolwe instance
    :type resolwe: Resolwe object
    :param model_data: Resource model data

    """

    endpoint = "data"
    # NOTE(review): the attribute name is misspelled ("paramater"); it is kept
    # as-is because code outside this class may look it up under this name.
    full_search_paramater = "text"

    READ_ONLY_FIELDS = BaseResolweResource.READ_ONLY_FIELDS + (
        "checksum",
        "descriptor_dirty",
        "duplicated",
        "process_cores",
        "process_error",
        "process_info",
        "process_memory",
        "process_progress",
        "process_rc",
        "process_warning",
        "output",
        "scheduled",
        "size",
        "status",
    )
    UPDATE_PROTECTED_FIELDS = BaseResolweResource.UPDATE_PROTECTED_FIELDS + (
        "input",
        "process",
    )
    WRITABLE_FIELDS = BaseResolweResource.WRITABLE_FIELDS + (
        "collection",
        "descriptor",
        "descriptor_schema",
        "process_resources",
        "sample",
        "tags",
    )

    def __init__(self, resolwe, **model_data):
        """Initialize attributes."""
        self.logger = logging.getLogger(__name__)

        #: ``Collection`` that contains ``Data``
        self._collection = None
        #: ``DescriptorSchema`` of ``Data`` object
        self._descriptor_schema = None
        #: The process used in this data object
        self._process = None
        #: ``Sample`` containing ``Data`` object
        self._sample = None
        #: ``ResolweQuery`` containing parent ``Data`` objects (lazy loaded)
        self._parents = None
        #: ``ResolweQuery`` containing child ``Data`` objects (lazy loaded)
        self._children = None

        #: checksum field calculated on inputs
        self.checksum = None
        #: indicate whether `descriptor` doesn't match `descriptor_schema` (is dirty)
        self.descriptor_dirty = None
        #: annotation data, with the form defined in descriptor_schema
        self.descriptor = None
        #: duplicated
        self.duplicated = None
        #: actual input values
        self.input = None
        #: process cores
        self.process_cores = None
        #: error log message (list of strings)
        self.process_error = None
        #: info log message (list of strings)
        self.process_info = None
        #: process memory
        self.process_memory = None
        #: process progress in percentage
        self.process_progress = None
        #: Process algorithm return code
        self.process_rc = None
        #: warning log message (list of strings)
        self.process_warning = None
        #: actual output values
        self.output = None
        #: process_resources
        self.process_resources = None
        #: size
        self.size = None
        #: scheduled
        self.scheduled = None
        #: process status - Possible values:
        #: UP (Uploading - for upload processes),
        #: RE (Resolving - computing input data objects)
        #: WT (Waiting - waiting for process since the queue is full)
        #: PP (Preparing - preparing the environment for processing)
        #: PR (Processing)
        #: OK (Done)
        #: ER (Error)
        #: DR (Dirty - Data is dirty)
        self.status = None
        #: data object's tags
        self.tags = None

        super().__init__(resolwe, **model_data)

    def update(self):
        """Clear cache and update resource fields from the server."""
        self._children = None
        self._collection = None
        self._descriptor_schema = None
        self._parents = None
        self._process = None
        self._sample = None

        super().update()

    @property
    def process(self):
        """Get process."""
        return self._process

    @process.setter
    def process(self, payload):
        """Set process."""
        self._resource_setter(payload, Process, "_process")

    @property
    def descriptor_schema(self):
        """Get descriptor schema."""
        return self._descriptor_schema

    @descriptor_schema.setter
    def descriptor_schema(self, payload):
        """Set descriptor schema."""
        self._resource_setter(payload, DescriptorSchema, "_descriptor_schema")

    @property
    def sample(self):
        """Get sample.

        Lazily built from the ``entity`` entry of the original model data.
        """
        if self._sample is None and self._original_values.get("entity", None):
            # The collection data is only serialized on the top level. Replace the
            # data inside 'entity' with the actual collection data.
            entity_values = self._original_values["entity"].copy()
            entity_values["collection"] = self._original_values.get("collection", None)
            self._sample = Sample(resolwe=self.resolwe, **entity_values)
        return self._sample

    @sample.setter
    def sample(self, payload):
        """Set sample."""
        self._resource_setter(payload, Sample, "_sample")

    @property
    def collection(self):
        """Get collection."""
        return self._collection

    @collection.setter
    def collection(self, payload):
        """Set collection."""
        self._resource_setter(payload, Collection, "_collection")

    @property
    @assert_object_exists
    def started(self):
        """Get start time."""
        return parse_resolwe_datetime(self._original_values["started"])

    @property
    @assert_object_exists
    def finished(self):
        """Get finish time."""
        return parse_resolwe_datetime(self._original_values["finished"])

    @property
    @assert_object_exists
    def parents(self):
        """Get parents of this Data object.

        Returns a query over the parent ``Data`` objects, or an empty list when
        there are none. The result is cached until :meth:`update` is called.
        """
        if self._parents is None:
            ids = [
                item["id"]
                for item in self.resolwe.api.data(self.id).parents.get(fields="id")
            ]
            # A Resolwe query must be returned when parents exist; the empty
            # result is cached too so it is not re-fetched on every access.
            self._parents = self.resolwe.data.filter(id__in=ids) if ids else []
        return self._parents

    @property
    @assert_object_exists
    def children(self):
        """Get children of this Data object.

        Returns a query over the child ``Data`` objects, or an empty list when
        there are none. The result is cached until :meth:`update` is called.
        """
        if self._children is None:
            ids = [
                item["id"]
                for item in self.resolwe.api.data(self.id).children.get(fields="id")
            ]
            # A Resolwe query must be returned when children exist; the empty
            # result is cached too so it is not re-fetched on every access.
            self._children = self.resolwe.data.filter(id__in=ids) if ids else []
        return self._children

    def _files_dirs(self, field_type, file_name=None, field_name=None):
        """Get names of files or directories stored in output fields.

        :param field_type: ``"file"`` or ``"dir"``; selects output fields of
            type ``basic:<field_type>:`` and ``list:basic:<field_type>:`` and
            is the key read from each field value
        :param file_name: if given, keep only entries with exactly this name
        :param field_name: if given, keep only entries from this output field
            (the ``output.`` prefix is optional)
        :rtype: list of str
        :raises KeyError: if a matching field value lacks the ``field_type`` key
        """
        download_list = []

        def put_in_download_list(elm, fname):
            """Append the entry name if it matches ``file_name`` (when given)."""
            if field_type not in elm:
                raise KeyError(
                    "Item {} does not contain '{}' key.".format(fname, field_type)
                )
            if file_name is None or file_name == elm[field_type]:
                download_list.append(elm[field_type])

        if field_name and not field_name.startswith("output."):
            field_name = "output.{}".format(field_name)

        flattened = flatten_field(self.output, self.process.output_schema, "output")
        for ann_field_name, ann in flattened.items():
            if (
                ann_field_name.startswith("output")
                and (field_name is None or field_name == ann_field_name)
                and ann["value"] is not None
            ):
                if ann["type"].startswith("basic:{}:".format(field_type)):
                    put_in_download_list(ann["value"], ann_field_name)
                elif ann["type"].startswith("list:basic:{}:".format(field_type)):
                    for element in ann["value"]:
                        put_in_download_list(element, ann_field_name)

        return download_list

    def _get_dir_files(self, dir_name):
        """Recursively list the files inside an output directory on the server.

        :param dir_name: directory path relative to this Data object's storage
            (e.g. an entry returned by ``_files_dirs("dir")``)
        :return: paths of all files below ``dir_name``, each prefixed with it;
            files of this level come first, then those of each subdirectory
        :rtype: list of str
        :raises requests.HTTPError: if the directory listing request fails
        """
        files_list, dir_list = [], []

        dir_url = urljoin(self.resolwe.url, "data/{}/{}".format(self.id, dir_name))
        # Always request the directory URL with a trailing slash.
        if not dir_url.endswith("/"):
            dir_url += "/"
        response = self.resolwe.session.get(dir_url, auth=self.resolwe.auth)
        # Report a failed request as an HTTP error instead of letting it
        # surface as an opaque JSON decoding error below.
        response.raise_for_status()
        listing = json.loads(response.content.decode("utf-8"))

        for obj in listing:
            obj_path = "{}/{}".format(dir_name, obj["name"])
            if obj["type"] == "directory":
                dir_list.append(obj_path)
            else:
                files_list.append(obj_path)

        for new_dir in dir_list:
            files_list.extend(self._get_dir_files(new_dir))

        return files_list

    @assert_object_exists
    def files(self, file_name=None, field_name=None):
        """Get list of downloadable file fields.

        Filter files by file name or output field. Files inside output
        directories are listed recursively as ``<dir>/<path>``.

        :param file_name: name of file
        :type file_name: string
        :param field_name: output field name
        :type field_name: string
        :rtype: list of str (file names / paths relative to the Data object)

        """
        file_list = self._files_dirs("file", file_name, field_name)

        for dir_name in self._files_dirs("dir", file_name, field_name):
            file_list.extend(self._get_dir_files(dir_name))

        return file_list

    def download(self, file_name=None, field_name=None, download_dir=None):
        """Download Data object's files and directories.

        Download files and directories from the Resolwe server to the
        download directory (defaults to the current working directory).

        :param file_name: name of file or directory
        :type file_name: string
        :param field_name: file or directory field name
        :type field_name: string
        :param download_dir: download path
        :type download_dir: string
        :rtype: None
        :raises ValueError: if both ``file_name`` and ``field_name`` are given

        Data objects can contain multiple files and directories. All are
        downloaded by default, but may be filtered by name or output
        field:

        * re.data.get(42).download(file_name='alignment7.bam')
        * re.data.get(42).download(field_name='bam')

        """
        if file_name and field_name:
            raise ValueError("Only one of file_name or field_name may be given.")

        files = [
            "{}/{}".format(self.id, fname)
            for fname in self.files(file_name, field_name)
        ]

        self.resolwe._download_files(files, download_dir)

    def stdout(self):
        """Return process standard output (stdout.txt file content).

        Fetch stdout.txt file from the corresponding Data object and return the
        file content as string. The string can be long and ugly.

        :rtype: string
        :raises ValueError: for workflows (they have no stdout.txt), or when the
            file cannot be fetched while the status is ``UP``, ``RE``, ``WT``,
            ``PP`` or ``DR``
        :raises requests.HTTPError: when the file cannot be fetched otherwise

        """
        if self.process.type.startswith("data:workflow"):
            raise ValueError("stdout.txt file is not available for workflows.")

        url = urljoin(self.resolwe.url, "data/{}/stdout.txt".format(self.id))
        response = self.resolwe.session.get(url, stream=True, auth=self.resolwe.auth)
        try:
            if not response.ok:
                if self.status in ["UP", "RE", "WT", "PP", "DR"]:
                    raise ValueError(
                        f"stdout.txt file is not available for Data with status {self.status}"
                    )
                response.raise_for_status()
                # Fallback only: ``raise_for_status`` raises for non-ok responses.
                return ""
            # Join the chunks once instead of repeatedly concatenating bytes,
            # which is quadratic for long outputs.
            output = b"".join(response.iter_content(chunk_size=CHUNK_SIZE))
        finally:
            # The response is streamed; close it so the connection is released
            # even when an error is raised before the body is consumed.
            response.close()

        return output.decode("utf-8")

    @assert_object_exists
    def duplicate(self):
        """Duplicate (make copy of) ``data`` object.

        Starts a server-side duplicate task and fetches the copy using the ids
        returned by the task's ``result()``.

        :return: Duplicated data object

        """
        task_data = self.api().duplicate.post({"ids": [self.id]})
        background_task = BackgroundTask(resolwe=self.resolwe, **task_data)
        return self.resolwe.data.get(id__in=background_task.result())