# Copyright 2025 BEAD Contributors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BEAD (Behavior and Anomaly Detection) main module.
This module serves as the entry point for the BEAD framework, providing command-line
interface functionality for anomaly detection workflows. It orchestrates the complete
pipeline from data preparation to model training, inference, and result visualization.
"""
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp # May not be strictly needed if using torchrun # noqa: F401
from .src.utils import ggl
def main():
    """Entry point for the BEAD command-line interface.

    Parses command-line arguments via ``ggl.get_arguments`` and dispatches to
    the requested pipeline stage. When ``config.use_ddp`` is set and more than
    one CUDA device is visible, torch DistributedDataParallel (DDP) is
    initialized from the ``LOCAL_RANK``/``WORLD_SIZE`` environment variables
    (as exported by ``torchrun``), falling back to non-DDP mode on any failure.

    Args:
        Arguments are parsed from the command line, not passed directly to
        this function.

    Modes:
        new_project: Create a new project with default configuration.
        convert_csv: Convert CSV data to HDF5 or NPY format.
        prepare_inputs: Process data files into PyTorch tensors for training.
        train: Train a model using the prepared data.
        detect: Run inference on data to detect anomalies.
        plot: Generate visualization plots for results.
        diagnostics: Run performance profiling and diagnostics.
        chain: Execute the full pipeline from data conversion to visualization.

    Raises:
        NameError: If the specified mode is not recognized.
    """
    (
        config,
        mode,
        options,
        workspace_name,
        project_name,
        verbose,
    ) = ggl.get_arguments()

    # Initialize DDP only if configured and multiple GPUs are available.
    local_rank = 0
    world_size = 1
    is_ddp_active = False
    if (
        config
        and getattr(config, "use_ddp", False)
        and torch.cuda.is_available()
        and torch.cuda.device_count() > 1
    ):
        try:
            # LOCAL_RANK and WORLD_SIZE are set by torchrun or torch.distributed.launch.
            local_rank = int(os.environ.get("LOCAL_RANK", 0))
            world_size = int(os.environ.get("WORLD_SIZE", 1))
            if world_size > 1:
                # Proceed with DDP only if world_size indicates multiple processes.
                print(
                    f"Initializing DDP: RANK {os.environ.get('RANK')}, LOCAL_RANK {local_rank}, WORLD_SIZE {world_size}"
                )
                torch.cuda.set_device(local_rank)
                dist.init_process_group(backend="nccl", init_method="env://")
                is_ddp_active = True
                if local_rank == 0 and verbose:
                    print(
                        f"DDP initialized. World size: {world_size}. Running on {torch.cuda.device_count()} GPUs."
                    )
            else:
                if verbose:
                    print(
                        "DDP use_ddp is True, but world_size is 1. Running in non-DDP mode."
                    )
                # Fallback to non-DDP if world_size is 1 (e.g. launched without torchrun on single GPU).
                config.use_ddp = False
        except KeyError:
            print(
                "DDP environment variables (LOCAL_RANK, WORLD_SIZE) not set. Running in non-DDP mode."
            )
            config.use_ddp = False  # Fallback if env vars are missing
        except Exception as e:
            print(f"Error initializing DDP: {e}. Running in non-DDP mode.")
            config.use_ddp = False  # Fallback on any other DDP init error

    # Record DDP status and ranks on the config so downstream stages can use them.
    if config:  # config is None in new_project mode
        config.is_ddp_active = is_ddp_active
        config.local_rank = local_rank
        config.world_size = world_size

    # Set CUDNN benchmark for potential speedup when input sizes are consistent.
    if torch.cuda.is_available():
        torch.backends.cudnn.benchmark = True

    # Paths used frequently throughout the pipeline.
    workspace_path = os.path.join("bead/workspaces", workspace_name)
    project_path = os.path.join(workspace_path, project_name)
    paths = {
        "workspace_path": workspace_path,
        "project_path": project_path,
        "data_path": os.path.join(workspace_path, "data"),
        "output_path": os.path.join(project_path, "output"),
    }

    # The options flag may override config defaults. Guard on config: it is
    # None in new_project mode, and dereferencing it there would crash.
    if config is not None:
        if options in ("h5", "npy"):
            config.file_type = options
        elif options == "overlay_roc":
            config.skip_to_roc = True

    # Dispatch to the requested pipeline stage; the finally clause guarantees
    # the DDP process group is destroyed even if a mode handler raises.
    try:
        if mode == "new_project":
            ggl.create_new_project(workspace_name, project_name, verbose)
        elif mode == "convert_csv":
            ggl.convert_csv(paths, config, verbose)
        elif mode == "prepare_inputs":
            ggl.prepare_inputs(paths, config, verbose)
        elif mode == "train":
            ggl.run_training(paths, config, verbose)
        elif mode == "detect":
            ggl.run_inference(paths, config, verbose)
        elif mode == "plot":
            ggl.run_plots(paths, config, verbose)
        elif mode == "diagnostics":
            ggl.run_diagnostics(paths, config, verbose)
        elif mode == "chain":
            ggl.run_full_chain(
                workspace_name, project_name, paths, config, options, verbose
            )
        else:
            raise NameError(
                "BEAD mode "
                + mode
                + " not recognised. Use < bead --help > to see available modes."
            )
    finally:
        # Cleanup DDP.
        if is_ddp_active:
            dist.destroy_process_group()
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()