19 void Client_Command_Processor::write_prompt(std::ostream &out)
const
22 out <<
"joedb_client(";
24 const int64_t client_checkpoint = client.get_checkpoint();
25 const int64_t server_checkpoint = client.get_server_checkpoint();
27 out << client_checkpoint;
28 if (client_checkpoint < server_checkpoint)
29 out <<
'+' << server_checkpoint - client_checkpoint <<
")(pull to sync";
30 else if (server_checkpoint < client_checkpoint)
31 out <<
'-' << client_checkpoint - server_checkpoint <<
")(push to sync";
35 if (client.is_readonly())
40 void Client_Command_Processor::pull
44 std::chrono::milliseconds wait
47 const int64_t byte_count = client.pull(wait);
49 out <<
"pulled " << byte_count <<
" bytes\n";
53 void Client_Command_Processor::sleep(
int seconds, std::ostream &out)
57 out <<
". Sleeping for " << seconds <<
" seconds...\n";
60 std::this_thread::sleep_for(std::chrono::seconds(1));
67 const std::string &command,
68 std::istream &parameters,
73 if (command ==
"help")
80 pull_every [<wait_seconds>] [<sleep_seconds>]
89 else if (command ==
"db")
91 const Database *database =
nullptr;
94 auto *
const rdc =
dynamic_cast<Readonly_Database_Client *
>(&client);
95 auto *
const wdc =
dynamic_cast<Writable_Database_Client *
>(&client);
98 database = &rdc->get_database();
100 database = &wdc->get_database();
105 Readable_Interpreter interpreter(*database, &client.get_journal().get_file());
106 interpreter.set_parent(
this);
107 interpreter.main_loop(in, out);
111 Command_Interpreter interpreter;
112 Blob_Reader_Command_Processor processor(client.get_journal().get_file());
113 interpreter.add_processor(processor);
114 interpreter.set_parent(
this);
115 interpreter.main_loop(in, out);
118 else if (command ==
"push")
120 client.push_unlock();
122 else if (command ==
"pull")
124 float wait_seconds = 0;
125 parameters >> wait_seconds;
126 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
128 else if (command ==
"pull_every")
130 float wait_seconds = 1;
131 int sleep_seconds = 1;
132 parameters >> wait_seconds >> sleep_seconds;
139 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
140 sleep(sleep_seconds, out);
143 else if (command ==
"transaction")
145 auto *
const wdc =
dynamic_cast<Writable_Database_Client *
>(&client);
146 auto *
const wjc =
dynamic_cast<Writable_Journal_Client *
>(&client);
150 wdc->transaction([&](
const Readable &readable, Writable &writable)
152 Interpreter interpreter
156 &client.get_journal().get_file(),
160 interpreter.set_parent(
this);
161 interpreter.main_loop(in, out);
166 wjc->transaction([&](Writable_Journal &journal)
168 Writable_Interpreter interpreter(journal, journal);
169 Blob_Reader_Command_Processor processor(journal.get_file());
170 interpreter.add_processor(processor);
171 interpreter.set_parent(
this);
172 interpreter.main_loop(in, out);
176 out <<
"Client is not writable, cannot run transaction\n";
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
static constexpr int no_signal
static void set_signal(int status)
std::string get_time_string_of_now()