#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
try:
import cPickle as pickle
except ImportError:
import pickle
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
import functools
import inspect
import linecache
import optparse
import os
import sys
from IPython.core.magic import (Magics, magics_class, line_magic)
from IPython.core.page import page
from IPython.utils.ipstruct import Struct
from IPython.core.error import UsageError
from _line_profiler import LineProfiler as CLineProfiler
# Python 2/3 compatibility utils
# ===========================================================
PY3 = (sys.version_info[0] == 3)
PY35 = PY3 and (sys.version_info[1] >= 5)

# exec_ shim (adapted from https://bitbucket.org/gutworth/six/):
# a callable that runs code in explicit namespaces on both Python 2 and 3.
if not PY3:
    def exec_(_code_, _globs_=None, _locs_=None):
        """Execute code in a namespace."""
        if _globs_ is not None:
            # Globals were supplied: locals default to the same mapping.
            if _locs_ is None:
                _locs_ = _globs_
        else:
            # No namespaces supplied: fall back to the caller's frame.
            frame = sys._getframe(1)
            _globs_ = frame.f_globals
            if _locs_ is None:
                _locs_ = frame.f_locals
            del frame
        # The Python 2 exec statement is hidden inside a string so that this
        # module still parses on Python 3.
        exec("""exec _code_ in _globs_, _locs_""")
else:
    import builtins
    # Looked up with getattr because `builtins.exec` would not parse on
    # Python 2, where exec is a keyword.
    exec_ = getattr(builtins, "exec")
    del builtins
if not PY35:
    def is_coroutine(f):
        """ Always False: coroutine detection is only used when PY35 is true.
        """
        return False
else:
    import inspect

    def is_coroutine(f):
        """ Return True if `f` is a coroutine function.
        """
        return inspect.iscoroutinefunction(f)
# ============================================================
# Bit tested in a code object's co_flags to recognise generator functions.
CO_GENERATOR = 0x0020


def is_generator(f):
    """ Tell whether `f` is a generator function.

    Checks the CO_GENERATOR bit of ``f.__code__.co_flags`` and returns the
    result as a bool.
    """
    return bool(f.__code__.co_flags & CO_GENERATOR)
class LineProfiler(CLineProfiler):
    """ A profiler that records the execution times of individual lines.

    Instances are callable and can therefore be used as a decorator: the
    decorated callable is passed to `add_function()` and wrapped so that
    `enable_by_count()` / `disable_by_count()` bracket each call.
    """

    def __call__(self, func):
        """ Decorate a function to start the profiler on function entry and stop
        it on function exit.

        Coroutine functions, generator functions and plain functions each get
        a dedicated wrapper; the wrapper is returned.
        """
        self.add_function(func)
        if is_coroutine(func):
            wrapper = self.wrap_coroutine(func)
        elif is_generator(func):
            wrapper = self.wrap_generator(func)
        else:
            wrapper = self.wrap_function(func)
        return wrapper

    def wrap_generator(self, func):
        """ Wrap a generator to profile it.

        The profiler is enabled only while the wrapped generator is being
        resumed (`next()` / `send()`), and disabled again before the item is
        handed to the consumer.  Values sent into the wrapper are forwarded
        to the wrapped generator.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            g = func(*args, **kwds)
            # The first iterate will not be a .send()
            self.enable_by_count()
            try:
                item = next(g)
            except StopIteration:
                # The wrapped generator finished without yielding.  End this
                # generator with a plain return: a StopIteration escaping a
                # generator body is turned into RuntimeError by PEP 479.
                return
            finally:
                self.disable_by_count()
            sent = (yield item)
            # But any following one might be.
            while True:
                self.enable_by_count()
                try:
                    item = g.send(sent)
                except StopIteration:
                    # Normal exhaustion of the wrapped generator.
                    return
                finally:
                    self.disable_by_count()
                sent = (yield item)
        return wrapper

    def wrap_function(self, func):
        """ Wrap a function to profile it.

        The wrapper returns whatever `func` returns; the profiler is disabled
        again even if `func` raises.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            self.enable_by_count()
            try:
                result = func(*args, **kwds)
            finally:
                self.disable_by_count()
            return result
        return wrapper

    if PY35:
        # The coroutine wrapper lives in a separate module that is imported
        # only when PY35 is true.
        import line_profiler_py35
        wrap_coroutine = line_profiler_py35.wrap_coroutine

    def dump_stats(self, filename):
        """ Dump a representation of the data to a file as a pickled LineStats
        object from `get_stats()`.
        """
        lstats = self.get_stats()
        with open(filename, 'wb') as f:
            pickle.dump(lstats, f, pickle.HIGHEST_PROTOCOL)

    def print_stats(self, stream=None, output_unit=None, stripzeros=False):
        """ Show the gathered statistics.

        All arguments are forwarded to `show_text()` together with the
        timings and timer unit obtained from `get_stats()`.
        """
        lstats = self.get_stats()
        show_text(lstats.timings, lstats.unit, output_unit=output_unit,
                  stream=stream, stripzeros=stripzeros)

    def run(self, cmd):
        """ Profile a single executable statement in the main namespace.
        """
        import __main__
        main_dict = __main__.__dict__
        return self.runctx(cmd, main_dict, main_dict)

    def runctx(self, cmd, globals, locals):
        """ Profile a single executable statement in the given namespaces.

        Returns the profiler itself.
        """
        self.enable_by_count()
        try:
            exec_(cmd, globals, locals)
        finally:
            self.disable_by_count()
        return self

    def runcall(self, func, *args, **kw):
        """ Profile a single function call.

        Returns the result of ``func(*args, **kw)``.
        """
        self.enable_by_count()
        try:
            return func(*args, **kw)
        finally:
            self.disable_by_count()

    def add_module(self, mod):
        """ Add all the functions in a module and its classes.

        Only the module's own `__dict__` and the `__dict__` of each class
        found there are inspected.  Returns the number of functions added.
        """
        from inspect import isclass, isfunction

        nfuncsadded = 0
        for item in mod.__dict__.values():
            if isclass(item):
                for v in item.__dict__.values():
                    if isfunction(v):
                        self.add_function(v)
                        nfuncsadded += 1
            elif isfunction(item):
                self.add_function(item)
                nfuncsadded += 1
        return nfuncsadded
def show_func(filename, start_lineno, func_name, timings, unit,
              output_unit=None, stream=None, stripzeros=False):
    """ Show results for a single function.

    Parameters
    ----------
    filename : path of the file the function was defined in.
    start_lineno : line number at which the function starts.
    func_name : name of the function, used in the report header.
    timings : sequence of ``(lineno, nhits, time)`` tuples.
    unit : factor converting the recorded times into seconds.
    output_unit : unit the per-line times are displayed in; defaults to
        `unit`.
    stream : writable text stream; defaults to ``sys.stdout``.
    stripzeros : if true, write nothing at all when the total time is zero.
    """
    if stream is None:
        stream = sys.stdout

    template = '%6s %9s %12s %8s %8s %-s'
    d = {}
    total_time = 0.0
    linenos = []
    for lineno, nhits, time in timings:
        total_time += time
        linenos.append(lineno)

    if stripzeros and total_time == 0:
        return

    if output_unit is None:
        output_unit = unit
    scalar = unit / output_unit

    stream.write("Total time: %g s\n" % (total_time * unit))
    file_exists = os.path.exists(filename)
    if file_exists or filename.startswith("<ipython-input-"):
        stream.write("File: %s\n" % filename)
        stream.write("Function: %s at line %s\n" % (func_name, start_lineno))
        if file_exists:
            # Clear the cache to ensure that we get up-to-date results.
            linecache.clearcache()
        all_lines = linecache.getlines(filename)
        sublines = inspect.getblock(all_lines[start_lineno-1:])
    else:
        stream.write("\n")
        stream.write("Could not find file %s\n" % filename)
        stream.write("Are you sure you are running this program from the same directory\n")
        stream.write("that you ran the profiler from?\n")
        stream.write("Continuing without the function's contents.\n")
        # Fake empty lines so we can see the timings, if not the code.
        if linenos:
            nlines = max(linenos) - min(min(linenos), start_lineno) + 1
        else:
            # No line was recorded at all: there is nothing to fake.
            nlines = 0
        sublines = [''] * nlines

    for lineno, nhits, time in timings:
        if total_time == 0:
            # Every recorded line took zero time; a percentage is undefined,
            # so leave the column blank instead of dividing by zero.
            percent = ''
        else:
            percent = '%5.1f' % (100 * time / total_time)
        d[lineno] = (nhits,
                     '%5.1f' % (time * scalar),
                     '%5.1f' % (float(time) * scalar / nhits),
                     percent)

    linenos = range(start_lineno, start_lineno + len(sublines))
    empty = ('', '', '', '')
    header = template % ('Line #', 'Hits', 'Time', 'Per Hit', '% Time',
                         'Line Contents')
    stream.write("\n")
    stream.write(header)
    stream.write("\n")
    stream.write('=' * len(header))
    stream.write("\n")
    for lineno, line in zip(linenos, sublines):
        # Lines without a timing entry are printed with blank statistics.
        nhits, time, per_hit, percent = d.get(lineno, empty)
        txt = template % (lineno, nhits, time, per_hit, percent,
                          line.rstrip('\n').rstrip('\r'))
        stream.write(txt)
        stream.write("\n")
    stream.write("\n")
def show_text(stats, unit, output_unit=None, stream=None, stripzeros=False):
    """ Write a text report for every function in `stats`.

    `stats` maps ``(filename, start_lineno, func_name)`` keys to timings.
    A "Timer unit" header is written first (using `output_unit` when given,
    otherwise `unit`), then `show_func()` is called once per key in sorted
    order.  `stream` defaults to ``sys.stdout``.
    """
    out = sys.stdout if stream is None else stream

    shown_unit = unit if output_unit is None else output_unit
    out.write('Timer unit: %g s\n\n' % shown_unit)

    for key in sorted(stats):
        filename, start_lineno, func_name = key
        show_func(filename, start_lineno, func_name, stats[key], unit,
                  output_unit=output_unit, stream=out, stripzeros=stripzeros)
@magics_class
class LineProfilerMagics(Magics):
    """ IPython magics providing the ``%lprun`` line-profiling command.
    """

    @line_magic
    def lprun(self, parameter_s=''):
        """ Execute a statement under the line-by-line profiler from the
        line_profiler module.

        Usage:
          %lprun -f func1 -f func2 <statement>

        The given statement (which doesn't require quote marks) is run via the
        LineProfiler. Profiling is enabled for the functions specified by the -f
        options. The statistics will be shown side-by-side with the code through the
        pager once the statement has completed.

        Options:

        -f <function>: LineProfiler only profiles functions and methods it is told
        to profile. This option tells the profiler about these functions. Multiple
        -f options may be used. The argument may be any expression that gives
        a Python function or method object. However, one must be careful to avoid
        spaces that may confuse the option parser.

        -m <module>: Get all the functions/methods in a module

        One or more -f or -m options are required to get any useful results.

        -D <filename>: dump the raw statistics out to a pickle file on disk. The
        usual extension for this is ".lprof". These statistics may be viewed later
        by running line_profiler.py as a script.

        -T <filename>: dump the text-formatted statistics with the code side-by-side
        out to a text file.

        -r: return the LineProfiler object after it has completed profiling.

        -s: strip out all entries from the print-out that have zeros.

        -u: specify time unit for the print-out in seconds.
        """
        # Fallback option values, merged into the parsed options below.
        opts_def = Struct(D=[''], T=[''], f=[], m=[], u=None)

        # Escape quote markers.
        parameter_s = parameter_s.replace('"', r'\"').replace("'", r"\'")
        opts, arg_str = self.parse_options(parameter_s, 'rsf:m:D:T:u:',
                                           list_all=True)
        opts.merge(opts_def)

        global_ns = self.shell.user_global_ns
        local_ns = self.shell.user_ns

        # Get the requested functions.  Each -f argument is an expression
        # typed by the interactive user, evaluated in the user's namespaces.
        funcs = []
        for name in opts.f:
            try:
                funcs.append(eval(name, global_ns, local_ns))
            except Exception as e:
                raise UsageError('Could not find function %r.\n%s: %s' % (name,
                    e.__class__.__name__, e))

        profile = LineProfiler(*funcs)

        # Get the modules, too
        for modname in opts.m:
            try:
                mod = __import__(modname, fromlist=[''])
                profile.add_module(mod)
            except Exception as e:
                raise UsageError('Could not find module %r.\n%s: %s' % (modname,
                    e.__class__.__name__, e))

        if opts.u is not None:
            try:
                output_unit = float(opts.u[0])
            except Exception:
                raise TypeError("Timer unit setting must be a float.")
        else:
            output_unit = None

        # Add the profiler to the builtins for @profile.
        if PY3:
            import builtins
        else:
            import __builtin__ as builtins

        if 'profile' in builtins.__dict__:
            had_profile = True
            old_profile = builtins.__dict__['profile']
        else:
            had_profile = False
            old_profile = None
        builtins.__dict__['profile'] = profile

        try:
            try:
                profile.runctx(arg_str, global_ns, local_ns)
                message = ''
            except SystemExit:
                message = """*** SystemExit exception caught in code being profiled."""
            except KeyboardInterrupt:
                message = ("*** KeyboardInterrupt exception caught in code being "
                           "profiled.")
        finally:
            # Put the builtins back the way they were found.
            if had_profile:
                builtins.__dict__['profile'] = old_profile
            elif builtins.__dict__.get('profile') is profile:
                # There was no `profile` builtin before; remove the one
                # injected above so it does not outlive this magic call.
                del builtins.__dict__['profile']

        # Trap text output.
        stdout_trap = StringIO()
        profile.print_stats(stdout_trap, output_unit=output_unit,
                            stripzeros='s' in opts)
        output = stdout_trap.getvalue()
        output = output.rstrip()

        page(output)
        print(message, end="")

        dump_file = opts.D[0]
        if dump_file:
            profile.dump_stats(dump_file)
            print('\n*** Profile stats pickled to file %r. %s' % (
                dump_file, message))

        text_file = opts.T[0]
        if text_file:
            # Context manager so the file is closed even if the write fails.
            with open(text_file, 'w') as pfile:
                pfile.write(output)
            print('\n*** Profile printout saved to text file %r. %s' % (
                text_file, message))

        return_value = None
        if 'r' in opts:
            return_value = profile

        return return_value
def load_ipython_extension(ip):
    """ IPython extension entry point: register `LineProfilerMagics` on `ip`.
    """
    register = ip.register_magics
    register(LineProfilerMagics)
def load_stats(filename):
    """ Read back the object pickled in `filename` and return it.

    The file is opened in binary mode and unpickled as-is.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(filename, 'rb') as stats_file:
        lstats = pickle.load(stats_file)
    return lstats
def main():
    """ Script entry point: print the statistics stored in one pickled file.

    Expects exactly one positional command-line argument, the file to load;
    otherwise the option parser reports an error.
    """
    parser = optparse.OptionParser(
        usage="usage: %prog profile.lprof",
        version='%prog 1.0b2',
    )
    _options, positional = parser.parse_args()
    if len(positional) != 1:
        parser.error("Must provide a filename.")
    stats_path = positional[0]
    lstats = load_stats(stats_path)
    show_text(lstats.timings, lstats.unit)


if __name__ == '__main__':
    main()