123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793 |
- """zipimport provides support for importing Python modules from Zip archives.
- This module exports three objects:
- - zipimporter: a class; its constructor takes a path to a Zip archive.
- - ZipImportError: exception raised by zipimporter objects. It's a
- subclass of ImportError, so it can be caught as ImportError, too.
- - _zip_directory_cache: a dict, mapping archive paths to zip directory
- info dicts, as used in zipimporter._files.
- It is usually not needed to use the zipimport module explicitly; it is
- used by the builtin import mechanism for sys.path items that are paths
- to Zip archives.
- """
- #from importlib import _bootstrap_external
- #from importlib import _bootstrap # for _verbose_message
- import _frozen_importlib_external as _bootstrap_external
- from _frozen_importlib_external import _unpack_uint16, _unpack_uint32
- import _frozen_importlib as _bootstrap # for _verbose_message
- import _imp # for check_hash_based_pycs
- import _io # for open
- import marshal # for loads
- import sys # for modules
- import time # for mktime
- __all__ = ['ZipImportError', 'zipimporter']
- path_sep = _bootstrap_external.path_sep
- alt_path_sep = _bootstrap_external.path_separators[1:]
- class ZipImportError(ImportError):
- pass
- # _read_directory() cache
- _zip_directory_cache = {}
- _module_type = type(sys)
- END_CENTRAL_DIR_SIZE = 22
- STRING_END_ARCHIVE = b'PK\x05\x06'
- MAX_COMMENT_LEN = (1 << 16) - 1
- class zipimporter:
- """zipimporter(archivepath) -> zipimporter object
- Create a new zipimporter instance. 'archivepath' must be a path to
- a zipfile, or to a specific path inside a zipfile. For example, it can be
- '/tmp/myimport.zip', or '/tmp/myimport.zip/mydirectory', if mydirectory is a
- valid directory inside the archive.
- 'ZipImportError is raised if 'archivepath' doesn't point to a valid Zip
- archive.
- The 'archive' attribute of zipimporter objects contains the name of the
- zipfile targeted.
- """
- # Split the "subdirectory" from the Zip archive path, lookup a matching
- # entry in sys.path_importer_cache, fetch the file directory from there
- # if found, or else read it from the archive.
- def __init__(self, path):
- if not isinstance(path, str):
- import os
- path = os.fsdecode(path)
- if not path:
- raise ZipImportError('archive path is empty', path=path)
- if alt_path_sep:
- path = path.replace(alt_path_sep, path_sep)
- prefix = []
- while True:
- try:
- st = _bootstrap_external._path_stat(path)
- except (OSError, ValueError):
- # On Windows a ValueError is raised for too long paths.
- # Back up one path element.
- dirname, basename = _bootstrap_external._path_split(path)
- if dirname == path:
- raise ZipImportError('not a Zip file', path=path)
- path = dirname
- prefix.append(basename)
- else:
- # it exists
- if (st.st_mode & 0o170000) != 0o100000: # stat.S_ISREG
- # it's a not file
- raise ZipImportError('not a Zip file', path=path)
- break
- try:
- files = _zip_directory_cache[path]
- except KeyError:
- files = _read_directory(path)
- _zip_directory_cache[path] = files
- self._files = files
- self.archive = path
- # a prefix directory following the ZIP file path.
- self.prefix = _bootstrap_external._path_join(*prefix[::-1])
- if self.prefix:
- self.prefix += path_sep
- # Check whether we can satisfy the import of the module named by
- # 'fullname', or whether it could be a portion of a namespace
- # package. Return self if we can load it, a string containing the
- # full path if it's a possible namespace portion, None if we
- # can't load it.
- def find_loader(self, fullname, path=None):
- """find_loader(fullname, path=None) -> self, str or None.
- Search for a module specified by 'fullname'. 'fullname' must be the
- fully qualified (dotted) module name. It returns the zipimporter
- instance itself if the module was found, a string containing the
- full path name if it's possibly a portion of a namespace package,
- or None otherwise. The optional 'path' argument is ignored -- it's
- there for compatibility with the importer protocol.
- """
- mi = _get_module_info(self, fullname)
- if mi is not None:
- # This is a module or package.
- return self, []
- # Not a module or regular package. See if this is a directory, and
- # therefore possibly a portion of a namespace package.
- # We're only interested in the last path component of fullname
- # earlier components are recorded in self.prefix.
- modpath = _get_module_path(self, fullname)
- if _is_dir(self, modpath):
- # This is possibly a portion of a namespace
- # package. Return the string representing its path,
- # without a trailing separator.
- return None, [f'{self.archive}{path_sep}{modpath}']
- return None, []
- # Check whether we can satisfy the import of the module named by
- # 'fullname'. Return self if we can, None if we can't.
- def find_module(self, fullname, path=None):
- """find_module(fullname, path=None) -> self or None.
- Search for a module specified by 'fullname'. 'fullname' must be the
- fully qualified (dotted) module name. It returns the zipimporter
- instance itself if the module was found, or None if it wasn't.
- The optional 'path' argument is ignored -- it's there for compatibility
- with the importer protocol.
- """
- return self.find_loader(fullname, path)[0]
- def get_code(self, fullname):
- """get_code(fullname) -> code object.
- Return the code object for the specified module. Raise ZipImportError
- if the module couldn't be found.
- """
- code, ispackage, modpath = _get_module_code(self, fullname)
- return code
- def get_data(self, pathname):
- """get_data(pathname) -> string with file data.
- Return the data associated with 'pathname'. Raise OSError if
- the file wasn't found.
- """
- if alt_path_sep:
- pathname = pathname.replace(alt_path_sep, path_sep)
- key = pathname
- if pathname.startswith(self.archive + path_sep):
- key = pathname[len(self.archive + path_sep):]
- try:
- toc_entry = self._files[key]
- except KeyError:
- raise OSError(0, '', key)
- return _get_data(self.archive, toc_entry)
- # Return a string matching __file__ for the named module
- def get_filename(self, fullname):
- """get_filename(fullname) -> filename string.
- Return the filename for the specified module.
- """
- # Deciding the filename requires working out where the code
- # would come from if the module was actually loaded
- code, ispackage, modpath = _get_module_code(self, fullname)
- return modpath
- def get_source(self, fullname):
- """get_source(fullname) -> source string.
- Return the source code for the specified module. Raise ZipImportError
- if the module couldn't be found, return None if the archive does
- contain the module, but has no source for it.
- """
- mi = _get_module_info(self, fullname)
- if mi is None:
- raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
- path = _get_module_path(self, fullname)
- if mi:
- fullpath = _bootstrap_external._path_join(path, '__init__.py')
- else:
- fullpath = f'{path}.py'
- try:
- toc_entry = self._files[fullpath]
- except KeyError:
- # we have the module, but no source
- return None
- return _get_data(self.archive, toc_entry).decode()
- # Return a bool signifying whether the module is a package or not.
- def is_package(self, fullname):
- """is_package(fullname) -> bool.
- Return True if the module specified by fullname is a package.
- Raise ZipImportError if the module couldn't be found.
- """
- mi = _get_module_info(self, fullname)
- if mi is None:
- raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
- return mi
- # Load and return the module named by 'fullname'.
- def load_module(self, fullname):
- """load_module(fullname) -> module.
- Load the module specified by 'fullname'. 'fullname' must be the
- fully qualified (dotted) module name. It returns the imported
- module, or raises ZipImportError if it wasn't found.
- """
- code, ispackage, modpath = _get_module_code(self, fullname)
- mod = sys.modules.get(fullname)
- if mod is None or not isinstance(mod, _module_type):
- mod = _module_type(fullname)
- sys.modules[fullname] = mod
- mod.__loader__ = self
- try:
- if ispackage:
- # add __path__ to the module *before* the code gets
- # executed
- path = _get_module_path(self, fullname)
- fullpath = _bootstrap_external._path_join(self.archive, path)
- mod.__path__ = [fullpath]
- if not hasattr(mod, '__builtins__'):
- mod.__builtins__ = __builtins__
- _bootstrap_external._fix_up_module(mod.__dict__, fullname, modpath)
- exec(code, mod.__dict__)
- except:
- del sys.modules[fullname]
- raise
- try:
- mod = sys.modules[fullname]
- except KeyError:
- raise ImportError(f'Loaded module {fullname!r} not found in sys.modules')
- _bootstrap._verbose_message('import {} # loaded from Zip {}', fullname, modpath)
- return mod
- def get_resource_reader(self, fullname):
- """Return the ResourceReader for a package in a zip file.
- If 'fullname' is a package within the zip file, return the
- 'ResourceReader' object for the package. Otherwise return None.
- """
- try:
- if not self.is_package(fullname):
- return None
- except ZipImportError:
- return None
- if not _ZipImportResourceReader._registered:
- from importlib.abc import ResourceReader
- ResourceReader.register(_ZipImportResourceReader)
- _ZipImportResourceReader._registered = True
- return _ZipImportResourceReader(self, fullname)
- def __repr__(self):
- return f'<zipimporter object "{self.archive}{path_sep}{self.prefix}">'
- # _zip_searchorder defines how we search for a module in the Zip
- # archive: we first search for a package __init__, then for
- # non-package .pyc, and .py entries. The .pyc entries
- # are swapped by initzipimport() if we run in optimized mode. Also,
- # '/' is replaced by path_sep there.
- _zip_searchorder = (
- (path_sep + '__init__.pyc', True, True),
- (path_sep + '__init__.py', False, True),
- ('.pyc', True, False),
- ('.py', False, False),
- )
- # Given a module name, return the potential file path in the
- # archive (without extension).
- def _get_module_path(self, fullname):
- return self.prefix + fullname.rpartition('.')[2]
- # Does this path represent a directory?
- def _is_dir(self, path):
- # See if this is a "directory". If so, it's eligible to be part
- # of a namespace package. We test by seeing if the name, with an
- # appended path separator, exists.
- dirpath = path + path_sep
- # If dirpath is present in self._files, we have a directory.
- return dirpath in self._files
- # Return some information about a module.
- def _get_module_info(self, fullname):
- path = _get_module_path(self, fullname)
- for suffix, isbytecode, ispackage in _zip_searchorder:
- fullpath = path + suffix
- if fullpath in self._files:
- return ispackage
- return None
- # implementation
- # _read_directory(archive) -> files dict (new reference)
- #
- # Given a path to a Zip archive, build a dict, mapping file names
- # (local to the archive, using SEP as a separator) to toc entries.
- #
- # A toc_entry is a tuple:
- #
- # (__file__, # value to use for __file__, available for all files,
- # # encoded to the filesystem encoding
- # compress, # compression kind; 0 for uncompressed
- # data_size, # size of compressed data on disk
- # file_size, # size of decompressed data
- # file_offset, # offset of file header from start of archive
- # time, # mod time of file (in dos format)
- # date, # mod data of file (in dos format)
- # crc, # crc checksum of the data
- # )
- #
- # Directories can be recognized by the trailing path_sep in the name,
- # data_size and file_offset are 0.
- def _read_directory(archive):
- try:
- fp = _io.open_code(archive)
- except OSError:
- raise ZipImportError(f"can't open Zip file: {archive!r}", path=archive)
- with fp:
- try:
- fp.seek(-END_CENTRAL_DIR_SIZE, 2)
- header_position = fp.tell()
- buffer = fp.read(END_CENTRAL_DIR_SIZE)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- if len(buffer) != END_CENTRAL_DIR_SIZE:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- if buffer[:4] != STRING_END_ARCHIVE:
- # Bad: End of Central Dir signature
- # Check if there's a comment.
- try:
- fp.seek(0, 2)
- file_size = fp.tell()
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}",
- path=archive)
- max_comment_start = max(file_size - MAX_COMMENT_LEN -
- END_CENTRAL_DIR_SIZE, 0)
- try:
- fp.seek(max_comment_start)
- data = fp.read()
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}",
- path=archive)
- pos = data.rfind(STRING_END_ARCHIVE)
- if pos < 0:
- raise ZipImportError(f'not a Zip file: {archive!r}',
- path=archive)
- buffer = data[pos:pos+END_CENTRAL_DIR_SIZE]
- if len(buffer) != END_CENTRAL_DIR_SIZE:
- raise ZipImportError(f"corrupt Zip file: {archive!r}",
- path=archive)
- header_position = file_size - len(data) + pos
- header_size = _unpack_uint32(buffer[12:16])
- header_offset = _unpack_uint32(buffer[16:20])
- if header_position < header_size:
- raise ZipImportError(f'bad central directory size: {archive!r}', path=archive)
- if header_position < header_offset:
- raise ZipImportError(f'bad central directory offset: {archive!r}', path=archive)
- header_position -= header_size
- arc_offset = header_position - header_offset
- if arc_offset < 0:
- raise ZipImportError(f'bad central directory size or offset: {archive!r}', path=archive)
- files = {}
- # Start of Central Directory
- count = 0
- try:
- fp.seek(header_position)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- while True:
- buffer = fp.read(46)
- if len(buffer) < 4:
- raise EOFError('EOF read where not expected')
- # Start of file header
- if buffer[:4] != b'PK\x01\x02':
- break # Bad: Central Dir File Header
- if len(buffer) != 46:
- raise EOFError('EOF read where not expected')
- flags = _unpack_uint16(buffer[8:10])
- compress = _unpack_uint16(buffer[10:12])
- time = _unpack_uint16(buffer[12:14])
- date = _unpack_uint16(buffer[14:16])
- crc = _unpack_uint32(buffer[16:20])
- data_size = _unpack_uint32(buffer[20:24])
- file_size = _unpack_uint32(buffer[24:28])
- name_size = _unpack_uint16(buffer[28:30])
- extra_size = _unpack_uint16(buffer[30:32])
- comment_size = _unpack_uint16(buffer[32:34])
- file_offset = _unpack_uint32(buffer[42:46])
- header_size = name_size + extra_size + comment_size
- if file_offset > header_offset:
- raise ZipImportError(f'bad local header offset: {archive!r}', path=archive)
- file_offset += arc_offset
- try:
- name = fp.read(name_size)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- if len(name) != name_size:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- # On Windows, calling fseek to skip over the fields we don't use is
- # slower than reading the data because fseek flushes stdio's
- # internal buffers. See issue #8745.
- try:
- if len(fp.read(header_size - name_size)) != header_size - name_size:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- if flags & 0x800:
- # UTF-8 file names extension
- name = name.decode()
- else:
- # Historical ZIP filename encoding
- try:
- name = name.decode('ascii')
- except UnicodeDecodeError:
- name = name.decode('latin1').translate(cp437_table)
- name = name.replace('/', path_sep)
- path = _bootstrap_external._path_join(archive, name)
- t = (path, compress, data_size, file_size, file_offset, time, date, crc)
- files[name] = t
- _bootstrap._verbose_message('zipimport: name : {}', name)
- count += 1
- _bootstrap._verbose_message('zipimport: found {} names in {!r}', count, archive)
- return files
- # During bootstrap, we may need to load the encodings
- # package from a ZIP file. But the cp437 encoding is implemented
- # in Python in the encodings package.
- #
- # Break out of this dependency by using the translation table for
- # the cp437 encoding.
- cp437_table = (
- # ASCII part, 8 rows x 16 chars
- '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
- '\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
- ' !"#$%&\'()*+,-./'
- '0123456789:;<=>?'
- '@ABCDEFGHIJKLMNO'
- 'PQRSTUVWXYZ[\\]^_'
- '`abcdefghijklmno'
- 'pqrstuvwxyz{|}~\x7f'
- # non-ASCII part, 16 rows x 8 chars
- '\xc7\xfc\xe9\xe2\xe4\xe0\xe5\xe7'
- '\xea\xeb\xe8\xef\xee\xec\xc4\xc5'
- '\xc9\xe6\xc6\xf4\xf6\xf2\xfb\xf9'
- '\xff\xd6\xdc\xa2\xa3\xa5\u20a7\u0192'
- '\xe1\xed\xf3\xfa\xf1\xd1\xaa\xba'
- '\xbf\u2310\xac\xbd\xbc\xa1\xab\xbb'
- '\u2591\u2592\u2593\u2502\u2524\u2561\u2562\u2556'
- '\u2555\u2563\u2551\u2557\u255d\u255c\u255b\u2510'
- '\u2514\u2534\u252c\u251c\u2500\u253c\u255e\u255f'
- '\u255a\u2554\u2569\u2566\u2560\u2550\u256c\u2567'
- '\u2568\u2564\u2565\u2559\u2558\u2552\u2553\u256b'
- '\u256a\u2518\u250c\u2588\u2584\u258c\u2590\u2580'
- '\u03b1\xdf\u0393\u03c0\u03a3\u03c3\xb5\u03c4'
- '\u03a6\u0398\u03a9\u03b4\u221e\u03c6\u03b5\u2229'
- '\u2261\xb1\u2265\u2264\u2320\u2321\xf7\u2248'
- '\xb0\u2219\xb7\u221a\u207f\xb2\u25a0\xa0'
- )
- _importing_zlib = False
- # Return the zlib.decompress function object, or NULL if zlib couldn't
- # be imported. The function is cached when found, so subsequent calls
- # don't import zlib again.
- def _get_decompress_func():
- global _importing_zlib
- if _importing_zlib:
- # Someone has a zlib.py[co] in their Zip file
- # let's avoid a stack overflow.
- _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
- raise ZipImportError("can't decompress data; zlib not available")
- _importing_zlib = True
- try:
- from zlib import decompress
- except Exception:
- _bootstrap._verbose_message('zipimport: zlib UNAVAILABLE')
- raise ZipImportError("can't decompress data; zlib not available")
- finally:
- _importing_zlib = False
- _bootstrap._verbose_message('zipimport: zlib available')
- return decompress
- # Given a path to a Zip file and a toc_entry, return the (uncompressed) data.
- def _get_data(archive, toc_entry):
- datapath, compress, data_size, file_size, file_offset, time, date, crc = toc_entry
- if data_size < 0:
- raise ZipImportError('negative data size')
- with _io.open_code(archive) as fp:
- # Check to make sure the local file header is correct
- try:
- fp.seek(file_offset)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- buffer = fp.read(30)
- if len(buffer) != 30:
- raise EOFError('EOF read where not expected')
- if buffer[:4] != b'PK\x03\x04':
- # Bad: Local File Header
- raise ZipImportError(f'bad local file header: {archive!r}', path=archive)
- name_size = _unpack_uint16(buffer[26:28])
- extra_size = _unpack_uint16(buffer[28:30])
- header_size = 30 + name_size + extra_size
- file_offset += header_size # Start of file data
- try:
- fp.seek(file_offset)
- except OSError:
- raise ZipImportError(f"can't read Zip file: {archive!r}", path=archive)
- raw_data = fp.read(data_size)
- if len(raw_data) != data_size:
- raise OSError("zipimport: can't read data")
- if compress == 0:
- # data is not compressed
- return raw_data
- # Decompress with zlib
- try:
- decompress = _get_decompress_func()
- except Exception:
- raise ZipImportError("can't decompress data; zlib not available")
- return decompress(raw_data, -15)
- # Lenient date/time comparison function. The precision of the mtime
- # in the archive is lower than the mtime stored in a .pyc: we
- # must allow a difference of at most one second.
- def _eq_mtime(t1, t2):
- # dostime only stores even seconds, so be lenient
- return abs(t1 - t2) <= 1
- # Given the contents of a .py[co] file, unmarshal the data
- # and return the code object. Return None if it the magic word doesn't
- # match, or if the recorded .py[co] metadata does not match the source,
- # (we do this instead of raising an exception as we fall back
- # to .py if available and we don't want to mask other errors).
- def _unmarshal_code(self, pathname, fullpath, fullname, data):
- exc_details = {
- 'name': fullname,
- 'path': fullpath,
- }
- try:
- flags = _bootstrap_external._classify_pyc(data, fullname, exc_details)
- except ImportError:
- return None
- hash_based = flags & 0b1 != 0
- if hash_based:
- check_source = flags & 0b10 != 0
- if (_imp.check_hash_based_pycs != 'never' and
- (check_source or _imp.check_hash_based_pycs == 'always')):
- source_bytes = _get_pyc_source(self, fullpath)
- if source_bytes is not None:
- source_hash = _imp.source_hash(
- _bootstrap_external._RAW_MAGIC_NUMBER,
- source_bytes,
- )
- try:
- _bootstrap_external._validate_hash_pyc(
- data, source_hash, fullname, exc_details)
- except ImportError:
- return None
- else:
- source_mtime, source_size = \
- _get_mtime_and_size_of_source(self, fullpath)
- if source_mtime:
- # We don't use _bootstrap_external._validate_timestamp_pyc
- # to allow for a more lenient timestamp check.
- if (not _eq_mtime(_unpack_uint32(data[8:12]), source_mtime) or
- _unpack_uint32(data[12:16]) != source_size):
- _bootstrap._verbose_message(
- f'bytecode is stale for {fullname!r}')
- return None
- code = marshal.loads(data[16:])
- if not isinstance(code, _code_type):
- raise TypeError(f'compiled module {pathname!r} is not a code object')
- return code
- _code_type = type(_unmarshal_code.__code__)
- # Replace any occurrences of '\r\n?' in the input string with '\n'.
- # This converts DOS and Mac line endings to Unix line endings.
- def _normalize_line_endings(source):
- source = source.replace(b'\r\n', b'\n')
- source = source.replace(b'\r', b'\n')
- return source
- # Given a string buffer containing Python source code, compile it
- # and return a code object.
- def _compile_source(pathname, source):
- source = _normalize_line_endings(source)
- return compile(source, pathname, 'exec', dont_inherit=True)
- # Convert the date/time values found in the Zip archive to a value
- # that's compatible with the time stamp stored in .pyc files.
- def _parse_dostime(d, t):
- return time.mktime((
- (d >> 9) + 1980, # bits 9..15: year
- (d >> 5) & 0xF, # bits 5..8: month
- d & 0x1F, # bits 0..4: day
- t >> 11, # bits 11..15: hours
- (t >> 5) & 0x3F, # bits 8..10: minutes
- (t & 0x1F) * 2, # bits 0..7: seconds / 2
- -1, -1, -1))
- # Given a path to a .pyc file in the archive, return the
- # modification time of the matching .py file and its size,
- # or (0, 0) if no source is available.
- def _get_mtime_and_size_of_source(self, path):
- try:
- # strip 'c' or 'o' from *.py[co]
- assert path[-1:] in ('c', 'o')
- path = path[:-1]
- toc_entry = self._files[path]
- # fetch the time stamp of the .py file for comparison
- # with an embedded pyc time stamp
- time = toc_entry[5]
- date = toc_entry[6]
- uncompressed_size = toc_entry[3]
- return _parse_dostime(date, time), uncompressed_size
- except (KeyError, IndexError, TypeError):
- return 0, 0
- # Given a path to a .pyc file in the archive, return the
- # contents of the matching .py file, or None if no source
- # is available.
- def _get_pyc_source(self, path):
- # strip 'c' or 'o' from *.py[co]
- assert path[-1:] in ('c', 'o')
- path = path[:-1]
- try:
- toc_entry = self._files[path]
- except KeyError:
- return None
- else:
- return _get_data(self.archive, toc_entry)
- # Get the code object associated with the module specified by
- # 'fullname'.
- def _get_module_code(self, fullname):
- path = _get_module_path(self, fullname)
- for suffix, isbytecode, ispackage in _zip_searchorder:
- fullpath = path + suffix
- _bootstrap._verbose_message('trying {}{}{}', self.archive, path_sep, fullpath, verbosity=2)
- try:
- toc_entry = self._files[fullpath]
- except KeyError:
- pass
- else:
- modpath = toc_entry[0]
- data = _get_data(self.archive, toc_entry)
- if isbytecode:
- code = _unmarshal_code(self, modpath, fullpath, fullname, data)
- else:
- code = _compile_source(modpath, data)
- if code is None:
- # bad magic number or non-matching mtime
- # in byte code, try next
- continue
- modpath = toc_entry[0]
- return code, ispackage, modpath
- else:
- raise ZipImportError(f"can't find module {fullname!r}", name=fullname)
- class _ZipImportResourceReader:
- """Private class used to support ZipImport.get_resource_reader().
- This class is allowed to reference all the innards and private parts of
- the zipimporter.
- """
- _registered = False
- def __init__(self, zipimporter, fullname):
- self.zipimporter = zipimporter
- self.fullname = fullname
- def open_resource(self, resource):
- fullname_as_path = self.fullname.replace('.', '/')
- path = f'{fullname_as_path}/{resource}'
- from io import BytesIO
- try:
- return BytesIO(self.zipimporter.get_data(path))
- except OSError:
- raise FileNotFoundError(path)
- def resource_path(self, resource):
- # All resources are in the zip file, so there is no path to the file.
- # Raising FileNotFoundError tells the higher level API to extract the
- # binary data and create a temporary file.
- raise FileNotFoundError
- def is_resource(self, name):
- # Maybe we could do better, but if we can get the data, it's a
- # resource. Otherwise it isn't.
- fullname_as_path = self.fullname.replace('.', '/')
- path = f'{fullname_as_path}/{name}'
- try:
- self.zipimporter.get_data(path)
- except OSError:
- return False
- return True
- def contents(self):
- # This is a bit convoluted, because fullname will be a module path,
- # but _files is a list of file names relative to the top of the
- # archive's namespace. We want to compare file paths to find all the
- # names of things inside the module represented by fullname. So we
- # turn the module path of fullname into a file path relative to the
- # top of the archive, and then we iterate through _files looking for
- # names inside that "directory".
- from pathlib import Path
- fullname_path = Path(self.zipimporter.get_filename(self.fullname))
- relative_path = fullname_path.relative_to(self.zipimporter.archive)
- # Don't forget that fullname names a package, so its path will include
- # __init__.py, which we want to ignore.
- assert relative_path.name == '__init__.py'
- package_path = relative_path.parent
- subdirs_seen = set()
- for filename in self.zipimporter._files:
- try:
- relative = Path(filename).relative_to(package_path)
- except ValueError:
- continue
- # If the path of the file (which is relative to the top of the zip
- # namespace), relative to the package given when the resource
- # reader was created, has a parent, then it's a name in a
- # subdirectory and thus we skip it.
- parent_name = relative.parent.name
- if len(parent_name) == 0:
- yield relative.name
- elif parent_name not in subdirs_seen:
- subdirs_seen.add(parent_name)
- yield parent_name
|