This repository was archived by the owner on Jan 26, 2024. It is now read-only.
100 changes: 79 additions & 21 deletions deeprank/models/metrics.py
@@ -1,14 +1,17 @@
import lzma
import os
import csv
from typing import List, Tuple, Any
from typing import List, Tuple, Any, Optional
from math import sqrt

from torch import argmax, tensor
from torch.nn.functional import cross_entropy
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import roc_auc_score

from deeprank.models.variant import VariantClass
from deeprank.tools.metrics import get_labels_from_output, get_labels_from_targets


class MetricsExporter:
"The class implements an object, to be called when a neural network generates output"
@@ -49,12 +52,19 @@ def process(self, pass_name: str, epoch_number: int,
metrics_exporter.process(pass_name, epoch_number, entry_names, output_values, target_values)


class TensorboardBinaryClassificationExporter(MetricsExporter):
"exports to tensorboard, works for binary classification only"
class TensorboardVariantClassificationExporter(MetricsExporter):
"exports to tensorboard, works for variant classification only"

def __init__(self, directory_path: str, unknown_treshold: Optional[float] = 0.5):
"""
Args:
directory_path: where to store tensorboard files
unknown_treshold: if the absolute difference between the class probabilities is below this value, give the class label UNKNOWN
"""

def __init__(self, directory_path):
self._directory_path = directory_path
self._writer = SummaryWriter(log_dir=directory_path)
self._unknown_treshold = unknown_treshold

def __enter__(self):
self._writer.__enter__()
@@ -67,46 +77,60 @@ def process(self, pass_name: str, epoch_number: int,
entry_names: List[str], output_values: List[Any], target_values: List[Any]):
"write to tensorboard"

loss = cross_entropy(tensor(output_values), tensor(target_values))
output_tensor = tensor(output_values)
target_tensor = tensor(target_values)

loss = cross_entropy(output_tensor, target_tensor)
self._writer.add_scalar(f"{pass_name} loss", loss, epoch_number)

probabilities = []
# lists of VariantClass values
prediction_labels = get_labels_from_output(output_tensor, unknown_treshold=self._unknown_treshold)
target_labels = get_labels_from_targets(target_tensor)

roc_probabilities = [] # floating point values
roc_targets = [] # list of 0/1 values

fp, fn, tp, tn = 0, 0, 0, 0
for entry_index, entry_name in enumerate(entry_names):
probability = output_values[entry_index][1]
probabilities.append(probability)
prediction_label = prediction_labels[entry_index]
target_label = target_labels[entry_index]

prediction_value = argmax(tensor(output_values[entry_index]))
target_value = target_values[entry_index]

if prediction_value > 0.0 and target_value > 0.0:
if prediction_label == VariantClass.PATHOGENIC and target_label == VariantClass.PATHOGENIC:
tp += 1

elif prediction_value <= 0.0 and target_value <= 0.0:
elif prediction_label == VariantClass.BENIGN and target_label == VariantClass.BENIGN:
tn += 1

elif prediction_value > 0.0 and target_value <= 0.0:
elif prediction_label == VariantClass.PATHOGENIC and target_label == VariantClass.BENIGN:
fp += 1

elif prediction_value <= 0.0 and target_value > 0.0:
elif prediction_label == VariantClass.BENIGN and target_label == VariantClass.PATHOGENIC:
fn += 1

if prediction_label != VariantClass.UNKNOWN:
roc_probabilities.append(output_values[entry_index][1])
roc_targets.append(target_values[entry_index])

# Note: predictions labelled UNKNOWN contribute to neither the confusion counts nor the ROC data, so they are ignored by the MCC, accuracy and ROC AUC metrics.

mcc_numerator = tn * tp - fp * fn
if mcc_numerator == 0.0:
self._writer.add_scalar(f"{pass_name} MCC", 0.0, epoch_number)
self._writer.add_scalar(f"{pass_name} MCC", 0.0, epoch_number)
else:
mcc_denominator = sqrt((tn + fn) * (fp + tp) * (tn + fp) * (fn + tp))

if mcc_denominator != 0.0:
mcc = mcc_numerator / mcc_denominator
self._writer.add_scalar(f"{pass_name} MCC", mcc, epoch_number)

accuracy = (tp + tn) / (tp + tn + fp + fn)
self._writer.add_scalar(f"{pass_name} accuracy", accuracy, epoch_number)
accuracy_denominator = tp + tn + fp + fn
if accuracy_denominator > 0:
accuracy = (tp + tn) / accuracy_denominator
self._writer.add_scalar(f"{pass_name} accuracy", accuracy, epoch_number)

# for ROC curves to work, we need both class values in the set
if len(set(target_values)) == 2:
roc_auc = roc_auc_score(target_values, probabilities)
# for ROC curves to work, we need both class values in the target set
if len(set(roc_targets)) >= 2:
roc_auc = roc_auc_score(roc_targets, roc_probabilities)
self._writer.add_scalar(f"{pass_name} ROC AUC", roc_auc, epoch_number)
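For reference, the MCC and accuracy written above follow the standard definitions; here is a quick hand check with made-up confusion counts (illustration only, not values from this PR):

from math import sqrt

tp, tn, fp, fn = 3, 4, 1, 2  # made-up counts for illustration

mcc_numerator = tn * tp - fp * fn                                      # 4*3 - 1*2 = 10
mcc_denominator = sqrt((tn + fn) * (fp + tp) * (tn + fp) * (fn + tp))  # sqrt(6*4*5*5) ≈ 24.49
print(mcc_numerator / mcc_denominator)                                 # MCC ≈ 0.41
print((tp + tn) / (tp + tn + fp + fn))                                 # accuracy = 7/10 = 0.7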


@@ -139,3 +163,37 @@ def process(self, pass_name: str, epoch_number: int,
target_value = target_values[entry_index]

w.writerow([entry_name, str(output_value), str(target_value)])


class LabelExporter(MetricsExporter):
"writes all labels to a table"

def __init__(self, directory_path, unknown_treshold: Optional[float] = 0.5):
self._directory_path = directory_path
self._unknown_treshold = unknown_treshold

def get_filename(self, pass_name, epoch_number):
"returns the filename for the table"
return os.path.join(self._directory_path, f"labels-{pass_name}-epoch-{epoch_number}.csv.xz")

def process(self, pass_name: str, epoch_number: int,
entry_names: List[str], output_values: List[Any], target_values: List[Any]):
"write the output to the table"

# lists of VariantClass values
output_labels = get_labels_from_output(tensor(output_values), unknown_treshold=self._unknown_treshold)
target_labels = get_labels_from_targets(tensor(target_values))

if not os.path.isdir(self._directory_path):
os.mkdir(self._directory_path)

with lzma.open(self.get_filename(pass_name, epoch_number), 'wt') as f:
w = csv.writer(f)

w.writerow(["entry", "output_label", "target_label"])

for entry_index, entry_name in enumerate(entry_names):
output_name = output_labels[entry_index].name
target_name = target_labels[entry_index].name

w.writerow([entry_name, output_name, target_name])
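The two exporters above share the same calling convention; a minimal usage sketch (directory name and score values are placeholders, not taken from this PR), where each output row is a [benign, pathogenic] score pair and each target is a 0/1 class index:

from deeprank.models.metrics import LabelExporter, TensorboardVariantClassificationExporter

entry_names = ["variant_a", "variant_b"]
output_values = [[0.9, 0.1],   # margin 0.8 >= 0.5, labelled BENIGN
                 [0.2, 0.8]]   # margin 0.6 >= 0.5, labelled PATHOGENIC
target_values = [0, 1]

for exporter in [LabelExporter("./example-run", unknown_treshold=0.5),
                 TensorboardVariantClassificationExporter("./example-run")]:
    with exporter:
        exporter.process("validation", 0, entry_names, output_values, target_values)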
1 change: 1 addition & 0 deletions deeprank/models/variant.py
@@ -5,6 +5,7 @@


class VariantClass(Enum):
UNKNOWN = -1
BENIGN = 0
PATHOGENIC = 1

85 changes: 31 additions & 54 deletions deeprank/tools/metrics.py
@@ -1,74 +1,51 @@
from math import sqrt
from typing import List, Optional

import torch

def get_tp_tn_fp_fn(output_data, target_data):
""" A classification metric
from deeprank.models.variant import VariantClass


def get_labels_from_output(output_data: torch.Tensor,
unknown_treshold: Optional[float] = 0.5) -> List[VariantClass]:
"""
Args:
output_data(array of dimension (x,2)): considered negative if left value > right value
target_data(array of dimension (x,1)): considered negative if 0, positive otherwise

Returns (four floats):
true positive count (tp)
true negative count (tn)
false positive count (fp)
false negative count (fn)
output_data: [x, 2] considered BENIGN if left value > right value and otherwise PATHOGENIC
unknown_treshold: if the absolute difference between the values is below this value, then consider the class UNKNOWN
"""

tp = 0
tn = 0
fp = 0
fn = 0

total = output_data.shape[0]
if total == 0:
raise ValueError("0 output data entries")

labels = []
for index in range(total):
output0, output1 = output_data[index,:]
target = target_data[index]

if output0 > output1: # negative output

if target != 0: # wrong

fn += 1

else: # right

tn += 1
if abs(output_data[index, 0] - output_data[index, 1]) < unknown_treshold:
label = VariantClass.UNKNOWN

else: # positive output
elif output_data[index, 0] < output_data[index, 1]:
label = VariantClass.PATHOGENIC

if target != 0: # right
else:
label = VariantClass.BENIGN

tp += 1
labels.append(label)

else: # wrong
return labels

fp += 1

return tp, tn, fp, fn


def get_mcc(tp, tn, fp, fn):
""" The Mathews Correlation Coefficient

def get_labels_from_targets(target_data: torch.Tensor) -> List[VariantClass]:
"""
Args:
tp (float): true positive count
tn (float): true negative count
fp (float): false positive count
fn (float): false negative count

Returns (float): Mathews Correlation Coefficient
target_data: [x, 1] where 0 means BENIGN and 1 means PATHOGENIC
"""

numerator = tp * tn - fp * fn
if numerator == 0:
return 0.0
total = target_data.shape[0]

labels = []
for index in range(total):
if target_data[index] > 0:
label = VariantClass.PATHOGENIC
else:
label = VariantClass.BENIGN

denominator = sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
if denominator == 0:
raise ValueError(f"MCC denominator is zero for tp={tp}, tn={tn}, fp={fp}, fn={fn}")
labels.append(label)

return numerator / denominator
return labels
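These helpers feed the exporters above; entries labelled UNKNOWN simply drop out of the confusion counts. A small sketch (example tensors only) of the round trip from raw outputs and targets to labels:

import torch

from deeprank.models.variant import VariantClass
from deeprank.tools.metrics import get_labels_from_output, get_labels_from_targets

outputs = torch.tensor([[0.9, 0.1],    # margin 0.8 -> BENIGN
                        [0.1, 0.9],    # margin 0.8 -> PATHOGENIC
                        [0.4, 0.6]])   # margin 0.2 < 0.5 -> UNKNOWN
targets = torch.tensor([0, 0, 1])

predictions = get_labels_from_output(outputs)   # default unknown_treshold of 0.5
truths = get_labels_from_targets(targets)

# count only entries with a definite prediction, as the exporter does
pairs = [(p, t) for p, t in zip(predictions, truths) if p != VariantClass.UNKNOWN]
tp = sum(p == VariantClass.PATHOGENIC and t == VariantClass.PATHOGENIC for p, t in pairs)
fp = sum(p == VariantClass.PATHOGENIC and t == VariantClass.BENIGN for p, t in pairs)
print(tp, fp)   # 0 1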
7 changes: 3 additions & 4 deletions scripts/learn.py
@@ -13,8 +13,7 @@
from deeprank.learn.NeuralNet import NeuralNet
from deeprank.learn.DataSet import DataSet
from deeprank.learn.model3d import cnn_class
from deeprank.models.metrics import OutputExporter
from deeprank.models.metrics import TensorboardBinaryClassificationExporter
from deeprank.models.metrics import OutputExporter, LabelExporter, TensorboardVariantClassificationExporter


logging.basicConfig(filename="learn-%d.log" % os.getpid(), filemode="w", level=logging.INFO)
@@ -79,7 +78,7 @@ def interpret_args(args, usage):
run_directory = "run-{}".format(os.getpid())

neural_net = NeuralNet(dataset, cnn_class, model_type='3d',task='class', cuda=False,
metrics_exporters=[OutputExporter(run_directory),
TensorboardBinaryClassificationExporter(run_directory)])
metrics_exporters=[OutputExporter(run_directory), LabelExporter(run_directory),
TensorboardVariantClassificationExporter(run_directory)])
neural_net.optimizer = optim.AdamW(neural_net.net.parameters(), lr=0.001, weight_decay=0.005)
neural_net.train(nepoch = epoch_count, divide_trainset=None, train_batch_size = 5, num_workers=0)
18 changes: 16 additions & 2 deletions test/models/test_metrics.py
@@ -2,7 +2,7 @@
import shutil
import os

from deeprank.models.metrics import TensorboardBinaryClassificationExporter, OutputExporter
from deeprank.models.metrics import TensorboardVariantClassificationExporter, OutputExporter, LabelExporter


test_entries = ["entry0", "entry1"]
@@ -14,7 +14,7 @@ def test_tensorboard_class_output():

tmp_dir_path = tempfile.mkdtemp()
try:
exporter = TensorboardBinaryClassificationExporter(tmp_dir_path)
exporter = TensorboardVariantClassificationExporter(tmp_dir_path)

with exporter:
exporter.process("unit-testing", 0, test_entries, test_outputs, test_targets)
@@ -36,3 +36,17 @@ def test_output():
assert len(os.listdir(tmp_dir_path)) > 0, "output directory is empty"
finally:
shutil.rmtree(tmp_dir_path)


def test_label():
tmp_dir_path = tempfile.mkdtemp()
try:

exporter = LabelExporter(tmp_dir_path)

with exporter:
exporter.process("unit-testing", 0, test_entries, test_outputs, test_targets)

assert len(os.listdir(tmp_dir_path)) > 0, "output directory is empty"
finally:
shutil.rmtree(tmp_dir_path)
4 changes: 2 additions & 2 deletions test/test_learn.py
@@ -14,7 +14,7 @@
from deeprank.learn.model3d import cnn_class
from deeprank.models.environment import Environment
from deeprank.domain.amino_acid import *
from deeprank.models.metrics import OutputExporter, TensorboardBinaryClassificationExporter
from deeprank.models.metrics import OutputExporter, TensorboardVariantClassificationExporter
import deeprank.config


@@ -81,7 +81,7 @@ def test_learn():

neural_net = NeuralNet(dataset, cnn_class, model_type='3d',task='class',
cuda=False, metrics_exporters=[OutputExporter(metrics_directory),
TensorboardBinaryClassificationExporter(metrics_directory)])
TensorboardVariantClassificationExporter(metrics_directory)])

neural_net.optimizer = optim.SGD(neural_net.net.parameters(),
lr=0.001,
28 changes: 28 additions & 0 deletions test/tools/test_metrics_tools.py
@@ -0,0 +1,28 @@
import torch

from deeprank.models.variant import VariantClass
from deeprank.tools.metrics import get_labels_from_output, get_labels_from_targets


def test_labels_from_output():
data = torch.tensor([[0.1, -0.1],
[0.0, 1.1],
[1.2, -0.1]])

labels = get_labels_from_output(data)

assert labels == [VariantClass.UNKNOWN,
VariantClass.PATHOGENIC,
VariantClass.BENIGN], f"labels are {labels}"


def test_labels_from_targets():
data = torch.tensor([[0], [1], [1], [0], [0]])

labels = get_labels_from_targets(data)

assert labels == [VariantClass.BENIGN,
VariantClass.PATHOGENIC,
VariantClass.PATHOGENIC,
VariantClass.BENIGN,
VariantClass.BENIGN], f"labels are {labels}"