123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- #pragma once
- #include <mutex>
- #include <ostream>
- #include <string>
- #include <reproc++/reproc.hpp>
- namespace reproc {
- /*!
- `reproc_drain` but takes lambdas as sinks. Return an error code from a sink to
- break out of `drain` early. `out` and `err` expect the following signature:
- ```c++
- std::error_code sink(stream stream, const uint8_t *buffer, size_t size);
- ```
- */
- template <typename Out, typename Err>
- std::error_code drain(process &process, Out &&out, Err &&err)
- {
- static constexpr uint8_t initial = 0;
- std::error_code ec;
- // A single call to `read` might contain multiple messages. By always calling
- // both sinks once with no data before reading, we give them the chance to
- // process all previous output before reading from the child process again.
- ec = out(stream::in, &initial, 0);
- if (ec) {
- return ec;
- }
- ec = err(stream::in, &initial, 0);
- if (ec) {
- return ec;
- }
- static constexpr size_t BUFFER_SIZE = 4096;
- uint8_t buffer[BUFFER_SIZE] = {};
- for (;;) {
- int events = 0;
- std::tie(events, ec) = process.poll(event::out | event::err, infinite);
- if (ec) {
- ec = ec == error::broken_pipe ? std::error_code() : ec;
- break;
- }
- if (events & event::deadline) {
- ec = std::make_error_code(std::errc::timed_out);
- break;
- }
- stream stream = events & event::out ? stream::out : stream::err;
- size_t bytes_read = 0;
- std::tie(bytes_read, ec) = process.read(stream, buffer, BUFFER_SIZE);
- if (ec && ec != error::broken_pipe) {
- break;
- }
- bytes_read = ec == error::broken_pipe ? 0 : bytes_read;
- // This used to be `auto &sink = stream == stream::out ? out : err;` but
- // that doesn't actually work if `out` and `err` are not the same type.
- if (stream == stream::out) {
- ec = out(stream, buffer, bytes_read);
- } else {
- ec = err(stream, buffer, bytes_read);
- }
- if (ec) {
- break;
- }
- }
- return ec;
- }
- namespace sink {
- /*! Reads all output into `string`. */
- class string {
- std::string &string_;
- public:
- explicit string(std::string &string) noexcept : string_(string) {}
- std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
- {
- (void) stream;
- string_.append(reinterpret_cast<const char *>(buffer), size);
- return {};
- }
- };
- /*! Forwards all output to `ostream`. */
- class ostream {
- std::ostream &ostream_;
- public:
- explicit ostream(std::ostream &ostream) noexcept : ostream_(ostream) {}
- std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
- {
- (void) stream;
- ostream_.write(reinterpret_cast<const char *>(buffer),
- static_cast<std::streamsize>(size));
- return {};
- }
- };
- /*! Discards all output. */
- class discard {
- public:
- std::error_code
- operator()(stream stream, const uint8_t *buffer, size_t size) const noexcept
- {
- (void) stream;
- (void) buffer;
- (void) size;
- return {};
- }
- };
- constexpr discard null = discard();
- namespace thread_safe {
- /*! `sink::string` but locks the given mutex before invoking the sink. */
- class string {
- sink::string sink_;
- std::mutex &mutex_;
- public:
- string(std::string &string, std::mutex &mutex) noexcept
- : sink_(string), mutex_(mutex)
- {}
- std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
- {
- std::lock_guard<std::mutex> lock(mutex_);
- return sink_(stream, buffer, size);
- }
- };
- }
- }
- }
|