123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- # Copyright (C) 2012 Anaconda, Inc
- # SPDX-License-Identifier: BSD-3-Clause
- from logging import getLogger
- from os.path import isfile, join
- from .core.link import UnlinkLinkTransaction
- from .core.package_cache_data import ProgressiveFetchExtract
- from .deprecations import deprecated
- from .exceptions import CondaFileIOError
- from .gateways.disk.link import islink
# Module-level logger, named after this module.
log = getLogger(__name__)

# op codes
# Each op code is a string constant whose value equals its own name.

# fetch / extract related codes
CHECK_FETCH = "CHECK_FETCH"
FETCH = "FETCH"
CHECK_EXTRACT = "CHECK_EXTRACT"
EXTRACT = "EXTRACT"
RM_EXTRACTED = "RM_EXTRACTED"
RM_FETCHED = "RM_FETCHED"

# state / reporting codes
PREFIX = "PREFIX"
PRINT = "PRINT"
PROGRESS = "PROGRESS"

# link / unlink related codes
SYMLINK_CONDA = "SYMLINK_CONDA"
UNLINK = "UNLINK"
LINK = "LINK"

# codes whose argument is a whole transaction / fetch-extract object
UNLINKLINKTRANSACTION = "UNLINKLINKTRANSACTION"
PROGRESSIVEFETCHEXTRACT = "PROGRESSIVEFETCHEXTRACT"

# Set of op codes grouped under the name "progress commands".
PROGRESS_COMMANDS = {EXTRACT, RM_EXTRACTED}
# Tuple of the op codes treated as "action codes". PREFIX, PRINT, PROGRESS and
# the two whole-object codes (UNLINKLINKTRANSACTION, PROGRESSIVEFETCHEXTRACT)
# are not part of it.
ACTION_CODES = (
    CHECK_FETCH,
    FETCH,
    CHECK_EXTRACT,
    EXTRACT,
    UNLINK,
    LINK,
    SYMLINK_CONDA,
    RM_EXTRACTED,
    RM_FETCHED,
)
@deprecated("23.9", "24.3", addendum="Unused.")
def PREFIX_CMD(state, prefix):
    """Store *prefix* in *state* under the ``"prefix"`` key.

    The function is wrapped by the ``deprecated`` decorator, whose addendum
    marks it as unused.
    """
    state["prefix"] = prefix
def PRINT_CMD(state, arg):  # pragma: no cover
    """Log *arg* at INFO level on the ``conda.stdout.verbose`` logger.

    Messages beginning with "Unlinking packages" or "Linking packages" are
    dropped without being logged. *state* is accepted but not used.
    """
    suppressed_prefixes = ("Unlinking packages", "Linking packages")
    if not arg.startswith(suppressed_prefixes):
        getLogger("conda.stdout.verbose").info(arg)
def FETCH_CMD(state, package_cache_entry):
    """Placeholder for the FETCH op code; always raises.

    Raises:
        NotImplementedError: unconditionally, with no message.
    """
    raise NotImplementedError
def EXTRACT_CMD(state, arg):
    """Placeholder for the EXTRACT op code; always raises.

    Raises:
        NotImplementedError: unconditionally, with no message.
    """
    raise NotImplementedError
def PROGRESSIVEFETCHEXTRACT_CMD(state, progressive_fetch_extract):  # pragma: no cover
    """Run ``execute()`` on the given ``ProgressiveFetchExtract`` object.

    *state* is accepted but not used. The type check is an ``assert``, so it
    raises ``AssertionError`` on a wrong type and is skipped entirely when
    Python runs with assertions disabled (``-O``).
    """
    pfe = progressive_fetch_extract
    assert isinstance(pfe, ProgressiveFetchExtract)
    pfe.execute()
def UNLINKLINKTRANSACTION_CMD(state, arg):  # pragma: no cover
    """Run ``execute()`` on *arg*, which must be an ``UnlinkLinkTransaction``.

    *state* is accepted but not used. The type check is an ``assert``, so it
    raises ``AssertionError`` on a wrong type and is skipped entirely when
    Python runs with assertions disabled (``-O``).
    """
    assert isinstance(arg, UnlinkLinkTransaction)
    arg.execute()
def check_files_in_package(source_dir, files):
    """Verify that every entry of *files* exists under *source_dir*.

    Each entry is joined onto *source_dir* and must satisfy ``isfile`` or the
    project's ``islink`` helper. Earlier revisions returned from inside the
    loop, so only the first entry was ever examined; all entries are now
    checked before reporting success.

    Args:
        source_dir: Directory the relative paths in *files* are resolved
            against.
        files: Iterable of paths relative to *source_dir*.

    Returns:
        True when every entry is present (also for an empty *files*).

    Raises:
        CondaFileIOError: for the first entry that is neither a file nor a
            link.
    """
    for f in files:
        source_file = join(source_dir, f)
        # Fail fast on the first missing entry; keep scanning otherwise.
        if not (isfile(source_file) or islink(source_file)):
            raise CondaFileIOError(source_file, "File %s does not exist in tarball" % f)
    return True
# Map instruction to command (a python function).
# Entries are either a ``*_CMD`` function, a two-argument lambda that does
# nothing and returns None, or None itself (UNLINK and LINK).
commands = {
    PREFIX: PREFIX_CMD,
    PRINT: PRINT_CMD,
    FETCH: FETCH_CMD,
    PROGRESS: lambda x, y: None,
    EXTRACT: EXTRACT_CMD,
    RM_EXTRACTED: lambda x, y: None,
    RM_FETCHED: lambda x, y: None,
    UNLINK: None,
    LINK: None,
    SYMLINK_CONDA: lambda x, y: None,
    UNLINKLINKTRANSACTION: UNLINKLINKTRANSACTION_CMD,
    PROGRESSIVEFETCHEXTRACT: PROGRESSIVEFETCHEXTRACT_CMD,
}
# Ordered tuple of op codes: removals precede the matching fetch/extract, and
# UNLINK precedes LINK.
# NOTE(review): presumably the order in which operations are applied; confirm
# against callers.
OP_ORDER = (
    RM_FETCHED,
    FETCH,
    RM_EXTRACTED,
    EXTRACT,
    UNLINK,
    LINK,
)
|