Source code for

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Copyright (c) 2016 Jérémie DECOCK (

# This script is provided under the terms and conditions of the MIT license:
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

__all__ = ['fill_nan_pixels',

import math

import collections

import numpy as np

import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib.patches import Ellipse

import os

from import fits
import astropy.units as u

import pandas as pd

from traitlets.config import Config

import ctapipe
import pyhessio
from ctapipe.image.hillas import HillasParameterizationError
from import hessio_event_source
from ctapipe.instrument import CameraGeometry
from ctapipe.calib import CameraCalibrator
import ctapipe.visualization

from pywicta.image.hillas_parameters import get_hillas_parameters
from import geometry_converter

DEBUG = False

# EXCEPTIONS #################################################################

class FitsError(Exception):

class WrongHDUError(FitsError):
    """Exception raised when trying to access a wrong HDU in a FITS file.

        file_path -- the FITS file concerned by the error
        hdu_index -- the HDU index concerned by the error

    def __init__(self, file_path, hdu_index):
        super().__init__("File {} doesn't have data in HDU {}.".format(file_path, hdu_index))
        self.file_path = file_path
        self.hdu_index = hdu_index

class NotAnImageError(FitsError):
    """Exception raised when trying to load a FITS file which doesn't contain a
    valid image in the given HDU.

        file_path -- the FITS file concerned by the error
        hdu_index -- the HDU index concerned by the error

    def __init__(self, file_path, hdu_index):
        super().__init__("HDU {} in file {} doesn't contain any image.".format(hdu_index, file_path))
        self.file_path = file_path
        self.hdu_index = hdu_index

class WrongDimensionError(FitsError):
    """ Exception raised when trying to save a FITS with more than 3 dimensions
    or less than 2 dimensions.

    def __init__(self):
        super().__init__("The input image should be a 2D or a 3D numpy array.")

class WrongFitsFileStructure(FitsError):
    """Exception raised when trying to load a FITS file which doesn't contain a
    valid structure (for benchmark).

        file_path -- the FITS file concerned by the error

    def __init__(self, file_path):
        super().__init__("File {} doesn't contain a valid structure.".format(file_path))
        self.file_path = file_path

# FILL NAN PIXELS #############################################################

[docs]def fill_nan_pixels(image, noise_distribution=None): """Replace *in-place* `NaN` values in `image` by zeros or by random noise. Images containing `NaN` values generate undesired harmonics with wavelet image cleaning. This function should be used to "fix" images before each wavelet image cleaning. Replace `NaN` ("Not a Number") values in `image` by zeros if `noise_distribution` is `None`. Otherwise, `NaN` values are replaced by random noise drawn by the `noise_distribution` random generator. Parameters ---------- image : array_like The image to process. `NaN` values are replaced **in-place** thus this function changes the provided object. noise_distribution : `pywicta.denoising.inverse_transform_sampling.EmpiricalDistribution` The random generator to use to replace `NaN` pixels by random noise. Returns ------- array_like Returns a boolean mask array indicating whether pixels in `images` initially contained `NaN` values (`True`) of not (`False`). This array is defined by the instruction `np.isnan(image)`. Notes ----- `NaN` values are replaced **in-place** in the provided `image` parameter. Examples -------- >>> import numpy as np >>> img = np.array([[1, 2, np.nan],[4, np.nan, 6],[np.nan, 8, np.nan]]) >>> fill_nan_pixels(img) ... # doctest: +NORMALIZE_WHITESPACE array([[False, False, True], [False, True, False], [ True, False, True]], dtype=bool) >>> img ... # doctest: +NORMALIZE_WHITESPACE array([[ 1., 2., 0.], [ 4., 0., 6.], [ 0., 8., 0.]]) """ # See nan_mask = np.isnan(image) if DEBUG: print(image) plot(image, "In") plot(nan_mask, "Mask") if noise_distribution is not None: nan_noise_size = np.count_nonzero(nan_mask) image[nan_mask] = noise_distribution.rvs(size=nan_noise_size) else: image[nan_mask] = 0 if DEBUG: print(image) plot(image, "Noise injected") return nan_mask
# DIRECTORY PARSER ############################################################
[docs]def image_files_in_dir(directory_path, max_num_files=None): """Return the path of FITS and Simtel files in `directory_path`. Return the path of all (or `max_num_files`) files having the extension ".simtel", ".simtel.gz", ".fits" or ".fit" in `directory_path`. Parameters ---------- directory_path : str The directory's path where FITS and Simtel files are searched. max_num_files : int The maximum number of files to return. Yields ------ str The path of the next FITS or Simtel files in `directory_path`. """ FILE_EXT = (".simtel", ".simtel.gz", ".fits", ".fit") directory_path = os.path.expanduser(directory_path) files_counter = 0 for file_name in os.listdir(directory_path): file_path = os.path.join(directory_path, file_name) if os.path.isfile(file_path) and file_name.lower().endswith(FILE_EXT): files_counter += 1 if (max_num_files is not None) and (files_counter > max_num_files): break else: yield file_path
[docs]def image_files_in_paths(path_list, max_num_files=None): """Return the path of FITS and Simtel files in `path_list`. Return the path of all (or `max_num_files`) files having the extension ".simtel", ".simtel.gz", ".fits" or ".fit" in `path_list`. Parameters ---------- path_list : str The list of directory's path where FITS and Simtel files are searched. It can also directly contain individual file paths (or a mix of files and directories path). max_num_files : int The maximum number of files to return. Yields ------ str The path of the next FITS or Simtel files in `path_list`. """ files_counter = 0 for path in path_list: path = os.path.expanduser(path) if os.path.isdir(path): # If path is a directory for file_path in image_files_in_dir(path): files_counter += 1 if (max_num_files is not None) and (files_counter > max_num_files): break else: yield file_path elif os.path.isfile(path): # If path is a regular file files_counter += 1 if (max_num_files is not None) and (files_counter > max_num_files): break else: yield path else: raise Exception("Wrong item:", path)
# LOAD IMAGES ################################################################ Image1D = collections.namedtuple('Image1D', ('input_image', 'input_samples', 'reference_image', 'adc_samples', 'extracted_samples', 'peakpos', 'adc_sum_image', 'pedestal_image', 'gains_image', 'pixels_position', 'meta')) Image2D = collections.namedtuple('Image2D', ('input_image', 'input_samples', # TODO 'reference_image', 'adc_samples', # TODO 'extracted_samples', 'peakpos', 'adc_sum_image', 'pedestal_image', 'gains_image', 'pixels_position', 'pixels_mask', 'meta'))
[docs]def image_generator(path_list, max_num_images=None, tel_filter_list=None, ev_filter_list=None, cam_filter_list=None, rejection_criteria=None, **kwargs): """Return an iterable sequence all calibrated images in `path_list`. Parameters ---------- path_list : list of str The path of files containing the images to extract. It can contain FITS/Simtel files and directories. max_num_images : int The maximum number of images to iterate. tel_filter_list : list of int Only iterate images from telescopes defined in this list. ev_filter_list : list of int Only iterate images from events defined in this list. cam_filter_list : list of string Only iterate images from cameras defined in this list. rejection_criteria : function A function that contains image rejection criteria. This function takes images and return True for images that should be ignored by the generator and False otherwise. It can be used to ignore images that are not in a given range of energy or images with a shower too close to the borders for instance. Yields ------ Image1D or Image2D The named tuple `Image1D` or `Image1D` of the next FITS or Simtel files in `path_list`. """ images_counter = 0 for file_path in image_files_in_paths(path_list): if (max_num_images is not None) and (images_counter >= max_num_images): break else: if file_path.lower().endswith((".simtel", ".simtel.gz")): # SIMTEL FILES for image in simtel_images_generator(file_path, tel_filter_list=tel_filter_list, ev_filter_list=ev_filter_list, cam_filter_list=cam_filter_list, rejection_criteria=rejection_criteria, **kwargs): if (max_num_images is not None) and (images_counter >= max_num_images): pyhessio.close_file() break else: images_counter += 1 yield image elif file_path.lower().endswith((".fits", ".fit")): # FITS FILES image2d = load_benchmark_images(file_path) if (tel_filter_list is None) or (image2d.meta['tel_id'] in tel_filter_list): if (ev_filter_list is None) or (image2d.meta['event_id'] in ev_filter_list): if (cam_filter_list is None) or (image2d.meta['cam_id'] in cam_filter_list): if (rejection_criteria is None) or not rejection_criteria(image2d): images_counter += 1 yield image2d else: raise Exception("Wrong item:", file_path)
# LOAD SIMTEL IMAGE ##########################################################
[docs]def quantity_to_tuple(quantity, unit_str): """Splits a quantity into a tuple of (value,unit) where unit is FITS compliant. Useful to write FITS header keywords with units in a comment. Parameters ---------- quantity : astropy quantity The Astropy quantity to split. unit_str: str Unit string representation readable by astropy.units (e.g. 'm', 'TeV', ...) Returns ------- tuple A tuple containing the value and the quantity. """ return,'FITS')
[docs]def simtel_event_to_images(event, tel_id, ctapipe_format=False, mix_channels=True, time_samples=False, **kwargs): """Extract and return `tel_id`'s images and metadata from a ctapipe `event`. Parameters ---------- event : ctapipe event object TODO tel_id : int TODO ctapipe_format : bool TODO mix_channels : bool TODO Returns ------- Image1D or Image2D Return a named tuple containing `tel_id` images and metadata from a ctapipe `event`. """ SINGLE_CHANNEL_CAMERAS = ("CHEC", "DigiCam", "FlashCam") TWO_CHANNELS_CAMERAS = ("ASTRICam", "NectarCam", "LSTCam") # GUESS THE IMAGE GEOMETRY ################################ x, y = event.inst.pixel_pos[tel_id] foclen = event.inst.optical_foclen[tel_id] geom1d = CameraGeometry.guess(x, y, foclen) # GET IMAGES ############################################## mc_pe =[tel_id].photo_electron_image mc_pedestal =[tel_id].pedestal # [N channels] mc_gain =[tel_id].dc_to_pe # [N channels] r0_adc_sums =[tel_id].adc_sums # [N channels] r0_adc_samples =[tel_id].adc_samples # [N channels] r1_pe_samples =[tel_id].pe_samples # [N channels] dl0_pe_samples =[tel_id].pe_samples # [N channels] dl1_cleaned_samples =[tel_id].cleaned # [N channels] dl1_extracted_samples =[tel_id].extracted_samples # [N channels] dl1_image =[tel_id].image # [N channels] dl1_peakpos =[tel_id].peakpos # [N channels] # ALIAS # pe_image = mc_pe pedestal = mc_pedestal gain = mc_gain uncalibrated_image = r0_adc_sums uncalibrated_samples = r0_adc_samples calibrated_image = dl1_image.copy() if time_samples: calibrated_samples = dl0_pe_samples.copy() else: calibrated_samples = None extracted_samples = dl1_extracted_samples peakpos = dl1_peakpos pixel_pos = event.inst.pixel_pos[tel_id] cam_id = geom1d.cam_id # MIX CHANNELS FOR DOUBLE CHANNEL CAMERAS ################# if mix_channels: if cam_id == "ASTRICam": ASTRI_CAM_CHANNEL_THRESHOLD = 2**12 - 1 # cf. "signal_and_noise_histograms_loglog_adc_counts_not_summed_per_channel_FAST" notebook calibrated_image[1, r0_adc_samples[0].max(axis=1) < ASTRI_CAM_CHANNEL_THRESHOLD] = 0 calibrated_image[0, r0_adc_samples[0].max(axis=1) >= ASTRI_CAM_CHANNEL_THRESHOLD] = 0 calibrated_image = calibrated_image.sum(axis=0) if time_samples: calibrated_samples[1, r0_adc_samples[0].max(axis=1) < ASTRI_CAM_CHANNEL_THRESHOLD] = 0 calibrated_samples[0, r0_adc_samples[0].max(axis=1) >= ASTRI_CAM_CHANNEL_THRESHOLD] = 0 calibrated_samples = calibrated_samples.sum(axis=0) elif cam_id == "NectarCam": NECTAR_CAM_CHANNEL_THRESHOLD = 2**12 - 1 # cf. "signal_and_noise_histograms_loglog_adc_counts_not_summed_per_channel_FAST" notebook calibrated_image[1, r0_adc_samples[0].max(axis=1) < NECTAR_CAM_CHANNEL_THRESHOLD] = 0 # LG channel calibrated_image[0, r0_adc_samples[0].max(axis=1) >= NECTAR_CAM_CHANNEL_THRESHOLD] = 0 # HG channel calibrated_image = calibrated_image.sum(axis=0) if time_samples: calibrated_samples[1, r0_adc_samples[0].max(axis=1) < NECTAR_CAM_CHANNEL_THRESHOLD] = 0 # LG channel calibrated_samples[0, r0_adc_samples[0].max(axis=1) >= NECTAR_CAM_CHANNEL_THRESHOLD] = 0 # HG channel calibrated_samples = calibrated_samples.sum(axis=0) elif cam_id == "LSTCam": LST_CAM_CHANNEL_THRESHOLD = 2**12 - 1 # cf. "signal_and_noise_histograms_loglog_adc_counts_not_summed_per_channel_FAST" notebook calibrated_image[1, r0_adc_samples[0].max(axis=1) < LST_CAM_CHANNEL_THRESHOLD] = 0 calibrated_image[0, r0_adc_samples[0].max(axis=1) >= LST_CAM_CHANNEL_THRESHOLD] = 0 calibrated_image = calibrated_image.sum(axis=0) if time_samples: calibrated_samples[1, r0_adc_samples[0].max(axis=1) < LST_CAM_CHANNEL_THRESHOLD, :] = 0 calibrated_samples[0, r0_adc_samples[0].max(axis=1) >= LST_CAM_CHANNEL_THRESHOLD, :] = 0 calibrated_samples = calibrated_samples.sum(axis=0) elif cam_id in SINGLE_CHANNEL_CAMERAS : calibrated_image = calibrated_image[0] if time_samples: calibrated_samples = calibrated_samples[0] else: raise NotImplementedError("Unknown camera: {}".format(cam_id)) # METADATA ############################################### metadata = {'cam_id': cam_id} ########################################################## if ctapipe_format: return Image1D(input_image=calibrated_image, input_samples=calibrated_samples, reference_image=pe_image, adc_samples=uncalibrated_samples, extracted_samples=extracted_samples, peakpos=peakpos, adc_sum_image=uncalibrated_image, pedestal_image=pedestal, gains_image=gain, pixels_position=pixel_pos, meta=metadata) else: # CONVERTING GEOMETRY (1D TO 2D) ########################## if cam_id in SINGLE_CHANNEL_CAMERAS: pe_image_2d = geometry_converter.image_1d_to_2d(pe_image, cam_id=cam_id) calibrated_image_2d = geometry_converter.image_1d_to_2d(calibrated_image, cam_id=cam_id) if time_samples: calibrated_samples_2d = geometry_converter.image_1d_to_2d(calibrated_samples, cam_id=cam_id) else: calibrated_samples_2d = None uncalibrated_samples_2d = None # TODO extracted_samples_2d = None # TODO peakpos_2d = None # TODO uncalibrated_image_2d = geometry_converter.image_1d_to_2d(uncalibrated_image[0], cam_id=cam_id) pedestal_2d = geometry_converter.image_1d_to_2d(pedestal[0], cam_id=cam_id) gains_2d = geometry_converter.image_1d_to_2d(gain[0], cam_id=cam_id) elif cam_id in TWO_CHANNELS_CAMERAS: pe_image_2d = geometry_converter.image_1d_to_2d(pe_image, cam_id=cam_id) calibrated_image_2d = geometry_converter.image_1d_to_2d(calibrated_image, cam_id=cam_id) if time_samples: calibrated_samples_2d = geometry_converter.image_1d_to_2d(calibrated_samples, cam_id=cam_id) else: calibrated_samples_2d = None uncalibrated_samples_2d = None # TODO extracted_samples_2d = None # TODO peakpos_2d = None # TODO uncalibrated_image_2d_ch0 = geometry_converter.image_1d_to_2d(uncalibrated_image[0], cam_id=cam_id) uncalibrated_image_2d_ch1 = geometry_converter.image_1d_to_2d(uncalibrated_image[1], cam_id=cam_id) pedestal_2d_ch0 = geometry_converter.image_1d_to_2d(pedestal[0], cam_id=cam_id) pedestal_2d_ch1 = geometry_converter.image_1d_to_2d(pedestal[1], cam_id=cam_id) gains_2d_ch0 = geometry_converter.image_1d_to_2d(gain[0], cam_id=cam_id) gains_2d_ch1 = geometry_converter.image_1d_to_2d(gain[1], cam_id=cam_id) else: raise NotImplementedError("Unknown camera: {}".format(cam_id)) # Make a mock pixel position array... pixel_pos_2d = np.array(np.meshgrid(np.linspace(pixel_pos[0].min().value, pixel_pos[0].max().value, pe_image_2d.shape[0]), np.linspace(pixel_pos[1].min().value, pixel_pos[1].max().value, pe_image_2d.shape[1]))) # FIX THE ARRAY SHAPE ##################################### # The ctapipe geometry converter operate on one channel # only and then takes and return a 2D array but pywicta # fits files keep all channels and thus takes 3D arrays... if cam_id in SINGLE_CHANNEL_CAMERAS: uncalibrated_image_2d = np.array([uncalibrated_image_2d]) pedestal_2d = np.array([pedestal_2d]) gains_2d = np.array([gains_2d]) elif cam_id in TWO_CHANNELS_CAMERAS: uncalibrated_image_2d = np.array([uncalibrated_image_2d_ch0, uncalibrated_image_2d_ch1]) pedestal_2d = np.array([pedestal_2d_ch0, pedestal_2d_ch1 ]) gains_2d = np.array([gains_2d_ch0, gains_2d_ch1]) else: raise NotImplementedError("Unknown camera: {}".format(cam_id)) # GET PIXEL MASK AND PUT NAN IN BLANK PIXELS ############## mask_1d = np.ones(pe_image.shape) mask_2d = geometry_converter.image_1d_to_2d(mask_1d, cam_id=cam_id) # TODO: apparently nan values are already there so this step is useless... #calibrated_image_2d[mask_2d != 1] = np.nan #pe_image_2d[mask_2d != 1] = np.nan #uncalibrated_image_2d[0, mask_2d != 1] = np.nan #pedestal_2d[0, mask_2d != 1] = np.nan #gains_2d[0, mask_2d != 1] = np.nan #if cam_id in ("NectarCam", "LSTCam", "ASTRICam", "ASTRI"): # # Double channel instruments # uncalibrated_image_2d[1, mask_2d != 1] = np.nan # pedestal_2d[1, mask_2d != 1] = np.nan # gains_2d[1, mask_2d != 1] = np.nan #pixel_pos_2d[mask_2d != 1] = np.nan return Image2D(input_image=calibrated_image_2d, input_samples=calibrated_samples_2d, reference_image=pe_image_2d, adc_samples=uncalibrated_samples_2d, extracted_samples=extracted_samples_2d, peakpos=peakpos_2d, adc_sum_image=uncalibrated_image_2d, pedestal_image=pedestal_2d, gains_image=gains_2d, pixels_position=pixel_pos_2d, pixels_mask=mask_2d, meta=metadata)
[docs]def simtel_images_generator(file_path, tel_filter_list=None, ev_filter_list=None, cam_filter_list=None, ctapipe_format=False, integrator='LocalPeakIntegrator', integrator_window_width=5, integrator_window_shift=2, integrator_t0=None, integrator_sig_amp_cut_hg=None, integrator_sig_amp_cut_lg=None, integrator_lwt=None, integration_correction=False, debug=False, rejection_criteria=None, **kwargs): """Return an iterable sequence all calibrated images in `file_path`. Parameters ---------- file_path : str The path of a Simtel file. tel_filter_list : sequence of int If defined, the generator iterator returns only images from telescopes listed in ``tel_filter_list``. ev_filter_list : sequence of int If defined, the generator iterator returns only images from events listed in ``ev_filter_list``. cam_filter_list : sequence of str If defined, the generator iterator returns only images from instrument listed in ``cam_filter_list``. ctapipe_format : bool The generator iterator returns ctapipe compliant 1D images if ``True``; it returns 2D converted images compliant with `iSAP/Sparce2D` otherwise. integrator : str Define the trace integration method used in ````. Should be one of the following: * None: use the default integrator with the default parameters (NeighbourPeakIntegrator); * 'FullIntegrator': integrates the entire waveform; * 'SimpleIntegrator': integrates within a window defined by the user; * 'GlobalPeakIntegrator': integration window about the global peak in each pixel; * 'LocalPeakIntegrator': integration window about the local peak in each pixel (default); * 'NeighbourPeakIntegrator': integration window defined by the peaks in the neighbouring pixels; * 'AverageWfPeakIntegrator': integration window defined by the average waveform across all pixels. integrator_window_width : int Define the width of the integration window. Only applicable to WindowIntegrators. integrator_window_shift : int Define the shift of the integration window from the peakpos (peakpos - shift). Only applicable to PeakFindingIntegrators. integrator_t0 : int Define the peak position for all pixels. Only applicable to SimpleIntegrators. integrator_sig_amp_cut_hg : float Define the cut above which a sample is considered as significant for PeakFinding in the HG channel. Only applicable to PeakFindingIntegrators. integrator_sig_amp_cut_lg : float Define the cut above which a sample is considered as significant for PeakFinding in the LG channel. Only applicable to PeakFindingIntegrators. integrator_lwt : int Weight of the local pixel (0: peak from neighbours only, 1: local pixel counts as much as any neighbour). Only applicable to NeighbourPeakIntegrator. integration_correction : bool If ``False``, switch off the integration correction occurring in ````. debug : bool Print additional values if ``True``. rejection_criteria : function A function that contains image rejection criteria. This function takes images and return True for images that should be ignored by the generator and False otherwise. It can be used to ignore images that are not in a given range of energy or images with a shower too close to the borders for instance. Notes ----- For more information about the integrator (``integrator*``) parameters, see the ``ChargeExtractorFactory`` class definition in ``ctapipe/image/`` or type (in a terminal) ``ctapipe-chargeres-extract --help-all``. Yields ------ image The next image in `file_path`. """ # EXTRACT IMAGES ########################################################## # hessio_event_source returns a Python generator that streams data from an # EventIO/HESSIO MC data file (e.g. a standard CTA data file). # This generator contains ctapipe.core.Container instances ("event"). # # Parameters: # - max_events: maximum number of events to read # - allowed_tels: select only a subset of telescope, if None, all are read. file_path = os.path.expanduser(file_path) source = hessio_event_source(file_path, allowed_tels=tel_filter_list) # CONFIGURE THE CALIBRATOR ################################################ # cfg = Config() # cfg["ChargeExtractorFactory"]["extractor"] = ‘LocalPeakIntegrator' # cfg["ChargeExtractorFactory"]["window_width"] = 5 # cfg["ChargeExtractorFactory"]["window_shift"] = 2 # # calib = CameraCalibrator(config=cfg, tool=None) # # def null_integration_correction_func(n_chan, pulse_shape, refstep, time_slice, window_width, window_shift): #     return np.ones(n_chan) # # = null_integration_correction_func if integrator is None: cfg = None elif integrator in ('FullIntegrator', 'SimpleIntegrator', 'GlobalPeakIntegrator', 'LocalPeakIntegrator', 'NeighbourPeakIntegrator', 'AverageWfPeakIntegrator'): cfg = Config() cfg["ChargeExtractorFactory"]["extractor"] = integrator if integrator_window_width is not None: cfg["ChargeExtractorFactory"]["window_width"] = integrator_window_width if integrator_window_shift is not None: cfg["ChargeExtractorFactory"]["window_shift"] = integrator_window_shift if integrator_t0 is not None: cfg["ChargeExtractorFactory"]["t0"] = integrator_t0 if integrator_sig_amp_cut_hg is not None: cfg["ChargeExtractorFactory"]["sig_amp_cut_HG"] = integrator_sig_amp_cut_hg if integrator_sig_amp_cut_lg is not None: cfg["ChargeExtractorFactory"]["sig_amp_cut_LG"] = integrator_sig_amp_cut_lg if integrator_lwt is not None: cfg["ChargeExtractorFactory"]["lwt"] = integrator_lwt else: raise ValueError(integrator) calib = CameraCalibrator(config=cfg, tool=None) if not integration_correction: # Switch off the integration correction occurring in #null_integration_correction_func = lambda n_chan, pulse_shape, refstep, time_slice, window_width, window_shift: np.ones(n_chan) null_integration_correction_func = lambda n_chan, *args, **kwargs: np.ones(n_chan) = null_integration_correction_func if debug: try: print("", except: pass try: print("extractor.window_width:", calib.dl1.extractor.window_width) except: pass try: print("extractor.window_shift:", calib.dl1.extractor.window_shift) except: pass try: print("extractor.t0:", calib.dl1.extractor.t0) except: pass try: print("extractor.sig_amp_cut_HG:", calib.dl1.extractor.sig_amp_cut_HG) except: pass try: print("extractor.sig_amp_cut_LG:", calib.dl1.extractor.sig_amp_cut_LG) except: pass try: print("extractor.lwt:", calib.dl1.extractor.lwt) except: pass # ITERATE OVER EVENTS ##################################################### for event in source: calib.calibrate(event) # calibrate the event event_id = int(event.dl0.event_id) if (ev_filter_list is None) or (event_id in ev_filter_list): # ITERATE OVER IMAGES ############################################# for tel_id in event.trig.tels_with_trigger: tel_id = int(tel_id) if (tel_filter_list is None) or (tel_id in tel_filter_list): try: image = simtel_event_to_images(event, tel_id, ctapipe_format=ctapipe_format, **kwargs) cam_id = image.meta['cam_id'] except NotImplementedError: cam_id = None if (cam_id is not None) and ((cam_filter_list is None) or (cam_id in cam_filter_list)): # MAKE METADATA ########################################### #image.meta['version'] = 1 # Version of the pywicta data format image.meta['tel_id'] = tel_id image.meta['event_id'] = event_id image.meta['file_path'] = file_path image.meta['simtel_path'] = file_path image.meta['num_tel_with_trigger'] = len(event.trig.tels_with_trigger) image.meta['mc_energy'] = quantity_to_tuple(, 'TeV') image.meta['mc_azimuth'] = quantity_to_tuple(, 'rad') image.meta['mc_altitude'] = quantity_to_tuple(, 'rad') image.meta['mc_core_x'] = quantity_to_tuple(, 'm') image.meta['mc_core_y'] = quantity_to_tuple(, 'm') image.meta['mc_height_first_interaction'] = quantity_to_tuple(, 'm') image.meta['ev_count'] = int(event.count) image.meta['run_id'] = int(event.dl0.run_id) image.meta['num_tel_with_data'] = len(event.dl0.tels_with_data) image.meta['optical_foclen'] = quantity_to_tuple(event.inst.optical_foclen[tel_id], 'm') image.meta['tel_pos_x'] = quantity_to_tuple(event.inst.tel_pos[tel_id][0], 'm') image.meta['tel_pos_y'] = quantity_to_tuple(event.inst.tel_pos[tel_id][1], 'm') image.meta['tel_pos_z'] = quantity_to_tuple(event.inst.tel_pos[tel_id][2], 'm') image.meta['integrator'] = integrator image.meta['integrator_window_width'] = integrator_window_width image.meta['integrator_window_shift'] = integrator_window_shift image.meta['integrator_t0'] = integrator_t0 image.meta['integrator_sig_amp_cut_hg'] = integrator_sig_amp_cut_hg image.meta['integrator_sig_amp_cut_lg'] = integrator_sig_amp_cut_lg image.meta['integrator_lwt'] = integrator_lwt image.meta['integration_correction'] = integration_correction # IMAGES ################################################## #images_dict = {} #images_dict["input_image"] = calibrated_image_2d #images_dict["reference_image"] = pe_image_2d #images_dict["adc_sum_image"] = uncalibrated_image_2d #images_dict["pedestal_image"] = pedestal_2d #images_dict["gains_image"] = gains_2d #images_dict["pixels_position"] = pixel_pos_2d #images_dict["pixels_mask"] = mask_2d if (rejection_criteria is None) or not rejection_criteria(image): yield image # End of file pyhessio.close_file()
# LOAD FITS BENCHMARK IMAGE ##################################################
[docs]def load_benchmark_images(input_file_path): """Return images contained in the given FITS file. Parameters ---------- input_file_path : str The path of the FITS file to load Returns ------- dict A dictionary containing the loaded images and their metadata Raises ------ WrongFitsFileStructure If `input_file_path` doesn't contain a valid structure """ hdu_list = # open the FITS file # METADATA ################################################################ hdu0 = hdu_list[0] metadata_dict = {} metadata_dict['file_path'] = input_file_path for key, value in hdu0.header.items(): if key not in ('SIMPLE', 'BITPIX', 'NAXIS', 'NAXIS1', 'NAXIS2', 'EXTEND'): metadata_dict[key.lower()] = value metadata_dict['mc_energy_unit'] = hdu0.header.comments['mc_energy'] metadata_dict['mc_azimuth_unit'] = hdu0.header.comments['mc_azimuth'] metadata_dict['mc_altitude_unit'] = hdu0.header.comments['mc_altitude'] metadata_dict['mc_core_x_unit'] = hdu0.header.comments['mc_core_x'] metadata_dict['mc_core_y_unit'] = hdu0.header.comments['mc_core_y'] metadata_dict['mc_height_first_interaction_unit'] = hdu0.header.comments['mc_height_first_interaction'] metadata_dict['optical_foclen_unit'] = hdu0.header.comments['optical_foclen'] metadata_dict['tel_pos_x_unit'] = hdu0.header.comments['tel_pos_x'] metadata_dict['tel_pos_y_unit'] = hdu0.header.comments['tel_pos_y'] metadata_dict['tel_pos_z_unit'] = hdu0.header.comments['tel_pos_z'] # IMAGES ################################################################## if metadata_dict['version'] == 1: if (len(hdu_list) != 7) or (not hdu_list[0].is_image) or (not hdu_list[1].is_image) or (not hdu_list[2].is_image) or (not hdu_list[3].is_image) or (not hdu_list[4].is_image) or (not hdu_list[5].is_image) or (not hdu_list[6].is_image): hdu_list.close() raise WrongFitsFileStructure(input_file_path) hdu0, hdu1, hdu2, hdu3, hdu4, hdu6, hdu7 = hdu_list # IMAGES images_dict = {} images_dict["input_image"] = # "" is a Numpy Array images_dict["reference_image"] = # "" is a Numpy Array images_dict["adc_sum_image"] = # "" is a Numpy Array images_dict["pedestal_image"] = # "" is a Numpy Array images_dict["gains_image"] = # "" is a Numpy Array #images_dict["calibration_image"] = # "" is a Numpy Array images_dict["pixels_position"] = # "" is a Numpy Array images_dict["pixels_mask"] = # "" is a Numpy Array images_dict["input_samples"] = None # TODO images_dict["adc_samples"] = None # TODO images_dict["extracted_samples"] = None # TODO images_dict["peakpos"] = None # TODO elif metadata_dict['version'] == 2: if (len(hdu_list) != 3) or (not hdu_list[0].is_image) or (not hdu_list[1].is_image) or (not hdu_list[2].is_image): hdu_list.close() raise WrongFitsFileStructure(input_file_path) hdu0, hdu1, hdu2 = hdu_list # IMAGES images_dict = {} images_dict["input_image"] = # "" is a Numpy Array images_dict["reference_image"] = # "" is a Numpy Array images_dict["input_samples"] = # "" is a Numpy Array or None images_dict["adc_samples"] = None # TODO images_dict["extracted_samples"] = None # TODO images_dict["peakpos"] = None # TODO images_dict["adc_sum_image"] = None # TODO images_dict["pedestal_image"] = None # TODO images_dict["gains_image"] = None # TODO images_dict["pixels_position"] = None # TODO images_dict["pixels_mask"] = None # TODO else: raise Exception("Unknown version number") # METADATA ################################################################ metadata_dict['npe'] = float(np.nansum(images_dict["reference_image"])) # np.sum() returns numpy.int64 objects thus it must be casted with float() to avoid serialization errors with JSON... metadata_dict['min_npe'] = float(np.nanmin(images_dict["reference_image"])) # np.min() returns numpy.int64 objects thus it must be casted with float() to avoid serialization errors with JSON... metadata_dict['max_npe'] = float(np.nanmax(images_dict["reference_image"])) # np.max() returns numpy.int64 objects thus it must be casted with float() to avoid serialization errors with JSON... hdu_list.close() image2d = Image2D(**images_dict, meta=metadata_dict) return image2d
# SAVE BENCHMARK IMAGE #######################################################
[docs]def save_benchmark_images(img, pe_img, metadata, output_file_path, sample_imgs=None): """Write a FITS file containing pe_img, output_file_path and metadata. Parameters ---------- img: ndarray The "input image" to save (it should be a 2D Numpy array). pe_img: ndarray The "reference image" to save (it should be a 2D Numpy array). output_file_path: str The path of the output FITS file. metadata: tuple A dictionary containing all metadata to write in the FITS file. """ # Check arguments ########################### if img.ndim != 2: raise Exception("The input image should be a 2D numpy array.") if pe_img.ndim != 2: raise Exception("The reference image should be a 2D numpy array.") if sample_imgs is not None and sample_imgs.ndim != 3: raise Exception("The input samples image should be a 3D numpy array.") # Make the dara structure ################### # # hdu0 = fits.PrimaryHDU(img) hdu1 = fits.ImageHDU(pe_img) hdu2 = fits.ImageHDU(sample_imgs) hdu0.header["desc"] = "calibrated image" hdu1.header["desc"] = "pe image" hdu2.header["desc"] = "sample images" # Add metadata ############################## metadata['version'] = 2 if 'file_path' in metadata: del metadata['file_path'] # = simtel_basename # TODO: for some reason it doesn't work anymore even if len(simtel_basename) < 80... if 'simtel_path' in metadata: del metadata['simtel_path'] for key, val in metadata.items(): if type(val) is tuple : hdu0.header[key] = val[0] hdu0.header.comments[key] = val[1] else: hdu0.header[key] = val # Remove file if it already exists ########## if os.path.isfile(output_file_path): os.remove(output_file_path) # Write Fits file ########################### hdu_list = fits.HDUList([hdu0, hdu1, hdu2]) hdu_list.writeto(output_file_path)
# LOAD AND SAVE FITS FILES ###################################################
[docs]def load_fits(input_file_path, hdu_index): """Return the image array contained in the given HDU of the given FITS file. Parameters ---------- input_file_path : str The path of the FITS file to load hdu_index : int The HDU to load within the FITS file (one FITS file can contain several images stored in different HDU) Returns ------- ndarray The loaded image Raises ------ WrongHDUError If `input_file_path` doesn't contain the HDU `hdu_index` NotAnImageError If `input_file_path` doesn't contain a valid image in the HDU `hdu_index` """ hdu_list = # open the FITS file if not (0 <= hdu_index < len(hdu_list)): hdu_list.close() raise WrongHDUError(input_file_path, hdu_index) hdu = hdu_list[hdu_index] if not hdu.is_image: hdu_list.close() raise NotAnImageError(input_file_path, hdu_index) image_array = # "" is a Numpy Array hdu_list.close() return image_array
[docs]def save_fits(img, output_file_path): """Save the `img` image (array_like) to the `output_file_path` FITS file. Parameters ---------- img : array_like The image to save (should be a 2D or a 3D numpy array) output_file_path : str The path of the FITS file where to save the `img` Raises ------ WrongDimensionError If `img` has more than 3 dimensions or less than 2 dimensions. """ if img.ndim not in (2, 3): raise WrongDimensionError() hdu = fits.PrimaryHDU(img) hdu.writeto(output_file_path, overwrite=True) # overwrite=True: overwrite the file if it already exists
[docs]def plot_ctapipe_image(image, geom, ax=None, figsize=(10, 10), title=None, title_fontsize=24, norm='lin', highlight_mask=None, plot_colorbar=True, plot_axis=True, colorbar_orientation='horizontal', colorbar_limits=None): """Plot an image. Parameters ---------- image : array_like Array of values corresponding to the pixels in the CameraGeometry. geometry : `~ctapipe.instrument.CameraGeometry` Definition of the Camera/Image ax : `matplotlib.axes.Axes` A matplotlib axes object to plot on, or None to create a new one title : str (default "Camera") Title to put on camera plot norm : str or `matplotlib.color.Normalize` instance (default 'lin') Normalization for the color scale. Supported str arguments are - 'lin': linear scale - 'log': logarithmic scale (base 10) cmap : str or `matplotlib.colors.Colormap` (default 'hot') Color map to use (see ``) """ if ax is None: fig = plt.figure(figsize=figsize) disp = ctapipe.visualization.CameraDisplay(geom, image=image, ax=ax, norm=norm) #disp.enable_pixel_picker() if colorbar_limits is not None: disp.set_limits_minmax(colorbar_limits[0], colorbar_limits[1]) if plot_colorbar: disp.add_colorbar(ax=disp.axes, fraction=0.04, pad=0.04, orientation=colorbar_orientation) if highlight_mask is not None: disp.highlight_pixels(highlight_mask, linewidth=4, color='white', alpha=0.9) if not plot_axis: disp.axes.set_axis_off() if title is None: title = geom.cam_id disp.axes.set_title(title, fontsize=title_fontsize) if colorbar_orientation == 'horizontal': # disp.axes.text(0.5, 0.01, "Intensity (photoelectrons)", horizontalalignment='center', verticalalignment='top', fontsize=22, transform=disp.axes.transAxes) plt.tight_layout() return disp
[docs]def plot_hillas_parameters_on_axes(ax, image, geom, hillas_params=None, plot_ellipse=True, plot_axis=True, plot_actual_axis_pm=True, plot_inner_axes=False, auto_lim=True, hillas_implementation=2): """Plot the shower ellipse and direction on an existing matplotlib axes.""" x_lim = ax.get_xlim() y_lim = ax.get_ylim() try: if hillas_params is None: hillas_params = get_hillas_parameters(geom, image, implementation=hillas_implementation) centroid = (hillas_params.cen_x.value, hillas_params.cen_y.value) length = hillas_params.length.value width = hillas_params.width.value angle = #print("centroid:", centroid) #print("length:", length) #print("width:", width) #print("angle:", angle) if plot_ellipse: ellipse = Ellipse(xy=centroid, width=length, height=width, angle=np.degrees(angle), fill=False, color='red', lw=2) ax.axes.add_patch(ellipse) title = ax.axes.get_title() ax.title.set_text("{} ({:.2f}°)".format(title, np.degrees(angle))) # Plot the center of the ellipse if plot_ellipse: ax.scatter(*centroid, c="r", marker="x", linewidth=2) # Plot the shower axis p0_x = centroid[0] p0_y = centroid[1] p1_x = p0_x + math.cos(angle) p1_y = p0_y + math.sin(angle) p2_x = p0_x + math.cos(angle + math.pi) p2_y = p0_y + math.sin(angle + math.pi) ax.plot([p1_x, p2_x], [p1_y, p2_y], ':r', lw=2) # Plot the actual axis in pointing source mode if plot_actual_axis_pm: ax.plot([0, p0_x], [0, p0_y], ':g', lw=2) # Plot the shower inner axes if plot_inner_axes: p3_x = p0_x + math.cos(angle) * length / 2. p3_y = p0_y + math.sin(angle) * length / 2. ax.plot([p0_x, p3_x], [p0_y, p3_y], '-r') p4_x = p0_x + math.cos(angle + math.pi/2.) * width / 2. p4_y = p0_y + math.sin(angle + math.pi/2.) * width / 2. ax.plot([p0_x, p4_x], [p0_y, p4_y], '-g') # Set (back) ax limits if auto_lim: ax.set_xlim(x_lim) ax.set_ylim(y_lim) except HillasParameterizationError as err: print(err)
def print_hillas_parameters(image, cam_id, implementation=2): geom = geometry_converter.get_geom1d(cam_id) try: hillas_params = get_hillas_parameters(geom, image, implementation=implementation) print("cen_x:...", hillas_params.cen_x) print("cen_y:...", hillas_params.cen_y) print("length:..", hillas_params.length) print("width:...", hillas_params.width) print("size:....", hillas_params.size, "PE") print("NPE: ....", np.nansum(image), "PE") print("psi:.....", hillas_params.psi) print("miss:....", hillas_params.miss) print("phi:.....", hillas_params.phi) print("r:.......", hillas_params.r) print("kurtosis:", hillas_params.kurtosis) print("skewness:", hillas_params.skewness) except HillasParameterizationError as err: print(err) def hillas_parameters_to_df(image, cam_id, implementation=2): geom = geometry_converter.get_geom1d(cam_id) columns = ['cen_x_m', 'cen_y_m', 'length_m', 'width_m', 'size_pe', 'psi_rad', 'miss_m', 'phi_rad', 'r_m', 'kurtosis', 'skewness'] data = np.full([1, len(columns)], np.nan) df = pd.DataFrame(data=data, columns=columns) try: hillas_params = get_hillas_parameters(geom, image, implementation=implementation) df.loc[0, "cen_x_m"] = df.loc[0, "cen_y_m"] = df.loc[0, "length_m"] = df.loc[0, "width_m"] = df.loc[0, "size_pe"] = hillas_params.size df.loc[0, "psi_rad"] = df.loc[0, "miss_m"] = df.loc[0, "phi_rad"] = df.loc[0, "r_m"] = df.loc[0, "kurtosis"] = hillas_params.kurtosis df.loc[0, "skewness"] = hillas_params.skewness except HillasParameterizationError as err: print(err) return df # MATPLOTLIB ################################################################## COLOR_MAP = cm.gnuplot2
[docs]def mpl_save(img, output_file_path, title=""): """ img should be a 2D numpy array. """ fig = plt.figure(figsize=(8.0, 8.0)) ax = fig.add_subplot(111) ax.set_title(title, fontsize=24) #im = ax.imshow(img, # origin='lower', # interpolation='nearest', # vmin=min(img.min(), 0), # cmap=COLOR_MAP) # Manage NaN values (see and masked =, img) cmap = COLOR_MAP cmap.set_bad('black') im = ax.imshow(masked, origin='lower', interpolation='nearest', cmap=cmap) plt.colorbar(im) # draw the colorbar plt.savefig(output_file_path, bbox_inches='tight') plt.close('all')
[docs]def plot(img, title=""): """ img should be a 2D numpy array. """ fig = plt.figure(figsize=(8.0, 8.0)) ax = fig.add_subplot(111) ax.set_title(title) #im = ax.imshow(img, # origin='lower', # interpolation='nearest', # vmin=min(img.min(), 0), # cmap=COLOR_MAP) # Manage NaN values (see and masked =, img) cmap = COLOR_MAP cmap.set_bad('black') im = ax.imshow(masked, origin='lower', interpolation='nearest', cmap=cmap) plt.colorbar(im) # draw the colorbar
def plot_hist(img, num_bins=50, logx=False, logy=False, x_max=None, title=""): """ """ # Flatten + remove NaN values flat_img = img[np.isfinite(img)] fig = plt.figure(figsize=(8.0, 8.0)) ax = fig.add_subplot(111) ax.set_title(title) if logx: # Setup the logarithmic scale on the X axis vmin = np.log10(flat_img.min()) vmax = np.log10(flat_img.max()) bins = np.logspace(vmin, vmax, num_bins) # Make a range from 10**vmin to 10**vmax else: bins = num_bins if x_max is not None: ax.set_xlim(xmax=x_max) res_tuple = ax.hist(flat_img, bins=bins, log=logy, # Set log scale on the Y axis histtype='bar', alpha=1) if logx: ax.set_xscale("log") # Activate log scale on X axis def _plot_list(img_list, geom_list, title_list=None, hillas_list=None, highlight_mask_list=None, main_title=None): """Plot several images at once.""" num_imgs = len(img_list) fig, ax_tuple = plt.subplots(nrows=1, ncols=num_imgs, figsize=(num_imgs*6, 6)) if title_list is None: title_list = [None for i in img_list] if hillas_list is None: hillas_list = [None for i in img_list] if highlight_mask_list is None: highlight_mask_list = [None for i in img_list] for img, title, ax, geom, plot_hillas, highlight_mask in zip(img_list, title_list, ax_tuple, geom_list, hillas_list, highlight_mask_list): disp = plot_ctapipe_image(img, geom=geom, ax=ax, title=title, norm='lin', highlight_mask=highlight_mask, plot_colorbar=True, plot_axis=False) if plot_hillas: plot_hillas_parameters_on_axes(disp.axes, img, geom) #plt.savefig('tailcut_cleaned_img.pdf') if main_title is not None: fig.suptitle(main_title, fontsize=18) plt.subplots_adjust(top=0.85) def plot_list(img_list, geom_list, title_list=None, hillas_list=None, highlight_mask_list=None, metadata_dict=None): """Plot several images at once. Parameters ---------- img_list A list of 2D numpy array to plot. """ # Main title if metadata_dict is not None: mc_energy = metadata_dict['mc_energy'] if 'mc_energy_unit' in metadata_dict else metadata_dict['mc_energy'][0] mc_energy_unit = metadata_dict['mc_energy_unit'] if 'mc_energy_unit' in metadata_dict else metadata_dict['mc_energy'][1] if 'simtel_path' in metadata_dict: simtel_path = metadata_dict['simtel_path'] else: simtel_path = "" main_title = "{} (Tel. {}, Ev. {}) {:.2E}{}".format(os.path.basename(simtel_path), metadata_dict['tel_id'], metadata_dict['event_id'], mc_energy, mc_energy_unit) else: main_title = None _plot_list(img_list, geom_list, title_list=title_list, hillas_list=hillas_list, highlight_mask_list=highlight_mask_list, main_title=main_title) def mpl_save_list(img_list, geom_list, output_file_path, title_list=None, hillas_list=None, highlight_mask_list=None, metadata_dict=None): """Plot several images at once. Parameters ---------- img_list A list of 2D numpy array to plot. """ # Main title if metadata_dict is not None: mc_energy = metadata_dict['mc_energy'] if 'mc_energy_unit' in metadata_dict else metadata_dict['mc_energy'][0] mc_energy_unit = metadata_dict['mc_energy_unit'] if 'mc_energy_unit' in metadata_dict else metadata_dict['mc_energy'][1] if 'simtel_path' in metadata_dict: simtel_path = metadata_dict['simtel_path'] else: simtel_path = "" main_title = "{} (Tel. {}, Ev. {}) {:.2E}{}".format(os.path.basename(simtel_path), metadata_dict['tel_id'], metadata_dict['event_id'], mc_energy, mc_energy_unit) else: main_title = "" _plot_list(img_list, geom_list, title_list=title_list, hillas_list=hillas_list, highlight_mask_list=highlight_mask_list, main_title=main_title) plt.savefig(output_file_path, bbox_inches='tight') plt.close('all') # DEBUG ####################################################################### def export_image_as_plain_text(image, output_file_path): fd = open(output_file_path, 'w') for x in image: for y in x: print("{:5.2f}".format(y), end=" ", file=fd) print("", file=fd) fd.close()