|
|
import logging
|
|
|
import os
|
|
|
from dataclasses import dataclass
|
|
|
from pathlib import Path
|
|
|
from typing import List, Any
|
|
|
|
|
|
import pandas as pd
|
|
|
from omegaconf import DictConfig, OmegaConf
|
|
|
|
|
|
from nuplan.common.utils.io_utils import safe_path_to_string
|
|
|
from nuplan.common.utils.file_backed_barrier import distributed_sync
|
|
|
from nuplan.planning.script.builders.folder_builder import build_simulation_experiment_folder
|
|
|
from nuplan.planning.script.builders.logging_builder import build_logger
|
|
|
from nuplan.planning.script.builders.main_callback_builder import build_main_multi_callback
|
|
|
from nuplan.planning.simulation.main_callback.multi_main_callback import MultiMainCallback
|
|
|
from nuplan.planning.simulation.runner.abstract_runner import AbstractRunner
|
|
|
from nuplan.planning.simulation.runner.executor import execute_runners
|
|
|
from nuplan.planning.simulation.runner.runner_report import RunnerReport
|
|
|
from nuplan.planning.utils.multithreading.worker_pool import WorkerPool
|
|
|
|
|
|
from navsim.planning.script.builders.worker_pool_builder import build_worker
|
|
|
|
|
|
|
|
|
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class CommonBuilder:
    """
    Bundle of the shared objects needed to run simulations.
    """

    # Worker pool handed to the runner executor.
    worker: WorkerPool
    # Main callbacks; their end-of-simulation hook is fired after the runners finish.
    multi_main_callback: MultiMainCallback
    # Directory in which the runner report is written.
    output_dir: Path
    # Profiler object, or a falsy value when profiling is disabled.
    profiler: Any
|
|
|
|
|
|
|
|
|
def update_config_for_simulation(cfg: DictConfig) -> None:
    """
    Finalize the simulation configuration in place.
    The config is unlocked, optionally given a `callbacks` list, resolved, locked again and optionally logged.
    :param cfg: DictConfig. Configuration that is used to run the experiment.
    """
    # Unlock the configuration so that it can be modified.
    OmegaConf.set_struct(cfg, False)

    if cfg.max_number_of_workers:
        # Flatten the `callback` mapping into the `callbacks` list.
        cfg.callbacks = list(cfg.callback.values())

    # Resolve all interpolations, then lock the configuration again.
    OmegaConf.resolve(cfg)
    OmegaConf.set_struct(cfg, True)

    if not cfg.log_config:
        return

    # Log the final configuration after all updates.
    logger.info(f"Creating experiment: {cfg.experiment}")
    logger.info("\n" + OmegaConf.to_yaml(cfg))
|
|
|
|
|
|
def set_up_common_builder(cfg: DictConfig, profiler_name: str) -> CommonBuilder:
    """
    Prepare the shared objects required before any simulation runner is executed.
    :param cfg: Hydra configuration.
    :param profiler_name: Profiler name. Not used in this function; the returned builder has no profiler.
    :return: A CommonBuilder holding the worker, the main callbacks and the output directory.
    """
    # Build the main callbacks and fire their start hook before the config is updated.
    main_callbacks = build_main_multi_callback(cfg)
    main_callbacks.on_run_simulation_start()

    # Finalize the configuration, then set up logging from it.
    update_config_for_simulation(cfg=cfg)
    build_logger(cfg)

    # Build the worker pool, then the experiment folder.
    worker_pool = build_worker(cfg)
    build_simulation_experiment_folder(cfg=cfg)

    return CommonBuilder(
        worker=worker_pool,
        multi_main_callback=main_callbacks,
        output_dir=Path(cfg.output_dir),
        profiler=None,
    )
|
|
|
|
|
|
def set_default_path() -> None:
    """
    Export default nuPlan root directories for every root environment variable that is not set yet.
    Hydra can then read them, unless the user overrides them from the command line.
    """
    if os.environ.get('NUPLAN_DATA_ROOT') is None:
        data_root = os.path.expanduser('~/nuplan/dataset')
        logger.info(f'Setting default NUPLAN_DATA_ROOT: {data_root}')
        os.environ['NUPLAN_DATA_ROOT'] = data_root

    if os.environ.get('NUPLAN_EXP_ROOT') is None:
        exp_root = os.path.expanduser('~/nuplan/exp')
        logger.info(f'Setting default NUPLAN_EXP_ROOT: {exp_root}')
        os.environ['NUPLAN_EXP_ROOT'] = exp_root
|
|
|
|
|
|
def run_runners(
    runners: List[AbstractRunner], common_builder: CommonBuilder, profiler_name: str, cfg: DictConfig
) -> None:
    """
    Run a list of runners, save their reports and fire the end-of-simulation callbacks.
    :param runners: A list of runners.
    :param common_builder: Common builder.
    :param profiler_name: Profiler name.
    :param cfg: Hydra config.
    :raises AssertionError: If `runners` is empty.
    """
    # Raised explicitly instead of through an `assert` statement so that the check is kept
    # under `python -O`; the exception type and message match what the assert produced.
    if not runners:
        raise AssertionError('No scenarios found to simulate!')

    if common_builder.profiler:
        common_builder.profiler.start_profiler(profiler_name)

    logger.info('Executing runners...')
    reports = execute_runners(
        runners=runners,
        worker=common_builder.worker,
        num_gpus=cfg.number_of_gpus_allocated_per_simulation,
        num_cpus=cfg.number_of_cpus_allocated_per_simulation,
        exit_on_failure=cfg.exit_on_failure,
        verbose=cfg.verbose,
    )
    logger.info('Finished executing runners!')

    # Persist the per-runner reports in the output directory.
    save_runner_reports(reports, common_builder.output_dir, cfg.runner_report_file)

    # Barrier on a path below the output directory before the end callbacks fire
    # (presumably synchronizes nodes in distributed runs; confirm against distributed_sync).
    distributed_sync(Path(cfg.output_dir) / "barrier", cfg.distributed_timeout_seconds)

    # Only rank 0 (also the default when NODE_RANK is unset) runs the end-of-simulation callbacks.
    if int(os.environ.get('NODE_RANK', 0)) == 0:
        common_builder.multi_main_callback.on_run_simulation_end()

    if common_builder.profiler:
        common_builder.profiler.save_profiler(profiler_name)
|
|
|
|
|
|
def save_runner_reports(reports: List[RunnerReport], output_dir: Path, report_name: str) -> None:
    """
    Save runner reports to a parquet file in the output directory.
    Output directory can be local or s3.
    :param reports: Runner reports returned from each simulation. The report objects are left unmodified.
    :param output_dir: Output directory to save the report.
    :param report_name: Report name.
    """
    report_dicts = []
    for report in reports:
        # Work on a shallow copy: vars() returns the live attribute dict of the report, and editing
        # it directly would delete `planner_report` from (and add statistics to) the caller's object.
        row = dict(vars(report))
        if (planner_report := row["planner_report"]) is not None:
            # Replace the nested planner report by its flat summary statistics.
            planner_report_statistics = planner_report.compute_summary_statistics()
            del row["planner_report"]
            row.update(planner_report_statistics)
        report_dicts.append(row)

    df = pd.DataFrame(report_dicts)
    df['duration'] = df['end_time'] - df['start_time']

    save_path = output_dir / report_name
    df.to_parquet(safe_path_to_string(save_path))
    logger.info(f'Saved runner reports to {save_path}')