Source code for resdk.tables.rna

""".. Ignore pydocstyle D400.

=========
RNATables
=========

.. autoclass:: RNATables
    :members:
    :inherited-members:

    .. automethod:: __init__

"""

import os
import warnings
from collections import Counter
from functools import lru_cache
from typing import Callable, Dict, List, Optional

import numpy as np
import pandas as pd

from resdk.resources import Collection, Data
from resdk.utils.table_cache import load_pickle, save_pickle

from .base import BaseTables
from .utils import batch_download

CHUNK_SIZE = 1000


MQC_GENERAL_COLUMNS = [
    {
        "name": "FastQC (raw)_mqc-generalstats-fastqc_raw-total_sequences",
        "slug": "total_read_count_raw",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "FastQC (trimmed)_mqc-generalstats-fastqc_trimmed-total_sequences",
        "slug": "total_read_count_trimmed",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "FastQC (raw)_mqc-generalstats-fastqc_raw-percent_gc",
        "slug": "gc_content_raw",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "FastQC (trimmed)_mqc-generalstats-fastqc_trimmed-percent_gc",
        "slug": "gc_content_trimmed",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "FastQC (raw)_mqc-generalstats-fastqc_raw-percent_duplicates",
        "slug": "seq_duplication_raw",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "FastQC (trimmed)_mqc-generalstats-fastqc_trimmed-percent_duplicates",
        "slug": "seq_duplication_trimmed",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "FastQC (raw)_mqc-generalstats-fastqc_raw-avg_sequence_length",
        "slug": "avg_seq_length_raw",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "FastQC (trimmed)_mqc-generalstats-fastqc_trimmed-avg_sequence_length",
        "slug": "avg_seq_length_trimmed",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "STAR_mqc-generalstats-star-uniquely_mapped_percent",
        "slug": "mapped_reads_percent",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "STAR_mqc-generalstats-star-uniquely_mapped",
        "slug": "mapped_reads",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "STAR (Globin)_mqc-generalstats-star_globin-uniquely_mapped_percent",
        "slug": "mapped_reads_percent_globin",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "STAR (Globin)_mqc-generalstats-star_globin-uniquely_mapped",
        "slug": "mapped_reads_globin",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "STAR (rRNA)_mqc-generalstats-star_rrna-uniquely_mapped_percent",
        "slug": "mapped_reads_percent_rRNA",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "STAR (rRNA)_mqc-generalstats-star_rrna-uniquely_mapped",
        "slug": "mapped_reads_rRNA",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "featureCounts_mqc-generalstats-featurecounts-percent_assigned",
        "slug": "fc_assigned_reads_percent",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "featureCounts_mqc-generalstats-featurecounts-Assigned",
        "slug": "fc_assigned_reads",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "STAR quantification_mqc-generalstats-star_quantification-of_assigned_reads",
        "slug": "star_assigned_reads_percent",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "STAR quantification_mqc-generalstats-star_quantification-Assigned_reads",
        "slug": "star_assigned_reads",
        "type": "int64",
        "agg_func": "sum",
    },
    {
        "name": "Salmon_mqc-generalstats-salmon-percent_mapped",
        "slug": "salmon_assigned_reads_percent",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "Salmon_mqc-generalstats-salmon-num_mapped",
        "slug": "salmon_assigned_reads",
        "type": "Int64",
        "agg_func": "sum",
    },
    {
        "name": "QoRTs_mqc-generalstats-qorts-Genes_PercentWithNonzeroCounts",
        "slug": "nonzero_count_features_percent",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "QoRTs_mqc-generalstats-qorts-NumberOfChromosomesCovered",
        "slug": "contigs_covered",
        "type": "Int64",
        "agg_func": "mean",
    },
    {
        "name": "RNA-SeQC_mqc-generalstats-rna_seqc-Expression_Profiling_Efficiency",
        "slug": "profiling_efficiency",
        "type": "float64",
        "agg_func": "mean",
    },
    {
        "name": "RNA-SeQC_mqc-generalstats-rna_seqc-Genes_Detected",
        "slug": "genes_detected",
        "type": "Int64",
        "agg_func": "mean",
    },
    {
        "slug": "strandedness_code",
        "type": "string",
    },
    {
        "slug": "genome_build",
        "type": "string",
    },
]

MQC_COVERAGE_COLUMNS = [
    {
        "name": "Genes used in 3' bias",
        "slug": "num_genes_three_prime_bias",
        "type": "Int64",
        "agg_func": "mean",
    },
    {
        "name": "Mean 3' bias",
        "slug": "mean_three_prime_bias",
        "type": "float64",
        "agg_func": "mean",
    },
]


def general_multiqc_parser(file_object, name, column_names):
    """General parser for MultiQC files."""
    df = pd.read_csv(file_object, sep="\t", index_col=0)

    # Keep only specified columns:
    df = df[
        [
            column.get("name", "")
            for column in column_names
            if column.get("name", "") in df.columns
        ]
    ]
    # Rename
    df = df.rename(
        columns={
            column.get("name", ""): column["slug"]
            for column in column_names
            if column.get("name", "") in df.columns
        }
    )

    # Perform aggregation
    series = df.agg(
        {
            column["slug"]: column["agg_func"]
            for column in column_names
            if column["slug"] in df.columns
        }
    )
    series.name = name
    return series


def multiqc_general_stats_parser(file_object, name):
    """Parse "multiqc_general_stats.txt" file."""
    return general_multiqc_parser(file_object, name, MQC_GENERAL_COLUMNS)


def multiqc_coverage_parser(file_object, name):
    """Parse "multiqc_rna-seqc_coverage_stats.txt" file."""
    return general_multiqc_parser(file_object, name, MQC_COVERAGE_COLUMNS)


def multiqc_strand_parser(file_object, name):
    """Parse "multiqc_library_strandedness.txt" file."""
    df = pd.read_csv(file_object, sep="\t", index_col=0)
    return pd.Series(
        df["Strandedness code"].iloc[0],
        index=["strandedness_code"],
        name=name,
    )


def multiqc_build_parser(file_object, name):
    """Parse "multiqc_sample_info.txt" file."""
    df = pd.read_csv(file_object, sep="\t", index_col=0)
    return pd.Series(
        df["Genome Build"].iloc[0],
        index=["genome_build"],
        name=name,
    )


[docs]class RNATables(BaseTables): """A helper class to fetch collection's expression and meta data. This class enables fetching given collection's data and returning it as tables which have samples in rows and expressions/metadata in columns. When calling :attr:`RNATables.exp`, :attr:`RNATables.rc` and :attr:`RNATables.meta` for the first time the corresponding data gets downloaded from the server. This data than gets cached in memory and on disc and is used in consequent calls. If the data on the server changes the updated version gets re-downloaded. A simple example: .. code-block:: python # Get Collection object collection = res.collection.get("collection-slug") # Fetch collection expressions and metadata tables = RNATables(collection) exp = tables.exp rc = tables.rc meta = tables.meta """ process_type = "data:expression:" EXP = "exp" RC = "rc" data_type_to_field_name = { EXP: "exp", RC: "rc", }
[docs] def __init__( self, collection: Collection, cache_dir: Optional[str] = None, progress_callable: Optional[Callable] = None, expression_source: Optional[str] = None, expression_process_slug: Optional[str] = None, ): """Initialize class. :param collection: collection to use :param cache_dir: cache directory location, if not specified system specific cache directory is used :param progress_callable: custom callable that can be used to report progress. By default, progress is written to stderr with tqdm :param expression_source: Only consider samples in the collection with specified source :param expression_process_slug: Only consider samples in the collection with specified process slug """ super().__init__(collection, cache_dir, progress_callable) self.expression_source = expression_source self.expression_process_slug = expression_process_slug self.check_heterogeneous_collections() self.gene_ids = [] # type: List[str]
[docs] def check_heterogeneous_collections(self): """Ensure consistency among expressions.""" message = "" process_slugs = sorted({d.process.slug for d in self._data}) if len(process_slugs) > 1: message += ( "Expressions of all samples must be computed with the " "same process. Expressions of samples in collection " f"{self.collection.name} have been computed with " f"{', '.join(process_slugs)}.\n" "Use expression_process_slug filter in the " "RNATable constructor.\n" ) exp_sources = {d.output["source"] for d in self._data} if len(exp_sources) > 1: message += ( "Alignment of all samples must be computed with the " "same genome source. Alignments of samples in " f"collection {self.collection.name} have been computed " f"with {', '.join(exp_sources)}.\n" "Use expression_source filter in the RNATable " "constructor.\n" ) if message: raise ValueError(message)
@property @lru_cache() def exp(self) -> pd.DataFrame: """Return expressions table as a pandas DataFrame object. Which type of expressions (TPM, CPM, FPKM, ...) get returned depends on how the data was processed. The expression type can be checked in the returned table attribute `attrs['exp_type']`: .. code-block:: python exp = tables.exp print(exp.attrs['exp_type']) :return: table of expressions """ exp = self._load_fetch(self.EXP) self.gene_ids = exp.columns.tolist() return exp @property @lru_cache() def rc(self) -> pd.DataFrame: """Return expression counts table as a pandas DataFrame object. :return: table of counts """ rc = self._load_fetch(self.RC) self.gene_ids = rc.columns.tolist() return rc @property @lru_cache() def readable_columns(self) -> Dict[str, str]: """Map of source gene ids to symbols. This also gets fetched only once and then cached in memory and on disc. :attr:`RNATables.exp` or :attr:`RNATables.rc` must be called before this as the mapping is specific to just this data. Its intended use is to rename table column labels from gene ids to symbols. Example of use: .. code-block:: python exp = exp.rename(columns=tables.readable_columns) :return: dict with gene ids as keys and gene symbols as values """ species = self._data[0].output["species"] source = self._data[0].output["source"] if not self.gene_ids: raise ValueError("Expression data must be used before!") mapping = self._mapping(self.gene_ids, source, species) if len(mapping) < len(self.gene_ids): missing = list(set(self.gene_ids) - set(mapping)) missing_str = ( "(" + ", ".join(missing[:5]) + (", ...)" if len(missing) > 5 else ")") ) warnings.warn( f"Symbols for {len(missing)} gene IDs were not found. ({missing_str})" "Missing symbols will be set to empty string.", UserWarning, ) mapping = {id_: mapping.get(id_, np.nan) for id_ in self.gene_ids} return mapping @property @lru_cache() def build(self) -> str: """Get build.""" builds = Counter([d.output.get("build") for d in self._data]) if len(builds) == 0: raise ValueError("Cannot determine build, no data found.") elif len(builds) > 1: builds_str = ", ".join(k for k in builds.keys()) warnings.warn( f"Cannot determine build, multiple builds found: {builds_str}." ) # Return the only / most common build return builds.most_common(1)[0][0] @property @lru_cache() def _data(self) -> List[Data]: """Fetch data objects. Fetch expression data objects from given collection and cache the results in memory. If ``expression_source`` / ``expression_process_slug`` is provided also filter for that. Only the needed subset of fields is fetched. :return: list of Data objects """ data = super()._data if self.expression_process_slug: data = [d for d in data if d.process.slug == self.expression_process_slug] if self.expression_source: data = [d for d in data if d.output["source"] == self.expression_source] return data def _cache_file(self, data_type: str) -> str: """Return full cache file path.""" if data_type == self.META: version = self._metadata_version elif data_type == self.QC: version = self._qc_version else: version = self._data_version cache_file = f"{self.collection.slug}_{data_type}_{self.expression_source}_{self.expression_process_slug}_{version}.pickle" return os.path.join(self.cache_dir, cache_file) def _download_qc(self) -> pd.DataFrame: """Download sample QC data and transform into table. QC info can be sourced from multiple files and multiple columns in those files. """ mqc_db = {} for mqc in self.collection.data.filter( type="data:multiqc", status="OK", ordering="created", entity__isnull=False, fields=["id", "entity__id"], ): if mqc.sample.id in mqc_db: warnings.warn( f"Sample with ID={mqc.sample.id} has multiple MultiQC Data. " "Using the latest one.", UserWarning, ) mqc_db[mqc.sample.id] = { "mqc_id": mqc.id, "uri_general": f"{mqc.id}/multiqc_data/multiqc_general_stats.txt", "uri_strand": f"{mqc.id}/multiqc_data/multiqc_library_strandedness.txt", "uri_build": f"{mqc.id}/multiqc_data/multiqc_sample_info.txt", "uri_coverage": f"{mqc.id}/multiqc_data/multiqc_rna-seqc_coverage_stats.txt", } df = pd.DataFrame(index=[sample.id for sample in self._samples]) parsers = { "uri_general": multiqc_general_stats_parser, "uri_strand": multiqc_strand_parser, "uri_build": multiqc_build_parser, "uri_coverage": multiqc_coverage_parser, } for type_, parser in parsers.items(): uris = [item[type_] for item in mqc_db.values()] qc = batch_download(self.resolwe, uris, parser) qc = qc.rename(index={value[type_]: key for key, value in mqc_db.items()}) df = df.merge(qc, how="left", left_index=True, right_index=True) # Cast to correct dtype BUILD_COLUMN = [{"slug": "genome_build", "type": "category"}] STRANDEDNESS_COLUMN = [{"slug": "strandedness_code", "type": "category"}] column_types = { c["slug"]: c["type"] for c in MQC_GENERAL_COLUMNS + MQC_COVERAGE_COLUMNS + BUILD_COLUMN + STRANDEDNESS_COLUMN if c["slug"] in df.columns } df = df[column_types.keys()].astype(column_types) return df def _parse_file(self, file_obj, sample_id, data_type): """Parse file object and return a one DataFrame line.""" sample_data = pd.read_csv(file_obj, sep="\t", compression="gzip") sample_data = sample_data.set_index("Gene")["Expression"] sample_data.name = sample_id # Optimize memory usage, float32 and int32 will suffice. sample_data = sample_data.astype("int32" if data_type == self.RC else "float32") return sample_data async def _download_data(self, data_type: str) -> pd.DataFrame: df = await super()._download_data(data_type) source = self._data[0].output["source"] df.columns.name = source.capitalize() if source == "ENSEMBL" else source df.attrs["exp_type"] = ( "rc" if data_type == self.RC else self._data[0].output["exp_type"] ) try: df.attrs["build"] = self.build except ValueError: # In rare cases, it can happen that Collection has Data with # different builds but same source. In such case, just skip # assigning the build. pass return df def _mapping(self, ids: List[str], source: str, species: str) -> Dict[str, str]: """Fetch and cache gene mapping.""" mapping_cache = os.path.join(self.cache_dir, f"{source}_{species}.pickle") mapping = load_pickle(mapping_cache) if mapping is None: mapping = {} # download only the genes that are not in cache diff = list(set(ids) - set(mapping.keys())) if diff: diff_mapping = self._download_mapping(diff, source, species) mapping.update(diff_mapping) save_pickle(mapping, mapping_cache, override=True) return mapping def _download_mapping( self, ids: List[str], source: str, species: str ) -> Dict[str, str]: """Download gene mapping.""" sublists = [ids[i : i + CHUNK_SIZE] for i in range(0, len(ids), CHUNK_SIZE)] mapping = {} for sublist in self.tqdm( sublists, desc="Downloading gene mapping", ncols=100, file=open(os.devnull, "w") if self.progress_callable else None, callable=self.progress_callable, ): features = self.resolwe.feature.filter( source=source, species=species, feature_id__in=sublist ) mapping.update({f.feature_id: f.name for f in features}) return mapping