drain.hpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #pragma once
  2. #include <mutex>
  3. #include <ostream>
  4. #include <string>
  5. #include <reproc++/reproc.hpp>
  6. namespace reproc {
  7. /*!
  8. `reproc_drain` but takes lambdas as sinks. Return an error code from a sink to
  9. break out of `drain` early. `out` and `err` expect the following signature:
  10. ```c++
  11. std::error_code sink(stream stream, const uint8_t *buffer, size_t size);
  12. ```
  13. */
  14. template <typename Out, typename Err>
  15. std::error_code drain(process &process, Out &&out, Err &&err)
  16. {
  17. static constexpr uint8_t initial = 0;
  18. std::error_code ec;
  19. // A single call to `read` might contain multiple messages. By always calling
  20. // both sinks once with no data before reading, we give them the chance to
  21. // process all previous output before reading from the child process again.
  22. ec = out(stream::in, &initial, 0);
  23. if (ec) {
  24. return ec;
  25. }
  26. ec = err(stream::in, &initial, 0);
  27. if (ec) {
  28. return ec;
  29. }
  30. static constexpr size_t BUFFER_SIZE = 4096;
  31. uint8_t buffer[BUFFER_SIZE] = {};
  32. for (;;) {
  33. int events = 0;
  34. std::tie(events, ec) = process.poll(event::out | event::err, infinite);
  35. if (ec) {
  36. ec = ec == error::broken_pipe ? std::error_code() : ec;
  37. break;
  38. }
  39. if (events & event::deadline) {
  40. ec = std::make_error_code(std::errc::timed_out);
  41. break;
  42. }
  43. stream stream = events & event::out ? stream::out : stream::err;
  44. size_t bytes_read = 0;
  45. std::tie(bytes_read, ec) = process.read(stream, buffer, BUFFER_SIZE);
  46. if (ec && ec != error::broken_pipe) {
  47. break;
  48. }
  49. bytes_read = ec == error::broken_pipe ? 0 : bytes_read;
  50. // This used to be `auto &sink = stream == stream::out ? out : err;` but
  51. // that doesn't actually work if `out` and `err` are not the same type.
  52. if (stream == stream::out) {
  53. ec = out(stream, buffer, bytes_read);
  54. } else {
  55. ec = err(stream, buffer, bytes_read);
  56. }
  57. if (ec) {
  58. break;
  59. }
  60. }
  61. return ec;
  62. }
  63. namespace sink {
  64. /*! Reads all output into `string`. */
  65. class string {
  66. std::string &string_;
  67. public:
  68. explicit string(std::string &string) noexcept : string_(string) {}
  69. std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
  70. {
  71. (void) stream;
  72. string_.append(reinterpret_cast<const char *>(buffer), size);
  73. return {};
  74. }
  75. };
  76. /*! Forwards all output to `ostream`. */
  77. class ostream {
  78. std::ostream &ostream_;
  79. public:
  80. explicit ostream(std::ostream &ostream) noexcept : ostream_(ostream) {}
  81. std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
  82. {
  83. (void) stream;
  84. ostream_.write(reinterpret_cast<const char *>(buffer),
  85. static_cast<std::streamsize>(size));
  86. return {};
  87. }
  88. };
  89. /*! Discards all output. */
  90. class discard {
  91. public:
  92. std::error_code
  93. operator()(stream stream, const uint8_t *buffer, size_t size) const noexcept
  94. {
  95. (void) stream;
  96. (void) buffer;
  97. (void) size;
  98. return {};
  99. }
  100. };
  101. constexpr discard null = discard();
  102. namespace thread_safe {
  103. /*! `sink::string` but locks the given mutex before invoking the sink. */
  104. class string {
  105. sink::string sink_;
  106. std::mutex &mutex_;
  107. public:
  108. string(std::string &string, std::mutex &mutex) noexcept
  109. : sink_(string), mutex_(mutex)
  110. {}
  111. std::error_code operator()(stream stream, const uint8_t *buffer, size_t size)
  112. {
  113. std::lock_guard<std::mutex> lock(mutex_);
  114. return sink_(stream, buffer, size);
  115. }
  116. };
  117. }
  118. }
  119. }