# Source code for resdk.resources.metadata
"""Collection Metadata resource."""
import logging
import os
import tempfile
import warnings
from io import BytesIO
from urllib.parse import urljoin
import pandas as pd
from .data import Data
from .utils import get_collection_id
class Metadata(Data):
    """Metadata resource.

    Wraps a table (``df``) whose index holds sample IDs and which is uploaded
    through the ``upload-metadata`` / ``upload-metadata-unique`` process.

    :param resolwe: Resolwe instance
    :type resolwe: Resolwe object
    :param model_data: Resource model data
    """

    # Accepted column / index names mapped to the sample attribute that the
    # values are matched against when resolving sample IDs.
    sample_identifier_columns = {
        "Sample ID": "id",
        "ms#Sample ID": "id",
        "Sample slug": "slug",
        "ms#Sample slug": "slug",
        "Sample name": "name",
        "ms#Sample name": "name",
    }

    def __init__(self, resolwe, **model_data):
        """Initialize attributes.

        A ``df`` keyword, if present, is removed from ``model_data`` and kept
        as the local table. For objects without an ID the ``unique`` keyword
        (default ``True``) selects the upload process.
        """
        self.logger = logging.getLogger(__name__)

        # Caches: raw bytes of the "table" output and the parsed DataFrame.
        self._df_bytes = None
        self._df = model_data.pop("df", None)

        super().__init__(resolwe, **model_data)

        if self.id is None:
            # Set unique (=set self.process) only if Metadata is not yet uploaded
            self.unique = model_data.get("unique", True)

    @property
    def unique(self):
        """Get unique attribute.

        This attribute tells if Metadata has one-to-one or one-to-many
        relation to collection samples.
        """
        if self.id or self.process:
            return self.process.slug == "upload-metadata-unique"

        # If no info, consider this true by default
        return True

    @unique.setter
    def unique(self, value):
        """Select the upload process that corresponds to ``value``.

        :raises ValueError: if the object is already uploaded or ``value`` is
            not a ``bool``.
        """
        if self.id:
            raise ValueError(
                "Setting unique attribute on already uploaded Metadata is not allowed!"
            )
        if not isinstance(value, bool):
            raise ValueError("Attribute unique can only have True / False value")

        # In practice value of unique property is just a proxy for process
        # Therefore, store process instead of unique
        slug = "upload-metadata-unique" if value else "upload-metadata"
        self.process = self.resolwe.process.get(slug=slug, ordering="-created", limit=1)

    @property
    def df_bytes(self):
        """Get file contents of table output in bytes form.

        The file is downloaded on first access and cached. The returned
        ``BytesIO`` is rewound on every access so that each consumer reads
        from the start.

        :raises ValueError: if the object has no ID or no ``table`` output.
        """
        if self._df_bytes is None:
            if not (self.id and "table" in self.output):
                raise ValueError(
                    "Cannot get df bytes if there is no table in output fields..."
                )
            url = urljoin(
                self.resolwe.url, f"data/{self.id}/{self.output['table']['file']}"
            )
            response = self.resolwe.session.get(url, auth=self.resolwe.auth)
            response.raise_for_status()
            self._df_bytes = BytesIO(response.content)

        # A previous parser may have consumed the cached stream; always rewind.
        self._df_bytes.seek(0)
        return self._df_bytes

    def set_index(self, df):
        """Set index of df to Sample ID.

        If there is a column with ``Sample ID`` just set that as index. If there is
        ``Sample name`` or ``Sample slug`` column, map sample name / slug to sample ID's
        and set ID's as an index. If no suitable column in there, raise an error.

        Works also if any of the above options is already an index with appropriate name.

        The frame passed in is not modified; a new DataFrame is returned. Rows
        whose name / slug does not match any sample in the collection are
        dropped.

        :raises ValueError: if no identifier column or index name is found.
        """
        for match_column in self.sample_identifier_columns:
            if match_column in df.columns:
                break
            if match_column == df.index.name:
                # Expose the index as a regular column, on a copy so the
                # caller's frame is left untouched.
                df = df.copy()
                df[match_column] = df.index
                break
        else:
            options = ", ".join(self.sample_identifier_columns)
            raise ValueError(
                f"There should be a column in df with one of the following names: {options}"
            )

        if match_column in ["Sample ID", "ms#Sample ID"]:
            # Just set this as index and return
            return df.set_index(match_column)

        # Sample identifiers from df
        df_samples = df[match_column].astype(str)

        # Sample identifiers from collection
        attr = self.sample_identifier_columns[match_column]
        col_samples = self.collection.samples.filter(fields=["id", attr])

        # Map to Sample IDs
        mapping = {getattr(s, attr): s.id for s in col_samples}
        sample_ids = [mapping.get(s) for s in df_samples]

        # Drop unmapped rows *before* assigning the IDs. Assigning a list that
        # mixes IDs with None makes pandas coerce the column, so integer IDs
        # would end up as floats in the index.
        is_mapped = [sample_id is not None for sample_id in sample_ids]
        df = df.loc[is_mapped].copy()
        df["Sample ID"] = [sid for sid in sample_ids if sid is not None]

        return df.set_index("Sample ID")

    @staticmethod
    def _format_sample_preview(sample_ids, limit=5):
        """Join at most ``limit`` IDs with ", " and append "..." if truncated."""
        shown = ", ".join(list(map(str, sample_ids))[:limit])
        return shown + ("..." if len(sample_ids) > limit else "")

    def validate_df(self, df):
        """Validate df property.

        Validates that df:

            - is an instance of pandas.DataFrame
            - index contains sample IDs that match some samples:

                - If not matches, raise warning
                - If there are samples in df but not in collection, raise warning
                - If there are samples in collection but not in df, raise warning

        :raises ValueError: if ``df`` is not a ``pandas.DataFrame``.
        """
        if not isinstance(df, pd.DataFrame):
            raise ValueError("Attribute df must be a pandas.DataFrame object.")

        df_samples = set(df.index)

        # Sample IDs from collection
        col_samples = {s.id for s in self.collection.samples.filter(fields=["id"])}

        if not (df_samples & col_samples):
            warnings.warn(
                "No intersection between samples in df and samples in collection."
            )

        not_in_col = df_samples - col_samples
        if not_in_col:
            missing = self._format_sample_preview(not_in_col)
            warnings.warn(
                f"There are {len(not_in_col)} samples in df that are not in collection: {missing}"
            )

        not_in_df = col_samples - df_samples
        if not_in_df:
            missing = self._format_sample_preview(not_in_df)
            warnings.warn(
                f"There are {len(not_in_df)} samples in collection that are not in df: {missing}"
            )

    def get_df(self, parser=None, **kwargs):
        """Get table as pd.DataFrame.

        :param parser: optional callable that receives the table as a byte
            stream (plus ``kwargs``) and returns a DataFrame. If omitted, a
            pandas reader is chosen from the file name suffix.
        :param kwargs: passed to the parser. With the automatically chosen
            parser they are merged over its default arguments.
        :return: the parsed table indexed by sample ID, or ``None`` if the
            object has no ID and no local table.
        :raises ValueError: if there is no output named ``table``.
        """
        # Do not use cached value if parser (or parser arguments) is specified
        if self._df is None or parser is not None or kwargs:
            if self.id is None:
                return None

            if not self.output or "table" not in self.output:
                raise ValueError('Cannot parse, no output with name "table".')

            # Enable parsing the byte stream with arbitrary parser, not just pandas
            # Otherwise try to guess the parser based on file extension
            basename = self.output["table"]["file"]
            if parser is None:
                if basename.endswith("xls"):
                    parser = pd.read_excel
                    defaults = {"engine": "xlrd"}
                elif basename.endswith("xlsx"):
                    parser = pd.read_excel
                    defaults = {"engine": "openpyxl"}
                elif basename.endswith(("tab", "tsv")):
                    parser = pd.read_csv
                    defaults = {
                        "sep": "\t",
                        "low_memory": False,
                        "float_precision": "round_trip",
                    }
                else:
                    parser = pd.read_csv
                    defaults = {"low_memory": False, "float_precision": "round_trip"}
                # Caller-supplied arguments take precedence over the defaults
                # instead of being discarded.
                kwargs = {**defaults, **kwargs}

            df = parser(self.df_bytes, **kwargs)
            df = self.set_index(df)
            self.validate_df(df)
            self._df = df

        return self._df

    def set_df(self, value):
        """Set df.

        :raises ValueError: if the object is already uploaded, if no
            collection is set, or if ``value`` is not a ``pandas.DataFrame``.
        """
        if self.id:
            raise ValueError(
                "Setting df attribute on already uploaded Metadata is not allowed."
            )

        if not self.collection:
            # Validation is not possible without collection
            raise ValueError(
                "Setting df attribute before setting collection is not allowed."
            )

        self.validate_df(value)
        self._df = value

    df = property(get_df, set_df)

    def save(self):
        """Save Metadata to the server.

        If Metadata is already uploaded: update. Otherwise, create new one.

        :raises ValueError: when creating and the collection is not set or
            ``df`` is missing / empty.
        """
        if self.id:
            super().save()
            return

        if not self.collection:
            raise ValueError("Collection must be set before saving.")

        df = self.df
        if df is None or df.empty:
            raise ValueError("Attribute df must be set before saving.")

        # Use only the final path component of the name so that an absolute
        # name or one containing separators cannot point outside the
        # temporary directory.
        file_name = os.path.basename(self.name or "") or "metadata.csv"

        # All resdk machinery for uploading files works with real
        # files on the system. Ideally we would support "file upload"
        # from a stream, but for now, let's use a tempfile solution.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = os.path.join(tmp_dir, file_name)
            df.to_csv(tmp_file)
            inputs = self.resolwe._process_inputs({"src": tmp_file}, self.process)
        # On context manager exit, tmp_dir and its contents are removed

        data = {
            "process": {"slug": self.process.slug},
            "input": inputs,
            "collection": {"id": get_collection_id(self.collection)},
            "tags": self.collection.tags,
        }
        if self.name:
            data["name"] = self.name

        model_data = self.api.post(data)
        self._update_fields(model_data)

    def __repr__(self):
        """
        Format name.

        To ease distinction between 1-1 / 1-n Metadata, provide also
        process slug.
        """
        process_slug = getattr(self.process, "slug", None)
        return (
            f"{self.__class__.__name__} <id: {self.id}, slug: '{self.slug}', "
            f"name: '{self.name}', process slug: '{process_slug}'>"
        )