k now... emacs>vim

This commit is contained in:
TuDatTr
2017-12-19 22:52:42 +01:00
parent d701704955
commit df3a90ee6b
1500 changed files with 44508 additions and 0 deletions

View File

@@ -0,0 +1,41 @@
# Elpy, the Emacs Lisp Python Environment
# Copyright (C) 2013-2016 Jorgen Schaefer
# Author: Jorgen Schaefer <contact@jorgenschaefer.de>
# URL: http://github.com/jorgenschaefer/elpy
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""The Emacs Lisp Python Environment.
Elpy is a mode for Emacs to support writing Python code. This package
provides the backend within Python to support auto-completion,
documentation extraction, and navigation.
Emacs will start the protocol by running the module itself, like so:
python -m elpy
This will emit a greeting string on a single line, and then wait for
the protocol to start. Details of the protocol can be found in
elpy.rpc.
This package is unlikely to be useful on its own.
"""
__author__ = "Jorgen Schaefer"
__version__ = "1.17.0"
__license__ = "GPL"

View File

@@ -0,0 +1,25 @@
"""Main interface to the RPC server.
You should be able to just run the following to use this module:
python -m elpy
The first line should be "elpy-rpc ready". If it isn't, something
broke.
"""
import os
import sys
import elpy
from elpy.server import ElpyRPCServer
if __name__ == '__main__':
stdin = sys.stdin
stdout = sys.stdout
sys.stdout = sys.stderr = open(os.devnull, "w")
stdout.write('elpy-rpc ready ({0})\n'
.format(elpy.__version__))
stdout.flush()
ElpyRPCServer(stdin, stdout).serve_forever()

View File

@@ -0,0 +1,21 @@
"""Glue for the "autopep8" library.
"""
from elpy.rpc import Fault
try:
import autopep8
except ImportError: # pragma: no cover
autopep8 = None
def fix_code(code):
"""Formats Python code to conform to the PEP 8 style guide.
"""
if not autopep8:
raise Fault('autopep8 not installed, cannot fix code.',
code=400)
return autopep8.fix_code(code, apply_config=True)

View File

@@ -0,0 +1,33 @@
"""Python 2/3 compatibility definitions.
These are used by the rest of Elpy to keep compatibility definitions
in one place.
"""
import sys
if sys.version_info >= (3, 0):
PYTHON3 = True
from io import StringIO
def ensure_not_unicode(obj):
return obj
else:
PYTHON3 = False
from StringIO import StringIO # noqa
def ensure_not_unicode(obj):
"""Return obj. If it's a unicode string, convert it to str first.
Pydoc functions simply don't find anything for unicode
strings. No idea why.
"""
if isinstance(obj, unicode):
return obj.encode("utf-8")
else:
return obj

View File

@@ -0,0 +1,118 @@
"""Glue for the "importmagic" library.
"""
import os
import sys
import threading
try:
import importmagic.index
import importmagic.symbols
import importmagic.importer
except ImportError: # pragma: no cover
importmagic = None
class ImportMagicError(Exception):
"""Used to pass defined errors from importmagic to the RPC layer."""
class ImportMagic(object):
def __init__(self):
self.is_enabled = bool(importmagic)
# fail_message is reported to the user when symbol_index
# is (still) None
self.fail_message = "symbol index is not yet ready"
self.project_root = None
self.symbol_index = None
self.favorites = set()
self._thread = None
def _build_symbol_index(self, project_root, custom_path, blacklist_re):
try:
index = importmagic.index.SymbolIndex(blacklist_re=blacklist_re)
if os.environ.get('ELPY_TEST'):
# test suite support: do not index the whole PYTHONPATH, it
# takes much too long
index.build_index([])
elif custom_path:
index.build_index(custom_path)
else:
index.build_index([project_root] + sys.path)
except Exception as e:
self.fail_message = "symbol index failed to build: %s" % e
else:
self.symbol_index = index
def build_index(self, project_root, custom_path=None, blacklist_re=None):
self.project_root = None
self._thread = threading.Thread(target=self._build_symbol_index,
args=(project_root, custom_path,
blacklist_re))
self._thread.setDaemon(True)
self._thread.start()
def get_import_symbols(self, symbol):
scores = self.symbol_index.symbol_scores(symbol)
def sort_key(item):
score, mod, var = item
if mod in self.favorites:
return 2 + score, mod, var
return score, mod, var
scores.sort(key=sort_key, reverse=True)
return ["from %s import %s" % (mod, var) if var else "import %s" % mod
for (_, mod, var) in scores]
def add_import(self, source, statement):
imports = importmagic.importer.Imports(self.symbol_index, source)
if statement.startswith('import '):
sepalias = None
alias = None
if ' as ' in statement:
sepalias = statement.find(' as ')
alias = statement[sepalias+4:]
modname = statement[7:sepalias]
imports.add_import(modname, alias)
self.favorites.add(modname)
else:
sep = statement.find(' import ')
sepalias = None
alias = None
if ' as ' in statement:
sepalias = statement.find(' as ')
alias = statement[sepalias+4:]
modname = statement[5:sep]
name = statement[sep+8:sepalias]
if sep > -1:
self.favorites.add(modname)
imports.add_import_from(modname, name, alias)
start_line, end_line, import_block = imports.get_update()
return start_line, end_line, import_block
def get_unresolved_symbols(self, source):
try:
scope = importmagic.symbols.Scope.from_source(source)
except SyntaxError:
raise ImportMagicError('cannot find unresolved names in '
'incomplete file')
unres, unref = scope.find_unresolved_and_unreferenced_symbols()
return list(unres)
def remove_unreferenced_imports(self, source):
try:
scope = importmagic.symbols.Scope.from_source(source)
except SyntaxError:
raise ImportMagicError('cannot find unreferenced imports in '
'incomplete file')
unres, unref = scope.find_unresolved_and_unreferenced_symbols()
# Note: we do not supply "unres" to the call below, since we do
# not want to add imports without querying the user from which
# module symbols should be imported.
start_line, end_line, import_block = importmagic.importer.get_update(
source, self.symbol_index, set(), unref)
return start_line, end_line, import_block

View File

@@ -0,0 +1,397 @@
"""Elpy backend using the Jedi library.
This backend uses the Jedi library:
https://github.com/davidhalter/jedi
"""
import sys
import traceback
import jedi
from elpy import rpc
class JediBackend(object):
"""The Jedi backend class.
Implements the RPC calls we can pass on to Jedi.
Documentation: http://jedi.jedidjah.ch/en/latest/docs/plugin-api.html
"""
name = "jedi"
def __init__(self, project_root):
self.project_root = project_root
self.completions = {}
sys.path.append(project_root)
def rpc_get_completions(self, filename, source, offset):
line, column = pos_to_linecol(source, offset)
proposals = run_with_debug(jedi, 'completions',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
if proposals is None:
return []
self.completions = dict((proposal.name, proposal)
for proposal in proposals)
return [{'name': proposal.name.rstrip("="),
'suffix': proposal.complete.rstrip("="),
'annotation': proposal.type,
'meta': proposal.description}
for proposal in proposals]
def rpc_get_completion_docstring(self, completion):
proposal = self.completions.get(completion)
if proposal is None:
return None
else:
return proposal.docstring(fast=False)
def rpc_get_completion_location(self, completion):
proposal = self.completions.get(completion)
if proposal is None:
return None
else:
return (proposal.module_path, proposal.line)
def rpc_get_docstring(self, filename, source, offset):
line, column = pos_to_linecol(source, offset)
locations = run_with_debug(jedi, 'goto_definitions',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
if locations and locations[-1].docstring():
return ('Documentation for {0}:\n\n'.format(
locations[-1].full_name) + locations[-1].docstring())
else:
return None
def rpc_get_definition(self, filename, source, offset):
line, column = pos_to_linecol(source, offset)
locations = run_with_debug(jedi, 'goto_definitions',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
# goto_definitions() can return silly stuff like __builtin__
# for int variables, so we fall back on goto() in those
# cases. See issue #76.
if (
locations and
locations[0].module_path is None
):
locations = run_with_debug(jedi, 'goto_assignments',
source=source, line=line,
column=column,
path=filename, encoding='utf-8')
if not locations:
return None
else:
loc = locations[-1]
try:
if loc.module_path:
if loc.module_path == filename:
offset = linecol_to_pos(source,
loc.line,
loc.column)
else:
with open(loc.module_path) as f:
offset = linecol_to_pos(f.read(),
loc.line,
loc.column)
else:
return None
except IOError:
return None
return (loc.module_path, offset)
def rpc_get_assignment(self, filename, source, offset):
line, column = pos_to_linecol(source, offset)
locations = run_with_debug(jedi, 'goto_assignments',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
if not locations:
return None
else:
loc = locations[-1]
try:
if loc.module_path:
if loc.module_path == filename:
offset = linecol_to_pos(source,
loc.line,
loc.column)
else:
with open(loc.module_path) as f:
offset = linecol_to_pos(f.read(),
loc.line,
loc.column)
else:
return None
except IOError:
return None
return (loc.module_path, offset)
def rpc_get_calltip(self, filename, source, offset):
line, column = pos_to_linecol(source, offset)
calls = run_with_debug(jedi, 'call_signatures',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
if calls:
call = calls[0]
else:
call = None
if not call:
return None
try:
call.index
except AttributeError as e:
if "get_definition" in str(e):
# Bug #627 / jedi#573
return None
elif "get_subscope_by_name" in str(e):
# Bug #677 / jedi#628
return None
else:
raise
return {"name": call.name,
"index": call.index,
"params": [param.description for param in call.params]}
def rpc_get_usages(self, filename, source, offset):
"""Return the uses of the symbol at offset.
Returns a list of occurrences of the symbol, as dicts with the
fields name, filename, and offset.
"""
line, column = pos_to_linecol(source, offset)
uses = run_with_debug(jedi, 'usages',
source=source, line=line, column=column,
path=filename, encoding='utf-8')
if uses is None:
return None
result = []
for use in uses:
if use.module_path == filename:
offset = linecol_to_pos(source, use.line, use.column)
elif use.module_path is not None:
with open(use.module_path) as f:
text = f.read()
offset = linecol_to_pos(text, use.line, use.column)
result.append({"name": use.name,
"filename": use.module_path,
"offset": offset})
return result
def rpc_get_names(self, filename, source, offset):
"""Return the list of possible names"""
# Remove form feed characters, they confuse Jedi (jedi#424)
source = source.replace("\f", " ")
names = jedi.api.names(source=source,
path=filename, encoding='utf-8',
all_scopes=True,
definitions=True,
references=True)
result = []
for name in names:
if name.module_path == filename:
offset = linecol_to_pos(source, name.line, name.column)
elif name.module_path is not None:
with open(name.module_path) as f:
text = f.read()
offset = linecol_to_pos(text, name.line, name.column)
result.append({"name": name.name,
"filename": name.module_path,
"offset": offset})
return result
# From the Jedi documentation:
#
# line is the current line you want to perform actions on (starting
# with line #1 as the first line). column represents the current
# column/indent of the cursor (starting with zero). source_path
# should be the path of your file in the file system.
def pos_to_linecol(text, pos):
"""Return a tuple of line and column for offset pos in text.
Lines are one-based, columns zero-based.
This is how Jedi wants it. Don't ask me why.
"""
line_start = text.rfind("\n", 0, pos) + 1
line = text.count("\n", 0, line_start) + 1
col = pos - line_start
return line, col
def linecol_to_pos(text, line, col):
"""Return the offset of this line and column in text.
Lines are one-based, columns zero-based.
This is how Jedi wants it. Don't ask me why.
"""
nth_newline_offset = 0
for i in range(line - 1):
new_offset = text.find("\n", nth_newline_offset)
if new_offset < 0:
raise ValueError("Text does not have {0} lines."
.format(line))
nth_newline_offset = new_offset + 1
offset = nth_newline_offset + col
if offset > len(text):
raise ValueError("Line {0} column {1} is not within the text"
.format(line, col))
return offset
def run_with_debug(jedi, name, *args, **kwargs):
re_raise = kwargs.pop('re_raise', ())
# Remove form feed characters, they confuse Jedi (jedi#424)
if 'source' in kwargs:
kwargs['source'] = kwargs['source'].replace("\f", " ")
try:
script = jedi.Script(*args, **kwargs)
return getattr(script, name)()
except Exception as e:
if isinstance(e, re_raise):
raise
# Bug jedi#417
if isinstance(e, TypeError) and str(e) == 'no dicts allowed':
return None
# Bug jedi#427
if isinstance(e, UnicodeDecodeError):
return None
# Bug jedi#429
if isinstance(e, IndexError):
return None
# Bug jedi#431
if isinstance(e, AttributeError) and str(e).endswith("'end_pos'"):
return None
# Bug in Python 2.6, see #275
if isinstance(e, OSError) and e.errno == 13:
return None
# Bug jedi#466
if (
isinstance(e, SyntaxError) and
"EOL while scanning string literal" in str(e)
):
return None
# Bug jedi#482
if isinstance(e, UnicodeEncodeError):
return None
# Bug jedi#485
if (
isinstance(e, ValueError) and
"invalid \\x escape" in str(e)
):
return None
# Bug jedi#485 in Python 3
if (
isinstance(e, SyntaxError) and
"truncated \\xXX escape" in str(e)
):
return None
# Bug jedi#465
if (
isinstance(e, SyntaxError) and
"encoding declaration in Unicode string" in str(e)
):
return None
# Bug #337 / jedi#471
if (
isinstance(e, ImportError) and
"No module named" in str(e)
):
return None
# Bug #365 / jedi#486 - fixed in Jedi 0.8.2
if (
isinstance(e, UnboundLocalError) and
"local variable 'path' referenced before assignment" in str(e)
):
return None
# Bug #366 / jedi#491
if (
isinstance(e, ValueError) and
"__loader__ is None" in str(e)
):
return None
# Bug #353
if (
isinstance(e, OSError) and
"No such file or directory" in str(e)
):
return None
# Bug #561, #564, #570, #588, #593, #599 / jedi#572, jedi#579, jedi#590
if isinstance(e, KeyError):
return None
# Bug #519 / jedi#610
if (
isinstance(e, RuntimeError) and
"maximum recursion depth exceeded" in str(e)
):
return None
# Bug #563 / jedi#589
if (
isinstance(e, AttributeError) and
"MergedNamesDict" in str(e)
):
return None
# Bug #615 / jedi#592
if (
isinstance(e, AttributeError) and
"ListComprehension" in str(e)
):
return None
# Bug #569 / jedi#593
if (
isinstance(e, AttributeError) and
"names_dict" in str(e)
):
return None
from jedi import debug
debug_info = []
def _debug(level, str_out):
if level == debug.NOTICE:
prefix = "[N]"
elif level == debug.WARNING:
prefix = "[W]"
else:
prefix = "[?]"
debug_info.append(u"{0} {1}".format(prefix, str_out))
jedi.set_debug_function(_debug, speed=False)
try:
script = jedi.Script(*args, **kwargs)
return getattr(script, name)()
except Exception as e:
source = kwargs.get('source')
sc_args = []
sc_args.extend(repr(arg) for arg in args)
sc_args.extend("{0}={1}".format(k, "source" if k == "source"
else repr(v))
for (k, v) in kwargs.items())
data = {
"traceback": traceback.format_exc(),
"jedi_debug_info": {'script_args': ", ".join(sc_args),
'source': source,
'method': name,
'debug_info': debug_info}
}
raise rpc.Fault(message=str(e),
code=500,
data=data)
finally:
jedi.set_debug_function(None)

View File

@@ -0,0 +1,91 @@
import sys
import types
from pydoc import safeimport, resolve, ErrorDuringImport
from pkgutil import iter_modules
from elpy import compat
# Types we want to recurse into (nodes).
CONTAINER_TYPES = (type, types.ModuleType)
# Types of attributes we can get documentation for (leaves).
PYDOC_TYPES = (type,
types.FunctionType,
types.BuiltinFunctionType,
types.BuiltinMethodType,
types.MethodType,
types.ModuleType)
if not compat.PYTHON3: # pragma: nocover
# Python 2 old style classes
CONTAINER_TYPES = tuple(list(CONTAINER_TYPES) + [types.ClassType])
PYDOC_TYPES = tuple(list(PYDOC_TYPES) + [types.ClassType])
def get_pydoc_completions(modulename):
"""Get possible completions for modulename for pydoc.
Returns a list of possible values to be passed to pydoc.
"""
modulename = compat.ensure_not_unicode(modulename)
modulename = modulename.rstrip(".")
if modulename == "":
return sorted(get_modules())
candidates = get_completions(modulename)
if candidates:
return sorted(candidates)
needle = modulename
if "." in needle:
modulename, part = needle.rsplit(".", 1)
candidates = get_completions(modulename)
else:
candidates = get_modules()
return sorted(candidate for candidate in candidates
if candidate.startswith(needle))
def get_completions(modulename):
modules = set("{0}.{1}".format(modulename, module)
for module in get_modules(modulename))
try:
module, name = resolve(modulename)
except ImportError:
return modules
if isinstance(module, CONTAINER_TYPES):
modules.update("{0}.{1}".format(modulename, name)
for name in dir(module)
if not name.startswith("_") and
isinstance(getattr(module, name),
PYDOC_TYPES))
return modules
def get_modules(modulename=None):
"""Return a list of modules and packages under modulename.
If modulename is not given, return a list of all top level modules
and packages.
"""
modulename = compat.ensure_not_unicode(modulename)
if not modulename:
try:
return ([modname for (importer, modname, ispkg)
in iter_modules()
if not modname.startswith("_")] +
list(sys.builtin_module_names))
except OSError:
# Bug in Python 2.6, see #275
return list(sys.builtin_module_names)
try:
module = safeimport(modulename)
except ErrorDuringImport:
return []
if module is None:
return []
if hasattr(module, "__path__"):
return [modname for (importer, modname, ispkg)
in iter_modules(module.__path__)
if not modname.startswith("_")]
return []

View File

@@ -0,0 +1,381 @@
"""Refactoring methods for elpy.
This interfaces directly with rope, regardless of the backend used,
because the other backends don't really offer refactoring choices.
Once Jedi is similarly featureful as Rope we can try and offer both.
# Too complex:
- Restructure: Interesting, but too complex, and needs deep Rope
knowledge to do well.
- ChangeSignature: Slightly less complex interface, but still to
complex, requiring a large effort for the benefit.
# Too useless:
I could not get these to work in any useful fashion. I might be doing
something wrong.
- ExtractVariable does not replace the code extracted with the
variable, making it a glorified copy&paste function. Emacs can do
better than this interface by itself.
- EncapsulateField: Getter/setter methods are outdated, this should be
using properties.
- IntroduceFactory: Inserts a trivial method to the current class.
Cute.
- IntroduceParameter: Introduces a parameter correctly, but does not
replace the old code with the parameter. So it just edits the
argument list and adds a shiny default.
- LocalToField: Seems to just add "self." in front of all occurrences
of a variable in the local scope.
- MethodObject: This turns the current method into a callable
class/object. Not sure what that would be good for.
# Can't even get to work:
- ImportOrganizer expand_star_imports, handle_long_imports,
relatives_to_absolutes: Seem not to do anything.
- create_move: I was not able to figure out what it would like to see
as its attrib argument.
"""
import os
from elpy.rpc import Fault
try:
from rope.base.exceptions import RefactoringError
from rope.base.project import Project
from rope.base.libutils import path_to_resource
from rope.base import change as rope_change
from rope.base import worder
from rope.refactor.importutils import ImportOrganizer
from rope.refactor.topackage import ModuleToPackage
from rope.refactor.rename import Rename
from rope.refactor.move import create_move
from rope.refactor.inline import create_inline
from rope.refactor.extract import ExtractMethod
from rope.refactor.usefunction import UseFunction
ROPE_AVAILABLE = True
except ImportError:
ROPE_AVAILABLE = False
def options(description, **kwargs):
"""Decorator to set some options on a method."""
def set_notes(function):
function.refactor_notes = {'name': function.__name__,
'category': "Miscellaneous",
'description': description,
'doc': getattr(function, '__doc__',
''),
'args': []}
function.refactor_notes.update(kwargs)
return function
return set_notes
class Refactor(object):
"""The main refactoring interface.
Once initialized, the first call should be to get_refactor_options
to get a list of refactoring options at a given position. The
returned value will also list any additional options required.
Once you picked one, you can call get_changes to get the actual
refactoring changes.
"""
def __init__(self, project_root, filename):
self.project_root = project_root
if not ROPE_AVAILABLE:
raise Fault('rope not installed, cannot refactor code.',
code=400)
if not os.path.exists(project_root):
raise Fault(
"cannot do refactoring without a local project root",
code=400
)
self.project = Project(project_root, ropefolder=None)
self.resource = path_to_resource(self.project, filename)
def get_refactor_options(self, start, end=None):
"""Return a list of options for refactoring at the given position.
If `end` is also given, refactoring on a region is assumed.
Each option is a dictionary of key/value pairs. The value of
the key 'name' is the one to be used for get_changes.
The key 'args' contains a list of additional arguments
required for get_changes.
"""
result = []
for symbol in dir(self):
if not symbol.startswith("refactor_"):
continue
method = getattr(self, symbol)
if not method.refactor_notes.get('available', True):
continue
category = method.refactor_notes['category']
if end is not None and category != 'Region':
continue
if end is None and category == 'Region':
continue
is_on_symbol = self._is_on_symbol(start)
if not is_on_symbol and category in ('Symbol', 'Method'):
continue
requires_import = method.refactor_notes.get('only_on_imports',
False)
if requires_import and not self._is_on_import_statement(start):
continue
result.append(method.refactor_notes)
return result
def _is_on_import_statement(self, offset):
"Does this offset point to an import statement?"
data = self.resource.read()
bol = data.rfind("\n", 0, offset) + 1
eol = data.find("\n", 0, bol)
if eol == -1:
eol = len(data)
line = data[bol:eol]
line = line.strip()
if line.startswith("import ") or line.startswith("from "):
return True
else:
return False
def _is_on_symbol(self, offset):
"Is this offset on a symbol?"
if not ROPE_AVAILABLE:
return False
data = self.resource.read()
if offset >= len(data):
return False
if data[offset] != '_' and not data[offset].isalnum():
return False
word = worder.get_name_at(self.resource, offset)
if word:
return True
else:
return False
def get_changes(self, name, *args):
"""Return a list of changes for the named refactoring action.
Changes are dictionaries describing a single action to be
taken for the refactoring to be successful.
A change has an action and possibly a type. In the description
below, the action is before the slash and the type after it.
change: Change file contents
- file: The path to the file to change
- contents: The new contents for the file
- Diff: A unified diff showing the changes introduced
create/file: Create a new file
- file: The file to create
create/directory: Create a new directory
- path: The directory to create
move/file: Rename a file
- source: The path to the source file
- destination: The path to the destination file name
move/directory: Rename a directory
- source: The path to the source directory
- destination: The path to the destination directory name
delete/file: Delete a file
- file: The file to delete
delete/directory: Delete a directory
- path: The directory to delete
"""
if not name.startswith("refactor_"):
raise ValueError("Bad refactoring name {0}".format(name))
method = getattr(self, name)
if not method.refactor_notes.get('available', True):
raise RuntimeError("Method not available")
return method(*args)
@options("Convert from x import y to import x.y as y", category="Imports",
args=[("offset", "offset", None)],
only_on_imports=True,
available=ROPE_AVAILABLE)
def refactor_froms_to_imports(self, offset):
"""Converting imports of the form "from ..." to "import ..."."""
refactor = ImportOrganizer(self.project)
changes = refactor.froms_to_imports(self.resource, offset)
return translate_changes(changes)
@options("Reorganize and clean up", category="Imports",
available=ROPE_AVAILABLE)
def refactor_organize_imports(self):
"""Clean up and organize imports."""
refactor = ImportOrganizer(self.project)
changes = refactor.organize_imports(self.resource)
return translate_changes(changes)
@options("Convert the current module into a package", category="Module",
available=ROPE_AVAILABLE)
def refactor_module_to_package(self):
"""Convert the current module into a package."""
refactor = ModuleToPackage(self.project, self.resource)
return self._get_changes(refactor)
@options("Rename symbol at point", category="Symbol",
args=[("offset", "offset", None),
("new_name", "string", "Rename to: "),
("in_hierarchy", "boolean",
"Rename in super-/subclasses as well? "),
("docs", "boolean",
"Replace occurences in docs and strings? ")
],
available=ROPE_AVAILABLE)
def refactor_rename_at_point(self, offset, new_name, in_hierarchy, docs):
"""Rename the symbol at point."""
try:
refactor = Rename(self.project, self.resource, offset)
except RefactoringError as e:
raise Fault(str(e), code=400)
return self._get_changes(refactor, new_name,
in_hierarchy=in_hierarchy, docs=docs)
@options("Rename current module", category="Module",
args=[("new_name", "string", "Rename to: ")],
available=ROPE_AVAILABLE)
def refactor_rename_current_module(self, new_name):
"""Rename the current module."""
refactor = Rename(self.project, self.resource, None)
return self._get_changes(refactor, new_name)
@options("Move the current module to a different package",
category="Module",
args=[("new_name", "directory", "Destination package: ")],
available=ROPE_AVAILABLE)
def refactor_move_module(self, new_name):
"""Move the current module."""
refactor = create_move(self.project, self.resource)
resource = path_to_resource(self.project, new_name)
return self._get_changes(refactor, resource)
@options("Inline function call at point", category="Symbol",
args=[("offset", "offset", None),
("only_this", "boolean", "Only this occurrence? ")],
available=ROPE_AVAILABLE)
def refactor_create_inline(self, offset, only_this):
"""Inline the function call at point."""
refactor = create_inline(self.project, self.resource, offset)
if only_this:
return self._get_changes(refactor, remove=False, only_current=True)
else:
return self._get_changes(refactor, remove=True, only_current=False)
@options("Extract current region as a method", category="Region",
args=[("start", "start_offset", None),
("end", "end_offset", None),
("name", "string", "Method name: "),
("make_global", "boolean", "Create global method? ")],
available=ROPE_AVAILABLE)
def refactor_extract_method(self, start, end, name,
make_global):
"""Extract region as a method."""
refactor = ExtractMethod(self.project, self.resource, start, end)
return self._get_changes(
refactor, name, similar=True, global_=make_global
)
@options("Use the function at point wherever possible", category="Method",
args=[("offset", "offset", None)],
available=ROPE_AVAILABLE)
def refactor_use_function(self, offset):
"""Use the function at point wherever possible."""
try:
refactor = UseFunction(self.project, self.resource, offset)
except RefactoringError as e:
raise Fault(
'Refactoring error: {}'.format(e),
code=400
)
return self._get_changes(refactor)
def _get_changes(self, refactor, *args, **kwargs):
try:
changes = refactor.get_changes(*args, **kwargs)
except Exception as e:
raise Fault("Error during refactoring: {}".format(e),
code=400)
return translate_changes(changes)
def translate_changes(initial_change):
"""Translate rope.base.change.Change instances to dictionaries.
See Refactor.get_changes for an explanation of the resulting
dictionary.
"""
agenda = [initial_change]
result = []
while agenda:
change = agenda.pop(0)
if isinstance(change, rope_change.ChangeSet):
agenda.extend(change.changes)
elif isinstance(change, rope_change.ChangeContents):
result.append({'action': 'change',
'file': change.resource.real_path,
'contents': change.new_contents,
'diff': change.get_description()})
elif isinstance(change, rope_change.CreateFile):
result.append({'action': 'create',
'type': 'file',
'file': change.resource.real_path})
elif isinstance(change, rope_change.CreateFolder):
result.append({'action': 'create',
'type': 'directory',
'path': change.resource.real_path})
elif isinstance(change, rope_change.MoveResource):
result.append({'action': 'move',
'type': ('directory'
if change.new_resource.is_folder()
else 'file'),
'source': change.resource.real_path,
'destination': change.new_resource.real_path})
elif isinstance(change, rope_change.RemoveResource):
if change.resource.is_folder():
result.append({'action': 'delete',
'type': 'directory',
'path': change.resource.real_path})
else:
result.append({'action': 'delete',
'type': 'file',
'file': change.resource.real_path})
return result
class FakeResource(object):
"""A fake resource in case Rope is absence."""
def __init__(self, filename):
self.real_path = filename
def read(self):
with open(self.real_path) as f:
return f.read()

View File

@@ -0,0 +1,290 @@
"""Elpy backend using the Rope library.
This backend uses the Rope library:
http://rope.sourceforge.net/
"""
import os
import time
import rope.contrib.codeassist
import rope.base.project
import rope.base.libutils
import rope.base.exceptions
import rope.contrib.findit
from elpy import rpc
import elpy.pydocutils
VALIDATE_EVERY_SECONDS = 5
MAXFIXES = 5
class RopeBackend(object):
"""The Rope backend class.
Implements the RPC calls we can pass on to Rope. Also subclasses
the native backend to provide methods Rope does not provide, if
any.
"""
name = "rope"
def __init__(self, project_root):
super(RopeBackend, self).__init__()
self.last_validation = 0
if not os.path.exists(project_root):
raise rpc.Fault(
"rope does not support files without a local project root",
code=400
)
self.project_root = project_root
self.completions = {}
prefs = dict(ignored_resources=['*.pyc', '*~', '.ropeproject',
'.hg', '.svn', '_svn', '.git'],
python_files=['*.py'],
save_objectdb=False,
compress_objectdb=False,
automatic_soa=True,
soa_followed_calls=0,
perform_doa=True,
validate_objectdb=True,
max_history_items=32,
save_history=False,
compress_history=False,
indent_size=4,
extension_modules=[],
import_dynload_stdmods=True,
ignore_syntax_errors=False,
ignore_bad_imports=False)
self.project = rope.base.project.Project(self.project_root,
ropefolder=None,
**prefs)
def get_resource(self, filename):
if filename is not None and os.path.exists(filename):
return rope.base.libutils.path_to_resource(self.project,
filename,
'file')
else:
return None
def validate(self):
"""Validate the stored project.
This should be called before every use of Rope. It will
revalidate the project, but do some call throttling.
"""
now = time.time()
if now > self.last_validation + VALIDATE_EVERY_SECONDS:
try:
self.project.validate()
except rope.base.exceptions.ResourceNotFoundError:
pass
self.last_validation = now
def call_rope(self, rope_function, filename, source, offset,
**kwargs):
self.validate()
resource = self.get_resource(filename)
try:
return rope_function(self.project,
source, offset,
resource,
maxfixes=MAXFIXES,
**kwargs)
except Exception:
return None
def rpc_get_completions(self, filename, source, offset):
proposals = self.call_rope(
rope.contrib.codeassist.code_assist,
filename, source, offset
)
if proposals is None:
return []
try:
starting_offset = rope.contrib.codeassist.starting_offset(source,
offset)
except Exception:
return []
prefixlen = offset - starting_offset
try:
self.completions = dict((proposal.name, proposal)
for proposal in proposals)
return [{'name': proposal.name,
'suffix': proposal.name[prefixlen:],
'annotation': proposal.type,
'meta': str(proposal)}
for proposal in proposals]
except Exception:
return []
def rpc_get_completion_docstring(self, completion):
proposal = self.completions.get(completion)
if proposal is None:
return None
else:
return proposal.get_doc()
def rpc_get_completion_location(self, completion):
proposal = self.completions.get(completion)
if proposal is None:
return None
else:
if not proposal.pyname:
return None
module, lineno = proposal.pyname.get_definition_location()
if module is None:
return None
resource = module.get_module().get_resource()
return (resource.real_path, lineno)
def rpc_get_definition(self, filename, source, offset):
location = self.call_rope(
rope.contrib.findit.find_definition,
filename, source, offset
)
if location is None:
return None
else:
return (location.resource.real_path, location.offset)
def rpc_get_calltip(self, filename, source, offset):
offset = find_called_name_offset(source, offset)
if 0 < offset < len(source) and source[offset] == ')':
offset -= 1
calltip = self.call_rope(
rope.contrib.codeassist.get_calltip,
filename, source, offset,
remove_self=True
)
if calltip is None:
return None
calltip = calltip.replace(".__init__(", "(")
calltip = calltip.replace("(self)", "()")
calltip = calltip.replace("(self, ", "(")
# "elpy.tests.support.source_and_offset(source)"
# =>
# "support.source_and_offset(source)"
try:
openpos = calltip.index("(")
period2 = calltip.rindex(".", 0, openpos)
period1 = calltip.rindex(".", 0, period2)
calltip = calltip[period1 + 1:]
except ValueError:
pass
return calltip
def rpc_get_docstring(self, filename, source, offset):
return self.call_rope(
rope.contrib.codeassist.get_doc,
filename, source, offset
)
def find_called_name_offset(source, orig_offset):
"""Return the offset of a calling function.
This only approximates movement.
"""
offset = min(orig_offset, len(source) - 1)
paren_count = 0
while True:
if offset <= 1:
return orig_offset
elif source[offset] == '(':
if paren_count == 0:
return offset - 1
else:
paren_count -= 1
elif source[offset] == ')':
paren_count += 1
offset -= 1
##################################################################
# A recurring problem in Rope for Elpy is that it searches the whole
# project root for Python files. If the user edits a file in their
# home directory, this can easily read a whole lot of files, making
# Rope practically useless. We change the file finding algorithm here
# to only recurse into directories with an __init__.py file in them.
def find_source_folders(self, folder):
for resource in folder.get_folders():
if self._is_package(resource):
return [folder]
result = []
for resource in folder.get_files():
if resource.name.endswith('.py'):
result.append(folder)
break
for resource in folder.get_folders():
if self._is_package(resource):
result.append(resource)
return result
import rope.base.pycore
rope.base.pycore.PyCore._find_source_folders = find_source_folders
def get_files(self):
if self.files is None:
self.files = get_python_project_files(self.project)
return self.files
rope.base.project._FileListCacher.get_files = get_files
def get_python_project_files(project):
for dirname, subdirs, files in os.walk(project.root.real_path):
for filename in files:
yield rope.base.libutils.path_to_resource(
project, os.path.join(dirname, filename), 'file')
subdirs[:] = [subdir for subdir in subdirs
if os.path.exists(os.path.join(dirname, subdir,
"__init__.py"))]
##################################################################
# Monkey patching a method in rope because it doesn't complete import
# statements.
orig_code_completions = (rope.contrib.codeassist.
_PythonCodeAssist._code_completions)
def code_completions(self):
proposals = get_import_completions(self)
if proposals:
return proposals
else:
return orig_code_completions(self)
def get_import_completions(self):
if not self.word_finder.is_import_statement(self.offset):
return []
modulename = self.word_finder.get_primary_at(self.offset)
# Rope can handle modules in packages
if "." in modulename:
return []
return dict((name, FakeProposal(name))
for name in elpy.pydocutils.get_modules()
if name.startswith(modulename))
class FakeProposal(object):
def __init__(self, name):
self.name = name
self.type = "mock"
def get_doc(self):
return None
rope.contrib.codeassist._PythonCodeAssist._code_completions = code_completions

View File

@@ -0,0 +1,151 @@
"""A simple JSON-RPC-like server.
The server will read and write lines of JSON-encoded method calls and
responses.
See the documentation of the JSONRPCServer class for further details.
"""
import json
import sys
import traceback
class JSONRPCServer(object):
"""Simple JSON-RPC-like server.
This class will read single-line JSON expressions from stdin,
decode them, and pass them to a handler. Return values from the
handler will be JSON-encoded and written to stdout.
To implement a handler, you need to subclass this class and add
methods starting with "rpc_". Methods then will be found.
Method calls should be encoded like this:
{"id": 23, "method": "method_name", "params": ["foo", "bar"]}
This will call self.rpc_method("foo", "bar").
Responses will be encoded like this:
{"id": 23, "result": "foo"}
Errors will be encoded like this:
{"id": 23, "error": "Simple error message"}
See http://www.jsonrpc.org/ for the inspiration of the protocol.
"""
def __init__(self, stdin=None, stdout=None):
"""Return a new JSON-RPC server object.
It will read lines of JSON data from stdin, and write the
responses to stdout.
"""
if stdin is None:
self.stdin = sys.stdin
else:
self.stdin = stdin
if stdout is None:
self.stdout = sys.stdout
else:
self.stdout = stdout
def read_json(self):
"""Read a single line and decode it as JSON.
Can raise an EOFError() when the input source was closed.
"""
line = self.stdin.readline()
if line == '':
raise EOFError()
return json.loads(line)
def write_json(self, **kwargs):
"""Write an JSON object on a single line.
The keyword arguments are interpreted as a single JSON object.
It's not possible with this method to write non-objects.
"""
self.stdout.write(json.dumps(kwargs) + "\n")
self.stdout.flush()
def handle_request(self):
"""Handle a single JSON-RPC request.
Read a request, call the appropriate handler method, and
return the encoded result. Errors in the handler method are
caught and encoded as error objects. Errors in the decoding
phase are not caught, as we can not respond with an error
response to them.
"""
request = self.read_json()
if 'method' not in request:
raise ValueError("Received a bad request: {0}"
.format(request))
method_name = request['method']
request_id = request.get('id', None)
params = request.get('params') or []
try:
method = getattr(self, "rpc_" + method_name, None)
if method is not None:
result = method(*params)
else:
result = self.handle(method_name, params)
if request_id is not None:
self.write_json(result=result,
id=request_id)
except Fault as fault:
error = {"message": fault.message,
"code": fault.code}
if fault.data is not None:
error["data"] = fault.data
self.write_json(error=error, id=request_id)
except Exception as e:
error = {"message": str(e),
"code": 500,
"data": {"traceback": traceback.format_exc()}}
self.write_json(error=error, id=request_id)
def handle(self, method_name, args):
"""Handle the call to method_name.
You should overwrite this method in a subclass.
"""
raise Fault("Unknown method {0}".format(method_name))
def serve_forever(self):
"""Serve requests forever.
Errors are not caught, so this is a slight misnomer.
"""
while True:
try:
self.handle_request()
except (KeyboardInterrupt, EOFError, SystemExit):
break
class Fault(Exception):
"""RPC Fault instances.
code defines the severity of the warning.
2xx: Normal behavior lead to end of operation, i.e. a warning
4xx: An expected error occurred
5xx: An unexpected error occurred (usually includes a traceback)
"""
def __init__(self, message, code=500, data=None):
super(Fault, self).__init__(message)
self.message = message
self.code = code
self.data = data

View File

@@ -0,0 +1,322 @@
"""Method implementations for the Elpy JSON-RPC server.
This file implements the methods exported by the JSON-RPC server. It
handles backend selection and passes methods on to the selected
backend.
"""
import io
import os
import pydoc
from elpy.pydocutils import get_pydoc_completions
from elpy.rpc import JSONRPCServer, Fault
from elpy.impmagic import ImportMagic, ImportMagicError
from elpy.auto_pep8 import fix_code
from elpy.yapfutil import fix_code as fix_code_with_yapf
try:
from elpy import jedibackend
except ImportError: # pragma: no cover
jedibackend = None
try:
from elpy import ropebackend
except ImportError: # pragma: no cover
ropebackend = None
class ElpyRPCServer(JSONRPCServer):
"""The RPC server for elpy.
See the rpc_* methods for exported method documentation.
"""
def __init__(self, *args, **kwargs):
super(ElpyRPCServer, self).__init__(*args, **kwargs)
self.backend = None
self.import_magic = ImportMagic()
self.project_root = None
def _call_backend(self, method, default, *args, **kwargs):
"""Call the backend method with args.
If there is currently no backend, return default."""
meth = getattr(self.backend, method, None)
if meth is None:
return default
else:
return meth(*args, **kwargs)
def rpc_echo(self, *args):
"""Return the arguments.
This is a simple test method to see if the protocol is
working.
"""
return args
def rpc_init(self, options):
self.project_root = options["project_root"]
if self.import_magic.is_enabled:
self.import_magic.build_index(self.project_root)
if ropebackend and options["backend"] == "rope":
self.backend = ropebackend.RopeBackend(self.project_root)
elif jedibackend and options["backend"] == "jedi":
self.backend = jedibackend.JediBackend(self.project_root)
elif ropebackend:
self.backend = ropebackend.RopeBackend(self.project_root)
elif jedibackend:
self.backend = jedibackend.JediBackend(self.project_root)
else:
self.backend = None
return {
'backend': (self.backend.name if self.backend is not None
else None)
}
def rpc_get_calltip(self, filename, source, offset):
"""Get the calltip for the function at the offset.
"""
return self._call_backend("rpc_get_calltip", None, filename,
get_source(source), offset)
def rpc_get_completions(self, filename, source, offset):
"""Get a list of completion candidates for the symbol at offset.
"""
results = self._call_backend("rpc_get_completions", [], filename,
get_source(source), offset)
# Uniquify by name
results = list(dict((res['name'], res) for res in results)
.values())
results.sort(key=lambda cand: _pysymbol_key(cand["name"]))
return results
def rpc_get_completion_docstring(self, completion):
"""Return documentation for a previously returned completion.
"""
return self._call_backend("rpc_get_completion_docstring",
None, completion)
def rpc_get_completion_location(self, completion):
"""Return the location for a previously returned completion.
This returns a list of [file name, line number].
"""
return self._call_backend("rpc_get_completion_location", None,
completion)
def rpc_get_definition(self, filename, source, offset):
"""Get the location of the definition for the symbol at the offset.
"""
return self._call_backend("rpc_get_definition", None, filename,
get_source(source), offset)
def rpc_get_assignment(self, filename, source, offset):
"""Get the location of the assignment for the symbol at the offset.
"""
return self._call_backend("rpc_get_assignment", None, filename,
get_source(source), offset)
def rpc_get_docstring(self, filename, source, offset):
"""Get the docstring for the symbol at the offset.
"""
return self._call_backend("rpc_get_docstring", None, filename,
get_source(source), offset)
def rpc_get_pydoc_completions(self, name=None):
"""Return a list of possible strings to pass to pydoc.
If name is given, the strings are under name. If not, top
level modules are returned.
"""
return get_pydoc_completions(name)
def rpc_get_pydoc_documentation(self, symbol):
"""Get the Pydoc documentation for the given symbol.
Uses pydoc and can return a string with backspace characters
for bold highlighting.
"""
try:
docstring = pydoc.render_doc(str(symbol),
"Elpy Pydoc Documentation for %s",
False)
except (ImportError, pydoc.ErrorDuringImport):
return None
else:
if isinstance(docstring, bytes):
docstring = docstring.decode("utf-8", "replace")
return docstring
def rpc_get_refactor_options(self, filename, start, end=None):
"""Return a list of possible refactoring options.
This list will be filtered depending on whether it's
applicable at the point START and possibly the region between
START and END.
"""
try:
from elpy import refactor
except:
raise ImportError("Rope not installed, refactorings unavailable")
ref = refactor.Refactor(self.project_root, filename)
return ref.get_refactor_options(start, end)
def rpc_refactor(self, filename, method, args):
"""Return a list of changes from the refactoring action.
A change is a dictionary describing the change. See
elpy.refactor.translate_changes for a description.
"""
try:
from elpy import refactor
except:
raise ImportError("Rope not installed, refactorings unavailable")
if args is None:
args = ()
ref = refactor.Refactor(self.project_root, filename)
return ref.get_changes(method, *args)
def rpc_get_usages(self, filename, source, offset):
"""Get usages for the symbol at point.
"""
source = get_source(source)
if hasattr(self.backend, "rpc_get_usages"):
return self.backend.rpc_get_usages(filename, source, offset)
else:
raise Fault("get_usages not implemented by current backend",
code=400)
def rpc_get_names(self, filename, source, offset):
"""Get all possible names
"""
source = get_source(source)
if hasattr(self.backend, "rpc_get_names"):
return self.backend.rpc_get_names(filename, source, offset)
else:
raise Fault("get_names not implemented by current backend",
code=400)
def _ensure_import_magic(self): # pragma: no cover
if not self.import_magic.is_enabled:
raise Fault("fixup_imports not enabled; install importmagic module",
code=400)
if not self.import_magic.symbol_index:
raise Fault(self.import_magic.fail_message, code=200) # XXX code?
def rpc_get_import_symbols(self, filename, source, symbol):
"""Return a list of modules from which the given symbol can be imported.
"""
self._ensure_import_magic()
try:
return self.import_magic.get_import_symbols(symbol)
except ImportMagicError as err:
raise Fault(str(err), code=200)
def rpc_add_import(self, filename, source, statement):
"""Add an import statement to the module.
"""
self._ensure_import_magic()
source = get_source(source)
try:
return self.import_magic.add_import(source, statement)
except ImportMagicError as err:
raise Fault(str(err), code=200)
def rpc_get_unresolved_symbols(self, filename, source):
"""Return a list of unreferenced symbols in the module.
"""
self._ensure_import_magic()
source = get_source(source)
try:
return self.import_magic.get_unresolved_symbols(source)
except ImportMagicError as err:
raise Fault(str(err), code=200)
def rpc_remove_unreferenced_imports(self, filename, source):
"""Remove unused import statements.
"""
self._ensure_import_magic()
source = get_source(source)
try:
return self.import_magic.remove_unreferenced_imports(source)
except ImportMagicError as err:
raise Fault(str(err), code=200)
def rpc_fix_code(self, source):
"""Formats Python code to conform to the PEP 8 style guide.
"""
source = get_source(source)
return fix_code(source)
def rpc_fix_code_with_yapf(self, source):
"""Formats Python code to conform to the PEP 8 style guide.
"""
source = get_source(source)
return fix_code_with_yapf(source)
def get_source(fileobj):
"""Translate fileobj into file contents.
fileobj is either a string or a dict. If it's a string, that's the
file contents. If it's a string, then the filename key contains
the name of the file whose contents we are to use.
If the dict contains a true value for the key delete_after_use,
the file should be deleted once read.
"""
if not isinstance(fileobj, dict):
return fileobj
else:
try:
with io.open(fileobj["filename"], encoding="utf-8",
errors="ignore") as f:
return f.read()
finally:
if fileobj.get('delete_after_use'):
try:
os.remove(fileobj["filename"])
except: # pragma: no cover
pass
def _pysymbol_key(name):
"""Return a sortable key index for name.
Sorting is case-insensitive, with the first underscore counting as
worse than any character, but subsequent underscores do not. This
means that dunder symbols (like __init__) are sorted after symbols
that start with an alphabetic character, but before those that
start with only a single underscore.
"""
if name.startswith("_"):
name = "~" + name[1:]
return name.lower()

View File

@@ -0,0 +1,8 @@
"""Unit tests for elpy."""
try:
import unittest2
import sys
sys.modules['unittest'] = unittest2
except:
pass

View File

@@ -0,0 +1,18 @@
"""Python 2/3 compatibility definitions.
These are used by the rest of Elpy to keep compatibility definitions
in one place.
"""
import sys
if sys.version_info >= (3, 0):
PYTHON3 = True
import builtins
from io import StringIO
else:
PYTHON3 = False
import __builtin__ as builtins # noqa
from StringIO import StringIO # noqa

View File

@@ -0,0 +1,950 @@
# coding: utf-8
"""Support classes and functions for the elpy test code.
Elpy uses a bit of a peculiar test setup to avoid redundancy. For the
tests of the two backends, we provide generic test cases for generic
tests and for specific callback tests.
These mixins can be included in the actual test classes. We can't add
these tests to a BackendTestCase subclass directly because the test
discovery would find them there and try to run them, which would fail.
"""
import os
import shutil
import sys
import tempfile
import unittest
from elpy.tests import compat
class BackendTestCase(unittest.TestCase):
"""Base class for backend tests.
This class sets up a project root directory and provides an easy
way to create files within the project root.
"""
def setUp(self):
"""Create the project root and make sure it gets cleaned up."""
super(BackendTestCase, self).setUp()
self.project_root = tempfile.mkdtemp(prefix="elpy-test")
self.addCleanup(shutil.rmtree, self.project_root, True)
def project_file(self, relname, contents):
"""Create a file named relname within the project root.
Write contents into that file.
"""
full_name = os.path.join(self.project_root, relname)
try:
os.makedirs(os.path.dirname(full_name))
except OSError:
pass
if compat.PYTHON3:
fobj = open(full_name, "w", encoding="utf-8")
else:
fobj = open(full_name, "w")
with fobj as f:
f.write(contents)
return full_name
class GenericRPCTests(object):
"""Generic RPC test methods.
This is a mixin to add tests that should be run for all RPC
methods that follow the generic (filename, source, offset) calling
conventions.
"""
METHOD = None
def rpc(self, filename, source, offset):
method = getattr(self.backend, self.METHOD)
return method(filename, source, offset)
def test_should_not_fail_on_inexisting_file(self):
filename = self.project_root + "/doesnotexist.py"
self.rpc(filename, "", 0)
def test_should_not_fail_on_empty_file(self):
filename = self.project_file("test.py", "")
self.rpc(filename, "", 0)
def test_should_not_fail_if_file_is_none(self):
self.rpc(None, "", 0)
def test_should_not_fail_for_module_syntax_errors(self):
source, offset = source_and_offset(
"class Foo(object):\n"
" def bar(self):\n"
" foo(_|_"
" bar("
"\n"
" def a(self):\n"
" pass\n"
"\n"
" def b(self):\n"
" pass\n"
"\n"
" def b(self):\n"
" pass\n"
"\n"
" def b(self):\n"
" pass\n"
"\n"
" def b(self):\n"
" pass\n"
"\n"
" def b(self):\n"
" pass\n"
)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_bad_indentation(self):
# Bug in Rope: rope#80
source, offset = source_and_offset(
"def foo():\n"
" print(23)_|_\n"
" print(17)\n")
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
@unittest.skipIf((3, 3) <= sys.version_info < (3, 4),
"Bug in jedi for Python 3.3")
def test_should_not_fail_for_relative_import(self):
# Bug in Rope: rope#81 and rope#82
source, offset = source_and_offset(
"from .. import foo_|_"
)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_on_keyword(self):
source, offset = source_and_offset(
"_|_try:\n"
" pass\n"
"except:\n"
" pass\n")
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_with_bad_encoding(self):
# Bug in Rope: rope#83
source, offset = source_and_offset(
u'# coding: utf-8X_|_\n'
)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_with_form_feed_characters(self):
# Bug in Jedi: jedi#424
source, offset = source_and_offset(
"\f\n"
"class Test(object):_|_\n"
" pass"
)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_dictionaries_in_weird_places(self):
# Bug in Jedi: jedi#417
source, offset = source_and_offset(
"import json\n"
"\n"
"def foo():\n"
" json.loads(_|_\n"
"\n"
" json.load.return_value = {'foo': [],\n"
" 'bar': True}\n"
"\n"
" c = Foo()\n"
)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_break_with_binary_characters_in_docstring(self):
# Bug in Jedi: jedi#427
template = '''\
class Foo(object):
def __init__(self):
"""
COMMUNITY instance that this conversion belongs to.
DISPERSY_VERSION is the dispersy conversion identifier (on the wire version; must be one byte).
COMMUNIY_VERSION is the community conversion identifier (on the wire version; must be one byte).
COMMUNIY_VERSION may not be '\\x00' or '\\xff'. '\\x00' is used by the DefaultConversion until
a proper conversion instance can be made for the Community. '\\xff' is reserved for when
more than one byte is needed as a version indicator.
"""
pass
x = Foo()
x._|_
'''
source, offset = source_and_offset(template)
filename = self.project_file("test.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_def_without_name(self):
# Bug jedi#429
source, offset = source_and_offset(
"def_|_():\n"
" if True:\n"
" return True\n"
" else:\n"
" return False\n"
)
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_on_lambda(self):
# Bug #272 / jedi#431, jedi#572
source, offset = source_and_offset(
"map(lambda_|_"
)
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_on_literals(self):
# Bug #314, #344 / jedi#466
source = u'lit = u"""\\\n# -*- coding: utf-8 -*-\n"""\n'
offset = 0
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_with_args_as_args(self):
# Bug #347 in rope_py3k
source, offset = source_and_offset(
"def my_function(*args):\n"
" ret_|_"
)
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_unicode_chars_in_string(self):
# Bug #358 / jedi#482
source = '''\
# coding: utf-8
logging.info(u"Saving «{}»...".format(title))
requests.get(u"https://web.archive.org/save/{}".format(url))
'''
offset = 57
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_bad_escape_sequence(self):
# Bug #360 / jedi#485
source = r"v = '\x'"
offset = 8
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_coding_declarations_in_strings(self):
# Bug #314 / jedi#465 / python#22221
source = u'lit = """\\\n# -*- coding: utf-8 -*-\n"""'
offset = 8
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_if_root_vanishes(self):
# Bug #353
source, offset = source_and_offset(
"import foo\n"
"foo._|_"
)
filename = self.project_file("project.py", source)
shutil.rmtree(self.project_root)
self.rpc(filename, source, offset)
# For some reason, this breaks a lot of other tests. Couldn't
# figure out why.
#
# def test_should_not_fail_for_sys_path(self):
# # Bug #365 / jedi#486
# source, offset = source_and_offset(
# "import sys\n"
# "\n"
# "sys.path.index(_|_\n"
# )
# filename = self.project_file("project.py", source)
#
# self.rpc(filename, source, offset)
def test_should_not_fail_for_key_error(self):
# Bug #561, #564, #570, #588, #593, #599 / jedi#572, jedi#579,
# jedi#590
source, offset = source_and_offset(
"map(lambda_|_"
)
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_for_badly_defined_global_variable(self):
# Bug #519 / jedi#610
source, offset = source_and_offset(
"""\
def funct1():
global global_dict_var
global_dict_var = dict()
def funct2():
global global_dict_var
q = global_dict_var.copy_|_()
print(q)""")
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
def test_should_not_fail_with_mergednamesdict(self):
# Bug #563 / jedi#589
source, offset = source_and_offset(
u'from email import message_|_'
)
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
class RPCGetCompletionsTests(GenericRPCTests):
METHOD = "rpc_get_completions"
def test_should_complete_builtin(self):
source, offset = source_and_offset("o_|_")
expected = self.BUILTINS
actual = [cand['name'] for cand in
self.backend.rpc_get_completions("test.py",
source, offset)]
for candidate in expected:
self.assertIn(candidate, actual)
if sys.version_info >= (3, 5):
JSON_COMPLETIONS = ["SONDecoder", "SONEncoder", "SONDecodeError"]
else:
JSON_COMPLETIONS = ["SONDecoder", "SONEncoder"]
def test_should_complete_imports(self):
source, offset = source_and_offset("import json\n"
"json.J_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
self.assertEqual(
sorted([cand['suffix'] for cand in completions]),
sorted(self.JSON_COMPLETIONS))
def test_should_complete_top_level_modules_for_import(self):
source, offset = source_and_offset("import multi_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
if compat.PYTHON3:
expected = ["processing"]
else:
expected = ["file", "processing"]
self.assertEqual(sorted([cand['suffix'] for cand in completions]),
sorted(expected))
def test_should_complete_packages_for_import(self):
source, offset = source_and_offset("import email.mi_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
self.assertEqual([cand['suffix'] for cand in completions],
["me"])
def test_should_not_complete_for_import(self):
source, offset = source_and_offset("import foo.Conf_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
self.assertEqual([cand['suffix'] for cand in completions],
[])
@unittest.skipIf((3, 3) <= sys.version_info < (3, 4),
"Bug in jedi for Python 3.3")
def test_should_not_fail_for_short_module(self):
source, offset = source_and_offset("from .. import foo_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
self.assertIsNotNone(completions)
def test_should_complete_sys(self):
source, offset = source_and_offset("import sys\nsys._|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
self.assertIn('path', [cand['suffix'] for cand in completions])
def test_should_find_with_trailing_text(self):
source, offset = source_and_offset(
"import threading\nthreading.T_|_mumble mumble")
expected = ["Thread", "ThreadError", "Timer"]
actual = [cand['name'] for cand in
self.backend.rpc_get_completions("test.py", source, offset)]
for candidate in expected:
self.assertIn(candidate, actual)
def test_should_find_completion_different_package(self):
# See issue #74
self.project_file("project/__init__.py", "")
source1 = ("class Add:\n"
" def add(self, a, b):\n"
" return a + b\n")
self.project_file("project/add.py", source1)
source2, offset = source_and_offset(
"from project.add import Add\n"
"class Calculator:\n"
" def add(self, a, b):\n"
" c = Add()\n"
" c.ad_|_\n")
file2 = self.project_file("project/calculator.py", source2)
proposals = self.backend.rpc_get_completions(file2,
source2,
offset)
self.assertEqual(["add"],
[proposal["name"] for proposal in proposals])
class RPCGetCompletionDocstringTests(object):
def test_should_return_docstring(self):
source, offset = source_and_offset("import json\n"
"json.JSONEnc_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
completions.sort(key=lambda p: p["name"])
prop = completions[0]
self.assertEqual(prop["name"], "JSONEncoder")
docs = self.backend.rpc_get_completion_docstring("JSONEncoder")
self.assertIn("Extensible JSON", docs)
def test_should_return_none_if_unknown(self):
docs = self.backend.rpc_get_completion_docstring("Foo")
self.assertIsNone(docs)
class RPCGetCompletionLocationTests(object):
def test_should_return_location(self):
source, offset = source_and_offset("donaudampfschiff = 1\n"
"donau_|_")
filename = self.project_file("test.py", source)
completions = self.backend.rpc_get_completions(filename,
source,
offset)
prop = completions[0]
self.assertEqual(prop["name"], "donaudampfschiff")
loc = self.backend.rpc_get_completion_location("donaudampfschiff")
self.assertEqual((filename, 1), loc)
def test_should_return_none_if_unknown(self):
docs = self.backend.rpc_get_completion_location("Foo")
self.assertIsNone(docs)
class RPCGetDefinitionTests(GenericRPCTests):
METHOD = "rpc_get_definition"
def test_should_return_definition_location_same_file(self):
source, offset = source_and_offset("import threading\n"
"def test_function(a, b):\n"
" return a + b\n"
"\n"
"test_func_|_tion(\n")
filename = self.project_file("test.py", source)
location = self.backend.rpc_get_definition(filename,
source,
offset)
self.assertEqual(location[0], filename)
# On def or on the function name
self.assertIn(location[1], (17, 21))
def test_should_return_location_in_same_file_if_not_saved(self):
source, offset = source_and_offset(
"import threading\n"
"\n"
"\n"
"def other_function():\n"
" test_f_|_unction(1, 2)\n"
"\n"
"\n"
"def test_function(a, b):\n"
" return a + b\n")
filename = self.project_file("test.py", "")
location = self.backend.rpc_get_definition(filename,
source,
offset)
self.assertEqual(location[0], filename)
# def or function name
self.assertIn(location[1], (67, 71))
def test_should_return_location_in_different_file(self):
source1 = ("def test_function(a, b):\n"
" return a + b\n")
file1 = self.project_file("test1.py", source1)
source2, offset = source_and_offset("from test1 import test_function\n"
"test_funct_|_ion(1, 2)\n")
file2 = self.project_file("test2.py", source2)
definition = self.backend.rpc_get_definition(file2,
source2,
offset)
self.assertEqual(definition[0], file1)
# Either on the def or on the function name
self.assertIn(definition[1], (0, 4))
def test_should_return_none_if_location_not_found(self):
source, offset = source_and_offset("test_f_|_unction()\n")
filename = self.project_file("test.py", source)
definition = self.backend.rpc_get_definition(filename,
source,
offset)
self.assertIsNone(definition)
def test_should_return_none_if_outside_of_symbol(self):
source, offset = source_and_offset("test_function(_|_)\n")
filename = self.project_file("test.py", source)
definition = self.backend.rpc_get_definition(filename,
source,
offset)
self.assertIsNone(definition)
def test_should_return_definition_location_different_package(self):
# See issue #74
self.project_file("project/__init__.py", "")
source1 = ("class Add:\n"
" def add(self, a, b):\n"
" return a + b\n")
file1 = self.project_file("project/add.py", source1)
source2, offset = source_and_offset(
"from project.add import Add\n"
"class Calculator:\n"
" def add(self, a, b):\n"
" return Add_|_().add(a, b)\n")
file2 = self.project_file("project/calculator.py", source2)
location = self.backend.rpc_get_definition(file2,
source2,
offset)
self.assertEqual(location[0], file1)
# class or class name
self.assertIn(location[1], (0, 6))
def test_should_find_variable_definition(self):
source, offset = source_and_offset("SOME_VALUE = 1\n"
"\n"
"variable = _|_SOME_VALUE\n")
filename = self.project_file("test.py", source)
self.assertEqual(self.backend.rpc_get_definition(filename,
source,
offset),
(filename, 0))
class RPCGetAssignmentTests(GenericRPCTests):
METHOD = "rpc_get_assignment"
def test_should_return_assignment_location_same_file(self):
source, offset = source_and_offset("import threading\n"
"class TestClass(object):\n"
" def __init__(self, a, b):\n"
" self.a = a\n"
" self.b = b\n"
"\n"
"testclass = TestClass(2, 4)"
"\n"
"testcl_|_ass(\n")
filename = self.project_file("test.py", source)
location = self.backend.rpc_get_assignment(filename,
source,
offset)
self.assertEqual(location[0], filename)
# On def or on the function name
self.assertEqual(location[1], 111)
def test_should_return_location_in_same_file_if_not_saved(self):
source, offset = source_and_offset("import threading\n"
"class TestClass(object):\n"
" def __init__(self, a, b):\n"
" self.a = a\n"
" self.b = b\n"
"\n"
"testclass = TestClass(2, 4)"
"\n"
"testcl_|_ass(\n")
filename = self.project_file("test.py", "")
location = self.backend.rpc_get_assignment(filename,
source,
offset)
self.assertEqual(location[0], filename)
# def or function name
self.assertEqual(location[1], 111)
def test_should_return_location_in_different_file(self):
source1 = ("class TestClass(object):\n"
" def __init__(self, a, b):\n"
" self.a = a\n"
" self.b = b\n"
"testclass = TestClass(3, 5)\n")
file1 = self.project_file("test1.py", source1)
source2, offset = source_and_offset("from test1 import testclass\n"
"testcl_|_ass.a\n")
file2 = self.project_file("test2.py", source2)
# First jump goes to import statement
assignment = self.backend.rpc_get_assignment(file2,
source2,
offset)
# Second jump goes to test1 file
self.assertEqual(assignment[0], file2)
assignment = self.backend.rpc_get_assignment(file2,
source2,
assignment[1])
self.assertEqual(assignment[0], file1)
self.assertEqual(assignment[1], 93)
def test_should_return_none_if_location_not_found(self):
source, offset = source_and_offset("test_f_|_unction()\n")
filename = self.project_file("test.py", source)
assignment = self.backend.rpc_get_assignment(filename,
source,
offset)
self.assertIsNone(assignment)
def test_should_return_none_if_outside_of_symbol(self):
source, offset = source_and_offset("testcl(_|_)ass\n")
filename = self.project_file("test.py", source)
assignment = self.backend.rpc_get_assignment(filename,
source,
offset)
self.assertIsNone(assignment)
def test_should_find_variable_assignment(self):
source, offset = source_and_offset("SOME_VALUE = 1\n"
"\n"
"variable = _|_SOME_VALUE\n")
filename = self.project_file("test.py", source)
self.assertEqual(self.backend.rpc_get_assignment(filename,
source,
offset),
(filename, 0))
class RPCGetCalltipTests(GenericRPCTests):
METHOD = "rpc_get_calltip"
@unittest.skipIf(sys.version_info >= (3, 0),
"Bug in Jedi 0.9.0")
def test_should_get_calltip(self):
source, offset = source_and_offset(
"import threading\nthreading.Thread(_|_")
filename = self.project_file("test.py", source)
calltip = self.backend.rpc_get_calltip(filename,
source,
offset)
expected = self.THREAD_CALLTIP
self.assertEqual(calltip, expected)
@unittest.skipIf(sys.version_info >= (3, 0),
"Bug in Jedi 0.9.0")
def test_should_get_calltip_even_after_parens(self):
source, offset = source_and_offset(
"import threading\nthreading.Thread(foo()_|_")
filename = self.project_file("test.py", source)
actual = self.backend.rpc_get_calltip(filename,
source,
offset)
self.assertEqual(self.THREAD_CALLTIP, actual)
@unittest.skipIf(sys.version_info >= (3, 0),
"Bug in Jedi 0.9.0")
def test_should_get_calltip_at_closing_paren(self):
source, offset = source_and_offset(
"import threading\nthreading.Thread(_|_)")
filename = self.project_file("test.py", source)
actual = self.backend.rpc_get_calltip(filename,
source,
offset)
self.assertEqual(self.THREAD_CALLTIP, actual)
def test_should_not_missing_attribute_get_definition(self):
# Bug #627 / jedi#573
source, offset = source_and_offset(
"import threading\nthreading.Thread(_|_)")
filename = self.project_file("test.py", source)
self.backend.rpc_get_calltip(filename, source, offset)
def test_should_return_none_for_bad_identifier(self):
source, offset = source_and_offset(
"froblgoo(_|_")
filename = self.project_file("test.py", source)
calltip = self.backend.rpc_get_calltip(filename,
source,
offset)
self.assertIsNone(calltip)
def test_should_remove_self_argument(self):
source, offset = source_and_offset(
"d = dict()\n"
"d.keys(_|_")
filename = self.project_file("test.py", source)
actual = self.backend.rpc_get_calltip(filename,
source,
offset)
self.assertEqual(self.KEYS_CALLTIP, actual)
def test_should_remove_package_prefix(self):
source, offset = source_and_offset(
"import decimal\n"
"d = decimal.Decimal('1.5')\n"
"d.radix(_|_")
filename = self.project_file("test.py", source)
actual = self.backend.rpc_get_calltip(filename,
source,
offset)
self.assertEqual(self.RADIX_CALLTIP, actual)
def test_should_return_none_outside_of_all(self):
filename = self.project_file("test.py", "")
source, offset = source_and_offset("import thr_|_eading\n")
calltip = self.backend.rpc_get_calltip(filename,
source, offset)
self.assertIsNone(calltip)
def test_should_find_calltip_different_package(self):
# See issue #74
self.project_file("project/__init__.py", "")
source1 = ("class Add:\n"
" def add(self, a, b):\n"
" return a + b\n")
self.project_file("project/add.py", source1)
source2, offset = source_and_offset(
"from project.add import Add\n"
"class Calculator:\n"
" def add(self, a, b):\n"
" c = Add()\n"
" c.add(_|_\n")
file2 = self.project_file("project/calculator.py", source2)
actual = self.backend.rpc_get_calltip(file2,
source2,
offset)
self.assertEqual(self.ADD_CALLTIP, actual)
class RPCGetDocstringTests(GenericRPCTests):
METHOD = "rpc_get_docstring"
def check_docstring(self, docstring):
def first_line(s):
return s[:s.index("\n")]
self.assertEqual(first_line(docstring),
self.JSON_LOADS_DOCSTRING)
def test_should_get_docstring(self):
source, offset = source_and_offset(
"import json\njson.loads_|_(")
filename = self.project_file("test.py", source)
docstring = self.backend.rpc_get_docstring(filename,
source,
offset)
self.check_docstring(docstring)
def test_should_return_none_for_bad_identifier(self):
source, offset = source_and_offset(
"froblgoo_|_(\n")
filename = self.project_file("test.py", source)
docstring = self.backend.rpc_get_docstring(filename,
source,
offset)
self.assertIsNone(docstring)
class RPCGetNamesTests(GenericRPCTests):
METHOD = "rpc_get_names"
def test_shouldreturn_names_in_same_file(self):
filename = self.project_file("test.py", "")
source, offset = source_and_offset(
"def foo(x, y):\n"
" return x + y\n"
"c = _|_foo(5, 2)\n")
names = self.backend.rpc_get_names(filename,
source,
offset)
self.assertEqual(names,
[{'name': 'foo',
'filename': filename,
'offset': 4},
{'name': 'x',
'filename': filename,
'offset': 8},
{'name': 'y',
'filename': filename,
'offset': 11},
{'name': 'x',
'filename': filename,
'offset': 26},
{'name': 'y',
'filename': filename,
'offset': 30},
{'name': 'c',
'filename': filename,
'offset': 32},
{'name': 'foo',
'filename': filename,
'offset': 36}])
def test_should_not_fail_without_symbol(self):
filename = self.project_file("test.py", "")
names = self.backend.rpc_get_names(filename,
"",
0)
self.assertEqual(names, [])
class RPCGetUsagesTests(GenericRPCTests):
METHOD = "rpc_get_usages"
def test_should_return_uses_in_same_file(self):
filename = self.project_file("test.py", "")
source, offset = source_and_offset(
"def foo(x):\n"
" return _|_x + x\n")
usages = self.backend.rpc_get_usages(filename,
source,
offset)
self.assertEqual(usages,
[{'name': 'x',
'offset': 8,
'filename': filename},
{'name': 'x',
'filename': filename,
'offset': 23},
{'name': u'x',
'filename': filename,
'offset': 27}])
def test_should_return_uses_in_other_file(self):
file1 = self.project_file("file1.py", "")
file2 = self.project_file("file2.py", "\n\n\n\n\nx = 5")
source, offset = source_and_offset(
"import file2\n"
"file2._|_x\n")
usages = self.backend.rpc_get_usages(file1,
source,
offset)
self.assertEqual(usages,
[{'name': 'x',
'filename': file1,
'offset': 19},
{'name': 'x',
'filename': file2,
'offset': 5}])
def test_should_not_fail_without_symbol(self):
filename = self.project_file("file.py", "")
usages = self.backend.rpc_get_usages(filename,
"",
0)
self.assertEqual(usages, [])
def source_and_offset(source):
"""Return a source and offset from a source description.
>>> source_and_offset("hello, _|_world")
("hello, world", 7)
>>> source_and_offset("_|_hello, world")
("hello, world", 0)
>>> source_and_offset("hello, world_|_")
("hello, world", 12)
"""
offset = source.index("_|_")
return source[:offset] + source[offset + 3:], offset

View File

@@ -0,0 +1,20 @@
# coding: utf-8
"""Tests for the elpy.autopep8 module"""
import unittest
from elpy import auto_pep8
from elpy.tests.support import BackendTestCase
class Autopep8TestCase(BackendTestCase):
def setUp(self):
if not auto_pep8.autopep8:
raise unittest.SkipTest
def test_fix_code(self):
code_block = 'x= 123\n'
new_block = auto_pep8.fix_code(code_block)
self.assertEqual(new_block, 'x = 123\n')

View File

@@ -0,0 +1,73 @@
# coding: utf-8
"""Tests for the elpy.impmagic module"""
import re
import sys
import unittest
from elpy import impmagic
from elpy.tests.support import BackendTestCase
TEST_SOURCE = '''# test file
import time
import logging
os.getcwd()
time.sleep(1)
'''
@unittest.skipIf(sys.version_info >= (3, 5), "importmagic fails in 3.5")
class ImportMagicTestCase(BackendTestCase):
def setUp(self):
if not impmagic.importmagic:
raise unittest.SkipTest
self.importmagic = impmagic.ImportMagic()
super(ImportMagicTestCase, self).setUp()
def build_index(self):
self.project_file('mymod.py', 'class AnUncommonName:\n pass\n')
self.importmagic.build_index(self.project_root,
custom_path=[self.project_root],
blacklist_re=re.compile('^$'))
self.importmagic._thread.join()
def test_get_symbols(self):
self.build_index()
candidates = self.importmagic.get_import_symbols('AnUncommonName')
self.assertEqual(candidates, ['from mymod import AnUncommonName'])
candidates = self.importmagic.get_import_symbols('mymod')
self.assertEqual(candidates, ['import mymod'])
def test_add_import(self):
self.build_index()
start, end, newblock = self.importmagic.add_import(
TEST_SOURCE, 'from mymod import AnUncommonName')
self.assertEqual(start, 2)
self.assertEqual(end, 5)
self.assertEqual(newblock.strip(),
'import logging\n'
'import time\n'
'from mymod import AnUncommonName')
start, end, newblock = self.importmagic.add_import(
TEST_SOURCE, 'import mymod')
self.assertEqual(start, 2)
self.assertEqual(end, 5)
self.assertEqual(newblock.strip(),
'import logging\nimport mymod\nimport time')
def test_get_unresolved_symbols(self):
self.build_index()
symbols = self.importmagic.get_unresolved_symbols('x = a + b\ny = c.d')
self.assertEqual(sorted(symbols), ['a', 'b', 'c.d'])
def test_remove_unreferenced_imports(self):
self.build_index()
start, end, newblock = \
self.importmagic.remove_unreferenced_imports(TEST_SOURCE)
self.assertEqual(start, 2)
self.assertEqual(end, 5)
self.assertEqual(newblock.strip(), 'import time')

View File

@@ -0,0 +1,317 @@
"""Tests for the elpy.jedibackend module."""
import sys
import unittest
import jedi
import mock
from elpy import jedibackend
from elpy import rpc
from elpy.tests import compat
from elpy.tests.support import BackendTestCase
from elpy.tests.support import RPCGetCompletionsTests
from elpy.tests.support import RPCGetCompletionDocstringTests
from elpy.tests.support import RPCGetCompletionLocationTests
from elpy.tests.support import RPCGetDocstringTests
from elpy.tests.support import RPCGetDefinitionTests
from elpy.tests.support import RPCGetAssignmentTests
from elpy.tests.support import RPCGetCalltipTests
from elpy.tests.support import RPCGetUsagesTests
from elpy.tests.support import RPCGetNamesTests
class JediBackendTestCase(BackendTestCase):
def setUp(self):
super(JediBackendTestCase, self).setUp()
self.backend = jedibackend.JediBackend(self.project_root)
class TestInit(JediBackendTestCase):
def test_should_have_jedi_as_name(self):
self.assertEqual(self.backend.name, "jedi")
class TestRPCGetCompletions(RPCGetCompletionsTests,
JediBackendTestCase):
BUILTINS = ['object', 'oct', 'open', 'ord', 'OSError', 'OverflowError']
class TestRPCGetCompletionDocstring(RPCGetCompletionDocstringTests,
JediBackendTestCase):
pass
class TestRPCGetCompletionLocation(RPCGetCompletionLocationTests,
JediBackendTestCase):
pass
class TestRPCGetDocstring(RPCGetDocstringTests,
JediBackendTestCase):
JSON_LOADS_DOCSTRING = (
'loads(s, encoding=None, cls=None, '
'object_hook=None, parse_float=None,'
)
def check_docstring(self, docstring):
lines = docstring.splitlines()
self.assertEqual(lines[0], 'Documentation for json.loads:')
self.assertEqual(lines[2], self.JSON_LOADS_DOCSTRING)
@mock.patch("elpy.jedibackend.run_with_debug")
def test_should_not_return_empty_docstring(self, run_with_debug):
location = mock.MagicMock()
location.full_name = "testthing"
location.docstring.return_value = ""
run_with_debug.return_value = [location]
filename = self.project_file("test.py", "print")
docstring = self.backend.rpc_get_docstring(filename, "print", 0)
self.assertIsNone(docstring)
class TestRPCGetDefinition(RPCGetDefinitionTests,
JediBackendTestCase):
@mock.patch("jedi.Script")
def test_should_not_fail_if_module_path_is_none(self, Script):
"""Do not fail if loc.module_path is None.
This can happen under some circumstances I am unsure about.
See #537 for the issue that reported this.
"""
locations = [
mock.Mock(module_path=None)
]
script = Script.return_value
script.goto_definitions.return_value = locations
script.goto_assignments.return_value = locations
location = self.rpc("", "", 0)
self.assertIsNone(location)
class TestRPCGetAssignment(RPCGetAssignmentTests,
JediBackendTestCase):
@mock.patch("jedi.Script")
def test_should_not_fail_if_module_path_is_none(self, Script):
"""Do not fail if loc.module_path is None.
"""
locations = [
mock.Mock(module_path=None)
]
script = Script.return_value
script.goto_assignments.return_value = locations
script.goto_assignments.return_value = locations
location = self.rpc("", "", 0)
self.assertIsNone(location)
class TestRPCGetCalltip(RPCGetCalltipTests,
JediBackendTestCase):
KEYS_CALLTIP = {'index': 0,
'params': ['param '],
'name': u'keys'}
RADIX_CALLTIP = {'index': None,
'params': [],
'name': u'radix'}
ADD_CALLTIP = {'index': 0,
'params': [u'param a', u'param b'],
'name': u'add'}
if compat.PYTHON3:
THREAD_CALLTIP = {"name": "Thread",
"params": ["group=None",
"target=None",
"name=None",
"args=()",
"kwargs=None",
"daemon=None"],
"index": 0}
else:
THREAD_CALLTIP = {"name": "Thread",
"params": ["param group=None",
"param target=None",
"param name=None",
"param args=()",
"param kwargs=None",
"param verbose=None"],
"index": 0}
def test_should_not_fail_with_get_subscope_by_name(self):
# Bug #677 / jedi#628
source = (
u"my_lambda = lambda x: x+1\n"
u"my_lambda(1)"
)
filename = self.project_file("project.py", source)
offset = 37
sigs = self.backend.rpc_get_calltip(filename, source, offset)
sigs["index"]
class TestRPCGetUsages(RPCGetUsagesTests,
JediBackendTestCase):
def test_should_not_fail_for_missing_module(self):
# This causes use.module_path to be None
source = "import sys\n\nsys.path.\n" # insert()"
offset = 21
filename = self.project_file("project.py", source)
self.rpc(filename, source, offset)
class TestRPCGetNames(RPCGetNamesTests,
JediBackendTestCase):
pass
class TestPosToLinecol(unittest.TestCase):
def test_should_handle_beginning_of_string(self):
self.assertEqual(jedibackend.pos_to_linecol("foo", 0),
(1, 0))
def test_should_handle_end_of_line(self):
self.assertEqual(jedibackend.pos_to_linecol("foo\nbar\nbaz\nqux", 9),
(3, 1))
def test_should_handle_end_of_string(self):
self.assertEqual(jedibackend.pos_to_linecol("foo\nbar\nbaz\nqux", 14),
(4, 2))
class TestLinecolToPos(unittest.TestCase):
def test_should_handle_beginning_of_string(self):
self.assertEqual(jedibackend.linecol_to_pos("foo", 1, 0),
0)
def test_should_handle_end_of_string(self):
self.assertEqual(jedibackend.linecol_to_pos("foo\nbar\nbaz\nqux",
3, 1),
9)
def test_should_return_offset(self):
self.assertEqual(jedibackend.linecol_to_pos("foo\nbar\nbaz\nqux",
4, 2),
14)
def test_should_fail_for_line_past_text(self):
self.assertRaises(ValueError,
jedibackend.linecol_to_pos, "foo\n", 3, 1)
def test_should_fail_for_column_past_text(self):
self.assertRaises(ValueError,
jedibackend.linecol_to_pos, "foo\n", 1, 10)
class TestRunWithDebug(unittest.TestCase):
@mock.patch('jedi.Script')
def test_should_call_method(self, Script):
Script.return_value.test_method.return_value = "test-result"
result = jedibackend.run_with_debug(jedi, 'test_method', 1, 2, arg=3)
Script.assert_called_with(1, 2, arg=3)
self.assertEqual(result, 'test-result')
@mock.patch('jedi.Script')
def test_should_re_raise(self, Script):
Script.side_effect = RuntimeError
with self.assertRaises(RuntimeError):
jedibackend.run_with_debug(jedi, 'test_method', 1, 2, arg=3,
re_raise=(RuntimeError,))
@mock.patch('jedi.Script')
@mock.patch('jedi.set_debug_function')
def test_should_keep_debug_info(self, set_debug_function, Script):
Script.side_effect = RuntimeError
try:
jedibackend.run_with_debug(jedi, 'test_method', 1, 2, arg=3)
except rpc.Fault as e:
self.assertGreaterEqual(e.code, 400)
self.assertIsNotNone(e.data)
self.assertIn("traceback", e.data)
jedi_debug_info = e.data["jedi_debug_info"]
self.assertIsNotNone(jedi_debug_info)
self.assertEqual(jedi_debug_info["script_args"],
"1, 2, arg=3")
self.assertEqual(jedi_debug_info["source"], None)
self.assertEqual(jedi_debug_info["method"], "test_method")
self.assertEqual(jedi_debug_info["debug_info"], [])
else:
self.fail("Fault not thrown")
@mock.patch('jedi.Script')
@mock.patch('jedi.set_debug_function')
def test_should_keep_error_text(self, set_debug_function, Script):
Script.side_effect = RuntimeError
try:
jedibackend.run_with_debug(jedi, 'test_method', 1, 2, arg=3)
except rpc.Fault as e:
self.assertEqual(str(e), str(RuntimeError()))
self.assertEqual(e.message, str(RuntimeError()))
else:
self.fail("Fault not thrown")
@mock.patch('jedi.Script')
@mock.patch('jedi.set_debug_function')
def test_should_handle_source_special(self, set_debug_function, Script):
Script.side_effect = RuntimeError
try:
jedibackend.run_with_debug(jedi, 'test_method', source="foo")
except rpc.Fault as e:
self.assertEqual(e.data["jedi_debug_info"]["script_args"],
"source=source")
self.assertEqual(e.data["jedi_debug_info"]["source"], "foo")
else:
self.fail("Fault not thrown")
@mock.patch('jedi.Script')
@mock.patch('jedi.set_debug_function')
def test_should_set_debug_info(self, set_debug_function, Script):
the_debug_function = [None]
def my_set_debug_function(debug_function, **kwargs):
the_debug_function[0] = debug_function
def my_script(*args, **kwargs):
the_debug_function[0](jedi.debug.NOTICE, "Notice")
the_debug_function[0](jedi.debug.WARNING, "Warning")
the_debug_function[0]("other", "Other")
raise RuntimeError
set_debug_function.side_effect = my_set_debug_function
Script.return_value.test_method = my_script
try:
jedibackend.run_with_debug(jedi, 'test_method', source="foo")
except rpc.Fault as e:
self.assertEqual(e.data["jedi_debug_info"]["debug_info"],
["[N] Notice",
"[W] Warning",
"[?] Other"])
else:
self.fail("Fault not thrown")
@mock.patch('jedi.set_debug_function')
@mock.patch('jedi.Script')
def test_should_not_fail_with_bad_data(self, Script, set_debug_function):
import jedi.debug
def set_debug(function, speed=True):
if function is not None:
function(jedi.debug.NOTICE, u"\xab")
set_debug_function.side_effect = set_debug
Script.return_value.test_method.side_effect = Exception
with self.assertRaises(rpc.Fault):
jedibackend.run_with_debug(jedi, 'test_method', 1, 2, arg=3)

View File

@@ -0,0 +1,88 @@
import os
import unittest
import shutil
import sys
import tempfile
import mock
import elpy.pydocutils
class TestGetPydocCompletions(unittest.TestCase):
def test_should_return_top_level_modules(self):
modules = elpy.pydocutils.get_pydoc_completions("")
self.assertIn('sys', modules)
self.assertIn('json', modules)
def test_should_return_submodules(self):
modules = elpy.pydocutils.get_pydoc_completions("elpy")
self.assertIn("elpy.rpc", modules)
self.assertIn("elpy.server", modules)
modules = elpy.pydocutils.get_pydoc_completions("os")
self.assertIn("os.path", modules)
def test_should_find_objects_in_module(self):
self.assertIn("elpy.tests.test_pydocutils.TestGetPydocCompletions",
elpy.pydocutils.get_pydoc_completions
("elpy.tests.test_pydocutils"))
def test_should_find_attributes_of_objects(self):
attribs = elpy.pydocutils.get_pydoc_completions(
"elpy.tests.test_pydocutils.TestGetPydocCompletions")
self.assertIn("elpy.tests.test_pydocutils.TestGetPydocCompletions."
"test_should_find_attributes_of_objects",
attribs)
def test_should_return_none_for_inexisting_module(self):
self.assertEqual([],
elpy.pydocutils.get_pydoc_completions
("does_not_exist"))
def test_should_work_for_unicode_strings(self):
self.assertIsNotNone(elpy.pydocutils.get_pydoc_completions
(u"sys"))
def test_should_find_partial_completions(self):
self.assertIn("multiprocessing",
elpy.pydocutils.get_pydoc_completions
("multiprocess"))
self.assertIn("multiprocessing.util",
elpy.pydocutils.get_pydoc_completions
("multiprocessing.ut"))
def test_should_ignore_trailing_dot(self):
self.assertIn("elpy.pydocutils",
elpy.pydocutils.get_pydoc_completions
("elpy."))
class TestGetModules(unittest.TestCase):
def test_should_return_top_level_modules(self):
modules = elpy.pydocutils.get_modules()
self.assertIn('sys', modules)
self.assertIn('json', modules)
def test_should_return_submodules(self):
modules = elpy.pydocutils.get_modules("elpy")
self.assertIn("rpc", modules)
self.assertIn("server", modules)
@mock.patch.object(elpy.pydocutils, 'safeimport')
def test_should_catch_import_errors(self, safeimport):
def raise_function(message):
raise elpy.pydocutils.ErrorDuringImport(message,
(None, None, None))
safeimport.side_effect = raise_function
self.assertEqual([], elpy.pydocutils.get_modules("foo.bar"))
def test_should_not_fail_for_permission_denied(self):
tmpdir = tempfile.mkdtemp(prefix="test-elpy-get-modules-")
sys.path.append(tmpdir)
os.chmod(tmpdir, 0o000)
try:
elpy.pydocutils.get_modules()
finally:
os.chmod(tmpdir, 0o755)
shutil.rmtree(tmpdir)
sys.path.remove(tmpdir)

View File

@@ -0,0 +1,545 @@
import unittest
import tempfile
import shutil
import os
import mock
import sys
from elpy import refactor
from textwrap import dedent
class RefactorTestCase(unittest.TestCase):
def setUp(self):
self.project_root = tempfile.mkdtemp(prefix="test-refactor-root")
self.addCleanup(shutil.rmtree, self.project_root,
ignore_errors=True)
def create_file(self, name, contents=""):
filename = os.path.join(self.project_root, name)
contents = dedent(contents)
offset = contents.find("_|_")
if offset > -1:
contents = contents[:offset] + contents[offset + 3:]
with open(filename, "w") as f:
f.write(contents)
return filename, offset
def assertSourceEqual(self, first, second, msg=None):
"""Fail if the two objects are unequal, ignoring indentation."""
self.assertEqual(dedent(first), dedent(second), msg=msg)
class TestGetRefactorOptions(RefactorTestCase):
def test_should_only_return_importsmodule_if_not_on_symbol(self):
filename, offset = self.create_file("foo.py",
"""\
import foo
_|_""")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset)
self.assertTrue(all(opt['category'] in ('Imports',
'Module')
for opt in options))
filename, offset = self.create_file("foo.py",
"""\
_|_
import foo""")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset)
self.assertTrue(all(opt['category'] in ('Imports',
'Module')
for opt in options))
def test_should_return_all_if_on_symbol(self):
filename, offset = self.create_file("foo.py",
"import _|_foo")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset)
self.assertTrue(all(opt['category'] in ('Imports',
'Method',
'Module',
'Symbol')
for opt in options))
def test_should_return_only_region_if_endoffset(self):
filename, offset = self.create_file("foo.py",
"import foo")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset, 5)
self.assertTrue(all(opt['category'] == 'Region'
for opt in options))
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
def test_should_treat_from_import_special(self):
filename, offset = self.create_file("foo.py",
"""\
import foo
_|_""")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset)
self.assertFalse(any(opt['name'] == "refactor_froms_to_imports"
for opt in options))
filename, offset = self.create_file("foo.py",
"imp_|_ort foo")
ref = refactor.Refactor(self.project_root, filename)
options = ref.get_refactor_options(offset)
self.assertTrue(any(opt['name'] == "refactor_froms_to_imports"
for opt in options))
class TestGetChanges(RefactorTestCase):
def test_should_fail_if_method_is_not_refactoring(self):
filename, offset = self.create_file("foo.py")
ref = refactor.Refactor(self.project_root, filename)
self.assertRaises(ValueError, ref.get_changes, "bad_name")
def test_should_return_method_results(self):
filename, offset = self.create_file("foo.py")
ref = refactor.Refactor(self.project_root, filename)
with mock.patch.object(ref, 'refactor_extract_method') as test:
test.return_value = "Meep!"
self.assertEqual(ref.get_changes("refactor_extract_method",
1, 2),
"Meep!")
test.assert_called_with(1, 2)
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestIsOnSymbol(RefactorTestCase):
def test_should_find_symbol(self):
filename, offset = self.create_file("test.py", "__B_|_AR = 100")
r = refactor.Refactor(self.project_root, filename)
self.assertTrue(r._is_on_symbol(offset))
# Issue #111
def test_should_find_symbol_with_underscores(self):
filename, offset = self.create_file("test.py", "_|___BAR = 100")
r = refactor.Refactor(self.project_root, filename)
self.assertTrue(r._is_on_symbol(offset))
def test_should_not_find_weird_places(self):
filename, offset = self.create_file("test.py", "hello = _|_ 1 + 1")
r = refactor.Refactor(self.project_root, filename)
self.assertFalse(r._is_on_symbol(offset))
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestFromsToImports(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"""\
_|_from datetime import datetime
d = datetime(2013, 4, 7)
""")
ref = refactor.Refactor(self.project_root, filename)
(change,) = ref.get_changes("refactor_froms_to_imports", offset)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], filename)
self.assertSourceEqual(change['contents'],
"""\
import datetime
d = datetime.datetime(2013, 4, 7)
""")
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestOrganizeImports(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"""\
import unittest, base64
import datetime, json
obj = json.dumps(23)
unittest.TestCase()
""")
ref = refactor.Refactor(self.project_root, filename)
(change,) = ref.get_changes("refactor_organize_imports")
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], filename)
self.assertSourceEqual(change['contents'],
"""\
import json
import unittest
obj = json.dumps(23)
unittest.TestCase()
""")
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestModuleToPackage(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"_|_import os\n")
ref = refactor.Refactor(self.project_root, filename)
changes = ref.refactor_module_to_package()
a, b, c = changes
# Not sure why the a change is there. It's a CHANGE that
# changes nothing...
self.assertEqual(a['diff'], '')
self.assertEqual(b['action'], 'create')
self.assertEqual(b['type'], 'directory')
self.assertEqual(b['path'], os.path.join(self.project_root, "foo"))
self.assertEqual(c['action'], 'move')
self.assertEqual(c['type'], 'file')
self.assertEqual(c['source'], os.path.join(self.project_root,
"foo.py"))
self.assertEqual(c['destination'], os.path.join(self.project_root,
"foo/__init__.py"))
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestRenameAtPoint(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"""\
class Foo(object):
def _|_foo(self):
return 5
def bar(self):
return self.foo()
""")
file2, offset2 = self.create_file(
"bar.py",
"""\
import foo
x = foo.Foo()
x.foo()""")
ref = refactor.Refactor(self.project_root, filename)
first, second = ref.refactor_rename_at_point(offset, "frob",
in_hierarchy=False,
docs=False)
if first['file'] == filename:
a, b = first, second
else:
a, b = second, first
self.assertEqual(a['action'], 'change')
self.assertEqual(a['file'], filename)
self.assertSourceEqual(a['contents'],
"""\
class Foo(object):
def frob(self):
return 5
def bar(self):
return self.frob()
""")
self.assertEqual(b['action'], 'change')
self.assertEqual(b['file'], file2)
self.assertSourceEqual(b['contents'],
"""\
import foo
x = foo.Foo()
x.frob()""")
def test_should_refactor_in_hierarchy(self):
filename, offset = self.create_file(
"foo.py",
"""\
class Foo(object):
def _|_foo(self):
return 5
def bar(self):
return self.foo()
class Bar(Foo):
def foo(self):
return 42
class Baz(object):
def foo(self):
return 42
""")
file2, offset2 = self.create_file(
"bar.py",
"""\
import foo
x, y, z = foo.Foo(), foo.Bar(), foo.Baz()
x.foo()
y.foo()
z.foo()""")
ref = refactor.Refactor(self.project_root, filename)
first, second = ref.refactor_rename_at_point(offset, "frob",
in_hierarchy=True,
docs=False)
if first['file'] == filename:
a, b = first, second
else:
a, b = second, first
self.assertEqual(a['action'], 'change')
self.assertEqual(a['file'], filename)
self.assertSourceEqual(a['contents'],
"""\
class Foo(object):
def frob(self):
return 5
def bar(self):
return self.frob()
class Bar(Foo):
def frob(self):
return 42
class Baz(object):
def foo(self):
return 42
""")
self.assertEqual(b['action'], 'change')
self.assertEqual(b['file'], file2)
self.assertSourceEqual(b['contents'],
"""\
import foo
x, y, z = foo.Foo(), foo.Bar(), foo.Baz()
x.frob()
y.frob()
z.foo()""")
def test_should_refactor_in_docstrings(self):
filename, offset = self.create_file(
"foo.py",
"""\
class Foo(object):
"Frobnicate the foo"
def _|_foo(self):
return 5
print("I'm an unrelated foo")
""")
ref = refactor.Refactor(self.project_root, filename)
(change,) = ref.refactor_rename_at_point(offset, "frob",
in_hierarchy=False,
docs=True)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], filename)
self.assertSourceEqual(change['contents'],
"""\
class Foo(object):
"Frobnicate the frob"
def frob(self):
return 5
print("I'm an unrelated foo")
""")
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestRenameCurrentModule(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"_|_import os\n")
file2, offset = self.create_file(
"bar.py",
"""\
_|_import foo
foo.os
""")
dest = os.path.join(self.project_root, "frob.py")
ref = refactor.Refactor(self.project_root, filename)
a, b = ref.refactor_rename_current_module("frob")
self.assertEqual(a['action'], 'change')
self.assertEqual(a['file'], file2)
self.assertEqual(a['contents'],
"import frob\n"
"frob.os\n")
self.assertEqual(b['action'], 'move')
self.assertEqual(b['type'], 'file')
self.assertEqual(b['source'], filename)
self.assertEqual(b['destination'], dest)
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestMoveModule(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"_|_import os\n")
file2, offset = self.create_file(
"bar.py",
"""\
_|_import foo
foo.os
""")
dest = os.path.join(self.project_root, "frob")
os.mkdir(dest)
with open(os.path.join(dest, "__init__.py"), "w") as f:
f.write("")
ref = refactor.Refactor(self.project_root, filename)
a, b = ref.refactor_move_module(dest)
self.assertEqual(a['action'], 'change')
self.assertEqual(a['file'], file2)
self.assertSourceEqual(a['contents'],
"""\
import frob.foo
frob.foo.os
""")
self.assertEqual(b['action'], 'move')
self.assertEqual(b['type'], 'file')
self.assertEqual(b['source'], filename)
self.assertEqual(b['destination'],
os.path.join(dest, "foo.py"))
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestCreateInline(RefactorTestCase):
def setUp(self):
super(TestCreateInline, self).setUp()
self.filename, self.offset = self.create_file(
"foo.py",
"""\
def add(a, b):
return a + b
x = _|_add(2, 3)
y = add(17, 4)
""")
def test_should_refactor_single_occurrenc(self):
ref = refactor.Refactor(self.project_root, self.filename)
(change,) = ref.refactor_create_inline(self.offset, True)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], self.filename)
self.assertSourceEqual(change['contents'],
"""\
def add(a, b):
return a + b
x = 2 + 3
y = add(17, 4)
""")
def test_should_refactor_all_occurrencs(self):
ref = refactor.Refactor(self.project_root, self.filename)
(change,) = ref.refactor_create_inline(self.offset, False)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], self.filename)
self.assertSourceEqual(change['contents'],
"""\
x = 2 + 3
y = 17 + 4
""")
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestExtractMethod(RefactorTestCase):
def setUp(self):
super(TestExtractMethod, self).setUp()
self.filename, self.offset = self.create_file(
"foo.py",
"""\
class Foo(object):
def spaghetti(self, a, b):
_|_x = a + 5
y = b + 23
return y
""")
@unittest.skipIf(sys.version_info >= (3, 5), "Python 3.5 not supported")
def test_should_refactor_local(self):
ref = refactor.Refactor(self.project_root, self.filename)
(change,) = ref.refactor_extract_method(self.offset, 104,
"calc", False)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], self.filename)
expected = """\
class Foo(object):
def spaghetti(self, a, b):
return self.calc(a, b)
def calc(self, a, b):
x = a + 5
y = b + 23
return y
"""
expected2 = expected.replace("return self.calc(a, b)",
"return self.calc(b, a)")
expected2 = expected2.replace("def calc(self, a, b)",
"def calc(self, b, a)")
# This is silly, but it's what we got.
if change['contents'] == dedent(expected2):
self.assertSourceEqual(change['contents'], expected2)
else:
self.assertSourceEqual(change['contents'], expected)
@unittest.skipIf(sys.version_info >= (3, 5), "Python 3.5 not supported")
def test_should_refactor_global(self):
ref = refactor.Refactor(self.project_root, self.filename)
(change,) = ref.refactor_extract_method(self.offset, 104,
"calc", True)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], self.filename)
expected = """\
class Foo(object):
def spaghetti(self, a, b):
return calc(a, b)
def calc(a, b):
x = a + 5
y = b + 23
return y
"""
expected2 = expected.replace("return calc(a, b)",
"return calc(b, a)")
expected2 = expected2.replace("def calc(a, b)",
"def calc(b, a)")
if change['contents'] == dedent(expected2):
self.assertSourceEqual(change['contents'], expected2)
else:
self.assertSourceEqual(change['contents'], expected)
@unittest.skipIf(not refactor.ROPE_AVAILABLE, "Requires Rope")
class TestUseFunction(RefactorTestCase):
def test_should_refactor(self):
filename, offset = self.create_file(
"foo.py",
"""\
def _|_add_and_multiply(a, b, c):
temp = a + b
return temp * c
f = 1 + 2
g = f * 3
""")
ref = refactor.Refactor(self.project_root, filename)
(change,) = ref.refactor_use_function(offset)
self.assertEqual(change['action'], 'change')
self.assertEqual(change['file'], filename)
self.assertSourceEqual(change['contents'],
"""\
def add_and_multiply(a, b, c):
temp = a + b
return temp * c
g = add_and_multiply(1, 2, 3)
""")

View File

@@ -0,0 +1,205 @@
"""Tests for elpy.ropebackend."""
import os
import shutil
import sys
import tempfile
import mock
from elpy import ropebackend
from elpy import rpc
from elpy.tests import compat
from elpy.tests.support import BackendTestCase
from elpy.tests.support import RPCGetCompletionsTests
from elpy.tests.support import RPCGetCompletionDocstringTests
from elpy.tests.support import RPCGetCompletionLocationTests
from elpy.tests.support import RPCGetDefinitionTests
from elpy.tests.support import RPCGetCalltipTests
from elpy.tests.support import RPCGetDocstringTests
class RopeBackendTestCase(BackendTestCase):
def setUp(self):
super(RopeBackendTestCase, self).setUp()
self.backend = ropebackend.RopeBackend(self.project_root)
class ShouldCallValidateTest(object):
def test_should_call_validate(self):
with mock.patch.object(self.backend, 'validate') as validate:
self.rpc(None, "", 0)
self.assertTrue(validate.called)
class TestInit(RopeBackendTestCase):
def test_should_have_rope_as_name(self):
self.assertEqual(self.backend.name, "rope")
def test_should_patch_project_files(self):
self.project_file("foo.txt", "")
self.project_file("baddir/file.py", "")
self.backend.project.validate()
actual = [f.real_path for f in
self.backend.project.file_list.get_files()]
self.assertEqual([os.path.join(self.project_root, "foo.txt")],
actual)
def test_should_fail_for_inexisting_project_root(self):
with self.assertRaises(rpc.Fault):
ropebackend.RopeBackend("/does/not/exist/")
class TestValidate(RopeBackendTestCase):
def test_should_call_validate_after_timeout(self):
with mock.patch("time.time") as t:
t.return_value = 10
self.backend.validate()
with mock.patch.object(self.backend, 'project') as project:
t.return_value = 10 + ropebackend.VALIDATE_EVERY_SECONDS + 1
self.backend.validate()
self.assertTrue(project.validate.called)
def test_should_not_call_validate_before_timeout(self):
with mock.patch("time.time") as t:
t.return_value = 10
self.backend.validate()
with mock.patch.object(self.backend, 'project') as project:
t.return_value = 10 + ropebackend.VALIDATE_EVERY_SECONDS - 1
self.backend.validate()
self.assertFalse(project.validate.called)
def test_should_not_fail_if_root_vanishes(self):
# Bug #353
tmpdir = tempfile.mkdtemp(prefix="elpy-test-validate-")
try:
backend = ropebackend.RopeBackend(tmpdir)
finally:
shutil.rmtree(tmpdir)
backend.validate()
class TestRPCGetCompletions(RPCGetCompletionsTests,
RopeBackendTestCase):
BUILTINS = ["object", "oct", "open", "or", "ord"]
class TestRPCGetCompletionDocstring(RPCGetCompletionDocstringTests,
RopeBackendTestCase):
pass
class TestRPCGetCompletionLocation(RPCGetCompletionLocationTests,
RopeBackendTestCase):
pass
class TestRPCGetDefinition(RPCGetDefinitionTests,
ShouldCallValidateTest,
RopeBackendTestCase):
pass
class TestRPCGetCalltip(RPCGetCalltipTests,
ShouldCallValidateTest,
RopeBackendTestCase):
ADD_CALLTIP = 'Add.add(a, b)'
RADIX_CALLTIP = "Decimal.radix()"
if compat.PYTHON3:
THREAD_CALLTIP = (
"threading.Thread(group=None, target=None, "
"name=None, args=(), kwargs=None, daemon=None, *)"
)
KEYS_CALLTIP = "builtins.keys()"
else:
THREAD_CALLTIP = (
"threading.Thread(group=None, target=None, "
"name=None, args=(), kwargs=None, verbose=None)"
)
KEYS_CALLTIP = "__builtin__.keys()"
class TestRPCGetDocstring(RPCGetDocstringTests,
ShouldCallValidateTest,
RopeBackendTestCase):
if sys.version_info < (2, 7):
JSON_LOADS_DOCSTRING = (
'loads(s, encoding=None, cls=None, object_hook=None, '
'parse_float=None, parse_int=None, parse_constant=None, '
'**kw):'
)
else:
JSON_LOADS_DOCSTRING = (
'loads(s, encoding=None, cls=None, object_hook=None, '
'parse_float=None, parse_int=None, parse_constant=None, '
'object_pairs_hook=None, **kw):'
)
class TestGetPythonProjectFiles(RopeBackendTestCase):
def test(self):
self.project_file("foo.txt", "")
self.project_file("gooddir/__init__.py", "")
self.project_file("gooddir/file.py", "")
self.project_file("baddir/file.py", "")
expected = set(os.path.join(self.project_root, name)
for name in ["foo.txt", "gooddir/__init__.py",
"gooddir/file.py"])
project = self.backend.project
actual = set(resource.real_path
for resource
in ropebackend.get_python_project_files(project))
self.assertEqual(expected, actual)
class TestPatchProjectFiles(RopeBackendTestCase):
def test(self):
self.project_file("foo.txt", "")
self.project_file("gooddir/__init__.py", "")
self.project_file("gooddir/file.py", "")
self.project_file("baddir/file.py", "")
expected = set(os.path.join(self.project_root, name)
for name in ["foo.txt", "gooddir/__init__.py",
"gooddir/file.py"])
actual = set(resource.real_path
for resource
in self.backend.project.get_files())
self.assertEqual(expected, actual)
class TestCallRope(RopeBackendTestCase):
def test_should_return_value(self):
func = mock.MagicMock()
func.return_value = 23
actual = self.backend.call_rope(
func, "foo.py", "", 0
)
self.assertEqual(23, actual)
def test_should_raise_fault_with_data_on_exception(self):
func = mock.MagicMock()
func.side_effect = RuntimeError("Stuff!")
func.__module__ = "rope.test"
func.__name__ = "test_function"
try:
self.backend.call_rope(
func, "foo.py", "", 0
)
except rpc.Fault as e:
self.assertEqual(500, e.code)
self.assertEqual("Stuff!", e.message)
self.assertIn("traceback", e.data)
self.assertIn("rope_debug_info", e.data)
self.assertEqual("rope.test.test_function",
e.data["rope_debug_info"]["function_name"])

View File

@@ -0,0 +1,209 @@
"""Tests for elpy.rpc."""
import json
import unittest
import sys
from elpy import rpc
from elpy.tests.compat import StringIO
class TestFault(unittest.TestCase):
def test_should_have_code_and_data(self):
fault = rpc.Fault("Hello", code=250, data="Fnord")
self.assertEqual(str(fault), "Hello")
self.assertEqual(fault.code, 250)
self.assertEqual(fault.data, "Fnord")
def test_should_have_defaults_for_code_and_data(self):
fault = rpc.Fault("Hello")
self.assertEqual(str(fault), "Hello")
self.assertEqual(fault.code, 500)
self.assertIsNone(fault.data)
class TestJSONRPCServer(unittest.TestCase):
def setUp(self):
self.stdin = StringIO()
self.stdout = StringIO()
self.rpc = rpc.JSONRPCServer(self.stdin, self.stdout)
def write(self, s):
self.stdin.seek(0)
self.stdin.truncate()
self.stdout.seek(0)
self.stdout.truncate()
self.stdin.write(s)
self.stdin.seek(0)
def read(self):
value = self.stdout.getvalue()
self.stdin.seek(0)
self.stdin.truncate()
self.stdout.seek(0)
self.stdout.truncate()
return value
class TestInit(TestJSONRPCServer):
def test_should_use_arguments(self):
self.assertEqual(self.rpc.stdin, self.stdin)
self.assertEqual(self.rpc.stdout, self.stdout)
def test_should_default_to_sys(self):
testrpc = rpc.JSONRPCServer()
self.assertEqual(sys.stdin, testrpc.stdin)
self.assertEqual(sys.stdout, testrpc.stdout)
class TestReadJson(TestJSONRPCServer):
def test_should_read_json(self):
objlist = [{'foo': 'bar'},
{'baz': 'qux', 'fnord': 'argl\nbargl'},
"beep\r\nbeep\r\nbeep"]
self.write("".join([(json.dumps(obj) + "\n")
for obj in objlist]))
for obj in objlist:
self.assertEqual(self.rpc.read_json(),
obj)
def test_should_raise_eof_on_eof(self):
self.assertRaises(EOFError, self.rpc.read_json)
def test_should_fail_on_malformed_json(self):
self.write("malformed json\n")
self.assertRaises(ValueError,
self.rpc.read_json)
class TestWriteJson(TestJSONRPCServer):
def test_should_write_json_line(self):
objlist = [{'foo': 'bar'},
{'baz': 'qux', 'fnord': 'argl\nbargl'},
]
for obj in objlist:
self.rpc.write_json(**obj)
self.assertEqual(json.loads(self.read()),
obj)
class TestHandleRequest(TestJSONRPCServer):
def test_should_fail_if_json_does_not_contain_a_method(self):
self.write(json.dumps(dict(params=[],
id=23)))
self.assertRaises(ValueError,
self.rpc.handle_request)
def test_should_call_right_method(self):
self.write(json.dumps(dict(method='foo',
params=[1, 2, 3],
id=23)))
self.rpc.rpc_foo = lambda *params: params
self.rpc.handle_request()
self.assertEqual(json.loads(self.read()),
dict(id=23,
result=[1, 2, 3]))
def test_should_pass_defaults_for_missing_parameters(self):
def test_method(*params):
self.args = params
self.write(json.dumps(dict(method='foo')))
self.rpc.rpc_foo = test_method
self.rpc.handle_request()
self.assertEqual(self.args, ())
self.assertEqual(self.read(), "")
def test_should_return_error_for_missing_method(self):
self.write(json.dumps(dict(method='foo',
id=23)))
self.rpc.handle_request()
result = json.loads(self.read())
self.assertEqual(result["id"], 23)
self.assertEqual(result["error"]["message"],
"Unknown method foo")
def test_should_return_error_for_exception_in_method(self):
def test_method():
raise ValueError("An error was raised")
self.write(json.dumps(dict(method='foo',
id=23)))
self.rpc.rpc_foo = test_method
self.rpc.handle_request()
result = json.loads(self.read())
self.assertEqual(result["id"], 23)
self.assertEqual(result["error"]["message"], "An error was raised")
self.assertIn("traceback", result["error"]["data"])
def test_should_not_include_traceback_for_faults(self):
def test_method():
raise rpc.Fault("This is a fault")
self.write(json.dumps(dict(method="foo",
id=23)))
self.rpc.rpc_foo = test_method
self.rpc.handle_request()
result = json.loads(self.read())
self.assertEqual(result["id"], 23)
self.assertEqual(result["error"]["message"], "This is a fault")
self.assertNotIn("traceback", result["error"])
def test_should_add_data_for_faults(self):
def test_method():
raise rpc.Fault("St. Andreas' Fault",
code=12345, data="Yippieh")
self.write(json.dumps(dict(method="foo", id=23)))
self.rpc.rpc_foo = test_method
self.rpc.handle_request()
result = json.loads(self.read())
self.assertEqual(result["error"]["data"], "Yippieh")
def test_should_call_handle_for_unknown_method(self):
def test_handle(method_name, args):
return "It works"
self.write(json.dumps(dict(method="doesnotexist",
id=23)))
self.rpc.handle = test_handle
self.rpc.handle_request()
self.assertEqual(json.loads(self.read()),
dict(id=23,
result="It works"))
class TestServeForever(TestJSONRPCServer):
def handle_request(self):
self.hr_called += 1
if self.hr_called > 10:
raise self.error()
def setUp(self):
super(TestServeForever, self).setUp()
self.hr_called = 0
self.error = KeyboardInterrupt
self.rpc.handle_request = self.handle_request
def test_should_call_handle_request_repeatedly(self):
self.rpc.serve_forever()
self.assertEqual(self.hr_called, 11)
def test_should_return_on_some_errors(self):
self.error = KeyboardInterrupt
self.rpc.serve_forever()
self.error = EOFError
self.rpc.serve_forever()
self.error = SystemExit
self.rpc.serve_forever()
def test_should_fail_on_most_errors(self):
self.error = RuntimeError
self.assertRaises(RuntimeError,
self.rpc.serve_forever)

View File

@@ -0,0 +1,441 @@
# coding: utf-8
"""Tests for the elpy.server module"""
import os
import tempfile
import unittest
import mock
from elpy import rpc
from elpy import server
from elpy.tests import compat
from elpy.tests.support import BackendTestCase
import elpy.refactor
class ServerTestCase(unittest.TestCase):
def setUp(self):
self.srv = server.ElpyRPCServer()
class BackendCallTestCase(ServerTestCase):
def assert_calls_backend(self, method):
with mock.patch("elpy.server.get_source") as get_source:
with mock.patch.object(self.srv, "backend") as backend:
get_source.return_value = "transformed source"
getattr(self.srv, method)("filename", "source", "offset")
get_source.assert_called_with("source")
getattr(backend, method).assert_called_with(
"filename", "transformed source", "offset"
)
class TestInit(ServerTestCase):
def test_should_not_select_a_backend_by_default(self):
self.assertIsNone(self.srv.backend)
class TestRPCEcho(ServerTestCase):
def test_should_return_arguments(self):
self.assertEqual(("hello", "world"),
self.srv.rpc_echo("hello", "world"))
class TestRPCInit(ServerTestCase):
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_set_project_root(self, RopeBackend, JediBackend):
self.srv.rpc_init({"project_root": "/project/root",
"backend": "rope"})
self.assertEqual("/project/root", self.srv.project_root)
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_initialize_rope(self, RopeBackend, JediBackend):
self.srv.rpc_init({"project_root": "/project/root",
"backend": "rope"})
RopeBackend.assert_called_with("/project/root")
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_initialize_jedi(self, RopeBackend, JediBackend):
self.srv.rpc_init({"project_root": "/project/root",
"backend": "jedi"})
JediBackend.assert_called_with("/project/root")
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_use_rope_if_available_and_requested(
self, RopeBackend, JediBackend):
RopeBackend.return_value.name = "rope"
JediBackend.return_value.name = "jedi"
self.srv.rpc_init({"project_root": "/project/root",
"backend": "rope"})
self.assertEqual("rope", self.srv.backend.name)
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_use_jedi_if_available_and_requested(
self, RopeBackend, JediBackend):
RopeBackend.return_value.name = "rope"
JediBackend.return_value.name = "jedi"
self.srv.rpc_init({"project_root": "/project/root",
"backend": "jedi"})
self.assertEqual("jedi", self.srv.backend.name)
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_use_rope_if_available_and_nothing_requested(
self, RopeBackend, JediBackend):
RopeBackend.return_value.name = "rope"
JediBackend.return_value.name = "jedi"
self.srv.rpc_init({"project_root": "/project/root",
"backend": None})
self.assertEqual("rope", self.srv.backend.name)
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_use_jedi_if_rope_not_available_and_nothing_requested(
self, RopeBackend, JediBackend):
RopeBackend.return_value.name = "rope"
JediBackend.return_value.name = "jedi"
old_rope = server.ropebackend
server.ropebackend = None
try:
self.srv.rpc_init({"project_root": "/project/root",
"backend": None})
finally:
server.ropebackend = old_rope
self.assertEqual("jedi", self.srv.backend.name)
@mock.patch("elpy.jedibackend.JediBackend")
@mock.patch("elpy.ropebackend.RopeBackend")
def test_should_use_none_if_nothing_available(
self, RopeBackend, JediBackend):
RopeBackend.return_value.name = "rope"
JediBackend.return_value.name = "jedi"
old_rope = server.ropebackend
old_jedi = server.jedibackend
server.ropebackend = None
server.jedibackend = None
try:
self.srv.rpc_init({"project_root": "/project/root",
"backend": None})
finally:
server.ropebackend = old_rope
server.jedibackend = old_jedi
self.assertIsNone(self.srv.backend)
class TestRPCGetCalltip(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_calltip")
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_calltip("filname", "source",
"offset"))
class TestRPCGetCompletions(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_completions")
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertEqual([],
self.srv.rpc_get_completions("filname", "source",
"offset"))
def test_should_sort_results(self):
with mock.patch.object(self.srv, 'backend') as backend:
backend.rpc_get_completions.return_value = [
{'name': '_e'},
{'name': '__d'},
{'name': 'c'},
{'name': 'B'},
{'name': 'a'},
]
expected = list(reversed(backend.rpc_get_completions.return_value))
actual = self.srv.rpc_get_completions("filename", "source",
"offset")
self.assertEqual(expected, actual)
def test_should_uniquify_results(self):
with mock.patch.object(self.srv, 'backend') as backend:
backend.rpc_get_completions.return_value = [
{'name': 'a'},
{'name': 'a'},
]
expected = [{'name': 'a'}]
actual = self.srv.rpc_get_completions("filename", "source",
"offset")
self.assertEqual(expected, actual)
class TestRPCGetCompletionDocs(ServerTestCase):
def test_should_call_backend(self):
with mock.patch.object(self.srv, "backend") as backend:
self.srv.rpc_get_completion_docstring("completion")
(backend.rpc_get_completion_docstring
.assert_called_with("completion"))
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_completion_docstring("foo"))
class TestRPCGetCompletionLocation(ServerTestCase):
def test_should_call_backend(self):
with mock.patch.object(self.srv, "backend") as backend:
self.srv.rpc_get_completion_location("completion")
(backend.rpc_get_completion_location
.assert_called_with("completion"))
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_completion_location("foo"))
class TestRPCGetDefinition(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_definition")
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_definition("filname", "source",
"offset"))
class TestRPCGetAssignment(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_assignment")
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_assignment("filname", "source",
"offset"))
class TestRPCGetDocstring(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_docstring")
def test_should_handle_no_backend(self):
self.srv.backend = None
self.assertIsNone(self.srv.rpc_get_docstring("filname", "source",
"offset"))
class TestRPCGetPydocCompletions(ServerTestCase):
@mock.patch.object(server, 'get_pydoc_completions')
def test_should_call_pydoc_completions(self, get_pydoc_completions):
srv = server.ElpyRPCServer()
srv.rpc_get_pydoc_completions()
get_pydoc_completions.assert_called_with(None)
srv.rpc_get_pydoc_completions("foo")
get_pydoc_completions.assert_called_with("foo")
class TestGetPydocDocumentation(ServerTestCase):
@mock.patch("pydoc.render_doc")
def test_should_find_documentation(self, render_doc):
render_doc.return_value = "expected"
actual = self.srv.rpc_get_pydoc_documentation("open")
render_doc.assert_called_with("open",
"Elpy Pydoc Documentation for %s",
False)
self.assertEqual("expected", actual)
def test_should_return_none_for_unknown_module(self):
actual = self.srv.rpc_get_pydoc_documentation("frob.open")
self.assertIsNone(actual)
def test_should_return_valid_unicode(self):
import json
docstring = self.srv.rpc_get_pydoc_documentation("tarfile")
json.dumps(docstring)
class TestRPCGetRefactorOptions(BackendTestCase):
@mock.patch.object(compat.builtins, '__import__')
def test_should_fail_if_rope_is_not_available(self, import_):
import_.side_effect = ImportError
filename = self.project_file("foo.py", "")
srv = server.ElpyRPCServer()
self.assertRaises(ImportError, srv.rpc_get_refactor_options,
filename, 0)
@mock.patch.object(elpy.refactor, 'Refactor')
def test_should_initialize_and_call_refactor_object(self, Refactor):
filename = self.project_file("foo.py", "import foo")
srv = server.ElpyRPCServer()
srv.project_root = self.project_root
srv.rpc_get_refactor_options(filename, 5)
Refactor.assert_called_with(self.project_root, filename)
Refactor.return_value.get_refactor_options.assert_called_with(5, None)
class TestRPCRefactor(BackendTestCase):
@mock.patch.object(compat.builtins, '__import__')
def test_should_fail_if_rope_is_not_available(self, import_):
import_.side_effect = ImportError
filename = self.project_file("foo.py", "")
srv = server.ElpyRPCServer()
self.assertRaises(ImportError, srv.rpc_refactor,
filename, 'foo', ())
@mock.patch.object(elpy.refactor, 'Refactor')
def test_should_initialize_and_call_refactor_object_with_args(
self, Refactor):
filename = self.project_file("foo.py", "import foo")
srv = server.ElpyRPCServer()
srv.project_root = self.project_root
srv.rpc_refactor(filename, 'foo', (1, 2, 3))
Refactor.assert_called_with(self.project_root, filename)
Refactor.return_value.get_changes.assert_called_with('foo', 1, 2, 3)
@mock.patch.object(elpy.refactor, 'Refactor')
def test_should_initialize_and_call_refactor_object_without_args(
self, Refactor):
filename = self.project_file("foo.py", "import foo")
srv = server.ElpyRPCServer()
srv.project_root = self.project_root
srv.rpc_refactor(filename, 'foo', None)
Refactor.assert_called_with(self.project_root, filename)
Refactor.return_value.get_changes.assert_called_with('foo')
class TestRPCGetUsages(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_usages")
def test_should_handle_no_backend(self):
self.srv.backend = None
with self.assertRaises(rpc.Fault):
self.assertIsNone(self.srv.rpc_get_usages("filname", "source",
"offset"))
class TestRPCGetNames(BackendCallTestCase):
def test_should_call_backend(self):
self.assert_calls_backend("rpc_get_names")
def test_should_handle_no_backend(self):
self.srv.backend = None
with self.assertRaises(rpc.Fault):
self.assertIsNone(self.srv.rpc_get_names("filname", "source", 0))
class TestRPCImportMagic(ServerTestCase):
def test_should_call_importmagic(self):
with mock.patch.object(self.srv, "import_magic") as impmagic:
self.srv.rpc_get_import_symbols("filename", "source", "os")
impmagic.get_import_symbols.assert_called_with("os")
self.srv.rpc_add_import("filename", "source", "import os")
impmagic.add_import.assert_called_with("source", "import os")
self.srv.rpc_get_unresolved_symbols("filename", "source")
impmagic.get_unresolved_symbols.assert_called_with("source")
self.srv.rpc_remove_unreferenced_imports("filename", "source")
impmagic.remove_unreferenced_imports.assert_called_with("source")
class TestGetSource(unittest.TestCase):
def test_should_return_string_by_default(self):
self.assertEqual(server.get_source("foo"),
"foo")
def test_should_return_file_contents(self):
fd, filename = tempfile.mkstemp(prefix="elpy-test-")
self.addCleanup(os.remove, filename)
with open(filename, "w") as f:
f.write("file contents")
fileobj = {'filename': filename}
self.assertEqual(server.get_source(fileobj),
"file contents")
def test_should_clean_up_tempfile(self):
fd, filename = tempfile.mkstemp(prefix="elpy-test-")
with open(filename, "w") as f:
f.write("file contents")
fileobj = {'filename': filename,
'delete_after_use': True}
self.assertEqual(server.get_source(fileobj),
"file contents")
self.assertFalse(os.path.exists(filename))
def test_should_support_utf8(self):
fd, filename = tempfile.mkstemp(prefix="elpy-test-")
self.addCleanup(os.remove, filename)
with open(filename, "wb") as f:
f.write(u"möp".encode("utf-8"))
source = server.get_source({'filename': filename})
self.assertEqual(source, u"möp")
class TestPysymbolKey(BackendTestCase):
def keyLess(self, a, b):
self.assertLess(b, a)
self.assertLess(server._pysymbol_key(a),
server._pysymbol_key(b))
def test_should_be_case_insensitive(self):
self.keyLess("bar", "Foo")
def test_should_sort_private_symbols_after_public_symbols(self):
self.keyLess("foo", "_bar")
def test_should_sort_private_symbols_after_dunder_symbols(self):
self.assertLess(server._pysymbol_key("__foo__"),
server._pysymbol_key("_bar"))
def test_should_sort_dunder_symbols_after_public_symbols(self):
self.keyLess("bar", "__foo")
class Autopep8TestCase(ServerTestCase):
def test_rpc_fix_code_should_return_formatted_string(self):
code_block = 'x= 123\n'
new_block = self.srv.rpc_fix_code(code_block)
self.assertEqual(new_block, 'x = 123\n')

View File

@@ -0,0 +1,19 @@
"""Tests for elpy.tests.support. Yep, we test test code."""
import unittest
from elpy.tests.support import source_and_offset
class TestSourceAndOffset(unittest.TestCase):
def test_should_return_source_and_offset(self):
self.assertEqual(source_and_offset("hello, _|_world"),
("hello, world", 7))
def test_should_handle_beginning_of_string(self):
self.assertEqual(source_and_offset("_|_hello, world"),
("hello, world", 0))
def test_should_handle_end_of_string(self):
self.assertEqual(source_and_offset("hello, world_|_"),
("hello, world", 12))

View File

@@ -0,0 +1,32 @@
# coding: utf-8
"""Tests for the elpy.yapf module"""
import unittest
from elpy import yapfutil
from elpy.rpc import Fault
from elpy.tests.support import BackendTestCase
@unittest.skipIf(yapfutil.YAPF_NOT_SUPPORTED,
'yapf not supported for current python version')
class YAPFTestCase(BackendTestCase):
def setUp(self):
if yapfutil.YAPF_NOT_SUPPORTED:
raise unittest.SkipTest
def test_fix_code_should_throw_error_for_invalid_code(self):
src = 'x = '
self.assertRaises(Fault, yapfutil.fix_code, src)
def test_fix_code(self):
testdata = [
('x= 123\n', 'x = 123\n'),
('x=1; \ny=2 \n', 'x = 1\ny = 2\n'),
]
for src, expected in testdata:
self._assert_format(src, expected)
def _assert_format(self, src, expected):
new_block = yapfutil.fix_code(src)
self.assertEqual(new_block, expected)

View File

@@ -0,0 +1,38 @@
"""Glue for the "yapf" library.
"""
import os
import sys
from elpy.rpc import Fault
YAPF_NOT_SUPPORTED = sys.version_info < (2, 7) or (
sys.version_info >= (3, 0) and sys.version_info < (3, 4))
try:
if YAPF_NOT_SUPPORTED:
yapf_api = None
else:
from yapf.yapflib import yapf_api
from yapf.yapflib import file_resources
except ImportError: # pragma: no cover
yapf_api = None
def fix_code(code):
"""Formats Python code to conform to the PEP 8 style guide.
"""
if not yapf_api:
raise Fault('yapf not installed', code=400)
style_config = file_resources.GetDefaultStyleForDir(os.getcwd())
try:
reformatted_source, _ = yapf_api.FormatCode(code,
filename='<stdin>',
style_config=style_config,
verify=False)
return reformatted_source
except Exception as e:
raise Fault("Error during formatting: {}".format(e),
code=400)