# Source code for resdk.resources.utils

"""Resource utility functions."""

import functools
from datetime import datetime
from urllib.parse import urljoin

import pytz
import tzlocal

from resdk.constants import RESOLWE_DATETIME_FORMAT


def iterate_fields(fields, schema):
    """Walk a field instance and yield every non-group field with its schema.

    Schema entries that carry a ``group`` key are descended into recursively,
    so only leaf (non-group) fields are yielded.

    :param fields: Field instance (e.g. input)
    :type fields: dict
    :param schema: Schema instance (e.g. input_schema), an iterable of
        dicts that each have a ``name`` key
    :type schema: list
    :return: generator of ``(field_schema, fields)`` tuples, where
        ``fields`` is the dict that directly contains the field
    :raises KeyError: if a key of ``fields`` has no entry in ``schema``

    """
    schema_by_name = {entry["name"]: entry for entry in schema}

    for field_name, field_value in fields.items():
        field_schema = schema_by_name[field_name]

        if "group" not in field_schema:
            yield (field_schema, fields)
            continue

        # Group field: its value is a nested dict described by the sub-schema.
        yield from iterate_fields(field_value, field_schema["group"])
def iterate_schema(fields, schema, path=None):
    """Walk a schema and yield every non-group entry with its field dict.

    Unlike a walk over the field instance, this is driven by the schema:
    entries are yielded even when ``fields`` holds no value for them. A
    group that is missing from ``fields`` is traversed with an empty dict.

    :param fields: Field instance (e.g. input)
    :type fields: dict
    :param schema: Schema instance (e.g. input_schema)
    :type schema: list
    :param path: Dot-separated prefix for field paths; when ``None`` no
        paths are produced
    :type path: string
    :return: generator of ``(field_schema, fields)`` tuples when ``path``
        is ``None``, otherwise ``(field_schema, fields, field_path)``

    """
    for entry in schema:
        name = entry["name"]
        dotted = None if path is None else f"{path}.{name}"

        if "group" in entry:
            nested_fields = fields[name] if name in fields else {}
            yield from iterate_schema(nested_fields, entry["group"], dotted)
        elif dotted is None:
            yield (entry, fields)
        else:
            yield (entry, fields, dotted)
def flatten_field(field, schema, path):
    """Flatten a nested field instance into a dict keyed by dotted path.

    Every non-group schema entry produces one item; fields that have no
    value in ``field`` get ``None`` as their value.

    :param field: Field instance (e.g. input)
    :type field: dict
    :param schema: Schema instance (e.g. input_schema)
    :type schema: list
    :param path: Field path used as the prefix of every key; the loop
        unpacks three values per item, which ``iterate_schema`` only
        yields when a path is given
    :type path: string
    :return: mapping of dotted path to a dict with the keys ``name``,
        ``type``, ``label`` and ``value``
    :rtype: dictionary

    """
    return {
        dotted_path: {
            "name": leaf_schema["name"],
            "type": leaf_schema["type"],
            "label": leaf_schema["label"],
            "value": container.get(leaf_schema["name"], None),
        }
        for leaf_schema, container, dotted_path in iterate_schema(
            field, schema, path
        )
    }
def fill_spaces(word, desired_length):
    """Pad ``word`` with trailing spaces until it reaches the desired length.

    The width is measured on the string form of ``word``, so non-string
    values (e.g. integers) are accepted and padded correctly.

    :param word: Value to pad; converted with ``str``
    :param desired_length: Minimum length of the returned string
    :type desired_length: int
    :return: ``str(word)`` left-justified in a field of ``desired_length``
        characters; returned unpadded if it is already that long
    :rtype: str

    """
    return str(word).ljust(desired_length)
def _print_input_line(element_list, level):
    """Pretty print of input_schema.

    Group entries are printed as ``- name - label`` and their sub-schema is
    printed recursively one level deeper. Other entries are printed as
    ``- name [type] - label`` with the name and type columns padded so that
    the labels of one level line up.

    :param element_list: Schema entries of one nesting level
    :type element_list: list
    :param level: Nesting depth, used to build the leading indent
    :type level: int

    """
    spacing = 2
    # NOTE(review): the indent unit is reproduced as it appears in the
    # reviewed text; confirm the intended width.
    indent = " " * level

    # Column widths depend only on ``element_list``, so compute them once
    # instead of once per printed row. ``default=0`` covers levels that
    # are empty or consist solely of groups.
    max_name_len = max((len(elm["name"]) for elm in element_list), default=0)
    max_type_len = max(
        (len(elm["type"]) for elm in element_list if "group" not in elm),
        default=0,
    )

    for element in element_list:
        if "group" in element:
            print("{}- {} - {}".format(indent, element["name"], element["label"]))
            _print_input_line(element["group"], level + 1)
        else:
            print(
                "{}- {} {} - {}".format(
                    indent,
                    fill_spaces(element["name"], max_name_len + spacing),
                    fill_spaces("[" + element["type"] + "]", max_type_len + spacing),
                    element["label"],
                )
            )
def get_collection_id(collection):
    """Return the ``id`` of a Collection, or the given value unchanged.

    The object is recognised by its class name (``"Collection"``).
    """
    if type(collection).__name__ == "Collection":
        return collection.id
    return collection
def get_data_id(data):
    """Return the ``id`` of a Data object, or the given value unchanged.

    The object is recognised by its class name (``"Data"``).
    """
    if type(data).__name__ == "Data":
        return data.id
    return data
def get_descriptor_schema_id(dschema):
    """Get descriptor schema id.

    Return the ``id`` of a DescriptorSchema, or the given value unchanged.
    The object is recognised by its class name (``"DescriptorSchema"``).
    """
    if type(dschema).__name__ == "DescriptorSchema":
        return dschema.id
    return dschema
def get_process_id(process):
    """Return the ``id`` of a Process, or the given value unchanged.

    The object is recognised by its class name (``"Process"``).
    """
    if type(process).__name__ == "Process":
        return process.id
    return process
def get_sample_id(sample):
    """Return the ``id`` of a Sample, or the given value unchanged.

    The object is recognised by its class name (``"Sample"``).
    """
    if type(sample).__name__ == "Sample":
        return sample.id
    return sample
def get_relation_id(relation):
    """Return the ``id`` of a Relation, or the given value unchanged.

    The object is recognised by its class name (``"Relation"``).
    """
    if type(relation).__name__ == "Relation":
        return relation.id
    return relation
def get_user_id(user):
    """Return the ``id`` of a User, or the given value unchanged.

    The object is recognised by its class name (``"User"``).
    """
    if type(user).__name__ == "User":
        return user.id
    return user
def is_collection(collection):
    """Tell whether the passed object's class is named ``Collection``."""
    class_name = type(collection).__name__
    return class_name == "Collection"
def is_data(data):
    """Tell whether the passed object's class is named ``Data``."""
    class_name = type(data).__name__
    return class_name == "Data"
def is_descriptor_schema(data):
    """Tell whether the passed object's class is named ``DescriptorSchema``."""
    class_name = type(data).__name__
    return class_name == "DescriptorSchema"
def is_process(process):
    """Tell whether the passed object's class is named ``Process``."""
    class_name = type(process).__name__
    return class_name == "Process"
def is_sample(sample):
    """Tell whether the passed object's class is named ``Sample``."""
    class_name = type(sample).__name__
    return class_name == "Sample"
def is_relation(relation):
    """Tell whether the passed object's class is named ``Relation``."""
    class_name = type(relation).__name__
    return class_name == "Relation"
def is_user(user):
    """Tell whether the passed object's class is named ``User``."""
    class_name = type(user).__name__
    return class_name == "User"
def is_group(group):
    """Tell whether the passed object's class is named ``Group``."""
    class_name = type(group).__name__
    return class_name == "Group"
def parse_resolwe_datetime(dtime):
    """Turn a datetime string into an aware datetime in the local time zone.

    :param dtime: String representation of time; its last six characters
        are dropped before parsing with ``RESOLWE_DATETIME_FORMAT``
    :type dtime: string
    :return: time-zone aware ``datetime.datetime`` in the local time zone,
        or ``None`` when ``dtime`` is empty or ``None``

    """
    if not dtime:
        return None

    # NOTE(review): presumably the six trailing characters are a "+00:00"
    # style offset; the remainder is interpreted as UTC regardless - confirm.
    naive_utc = datetime.strptime(dtime[:-6], RESOLWE_DATETIME_FORMAT)

    # Attach the UTC time zone to the naive value, then convert it to
    # whatever zone the local machine reports.
    aware_utc = pytz.utc.localize(naive_utc)
    return aware_utc.astimezone(tzlocal.get_localzone())
@functools.lru_cache(128)
def _get_billing_account_id(res, name):
    """Get billing account ID based on its name.

    Queries the ``api/billingaccount`` endpoint relative to ``res.url`` and
    searches the returned list for an account with a matching name.
    Successful lookups are memoized by ``functools.lru_cache``, so ``res``
    and ``name`` must be hashable.

    :param res: Object exposing ``url`` and a ``session`` with a ``get``
        method
    :param name: Name of the billing account
    :type name: string
    :return: value of the ``id`` key of the matching account
    :raises ValueError: if the endpoint returns an empty result, or if no
        account has the given name

    """
    response = res.session.get(urljoin(res.url, "api/billingaccount"))

    # Decode the response body once and reuse it for both checks.
    accounts = response.json()
    if not accounts:
        raise ValueError(
            "You do not have sufficient permissions for assigning billing accounts."
        )

    for account in accounts:
        if account["name"] == name:
            return account["id"]

    raise ValueError(f'Could not find a billing account with name "{name}"')