The Battle for Wesnoth  1.17.0-dev
wesnothd_connection.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2011 - 2021
3  by Sergey Popov <loonycyborg@gmail.com>
4  Part of the Battle for Wesnoth Project https://www.wesnoth.org/
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY.
12 
13  See the COPYING file for more details.
14 */
15 
16 #define BOOST_ASIO_NO_DEPRECATED
17 
18 #include "wesnothd_connection.hpp"
19 
20 #include "gettext.hpp"
21 #include "log.hpp"
22 #include "serialization/parser.hpp"
23 #include "tls_root_store.hpp"
24 
25 #include <boost/asio/connect.hpp>
26 #include <boost/asio/read.hpp>
27 #include <boost/asio/write.hpp>
28 
29 #include <cstdint>
30 #include <deque>
31 #include <functional>
32 
33 static lg::log_domain log_network("network");
34 #define DBG_NW LOG_STREAM(debug, log_network)
35 #define LOG_NW LOG_STREAM(info, log_network)
36 #define WRN_NW LOG_STREAM(warn, log_network)
37 #define ERR_NW LOG_STREAM(err, log_network)
38 
// Optional tracing hook. The `#if 0` branch below is compiled out; when enabled it
// would define a helper whose constructor logs the process id, thread id and the
// calling function name through WRN_NW, and MPTEST_LOG would declare one such
// object at the point of use.
39 #if 0
40 // code for the travis test
41 #include <sys/types.h>
42 #include <unistd.h>
43 namespace {
44 struct mptest_log
45 {
46  mptest_log(const char* functionname)
47  {
48  WRN_NW << "Process:" << getpid() << " Thread:" << std::this_thread::get_id() << " Function: " << functionname << " Start\n";
49  }
50 };
51 }
52 #define MPTEST_LOG mptest_log mptest_log__(__func__)
53 #else
// Active branch: MPTEST_LOG expands to a no-op expression statement.
54 #define MPTEST_LOG ((void)0)
55 #endif
56 
57 using boost::system::error_code;
58 using boost::system::system_error;
59 
60 // main thread
// Constructs the connection and kicks off name resolution for host/service.
//
// A synchronous resolve restricted to numeric hosts is tried first. If it succeeds,
// TLS is switched off and handle_resolve() is posted to io_context_ with the result;
// otherwise an asynchronous resolve is started with handle_resolve() as its handler.
// Finally a worker thread is started that runs io_context_.
//
// NOTE(review): the embedded listing numbers in the initializer list jump
// (70 -> 73, 73 -> 75, 75 -> 78), so some member initializers may be missing from
// this view of the file.
61 wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
62  : worker_thread_()
63  , io_context_()
64  , resolver_(io_context_)
65  , tls_context_(boost::asio::ssl::context::sslv23)
66  , host_(host)
67  , service_(service)
68  , use_tls_(true)
69  , socket_(raw_socket{ new raw_socket::element_type{io_context_} })
70  , last_error_()
73  , read_buf_()
75  , recv_queue_()
78  , payload_size_(0)
79  , bytes_to_write_(0)
80  , bytes_written_(0)
81  , bytes_to_read_(0)
82  , bytes_read_(0)
83 {
84  MPTEST_LOG;
85 
86  error_code ec;
// numeric_host: the resolve only succeeds when `host` is already a literal address.
87  auto result = resolver_.resolve(host, service, boost::asio::ip::resolver_query_base::numeric_host, ec);
88  if(!ec) { // if numeric resolve succeeds then we got raw ip address so TLS host name validation would never pass
89  use_tls_ = false;
// Queue the handler on io_context_ instead of calling it here, so it runs once the
// worker thread below has started io_context_.run().
90  boost::asio::post(io_context_, [this, ec, result](){ handle_resolve(ec, { result } ); } );
91  } else {
92  resolver_.async_resolve(host, service,
93  std::bind(&wesnothd_connection::handle_resolve, this, std::placeholders::_1, std::placeholders::_2));
94  }
95 
96  // Starts the worker thread. Do this *after* the above async_resolve call or it will just exit immediately!
97  worker_thread_ = std::thread([this]() {
98  try {
99  io_context_.run();
100  } catch(const boost::system::system_error&) {
101  try {
102  // Attempt to pass the exception on to the handshake promise.
103  handshake_finished_.set_exception(std::current_exception());
104  } catch(const std::future_error&) {
105  // Handshake already complete. Do nothing.
106  }
107  } catch(...) {
// Any other exception escaping a handler is dropped here without being reported.
108  }
109 
110  LOG_NW << "wesnothd_connection::io_service::run() returned\n";
111  });
112 
113  LOG_NW << "Resolving hostname: " << host << '\n';
114 }
115 
117 {
118  MPTEST_LOG;
119 
120  if(auto socket = utils::get_if<tls_socket>(&socket_)) {
121  error_code ec;
122  // this sends close_notify for secure connection shutdown
123  (*socket)->async_shutdown([](const error_code&) {} );
124  const char buffer[] = "";
125  // this write is needed to trigger immediate close instead of waiting for other side's close_notify
126  boost::asio::write(**socket, boost::asio::buffer(buffer, 0), ec);
127  }
128  // Stop the io_service and wait for the worker thread to terminate.
129  stop();
130  worker_thread_.join();
131 }
132 
133 // worker thread
134 void wesnothd_connection::handle_resolve(const error_code& ec, results_type results)
135 {
136  MPTEST_LOG;
137  if(ec) {
138  LOG_NW << __func__ << " Throwing: " << ec << "\n";
139  throw system_error(ec);
140  }
141 
142  boost::asio::async_connect(*utils::get<raw_socket>(socket_), results,
143  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, std::placeholders::_2));
144 }
145 
146 // worker thread
147 void wesnothd_connection::handle_connect(const boost::system::error_code& ec, endpoint endpoint)
148 {
149  MPTEST_LOG;
150  if(ec) {
151  ERR_NW << "Tried all IPs. Giving up" << std::endl;
152  throw system_error(ec);
153  } else {
154  LOG_NW << "Connected to " << endpoint.address() << '\n';
155 
156  if(endpoint.address().is_loopback()) {
157  use_tls_ = false;
158  }
159  handshake();
160  }
161 }
162 
163 // worker thread
165 {
166  MPTEST_LOG;
167  static const uint32_t handshake = 0;
168  static const uint32_t tls_handshake = htonl(uint32_t(1));
169 
170  boost::asio::async_write(*utils::get<raw_socket>(socket_), boost::asio::buffer(use_tls_ ? reinterpret_cast<const char*>(&tls_handshake) : reinterpret_cast<const char*>(&handshake), 4),
171  [](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
172  boost::asio::async_read(*utils::get<raw_socket>(socket_), boost::asio::buffer(reinterpret_cast<std::byte*>(&handshake_response_), 4),
173  std::bind(&wesnothd_connection::handle_handshake, this, std::placeholders::_1));
174 }
175 
176 // worker thread
// Completion handler for reading the 4-byte handshake response.
//
// Without TLS, the handshake promise is fulfilled and receiving starts right away.
// With TLS, the value in handshake_response_ selects what happens next (see the
// branches below). Errors are rethrown as boost::system::system_error, except for
// EOF while TLS is enabled.
//
// NOTE(review): the embedded listing numbers skip 183, 198 and 228 inside this
// body, so statements may be missing from this view; do not treat the visible
// control flow as complete.
177 void wesnothd_connection::handle_handshake(const error_code& ec)
178 {
179  MPTEST_LOG;
180  if(ec) {
181  if(ec == boost::asio::error::eof && use_tls_) {
182  // immediate disconnect likely means old server not supporting TLS handshake code
// NOTE(review): listing line 183 is absent here; as shown, this branch only returns.
184  return;
185  }
186  LOG_NW << __func__ << " Throwing: " << ec << "\n";
187  throw system_error(ec);
188  }
189 
190  if(use_tls_) {
// Response 0xFFFFFFFF: turn TLS off and re-run this handler, which then takes the
// non-TLS branch at the bottom.
191  if(handshake_response_ == 0xFFFFFFFFU) {
192  use_tls_ = false;
193  handle_handshake(ec);
194  return;
195  }
196 
// Response 0: wrap the connected raw socket in a TLS stream and run the TLS handshake.
197  if(handshake_response_ == 0x00000000) {
// NOTE(review): listing line 198 is absent here.
// Move the TCP socket out of the variant, build the TLS stream on top of it,
// then store the TLS stream back into socket_.
199  raw_socket s { std::move(utils::get<raw_socket>(socket_)) };
200  tls_socket ts { new tls_socket::element_type{std::move(*s), tls_context_} };
201  socket_ = std::move(ts);
202 
203  auto& socket { *utils::get<tls_socket>(socket_) };
204 
// Require a peer certificate and verify it.
205  socket.set_verify_mode(
206  boost::asio::ssl::verify_peer |
207  boost::asio::ssl::verify_fail_if_no_peer_cert
208  );
209 
// Check the certificate against host_; the verification helper used depends on the
// Boost version.
210 #if BOOST_VERSION >= 107300
211  socket.set_verify_callback(boost::asio::ssl::host_name_verification(host_));
212 #else
213  socket.set_verify_callback(boost::asio::ssl::rfc2818_verification(host_));
214 #endif
215 
216  socket.async_handshake(boost::asio::ssl::stream_base::client, [this](const error_code& ec) {
217  if(ec) {
218  LOG_NW << __func__ << " Throwing: " << ec << "\n";
219  throw system_error(ec);
220  }
221 
// TLS handshake succeeded: fulfil the handshake promise and start receiving.
222  handshake_finished_.set_value();
223  recv();
224  });
225  return;
226  }
227 
// NOTE(review): listing line 228 is absent here; as shown, any other response value
// falls through without further action.
229  } else {
230  handshake_finished_.set_value();
231  recv();
232  }
233 }
234 
235 // worker thread
237 {
238  assert(use_tls_ == true);
239  use_tls_ = false;
240 
241  boost::asio::ip::tcp::endpoint endpoint { utils::get<raw_socket>(socket_)->remote_endpoint() };
242  utils::get<raw_socket>(socket_)->close();
243 
244  utils::get<raw_socket>(socket_)->async_connect(endpoint,
245  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, endpoint));
246 }
247 
248 // main thread
250 {
251  MPTEST_LOG;
252  LOG_NW << "Waiting for handshake" << std::endl;
253 
254  try {
255  // TODO: make this duration customizable. Should default to 1 minute.
256  const std::chrono::seconds timeout { 60 };
257 
258  switch(auto future = handshake_finished_.get_future(); future.wait_for(timeout)) {
259  case std::future_status::ready:
260  // This is a void future, so this just serves to re-throw any system_error exceptions
261  // stored by the worker thread. Additional handling occurs in the catch block below.
262  future.get();
263  break;
264  case std::future_status::timeout:
265  throw error(boost::asio::error::make_error_code(boost::asio::error::timed_out));
266  default:
267  break;
268  }
269  } catch(const boost::system::system_error& err) {
270  if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
271  return;
272  }
273 
274  WRN_NW << __func__ << " Rethrowing: " << err.code() << "\n";
275  throw error(err.code());
276  } catch(const std::future_error& e) {
277  if(e.code() == std::future_errc::future_already_retrieved) {
278  return;
279  }
280  }
281 }
282 
283 // main thread
285 {
286  MPTEST_LOG;
287 
288  auto buf_ptr = std::make_unique<boost::asio::streambuf>();
289 
290  std::ostream os(buf_ptr.get());
291  write_gz(os, request);
292 
293  boost::asio::post(io_context_, [this, buf_ptr = std::move(buf_ptr)]() mutable {
294 
295  DBG_NW << "In wesnothd_connection::send_data::lambda\n";
296  send_queue_.push(std::move(buf_ptr));
297 
298  if(send_queue_.size() == 1) {
299  send();
300  }
301  });
302 }
303 
304 // main thread
306 {
307  MPTEST_LOG;
308  utils::visit([](auto&& socket) {
309  if(socket->lowest_layer().is_open()) {
310  boost::system::error_code ec;
311 
312 #ifdef _MSC_VER
313 // Silence warning about boost::asio::basic_socket<Protocol>::cancel always
314 // returning an error on XP, which we don't support anymore.
315 #pragma warning(push)
316 #pragma warning(disable:4996)
317 #endif
318  socket->lowest_layer().cancel(ec);
319 #ifdef _MSC_VER
320 #pragma warning(pop)
321 #endif
322 
323  if(ec) {
324  WRN_NW << "Failed to cancel network operations: " << ec.message() << std::endl;
325  }
326  }
327  }, socket_);
328 }
329 
330 // main thread
332 {
333  // TODO: wouldn't cancel() have the same effect?
334  MPTEST_LOG;
335  io_context_.stop();
336 }
337 
338 // worker thread
339 std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
340 {
341  MPTEST_LOG;
342  if(ec) {
343  {
344  std::scoped_lock lock(last_error_mutex_);
345  last_error_ = ec;
346  }
347 
348  LOG_NW << __func__ << " Error: " << ec << "\n";
349 
350  io_context_.stop();
351  return bytes_to_write_ - bytes_transferred;
352  }
353 
354  bytes_written_ = bytes_transferred;
355  return bytes_to_write_ - bytes_transferred;
356 }
357 
358 // worker thread
359 void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
360 {
361  MPTEST_LOG;
362  DBG_NW << "Written " << bytes_transferred << " bytes.\n";
363 
364  send_queue_.pop();
365 
366  if(ec) {
367  {
368  std::scoped_lock lock(last_error_mutex_);
369  last_error_ = ec;
370  }
371 
372  LOG_NW << __func__ << " Error: " << ec << "\n";
373 
374  io_context_.stop();
375  return;
376  }
377 
378  if(!send_queue_.empty()) {
379  send();
380  }
381 }
382 
383 // worker thread
384 std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
385 {
386  // We use custom is_write/read_complete function to be able to see the current progress of the upload/download
387  MPTEST_LOG;
388  if(ec) {
389  {
390  std::scoped_lock lock(last_error_mutex_);
391  last_error_ = ec;
392  }
393 
394  LOG_NW << __func__ << " Error: " << ec << "\n";
395 
396  io_context_.stop();
397  return bytes_to_read_ - bytes_transferred;
398  }
399 
400  bytes_read_ = bytes_transferred;
401 
402  if(bytes_transferred < 4) {
403  return 4;
404  }
405 
406  if(!bytes_to_read_) {
407  std::istream is(&read_buf_);
408  uint32_t data_size;
409 
410  is.read(reinterpret_cast<char*>(&data_size), 4);
411  bytes_to_read_ = ntohl(data_size) + 4;
412 
413  // Close immediately if we receive an invalid length
414  if(bytes_to_read_ < 4) {
415  bytes_to_read_ = bytes_transferred;
416  }
417  }
418 
419  return bytes_to_read_ - bytes_transferred;
420 }
421 
422 // worker thread
423 void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
424 {
425  MPTEST_LOG;
426  DBG_NW << "Read " << bytes_transferred << " bytes.\n";
427 
428  bytes_to_read_ = 0;
429  if(last_error_ && ec != boost::asio::error::eof) {
430  {
431  std::scoped_lock lock(last_error_mutex_);
432  last_error_ = ec;
433  }
434 
435  LOG_NW << __func__ << " Error: " << ec << "\n";
436 
437  io_context_.stop();
438  return;
439  }
440 
441  std::istream is(&read_buf_);
442  config data;
443  read_gz(data, is);
444  if(!data.empty()) { DBG_NW << "Received:\n" << data; }
445 
446  {
447  std::scoped_lock lock(recv_queue_mutex_);
448  recv_queue_.emplace(std::move(data));
449  recv_queue_lock_.notify_all();
450  }
451 
452  recv();
453 }
454 
455 // worker thread
457 {
458  MPTEST_LOG;
459  auto& buf = *send_queue_.front();
460 
461  std::size_t buf_size = buf.size();
462  bytes_to_write_ = buf_size + 4;
463  bytes_written_ = 0;
464  payload_size_ = htonl(buf_size);
465 
466  std::deque<boost::asio::const_buffer> bufs {
467  boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4),
468  buf.data()
469  };
470 
471  utils::visit([this, &bufs](auto&& socket) {
472  boost::asio::async_write(*socket, bufs,
473  std::bind(&wesnothd_connection::is_write_complete, this, std::placeholders::_1, std::placeholders::_2),
474  std::bind(&wesnothd_connection::handle_write, this, std::placeholders::_1, std::placeholders::_2));
475  }, socket_);
476 }
477 
478 // worker thread
480 {
481  MPTEST_LOG;
482 
483  utils::visit([this](auto&& socket) {
484  boost::asio::async_read(*socket, read_buf_,
485  std::bind(&wesnothd_connection::is_read_complete, this, std::placeholders::_1, std::placeholders::_2),
486  std::bind(&wesnothd_connection::handle_read, this, std::placeholders::_1, std::placeholders::_2));
487  }, socket_);
488 }
489 
490 // main thread
492 {
493  MPTEST_LOG;
494 
495  {
496  std::scoped_lock lock(recv_queue_mutex_);
497  if(!recv_queue_.empty()) {
498  result.swap(recv_queue_.front());
499  recv_queue_.pop();
500  return true;
501  }
502  }
503 
504  {
505  std::scoped_lock lock(last_error_mutex_);
506  if(last_error_) {
507  std::string user_msg;
508 
509  if(last_error_ == boost::asio::error::eof) {
510  user_msg = _("Disconnected from server.");
511  }
512 
513  throw error(last_error_, user_msg);
514  }
515  }
516 
517  return false;
518 }
519 
521 {
522  {
523  std::unique_lock<std::mutex> lock(recv_queue_mutex_);
524  recv_queue_lock_.wait(lock, [this]() { return has_data_received(); });
525  }
526 
527  return receive_data(data);
528 };
void send_data(const configr_of &request)
Queues the given data to be sent to the server.
std::size_t is_read_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
std::condition_variable recv_queue_lock_
std::promise< void > handshake_finished_
boost::asio::ssl::context tls_context_
void handle_write(const boost::system::error_code &ec, std::size_t bytes_transferred)
wesnothd_connection_error error
resolver::results_type results_type
boost::asio::io_context io_context_
std::size_t is_write_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
void handle_connect(const boost::system::error_code &ec, endpoint endpoint)
void load_tls_root_certs(boost::asio::ssl::context &ctx)
bool receive_data(config &result)
Receives the next pending data pack from the server, if available.
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
Might throw a std::ios_base::failure especially a gzip_error.
Definition: parser.cpp:683
static std::string _(const char *str)
Definition: gettext.hpp:93
#define ERR_NW
#define MPTEST_LOG
#define DBG_NW
std::unique_ptr< boost::asio::ssl::stream< raw_socket::element_type > > tls_socket
void write(std::ostream &out, const configr_of &cfg, unsigned int level)
Definition: parser.cpp:764
void swap(config &cfg)
Definition: config.cpp:1447
void write_gz(std::ostream &out, const configr_of &cfg)
Definition: parser.cpp:783
#define WRN_NW
void handle_handshake(const boost::system::error_code &ec)
boost::asio::streambuf read_buf_
boost::system::error_code last_error_
logger & err()
Definition: log.cpp:77
static map_location::DIRECTION s
wesnothd_connection(const wesnothd_connection &)=delete
static lg::log_domain log_network("network")
data_queue< config > recv_queue_
void handle_read(const boost::system::error_code &ec, std::size_t bytes_transferred)
const boost::asio::ip::tcp::endpoint & endpoint
data_queue< std::unique_ptr< boost::asio::streambuf > > send_queue_
Standard logging facilities (interface).
void handle_resolve(const boost::system::error_code &ec, results_type results)
bool wait_and_receive_data(config &data)
Unlike receive_data, waits until data is available instead of returning immediately.
std::unique_ptr< boost::asio::ip::tcp::socket > raw_socket
#define e
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:61
#define LOG_NW
bool empty() const
Definition: config.cpp:941
HOTKEY_COMMAND get_id(const std::string &command)
returns get_hotkey_command(command).id
void wait_for_handshake()
Waits until the server handshake is complete.