提交 57615e21 authored 作者: Brandon T. Willard's avatar Brandon T. Willard 提交者: Thomas Wiecki

Remove unused theano.misc files

上级 67015ca9
#!/usr/bin/env python
import sys
def filter_output(fd_in):
    """Extract the interesting lines from a buildbot/nosetests log.

    Keeps traceback "File ..." lines pointing at test files, import errors,
    known failure/assertion markers, and the "Executing tests ..." banners.

    :param fd_in: an iterable of text lines (file object or list of str)
    :returns: the kept lines concatenated into one string
    """
    # Markers that identify a failure summary line by their first token.
    error_tokens = (
        "KnownFailureTest:",
        "Exception:",
        "Failure:",
        "AssertionError",
        "AssertionError:",
        "GradientError:",
    )
    kept = []
    for line in fd_in:
        toks = line.split()
        if not toks:
            continue
        if toks[0] == "File" and toks[-1].startswith("test"):
            kept.append(line)
        elif toks[0].startswith("ImportError"):
            kept.append(line)
        elif toks[0] in error_tokens:
            kept.append(line)
        # BUG FIX: guard len(toks) > 1 -- a line containing only the word
        # "Executing" used to raise IndexError on toks[1].
        elif (
            toks[0] == "Executing"
            and len(toks) > 1
            and toks[1] in ("tests", "nosetests")
        ):
            kept.append(line)
    # join() instead of repeated += (quadratic string concatenation).
    return "".join(kept)
if __name__ == "__main__":
    # CLI entry point: filter the log file given as argv[1], or stdin.
    # BUG FIX: removed a leftover `import pdb; pdb.set_trace()` debugging
    # breakpoint that froze every non-interactive invocation.
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            print(filter_output(f))
    else:
        print(filter_output(sys.stdin))
#!/bin/bash
# Nightly Theano test driver: runs the test suite under several flag
# combinations and reports timing at the end.
date
START=`date +%s`
ARGS="$@"
PROFILING=""
RELEASE=""
# If not jenkins, set workspace to local Tmp.
# BUG FIX: the original used `[ -v $WORKSPACE ]`, which expands the variable
# and tests whether a variable *named by its value* exists (and degenerates
# to the always-true `[ -v ]` when unset).  Test emptiness with -z instead.
if [ -z "$WORKSPACE" ]; then
    if [ -z "$TMPDIR" ]; then
        TMPDIR=/tmp
    fi
    WORKSPACE=$TMPDIR
fi
echo "Hostname:"
hostname

# --release: additionally run the extended matrix (default mode with
# compute_test_value, vm linker, and cxx= i.e. no C compilation).
if [ "$1" == "--release" ]; then
    RELEASE="True"
    shift
    ARGS="$@"
fi

# --buildbot: run from the Jenkins workspace with a dedicated compiledir
# and make the checked-out Theano importable via PYTHONPATH.
if [ "$1" == "--buildbot" ]; then
    shift
    ARGS="$@"
    BASE_COMPILEDIR=$WORKSPACE/compile/theano_compile_dir_theano
    ROOT_CWD=$WORKSPACE/nightly_build
    FLAGS=base_compiledir=$BASE_COMPILEDIR
    COMPILEDIR=`THEANO_FLAGS=$FLAGS python -c "from __future__ import print_function; import theano; print(theano.config.compiledir)"`
    cd ${ROOT_CWD}/Theano
    git rev-parse HEAD
    #Run tests from inside the Theano directory to prevent import problem.
    # PROFILING="--with-coverage --cover-package=theano"
    export PYTHONPATH=${ROOT_CWD}:$PYTHONPATH
else
    ROOT_CWD=.
    COMPILEDIR=`python -c "from __future__ import print_function; import theano; print(theano.config.compiledir)"|tail -1`
fi

# tests xunit for test profiling
XUNIT="--with-xunit --xunit-file="

echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l

# We don't want warnings in the buildbot for errors already fixed.
FLAGS=${THEANO_FLAGS},warn__argmax_pushdown_bug=False,warn__gpusum_01_011_0111_bug=False,warn__sum_sum_bug=False,warn__sum_div_dimshuffle_bug=False,warn__subtensor_merge_bug=False,$FLAGS

# We want to see correctly optimization/shape errors, so make them raise an
# error.
FLAGS=on_opt_error=raise,$FLAGS
FLAGS=on_shape_error=raise,$FLAGS

# Ignore user device and floatX config, because:
# 1. Tests are intended to be run with device=cpu.
# 2. We explicitly add 'floatX=float32' in one run of the test suite below,
# while we want all other runs to run with 'floatX=float64'.
FLAGS=${FLAGS},device=cpu,floatX=float64

# Extra configurations, only exercised for a release.
if [ "$RELEASE" ]; then
    echo "Executing tests with default mode and compute_test_value"
    date
    THEANO_FLAGS=${FLAGS},compute_test_value=ignore pytest ${ARGS}
    echo "Number of elements in the compiledir:"
    ls ${COMPILEDIR}|wc -l
    echo

    echo "Executing tests with linker=vm,floatX=float32"
    echo "THEANO_FLAGS=${FLAGS},linker=vm,floatX=float32 pytest ${ARGS}"
    date
    THEANO_FLAGS=${FLAGS},linker=vm,floatX=float32 pytest ${ARGS}
    echo "Number of elements in the compiledir:"
    ls ${COMPILEDIR}|wc -l
    echo

    echo "Executing tests with cxx="
    echo "THEANO_FLAGS=${FLAGS},cxx= pytest ${ARGS}"
    date
    THEANO_FLAGS=${FLAGS},cxx= pytest ${ARGS}
    echo "Number of elements in the compiledir:"
    ls ${COMPILEDIR}|wc -l
    echo
fi

echo "Executing tests with mode=FAST_RUN"
FILE=${ROOT_CWD}/theano_fastrun_tests.xml
echo "THEANO_FLAGS=cmodule__warn_no_version=True,${FLAGS},mode=FAST_RUN pytest ${PROFILING} ${ARGS} ${XUNIT}${FILE}"
date
THEANO_FLAGS=cmodule__warn_no_version=True,${FLAGS},mode=FAST_RUN pytest ${PROFILING} ${ARGS} ${XUNIT}${FILE}
echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l
echo

echo "Executing tests with mode=FAST_RUN,floatX=float32"
FILE=${ROOT_CWD}/theano_fastrun_float32_tests.xml
echo "THEANO_FLAGS=${FLAGS},mode=FAST_RUN,floatX=float32 pytest ${ARGS} ${XUNIT}${FILE}"
date
THEANO_FLAGS=${FLAGS},mode=FAST_RUN,floatX=float32 pytest ${ARGS} ${XUNIT}${FILE}
echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l
echo

echo "Executing tests with linker=vm,vm__lazy=True,floatX=float32"
FILE=${ROOT_CWD}/theano_fastrun_float32_lazyvm_tests.xml
echo "THEANO_FLAGS=${FLAGS},linker=vm,vm__lazy=True,floatX=float32 pytest ${ARGS} ${XUNIT}${FILE}"
date
THEANO_FLAGS=${FLAGS},linker=vm,vm__lazy=True,floatX=float32 pytest ${ARGS} ${XUNIT}${FILE}
echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l
echo

#we change the seed and record it everyday to test different combination. We record it to be able to reproduce bug caused by different seed. We don't want multiple test in DEBUG_MODE each day as this take too long.
seed=$RANDOM
echo "Executing tests with mode=DEBUG_MODE with seed of the day $seed"
FILE=${ROOT_CWD}/theano_debug_tests.xml
echo "THEANO_FLAGS=${FLAGS},unittests__rseed=$seed,mode=DEBUG_MODE,DebugMode__check_strides=0,DebugMode__patience=3,DebugMode__check_preallocated_output= pytest ${ARGS} ${XUNIT}${FILE}"
date
THEANO_FLAGS=${FLAGS},unittests__rseed=$seed,mode=DEBUG_MODE,DebugMode__check_strides=0,DebugMode__patience=3,DebugMode__check_preallocated_output= pytest ${ARGS} ${XUNIT}${FILE}
echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l
echo

#We put this at the end as it have a tendency to loop infinitly.
#Until we fix the root of the problem we let the rest run, then we can kill this one in the morning.
# with --batch=1000" # The buildbot freeze sometimes when collecting the tests to run
echo "Executing tests with mode=FAST_COMPILE"
FILE=${ROOT_CWD}/theano_fastcompile_tests.xml
echo "THEANO_FLAGS=${FLAGS},mode=FAST_COMPILE pytest ${ARGS} ${XUNIT}${FILE}"
date
THEANO_FLAGS=${FLAGS},mode=FAST_COMPILE pytest ${ARGS} ${XUNIT}${FILE}
echo "Number of elements in the compiledir:"
ls ${COMPILEDIR}|wc -l
echo

echo
END=`date +%s`
# Report total wall-clock time for the whole run.
python -c "from __future__ import print_function; print('Total test time: %dm %ds'%((${END} - ${START})/60, (${END} - ${START})%60))"
date
#!/bin/env python
# Email the nightly-build log summaries to the buildbot mailing list.
# Import smtplib for the actual sending function
import os.path
import smtplib
import sys

from theano.misc.buildbot_filter import filter_output

# me == the sender's email address
# family = the list of all recipients' email addresses
family=['theano-buildbot@googlegroups.com']
me='lisa@iro.umontreal.ca'

#Those file contain the output of the do_nightly_build script.
files=["do_nightly_build_theano.log",
       "do_nightly_build_deeplearning.log",
       "do_nightly_build_theano_python3.3.0.log",
       ]
msgs=['Theano buildbot',
      'Deep Learning Tutorial buildbot',
      'Theano Python3.3.0 buildbot']
print('files', files)
print("msgs", msgs)
print("args", sys.argv)

# Command line:
#   <log file> <subject>  -> send that single file with that subject
#   <directory>           -> look for the default file names under it
#   (no arguments)        -> look for the default file names under /tmp
if len(sys.argv) == 3:
    #We send just a file with a message
    files = [sys.argv[1]]
    msgs = [sys.argv[2]]
elif len(sys.argv) == 2:
    #This is a prefix where the output files are
    files=[os.path.join(sys.argv[1], x) for x in files]
else:
    files=[os.path.join('/tmp', x) for x in files]
print('path', files)

from email.mime.multipart import MIMEMultipart
# Here are the email package modules we'll need
from email.mime.text import MIMEText

COMMASPACE = ', '
def mysend(subject, file):
    """Summarize the buildbot log in *file* and email it with *subject*.

    Counts failures/errors/skips/known-failures from the test-runner summary
    tokens, prepends the filtered log summary, and sends everything through
    the local SMTP server.
    """
    # Create the container (outer) email message.
    if not os.path.isfile(file):
        print("Error: no file", file)
        return
    msg = MIMEMultipart()
    msg['From'] = me
    msg['To'] = COMMASPACE.join(family)
    msg.preamble = 'The output of the buildbot'
    # Open the files in binary mode. Let the MIMEImage class automatically
    # guess the specific image type.
    # NOTE(review): under Python 3 this makes `s` bytes while the parsing
    # below compares tokens against str literals -- confirm this script is
    # still run under Python 2, or decode `s` here.
    with open(file, 'rb') as fp:
        s=fp.read()
    # Counters accumulated from the test-runner summary tokens in the log.
    failures=0
    errors=0
    ran=False  # just saw the "Ran" token; next token should be the count
    nb_ran=0
    skip=0
    speed_failure=0
    show_speed_failure=False
    knownfail=0
    gpu_time = None
    float32_time = None
    float64_time = None
    for token in s.split():
        token=token.strip('(,)')
        if token.startswith("failures="):
            failures+=int(token[9:])
        elif token.startswith("errors="):
            errors+=int(token[+7:])
        elif token == "Ran":
            ran=True
        elif token.startswith("SKIP="):
            skip+=int(token[5:])
        elif token == "KnownFailureTest:":
            # This means that KnownFailure plugin is off,
            # so knownfails are also counted as errors
            knownfail+=1
            errors-=1
        elif token.startswith("KNOWNFAIL="):
            knownfail += int(token.split('=')[1])
        elif token.startswith("speed_failure_"):
            speed_failure+=int(token.split('=')[1])
            show_speed_failure=True
        elif ran:
            ran=False
            try:
                nb_ran+=int(token)
            except Exception as e:
                print(e)
    # Collect the (possibly multi-line) "% expected/get" timing reports;
    # a report is complete once the accumulated text ends with "]".
    start = ""
    for line in s.splitlines():
        if gpu_time is None and line.startswith("gpu % expected/get"):
            start=line
        elif float32_time is None and line.startswith("float32 % expected/get"):
            start=line
        elif float64_time is None and line.startswith("float64 % expected/get"):
            start=line
        elif start:
            start+=line
            if start[-1]=="]":
                if start.startswith("gpu % expected/get"):
                    gpu_time = start
                    start = ""
                elif start.startswith("float32 % expected/get"):
                    float32_time = start
                    start = ""
                elif start.startswith("float64 % expected/get"):
                    float64_time = start
                    start = ""
    # Prepend the filtered summary to the full log text.
    s = ("Summary of the output:\n\n" + filter_output(open(file)) +
         "\n\nFull output:\n\n" + s)
    img = MIMEText(s)
    msg.attach(img)
    # Send the email via our own SMTP server.
    if show_speed_failure:
        msg['Subject'] = subject+" Fail="+str(failures)+" Err="+str(errors)+" Ran="+str(nb_ran)+" Skip="+str(skip)+" KnownFail="+str(knownfail)+ " SpeedFailure="+str(speed_failure)
    else:
        msg['Subject'] = subject+" Fail="+str(failures)+" Err="+str(errors)+" Ran="+str(nb_ran)+" Skip="+str(skip)+" KnownFail="+str(knownfail)
    print(msg['Subject'])
    s = smtplib.SMTP()
    s.connect()
    s.sendmail(me, family, msg.as_string())
    s.close()
    print("Finished sending email for", subject)
# Send one email per (subject, log file) pair.
for msg, file in zip(msgs, files):
    mysend(msg, file)
"""Functions for Github authorisation."""
# Python 2 compatibility: make input() behave like raw_input().
try:
    input = raw_input
except NameError:
    pass

import getpass
import json

import requests

# Keyring stores passwords by a 'username', but we're not storing a username and
# password
fake_username = "ipython_tools"

# Module-level cache of the oAuth token, filled lazily by get_auth_token().
token = None
def get_auth_token():
    """Return a GitHub oAuth token, creating and caching one if needed.

    Lookup order: module-level cache, then the system keyring, then an
    interactive username/password prompt that requests a new token from the
    GitHub authorizations API and stores it in the keyring.
    """
    global token
    # In-process cache.
    if token is not None:
        return token
    import keyring
    # Cross-process cache: the system keyring.
    token = keyring.get_password("github", fake_username)
    if token is not None:
        return token
    print(
        "Please enter your github username and password. These are not "
        "stored, only used to get an oAuth token. You can revoke this at "
        "any time on Github."
    )
    user = input("Username: ")
    pw = getpass.getpass("Password: ")
    auth_request = {
        "scopes": ["public_repo", "gist"],
        "note": "IPython tools",
        "note_url": "https://github.com/ipython/ipython/tree/master/tools",
    }
    response = requests.post(
        "https://api.github.com/authorizations",
        auth=(user, pw),
        data=json.dumps(auth_request),
    )
    response.raise_for_status()
    token = json.loads(response.text)["token"]
    keyring.set_password("github", fake_username, token)
    return token
def make_auth_header():
    """Build the Authorization header dict for authenticated GitHub calls."""
    auth_token = get_auth_token()
    return {"Authorization": f"token {auth_token}"}
def post_issue_comment(project, num, body):
    """Post *body* as an authenticated comment on issue *num* of *project*."""
    endpoint = f"https://api.github.com/repos/{project}/issues/{num}/comments"
    comment = json.dumps({"body": body})
    requests.post(endpoint, data=comment, headers=make_auth_header())
def post_gist(content, description="", filename="file", auth=False):
    """Post some text to a Gist, and return the URL."""
    gist = {
        "description": description,
        "public": True,
        "files": {filename: {"content": content}},
    }
    payload = json.dumps(gist).encode("utf-8")
    # Only attach credentials when the caller asked for an authenticated gist.
    headers = make_auth_header() if auth else {}
    reply = requests.post(
        "https://api.github.com/gists", data=payload, headers=headers
    )
    reply.raise_for_status()
    return json.loads(reply.text)["html_url"]
def get_pull_request(project, num, github_api=3):
    """get pull request info by number

    github_api : version of github api to use (2 or 3)

    Raises ValueError for an unsupported *github_api* value.  (Previously an
    unknown version fell through with ``url`` unbound and raised a confusing
    UnboundLocalError at the ``requests.get`` call.)
    """
    if github_api == 2:
        url = f"http://github.com/api/v2/json/pulls/{project}/{num}"
    elif github_api == 3:
        url = f"https://api.github.com/repos/{project}/pulls/{num}"
    else:
        raise ValueError(f"Unsupported github_api version: {github_api}")
    response = requests.get(url)
    response.raise_for_status()
    if github_api == 2:
        # The v2 payload wraps the data in a "pull" key.
        return json.loads(response.text)["pull"]
    return json.loads(response.text)
def get_pulls_list(project, github_api=3):
    """get pull request list

    github_api : version of github api to use
    """
    if github_api == 3:
        endpoint = f"https://api.github.com/repos/{project}/pulls"
    else:
        endpoint = f"http://github.com/api/v2/json/pulls/{project}"
    reply = requests.get(endpoint)
    reply.raise_for_status()
    payload = json.loads(reply.text)
    # The v2 payload wraps the list in a "pulls" key.
    return payload["pulls"] if github_api == 2 else payload
#!/usr/bin/env python
__docformat__ = "restructuredtext en"
import difflib
import operator
import os
import string
import sys
import tabnanny
import tokenize
from subprocess import PIPE, Popen
try:
import argparse
except ImportError:
raise ImportError(
"check_whitespace.py need Python module argparse introduced in"
" Python 2.7. It is available in pypi for compatibility."
" You can install it with this command 'pip install argparse'"
)
from io import StringIO
import reindent
SKIP_WHITESPACE_CHECK_FILENAME = ".hg/skip_whitespace_check"
def get_parse_error(code):
    """
    Check *code* for ambiguous tabs or other basic parsing issues.

    :param code: a string containing a file's worth of Python code
    :returns: a string describing the first parse error encountered,
              or None if the code is ok
    """
    # tabnanny is (very frustratingly) written only to be used as a script;
    # we poke at its token-processing internals directly to avoid temp
    # files and subprocesses.
    reader = StringIO(code)
    try:
        tabnanny.process_tokens(tokenize.generate_tokens(reader.readline))
    except tokenize.TokenError as exc:
        return f"Could not parse code: {exc}"
    except IndentationError as exc:
        return f"Indentation error: {exc}"
    except tabnanny.NannyNag as exc:
        return f"Ambiguous tab at line {exc.get_lineno()}; line is '{exc.get_line()}'."
    return None
def clean_diff_line_for_python_bug_2142(diff_line):
    """Append difflib's missing-newline marker to any diff line that lacks a
    trailing newline (workaround for http://bugs.python.org/issue2142)."""
    if not diff_line.endswith("\n"):
        return diff_line + "\n\\ No newline at end of file\n"
    return diff_line
def get_correct_indentation_diff(code, filename):
    """
    Generate a diff to make code correctly indented.

    :param code: a string containing a file's worth of Python code
    :param filename: the filename being considered (used in diff generation only)
    :returns: a unified diff to make code correctly indented, or
              None if code is already correctly indented
    """
    source_buffer = StringIO(code)
    result_buffer = StringIO()
    reindenter = reindent.Reindenter(source_buffer)
    reindenter.run()
    reindenter.write(result_buffer)
    reindented = result_buffer.getvalue()
    result_buffer.close()
    if code == reindented:
        return None
    diff_lines = difflib.unified_diff(
        code.splitlines(True),
        reindented.splitlines(True),
        fromfile=filename,
        tofile=filename + " (reindented)",
    )
    # work around http://bugs.python.org/issue2142
    return "".join(clean_diff_line_for_python_bug_2142(d) for d in diff_lines)
def is_merge():
    """Return True when the hook is running on a merge commit, i.e. when
    mercurial exported a non-empty HG_PARENT2."""
    return bool(os.environ.get("HG_PARENT2"))
def parent_commit():
    """Return the changeset id mercurial exported as HG_PARENT1, or None
    when not running inside a commit hook."""
    return os.environ.get("HG_PARENT1")
class MercurialRuntimeError(Exception):
    # Raised by run_mercurial_command when the hg subprocess writes to stderr.
    pass
def run_mercurial_command(hg_command):
    """Run *hg_command* (a whitespace-separated argument string) through the
    ``hg`` executable and return its stdout as text.

    :raises MercurialRuntimeError: if the subprocess writes to stderr
    Exits the process when the executable cannot be found.
    """
    hg_executable = os.environ.get("HG", "hg")
    hg_command_tuple = hg_command.split()
    hg_command_tuple.insert(0, hg_executable)
    # If you install your own mercurial version in your home
    # hg_executable does not always have execution permission.
    if not os.access(hg_executable, os.X_OK):
        hg_command_tuple.insert(0, sys.executable)
    try:
        # BUG FIX: request text mode so stdout/stderr come back as str.
        # Without it, Python 3 returns bytes, and every caller
        # (parse_stdout_filelist's str.strip, StringIO, file.write) breaks.
        hg_subprocess = Popen(
            hg_command_tuple, stdout=PIPE, stderr=PIPE, universal_newlines=True
        )
    except OSError as e:
        print("Can't find the hg executable!", file=sys.stderr)
        print(e)
        sys.exit(1)
    hg_out, hg_err = hg_subprocess.communicate()
    if len(hg_err) > 0:
        raise MercurialRuntimeError(hg_err)
    return hg_out
def parse_stdout_filelist(hg_out_filelist):
    """Turn hg template output (quoted, whitespace-separated file names)
    into a list of cleaned, non-empty names."""
    junk = string.whitespace + "'"
    cleaned = (entry.strip(junk) for entry in hg_out_filelist.split())
    # Drop entries that were nothing but quotes/whitespace.
    return [entry for entry in cleaned if entry]
def changed_files():
    """List the files modified (not added) by the tip changeset."""
    template_output = run_mercurial_command("tip --template '{file_mods}'")
    return parse_stdout_filelist(template_output)
def added_files():
    """List the files newly added by the tip changeset."""
    template_output = run_mercurial_command("tip --template '{file_adds}'")
    return parse_stdout_filelist(template_output)
def is_python_file(filename):
    """Return True when *filename* carries the .py extension."""
    return filename.endswith(".py")
def get_file_contents(filename, revision="tip"):
    """Return the contents of *filename* at *revision* via ``hg cat``.

    BUG FIX: the file name must be part of the hg command line; it had been
    dropped (leaving the *filename* parameter unused), so ``hg cat`` was
    invoked without a target file.
    """
    hg_out = run_mercurial_command(f"cat -r {revision} {filename}")
    return hg_out
def save_commit_message(filename):
    """Write the tip changeset's description into *filename*."""
    description = run_mercurial_command("tip --template '{desc}'")
    with open(filename, "w") as out:
        out.write(description)
def save_diffs(diffs, filename):
    """Write *diffs* (a list of diff strings) to *filename*, separated by
    blank lines."""
    with open(filename, "w") as diff_file:
        diff_file.write("\n\n".join(diffs))
def should_skip_commit():
    """Return True when a skip marker file exists and names the current
    parent commit (set by a previous failed run with -s)."""
    if not os.path.exists(SKIP_WHITESPACE_CHECK_FILENAME):
        return False
    with open(SKIP_WHITESPACE_CHECK_FILENAME) as marker:
        recorded_changeset = marker.read()
    return recorded_changeset == parent_commit()
def save_skip_next_commit():
    """Record the current parent commit so the next hook run is skipped."""
    with open(SKIP_WHITESPACE_CHECK_FILENAME, "w") as marker:
        marker.write(parent_commit())
def main(argv=None):
    """Pretxncommit hook entry point.

    Checks every added/changed Python file in the pending commit for parse
    errors and (unless disabled) indentation problems.

    :returns: non-zero to make mercurial block the commit
    """
    if argv is None:
        argv = sys.argv[1:]
    parser = argparse.ArgumentParser(
        description="Pretxncommit hook for Mercurial to check for whitespace issues"
    )
    parser.add_argument(
        "-n",
        "--no-indentation",
        action="store_const",
        default=False,
        const=True,
        help="don't check indentation, just basic parsing",
    )
    parser.add_argument(
        "-i",
        "--incremental",
        action="store_const",
        default=False,
        const=True,
        help="only block on newly introduced indentation problems; ignore all others",
    )
    parser.add_argument(
        "-p",
        "--incremental-with-patch",
        action="store_const",
        default=False,
        const=True,
        help="only block on newly introduced indentation problems; propose a patch for all others",
    )
    parser.add_argument(
        "-s",
        "--skip-after-failure",
        action="store_const",
        default=False,
        const=True,
        help="when this pre-commit hook fails, don't run it on the next commit; "
        "this lets you check in your changes and then check in "
        "any necessary whitespace changes in the subsequent commit",
    )
    args = parser.parse_args(argv)
    # -i and -s are incompatible; if you skip checking, you end up with a not-correctly-indented
    # file, which -i then causes you to ignore!
    if args.skip_after_failure and args.incremental:
        print(
            "*** check whitespace hook misconfigured! -i and -s are incompatible.",
            file=sys.stderr,
        )
        return 1
    if is_merge():
        # don't inspect merges: (a) they're complex and (b) they don't really introduce new code
        return 0
    if args.skip_after_failure and should_skip_commit():
        # we're set up to skip this one, so skip it, but
        # first, make sure we don't skip the next one as well :)
        os.remove(SKIP_WHITESPACE_CHECK_FILENAME)
        return 0
    block_commit = False
    diffs = []
    added_filenames = added_files()
    changed_filenames = changed_files()
    for filename in filter(is_python_file, added_filenames + changed_filenames):
        code = get_file_contents(filename)
        parse_error = get_parse_error(code)
        if parse_error is not None:
            # BUG FIX: the message had lost its {filename} placeholder
            # (it printed a literal "(unknown)").
            print(
                f"*** {filename} has parse error: {parse_error}",
                file=sys.stderr,
            )
            block_commit = True
        else:
            # parsing succeeded, it is safe to check indentation
            if not args.no_indentation:
                was_clean = None  # unknown
                # only calculate was_clean if it will matter to us
                if args.incremental or args.incremental_with_patch:
                    if filename in changed_filenames:
                        old_file_contents = get_file_contents(
                            filename, revision=parent_commit()
                        )
                        was_clean = (
                            get_correct_indentation_diff(old_file_contents, "") is None
                        )
                    else:
                        was_clean = True  # by default -- it was newly added and thus had no prior problems
                check_indentation = was_clean or not args.incremental
                if check_indentation:
                    indentation_diff = get_correct_indentation_diff(code, filename)
                    if indentation_diff is not None:
                        if was_clean or not args.incremental_with_patch:
                            block_commit = True
                        diffs.append(indentation_diff)
                        # BUG FIX: restored the {filename} placeholder here too.
                        print(
                            f"{filename} is not correctly indented",
                            file=sys.stderr,
                        )
    if len(diffs) > 0:
        diffs_filename = ".hg/indentation_fixes.patch"
        save_diffs(diffs, diffs_filename)
        print(
            f"*** To fix all indentation issues, run: cd `hg root` && patch -p0 < {diffs_filename}",
            file=sys.stderr,
        )
    if block_commit:
        save_filename = ".hg/commit_message.saved"
        save_commit_message(save_filename)
        print(f"*** Commit message saved to {save_filename}", file=sys.stderr)
        if args.skip_after_failure:
            save_skip_next_commit()
            print(
                f"*** Next commit attempt will not be checked. To change this, rm {SKIP_WHITESPACE_CHECK_FILENAME}",
                file=sys.stderr,
            )
    return int(block_commit)


if __name__ == "__main__":
    sys.exit(main())
#! /usr/bin/env python
# Released to the public domain, by Tim Peters, 03 October 2000.
"""reindent [-d][-r][-v] [ path ... ]
-d (--dryrun) Dry run. Analyze, but don't make any changes to, files.
-r (--recurse) Recurse. Search for all .py files in subdirectories too.
-n (--nobackup) No backup. Does not make a ".bak" file before reindenting.
-v (--verbose) Verbose. Print informative msgs; else no output.
-h (--help) Help. Print this usage information and exit.
Change Python (.py) files to use 4-space indents and no hard tab characters.
Also trim excess spaces and tabs from ends of lines, and remove empty lines
at the end of files. Also ensure the last line ends with a newline.
If no paths are given on the command line, reindent operates as a filter,
reading a single source file from standard input and writing the transformed
source to standard output. In this case, the -d, -r and -v flags are
ignored.
You can pass one or more file and/or directory paths. When a directory
path, all .py files within the directory will be examined, and, if the -r
option is given, likewise recursively for subdirectories.
If output is not to standard output, reindent overwrites files in place,
renaming the originals with a .bak extension. If it finds nothing to
change, the file is left alone. If reindent does change a file, the changed
file is a fixed-point for future runs (i.e., running reindent on the
resulting .py file won't change it again).
The hard part of reindenting is figuring out what to do with comment
lines. So long as the input files get a clean bill of health from
tabnanny.py, reindent should do a good job.
The backup file is a copy of the one that is being reindented. The ".bak"
file is generated with shutil.copy(), but some corner cases regarding
user/group and permissions could leave the backup file more readable than
you'd prefer. You can always use the --nobackup option to prevent this.
"""
__version__ = "1"
import os
import shutil
import sys
import tokenize
verbose = 0
recurse = 0
dryrun = 0
makebackup = True
def usage(msg=None):
    """Print an optional error *msg* followed by the module help to stderr."""
    if msg is not None:
        print(msg, file=sys.stderr)
    # The module docstring doubles as the usage text.
    print(__doc__, file=sys.stderr)
def errprint(*args):
    """Write *args*, stringified and space-separated, plus a newline, to
    stderr."""
    sys.stderr.write(" ".join(str(arg) for arg in args))
    sys.stderr.write("\n")
def main():
    """Command-line entry point: parse the flags into the module-level
    globals, then reindent stdin (filter mode) or the given paths."""
    import getopt

    global verbose, recurse, dryrun, makebackup
    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "drnvh", ["dryrun", "recurse", "nobackup", "verbose", "help"]
        )
    except getopt.error as msg:
        usage(msg)
        return
    for o, a in opts:
        if o in ("-d", "--dryrun"):
            dryrun += 1
        elif o in ("-r", "--recurse"):
            recurse += 1
        elif o in ("-n", "--nobackup"):
            makebackup = False
        elif o in ("-v", "--verbose"):
            verbose += 1
        elif o in ("-h", "--help"):
            usage()
            return
    if not args:
        # No paths given: act as a filter from stdin to stdout.
        r = Reindenter(sys.stdin)
        r.run()
        r.write(sys.stdout)
        return
    for arg in args:
        check(arg)
def check(file):
    """Reindent *file* in place, recursing into directories, honouring the
    module-level verbose/dryrun/makebackup flags.  Returns True when the
    file needed changes, False when it was already clean (directories
    return None)."""
    if os.path.isdir(file) and not os.path.islink(file):
        if verbose:
            print("listing directory", file)
        names = os.listdir(file)
        for name in names:
            fullname = os.path.join(file, name)
            # Recurse into non-hidden subdirectories when -r was given;
            # always descend into .py files.
            if (
                recurse
                and os.path.isdir(fullname)
                and not os.path.islink(fullname)
                and not os.path.split(fullname)[1].startswith(".")
            ) or name.lower().endswith(".py"):
                check(fullname)
        return
    if verbose:
        print("checking", file, "...", end=" ")
    try:
        f = open(file)
    except OSError as msg:
        errprint(f"{file}: I/O Error: {msg}")
        return
    r = Reindenter(f)
    f.close()
    if r.run():
        if verbose:
            print("changed.")
            if dryrun:
                print("But this is a dry run, so leaving it alone.")
        if not dryrun:
            bak = file + ".bak"
            if makebackup:
                shutil.copyfile(file, bak)
                if verbose:
                    print("backed up", file, "to", bak)
            # Overwrite the original with the reindented text.
            with open(file, "w") as f:
                r.write(f)
            if verbose:
                print("wrote new", file)
        return True
    else:
        if verbose:
            print("unchanged.")
        return False
def _rstrip(line, JUNK="\n \t"):
"""Return line stripped of trailing spaces, tabs, newlines.
Note that line.rstrip() instead also strips sundry control characters,
but at least one known Emacs user expects to keep junk like that, not
mentioning Barry by name or anything <wink>.
"""
i = len(line)
while i > 0 and line[i - 1] in JUNK:
i -= 1
return line[:i]
class Reindenter:
    """Reindent Python source read from *f* to uniform 4-space indentation.

    Usage: construct with a readable text file object, call run() (returns
    True when a change is needed), then write() the result.
    """

    def __init__(self, f):
        self.find_stmt = 1  # next token begins a fresh stmt?
        self.level = 0  # current indent level
        # Raw file lines.
        self.raw = f.readlines()
        # File lines, rstripped & tab-expanded.  Dummy at start is so
        # that we can use tokenize's 1-based line numbering easily.
        # Note that a line is all-blank iff it's "\n".
        self.lines = [_rstrip(line).expandtabs() + "\n" for line in self.raw]
        self.lines.insert(0, None)
        self.index = 1  # index into self.lines of next line
        # List of (lineno, indentlevel) pairs, one for each stmt and
        # comment line.  indentlevel is -1 for comment lines, as a
        # signal that tokenize doesn't know what to do about them;
        # indeed, they're our headache!
        self.stats = []

    def run(self):
        # BUG FIX: the two-argument ``tokenize.tokenize(readline,
        # tokeneater)`` form is Python 2 only; on Python 3,
        # ``tokenize.tokenize`` expects a *bytes* readline and takes no
        # callback.  ``generate_tokens`` consumes text and yields 5-tuples
        # matching tokeneater's positional parameters.
        for token_info in tokenize.generate_tokens(self.getline):
            self.tokeneater(*token_info)
        # Remove trailing empty lines.
        lines = self.lines
        while lines and lines[-1] == "\n":
            lines.pop()
        # Sentinel.
        stats = self.stats
        stats.append((len(lines), 0))
        # Map count of leading spaces to # we want.
        have2want = {}
        # Program after transformation.
        after = self.after = []
        # Copy over initial empty lines -- there's nothing to do until
        # we see a line with *something* on it.
        i = stats[0][0]
        after.extend(lines[1:i])
        for i in range(len(stats) - 1):
            thisstmt, thislevel = stats[i]
            nextstmt = stats[i + 1][0]
            have = getlspace(lines[thisstmt])
            want = thislevel * 4
            if want < 0:
                # A comment line.
                if have:
                    # An indented comment line.  If we saw the same
                    # indentation before, reuse what it most recently
                    # mapped to.
                    want = have2want.get(have, -1)
                    if want < 0:
                        # Then it probably belongs to the next real stmt.
                        for j in range(i + 1, len(stats) - 1):
                            jline, jlevel = stats[j]
                            if jlevel >= 0:
                                if have == getlspace(lines[jline]):
                                    want = jlevel * 4
                                break
                    if want < 0:  # Maybe it's a hanging
                        # comment like this one,
                        # in which case we should shift it like its base
                        # line got shifted.
                        for j in range(i - 1, -1, -1):
                            jline, jlevel = stats[j]
                            if jlevel >= 0:
                                want = (
                                    have
                                    + getlspace(after[jline - 1])
                                    - getlspace(lines[jline])
                                )
                                break
                    if want < 0:
                        # Still no luck -- leave it alone.
                        want = have
                else:
                    want = 0
            assert want >= 0
            have2want[have] = want
            diff = want - have
            if diff == 0 or have == 0:
                after.extend(lines[thisstmt:nextstmt])
            else:
                for line in lines[thisstmt:nextstmt]:
                    if diff > 0:
                        if line == "\n":
                            after.append(line)
                        else:
                            after.append(" " * diff + line)
                    else:
                        remove = min(getlspace(line), -diff)
                        after.append(line[remove:])
        return self.raw != self.after

    def write(self, f):
        f.writelines(self.after)

    # Line-getter for tokenize.
    def getline(self):
        if self.index >= len(self.lines):
            line = ""
        else:
            line = self.lines[self.index]
            self.index += 1
        return line

    # Line-eater for tokenize.
    def tokeneater(
        self,
        type,
        token,
        pos,
        end,
        line,
        INDENT=tokenize.INDENT,
        DEDENT=tokenize.DEDENT,
        NEWLINE=tokenize.NEWLINE,
        COMMENT=tokenize.COMMENT,
        NL=tokenize.NL,
    ):
        sline, scol = pos
        if type == NEWLINE:
            # A program statement, or ENDMARKER, will eventually follow,
            # after some (possibly empty) run of tokens of the form
            #     (NL | COMMENT)* (INDENT | DEDENT+)?
            self.find_stmt = 1
        elif type == INDENT:
            self.find_stmt = 1
            self.level += 1
        elif type == DEDENT:
            self.find_stmt = 1
            self.level -= 1
        elif type == COMMENT:
            if self.find_stmt:
                self.stats.append((sline, -1))
                # but we're still looking for a new stmt, so leave
                # find_stmt alone
        elif type == NL:
            pass
        elif self.find_stmt:
            # This is the first "real token" following a NEWLINE, so it
            # must be the first token of the next program statement, or an
            # ENDMARKER.
            self.find_stmt = 0
            if line:  # not endmarker
                self.stats.append((sline, self.level))
# Count number of leading blanks.
def getlspace(line):
    """Return the number of leading space characters in *line*."""
    count = 0
    for ch in line:
        if ch != " ":
            break
        count += 1
    return count
# Allow use both as an importable module and as a command-line tool.
if __name__ == "__main__":
    main()
def render_string(string, sub):
    """
    string: a string, containing formatting instructions
    sub: a dictionary containing keys and values to substitute for
    them.

    returns: string % sub

    The only difference between this function and the % operator
    is that it raises an exception with a more informative error
    message than the % operator does.
    """
    try:
        return string % sub
    except Exception as original_error:
        # Rendering failed: grow an initial substring one character at a
        # time until it reproduces the same error, then report that minimal
        # prefix so the offending format directive is easy to locate.
        for end in range(len(string) + 1):
            try:
                string[0:end] % sub
            except Exception as prefix_error:
                if str(prefix_error) == str(original_error):
                    raise Exception(
                        string[0:end] + "<<<< caused exception " + str(prefix_error)
                    )
        # Every prefix rendered cleanly or failed differently -- should be
        # unreachable if the full string failed.
        raise AssertionError()
def pretty_format(string):
    """Re-indent a brace-delimited code string: after stripping each line's
    leading whitespace, indent it by one space per currently-open '{'."""
    depth = 0
    formatted = []
    for raw_line in string.split("\n"):
        line = strip_leading_white_space(raw_line)
        # Closing braces on this line reduce its own indentation.
        depth -= line.count("}")
        if depth < 0:
            depth = 0
        formatted.append((" " * depth) + line)
        # Opening braces only affect the lines that follow.
        depth += line.count("{")
    return "\n".join(formatted)
def strip_leading_white_space(line):
    """Return *line* with all leading spaces and tabs removed."""
    return line.lstrip(" \t")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论