Loading...
Searching...
No Matches
PlannerData.py
1#!/usr/bin/env python3
2
3
36
37# Author: Ryan Luna, Weihang Guo
38
39try:
40 import viser
41 import numpy as np
42
43 viser_available = True
44except ImportError:
45 print("Failed to import viser. PlannerData will not be visualized")
46 viser_available = False
47
48from ompl import base as ob
49from ompl import geometric as og
50
51
52# Create a narrow passage between y=[-3,3]. Only a 6x6x6 cube will be valid, centered at origin
53def isStateValid(state):
54 if state.getY() >= -3 and state.getY() <= 3:
55 return (
56 state.getX() >= -3
57 and state.getX() <= 3
58 and state.getZ() >= -3
59 and state.getZ() <= 3
60 )
61 return True
62
63
64def useViser(pd):
65 """
66 Visualize planner data graph in 3D using viser.
67 Nodes are displayed at their actual 3D positions, edges as line segments.
68 """
69 num_vertices = pd.numVertices()
70 num_edges = pd.numEdges()
71
72 if num_vertices == 0:
73 print("No vertices in planner data to visualize")
74 return
75
76 # Extract 3D positions and build graph structure
77 positions = []
78 vertex_colors = []
79 vertex_sizes = []
80 start_indices = []
81 goal_indices = []
82
83 # Build adjacency list and edge weights
84 adjacency = [[] for _ in range(num_vertices)]
85 edge_weights = {}
86
87 for v in range(num_vertices):
88 vertex = pd.getVertex(v)
89 state = vertex.getState()
90
91 # Extract 3D position (assuming SE3StateSpace)
92 x = state.getX()
93 y = state.getY()
94 z = state.getZ()
95 positions.append([x, y, z])
96
97 # Determine vertex color and size
98 if pd.isStartVertex(v):
99 start_indices.append(v)
100 vertex_colors.append([0, 1, 1]) # cyan
101 vertex_sizes.append(0.3)
102 elif pd.isGoalVertex(v):
103 goal_indices.append(v)
104 vertex_colors.append([0, 1, 0]) # green
105 vertex_sizes.append(0.3)
106 else:
107 vertex_colors.append([1, 1, 0]) # yellow
108 vertex_sizes.append(0.15)
109
110 # Get outgoing edges
111 outgoing = pd.getEdges(v)
112 for target in outgoing:
113 if pd.edgeExists(v, target):
114 # Compute edge weight as distance between states
115 state1 = pd.getVertex(v).getState()
116 state2 = pd.getVertex(target).getState()
117 si = pd.getSpaceInformation()
118 weight = si.distance(state1, state2)
119 adjacency[v].append((target, weight))
120 edge_weights[(v, target)] = weight
121
122 positions = np.array(positions, dtype=np.float32)
123 vertex_colors = np.array(vertex_colors, dtype=np.float32)
124
125 # Compute statistics
126 degrees = [
127 len(adjacency[v]) + len(pd.getIncomingEdges(v)) for v in range(num_vertices)
128 ]
129 avg_degree = np.mean(degrees) if degrees else 0
130 stddev_degree = np.std(degrees) if degrees else 0
131
132 all_weights = list(edge_weights.values())
133 avg_weight = np.mean(all_weights) if all_weights else 0
134 stddev_weight = np.std(all_weights) if all_weights else 0
135
136 # Compute connected components (using DFS)
137 def dfs_component(v, visited, component):
138 visited[v] = True
139 component.append(v)
140 for neighbor, _ in adjacency[v]:
141 if not visited[neighbor]:
142 dfs_component(neighbor, visited, component)
143 # Also check incoming edges (for undirected graph)
144 for neighbor in pd.getIncomingEdges(v):
145 if not visited[neighbor]:
146 dfs_component(neighbor, visited, component)
147
148 visited = [False] * num_vertices
149 components = []
150 for v in range(num_vertices):
151 if not visited[v]:
152 component = []
153 dfs_component(v, visited, component)
154 components.append(component)
155
156 print("---- PLANNER DATA STATISTICS ----")
157 print(f"{num_vertices} vertices and {num_edges} edges")
158 print(
159 f"Average vertex degree (in+out) = {avg_degree:.3f} St. Dev = {stddev_degree:.3f}"
160 )
161 print(f"Average edge weight = {avg_weight:.3f} St. Dev = {stddev_weight:.3f}")
162 print(f"Connected components: {len(components)}")
163
164 # Find shortest path using Dijkstra's algorithm
165 path_edges = set()
166 if start_indices and goal_indices:
167 # Use first start and first goal
168 start = start_indices[0]
169 goal = goal_indices[0]
170
171 # Dijkstra's algorithm
172 import heapq
173
174 dist = {v: float("inf") for v in range(num_vertices)}
175 prev = {v: None for v in range(num_vertices)}
176 dist[start] = 0
177 pq = [(0, start)]
178
179 while pq:
180 d, u = heapq.heappop(pq)
181 if d > dist[u]:
182 continue
183 if u == goal:
184 break
185
186 # Check outgoing edges
187 for v, weight in adjacency[u]:
188 alt = dist[u] + weight
189 if alt < dist[v]:
190 dist[v] = alt
191 prev[v] = u
192 heapq.heappush(pq, (alt, v))
193
194 # Check incoming edges (for undirected graph)
195 for v in pd.getIncomingEdges(u):
196 # Get weight for incoming edge by computing distance
197 if pd.edgeExists(v, u):
198 state1 = pd.getVertex(v).getState()
199 state2 = pd.getVertex(u).getState()
200 si = pd.getSpaceInformation()
201 weight = si.distance(state1, state2)
202 alt = dist[u] + weight
203 if alt < dist[v]:
204 dist[v] = alt
205 prev[v] = u
206 heapq.heappush(pq, (alt, v))
207
208 # Reconstruct path
209 if prev[goal] is not None or goal == start:
210 path = []
211 current = goal
212 while current is not None:
213 path.append(current)
214 current = prev[current]
215 path.reverse()
216
217 # Mark path edges
218 for i in range(len(path) - 1):
219 u, v = path[i], path[i + 1]
220 path_edges.add((u, v))
221 path_edges.add((v, u)) # Add reverse for undirected
222
223 # Create viser server
224 server = viser.ViserServer(port=8080, label="OMPL Planner Data Visualization")
225
226 # Add vertices as point cloud
227 server.scene.add_point_cloud(
228 "/graph/vertices", points=positions, colors=vertex_colors, point_size=0.2
229 )
230
231 # Add edges as line segments
232 # Separate path edges from regular edges for different styling
233 regular_edge_lines = []
234 path_edge_lines = []
235
236 # Track which edges we've added to avoid duplicates
237 added_edges = set()
238
239 for v in range(num_vertices):
240 for target, _ in adjacency[v]:
241 # Only add each edge once (v < target to avoid duplicates)
242 if (v, target) not in added_edges:
243 added_edges.add((v, target))
244 p1 = positions[v]
245 p2 = positions[target]
246
247 # Check if this edge is part of the path
248 if (v, target) in path_edges or (target, v) in path_edges:
249 path_edge_lines.append([p1, p2])
250 else:
251 regular_edge_lines.append([p1, p2])
252
253 # Add regular edges (black, thinner)
254 if regular_edge_lines:
255 regular_edge_lines = np.array(regular_edge_lines, dtype=np.float32)
256 # Reshape to (N, 2, 3) format expected by viser
257 regular_edge_lines = regular_edge_lines.reshape(-1, 2, 3)
258 # Use a single color tuple (3,) which applies to all segments
259 server.scene.add_line_segments(
260 "/graph/edges/regular",
261 points=regular_edge_lines,
262 colors=(0, 0, 0), # black
263 line_width=0.5,
264 )
265
266 # Add path edges (red, thicker)
267 if path_edge_lines:
268 path_edge_lines = np.array(path_edge_lines, dtype=np.float32)
269 # Reshape to (N, 2, 3) format expected by viser
270 path_edge_lines = path_edge_lines.reshape(-1, 2, 3)
271 # Use a single color tuple (3,) which applies to all segments
272 server.scene.add_line_segments(
273 "/graph/edges/path",
274 points=path_edge_lines,
275 colors=(1, 0, 0), # red
276 line_width=3.0,
277 )
278
279 print("\n3D graph visualization available at http://localhost:8080")
280 print("Press Ctrl+C to stop the server")
281
282 # Keep server running
283 try:
284 server.sleep_forever()
285 except KeyboardInterrupt:
286 server.stop()
287 print("\nServer stopped")
288
289
290def plan():
291 # construct the state space we are planning in
292 space = ob.SE3StateSpace()
293
294 # set the bounds for R^3 portion of SE(3)
295 bounds = ob.RealVectorBounds(3)
296 bounds.setLow(-10)
297 bounds.setHigh(10)
298 space.setBounds(bounds)
299
300 # define a simple setup class
301 ss = og.SimpleSetup(space)
302
303 # create a start state
304 start = space.allocState()
305 start.setX(-9)
306 start.setY(-9)
307 start.setZ(-9)
308 start.rotation().setIdentity()
309
310 # create a goal state
311 goal = space.allocState()
312 goal.setX(-9)
313 goal.setY(9)
314 goal.setZ(-9)
315 goal.rotation().setIdentity()
316
317 ss.setStateValidityChecker(isStateValid)
318
319 # set the start and goal states
320 ss.setStartAndGoalStates(start, goal, 0.05)
321
322 # planner = og.RRTConnect(ss.getSpaceInformation())
323 planner = og.PRM(ss.getSpaceInformation())
324 # planner = og.RRT(ss.getSpaceInformation())
325
326 ss.setPlanner(planner)
327 ss.setup()
328
329 # attempt to solve the problem
330 solved = ss.solve(20.0)
331
332 if solved:
333 # print the path to screen
334 print("Found solution:\n%s" % ss.getSolutionPath())
335
336 # Extracting planner data from most recent solve attempt
337 pd = ob.PlannerData(ss.getSpaceInformation())
338 ss.getPlannerData(pd)
339
340 # Computing weights of all edges based on state space distance
341 pd.computeEdgeWeights()
342
343 if viser_available:
344 useViser(pd)
345
346
347if __name__ == "__main__":
348 plan()
Object containing planner generated vertex and edge data. It is assumed that all vertices are unique,...
The lower and upper bounds for an Rn space.
A state space representing SE(3).
Probabilistic RoadMap planner.
Definition PRM.h:81
Create the set of classes typically needed to solve a geometric problem.
Definition SimpleSetup.h:63