Joedb 9.1.4
The Journal-Only Embedded Database
Loading...
Searching...
No Matches
Client_Command_Processor.cpp
Go to the documentation of this file.
10#include "joedb/Signal.h"
11
12#include <thread>
13#include <chrono>
14#include <cmath>
15
16namespace joedb
17{
18 ////////////////////////////////////////////////////////////////////////////
19 void Client_Command_Processor::write_prompt(std::ostream &out) const
20 ////////////////////////////////////////////////////////////////////////////
21 {
22 out << "joedb_client(";
23
24 const int64_t client_checkpoint = client.get_checkpoint();
25 const int64_t server_checkpoint = client.get_server_checkpoint();
26
27 out << client_checkpoint;
28 if (client_checkpoint < server_checkpoint)
29 out << '+' << server_checkpoint - client_checkpoint << ")(pull to sync";
30 else if (server_checkpoint < client_checkpoint)
31 out << '-' << client_checkpoint - server_checkpoint << ")(push to sync";
32
33 out << ')';
34
35 if (client.is_readonly())
36 out << "(readonly)";
37 }
38
39 ////////////////////////////////////////////////////////////////////////////
40 void Client_Command_Processor::pull
41 ////////////////////////////////////////////////////////////////////////////
42 (
43 std::ostream &out,
44 std::chrono::milliseconds wait
45 )
46 {
47 const int64_t byte_count = client.pull(wait);
48 if (byte_count > 0)
49 out << "pulled " << byte_count << " bytes\n";
50 }
51
52 ////////////////////////////////////////////////////////////////////////////
53 void Client_Command_Processor::sleep(int seconds, std::ostream &out)
54 ////////////////////////////////////////////////////////////////////////////
55 {
57 out << ". Sleeping for " << seconds << " seconds...\n";
58 out.flush();
59 for (int i = seconds; Signal::get_signal() != SIGINT && --i >= 0;)
60 std::this_thread::sleep_for(std::chrono::seconds(1));
61 }
62
63 ////////////////////////////////////////////////////////////////////////////
64 Command_Processor::Status Client_Command_Processor::process_command
65 ////////////////////////////////////////////////////////////////////////////
66 (
67 const std::string &command,
68 std::istream &parameters,
69 std::istream &in,
70 std::ostream &out
71 )
72 {
73 if (command == "help") ////////////////////////////////////////////////////
74 {
75 Command_Interpreter::process_command(command, parameters, in, out);
76
77 out << R"RRR(Client
78~~~~~~
79 pull [<wait_seconds>]
80 pull_every [<wait_seconds>] [<sleep_seconds>]
81 db
82 push
83 transaction
84
85)RRR";
86
87 return Status::ok;
88 }
89 else if (command == "db") /////////////////////////////////////////////////
90 {
91 const Database *database = nullptr;
92
93 {
94 auto * const rdc = dynamic_cast<Readonly_Database_Client *>(&client);
95 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
96
97 if (rdc)
98 database = &rdc->get_database();
99 else if (wdc)
100 database = &wdc->get_database();
101 }
102
103 if (database)
104 {
105 Readable_Interpreter interpreter(*database, &client.get_journal().get_file());
106 interpreter.set_parent(this);
107 interpreter.main_loop(in, out);
108 }
109 else
110 {
111 Command_Interpreter interpreter;
112 Blob_Reader_Command_Processor processor(client.get_journal().get_file());
113 interpreter.add_processor(processor);
114 interpreter.set_parent(this);
115 interpreter.main_loop(in, out);
116 }
117 }
118 else if (command == "push") ///////////////////////////////////////////////
119 {
120 client.push_unlock();
121 }
122 else if (command == "pull") ///////////////////////////////////////////////
123 {
124 float wait_seconds = 0;
125 parameters >> wait_seconds;
126 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
127 }
128 else if (command == "pull_every") /////////////////////////////////////////
129 {
130 float wait_seconds = 1;
131 int sleep_seconds = 1;
132 parameters >> wait_seconds >> sleep_seconds;
133
136
137 while (Signal::get_signal() != SIGINT)
138 {
139 pull(out, std::chrono::milliseconds(std::lround(wait_seconds * 1000)));
140 sleep(sleep_seconds, out);
141 }
142 }
143 else if (command == "transaction") ////////////////////////////////////////
144 {
145 auto * const wdc = dynamic_cast<Writable_Database_Client *>(&client);
146 auto * const wjc = dynamic_cast<Writable_Journal_Client *>(&client);
147
148 if (wdc)
149 {
150 wdc->transaction([&](const Readable &readable, Writable &writable)
151 {
152 Interpreter interpreter
153 (
154 readable,
155 writable,
156 &client.get_journal().get_file(),
157 writable,
158 0
159 );
160 interpreter.set_parent(this);
161 interpreter.main_loop(in, out);
162 });
163 }
164 else if (wjc)
165 {
166 wjc->transaction([&](Writable_Journal &journal)
167 {
168 Writable_Interpreter interpreter(journal, journal);
169 Blob_Reader_Command_Processor processor(journal.get_file());
170 interpreter.add_processor(processor);
171 interpreter.set_parent(this);
172 interpreter.main_loop(in, out);
173 });
174 }
175 else
176 out << "Client is not writable, cannot run transaction\n";
177 }
178 else //////////////////////////////////////////////////////////////////////
179 return Command_Interpreter::process_command(command, parameters, in, out);
180
181 return Status::done;
182 }
183}
Status process_command(const std::string &command, std::istream &parameters, std::istream &in, std::ostream &out) override
static int get_signal()
Definition Signal.cpp:27
static void start()
Definition Signal.cpp:34
static constexpr int no_signal
Definition Signal.h:15
static void set_signal(int status)
Definition Signal.cpp:20
std::string get_time_string_of_now()
Definition Blob.h:7