173 lines
6.2 KiB
Python
173 lines
6.2 KiB
Python
"""
|
|
|
|
We acquire the Python information by running an interrogation script in a subprocess. This operation is not
cheap, especially on Windows. To avoid paying this hefty cost every time, we apply multiple levels of
caching.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import random
|
|
import sys
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
from shlex import quote
|
|
from string import ascii_lowercase, ascii_uppercase, digits
|
|
from subprocess import Popen
|
|
|
|
from virtualenv.app_data import AppDataDisabled
|
|
from virtualenv.discovery.py_info import PythonInfo
|
|
from virtualenv.util.subprocess import subprocess
|
|
|
|
# In-memory cache keyed by the (unresolved) executable path. Values are whatever the lookup produced for that
# path: an interpreter information object, or the exception describing why the query failed. It is seeded with
# the interpreter running this module, so lookups for that path can be answered without running the query script.
_CACHE = OrderedDict([(Path(sys.executable), PythonInfo())])
|
|
|
|
|
|
def from_exe(cls, app_data, exe, env=None, raise_on_error=True, ignore_cache=False):
    """Return the interpreter information for ``exe``, going through the caches.

    :param cls: class forwarded to the cache helpers, which use it to build the result
    :param app_data: application data store forwarded to the cache helpers
    :param exe: the interpreter executable to describe
    :param env: environment mapping used for the query; ``os.environ`` when ``None``
    :param raise_on_error: when true a failed query is raised; otherwise it is logged at info level
    :param ignore_cache: forwarded to ``_get_from_cache`` to bypass the in-memory lookup
    :return: the cached or freshly acquired information, or ``None`` when the query failed and
        ``raise_on_error`` is false
    """
    if env is None:
        env = os.environ
    outcome = _get_from_cache(cls, app_data, exe, env, ignore_cache=ignore_cache)
    # the cache layer reports failures by returning the exception instead of raising it
    if not isinstance(outcome, Exception):
        return outcome
    if raise_on_error:
        raise outcome
    logging.info("%s", outcome)
    return None
|
|
|
|
|
|
def _get_from_cache(cls, app_data, exe, env, ignore_cache=True):
    """Look ``exe`` up in the in-memory cache, falling back to the app data (file) cache.

    :param cls: class forwarded to ``_get_via_file_cache``
    :param app_data: application data store forwarded to ``_get_via_file_cache``
    :param exe: the interpreter executable; ``Path(exe)`` is the in-memory cache key
    :param env: environment mapping forwarded to ``_get_via_file_cache``
    :param ignore_cache: when true the in-memory entry is not consulted (it is still refreshed afterwards)
    :return: the interpreter information, or the exception object produced by a failed query
    """
    # note: the path is deliberately not resolved - following a symlink may yield different prefix information
    # if there is a pyvenv.cfg somewhere alongside it on python3.5+
    cache_key = Path(exe)
    if ignore_cache or cache_key not in _CACHE:
        # not usable from memory: go through the app data cache and remember the outcome
        info = _get_via_file_cache(cls, app_data, cache_key, exe, env)
        _CACHE[cache_key] = info
    else:
        info = _CACHE[cache_key]
    # regardless of which cache answered, report the executable exactly as the caller passed it
    if isinstance(info, PythonInfo):
        info.executable = exe
    return info
|
|
|
|
|
|
def _get_via_file_cache(cls, app_data, path, exe, env):
    """Load the information for ``path`` from the app data store, querying the interpreter when needed.

    A stored entry is used only when its recorded path and modification time match the current ones and the
    ``system_executable`` it names (if any) still exists; otherwise the entry is removed and the interpreter
    is queried again via ``_run_subprocess``. A successful query is written back to the store.

    :param cls: class providing ``_from_dict``; also forwarded to ``_run_subprocess``
    :param app_data: application data store; ``AppDataDisabled()`` is used when ``None``
    :param path: ``Path`` of the executable, used for the modification time and to select the store entry
    :param exe: the executable as given by the caller, forwarded to ``_run_subprocess``
    :param env: environment mapping forwarded to ``_run_subprocess``
    :return: the interpreter information, or the failure object returned by ``_run_subprocess``
    """
    path_text = str(path)
    try:
        current_mtime = path.stat().st_mtime
    except OSError:
        # the file cannot be stat-ed: fall back to -1 as the modification time
        current_mtime = -1
    if app_data is None:
        app_data = AppDataDisabled()
    store = app_data.py_info(path)
    info = None
    with store.locked():
        if store.exists():  # an entry exists: use it only if it still matches
            stored = store.read()
            stored_path, stored_mtime, stored_content = stored["path"], stored["st_mtime"], stored["content"]
            still_matches = stored_path == path_text and stored_mtime == current_mtime
            if not still_matches:
                store.remove()
            else:
                info = cls._from_dict(dict(stored_content.items()))
                system_exe = info.system_executable
                if system_exe is not None and not os.path.exists(system_exe):
                    # the recorded system executable no longer exists, so the entry is stale
                    store.remove()
                    info = None
        if info is None:  # nothing usable was stored: run the query and save the outcome
            failure, info = _run_subprocess(cls, exe, app_data, env)
            if failure is not None:
                info = failure
            else:
                record = {"st_mtime": current_mtime, "path": path_text, "content": info._to_dict()}
                store.write(record)
    return info
|
|
|
|
|
|
# Number of characters in every cookie produced by gen_cookie().
COOKIE_LENGTH: int = 32


def gen_cookie():
    """Return a pseudorandom marker string of ``COOKIE_LENGTH`` ASCII letters and digits.

    The characters are drawn with the non-cryptographic ``random`` module.

    :return: a ``str`` of exactly ``COOKIE_LENGTH`` characters taken from ``[a-zA-Z0-9]``
    """
    # build the alphabet once per call instead of re-joining it for every generated character
    alphabet = ascii_lowercase + ascii_uppercase + digits
    return "".join(random.choice(alphabet) for _ in range(COOKIE_LENGTH))
|
|
|
|
|
|
def _run_subprocess(cls, exe, app_data, env):
    """Query ``exe`` by running the ``py_info.py`` script that sits next to this module.

    :param cls: class whose ``_from_json`` turns the script output into the result object
    :param exe: the interpreter to run; also stored as ``executable`` on a successful result
    :param app_data: object whose ``ensure_extracted`` context manager yields the script path to execute
    :param env: environment mapping; a copy without ``__PYVENV_LAUNCHER__`` is handed to the child process
    :return: a ``(failure, result)`` pair - ``(None, info)`` on exit code 0, otherwise ``(RuntimeError, None)``
        where the error message carries the exit code and any captured output
    """
    script_source = Path(os.path.abspath(__file__)).parent / "py_info.py"
    # The cookies let us separate the serialized output of the information script from anything else that ends
    # up on stdout. The right way to do this would be an anonymous pipe whose descriptor is handed to the child,
    # but the available options are either not cross-platform or too big to implement, and none is in the stdlib.
    # Cookies are the easiest and shortest alternative. They are pseudorandom because that is easy to implement
    # and avoids breakage when something (e.g. a debug output library) prints module source code. They are
    # searched for in reversed form to avoid breakage from variable values appearing in debug output.
    start_cookie = gen_cookie()
    end_cookie = gen_cookie()
    with app_data.ensure_extracted(script_source) as script_path:
        cmd = [exe, str(script_path), start_cookie, end_cookie]
        # prevent sys.prefix from leaking into the child process - see https://bugs.python.org/issue22490
        child_env = env.copy()
        child_env.pop("__PYVENV_LAUNCHER__", None)
        logging.debug("get interpreter info via cmd: %s", LogCmd(cmd))
        try:
            process = Popen(
                cmd,
                universal_newlines=True,
                stdin=subprocess.PIPE,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
                env=child_env,
            )
            out, err = process.communicate()
            code = process.returncode
        except OSError as os_error:
            # the process could not be run at all: report the OS error in place of its output
            out, err, code = "", os_error.strerror, os_error.errno

    if code != 0:
        msg = f"{exe} with code {code}{f' out: {out!r}' if out else ''}{f' err: {err!r}' if err else ''}"
        return RuntimeError(f"failed to query {msg}"), None

    # forward whatever was printed before the start marker and keep only what follows it
    start_at = out.find(start_cookie[::-1])
    if start_at >= 0:
        leading = out[:start_at]
        if leading:
            sys.stdout.write(leading)
        out = out[start_at + COOKIE_LENGTH :]

    # forward whatever was printed after the end marker and keep only what precedes it
    end_at = out.find(end_cookie[::-1])
    if end_at >= 0:
        trailing = out[end_at + COOKIE_LENGTH :]
        if trailing:
            sys.stdout.write(trailing)
        out = out[:end_at]

    info = cls._from_json(out)
    info.executable = exe  # keep original executable as this may contain initialization code
    return None, info
|
|
|
|
|
|
class LogCmd:
    """Render a command (and optionally its environment) as text only when it is actually formatted."""

    def __init__(self, cmd, env=None):
        """Remember the command sequence ``cmd`` and the optional environment ``env``."""
        self.cmd = cmd
        self.env = env

    def __repr__(self):
        """Return the shell-quoted command, followed by ``env of <env>`` when an environment was given."""
        quoted = " ".join(map(quote, map(str, self.cmd)))
        if self.env is None:
            return quoted
        return f"{quoted} env of {self.env!r}"
|
|
|
|
|
|
def clear(app_data):
    """Drop all cached interpreter information.

    :param app_data: application data store; its ``py_info_clear()`` is called first
    """
    # clear the store first, then forget everything held in memory
    app_data.py_info_clear()
    _CACHE.clear()
|
|
|
|
|
|
# Public API of this module. The name must be spelled ``__all__`` (two underscores on each side) for Python to
# treat it as the export list; the previous ``___all___`` spelling was just an unused variable.
__all__ = [
    "from_exe",
    "clear",
    "LogCmd",
]
|