Joedb 9.1.4
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Client.cpp
Go to the documentation of this file.
5
6#include <iostream>
7#include <optional>
8
// Streams x to *log when a log stream is set; does nothing when log is null.
// Expects a pointer-like `log` to be in scope at the point of use. The
// do/while(false) wrapper makes the macro behave as a single statement.
#define LOG(x) do {if (log) *log << x;} while (false)
10
11namespace joedb
12{
13 ////////////////////////////////////////////////////////////////////////////
14 void Server_Client::ping(Channel_Lock &lock)
15 ////////////////////////////////////////////////////////////////////////////
16 {
17 buffer.index = 0;
18 buffer.write<char>('i');
19 buffer.write<int64_t>(0);
20 buffer.write<int64_t>(0);
21 lock.write(buffer.data, buffer.index);
22 lock.read(buffer.data, 9);
23 }
24
25 ////////////////////////////////////////////////////////////////////////////
27 ////////////////////////////////////////////////////////////////////////////
28 {
30 ping(lock);
31 }
32
33 ////////////////////////////////////////////////////////////////////////////
34 void Server_Client::keep_alive()
35 ////////////////////////////////////////////////////////////////////////////
36 {
37 try
38 {
40
41 while (!keep_alive_thread_must_stop)
42 {
43 condition.wait_for(lock, keep_alive_interval);
44
45 if (keep_alive_thread_must_stop)
46 break;
47
48 ping(lock);
49 }
50 }
51 catch(...)
52 {
53 }
54 }
55
56 ////////////////////////////////////////////////////////////////////////////
57 void Server_Client::connect()
58 ////////////////////////////////////////////////////////////////////////////
59 {
60 LOG("Connecting... ");
61
62 buffer.index = 0;
63 buffer.write<char>('j');
64 buffer.write<char>('o');
65 buffer.write<char>('e');
66 buffer.write<char>('d');
67 buffer.write<char>('b');
69
70 {
71 Channel_Lock lock(channel);
72 lock.write(buffer.data, buffer.index);
73 LOG("Waiting for \"joedb\"... ");
74 lock.read(buffer.data, 5 + 8 + 8 + 8 + 1);
75 }
76
77 buffer.index = 0;
78
79 if
80 (
81 buffer.read<char>() != 'j' ||
82 buffer.read<char>() != 'o' ||
83 buffer.read<char>() != 'e' ||
84 buffer.read<char>() != 'd' ||
85 buffer.read<char>() != 'b'
86 )
87 {
88 throw Exception("Did not receive \"joedb\" from server");
89 }
90
91 const int64_t server_version = buffer.read<int64_t>();
92
93 if (server_version == 0)
94 throw Exception("Client version rejected by server");
95
96 LOG("server_version = " << server_version << ". ");
97
98 if (server_version < protocol_version)
99 throw Exception("Unsupported server version");
100
101 session_id = buffer.read<int64_t>();
102 server_checkpoint = buffer.read<int64_t>();
103 const char mode = buffer.read<char>();
104
105 if (mode == 'R')
106 pullonly_server = true;
107 else if (mode == 'W')
108 pullonly_server = false;
109 else
110 throw Exception("Unexpected server mode");
111
112 LOG
113 (
114 "session_id = " << session_id <<
115 "; server_checkpoint = " << server_checkpoint <<
116 "; mode = " << mode <<
117 ". OK.\n"
118 );
119
120 if (keep_alive_interval.count() > 0)
121 {
122 keep_alive_thread_must_stop = false;
123 keep_alive_thread = std::thread([this](){keep_alive();});
124 }
125 }
126
127 ////////////////////////////////////////////////////////////////////////////
129 ////////////////////////////////////////////////////////////////////////////
130 (
131 Async_Writer &writer,
132 Channel_Lock &lock,
133 int64_t size
134 ) const
135 {
136 LOG("downloading, size = " << size);
137
138 std::optional<Progress_Bar> progress_bar;
139 if (size > buffer.ssize && log)
140 progress_bar.emplace(size, *log);
141
142 for (int64_t read = 0; read < size;)
143 {
144 const int64_t remaining = size - read;
145 const size_t read_size = size_t
146 (
147 std::min(int64_t(buffer.size), remaining)
148 );
149 const size_t n = lock.read_some(buffer.data, read_size);
150 writer.write(buffer.data, n);
151 read += int64_t(n);
152 if (progress_bar)
153 progress_bar->print(read);
154 }
155
156 if (!progress_bar)
157 LOG(" OK\n");
158 }
159
160 ////////////////////////////////////////////////////////////////////////////
162 ////////////////////////////////////////////////////////////////////////////
163 keep_alive_interval(std::chrono::seconds{240}),
164 channel(channel),
165 log(nullptr),
166 session_id(-1),
167 pullonly_server(false)
168 {
169 connect();
170 }
171
172 ////////////////////////////////////////////////////////////////////////////
173 void Server_Client::disconnect()
174 ////////////////////////////////////////////////////////////////////////////
175 {
176 {
177 Channel_Lock lock(channel);
178 keep_alive_thread_must_stop = true;
179 }
180
181 condition.notify_one();
182 if (keep_alive_thread.joinable())
183 keep_alive_thread.join();
184 }
185
186 ////////////////////////////////////////////////////////////////////////////
188 ////////////////////////////////////////////////////////////////////////////
189 {
190 try { disconnect(); } catch (...) {}
191 }
192}
193
194#undef LOG
#define LOG(x)
Definition Server.cpp:8
size_t index
Definition Buffer.h:21
void write(T x)
Definition Buffer.h:24
char data[size+extra_size]
Definition Buffer.h:20
void download(Async_Writer &writer, Channel_Lock &lock, int64_t size) const
Thread_Safe_Channel channel
Server_Client(Channel &channel)
constexpr int protocol_version
Definition Blob.h:7