utils.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. # Copyright (C) 2012 Anaconda, Inc
  2. # SPDX-License-Identifier: BSD-3-Clause
  3. from __future__ import annotations
  4. import logging
  5. import re
  6. import sys
  7. from contextlib import contextmanager
  8. from functools import lru_cache, wraps
  9. from os import PathLike, environ
  10. from os.path import abspath, basename, dirname, isfile, join
  11. from pathlib import Path
  12. from shutil import which
  13. from typing import Literal
  14. from . import CondaError
  15. from .auxlib.compat import Utf8NamedTemporaryFile, shlex_split_unicode
  16. from .common.compat import isiterable, on_win
  17. from .common.path import win_path_to_unix
  18. from .common.url import path_to_url
  19. from .deprecations import deprecated
  20. from .gateways.disk.read import compute_sum
  21. log = logging.getLogger(__name__)
  22. def path_identity(path):
  23. """Used as a dummy path converter where no conversion necessary"""
  24. return path
  25. def unix_path_to_win(path, root_prefix=""):
  26. """Convert a path or :-separated string of paths into a Windows representation
  27. Does not add cygdrive. If you need that, set root_prefix to "/cygdrive"
  28. """
  29. if len(path) > 1 and (";" in path or (path[1] == ":" and path.count(":") == 1)):
  30. # already a windows path
  31. return path.replace("/", "\\")
  32. path_re = root_prefix + r'(/[a-zA-Z]/(?:(?![:\s]/)[^:*?"<>])*)'
  33. def _translation(found_path):
  34. group = found_path.group(0)
  35. return "{}:{}".format(
  36. group[len(root_prefix) + 1],
  37. group[len(root_prefix) + 2 :].replace("/", "\\"),
  38. )
  39. translation = re.sub(path_re, _translation, path)
  40. translation = re.sub(
  41. ":([a-zA-Z]):\\\\", lambda match: ";" + match.group(0)[1] + ":\\", translation
  42. )
  43. return translation
  44. # curry cygwin functions
  45. def win_path_to_cygwin(path):
  46. return win_path_to_unix(path, "/cygdrive")
  47. def cygwin_path_to_win(path):
  48. return unix_path_to_win(path, "/cygdrive")
  49. def translate_stream(stream, translator):
  50. return "\n".join(translator(line) for line in stream.split("\n"))
  51. def human_bytes(n):
  52. """
  53. Return the number of bytes n in more human readable form.
  54. Examples:
  55. >>> human_bytes(42)
  56. '42 B'
  57. >>> human_bytes(1042)
  58. '1 KB'
  59. >>> human_bytes(10004242)
  60. '9.5 MB'
  61. >>> human_bytes(100000004242)
  62. '93.13 GB'
  63. """
  64. if n < 1024:
  65. return "%d B" % n
  66. k = n / 1024
  67. if k < 1024:
  68. return "%d KB" % round(k)
  69. m = k / 1024
  70. if m < 1024:
  71. return "%.1f MB" % m
  72. g = m / 1024
  73. return "%.2f GB" % g
  74. # TODO: this should be done in a more extensible way
  75. # (like files for each shell, with some registration mechanism.)
  76. # defaults for unix shells. Note: missing "exe" entry, which should be set to
  77. # either an executable on PATH, or a full path to an executable for a shell
  78. unix_shell_base = dict(
  79. binpath="/bin/", # mind the trailing slash.
  80. echo="echo",
  81. env_script_suffix=".sh",
  82. nul="2>/dev/null",
  83. path_from=path_identity,
  84. path_to=path_identity,
  85. pathsep=":",
  86. printdefaultenv="echo $CONDA_DEFAULT_ENV",
  87. printpath="echo $PATH",
  88. printps1="echo $CONDA_PROMPT_MODIFIER",
  89. promptvar="PS1",
  90. sep="/",
  91. set_var="export ",
  92. shell_args=["-l", "-c"],
  93. shell_suffix="",
  94. slash_convert=("\\", "/"),
  95. source_setup="source",
  96. test_echo_extra="",
  97. var_format="${}",
  98. )
  99. msys2_shell_base = dict(
  100. unix_shell_base,
  101. path_from=unix_path_to_win,
  102. path_to=win_path_to_unix,
  103. binpath="/bin/", # mind the trailing slash.
  104. printpath="python -c \"import os; print(';'.join(os.environ['PATH'].split(';')[1:]))\" | cygpath --path -f -", # NOQA
  105. )
  106. if on_win:
  107. shells = {
  108. # "powershell.exe": dict(
  109. # echo="echo",
  110. # test_echo_extra=" .",
  111. # var_format="${var}",
  112. # binpath="/bin/", # mind the trailing slash.
  113. # source_setup="source",
  114. # nul='2>/dev/null',
  115. # set_var='export ',
  116. # shell_suffix=".ps",
  117. # env_script_suffix=".ps",
  118. # printps1='echo $PS1',
  119. # printdefaultenv='echo $CONDA_DEFAULT_ENV',
  120. # printpath="echo %PATH%",
  121. # exe="powershell.exe",
  122. # path_from=path_identity,
  123. # path_to=path_identity,
  124. # slash_convert = ("/", "\\"),
  125. # ),
  126. "cmd.exe": dict(
  127. echo="@echo",
  128. var_format="%{}%",
  129. binpath="\\Scripts\\", # mind the trailing slash.
  130. source_setup="call",
  131. test_echo_extra="",
  132. nul="1>NUL 2>&1",
  133. set_var="set ",
  134. shell_suffix=".bat",
  135. env_script_suffix=".bat",
  136. printps1="@echo %PROMPT%",
  137. promptvar="PROMPT",
  138. # parens mismatched intentionally. See http://stackoverflow.com/questions/20691060/how-do-i-echo-a-blank-empty-line-to-the-console-from-a-windows-batch-file # NOQA
  139. printdefaultenv='IF NOT "%CONDA_DEFAULT_ENV%" == "" (\n'
  140. "echo %CONDA_DEFAULT_ENV% ) ELSE (\n"
  141. "echo()",
  142. printpath="@echo %PATH%",
  143. exe="cmd.exe",
  144. shell_args=["/d", "/c"],
  145. path_from=path_identity,
  146. path_to=path_identity,
  147. slash_convert=("/", "\\"),
  148. sep="\\",
  149. pathsep=";",
  150. ),
  151. "cygwin": dict(
  152. unix_shell_base,
  153. exe="bash.exe",
  154. binpath="/Scripts/", # mind the trailing slash.
  155. path_from=cygwin_path_to_win,
  156. path_to=win_path_to_cygwin,
  157. ),
  158. # bash is whichever bash is on PATH. If using Cygwin, you should use the cygwin
  159. # entry instead. The only major difference is that it handle's cygwin's /cygdrive
  160. # filesystem root.
  161. "bash.exe": dict(
  162. msys2_shell_base,
  163. exe="bash.exe",
  164. ),
  165. "bash": dict(
  166. msys2_shell_base,
  167. exe="bash",
  168. ),
  169. "sh.exe": dict(
  170. msys2_shell_base,
  171. exe="sh.exe",
  172. ),
  173. "zsh.exe": dict(
  174. msys2_shell_base,
  175. exe="zsh.exe",
  176. ),
  177. "zsh": dict(
  178. msys2_shell_base,
  179. exe="zsh",
  180. ),
  181. }
  182. else:
  183. shells = {
  184. "bash": dict(
  185. unix_shell_base,
  186. exe="bash",
  187. ),
  188. "dash": dict(
  189. unix_shell_base,
  190. exe="dash",
  191. source_setup=".",
  192. ),
  193. "zsh": dict(
  194. unix_shell_base,
  195. exe="zsh",
  196. ),
  197. "fish": dict(
  198. unix_shell_base,
  199. exe="fish",
  200. pathsep=" ",
  201. ),
  202. }
  203. # ##########################################
  204. # put back because of conda build
  205. # ##########################################
  206. urlpath = url_path = path_to_url
  207. @deprecated(
  208. "23.9",
  209. "24.3",
  210. addendum='Use `conda.gateways.disk.read.compute_sum(path, "md5")` instead.',
  211. )
  212. def md5_file(path: str | PathLike) -> str:
  213. return compute_sum(path, "md5")
  214. @deprecated(
  215. "23.9", "24.3", addendum="Use `conda.gateways.disk.read.compute_sum` instead."
  216. )
  217. def hashsum_file(path: str | PathLike, mode: Literal["md5", "sha256"] = "md5") -> str:
  218. return compute_sum(path, mode)
  219. @lru_cache(maxsize=None)
  220. def sys_prefix_unfollowed():
  221. """Since conda is installed into non-root environments as a symlink only
  222. and because sys.prefix follows symlinks, this function can be used to
  223. get the 'unfollowed' sys.prefix.
  224. This value is usually the same as the prefix of the environment into
  225. which conda has been symlinked. An example of when this is necessary
  226. is when conda looks for external sub-commands in find_commands.py
  227. """
  228. try:
  229. frame = next(iter(sys._current_frames().values()))
  230. while frame.f_back:
  231. frame = frame.f_back
  232. code = frame.f_code
  233. filename = code.co_filename
  234. unfollowed = dirname(dirname(filename))
  235. except Exception:
  236. return sys.prefix
  237. return unfollowed
  238. def quote_for_shell(*arguments):
  239. """Properly quote arguments for command line passing.
  240. For POSIX uses `shlex.join`, for Windows uses a custom implementation to properly escape
  241. metacharacters.
  242. :param arguments: Arguments to quote.
  243. :type arguments: list of str
  244. :return: Quoted arguments.
  245. :rtype: str
  246. """
  247. # [backport] Support passing in a list of strings or args of string.
  248. if len(arguments) == 1 and isiterable(arguments[0]):
  249. arguments = arguments[0]
  250. return _args_join(arguments)
  251. if on_win:
  252. # https://ss64.com/nt/syntax-esc.html
  253. # https://docs.microsoft.com/en-us/archive/blogs/twistylittlepassagesallalike/everyone-quotes-command-line-arguments-the-wrong-way
  254. _RE_UNSAFE = re.compile(r'["%\s^<>&|]')
  255. _RE_DBL = re.compile(r'(["%])')
  256. def _args_join(args):
  257. """Return a shell-escaped string from *args*."""
  258. def quote(s):
  259. # derived from shlex.quote
  260. if not s:
  261. return '""'
  262. # if any unsafe chars are present we must quote
  263. if not _RE_UNSAFE.search(s):
  264. return s
  265. # double escape (" -> "")
  266. s = _RE_DBL.sub(r"\1\1", s)
  267. # quote entire string
  268. return f'"{s}"'
  269. return " ".join(quote(arg) for arg in args)
  270. else:
  271. try:
  272. from shlex import join as _args_join
  273. except ImportError:
  274. # [backport] Python <3.8
  275. def _args_join(args):
  276. """Return a shell-escaped string from *args*."""
  277. from shlex import quote
  278. return " ".join(quote(arg) for arg in args)
  279. # Ensures arguments are a tuple or a list. Strings are converted
  280. # by shlex_split_unicode() which is bad; we warn about it or else
  281. # we assert (and fix the code).
  282. def massage_arguments(arguments, errors="assert"):
  283. # For reference and in-case anything breaks ..
  284. # .. one of the places (run_command in conda_env/utils.py) this
  285. # gets called from used to do this too:
  286. #
  287. # def escape_for_winpath(p):
  288. # return p.replace('\\', '\\\\')
  289. #
  290. # if not isinstance(arguments, list):
  291. # arguments = list(map(escape_for_winpath, arguments))
  292. if isinstance(arguments, str):
  293. if errors == "assert":
  294. # This should be something like 'conda programming bug', it is an assert
  295. assert False, "Please ensure arguments are not strings"
  296. else:
  297. arguments = shlex_split_unicode(arguments)
  298. log.warning(
  299. "Please ensure arguments is not a string; "
  300. "used `shlex_split_unicode()` on it"
  301. )
  302. if not isiterable(arguments):
  303. arguments = (arguments,)
  304. assert not any(
  305. [isiterable(arg) for arg in arguments]
  306. ), "Individual arguments must not be iterable" # NOQA
  307. arguments = list(arguments)
  308. return arguments
  309. def wrap_subprocess_call(
  310. root_prefix,
  311. prefix,
  312. dev_mode,
  313. debug_wrapper_scripts,
  314. arguments,
  315. use_system_tmp_path=False,
  316. ):
  317. arguments = massage_arguments(arguments)
  318. if not use_system_tmp_path:
  319. tmp_prefix = abspath(join(prefix, ".tmp"))
  320. else:
  321. tmp_prefix = None
  322. script_caller = None
  323. multiline = False
  324. if len(arguments) == 1 and "\n" in arguments[0]:
  325. multiline = True
  326. if on_win:
  327. comspec = get_comspec() # fail early with KeyError if undefined
  328. if dev_mode:
  329. from conda import CONDA_PACKAGE_ROOT
  330. conda_bat = join(CONDA_PACKAGE_ROOT, "shell", "condabin", "conda.bat")
  331. else:
  332. conda_bat = environ.get(
  333. "CONDA_BAT", abspath(join(root_prefix, "condabin", "conda.bat"))
  334. )
  335. with Utf8NamedTemporaryFile(
  336. mode="w", prefix=tmp_prefix, suffix=".bat", delete=False
  337. ) as fh:
  338. silencer = "" if debug_wrapper_scripts else "@"
  339. fh.write(f"{silencer}ECHO OFF\n")
  340. fh.write(f"{silencer}SET PYTHONIOENCODING=utf-8\n")
  341. fh.write(f"{silencer}SET PYTHONUTF8=1\n")
  342. fh.write(
  343. f'{silencer}FOR /F "tokens=2 delims=:." %%A in (\'chcp\') do for %%B in (%%A) do set "_CONDA_OLD_CHCP=%%B"\n' # noqa
  344. )
  345. fh.write(f"{silencer}chcp 65001 > NUL\n")
  346. if dev_mode:
  347. from . import CONDA_SOURCE_ROOT
  348. fh.write(f"{silencer}SET CONDA_DEV=1\n")
  349. # In dev mode, conda is really:
  350. # 'python -m conda'
  351. # *with* PYTHONPATH set.
  352. fh.write(f"{silencer}SET PYTHONPATH={CONDA_SOURCE_ROOT}\n")
  353. fh.write(f"{silencer}SET CONDA_EXE={sys.executable}\n")
  354. fh.write(f"{silencer}SET _CE_M=-m\n")
  355. fh.write(f"{silencer}SET _CE_CONDA=conda\n")
  356. if debug_wrapper_scripts:
  357. fh.write("echo *** environment before *** 1>&2\n")
  358. fh.write("SET 1>&2\n")
  359. # Not sure there is any point in backing this up, nothing will get called with it reset
  360. # after all!
  361. # fh.write("@FOR /F \"tokens=100\" %%F IN ('chcp') DO @SET CONDA_OLD_CHCP=%%F\n")
  362. # fh.write('@chcp 65001>NUL\n')
  363. fh.write(f'{silencer}CALL "{conda_bat}" activate "{prefix}"\n')
  364. fh.write(f"{silencer}IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%\n")
  365. if debug_wrapper_scripts:
  366. fh.write("echo *** environment after *** 1>&2\n")
  367. fh.write("SET 1>&2\n")
  368. if multiline:
  369. # No point silencing the first line. If that's what's wanted then
  370. # it needs doing for each line and the caller may as well do that.
  371. fh.write(f"{arguments[0]}\n")
  372. else:
  373. assert not any("\n" in arg for arg in arguments), (
  374. "Support for scripts where arguments contain newlines not implemented.\n"
  375. ".. requires writing the script to an external file and knowing how to "
  376. "transform the command-line (e.g. `python -c args` => `python file`) "
  377. "in a tool dependent way, or attempting something like:\n"
  378. ".. https://stackoverflow.com/a/15032476 (adds unacceptable escaping"
  379. "requirements)"
  380. )
  381. fh.write(f"{silencer}{quote_for_shell(*arguments)}\n")
  382. fh.write(f"{silencer}IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%\n")
  383. fh.write(f"{silencer}chcp %_CONDA_OLD_CHCP%>NUL\n")
  384. script_caller = fh.name
  385. command_args = [comspec, "/d", "/c", script_caller]
  386. else:
  387. shell_path = which("bash") or which("sh")
  388. if shell_path is None:
  389. raise Exception("No compatible shell found!")
  390. # During tests, we sometimes like to have a temp env with e.g. an old python in it
  391. # and have it run tests against the very latest development sources. For that to
  392. # work we need extra smarts here, we want it to be instead:
  393. if dev_mode:
  394. conda_exe = [abspath(join(root_prefix, "bin", "python")), "-m", "conda"]
  395. dev_arg = "--dev"
  396. dev_args = [dev_arg]
  397. else:
  398. conda_exe = [
  399. environ.get("CONDA_EXE", abspath(join(root_prefix, "bin", "conda")))
  400. ]
  401. dev_arg = ""
  402. dev_args = []
  403. with Utf8NamedTemporaryFile(mode="w", prefix=tmp_prefix, delete=False) as fh:
  404. if dev_mode:
  405. from . import CONDA_SOURCE_ROOT
  406. fh.write(">&2 export PYTHONPATH=" + CONDA_SOURCE_ROOT + "\n")
  407. hook_quoted = quote_for_shell(*conda_exe, "shell.posix", "hook", *dev_args)
  408. if debug_wrapper_scripts:
  409. fh.write(">&2 echo '*** environment before ***'\n" ">&2 env\n")
  410. fh.write(f'>&2 echo "$({hook_quoted})"\n')
  411. fh.write(f'eval "$({hook_quoted})"\n')
  412. fh.write(f"conda activate {dev_arg} {quote_for_shell(prefix)}\n")
  413. if debug_wrapper_scripts:
  414. fh.write(">&2 echo '*** environment after ***'\n" ">&2 env\n")
  415. if multiline:
  416. # The ' '.join() is pointless since mutliline is only True when there's 1 arg
  417. # still, if that were to change this would prevent breakage.
  418. fh.write("{}\n".format(" ".join(arguments)))
  419. else:
  420. fh.write(f"{quote_for_shell(*arguments)}\n")
  421. script_caller = fh.name
  422. if debug_wrapper_scripts:
  423. command_args = [shell_path, "-x", script_caller]
  424. else:
  425. command_args = [shell_path, script_caller]
  426. return script_caller, command_args
  427. def get_comspec():
  428. """Returns COMSPEC from envvars.
  429. Ensures COMSPEC envvar is set to cmd.exe, if not attempt to find it.
  430. :raises KeyError: COMSPEC is undefined and cannot be found.
  431. :returns: COMSPEC value.
  432. :rtype: str
  433. """
  434. if basename(environ.get("COMSPEC", "")).lower() != "cmd.exe":
  435. for comspec in (
  436. # %SystemRoot%\System32\cmd.exe
  437. environ.get("SystemRoot")
  438. and join(environ["SystemRoot"], "System32", "cmd.exe"),
  439. # %windir%\System32\cmd.exe
  440. environ.get("windir") and join(environ["windir"], "System32", "cmd.exe"),
  441. ):
  442. if comspec and isfile(comspec):
  443. environ["COMSPEC"] = comspec
  444. break
  445. else:
  446. log.warn(
  447. "cmd.exe could not be found. Looked in SystemRoot and windir env vars.\n"
  448. )
  449. # fails with KeyError if still undefined
  450. return environ["COMSPEC"]
  451. def ensure_dir_exists(func):
  452. """
  453. Ensures that the directory exists for functions returning
  454. a Path object containing a directory
  455. """
  456. @wraps(func)
  457. def wrapper(*args, **kwargs):
  458. result = func(*args, **kwargs)
  459. if isinstance(result, Path):
  460. try:
  461. result.mkdir(parents=True, exist_ok=True)
  462. except OSError as exc:
  463. raise CondaError(
  464. "Error encountered while attempting to create cache directory."
  465. f"\n Directory: {result}"
  466. f"\n Exception: {exc}"
  467. )
  468. return result
  469. return wrapper
  470. @deprecated("23.9", "24.3", addendum="Use `open` instead.")
  471. @contextmanager
  472. def safe_open(*args, **kwargs):
  473. """
  474. Allows us to open files while catching any exceptions
  475. and raise them as CondaErrors instead.
  476. We do this to provide a more informative/actionable error output.
  477. """
  478. try:
  479. fp = open(*args, **kwargs)
  480. yield fp
  481. except OSError as exc:
  482. raise CondaError(
  483. "Error encountered while reading or writing from cache."
  484. f"\n File: {args[0]}"
  485. f"\n Exception: {exc}"
  486. )
  487. fp.close()