Joedb 9.1.4
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server.h
Go to the documentation of this file.
1#ifndef joedb_Server_declared
2#define joedb_Server_declared
3
7
8#include <queue>
9#include <iosfwd>
10#include <set>
11#include <chrono>
12#include <optional>
13
14#include <asio/ip/tcp.hpp>
15#include <asio/steady_timer.hpp>
16#include <asio/signal_set.hpp>
17
18namespace joedb
19{
20 /// @ingroup concurrency
21 class Server
22 {
23 private:
24 const std::chrono::time_point<std::chrono::steady_clock> start_time;
25 Client &client;
26 Writable_Journal_Client *push_client;
27 const bool share_client;
28 std::optional<Writable_Journal_Client_Lock> client_lock;
29 asio::io_context &io_context;
30 asio::ip::tcp::acceptor acceptor;
31 const uint16_t port;
32 bool stopped;
33 asio::signal_set interrupt_signals;
34
35 int64_t session_id;
36
37 struct Session: public std::enable_shared_from_this<Session>
38 {
39 const int64_t id;
40 Server &server;
41 asio::ip::tcp::socket socket;
42 Buffer<13> buffer;
43 enum class State
44 {
45 not_locking,
46 waiting_for_push_to_pull,
47 waiting_for_lock_to_pull,
48 waiting_for_lock_to_push,
49 locking
50 };
51 State state;
52
53 size_t push_remaining_size;
54 char push_status;
55 std::optional<Async_Writer> push_writer;
56 bool unlock_after_push;
57
58 std::optional<asio::steady_timer> pull_timer;
59 bool lock_before_pulling;
60 bool send_pull_data;
61 int64_t pull_checkpoint;
62
63 std::ostream &write_id(std::ostream &out) const;
64 std::optional<Progress_Bar> progress_bar;
65
66 Session(Server &server, asio::ip::tcp::socket &&socket);
67 ~Session();
68 };
69
70 typedef void (Server::*Transfer_Handler)
71 (
72 std::shared_ptr<Session> session,
73 std::error_code error,
74 size_t bytes_transferred
75 );
76
77 void async_read
78 (
79 std::shared_ptr<Session> session,
80 size_t offset,
81 size_t size,
82 Transfer_Handler handler
83 );
84
85 std::set<Session *> sessions;
86
87 void write_status();
88
89 const std::chrono::milliseconds lock_timeout;
90 asio::steady_timer lock_timeout_timer;
91 bool locked;
92 std::queue<std::shared_ptr<Session>> lock_queue;
93 void lock_dequeue();
94 void lock(std::shared_ptr<Session> session, Session::State state);
95 void unlock(Session &session);
96
97 void lock_timeout_handler
98 (
99 std::shared_ptr<Session> session,
100 std::error_code error
101 );
102
103 void refresh_lock_timeout(std::shared_ptr<Session> session);
104
105 void push_transfer_handler
106 (
107 std::shared_ptr<Session> session,
108 std::error_code error,
109 size_t bytes_transferred
110 );
111
112 void push_transfer
113 (
114 std::shared_ptr<Session> session
115 );
116
117 void push_handler
118 (
119 std::shared_ptr<Session> session,
120 std::error_code error,
121 size_t bytes_transferred
122 );
123
124 void read_transfer_handler
125 (
126 std::shared_ptr<Session> session,
127 Async_Reader reader,
128 std::error_code error,
129 size_t bytes_transferred,
130 size_t offset
131 );
132
133 void start_reading(std::shared_ptr<Session> session, Async_Reader reader);
134
135 void start_pulling(std::shared_ptr<Session> session);
136
137 void pull_handler
138 (
139 std::shared_ptr<Session> session,
140 std::error_code error,
141 size_t bytes_transferred
142 );
143
144 void pull(std::shared_ptr<Session> session, bool lock, bool send);
145
146 void read_handler
147 (
148 std::shared_ptr<Session> session,
149 std::error_code error,
150 size_t bytes_transferred
151 );
152
153 void read_blob_handler
154 (
155 std::shared_ptr<Session> session,
156 std::error_code error,
157 size_t bytes_transferred
158 );
159
160 void check_hash_handler
161 (
162 std::shared_ptr<Session> session,
163 std::error_code error,
164 size_t bytes_transferred
165 );
166
167 void read_command_handler
168 (
169 std::shared_ptr<Session> session,
170 std::error_code error,
171 size_t bytes_transferred
172 );
173
174 void read_command(std::shared_ptr<Session> session);
175
176 void write_buffer_and_next_command
177 (
178 std::shared_ptr<Session> session,
179 size_t size
180 );
181
182 void handshake_handler
183 (
184 std::shared_ptr<Session> session,
185 std::error_code error,
186 size_t bytes_transferred
187 );
188
189 void handle_accept
190 (
191 std::error_code error,
192 asio::ip::tcp::socket socket
193 );
194
195 void start_accept();
196
197 std::ostream *log_pointer;
198
199 template<typename F> void log(F f)
200 {
201 if (log_pointer)
202 {
203 f(*log_pointer);
204 log_pointer->flush();
205 }
206 }
207
208 public:
209 Server
210 (
211 Client &client,
212 bool share_client,
213 asio::io_context &io_context,
214 uint16_t port,
215 std::chrono::milliseconds lock_timeout,
216 std::ostream *log_pointer
217 );
218
219 uint16_t get_port() const {return port;}
220
221 bool is_readonly() const
222 {
223 return client.is_readonly() || !push_client;
224 }
225
226 std::chrono::milliseconds get_time_stamp() const;
227
228 // Note: run on io_context if on another thread: io_context.post([&](){server.stop();});
229 void start();
230 void stop_after_sessions();
231 void stop();
232
233 ~Server();
234 };
235}
236
237#endif
Handle concurrent access to a file with a joedb::Connection.
Definition Client.h:12
bool is_readonly() const
Definition Client.h:91
void stop()
Definition Server.cpp:885
void start()
Definition Server.cpp:841
bool is_readonly() const
Definition Server.h:221
void stop_after_sessions()
Definition Server.cpp:876
std::chrono::milliseconds get_time_stamp() const
Definition Server.cpp:17
uint16_t get_port() const
Definition Server.h:219
Definition Blob.h:7