1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
| // 支持多种操作的键值存储状态机
class KVStateMachine : public braft::StateMachine {
public:
enum Operation {
OP_PUT = 1,
OP_DELETE = 2,
OP_CAS = 3 // Compare-And-Swap
};
struct LogEntry {
Operation op;
std::string key;
std::string value;
std::string expected_value; // for CAS
std::string serialize() const {
butil::IOBuf buf;
buf.append(reinterpret_cast<const char*>(&op), sizeof(op));
uint32_t key_len = key.length();
buf.append(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
buf.append(key);
uint32_t value_len = value.length();
buf.append(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
buf.append(value);
if (op == OP_CAS) {
uint32_t expected_len = expected_value.length();
buf.append(reinterpret_cast<const char*>(&expected_len), sizeof(expected_len));
buf.append(expected_value);
}
return buf.to_string();
}
static LogEntry deserialize(const std::string& data) {
LogEntry entry;
const char* ptr = data.c_str();
size_t offset = 0;
// 读取操作类型
entry.op = *reinterpret_cast<const Operation*>(ptr + offset);
offset += sizeof(Operation);
// 读取 key
uint32_t key_len = *reinterpret_cast<const uint32_t*>(ptr + offset);
offset += sizeof(uint32_t);
entry.key = std::string(ptr + offset, key_len);
offset += key_len;
// 读取 value
uint32_t value_len = *reinterpret_cast<const uint32_t*>(ptr + offset);
offset += sizeof(uint32_t);
entry.value = std::string(ptr + offset, value_len);
offset += value_len;
// 如果是 CAS 操作,读取期望值
if (entry.op == OP_CAS && offset < data.length()) {
uint32_t expected_len = *reinterpret_cast<const uint32_t*>(ptr + offset);
offset += sizeof(uint32_t);
entry.expected_value = std::string(ptr + offset, expected_len);
}
return entry;
}
};
private:
std::unordered_map<std::string, std::string> _kv_store;
mutable bthread::Mutex _mutex;
std::atomic<int64_t> _applied_index{0};
public:
void on_apply(braft::Iterator& iter) override {
for (; iter.valid(); iter.next()) {
braft::AsyncClosureGuard closure_guard(iter.done());
_applied_index.store(iter.index());
LogEntry entry = LogEntry::deserialize(iter.data().to_string());
bool success = false;
std::string old_value;
{
std::unique_lock<bthread::Mutex> lock(_mutex);
switch (entry.op) {
case OP_PUT:
old_value = _kv_store[entry.key];
_kv_store[entry.key] = entry.value;
success = true;
break;
case OP_DELETE:
if (_kv_store.count(entry.key)) {
old_value = _kv_store[entry.key];
_kv_store.erase(entry.key);
success = true;
}
break;
case OP_CAS:
if (_kv_store.count(entry.key) &&
_kv_store[entry.key] == entry.expected_value) {
old_value = _kv_store[entry.key];
_kv_store[entry.key] = entry.value;
success = true;
}
break;
}
}
LOG(INFO) << "Applied " << entry.key << ": " << old_value
<< " -> " << entry.value << ", success: " << success;
// 返回结果给客户端
if (iter.done()) {
KVClosure* kv_closure = dynamic_cast<KVClosure*>(iter.done());
if (kv_closure) {
kv_closure->set_result(success, old_value);
}
}
}
}
void on_snapshot_save(braft::SnapshotWriter* writer, braft::Closure* done) override {
brpc::ClosureGuard done_guard(done);
std::string snapshot_path = writer->get_path() + "/kv_data";
std::ofstream file(snapshot_path, std::ios::binary);
if (!file.is_open()) {
LOG(ERROR) << "Failed to open snapshot file";
done->status().set_error(EIO, "Failed to save snapshot");
return;
}
{
std::unique_lock<bthread::Mutex> lock(_mutex);
// 保存当前应用的索引
file.write(reinterpret_cast<const char*>(&_applied_index), sizeof(_applied_index));
// 保存键值对数量
uint64_t count = _kv_store.size();
file.write(reinterpret_cast<const char*>(&count), sizeof(count));
// 保存所有键值对
for (const auto& pair : _kv_store) {
uint32_t key_len = pair.first.length();
file.write(reinterpret_cast<const char*>(&key_len), sizeof(key_len));
file.write(pair.first.c_str(), key_len);
uint32_t value_len = pair.second.length();
file.write(reinterpret_cast<const char*>(&value_len), sizeof(value_len));
file.write(pair.second.c_str(), value_len);
}
}
file.close();
if (writer->add_file("kv_data") != 0) {
LOG(ERROR) << "Failed to add file to snapshot";
done->status().set_error(EIO, "Failed to add file");
return;
}
LOG(INFO) << "Saved snapshot with " << _kv_store.size()
<< " keys at index " << _applied_index.load();
}
int on_snapshot_load(braft::SnapshotReader* reader) override {
if (!reader->list_files().count("kv_data")) {
LOG(ERROR) << "kv_data not found in snapshot";
return -1;
}
std::string snapshot_path = reader->get_path() + "/kv_data";
std::ifstream file(snapshot_path, std::ios::binary);
if (!file.is_open()) {
LOG(ERROR) << "Failed to open snapshot file";
return -1;
}
{
std::unique_lock<bthread::Mutex> lock(_mutex);
_kv_store.clear();
// 读取应用索引
int64_t applied_index;
file.read(reinterpret_cast<char*>(&applied_index), sizeof(applied_index));
_applied_index.store(applied_index);
// 读取键值对数量
uint64_t count;
file.read(reinterpret_cast<char*>(&count), sizeof(count));
// 读取所有键值对
for (uint64_t i = 0; i < count; ++i) {
uint32_t key_len;
file.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
std::string key(key_len, '\0');
file.read(&key[0], key_len);
uint32_t value_len;
file.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
std::string value(value_len, '\0');
file.read(&value[0], value_len);
_kv_store[key] = value;
}
}
file.close();
LOG(INFO) << "Loaded snapshot with " << _kv_store.size()
<< " keys at index " << _applied_index.load();
return 0;
}
// 获取值
bool get(const std::string& key, std::string* value) const {
std::unique_lock<bthread::Mutex> lock(_mutex);
auto it = _kv_store.find(key);
if (it != _kv_store.end()) {
*value = it->second;
return true;
}
return false;
}
// 获取所有键
std::vector<std::string> list_keys() const {
std::unique_lock<bthread::Mutex> lock(_mutex);
std::vector<std::string> keys;
for (const auto& pair : _kv_store) {
keys.push_back(pair.first);
}
return keys;
}
size_t size() const {
std::unique_lock<bthread::Mutex> lock(_mutex);
return _kv_store.size();
}
};
// KV 操作的回调
class KVClosure : public braft::Closure {
public:
void Run() override {
if (status().ok()) {
LOG(INFO) << "KV operation succeeded, result: " << _success
<< ", old_value: " << _old_value;
} else {
LOG(ERROR) << "KV operation failed: " << status();
}
delete this;
}
void set_result(bool success, const std::string& old_value) {
_success = success;
_old_value = old_value;
}
bool get_success() const { return _success; }
const std::string& get_old_value() const { return _old_value; }
private:
bool _success = false;
std::string _old_value;
};
|