graph_utils.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """Helpers for manipulations with graphs."""
  2. from __future__ import annotations
  3. from typing import AbstractSet, Iterable, Iterator, TypeVar
  4. T = TypeVar("T")
  5. def strongly_connected_components(
  6. vertices: AbstractSet[T], edges: dict[T, list[T]]
  7. ) -> Iterator[set[T]]:
  8. """Compute Strongly Connected Components of a directed graph.
  9. Args:
  10. vertices: the labels for the vertices
  11. edges: for each vertex, gives the target vertices of its outgoing edges
  12. Returns:
  13. An iterator yielding strongly connected components, each
  14. represented as a set of vertices. Each input vertex will occur
  15. exactly once; vertices not part of a SCC are returned as
  16. singleton sets.
  17. From https://code.activestate.com/recipes/578507/.
  18. """
  19. identified: set[T] = set()
  20. stack: list[T] = []
  21. index: dict[T, int] = {}
  22. boundaries: list[int] = []
  23. def dfs(v: T) -> Iterator[set[T]]:
  24. index[v] = len(stack)
  25. stack.append(v)
  26. boundaries.append(index[v])
  27. for w in edges[v]:
  28. if w not in index:
  29. yield from dfs(w)
  30. elif w not in identified:
  31. while index[w] < boundaries[-1]:
  32. boundaries.pop()
  33. if boundaries[-1] == index[v]:
  34. boundaries.pop()
  35. scc = set(stack[index[v] :])
  36. del stack[index[v] :]
  37. identified.update(scc)
  38. yield scc
  39. for v in vertices:
  40. if v not in index:
  41. yield from dfs(v)
  42. def prepare_sccs(
  43. sccs: list[set[T]], edges: dict[T, list[T]]
  44. ) -> dict[AbstractSet[T], set[AbstractSet[T]]]:
  45. """Use original edges to organize SCCs in a graph by dependencies between them."""
  46. sccsmap = {v: frozenset(scc) for scc in sccs for v in scc}
  47. data: dict[AbstractSet[T], set[AbstractSet[T]]] = {}
  48. for scc in sccs:
  49. deps: set[AbstractSet[T]] = set()
  50. for v in scc:
  51. deps.update(sccsmap[x] for x in edges[v])
  52. data[frozenset(scc)] = deps
  53. return data
  54. def topsort(data: dict[T, set[T]]) -> Iterable[set[T]]:
  55. """Topological sort.
  56. Args:
  57. data: A map from vertices to all vertices that it has an edge
  58. connecting it to. NOTE: This data structure
  59. is modified in place -- for normalization purposes,
  60. self-dependencies are removed and entries representing
  61. orphans are added.
  62. Returns:
  63. An iterator yielding sets of vertices that have an equivalent
  64. ordering.
  65. Example:
  66. Suppose the input has the following structure:
  67. {A: {B, C}, B: {D}, C: {D}}
  68. This is normalized to:
  69. {A: {B, C}, B: {D}, C: {D}, D: {}}
  70. The algorithm will yield the following values:
  71. {D}
  72. {B, C}
  73. {A}
  74. From https://code.activestate.com/recipes/577413/.
  75. """
  76. # TODO: Use a faster algorithm?
  77. for k, v in data.items():
  78. v.discard(k) # Ignore self dependencies.
  79. for item in set.union(*data.values()) - set(data.keys()):
  80. data[item] = set()
  81. while True:
  82. ready = {item for item, dep in data.items() if not dep}
  83. if not ready:
  84. break
  85. yield ready
  86. data = {item: (dep - ready) for item, dep in data.items() if item not in ready}
  87. assert not data, f"A cyclic dependency exists amongst {data!r}"