xdrpp
RFC4506 XDR compiler and message library
msgsock.cc
1 
2 #include <cassert>
3 #include <cstddef>
4 #include <cstring>
5 #include <iostream>
6 #include <unistd.h>
7 #include <sys/uio.h>
8 
9 #include <xdrpp/msgsock.h>
10 #include <xdrpp/rpc_msg.hh>
11 #include <xdrpp/server.h>
12 
13 namespace xdr {
14 
15 msg_sock::~msg_sock()
16 {
17  ps_.fd_cb(s_, pollset::ReadWrite);
18  close(s_);
19  *destroyed_ = true;
20 }
21 
22 void
23 msg_sock::init()
24 {
25  set_nonblock(s_);
26  initcb();
27 }
28 
29 void
30 msg_sock::initcb()
31 {
32  if (rcb_)
33  ps_.fd_cb(s_, pollset::Read, [this](){ input(); });
34  else
35  ps_.fd_cb(s_, pollset::Read);
36 }
37 
38 void
39 msg_sock::input()
40 {
41  std::shared_ptr<bool> destroyed{destroyed_};
42  for (int i = 0; i < 3 && !*destroyed; i++) {
43  if (rdmsg_) {
44  iovec iov[2];
45  iov[0].iov_base = rdmsg_->data() + rdpos_;
46  iov[0].iov_len = rdmsg_->size() - rdpos_;
47  iov[1].iov_base = nextlenp();
48  iov[1].iov_len = sizeof nextlen_;
49  ssize_t n = readv(s_, iov, 2);
50  if (n <= 0) {
51  if (n < 0 && eagain(errno))
52  return;
53  if (n == 0)
54  errno = ECONNRESET;
55  else
56  std::cerr << "msg_sock::input: " << sock_errmsg() << std::endl;
57  rcb_(nullptr);
58  return;
59  }
60  rdpos_ += n;
61  if (rdpos_ >= rdmsg_->size()) {
62  rdpos_ -= rdmsg_->size();
63  rcb_(std::move(rdmsg_));
64  if (*destroyed)
65  return;
66  }
67  }
68  else if (rdpos_ < sizeof nextlen_) {
69  ssize_t n = read(s_, nextlenp() + rdpos_, sizeof nextlen_ - rdpos_);
70  if (n <= 0) {
71  if (n < 0 && eagain(errno))
72  return;
73  if (n == 0)
74  errno = rdpos_ ? ECONNRESET : 0;
75  else
76  std::cerr << "msg_sock::input: " << sock_errmsg() << std::endl;
77  rcb_(nullptr);
78  return;
79  }
80  rdpos_ += n;
81  }
82 
83  if (rdmsg_ || rdpos_ < sizeof nextlen_)
84  return;
85  size_t len = nextlen();
86  if (!(len & 0x80000000)) {
87  std::cerr << "msgsock: message fragments unimplemented" << std::endl;
88  errno = ECONNRESET;
89  rcb_(nullptr);
90  return;
91  }
92  len &= 0x7fffffff;
93  if (!len) {
94  rdpos_ = 0;
95  rcb_(message_t::alloc(0));
96  continue;
97  }
98 
99  if (len <= maxmsglen_) {
100  // Length comes from untrusted source; don't crash if can't alloc
101  try { rdmsg_ = message_t::alloc(len); }
102  catch (const std::bad_alloc &) {
103  std::cerr << "msg_sock: allocation of " << len << "-byte message failed"
104  << std::endl;
105  }
106  }
107  else {
108  std::cerr << "msg_sock: rejecting " << len << "-byte message (too long)"
109  << std::endl;
110  ps_.fd_cb(s_, pollset::Read);
111  }
112  if (rdmsg_)
113  rdpos_ = 0;
114  else {
115  errno = E2BIG;
116  rcb_(nullptr);
117  }
118  }
119 }
120 
121 void
122 msg_sock::putmsg(msg_ptr &mb)
123 {
124  if (wfail_) {
125  mb.reset();
126  return;
127  }
128 
129  bool was_empty = !wsize_;
130  wsize_ += mb->raw_size();
131  wqueue_.emplace_back(mb.release());
132  if (was_empty)
133  output(false);
134 }
135 
136 void
137 msg_sock::pop_wbytes(size_t n)
138 {
139  if (n == 0)
140  return;
141  assert (n <= wsize_);
142  wsize_ -= n;
143  size_t frontbytes = wqueue_.front()->raw_size() - wstart_;
144  if (n < frontbytes) {
145  wstart_ += n;
146  return;
147  }
148  n -= frontbytes;
149  wqueue_.pop_front();
150  while (n > 0 && n >= (frontbytes = wqueue_.front()->raw_size())) {
151  n -= frontbytes;
152  wqueue_.pop_front();
153  }
154  wstart_ = n;
155 }
156 
157 void
158 msg_sock::output(bool cbset)
159 {
160  static constexpr size_t maxiov = 8;
161  size_t i = 0;
162  iovec v[maxiov];
163  for (auto b = wqueue_.begin(); i < maxiov && b != wqueue_.end(); ++b, ++i) {
164  if (i) {
165  v[i].iov_len = (*b)->raw_size();
166  v[i].iov_base = const_cast<char *> ((*b)->raw_data());
167  }
168  else {
169  v[i].iov_len = (*b)->raw_size() - wstart_;
170  v[i].iov_base = const_cast<char *> ((*b)->raw_data()) + wstart_;
171  }
172  }
173  ssize_t n = writev(s_, v, i);
174  if (n <= 0) {
175  if (n != -1 || !eagain(errno)) {
176  wfail_ = true;
177  wsize_ = wstart_ = 0;
178  wqueue_.clear();
179  }
180  return;
181  }
182  pop_wbytes(n);
183 
184  if (wsize_ && !cbset)
185  ps_.fd_cb(s_, pollset::Write, [this](){ output(true); });
186  else if (!wsize_ && cbset)
187  ps_.fd_cb(s_, pollset::Write);
188 }
189 
190 void
191 rpc_sock::abort_all_calls()
192 {
193  decltype(calls_) calls(std::move(calls_));
194  calls_.clear();
195  for (auto &call : calls)
196  try { call.second(nullptr); }
197  catch (const std::exception &e) {
198  std::cerr << e.what() << std::endl;
199  }
200 }
201 
202 void
203 rpc_sock::recv_msg(msg_ptr b)
204 {
205  if (!b || b->size() < 8) {
206  abort_all_calls();
207  recv_call(nullptr);
208  }
209  else if (b->word(1) == swap32le(CALL))
210  recv_call(std::move(b));
211  else if (b->word(1) == swap32le(REPLY)) {
212  auto calli = calls_.find(b->word(0));
213  if (calli == calls_.end()) {
214  std::cerr << "ignoring reply to unknown call" << std::endl;
215  return;
216  }
217  auto cb (std::move(calli->second));
218  calls_.erase(calli);
219  cb(std::move(b));
220  }
221  else {
222  abort_all_calls();
223  recv_call(nullptr);
224  }
225 }
226 
227 void
228 rpc_sock::send_call(msg_ptr &b, msg_sock::rcb_t cb)
229 {
230  calls_.emplace(b->word(0), std::move(cb));
231  ms_->putmsg(b);
232 }
233 
234 void
235 rpc_sock::recv_call(msg_ptr b)
236 {
237  if (servcb_)
238  servcb_(std::move(b));
239  else {
240  std::cerr << "rpc_sock::recv_call: incoming call but no server"
241  << std::endl;
242  send_reply(rpc_accepted_error_msg(b->word(0), PROG_UNAVAIL));
243  }
244 }
245 
246 }
static msg_ptr alloc(std::size_t size)
Allocate a new buffer.
Definition: marshal.cc:7
Specify interest in read-ready condition.
Definition: pollset.h:34
Specify interest in write-ready condition.
Definition: pollset.h:36
Most of the xdrpp library is encapsulated in the xdr namespace.
Definition: arpc.cc:4
Constexpr std::uint32_t swap32le(std::uint32_t v)
Byteswap 32-bit value only on little-endian machines, identity function on big-endian machines...
Definition: endian.h:78
void set_nonblock(sock_t s)
Set the O_NONBLOCK flag on a socket.
Definition: socket_unix.cc:70
Classes for implementing RPC servers.
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
Definition: pollset.h:108
const char * sock_errmsg()
Last socket error message (strerror(errno) on POSIX).
Definition: socket_unix.cc:25
Send and receive delimited messages over non-blocking sockets.
Valid only when removing callbacks.
Definition: pollset.h:38