// pollset.cc -- part of xdrpp, the RFC 4506 XDR compiler and message library.
1 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstring>
#include <iostream>
#include <iterator>
#include <limits>
#include <system_error>
#include <thread>
#include <signal.h>
#include <unistd.h>
#include <xdrpp/pollset.h>
13 
14 namespace xdr {
15 
// Process-wide table mapping each signal number to the pollset_plus
// instance (if any) that owns its callback.  Guarded by signal_owners_lock.
std::mutex pollset_plus::signal_owners_lock;
pollset_plus *pollset_plus::signal_owners[num_sig];
// Per-signal flag written by signal_handler: 0 = idle, 1 = wake in
// progress, 2 = signal seen (see signal_handler and erase_signal_cb).
volatile std::sig_atomic_t pollset_plus::signal_flags[num_sig];
19 
20 pollset::Timeout
22 {
23  static decltype(pollset::time_cbs_) dummy_time_cbs_;
24  return Timeout{dummy_time_cbs_.end()};
25 }
27 
// Handler installed via sigaction for every signal that has a
// registered callback.  Records the signal in signal_flags and wakes
// the owning pollset_plus through its self-pipe (write is
// async-signal-safe).
void
pollset_plus::signal_handler(int sig)
{
  assert(sig > 0 && sig < num_sig);
  // Already flagged (1 = wake in progress, 2 = pending): nothing to do.
  if (signal_flags[sig])
    return;
  signal_flags[sig] = 1;
  std::atomic_thread_fence(std::memory_order_seq_cst);
  if (pollset_plus *ps = signal_owners[sig])
    // It would be pretty unpleasant if ps got deleted in another
    // thread right here.  To prevent this, we set signal_flags[sig]
    // to the value 1 to signal "wake in progress", then spin on the 1
    // when deleting signal callbacks.
    ps->wake(wake_type::Signal);
  std::atomic_thread_fence(std::memory_order_seq_cst);
  signal_flags[sig] = 2;
}
45 
46 pollset_plus::pollset_plus()
47 {
48  create_selfpipe(selfpipe_);
49  set_close_on_exec(selfpipe_[0]);
50  set_close_on_exec(selfpipe_[1]);
51  set_nonblock(selfpipe_[0]);
52  set_nonblock(selfpipe_[1]);
53  this->fd_cb(selfpipe_[0], Read, [this](){ this->run_pending_asyncs(); });
54 }
55 
// Tear down: release every signal callback this pollset owns (restoring
// SIG_DFL via erase_signal_cb), then unregister and close the self-pipe.
pollset_plus::~pollset_plus()
{
  {
    // erase_signal_cb assumes signal_owners_lock is already held.
    std::lock_guard<std::mutex> lk {signal_owners_lock};
    while (signal_cbs_.begin() != signal_cbs_.end())
      erase_signal_cb(signal_cbs_.begin()->first);
  }

  // Drop the read callback before closing, so no stale callback
  // remains registered in the base pollset for a closed descriptor.
  fd_cb(selfpipe_[0], Read);
  close(selfpipe_[0]);
  close(selfpipe_[1]);
}
68 
pollset::fd_state::~fd_state()
{
  // XXX - eventually remove
  // Read/write callbacks must have been cleared before the state is
  // destroyed; a live callback here indicates a bookkeeping bug.
  assert (!rcb && !wcb);
}
74 
// Cause pollset::poll to return if it is sleeping, by writing a single
// wake_type byte to the self-pipe.
void
pollset_plus::wake(wake_type wt)
{
  static_assert(sizeof wt == 1, "uint8_t enum has wrong size");
  // Best-effort: the write end is non-blocking (set in the
  // constructor), so if the pipe is full a wakeup is already pending
  // and the dropped write is harmless -- return value deliberately ignored.
  write(selfpipe_[1], &wt, 1);
}
81 
// Drain the self-pipe and run every callback queued by other threads.
// Invoked as the read callback registered on selfpipe_[0].
void
pollset_plus::run_pending_asyncs()
{
  std::vector<cb_t> cbs;
  std::vector<cb_t>::iterator i;

  // Catching and re-throwing exceptions ruins the stack trace from
  // uncaught exceptions, which hurts debuggability, particularly in a
  // core routine that calls a bunch of callbacks.  Hence, we abuse
  // RAII where catch would be more appropriate: if a callback throws,
  // the cleanup destructor re-queues the callbacks that never ran.
  struct cleanup {
    bool active;
    cb_t action;
    ~cleanup() { if (active) action(); }
  } c { false, [&](){inject_cb_vec(i+1, cbs.end());} };

  {
    // Empty the pipe, noting whether any byte was a Signal wakeup so
    // run_subtype_handlers knows to dispatch signal callbacks.
    // (The int i here shadows the outer iterator intentionally.)
    wake_type buf[128];
    int n;
    while ((n = read(selfpipe_[0], buf, sizeof buf)) > 0)
      for (int i = 0; i < n && !signal_pending_; i++)
        if (buf[i] == wake_type::Signal)
          signal_pending_ = true;
  }
  {
    // Steal the queued callbacks under the lock; run them outside it.
    std::lock_guard<std::mutex> lk {async_cbs_lock_};
    async_pending_ = false;
    swap(cbs, async_cbs_);
  }

  for (i = cbs.begin(), c.active = true; i != cbs.end(); i++)
    (*i)();
  c.active = false;
}
116 
117 void
118 pollset_plus::inject_cb_vec(std::vector<cb_t>::iterator b,
119  std::vector<cb_t>::iterator e)
120 {
121  if (b != e) {
122  std::lock_guard<std::mutex> lk {async_cbs_lock_};
123  std::move(b, e, async_cbs_.end());
124  if (!async_pending_) {
125  async_pending_ = true;
126  wake();
127  }
128  }
129 }
130 
// Locate (or create) the pollfd slot and fd_state record for socket s,
// set the POLLIN/POLLOUT interest bit per op, and return a reference to
// the matching callback slot.  Terminates on the illegal ReadWrite or
// neither-flag cases.
pollset::cb_t &
pollset::fd_cb_helper(sock_t s, op_t op)
{
  fd_state &fs = state_[s];
  pollfd *pfdp;
  if (fs.idx < 0) {
    // First callback on this socket: append a fresh pollfd entry.
    fs.idx = pollfds_.size();
    pollfds_.resize(fs.idx + 1);
    pfdp = &pollfds_.back();
    pfdp->fd = s.fd_; // XXX
  }
  else {
    pfdp = &pollfds_.at(fs.idx);
    assert (pfdp->fd == s.fd_); // XXX
  }
  if (op & kReadFlag) {
    if (op & kWriteFlag) {
      // Read and write callbacks must be registered separately.
      std::cerr << "Illegal call to pollset::fd_cb with ReadWrite"
                << std::endl;
      std::terminate();
    }
    fs.roneshot = op & kOnceFlag;
    pfdp->events |= POLLIN;
    return fs.rcb;
  }
  else if (op & kWriteFlag) {
    fs.woneshot = op & kOnceFlag;
    pfdp->events |= POLLOUT;
    return fs.wcb;
  }
  else {
    std::cerr << "Illegal call to pollset::fd_cb with"
      " neither Read nor Write"
              << std::endl;
    std::terminate();
  }
}
168 
// Remove the read and/or write callback (per op) registered on socket
// s, clearing the matching event-interest bits.  A no-op if s has no
// registered state; the pollfd entry itself is reclaimed later by
// consolidate().
void
pollset::fd_cb(sock_t s, op_t op, std::nullptr_t)
{
  auto fi = state_.find(s);
  if (fi == state_.end())
    return;
  pollfd &pfd = pollfds_.at(fi->second.idx);

  if (op & kReadFlag) {
    pfd.events &= ~POLLIN;
    fi->second.rcb = nullptr;
  }
  if (op & kWriteFlag) {
    pfd.events &= ~POLLOUT;
    fi->second.wcb = nullptr;
  }
}
186 
187 std::size_t
189 {
190  return pollfds_.size() + time_cbs_.size();
191 }
192 
193 bool
195 {
196  return nasync_ || num_cbs();
197 }
198 
199 int
200 pollset::next_timeout(int ms)
201 {
202  auto next = time_cbs_.begin();
203  if (next == time_cbs_.end())
204  return ms;
205  int64_t now = now_ms();
206  if (now >= next->first)
207  return 0;
208  int64_t wait = next->first - now;
209  if (wait > std::numeric_limits<int>::max())
210  wait = std::numeric_limits<int>::max();
211  if (ms >= 0 && ms <= wait)
212  return ms;
213  return wait;
214 }
215 
216 void
218 {
219  int r = ::poll(pollfds_.data(), pollfds_.size(), next_timeout(timeout));
220  if (r < 0) {
221  if (errno == EINTR)
222  return;
223  std::cerr << "poll: " << sock_errmsg() << std::endl;
224  std::terminate();
225  }
226  size_t maxpoll = pollfds_.size();
227  for (size_t i = 0; r > 0 && i < maxpoll; i++) {
228  pollfd *pfp = &pollfds_.at(i);
229  fd_state &fi = state_.at(sock_t(pfp->fd)); // XXX
230  assert (!(pfp->revents & POLLNVAL));
231  if (pfp->revents)
232  --r;
233  if (pfp->revents & (POLLIN|POLLHUP|POLLERR) && fi.rcb) {
234  if (fi.roneshot) {
235  cb_t cb {std::move(fi.rcb)};
236  fi.rcb = nullptr;
237  pfp->events &= ~POLLIN;
238  cb();
239  }
240  else
241  fi.rcb();
242  }
243  pfp = &pollfds_.at(i); // callback might have resized vector
244  if (pfp->revents & (POLLOUT|POLLHUP|POLLERR) && fi.wcb) {
245  if (fi.woneshot) {
246  cb_t cb {std::move(fi.wcb)};
247  fi.wcb = nullptr;
248  pfp->events &= ~POLLOUT;
249  cb();
250  }
251  else
252  fi.wcb();
253  }
254  }
255 
256  run_timeouts();
257  run_subtype_handlers();
258  consolidate();
259 }
260 
// Invoke every timeout callback whose deadline has passed.  Each map
// entry is erased (and the iterator advanced) by an RAII cleanup whose
// destructor runs even if the callback throws, so one throwing
// callback cannot leave a fired entry stuck in the queue.
void
pollset::run_timeouts()
{
  auto i = time_cbs_.begin();
  if (i != time_cbs_.end()) {
    // Deadlines are compared against a single snapshot of the clock,
    // so callbacks that take time cannot make this loop run forever.
    int64_t now = now_ms();
    while (i != time_cbs_.end() && now >= i->first) {
      struct cleanup {
        cb_t cb_;
        ~cleanup() { cb_(); }
      } c {[&]() { time_cbs_.erase(i++); }};
      i->second();
    }
  }
}
276 
// Dispatch callbacks for any signals flagged since the last round.
// Only runs when run_pending_asyncs saw a Signal byte on the self-pipe.
void
pollset_plus::run_subtype_handlers()
{
  if (!signal_pending_)
    return;

  // Slightly convoluted logic because A) a callback might try to
  // change signal callbacks (so we have to release signal_owners_lock
  // for the duration of the callback), B) callbacks can throw
  // exceptions (and we don't want to lose other signals just because
  // one callback throws an exception), and C) other threads could
  // steal our signal callbacks (invalidating iterators to
  // signal_cbs_) whenever we release signal_owners_lock.
  std::vector<int> pending;
  std::unique_lock<std::mutex> lk {signal_owners_lock};
  for (auto i : signal_cbs_)
    if (signal_flags[i.first])
      pending.push_back(i.first);

  for (auto i : pending) {
    // Re-check: the callback may have been stolen or erased while the
    // lock was released for an earlier callback.
    auto cbi = signal_cbs_.find(i);
    if (cbi == signal_cbs_.end())
      continue;
    // Wait out any signal handler mid-wake (flag value 1) before
    // clearing the flag (see signal_handler).
    while(signal_flags[i] & 1)
      std::this_thread::yield();
    signal_flags[i] = 0;
    // Copy the callback so erasing it during invocation is safe.
    cb_t cb {cbi->second};
    lk.unlock();
    cb();
    lk.lock();
  }
  signal_pending_ = false;
}
310 
// Compact pollfds_ by discarding entries with no event interest:
// first trim dead entries off the tail, then back-fill interior holes
// with the last element (updating the moved entry's idx).  Also erases
// the corresponding fd_state records.
void
pollset::consolidate()
{
  // Trim dead entries off the tail first, so back() is live below.
  while (!pollfds_.empty() && !pollfds_.back().events) {
    auto fi = state_.find(sock_t(pollfds_.back().fd)); // XXX
    if (fi != state_.end())
      state_.erase(fi);
    pollfds_.pop_back();
  }

  int i = pollfds_.size();
  if (i < 2)
    return;
  // Scan from the rear (skipping the last entry, known live after the
  // trim above); replace each dead slot with the current back element.
  for (i -= 2; i >= 0; --i) {
    {
      // Scope pfd1 so the reference dies before pollfds_ is mutated.
      pollfd &pfd1 = pollfds_.at(i);
      if (pfd1.events)
        continue;
      auto fi = state_.find(sock_t(pfd1.fd)); // XXX
      if (fi != state_.end())
        state_.erase(fi);
    }
    const pollfd &pfd = pollfds_[i] = pollfds_.back();
    state_.at(sock_t(pfd.fd)).idx = i; // XXX
    pollfds_.pop_back();
  }
}
338 
// Add (or replace) the callback for signal sig, taking ownership of the
// signal away from any other pollset_plus and installing the
// process-wide signal_handler disposition on first registration.
// Throws std::system_error if sigaction fails.
void
pollset_plus::signal_cb(int sig, cb_t cb)
{
  // An empty callback means "remove" -- delegate to the erasing overload.
  if (!cb) {
    signal_cb(sig);
    return;
  }
  assert(sig > 0 && sig < num_sig);

  std::lock_guard<std::mutex> lk {signal_owners_lock};
  signal_cbs_[sig] = std::move(cb);
  if (signal_owners[sig] == this)
    return;
  if (pollset_plus *ps = signal_owners[sig]) {
    // Steal the signal from its previous owner; the handler stays
    // installed and now routes wakeups here.
    signal_owners[sig] = this;
    ps->signal_cbs_.erase(sig);
  }
  else {
    // First owner of this signal: install the handler.
    signal_owners[sig] = this;
    struct sigaction sa;
    sa.sa_handler = signal_handler;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    if (sigaction(sig, &sa, nullptr) == -1) {
      // Roll back ownership so a failed install leaves no owner.
      signal_owners[sig] = nullptr;
      throw std::system_error(errno, std::system_category(), "sigaction");
    }
  }
  std::atomic_thread_fence(std::memory_order_seq_cst);
  // If the signal already fired (possibly waking the old owner), make
  // sure this pollset wakes up to run the new callback.
  if (signal_flags[sig])
    wake(wake_type::Signal);
}
371 
// Assumes signal_owners_lock already held when called.
// Restore SIG_DFL for sig, clear its ownership, and drop the owner's
// callback.  If the signal fired in the meantime, re-raise it so the
// default disposition observes it.  Throws std::system_error if
// sigaction fails.
void
pollset_plus::erase_signal_cb(int sig)
{
  pollset_plus *ps = signal_owners[sig];
  if (!ps)
    return;

  struct sigaction sa;
  sa.sa_handler = SIG_DFL;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;
  if (sigaction(sig, &sa, nullptr) == -1)
    throw std::system_error(errno, std::system_category(), "sigaction");

  signal_owners[sig] = nullptr;
  std::atomic_thread_fence(std::memory_order_seq_cst);
  ps->signal_cbs_.erase(sig);

  // Spin until any in-progress signal_handler wake (flag value 1)
  // finishes, so ps is not used by the handler after we return (see
  // the comment in signal_handler).
  while(signal_flags[sig] & 1)
    std::this_thread::yield();

  // Signal arrived before the handler was removed: replay it under the
  // newly-restored default disposition.
  if (signal_flags[sig]) {
    signal_flags[sig] = 0;
    std::raise(sig);
  }
}
399 
400 void
401 pollset_plus::signal_cb(int sig, std::nullptr_t)
402 {
403  assert(sig > 0 && sig < num_sig);
404  std::lock_guard<std::mutex> lk {signal_owners_lock};
405  erase_signal_cb(sig);
406 }
407 
408 std::int64_t
410 {
411  using namespace std::chrono;
412  return duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
413  .count();
414 }
415 
416 void
418 {
419  if (t) {
420  assert(time_cbs_.find(t.i_->first) != time_cbs_.end());
421  time_cbs_.erase(t.i_);
422  t = timeout_null();
423  }
424 }
425 
426 void
428 {
429  auto i = t.i_;
430  t.i_ = time_cbs_.emplace(ms, std::move(i->second));
431  time_cbs_.erase(i);
432 }
433 
434 }
static const Timeout null_
A null timeout. Relies on static initialization.
Definition: pollset.h:133
static Timeout timeout_null()
An invalid timeout, useful for initializing pollset::Timeout values before a timeout has been scheduled.
Definition: pollset.cc:21
void timeout_cancel(Timeout &t)
Cancel a pending timeout.
Definition: pollset.cc:417
std::size_t num_cbs() const
Number of registered file descriptor and timeout callbacks.
Definition: pollset.cc:188
void signal_cb(int sig, cb_t cb)
Add a callback for a particular signal.
Definition: pollset.cc:340
Specify interest in read-ready condition.
Definition: pollset.h:34
Most of the xdrpp library is encapsulated in the xdr namespace.
Definition: arpc.cc:4
Asynchronous I/O and event harness.
static std::int64_t now_ms()
Number of milliseconds since an arbitrary but fixed time, used as the basis of all timeouts.
Definition: pollset.cc:409
void set_nonblock(sock_t s)
Set the O_NONBLOCK flag on a socket.
Definition: socket_unix.cc:70
void poll(int timeout=-1)
Go through one round of checking all file descriptors.
Definition: pollset.cc:217
Abstract class used to represent a pending timeout.
Definition: pollset.h:125
Timeout timeout(std::int64_t ms, CB &&cb)
Set a callback to run a certain number of milliseconds from now.
Definition: pollset.h:148
void wake()
Cause PollSet::poll to return if it is sleeping.
Definition: pollset.h:240
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
Definition: pollset.h:108
void timeout_reschedule_at(Timeout &t, std::int64_t ms)
Reschedule a timeout to run at a specific time.
Definition: pollset.cc:427
const char * sock_errmsg()
Last socket error message (strerror(errno) on POSIX).
Definition: socket_unix.cc:25
Abstract away the type of a socket (for windows).
Definition: socket.h:28
bool pending() const override
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
Definition: pollset.cc:194
void create_selfpipe(sock_t ss[2])
Create a socket (or pipe on unix, where both are file descriptors) that is connected to itself...
Definition: socket_unix.cc:89
virtual bool pending() const
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
Definition: pollset.h:93
void set_close_on_exec(sock_t s)
Set the close-on-exec flag of a file descriptor.
Definition: socket_unix.cc:79
Adds support for signal handlers, asynchronous events, and callbacks injected from other threads to the pollset.
Definition: pollset.h:182