import torch
import argparse
import json
import time
import os
from fvcore.nn import FlopCountAnalysis
from torch.utils.data import DataLoader
from datasets import load_dataset
from transformers import AutoModel, AutoModelForImageClassification, AutoImageProcessor
from tqdm import tqdm
from PIL import Image


def get_image_size(processor):
    """Infer the input resolution the processor produces from a dummy image."""
    dummy_image = Image.new("RGB", (256, 256), color="white")
    dummy_image = processor(dummy_image, return_tensors="pt")
    image_size = {
        'height': dummy_image['pixel_values'].shape[-2],
        'width': dummy_image['pixel_values'].shape[-1]
    }
    return image_size


def benchmark_inference(model, processor, device, runs=20, warmup=5):
    """Measure average single-image latency (ms) and throughput (images/s)."""
    image_size = get_image_size(processor)
    dummy_image = torch.randn(1, 3, image_size['height'], image_size['width'], device=device)
    model.eval()
    use_cuda = device.type == "cuda"
    with torch.no_grad():
        # Warmup
        for _ in range(warmup):
            _ = model(dummy_image)
        if use_cuda:
            torch.cuda.synchronize()
        start = time.time()
        for _ in range(runs):
            _ = model(dummy_image)
        if use_cuda:
            torch.cuda.synchronize()
    elapsed = (time.time() - start) * 1000  # total elapsed time in ms
    avg_latency = elapsed / runs
    throughput = 1000.0 / avg_latency  # images per second
    return avg_latency, throughput


def load_dataloader(args):
    dataset = load_dataset(args.data_path, split="validation")

    def collate_fn(batch):
        # Keep PIL images as-is; preprocessing happens in evaluate_model.
        images = [item['image'].convert('RGB') for item in batch]
        labels = torch.tensor([item['label'] for item in batch])
        return {'image': images, 'label': labels}

    return DataLoader(dataset, batch_size=args.batch_size, shuffle=False,
                      num_workers=4, collate_fn=collate_fn)


def evaluate_model(args, dataloader, model_info):
    """Evaluate a model on the ImageNet-1k validation set."""
    device = torch.device(args.device if torch.cuda.is_available() else "cpu")
    #model = AutoModel.from_pretrained(model_info["path"], trust_remote_code=True)
    model = AutoModelForImageClassification.from_pretrained(
        model_info["path"],
        #trust_remote_code=True
    )
    processor = AutoImageProcessor.from_pretrained(model_info["path"])
    image_size = get_image_size(processor)
    model.to(device)
    model.eval()

    # Initialize metrics
    correct_top1 = 0
    correct_top5 = 0
    total_samples = 0

    with torch.no_grad():
        for batch_idx, batch in enumerate(tqdm(dataloader, desc="Evaluating")):
            images = batch['image']
            labels = batch['label']
            inputs = processor(images, return_tensors="pt")
            inputs = {k: v.to(device) for k, v in inputs.items()}
            labels = labels.to(device)
            outputs = model(**inputs)
            if device.type == "cuda":
                torch.cuda.synchronize()
            logits = outputs.logits
            predictions = torch.softmax(logits, dim=-1)
            _, predicted_classes = torch.topk(predictions, 5, dim=1)
            # Top-1: highest-scoring class matches the label.
            correct_top1 += (predicted_classes[:, 0] == labels).sum().item()
            # Top-5: label appears anywhere in the five highest-scoring classes.
            for i in range(5):
                correct_top5 += (predicted_classes[:, i] == labels).sum().item()
            total_samples += labels.size(0)

    top1_accuracy = (correct_top1 / total_samples) * 100
    top5_accuracy = (correct_top5 / total_samples) * 100
    avg_inference_time, throughput = benchmark_inference(model, processor, device)

    # Preprocess a single image from the last batch for the FLOP count.
    sample_inputs = processor(images[:1], return_tensors="pt")
    sample_inputs = {k: v.to(device) for k, v in sample_inputs.items()}

    # Calculate model parameters
    total_params = sum(p.numel() for p in model.parameters())
    parameters_millions = total_params / 1e6  # not reported; metrics stores the raw count

    # Calculate model size (bytes)
    model_size = sum(p.numel() * p.element_size() for p in model.parameters())

    # FLOPs for a single preprocessed image.
    sample_tensor = sample_inputs['pixel_values']
    flops = FlopCountAnalysis(model, sample_tensor).total()

    metrics = {
        "model": model_info["path"],
        "top1_accuracy": top1_accuracy,
        "top5_accuracy": top5_accuracy,
        "parameters": total_params,
        "flops": flops,
        "inference_time": avg_inference_time,
        "model_size": model_size,
        "license": model_info["license"]
    }
    return metrics


def load_models_list(json_path):
    """Load models list from JSON file."""
    with open(json_path, 'r') as f:
        models = json.load(f)
    return models


def load_existing_results(output_path):
    """Load existing results from a JSONL file and return the set of evaluated model paths."""
    evaluated_models = set()
    results = []
    if os.path.exists(output_path):
        try:
            with open(output_path, 'r') as f:
                for line in f:
                    if line.strip():  # Skip empty lines
                        result = json.loads(line.strip())
                        evaluated_models.add(result['model'])
                        results.append(result)
            print(f"Found {len(evaluated_models)} existing results in {output_path}")
        except (json.JSONDecodeError, KeyError) as e:
            print(f"Warning: Error reading existing results from {output_path}: {e}")
            print("Starting fresh evaluation...")
    return evaluated_models, results


def save_result_to_jsonl(result, output_path):
    """Append a single evaluation result to a JSONL file."""
    with open(output_path, 'a') as jsonlfile:
        jsonlfile.write(json.dumps(result) + '\n')
    print(f"Result saved to {output_path}")


def save_results_to_jsonl(results, output_path):
    """Save evaluation results to a JSONL file (overwrites existing file)."""
    if not results:
        print("No results to save.")
        return
    with open(output_path, 'w') as jsonlfile:
        for result in results:
            jsonlfile.write(json.dumps(result) + '\n')
    print(f"Results saved to {output_path}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog='ImageNet-1k Evaluation',
        description='Evaluate models on ImageNet-1k validation set',
        epilog='Results will be saved to JSONL file')
    parser.add_argument('--data-path', default="ILSVRC/imagenet-1k",
                        help='Path to ImageNet-1k dataset')
    parser.add_argument('--device', default="cuda:6",
                        help='Device to use for evaluation (cuda/cpu)')
    parser.add_argument('--batch-size', type=int, default=128,
                        help='Batch size for evaluation')
    parser.add_argument('--models-list', default="models_list.json",
                        help='Path to JSON file containing models list')
    parser.add_argument('--output-path', default="imagenet_results.jsonl",
                        help='Path to save evaluation results')
    args = parser.parse_args()

    # Override data path with absolute path
    #args.device = "cuda:6"
    args.data_path = "/data3/salah/datasets/imagenet-1k"

    # Load models list
    models_list = load_models_list(args.models_list)

    # Load existing results to avoid re-evaluating models
    evaluated_models, existing_results = load_existing_results(args.output_path)

    # Filter out models that have already been evaluated
    models_to_evaluate = [model for model in models_list if model['path'] not in evaluated_models]
    if len(models_to_evaluate) < len(models_list):
        skipped_count = len(models_list) - len(models_to_evaluate)
        print(f"Skipping {skipped_count} models that have already been evaluated")

    if not models_to_evaluate:
        print("All models have already been evaluated!")
        results = existing_results
    else:
        # Load dataset only if we have models to evaluate
        print("Loading dataset...")
        dataloader = load_dataloader(args)
        print(f"Dataset loaded with {len(dataloader)} batches")

        # Evaluate remaining models
        results = existing_results.copy()  # Start with existing results
        for i, model_info in enumerate(models_to_evaluate):
            print(f"\n{'='*50}")
            print(f"Evaluating model {i+1}/{len(models_to_evaluate)}: {model_info['path']}")
            print(f"{'='*50}")
            try:
                metrics = evaluate_model(args, dataloader, model_info)
            except Exception as e:
                # Record a placeholder so the summary can report the failure
                # (top1_accuracy == -1 marks a failed evaluation).
                print(f"Evaluation failed for {model_info['path']}: {e}")
                metrics = {
                    "model": model_info["path"],
                    "top1_accuracy": -1,
                    "top5_accuracy": -1,
                    "license": model_info.get("license"),
                }
            results.append(metrics)
            # Save result immediately after each model evaluation
            save_result_to_jsonl(metrics, args.output_path)
print(f"\nEvaluation complete! Results saved to {args.output_path}") # Print summary print("\nSummary:") for result in results: if result['top1_accuracy'] != -1: print(f" {result['model']}: {result['top1_accuracy']:.2f}% Top-1, {result['top5_accuracy']:.2f}% Top-5") else: print(f" {result['model']}: Failed to evaluate")