import torch from torch import nn import torch.multiprocessing as mp from torch.nn.parallel import DistributedDataParallel as DDP from torch.amp import GradScaler import numpy as np from copy import deepcopy from argparse import ArgumentParser import os, json, hashlib, yaml current_dir = os.path.abspath(os.path.dirname(__file__)) from datasets import standardize_dataset_name from models import get_model from utils import setup, cleanup, init_seeds, get_logger, get_config, barrier from utils import get_dataloader, get_loss_fn, get_optimizer, load_checkpoint, save_checkpoint from utils import get_writer, update_train_result, update_eval_result, log, calc_bin_center from train import train from evaluate import evaluate parser = ArgumentParser(description="Train an EBC model.") # Parameters for model parser.add_argument("--model_name", type=str, default="CLIP_RN50", help="The model to train.") parser.add_argument("--block_size", type=int, default=16, choices=[7, 8, 14, 16, 28, 32], help="The block sizes for the model.") parser.add_argument("--clip_weight_name", type=str, default=None, help="The weight name for CLIP models.") parser.add_argument("--norm", type=str, default="none", choices=["none", "bn", "ln"], help="The normalization layer to use. 'none' means no normalization layer will be detected automatically, 'bn' means batch normalization, 'ln' means layer normalization.") parser.add_argument("--act", type=str, default="none", choices=["none", "relu", "gelu"], help="The activation function to use. 'none' means no activation function will be detected automatically, 'relu' means ReLU, 'gelu' means GELU.") parser.add_argument("--num_vpt", type=int, default=96, help="The number of visual prompt tokens.") parser.add_argument("--vpt_drop", type=float, default=0.0, help="The dropout rate for visual prompt tokens.") parser.add_argument("--adapter", action="store_true", help="Use adapter for the model. This will freeze the backbone and only train the adapter layers and newly added layers.") parser.add_argument("--adapter_reduction", type=int, default=4, help="The reduction ratio for the adapter layers. This will be used to reduce the number of parameters in the adapter layers.") parser.add_argument("--lora", action="store_true", help="Use LoRA for the model. This will freeze the backbone and only train the LoRA layers and newly added layers.") parser.add_argument("--lora_rank", type=int, default=16, help="The rank for the LoRA layers. This will be used to reduce the number of parameters in the LoRA layers.") parser.add_argument("--lora_alpha", type=float, default=32.0, help="The alpha for the LoRA layers. This will be used to scale the LoRA layers.") parser.add_argument("--lora_dropout", type=float, default=0.0, help="The dropout rate for the LoRA layers.") # Parameters for dataset parser.add_argument("--dataset", type=str, required=True, help="The dataset to train on.") parser.add_argument("--in_memory_dataset", action="store_true", help="Load the dataset into memory. This will speed up training but requires more memory.") parser.add_argument("--input_size", type=int, default=None, help="The size of the input image.") parser.add_argument("--batch_size", type=int, default=None, help="The training batch size.") parser.add_argument("--num_crops", type=int, default=None, help="The number of crops for multi-crop training.") parser.add_argument("--aug_min_scale", type=float, default=None, help="The minimum scale for random scale augmentation.") parser.add_argument("--aug_max_scale", type=float, default=None, help="The maximum scale for random scale augmentation.") parser.add_argument("--aug_brightness", type=float, default=None, help="The brightness factor for random color jitter augmentation.") parser.add_argument("--aug_contrast", type=float, default=None, help="The contrast factor for random color jitter augmentation.") parser.add_argument("--aug_saturation", type=float, default=None, help="The saturation factor for random color jitter augmentation.") parser.add_argument("--aug_hue", type=float, default=None, help="The hue factor for random color jitter augmentation.") parser.add_argument("--aug_kernel_size", type=int, default=None, help="The kernel size for Gaussian blur augmentation.") parser.add_argument("--aug_saltiness", type=float, default=None, help="The saltiness for pepper salt noise augmentation.") parser.add_argument("--aug_spiciness", type=float, default=None, help="The spiciness for pepper salt noise augmentation.") parser.add_argument("--aug_blur_prob", type=float, default=None, help="The probability for Gaussian blur augmentation.") # Parameters for evaluation parser.add_argument("--sliding_window", action="store_true", help="Use sliding window strategy for evaluation.") parser.add_argument("--stride", type=int, default=None, help="The stride for sliding window strategy.") parser.add_argument("--max_input_size", type=int, default=4096, help="The maximum size of the input image in evaluation. Images larger than this will be processed using sliding window by force to avoid OOM.") parser.add_argument("--max_num_windows", type=int, default=64, help="The maximum number of windows to be simultaneously processed.") parser.add_argument("--resize_to_multiple", action="store_true", help="Resize the image to a multiple of the input size.") # Parameters for loss function parser.add_argument("--reg_loss", type=str, default="zipnll", choices=["zipnll", "pnll", "dm", "msmae", "mae", "mse"], help="The regression loss function.") parser.add_argument("--aux_loss", type=str, default="msmae", choices=["zipnll", "pnll", "dm", "msmae", "mae", "mse", "none"], help="The auxiliary loss function.") parser.add_argument("--weight_cls", type=float, default=1.0, help="The weight for classification loss.") parser.add_argument("--weight_reg", type=float, default=1.0, help="The weight for regression loss.") parser.add_argument("--weight_aux", type=float, default=1.0, help="The weight for auxiliary loss.") parser.add_argument("--numItermax", type=int, default=100, help="The maximum number of iterations for the OT/POT solver.") parser.add_argument("--regularization", type=float, default=10.0, help="The regularization term for the OT/POT loss.") parser.add_argument("--scales", type=int, nargs="+", default=[1, 2, 4], help="The scales for multi-scale mae loss.") parser.add_argument("--min_scale_weight", type=float, default=0.0, help="The minimum weight for multi-scale mae loss.") parser.add_argument("--max_scale_weight", type=float, default=1.0, help="The maximum weight for multi-scale mae loss.") parser.add_argument("--alpha", type=float, default=0.5, help="The alpha for multi-scale mae loss.") # Parameters for optimizer parser.add_argument("--optimizer", type=str, default="adam", choices=["sgd", "adam", "adamw", "radam"], help="The optimizer to use.") parser.add_argument("--lr", type=float, default=None, help="The learning rate for untrained modules.") parser.add_argument("--vpt_lr", type=float, default=None, help="The learning rate for the visual prompt tokens.") parser.add_argument("--adapter_lr", type=float, default=None, help="The learning rate for the adapter layers. If None, it will be set to the same as lr.") parser.add_argument("--lora_lr", type=float, default=None, help="The learning rate for the LoRA layers. If None, it will be set to the same as lr.") parser.add_argument("--backbone_lr", type=float, default=None, help="The learning rate for the pretrained backbone.") parser.add_argument("--weight_decay", type=float, default=None, help="The weight decay for untrained modules.") parser.add_argument("--vpt_weight_decay", type=float, default=None, help="The weight decay for the visual prompt tokens.") parser.add_argument("--adapter_weight_decay", type=float, default=None, help="The weight decay for the adapter layers. If None, it will be set to the same as weight_decay.") parser.add_argument("--lora_weight_decay", type=float, default=None, help="The weight decay for the LoRA layers. If None, it will be set to the same as weight_decay.") parser.add_argument("--backbone_weight_decay", type=float, default=None, help="The weight decay for the pretrained backbone.") # Parameters for learning rate scheduler parser.add_argument("--scheduler", type=str, default="cos_restarts", choices=["step", "cos", "cos_restarts"], help="The learning rate scheduler.") parser.add_argument("--warmup_epochs", type=int, default=25, help="Number of epochs for warmup. The learning rate will linearly change from warmup_lr to lr.") parser.add_argument("--warmup_lr", type=float, default=1e-5, help="Learning rate for warmup.") parser.add_argument("--eta_min", type=float, default=1e-6, help="Minimum learning rate.") # Step Decay parameters parser.add_argument("--gamma", type=float, default=0.925, help="The decay factor for step scheduler.") parser.add_argument("--step_size", type=int, default=20, help="The step size for step scheduler.") # Cosine Annealing with Warm Restarts parameters parser.add_argument("--T_0", type=int, default=5, help="Number of epochs for the first restart.") parser.add_argument("--T_mult", type=int, default=2, help="A factor increases T_0 after a restart.") # Cosine Annealing parameters parser.add_argument("--T_max", type=int, default=20, help="The maximum number of epochs for the cosine annealing scheduler.") # Parameters for training parser.add_argument("--ckpt_dir_name", type=str, default=None, help="The name of the checkpoint folder.") parser.add_argument("--total_epochs", type=int, default=1300, help="Number of epochs to train.") parser.add_argument("--eval_start", type=int, default=None, help="Start to evaluate after this number of epochs.") parser.add_argument("--eval_freq", type=float, default=None, help="Evaluate every this number of epochs. If < 1, evaluate every this fraction of an epoch.") parser.add_argument("--save_freq", type=int, default=50, help="Save checkpoint every this number of epochs. Could help reduce I/O.") parser.add_argument("--save_best_k", type=int, default=5, help="Save the best k checkpoints.") parser.add_argument("--amp", action="store_true", help="Use automatic mixed precision training.") parser.add_argument("--num_workers", type=int, default=os.cpu_count(), help="Number of workers for data loading.") parser.add_argument("--local_rank", type=int, default=-1, help="Local rank for distributed training.") parser.add_argument("--seed", type=int, default=42, help="Random seed for initialization.") def run(local_rank: int, nprocs: int, args: ArgumentParser) -> None: print(f"Rank {local_rank} process among {nprocs} processes.") init_seeds(args.seed + local_rank) setup(local_rank, nprocs) args.local_rank = local_rank print(f"Initialized successfully. Training with {nprocs} GPUs.") device = f"cuda:{local_rank}" if local_rank != -1 else "cuda:0" print(f"Using device: {device}.") ddp = nprocs > 1 # Define the bins and bin centers with open(os.path.join(current_dir, "configs", "bin_config.json"), "r") as f: bins = json.load(f)[args.dataset][str(args.block_size)] bins = [(float(b[0]), float(b[1])) for b in bins] with open(os.path.join(current_dir, "counts", f"{args.dataset}.json"), "r") as f: count_stats = json.load(f)[str(args.block_size)] count_stats = {int(k): int(v) for k, v in count_stats.items()} bin_centers, bin_counts = calc_bin_center(bins, count_stats) args.bins = bins args.bin_centers = bin_centers args.bin_counts = bin_counts model = get_model( model_info_path=os.path.join(args.ckpt_dir, "model_info.pth"), model_name=args.model_name, block_size=args.block_size, bins=bins, bin_centers=bin_centers, zero_inflated=args.reg_loss == "zipnll" or args.aux_loss == "zipnll", clip_weight_name=args.clip_weight_name, num_vpt=args.num_vpt, vpt_drop=args.vpt_drop, adapter=args.adapter, adapter_reduction=args.adapter_reduction, lora=args.lora, lora_rank=args.lora_rank, lora_alpha=args.lora_alpha, lora_dropout=args.lora_dropout, input_size=args.input_size, norm=args.norm, act=args.act, ).to(device) total_params = sum(p.numel() for p in model.parameters()) total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_nontrainable_params = total_params - total_trainable_params grad_scaler = GradScaler(device=device) if args.amp else None loss_fn = get_loss_fn(args) optimizer, scheduler = get_optimizer(args, model) model, optimizer, scheduler, grad_scaler, start_epoch, loss_info, hist_val_scores, best_val_scores = load_checkpoint(args, model, optimizer, scheduler, grad_scaler) model = DDP(nn.SyncBatchNorm.convert_sync_batchnorm(model), device_ids=[local_rank], output_device=local_rank) if ddp else model if local_rank == 0: writer = get_writer(args.ckpt_dir) logger = get_logger(os.path.join(args.ckpt_dir, "train.log")) logger.info(get_config(vars(args), mute=False)) logger.info(f"Total parameters: {total_params:,}\nTrainable parameters: {total_trainable_params:,}\nNon-trainable parameters: {total_nontrainable_params:,}\n") train_loader, sampler = get_dataloader(args, split="train") val_loader = get_dataloader(args, split="val") for epoch in range(start_epoch, args.total_epochs + 1): # start from 1 if local_rank == 0: message = f"\tlr: {optimizer.param_groups[0]['lr']:.3e}" log(logger, epoch, args.total_epochs, message=message) if sampler is not None: sampler.set_epoch(epoch) if args.eval_freq < 1: eval_model = epoch >= args.eval_start if eval_model: model, optimizer, grad_scaler, loss_info, curr_val_scores, curr_weights = train( model=model, data_loader=train_loader, loss_fn=loss_fn, optimizer=optimizer, grad_scaler=grad_scaler, device=device, rank=local_rank, nprocs=nprocs, eval_data_loader=val_loader, eval_freq=args.eval_freq, sliding_window=args.sliding_window, max_input_size=args.max_input_size, window_size=args.input_size, stride=args.stride, max_num_windows=args.max_num_windows, ) scheduler.step() barrier(ddp) else: model, optimizer, grad_scaler, loss_info, _, _ = train( model=model, data_loader=train_loader, loss_fn=loss_fn, optimizer=optimizer, grad_scaler=grad_scaler, device=device, rank=local_rank, nprocs=nprocs, ) scheduler.step() barrier(ddp) else: model, optimizer, grad_scaler, loss_info, _, _ = train( model=model, data_loader=train_loader, loss_fn=loss_fn, optimizer=optimizer, grad_scaler=grad_scaler, device=device, rank=local_rank, nprocs=nprocs, ) scheduler.step() barrier(ddp) eval_model = (epoch >= args.eval_start) and ((epoch - args.eval_start) % args.eval_freq == 0) if eval_model: curr_val_scores = evaluate( model=model, data_loader=val_loader, sliding_window=args.sliding_window, max_input_size=args.max_input_size, window_size=args.input_size, stride=args.stride, max_num_windows=args.max_num_windows, device=device, amp=args.amp, local_rank=local_rank, nprocs=nprocs ) state_dict = deepcopy(model.module.state_dict() if ddp else model.state_dict()) curr_weights = {k: state_dict for k in curr_val_scores.keys()} # copy the state_dict if local_rank == 0: update_train_result(epoch, loss_info, writer) log(logger, None, None, loss_info=loss_info, message="\n" * 2 if not eval_model else None) if eval_model: hist_val_scores, best_val_scores = update_eval_result( epoch=epoch, curr_scores=curr_val_scores, hist_scores=hist_val_scores, best_scores=best_val_scores, model_info={"config": model.module.config if ddp else model.config, "weights": curr_weights}, writer=writer, ckpt_dir=args.ckpt_dir, ) log(logger, None, None, None, curr_val_scores, best_val_scores, message="\n" * 3) if local_rank == 0 and (epoch % args.save_freq == 0): save_checkpoint( epoch + 1, model.module.state_dict() if ddp else model.state_dict(), optimizer.state_dict(), scheduler.state_dict() if scheduler is not None else None, grad_scaler.state_dict() if grad_scaler is not None else None, loss_info, hist_val_scores, best_val_scores, args.ckpt_dir, ) barrier(ddp) if local_rank == 0: writer.close() print("Training completed. Best scores:") for k in best_val_scores.keys(): scores = " ".join([f"{best_val_scores[k][i]:.4f};" for i in range(len(best_val_scores[k]))]) print(f" {k}: {scores}. \t Mean: {np.mean(best_val_scores[k]):.4f}") cleanup(ddp) def main(): args = parser.parse_args() args.dataset = standardize_dataset_name(args.dataset) dataset_config_path = os.path.join(current_dir, "configs", f"{args.dataset}.yaml") with open(dataset_config_path, "r") as f: dataset_config = yaml.safe_load(f) for k, v in dataset_config.items(): if k in vars(args) and vars(args)[k] is None: vars(args)[k] = v # Sliding window prediction will be used if args.sliding_window is True, or when the image size is larger than args.max_input_size args.stride = args.stride or args.input_size assert args.model_name in ["ebc_p", "ebc_n", "ebc_t", "ebc_s", "ebc_b"], f"Expected model_name to be one of ['ebc_p', 'ebc_n', 'ebc_t', 'ebc_s', 'ebc_b'], got {args.model_name}." if args.model_name == "ebc_p": # pico args.model_name = "mobilenetv4_conv_small_050" elif args.model_name == "ebc_n": # nano args.model_name = "mobilenetv4_conv_small" elif args.model_name == "ebc_t": # tiny args.model_name = "mobilenetv4_conv_medium" elif args.model_name == "ebc_s": args.model_name = "CLIP_MobileCLIP_S1" args.clip_weight_name = "datacompdr" else: # args.model_name == "ebc_b": if args.dataset == "sha": args.model_name = "CLIP_ViT_B_16" args.clip_weight_name = "openai" args.num_vpt = args.num_vpt or 96 elif args.dataset == "shb": args.model_name = "CLIP_RN50x4" args.clip_weight_name = "openai" else: args.model_name = "CLIP_convnext_base_w_320" args.clip_weight_name = "laion_aesthetic_s13b_b82k_augreg" if "CLIP_" not in args.model_name: args.clip_weight_name = None if args.adapter: assert not args.lora, "Cannot use both adapter and LoRA at the same time." args.num_vpt = None args.vpt_drop = None args.vpt_lr = None args.vpt_weight_decay = None args.lora_rank = None args.lora_alpha = None args.lora_dropout = None args.lora_lr = None args.lora_weight_decay = None args.backbone_lr = None args.backbone_weight_decay = None assert args.adapter_lr > 0, f"Expected adapter_lr to be greater than 0, got {args.adapter_lr}" assert args.adapter_weight_decay > 0, f"Expected adapter_weight_decay to be greater than 0, got {args.adapter_weight_decay}" assert args.adapter_reduction > 0, f"Expected adapter_reduction to be greater than 0, got {args.adapter_reduction}" else: args.adapter_reduction = None args.adapter_lr = None args.adapter_weight_decay = None if args.lora: assert not args.adapter, "Cannot use both adapter and LoRA at the same time." args.num_vpt = None args.vpt_drop = None args.vpt_lr = None args.vpt_weight_decay = None args.adapter_reduction = None args.adapter_lr = None args.adapter_weight_decay = None assert args.lora_rank > 0, f"Expected lora_rank to be greater than 0, got {args.lora_rank}" assert args.lora_alpha > 0, f"Expected lora_alpha to be greater than 0, got {args.lora_alpha}" assert 0 <= args.lora_dropout < 1, f"Expected lora_dropout to be between 0 and 1, got {args.lora_dropout}" assert args.lora_lr > 0, f"Expected lora_lr to be greater than 0, got {args.lora_lr}" assert args.lora_weight_decay > 0, f"Expected lora_weight_decay to be greater than or equal to 0, got {args.lora_weight_decay}" else: args.lora_rank = None args.lora_alpha = None args.lora_dropout = None args.lora_lr = None args.lora_weight_decay = None if "vit" not in args.model_name.lower(): args.num_vpt = None args.vpt_drop = None args.vpt_lr = None args.vpt_weight_decay = None else: args.backbone_lr = None args.backbone_weight_decay = None if not (args.lora or args.adapter): # Use VPT only if not using LoRA or adapter assert args.num_vpt > 0, f"Expected num_vpt to be greater than 0, got {args.num_vpt}" assert 0 <= args.vpt_drop < 1, f"Expected vpt_drop to be between 0 and 1, got {args.vpt_drop}" assert args.vpt_lr > 0, f"Expected vpt_lr to be greater than 0, got {args.vpt_lr}" assert args.vpt_weight_decay >= 0, f"Expected vpt_weight_decay to be greater than or equal to 0, got {args.vpt_weight_decay}" else: args.num_vpt = None args.vpt_drop = None args.vpt_lr = None args.vpt_weight_decay = None if args.reg_loss != "dm" and args.aux_loss != "dm": args.numItermax = None args.regularization = None if args.reg_loss != "msmae" and args.aux_loss != "msmae": args.scales = None args.min_scale_weight = None args.max_scale_weight = None args.alpha = None else: assert args.max_scale_weight >= args.min_scale_weight >= 0, f"Expected max_scale_weight to be greater than or equal to min_scale_weight, got {args.min_scale_weight} and {args.max_scale_weight}" assert 1 >= args.alpha > 0, f"Expected alpha to be between 0 and 1, got {args.alpha}" if args.scheduler == "step": args.T_0 = None args.T_mult = None args.T_max = None elif args.scheduler == "cos": args.step_size = None args.gamma = None args.T_0 = None args.T_mult = None else: args.step_size = None args.gamma = None args.T_max = None args.nprocs = torch.cuda.device_count() args.batch_size = int(args.batch_size / args.nprocs) args.num_workers = int(args.num_workers / args.nprocs) if args.ckpt_dir_name is None: hyperparams_dict = (vars(args)).copy() hyperparams_dict.pop("save_freq") hyperparams_dict.pop("save_best_k") hyperparams_dict.pop("local_rank") hyperparams_dict.pop("num_workers") hyperparams_dict.pop("nprocs") hyperparams_dict.pop("ckpt_dir_name") hyperparams_dict = json.dumps(hyperparams_dict, sort_keys=True) args.hash = hashlib.sha256(hyperparams_dict.encode("utf-8")).hexdigest() if "CLIP_" in args.model_name: ckpt_dir_name = f"{args.model_name}_{args.clip_weight_name}_" if "ViT" in args.model_name: ckpt_dir_name += f"{args.num_vpt}_{args.vpt_drop}_" else: ckpt_dir_name = f"{args.model_name}_{args.block_size}_" ckpt_dir_name += f"{args.weight_cls}+{args.weight_reg}x{(args.reg_loss)}+{args.weight_aux}{(args.aux_loss)}_" ckpt_dir_name += f"{args.optimizer}_{args.scheduler}_{args.hash[:8]}" else: ckpt_dir_name = args.ckpt_dir_name args.ckpt_dir = os.path.join(current_dir, "checkpoints", args.dataset, ckpt_dir_name) os.makedirs(args.ckpt_dir, exist_ok=True) print(f"Using {args.nprocs} GPUs.") if args.nprocs > 1: if args.in_memory_dataset: print("In-memory dataset is not supported for distributed training. Using disk-based dataset instead.") args.in_memory_dataset = False mp.spawn(run, nprocs=args.nprocs, args=(args.nprocs, args)) else: run(0, 1, args) if __name__ == "__main__": main()