xdrpp
RFC4506 XDR compiler and message library
msgsock.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 
3 //! \file msgsock.h Send and receive delimited messages over
4 //! non-blocking sockets.
5 
6 #ifndef _XDRPP_MSGSOCK_H_INCLUDED_
7 #define _XDRPP_MSGSOCK_H_INCLUDED_ 1
8 
9 #include <deque>
10 #include <xdrpp/message.h>
11 #include <xdrpp/pollset.h>
12 
13 namespace xdr {
14 
15 //! Send and receive a series of delimited messages on a stream
16 //! socket. The format (specified in RFC5531, Section 11) is simple:
17 //! A 4-byte length (in little-endian format) followed by that many
18 //! bytes. The implementation is optimized for having many sockets
19 //! each receiving a small number of messages, as opposed to receiving
20 //! many messages over the same socket.
21 //!
22 //! Currently this calls read once or twice per message to get the
23 //! exact length before allocating buffer space and reading the
24 //! message body (possibly including the next message length). This
25 //! could be fixed to read at least a little bit more data
26 //! speculatively and reduce the number of system calls.
27 class msg_sock {
28 public:
29  static constexpr std::size_t default_maxmsglen = 0x100000;
30  using rcb_t = std::function<void(msg_ptr)>;
31 
32  template<typename T> msg_sock(pollset &ps, sock_t s, T &&rcb,
33  size_t maxmsglen = default_maxmsglen)
34  : ps_(ps), s_(s), maxmsglen_(maxmsglen), rcb_(std::forward<T>(rcb)) {
35  init();
36  }
37  msg_sock(pollset &ps, sock_t s) : msg_sock(ps, s, nullptr) {}
38  ~msg_sock();
39  msg_sock &operator=(msg_sock &&) = delete;
40 
41  template<typename T> void setrcb(T &&rcb) {
42  rcb_ = std::forward<T>(rcb);
43  initcb();
44  }
45 
46  size_t wsize() const { return wsize_; }
47  void putmsg(msg_ptr &b);
48  void putmsg(msg_ptr &&b) { putmsg(b); }
49  //! Returns pointer to a \c bool that becomes \c true once the
50  //! msg_sock has been deleted.
51  std::shared_ptr<const bool> destroyed_ptr() const { return destroyed_; }
52  pollset &get_pollset() { return ps_; }
53  //! Returns the socket, but do not do IO on it. This is just for
54  //! calling things like \c getpeername.
55  sock_t get_sock() const { return s_; }
56 
57 private:
58  pollset &ps_;
59  const sock_t s_;
60  const size_t maxmsglen_;
61  std::shared_ptr<bool> destroyed_{std::make_shared<bool>(false)};
62 
63  rcb_t rcb_;
64  uint32_t nextlen_;
65  msg_ptr rdmsg_;
66  size_t rdpos_ {0};
67 
68  std::deque<msg_ptr> wqueue_;
69  size_t wsize_ {0};
70  size_t wstart_ {0};
71  bool wfail_ {false};
72 
73  static constexpr bool eagain(int err) {
74  return err == EAGAIN || err == EWOULDBLOCK || err == EINTR;
75  }
76  char *nextlenp() { return reinterpret_cast<char *>(&nextlen_); }
77  uint32_t nextlen() const { return swap32le(nextlen_); }
78 
79  void init();
80  void initcb();
81  void input();
82  void pop_wbytes(size_t n);
83  void output(bool cbset);
84 };
85 
86 //! A wrapper around xdr::msg_sock that separates calls from replies.
87 //! Incoming calls are forwarded to whatever callback is registered
88 //! with rpc_sock::set_servcb, while replies are matched up to
89 //! callbacks set with rpc_sock::send_call. Calls sent via \c
90 //! rpc_sock::send_call should already have a unique xid generated by
91 //! \c rpc_sock::get_xid().
92 class rpc_sock {
93  uint32_t xid_{0};
94  std::unordered_map<uint32_t, msg_sock::rcb_t> calls_;
95 
96  void abort_all_calls();
97  void recv_msg(msg_ptr b);
98  void recv_call(msg_ptr);
99 public:
100  std::unique_ptr<msg_sock> ms_;
101  using rcb_t = msg_sock::rcb_t;
102  rcb_t servcb_;
103 
104  template<typename T>
105  rpc_sock(pollset &ps, sock_t s, T &&t,
106  size_t maxmsglen = msg_sock::default_maxmsglen)
107  : ms_(new msg_sock(ps, s,
108  std::bind(&rpc_sock::recv_msg, this,
109  std::placeholders::_1),
110  maxmsglen)),
111  servcb_(std::forward<T>(t)) {}
112  rpc_sock(pollset &ps, sock_t s) : rpc_sock(ps, s, rcb_t(nullptr)) {}
113  ~rpc_sock() { abort_all_calls(); }
114  template<typename T> void set_servcb(T &&scb) {
115  servcb_ = std::forward<T>(scb);
116  }
117 
118  uint32_t get_xid() {
119  while (calls_.find(++xid_) != calls_.end() && xid_ != 0)
120  ;
121  return xid_;
122  }
123 
124  void send_call(msg_ptr &b, rcb_t cb);
125  void send_call(msg_ptr &&b, rcb_t cb) { send_call(b, cb); }
126  void send_reply(msg_ptr &&b) { ms_->putmsg(std::move(b)); }
127 };
128 
129 //! Functor wrapper around \c rpc_sock::send_reply. Mostly useful
130 //! because std::function implementations avoid memory allocation with
131 //! \c operator(), whereas passing any other method to \c std::bind
132 //! may require more overhead.
134  rpc_sock *ms_;
135  constexpr rpc_sock_reply_t(rpc_sock *ms) : ms_(ms) {}
136  void operator()(msg_ptr b) const { ms_->send_reply(std::move(b)); }
137 };
138 
139 } // namespace xdr
140 
141 #endif // !_XDRPP_MSGSOCK_H_INCLUDED_
Most of the xdrpp library is encapsulated in the xdr namespace.
Definition: arpc.cc:4
Constexpr std::uint32_t swap32le(std::uint32_t v)
Byteswap 32-bit value only on little-endian machines, identity function on big-endian machines...
Definition: endian.h:78
Asynchronous I/O and event harness.
Structure to poll for a set of file descriptors and timeouts.
Definition: pollset.h:23
Message buffer with space for marshaled length.
A wrapper around xdr::msg_sock that separates calls from replies.
Definition: msgsock.h:92
Send and receive a series of delimited messages on a stream socket.
Definition: msgsock.h:27
Abstract away the type of a socket (for windows).
Definition: socket.h:28
Functor wrapper around rpc_sock::send_reply.
Definition: msgsock.h:133
std::shared_ptr< const bool > destroyed_ptr() const
Returns pointer to a bool that becomes true once the msg_sock has been deleted.
Definition: msgsock.h:51
sock_t get_sock() const
Returns the socket, but do not do IO on it.
Definition: msgsock.h:55