moprofiler.stopwatch 源代码

# encoding=utf8
"""
提供用于计时打点的秒表工具
"""
from __future__ import absolute_import

import logging
import os
import time
import types  # pylint: disable=W0611
from contextlib import contextmanager
from functools import wraps

import psutil
from line_profiler import is_generator

from . import base

LOG = logging.getLogger(__name__)


[文档]class Stopwatch(object): # pylint: disable=R0902 """秒表类""" LOGGING_LEVEL_DEFAULT = logging.INFO DOTTING_FMT_DEFAULT = '[性能] 当前耗时({idx}): {time_diff:.4f}s, 累计耗时: {time_total:.4f}s' DOTTING_FMT_WITH_MEM_DEFAULT = DOTTING_FMT_DEFAULT + \ ', 当前变化: {mem_diff:2d}M, 累计变化: {mem_total:2d}M' FINAL_FMT_ARGS_DEFAULT = '[性能] {name}, 参数列表: {args} {kwargs}, 耗时: {time_use:.4f}s' FINAL_FMT_DEFAULT = '[性能] {name}, 耗时: {time_use:.4f}s' FINAL_FMT_ARGS_WITH_MEM_DEFAULT = FINAL_FMT_ARGS_DEFAULT + ', 内存变化: {mem_use:2d}M' FINAL_FMT_WITH_MEM_DEFAULT = FINAL_FMT_DEFAULT + ', 内存变化: {mem_use:2d}M' def __init__(self): self.time_buf = [] #: 用来存储计时打点时间 self.mem_buf = [] #: 用来存储打点内存 self.name = '' #: 秒表的名称,可在装饰时设置,默认为使用被装饰方法的方法名 self.dkwargs = {} #: 用来存储最终输出时使用的变量 self.dotting_param_pre = {'kwargs': {}} #: 用来记录上次打点输出时的参数信息 #: 用来日志输出的 logger self.logger = None # type: logging.Logger self.logging_level = None #: 日志输出级别 self.final_fmt = '' #: 输出最终计时结果的字符串模板 self.is_print_memory = False #: 是否打印内存 def __call__(self, func, wrap_param): """ 装饰一个函数用来启动一个秒表 :param func: 被装饰的函数 or 方法 :type func: types.FunctionType or types.MethodType :param dict wrap_param: 额外的关键字参数字典 :return: 装饰后的函数 or 方法 :rtype: types.FunctionType or types.MethodType """ if is_generator(func): wrapper = self.wrap_generator(func, wrap_param) else: wrapper = self.wrap_function(func, wrap_param) return wrapper def _init_param(self, wrap_param): """ 初始化对象属性 :param dict wrap_param: 封装时传入的参数字典 """ self.name = wrap_param.get('name') self.logger = wrap_param.get('logger') or LOG # type: logging.Logger self.logging_level = wrap_param.get('logging_level') or self.LOGGING_LEVEL_DEFAULT self.dkwargs = wrap_param.get('dkwargs') self.is_print_memory = wrap_param.get('print_mem') or False if self.is_print_memory: self.final_fmt = wrap_param.get('fmt') or ( self.FINAL_FMT_ARGS_WITH_MEM_DEFAULT if wrap_param.get('print_args') else \ self.FINAL_FMT_WITH_MEM_DEFAULT) else: self.final_fmt = wrap_param.get('fmt') or ( self.FINAL_FMT_ARGS_DEFAULT if wrap_param.get('print_args') else \ self.FINAL_FMT_DEFAULT) def _start(self, func, args, kwargs): """ 启动秒表 :param func: 被封装的函数,由解释器自动传入,不需关心 :type func: types.FunctionType or types.MethodType :param list args: 被装饰函数被调用时的位置参数 :param dict kwargs: 被装饰函数被调用时的关键字参数 """ self.name = self.name or func.__name__ _begin_time = time.time() fmt_dict = { 'name': self.name, 'args': args, 'kwargs': kwargs, 'begin_time': _begin_time, } self.time_buf.append(_begin_time) _begin_mem = self._get_mem_info() self.mem_buf.append(_begin_mem) fmt_dict['begin_mem'] = _begin_mem self.dkwargs.update(fmt_dict) def _end(self): """ 结束秒表 """ _end_time = time.time() self.time_buf.append(_end_time) self.dkwargs['end_time'] = _end_time self.dkwargs['time_use'] = _end_time - self.dkwargs['begin_time'] if self.is_print_memory: _end_mem = self._get_mem_info() self.mem_buf.append(_end_mem) self.dkwargs['end_mem'] = _end_mem self.dkwargs['mem_use'] = _end_mem - self.dkwargs['begin_mem'] self.logger.log(self.logging_level, self.final_fmt.format(**self.dkwargs)) @staticmethod def _get_mem_info(): """ 获取内存信息 :return: 当前进程已用内存(rss),单位 ``MB`` :rtype: int """ process = psutil.Process(os.getpid()) mem = process.memory_info().rss >> 20 # 为速度考虑,使用移位操作 return mem
[文档] def wrap_generator(self, func, wrap_param): """ 封装一个生成器从而使用秒表对其进行观察 :param func: 被封装的函数,由解释器自动传入,不需关心 :type func: types.FunctionType or types.MethodType :param dict wrap_param: 封装时传入的参数字典 :return: 封装后的方法 :rtype: types.FunctionType or types.MethodType """ self._init_param(wrap_param) @wraps(func) def inner(*args, **kwargs): """ 在被装饰函数执行前后进行秒表计时 """ self._start(func, args, kwargs) g = func(*args, **kwargs) # 第一次迭代将不调用 send() item = next(g) _input = (yield item) try: while True: item = g.send(_input) _input = (yield item) except StopIteration: self._end() return inner
[文档] def wrap_function(self, func, wrap_param): """ 封装一个函数从而使用秒表对其进行观察 :param func: 被封装的函数,由解释器自动传入,不需关心 :type func: types.FunctionType or types.MethodType :param dict wrap_param: 封装时传入的参数字典 :return: 封装后的方法 :rtype: types.FunctionType or types.MethodType """ self._init_param(wrap_param) @wraps(func) def inner(*args, **kwargs): """ 在被装饰函数执行前后进行秒表计时 """ self._start(func, args, kwargs) result = func(*args, **kwargs) self._end() return result return inner
[文档] def dotting(self, fmt='', logging_level=None, memory=False, mute=False, **kwargs): """ 输出打点日志 会在打点时记录当前的时间&进程内存使用(若 ``memory`` 为 ``True``),并将其与上次的打点记录做差, 分别获取时间差 ``time_diff`` 、内存差 ``memory_diff`` ,再将其与进入函数时的记录做差, 分别获取到时间差 ``time_total`` 、内存差 ``memory_total`` 。 将以上四个变量传入 ``fmt`` 模板中格式化后输出到 ``log`` 。 除上述变量外,您还可以使用装饰器参数中指定的变量。 注意仅当 ``memory`` 为真时,才会记录日志情况,否则将直接跳过。 日志输出模板中可使用变量如下: #. ``idx`` : 当前打点的序号,从 *1* 开始 #. ``time_diff`` : 距上次打点间的时间差 #. ``time_total`` : 距函数/方法开始时的时间差 #. ``memory_diff`` : 距上次打点(memory=True)间的内存差,跳过未记录内存的时间打点,直到函数/方法进入时的内存记录 #. ``memory_total`` : 距函数/方法开始时的内存差 :param str fmt: 用来输出打点日志的格式化模板,需使用 format 的占位符格式 :param int logging_level: 日志输出级别,默认使用装饰当前方法时设置的级别,若无则使用类属性中定义的默认值 :param bool memory: 是否记录内存使用,默认为 False :param bool mute: 静默打点,默认为 False ,若设为 True ,则当次仅记录时间/内存,不执行任何输出逻辑 """ self.time_buf.append(time.time()) if memory: self.mem_buf.append(self._get_mem_info()) if mute: return idx = len(self.time_buf) - 1 _fmt = fmt or (self.DOTTING_FMT_WITH_MEM_DEFAULT if memory else self.DOTTING_FMT_DEFAULT) _level = logging_level or self.logging_level kwargs.update(self.dkwargs) self.logger.log(_level, _fmt.format( time_diff=self.time_buf[-1] - self.time_buf[-2], time_total=self.time_buf[-1] - self.time_buf[0], mem_diff=(self.mem_buf[-1] - self.mem_buf[-2]) if memory else None, mem_total=(self.mem_buf[-1] - self.mem_buf[0]) if memory else None, idx=idx, **kwargs))
[文档]class StopwatchMixin(base.ProfilerMixin): """ 秒表 Mixin 类 用以提供复杂的秒表功能,如: #. 针对需要多次调用的方法进行累加记录的场景 #. 在一次代码执行流程中同时记录多个方法,并灵活控制记录结果的输出 """ stopwatch = None # type: Stopwatch @classmethod @contextmanager def _get_stopwatch(cls, self_or_cls, **callargs): """ 获取秒表对象 :param object self_or_cls: 被代理的对象 or 类 :param dict callargs: 调用该上下文管理器时传入的所有调用参数 :return: 返回被代理的对象 or 类 :rtype: Iterator[base.Proxy] """ with super(StopwatchMixin, cls)._get_profiler( self_or_cls, **callargs) as _self_or_cls: yield base.proxy( _self_or_cls, prop_name='stopwatch', prop=Stopwatch())
[文档]def stopwatch( _function=None, print_args=False, logger=None, print_mem=False, fmt='', name='', logging_level=logging.INFO, **dkwargs): """ 返回秒表监控下的函数或方法 通过额外的关键字参数,支持配置自定义的值到输出模板中 日志输出模板中可使用变量如下: #. ``name`` : 当前秒表名称 #. ``args`` : 被装饰函数/方法的位置参数 #. ``kwargs`` : 被装饰函数/方法 #. ``time_use`` : 函数/方法执行耗时 #. ``mem_use`` : 函数/方法执行内存 :param _function: 被封装的对象,由解释器自动传入,不需关心 :type _function: types.FunctionType or types.MethodType :param bool print_args: 是否打印被装饰函数的参数列表,若含有较长的参数,可能造成日志过长,开启时请注意 :param logging.Logger logger: 可传入指定的日志对象,便于统一输出样式,默认使用该模块中的全局 logger :param bool print_mem: 是否在方法退出时打印内存信息,默认为 False :param str fmt: 用于格式化输出的模板,可在了解所有内置参数变量后自行定制输出样式,若指定该参数则会忽略 print_args :param str name: 关键字参数,被装饰方法代理生成的 stopwatch 所使用的名称,默认为使用被装饰方法的方法名 :param int logging_level: 打印日志的级别,默认为 INFO :return: 装饰后的函数 :rtype: types.FunctionType or types.MethodType """ invoked = bool(_function and callable(_function)) if invoked: func = _function # type: types.FunctionType or types.MethodType wrap_param = { 'print_args': print_args, 'logger': logger, 'fmt': fmt, 'name': name, 'logging_level': logging_level, 'dkwargs': dkwargs, 'print_mem': print_mem, } def wrapper(func): """ 装饰器封装函数 """ @wraps(func) def inner(*args, **kwargs): """ 使用秒表封装被装饰对象 若被封装方法的首个位置参数继承了 StopwatchMixin 则将被封装方法所用的 self_or_cls 进行代理,从而在方法内可以访问到被代理的 stopwatch 属性 若被封装方法的首个位置参数未继承 StopwatchMixin 则作为普通函数处理 :param list args: 位置参数 """ if not (args and base.is_instance_or_subclass(args[0], StopwatchMixin)): # 若当前被装饰的方法未继承 StopwatchMixin ,则将其作为普通函数装饰 _stopwatch_wrapper = Stopwatch()(func, wrap_param) return _stopwatch_wrapper(*args, **kwargs) self_or_cls = args[0] # type: StopwatchMixin callargs = base.get_callargs(func, *args, **kwargs) callargs.pop("cls", None) with self_or_cls._get_stopwatch(self_or_cls, **callargs) as _self_or_cls: _stopwatch_wrapper = _self_or_cls.stopwatch(func, wrap_param) return _stopwatch_wrapper(_self_or_cls, *args[1:], **kwargs) return inner return wrapper if not invoked else wrapper(func)