Source code for ctlearn_manager.utils.RF_counterpart

import glob

import astropy.units as u
import numpy as np
from astropy.coordinates import EarthLocation, SkyCoord
from astropy.io import fits
from tqdm import tqdm

from ctlearn_manager.utils import Cuts, CutType, DefaultCuts
from ctlearn_manager.utils.DL2_processing import DL2DataProcessor
from ctlearn_manager.utils.utils import set_mpl_style

from .. import TriModelCollection
from ..tri_model import CTLearnTriModelManager
import os
from concurrent.futures import ProcessPoolExecutor
from ctlearn_manager.utils import (
    get_avg_pointing,
    get_closest_rf_irf_files,
    ParticleType
    )

[docs] def extract_cuts_RF(args): file, cut, pointing_table, pointing_alt_key, pointing_az_key = args zenith, azimuth = get_avg_pointing(file, pointing_table, pointing_alt_key, pointing_az_key) cuts_file = get_closest_rf_irf_files(zenith.value, cut) with fits.open(cuts_file, mode="readonly") as hdul: theta_cut = hdul["THETA_CUTS"].data["cut"] gammaness_cut = hdul["GH_CUTS"].data["GH_CUTS"] return theta_cut, gammaness_cut
[docs] class RFCounterpart(DL2DataProcessor): def __init__( self, # dl2_processed_dir, CTLearn_TriModel_Manager: CTLearnTriModelManager or TriModelCollection, DL2_files=None, runs=None, # cluster_configuration=ClusterConfiguration(), cuts: list[Cuts] = [DefaultCuts.GH_0_9.value], source_position=SkyCoord.from_name("Crab"), default_E_bins=np.logspace( np.log10(0.02), np.log10(20), int((np.log10(20) - np.log10(0.02)) * 5 + 1) ) * u.TeV, pointing_table="/dl2/event/telescope/parameters/LST_LSTCam", intensity_cut: int=80, global_gammaness_cut: float=0., workers=None ): self.workers = workers self.CTLearn = False self.intensity_cut = intensity_cut self.global_gammaness_cut = global_gammaness_cut if isinstance(CTLearn_TriModel_Manager, CTLearnTriModelManager): self.CTLearnTriModelCollection = TriModelCollection( [CTLearn_TriModel_Manager], cluster_configuration=CTLearn_TriModel_Manager.cluster_configuration, ) else: self.CTLearnTriModelCollection = CTLearn_TriModel_Manager self.cluster_configuration = self.CTLearnTriModelCollection.cluster_configuration if ( # (self.cluster_configuration.cluster == "lst-cluster") (runs is not None) and (DL2_files is None) ): self.DL2_files = [] for run in tqdm(runs, desc="Fetching DL2 files from runs"): DL2_file_run = glob.glob( f"/fefs/aswg/data/real/DL2/*/v0.10/tailcut*/nsb_tuning_*/dl2_LST-1.Run{run:05d}.h5" )[0] self.DL2_files.append(DL2_file_run) # self.DL2_files.append(f"/fefs/aswg/data/real/DL2/20220331/v0.10/tailcut84/nsb_tuning_0.14/dl2_LST-1.Run{run}.h5") elif DL2_files is not None: self.DL2_files = DL2_files else: raise ValueError( "Either DL2_files or runs must be provided, if runs are provided, be sure to run on the LST cluster." ) self.DL2_files = np.sort(self.DL2_files) # self.CTLearnTriModelManager = CTLearnTriModelManager self.stereo = self.CTLearnTriModelCollection.tri_models[0].stereo self.pointing_table = pointing_table self.reconstruction_method = "RF" self.reco_field_suffix = ( self.reconstruction_method if self.stereo else f"{self.reconstruction_method}_tel" ) self.source_position = source_position if self.CTLearn: self.dl2_processed_dir = self.CTLearnTriModelCollection.tri_models[0].project_directories.dl2_post_processed_data_directory else: self.dl2_processed_dir = self.CTLearnTriModelCollection.tri_models[0].project_directories.dl2_post_processed_data_rf_directory if not os.path.exists(self.dl2_processed_dir): os.makedirs(self.dl2_processed_dir, exist_ok=True) self.telscope_names = self.CTLearnTriModelCollection.tri_models[ 0 ].telescope_names self.stereo = self.CTLearnTriModelCollection.tri_models[0].stereo # self.stereo = CTLearnTriModelManager.stereo self.cuts = cuts # self.gammaness_cut = gammaness_cut self.reconstruction_method = "RF" self.reco_field_suffix = ( self.reconstruction_method if self.stereo else f"{self.reconstruction_method}_tel" ) self.telescope_id = ( self.CTLearnTriModelCollection.tri_models[0].telescope_ids if self.stereo else self.CTLearnTriModelCollection.tri_models[0].telescope_ids[0] ) # FIXME other telescopes ? # self.irfs = CTLearnTriModelManager.irfs # E_bins_tot = np.empty(len(self.cuts), dtype=object) # GH_cuts_tot = np.empty(len(self.cuts), dtype=object) # Theta_cuts_tot = np.empty(len(self.cuts), dtype=object) # for i, cut in enumerate(self.cuts): # if ( # cut.cut_type == CutType.EFFICIENCY_OPTIMIZED # or cut.cut_type == CutType.SENSITIVITY_OPTIMIZED # ): # # print(self.edep_cuts) # # get E bins from IRFs cuts file # cuts_file = self.CTLearnTriModelCollection.tri_models[ # 0 # ].direction_model.get_IRF_data()[ # 1 # ][ # 0 # ] # index 1 is the cut file table, index 0 is the first line (the path to the file) # with fits.open(cuts_file, mode="readonly") as hdul: # E_bins = hdul["GH_CUTS"].data["low"] # E_bins = np.append(E_bins, hdul["GH_CUTS"].data["high"][-1]) * u.TeV # E_bins_tot[i] = E_bins # GH_cuts = hdul["GH_CUTS"].data["cut"] # GH_cuts_tot[i] = GH_cuts # Theta_cuts = hdul["RAD_MAX"].data["cut"] # Theta_cuts_tot[i] = Theta_cuts # else: # E_bins = default_E_bins # E_bins_tot[i] = E_bins # self.E_bins = E_bins_tot # self.GH_cuts = GH_cuts_tot # self.Theta_cuts = Theta_cuts_tot self.set_keys() if any("LST" in name and "1" in name for name in self.telscope_names): # print("LST1 is in the telescope names") self.telescope_location = EarthLocation( lon=-17.89149701 * u.deg, lat=28.76152611 * u.deg, # height of central pin + distance from pin to elevation axis height=2184 * u.m + 15.883 * u.m, ) if self.CTLearn: self.dl2_processed_dir = self.CTLearnTriModelCollection.tri_models[0].project_directories.dl2_post_processed_data_directory else: self.dl2_processed_dir = self.CTLearnTriModelCollection.tri_models[0].project_directories.dl2_post_processed_data_rf_directory if not os.path.exists(self.dl2_processed_dir): os.makedirs(self.dl2_processed_dir, exist_ok=True) self.process_DL2_data() self.load_processed_data() self.cut_file_theta_cuts = [] self.cut_file_gammaness_cuts = [] for i, cut in enumerate(self.cuts): if ( cut.cut_type == CutType.EFFICIENCY_OPTIMIZED or cut.cut_type == CutType.SENSITIVITY_OPTIMIZED ): file_args = [ (file, cut, self.pointing_table, self.pointing_alt_key, self.pointing_az_key) for file in self.DL2_files ] file_theta_cuts = [] file_gammaness_cuts = [] with ProcessPoolExecutor() as executor: results = list(tqdm(executor.map(extract_cuts_RF, file_args), desc=f"Creating masks [{cut.get_label()}]", total=len(file_args))) for theta_cut, gammaness_cut in results: file_theta_cuts.append(theta_cut) file_gammaness_cuts.append(gammaness_cut) self.cut_file_theta_cuts.append(file_theta_cuts) self.cut_file_gammaness_cuts.append(file_gammaness_cuts) E_bins_tot = np.empty(len(self.cuts), dtype=object) # GH_cuts_tot = np.empty(len(self.cuts), dtype=object) # Theta_cuts_tot = np.empty(len(self.cuts), dtype=object) for i, cut in enumerate(self.cuts): if ( cut.cut_type == CutType.EFFICIENCY_OPTIMIZED # or cut.cut_type == CutType.SENSITIVITY_OPTIMIZED ): # for modelindex, file in zip(self.corresponding_model_indexs, self.DL2_files): # print(self.edep_cuts) # get E bins from IRFs cuts file # zenith, azimuth = get_avg_pointing(file, pointing_table, pointing_alt_key, pointing_az_key) cuts_file = get_closest_rf_irf_files(10.00, cut) # zenith, azimuth = self.CTLearnTriModelCollection.tri_models[ # 0 # FIXME # ].project_directories.get_available_MC_directions(ParticleType.GAMMA_POINT) # cuts_file = self.CTLearnTriModelCollection.tri_models[ # 0 # FIXME # ].direction_model.project_directories.get_irf_files(zenith[0], azimuth[0], cut)['cuts_file'] with fits.open(cuts_file, mode="readonly") as hdul: E_bins = hdul["GH_CUTS"].data["true_energy_low"] E_bins = np.append(E_bins, hdul["GH_CUTS"].data["true_energy_high"][-1]) * u.TeV E_bins_tot[i] = E_bins # GH_cuts = hdul["GH_CUTS"].data["cut"] # GH_cuts_tot[i] = GH_cuts # Theta_cuts = hdul["RAD_MAX"].data["cut"] # Theta_cuts_tot[i] = Theta_cuts else: E_bins = default_E_bins E_bins_tot[i] = E_bins self.E_bins = E_bins_tot
# set_mpl_style()
[docs] class LazyRFCounterpart(DL2DataProcessor): def __init__( self, DL2DataProcessor: DL2DataProcessor, dl2_processed_dir, gammaness_cut=0.9, lstchain_version="0.10", ): self.cluster_configuration = ( DL2DataProcessor.CTLearnTriModelManager.cluster_configuration ) self.lstchain_version = lstchain_version runs = [] for dl2 in DL2DataProcessor.dl2s: for obs_id in dl2["obs_id"]: run_temp = int(str(obs_id)[:4]) if run_temp not in runs: runs.append(run_temp) print(runs) if (self.cluster_configuration.cluster == "lst-cluster") and (runs is not None): self.DL2_files = [] for run in runs: # try: DL2_file_run = glob.glob( f"/fefs/aswg/data/real/DL2/*/v{self.lstchain_version}/tailcut*/nsb_tuning_*/dl2_LST-1.Run{run:05d}.h5" )[0] # except: # DL2_file_run = glob.glob(f"/fefs/aswg/data/real/DL2/*/v0.9/tailcut*/nsb_tuning_*/dl2_LST-1.Run{run:05d}.h5")[0] self.DL2_files.append(DL2_file_run) # self.DL2_files.append(f"/fefs/aswg/data/real/DL2/20220331/v0.10/tailcut84/nsb_tuning_0.14/dl2_LST-1.Run{run}.h5") else: raise ValueError( "Either DL2_files or runs must be provided, if runs are provided, be sure to run on the LST cluster." ) self.CTLearnTriModelManager = DL2DataProcessor.CTLearnTriModelManager self.source_position = DL2DataProcessor.source_position self.dl2_processed_dir = dl2_processed_dir self_telscope_names = DL2DataProcessor.CTLearnTriModelManager.telescope_names self.stereo = DL2DataProcessor.CTLearnTriModelManager.stereo self.gammaness_cut = gammaness_cut self.reconstruction_method = "RF" self.reco_field_suffix = ( self.reconstruction_method if self.stereo else f"{self.reconstruction_method}_tel" ) self.telescope_id = ( DL2DataProcessor.CTLearnTriModelManager.telescope_ids if self.stereo else DL2DataProcessor.CTLearnTriModelManager.telescope_ids[0] ) # self.irfs = CTLearnTriModelManager.irfs self.CTLearn = False self.set_keys() if any("LST" in name and "1" in name for name in self_telscope_names): # print("LST1 is in the telescope names") self.telescope_location = EarthLocation( lon=-17.89149701 * u.deg, lat=28.76152611 * u.deg, # height of central pin + distance from pin to elevation axis height=2184 * u.m + 15.883 * u.m, ) self.process_DL2_data() self.load_processed_data() set_mpl_style()