123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- # Copyright (C) 2012 Anaconda, Inc
- # SPDX-License-Identifier: BSD-3-Clause
- # this module contains miscellaneous stuff which eventually could be moved
- # into other places
- import os
- import re
- import shutil
- import sys
- from collections import defaultdict
- from os.path import abspath, dirname, exists, isdir, isfile, join, relpath
- from .base.context import context
- from .common.compat import on_win, open
- from .common.path import expand
- from .common.url import is_url, join_url, path_to_url
- from .core.index import get_index
- from .core.link import PrefixSetup, UnlinkLinkTransaction
- from .core.package_cache_data import PackageCacheData, ProgressiveFetchExtract
- from .core.prefix_data import PrefixData
- from .exceptions import (
- CondaExitZero,
- DisallowedPackageError,
- DryRunExit,
- PackagesNotFoundError,
- ParseError,
- )
- from .gateways.disk.delete import rm_rf
- from .gateways.disk.link import islink, readlink, symlink
- from .models.match_spec import MatchSpec
- from .models.prefix_graph import PrefixGraph
- from .plan import _get_best_prec_match
def conda_installed_files(prefix, exclude_self_build=False):
    """
    Return the set of files which have been installed (using conda) into
    a given prefix.
    """
    installed = set()
    for record in PrefixData(prefix).iter_records():
        # Records carrying a "file_hash" key are treated as self-builds and
        # skipped when the caller asks for them to be excluded.
        if exclude_self_build and "file_hash" in record:
            continue
        installed.update(record.get("files", ()))
    return installed
# Pattern for one line of an "@EXPLICIT" spec: an optional path/URL part
# ending in a slash or backslash, the tarball basename (.tar.bz2 or .conda),
# and an optional "#<md5>" fragment carrying the expected checksum.
url_pat = re.compile(
    r"(?:(?P<url_p>.+)(?:[/\\]))?"
    r"(?P<fn>[^/\\#]+(?:\.tar\.bz2|\.conda))"
    # Fixed: original read "(:?#...)" -- an optional literal colon followed
    # by "#" -- instead of the intended non-capturing group "(?:#...)".
    # The typo also accepted a bogus ":#<md5>" suffix.
    r"(?:#(?P<md5>[0-9a-f]{32}))?$"
)
def explicit(
    specs, prefix, verbose=False, force_extract=True, index_args=None, index=None
):
    """Fetch, extract and link packages given as explicit URLs/paths.

    Each entry of *specs* is either the literal marker "@EXPLICIT"
    (skipped), a URL, or a local path to a .tar.bz2/.conda artifact,
    optionally suffixed with "#<md5>".  Raises ParseError for entries that
    do not match ``url_pat``, DryRunExit under --dry-run, and CondaExitZero
    under --download-only.

    NOTE(review): ``verbose``, ``force_extract``, ``index_args`` and
    ``index`` are accepted but never used in this body -- presumably kept
    for interface compatibility with callers.  ``actions`` below is built
    but never returned; the function implicitly returns None.
    """
    actions = defaultdict(list)
    actions["PREFIX"] = prefix

    fetch_specs = []
    for spec in specs:
        if spec == "@EXPLICIT":
            continue

        if not is_url(spec):
            """
            # This does not work because url_to_path does not enforce Windows
            # backslashes. Should it? Seems like a dangerous change to make but
            # it would be cleaner.
            expanded = expand(spec)
            urled = path_to_url(expanded)
            pathed = url_to_path(urled)
            assert pathed == expanded
            """
            # Local paths are expanded and converted to file:// URLs first.
            spec = path_to_url(expand(spec))

        # parse URL
        m = url_pat.match(spec)
        if m is None:
            raise ParseError("Could not parse explicit URL: %s" % spec)
        url_p, fn, md5sum = m.group("url_p"), m.group("fn"), m.group("md5")

        url = join_url(url_p, fn)
        # url_p is everything but the tarball_basename and the md5sum
        fetch_specs.append(MatchSpec(url, md5=md5sum) if md5sum else MatchSpec(url))

    if context.dry_run:
        # Abort before any download or link work happens.
        raise DryRunExit()

    pfe = ProgressiveFetchExtract(fetch_specs)
    pfe.execute()

    if context.download_only:
        raise CondaExitZero(
            "Package caches prepared. "
            "UnlinkLinkTransaction cancelled with --download-only option."
        )

    # now make an UnlinkLinkTransaction with the PackageCacheRecords as inputs
    # need to add package name to fetch_specs so that history parsing keeps track of them correctly
    specs_pcrecs = tuple(
        [spec, next(PackageCacheData.query_all(spec), None)] for spec in fetch_specs
    )

    # Assert that every spec has a PackageCacheRecord
    specs_with_missing_pcrecs = [
        str(spec) for spec, pcrec in specs_pcrecs if pcrec is None
    ]
    if specs_with_missing_pcrecs:
        if len(specs_with_missing_pcrecs) == len(specs_pcrecs):
            raise AssertionError("No package cache records found")
        else:
            missing_precs_list = ", ".join(specs_with_missing_pcrecs)
            raise AssertionError(
                f"Missing package cache records for: {missing_precs_list}"
            )

    precs_to_remove = []
    prefix_data = PrefixData(prefix)
    for q, (spec, pcrec) in enumerate(specs_pcrecs):
        # Re-anchor each spec on the cache record's package name; the inner
        # lists of specs_pcrecs are mutated in place below.
        new_spec = MatchSpec(spec, name=pcrec.name)
        specs_pcrecs[q][0] = new_spec

        prec = prefix_data.get(pcrec.name, None)
        if prec:
            # If we've already got matching specifications, then don't bother re-linking it
            if next(prefix_data.query(new_spec), None):
                # None marks a no-op entry; filtered out when building stp.
                specs_pcrecs[q][0] = None
            else:
                precs_to_remove.append(prec)

    stp = PrefixSetup(
        prefix,
        precs_to_remove,
        tuple(sp[1] for sp in specs_pcrecs if sp[0]),
        (),
        tuple(sp[0] for sp in specs_pcrecs if sp[0]),
        (),
    )

    txn = UnlinkLinkTransaction(stp)
    txn.execute()
def rel_path(prefix, path, windows_forward_slashes=True):
    """Return *path* relative to *prefix*.

    Assumes *path* starts with *prefix* followed by one separator
    character -- TODO confirm callers guarantee this.  On Windows,
    backslashes are normalized to forward slashes unless
    *windows_forward_slashes* is falsy.
    """
    relative = path[len(prefix) + 1 :]
    if on_win and windows_forward_slashes:
        relative = relative.replace("\\", "/")
    return relative
def walk_prefix(prefix, ignore_predefined_files=True, windows_forward_slashes=True):
    """Return the set of all files in a given prefix directory."""
    prefix = abspath(prefix)
    # Top-level entries that belong to conda's own bookkeeping, not to the
    # environment's content.
    skip_toplevel = {
        "pkgs",
        "envs",
        "conda-bld",
        "conda-meta",
        ".conda_lock",
        "users",
        "LICENSE.txt",
        "info",
        "conda-recipes",
        ".index",
        ".unionfs",
        ".nonadmin",
    }
    if sys.platform == "darwin":
        skip_toplevel |= {"python.app", "Launcher.app"}
    # Entry points under bin/ that conda manages itself.
    skip_bin = {"conda", "activate", "deactivate"}

    found = set()
    bin_dir = join(prefix, "bin")
    for entry in os.scandir(prefix):
        name = entry.name
        if ignore_predefined_files and name in skip_toplevel:
            continue
        if isfile(join(prefix, name)):
            # Top-level plain files are recorded by bare name.
            found.add(name)
            continue
        for root, dirs, files in os.walk(join(prefix, name)):
            in_managed_bin = ignore_predefined_files and root == bin_dir
            for fname in files:
                if in_managed_bin and fname in skip_bin:
                    continue
                found.add(relpath(join(root, fname), prefix))
            # os.walk does not descend into symlinked directories here, so
            # record the link itself as an entry.
            for dname in dirs:
                if islink(join(root, dname)):
                    found.add(relpath(join(root, dname), prefix))

    if on_win and windows_forward_slashes:
        return {p.replace("\\", "/") for p in found}
    return found
def untracked(prefix, exclude_self_build=False):
    """Return (the set) of all untracked files for a given prefix."""
    tracked = conda_installed_files(prefix, exclude_self_build)

    def _is_noise(path):
        # Editor backup files, macOS Finder metadata, and compiled
        # bytecode whose source file is tracked are not reported.
        if path.endswith("~"):
            return True
        if sys.platform == "darwin" and path.endswith(".DS_Store"):
            return True
        return path.endswith(".pyc") and path[:-1] in tracked

    return {path for path in walk_prefix(prefix) - tracked if not _is_noise(path)}
def touch_nonadmin(prefix):
    """Creates $PREFIX/.nonadmin if sys.prefix/.nonadmin exists (on Windows)."""
    if not on_win:
        return
    if not exists(join(context.root_prefix, ".nonadmin")):
        return
    # Mirror the marker file into the target prefix, creating the
    # directory first if needed.
    if not isdir(prefix):
        os.makedirs(prefix)
    with open(join(prefix, ".nonadmin"), "w") as marker:
        marker.write("")
def clone_env(prefix1, prefix2, verbose=True, quiet=False, index_args=None):
    """Clone existing prefix1 into new prefix2.

    Copies untracked files directly (rewriting embedded prefix1 paths in
    text files), then re-installs the tracked packages into prefix2 via
    :func:`explicit`.  Raises PackagesNotFoundError when a record without
    a URL cannot be resolved against the index, DisallowedPackageError for
    packages matching context.disallowed_packages, and DryRunExit under
    --dry-run.

    NOTE(review): ``explicit`` has no return statement, so ``actions`` in
    the returned tuple is presumably always None -- verify before relying
    on it.
    """
    untracked_files = untracked(prefix1)

    # Discard conda, conda-env and any package that depends on them
    # NOTE: ``filter`` shadows the builtin; kept as-is. The while loop
    # iterates to a fixed point so transitive dependents are caught too.
    filter = {}
    found = True
    while found:
        found = False
        for prec in PrefixData(prefix1).iter_records():
            name = prec["name"]
            if name in filter:
                continue
            if name == "conda":
                filter["conda"] = prec
                found = True
                break
            if name == "conda-env":
                filter["conda-env"] = prec
                found = True
                break
            for dep in prec.combined_depends:
                if MatchSpec(dep).name in filter:
                    filter[name] = prec
                    found = True

    if filter:
        if not quiet:
            # Under --json, keep stdout clean by reporting on stderr.
            fh = sys.stderr if context.json else sys.stdout
            print(
                "The following packages cannot be cloned out of the root environment:",
                file=fh,
            )
            for prec in filter.values():
                print(" - " + prec.dist_str(), file=fh)
        drecs = {
            prec
            for prec in PrefixData(prefix1).iter_records()
            if prec["name"] not in filter
        }
    else:
        drecs = {prec for prec in PrefixData(prefix1).iter_records()}

    # Resolve URLs for packages that do not have URLs
    index = {}
    unknowns = [prec for prec in drecs if not prec.get("url")]
    notfound = []
    if unknowns:
        index_args = index_args or {}
        index = get_index(**index_args)

        for prec in unknowns:
            # Match by exact name/version/build against the fetched index.
            spec = MatchSpec(name=prec.name, version=prec.version, build=prec.build)
            precs = tuple(prec for prec in index.values() if spec.match(prec))
            if not precs:
                notfound.append(spec)
            elif len(precs) > 1:
                # Several channels carry this package; pick the best match.
                drecs.remove(prec)
                drecs.add(_get_best_prec_match(precs))
            else:
                drecs.remove(prec)
                drecs.add(precs[0])

    if notfound:
        raise PackagesNotFoundError(notfound)

    # Assemble the URL and channel list
    urls = {}
    for prec in drecs:
        urls[prec] = prec["url"]

    # PrefixGraph orders the records topologically (dependencies first).
    precs = tuple(PrefixGraph(urls).graph)
    urls = [urls[prec] for prec in precs]

    disallowed = tuple(MatchSpec(s) for s in context.disallowed_packages)
    for prec in precs:
        if any(d.match(prec) for d in disallowed):
            raise DisallowedPackageError(prec)

    if verbose:
        print("Packages: %d" % len(precs))
        print("Files: %d" % len(untracked_files))

    if context.dry_run:
        raise DryRunExit()

    for f in untracked_files:
        src = join(prefix1, f)
        dst = join(prefix2, f)
        dst_dir = dirname(dst)
        # A symlink or plain file occupying the destination directory path
        # is removed so a real directory can be created.
        if islink(dst_dir) or isfile(dst_dir):
            rm_rf(dst_dir)
        if not isdir(dst_dir):
            os.makedirs(dst_dir)
        if islink(src):
            # Recreate the symlink rather than copying its target.
            symlink(readlink(src), dst)
            continue

        try:
            with open(src, "rb") as fi:
                data = fi.read()
        except OSError:
            # Best effort: unreadable files are silently skipped.
            continue

        try:
            # Rewrite embedded absolute paths in text files so scripts keep
            # pointing inside the new prefix.
            s = data.decode("utf-8")
            s = s.replace(prefix1, prefix2)
            data = s.encode("utf-8")
        except UnicodeDecodeError:  # data is binary
            pass

        with open(dst, "wb") as fo:
            fo.write(data)
        shutil.copystat(src, dst)

    actions = explicit(
        urls,
        prefix2,
        verbose=not quiet,
        index=index,
        force_extract=False,
        index_args=index_args,
    )
    return actions, untracked_files
|