xdrpp
RFC4506 XDR compiler and message library
arpc.h
Go to the documentation of this file.
1 // -*- C++ -*-
2 
3 //! \file arpc.h Asynchronous RPC interface.
4 
5 #ifndef _XDRPP_ARPC_H_HEADER_INCLUDED_
6 #define _XDRPP_ARPC_H_HEADER_INCLUDED_ 1
7 
8 #include <xdrpp/exception.h>
9 #include <xdrpp/server.h>
10 #include <xdrpp/srpc.h> // XXX xdr_trace_client
11 
12 namespace xdr {
13 
14 //! A \c unique_ptr to a call result, or NULL if the call failed (in
15 //! which case \c message returns an error message).
16 template<typename T> struct call_result : std::unique_ptr<T> {
17  rpc_call_stat stat_;
18  call_result(const rpc_msg &hdr) : stat_(hdr) {
19  if (stat_)
20  this->reset(new T{});
21  }
22  call_result(rpc_call_stat::stat_type type) : stat_(type) {}
23  const char *message() const { return *this ? nullptr : stat_.message(); }
24 };
25 template<> struct call_result<void> {
26  rpc_call_stat stat_;
27  call_result(const rpc_msg &hdr) : stat_(hdr) {}
28  call_result(rpc_call_stat::stat_type type) : stat_(type) {}
29  const char *message() const { return stat_ ? nullptr : stat_.message(); }
30  explicit operator bool() const { return bool(stat_); }
31  xdr_void &operator*() { static xdr_void v; return v; }
32 };
33 
35  rpc_sock &s_;
36 
37 public:
38  asynchronous_client_base(rpc_sock &s) : s_(s) {}
40 
41  template<typename P, typename...A>
42  void invoke(const A &...a,
43  std::function<void(call_result<typename P::res_type>)> cb) {
44  rpc_msg hdr { s_.get_xid(), CALL };
45  hdr.body.cbody().rpcvers = 2;
46  hdr.body.cbody().prog = P::interface_type::program;
47  hdr.body.cbody().vers = P::interface_type::version;
48  hdr.body.cbody().proc = P::proc;
49 
50  if (xdr_trace_client) {
51  std::string s = "CALL ";
52  s += P::proc_name();
53  s += " -> [xid ";
54  s += std::to_string(hdr.xid);
55  s += "]";
56  std::clog << xdr_to_string(std::tie(a...), s.c_str());
57  }
58 
59  s_.send_call(xdr_to_msg(hdr, a...), [cb](msg_ptr m) {
60  if (!m)
61  return cb(rpc_call_stat::NETWORK_ERROR);
62  try {
63  xdr_get g(m);
64  rpc_msg hdr;
65  archive(g, hdr);
66  call_result<typename P::res_type> res(hdr);
67  if (res)
68  archive(g, *res);
69  g.done();
70 
71  if (xdr_trace_client) {
72  std::string s = "REPLY ";
73  s += P::proc_name();
74  s += " <- [xid " + std::to_string(hdr.xid) + "]";
75  if (res)
76  std::clog << xdr_to_string(*res, s.c_str());
77  else {
78  s += ": ";
79  s += res.message();
80  s += "\n";
81  std::clog << s;
82  }
83  }
84 
85  cb(std::move(res));
86  }
87  catch (const xdr_runtime_error &e) {
88  cb(rpc_call_stat::GARBAGE_RES);
89  }
90  });
91  }
92 
93  asynchronous_client_base *operator->() { return this; }
94 };
95 
96 template<typename T> using arpc_client =
97  typename T::template _xdr_client<asynchronous_client_base>;
98 
99 
100 // And now for the server
101 
102 template<typename T> class reply_cb;
103 
104 namespace detail {
105 class reply_cb_impl {
106  template<typename T> friend class xdr::reply_cb;
107  using cb_t = service_base::cb_t;
108  uint32_t xid_;
109  cb_t cb_;
110  const char *const proc_name_;
111 
112 public:
113  template<typename CB> reply_cb_impl(uint32_t xid, CB &&cb, const char *name)
114  : xid_(xid), cb_(std::forward<CB>(cb)), proc_name_(name) {}
115  reply_cb_impl(const reply_cb_impl &rcb) = delete;
116  reply_cb_impl &operator=(const reply_cb_impl &rcb) = delete;
117  ~reply_cb_impl() { if (cb_) reject(PROC_UNAVAIL); }
118 
119 private:
120  void send_reply_msg(msg_ptr &&b) {
121  assert(cb_); // If this fails you replied twice
122  cb_(std::move(b));
123  cb_ = nullptr;
124  }
125 
126  template<typename T> void send_reply(const T &t) {
127  if (xdr_trace_server) {
128  std::string s = "REPLY ";
129  s += proc_name_;
130  s += " -> [xid " + std::to_string(xid_) + "]";
131  std::clog << xdr_to_string(t, s.c_str());
132  }
133  send_reply_msg(xdr_to_msg(rpc_success_hdr(xid_), t));
134  }
135 
136  void reject(accept_stat stat) {
137  send_reply_msg(rpc_accepted_error_msg(xid_, stat));
138  }
139  void reject(auth_stat stat) {
140  send_reply_msg(rpc_auth_error_msg(xid_, stat));
141  }
142 };
143 } // namespace detail
144 
145 // Prior to C++14, it's a pain to move objects into another thread.
146 // Hence we used shared_ptr to make reply_cb copyable as well as
147 // moveable.
148 template<typename T> class reply_cb {
149  using impl_t = detail::reply_cb_impl;
150 public:
151  using type = T;
152  std::shared_ptr<impl_t> impl_;
153 
154  reply_cb() {}
155  template<typename CB> reply_cb(uint32_t xid, CB &&cb, const char *name)
156  : impl_(std::make_shared<impl_t>(xid, std::forward<CB>(cb), name)) {}
157 
158  void operator()(const type &t) const { impl_->send_reply(t); }
159  void reject(accept_stat stat) const { impl_->reject(stat); }
160  void reject(auth_stat stat) const { impl_->reject(stat); }
161 };
162 template<> class reply_cb<void> : public reply_cb<xdr_void> {
163 public:
164  using type = void;
167  void operator()() const { this->operator()(xdr_void{}); }
168 };
169 
170 template<typename T, typename Session, typename Interface>
171 class arpc_service : public service_base {
172  T &server_;
173 
174 public:
175  void process(void *session, rpc_msg &hdr, xdr_get &g, cb_t reply) override {
176  if (!check_call(hdr))
177  reply(nullptr);
178  if (!Interface::call_dispatch(*this, hdr.body.cbody().proc,
179  static_cast<Session *>(session),
180  hdr, g, std::move(reply)))
181  reply(rpc_accepted_error_msg(hdr.xid, PROC_UNAVAIL));
182  }
183 
184  template<typename P>
185  void dispatch(Session *session, rpc_msg &hdr, xdr_get &g, cb_t reply) {
187  if (!decode_arg(g, arg))
188  return reply(rpc_accepted_error_msg(hdr.xid, GARBAGE_ARGS));
189 
190  if (xdr_trace_server) {
191  std::string s = "CALL ";
192  s += P::proc_name();
193  s += " <- [xid " + std::to_string(hdr.xid) + "]";
194  std::clog << xdr_to_string(arg, s.c_str());
195  }
196 
197  dispatch_with_session<P>(server_, session, std::move(arg),
199  hdr.xid, std::move(reply), P::proc_name()});
200  }
201 
202  arpc_service(T &server)
203  : service_base(Interface::program, Interface::version),
204  server_(server) {}
205 };
206 
207 class arpc_server : public rpc_server_base {
208 public:
209  template<typename T, typename Interface = typename T::rpc_interface_type>
210  void register_service(T &t) {
211  register_service_base(new arpc_service<T, void, Interface>(t));
212  }
213  void receive(rpc_sock *ms, msg_ptr buf);
214 };
215 
216 template<typename Session = void,
217  typename SessionAllocator = session_allocator<Session>>
218 using arpc_tcp_listener =
220 
221 } // namespace xdr
222 
223 #endif // !_XDRPP_ARPC_H_HEADER_INCLUDED_
Structure that gets marshalled as an RPC success header.
Definition: server.h:27
std::tuple<> xdr_void
Placeholder type representing void values marshaled as 0 bytes.
Definition: types.h:812
A unique_ptr to a call result, or NULL if the call failed (in which case message returns an error message).
Definition: arpc.h:16
Most of the xdrpp library is encapsulated in the xdr namespace.
Definition: arpc.cc:4
Archive type for unmarshaling from a buffer.
Definition: marshal.h:130
Definition: socket.h:47
msg_ptr xdr_to_msg(const Args &...args)
Marshal one or a series of XDR types into a newly allocated buffer referenced by an xdr::msg_ptr.
Definition: marshal.h:231
A wrapper around xdr::msg_sock that separates calls from replies.
Definition: msgsock.h:92
Exceptions raised by RPC calls.
Generic class of XDR unmarshaling errors.
Definition: types.h:41
Classes for implementing RPC servers.
Trivial session allocator that just calls new and delete.
Definition: server.h:176
Simple synchronous RPC functions.
Structure encoding all the various reasons a server can decline to process an RPC call it received...
Definition: exception.h:28
typename detail::wrap_transparent_ptr_helper< T >::type wrap_transparent_ptr
Wrap xdr::transparent_ptr around each type in a tuple to generate a new tuple type.
Definition: server.h:118
std::string xdr_to_string(const T &t, const char *name=nullptr, int indent=0)
Return a std::string containing a pretty-printed version of an XDR data type.
Definition: printer.h:162