Source code for resdk.resources.geneset

"""Geneset resource."""

import json
import logging
from collections import Counter
from urllib.parse import urljoin

from .data import Data
from .utils import get_collection_id

[docs]class Geneset(Data): """Resolwe Geneset resource. :param resolwe: Resolwe instance :type resolwe: Resolwe object :param model_data: Resource model data """ def __init__(self, resolwe, genes=None, source=None, species=None, **model_data): """Initialize attributes.""" self.logger = logging.getLogger(__name__) super().__init__(resolwe, **model_data) self._genes = None self._source = source self._species = species # Make sure genes are stored in a set object if genes is not None: self.genes = genes @property def genes(self): """Get genes.""" if self._genes is None or len(self._genes) == 0: if and "geneset_json" in self.output: url = urljoin( self.resolwe.url, "api/storage/{}".format(self.output["geneset_json"]), ) response = self.resolwe.session.get(url, auth=self.resolwe.auth) response = json.loads(response.content.decode("utf-8")) self._genes = set(response["json"]["genes"]) return sorted(self._genes) @genes.setter def genes(self, genes): """Set genes.""" self._assert_allow_change("genes") if genes is not None: # Make sure submitted list only includes unique elements: if len(set(genes)) != len(genes): counter = Counter(list(genes)) duplicates = [gene for gene, count in counter.items() if count >= 2] duplicates = ", ".join(sorted(duplicates)) raise ValueError( f"Gene list should only contain unique elements. There are duplicates: {duplicates}" ) self._genes = set(genes) @property def source(self): """Get source.""" if self._source is None and and "source" in self.output: self._source = self.output["source"] return self._source @source.setter def source(self, new_source): """Set source.""" self._assert_allow_change("source") self._source = new_source @property def species(self): """Get species.""" if self._species is None and and "species" in self.output: self._species = self.output["species"] return self._species @species.setter def species(self, new_species): """Set species.""" self._assert_allow_change("species") self._species = new_species def _assert_allow_change(self, field_name): """Assert that this Geneset obj is not saved yet.""" if msg = "Not allowed to change field {} after geneset is saved".format( field_name ) raise ValueError(msg)
[docs] def save(self): """Save Geneset to the server. If Geneset is already on the server update with save() from base class. Otherwise, create a new Geneset by running process with slug "create-geneset". """ if super().save() else: none_fields = [ name for name in ["genes", "source", "species"] if getattr(self, name, None) is None ] if none_fields: msg = "Fields {} must not be none".format(", ".join(none_fields)) raise ValueError(msg) data = { "process": {"slug": "create-geneset"}, "input": { "genes": list(self.genes), "source": self.source, "species": self.species, }, } if data["name"] = if self.collection: data["collection"] = {"id": get_collection_id(self.collection)} model_data = tmp_genes, tmp_source, tmp_species = self.genes, self.source, self.species self._update_fields(model_data) # Since there is no output values in model_data # the original genes, source and species values gets overwritten # so we set them back here self._genes, self._source, self._species = ( tmp_genes, tmp_source, tmp_species, )
def __and__(self, other): """Intersection.""" return self.set_operator("__and__", other) def __or__(self, other): """Union.""" return self.set_operator("__or__", other) def __sub__(self, other): """Difference.""" return self.set_operator("__sub__", other) def __rsub__(self, other): """Right difference.""" return self.set_operator("__rsub__", other) def __xor__(self, other): """Symmetric difference.""" return self.set_operator("__xor__", other)
[docs] def set_operator(self, operator, other): """Perform set operations on Geneset object by creating a new Genseset. :param operator: string -> set operation function name :param other: Geneset object :return: new Geneset object """ # Make sure that self._genes is populated: _ = self.genes operator_func = getattr(self._genes, operator) if not isinstance(other, Geneset) or operator_func is None: return NotImplemented # Make sure that other._genes is populated: _ = other.genes if self.source != other.source: raise ValueError("Cannot compare Genesets with different sources") if self.species != other.species: raise ValueError("Cannot compare Genesets with different species") genes = operator_func(other._genes) return Geneset( self.resolwe, genes=genes, species=self.species, source=self.source )