Source code for crash.commands.kmem

#!/usr/bin/python3
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
"""
SUMMARY
-------

Kernel memory inspection

::

  kmem addr             - try to find addr within kmem caches
  kmem -s [slabname]    - check consistency of single or all kmem cache
  kmem -z               - report zones
  kmem -V               - report vmstats

DESCRIPTION
-----------

This command currently offers very basic kmem cache query and checking.
"""

from typing import List

import argparse

from crash.commands import Command, ArgumentParser
from crash.commands import CommandError, CommandLineError
from crash.types.slab import kmem_cache_get_all, kmem_cache_from_name
from crash.types.slab import slab_from_obj_addr, KmemCacheNotFound
from crash.types.node import for_each_zone, for_each_populated_zone
from crash.types.vmstat import VmStat
from crash.util import get_symbol_value
from crash.exceptions import MissingSymbolError

[docs]class KmemCommand(Command): """ kernel memory inspection""" def __init__(self, name: str) -> None: parser = ArgumentParser(prog=name) group = parser.add_mutually_exclusive_group() group.add_argument('-s', nargs='?', const=True, default=False, dest='slabname') group.add_argument('-z', action='store_true', default=False) group.add_argument('-V', action='store_true', default=False) group.add_argument('address', nargs='?') super().__init__(name, parser)
[docs] def execute(self, args: argparse.Namespace) -> None: if args.z: self.print_zones() return if args.V: self.print_vmstats() return if args.slabname: if args.slabname is True: print("Checking all kmem caches...") for cache in kmem_cache_get_all(): print(cache.name) cache.check_all() else: cache_name = args.slabname print(f"Checking kmem cache {cache_name}") try: cache = kmem_cache_from_name(cache_name) except KmemCacheNotFound: raise CommandError(f"Cache {cache_name} not found.") cache.check_all() print("Checking done.") return if not args.address: raise CommandLineError("no address specified") try: addr = int(args.address[0], 0) except ValueError: raise CommandLineError("address must be numeric") slab = slab_from_obj_addr(addr) if not slab: raise CommandError("Address not found in any kmem cache.") obj = slab.contains_obj(addr) name = slab.kmem_cache.name if obj[0]: print("ALLOCATED object %x from slab %s" % (obj[1], name)) else: if obj[1] == 0: print("Address on slab %s but not within valid object slot" % name) elif not obj[2]: print("FREE object %x from slab %s" % (obj[1], name)) elif obj[2] is not None: ac = obj[2] ac_type = ac['ac_type'] # pylint: disable=unsubscriptable-object nid_tgt = int(ac['nid_tgt']) # pylint: disable=unsubscriptable-object if ac_type == "percpu": ac_desc = "cpu %d cache" % nid_tgt elif ac_type == "shared": ac_desc = "shared cache on node %d" % nid_tgt elif ac_type == "alien": nid_src = int(ac['nid_src']) # pylint: disable=unsubscriptable-object ac_desc = "alien cache of node %d for node %d" % \ (nid_src, nid_tgt) else: raise CommandError(f"unexpected array cache type {str(ac)}") print("FREE object %x from slab %s (in %s)" % (obj[1], name, ac_desc)) else: raise RuntimeError("odd return value from contains_obj")
def __print_vmstat(self, vmstat: List[int], diffs: List[int]) -> None: vmstat_names = VmStat.get_stat_names() just = max(map(len, vmstat_names)) nr_items = VmStat.nr_stat_items vmstat = [sum(x) for x in zip(vmstat, diffs)] for i in range(0, nr_items): print("%s: %d (%d)" % (vmstat_names[i].rjust(just), vmstat[i], diffs[i]))
[docs] def print_vmstats(self) -> None: try: vm_stat = get_symbol_value("vm_stat") except MissingSymbolError: raise CommandError("Support for new-style vmstat is unimplemented.") print(" VM_STAT:") #TODO put this... where? nr_items = VmStat.nr_stat_items stats = [0] * nr_items for item in range(0, nr_items): # TODO abstract atomic? stats[item] = int(vm_stat[item]["counter"]) diffs = [0] * nr_items for zone in for_each_populated_zone(): zone.add_vmstat_diffs(diffs) self.__print_vmstat(stats, diffs) print() print(" VM_EVENT_STATES:") vm_events = VmStat.get_events() names = VmStat.get_event_names() just = max(map(len, names)) for name, val in zip(names, vm_events): print("%s: %d" % (name.rjust(just), val))
[docs] def print_zones(self) -> None: for zone in for_each_zone(): zone_struct = zone.gdb_obj print("NODE: %d ZONE: %d ADDR: %x NAME: \"%s\"" % (int(zone_struct["node"]), zone.zid, int(zone_struct.address), zone_struct["name"].string())) if not zone.is_populated(): print(" [unpopulated]") print() continue print(" VM_STAT:") vmstat = zone.get_vmstat() diffs = zone.get_vmstat_diffs() self.__print_vmstat(vmstat, diffs) print()
KmemCommand("kmem")