3 #ifndef _XDRPP_POLLSET_H_INCLUDED_ 4 #define _XDRPP_POLLSET_H_INCLUDED_ 1 14 #include <type_traits> 15 #include <unordered_map> 25 static constexpr
int kReadFlag = 0x1;
26 static constexpr
int kWriteFlag = 0x2;
27 static constexpr
int kOnceFlag = 0x4;
45 using cb_t = std::function<void()>;
59 std::vector<pollfd> pollfds_;
60 std::unordered_map<sock_t, fd_state> state_;
63 std::multimap<std::int64_t, cb_t> time_cbs_;
67 int next_timeout(
int ms);
71 virtual void run_subtype_handlers() {}
109 if (!(fd_cb_helper(s, op) = std::forward<CB>(cb)))
122 static std::int64_t
now_ms();
126 using iterator = decltype(time_cbs_)::iterator;
128 explicit Timeout(iterator i) : i_(i) {}
129 Timeout &operator=(iterator i) { i_ = i;
return *
this; }
140 explicit operator bool()
const {
return i_ != null_.i_; }
154 return Timeout(time_cbs_.emplace(ms, std::forward<CB>(cb)));
183 enum class wake_type : std::uint8_t {
189 template<
typename R>
struct async_task {
191 std::function<R()> work_;
192 std::function<void(R)> cb_;
193 std::unique_ptr<R> rp_;
196 rp_.reset(
new R { work_() });
197 ps_->
inject_cb(std::bind(&async_task::done,
this));
200 std::unique_ptr<async_task>
self {
this};
202 cb_(std::move(*rp_));
210 std::mutex async_cbs_lock_;
211 std::vector<cb_t> async_cbs_;
212 bool async_pending_{
false};
216 static constexpr
int num_sig = 32;
217 static std::mutex signal_owners_lock;
219 static volatile std::sig_atomic_t signal_flags[num_sig];
220 bool signal_pending_{
false};
221 std::map<int, cb_t> signal_cbs_;
223 void wake(wake_type wt);
224 void run_pending_asyncs();
225 void inject_cb_vec(std::vector<cb_t>::iterator b,
226 std::vector<cb_t>::iterator e);
227 void run_subtype_handlers()
override;
228 static void signal_handler(
int);
229 static void erase_signal_cb(
int);
249 std::lock_guard<std::mutex> lk(async_cbs_lock_);
250 async_cbs_.emplace_back(std::forward<CB>(cb));
251 if (!async_pending_) {
252 async_pending_ =
true;
264 template<
typename Work,
typename CB>
void async(Work &&work, CB &&cb) {
265 using R = decltype(work());
266 async_task<R> *a =
new async_task<R> {
267 this, std::forward<Work>(work), std::forward<CB>(cb),
nullptr 270 std::thread(&async_task<R>::start, a).detach();
281 void signal_cb(
int sig, cb_t cb);
286 static void signal_cb(
int sig, std::nullptr_t =
nullptr);
291 #endif // !_XDRPP_POLLSET_H_INCLUDED_ static const Timeout null_
A null timeout. Relies on static initialization.
static Timeout timeout_null()
An invalid timeout, useful for initializing PollSet::Timeout values before a timeout has been schedul...
void timeout_cancel(Timeout &t)
Cancel a pending timeout.
std::size_t num_cbs() const
Number of registered file descriptor and timeout callbacks.
Like Read, but the callback only executes once.
Specify interest in read-ready condition.
Specify interest in write-ready condition.
Most of the xdrpp library is encapsulated in the xdr namespace.
Timeout()
After static initialization, Timeouts are null by default.
Structure to poll for a set of file descriptors and timeouts.
static std::int64_t now_ms()
Number of milliseconds since an arbitrary but fixed time, used as the basis of all timeouts...
void poll(int timeout=-1)
Go through one round of checking all file descriptors.
Timeout timeout_at(std::int64_t ms, CB &&cb)
Set a callback to run at a specific time (as returned by PollSet::now_ms()).
Abstract class used to represent a pending timeout.
Timeout timeout(std::int64_t ms, CB &&cb)
Set a callback to run a certain number of milliseconds from now.
void wake()
Cause PollSet::poll to return if it is sleeping.
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
void timeout_reschedule_at(Timeout &t, std::int64_t ms)
Reschedule a timeout to run at a specific time.
std::int64_t timeout_time(Timeout t) const
Returns the absolute time (in milliseconds) at which a timeout will run.
void timeout_reschedule(Timeout &t, std::int64_t ms)
Reschedule a timeout some number of milliseconds in the future.
Abstract away the type of a socket (for windows).
void run()
Continuously poll and only return on exception or when there is no more work to do.
void async(Work &&work, CB &&cb)
Execute a task asynchronously in another thread, then run callback on the task's result in the main th...
void inject_cb(CB &&cb)
Inject a callback to run immediately.
Valid only when removing callbacks.
virtual bool pending() const
Returns false if no file descriptor callbacks are registered and no timeouts or asynchronous events a...
Like Write, but the callback only executes once.
Simplified support for creating sockets.
Adds support for signal handlers, asynchronous events, and callbacks injected from other threads to th...