#pragma once #include #include #include #include namespace reproc { /*! `reproc_drain` but takes lambdas as sinks. Return an error code from a sink to break out of `drain` early. `out` and `err` expect the following signature: ```c++ std::error_code sink(stream stream, const uint8_t *buffer, size_t size); ``` */ template std::error_code drain(process &process, Out &&out, Err &&err) { static constexpr uint8_t initial = 0; std::error_code ec; // A single call to `read` might contain multiple messages. By always calling // both sinks once with no data before reading, we give them the chance to // process all previous output before reading from the child process again. ec = out(stream::in, &initial, 0); if (ec) { return ec; } ec = err(stream::in, &initial, 0); if (ec) { return ec; } static constexpr size_t BUFFER_SIZE = 4096; uint8_t buffer[BUFFER_SIZE] = {}; for (;;) { int events = 0; std::tie(events, ec) = process.poll(event::out | event::err, infinite); if (ec) { ec = ec == error::broken_pipe ? std::error_code() : ec; break; } if (events & event::deadline) { ec = std::make_error_code(std::errc::timed_out); break; } stream stream = events & event::out ? stream::out : stream::err; size_t bytes_read = 0; std::tie(bytes_read, ec) = process.read(stream, buffer, BUFFER_SIZE); if (ec && ec != error::broken_pipe) { break; } bytes_read = ec == error::broken_pipe ? 0 : bytes_read; // This used to be `auto &sink = stream == stream::out ? out : err;` but // that doesn't actually work if `out` and `err` are not the same type. if (stream == stream::out) { ec = out(stream, buffer, bytes_read); } else { ec = err(stream, buffer, bytes_read); } if (ec) { break; } } return ec; } namespace sink { /*! Reads all output into `string`. */ class string { std::string &string_; public: explicit string(std::string &string) noexcept : string_(string) {} std::error_code operator()(stream stream, const uint8_t *buffer, size_t size) { (void) stream; string_.append(reinterpret_cast(buffer), size); return {}; } }; /*! Forwards all output to `ostream`. */ class ostream { std::ostream &ostream_; public: explicit ostream(std::ostream &ostream) noexcept : ostream_(ostream) {} std::error_code operator()(stream stream, const uint8_t *buffer, size_t size) { (void) stream; ostream_.write(reinterpret_cast(buffer), static_cast(size)); return {}; } }; /*! Discards all output. */ class discard { public: std::error_code operator()(stream stream, const uint8_t *buffer, size_t size) const noexcept { (void) stream; (void) buffer; (void) size; return {}; } }; constexpr discard null = discard(); namespace thread_safe { /*! `sink::string` but locks the given mutex before invoking the sink. */ class string { sink::string sink_; std::mutex &mutex_; public: string(std::string &string, std::mutex &mutex) noexcept : sink_(string), mutex_(mutex) {} std::error_code operator()(stream stream, const uint8_t *buffer, size_t size) { std::lock_guard lock(mutex_); return sink_(stream, buffer, size); } }; } } }