// LOG(x): passes log() a lambda that streams x into the std::ostream it is given.
// LOGID(x): same, but the lambda first calls session->write_id(out), so a
// `session` must be in scope wherever the macro is expanded.
8#define LOG(x) log([&](std::ostream &out){out << x;})
9#define LOGID(x) log([&](std::ostream &out){session->write_id(out) << x;})
11#include <asio/read.hpp>
12#include <asio/write.hpp>
// Fragment of a function whose signature is not visible in this listing
// (presumably Server::get_time_stamp -- confirm). The visible expression is the
// steady_clock time elapsed since start_time, converted to milliseconds.
20 return std::chrono::duration_cast<std::chrono::milliseconds>
22 std::chrono::steady_clock::now() - start_time
// Writes this session's log prefix to out: the count of the server time stamp
// and a space, then the server port followed by "(<id>): ".
// NOTE(review): the return statement is not visible here; callers chain
// `write_id(out) << ...`, so it presumably returns out -- confirm.
27 std::ostream &Server::Session::write_id(std::ostream &out)
const
31 out << server.get_time_stamp().count() <<
' ';
34 out << server.port <<
'(' <<
id <<
"): ";
// Builds a session around an accepted socket. The id is obtained by
// pre-incrementing server.session_id, the socket is moved in, and the initial
// state is State::not_locking. The visible body then logs the remote endpoint,
// inserts this session into server.sessions and calls server.write_status().
// Inside the log lambda, `this->socket` names the member, not the constructor
// parameter of the same name.
40 Server::Session::Session(Server &server, asio::ip::tcp::socket &&socket):
42 id(++server.session_id),
44 socket(std::move(socket)),
45 state(State::not_locking)
47 server.log([
this](std::ostream &out)
49 write_id(out) <<
"created (remote endpoint: ";
50 out << this->socket.remote_endpoint() <<
")\n";
52 server.sessions.insert(
this);
53 server.write_status();
// Removes this session from server.sessions. If the session is still in
// State::locking it logs that the lock held by the dying session is being
// removed. It then logs "deleted" and calls server.write_status().
// NOTE(review): the statement that actually releases the lock is not visible
// in this listing -- confirm it follows the "removing lock" log.
57 Server::Session::~Session()
62 server.sessions.erase(
this);
64 if (state == State::locking)
66 server.log([
this](std::ostream &out)
68 write_id(out) <<
"removing lock held by dying session.\n";
74 server.log([
this](std::ostream &out)
76 write_id(out) <<
"deleted\n";
79 server.write_status();
// Sets up an asynchronous operation over `size` bytes of session->buffer.data
// starting at `offset`. The completion lambda captures a copy of the session
// shared_ptr and calls the member-function pointer `handler` on this server
// with (session, error code, byte count).
// NOTE(review): the declarations of `offset` and `size` and the asio call
// itself are not visible here; presumably asio::async_read on the session
// socket -- confirm.
89 void Server::async_read
92 std::shared_ptr<Session> session,
95 Transfer_Handler handler
101 asio::buffer(session->buffer.data + offset, size),
102 [
this, handler, session](std::error_code e,
size_t s)
104 (this->*handler)(session, e, s);
// Emits one status entry through log(). The visible fields are the process id
// from joedb::get_pid(), the number of entries in `sessions`, and a
// "; checkpoint = " label whose value is written on a line not visible here.
110 void Server::write_status()
113 log([
this](std::ostream &out)
116 out <<
": pid = " << joedb::get_pid();
118 out <<
"; sessions = " << sessions.size();
119 out <<
"; checkpoint = ";
// Acts only when the server is not locked and lock_queue is not empty, taking
// the session at the front of the queue. Visible actions:
//  - an error path that logs "locking pull-only server" and stores 'R' in the
//    first buffer byte (its condition is not visible);
//  - client_lock is constructed from *push_client;
//  - a session waiting_for_lock_to_pull is passed to start_pulling();
//  - the session state becomes locking and the lock timeout is refreshed.
// NOTE(review): the queue pop, the `locked` update and the branch structure
// are on lines missing from this listing -- confirm before relying on order.
125 void Server::lock_dequeue()
128 if (!locked && !lock_queue.empty())
131 const std::shared_ptr<Session> session = lock_queue.front();
139 LOGID(
"Error: locking pull-only server\n");
140 session->buffer.data[0] =
'R';
145 client_lock.emplace(*push_client);
149 if (session->state == Session::State::waiting_for_lock_to_pull)
150 start_pulling(session);
152 session->state = Session::State::locking;
153 refresh_lock_timeout(session);
// Fragment; the signature line is not visible (presumably Server::lock --
// confirm). If the session is not already in State::locking, it records the
// requested waiting `state` and appends the session to lock_queue. The error
// log below is presumably the else branch for an already-locking session.
161 const std::shared_ptr<Session> session,
162 const Session::State state
165 if (session->state != Session::State::locking)
167 session->state = state;
168 lock_queue.emplace(session);
172 LOGID(
"Error: locking an already locked session\n");
// Does something only when the session is in State::locking: logs
// "unlocking", returns the session to State::not_locking and cancels
// lock_timeout_timer.
// NOTE(review): the release of the underlying client lock and any follow-up
// lock_dequeue() call are not visible in this listing -- confirm.
176 void Server::unlock(Session &session)
179 if (session.state == Session::State::locking)
181 log([&session](std::ostream &out)
183 session.write_id(out) <<
"unlocking\n";
185 session.state = Session::State::not_locking;
187 lock_timeout_timer.cancel();
// Completion handler for lock_timeout_timer. In the visible part, a session
// with an active push_writer has it reset and its push_status set to 't'.
// NOTE(review): the check on `error` (e.g. skipping cancelled waits) and the
// unlock itself are on lines not visible here -- confirm.
194 void Server::lock_timeout_handler
197 const std::shared_ptr<Session> session,
198 const std::error_code error
205 if (session->push_writer)
207 session->push_writer.reset();
208 session->push_status =
't';
// Re-arms the lock timeout for a session. It acts only when lock_timeout is
// positive and the session is in State::locking. The timer is set to expire
// lock_timeout from now, and the wait callback forwards to
// lock_timeout_handler(session, e).
216 void Server::refresh_lock_timeout(
const std::shared_ptr<Session> session)
219 if (lock_timeout.count() > 0 && session->state == Session::State::locking)
221 lock_timeout_timer.expires_after(lock_timeout);
222 lock_timeout_timer.async_wait
224 [
this, session](std::error_code e){lock_timeout_handler(session, e);}
// Completion handler for one chunk of pushed data. If the session has a
// push_writer, the received bytes are written to it from session->buffer.data.
// push_remaining_size is reduced by bytes_transferred, the progress bar (if
// any) is updated, and push_transfer() continues the transfer.
// NOTE(review): the check on `error` is not visible in this listing.
230 void Server::push_transfer_handler
233 const std::shared_ptr<Session> session,
234 const std::error_code error,
235 const size_t bytes_transferred
240 if (session->push_writer)
241 session->push_writer->write(session->buffer.data, bytes_transferred);
243 session->push_remaining_size -= bytes_transferred;
245 if (session->progress_bar)
247 session->progress_bar->print_remaining
249 int64_t(session->push_remaining_size)
253 push_transfer(session);
// Drives a push to completion.
// While push_remaining_size is positive: refreshes the lock timeout and
// requests the next chunk, at most session->buffer.size bytes, with
// push_transfer_handler as the completion handler.
// Visible completion steps (presumably the else branch -- confirm):
//  - with a push_writer, the journal position is set to the writer position
//    and default_checkpoint() is called;
//  - with share_client and unlock_after_push, client_lock->push_unlock() runs;
//  - push_writer and progress_bar are reset;
//  - push_status is sent back as a single byte;
//  - sessions in waiting_for_push_to_pull (plus a condition not visible here)
//    are set to not_locking and passed to start_pulling().
258 void Server::push_transfer(
const std::shared_ptr<Session> session)
261 if (session->push_remaining_size > 0)
263 refresh_lock_timeout(session);
269 std::min(session->push_remaining_size, session->buffer.size),
270 &Server::push_transfer_handler
275 if (session->push_writer)
279 client_lock->get_journal().set_position
281 session->push_writer->get_position()
283 client_lock->get_journal().default_checkpoint();
286 if (share_client && session->unlock_after_push)
288 client_lock->push_unlock();
295 session->push_writer.reset();
298 session->buffer.data[0] = session->push_status;
300 if (session->progress_bar)
301 session->progress_bar.reset();
305 LOG(
"Returning '" << session->push_status <<
"'\n");
307 write_buffer_and_next_command(session, 1);
309 if (session->unlock_after_push)
312 for (
auto *other_session: sessions)
316 other_session->state == Session::State::waiting_for_push_to_pull &&
320 other_session->state = Session::State::not_locking;
321 start_pulling(other_session->shared_from_this());
// Handles the header of a push request. Two int64 values, `start` and `size`,
// are read from the start of the session buffer.
// Visible logic:
//  - if the server is locked and this session is not the one locking, that is
//    logged; the "Taking the lock for push attempt" path calls lock() with
//    waiting_for_lock_to_push (branch conditions not visible);
//  - `conflict` requires size != 0 together with either the session not
//    locking or `start` differing from the journal checkpoint position;
//  - a progress bar is created when logging is enabled and size exceeds
//    session->buffer.ssize;
//  - push_status is set to 'R', 'C' or 'U' (selecting conditions not visible);
//  - push_writer is created from the journal's async tail writer,
//    push_remaining_size is set to size, and push_transfer() starts.
328 void Server::push_handler
331 const std::shared_ptr<Session> session,
332 const std::error_code error,
333 const size_t bytes_transferred
338 session->buffer.index = 0;
339 const int64_t start = session->buffer.read<int64_t>();
340 const int64_t size = session->buffer.read<int64_t>();
342 if (locked && session->state != Session::State::locking)
344 LOGID(
"trying to push while someone else is locking\n");
348 LOGID(
"Taking the lock for push attempt.\n");
349 lock(session, Session::State::waiting_for_lock_to_push);
352 const bool conflict = (size != 0) &&
354 session->state != Session::State::locking ||
355 start != client.get_journal().get_checkpoint_position()
358 LOGID(
"pushing, start = " << start <<
", size = " << size);
360 if (log_pointer && size > session->buffer.ssize)
361 session->progress_bar.emplace(size, *log_pointer);
364 session->push_status =
'R';
366 session->push_status =
'C';
369 session->push_status =
'U';
372 session->push_writer.emplace
374 client_lock->get_journal().get_async_tail_writer()
379 session->push_remaining_size = size_t(size);
381 push_transfer(session);
// Streams data from `reader` to the client through the session buffer.
// The progress bar (if any) is updated with reader.get_remaining().
// While offset + remaining is positive: fills the buffer after `offset`, logs
// an error on unexpected end of file, refreshes the lock timeout, and issues
// an asynchronous operation over the first size + offset bytes. Its completion
// re-enters this handler with offset 0.
// Otherwise (presumably the else branch -- confirm): resets pull_timer and
// progress_bar, then calls read_command().
// NOTE(review): the `reader` and `offset` parameter declarations and the
// `error` check are on lines not visible here.
386 void Server::read_transfer_handler
389 const std::shared_ptr<Session> session,
391 const std::error_code error,
392 const size_t bytes_transferred,
398 if (session->progress_bar)
399 session->progress_bar->print_remaining(reader.get_remaining());
401 if (offset + reader.get_remaining() > 0)
403 const size_t size = reader.read
405 session->buffer.data + offset,
406 session->buffer.size - offset
409 if (reader.is_end_of_file())
410 LOG(
"error: unexpected end of file\n");
413 refresh_lock_timeout(session);
418 asio::buffer(session->buffer.data, size + offset),
419 [
this, session, reader](std::error_code e,
size_t s)
421 read_transfer_handler(session, reader, e, s, 0);
428 session->pull_timer.reset();
429 if (session->progress_bar)
430 session->progress_bar.reset();
433 read_command(session);
// Begins sending the contents of `reader`. The remaining byte count is written
// into the session buffer as an int64 and the range is logged. A progress bar
// is created when logging is enabled and the remaining size exceeds
// session->buffer.ssize. The transfer is then handed to
// read_transfer_handler, with session->buffer.index among its arguments.
// NOTE(review): the `reader` parameter declaration and the other arguments of
// the read_transfer_handler call are not visible in this listing.
439 void Server::start_reading
442 std::shared_ptr<Session> session,
446 session->buffer.write<int64_t>(reader.get_remaining());
448 LOGID(
"reading from = " << reader.get_current() <<
", size = "
449 << reader.get_remaining() <<
':');
451 if (log_pointer && reader.get_remaining() > session->buffer.ssize)
452 session->progress_bar.emplace(reader.get_remaining(), *log_pointer);
454 read_transfer_handler
460 session->buffer.index
// Starts answering a pull request. If the session asked to lock before
// pulling and is not already waiting_for_lock_to_pull, it is queued through
// lock() in that state. On the other visible path, the buffer index is set to
// 1, the send_pull_data branch runs (its body is not visible here), and the
// buffer up to session->buffer.index is sent with
// write_buffer_and_next_command().
465 void Server::start_pulling(std::shared_ptr<Session> session)
470 session->lock_before_pulling &&
471 session->state != Session::State::waiting_for_lock_to_pull
474 lock(session, Session::State::waiting_for_lock_to_pull);
478 session->buffer.index = 1;
481 if (session->send_pull_data)
490 write_buffer_and_next_command(session, session->buffer.index);
// Handles the parameters of a pull request. Starting at buffer index 1 it
// reads the client's checkpoint, then a wait duration in milliseconds.
// If the wait is positive and the client checkpoint equals
// client.get_checkpoint(), the session enters waiting_for_push_to_pull and
// pull_timer is armed for `wait`. When the timer fires and the session is
// still in that state, it returns to not_locking and start_pulling() runs.
// The final start_pulling() call is presumably the else branch, i.e. pull
// immediately -- confirm.
494 void Server::pull_handler
497 const std::shared_ptr<Session> session,
498 const std::error_code error,
499 const size_t bytes_transferred
504 session->buffer.index = 1;
505 session->pull_checkpoint = session->buffer.read<int64_t>();
506 const std::chrono::milliseconds wait{session->buffer.read<int64_t>()};
511 if (wait.count() > 0 && session->pull_checkpoint == client.get_checkpoint())
515 "waiting at checkpoint = " << session->pull_checkpoint <<
516 " for " << wait.count() <<
" milliseconds\n"
519 session->state = Session::State::waiting_for_push_to_pull;
520 session->pull_timer.emplace(io_context);
521 session->pull_timer->expires_after(wait);
522 session->pull_timer->async_wait
524 [
this, session](std::error_code timer_error)
528 if (session->state == Session::State::waiting_for_push_to_pull)
530 session->state = Session::State::not_locking;
531 start_pulling(session);
538 start_pulling(session);
// Fragment; the signature line is not visible (presumably Server::pull, with
// `lock` and `send` parameters -- confirm). Stores the two flags on the
// session and calls async_read with pull_handler as the completion handler
// (presumably 16 bytes at buffer offset 1 -- confirm argument order).
546 const std::shared_ptr<Session> session,
551 session->lock_before_pulling = lock;
552 session->send_pull_data = send;
553 async_read(session, 1, 16, &Server::pull_handler);
// Handles the parameters of a read request. Starting at buffer index 1 it
// reads an int64 offset and an int64 size. The end position, offset + size,
// is clamped to client.get_checkpoint(). An Async_Reader is obtained from
// the journal (call arguments not visible here), the buffer index is set back
// to 1, and start_reading() begins the transfer.
557 void Server::read_handler
560 std::shared_ptr<Session> session,
561 std::error_code error,
562 size_t bytes_transferred
567 session->buffer.index = 1;
568 int64_t offset = session->buffer.read<int64_t>();
569 int64_t size = session->buffer.read<int64_t>();
570 int64_t until = offset + size;
572 if (until > client.get_checkpoint())
573 until = client.get_checkpoint();
577 const Async_Reader reader = client.get_journal().get_async_reader
583 session->buffer.index = 1;
584 start_reading(session, reader);
// Handles the parameter of a blob read request. Starting at buffer index 1 it
// reads an int64 blob position. An Async_Reader is obtained from the journal's
// get_async_blob_reader (call arguments not visible here), the buffer index is
// set back to 1, and start_reading() begins the transfer.
589 void Server::read_blob_handler
592 std::shared_ptr<Session> session,
593 std::error_code error,
594 size_t bytes_transferred
599 session->buffer.index = 1;
600 const int64_t blob_position = session->buffer.read<int64_t>();
601 const Async_Reader reader = client.get_journal().get_async_blob_reader
606 session->buffer.index = 1;
607 start_reading(session, reader);
// Handles a hash check request. Starting at buffer index 1 it reads an int64
// checkpoint. In the visible code the first buffer byte is set to 'h'; one
// visible term of the governing condition is the requested checkpoint
// exceeding the journal's checkpoint position. The result byte is logged and
// sent back as a one-byte reply.
// NOTE(review): the hash comparison and the other result value are on lines
// not visible in this listing -- confirm the meaning of 'h'.
612 void Server::check_hash_handler
615 const std::shared_ptr<Session> session,
616 const std::error_code error,
617 const size_t bytes_transferred
622 session->buffer.index = 1;
623 const auto checkpoint = session->buffer.read<int64_t>();
626 const Readonly_Journal &readonly_journal = client.get_journal();
630 checkpoint > readonly_journal.get_checkpoint_position() ||
634 session->buffer.data[0] =
'h';
637 LOGID(
"hash for checkpoint = " << checkpoint <<
", result = "
638 << session->buffer.data[0] <<
'\n');
640 write_buffer_and_next_command(session, 1);
// Dispatches on the command byte in session->buffer.data[0], after logging it.
// The case labels are not visible in this listing; the visible actions are:
//  - four pull variants covering each (lock, send) flag combination;
//  - a push path that sets unlock_after_push when the command byte is 'U' and
//    then reads a 16-byte header for push_handler;
//  - a path that checks for State::locking and replies with a single 't' byte
//    (intermediate statements not visible);
//  - reads of 40, 16 and 8 bytes for check_hash_handler, read_handler and
//    read_blob_handler respectively;
//  - an "unexpected command" log, presumably the default case.
645 void Server::read_command_handler
648 const std::shared_ptr<Session> session,
649 const std::error_code error,
650 const size_t bytes_transferred
655 LOGID(session->buffer.data[0] <<
'\n');
657 switch (session->buffer.data[0])
660 pull(session,
false,
true);
664 pull(session,
true,
true);
668 pull(session,
false,
false);
672 pull(session,
true,
false);
676 session->unlock_after_push = (session->buffer.data[0] ==
'U');
677 async_read(session, 0, 16, &Server::push_handler);
681 if (session->state == Session::State::locking)
684 session->buffer.data[0] =
't';
685 write_buffer_and_next_command(session, 1);
689 async_read(session, 1, 40, &Server::check_hash_handler);
693 async_read(session, 1, 16, &Server::read_handler);
697 async_read(session, 1, 8, &Server::read_blob_handler);
701 LOGID(
"unexpected command\n");
// Waits for the next command from the session by calling async_read with
// read_command_handler as the completion handler (arguments 0 and 1:
// presumably one byte at buffer offset 0 -- confirm argument order).
708 void Server::read_command(
const std::shared_ptr<Session> session)
711 async_read(session, 0, 1, &Server::read_command_handler);
// Sets up an asynchronous operation over the first `size` bytes of the session
// buffer; its completion callback calls read_command() for the same session.
// NOTE(review): the `size` parameter declaration, the asio call (presumably
// asio::async_write -- confirm) and any error check in the callback are on
// lines not visible here.
715 void Server::write_buffer_and_next_command
718 const std::shared_ptr<Session> session,
725 asio::buffer(session->buffer.data, size),
726 [
this, session](std::error_code e,
size_t s)
729 read_command(session);
// Validates the client handshake. From buffer index 0 it expects the five
// characters 'j','o','e','d','b', then reads an int64 client version and logs
// it. The reply is written from buffer index 5: the session id as int64, the
// current client checkpoint as int64, and 'R' or 'W' depending on
// is_readonly(). It is sent with write_buffer_and_next_command(). The
// "bad handshake" log is presumably the failure branch -- confirm.
735 void Server::handshake_handler
738 const std::shared_ptr<Session> session,
739 const std::error_code error,
740 const size_t bytes_transferred
745 session->buffer.index = 0;
749 session->buffer.read<
char>() ==
'j' &&
750 session->buffer.read<
char>() ==
'o' &&
751 session->buffer.read<
char>() ==
'e' &&
752 session->buffer.read<
char>() ==
'd' &&
753 session->buffer.read<
char>() ==
'b'
756 const int64_t client_version = session->buffer.read<int64_t>();
757 LOGID(
"client_version = " << client_version <<
'\n');
759 session->buffer.index = 5;
761 session->buffer.write<int64_t>(session->id);
762 session->buffer.write<int64_t>(client.get_checkpoint());
763 session->buffer.write<
char>(is_readonly() ?
'R' :
'W');
765 write_buffer_and_next_command(session, session->buffer.index);
769 LOGID(
"bad handshake\n");
// Handles one accepted connection. When there is no error and the server is
// not stopped, it enables TCP no_delay on the socket, creates a Session owned
// by a shared_ptr, and calls async_read for the handshake with
// handshake_handler as the completion handler. The requested 13 bytes match
// the 5 characters plus one int64 that handshake_handler reads.
774 void Server::handle_accept
777 const std::error_code error,
778 asio::ip::tcp::socket socket
781 if (!error && !stopped)
783 socket.set_option(asio::ip::tcp::no_delay(
true));
784 std::shared_ptr<Session> session(
new Session(*
this, std::move(socket)));
785 async_read(session, 0, 13, &Server::handshake_handler);
// Issues an asynchronous accept on `acceptor`; the completion lambda forwards
// the error code and the accepted socket (moved) to handle_accept().
// NOTE(review): any guard around the async_accept call is not visible here.
792 void Server::start_accept()
797 acceptor.async_accept
800 [
this](std::error_code error, asio::ip::tcp::socket socket)
802 handle_accept(error, std::move(socket));
// Fragments of several definitions whose signature lines are not visible in
// this listing (presumably the Server constructor followed by start/stop code
// and the destructor -- confirm against the real file).
// Visible constructor parameters and member initialisers: start_time is taken
// from steady_clock::now(); the acceptor listens on an IPv4 endpoint for
// `port`; `port` is then re-read from acceptor.local_endpoint(); SIGINT and
// SIGTERM are registered on interrupt_signals.
813 const bool share_client,
814 asio::io_context &io_context,
816 const std::chrono::milliseconds lock_timeout,
817 std::ostream *
const log_pointer
819 start_time(std::chrono::steady_clock::now()),
822 share_client(share_client),
823 io_context(io_context),
824 acceptor(io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
825 port(acceptor.local_endpoint().port()),
827 interrupt_signals(io_context, SIGINT, SIGTERM),
829 lock_timeout(lock_timeout),
830 lock_timeout_timer(io_context),
832 log_pointer(log_pointer)
851 client_lock.emplace(*push_client);
856 interrupt_signals.async_wait([
this](
const asio::error_code &error,
int)
868 ": start. lock_timeout = " << lock_timeout.count() <<
880 interrupt_signals.cancel();
// Visible stop sequence: logs "<port>: stop", then closes the socket and
// resets the pull_timer of every registered session; client_lock->unlock()
// follows (its guarding condition is not visible).
890 LOG(port <<
": stop\n");
892 for (Session *session: sessions)
894 session->socket.close();
895 session->pull_timer.reset();
900 client_lock->unlock();
// Reports a bug if sessions are still registered at this point (per the log
// message, this is where the server is being destroyed).
914 if (!sessions.empty())
916 LOG(
"Bug: destroying server before sessions.\n");
Handle concurrent access to a file with a joedb::Connection.
int64_t get_checkpoint() const
int64_t pull(std::chrono::milliseconds wait=std::chrono::milliseconds(0))
const Readonly_Journal & get_journal() const
static SHA_256::Hash get_hash(const Readonly_Journal &journal, int64_t checkpoint)
Async_Reader get_async_tail_reader(int64_t start_position) const
std::array< uint32_t, 8 > Hash
void stop_after_sessions()
std::chrono::milliseconds get_time_stamp() const
Server(Client &client, bool share_client, asio::io_context &io_context, uint16_t port, std::chrono::milliseconds lock_timeout, std::ostream *log_pointer)
constexpr int protocol_version
std::string get_time_string_of_now()