3from tabulate
import tabulate
5from pathlib
import Path
7from typing
import Union, List
8from functools
import partial
10from datetime
import datetime
16from vamp
import pointcloud
as vpc
17from ompl
import base
as ob
18from ompl
import geometric
as og
19from ompl
import tools
as ot
21from vamp_state_space
import (
24 VampStateValidityChecker,
27sys.path.insert(0, str(Path(__file__).parent.parent))
28from viser_visualizer
import ViserVisualizer
33 planner: str =
"rrtc",
34 dataset: str =
"problems.pkl",
35 problem: Union[str, List[str]] = [],
37 print_failures: bool =
False,
38 visualize: bool =
False,
39 pointcloud: bool =
False,
40 samples_per_object: int = 10000,
41 filter_radius: float = 0.02,
42 filter_cull: bool =
True,
43 planning_time: float = 1.0,
48 if robot
not in vamp.robots:
49 raise RuntimeError(f
"Robot {robot} does not exist in VAMP!")
52 robot_module = getattr(vamp, robot)
53 dimension = robot_module.dimension()
58 upper_bounds = robot_module.upper_bounds()
59 lower_bounds = robot_module.lower_bounds()
60 for i
in range(dimension):
61 bounds.setLow(i, lower_bounds[i])
62 bounds.setHigh(i, upper_bounds[i])
63 space.setBounds(bounds)
74 if planner.lower()
not in planner_map:
76 f
"Planner {planner} not recognized. Available: {list(planner_map.keys())}"
79 planner_class = planner_map[planner.lower()]
80 vamp_folder = Path(__file__).parent.parent.parent /
"external" /
"vamp"
81 problems_dir = vamp_folder /
"resources" / robot
82 pickle_path = problems_dir / dataset
85 if not pickle_path.exists():
86 print(f
"Pickle file not found at {pickle_path}, generating from tar.bz2...")
87 script_path = vamp_folder /
"resources" /
"problem_tar_to_pkl_json.py"
88 result = subprocess.run(
89 [sys.executable, str(script_path), f
"--robot={robot}"], check=
True
91 if result.returncode != 0:
92 raise RuntimeError(f
"Failed to generate pickle file using {script_path}")
93 print(f
"Successfully generated pickle file")
95 with open(pickle_path,
"rb")
as f:
96 problems = pickle.load(f)
98 problem_names = list(problems[
"problems"].keys())
99 if isinstance(problem, str):
103 problem = problem_names
105 for problem_name
in problem:
106 if problem_name
not in problem_names:
108 f
"Problem `{problem_name}` not available! Available problems: {problem_names}"
117 visualize_enabled = visualize
118 skip_to_next_problem_set =
False
120 vis = ViserVisualizer(robot_name=robot, robot_dimension=dimension)
125 timestamp = datetime.now().strftime(
"%Y-%m-%d_%H-%M-%S")
126 benchmark_dir = Path(
"benchmarks") / f
"benchmark_{timestamp}"
127 benchmark_dir.mkdir(parents=
True, exist_ok=
True)
130 f
"Benchmarking {robot} on first problem of each type with {n_trials} trials..."
132 print(f
"Results will be saved to: {benchmark_dir}")
135 planner_constructors = [
143 tick = time.perf_counter()
147 f
"Evaluating {planner_class.__name__} on problems: {problems['problems'].keys()} with robot {robot}..."
150 for name, pset
in problems[
"problems"].items():
151 if name
not in problem:
155 skip_to_next_problem_set =
False
159 print(f
"Evaluating {robot} on {name}: ")
160 progress_bar = tqdm(total=len(pset), desc=f
"Processing {name}")
161 for i, data
in enumerate(pset):
164 progress_bar.update(1)
166 if not data[
"valid"]:
173 r_min, r_max = robot_module.min_max_radii()
174 (env, original_pc, filtered_pc, filter_time, build_time) = (
175 vpc.problem_dict_to_pointcloud(
186 pointcloud_results = {
187 "original_pointcloud_size": len(original_pc),
188 "filtered_pointcloud_size": len(filtered_pc),
189 "filter_time": pd.Timedelta(nanoseconds=filter_time),
190 "capt_build_time": pd.Timedelta(nanoseconds=build_time),
193 env = vamp.problem_dict_to_vamp(data)
195 for trial
in range(trials):
197 dimension = robot_module.dimension()
198 space = VampStateSpace(robot=robot_module)
201 motion_validator = VampMotionValidator(
202 si=si, env=env, robot=robot_module
204 state_validity_checker = VampStateValidityChecker(
205 si=si, env=env, robot=robot_module
209 si.setMotionValidator(motion_validator)
210 si.setStateValidityChecker(state_validity_checker)
213 ompl_planner = planner_class(si)
214 ss.setPlanner(ompl_planner)
217 start = si.allocState()
218 start[0:dimension] = data[
"start"]
221 goal = si.allocState()
222 goal[0:dimension] = data[
"goals"][0]
224 ss.setStartAndGoalStates(start, goal)
229 representation =
"pointcloud" if pointcloud
else "mesh"
230 ompl_benchmark.addExperimentParameter(
"robot",
"TEXT", robot)
231 ompl_benchmark.addExperimentParameter(
"environment",
"TEXT", name)
232 ompl_benchmark.addExperimentParameter(
233 "representation",
"TEXT", representation
236 for planner_constructor
in planner_constructors:
237 ompl_benchmark.addPlanner(planner_constructor(si))
241 req.maxTime = planning_time
243 req.runCount = n_trials
244 req.displayProgress =
True
246 print(f
" Running benchmark with {n_trials} trials...")
247 ompl_benchmark.benchmark(req)
250 file = str(f
"{robot}_{name}_{representation}_benchmark")
251 log_file = str(benchmark_dir / (file +
".log"))
252 db_file = str(benchmark_dir / (file +
".db"))
254 ompl_benchmark.saveResultsToFile(log_file)
255 print(f
" Saved log to: {log_file}")
258 ot.readBenchmarkLog(db_file, [log_file], moveitformat=
False)
259 print(f
" Saved database to: {db_file}")
264 planning_start = time.perf_counter()
265 result = ss.solve(planning_time)
266 planning_elapsed = time.perf_counter() - planning_start
268 if not ss.haveExactSolutionPath():
270 print(f
"Failed to find exact solution for problem {i}")
274 path = ss.getSolutionPath()
275 initial_cost = path.length()
278 simplify_start = time.perf_counter()
279 ss.simplifySolution()
280 simplify_elapsed = time.perf_counter() - simplify_start
282 simplified_path = ss.getSolutionPath()
283 simplified_cost = simplified_path.length()
286 "planning_time": pd.Timedelta(microseconds=planning_elapsed * 1e6),
287 "simplification_time": pd.Timedelta(
288 microseconds=simplify_elapsed * 1e6
290 "total_time": pd.Timedelta(
291 microseconds=(planning_elapsed + simplify_elapsed) * 1e6
293 "initial_path_cost": initial_cost,
294 "simplified_path_cost": simplified_cost,
295 "planning_iterations": 0,
296 "avg_time_per_iteration": 0.0,
300 trial_result.update(pointcloud_results)
302 results.append(trial_result)
305 if visualize_enabled
and vis
is not None:
307 if skip_to_next_problem_set:
315 original_pc = np.array(original_pc)
316 z_values = original_pc[:, 2]
317 z_min, z_max = z_values.min(), z_values.max()
318 z_range = z_max - z_min
321 z_normalized = np.clip(
322 (z_values - z_min) / (z_max - z_min), 0, 1
326 colors = np.zeros((len(original_pc), 3))
327 colors[:, 0] = np.maximum(
328 0, 1 - 2 * np.abs(z_normalized - 0.25)
330 colors[:, 1] = np.maximum(
331 0, 1 - 2 * np.abs(z_normalized - 0.5)
333 colors[:, 2] = np.maximum(
334 0, 1 - 2 * np.abs(z_normalized - 0.75)
336 vis.add_point_cloud(original_pc, color=colors, point_size=0.01)
338 vis.load_mbm_environment(
339 data, padding=0.0, color=(0.8, 0.4, 0.2, 0.75)
343 simplified_path.interpolate(
344 int(simplified_path.getStateCount() * 15)
346 states = simplified_path.getStates()
347 trajectory = np.array(
348 [list(state[0:dimension])
for state
in states]
352 vis.visualize_trajectory(trajectory)
355 f
"\nVisualizing problem {name} [{i}] - Press any key to continue, 'q' to disable viz, 'w' to skip to next problem set"
357 key = vis.play_until_key_pressed(key=
"any", dt=1 / 60)
360 print(
"Visualization disabled for remaining problems")
361 visualize_enabled =
False
363 print(f
"Skipping to next problem set...")
364 skip_to_next_problem_set =
True
372 failed_problems += len(failures)
376 print(f
" Invalid problems: {invalids}")
379 print(f
" Failed on {failures}")
382 print(f
"All benchmarks saved in {benchmark_dir}")
385 tock = time.perf_counter()
387 df = pd.DataFrame.from_dict(results)
390 df[
"planning_time"] = df[
"planning_time"].dt.microseconds
391 df[
"simplification_time"] = df[
"simplification_time"].dt.microseconds
392 df[
"avg_time_per_iteration"] = df[
"planning_iterations"] / df[
"planning_time"]
396 df[
"total_build_and_plan_time"] = (
397 df[
"total_time"] + df[
"filter_time"] + df[
"capt_build_time"]
399 df[
"filter_time"] = df[
"filter_time"].dt.microseconds / 1e3
400 df[
"capt_build_time"] = df[
"capt_build_time"].dt.microseconds / 1e3
401 df[
"total_build_and_plan_time"] = (
402 df[
"total_build_and_plan_time"].dt.microseconds / 1e3
405 df[
"total_time"] = df[
"total_time"].dt.microseconds
411 "simplification_time",
413 "planning_iterations",
414 "avg_time_per_iteration",
416 ].describe(percentiles=[0.25, 0.5, 0.75, 0.95])
417 time_stats.drop(index=[
"count"], inplace=
True)
422 "simplified_path_cost",
424 ].describe(percentiles=[0.25, 0.5, 0.75, 0.95])
425 cost_stats.drop(index=[
"count"], inplace=
True)
428 pointcloud_stats = df[
432 "total_build_and_plan_time",
434 ].describe(percentiles=[0.25, 0.5, 0.75, 0.95])
435 pointcloud_stats.drop(index=[
"count"], inplace=
True)
442 "Planning Time (μs)",
443 "Simplification Time (μs)",
446 "Time per Iter. (μs)",
456 " Initial Cost (L2)",
457 " Simplified Cost (L2)",
469 " CAPT Build Time (ms)",
477 f
"Solved / Valid / Total # Problems: {valid_problems - failed_problems} / {valid_problems} / {total_problems}"
479 print(f
"Completed all problems in {df['total_time'].sum() / 1000:.3f} milliseconds")
481 f
"Total time including Python overhead: {(tock - tick) * 1000:.3f} milliseconds"
485if __name__ ==
"__main__":
The lower and upper bounds for an Rn space.
A state space representing Rn. The distance function is the L2 norm.
Batch Informed Trees (BIT*).
Kinematic Planning by Interior-Exterior Cell Exploration.
Lazy Bi-directional KPIECE with one level of discretization.
Probabilistic RoadMap planner.
RRT-Connect (RRTConnect).
Rapidly-exploring Random Trees.
Optimal Rapidly-exploring Random Trees.
Create the set of classes typically needed to solve a geometric problem.