| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- """Utility for dumping memory usage stats.
- This is tailored to mypy and knows (a little) about which list objects are
- owned by particular AST nodes, etc.
- """
- from __future__ import annotations
- import gc
- import sys
- from collections import defaultdict
- from typing import Dict, Iterable, cast
- from mypy.nodes import FakeInfo, Node
- from mypy.types import Type
- from mypy.util import get_class_descriptors
- def collect_memory_stats() -> tuple[dict[str, int], dict[str, int]]:
- """Return stats about memory use.
- Return a tuple with these items:
- - Dict from object kind to number of instances of that kind
- - Dict from object kind to total bytes used by all instances of that kind
- """
- objs = gc.get_objects()
- find_recursive_objects(objs)
- inferred = {}
- for obj in objs:
- if type(obj) is FakeInfo:
- # Processing these would cause a crash.
- continue
- n = type(obj).__name__
- if hasattr(obj, "__dict__"):
- # Keep track of which class a particular __dict__ is associated with.
- inferred[id(obj.__dict__)] = f"{n} (__dict__)"
- if isinstance(obj, (Node, Type)): # type: ignore[misc]
- if hasattr(obj, "__dict__"):
- for x in obj.__dict__.values():
- if isinstance(x, list):
- # Keep track of which node a list is associated with.
- inferred[id(x)] = f"{n} (list)"
- if isinstance(x, tuple):
- # Keep track of which node a list is associated with.
- inferred[id(x)] = f"{n} (tuple)"
- for k in get_class_descriptors(type(obj)):
- x = getattr(obj, k, None)
- if isinstance(x, list):
- inferred[id(x)] = f"{n} (list)"
- if isinstance(x, tuple):
- inferred[id(x)] = f"{n} (tuple)"
- freqs: dict[str, int] = {}
- memuse: dict[str, int] = {}
- for obj in objs:
- if id(obj) in inferred:
- name = inferred[id(obj)]
- else:
- name = type(obj).__name__
- freqs[name] = freqs.get(name, 0) + 1
- memuse[name] = memuse.get(name, 0) + sys.getsizeof(obj)
- return freqs, memuse
- def print_memory_profile(run_gc: bool = True) -> None:
- if not sys.platform.startswith("win"):
- import resource
- system_memuse = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
- else:
- system_memuse = -1 # TODO: Support this on Windows
- if run_gc:
- gc.collect()
- freqs, memuse = collect_memory_stats()
- print("%7s %7s %7s %s" % ("Freq", "Size(k)", "AvgSize", "Type"))
- print("-------------------------------------------")
- totalmem = 0
- i = 0
- for n, mem in sorted(memuse.items(), key=lambda x: -x[1]):
- f = freqs[n]
- if i < 50:
- print("%7d %7d %7.0f %s" % (f, mem // 1024, mem / f, n))
- i += 1
- totalmem += mem
- print()
- print("Mem usage RSS ", system_memuse // 1024)
- print("Total reachable ", totalmem // 1024)
- def find_recursive_objects(objs: list[object]) -> None:
- """Find additional objects referenced by objs and append them to objs.
- We use this since gc.get_objects() does not return objects without pointers
- in them such as strings.
- """
- seen = {id(o) for o in objs}
- def visit(o: object) -> None:
- if id(o) not in seen:
- objs.append(o)
- seen.add(id(o))
- for obj in objs.copy():
- if type(obj) is FakeInfo:
- # Processing these would cause a crash.
- continue
- if type(obj) in (dict, defaultdict):
- for key, val in cast(Dict[object, object], obj).items():
- visit(key)
- visit(val)
- if type(obj) in (list, tuple, set):
- for x in cast(Iterable[object], obj):
- visit(x)
- if hasattr(obj, "__slots__"):
- for base in type.mro(type(obj)):
- for slot in getattr(base, "__slots__", ()):
- if hasattr(obj, slot):
- visit(getattr(obj, slot))
|