The Battle for Wesnoth  1.15.2+dev
wesnothd_connection.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2011 - 2018 by Sergey Popov <loonycyborg@gmail.com>
3  Part of the Battle for Wesnoth Project https://www.wesnoth.org/
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or
8  (at your option) any later version.
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY.
11 
12  See the COPYING file for more details.
13 */
14 
15 #include "wesnothd_connection.hpp"
16 
18 #include "gettext.hpp"
19 #include "log.hpp"
20 #include "serialization/parser.hpp"
21 #include "utils/functional.hpp"
22 
23 #include <SDL2/SDL_timer.h>
24 
25 #include <cstdint>
26 #include <deque>
27 
28 static lg::log_domain log_network("network");
29 #define DBG_NW LOG_STREAM(debug, log_network)
30 #define LOG_NW LOG_STREAM(info, log_network)
31 #define WRN_NW LOG_STREAM(warn, log_network)
32 #define ERR_NW LOG_STREAM(err, log_network)
33 
34 #if 0
35 // code for the travis test
36 #include <sys/types.h>
37 #include <unistd.h>
38 namespace {
39 struct mptest_log
40 {
41  mptest_log(const char* functionname)
42  {
43  WRN_NW << "Process:" << getpid() << " Thread:" << std::this_thread::get_id() << " Function: " << functionname << " Start\n";
44  }
45 };
46 }
47 
48 #define MPTEST_LOG mptest_log mptest_log__(__func__)
49 #else
50 #define MPTEST_LOG ((void)0)
51 #endif
52 
53 using boost::system::error_code;
54 using boost::system::system_error;
55 
56 // main thread
57 wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
58  : worker_thread_()
59  , io_service_()
60  , resolver_(io_service_)
61  , socket_(io_service_)
62  , last_error_()
63  , last_error_mutex_()
64  , handshake_finished_()
65  , read_buf_()
66  , handshake_response_()
67  , recv_queue_()
68  , recv_queue_mutex_()
69  , recv_queue_lock_()
70  , payload_size_(0)
71  , bytes_to_write_(0)
72  , bytes_written_(0)
73  , bytes_to_read_(0)
74  , bytes_read_(0)
75 {
76  MPTEST_LOG;
77  resolver_.async_resolve(boost::asio::ip::tcp::resolver::query(host, service),
78  std::bind(&wesnothd_connection::handle_resolve, this, _1, _2));
79 
80  // Starts the worker thread. Do this *after* the above async_resolve call or it will just exit immediately!
81  worker_thread_ = std::thread([this]() {
82  try {
83  io_service_.run();
84  } catch(const boost::system::system_error&) {
85  try {
86  // Attempt to pass the exception on to the handshake promise.
87  handshake_finished_.set_exception(std::current_exception());
88  } catch(const std::future_error&) {
89  // Handshake already complete. Do nothing.
90  }
91  }
92 
93  LOG_NW << "wesnothd_connection::io_service::run() returned\n";
94  });
95 
96  LOG_NW << "Resolving hostname: " << host << '\n';
97 }
98 
100 {
101  MPTEST_LOG;
102 
103  // Stop the io_service and wait for the worker thread to terminate.
104  stop();
105  worker_thread_.join();
106 }
107 
108 // worker thread
110 {
111  MPTEST_LOG;
112  if(ec) {
113  LOG_NW << __func__ << " Throwing: " << ec << "\n";
114  throw system_error(ec);
115  }
116 
117  connect(iterator);
118 }
119 
120 // worker thread
122 {
123  MPTEST_LOG;
124  socket_.async_connect(*iterator, std::bind(&wesnothd_connection::handle_connect, this, _1, iterator));
125  LOG_NW << "Connecting to " << iterator->endpoint().address() << '\n';
126 }
127 
128 // worker thread
129 void wesnothd_connection::handle_connect(const boost::system::error_code& ec, resolver::iterator iterator)
130 {
131  MPTEST_LOG;
132  if(ec) {
133  WRN_NW << "Failed to connect to " << iterator->endpoint().address() << ": " << ec.message() << '\n';
134  socket_.close();
135 
136  if(++iterator == resolver::iterator()) {
137  ERR_NW << "Tried all IPs. Giving up" << std::endl;
138  throw system_error(ec);
139  } else {
140  connect(iterator);
141  }
142  } else {
143  LOG_NW << "Connected to " << iterator->endpoint().address() << '\n';
144  handshake();
145  }
146 }
147 
148 // worker thread
150 {
151  MPTEST_LOG;
152  static const uint32_t handshake = 0;
153 
154  boost::asio::async_write(socket_, boost::asio::buffer(reinterpret_cast<const char*>(&handshake), 4),
155  [](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
156 
157  boost::asio::async_read(socket_, boost::asio::buffer(&handshake_response_.binary, 4),
158  std::bind(&wesnothd_connection::handle_handshake, this, _1));
159 }
160 
161 // worker thread
162 void wesnothd_connection::handle_handshake(const error_code& ec)
163 {
164  MPTEST_LOG;
165  if(ec) {
166  LOG_NW << __func__ << " Throwing: " << ec << "\n";
167  throw system_error(ec);
168  }
169 
170  handshake_finished_.set_value();
171  recv();
172 }
173 
174 // main thread
176 {
177  MPTEST_LOG;
178  LOG_NW << "Waiting for handshake" << std::endl;
179 
180  try {
181  handshake_finished_.get_future().get();
182  } catch(const boost::system::system_error& err) {
183  if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
184  return;
185  }
186 
187  WRN_NW << __func__ << " Rethrowing: " << err.code() << "\n";
188  throw error(err.code());
189  } catch(const std::future_error& e) {
190  if(e.code() == std::future_errc::future_already_retrieved) {
191  return;
192  }
193  }
194 }
195 
196 // main thread
198 {
199  MPTEST_LOG;
200 
201  // C++11 doesn't allow lambda capturing by moving. This could maybe use std::unique_ptr in c++14;
202  std::shared_ptr<boost::asio::streambuf> buf_ptr(new boost::asio::streambuf());
203 
204  std::ostream os(buf_ptr.get());
205  write_gz(os, request);
206 
207  // TODO: should I capture a shared_ptr for this?
208  io_service_.post([this, buf_ptr]() {
209  DBG_NW << "In wesnothd_connection::send_data::lambda\n";
210  send_queue_.push(buf_ptr);
211 
212  if(send_queue_.size() == 1) {
213  send();
214  }
215  });
216 }
217 
218 // main thread
220 {
221  MPTEST_LOG;
222  if(socket_.is_open()) {
223  boost::system::error_code ec;
224 
225 #ifdef _MSC_VER
226 // Silence warning about boost::asio::basic_socket<Protocol>::cancel always
227 // returning an error on XP, which we don't support anymore.
228 #pragma warning(push)
229 #pragma warning(disable:4996)
230 #endif
231  socket_.cancel(ec);
232 #ifdef _MSC_VER
233 #pragma warning(pop)
234 #endif
235 
236  if(ec) {
237  WRN_NW << "Failed to cancel network operations: " << ec.message() << std::endl;
238  }
239  }
240 }
241 
242 // main thread
244 {
245  // TODO: wouldn't cancel() have the same effect?
246  MPTEST_LOG;
247  io_service_.stop();
248 }
249 
250 // worker thread
251 std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
252 {
253  MPTEST_LOG;
254  if(ec) {
255  {
256  std::lock_guard<std::mutex> lock(last_error_mutex_);
257  last_error_ = ec;
258  }
259 
260  LOG_NW << __func__ << " Error: " << ec << "\n";
261 
262  io_service_.stop();
263  return bytes_to_write_ - bytes_transferred;
264  }
265 
266  bytes_written_ = bytes_transferred;
267  return bytes_to_write_ - bytes_transferred;
268 }
269 
270 // worker thread
271 void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
272 {
273  MPTEST_LOG;
274  DBG_NW << "Written " << bytes_transferred << " bytes.\n";
275 
276  send_queue_.pop();
277 
278  if(ec) {
279  {
280  std::lock_guard<std::mutex> lock(last_error_mutex_);
281  last_error_ = ec;
282  }
283 
284  LOG_NW << __func__ << " Error: " << ec << "\n";
285 
286  io_service_.stop();
287  return;
288  }
289 
290  if(!send_queue_.empty()) {
291  send();
292  }
293 }
294 
295 // worker thread
296 std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
297 {
298  // We use custom is_write/read_complete function to be able to see the current progress of the upload/download
299  MPTEST_LOG;
300  if(ec) {
301  {
302  std::lock_guard<std::mutex> lock(last_error_mutex_);
303  last_error_ = ec;
304  }
305 
306  LOG_NW << __func__ << " Error: " << ec << "\n";
307 
308  io_service_.stop();
309  return bytes_to_read_ - bytes_transferred;
310  }
311 
312  bytes_read_ = bytes_transferred;
313 
314  if(bytes_transferred < 4) {
315  return 4;
316  }
317 
318  if(!bytes_to_read_) {
319  std::istream is(&read_buf_);
320  data_union data_size;
321 
322  is.read(data_size.binary, 4);
323  bytes_to_read_ = ntohl(data_size.num) + 4;
324 
325  // Close immediately if we receive an invalid length
326  if(bytes_to_read_ < 4) {
327  bytes_to_read_ = bytes_transferred;
328  }
329  }
330 
331  return bytes_to_read_ - bytes_transferred;
332 }
333 
334 // worker thread
335 void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
336 {
337  MPTEST_LOG;
338  DBG_NW << "Read " << bytes_transferred << " bytes.\n";
339 
340  bytes_to_read_ = 0;
341  if(last_error_ && ec != boost::asio::error::eof) {
342  {
343  std::lock_guard<std::mutex> lock(last_error_mutex_);
344  last_error_ = ec;
345  }
346 
347  LOG_NW << __func__ << " Error: " << ec << "\n";
348 
349  io_service_.stop();
350  return;
351  }
352 
353  std::istream is(&read_buf_);
354  config data;
355  read_gz(data, is);
356  if(!data.empty()) { DBG_NW << "Received:\n" << data; }
357 
358  {
359  std::lock_guard<std::mutex> lock(recv_queue_mutex_);
360  recv_queue_.emplace(std::move(data));
361  recv_queue_lock_.notify_all();
362  }
363 
364  recv();
365 }
366 
367 // worker thread
369 {
370  MPTEST_LOG;
371  auto& buf = *send_queue_.front();
372 
373  std::size_t buf_size = buf.size();
374  bytes_to_write_ = buf_size + 4;
375  bytes_written_ = 0;
376  payload_size_ = htonl(buf_size);
377 
378  boost::asio::streambuf::const_buffers_type gzipped_data = buf.data();
379  std::deque<boost::asio::const_buffer> bufs(gzipped_data.begin(), gzipped_data.end());
380 
381  bufs.push_front(boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4));
382 
383  boost::asio::async_write(socket_, bufs,
384  std::bind(&wesnothd_connection::is_write_complete, this, _1, _2),
385  std::bind(&wesnothd_connection::handle_write, this, _1, _2));
386 }
387 
388 // worker thread
390 {
391  MPTEST_LOG;
392 
393  boost::asio::async_read(socket_, read_buf_,
394  std::bind(&wesnothd_connection::is_read_complete, this, _1, _2),
395  std::bind(&wesnothd_connection::handle_read, this, _1, _2));
396 }
397 
398 // main thread
400 {
401  MPTEST_LOG;
402 
403  {
404  std::lock_guard<std::mutex> lock(recv_queue_mutex_);
405  if(!recv_queue_.empty()) {
406  result.swap(recv_queue_.front());
407  recv_queue_.pop();
408  return true;
409  }
410  }
411 
412  {
413  std::lock_guard<std::mutex> lock(last_error_mutex_);
414  if(last_error_) {
415  std::string user_msg;
416 
417  if(last_error_ == boost::asio::error::eof) {
418  user_msg = _("Disconnected from server.");
419  }
420 
421  throw error(last_error_, user_msg);
422  }
423  }
424 
425  return false;
426 }
427 
429 {
430  {
431  std::unique_lock<std::mutex> lock(recv_queue_mutex_);
432  recv_queue_lock_.wait(lock, [this]() { return has_data_received(); });
433  }
434 
435  return receive_data(data);
436 };
437 
439 {
440  assert(stage != loading_stage::none);
441 
442  bool res = false;
445 
446  res = wait_and_receive_data(cfg);
447  });
448 
449  return res;
450 }
void send_data(const configr_of &request)
std::size_t is_read_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
std::condition_variable recv_queue_lock_
std::promise< void > handshake_finished_
void handle_write(const boost::system::error_code &ec, std::size_t bytes_transferred)
void handle_resolve(const boost::system::error_code &ec, resolver::iterator iterator)
wesnothd_connection_error error
bool fetch_data_with_loading_screen(config &cfg, loading_stage stage)
static void progress(loading_stage stage=loading_stage::none)
std::size_t is_write_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
void connect(resolver::iterator iterator)
bool receive_data(config &result)
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
Might throw a std::ios_base::failure especially a gzip_error.
Definition: parser.cpp:681
#define ERR_NW
#define MPTEST_LOG
#define DBG_NW
loading_stage
Loading screen stage IDs.
void swap(config &cfg)
Definition: config.cpp:1377
static UNUSEDNOWARN std::string _(const char *str)
Definition: gettext.hpp:91
data_queue< std::shared_ptr< boost::asio::streambuf > > send_queue_
void write_gz(std::ostream &out, const configr_of &cfg)
Definition: parser.cpp:781
#define WRN_NW
void handle_connect(const boost::system::error_code &ec, resolver::iterator iterator)
void handle_handshake(const boost::system::error_code &ec)
boost::asio::streambuf read_buf_
boost::system::error_code last_error_
logger & err()
Definition: log.cpp:78
boost::asio::io_service io_service_
wesnothd_connection(const wesnothd_connection &)=delete
static lg::log_domain log_network("network")
data_queue< config > recv_queue_
static void display(std::function< void()> f)
void handle_read(const boost::system::error_code &ec, std::size_t bytes_transferred)
Standard logging facilities (interface).
bool wait_and_receive_data(config &data)
Helper function that spins until data has been received.
#define e
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:68
#define LOG_NW
std::string::const_iterator iterator
Definition: tokenizer.hpp:24
bool empty() const
Definition: config.cpp:884
HOTKEY_COMMAND get_id(const std::string &command)
returns get_hotkey_command(command).id