Source code for bead.src.utils.diagnostics

import os

import matplotlib.colors
import matplotlib.pyplot as plt
import matplotlib.ticker
import numpy as np
import torch

import io
import pstats
import cProfile
from pstats import SortKey
import torch
from torch.profiler import profile, record_function, ProfilerActivity


[docs] def get_mean_node_activations(input_dict: dict) -> dict: output_dict = {} for kk in input_dict: output_dict_layer = [] for node in input_dict[kk].T: output_dict_layer.append(torch.mean(node).item()) output_dict[kk] = output_dict_layer return output_dict
[docs] def dict_to_square_matrix(input_dict: dict) -> np.array: """Function changes an input dictionary into a square np.array. Adds NaNs when the dimension of a dict key is less than of the final square matrix. Args: input_dict (dict) Returns: square_matrix (np.array) """ means_dict = get_mean_node_activations(input_dict) max_number_of_nodes = 0 number_of_layers = len(input_dict) for kk in means_dict: if len(means_dict[kk]) > max_number_of_nodes: max_number_of_nodes = len(means_dict[kk]) square_matrix = np.empty((number_of_layers, max_number_of_nodes)) counter = 0 for kk in input_dict: layer = np.array(means_dict[kk]) if len(layer) == max_number_of_nodes: square_matrix[counter] = layer else: layer = np.append( layer, np.zeros(max_number_of_nodes - len(layer)) + np.nan ) square_matrix[counter] = layer counter += 1 return square_matrix
[docs] def plot(data: np.array, output_path: str) -> None: nodes_numbers = np.array([0, 50, 100, 200]) fig, ax = plt.subplots() NAP = ax.imshow( data.T, cmap="RdBu_r", interpolation="nearest", aspect="auto", origin="lower", norm=matplotlib.colors.CenteredNorm(), ) colorbar = plt.colorbar(NAP) colorbar.set_label("Activation") ax.set_title("Neural Activation Pattern") ax.set_xlabel("Layers") ax.set_ylabel("Number of nodes") xtick_loc = ax.get_xticks().tolist() ax.xaxis.set_major_locator(matplotlib.ticker.FixedLocator(xtick_loc)) ax.set_xticklabels(["", "en1", "en2", "en3", "de1", "de2", "de3", ""]) ax.set_yticks(nodes_numbers) ax.figure.savefig(os.path.join(output_path, "diagnostics.pdf"))
[docs] def nap_diagnose(input_path: str, output_path: str) -> None: input = np.load(input_path) plot(input, output_path) print( "Diagnostics saved as diagnostics.pdf in the diagnostics folder of your project." )
[docs] def pytorch_profile(f, *args, **kwargs): """ This function performs PyTorch profiling of CPU, GPU time and memory consumed by the function f execution. Args: f (callable): The function to be profiled. Returns: result: The result of the function `f` execution. """ if torch.cuda.is_available(): activities = [ProfilerActivity.CPU, ProfilerActivity.CUDA] else: activities = [ProfilerActivity.CPU] # Start profiler before the function will be executed with profile( activities=activities, on_trace_ready=torch.profiler.tensorboard_trace_handler( "log/baler", worker_name="worker0" ), schedule=torch.profiler.schedule(wait=0, warmup=0, active=1, repeat=1), record_shapes=True, with_stack=True, profile_memory=True, ) as prof: with record_function(f"{f.__name__}"): # Call the function result = f(*args, **kwargs) prof.step() prof.stop() # Print the CPU time for each torch operation print(prof.key_averages().table(sort_by="cpu_time_total")) # Store the information about CPU and GPU usage if torch.cuda.is_available(): prof.export_stacks("profiler_stacks.json", "self_cuda_time_total") # Store the results to the .json file prof.export_stacks("/tmp/profiler_stacks.json", "self_cpu_time_total") return result
[docs] def c_profile(func, *args, **kwargs): """ Profile the function func with cProfile. Args: func (callable): The function to be profiled. Returns: result: The result of the function `func` execution. """ pr = cProfile.Profile() pr.enable() # Execute the function and get its result result = func(*args, **kwargs) pr.disable() s = io.StringIO() sortby = SortKey.CUMULATIVE ps = pstats.Stats(pr, stream=s).sort_stats(sortby) ps.print_stats() print(s.getvalue()) return result