diff --git a/deeprank/models/metrics.py b/deeprank/models/metrics.py index 6574b61..6a21bb4 100644 --- a/deeprank/models/metrics.py +++ b/deeprank/models/metrics.py @@ -1,7 +1,7 @@ import lzma import os import csv -from typing import List, Tuple, Any +from typing import List, Tuple, Any, Optional from math import sqrt from torch import argmax, tensor @@ -9,6 +9,9 @@ from torch.utils.tensorboard import SummaryWriter from sklearn.metrics import roc_auc_score +from deeprank.models.variant import VariantClass +from deeprank.tools.metrics import get_labels_from_output, get_labels_from_targets + class MetricsExporter: "The class implements an object, to be called when a neural network generates output" @@ -49,12 +52,19 @@ def process(self, pass_name: str, epoch_number: int, metrics_exporter.process(pass_name, epoch_number, entry_names, output_values, target_values) -class TensorboardBinaryClassificationExporter(MetricsExporter): - "exports to tensorboard, works for binary classification only" +class TensorboardVariantClassificationExporter(MetricsExporter): + "exports to tensorboard, works for variant classification only" + + def __init__(self, directory_path: str, unknown_treshold: Optional[float] = 0.5): + """ + Args: + directory_path: where to store tensorboard files + unknown_treshold: if the absolute difference between the class probabilities is below this value, give the class label UNKNOWN + """ - def __init__(self, directory_path): self._directory_path = directory_path self._writer = SummaryWriter(log_dir=directory_path) + self._unknown_treshold = unknown_treshold def __enter__(self): self._writer.__enter__() @@ -67,33 +77,45 @@ def process(self, pass_name: str, epoch_number: int, entry_names: List[str], output_values: List[Any], target_values: List[Any]): "write to tensorboard" - loss = cross_entropy(tensor(output_values), tensor(target_values)) + output_tensor = tensor(output_values) + target_tensor = tensor(target_values) + + loss = cross_entropy(output_tensor, target_tensor) self._writer.add_scalar(f"{pass_name} loss", loss, epoch_number) - probabilities = [] + # lists of VariantClass values + prediction_labels = get_labels_from_output(output_tensor, unknown_treshold=self._unknown_treshold) + target_labels = get_labels_from_targets(target_tensor) + + roc_probabilities = [] # floating point values + roc_targets = [] # list of 0/1 values + fp, fn, tp, tn = 0, 0, 0, 0 for entry_index, entry_name in enumerate(entry_names): - probability = output_values[entry_index][1] - probabilities.append(probability) + prediction_label = prediction_labels[entry_index] + target_label = target_labels[entry_index] - prediction_value = argmax(tensor(output_values[entry_index])) - target_value = target_values[entry_index] - - if prediction_value > 0.0 and target_value > 0.0: + if prediction_label == VariantClass.PATHOGENIC and target_label == VariantClass.PATHOGENIC: tp += 1 - elif prediction_value <= 0.0 and target_value <= 0.0: + elif prediction_label == VariantClass.BENIGN and target_label == VariantClass.BENIGN: tn += 1 - elif prediction_value > 0.0 and target_value <= 0.0: + elif prediction_label == VariantClass.PATHOGENIC and target_label == VariantClass.BENIGN: fp += 1 - elif prediction_value <= 0.0 and target_value > 0.0: + elif prediction_label == VariantClass.BENIGN and target_label == VariantClass.PATHOGENIC: fn += 1 + if prediction_label != VariantClass.UNKNOWN: + roc_probabilities.append(output_values[entry_index][1]) + roc_targets.append(target_values[entry_index]) + + # Furthermore, UNKNOWN variants are completely ignored.. + mcc_numerator = tn * tp - fp * fn if mcc_numerator == 0.0: - self._writer.add_scalar(f"{pass_name} MCC", 0.0, epoch_number) + self._writer.add_scalar(f"{pass_name} MCC", 0.0, epoch_number) else: mcc_denominator = sqrt((tn + fn) * (fp + tp) * (tn + fp) * (fn + tp)) @@ -101,12 +123,14 @@ def process(self, pass_name: str, epoch_number: int, mcc = mcc_numerator / mcc_denominator self._writer.add_scalar(f"{pass_name} MCC", mcc, epoch_number) - accuracy = (tp + tn) / (tp + tn + fp + fn) - self._writer.add_scalar(f"{pass_name} accuracy", accuracy, epoch_number) + accuracy_denominator = tp + tn + fp + fn + if accuracy_denominator > 0: + accuracy = (tp + tn) / accuracy_denominator + self._writer.add_scalar(f"{pass_name} accuracy", accuracy, epoch_number) - # for ROC curves to work, we need both class values in the set - if len(set(target_values)) == 2: - roc_auc = roc_auc_score(target_values, probabilities) + # for ROC curves to work, we need both class values in the target set + if len(set(roc_targets)) >= 2: + roc_auc = roc_auc_score(roc_targets, roc_probabilities) self._writer.add_scalar(f"{pass_name} ROC AUC", roc_auc, epoch_number) @@ -139,3 +163,37 @@ def process(self, pass_name: str, epoch_number: int, target_value = target_values[entry_index] w.writerow([entry_name, str(output_value), str(target_value)]) + + +class LabelExporter(MetricsExporter): + "writes all labels to a table" + + def __init__(self, directory_path, unknown_treshold: Optional[float] = 0.5): + self._directory_path = directory_path + self._unknown_treshold = unknown_treshold + + def get_filename(self, pass_name, epoch_number): + "returns the filename for the table" + return os.path.join(self._directory_path, f"labels-{pass_name}-epoch-{epoch_number}.csv.xz") + + def process(self, pass_name: str, epoch_number: int, + entry_names: List[str], output_values: List[Any], target_values: List[Any]): + "write the output to the table" + + # lists of VariantClass values + output_labels = get_labels_from_output(tensor(output_values), unknown_treshold=self._unknown_treshold) + target_labels = get_labels_from_targets(tensor(target_values)) + + if not os.path.isdir(self._directory_path): + os.mkdir(self._directory_path) + + with lzma.open(self.get_filename(pass_name, epoch_number), 'wt') as f: + w = csv.writer(f) + + w.writerow(["entry", "output_label", "target_label"]) + + for entry_index, entry_name in enumerate(entry_names): + output_name = output_labels[entry_index].name + target_name = target_labels[entry_index].name + + w.writerow([entry_name, output_name, target_name]) diff --git a/deeprank/models/variant.py b/deeprank/models/variant.py index ee5839a..fbe127b 100644 --- a/deeprank/models/variant.py +++ b/deeprank/models/variant.py @@ -5,6 +5,7 @@ class VariantClass(Enum): + UNKNOWN = -1 BENIGN = 0 PATHOGENIC = 1 diff --git a/deeprank/tools/metrics.py b/deeprank/tools/metrics.py index 5a4968d..3c81426 100644 --- a/deeprank/tools/metrics.py +++ b/deeprank/tools/metrics.py @@ -1,74 +1,51 @@ -from math import sqrt +from typing import List, Optional +import torch -def get_tp_tn_fp_fn(output_data, target_data): - """ A classification metric +from deeprank.models.variant import VariantClass + +def get_labels_from_output(output_data: torch.Tensor, + unknown_treshold: Optional[float] = 0.5) -> List[VariantClass]: + """ Args: - output_data(array of dimension (x,2)): considered negative if left value > right value - target_data(array of dimension (x,1)): considered negative if 0, positive otherewise - - Returns (four floats): - true positive count (tp) - true negative count (tn) - false positive count (fp) - false negative count (fn) + output_data: [x, 2] considered BENIGN if left value > right value and otherwise PATHOGENIC + unknown_treshold: if the absolute difference between the values is below this value, then consider the class UNKNOWN """ - tp = 0 - tn = 0 - fp = 0 - fn = 0 - total = output_data.shape[0] - if total == 0: - raise ValueError("0 output data entries") + labels = [] for index in range(total): - output0, output1 = output_data[index,:] - target = target_data[index] - - if output0 > output1: # negative output - - if target != 0: # wrong - - fn += 1 - - else: # right - - tn += 1 + if abs(output_data[index, 0] - output_data[index, 1]) < unknown_treshold: + label = VariantClass.UNKNOWN - else: # positive output + elif output_data[index, 0] < output_data[index, 1]: + label = VariantClass.PATHOGENIC - if target != 0: # right + else: + label = VariantClass.BENIGN - tp += 1 + labels.append(label) - else: # wrong + return labels - fp += 1 - - return tp, tn, fp, fn - - -def get_mcc(tp, tn, fp, fn): - """ The Mathews Correlation Coefficient +def get_labels_from_targets(target_data: torch.Tensor) -> List[VariantClass]: + """ Args: - tp (float): true positive count - tn (float): true negative count - fp (float): false positive count - fn (float): false negative count - - Returns (float): Mathews Correlation Coefficient + target_data: [x, 1] where 0 means BENIGN and 1 means PATHOGENIC """ - numerator = tp * tn - fp * fn - if numerator == 0: - return 0.0 + total = target_data.shape[0] + + labels = [] + for index in range(total): + if target_data[index] > 0: + label = VariantClass.PATHOGENIC + else: + label = VariantClass.BENIGN - denominator = sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) - if denominator == 0: - raise ValueError(f"MCC denominator is zero for tp={tp}, tn={tn}, fp={fp}, fn={fn}") + labels.append(label) - return numerator / denominator + return labels diff --git a/scripts/learn.py b/scripts/learn.py index 3daea7c..c934328 100755 --- a/scripts/learn.py +++ b/scripts/learn.py @@ -13,8 +13,7 @@ from deeprank.learn.NeuralNet import NeuralNet from deeprank.learn.DataSet import DataSet from deeprank.learn.model3d import cnn_class -from deeprank.models.metrics import OutputExporter -from deeprank.models.metrics import TensorboardBinaryClassificationExporter +from deeprank.models.metrics import OutputExporter, LabelExporter, TensorboardVariantClassificationExporter logging.basicConfig(filename="learn-%d.log" % os.getpid(), filemode="w", level=logging.INFO) @@ -79,7 +78,7 @@ def interpret_args(args, usage): run_directory = "run-{}".format(os.getpid()) neural_net = NeuralNet(dataset, cnn_class, model_type='3d',task='class', cuda=False, - metrics_exporters=[OutputExporter(run_directory), - TensorboardBinaryClassificationExporter(run_directory)]) + metrics_exporters=[OutputExporter(run_directory), LabelExporter(run_directory), + TensorboardVariantClassificationExporter(run_directory)]) neural_net.optimizer = optim.AdamW(neural_net.net.parameters(), lr=0.001, weight_decay=0.005) neural_net.train(nepoch = epoch_count, divide_trainset=None, train_batch_size = 5, num_workers=0) diff --git a/test/models/test_metrics.py b/test/models/test_metrics.py index a311a7f..f0a957e 100644 --- a/test/models/test_metrics.py +++ b/test/models/test_metrics.py @@ -2,7 +2,7 @@ import shutil import os -from deeprank.models.metrics import TensorboardBinaryClassificationExporter, OutputExporter +from deeprank.models.metrics import TensorboardVariantClassificationExporter, OutputExporter, LabelExporter test_entries = ["entry0", "entry1"] @@ -14,7 +14,7 @@ def test_tensorboard_class_output(): tmp_dir_path = tempfile.mkdtemp() try: - exporter = TensorboardBinaryClassificationExporter(tmp_dir_path) + exporter = TensorboardVariantClassificationExporter(tmp_dir_path) with exporter: exporter.process("unit-testing", 0, test_entries, test_outputs, test_targets) @@ -36,3 +36,17 @@ def test_output(): assert len(os.listdir(tmp_dir_path)) > 0, "output directory is empty" finally: shutil.rmtree(tmp_dir_path) + + +def test_label(): + tmp_dir_path = tempfile.mkdtemp() + try: + + exporter = LabelExporter(tmp_dir_path) + + with exporter: + exporter.process("unit-testing", 0, test_entries, test_outputs, test_targets) + + assert len(os.listdir(tmp_dir_path)) > 0, "output directory is empty" + finally: + shutil.rmtree(tmp_dir_path) diff --git a/test/test_learn.py b/test/test_learn.py index 6dbd776..44c5b27 100644 --- a/test/test_learn.py +++ b/test/test_learn.py @@ -14,7 +14,7 @@ from deeprank.learn.model3d import cnn_class from deeprank.models.environment import Environment from deeprank.domain.amino_acid import * -from deeprank.models.metrics import OutputExporter, TensorboardBinaryClassificationExporter +from deeprank.models.metrics import OutputExporter, TensorboardVariantClassificationExporter import deeprank.config @@ -81,7 +81,7 @@ def test_learn(): neural_net = NeuralNet(dataset, cnn_class, model_type='3d',task='class', cuda=False, metrics_exporters=[OutputExporter(metrics_directory), - TensorboardBinaryClassificationExporter(metrics_directory)]) + TensorboardVariantClassificationExporter(metrics_directory)]) neural_net.optimizer = optim.SGD(neural_net.net.parameters(), lr=0.001, diff --git a/test/tools/test_metrics_tools.py b/test/tools/test_metrics_tools.py new file mode 100644 index 0000000..e4af5fc --- /dev/null +++ b/test/tools/test_metrics_tools.py @@ -0,0 +1,28 @@ +import torch + +from deeprank.models.variant import VariantClass +from deeprank.tools.metrics import get_labels_from_output, get_labels_from_targets + + +def test_labels_from_output(): + data = torch.tensor([[0.1, -0.1], + [0.0, 1.1], + [1.2, -0.1]]) + + labels = get_labels_from_output(data) + + assert labels == [VariantClass.UNKNOWN, + VariantClass.PATHOGENIC, + VariantClass.BENIGN], f"labels are {labels}" + + +def get_labels_from_targets(): + data = torch.tensor([[0], [1], [1], [0], [0]]) + + labels = get_labels_from_targets(data) + + assert labels == [VariantClass.BENIGN, + VariantClass.PATHOGENIC, + VariantClass.PATHOGENIC, + VariantClass.BENIGN, + VariantClass.BENIGN], f"labels are {labels}"