The Battle for Wesnoth  1.15.6+dev
concurrency.cpp
Go to the documentation of this file.
1 //
2 // M A R I A D B + +
3 //
4 // Copyright Sylvain Rochette Langlois 2013,
5 // Frantisek Boranek 2015,
6 // The ViaDuck Project 2016 - 2018.
7 // Distributed under the Boost Software License, Version 1.0.
8 // (See accompanying file LICENSE or copy at
9 // http://www.boost.org/LICENSE_1_0.txt)
10 
11 #include <mysql.h>
12 #include <mariadb++/types.hpp>
13 
14 #include <atomic>
15 #include <thread>
16 #include <mutex>
17 
18 #define LOCK_MUTEX() std::lock_guard<std::mutex> lock(g_mutex)
19 
20 #include "worker.hpp"
21 #include "private.hpp"
22 
23 using namespace mariadb;
24 using namespace mariadb::concurrency;
25 
26 namespace {
27 handle g_next_handle(0);
28 account_ref g_account;
29 std::mutex g_mutex;
30 std::vector<worker*> g_querys_in;
31 std::map<handle, worker*> g_querys_out;
32 bool g_thread_running(false);
33 
34 typedef std::map<handle, worker*> map_t;
35 
36 //
37 // Worker thread
38 //
39 void worker_thread() {
40  g_thread_running = true;
41 
42  mysql_thread_init();
43 
44  // breaking in infinite loop because checking g_querys_in would result in a race condition
45  while (true) {
46  worker* w = NULL;
47  {
48  LOCK_MUTEX();
49  auto possible_w = g_querys_in.begin();
50  if (possible_w == g_querys_in.end()) break;
51 
52  w = *possible_w;
53  g_querys_in.erase(possible_w);
54  }
55 
56  w->execute();
57 
58  if (!w->keep_handle()) delete w;
59  }
60 
61  g_thread_running = false;
62  mysql_thread_end();
63 }
64 
65 //
66 // Start thread if no thread is running
67 //
68 void start_thread() {
69  if (!g_thread_running) {
70  std::thread t(worker_thread);
71  t.detach();
72  }
73 }
74 
75 //
76 // Get worker from handle
77 //
78 const worker& get_worker(handle h) {
79  LOCK_MUTEX();
80  const map_t::const_iterator w = g_querys_out.find(h);
81 
82  if (w != g_querys_out.end()) return *w->second;
83 
84  static worker removed;
85  return removed;
86 }
87 
88 //
89 // Add / remove a new query / command to the thread
90 //
91 handle add(const std::string& query, command::type command, bool keep_handle) {
92  LOCK_MUTEX();
93  worker* w = new worker(g_account, ++g_next_handle, keep_handle, command, query);
94  g_querys_in.push_back(w);
95 
96  if (keep_handle) g_querys_out[g_next_handle] = w;
97 
98  start_thread();
99 
100  return g_next_handle;
101 }
102 
103 handle add(statement_ref& statement, command::type command, bool keep_handle) {
104  LOCK_MUTEX();
105  worker* w = new worker(g_account, ++g_next_handle, keep_handle, command, statement);
106  g_querys_in.push_back(w);
107 
108  if (keep_handle) g_querys_out[g_next_handle] = w;
109 
110  start_thread();
111 
112  return g_next_handle;
113 }
114 }
115 
116 //
117 // Set account for connection
118 //
119 void concurrency::set_account(account_ref& account) { g_account = account; }
120 
121 //
122 // Query status
123 //
125  const worker& w = get_worker(h);
126  return w.status();
127 }
128 
129 //
130 // Query executed, result ready to be used
131 //
133  const worker& w = get_worker(h);
134  return w.status() == status::removed ? 0 : w.result();
135 }
136 
138  const worker& w = get_worker(h);
139  return w.status() == status::removed ? 0 : w.result();
140 }
141 
143  const worker& w = get_worker(h);
144  return w.status() == status::removed ? result_set_ref() : w.result_set();
145 }
146 
147 //
148 // Execute a query
149 //
150 handle concurrency::execute(const std::string& query, bool keep_handle) {
151  return add(query, command::execute, keep_handle);
152 }
153 
154 handle concurrency::insert(const std::string& query, bool keep_handle) {
155  return add(query, command::insert, keep_handle);
156 }
157 
158 handle concurrency::query(const std::string& query, bool keep_handle) {
159  return add(query, command::query, keep_handle);
160 }
161 
162 //
163 // Execute a query using a statement
164 //
167  statement_ref statement = connection->create_statement(query);
168  statement->set_connection(connection);
169  return statement;
170 }
171 
173  return add(statement, command::execute, keep_handle);
174 }
175 
177  return add(statement, command::insert, keep_handle);
178 }
179 
181  return add(statement, command::query, keep_handle);
182 }
183 
184 //
185 // Remove a query
186 //
187 // Please note, if a result_set is used, it must be done after the result_set is used...
188 //
190  LOCK_MUTEX();
191  map_t::iterator w = g_querys_out.find(h);
192 
193  if (w == g_querys_out.end()) return;
194 
195  delete w->second;
196  g_querys_out.erase(w);
197 }
198 
199 bool concurrency::wait_handle(handle h, u64 wait_time_ms) {
200  while (worker_status(h) < status::succeed) {
201  std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_ms));
202  }
203 
204  return worker_status(h) == status::succeed;
205 }
void set_account(account_ref &account)
std::shared_ptr< result_set > result_set_ref
Definition: result_set.hpp:219
u64 result() const
Definition: worker.cpp:59
result_set_ref result_set() const
Definition: worker.cpp:61
std::shared_ptr< account > account_ref
Definition: account.hpp:19
#define LOCK_MUTEX()
Definition: concurrency.cpp:18
handle query(const std::string &query, bool keep_handle)
#define h
Class used to store account and connection information used by mariadb::connection when connecting...
Definition: account.hpp:46
void execute()
Definition: worker.cpp:66
handle execute(const std::string &query, bool keep_handle)
result_set_ref get_query_result(handle h)
bool keep_handle() const
Definition: worker.cpp:52
status::type status() const
Definition: worker.cpp:50
u64 handle
Definition: types.hpp:38
Class representing a prepared statement with binding functionality.
Definition: statement.hpp:39
statement_ref create_statement(const std::string &query)
static connection_ref create(const account_ref &account)
Creates a new connection using the given account.
Definition: connection.cpp:28
handle insert(const std::string &query, bool keep_handle)
Wraps a Database connection.
Definition: connection.hpp:27
static void statement(LexState *ls)
Definition: lparser.cpp:1537
bool wait_handle(handle h, u64 wait_time_ms=100)
int w
u64 get_insert_result(handle h)
double t
Definition: astarsearch.cpp:64
unsigned long long u64
Definition: types.hpp:34
std::shared_ptr< connection > connection_ref
Definition: statement.hpp:33
void set_connection(connection_ref &connection)
Set connection ref, used by concurrency.
Definition: statement.cpp:44
void release_handle(handle h)
u64 get_execute_result(handle h)
std::shared_ptr< statement > statement_ref
Definition: statement.hpp:109
std::string::const_iterator iterator
Definition: tokenizer.hpp:24
status::type worker_status(handle h)