The Battle for Wesnoth  1.19.5+dev
wesnothd_connection.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2011 - 2024
3  by Sergey Popov <loonycyborg@gmail.com>
4  Part of the Battle for Wesnoth Project https://www.wesnoth.org/
5 
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10  This program is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY.
12 
13  See the COPYING file for more details.
14 */
15 
16 #define BOOST_ASIO_NO_DEPRECATED
17 
18 #include "wesnothd_connection.hpp"
19 
20 #include "gettext.hpp"
22 #include "log.hpp"
24 #include "serialization/parser.hpp"
25 #include "tls_root_store.hpp"
26 
27 #include <boost/asio/connect.hpp>
28 #include <boost/asio/read.hpp>
29 
30 #include <cstdint>
31 #include <deque>
32 #include <functional>
33 
34 static lg::log_domain log_network("network");
35 #define DBG_NW LOG_STREAM(debug, log_network)
36 #define LOG_NW LOG_STREAM(info, log_network)
37 #define WRN_NW LOG_STREAM(warn, log_network)
38 #define ERR_NW LOG_STREAM(err, log_network)
39 
40 #if 0
41 // code for the travis test
42 #include <sys/types.h>
43 #include <unistd.h>
44 namespace {
// Debug helper, compiled out by the surrounding `#if 0`.
// Constructing one emits a warning-level log line containing the process id,
// the current thread id and the supplied function name.
45 struct mptest_log
46 {
 // functionname: text appended after " Function: " in the log line
 // (the MPTEST_LOG macro below passes __func__).
47  mptest_log(const char* functionname)
48  {
49  WRN_NW << "Process:" << getpid() << " Thread:" << std::this_thread::get_id() << " Function: " << functionname << " Start";
50  }
51 };
52 }
53 #define MPTEST_LOG mptest_log mptest_log__(__func__)
54 #else
55 #define MPTEST_LOG ((void)0)
56 #endif
57 
58 using boost::system::error_code;
59 using boost::system::system_error;
60 
61 using namespace std::chrono_literals; // s, ms, etc
62 
63 // main thread
64 wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
65  : worker_thread_()
66  , io_context_()
67  , resolver_(io_context_)
68  , tls_context_(boost::asio::ssl::context::sslv23)
69  , host_(host)
70  , service_(service)
71  , use_tls_(true)
72  , socket_(raw_socket{ new raw_socket::element_type{io_context_} })
73  , last_error_()
74  , last_error_mutex_()
75  , handshake_finished_()
76  , read_buf_()
77  , handshake_response_()
78  , recv_queue_()
79  , recv_queue_mutex_()
80  , recv_queue_lock_()
81  , payload_size_(0)
82  , bytes_to_write_(0)
83  , bytes_written_(0)
84  , bytes_to_read_(0)
85  , bytes_read_(0)
86 {
87  MPTEST_LOG;
88 
89  error_code ec;
90  auto result = resolver_.resolve(host, service, boost::asio::ip::resolver_query_base::numeric_host, ec);
91  if(!ec) { // if numeric resolve succeeds then we got raw ip address so TLS host name validation would never pass
92  use_tls_ = false;
93  boost::asio::post(io_context_, [this, ec, result](){ handle_resolve(ec, { result } ); } );
94  } else {
95  resolver_.async_resolve(host, service,
96  std::bind(&wesnothd_connection::handle_resolve, this, std::placeholders::_1, std::placeholders::_2));
97  }
98 
99  // Starts the worker thread. Do this *after* the above async_resolve call or it will just exit immediately!
100  worker_thread_ = std::thread([this]() {
101  try {
102  io_context_.run();
103  } catch(const boost::system::system_error&) {
104  try {
105  // Attempt to pass the exception on to the handshake promise.
106  handshake_finished_.set_exception(std::current_exception());
107  } catch(const std::future_error&) {
108  // Handshake already complete. Do nothing.
109  }
110  } catch(...) {
111  DBG_NW << "wesnothd_connection worker thread threw general exception: " << utils::get_unknown_exception_type();
112  }
113 
114  LOG_NW << "wesnothd_connection::io_service::run() returned";
115  });
116 
117  LOG_NW << "Resolving hostname: " << host;
118 }
119 
// NOTE(review): the signature line for this body (listing line 120) is missing
// from this extract; it is presumably the destructor - confirm against the header.
// If the connection currently holds a TLS socket, a TLS shutdown is started
// first. The io_context is then stopped and the worker thread joined.
121 {
122  MPTEST_LOG;
123 
124  if(auto socket = utils::get_if<tls_socket>(&socket_)) {
125  error_code ec;
126  // this sends close_notify for secure connection shutdown
127  (*socket)->async_shutdown([](const error_code&) {} );
128  const char buffer[] = "";
129  // this write is needed to trigger immediate close instead of waiting for other side's close_notify
 // (zero-length write; any error is captured in `ec` and ignored)
130  boost::asio::write(**socket, boost::asio::buffer(buffer, 0), ec);
131  }
132  // Stop the io_service and wait for the worker thread to terminate.
133  stop();
134  worker_thread_.join();
135 }
136 
137 // worker thread
138 void wesnothd_connection::handle_resolve(const error_code& ec, results_type results)
139 {
140  MPTEST_LOG;
141  if(ec) {
142  LOG_NW << __func__ << " Throwing: " << ec;
143  throw system_error(ec);
144  }
145 
146  boost::asio::async_connect(*utils::get<raw_socket>(socket_), results,
147  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, std::placeholders::_2));
148 }
149 
150 // worker thread
151 void wesnothd_connection::handle_connect(const boost::system::error_code& ec, endpoint endpoint)
152 {
153  MPTEST_LOG;
154  if(ec) {
155  ERR_NW << "Tried all IPs. Giving up";
156  throw system_error(ec);
157  } else {
158  LOG_NW << "Connected to " << endpoint.address();
159 
160  if(endpoint.address().is_loopback()) {
161  use_tls_ = false;
162  }
163  handshake();
164  }
165 }
166 
167 // worker thread
// NOTE(review): the signature line for this body (listing line 168) is missing
// from this extract; handle_connect() calls handshake(), which is presumably
// this function - confirm.
// Applies the keepalive preference, then writes a 4-byte handshake word to the
// raw socket and starts an asynchronous 4-byte read into handshake_response_,
// which completes in handle_handshake().
169 {
170  MPTEST_LOG;
171 
172  DBG_NW << "Connecting with keepalive of: " << prefs::get().keepalive_timeout();
173  set_keepalive(prefs::get().keepalive_timeout());
174 
 // Handshake words: 0 for a plain connection, 1 (network byte order) to request TLS.
 // Presumably static so the buffers stay valid while the asynchronous write is pending.
175  static const uint32_t handshake = 0;
176  static const uint32_t tls_handshake = htonl(uint32_t(1));
177 
 // A failed write is reported by throwing from the completion handler.
178  boost::asio::async_write(*utils::get<raw_socket>(socket_), boost::asio::buffer(use_tls_ ? reinterpret_cast<const char*>(&tls_handshake) : reinterpret_cast<const char*>(&handshake), 4),
179  [](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
180  boost::asio::async_read(*utils::get<raw_socket>(socket_), boost::asio::buffer(reinterpret_cast<std::byte*>(&handshake_response_), 4),
181  std::bind(&wesnothd_connection::handle_handshake, this, std::placeholders::_1));
182 }
183 
184 template<typename Verifier> auto verbose_verify(Verifier&& verifier)
185 {
186  return [verifier](bool preverified, boost::asio::ssl::verify_context& ctx) {
187  char subject_name[256];
188  X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
189  X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
190  bool verified = verifier(preverified, ctx);
191  DBG_NW << "Verifying TLS certificate: " << subject_name << ": " <<
192  (verified ? "verified" : "failed");
193  BIO* bio = BIO_new(BIO_s_mem());
194  char buffer[1024];
195  X509_print(bio, cert);
196  while(BIO_read(bio, buffer, 1024) > 0)
197  {
198  DBG_NW << buffer;
199  }
200  BIO_free(bio);
201  return verified;
202  };
203 }
204 
205 // worker thread
// Completion handler for the 4-byte handshake response read.
// NOTE(review): listing lines 212, 227 and 257 are missing from this extract,
// so some statements of this function are not visible here.
//
// ec: result of the read. Errors are rethrown as system_error, except EOF
// while TLS was requested, which returns early.
206 void wesnothd_connection::handle_handshake(const error_code& ec)
207 {
208  MPTEST_LOG;
209  if(ec) {
210  if(ec == boost::asio::error::eof && use_tls_) {
211  // immediate disconnect likely means old server not supporting TLS handshake code
213  return;
214  }
215  LOG_NW << __func__ << " Throwing: " << ec;
216  throw system_error(ec);
217  }
218 
219  if(use_tls_) {
 // Response 0xFFFFFFFF: drop TLS and re-enter this handler, which then
 // takes the non-TLS branch at the bottom.
220  if(handshake_response_ == 0xFFFFFFFFU) {
221  use_tls_ = false;
222  handle_handshake(ec);
223  return;
224  }
225 
 // Response 0: move the connected raw socket into a TLS stream and store
 // that stream in socket_.
226  if(handshake_response_ == 0x00000000) {
228  raw_socket s { std::move(utils::get<raw_socket>(socket_)) };
229  tls_socket ts { new tls_socket::element_type{std::move(*s), tls_context_} };
230  socket_ = std::move(ts);
231 
232  auto& socket { *utils::get<tls_socket>(socket_) };
233 
 // Require a peer certificate and fail verification without one.
234  socket.set_verify_mode(
235  boost::asio::ssl::verify_peer |
236  boost::asio::ssl::verify_fail_if_no_peer_cert
237  );
238 
 // Verify the certificate against host_; verbose_verify adds debug logging.
239 #if BOOST_VERSION >= 107300
240  socket.set_verify_callback(verbose_verify(boost::asio::ssl::host_name_verification(host_)));
241 #else
242  socket.set_verify_callback(verbose_verify(boost::asio::ssl::rfc2818_verification(host_)));
243 #endif
244 
 // On a successful TLS handshake, fulfil the handshake promise and start receiving.
245  socket.async_handshake(boost::asio::ssl::stream_base::client, [this](const error_code& ec) {
246  if(ec) {
247  LOG_NW << __func__ << " Throwing: " << ec;
248  throw system_error(ec);
249  }
250 
251  handshake_finished_.set_value();
252  recv();
253  });
254  return;
255  }
256 
258  } else {
 // Plain connection: the handshake is complete as soon as the response arrives.
259  handshake_finished_.set_value();
260  recv();
261  }
262 }
263 
264 // worker thread
// NOTE(review): the signature line for this body (listing line 265) is missing
// from this extract - confirm the function name against the header.
// Disables TLS, closes the raw socket and reconnects asynchronously to the
// endpoint it was connected to. The result is delivered to handle_connect().
266 {
267  assert(use_tls_ == true);
268  use_tls_ = false;
269 
 // Capture the remote endpoint before closing the socket.
270  boost::asio::ip::tcp::endpoint endpoint { utils::get<raw_socket>(socket_)->remote_endpoint() };
271  utils::get<raw_socket>(socket_)->close();
272 
273  utils::get<raw_socket>(socket_)->async_connect(endpoint,
274  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, endpoint));
275 }
276 
277 // main thread
// NOTE(review): the signature line for this body (listing line 278) and listing
// line 293 inside the polling loop are missing from this extract. The member
// list names a `void wait_for_handshake()`, presumably this function.
//
// Polls the handshake future in 10ms steps for up to 60 seconds.
// Throws `error` on timeout and for system_error failures other than
// operation_aborted and eof, which return silently.
279 {
280  MPTEST_LOG;
281  LOG_NW << "Waiting for handshake";
282 
283  try {
284  // TODO: make this duration customizable. Should default to 1 minute.
285  auto timeout = 60s;
286 
287  auto future = handshake_finished_.get_future();
 // `time` counts the 10ms waits performed so far and the loop ends once it reaches `timeout`.
288  for(auto time = 0ms;
289  future.wait_for(10ms) == std::future_status::timeout
290  && time < timeout;
291  time += 10ms)
292  {
294  }
295 
296  switch(future.wait_for(0ms)) {
297  case std::future_status::ready:
298  // This is a void future, so this just serves to re-throw any system_error exceptions
299  // stored by the worker thread. Additional handling occurs in the catch block below.
300  future.get();
301  break;
302  case std::future_status::timeout:
303  throw error(boost::asio::error::make_error_code(boost::asio::error::timed_out));
304  default:
305  break;
306  }
307  } catch(const boost::system::system_error& err) {
308  if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
309  return;
310  }
311 
312  WRN_NW << __func__ << " Rethrowing: " << err.code();
313  throw error(err.code());
314  } catch(const std::future_error& e) {
 // future_already_retrieved returns explicitly; any other future_error
 // also falls through to the end of the function without being rethrown.
315  if(e.code() == std::future_errc::future_already_retrieved) {
316  return;
317  }
318  }
319 }
320 
321 // main thread
// NOTE(review): the signature line for this body (listing line 322) is missing
// from this extract. The member list names a
// `void send_data(const configr_of &request)`, presumably this function.
//
// Serializes `request` with write_gz into a freshly allocated streambuf, then
// posts a task to the io_context that appends the buffer to send_queue_.
323 {
324  MPTEST_LOG;
325 
326  auto buf_ptr = std::make_unique<boost::asio::streambuf>();
327 
328  std::ostream os(buf_ptr.get());
329  write_gz(os, request);
330 
 // Ownership of the buffer moves into the posted task, so send_queue_ is
 // only modified from inside that task.
331  boost::asio::post(io_context_, [this, buf_ptr = std::move(buf_ptr)]() mutable {
332 
333  DBG_NW << "In wesnothd_connection::send_data::lambda";
334  send_queue_.push(std::move(buf_ptr));
335 
 // send() is called only when this buffer is the sole queue entry;
 // otherwise handle_write() calls it after popping.
336  if(send_queue_.size() == 1) {
337  send();
338  }
339  });
340 }
341 
342 // main thread
// NOTE(review): the signature line for this body (listing line 343) is missing
// from this extract; presumably this is cancel() - confirm against the header.
// Cancels outstanding operations on the lowest layer of whichever socket type
// socket_ currently holds, provided it is open. A failure is only logged.
344 {
345  MPTEST_LOG;
346  utils::visit([](auto&& socket) {
347  if(socket->lowest_layer().is_open()) {
348  boost::system::error_code ec;
349 
350 #ifdef _MSC_VER
351 // Silence warning about boost::asio::basic_socket<Protocol>::cancel always
352 // returning an error on XP, which we don't support anymore.
353 #pragma warning(push)
354 #pragma warning(disable:4996)
355 #endif
356  socket->lowest_layer().cancel(ec);
357 #ifdef _MSC_VER
358 #pragma warning(pop)
359 #endif
360 
361  if(ec) {
362  WRN_NW << "Failed to cancel network operations: " << ec.message();
363  }
364  }
365  }, socket_);
366 }
367 
368 // main thread
// NOTE(review): the signature line for this body (listing line 369) is missing
// from this extract; the destructor-like body above calls stop(), presumably
// this function.
// Stops the io_context.
370 {
371  // TODO: wouldn't cancel() have the same effect?
372  MPTEST_LOG;
373  io_context_.stop();
374 }
375 
376 // worker thread
377 std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
378 {
379  MPTEST_LOG;
380  if(ec) {
381  {
382  std::scoped_lock lock(last_error_mutex_);
383  last_error_ = ec;
384  }
385 
386  LOG_NW << __func__ << " Error: " << ec;
387 
388  io_context_.stop();
389  return bytes_to_write_ - bytes_transferred;
390  }
391 
392  bytes_written_ = bytes_transferred;
393  return bytes_to_write_ - bytes_transferred;
394 }
395 
396 // worker thread
397 void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
398 {
399  MPTEST_LOG;
400  DBG_NW << "Written " << bytes_transferred << " bytes.";
401 
402  send_queue_.pop();
403 
404  if(ec) {
405  {
406  std::scoped_lock lock(last_error_mutex_);
407  last_error_ = ec;
408  }
409 
410  LOG_NW << __func__ << " Error: " << ec;
411 
412  io_context_.stop();
413  return;
414  }
415 
416  if(!send_queue_.empty()) {
417  send();
418  }
419 }
420 
421 // worker thread
422 std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
423 {
424  // We use custom is_write/read_complete function to be able to see the current progress of the upload/download
425  MPTEST_LOG;
426  if(ec) {
427  {
428  std::scoped_lock lock(last_error_mutex_);
429  last_error_ = ec;
430  }
431 
432  LOG_NW << __func__ << " Error: " << ec;
433 
434  io_context_.stop();
435  return bytes_to_read_ - bytes_transferred;
436  }
437 
438  bytes_read_ = bytes_transferred;
439 
440  if(bytes_transferred < 4) {
441  return 4;
442  }
443 
444  if(!bytes_to_read_) {
445  std::istream is(&read_buf_);
446  uint32_t data_size;
447 
448  is.read(reinterpret_cast<char*>(&data_size), 4);
449  bytes_to_read_ = ntohl(data_size) + 4;
450 
451  // Close immediately if we receive an invalid length
452  if(bytes_to_read_ < 4) {
453  bytes_to_read_ = bytes_transferred;
454  }
455  }
456 
457  return bytes_to_read_ - bytes_transferred;
458 }
459 
460 // worker thread
461 void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
462 {
463  MPTEST_LOG;
464  DBG_NW << "Read " << bytes_transferred << " bytes.";
465 
466  bytes_to_read_ = 0;
467  if(last_error_ && ec != boost::asio::error::eof) {
468  {
469  std::scoped_lock lock(last_error_mutex_);
470  last_error_ = ec;
471  }
472 
473  LOG_NW << __func__ << " Error: " << ec;
474 
475  io_context_.stop();
476  return;
477  }
478 
479  std::istream is(&read_buf_);
480  config data;
481  read_gz(data, is);
482  if(!data.empty()) { DBG_NW << "Received:\n" << data; }
483 
484  {
485  std::scoped_lock lock(recv_queue_mutex_);
486  recv_queue_.emplace(std::move(data));
487  recv_queue_lock_.notify_all();
488  }
489 
490  recv();
491 }
492 
493 // worker thread
// NOTE(review): the signature line for this body (listing line 494) is missing
// from this extract; send_data() and handle_write() call send(), presumably
// this function.
// Starts an asynchronous write of the buffer at the front of send_queue_,
// preceded by a 4-byte length prefix.
495 {
496  MPTEST_LOG;
497  auto& buf = *send_queue_.front();
498 
 // Progress counters: the total includes the 4-byte length prefix.
499  std::size_t buf_size = buf.size();
500  bytes_to_write_ = buf_size + 4;
501  bytes_written_ = 0;
 // The prefix is the payload size in network byte order. It is stored in a
 // member, presumably so the buffer stays valid while the write is pending.
502  payload_size_ = htonl(buf_size);
503 
504  std::deque<boost::asio::const_buffer> bufs {
505  boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4),
506  buf.data()
507  };
508 
 // Write on whichever socket type (raw or TLS) socket_ currently holds;
 // is_write_complete tracks progress and handle_write finishes the operation.
509  utils::visit([this, &bufs](auto&& socket) {
510  boost::asio::async_write(*socket, bufs,
511  std::bind(&wesnothd_connection::is_write_complete, this, std::placeholders::_1, std::placeholders::_2),
512  std::bind(&wesnothd_connection::handle_write, this, std::placeholders::_1, std::placeholders::_2));
513  }, socket_);
514 }
515 
516 // worker thread
// NOTE(review): the signature line for this body (listing line 517) is missing
// from this extract; handle_handshake() and handle_read() call recv(),
// presumably this function.
// Starts an asynchronous read into read_buf_ on whichever socket type socket_
// currently holds. is_read_complete decides how much to read and handle_read
// processes the finished packet.
518 {
519  MPTEST_LOG;
520 
521  utils::visit([this](auto&& socket) {
522  boost::asio::async_read(*socket, read_buf_,
523  std::bind(&wesnothd_connection::is_read_complete, this, std::placeholders::_1, std::placeholders::_2),
524  std::bind(&wesnothd_connection::handle_read, this, std::placeholders::_1, std::placeholders::_2));
525  }, socket_);
526 }
527 
528 // main thread
// NOTE(review): the signature line for this body (listing line 529) is missing
// from this extract. The member list names a
// `bool receive_data(config &result)`, presumably this function.
//
// Non-blocking receive: if a packet is queued it is swapped into `result` and
// true is returned. With an empty queue, a stored error is thrown as `error`;
// otherwise false is returned.
530 {
531  MPTEST_LOG;
532 
533  {
534  std::scoped_lock lock(recv_queue_mutex_);
535  if(!recv_queue_.empty()) {
536  result.swap(recv_queue_.front());
537  recv_queue_.pop();
538  return true;
539  }
540  }
541 
 // The queue is checked first, so a stored error is only reported once the
 // queue is empty.
542  {
543  std::scoped_lock lock(last_error_mutex_);
544  if(last_error_) {
545  std::string user_msg;
546 
 // Only EOF gets a translated user-facing message; other errors pass an empty one.
547  if(last_error_ == boost::asio::error::eof) {
548  user_msg = _("Disconnected from server.");
549  }
550 
551  throw error(last_error_, user_msg);
552  }
553  }
554 
555  return false;
556 }
557 
// NOTE(review): the signature line for this body (listing line 558) and listing
// line 565 inside the wait loop are missing from this extract. The member list
// names a `bool wait_and_receive_data(config &data)`, presumably this function.
//
// Blocks until has_data_received() reports true, re-checking at least every
// 10ms, then returns the result of receive_data(data).
559 {
560  {
561  std::unique_lock<std::mutex> lock(recv_queue_mutex_);
 // wait_for returns the predicate's value, so the loop body runs after
 // each 10ms timeout for which no data has arrived yet.
562  while(!recv_queue_lock_.wait_for(
563  lock, 10ms, [this]() { return has_data_received(); }))
564  {
566  }
567  }
568 
 // The lock is released before receive_data() takes it again.
569  return receive_data(data);
570 };
571 
// NOTE(review): the signature line for this body (listing line 572) is missing
// from this extract. The member list names a `void set_keepalive(int seconds)`,
// presumably this function.
//
// Enables keep-alive on the raw socket and applies platform-specific socket
// options derived from `seconds`. The setsockopt return values are not checked.
573 {
574  boost::asio::socket_base::keep_alive option(true);
575  utils::get<raw_socket>(socket_)->set_option(option);
576 
577 #ifdef __linux__
 // Linux: TCP_KEEPIDLE and TCP_KEEPINTVL are both set to 10; TCP_KEEPCNT is
 // (seconds - 10) / 10, but never less than 1.
578  int timeout = 10;
579  int cnt = std::max((seconds - 10) / 10, 1);
580  int interval = 10;
581  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPIDLE, &timeout, sizeof(timeout));
582  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
583  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_TCP, TCP_KEEPINTVL, &interval, sizeof(interval));
584 #elif defined(__APPLE__) && defined(__MACH__)
 // macOS: TCP_KEEPALIVE is set to `seconds`.
585  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), IPPROTO_TCP, TCP_KEEPALIVE, &seconds, sizeof(seconds));
586 #elif defined(_WIN32)
587  // these are in milliseconds for windows
 // NOTE(review): this branch sets the receive/send timeouts (SO_RCVTIMEO /
 // SO_SNDTIMEO), not keep-alive parameters - confirm that is intended.
588  DWORD timeout_ms = seconds * 1000;
589  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char*>(&timeout_ms), sizeof(timeout_ms));
590  setsockopt(utils::get<raw_socket>(socket_)->native_handle(), SOL_SOCKET, SO_SNDTIMEO, reinterpret_cast<const char*>(&timeout_ms), sizeof(timeout_ms));
591 #endif
592 }
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:172
void swap(config &cfg)
Definition: config.cpp:1336
static void spin()
Indicate to the player that loading is progressing.
static prefs & get()
int keepalive_timeout()
boost::asio::ssl::context tls_context_
wesnothd_connection_error error
void handle_handshake(const boost::system::error_code &ec)
const boost::asio::ip::tcp::endpoint & endpoint
void handle_connect(const boost::system::error_code &ec, endpoint endpoint)
void handle_resolve(const boost::system::error_code &ec, results_type results)
data_queue< std::unique_ptr< boost::asio::streambuf > > send_queue_
std::condition_variable recv_queue_lock_
std::size_t is_write_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
wesnothd_connection(const wesnothd_connection &)=delete
data_queue< config > recv_queue_
void wait_for_handshake()
Waits until the server handshake is complete.
boost::asio::io_context io_context_
bool receive_data(config &result)
Receives the next pending data pack from the server, if available.
void handle_write(const boost::system::error_code &ec, std::size_t bytes_transferred)
void send_data(const configr_of &request)
Queues the given data to be sent to the server.
void set_keepalive(int seconds)
resolver::results_type results_type
boost::asio::streambuf read_buf_
bool wait_and_receive_data(config &data)
Unlike receive_data, waits until data is available instead of returning immediately.
void handle_read(const boost::system::error_code &ec, std::size_t bytes_transferred)
std::unique_ptr< boost::asio::ip::tcp::socket > raw_socket
boost::system::error_code last_error_
std::unique_ptr< boost::asio::ssl::stream< raw_socket::element_type > > tls_socket
std::promise< void > handshake_finished_
std::size_t is_read_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
static std::string _(const char *str)
Definition: gettext.hpp:93
Standard logging facilities (interface).
logger & err()
Definition: log.cpp:307
void load_tls_root_certs(boost::asio::ssl::context &ctx)
std::string get_unknown_exception_type()
Utility function for finding the type of thing caught with catch(...).
Definition: general.cpp:23
std::string_view data
Definition: picture.cpp:178
void write_gz(std::ostream &out, const configr_of &cfg)
Definition: parser.cpp:778
void write(std::ostream &out, const configr_of &cfg, unsigned int level)
Definition: parser.cpp:759
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
Might throw a std::ios_base::failure especially a gzip_error.
Definition: parser.cpp:678
static map_location::direction s
#define WRN_NW
#define ERR_NW
auto verbose_verify(Verifier &&verifier)
#define LOG_NW
static lg::log_domain log_network("network")
#define DBG_NW
#define MPTEST_LOG
#define e