# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

import json
import os
import time
from dataclasses import dataclass, field

import hydra
import numpy as np
import torch
from omegaconf import OmegaConf

from cotracker.datasets.tap_vid_datasets import TapVidDataset
from cotracker.datasets.dr_dataset import DynamicReplicaDataset
from cotracker.datasets.utils import collate_fn
from cotracker.models.evaluation_predictor import EvaluationPredictor
from cotracker.evaluation.core.evaluator import Evaluator
from cotracker.models.build_cotracker import build_cotracker
@dataclass
class DefaultConfig:
    # Directory where all outputs of the experiment will be saved.
    exp_dir: str = "./outputs"

    # Name of the dataset to be used for the evaluation.
    dataset_name: str = "tapvid_davis_first"
    # The root directory of the dataset.
    dataset_root: str = "./"

    # Path to the pre-trained model checkpoint to be used for the evaluation.
    # The default value is the path to a specific CoTracker model checkpoint.
    checkpoint: str = "./checkpoints/cotracker2.pth"

    # EvaluationPredictor parameters
    # The size (N) of the support grid used in the predictor.
    # The total number of points is (N*N).
    grid_size: int = 5
    # The size (N) of the local support grid.
    local_grid_size: int = 8
    # A flag indicating whether to evaluate one ground truth point at a time.
    single_point: bool = True
    # The number of iterative updates for each sliding window.
    n_iters: int = 6

    seed: int = 0
    gpu_idx: int = 0

    # Override hydra's working directory to current working dir,
    # also disable storing the .hydra logs:
    hydra: dict = field(
        default_factory=lambda: {
            "run": {"dir": "."},
            "output_subdir": None,
        }
    )
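# All DefaultConfig fields are registered with Hydra's ConfigStore at the
# bottom of this file, so each one can be overridden from the command line
# (see the example invocation at the end of the file). Passing a plain
# DefaultConfig instance to run_eval should also work for programmatic use,
# e.g. run_eval(DefaultConfig(dataset_name="dynamic_replica")).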
def run_eval(cfg: DefaultConfig):
    """
    Evaluate CoTracker on a specified benchmark dataset using the provided configuration.

    Args:
        cfg (DefaultConfig): An instance of the DefaultConfig class, which includes:
            - exp_dir (str): The directory path for the experiment.
            - dataset_name (str): The name of the dataset to be used.
            - dataset_root (str): The root directory of the dataset.
            - checkpoint (str): The path to the CoTracker model's checkpoint.
            - grid_size (int): The size (N) of the global support grid (N*N points in total).
            - local_grid_size (int): The size (N) of the local support grid.
            - single_point (bool): A flag indicating whether to evaluate one ground truth point at a time.
            - n_iters (int): The number of iterative updates for each sliding window.
            - seed (int): The seed for setting the random state for reproducibility.
            - gpu_idx (int): The index of the GPU to be used.
    """
    # Creating the experiment directory if it doesn't exist
    os.makedirs(cfg.exp_dir, exist_ok=True)

    # Saving the experiment configuration to a .yaml file in the experiment directory
    cfg_file = os.path.join(cfg.exp_dir, "expconfig.yaml")
    with open(cfg_file, "w") as f:
        OmegaConf.save(config=cfg, f=f)

    evaluator = Evaluator(cfg.exp_dir)
    cotracker_model = build_cotracker(cfg.checkpoint)
    # Creating the EvaluationPredictor object
    predictor = EvaluationPredictor(
        cotracker_model,
        grid_size=cfg.grid_size,
        local_grid_size=cfg.local_grid_size,
        single_point=cfg.single_point,
        n_iters=cfg.n_iters,
    )
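    # With the defaults above, each query point is augmented with a
    # 5*5 = 25-point global support grid and an 8*8 = 64-point local grid
    # (see the grid_size / local_grid_size comments in DefaultConfig).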
    if torch.cuda.is_available():
        predictor.model = predictor.model.cuda()

    # Setting the random seeds
    torch.manual_seed(cfg.seed)
    np.random.seed(cfg.seed)
    # Constructing the specified dataset
    curr_collate_fn = collate_fn
    if "tapvid" in cfg.dataset_name:
        dataset_type = cfg.dataset_name.split("_")[1]
        if dataset_type == "davis":
            data_root = os.path.join(cfg.dataset_root, "tapvid_davis", "tapvid_davis.pkl")
        elif dataset_type == "kinetics":
            # The second path component must be relative: a leading "/" would
            # make os.path.join discard cfg.dataset_root entirely.
            data_root = os.path.join(
                cfg.dataset_root, "kinetics/kinetics-dataset/k700-2020/tapvid_kinetics"
            )
        else:
            raise ValueError(f"Unknown TAP-Vid dataset type: {dataset_type}")
        test_dataset = TapVidDataset(
            dataset_type=dataset_type,
            data_root=data_root,
            queried_first="strided" not in cfg.dataset_name,
        )
    elif cfg.dataset_name == "dynamic_replica":
        test_dataset = DynamicReplicaDataset(sample_len=300, only_first_n_samples=1)
    else:
        raise ValueError(f"Unknown dataset name: {cfg.dataset_name}")
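    # TAP-Vid naming convention: a "_first" suffix in dataset_name queries
    # each point at its first visible frame, while "_strided" re-queries
    # points at fixed frame intervals (hence the queried_first flag above).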
    # Creating the DataLoader object
    test_dataloader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=14,
        collate_fn=curr_collate_fn,
    )
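    # batch_size=1 with shuffle=False: the evaluator processes one video
    # sequence at a time, in a deterministic order.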
    # Timing and conducting the evaluation
    start = time.time()
    evaluate_result = evaluator.evaluate_sequence(
        predictor,
        test_dataloader,
        dataset_name=cfg.dataset_name,
    )
    end = time.time()
    print(f"Evaluation took {end - start:.2f} seconds")
    # Saving the evaluation results to a .json file
    evaluate_result = evaluate_result["avg"]
    print("evaluate_result", evaluate_result)
    result_file = os.path.join(cfg.exp_dir, "result_eval_.json")
    evaluate_result["time"] = end - start
    print(f"Dumping eval results to {result_file}.")
    with open(result_file, "w") as f:
        json.dump(evaluate_result, f)
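    # The saved JSON holds the metrics averaged over all sequences (the
    # "avg" entry above) plus the wall-clock time under the "time" key.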
cs = hydra.core.config_store.ConfigStore.instance()
cs.store(name="default_config_eval", node=DefaultConfig)
# The @hydra.main decorator resolves the "default_config_eval" entry
# registered in the ConfigStore above; without it, calling evaluate() with
# no arguments below would fail.
@hydra.main(config_path=None, config_name="default_config_eval")
def evaluate(cfg: DefaultConfig) -> None:
    os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
    os.environ["CUDA_VISIBLE_DEVICES"] = str(cfg.gpu_idx)
    run_eval(cfg)
if __name__ == "__main__":
    evaluate()
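# Example invocation with Hydra overrides (paths below are illustrative, not
# part of this repository):
#   python evaluate.py dataset_name=tapvid_davis_strided \
#       dataset_root=/path/to/tapvid checkpoint=./checkpoints/cotracker2.pth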