How does this tridirectional uniform cost search work and how can…

Question Answered step-by-step How does this tridirectional uniform cost search work and how can… How does this tridirectional uniform cost search work and how can make it more concise?  def tridirectional_search(graph, goals): “”” Exercise 3: Tridirectional UCS Search See README.MD for exercise description. Args: graph (ExplorableGraph): Undirected graph to search. goals (list): Key values for the 3 goals Returns: The best path as a list from one of the goal nodes (including both of the other goal nodes). “”” if goals[0] == goals[1] and goals[0] == goals[2]: return [] # Explored dictionaries have entries of (cost from origin, parent) frontier = [PriorityQueue(), PriorityQueue(), PriorityQueue()] explored = [{}, {}, {}] node_data = [{}, {}, {}] # False until the initial meeting of two searches occurs meet = False path = [[], []] # Best node met at so far (state, cost) best_meet = None for i in range(0,3): frontier[i].append((0, goals[i])) node_data[i][goals[i]] = (0, None) while not meet: # Alternate, exploring forward and backward for i in range(0,3): cost, exploring = frontier[i].pop() if exploring in explored[(i + 1) % 3]: other_idx = (i + 1) % 3 cont_idx = (i + 2) % 3 elif exploring in explored[(i + 2) % 3]: other_idx = (i + 2) % 3 cont_idx = (i + 1) % 3 elif exploring not in explored[i]: children = graph[exploring] explored[i][exploring] = node_data[i].pop(exploring) parent_cost = explored[i][exploring][0] for state in children: g = children[state][‘weight’] + parent_cost if state in node_data[i] and node_data[i][state][0] > g: del node_data[i][state] if state not in explored[i] and state not in node_data[i]: node_data[i][state] = (g, exploring) frontier[i].append((g, state)) continue else: continue # Add exploring to the dict of explored states explored[i][exploring] = node_data[i].pop(exploring) # Create the union of the frontier and the explored sets for the set we connect with tmp_explored = merge_explored_frontier(explored[other_idx], frontier[other_idx], node_data[other_idx]) explored[other_idx] = tmp_explored # Provide the initial meeting point between both explored sets best_meet = (exploring, cost + explored[other_idx][exploring][0]) # Iterate through to see if there is a better meeting point best_meet = update_best_meet(best_meet, explored[other_idx], explored[i]) # Write to the path parent = best_meet[0] while parent: path[0].insert(0, parent) parent = explored[i][parent][-1] parent = explored[other_idx][best_meet[0]][-1] while parent: path[0].append(parent) parent = explored[other_idx][parent][-1] # Union the current explored set with the frontier to aid in finding shortest path for search continuing explored[i] = merge_explored_frontier(explored[i], frontier[i], node_data[i]) meet = True break # Now continue search for the non-intersected explored set # while the state exploring is not in either other stopped explored sets while True: cost, exploring = frontier[cont_idx].pop() # If the state being explored is found in one of the sets, search it to find the lowest cost path # Also, search the other explored set to see if it has a lower cost path if exploring not in explored[cont_idx]: children = graph[exploring] explored[cont_idx][exploring] = node_data[cont_idx].pop(exploring) parent_cost = explored[cont_idx][exploring][0] for state in children: g = children[state][‘weight’] + parent_cost if state in node_data[cont_idx] and node_data[cont_idx][state][0] > g: del node_data[cont_idx][state] if state not in explored[cont_idx] and state not in node_data[cont_idx]: node_data[cont_idx][state] = (g, exploring) frontier[cont_idx].append((g, state)) if exploring in explored[(cont_idx + 1) % 3]: other_idx = (cont_idx + 1) % 3 best_meet = (exploring, cost + explored[other_idx][exploring][0]) best_meet = update_best_meet(best_meet, explored[other_idx], explored[cont_idx]) best_meet = update_best_meet(best_meet, explored[other_idx], node_data[cont_idx]) next_best_meet = update_best_meet(best_meet, explored[(cont_idx + 2) % 3], explored[cont_idx]) next_best_meet = update_best_meet(next_best_meet, explored[(cont_idx + 2) % 3], node_data[cont_idx]) if next_best_meet[-1] < best_meet[-1]: best_meet = next_best_meet other_idx = (cont_idx + 2) %3 elif exploring in explored[(cont_idx + 2) % 3]: other_idx = (cont_idx + 2) % 3 best_meet = (exploring, cost + explored[other_idx][exploring][0]) best_meet = update_best_meet(best_meet, explored[other_idx], explored[cont_idx]) continue else: continue parent = best_meet[0] while parent: path[1].insert(0, parent) if(parent in explored[cont_idx]): parent = explored[cont_idx][parent][-1] else: parent = node_data[cont_idx][parent][-1] parent = explored[other_idx][best_meet[0]][-1] while parent: path[1].append(parent) if parent in path[0]: if parent in explored[(cont_idx + 2) % 3] and parent in explored[(cont_idx + 1) % 3]: if explored[(cont_idx + 2) % 3][parent][0] < explored[(cont_idx + 1) % 3][parent][0]: other_idx = (cont_idx + 2) % 3 else: other_idx = (cont_idx + 1) % 3 parent = explored[other_idx][parent][-1] break if set(path[1]).issubset(set(path[0])): return path[0] elif set(path[0]).issubset(set(path[1])): return path[1] if path[0][-1] == path[1][-1]: path[1].reverse() elif path[0][0] == path[1][-1]: path[1].pop() return path[1] + path[0] path[1].pop(0) for state in path[1]: path[0].append(state) return path[0] class PriorityQueue(object): """ A queue structure where each element is served in order of priority. Elements in the queue are popped based on the priority with higher priority elements being served before lower priority elements. If two elements have the same priority, they will be served in the order they were added to the queue. Traditionally priority queues are implemented with heaps, but there are any number of implementation options. (Hint: take a look at the module heapq) Attributes: queue (list): Nodes added to the priority queue. current (int): The index of the current node in the queue. """ def __init__(self): """Initialize a new Priority Queue.""" self.queue = [] self.index = 0 self.my_nodes = {} def pop(self): """ Pop top priority node from queue. Returns: The node with the highest priority. """ # raise NotImplementedError if self.queue: priority, index, key = heapq.heappop(self.queue) del self.my_nodes[index] return (priority, key) raise Exception('Queue is empty!') def remove(self, node_id): """ Remove a node from the queue. This is a hint, you might require this in ucs, however, if you choose not to use it, you are free to define your own method and not use it. Args: node_id (int): Index of node in queue. """ for element in self.queue: if node_id == element[2]: del self.my_nodes[element[1]] self.queue.remove(element) def __iter__(self): """Queue iterator.""" return iter(sorted(self.queue)) def __str__(self): """Priority Queue to string.""" return 'PQ:%s' % self.queue def append(self, node): """ Append a node to the queue. Args: node: Comparable Object to be added to the priority queue. """ # priority, count, node_id self.index += 1 priority, key = node node_list = [priority, self.index, key] self.my_nodes[self.index] = node_list heapq.heappush(self.queue, node_list) def __contains__(self, key): """ Containment Check operator for 'in' Args: key: The key to check for in the queue. Returns: True if key is found in queue, False otherwise. """ return key in [n for _,_,n in self.queue] def __eq__(self, other): """ Compare this Priority Queue with another Priority Queue. Args: other (PriorityQueue): Priority Queue to compare against. Returns: True if the two priority queues are equivalent. """ return self == other def size(self): """ Get the current size of the queue. Returns: Integer of number of items in queue. """ return len(self.queue) def clear(self): """Reset queue to empty (no nodes).""" self.queue = [] def top(self): """ Get the top item in the queue. Returns: The first item stored in teh queue. """ return self.queue[0] # helper for bidirectional_ucs def update_best_meet(best_meet, total_explored, this_explored): this_cost = float('inf') for key in this_explored: if key in total_explored: this_cost = this_explored[key][0] + total_explored[key][0] if this_cost < best_meet[1]: best_meet = (key, this_cost) return best_meet# helper for bidirectional_ucs def merge_explored_frontier(explored, frontier, node_cost): while frontier.size() > 0: node = frontier.pop() tmp_node = node[1] if tmp_node not in explored: explored[tmp_node] = node_cost[tmp_node] return explored  Computer Science Engineering & Technology Python Programming CSC 8520 Share QuestionEmailCopy link Comments (0)