// LOG(x): stream x into *log when `log` tests true; otherwise do nothing.
// - `log` is not a macro parameter: it is looked up at the expansion site.
// - x is pasted unparenthesized after `*log <<`, so an argument may itself be
//   a chain such as `"a" << value << "b"`.
// - The do { ... } while (false) wrapper makes the expansion a single
//   statement that takes a trailing semicolon.
10#define LOG(x) do {if (log) *log << x;} while (false)
// LOGID(x): when `log` tests true, stream into *log, in this order:
// get_time_string_of_now(), a space, get_session_id(), ": ", then x.
// Does nothing otherwise.
// - `log`, get_time_string_of_now and get_session_id are resolved at the
//   expansion site, not passed as macro parameters.
// - x is pasted unparenthesized at the end of the << chain, so an argument may
//   itself contain further << operands.
// - The do { ... } while (false) wrapper makes the expansion a single
//   statement that takes a trailing semicolon.
11#define LOGID(x) do {if (log) *log << get_time_string_of_now() << ' ' << get_session_id() << ": " << x;} while (false)
21 LOGID(
"releasing lock... ");
30 LOG(
"The lock had timed out\n");
32 throw Exception(
"Unexpected server reply");
40 std::chrono::milliseconds wait,
46 LOGID(
"pulling(" << pull_type <<
")... ");
49 buffer.write<
char>(pull_type);
50 const int64_t client_checkpoint = client_journal
51 ? client_journal->get_checkpoint_position()
53 buffer.write<int64_t>(client_checkpoint);
54 buffer.write<int64_t>(wait.count());
55 lock.
write(buffer.data, buffer.index);
58 lock.
read(buffer.data, 9);
60 const char reply = buffer.read<
char>();
62 throw Exception(
"Server is pull-only, cannot lock");
63 else if (reply != pull_type)
64 throw Exception(
"Unexpected server reply");
66 server_checkpoint = buffer.read<int64_t>();
71 lock.
read(buffer.data, 8);
72 const int64_t size = buffer.read<int64_t>();
73 client_journal->flush();
74 const int64_t old_position = client_journal->get_position();
75 Async_Writer writer = client_journal->get_async_tail_writer();
76 download(writer, lock, size);
78 client_journal->default_checkpoint();
79 client_journal->set_position(old_position);
84 return server_checkpoint;
92 std::chrono::milliseconds wait
95 return pull(&client_journal, wait,
'P');
103 std::chrono::milliseconds wait
106 return pull(&client_journal, wait,
'L');
114 std::chrono::milliseconds wait
117 return pull(
nullptr, wait,
'i');
125 int64_t server_position,
126 int64_t until_position,
141 buffer.write<
char>(unlock_after ?
'U' :
'p');
142 buffer.write<int64_t>(server_position);
143 buffer.write<int64_t>(push_size);
145 LOGID(
"pushing(U)... position = " << server_position <<
", size = " << push_size);
148 size_t offset = buffer.index;
150 std::optional<Progress_Bar> progress_bar;
158 const size_t size = reader.
read(buffer.data + offset, buffer.size - offset);
159 lock.
write(buffer.data, size + offset);
160 written += int64_t(size + offset);
163 progress_bar->print(written);
170 lock.
read(buffer.data, 1);
172 if (buffer.data[0] ==
'C')
173 throw Exception(
"Conflict: push failed");
174 else if (buffer.data[0] ==
'R')
175 throw Exception(
"Server is pull-only: push failed");
176 else if (buffer.data[0] ==
't')
178 else if (buffer.data[0] !=
'U')
179 throw Exception(
"Unexpected server reply");
181 server_checkpoint = server_position + push_size;
182 return server_checkpoint;
190 int64_t server_checkpoint
195 LOGID(
"checking_hash... ");
198 buffer.write<
char>(
'H');
200 const int64_t checkpoint = std::min
203 client_journal.get_checkpoint_position()
206 buffer.write(checkpoint);
209 lock.
write(buffer.data, buffer.index);
210 lock.
read(buffer.data, 1);
212 const bool result = (buffer.data[0] ==
'H');
231 if (!check_matching_content(client_journal, server_checkpoint))
234 return server_checkpoint;
int64_t get_remaining() const
size_t read(char *buffer, size_t capacity)
int64_t get_position() const
char data[size+extra_size]
void write(const char *data, size_t size)
void read(char *data, size_t size)
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
Thread_Safe_Channel channel
void unlock() override
Can be used to cancel a transaction without pushing.
int64_t pull(Writable_Journal *client_journal, std::chrono::milliseconds wait, char pull_type)
int64_t handshake(Readonly_Journal &client_journal, bool content_check) override
Called during Client construction.
int64_t push_until(Readonly_Journal &client_journal, int64_t server_position, int64_t until_position, bool unlock_after) override
Push new data to the connection.
int64_t lock_pull(Writable_Journal &client_journal, std::chrono::milliseconds wait=std::chrono::milliseconds{0}) override
Fused lock_pull, executed at the start of a write transaction.
int64_t get_checkpoint(Readonly_Journal &client_journal, std::chrono::milliseconds wait) override
Get new connection checkpoint without pulling.
bool check_matching_content(Readonly_Journal &client_journal, int64_t server_checkpoint)