import os
import pickle
import sys
from concurrent.futures import ProcessPoolExecutor

import h5py
import numpy as np
import pandas as pd
import torch
from loky import get_reusable_executor
from sklearn.preprocessing import LabelEncoder

from . import helper, normalization
def load_data(file_path, file_type="h5", verbose: bool = False):
    """
    Load data from either an HDF5 file or .npy files.

    Parameters
    ----------
    file_path : str
        Path of the input file.
    file_type : str
        "h5" (supported) or "npy" (not yet implemented).
    verbose : bool
        If True, print progress information.

    Returns
    -------
    tuple of np.ndarray
        The (events, jets, constituents) arrays read from the file.

    Raises
    ------
    NotImplementedError
        If ``file_type`` is "npy".
    ValueError
        For any other unsupported file type.
    """
    if verbose:
        print(f"Loading data from {file_path}...")
    # Reject unsupported formats up front (guard clauses).
    if file_type == "npy":
        raise NotImplementedError(
            "Loading .npy files is not yet supported. Please retry with --mode = convert_csv and --options = h5 first."
        )
    if file_type != "h5":
        raise ValueError(
            "Unsupported file type. First convert to 'h5' or 'npy' using --mode = convert_csv and --options = [chosen file_type]."
        )
    # Materialize the three datasets while the file handle is open.
    with h5py.File(file_path, "r") as h5file:
        events, jets, constituents = (
            np.array(h5file[key]) for key in ("events", "jets", "constituents")
        )
    return events, jets, constituents
def encode_pid_parallel(constits_tensor_path, encoded_tensor_path, pid_map_path):
    """
    Parallelized: Reads the constituent-level tensor and encodes the pid column using categorical encoding.
    Saves the new tensor and the pid map for later retrieval.

    Parameters
    ----------
    constits_tensor_path : str
        Path of the saved constituent-level tensor (torch ``.pt`` file).
    encoded_tensor_path : str
        Destination path for the tensor with the categorically encoded pid column.
    pid_map_path : str
        Destination path for the CSV mapping original pids to encoded pids.
    """
    # Load constituent-level tensor; assumes layout (event, constituent, feature)
    # with the pid at feature index 3 — TODO confirm against the tensor writer.
    constits_tensor = torch.load(constits_tensor_path)
    constits_array = constits_tensor.numpy()
    # Extract pid column
    pid_column = constits_array[:, :, 3]
    # BUG FIX: LabelEncoder.fit_transform only accepts 1-D input; the 2-D pid
    # slab is flattened before encoding and reshaped back afterwards.
    label_encoder = LabelEncoder()
    encoded_pids = label_encoder.fit_transform(pid_column.ravel()).reshape(
        pid_column.shape
    )
    # Replace pid column in the array
    constits_array[:, :, 3] = encoded_pids
    # Save the encoded tensor
    encoded_tensor = torch.tensor(constits_array, dtype=torch.float32)
    torch.save(encoded_tensor, encoded_tensor_path)
    # Persist the encoded -> original pid correspondence for decode_pid().
    pid_map = {
        "original_pid": label_encoder.classes_.tolist(),
        "encoded_pid": list(range(len(label_encoder.classes_))),
    }
    pid_map_df = pd.DataFrame(pid_map)
    # Parallel write using loky (a single worker — the write happens off-process).
    with get_reusable_executor(max_workers=1) as executor:
        executor.submit(pid_map_df.to_csv, pid_map_path, index=False).result()
    print(f"Encoded tensor saved to {encoded_tensor_path}")
    print(f"PID map saved to {pid_map_path}")
def decode_pid(encoded_tensor_path, pid_map_path, decoded_tensor_path):
    """
    Reads the encoded tensor and pid map, decodes the pids back to their original values,
    and saves the decoded tensor.

    Parameters
    ----------
    encoded_tensor_path : str
        Path of the tensor whose pid column (feature index 3) is encoded.
    pid_map_path : str
        CSV with ``encoded_pid`` and ``original_pid`` columns.
    decoded_tensor_path : str
        Destination path for the decoded tensor.
    """
    # Pull the encoded tensor into a mutable numpy array.
    data = torch.load(encoded_tensor_path).numpy()
    # Build the encoded -> original lookup from the saved map.
    mapping = pd.read_csv(pid_map_path)
    encoded_to_original = dict(zip(mapping["encoded_pid"], mapping["original_pid"]))
    # Translate every encoded pid (feature index 3) back to its original value.
    data[:, :, 3] = np.vectorize(encoded_to_original.get)(data[:, :, 3])
    # Persist the decoded tensor.
    torch.save(torch.tensor(data, dtype=torch.float32), decoded_tensor_path)
    print(f"Decoded tensor saved to {decoded_tensor_path}")
def process_event(
    evt_id, evt_jets, evt_constits, n_jets=3, n_constits=15, verbose: bool = False
):
    """
    Process a single event to select top jets and their top constituents.

    Jets are ordered by descending pT (column 4) and truncated/zero-padded to
    ``n_jets`` rows. For each kept jet, its constituents are ordered by
    descending pT and truncated/zero-padded to ``n_constits`` rows; the
    constituent pid column (index 3) is overwritten with the jet's b-tag
    (jet column 3). Padding rows still carry the event ID in column 0.

    Returns
    -------
    tuple of np.ndarray
        ``(selected_jets, selected_constits)`` with shapes
        ``(n_jets, n_jet_features)`` and ``(n_jets * n_constits, n_constit_features)``.
    """
    if verbose:
        print(
            f"Processing event {evt_id} with {len(evt_jets)} jets and {len(evt_constits)} constituents..."
        )
    # Descending-pT order of this event's jets (pT is column 4).
    ordered_jets = evt_jets[np.argsort(-evt_jets[:, 4])]
    n_kept = min(len(ordered_jets), n_jets)

    jets_out = np.zeros((n_jets, ordered_jets.shape[1]), dtype=ordered_jets.dtype)
    jets_out[:, 0] = evt_id  # padding rows keep the event ID
    jets_out[:n_kept, :] = ordered_jets[:n_kept]

    constits_out = np.zeros(
        (n_jets * n_constits, evt_constits.shape[1]), dtype=evt_constits.dtype
    )
    constits_out[:, 0] = evt_id  # padding rows keep the event ID
    for slot, jet in enumerate(ordered_jets[:n_kept]):
        jet_id = jet[1]  # jet ID within the event
        jet_btag = jet[3]
        # Constituents belonging to this jet, ordered by descending pT.
        members = evt_constits[evt_constits[:, 1] == jet_id]
        members = members[np.argsort(-members[:, 4])]
        # Replace constituent pid with the b-tag of the owning jet.
        members[:, 3] = jet_btag
        kept = min(len(members), n_constits)
        lo = slot * n_constits
        constits_out[lo : lo + kept, :] = members[:kept]
        constits_out[lo : lo + kept, 1] = jet_id  # retain jet ID
    return jets_out, constits_out
def parallel_select_top_jets_and_constituents(
    jets, constituents, n_jets=3, n_constits=15, n_workers=4, verbose: bool = False
):
    """
    Parallelized selection of top jets and constituents.

    Groups the jet and constituent rows by event ID (column 0), farms each
    event out to ``process_event`` via a process pool, and stacks the
    per-event results into single arrays.

    Returns
    -------
    tuple of np.ndarray
        ``(jet_selection, constits_selection)`` stacked per event.
    """
    if verbose:
        print(
            f"Selecting top {n_jets} jets and their top {n_constits} constituents in parallel..."
        )
    # Group jets and constituents by event ID (column 0).
    tasks = [
        (evt_id, jets[jets[:, 0] == evt_id], constituents[constituents[:, 0] == evt_id])
        for evt_id in np.unique(jets[:, 0])
    ]
    if verbose:
        print(
            f"Processing {len(tasks)} events in parallel using {n_workers} workers..."
        )
    per_event_jets = []
    per_event_constits = []
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        pending = [
            executor.submit(process_event, *task, n_jets, n_constits) for task in tasks
        ]
        # Collect in submission order so results stay aligned with event IDs.
        for future in pending:
            evt_jet_sel, evt_constit_sel = future.result()
            per_event_jets.append(evt_jet_sel)
            per_event_constits.append(evt_constit_sel)
    # Stack the per-event results into single arrays.
    jet_selection = np.array(per_event_jets)
    constits_selection = np.array(per_event_constits)
    if verbose:
        print(
            f"shape of jet_selection: {jet_selection.shape}\nshape of constits_selection: {constits_selection.shape}"
        )
    return jet_selection, constits_selection
def process_and_save_tensors(
    in_path, out_path, output_prefix, config, verbose: bool = False
):
    """
    Process the input file, parallelize selections, and save the results as PyTorch tensors.

    Parameters
    ----------
    in_path : str
        Input data file (format given by ``config.file_type``).
    out_path : str
        Directory receiving the output ``.pt`` tensors and scaler pickles.
    output_prefix : str
        Prefix used for every output file name.
    config : object
        Must provide ``file_type``, ``num_jets``, ``num_constits``,
        ``parallel_workers`` and ``normalizations`` attributes.
    verbose : bool
        If True, print progress information.
    """
    file_type = config.file_type
    n_jets = config.num_jets
    n_constits = config.num_constits
    n_workers = config.parallel_workers
    norm = config.normalizations
    if verbose:
        print(
            f"Processing {n_jets} jets and {n_constits} constituents from {in_path}..."
        )
    # Load the data
    events, jets, constituents = load_data(in_path, file_type, verbose)
    if verbose:
        print(
            f"Loaded {len(events)} events, {len(jets)} jets, and {len(constituents)} constituents from {in_path}"
        )
        print(
            f"Events shape: {events.shape}\nJets shape: {jets.shape}\nConstituents shape: {constituents.shape}"
        )
    # Apply normalizations
    if norm:
        if verbose:
            print(f"Normalizing data using {norm}...")
        if norm == "pj_custom":
            jets_norm, jet_comp_scaler = normalization.normalize_jet_pj_custom(jets)
            constituents_norm, constit_comp_scaler = (
                normalization.normalize_constit_pj_custom(constituents)
            )
        else:
            jets_norm, jet_comp_scaler = helper.normalize_data(jets, norm)
            constituents_norm, constit_comp_scaler = helper.normalize_data(
                constituents, norm
            )
        if verbose:
            print("Normalization complete.")
            print(
                f"Jets shape after normalization: {jets_norm.shape}\nConstituents shape after normalization: {constituents_norm.shape}"
            )
    else:
        # BUG FIX: without normalization the *_norm names were previously never
        # bound, so the selection call below crashed with NameError. Fall back
        # to the raw arrays.
        jets_norm, constituents_norm = jets, constituents
    # Parallel processing for top jets and constituents
    if verbose:
        print(
            f"Selecting top {n_jets} jets and their top {n_constits} constituents in parallel..."
        )
    jet_selection, constits_selection = parallel_select_top_jets_and_constituents(
        jets_norm, constituents_norm, n_jets, n_constits, n_workers
    )
    if verbose:
        print(
            f"Jets shape after selection: {jet_selection.shape}\nConstituents shape after selection: {constits_selection.shape}"
        )
    # Convert to PyTorch tensors
    if verbose:
        print("Converting to PyTorch tensors...")
    evt_tensor, jet_tensor, constits_tensor = [
        helper.convert_to_tensor(data)
        for data in [events, jet_selection, constits_selection]
    ]
    # Save tensors
    if verbose:
        print(
            f"Saving tensors to {output_prefix}_events.pt, {output_prefix}_jets.pt and {output_prefix}_constituents.pt..."
        )
    torch.save(evt_tensor, out_path + f"/{output_prefix}_events.pt")
    torch.save(jet_tensor, out_path + f"/{output_prefix}_jets.pt")
    torch.save(constits_tensor, out_path + f"/{output_prefix}_constituents.pt")
    # Save normalization scalers as pickle files (only when normalization ran,
    # otherwise the scaler names are unbound).
    if norm:
        if verbose:
            print(
                f"Saving normalization scalers to {output_prefix}_jet_scaler.pkl and {output_prefix}_constituent_scaler.pkl..."
            )
        jet_scaler_path = out_path + "/" + f"{output_prefix}_jet_scaler.pkl"
        constit_scaler_path = out_path + "/" + f"{output_prefix}_constituent_scaler.pkl"
        with open(jet_scaler_path, "wb") as f:
            pickle.dump(jet_comp_scaler, f)
        with open(constit_scaler_path, "wb") as f:
            pickle.dump(constit_comp_scaler, f)
    if verbose:
        print(
            f"Tensors saved to {output_prefix}_events.pt, {output_prefix}_jets.pt and {output_prefix}_constituents.pt"
        )
        if norm:
            print(
                f"Normalization scalers saved to {output_prefix}_jet_scaler.pkl and {output_prefix}_constituent_scaler.pkl"
            )