123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604 |
- # -*- coding: utf-8 -*-
- # Copyright (c) 2013, Mahmoud Hashemi
- #
- # Redistribution and use in source and binary forms, with or without
- # modification, are permitted provided that the following conditions are
- # met:
- #
- # * Redistributions of source code must retain the above copyright
- # notice, this list of conditions and the following disclaimer.
- #
- # * Redistributions in binary form must reproduce the above
- # copyright notice, this list of conditions and the following
- # disclaimer in the documentation and/or other materials provided
- # with the distribution.
- #
- # * The names of the contributors may not be used to endorse or
- # promote products derived from this software without specific
- # prior written permission.
- #
- # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- # Coding decl above needed for rendering the emdash properly in the
- # documentation.
- """
- Module ``ioutils`` implements a number of helper classes and functions which
- are useful when dealing with input, output, and bytestreams in a variety of
- ways.
- """
- import os
- from io import BytesIO, IOBase
- from abc import (
- ABCMeta,
- abstractmethod,
- abstractproperty,
- )
- from errno import EINVAL
- from codecs import EncodedFile
- from tempfile import TemporaryFile
- try:
- from itertools import izip_longest as zip_longest # Python 2
- except ImportError:
- from itertools import zip_longest # Python 3
# Text/bytes type aliases that resolve correctly on both major Python
# versions. Probing the bare name raises NameError on Python 3, where the
# ``unicode`` builtin no longer exists.
try:
    unicode
except NameError:
    # Python 3
    text_type = str
    binary_type = bytes
else:
    # Python 2
    text_type = unicode
    binary_type = str


READ_CHUNK_SIZE = 21333
"""
Number of bytes to read at a time. The value is ~ 1/3rd of 64k which means that
the value will easily fit in the L2 cache of most processors even if every
codepoint in a string is three bytes long which makes it a nice fast default
value.
"""
class SpooledIOBase(IOBase):
    """
    A base class shared by the SpooledBytesIO and SpooledStringIO classes.

    The SpooledTemporaryFile class is missing several attributes and methods
    present in the StringIO implementation. This brings the api as close to
    parity as possible so that classes derived from SpooledIOBase can be used
    as near drop-in replacements to save memory.

    Subclasses provide the actual storage through the abstract ``buffer``
    property and implement the abstract read/write/seek primitives; the
    concrete helpers defined here are written purely in terms of those.
    """
    __metaclass__ = ABCMeta

    def __init__(self, max_size=5000000, dir=None):
        """
        Args:
            max_size: Size threshold stored for subclasses, which compare
                against it in ``write()`` to decide when to roll over.
            dir: Directory stored for subclasses, which pass it to
                ``TemporaryFile`` when rolling over.
        """
        self._max_size = max_size
        self._dir = dir

    def _checkClosed(self, msg=None):
        """Raise a ValueError if file is closed"""
        if self.closed:
            raise ValueError('I/O operation on closed file.'
                             if msg is None else msg)

    @abstractmethod
    def read(self, n=-1):
        """Read n characters from the buffer"""

    @abstractmethod
    def write(self, s):
        """Write into the buffer"""

    @abstractmethod
    def seek(self, pos, mode=0):
        """Seek to a specific point in a file"""

    @abstractmethod
    def readline(self, length=None):
        """Returns the next available line"""

    @abstractmethod
    def readlines(self, sizehint=0):
        """Returns a list of all lines from the current position forward"""

    def writelines(self, lines):
        """
        Write lines to the file from an iterable.

        NOTE: writelines() does NOT add line separators.
        """
        self._checkClosed()
        for line in lines:
            self.write(line)

    @abstractmethod
    def rollover(self):
        """Roll file-like-object over into a real temporary file"""

    @abstractmethod
    def tell(self):
        """Return the current position"""

    @abstractproperty
    def buffer(self):
        """Should return a flo instance"""

    @abstractproperty
    def _rolled(self):
        """Returns whether the file has been rolled to a real file or not"""

    @abstractproperty
    def len(self):
        """Returns the length of the data"""

    def _get_softspace(self):
        return self.buffer.softspace

    def _set_softspace(self, val):
        self.buffer.softspace = val

    # Proxies the ``softspace`` attribute of the underlying buffer.
    softspace = property(_get_softspace, _set_softspace)

    @property
    def _file(self):
        """Alias for ``buffer``."""
        return self.buffer

    def close(self):
        """Close the underlying buffer."""
        return self.buffer.close()

    def flush(self):
        """Flush the underlying buffer."""
        self._checkClosed()
        return self.buffer.flush()

    def isatty(self):
        """Return whatever the underlying buffer reports for ``isatty()``."""
        self._checkClosed()
        return self.buffer.isatty()

    @property
    def closed(self):
        """Whether the underlying buffer has been closed."""
        return self.buffer.closed

    @property
    def pos(self):
        """Alias for ``tell()``."""
        return self.tell()

    @property
    def buf(self):
        """Alias for ``getvalue()``."""
        return self.getvalue()

    def fileno(self):
        """Force a rollover, then return the buffer's file descriptor."""
        self.rollover()
        return self.buffer.fileno()

    def truncate(self, size=None):
        """
        Truncate the contents of the buffer.

        Custom version of truncate that takes either no arguments (like the
        real SpooledTemporaryFile) or a single argument that truncates the
        value to a certain index location.

        Raises:
            IOError: with ``EINVAL`` if *size* is negative.
        """
        self._checkClosed()
        if size is None:
            return self.buffer.truncate()

        if size < 0:
            raise IOError(EINVAL, "Negative size not allowed")

        # Emulate truncation to a particular location
        pos = self.tell()
        self.seek(size)
        self.buffer.truncate()
        # Only go back to the previous position when it precedes the cut;
        # otherwise the cursor is left at the truncation point.
        if pos < size:
            self.seek(pos)

    def getvalue(self):
        """Return the entire files contents."""
        self._checkClosed()
        pos = self.tell()
        self.seek(0)
        val = self.read()
        self.seek(pos)
        return val

    def seekable(self):
        return True

    def readable(self):
        return True

    def writable(self):
        return True

    def __next__(self):
        """Return the next line, raising StopIteration at end of file."""
        self._checkClosed()
        line = self.readline()
        if not line:
            # An empty line only ends iteration when the buffer really is at
            # its end: compare the current offset with the end offset.
            pos = self.buffer.tell()
            self.buffer.seek(0, os.SEEK_END)
            if pos == self.buffer.tell():
                raise StopIteration
            else:
                self.buffer.seek(pos)
        return line

    next = __next__

    def __len__(self):
        return self.len

    def __iter__(self):
        self._checkClosed()
        return self

    def __enter__(self):
        self._checkClosed()
        return self

    def __exit__(self, *args):
        self._file.close()

    def __eq__(self, other):
        """
        Compare the contents of two spooled files line by line.

        Both files are rewound for the comparison and seeked back to their
        previous positions afterwards. Objects that are not instances of this
        object's class compare unequal.
        """
        if other is self:
            # zip_longest(self, self) would draw both sides from the same
            # stream, pairing line N with line N + 1, so an object could
            # compare unequal to itself.
            return True
        if isinstance(other, self.__class__):
            self_pos = self.tell()
            other_pos = other.tell()
            try:
                self.seek(0)
                other.seek(0)
                eq = True
                for self_line, other_line in zip_longest(self, other):
                    if self_line != other_line:
                        eq = False
                        break
                self.seek(self_pos)
                other.seek(other_pos)
            except Exception:
                # Attempt to return files to original position if there were
                # any errors
                try:
                    self.seek(self_pos)
                except Exception:
                    pass
                try:
                    other.seek(other_pos)
                except Exception:
                    pass
                raise
            else:
                return eq
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __bool__(self):
        # Always truthy; without this, truth testing would fall back to
        # __len__ and an empty file would be falsy.
        return True

    def __del__(self):
        """Can fail when called at program exit so suppress traceback."""
        try:
            self.close()
        except Exception:
            pass

    __nonzero__ = __bool__
class SpooledBytesIO(SpooledIOBase):
    """
    SpooledBytesIO is a spooled file-like-object that only accepts bytes. On
    Python 2.x this means the 'str' type; on Python 3.x this means the 'bytes'
    type. Bytes are written in and retrieved exactly as given, but it will
    raise TypeErrors if something other than bytes are written.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledBytesIO() as f:
        ...     f.write(b"Happy IO")
        ...     _ = f.seek(0)
        ...     isinstance(f.getvalue(), ioutils.binary_type)
        True
    """

    def read(self, n=-1):
        """Read up to *n* bytes from the buffer (everything by default)."""
        self._checkClosed()
        return self.buffer.read(n)

    def write(self, s):
        """
        Write the bytes *s* at the current position.

        The buffer is rolled over to a temporary file first when the write
        would end at or beyond ``max_size``.

        Raises:
            TypeError: if *s* is not an instance of ``binary_type``.
        """
        self._checkClosed()
        if not isinstance(s, binary_type):
            raise TypeError("{} expected, got {}".format(
                binary_type.__name__,
                type(s).__name__
            ))

        if self.tell() + len(s) >= self._max_size:
            self.rollover()
        self.buffer.write(s)

    def seek(self, pos, mode=0):
        """Seek the underlying buffer and return its result."""
        self._checkClosed()
        return self.buffer.seek(pos, mode)

    def readline(self, length=None):
        """
        Return the next line, reading at most *length* bytes when given.

        Only ``None`` means "no limit"; an explicit ``0`` is passed through
        to the buffer instead of being mistaken for a missing argument.
        """
        self._checkClosed()
        if length is None:
            return self.buffer.readline()
        return self.buffer.readline(length)

    def readlines(self, sizehint=0):
        """Return the remaining lines as a list."""
        self._checkClosed()
        return self.buffer.readlines(sizehint)

    def rollover(self):
        """Roll the in-memory buffer over to a TempFile"""
        if not self._rolled:
            tmp = TemporaryFile(dir=self._dir)
            pos = self.buffer.tell()
            tmp.write(self.buffer.getvalue())
            # Keep the cursor where it was in the in-memory buffer.
            tmp.seek(pos)
            self.buffer.close()
            self._buffer = tmp

    @property
    def _rolled(self):
        """True once the buffer is no longer an in-memory BytesIO."""
        return not isinstance(self.buffer, BytesIO)

    @property
    def buffer(self):
        """The backing file object, created lazily as a BytesIO."""
        try:
            return self._buffer
        except AttributeError:
            self._buffer = BytesIO()
        return self._buffer

    @property
    def len(self):
        """Determine the length of the file"""
        pos = self.tell()
        if self._rolled:
            self.seek(0)
            val = os.fstat(self.fileno()).st_size
        else:
            self.seek(0, os.SEEK_END)
            val = self.tell()
        # Measuring moved the cursor; put it back.
        self.seek(pos)
        return val

    def tell(self):
        """Return the current byte offset."""
        self._checkClosed()
        return self.buffer.tell()
class SpooledStringIO(SpooledIOBase):
    """
    SpooledStringIO is a spooled file-like-object that only accepts unicode
    values. On Python 2.x this means the 'unicode' type and on Python 3.x this
    means the 'str' type. Values are accepted as unicode and then coerced into
    utf-8 encoded bytes for storage. On retrieval, the values are returned as
    unicode.

    Positions reported by ``tell()`` and accepted by ``seek()`` are counted
    in codepoints and tracked in ``self._tell``, separately from the byte
    offset of the underlying buffer.

    Example::

        >>> from boltons import ioutils
        >>> with ioutils.SpooledStringIO() as f:
        ...     f.write(u"\u2014 Hey, an emdash!")
        ...     _ = f.seek(0)
        ...     isinstance(f.read(), ioutils.text_type)
        True

    """
    def __init__(self, *args, **kwargs):
        # Codepoint position; must exist before any method calls tell().
        self._tell = 0
        super(SpooledStringIO, self).__init__(*args, **kwargs)

    def read(self, n=-1):
        """Read up to *n* codepoints and advance the codepoint position."""
        self._checkClosed()
        ret = self.buffer.reader.read(n, n)
        self._tell = self.tell() + len(ret)
        return ret

    def write(self, s):
        """
        Write the text *s*, storing it as utf-8 encoded bytes.

        The buffer is rolled over to a temporary file first when the encoded
        data would end at or beyond ``max_size`` bytes.

        Raises:
            TypeError: if *s* is not an instance of ``text_type``.
        """
        self._checkClosed()
        if not isinstance(s, text_type):
            raise TypeError("{} expected, got {}".format(
                text_type.__name__,
                type(s).__name__
            ))
        current_pos = self.tell()
        # Encode once; the bytes are needed for both the size check and the
        # write itself.
        encoded = s.encode('utf-8')
        if self.buffer.tell() + len(encoded) >= self._max_size:
            self.rollover()
        self.buffer.write(encoded)
        self._tell = current_pos + len(s)

    def _traverse_codepoints(self, current_position, n):
        """Traverse from current position to the right n codepoints"""
        dest = current_position + n
        while True:
            if current_position == dest:
                # By chance we've landed on the right position, break
                break

            # If the read would take us past the intended position then
            # seek only enough to cover the offset
            if current_position + READ_CHUNK_SIZE > dest:
                self.read(dest - current_position)
                break
            else:
                ret = self.read(READ_CHUNK_SIZE)

            # Increment our current position
            current_position += READ_CHUNK_SIZE

            # If we kept reading but there was nothing here, break
            # as we are at the end of the file
            if not ret:
                break

        return dest

    def seek(self, pos, mode=0):
        """
        Traverse from offset to the specified codepoint.

        Args:
            pos: Offset in codepoints. With ``os.SEEK_END`` the destination
                is ``self.len - pos``.
            mode: ``os.SEEK_SET``, ``os.SEEK_CUR`` or ``os.SEEK_END``.

        Returns:
            The new codepoint position.

        Raises:
            ValueError: if *mode* is not one of the three values above.

        NOTE: destinations that work out negative are not validated.
        """
        self._checkClosed()
        # Seek to position from the start of the file
        if mode == os.SEEK_SET:
            self.buffer.seek(0)
            self._traverse_codepoints(0, pos)
            self._tell = pos
        # Seek to new position relative to current position
        elif mode == os.SEEK_CUR:
            start_pos = self.tell()
            dest_position = start_pos + pos
            if pos < 0:
                # Traversal works by reading forward, and a negative count
                # handed to read() would consume the rest of the file. To go
                # backwards, rewind and walk forward to the destination.
                self.buffer.seek(0)
                self._traverse_codepoints(0, dest_position)
            else:
                self._traverse_codepoints(start_pos, pos)
            self._tell = dest_position
        elif mode == os.SEEK_END:
            self.buffer.seek(0)
            dest_position = self.len - pos
            self._traverse_codepoints(0, dest_position)
            self._tell = dest_position
        else:
            raise ValueError(
                "Invalid whence ({0}, should be 0, 1, or 2)".format(mode)
            )
        return self.tell()

    def readline(self, length=None):
        """Return the next line as text and advance the codepoint position."""
        self._checkClosed()
        ret = self.buffer.readline(length).decode('utf-8')
        self._tell = self.tell() + len(ret)
        return ret

    def readlines(self, sizehint=0):
        """Return the remaining lines as a list of text strings."""
        self._checkClosed()
        ret = [x.decode('utf-8') for x in self.buffer.readlines(sizehint)]
        self._tell = self.tell() + sum((len(x) for x in ret))
        return ret

    @property
    def buffer(self):
        """The backing EncodedFile, created lazily around a BytesIO."""
        try:
            return self._buffer
        except AttributeError:
            self._buffer = EncodedFile(BytesIO(), data_encoding='utf-8')
        return self._buffer

    @property
    def _rolled(self):
        """True once the wrapped stream is no longer an in-memory BytesIO."""
        return not isinstance(self.buffer.stream, BytesIO)

    def rollover(self):
        """Roll the buffer over to a TempFile"""
        if not self._rolled:
            tmp = EncodedFile(TemporaryFile(dir=self._dir),
                              data_encoding='utf-8')
            pos = self.buffer.tell()
            tmp.write(self.buffer.getvalue())
            # Keep the byte cursor where it was in the in-memory buffer.
            tmp.seek(pos)
            self.buffer.close()
            self._buffer = tmp

    def tell(self):
        """Return the codepoint position"""
        self._checkClosed()
        return self._tell

    @property
    def len(self):
        """Determine the number of codepoints in the file"""
        pos = self.buffer.tell()
        self.buffer.seek(0)
        # Count through the decoding reader directly rather than self.read():
        # self.read() advances self._tell, so measuring the length would
        # otherwise change what tell() reports.
        reader = self.buffer.reader
        total = 0
        while True:
            ret = reader.read(READ_CHUNK_SIZE, READ_CHUNK_SIZE)
            if not ret:
                break
            total += len(ret)
        self.buffer.seek(pos)
        return total
def is_text_fileobj(fileobj):
    """Return True if *fileobj* appears to handle text rather than bytes.

    An object counts as text-handling when it has a truthy ``encoding``
    attribute, or when it has a ``getvalue`` attribute whose call returns a
    unicode string. Any exception raised while calling ``getvalue()`` is
    treated as "not text".
    """
    # codecs.open and io.TextIOBase
    if getattr(fileobj, 'encoding', False):
        return True

    # StringIO.StringIO / cStringIO.StringIO / io.StringIO
    if not getattr(fileobj, 'getvalue', False):
        return False
    try:
        contents = fileobj.getvalue()
    except Exception:
        return False
    return isinstance(contents, type(u''))
class MultiFileReader(object):
    """Takes a list of open files or file-like objects and provides an
    interface to read from them all contiguously. Like
    :func:`itertools.chain()`, but for reading files.

    >>> mfr = MultiFileReader(BytesIO(b'ab'), BytesIO(b'cd'), BytesIO(b'e'))
    >>> mfr.read(3).decode('ascii')
    u'abc'
    >>> mfr.read(3).decode('ascii')
    u'de'

    The constructor takes as many fileobjs as you hand it, and will
    raise a TypeError on non-file-like objects. A ValueError is raised
    when file-like objects are a mix of bytes- and text-handling
    objects (for instance, BytesIO and StringIO).
    """

    def __init__(self, *fileobjs):
        if not all(callable(getattr(f, 'read', None)) and
                   callable(getattr(f, 'seek', None)) for f in fileobjs):
            raise TypeError('MultiFileReader expected file-like objects'
                            ' with .read() and .seek()')
        # Classify each file once; the result picks the empty value used to
        # join the pieces returned by read().
        text_flags = [is_text_fileobj(f) for f in fileobjs]
        if all(text_flags):
            # codecs.open and io.TextIOBase
            self._joiner = u''
        elif any(text_flags):
            raise ValueError('All arguments to MultiFileReader must handle'
                             ' bytes OR text, not a mix')
        else:
            # open/file and io.BytesIO
            self._joiner = b''
        self._fileobjs = fileobjs
        # Index of the file that bounded reads are currently consuming.
        self._index = 0

    def read(self, amt=None):
        """Read up to the specified *amt*, seamlessly bridging across
        files. Returns the appropriate type of string (bytes or text)
        for the input, and returns an empty string when the files are
        exhausted.

        With *amt* omitted, ``None`` or negative, everything remaining in
        every file is returned. ``read(0)`` returns an empty string.
        """
        if amt is None or amt < 0:
            return self._joiner.join(f.read() for f in self._fileobjs)
        parts = []
        while amt > 0 and self._index < len(self._fileobjs):
            chunk = self._fileobjs[self._index].read(amt)
            if not chunk:
                # Only an empty read proves this file is exhausted; a short
                # read may just mean less data was available right now.
                self._index += 1
                continue
            parts.append(chunk)
            amt -= len(chunk)
        return self._joiner.join(parts)

    def seek(self, offset, whence=os.SEEK_SET):
        """Enables setting position of the file cursor to a given
        *offset*. Currently only supports ``offset=0``.

        Raises:
            NotImplementedError: if *whence* is not ``os.SEEK_SET`` or
                *offset* is not 0.
        """
        if whence != os.SEEK_SET:
            raise NotImplementedError(
                'MultiFileReader.seek() only supports os.SEEK_SET')
        if offset != 0:
            raise NotImplementedError(
                'MultiFileReader only supports seeking to start at this time')
        for f in self._fileobjs:
            f.seek(0)
        # Bounded reads must start over at the first file as well.
        self._index = 0
|