xdrpp
RFC4506 XDR compiler and message library
pollset.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 
3 #ifndef _XDRPP_POLLSET_H_INCLUDED_
4 #define _XDRPP_POLLSET_H_INCLUDED_ 1
5 
6 /** \file pollset.h Asynchronous I/O and event harness. */
7 
8 #include <csignal>
9 #include <functional>
10 #include <map>
11 #include <memory>
12 #include <mutex>
13 #include <thread>
14 #include <type_traits>
15 #include <unordered_map>
16 #include <vector>
17 #include <poll.h>
18 #include <xdrpp/socket.h>
19 
20 namespace xdr {
21 
22 //! Structure to poll for a set of file descriptors and timeouts.
23 class pollset {
24 protected:
25  static constexpr int kReadFlag = 0x1;
26  static constexpr int kWriteFlag = 0x2;
27  static constexpr int kOnceFlag = 0x4;
28  //! Number of registered file decriptor and timeout callbacks.
29  std::size_t num_cbs() const;
30 
31 public:
32  enum op_t {
33  //! Specify interest in read-ready condition
34  Read = kReadFlag,
35  //! Specify interest in write-ready condition
36  Write = kWriteFlag,
37  //! Valid only when removing callbacks.
38  ReadWrite = kReadFlag | kWriteFlag,
39  //! Like \c Read, but the callback only executes once.
40  ReadOnce = kReadFlag | kOnceFlag,
41  //! Like \c Write, but the callback only executes once.
42  WriteOnce = kWriteFlag | kOnceFlag
43  };
44 
45  using cb_t = std::function<void()>;
46 
47 private:
48  // File descriptor callback information
49  struct fd_state {
50  cb_t rcb;
51  cb_t wcb;
52  int idx {-1}; // Index in pollfds_
53  bool roneshot;
54  bool woneshot;
55  ~fd_state(); // Sanity check no active callbacks
56  };
57 
58  // File descriptor callback state
59  std::vector<pollfd> pollfds_;
60  std::unordered_map<sock_t, fd_state> state_;
61 
62  // Timeout callback state
63  std::multimap<std::int64_t, cb_t> time_cbs_;
64 
65  cb_t &fd_cb_helper(sock_t s, op_t op);
66  void consolidate();
67  int next_timeout(int ms);
68  void run_timeouts();
69 
70  // Hook for subtypes
71  virtual void run_subtype_handlers() {}
72 
73 public:
74  pollset() = default;
75  pollset(const pollset &) = delete;
76 
77  //! Go through one round of checking all file descriptors. \arg \c
78  //! timeout is a timeout in milliseconds (or -1 to wait forever).
79  //!
80  //! Typical usage: \code
81  //! PollSet ps;
82  //! // ... register some callbacks or asynchronous events ...
83  //! while(ps.pending())
84  //! ps.poll();
85  //! \endcode
86  void poll(int timeout = -1);
87 
88  //! Returns \c false if no file descriptor callbacks are registered
89  //! and no timeouts or asynchronous events are pending. If it
90  //! returns \c false, then PollSet::poll will pause forever in the
91  //! absence of a signal or a call to PollSet::inject_cb in a
92  //! different thread.
93  virtual bool pending() const { return num_cbs(); }
94 
95  //! Continously poll and only return on exception or when there is
96  //! no more work to do.
97  void run() { while (pending()) poll(); }
98 
99  //! Set a read or write callback on a particular file descriptor.
100  //! \arg \c fd is the file descriptor. \arg \c op specifies the
101  //! condition on which to invoke the callback. Only one \c Read and
102  //! one \c Write callback are permitted per file descriptor. E.g.,
103  //! calling \c set_cb with \c ReadOnce overwrites a previous \c Read
104  //! callback on the same file descriptor. The value \c ReadWrite is
105  //! illegal when adding a callback (you must set \c Read and \c
106  //! Write callbacks separately). \arg \c cb is the callback, which
107  //! must be convertible to PollSet::cb_t.
108  template<typename CB> void fd_cb(sock_t s, op_t op, CB &&cb) {
109  if (!(fd_cb_helper(s, op) = std::forward<CB>(cb)))
110  fd_cb(s, op);
111  }
112 
113  //! Remove a callback on a file descriptor. If \c op is \c
114  //! ReadWrite, removes both read and write callbacks on the
115  //! descriptor.
116  void fd_cb(sock_t s, op_t op, std::nullptr_t = nullptr);
117 
118  //! Number of milliseconds since an arbitrary but fixed time, used
119  //! as the basis of all timeouts. Time zero is
120  //! std::chrono::steady_clock's epoch, which in some implementations
121  //! is the time a machine was booted.
122  static std::int64_t now_ms();
123 
124  //! Abstract class used to represent a pending timeout.
125  class Timeout {
126  using iterator = decltype(time_cbs_)::iterator;
127  iterator i_;
128  explicit Timeout(iterator i) : i_(i) {}
129  Timeout &operator=(iterator i) { i_ = i; return *this; }
130  friend class pollset;
131  public:
132  //! A null timeout. Relies on static initalization.
133  static const Timeout null_;
134  //! After static initialization, Timeouts are null by default.
135  //! However, since \c null_ relies on static initialization,
136  //! static/global Timeouts are not guaranteed to be null and
137  //! should be explicitly initialized from \c
138  //! pollset::timeout_null().
139  Timeout() : i_(null_.i_) {}
140  explicit operator bool() const { return i_ != null_.i_; }
141  };
142 
143  //! Set a callback to run a certain number of milliseconds from now.
144  //! \arg \c ms is the delay in milliseconds before running the
145  //! callback. \arg \c cb must be convertible to PollSet::cb_t.
146  //! \returns an object on which you can call the method
147  //! PollSet::timeout_cancel to cancel the timeout.
148  template<typename CB> Timeout timeout(std::int64_t ms, CB &&cb) {
149  return timeout_at(now_ms() + ms, std::forward<CB>(cb));
150  }
151  //! Set a callback to run at a specific time (as returned by
152  //! PollSet::now_ms()).
153  template<typename CB> Timeout timeout_at(std::int64_t ms, CB &&cb) {
154  return Timeout(time_cbs_.emplace(ms, std::forward<CB>(cb)));
155  }
156 
157  //! An invalid timeout, useful for initializing PollSet::Timeout
158  //! values before a timeout has been scheduled.
159  static Timeout timeout_null();
160 
161  //! Cancel a pending timeout. Sets the PollSet::Timeout argument \c
162  //! t to PollSet::timeout_null(), but obviously does not not affect
163  //! other copies of \c t that will now be invalid.
164  void timeout_cancel(Timeout &t);
165 
166  //! Returns the absolute time (in milliseconds) at which a timeout
167  //! will run.
168  std::int64_t timeout_time(Timeout t) const { return t.i_->first; }
169 
170  //! Reschedule a timeout to run at a specific time. Updates the
171  //! argument \c t, but invalidates any other copies of \c t.
172  void timeout_reschedule_at(Timeout &t, std::int64_t ms);
173  //! Reschedule a timeout some number of milliseconds in the future.
174  void timeout_reschedule(Timeout &t, std::int64_t ms) {
175  timeout_reschedule_at(t, now_ms() + ms);
176  }
177 };
178 
179 //! Adds support for signal handlers, asynchonous events, and
180 //! callbacks injected from other threads to the basic functionality
181 //! in \c pollset_light.
182 class pollset_plus : public pollset {
183  enum class wake_type : std::uint8_t {
184  Normal = 0,
185  Signal = 1
186  };
187 
188  // State for asynchronous tasks
189  template<typename R> struct async_task {
190  pollset_plus *ps_;
191  std::function<R()> work_;
192  std::function<void(R)> cb_;
193  std::unique_ptr<R> rp_;
194 
195  void start() {
196  rp_.reset(new R { work_() });
197  ps_->inject_cb(std::bind(&async_task::done, this));
198  }
199  void done() {
200  std::unique_ptr<async_task> self {this};
201  ps_->nasync_--;
202  cb_(std::move(*rp_));
203  }
204  };
205 
206  // Self-pipe used to wake up poll from signal handlers and other threads
207  sock_t selfpipe_[2];
208 
209  // Asynchronous events enqueued from other threads
210  std::mutex async_cbs_lock_;
211  std::vector<cb_t> async_cbs_;
212  bool async_pending_{false};
213  size_t nasync_{0};
214 
215  // Signal callback state
216  static constexpr int num_sig = 32;
217  static std::mutex signal_owners_lock;
218  static pollset_plus *signal_owners[num_sig];
219  static volatile std::sig_atomic_t signal_flags[num_sig];
220  bool signal_pending_{false};
221  std::map<int, cb_t> signal_cbs_;
222 
223  void wake(wake_type wt);
224  void run_pending_asyncs();
225  void inject_cb_vec(std::vector<cb_t>::iterator b,
226  std::vector<cb_t>::iterator e);
227  void run_subtype_handlers() override;
228  static void signal_handler(int);
229  static void erase_signal_cb(int);
230 
231 public:
232  pollset_plus();
233  ~pollset_plus();
234 
235  bool pending() const override;
236 
237  //! Cause PollSet::poll to return if it is sleeping. Unlike most
238  //! other methods, \c wake is safe to call from a signal handler or
239  //! a different thread.
240  void wake() { wake(wake_type::Normal); }
241 
242  //! Inject a callback to run immediately. Unlike most methods, it
243  //! is safe to call this function from another thread. Being
244  //! thread-safe adds extra overhead, so it does not make sense to
245  //! call this function from the same thread as PollSet::poll. Note
246  //! that \c inject_cb acquires a lock and definitely must <i>not</i>
247  //! be called from a signal handler (or deadlock could ensue).
248  template<typename CB> void inject_cb(CB &&cb) {
249  std::lock_guard<std::mutex> lk(async_cbs_lock_);
250  async_cbs_.emplace_back(std::forward<CB>(cb));
251  if (!async_pending_) {
252  async_pending_ = true;
253  wake();
254  }
255  }
256 
257  //! Execute a task asynchonously in another thread, then run
258  //! callback on the task's result in the main thread. \arg \c work
259  //! is a task to perform asynchronously in another thread, and must
260  //! be convertible to std::function<R()> for some type \c R. \arg
261  //! \c cb is the callback that processes the result in the main
262  //! thread, and must be convertible to std::function<void(R)> for
263  //! the same type \c R.
264  template<typename Work, typename CB> void async(Work &&work, CB &&cb) {
265  using R = decltype(work());
266  async_task<R> *a = new async_task<R> {
267  this, std::forward<Work>(work), std::forward<CB>(cb), nullptr
268  };
269  ++nasync_;
270  std::thread(&async_task<R>::start, a).detach();
271  }
272 
273  //! Add a callback for a particular signal. Note that only one
274  //! callback can be added for a particular signal across all
275  //! `pollset_plus` instances in a single process. Hence, calling
276  //! this function may "steal" a signal from a different
277  //! `pollset_plus` (erasing whatever callback the other
278  //! `pollset_plus` had for the signal). Such callback stealing is
279  //! atomic, allowing one to steal a `pollset_plus`'s signals before
280  //! deleting it with no risk of signals going uncaught.
281  void signal_cb(int sig, cb_t cb);
282 
283  //! Remove any previously added callback for a particular signal.
284  //! Because signal callbacks are process-wide, this static method
285  //! will affect whatever `pollset_plus` currently owns the signal.
286  static void signal_cb(int sig, std::nullptr_t = nullptr);
287 };
288 
289 }
290 
291 #endif // !_XDRPP_POLLSET_H_INCLUDED_
static const Timeout null_
A null timeout. Relies on static initialization.
Definition: pollset.h:133
static Timeout timeout_null()
An invalid timeout, useful for initializing PollSet::Timeout values before a timeout has been schedul...
Definition: pollset.cc:21
void timeout_cancel(Timeout &t)
Cancel a pending timeout.
Definition: pollset.cc:417
std::size_t num_cbs() const
Number of registered file descriptor and timeout callbacks.
Definition: pollset.cc:188
Like Read, but the callback only executes once.
Definition: pollset.h:40
Specify interest in read-ready condition.
Definition: pollset.h:34
Specify interest in write-ready condition.
Definition: pollset.h:36
Most of the xdrpp library is encapsulated in the xdr namespace.
Definition: arpc.cc:4
Timeout()
After static initialization, Timeouts are null by default.
Definition: pollset.h:139
Structure to poll for a set of file descriptors and timeouts.
Definition: pollset.h:23
static std::int64_t now_ms()
Number of milliseconds since an arbitrary but fixed time, used as the basis of all timeouts...
Definition: pollset.cc:409
void poll(int timeout=-1)
Go through one round of checking all file descriptors.
Definition: pollset.cc:217
Timeout timeout_at(std::int64_t ms, CB &&cb)
Set a callback to run at a specific time (as returned by PollSet::now_ms()).
Definition: pollset.h:153
Abstract class used to represent a pending timeout.
Definition: pollset.h:125
Timeout timeout(std::int64_t ms, CB &&cb)
Set a callback to run a certain number of milliseconds from now.
Definition: pollset.h:148
void wake()
Cause PollSet::poll to return if it is sleeping.
Definition: pollset.h:240
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
Definition: pollset.h:108
void timeout_reschedule_at(Timeout &t, std::int64_t ms)
Reschedule a timeout to run at a specific time.
Definition: pollset.cc:427
std::int64_t timeout_time(Timeout t) const
Returns the absolute time (in milliseconds) at which a timeout will run.
Definition: pollset.h:168
void timeout_reschedule(Timeout &t, std::int64_t ms)
Reschedule a timeout some number of milliseconds in the future.
Definition: pollset.h:174
Abstract away the type of a socket (for windows).
Definition: socket.h:28
void run()
Continuously poll and only return on exception or when there is no more work to do.
Definition: pollset.h:97
void async(Work &&work, CB &&cb)
Execute a task asynchronously in another thread, then run callback on the task's result in the main th...
Definition: pollset.h:264
void inject_cb(CB &&cb)
Inject a callback to run immediately.
Definition: pollset.h:248
Valid only when removing callbacks.
Definition: pollset.h:38
virtual bool pending() const
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
Definition: pollset.h:93
Like Write, but the callback only executes once.
Definition: pollset.h:42
Simplified support for creating sockets.
Adds support for signal handlers, asynchronous events, and callbacks injected from other threads to th...
Definition: pollset.h:182