1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
| public class SnapshotManager {
private final HashicorpRaft raft;
private final SnapshotStorage snapshotStorage;
private final ScheduledExecutorService executor;
private volatile boolean snapshotInProgress = false;
// 快照存储接口
public interface SnapshotStorage {
void saveSnapshot(SnapshotMeta meta, byte[] data) throws Exception;
byte[] loadSnapshot(String snapshotId) throws Exception;
List<SnapshotMeta> listSnapshots() throws Exception;
void deleteSnapshot(String snapshotId) throws Exception;
}
// 快照任务
public static class SnapshotTask {
public final CompletableFuture<SnapshotMeta> future = new CompletableFuture<>();
public final long index;
public final long term;
public final boolean forced;
public SnapshotTask(long index, long term, boolean forced) {
this.index = index;
this.term = term;
this.forced = forced;
}
}
// 增量快照信息
public static class IncrementalSnapshot {
public final SnapshotMeta baseMeta;
public final List<LogEntry> incrementalEntries;
public final long newIndex;
public final long newTerm;
public IncrementalSnapshot(SnapshotMeta baseMeta, List<LogEntry> incrementalEntries,
long newIndex, long newTerm) {
this.baseMeta = baseMeta;
this.incrementalEntries = new ArrayList<>(incrementalEntries);
this.newIndex = newIndex;
this.newTerm = newTerm;
}
}
public SnapshotManager(HashicorpRaft raft, SnapshotStorage snapshotStorage) {
this.raft = raft;
this.snapshotStorage = snapshotStorage;
this.executor = Executors.newSingleThreadScheduledExecutor();
}
// 检查是否需要创建快照
public boolean shouldCreateSnapshot() {
long lastSnapshotIndex = raft.getLastSnapshotIndex();
long currentIndex = raft.getLastLogIndex();
return (currentIndex - lastSnapshotIndex) >= raft.getConfig().snapshotThreshold;
}
// 异步创建快照
public CompletableFuture<SnapshotMeta> createSnapshotAsync() {
if (snapshotInProgress) {
return CompletableFuture.failedFuture(
new IllegalStateException("Snapshot already in progress"));
}
SnapshotTask task = new SnapshotTask(raft.getLastApplied(),
raft.getCurrentTerm(), false);
executor.submit(() -> performSnapshot(task));
return task.future;
}
// 强制创建快照
public CompletableFuture<SnapshotMeta> forceSnapshot() {
SnapshotTask task = new SnapshotTask(raft.getLastApplied(),
raft.getCurrentTerm(), true);
executor.submit(() -> performSnapshot(task));
return task.future;
}
private void performSnapshot(SnapshotTask task) {
try {
snapshotInProgress = true;
// 1. 检查是否可以创建增量快照
SnapshotMeta incrementalBase = findIncrementalBase(task.index);
SnapshotMeta newSnapshot;
if (incrementalBase != null && !task.forced) {
newSnapshot = createIncrementalSnapshot(incrementalBase, task);
} else {
newSnapshot = createFullSnapshot(task);
}
// 2. 清理旧快照
cleanupOldSnapshots();
// 3. 压缩日志
if (newSnapshot != null) {
raft.getStorage().compactLogs(newSnapshot.index);
}
task.future.complete(newSnapshot);
} catch (Exception e) {
task.future.completeExceptionally(e);
} finally {
snapshotInProgress = false;
}
}
// 查找增量快照基础
private SnapshotMeta findIncrementalBase(long targetIndex) {
try {
List<SnapshotMeta> snapshots = snapshotStorage.listSnapshots();
return snapshots.stream()
.filter(s -> s.index <= targetIndex)
.max(Comparator.comparing(s -> s.index))
.orElse(null);
} catch (Exception e) {
logger.warning("Failed to find incremental base: " + e.getMessage());
return null;
}
}
// 创建增量快照
private SnapshotMeta createIncrementalSnapshot(SnapshotMeta base, SnapshotTask task) {
try {
// 获取增量数据
List<LogEntry> incrementalEntries = raft.getStorage()
.getLogEntries(base.index + 1, task.index);
if (incrementalEntries.size() > raft.getConfig().snapshotThreshold / 2) {
// 增量太大,创建全量快照
return createFullSnapshot(task);
}
// 序列化增量数据
IncrementalSnapshot incremental = new IncrementalSnapshot(
base, incrementalEntries, task.index, task.term);
byte[] data = serializeIncrementalSnapshot(incremental);
String snapshotId = generateSnapshotId(task.index, task.term, true);
SnapshotMeta meta = new SnapshotMeta(snapshotId, task.index, task.term,
raft.getCurrentPeers(), data.length);
snapshotStorage.saveSnapshot(meta, data);
logger.info(String.format("Created incremental snapshot %s (base: %s, entries: %d)",
snapshotId, base.id, incrementalEntries.size()));
return meta;
} catch (Exception e) {
logger.warning("Failed to create incremental snapshot: " + e.getMessage());
return createFullSnapshot(task);
}
}
// 创建全量快照
private SnapshotMeta createFullSnapshot(SnapshotTask task) {
try {
// 获取状态机快照
byte[] stateMachineData = raft.getStateMachine().createSnapshot();
// 获取配置信息
Set<String> peers = raft.getCurrentPeers();
// 创建快照数据
FullSnapshot fullSnapshot = new FullSnapshot(stateMachineData, peers,
task.index, task.term);
byte[] data = serializeFullSnapshot(fullSnapshot);
String snapshotId = generateSnapshotId(task.index, task.term, false);
SnapshotMeta meta = new SnapshotMeta(snapshotId, task.index, task.term,
peers, data.length);
snapshotStorage.saveSnapshot(meta, data);
logger.info(String.format("Created full snapshot %s (size: %d bytes)",
snapshotId, data.length));
return meta;
} catch (Exception e) {
logger.severe("Failed to create full snapshot: " + e.getMessage());
throw new RuntimeException(e);
}
}
// 清理旧快照
private void cleanupOldSnapshots() {
try {
List<SnapshotMeta> snapshots = snapshotStorage.listSnapshots();
// 保留最新的3个快照
snapshots.stream()
.sorted(Comparator.comparing((SnapshotMeta s) -> s.index).reversed())
.skip(3)
.forEach(snapshot -> {
try {
snapshotStorage.deleteSnapshot(snapshot.id);
logger.info("Deleted old snapshot: " + snapshot.id);
} catch (Exception e) {
logger.warning("Failed to delete snapshot " + snapshot.id + ": " + e.getMessage());
}
});
} catch (Exception e) {
logger.warning("Failed to cleanup old snapshots: " + e.getMessage());
}
}
// 恢复快照
public void restoreSnapshot(SnapshotMeta meta) {
try {
byte[] data = snapshotStorage.loadSnapshot(meta.id);
if (meta.id.contains("incremental")) {
restoreIncrementalSnapshot(data, meta);
} else {
restoreFullSnapshot(data, meta);
}
raft.setLastSnapshotIndex(meta.index);
raft.setLastSnapshotTerm(meta.term);
logger.info("Successfully restored snapshot: " + meta.id);
} catch (Exception e) {
logger.severe("Failed to restore snapshot " + meta.id + ": " + e.getMessage());
throw new RuntimeException(e);
}
}
private void restoreIncrementalSnapshot(byte[] data, SnapshotMeta meta) throws Exception {
IncrementalSnapshot incremental = deserializeIncrementalSnapshot(data);
// 1. 恢复基础快照
restoreSnapshot(incremental.baseMeta);
// 2. 应用增量数据
for (LogEntry entry : incremental.incrementalEntries) {
raft.getStateMachine().apply(entry);
}
}
private void restoreFullSnapshot(byte[] data, SnapshotMeta meta) throws Exception {
FullSnapshot fullSnapshot = deserializeFullSnapshot(data);
// 恢复状态机
raft.getStateMachine().restoreSnapshot(fullSnapshot.stateMachineData);
// 更新配置
raft.updateConfiguration(fullSnapshot.peers);
}
// 序列化和反序列化方法
private byte[] serializeIncrementalSnapshot(IncrementalSnapshot snapshot) {
// 实现序列化逻辑
ByteBuffer buffer = ByteBuffer.allocate(8192);
// ... 序列化实现
return buffer.array();
}
private IncrementalSnapshot deserializeIncrementalSnapshot(byte[] data) {
// 实现反序列化逻辑
ByteBuffer buffer = ByteBuffer.wrap(data);
// ... 反序列化实现
return null; // 示例返回
}
private byte[] serializeFullSnapshot(FullSnapshot snapshot) {
// 实现序列化逻辑
return new byte[0]; // 示例返回
}
private FullSnapshot deserializeFullSnapshot(byte[] data) {
// 实现反序列化逻辑
return null; // 示例返回
}
private String generateSnapshotId(long index, long term, boolean incremental) {
String type = incremental ? "incremental" : "full";
return String.format("%s_%d_%d_%d", type, index, term, System.currentTimeMillis());
}
// 全量快照数据结构
private static class FullSnapshot {
public final byte[] stateMachineData;
public final Set<String> peers;
public final long index;
public final long term;
public FullSnapshot(byte[] stateMachineData, Set<String> peers, long index, long term) {
this.stateMachineData = stateMachineData.clone();
this.peers = new HashSet<>(peers);
this.index = index;
this.term = term;
}
}
}
|