Joedb 9.1.4
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Server_Connection.cpp
Go to the documentation of this file.
6
7#include <iostream>
8#include <optional>
9
10#define LOG(x) do {if (log) *log << x;} while (false)
11#define LOGID(x) do {if (log) *log << get_time_string_of_now() << ' ' << get_session_id() << ": " << x;} while (false)
12
13namespace joedb
14{
15 ////////////////////////////////////////////////////////////////////////////
17 ////////////////////////////////////////////////////////////////////////////
18 {
20
21 LOGID("releasing lock... ");
22
23 buffer.data[0] = 'u';
24 lock.write(buffer.data, 1);
25 lock.read(buffer.data, 1);
26
27 if (buffer.data[0] == 'u')
28 LOG("OK\n");
29 else if (buffer.data[0] == 't')
30 LOG("The lock had timed out\n");
31 else
32 throw Exception("Unexpected server reply");
33 }
34
35 ////////////////////////////////////////////////////////////////////////////
37 ////////////////////////////////////////////////////////////////////////////
38 (
39 Writable_Journal *client_journal,
40 std::chrono::milliseconds wait,
41 char pull_type
42 )
43 {
44 Channel_Lock lock(channel);
45
46 LOGID("pulling(" << pull_type << ")... ");
47
48 buffer.index = 0;
49 buffer.write<char>(pull_type);
50 const int64_t client_checkpoint = client_journal
51 ? client_journal->get_checkpoint_position()
52 : 0;
53 buffer.write<int64_t>(client_checkpoint);
54 buffer.write<int64_t>(wait.count());
55 lock.write(buffer.data, buffer.index);
56
57 buffer.index = 0;
58 lock.read(buffer.data, 9);
59 {
60 const char reply = buffer.read<char>();
61 if (reply == 'R')
62 throw Exception("Server is pull-only, cannot lock");
63 else if (reply != pull_type)
64 throw Exception("Unexpected server reply");
65 }
66 server_checkpoint = buffer.read<int64_t>();
67
68 if (client_journal)
69 {
70 buffer.index = 0;
71 lock.read(buffer.data, 8);
72 const int64_t size = buffer.read<int64_t>();
73 client_journal->flush(); // ??? necessary ???
74 const int64_t old_position = client_journal->get_position();
75 Async_Writer writer = client_journal->get_async_tail_writer();
76 download(writer, lock, size);
77 client_journal->set_position(writer.get_position());
78 client_journal->default_checkpoint();
79 client_journal->set_position(old_position);
80 }
81 else
82 LOG("no data\n");
83
84 return server_checkpoint;
85 }
86
87 ////////////////////////////////////////////////////////////////////////////
89 ////////////////////////////////////////////////////////////////////////////
90 (
91 Writable_Journal &client_journal,
92 std::chrono::milliseconds wait
93 )
94 {
95 return pull(&client_journal, wait, 'P');
96 }
97
98 ////////////////////////////////////////////////////////////////////////////
100 ////////////////////////////////////////////////////////////////////////////
101 (
102 Writable_Journal &client_journal,
103 std::chrono::milliseconds wait
104 )
105 {
106 return pull(&client_journal, wait, 'L');
107 }
108
109 ////////////////////////////////////////////////////////////////////////////
111 ////////////////////////////////////////////////////////////////////////////
112 (
113 Readonly_Journal &client_journal,
114 std::chrono::milliseconds wait
115 )
116 {
117 return pull(nullptr, wait, 'i');
118 }
119
120 ////////////////////////////////////////////////////////////////////////////
122 ////////////////////////////////////////////////////////////////////////////
123 (
124 Readonly_Journal &client_journal,
125 int64_t server_position,
126 int64_t until_position,
127 bool unlock_after
128 )
129 {
130 Channel_Lock lock(channel);
131
132 Async_Reader reader = client_journal.get_async_reader
133 (
134 server_position,
135 until_position
136 );
137
138 const int64_t push_size = reader.get_remaining();
139
140 buffer.index = 0;
141 buffer.write<char>(unlock_after ? 'U' : 'p');
142 buffer.write<int64_t>(server_position);
143 buffer.write<int64_t>(push_size);
144
145 LOGID("pushing(U)... position = " << server_position << ", size = " << push_size);
146
147 {
148 size_t offset = buffer.index;
149
150 std::optional<Progress_Bar> progress_bar;
151 if (reader.get_remaining() > buffer.ssize && log)
152 progress_bar.emplace(reader.get_remaining(), *log);
153
154 int64_t written = 0;
155
156 while (offset + reader.get_remaining() > 0)
157 {
158 const size_t size = reader.read(buffer.data + offset, buffer.size - offset);
159 lock.write(buffer.data, size + offset);
160 written += int64_t(size + offset);
161 offset = 0;
162 if (progress_bar)
163 progress_bar->print(written);
164 }
165
166 if (!progress_bar)
167 LOG(" done\n");
168 }
169
170 lock.read(buffer.data, 1);
171
172 if (buffer.data[0] == 'C')
173 throw Exception("Conflict: push failed");
174 else if (buffer.data[0] == 'R')
175 throw Exception("Server is pull-only: push failed");
176 else if (buffer.data[0] == 't')
177 throw Exception("Timeout: push failed");
178 else if (buffer.data[0] != 'U')
179 throw Exception("Unexpected server reply");
180
181 server_checkpoint = server_position + push_size;
182 return server_checkpoint;
183 }
184
185 ////////////////////////////////////////////////////////////////////////////
187 ////////////////////////////////////////////////////////////////////////////
188 (
189 Readonly_Journal &client_journal,
190 int64_t server_checkpoint
191 )
192 {
193 Channel_Lock lock(channel);
194
195 LOGID("checking_hash... ");
196
197 buffer.index = 0;
198 buffer.write<char>('H');
199
200 const int64_t checkpoint = std::min
201 (
202 server_checkpoint,
203 client_journal.get_checkpoint_position()
204 );
205
206 buffer.write(checkpoint);
207 buffer.write(Journal_Hasher::get_hash(client_journal, checkpoint));
208
209 lock.write(buffer.data, buffer.index);
210 lock.read(buffer.data, 1);
211
212 const bool result = (buffer.data[0] == 'H');
213
214 if (result)
215 LOG("OK\n");
216 else
217 LOG("Error\n");
218
219 return result;
220 }
221
222 ////////////////////////////////////////////////////////////////////////////
224 ////////////////////////////////////////////////////////////////////////////
225 (
226 Readonly_Journal &client_journal,
227 bool content_check
228 )
229 {
230 if (content_check)
231 if (!check_matching_content(client_journal, server_checkpoint))
232 content_mismatch();
233
234 return server_checkpoint;
235 }
236}
237
238#undef LOGID
239#undef LOG
#define LOG(x)
Definition Server.cpp:8
#define LOGID(x)
Definition Server.cpp:9
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
int64_t get_position() const
char data[size+extra_size]
Definition Buffer.h:20
void write(const char *data, size_t size)
void read(char *data, size_t size)
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
Thread_Safe_Channel channel
void unlock() override
Can be used to cancel a transaction without pushing.
int64_t pull(Writable_Journal *client_journal, std::chrono::milliseconds wait, char pull_type)
int64_t handshake(Readonly_Journal &client_journal, bool content_check) override
Called during Client construction.
int64_t push_until(Readonly_Journal &client_journal, int64_t server_position, int64_t until_position, bool unlock_after) override
Push new data to the connection.
int64_t lock_pull(Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds{0}) override
Fused lock_pull, executed at the start of a write transaction.
int64_t get_checkpoint(Readonly_Journal &client_journal, std::chrono::milliseconds wait) override
Get new connection checkpoint without pulling.
bool check_matching_content(Readonly_Journal &client_journal, int64_t server_checkpoint)
Definition Blob.h:7