// File fragment (extraction fused original line numbers into the text; code
// kept byte-identical): static data members shared by all pollset_plus
// instances for cross-thread / signal-handler coordination.
9 #include <system_error> 16 std::mutex pollset_plus::signal_owners_lock;
// Per-signal registry of which pollset_plus instance currently owns the
// signal (guarded by signal_owners_lock).
17 pollset_plus *pollset_plus::signal_owners[num_sig];
// Async-signal-safe delivery flags, written from signal_handler: the code
// below sets 1 while the handler runs and 2 once delivery is recorded, and
// readers spin on (flags & 1).  volatile sig_atomic_t per the C/C++ rules
// for objects touched from signal handlers.
18 volatile std::sig_atomic_t pollset_plus::signal_flags[num_sig];
// Fragment of pollset::timeout_null(): yields a Timeout holding the end()
// iterator of a function-local static (empty) map, serving as a
// distinguishable "null" value.  Relies on static initialization (see the
// class documentation below in this file's doxygen summaries).
23 static decltype(pollset::time_cbs_) dummy_time_cbs_;
24 return Timeout{dummy_time_cbs_.end()};
// Shared async-signal handler: records delivery of sig in signal_flags[sig]
// and wakes the owning pollset so the registered callback runs on the event
// loop rather than in signal context.  (Fragment: some original lines are
// missing from this extraction.)
29 pollset_plus::signal_handler(
int sig)
31 assert(sig > 0 && sig < num_sig);
// Already flagged from an earlier delivery -- nothing further to do.
32 if (signal_flags[sig])
// 1 = handler in progress; the erase/dispatch paths spin on (flags & 1)
// until this handler finishes.
34 signal_flags[sig] = 1;
// Fence presumably orders the flag store before the signal_owners read --
// confirm against the matching fences in signal_cb/erase_signal_cb.
35 std::atomic_thread_fence(std::memory_order_seq_cst);
41 ps->wake(wake_type::Signal);
42 std::atomic_thread_fence(std::memory_order_seq_cst);
// 2 = delivery recorded and handler no longer running.
43 signal_flags[sig] = 2;
// Constructor fragment: registers a read callback on the self-pipe's read
// end so that wake() bytes written from other threads (or the signal
// handler) are drained by run_pending_asyncs() on the event loop.
46 pollset_plus::pollset_plus()
53 this->
fd_cb(selfpipe_[0],
Read, [
this](){ this->run_pending_asyncs(); });
// Destructor fragment: under signal_owners_lock, tears down every registered
// signal callback (erase_signal_cb also restores SIG_DFL and relinquishes
// signal ownership).
56 pollset_plus::~pollset_plus()
59 std::lock_guard<std::mutex> lk {signal_owners_lock};
// erase_signal_cb mutates signal_cbs_, so begin() is re-fetched each pass
// rather than iterating with a cached iterator.
60 while (signal_cbs_.begin() != signal_cbs_.end())
61 erase_signal_cb(signal_cbs_.begin()->first);
// fd_state destructor fragment: both the read and write callbacks must
// already have been cleared by the time the state is destroyed.
69 pollset::fd_state::~fd_state()
72 assert (!rcb && !wcb);
// wake() fragment: pushes the one-byte wake_type down the self-pipe to make
// a sleeping ::poll return; depends on the enum being exactly one byte.
78 static_assert(
sizeof wt == 1,
"uint8_t enum has wrong size");
// NOTE(review): the write() return value is ignored; a full pipe would
// silently drop a wakeup -- confirm whether that is acceptable upstream.
79 write(selfpipe_[1], &wt, 1);
// Drains the self-pipe and runs queued asynchronous callbacks on the event
// loop; a Signal byte observed in the pipe sets signal_pending_ so that
// run_subtype_handlers() dispatches signal callbacks this round.
// (Fragment: some original lines are missing from this extraction.)
83 pollset_plus::run_pending_asyncs()
85 std::vector<cb_t> cbs;
86 std::vector<cb_t>::iterator i;
// RAII guard: if a callback throws mid-loop, the remaining callbacks
// [i+1, end) are re-injected so they are not silently lost.
95 ~cleanup() {
if (active) action(); }
96 } c {
false, [&](){inject_cb_vec(i+1, cbs.end());} };
// Drain every pending wake byte; the inner scan stops early once a signal
// wake has been noted.  Note the loop's int i shadows the iterator i above.
101 while ((n = read(selfpipe_[0], buf,
sizeof buf)) > 0)
102 for (
int i = 0; i < n && !signal_pending_; i++)
103 if (buf[i] == wake_type::Signal)
104 signal_pending_ =
true;
// Swap the shared queue out under the lock, then run callbacks unlocked.
107 std::lock_guard<std::mutex> lk {async_cbs_lock_};
108 async_pending_ =
false;
109 swap(cbs, async_cbs_);
// Guard is armed only once there are callbacks to protect.
112 for (i = cbs.begin(), c.active =
true; i != cbs.end(); i++)
// inject_cb_vec fragment: queues externally supplied callbacks [b, e) onto
// async_cbs_ under the async lock; callable from other threads.
118 pollset_plus::inject_cb_vec(std::vector<cb_t>::iterator b,
119 std::vector<cb_t>::iterator e)
122 std::lock_guard<std::mutex> lk {async_cbs_lock_};
// NOTE(review): std::move to async_cbs_.end() writes *through* the end
// iterator (UB) rather than appending -- expected something like
// std::move(b, e, std::back_inserter(async_cbs_)).  Confirm against the
// upstream source; this may also be extraction garbling.
123 std::move(b, e, async_cbs_.end());
// First pending callback triggers a wake so a sleeping poll() returns.
124 if (!async_pending_) {
125 async_pending_ =
true;
// fd_cb(s, op, cb) fragment: registers a read OR write callback for socket
// s, creating a pollfd slot on first registration.  Read and Write must be
// requested in separate calls.
134 fd_state &fs = state_[s];
// New descriptor: append a pollfd and remember its index in the state.
137 fs.idx = pollfds_.size();
138 pollfds_.resize(fs.idx + 1);
139 pfdp = &pollfds_.back();
// Existing descriptor: reuse its recorded pollfd slot.
143 pfdp = &pollfds_.at(fs.idx);
144 assert (pfdp->fd == s.fd_);
146 if (op & kReadFlag) {
// ReadWrite in one call is rejected.
147 if (op & kWriteFlag) {
148 std::cerr <<
"Illegal call to pollset::fd_cb with ReadWrite" 152 fs.roneshot = op & kOnceFlag;
153 pfdp->events |= POLLIN;
156 else if (op & kWriteFlag) {
157 fs.woneshot = op & kOnceFlag;
158 pfdp->events |= POLLOUT;
162 std::cerr <<
// fd_cb(s, op) removal overload fragment (the trailing state_.find(s) on the
// fused line below begins this overload): clears POLLIN/POLLOUT interest and
// drops the stored callback(s) for socket s; no-op if s is unknown.
"Illegal call to pollset::fd_cb with" 163 " neither Read nor Write" 172 auto fi = state_.find(s);
173 if (fi == state_.end())
175 pollfd &pfd = pollfds_.at(fi->second.idx);
177 if (op & kReadFlag) {
178 pfd.events &= ~POLLIN;
179 fi->second.rcb =
nullptr;
181 if (op & kWriteFlag) {
182 pfd.events &= ~POLLOUT;
183 fi->second.wcb =
nullptr;
// num_cbs() fragment: registered file-descriptor callbacks plus pending
// timeout callbacks.
190 return pollfds_.size() + time_cbs_.size();
// next_timeout(ms) fragment: converts the earliest entry of time_cbs_ into a
// poll(2)-style millisecond timeout, clamped to INT_MAX, and deferring to
// the caller-supplied ms when that bound is sooner.
200 pollset::next_timeout(
int ms)
202 auto next = time_cbs_.begin();
// No timeouts pending: the caller's ms (possibly -1 = infinite) stands.
203 if (next == time_cbs_.end())
// Earliest deadline already due: do not block at all.
206 if (now >= next->first)
208 int64_t wait = next->first - now;
// Clamp the 64-bit wait into poll()'s int argument.
209 if (wait > std::numeric_limits<int>::max())
210 wait = std::numeric_limits<int>::max();
211 if (ms >= 0 && ms <= wait)
// poll(timeout) fragment: one round of ::poll over all registered
// descriptors, dispatching due read/write callbacks and finally subtype
// handlers (signals/asyncs).  (Fragment: some original lines are missing.)
219 int r =
::poll(pollfds_.data(), pollfds_.size(), next_timeout(timeout));
223 std::cerr <<
"poll: " <<
sock_errmsg() << std::endl;
// Snapshot the size: callbacks run below may register new fds and grow
// pollfds_, and those must not be scanned this round.
226 size_t maxpoll = pollfds_.size();
227 for (
size_t i = 0; r > 0 && i < maxpoll; i++) {
228 pollfd *pfp = &pollfds_.at(i);
229 fd_state &fi = state_.at(
sock_t(pfp->fd));
230 assert (!(pfp->revents & POLLNVAL));
// HUP/ERR fire the read callback too, so it can observe the condition.
233 if (pfp->revents & (POLLIN|POLLHUP|POLLERR) && fi.rcb) {
// Move the callback out first: it may re-register itself.
235 cb_t cb {std::move(fi.rcb)};
237 pfp->events &= ~POLLIN;
// Re-fetch the pointer -- presumably the read callback can mutate
// pollfds_ and invalidate pfp (confirm against upstream).
243 pfp = &pollfds_.at(i);
244 if (pfp->revents & (POLLOUT|POLLHUP|POLLERR) && fi.wcb) {
246 cb_t cb {std::move(fi.wcb)};
248 pfp->events &= ~POLLOUT;
257 run_subtype_handlers();
// run_timeouts() fragment: fires every callback whose deadline is <= now;
// an RAII guard erases-and-advances the map entry so removal happens even
// if the callback throws.
262 pollset::run_timeouts()
264 auto i = time_cbs_.begin();
265 if (i != time_cbs_.end()) {
267 while (i != time_cbs_.end() && now >= i->first) {
// Guard destructor performs the erase(i++), keeping i valid for the next
// loop iteration regardless of how the callback exits.
270 ~cleanup() { cb_(); }
271 } c {[&]() { time_cbs_.erase(i++); }};
// run_subtype_handlers() fragment: dispatches callbacks for signals whose
// flags were set by signal_handler since the last round; gated on
// signal_pending_ set by run_pending_asyncs().
278 pollset_plus::run_subtype_handlers()
280 if (!signal_pending_)
// Collect the flagged signal numbers under the owners lock first.
291 std::unique_lock<std::mutex> lk {signal_owners_lock};
292 for (
auto i : signal_cbs_)
293 if (signal_flags[i.first])
294 pending.push_back(i.first);
296 for (
auto i : pending) {
// The callback may have been erased since the pending list was built.
297 auto cbi = signal_cbs_.find(i);
298 if (cbi == signal_cbs_.end())
// Spin until any in-flight signal_handler finishes (flag 1 -> 2).
300 while(signal_flags[i] & 1)
301 std::this_thread::yield();
// Copy (not move) the callback: it stays registered for future signals.
303 cb_t cb {cbi->second};
308 signal_pending_ =
false;
// consolidate() fragment: compacts pollfds_ by dropping trailing entries
// with no event interest, and (presumably) back-filling interior unused
// slots with the last live entry -- some lines are missing, so the exact
// interior-slot condition is not visible here.
312 pollset::consolidate()
// Pop trailing pollfds that no longer watch any events.
314 while (!pollfds_.empty() && !pollfds_.back().events) {
315 auto fi = state_.find(
sock_t(pollfds_.back().fd));
316 if (fi != state_.end())
321 int i = pollfds_.size();
// Scan backwards, starting below the (already-handled) last entry.
324 for (i -= 2; i >= 0; --i) {
326 pollfd &pfd1 = pollfds_.at(i);
329 auto fi = state_.find(
sock_t(pfd1.fd));
330 if (fi != state_.end())
// Move the last entry into slot i and fix up its recorded index.
333 const pollfd &pfd = pollfds_[i] = pollfds_.back();
334 state_.at(
sock_t(pfd.fd)).idx = i;
// signal_cb(sig, cb) fragment: registers cb for signal sig, taking over
// ownership from any other pollset_plus that currently owns the signal and
// installing signal_handler via sigaction.
346 assert(sig > 0 && sig < num_sig);
348 std::lock_guard<std::mutex> lk {signal_owners_lock};
349 signal_cbs_[sig] = std::move(cb);
// Already the owner: the OS handler is installed, only the callback changed.
350 if (signal_owners[sig] ==
this)
// Another pollset owned the signal: steal ownership and drop its callback.
353 signal_owners[sig] =
this;
354 ps->signal_cbs_.erase(sig);
357 signal_owners[sig] =
this;
359 sa.sa_handler = signal_handler;
360 sigemptyset(&sa.sa_mask);
// Roll back ownership if the OS handler cannot be installed.
362 if (sigaction(sig, &sa,
nullptr) == -1) {
363 signal_owners[sig] =
nullptr;
364 throw std::system_error(errno, std::system_category(),
"sigaction");
// If the signal was already flagged before we took ownership, wake the
// loop so the next poll round dispatches it.
367 std::atomic_thread_fence(std::memory_order_seq_cst);
368 if (signal_flags[sig])
369 wake(wake_type::Signal);
// erase_signal_cb(sig) fragment (callers hold signal_owners_lock): restores
// SIG_DFL, relinquishes ownership, drops the owner's callback, and clears
// any still-pending delivery flag.
374 pollset_plus::erase_signal_cb(
int sig)
381 sa.sa_handler = SIG_DFL;
382 sigemptyset(&sa.sa_mask);
384 if (sigaction(sig, &sa,
nullptr) == -1)
385 throw std::system_error(errno, std::system_category(),
"sigaction");
387 signal_owners[sig] =
nullptr;
388 std::atomic_thread_fence(std::memory_order_seq_cst);
389 ps->signal_cbs_.erase(sig);
// Wait out any in-flight signal_handler (flag bit 1) before resetting the
// flag, so the handler's own stores cannot race the reset below.
391 while(signal_flags[sig] & 1)
392 std::this_thread::yield();
394 if (signal_flags[sig]) {
395 signal_flags[sig] = 0;
// Signal-callback cancel fragment: public entry that validates sig, takes
// the owners lock, and delegates the teardown to erase_signal_cb.
403 assert(sig > 0 && sig < num_sig);
404 std::lock_guard<std::mutex> lk {signal_owners_lock};
405 erase_signal_cb(sig);
// now_ms() fragment: milliseconds since an arbitrary fixed point, taken from
// the monotonic steady_clock -- the basis of all timeouts in this file.
411 using namespace std::chrono;
412 return duration_cast<milliseconds>(steady_clock::now().time_since_epoch())
// timeout_cancel(t) fragment: erases the pending timeout's map entry; the
// assert guards against cancelling a Timeout that is not registered.
420 assert(time_cbs_.find(t.i_->first) != time_cbs_.end());
421 time_cbs_.erase(t.i_);
// timeout_reschedule_at fragment: re-keys the stored callback under the new
// absolute time ms and records the fresh iterator back into the Timeout.
430 t.i_ = time_cbs_.emplace(ms, std::move(i->second));
static const Timeout null_
A null timeout. Relies on static initialization.
static Timeout timeout_null()
An invalid timeout, useful for initializing PollSet::Timeout values before a timeout has been schedul...
void timeout_cancel(Timeout &t)
Cancel a pending timeout.
std::size_t num_cbs() const
Number of registered file descriptor and timeout callbacks.
void signal_cb(int sig, cb_t cb)
Add a callback for a particular signal.
Specify interest in read-ready condition.
Most of the xdrpp library is encapsulated in the xdr namespace.
Asynchronous I/O and event harness.
static std::int64_t now_ms()
Number of milliseconds since an arbitrary but fixed time, used as the basis of all timeouts...
void set_nonblock(sock_t s)
Set the O_NONBLOCK flag on a socket.
void poll(int timeout=-1)
Go through one round of checking all file descriptors.
Abstract class used to represent a pending timeout.
Timeout timeout(std::int64_t ms, CB &&cb)
Set a callback to run a certain number of milliseconds from now.
void wake()
Cause PollSet::poll to return if it is sleeping.
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
void timeout_reschedule_at(Timeout &t, std::int64_t ms)
Reschedule a timeout to run at a specific time.
const char * sock_errmsg()
Last socket error message (strerror(errno) on POSIX).
Abstract away the type of a socket (for windows).
bool pending() const override
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
void create_selfpipe(sock_t ss[2])
Create a socket (or pipe on unix, where both are file descriptors) that is connected to itself...
virtual bool pending() const
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
void set_close_on_exec(sock_t s)
Set the close-on-exec flag of a file descriptor.
Adds support for signal handlers, asynchronous events, and callbacks injected from other threads to th...