Source code for crash.cache.syscache

# -*- coding: utf-8 -*-
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

from typing import Dict, List, Any

import re
import zlib
from datetime import timedelta

from crash.exceptions import DelayedAttributeError
from crash.cache import CrashCache
from crash.util import array_size
from crash.util.symbols import Types, Symvals, SymbolCallbacks, MinimalSymvals
from crash.infra.lookup import DelayedValue

import gdb

ImageLocation = Dict[str, Dict[str, int]]

[docs]class CrashUtsnameCache(CrashCache): symvals = Symvals(['init_uts_ns']) def __init__(self) -> None: self._utsname_cache_dict: Dict[str, str] = dict() @property def utsname(self) -> gdb.Value: return self.symvals.init_uts_ns['name'] def _init_utsname_cache(self) -> None: d = self._utsname_cache_dict for field in self.utsname.type.fields(): val = self.utsname[field.name].string() d[field.name] = val @property def _utsname_cache(self) -> Dict[str, str]: if not self._utsname_cache_dict: self._init_utsname_cache() return self._utsname_cache_dict def _utsname_field(self, name: str) -> str: try: return self._utsname_cache[name] except KeyError: raise DelayedAttributeError(name) @property def sysname(self) -> str: return self._utsname_field('sysname') @property def nodename(self) -> str: return self._utsname_field('nodename') @property def release(self) -> str: return self._utsname_field('release') @property def version(self) -> str: return self._utsname_field('version') @property def machine(self) -> str: return self._utsname_field('machine') @property def domainname(self) -> str: return self._utsname_field('domainname')
[docs]class CrashConfigCache(CrashCache): types = Types(['char *']) symvals = Symvals(['kernel_config_data']) msymvals = MinimalSymvals(['kernel_config_data', 'kernel_config_data_end']) def __init__(self) -> None: self._config_buffer = "" self._ikconfig_cache: Dict[str, str] = dict() @property def config_buffer(self) -> str: if not self._config_buffer: self._config_buffer = self._decompress_config_buffer() return self._config_buffer @property def ikconfig_cache(self) -> Dict[str, str]: if not self._ikconfig_cache: self._parse_config() return self._ikconfig_cache def __getitem__(self, name: str) -> Any: try: return self.ikconfig_cache[name] except KeyError: return None @staticmethod def _read_buf_bytes(address: int, size: int) -> bytes: return gdb.selected_inferior().read_memory(address, size).tobytes() def _locate_config_buffer_section(self) -> ImageLocation: data_start = int(self.msymvals.kernel_config_data) data_end = int(self.msymvals.kernel_config_data_end) return { 'data' : { 'start' : data_start, 'size' : data_end - data_start, }, 'magic' : { 'start' : data_start - 8, 'end' : data_end, }, } def _locate_config_buffer_typed(self) -> ImageLocation: start = int(self.symvals.kernel_config_data.address) end = start + self.symvals.kernel_config_data.type.sizeof return { 'data' : { 'start' : start + 8, 'size' : end - start - 2*8 - 1, }, 'magic' : { 'start' : start, 'end' : end - 8 - 1, }, } def _verify_image(self, location: ImageLocation) -> None: magic_start = b'IKCFG_ST' magic_end = b'IKCFG_ED' buf_len = len(magic_start) buf = self._read_buf_bytes(location['magic']['start'], buf_len) if buf != magic_start: raise IOError(f"Missing magic_start in kernel_config_data. Got `{buf}'") buf_len = len(magic_end) buf = self._read_buf_bytes(location['magic']['end'], buf_len) if buf != magic_end: raise IOError("Missing magic_end in kernel_config_data. Got `{buf}'") def _decompress_config_buffer(self) -> str: try: location = self._locate_config_buffer_section() except DelayedAttributeError: location = self._locate_config_buffer_typed() self._verify_image(location) # Read the compressed data buf = self._read_buf_bytes(location['data']['start'], location['data']['size']) return zlib.decompress(buf, 16 + zlib.MAX_WBITS).decode('utf-8') def __str__(self) -> str: return self.config_buffer def _parse_config(self) -> None: for line in self.config_buffer.splitlines(): # bin comments line = re.sub("#.*$", "", line).strip() if not line: continue m = re.match("CONFIG_([^=]*)=(.*)", line) if m: self._ikconfig_cache[m.group(1)] = m.group(2)
[docs]class CrashKernelCache(CrashCache): symvals = Symvals(['avenrun']) _adjust_jiffies = False _reset_uptime = True _jiffies_dv = DelayedValue('jiffies') def __init__(self, config_cache: CrashConfigCache) -> None: CrashCache.__init__(self) self.config = config_cache self._hz = -1 self._uptime = timedelta(seconds=0) self._loadavg = "" @property def jiffies(self) -> int: v = self._jiffies_dv.get() return v @property def hz(self) -> int: if self._hz == -1: self._hz = int(self.config['HZ']) return self._hz
[docs] def get_uptime(self) -> timedelta: return self.uptime
@property def uptime(self) -> timedelta: if self._uptime == 0 or self._reset_uptime: uptime = self._adjusted_jiffies() // self.hz self._uptime = timedelta(seconds=uptime) self._reset_uptime = False return self._uptime @property def loadavg(self) -> str: if not self._loadavg: try: metrics = self._get_loadavg_values() self._loadavg = self._format_loadavg(metrics) except DelayedAttributeError: return "Unknown" return self._loadavg def _calculate_loadavg(self, metric: int) -> float: # The kernel needs to do fixed point trickery to calculate # a floating point average. We can just return a float. return round(int(metric) / (1 << 11), 2) def _format_loadavg(self, metrics: List[float]) -> str: out = [] for metric in metrics: out.append(str(metric)) return " ".join(out) def _get_loadavg_values(self) -> List[float]: metrics = [] for index in range(0, array_size(self.symvals.avenrun)): metrics.append(self._calculate_loadavg(self.symvals.avenrun[index])) return metrics
[docs] @classmethod def set_jiffies(cls, value: int) -> None: cls._jiffies_dv.value = None cls._jiffies_dv.callback(value) cls._reset_uptime = True
[docs] @classmethod # pylint: disable=unused-argument def setup_jiffies(cls, symbol: gdb.Symbol) -> bool: jiffies_sym = gdb.lookup_global_symbol('jiffies_64') if jiffies_sym: try: jiffies = int(jiffies_sym.value()) except gdb.MemoryError: return False cls._adjust_jiffies = True else: jiffies_sym = gdb.lookup_global_symbol('jiffies') if not jiffies_sym: return False jiffies = int(jiffies_sym.value()) cls._adjust_jiffies = False cls.set_jiffies(jiffies) return True
def _adjusted_jiffies(self) -> int: if self._adjust_jiffies: return self.jiffies -(int(0x100000000) - 300 * self.hz) return self.jiffies
symbol_cbs = SymbolCallbacks([('jiffies', CrashKernelCache.setup_jiffies), ('jiffies_64', CrashKernelCache.setup_jiffies)]) utsname = CrashUtsnameCache() config = CrashConfigCache() kernel = CrashKernelCache(config)
[docs]def jiffies_to_msec(jiffies: int) -> int: return 1000 // kernel.hz * jiffies