如果 u 不是起始节点 sstart,则计算 rhs(u)=mins′∈Pred(u)(g(s′)+cost(s′,u))。
如果 u 已经在 U 中,先将其移除。
如果 g(u) \neq rhs(u)(即 u 处于不一致状态),则将 u 及其计算出的 Key(u) 重新插入到 U 中。
ComputeShortestPath()
这是LPA* 的主循环,它迭代地从 U 中提取优先级最高的节点,直到起始节点 sstart 变为一致且 U 中的所有节点优先级都高于 sstart。
循环:当 U 非空 且 Key(s_start) 的值大于 U.top() 的值 或 rhs(s_start) \neq g(s_start) 时,执行以下步骤:
a. 从 U 中取出优先级最高的节点 u 及其键 k_old。
b. 如果 k_old < Key(u)(这意味着 u 的优先级在处理过程中升高了,它可能被其他操作影响),则将 u 重新插入 U。
c. 如果 g(u) > rhs(u)(节点 u 过一致,代价可以降低):
i. 设置 g(u) = rhs(u)。
ii. 对于 u 的所有后继节点 s′′∈Succ(u),调用 UpdateVertex(s'')。
d. 如果 g(u) < rhs(u)(节点 u 欠一致,代价需要升高):
i. 设置 g(u) = \infty(暂时将其标记为无穷大,待重新计算)。
ii. 对于 u 及其所有后继节点 s′′∈Succ(u),调用 UpdateVertex(s'')。
D* Lite 算法是 LPA* 的一个巧妙变体,专门为动态环境下的机器人路径规划而设计。它的核心思想是:机器人从起点向目标移动,但搜索是从目标点向起点进行的(逆向LPA*),这样当机器人移动时,其当前位置(起始点)的变化不会导致整个 g 值和 rhs 值的重计算,因为这些值代表的是到固定目标点的距离。
核心思想
逆向搜索: D* Lite 的 g(s) 和 rhs(s) 值都表示从节点 s 到 目标节点 sgoal 的最短路径代价。这意味着当机器人从 sstart 移动时,g(s) 和 rhs(s) 不需要重新计算,因为它们是相对于固定目标点的。
动态起始点: 机器人的当前位置是动态变化的“起始点”。D* Lite 通过引入一个全局的“键偏移量” Km 来处理起始点的移动,而无需重新计算所有节点的启发式值。
一个更清晰的解释:D* Lite通过逆向搜索维护了从所有节点到终点的最短路径信息 (g,rhs 值)。当机器人移动时,其新的位置成为了新的起点。传统的A或LPA如果用于前向搜索,会因为起点变化而使得所有 g 值都无效。D* Lite 解决了这个问题:它通过一个全局偏移量 Km 来“修正”优先级队列中所有节点的优先级。
具体来说,当机器人从 sold 移动到 snew 时,D* Lite 会将 Km 增加 cost(sold,snew)。这意味着对于优先队列中的任何一个节点 s,其旧的优先级键 min(g(s),rhs(s))+h(s,sold) 被“有效”地转换为了 min(g(s),rhs(s))+h(s,snew)+Km′, 其中 Km′ 就是累积的移动代价。通过这种方式,D* Lite避免了遍历所有节点来更新其 h 值。
关键变量与函数
g(s): 从节点 s 到 目标节点 sgoal 的当前已知最短路径代价。
rhs(s): 节点 s 的一步看前代价。对于 sgoal,rhs(s_goal) = 0$。对于其他 $s$,rhs(s) = \min_{s’ \in Succ(s)} (g(s’) + cost(s, s’))$。这里 Succ(s) 是 s 的所有后继节点集合(即从 s 出发能到达的节点),cost(s,s′) 是从 s 到 s′ 的边代价。
其中 h(s,sstart) 是从节点 s 到当前机器人位置 sstart 的启发式估计(例如,欧几里得距离或曼哈顿距离)。
算法流程
D* Lite 算法可以分为初始化阶段、路径计算/更新阶段和机器人移动阶段。
1. 初始化 Initialize()
设置所有节点 s 的 g(s) = \infty,rhs(s) = \infty。
设置目标节点 sgoal 的 `rhs(s_goal) = 0$。
初始化 Km = 0。
将 sgoal 及其键 CalculateKey(s_goal) 插入优先级队列 U。
2. 计算/更新路径 ComputeShortestPath()
这个函数与 LPA* 的 ComputeShortestPath() 基本相同,只是它基于 D* Lite 的键和 sstart。
循环条件:当 U 非空 且 Key(s_start) 的值大于 U.top().key 的值 或 rhs(s_start) \neq g(s_start) 时,执行以下步骤:
a. 从 U 中取出优先级最高的节点 u 及其键 k_old。
b. 如果 k_old < CalculateKey(u),这意味着 u 的优先级在处理过程中升高了(可能是因为 g(u) 或 rhs(u) 在其他 UpdateVertex 调用中发生了变化),则将 u 重新插入 U。
c. 如果 g(u) > rhs(u)(节点 u 过一致,代价可以降低):
i. 设置 g(u) = rhs(u)。
ii. 对于 u 的所有前驱节点 s′∈Pred(u),调用 UpdateVertex(s')。
d. 如果 g(u) < rhs(u)(节点 u 欠一致,代价需要升高):
i. 设置 g(u) = \infty。
ii. 对于 u 及其所有前驱节点 s′∈Pred(u),调用 UpdateVertex(s')。
最优性与完备性 (Optimality and Completeness): 在给定当前环境信息和可接受的启发式函数下,D* Lite 能够保证找到从当前位置到目标的最短路径。如果路径存在,它就能找到;如果不存在,它也能判断。
内存效率 (Memory Efficiency): 相较于某些其他动态规划方法,D* Lite 仅需要维护每个节点固定的 g 和 rhs 值,以及一个优先队列,内存开销相对可控。
应用场景
D* Lite 的强大功能使其成为许多需要实时路径规划的应用的首选:
移动机器人导航 (Mobile Robot Navigation): 这是D* Lite 最典型的应用场景。机器人通过传感器(如激光雷达、摄像头)感知环境,当发现新的障碍物或地图更新时,D* Lite 能够快速重新规划路径,确保机器人安全高效地到达目的地。例如,SLAM(同步定位与建图)系统中的路径规划模块。
无人驾驶汽车 (Autonomous Driving): 在复杂的城市交通环境中,车辆需要不断地对前方路况、其他车辆、行人以及突发事件(如施工区域)作出反应。D* Lite 可以用于实时路径调整和避障。
物流配送与仓储机器人 (Logistics and Warehouse Robotics): 在自动化仓库中,机器人需要高效地搬运货物。随着货物布局和任务的变化,以及其他机器人或工作人员的移动,D* Lite 可以帮助机器人动态规划路径,避免碰撞和拥堵。
游戏人工智能 (Game AI): 在游戏中,NPC(非玩家角色)需要根据玩家的行为、环境破坏或新生成区域来动态规划行动路径,D* Lite 可以为复杂的AI行为提供高效的路径解决方案。
灾后救援与探索 (Disaster Relief and Exploration): 在地震、火灾等灾难现场,环境信息可能极其有限且不断变化。救援机器人可以使用 D* Lite 算法在危险且不确定的环境中规划安全的探索和救援路径。
function CalculateKey(s) // h(s, s_start) 是从s到当前机器人位置s_start的启发式距离 // min(g[s], rhs[s]) + h(s, s_start) + Km 是主键 // min(g[s], rhs[s]) 是次键,用于打破平局 return [min(g[s], rhs[s]) + h(s, s_start) + Km, min(g[s], rhs[s])]
function UpdateVertex(u) if u != s_goal: rhs[u] = infinity // 暂时设置为无穷大 // 寻找最优后继节点s',计算从u到目标的最优一步看前代价 for each s_prime in Succ(u): // Succ(u) 是u的所有后继节点 rhs[u] = min(rhs[u], g[s_prime] + cost(u, s_prime))
// 如果u在优先级队列中,先移除 if U.contains(u): U.remove(u)
// 如果u不一致,重新插入U if g[u] != rhs[u]: U.insert(u, CalculateKey(u))
function ComputeShortestPath() while U.top().key < CalculateKey(s_start) or rhs[s_start] != g[s_start]: k_old = U.top().key u = U.pop() // 移除优先级最高的节点
if k_old < CalculateKey(u): // 如果u的优先级变高了,重新插入 U.insert(u, CalculateKey(u)) else if g[u] > rhs[u]: // 过一致状态,g值可以降低 g[u] = rhs[u] for each s_prime in Pred(u): // Pred(u) 是u的所有前驱节点 UpdateVertex(s_prime) else: // 欠一致状态,g值需要升高 g[u] = infinity // 暂时设置为无穷大 // 对u及其所有前驱节点进行更新 for each s_prime in Pred(u) union {u}: // 包括u自身 UpdateVertex(s_prime)
// 主程序循环 function MainLoop() Initialize() ComputeShortestPath() // 首次规划
s_current = s_start_initial while s_current != s_goal: // 如果无法到达目标,则中断 if g[s_current] == infinity: print("No path to goal!") break
// 寻找下一步最优节点 s_next = null min_cost = infinity for each s_prime in Succ(s_current): c = cost(s_current, s_prime) + g[s_prime] if c < min_cost: min_cost = c s_next = s_prime
// 移动机器人 Km = Km + cost(s_current, s_next) // 累加实际移动代价 s_current = s_next // 更新当前机器人位置s_start
// 模拟环境变化:发现新障碍物或障碍物消失 // affected_edges = detect_map_changes() // for each (u, v) in affected_edges: // Update cost(u, v) // 更新地图中的边代价 // UpdateVertex(u) // UpdateVertex(v)
self.nodes = {} # Stores Node objects by (x,y) for quick access for r inrange(self.rows): for c inrange(self.cols): self.nodes[(r, c)] = Node(r, c) # Ensure start and goal are correctly referenced from nodes dict self.start = self.nodes[(start_pos[0], start_pos[1])] self.goal = self.nodes[(goal_pos[0], goal_pos[1])]
def_update_vertex(self, u): if u != self.goal: old_rhs = u.rhs u.rhs = float('inf') # Temporarily reset # Recalculate rhs based on successors # D* Lite looks at successors because g/rhs are from node to goal for s_prime inself._get_neighbors(u): u.rhs = min(u.rhs, s_prime.g + self._get_cost(u, s_prime)) if old_rhs != u.rhs: # Only update if rhs actually changed # If u is in U, remove it if u in [item for item inself.U]: # linear scan for removal (inefficient, but for demo) self.U = [item for item inself.U if item != u] heapq.heapify(self.U) # Rebuild heap # Reinsert if inconsistent if u.g != u.rhs: u.key = self._calculate_key(u) heapq.heappush(self.U, u) # Ensure goal's key is always up-to-date (esp. when start moves) # Goal node doesn't get updated by its neighbors as its RHS is fixed to 0. # But its key might change due to Km or start position changing if u == self.goal: if u in [item for item inself.U]: self.U = [item for item inself.U if item != u] heapq.heapify(self.U) u.key = self._calculate_key(u) heapq.heappush(self.U, u)
if k_old < k_new: # Priority increased, reinsert u.key = k_new heapq.heappush(self.U, u) elif u.g > u.rhs: # Overconsistent, cost can be lowered u.g = u.rhs for s_prime inself._get_neighbors(u): # D* Lite updates predecessors for goal-to-start self._update_vertex(s_prime) else: # Underconsistent, cost needs to be raised u.g = float('inf') # Temporarily set to inf self._update_vertex(u) # Update u itself for s_prime inself._get_neighbors(u): # Update neighbors (predecessors in search direction) self._update_vertex(s_prime)
defreplan(self, new_start_pos=None, changed_edges=None): if new_start_pos: old_start = self.start self.start = self.nodes[(new_start_pos[0], new_start_pos[1])] # Update Km based on actual movement cost # Note: _get_cost assumes n1 and n2 are adjacent. # If robot moves multiple steps, sum costs or calculate direct distance. # Here, we assume a single step. self.Km += self._get_cost(old_start, self.start) # Update the start node itself (its key will be re-calculated in compute_shortest_path check) # and potentially the goal node if its key depends on Km. self._update_vertex(self.start) self._update_vertex(self.goal) # Goal node's key also depends on Km and start node
if changed_edges: for (u_pos, v_pos, new_cost) in changed_edges: u = self.nodes[u_pos] v = self.nodes[v_pos] # In a real scenario, you'd update map internal cost here. # For this demo, let's just make the node an obstacle directly # For simplicity, if new_cost is inf, we mark v as obstacle if new_cost == float('inf'): self.grid_map[v.x][v.y] = 1# Mark v as obstacle else: self.grid_map[v.x][v.y] = 0# Mark v as free (if it was an obstacle) # Update u and its neighbors, and v and its neighbors # When an edge (u, v) changes, both u and v (and their predecessors) might be affected. # It's safer to update all affected nodes that could have their rhs/g values change. self._update_vertex(u) self._update_vertex(v) # A node turning into an obstacle affects its own rhs and neighbors
self.compute_shortest_path()
defget_path(self): path = [] current = self.start while current != self.goal: if current.g == float('inf'): return [] # No path path.append((current.x, current.y)) min_cost = float('inf') next_node = None # Find the successor that leads to the shortest path to goal for neighbor inself._get_neighbors(current): cost_to_neighbor = self._get_cost(current, neighbor) total_cost = cost_to_neighbor + neighbor.g if total_cost < min_cost: min_cost = total_cost next_node = neighbor if next_node isNone: return [] # Stuck, no path current = next_node path.append((self.goal.x, self.goal.y)) return path
# 模拟环境变化:路径上的一个格子变为障碍物 print("\n--- 环境变化:添加障碍物 ---") # 假设 (2,2) 变为障碍物,如果它在路径上 obstacle_pos = (2, 2) # To simulate change, we provide (predecessor_of_obstacle, obstacle_node, new_cost) # The actual _get_cost will check the grid_map. # So we just update the grid_map directly and then call replan. # Let's say (2,1) and (2,3) are neighbors of (2,2). # We update all nodes whose cost/rhs might be affected. # Marking (2,2) as an obstacle. grid[obstacle_pos[0]][obstacle_pos[1]] = 1 # When (2,2) becomes an obstacle, all its neighbors are affected. # Their rhs values might change because (2,2) is no longer a valid successor (or predecessor). # We need to call update_vertex for all neighbors of (2,2) and (2,2) itself. # For simplicity, we just trigger replan which will re-evaluate based on grid_map changes. # In a real system, you would precisely identify changed edges and update involved nodes. # For this example, let's identify the obstacle node and its neighbors obstacle_node = dstar._get_node(obstacle_pos[0], obstacle_pos[1]) affected_nodes = dstar._get_neighbors(obstacle_node) + [obstacle_node] # Include obstacle itself # Manually update affected nodes for node in affected_nodes: dstar._update_vertex(node)
# 模拟环境变化:障碍物消失 print("\n--- 环境变化:障碍物消失 ---") grid[obstacle_pos[0]][obstacle_pos[1]] = 0# Mark (2,2) as free again # Update affected nodes as before for node in affected_nodes: dstar._update_vertex(node) dstar.replan() path_after_clear = dstar.get_path() print(f"({obstacle_pos}) 障碍物消失后的路径:", path_after_clear)
代码解释:
Node 类存储了每个网格单元的坐标,以及 D* Lite 算法所需的 g 值、rhs 值和优先级 key。