10 #include <xdrpp/rpc_msg.hh> 41 std::shared_ptr<bool> destroyed{destroyed_};
42 for (
int i = 0; i < 3 && !*destroyed; i++) {
45 iov[0].iov_base = rdmsg_->data() + rdpos_;
46 iov[0].iov_len = rdmsg_->size() - rdpos_;
47 iov[1].iov_base = nextlenp();
48 iov[1].iov_len =
sizeof nextlen_;
49 ssize_t n = readv(s_, iov, 2);
51 if (n < 0 && eagain(errno))
56 std::cerr <<
"msg_sock::input: " <<
sock_errmsg() << std::endl;
61 if (rdpos_ >= rdmsg_->size()) {
62 rdpos_ -= rdmsg_->size();
63 rcb_(std::move(rdmsg_));
68 else if (rdpos_ <
sizeof nextlen_) {
69 ssize_t n = read(s_, nextlenp() + rdpos_,
sizeof nextlen_ - rdpos_);
71 if (n < 0 && eagain(errno))
74 errno = rdpos_ ? ECONNRESET : 0;
76 std::cerr <<
"msg_sock::input: " <<
sock_errmsg() << std::endl;
83 if (rdmsg_ || rdpos_ <
sizeof nextlen_)
85 size_t len = nextlen();
86 if (!(len & 0x80000000)) {
87 std::cerr <<
"msgsock: message fragments unimplemented" << std::endl;
99 if (len <= maxmsglen_) {
102 catch (
const std::bad_alloc &) {
103 std::cerr <<
"msg_sock: allocation of " << len <<
"-byte message failed" 108 std::cerr <<
"msg_sock: rejecting " << len <<
"-byte message (too long)" 122 msg_sock::putmsg(msg_ptr &mb)
129 bool was_empty = !wsize_;
130 wsize_ += mb->raw_size();
131 wqueue_.emplace_back(mb.release());
137 msg_sock::pop_wbytes(
size_t n)
141 assert (n <= wsize_);
143 size_t frontbytes = wqueue_.front()->raw_size() - wstart_;
144 if (n < frontbytes) {
150 while (n > 0 && n >= (frontbytes = wqueue_.front()->raw_size())) {
158 msg_sock::output(
bool cbset)
160 static constexpr
size_t maxiov = 8;
163 for (
auto b = wqueue_.begin(); i < maxiov && b != wqueue_.end(); ++b, ++i) {
165 v[i].iov_len = (*b)->raw_size();
166 v[i].iov_base =
const_cast<char *
> ((*b)->raw_data());
169 v[i].iov_len = (*b)->raw_size() - wstart_;
170 v[i].iov_base =
const_cast<char *
> ((*b)->raw_data()) + wstart_;
173 ssize_t n = writev(s_, v, i);
175 if (n != -1 || !eagain(errno)) {
177 wsize_ = wstart_ = 0;
184 if (wsize_ && !cbset)
186 else if (!wsize_ && cbset)
191 rpc_sock::abort_all_calls()
193 decltype(calls_) calls(std::move(calls_));
195 for (
auto &call : calls)
196 try { call.second(
nullptr); }
197 catch (
const std::exception &e) {
198 std::cerr << e.what() << std::endl;
203 rpc_sock::recv_msg(msg_ptr b)
205 if (!b || b->size() < 8) {
209 else if (b->word(1) ==
swap32le(CALL))
210 recv_call(std::move(b));
211 else if (b->word(1) ==
swap32le(REPLY)) {
212 auto calli = calls_.find(b->word(0));
213 if (calli == calls_.end()) {
214 std::cerr <<
"ignoring reply to unknown call" << std::endl;
217 auto cb (std::move(calli->second));
228 rpc_sock::send_call(msg_ptr &b, msg_sock::rcb_t cb)
230 calls_.emplace(b->word(0), std::move(cb));
235 rpc_sock::recv_call(msg_ptr b)
238 servcb_(std::move(b));
240 std::cerr <<
"rpc_sock::recv_call: incoming call but no server" 242 send_reply(rpc_accepted_error_msg(b->word(0), PROG_UNAVAIL));
static msg_ptr alloc(std::size_t size)
Allocate a new buffer.
Specify interest in read-ready condition.
Specify interest in write-ready condition.
Most of the xdrpp library is encapsulated in the xdr namespace.
Constexpr std::uint32_t swap32le(std::uint32_t v)
Byteswap 32-bit value only on little-endian machines, identity function on big-endian machines...
void set_nonblock(sock_t s)
Set the O_NONBLOCK flag on a socket.
Classes for implementing RPC servers.
void fd_cb(sock_t s, op_t op, CB &&cb)
Set a read or write callback on a particular file descriptor.
const char * sock_errmsg()
Last socket error message (strerror(errno) on POSIX).
Send and receive delimited messages over non-blocking sockets.
Valid only when removing callbacks.