Source code for pywicta.benchmark.assess

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2016,2017,2018 Jérémie DECOCK (http://www.jdhp.org)

# This script is provided under the terms and conditions of the MIT license:
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

__all__ = ['normalize_array',
           'metric_mse',
           'metric_nrmse',
           'metric1',
           'metric2',
           'metric3',
           'metric4',
           'metric_ssim',
           'metric_psnr',
           'metric_delta_psi',
           'metric_hillas_delta',
           'metric_hillas_delta2',
           'metric_kill_isolated_pixels',
           'metric_clean_failure',
           'metric_roc',
           'assess_image_cleaning']

import astropy.units as u

import collections
import traceback
import sys
import warnings

import numpy as np
import math

from pywicta.image.hillas_parameters import get_hillas_parameters
from pywicta.io import geometry_converter

from pywi.processing.filtering.pixel_clusters import filter_pixels_clusters
from pywi.processing.filtering.pixel_clusters import filter_pixels_clusters_stats

from skimage.measure import compare_ssim as ssim
from skimage.measure import compare_psnr as psnr
from skimage.measure import compare_nrmse as nrmse

###############################################################################
# CONSTANTS                                                                   #
###############################################################################

FAILURE_SCORE = float('nan')                   # TODO: nan of inf ?

###############################################################################
# EXCEPTIONS                                                                  #
###############################################################################

class AssessError(Exception):
    pass

class EmptyOutputImageError(AssessError):
    """
    Exception raised when the output image only have null pixels (to prevent
    division by 0 in the assess function).
    """

    def __init__(self):
        super(EmptyOutputImageError, self).__init__("Empty output image error")

class EmptyReferenceImageError(AssessError):
    """
    Exception raised when the reference image only have null pixels (to prevent
    division by 0 in the assess function).
    """

    def __init__(self):
        super(EmptyReferenceImageError, self).__init__("Empty reference image error")


###############################################################################
# TOOL FUNCTIONS                                                              #
###############################################################################

[docs]def normalize_array(input_array): r"""Normalize the given image such that its pixels value fit between 0.0 and 1.0. It applies .. math:: \text{normalize}(\boldsymbol{S}) = \frac{ \boldsymbol{S} - \text{min}(\boldsymbol{S}) }{ \text{max}(\boldsymbol{S}) - \text{min}(\boldsymbol{S}) } where :math:`\boldsymbol{S}` is the input array (an image). Parameters ---------- image : Numpy array The image to normalize (whatever its shape) Returns ------- Numpy array The normalized version of the input image (keeping the same dimension and shape) """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype input_array = input_array.astype('float64', copy=True) min_value = np.nanmin(input_array) max_value = np.nanmax(input_array) output_array = (input_array - min_value) / float(max_value - min_value) return output_array
def norm_angle_diff(angle_in_degrees): """Normalize the difference of 2 angles in degree. This function is used to normalize the "delta psi" angle. """ return abs(((angle_in_degrees + 90) % 180) - 90.) ############################################################################### # METRIC FUNCTIONS # ############################################################################### # Mean-Squared Error (MSE) ####################################################
[docs]def metric_mse(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the *Mean-Squared Error* (MSE) metric. It applies .. math:: \text{MSE}(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \left\langle \left( \hat{\boldsymbol{S}} - \boldsymbol{S}^* \right)^{\circ 2} \right\rangle with: - :math:`\hat{\boldsymbol{S}}` the algorithm's output image (i.e. the *cleaned* image); - :math:`\boldsymbol{S}^*` the reference image (i.e. the *clean* image); - :math:`\langle \boldsymbol{S} \rangle` the average of matrix :math:`\boldsymbol{S}`; - :math:`\boldsymbol{S}^{\circ 2}` the `Hadamar power <https://en.wikipedia.org/wiki/Hadamard_product_(matrices)#Analogous_operations>`_ (i.e. the element wise square) of matrix :math:`\boldsymbol{S}`. See http://scikit-image.org/docs/dev/api/skimage.measure.html#compare-mse for more information. Note ---- This function is not well-suited to high dynamic range images handled with this project (errors are correlated with energy levels). Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) score = np.nanmean(np.square(output_image - reference_image)) return float(score)
# Normalized Root Mean-Squared Error (NRMSE) ##################################
[docs]def metric_nrmse(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the *Normalized Root Mean-Squared Error* (NRMSE) metric. It applies .. math:: \text{NRMSE}(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \frac{\sqrt{\text{MSE}}}{\sqrt{ \left\langle \hat{\boldsymbol{S}} \circ \boldsymbol{S}^* \right\rangle }} with: - :math:`\hat{\boldsymbol{S}}` the algorithm's output image (i.e. the *cleaned* image); - :math:`\boldsymbol{S}^*` the reference image (i.e. the *clean* image); - :math:`\langle \boldsymbol{S} \rangle` the average of matrix :math:`\boldsymbol{S}`; - :math:`\circ` the `Hadamar product <https://en.wikipedia.org/wiki/Hadamard_product_(matrices)>`_ (i.e. the element wise product operator). See http://scikit-image.org/docs/dev/api/skimage.measure.html#compare-nrmse and https://en.wikipedia.org/wiki/Root-mean-square_deviation for more information. Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) #if ('nrmse_normalize_type' in kwargs) and (kwargs['nrmse_normalize_type'].lower() == 'euclidian'): # denom = # TODO: see https://github.com/scikit-image/scikit-image/blob/master/skimage/measure/simple_metrics.py#L82 mse = metric_mse(input_img, output_image, reference_image, **kwargs) denom = np.sqrt(np.nanmean((reference_image * output_image), dtype=np.float64)) if denom == 0: score = FAILURE_SCORE else: score = float(np.sqrt(mse) / denom) return score
# Unusual Normalized Root Mean-Squared Error (uNRMSE) #########################
[docs]def metric1(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with a (unusually) normalized version of the *Root Mean-Squared Error* (RMSE) metric. It applies .. math:: \text{uNRMSE}(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \left\langle \left( \left( \hat{\boldsymbol{S}}_n - \boldsymbol{S}^*_n \right)^{\circ 2} \right)^{\circ \frac{1}{2}} \right\rangle with: - :math:`\hat{\boldsymbol{S}}_n` the algorithm's normalized output image (i.e. the *cleaned* image), (using :func:`normalize_array`); - :math:`\boldsymbol{S}^*_n` the normalized reference image (i.e. the *clean* image) (using :func:`normalize_array`); - :math:`\langle \boldsymbol{S} \rangle` the average of matrix :math:`\boldsymbol{S}`; - :math:`\boldsymbol{S}^{\circ 2}` the `Hadamar power <https://en.wikipedia.org/wiki/Hadamard_product_(matrices)#Analogous_operations>`_ (i.e. the element wise square) of matrix :math:`\boldsymbol{S}`. Note ---- This function is not robust to noise on extreme values. Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) output_image = normalize_array(output_image) reference_image = normalize_array(reference_image) score = np.nanmean(np.square(output_image - reference_image)) return float(score)
# Mean Pixel Difference 2 #####################################################
[docs]def metric2(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the :math:`\mathcal{E}_{\text{shape}}` metric. It applies .. math:: f(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \left\langle \text{abs} \left( \frac{\hat{\boldsymbol{S}}}{\sum_i \hat{\boldsymbol{S}}_i} - \frac{\boldsymbol{S}^*}{\sum_i \boldsymbol{S}^*_i} \right) \right\rangle with: - :math:`\hat{\boldsymbol{S}}` the algorithm's output image (i.e. the *cleaned* image); - :math:`\boldsymbol{S}^*` the reference image (i.e. the *clean* image); - :math:`\langle \boldsymbol{S} \rangle` the average of matrix :math:`\boldsymbol{S}`. Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) score = FAILURE_SCORE if np.nanmin(output_image) == np.nanmax(output_image) == 0: warnings.warn("Empty output image") #raise EmptyOutputImageError() elif np.nanmin(reference_image) == np.nanmax(reference_image) == 0: warnings.warn("Empty reference image") #raise EmptyReferenceImageError() else: sum_output_image = float(np.nansum(output_image)) sum_reference_image = float(np.nansum(reference_image)) score = float(np.nanmean(np.abs((output_image / sum_output_image) - (reference_image / sum_reference_image)))) return score
# Relative Total Counts Difference (mpdspd) ###################################
[docs]def metric3(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the :math:`\mathcal{E}^+_{\text{energy}}` (a.k.a. *relative total counts difference*) metric. It applies .. math:: f(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \frac{ \text{abs} \left( \sum_i \hat{\boldsymbol{S}}_i - \sum_i \boldsymbol{S}^*_i \right) }{ \sum_i \boldsymbol{S}^*_i } with :math:`\hat{\boldsymbol{S}}` the algorithm's output image (i.e. the *cleaned* image) and :math:`\boldsymbol{S}^*` the reference image (i.e. the *clean* image). Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) sum_output_image = float(np.nansum(output_image)) sum_reference_image = float(np.nansum(reference_image)) score = FAILURE_SCORE if np.nanmin(reference_image) == np.nanmax(reference_image) == 0: warnings.warn("Empty reference image") #raise EmptyReferenceImageError() else: score = np.abs(sum_output_image - sum_reference_image) / sum_reference_image return score
# Signed Relative Total Counts Difference (sspd) ##############################
[docs]def metric4(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the :math:`\mathcal{E}_{\text{energy}}` (a.k.a. *signed relative total counts difference*) metric. It applies .. math:: f(\hat{\boldsymbol{S}}, \boldsymbol{S}^*) = \frac{ \sum_i \hat{\boldsymbol{S}}_i - \sum_i \boldsymbol{S}^*_i }{ \sum_i \boldsymbol{S}^*_i } with :math:`\hat{\boldsymbol{S}}` the algorithm's output image (i.e. the *cleaned* image) and :math:`\boldsymbol{S}^*` the reference image (i.e. the *clean* image). Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) sum_output_image = float(np.nansum(output_image)) sum_reference_image = float(np.nansum(reference_image)) score = FAILURE_SCORE if np.nanmin(reference_image) == np.nanmax(reference_image) == 0: warnings.warn("Empty reference image") #raise EmptyReferenceImageError() else: score = (sum_output_image - sum_reference_image) / sum_reference_image return score
# Structural Similarity Index Measure (SSIM) ##################################
[docs]def metric_ssim(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the *Structural Similarity Index Measure* (SSIM) metric. See [1]_, [2]_, [3]_ and [4]_ for more information. The SSIM index is calculated on various windows of an image. The measure between two windows :math:`x` and :math:`y` of common size :math:`N.N` is: .. math:: \hbox{SSIM}(x,y) = \frac{(2\mu_x\mu_y + c_1)(2\sigma_{xy} + c_2)}{(\mu_x^2 + \mu_y^2 + c_1)(\sigma_x^2 + \sigma_y^2 + c_2)} with: * :math:`\scriptstyle\mu_x` the average of :math:`\scriptstyle x`; * :math:`\scriptstyle\mu_y` the average of :math:`\scriptstyle y`; * :math:`\scriptstyle\sigma_x^2` the variance of :math:`\scriptstyle x`; * :math:`\scriptstyle\sigma_y^2` the variance of :math:`\scriptstyle y`; * :math:`\scriptstyle \sigma_{xy}` the covariance of :math:`\scriptstyle x` and :math:`\scriptstyle y`; * :math:`\scriptstyle c_1 = (k_1L)^2`, :math:`\scriptstyle c_2 = (k_2L)^2` two variables to stabilize the division with weak denominator; * :math:`\scriptstyle L` the dynamic range of the pixel-values (typically this is :math:`\scriptstyle 2^{\#bits\ per\ pixel}-1`); * :math:`\scriptstyle k_1 = 0.01` and :math:`\scriptstyle k_2 = 0.03` by default. The SSIM index satisfies the condition of symmetry: .. math:: \text{SSIM}(x, y) = \text{SSIM}(y, x) Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. References ---------- .. [1] Wang, Z., Bovik, A. C., Sheikh, H. R., & Simoncelli, E. P. (2004). Image quality assessment: From error visibility to structural similarity. IEEE Transactions on Image Processing, 13, 600-612. https://ece.uwaterloo.ca/~z70wang/publications/ssim.pdf, DOI:10.1.1.11.2477 .. [2] Avanaki, A. N. (2009). Exact global histogram specification optimized for structural similarity. Optical Review, 16, 613-621. http://arxiv.org/abs/0901.0065, DOI:10.1007/s10043-009-0119-z .. [3] http://scikit-image.org/docs/dev/api/skimage.measure.html#compare-ssim .. [4] https://en.wikipedia.org/wiki/Structural_similarity """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) # TODO: the following two lines may be wrong... output_image[np.isnan(output_image)] = 0 reference_image[np.isnan(reference_image)] = 0 try: ssim_val, ssim_image = ssim(output_image, reference_image, full=True, gaussian_weights=True, sigma=0.5) score = float(ssim_val) except ValueError: score = FAILURE_SCORE return score
# Peak Signal-to-Noise Ratio (PSNR) ###########################################
[docs]def metric_psnr(input_img, output_image, reference_image, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the *Peak Signal-to-Noise Ratio* (PSNR) metric. See [5]_ and [6]_ for more information. Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. References ---------- .. [5] http://scikit-image.org/docs/dev/api/skimage.measure.html#skimage.measure.compare_psnr .. [6] https://en.wikipedia.org/wiki/Peak_signal-to-noise_ratio """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) # TODO: the following two lines may be wrong... output_image[np.isnan(output_image)] = 0 reference_image[np.isnan(reference_image)] = 0 #psnr_val = psnr(output_image, reference_image, dynamic_range=1e3) psnr_val = psnr(output_image, reference_image, data_range=1e3) return float(psnr_val)
# Delta psi ###################################################################
[docs]def metric_delta_psi(input_img, output_image, reference_image, geom, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the following relative *psi parameters* (relative difference of shower angle between the cleaned image and the reference image). Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- float The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) if "kill" in kwargs and kwargs["kill"]: # Remove isolated pixels on the reference image before assessment. reference_image = filter_pixels_clusters(reference_image, threshold=kwargs["kill_threshold"]) if "hillas_implementation" in kwargs and kwargs["hillas_implementation"] in (1, 2, 3, 4): # Remove isolated pixels on the reference image before assessment. hillas_implementation = kwargs["hillas_implementation"] else: hillas_implementation = 2 if output_image.ndim == 2: output_image = geometry_converter.image_2d_to_1d(output_image, geom.cam_id) # TODO!!! if reference_image.ndim == 2: reference_image = geometry_converter.image_2d_to_1d(reference_image, geom.cam_id) # TODO!!! try: output_image_parameters = get_hillas_parameters(geom, output_image, hillas_implementation) reference_image_parameters = get_hillas_parameters(geom, reference_image, hillas_implementation) # Psi (shower direction angle) output_image_parameter_psi_rad = output_image_parameters.psi.to(u.rad).value reference_image_parameter_psi_rad = reference_image_parameters.psi.to(u.rad).value delta_psi_rad = reference_image_parameter_psi_rad - output_image_parameter_psi_rad normalized_delta_psi_deg = norm_angle_diff(math.degrees(delta_psi_rad)) except Exception as e: #traceback.print_tb(e.__traceback__, file=sys.stdout) print(e) normalized_delta_psi_deg = float('nan') return normalized_delta_psi_deg
# Hillas delta ################################################################
[docs]def metric_hillas_delta(input_img, output_image, reference_image, geom, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the following relative *Hillas parameters*: * :math:`\Delta_{\text{size}} = \text{reference_image}_{\text{size}} - \text{output_image}_{\text{size_out}}` * :math:`\Delta_{\text{cen_x}} = \text{reference_image}_{\text{cen_x}} - \text{output_image}_{\text{cen_x_out}}` * :math:`\Delta_{\text{cen_y}} = \text{reference_image}_{\text{cen_y}} - \text{output_image}_{\text{cen_y_out}}` * :math:`\Delta_{\text{length}} = \text{reference_image}_{\text{length}} - \text{output_image}_{\text{length_out}}` * :math:`\Delta_{\text{width}} = \text{reference_image}_{\text{width}} - \text{output_image}_{\text{width_out}}` * :math:`\Delta_{\text{r}} = \text{reference_image}_{\text{r}} - \text{output_image}_{\text{r_out}}` * :math:`\Delta_{\text{phi}} = \text{reference_image}_{\text{phi}} - \text{output_image}_{\text{phi_out}}` * :math:`\Delta_{\text{psi}} = \text{reference_image}_{\text{psi}} - \text{output_image}_{\text{psi_out}}` * :math:`\Delta_{\text{miss}} = \text{reference_image}_{\text{miss}} - \text{output_image}_{\text{miss_out}}` See http://adsabs.harvard.edu/abs/1989ApJ...342..379W for more details about Hillas parameters. Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- namedtuple The score of the image cleaning algorithm for the given image. """ # Copy and cast images to prevent tricky bugs # See https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.astype.html#numpy-ndarray-astype output_image = output_image.astype('float64', copy=True) reference_image = reference_image.astype('float64', copy=True) if ("kill" in kwargs) and kwargs["kill"] and ("kill_threshold" in kwargs): # Remove isolated pixels on the reference image before assessment. reference_image = filter_pixels_clusters(reference_image, threshold=kwargs["kill_threshold"]) if ("hillas_implementation" in kwargs) and (kwargs["hillas_implementation"] in (1, 2, 3, 4)): # Remove isolated pixels on the reference image before assessment. hillas_implementation = kwargs["hillas_implementation"] else: hillas_implementation = 2 if output_image.ndim == 2: output_image = geometry_converter.image_2d_to_1d(output_image, geom.cam_id) # TODO!!! if reference_image.ndim == 2: reference_image = geometry_converter.image_2d_to_1d(reference_image, geom.cam_id) # TODO!!! try: output_image_parameters = get_hillas_parameters(geom, output_image, hillas_implementation) reference_image_parameters = get_hillas_parameters(geom, reference_image, hillas_implementation) #print(reference_image_parameters) # Size output_image_parameter_size = float(output_image_parameters.size) reference_image_parameter_size = float(reference_image_parameters.size) delta_size = reference_image_parameter_size - output_image_parameter_size # Centroid x output_image_parameter_cen_x = output_image_parameters.cen_x.value reference_image_parameter_cen_x = reference_image_parameters.cen_x.value delta_cen_x = reference_image_parameter_cen_x - output_image_parameter_cen_x # Centroid y output_image_parameter_cen_y = output_image_parameters.cen_y.value reference_image_parameter_cen_y = reference_image_parameters.cen_y.value delta_cen_y = reference_image_parameter_cen_y - output_image_parameter_cen_y # Length output_image_parameter_length = output_image_parameters.length.value reference_image_parameter_length = reference_image_parameters.length.value delta_length = reference_image_parameter_length - output_image_parameter_length # Width output_image_parameter_width = output_image_parameters.width.value reference_image_parameter_width = reference_image_parameters.width.value delta_width = reference_image_parameter_width - output_image_parameter_width # R output_image_parameter_r = output_image_parameters.r.value reference_image_parameter_r = reference_image_parameters.r.value delta_r = reference_image_parameter_r - output_image_parameter_r # Phi output_image_parameter_phi = output_image_parameters.phi.to(u.rad).value reference_image_parameter_phi = reference_image_parameters.phi.to(u.rad).value delta_phi = reference_image_parameter_phi - output_image_parameter_phi # Psi (shower direction angle) output_image_parameter_psi_rad = output_image_parameters.psi.to(u.rad).value reference_image_parameter_psi_rad = reference_image_parameters.psi.to(u.rad).value delta_psi_rad = reference_image_parameter_psi_rad - output_image_parameter_psi_rad # Normalized psi normalized_delta_psi = norm_angle_diff(math.degrees(delta_psi_rad)) ## Miss #output_image_parameter_miss = output_image_parameters.miss.value #reference_image_parameter_miss = reference_image_parameters.miss.value #delta_miss = reference_image_parameter_miss - output_image_parameter_miss if "kill" in kwargs and kwargs["kill"]: suffix_str = '_kill' else: suffix_str = '' score_dict = collections.OrderedDict(( ('hillas' + str(hillas_implementation) + '_delta_size' + suffix_str, delta_size), ('hillas' + str(hillas_implementation) + '_delta_cen_x' + suffix_str, delta_cen_x), ('hillas' + str(hillas_implementation) + '_delta_cen_y' + suffix_str, delta_cen_y), ('hillas' + str(hillas_implementation) + '_delta_length' + suffix_str, delta_length), ('hillas' + str(hillas_implementation) + '_delta_width' + suffix_str, delta_width), ('hillas' + str(hillas_implementation) + '_delta_r' + suffix_str, delta_r), ('hillas' + str(hillas_implementation) + '_delta_phi' + suffix_str, delta_phi), ('hillas' + str(hillas_implementation) + '_delta_psi' + suffix_str, delta_psi_rad), ('hillas' + str(hillas_implementation) + '_delta_psi_norm' + suffix_str, normalized_delta_psi), #('hillas' + str(hillas_implementation) + '_delta_miss' + suffix_str, delta_miss) )) except Exception as e: #traceback.print_tb(e.__traceback__, file=sys.stdout) print(e) if "kill" in kwargs and kwargs["kill"]: suffix_str = '_kill' else: suffix_str = '' score_dict = collections.OrderedDict(( ('hillas' + str(hillas_implementation) + '_delta_size' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_cen_x' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_cen_y' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_length' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_width' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_r' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_phi' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_psi' + suffix_str, float('nan')), ('hillas' + str(hillas_implementation) + '_delta_psi_norm' + suffix_str, float('nan')), #('hillas' + str(hillas_implementation) + '_delta_miss' + suffix_str, float('nan')) )) Score = collections.namedtuple('Score', score_dict.keys()) return Score(**score_dict)
# Hillas delta 2 ##############################################################
[docs]def metric_hillas_delta2(input_img, output_image, reference_image, geom, **kwargs): r"""Compute the score of ``output_image`` regarding ``reference_image`` with the *Hillas parameters*. It works exactly like :func:`metric_hillas_delta` except that isolated pixels are removed from the ``reference_image`` before the evaluation (using :func:`pywi.processing.filtering.pixel_clusters.filter_pixels_clusters`). Parameters ---------- input_img: 2D ndarray The RAW original image. output_image: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_image: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- namedtuple The score of the image cleaning algorithm for the given image. """ kwargs["kill"] = True kwargs["kill_threshold"] = 0.2 # TODO: don't give an hardcoded value scores = metric_hillas_delta(input_img, output_image, reference_image, geom, **kwargs) return scores
# Kill isolated pixels ######################################################## def metric_kill_isolated_pixels(input_img, output_image, reference_image, **kwargs): try: delta_pe, delta_abs_pe, delta_num_pixels = filter_pixels_clusters_stats(output_image) score_dict = collections.OrderedDict(( ('kill_isolated_pixels_delta_pe', delta_pe), ('kill_isolated_pixels_delta_abs_pe', delta_abs_pe), ('kill_isolated_pixels_delta_num_pixels', delta_num_pixels) )) except Exception as e: score_dict = collections.OrderedDict(( ('kill_isolated_pixels_delta_pe', float('nan')), ('kill_isolated_pixels_delta_abs_pe', float('nan')), ('kill_isolated_pixels_delta_num_pixels', float('nan')) )) #traceback.print_tb(e.__traceback__, file=sys.stdout) print(e) Score = collections.namedtuple('Score', score_dict.keys()) return Score(**score_dict) # Cleaning failure ############################################################ def metric_clean_failure(input_img, output_image, reference_image, **kwargs): if np.count_nonzero(output_image) > 3: # TODO: improve this condition return 0. # success else: return 1. # failure # ROC ######################################################################### def metric_roc(input_img, output_image, reference_image, **kwargs): try: # https://docs.scipy.org/doc/numpy-1.13.0/reference/routines.logic.html roc_true_positives = np.logical_and(reference_image, output_image).sum() roc_false_positives = np.logical_and(np.logical_not(reference_image), output_image).sum() roc_true_negatives = np.logical_and(np.logical_not(reference_image), np.logical_not(output_image)).sum() roc_false_negatives = np.logical_and(reference_image, np.logical_not(output_image)).sum() score_dict = collections.OrderedDict(( ('roc_true_positives', float(roc_true_positives)), ('roc_false_positives', float(roc_false_positives)), ('roc_true_negatives', float(roc_true_negatives)), ('roc_false_negatives', float(roc_false_negatives)) )) except Exception as e: score_dict = collections.OrderedDict(( ('roc_true_positives', float('nan')), ('roc_false_positives', float('nan')), ('roc_true_negatives', float('nan')), ('roc_false_negatives', float('nan')) )) #traceback.print_tb(e.__traceback__, file=sys.stdout) print(e) Score = collections.namedtuple('Score', score_dict.keys()) return Score(**score_dict) ############################################################################### # ASSESS FUNCTIONS DRIVER # ############################################################################### BENCHMARK_DICT = { "mse": (metric_mse,), "nrmse": (metric_nrmse,), "unrmse": (metric1,), "e_shape": (metric2,), "e_energy": (metric3,), "mpdspd": (metric2, metric3), "sspd": (metric4,), "ssim": (metric_ssim,), "psnr": (metric_psnr,), "delta_psi": (metric_delta_psi,), "hillas_delta": (metric_hillas_delta,), "hillas_delta2": (metric_hillas_delta2,), "kill_isolated_pixels": (metric_kill_isolated_pixels,), "metric_clean_failure": (metric_clean_failure,), "metric_roc": (metric_roc,), "all": (metric_mse, metric_nrmse, metric2, metric3, metric4, metric_ssim, metric_psnr, metric_hillas_delta, metric_hillas_delta2, metric_kill_isolated_pixels, metric_clean_failure, metric_roc) } METRIC_NAME_DICT = { metric_mse: "mse", metric_nrmse: "nrmse", metric1: "unrmse", metric2: "e_shape", metric3: "e_energy", metric4: "sspd", metric_ssim: "ssim", metric_psnr: "psnr", metric_delta_psi: "delta_psi", metric_hillas_delta: "hillas_delta", metric_hillas_delta2: "hillas_delta2", metric_kill_isolated_pixels: "kill_isolated_pixels", metric_clean_failure: "metric_clean_failure", metric_roc: "metric_roc" }
[docs]def assess_image_cleaning(input_img, output_img, reference_img, benchmark_method, **kwargs): r"""Compute the score of `output_image` regarding `reference_image` with the `benchmark_method` metrics: - "mse": :func:`metric_mse` - "nrmse": :func:`metric_nrmse` - "unrmse": :func:`metric1` - "e_shape": :func:`metric2` - "e_energy": :func:`metric3` - "mpdspd": :func:`metric2`, :func:`metric3` - "sspd": :func:`metric4` - "ssim": :func:`metric_ssim` - "psnr": :func:`metric_psnr` - "delta_psi": :func:`metric_delta_psi` - "hillas_delta": :func:`metric_hillas_delta` - "hillas_delta2": :func:`metric_hillas_delta2` - "kill_isolated_pixels": :func:`metric_kill_isolated_pixels` - "metric_clean_failure": :func:`metric_clean_failure` - "metric_roc": :func:`metric_roc` - "all": :func:`metric_mse`, :func:`metric_nrmse`, :func:`metric2`, :func:`metric3`, :func:`metric4`, :func:`metric_ssim`, :func:`metric_psnr`, :func:`metric_hillas_delta`, :func:`metric_hillas_delta2`, :func:`metric_kill_isolated_pixels`, :func:`metric_clean_failure`, :func:`metric_roc` Parameters ---------- input_img: 2D ndarray The RAW original image. output_img: 2D ndarray The cleaned image returned by the image cleanning algorithm to assess. reference_img: 2D ndarray The actual clean image (the best result that can be expected for the image cleaning algorithm). kwargs: dict Additional options. Returns ------- tuple of float numbers The score(s) of the image cleaning algorithm for the given image. """ try: score_list = [] metric_name_list = [] for metric_function in BENCHMARK_DICT[benchmark_method]: #try: score = metric_function(input_img, output_img, reference_img, **kwargs) #except Exception as e: # score = float('nan') # #print(e) # traceback.print_tb(e.__traceback__, file=sys.stdout) if isinstance(score, collections.Sequence): score_list.extend(score) metric_name_list.extend(score._fields) else: score_list.append(score) metric_name_list.append(METRIC_NAME_DICT[metric_function]) assert len(score_list) == len(metric_name_list) except KeyError as e: raise ValueError("Unknown benchmark method {}".format(benchmark_method)) #for s, m in zip(score_list, metric_name_list): # print(m, ":", s, type(s)) return tuple(score_list), tuple(metric_name_list)
def get_metrics_names(benchmark_method="all"): """A hack to get the tuple of metrics names for a given `brenchmark_method`.""" cam_id = "LSTCam" geom1d = geometry_converter.get_geom1d(cam_id) shape = geom1d.pix_x.shape dtype = "float" inp_img = np.zeros(shape=shape, dtype=dtype) out_img = np.zeros(shape=shape, dtype=dtype) ref_img = np.zeros(shape=shape, dtype=dtype) inp_img_2d = geometry_converter.image_1d_to_2d(inp_img, cam_id) out_img_2d = geometry_converter.image_1d_to_2d(out_img, cam_id) ref_img_2d = geometry_converter.image_1d_to_2d(ref_img, cam_id) score_tuple, metrics_name_tuple = assess_image_cleaning(inp_img_2d, out_img_2d, ref_img_2d, benchmark_method=benchmark_method, geom=geom1d) return metrics_name_tuple