123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- """Abstract Transport class."""
# Names exported by ``from <module> import *``.
__all__ = (
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
)
class BaseTransport:
    """Common base of every transport interface.

    Stores the optional "extra info" mapping and declares the lifecycle
    and protocol-binding methods.  Apart from get_extra_info(), every
    method here raises NotImplementedError.
    """

    __slots__ = ('_extra',)

    def __init__(self, extra=None):
        # A missing mapping becomes a fresh dict per instance; a supplied
        # mapping is stored as-is (not copied).
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Look up optional transport information.

        Return the value stored under *name* in the extra-info mapping,
        or *default* when there is no such entry.
        """
        return self._extra.get(name, default)

    def is_closing(self):
        """Report whether the transport is closing or already closed."""
        raise NotImplementedError

    def close(self):
        """Shut the transport down gracefully.

        Data still buffered is flushed asynchronously and no further
        data is received.  Once the buffer has been flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Replace the protocol attached to this transport."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the protocol currently attached to this transport."""
        raise NotImplementedError
class ReadTransport(BaseTransport):
    """Interface for transports that can only receive data."""

    __slots__ = ()

    def is_reading(self):
        """Report whether the transport is currently receiving."""
        raise NotImplementedError

    def pause_reading(self):
        """Suspend delivery of incoming data.

        The protocol's data_received() method is not called again until
        resume_reading() is invoked.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Restart delivery of incoming data.

        Received data is handed to the protocol's data_received()
        method once more.
        """
        raise NotImplementedError
class WriteTransport(BaseTransport):
    """Interface for transports that can only send data."""

    __slots__ = ()

    def set_write_buffer_limits(self, high=None, low=None):
        """Configure the high- and low-water marks for write flow control.

        The two marks decide when the protocol's pause_writing() and
        resume_writing() methods get called.  When given, low must not
        exceed high, and neither may be negative.

        Defaults are implementation-specific.  Supplying only high makes
        low default to an implementation-specific value no greater than
        high.  A high of zero also forces low to zero, so
        pause_writing() is called as soon as the buffer becomes
        non-empty.  A low of zero means resume_writing() is called only
        once the buffer is empty.  Zero for either mark is generally
        sub-optimal because it reduces the opportunity to overlap I/O
        with computation.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return how much data the write buffer currently holds."""
        raise NotImplementedError

    def get_write_buffer_limits(self):
        """Return the write flow-control watermarks.

        The result is a tuple (low, high) of byte counts.
        """
        raise NotImplementedError

    def write(self, data):
        """Queue some data bytes for sending.

        The call does not block: the data is buffered and sent out
        asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Send every chunk from a list (or any iterable) of data bytes.

        This default implementation joins the chunks into a single
        bytes object and passes it to write() in one call.
        """
        self.write(b''.join(list_of_data))

    def write_eof(self):
        """Close the sending side once buffered data has been flushed.

        (Comparable to typing ^D into a UNIX program that reads from
        stdin.)  Data may still be received afterwards.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if write_eof() is supported, False otherwise."""
        raise NotImplementedError

    def abort(self):
        """Shut the transport down at once.

        Buffered data is lost and no further data is received.  The
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.
        """
        raise NotImplementedError
class Transport(ReadTransport, WriteTransport):
    """Interface for a bidirectional transport.

    Several implementations may exist, but users typically do not
    write new transports; the platform supplies useful ones built
    with its own best practices.

    A transport is never instantiated directly by the user.  Instead
    the user calls a utility function, handing it a protocol factory
    plus whatever else is needed to create the transport and protocol
    (e.g. EventLoop.create_connection() or EventLoop.create_server()).
    That function asynchronously creates a transport and a protocol and
    connects them by calling the protocol's connection_made() method
    with the transport as its argument.

    This class adds no behavior of its own.  Every inherited method
    raises NotImplementedError, except get_extra_info() and
    writelines(); the latter joins its chunks and passes the result to
    write() in a single call.
    """

    __slots__ = ()
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    __slots__ = ()

    def sendto(self, data, addr=None):
        """Queue data for sending through the transport.

        The call does not block: the data is buffered and sent out
        asynchronously.

        addr is the target socket address.  When addr is None, the
        target address given at transport creation is used.
        """
        raise NotImplementedError

    def abort(self):
        """Shut the transport down at once.

        Buffered data is lost and no further data is received.  The
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.
        """
        raise NotImplementedError
class SubprocessTransport(BaseTransport):
    """Interface for subprocess transports.

    Every method declared here raises NotImplementedError.
    """

    __slots__ = ()

    def get_pid(self):
        """Return the subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Return the subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Return the transport for the pipe with number fd."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Deliver signal to the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs this sends SIGTERM to the subprocess.  On Windows
        the Win32 API function TerminateProcess() is called to stop it.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On Posix OSs this sends SIGKILL to the subprocess.  On Windows
        kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill
        """
        raise NotImplementedError
class _FlowControlMixin(Transport):
    """Write-side flow control, packaged as a mix-in base class.

    Subclass responsibilities:

    * implement get_write_buffer_size();
    * call _maybe_pause_protocol() whenever the write buffer grows and
      _maybe_resume_protocol() whenever it shrinks;
    * provide the _protocol attribute whose pause_writing() and
      resume_writing() methods are invoked (it is neither assigned
      here nor listed in __slots__);
    * call super().__init__(extra, loop) from the constructor with a
      loop that is not None; this installs the default limits through
      _set_write_buffer_limits().

    A subclass may also override set_write_buffer_limits(), e.g. to
    use different defaults.

    Users may call set_write_buffer_limits() and
    get_write_buffer_size(); their protocol's pause_writing() and
    resume_writing() may be called as a result.
    """

    __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water')

    def __init__(self, extra=None, loop=None):
        super().__init__(extra)
        # The loop is needed to report exceptions raised by the protocol.
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        # Start out with the default watermarks.
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer exceeds the high-water mark.

        pause_writing() is invoked only when the protocol is not paused
        already.  Exceptions it raises (other than SystemExit and
        KeyboardInterrupt) go to the loop's exception handler.
        """
        if self.get_write_buffer_size() <= self._high_water:
            return
        if self._protocol_paused:
            return
        # Flip the flag before the call so it stays set even if the
        # protocol raises.
        self._protocol_paused = True
        try:
            self._protocol.pause_writing()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop.call_exception_handler({
                'message': 'protocol.pause_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer is at or below low-water.

        Exceptions raised by resume_writing() (other than SystemExit
        and KeyboardInterrupt) go to the loop's exception handler.
        """
        if not self._protocol_paused:
            # Nothing to resume; the buffer size is not even queried.
            return
        if self.get_write_buffer_size() <= self._low_water:
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def get_write_buffer_limits(self):
        """Return the current watermarks as a tuple (low, high)."""
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the watermarks, filling in defaults.

        With neither value given, high is 64 KiB.  With only low given,
        high is four times low.  A missing low is a quarter of high
        (floor division).

        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            high = 64 * 1024 if low is None else 4 * low
        if low is None:
            low = high // 4

        if not high >= low >= 0:
            raise ValueError(
                f'high ({high!r}) must be >= low ({low!r}) must be >= 0')

        self._high_water, self._low_water = high, low

    def set_write_buffer_limits(self, high=None, low=None):
        """Change the watermarks, then pause the protocol if required.

        The buffer may already be above the new high-water mark, so the
        pause check is run straight away.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the write buffer size; subclasses must implement this."""
        raise NotImplementedError
|