Source code for resdk.tables.variant

""".. Ignore pydocstyle D400.

=============
VariantTables
=============

.. autoclass:: VariantTables
    :members:
    :inherited-members:

"""

import re
import warnings
from functools import lru_cache
from typing import Callable, List, Optional, Union

import numpy as np
import pandas as pd

from resdk.resources import Collection, Data, Geneset

from .base import BaseTables


[docs]class VariantTables(BaseTables): """A helper class to fetch collection's variant and meta data. This class enables fetching given collection's data and returning it as tables which have samples in rows and variants in columns. A simple example: .. code-block:: python # Get Collection object collection = res.collection.get("collection-slug") tables = VariantTables(collection) # Get variant data tables.variants # Get depth per variant or coverage for specific base tables.depth tables.depth_a tables.depth_c tables.depth_g tables.depth_t """ process_type = "data:mutationstable" VARIANTS = "variants" DEPTH = "depth" DEPTH_A = "depth_a" DEPTH_C = "depth_c" DEPTH_G = "depth_g" DEPTH_T = "depth_t" FILTER = "FILTER" data_type_to_field_name = { VARIANTS: "tsv", DEPTH: "tsv", DEPTH_A: "tsv", DEPTH_C: "tsv", DEPTH_G: "tsv", DEPTH_T: "tsv", FILTER: "tsv", } DATA_FIELDS = [ "id", "slug", "modified", "entity__name", "entity__id", "input", "output", "process__output_schema", "process__slug", ]
[docs] def __init__( self, collection: Collection, geneset: Optional[List[str]] = None, filtering: bool = True, cache_dir: Optional[str] = None, progress_callable: Optional[Callable] = None, ): """Initialize class. :param collection: Collection to use. :param geneset: Only consider mutations from this gene-set. Can be a list of gene symbols or a valid geneset Data object id / slug. :param filtering: Only show variants that pass QC filters. :param cache_dir: Cache directory location, if not specified system specific cache directory is used. :param progress_callable: Custom callable that can be used to report progress. By default, progress is written to stderr with tqdm. """ super().__init__(collection, cache_dir, progress_callable) self.filtering = filtering self._geneset = None if geneset is None: self._check_heterogeneous_mutations() # Assign geneset from the Genialis Platform self.geneset = self._get_obj_geneset(self._data[0]) else: self.geneset = geneset
@property def geneset(self): """Get geneset.""" return self._geneset @geneset.setter def geneset(self, value: Union[str, int, Geneset, List[str]]): """Set geneset. Geneset can be set only once. On attempt to re-set, ValueError is raised. """ # Geneset can be set only once, prevent modifications if self._geneset is not None: raise ValueError("It is not allowed to change geneset value.") if value is None: return # If id / slug of a geneset is given, get it from the Resolwe server if isinstance(value, (int, str)): gs = self.resolwe.geneset.get(value) value = gs.genes elif isinstance(value, Geneset): value = value.genes if isinstance(value, (list, set, tuple, pd.Series)): self._geneset = set(value) else: raise ValueError(f'Unsupported type of "geneset" input: {value}.') @property @lru_cache() def variants(self) -> pd.DataFrame: """Get variants table. There are 4 possible values: - 0 - wild-type, no variant - 1 - heterozygous mutation - 2 - homozygous mutation - NaN - QC filters are failing - mutation status is unreliable """ df = self._load_fetch(self.VARIANTS) # Variants that are not reported (NaN) were not detected: # they are wild type. df = df.fillna(0) if self.filtering: # Keep values in case .filter == PASS or .variants == 0 df = df.where((self.filter == "PASS") | (df == 0), other=np.nan) return df @property @lru_cache() def depth(self) -> pd.DataFrame: """Get depth table.""" return self._load_fetch(self.DEPTH) @property @lru_cache() def depth_a(self) -> pd.DataFrame: """Get depth table for adenine.""" return self._load_fetch(self.DEPTH_A) @property @lru_cache() def depth_c(self) -> pd.DataFrame: """Get depth table for cytosine.""" return self._load_fetch(self.DEPTH_C) @property @lru_cache() def depth_g(self) -> pd.DataFrame: """Get depth table for guanine.""" return self._load_fetch(self.DEPTH_G) @property @lru_cache() def depth_t(self) -> pd.DataFrame: """Get depth table for thymine.""" return self._load_fetch(self.DEPTH_T) # TODO: consider better name @property @lru_cache() def filter(self) -> pd.DataFrame: """Get filter table. Values can be: - PASS - Variant has passed filters: - DP : Insufficient read depth (< 10.0) - QD: insufficient quality normalized by depth (< 2.0) - FS: insufficient phred-scaled p-value using Fisher's exact test to detect strand bias (> 30.0) - SnpCluster: Variant is part of a cluster For example, if a variant has read depth 8, GATK will mark it as DP. """ return self._load_fetch(self.FILTER) def _check_heterogeneous_mutations(self): """Check there are not heterogeneous mutations / genesets. Genes for which mutations are computed are given either with mutations (list of genes) or geneset (geneset ID) input. Ensure all the data has the same value of this. """ # Currently, frontend assigns empty list if this value is not entered. mutations = {str(d.input.get("mutations", [])) for d in self._data} genesets = {str(d.input.get("geneset", "")) for d in self._data} if len(mutations) > 1: name = "mutations" multiple = mutations elif len(genesets) > 1: name = "genesets" multiple = genesets else: return raise ValueError( f"Variants should be computed with the same {name} input. " f"Variants of samples in collection {self.collection.name} " f"have been computed with {', '.join(list(multiple))}.\n" "Use geneset filter in the VariantTables constructor.\n" ) def _get_obj_geneset(self, obj): """Get genes for which mutations are computed in an object.""" obj_geneset = set(obj.input.get("mutations", [])) if not obj_geneset: # Geneset is given via geneset input: gs = self.resolwe.geneset.get(obj.input["geneset"]) obj_geneset = set(gs.genes) # Convert to gene symbols in case genes are given as feature ID's if gs.output["source"] != "UCSC": qs = self.resolwe.feature.filter(feature_id__in=list(obj_geneset)) id_2_name = {obj.feature_id: obj.name for obj in qs} # Sometimes, genes defined in obj.input[geneset/mutations] are # missing in KnowledgeBase. This can happen due to KB updates. mapping_yes, mapping_no = set(), set() for gene_id in obj_geneset: if gene_id in id_2_name: mapping_yes.add(id_2_name[gene_id]) else: mapping_no.add(gene_id) if mapping_no: warnings.warn( f"{len(mapping_no)} genes in sample {obj.sample.id} are " "missing from KnowledgeBase. These genes are ignored in " "further analysis." ) obj_geneset = mapping_yes return obj_geneset @property @lru_cache() def _data(self) -> List[Data]: """Fetch data objects. Fetch Data of type ``self.process_type`` from given collection and cache the results in memory. :return: list of Data objects """ data = [] sample_ids, repeated_sample_ids = set(), set() for datum in self.collection.data.filter( type=self.process_type, status="OK", ordering="-created", fields=self.DATA_FIELDS, ): # 1 Filter by newest datum in the sample if datum.sample.id in sample_ids: repeated_sample_ids.add(datum.sample.id) continue # 2 Filter by genes, if geneset is given if self.geneset: obj_geneset = self._get_obj_geneset(datum) if not self.geneset.issubset(obj_geneset): warnings.warn( f"Sample {datum.sample} (Data {datum.id}) does not " "contain the genes requested in geneset input." ) continue sample_ids.add(datum.sample.id) data.append(datum) if repeated_sample_ids: repeated = ", ".join(map(str, repeated_sample_ids)) warnings.warn( f"The following samples have multiple data of type {self.process_type}: " f"{repeated}. Using only the newest data of this sample.", UserWarning, ) if not data: raise ValueError( f"Collection {self.collection.name} has no {self.process_type} " "data or there is no data with the requested mutations." ) return data def _download_qc(self) -> pd.DataFrame: """Download sample QC data and transform into table.""" # No QC is given for variants data - return empty DataFrame return pd.DataFrame() def _construct_index(self, row) -> str: """ Construct index of the variants table. Index should have the form: <chr>_<position>_<snp_change> E.g. chr2_1234567_C>T """ chrom = row["CHROM"] pos = row["POS"] ref = row["REF"] alt = row["ALT"] return f"{chrom}_{pos}_{ref}>{alt}" @staticmethod def _encode_mutation(row) -> int: """Encode mutation to numerical value. Mutations are given as <allele1>/<allele2>, e.g. T/T or C/T Encode these mutations as: - 0 for wild type (no mutation) - 1 for heterozygous mutation - 2 for homozygous mutation """ try: allele_line = row.get("SAMPLENAME1.GT", np.nan) allele_re = r"^([ACGT]+)/([ACGT]+)$" allele1, allele2 = re.match(allele_re, allele_line).group(1, 2) except AttributeError: # AttributeError is raised when there is no match, e.g. # there is a string value for column "SAMPLENAME1.GT" but # the above regex can't parse it warnings.warn(f'Cannot encode mutation from value "{allele_line}".') return np.nan if allele1 == allele2 == row["REF"]: return 0 elif allele1 == allele2 == row["ALT"]: return 2 else: return 1 def _parse_file(self, file_obj, sample_id, data_type) -> pd.Series: """Parse file - get encoded variants / depth of a single sample.""" sample_data = pd.read_csv(file_obj, sep="\t", low_memory=False) # Filter mutations if specified if self.geneset: out = sample_data.loc[sample_data["Gene_Name"].isin(self.geneset)] if out.empty: out = sample_data.loc[sample_data["Feature_ID"].isin(self.geneset)] sample_data = out # Construct index sample_data["index"] = sample_data.apply( self._construct_index, axis=1, result_type="reduce" ) sample_data.set_index("index", inplace=True) sample_data.index.name = None if data_type == self.VARIANTS: s = sample_data.apply(self._encode_mutation, axis=1, result_type="reduce") elif data_type == self.DEPTH: # Depth, as computed by GATK is reported by "DP" column s = sample_data["Total_depth"] elif data_type == self.DEPTH_A: s = sample_data["Base_A"] elif data_type == self.DEPTH_C: s = sample_data["Base_C"] elif data_type == self.DEPTH_G: s = sample_data["Base_G"] elif data_type == self.DEPTH_T: s = sample_data["Base_T"] elif data_type == self.FILTER: s = sample_data["FILTER"] s.name = sample_id # Sometimes mutations_table.tsv to contains duplicate variants # For now, keep the first one and drop the rest of them. s = s[~s.index.duplicated(keep="first")] return s