The Battle for Wesnoth  1.15.12+dev
wesnothd_connection.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2011 - 2018 by Sergey Popov <loonycyborg@gmail.com>
3  Part of the Battle for Wesnoth Project https://www.wesnoth.org/
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or
8  (at your option) any later version.
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY.
11 
12  See the COPYING file for more details.
13 */
14 
15 #define BOOST_ASIO_NO_DEPRECATED
16 
17 #include "wesnothd_connection.hpp"
18 
19 #include "gettext.hpp"
20 #include "log.hpp"
21 #include "serialization/parser.hpp"
22 
23 #include <boost/asio/connect.hpp>
24 #include <boost/asio/read.hpp>
25 #include <boost/asio/write.hpp>
26 
27 #include <cstdint>
28 #include <deque>
29 #include <functional>
30 
31 static lg::log_domain log_network("network");
32 #define DBG_NW LOG_STREAM(debug, log_network)
33 #define LOG_NW LOG_STREAM(info, log_network)
34 #define WRN_NW LOG_STREAM(warn, log_network)
35 #define ERR_NW LOG_STREAM(err, log_network)
36 
37 #if 0
38 // code for the travis test
39 #include <sys/types.h>
40 #include <unistd.h>
41 namespace {
42 struct mptest_log
43 {
44  mptest_log(const char* functionname)
45  {
46  WRN_NW << "Process:" << getpid() << " Thread:" << std::this_thread::get_id() << " Function: " << functionname << " Start\n";
47  }
48 };
49 }
50 #define MPTEST_LOG mptest_log mptest_log__(__func__)
51 #else
52 #define MPTEST_LOG ((void)0)
53 #endif
54 
55 using boost::system::error_code;
56 using boost::system::system_error;
57 
58 // main thread
59 wesnothd_connection::wesnothd_connection(const std::string& host, const std::string& service)
60  : worker_thread_()
61  , io_context_()
62  , resolver_(io_context_)
63  , socket_(io_context_)
64  , last_error_()
65  , last_error_mutex_()
66  , handshake_finished_()
67  , read_buf_()
68  , handshake_response_()
69  , recv_queue_()
70  , recv_queue_mutex_()
71  , recv_queue_lock_()
72  , payload_size_(0)
73  , bytes_to_write_(0)
74  , bytes_written_(0)
75  , bytes_to_read_(0)
76  , bytes_read_(0)
77 {
78  MPTEST_LOG;
79 #if BOOST_VERSION >= 106600
80  resolver_.async_resolve(host, service,
81 #else
82  resolver_.async_resolve(boost::asio::ip::tcp::resolver::query(host, service),
83 #endif
84  std::bind(&wesnothd_connection::handle_resolve, this, std::placeholders::_1, std::placeholders::_2));
85 
86  // Starts the worker thread. Do this *after* the above async_resolve call or it will just exit immediately!
87  worker_thread_ = std::thread([this]() {
88  try {
89  io_context_.run();
90  } catch(const boost::system::system_error&) {
91  try {
92  // Attempt to pass the exception on to the handshake promise.
93  handshake_finished_.set_exception(std::current_exception());
94  } catch(const std::future_error&) {
95  // Handshake already complete. Do nothing.
96  }
97  } catch(...) {
98  }
99 
100  LOG_NW << "wesnothd_connection::io_service::run() returned\n";
101  });
102 
103  LOG_NW << "Resolving hostname: " << host << '\n';
104 }
105 
107 {
108  MPTEST_LOG;
109 
110  // Stop the io_service and wait for the worker thread to terminate.
111  stop();
112  worker_thread_.join();
113 }
114 
115 // worker thread
116 void wesnothd_connection::handle_resolve(const error_code& ec, results_type results)
117 {
118  MPTEST_LOG;
119  if(ec) {
120  LOG_NW << __func__ << " Throwing: " << ec << "\n";
121  throw system_error(ec);
122  }
123 
124  boost::asio::async_connect(socket_, results,
125  std::bind(&wesnothd_connection::handle_connect, this, std::placeholders::_1, std::placeholders::_2));
126 }
127 
128 // worker thread
129 void wesnothd_connection::handle_connect(const boost::system::error_code& ec, endpoint endpoint)
130 {
131  MPTEST_LOG;
132  if(ec) {
133  ERR_NW << "Tried all IPs. Giving up" << std::endl;
134  throw system_error(ec);
135  } else {
136 #if BOOST_VERSION >= 106600
137  LOG_NW << "Connected to " << endpoint.address() << '\n';
138 #else
139  LOG_NW << "Connected to " << endpoint->endpoint().address() << '\n';
140 #endif
141  handshake();
142  }
143 }
144 
145 // worker thread
147 {
148  MPTEST_LOG;
149  static const uint32_t handshake = 0;
150 
151  boost::asio::async_write(socket_, boost::asio::buffer(reinterpret_cast<const char*>(&handshake), 4),
152  [](const error_code& ec, std::size_t) { if(ec) { throw system_error(ec); } });
153 
154  boost::asio::async_read(socket_, boost::asio::buffer(&handshake_response_.binary, 4),
155  std::bind(&wesnothd_connection::handle_handshake, this, std::placeholders::_1));
156 }
157 
158 // worker thread
159 void wesnothd_connection::handle_handshake(const error_code& ec)
160 {
161  MPTEST_LOG;
162  if(ec) {
163  LOG_NW << __func__ << " Throwing: " << ec << "\n";
164  throw system_error(ec);
165  }
166 
167  handshake_finished_.set_value();
168  recv();
169 }
170 
171 // main thread
173 {
174  MPTEST_LOG;
175  LOG_NW << "Waiting for handshake" << std::endl;
176 
177  try {
178  handshake_finished_.get_future().get();
179  } catch(const boost::system::system_error& err) {
180  if(err.code() == boost::asio::error::operation_aborted || err.code() == boost::asio::error::eof) {
181  return;
182  }
183 
184  WRN_NW << __func__ << " Rethrowing: " << err.code() << "\n";
185  throw error(err.code());
186  } catch(const std::future_error& e) {
187  if(e.code() == std::future_errc::future_already_retrieved) {
188  return;
189  }
190  }
191 }
192 
193 // main thread
195 {
196  MPTEST_LOG;
197 
198 #if BOOST_VERSION >= 106600
199  auto buf_ptr = std::make_unique<boost::asio::streambuf>();
200 #else
201  auto buf_ptr = std::make_shared<boost::asio::streambuf>();
202 #endif
203 
204  std::ostream os(buf_ptr.get());
205  write_gz(os, request);
206 
207  // No idea why io_context::post doesn't like this lambda while asio::post does.
208 #if BOOST_VERSION >= 106600
209  boost::asio::post(io_context_, [this, buf_ptr = std::move(buf_ptr)]() mutable {
210 #else
211  io_context_.post([this, buf_ptr]() {
212 #endif
213  DBG_NW << "In wesnothd_connection::send_data::lambda\n";
214  send_queue_.push(std::move(buf_ptr));
215 
216  if(send_queue_.size() == 1) {
217  send();
218  }
219  });
220 }
221 
222 // main thread
224 {
225  MPTEST_LOG;
226  if(socket_.is_open()) {
227  boost::system::error_code ec;
228 
229 #ifdef _MSC_VER
230 // Silence warning about boost::asio::basic_socket<Protocol>::cancel always
231 // returning an error on XP, which we don't support anymore.
232 #pragma warning(push)
233 #pragma warning(disable:4996)
234 #endif
235  socket_.cancel(ec);
236 #ifdef _MSC_VER
237 #pragma warning(pop)
238 #endif
239 
240  if(ec) {
241  WRN_NW << "Failed to cancel network operations: " << ec.message() << std::endl;
242  }
243  }
244 }
245 
246 // main thread
248 {
249  // TODO: wouldn't cancel() have the same effect?
250  MPTEST_LOG;
251  io_context_.stop();
252 }
253 
254 // worker thread
255 std::size_t wesnothd_connection::is_write_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
256 {
257  MPTEST_LOG;
258  if(ec) {
259  {
260  std::lock_guard lock(last_error_mutex_);
261  last_error_ = ec;
262  }
263 
264  LOG_NW << __func__ << " Error: " << ec << "\n";
265 
266  io_context_.stop();
267  return bytes_to_write_ - bytes_transferred;
268  }
269 
270  bytes_written_ = bytes_transferred;
271  return bytes_to_write_ - bytes_transferred;
272 }
273 
274 // worker thread
275 void wesnothd_connection::handle_write(const boost::system::error_code& ec, std::size_t bytes_transferred)
276 {
277  MPTEST_LOG;
278  DBG_NW << "Written " << bytes_transferred << " bytes.\n";
279 
280  send_queue_.pop();
281 
282  if(ec) {
283  {
284  std::lock_guard lock(last_error_mutex_);
285  last_error_ = ec;
286  }
287 
288  LOG_NW << __func__ << " Error: " << ec << "\n";
289 
290  io_context_.stop();
291  return;
292  }
293 
294  if(!send_queue_.empty()) {
295  send();
296  }
297 }
298 
299 // worker thread
300 std::size_t wesnothd_connection::is_read_complete(const boost::system::error_code& ec, std::size_t bytes_transferred)
301 {
302  // We use custom is_write/read_complete function to be able to see the current progress of the upload/download
303  MPTEST_LOG;
304  if(ec) {
305  {
306  std::lock_guard lock(last_error_mutex_);
307  last_error_ = ec;
308  }
309 
310  LOG_NW << __func__ << " Error: " << ec << "\n";
311 
312  io_context_.stop();
313  return bytes_to_read_ - bytes_transferred;
314  }
315 
316  bytes_read_ = bytes_transferred;
317 
318  if(bytes_transferred < 4) {
319  return 4;
320  }
321 
322  if(!bytes_to_read_) {
323  std::istream is(&read_buf_);
324  data_union data_size;
325 
326  is.read(data_size.binary, 4);
327  bytes_to_read_ = ntohl(data_size.num) + 4;
328 
329  // Close immediately if we receive an invalid length
330  if(bytes_to_read_ < 4) {
331  bytes_to_read_ = bytes_transferred;
332  }
333  }
334 
335  return bytes_to_read_ - bytes_transferred;
336 }
337 
338 // worker thread
339 void wesnothd_connection::handle_read(const boost::system::error_code& ec, std::size_t bytes_transferred)
340 {
341  MPTEST_LOG;
342  DBG_NW << "Read " << bytes_transferred << " bytes.\n";
343 
344  bytes_to_read_ = 0;
345  if(last_error_ && ec != boost::asio::error::eof) {
346  {
347  std::lock_guard lock(last_error_mutex_);
348  last_error_ = ec;
349  }
350 
351  LOG_NW << __func__ << " Error: " << ec << "\n";
352 
353  io_context_.stop();
354  return;
355  }
356 
357  std::istream is(&read_buf_);
358  config data;
359  read_gz(data, is);
360  if(!data.empty()) { DBG_NW << "Received:\n" << data; }
361 
362  {
363  std::lock_guard lock(recv_queue_mutex_);
364  recv_queue_.emplace(std::move(data));
365  recv_queue_lock_.notify_all();
366  }
367 
368  recv();
369 }
370 
371 // worker thread
373 {
374  MPTEST_LOG;
375  auto& buf = *send_queue_.front();
376 
377  std::size_t buf_size = buf.size();
378  bytes_to_write_ = buf_size + 4;
379  bytes_written_ = 0;
380  payload_size_ = htonl(buf_size);
381 
382  std::deque<boost::asio::const_buffer> bufs {
383  boost::asio::buffer(reinterpret_cast<const char*>(&payload_size_), 4),
384  buf.data()
385  };
386 
387  boost::asio::async_write(socket_, bufs,
388  std::bind(&wesnothd_connection::is_write_complete, this, std::placeholders::_1, std::placeholders::_2),
389  std::bind(&wesnothd_connection::handle_write, this, std::placeholders::_1, std::placeholders::_2));
390 }
391 
392 // worker thread
394 {
395  MPTEST_LOG;
396 
397  boost::asio::async_read(socket_, read_buf_,
398  std::bind(&wesnothd_connection::is_read_complete, this, std::placeholders::_1, std::placeholders::_2),
399  std::bind(&wesnothd_connection::handle_read, this, std::placeholders::_1, std::placeholders::_2));
400 }
401 
402 // main thread
404 {
405  MPTEST_LOG;
406 
407  {
408  std::lock_guard lock(recv_queue_mutex_);
409  if(!recv_queue_.empty()) {
410  result.swap(recv_queue_.front());
411  recv_queue_.pop();
412  return true;
413  }
414  }
415 
416  {
417  std::lock_guard lock(last_error_mutex_);
418  if(last_error_) {
419  std::string user_msg;
420 
421  if(last_error_ == boost::asio::error::eof) {
422  user_msg = _("Disconnected from server.");
423  }
424 
425  throw error(last_error_, user_msg);
426  }
427  }
428 
429  return false;
430 }
431 
433 {
434  {
435  std::unique_lock<std::mutex> lock(recv_queue_mutex_);
436  recv_queue_lock_.wait(lock, [this]() { return has_data_received(); });
437  }
438 
439  return receive_data(data);
440 };
void send_data(const configr_of &request)
Queues the given data to be sent to the server.
std::size_t is_read_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
resolver::iterator endpoint
std::condition_variable recv_queue_lock_
std::promise< void > handshake_finished_
void handle_write(const boost::system::error_code &ec, std::size_t bytes_transferred)
wesnothd_connection_error error
std::size_t is_write_complete(const boost::system::error_code &error, std::size_t bytes_transferred)
void handle_connect(const boost::system::error_code &ec, endpoint endpoint)
bool receive_data(config &result)
Receives the next pending data pack from the server, if available.
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
Might throw a std::ios_base::failure especially a gzip_error.
Definition: parser.cpp:682
static std::string _(const char *str)
Definition: gettext.hpp:92
#define ERR_NW
#define MPTEST_LOG
#define DBG_NW
void swap(config &cfg)
Definition: config.cpp:1422
resolver::iterator results_type
data_queue< std::shared_ptr< boost::asio::streambuf > > send_queue_
void write_gz(std::ostream &out, const configr_of &cfg)
Definition: parser.cpp:782
#define WRN_NW
void handle_handshake(const boost::system::error_code &ec)
boost::asio::streambuf read_buf_
boost::system::error_code last_error_
boost::asio::io_service io_context_
logger & err()
Definition: log.cpp:76
wesnothd_connection(const wesnothd_connection &)=delete
static lg::log_domain log_network("network")
data_queue< config > recv_queue_
void handle_read(const boost::system::error_code &ec, std::size_t bytes_transferred)
Standard logging facilities (interface).
void handle_resolve(const boost::system::error_code &ec, results_type results)
bool wait_and_receive_data(config &data)
Unlike receive_data, waits until data is available instead of returning immediately.
#define e
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:59
#define LOG_NW
bool empty() const
Definition: config.cpp:916
HOTKEY_COMMAND get_id(const std::string &command)
returns get_hotkey_command(command).id
void wait_for_handshake()
Waits until the server handshake is complete.