1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
| import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* Paxos算法实现
*/
public class PaxosAlgorithm {
/**
* 提案类
*/
public static class Proposal {
private final long proposalNumber;
private final String value;
private final String proposerId;
private final long timestamp;
public Proposal(long proposalNumber, String value, String proposerId) {
this.proposalNumber = proposalNumber;
this.value = value;
this.proposerId = proposerId;
this.timestamp = System.currentTimeMillis();
}
public long getProposalNumber() { return proposalNumber; }
public String getValue() { return value; }
public String getProposerId() { return proposerId; }
public long getTimestamp() { return timestamp; }
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (!(obj instanceof Proposal)) return false;
Proposal other = (Proposal) obj;
return proposalNumber == other.proposalNumber &&
Objects.equals(proposerId, other.proposerId);
}
@Override
public int hashCode() {
return Objects.hash(proposalNumber, proposerId);
}
@Override
public String toString() {
return String.format("Proposal{n=%d, v='%s', proposer='%s'}",
proposalNumber, value, proposerId);
}
}
/**
* Promise响应类
*/
public static class PromiseResponse {
private final boolean accepted;
private final Proposal acceptedProposal;
private final String acceptorId;
private final String reason;
public PromiseResponse(boolean accepted, Proposal acceptedProposal,
String acceptorId, String reason) {
this.accepted = accepted;
this.acceptedProposal = acceptedProposal;
this.acceptorId = acceptorId;
this.reason = reason;
}
public boolean isAccepted() { return accepted; }
public Proposal getAcceptedProposal() { return acceptedProposal; }
public String getAcceptorId() { return acceptorId; }
public String getReason() { return reason; }
}
/**
* Accept响应类
*/
public static class AcceptResponse {
private final boolean accepted;
private final Proposal proposal;
private final String acceptorId;
private final String reason;
public AcceptResponse(boolean accepted, Proposal proposal,
String acceptorId, String reason) {
this.accepted = accepted;
this.proposal = proposal;
this.acceptorId = acceptorId;
this.reason = reason;
}
public boolean isAccepted() { return accepted; }
public Proposal getProposal() { return proposal; }
public String getAcceptorId() { return acceptorId; }
public String getReason() { return reason; }
}
/**
* Proposer实现
*/
public static class Proposer {
private final String proposerId;
private final AtomicLong proposalCounter;
private final List<Acceptor> acceptors;
private final ExecutorService executor;
private final int majoritySize;
public Proposer(String proposerId, List<Acceptor> acceptors) {
this.proposerId = proposerId;
this.proposalCounter = new AtomicLong(0);
this.acceptors = acceptors;
this.executor = Executors.newCachedThreadPool();
this.majoritySize = (acceptors.size() / 2) + 1;
}
/**
* 提议一个值
*/
public CompletableFuture<String> propose(String value) {
long proposalNumber = generateProposalNumber();
Proposal proposal = new Proposal(proposalNumber, value, proposerId);
System.out.printf("Proposer %s 开始提议: %s%n", proposerId, proposal);
return preparePhase(proposal)
.thenCompose(prepareResult -> {
if (prepareResult == null) {
return CompletableFuture.completedFuture("PREPARE_FAILED");
}
return acceptPhase(prepareResult);
})
.exceptionally(throwable -> {
System.err.printf("Proposer %s 提议失败: %s%n",
proposerId, throwable.getMessage());
return "FAILED: " + throwable.getMessage();
});
}
/**
* Prepare阶段
*/
private CompletableFuture<Proposal> preparePhase(Proposal proposal) {
System.out.printf("Proposer %s 开始Prepare阶段: n=%d%n",
proposerId, proposal.getProposalNumber());
List<CompletableFuture<PromiseResponse>> prepareFutures = new ArrayList<>();
for (Acceptor acceptor : acceptors) {
CompletableFuture<PromiseResponse> future = CompletableFuture
.supplyAsync(() -> acceptor.prepare(proposal), executor);
prepareFutures.add(future);
}
return CompletableFuture.allOf(prepareFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<PromiseResponse> responses = prepareFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return processPrepareResponses(proposal, responses);
});
}
/**
* 处理Prepare响应
*/
private Proposal processPrepareResponses(Proposal originalProposal,
List<PromiseResponse> responses) {
List<PromiseResponse> promises = responses.stream()
.filter(PromiseResponse::isAccepted)
.collect(Collectors.toList());
System.out.printf("Proposer %s 收到 %d/%d Promise响应%n",
proposerId, promises.size(), responses.size());
if (promises.size() < majoritySize) {
System.out.printf("Proposer %s Prepare失败: 未获得多数Promise%n", proposerId);
return null;
}
// 选择已接受的提案中编号最大的值
Proposal highestProposal = promises.stream()
.map(PromiseResponse::getAcceptedProposal)
.filter(Objects::nonNull)
.max(Comparator.comparingLong(Proposal::getProposalNumber))
.orElse(null);
if (highestProposal != null) {
// 使用已接受的值
return new Proposal(originalProposal.getProposalNumber(),
highestProposal.getValue(), proposerId);
} else {
// 使用原始提议值
return originalProposal;
}
}
/**
* Accept阶段
*/
private CompletableFuture<String> acceptPhase(Proposal proposal) {
System.out.printf("Proposer %s 开始Accept阶段: %s%n", proposerId, proposal);
List<CompletableFuture<AcceptResponse>> acceptFutures = new ArrayList<>();
for (Acceptor acceptor : acceptors) {
CompletableFuture<AcceptResponse> future = CompletableFuture
.supplyAsync(() -> acceptor.accept(proposal), executor);
acceptFutures.add(future);
}
return CompletableFuture.allOf(acceptFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<AcceptResponse> responses = acceptFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return processAcceptResponses(proposal, responses);
});
}
/**
* 处理Accept响应
*/
private String processAcceptResponses(Proposal proposal, List<AcceptResponse> responses) {
List<AcceptResponse> accepts = responses.stream()
.filter(AcceptResponse::isAccepted)
.collect(Collectors.toList());
System.out.printf("Proposer %s 收到 %d/%d Accept响应%n",
proposerId, accepts.size(), responses.size());
if (accepts.size() >= majoritySize) {
System.out.printf("Proposer %s 提议成功: %s%n", proposerId, proposal);
return "SUCCESS: " + proposal.getValue();
} else {
System.out.printf("Proposer %s Accept失败: 未获得多数Accept%n", proposerId);
return "ACCEPT_FAILED";
}
}
/**
* 生成提案编号
*/
private long generateProposalNumber() {
// 使用时间戳和计数器生成唯一递增的提案编号
long timestamp = System.currentTimeMillis();
long counter = proposalCounter.incrementAndGet();
return (timestamp << 16) | (counter & 0xFFFF);
}
public void shutdown() {
executor.shutdown();
}
}
/**
* Acceptor实现
*/
public static class Acceptor {
private final String acceptorId;
private volatile Long promisedProposalNumber;
private volatile Proposal acceptedProposal;
private final Object lock = new Object();
public Acceptor(String acceptorId) {
this.acceptorId = acceptorId;
this.promisedProposalNumber = null;
this.acceptedProposal = null;
}
/**
* 处理Prepare请求
*/
public PromiseResponse prepare(Proposal proposal) {
synchronized (lock) {
long proposalNumber = proposal.getProposalNumber();
System.out.printf("Acceptor %s 收到Prepare请求: n=%d%n",
acceptorId, proposalNumber);
// 检查是否已经承诺过更高编号的提案
if (promisedProposalNumber != null && proposalNumber <= promisedProposalNumber) {
String reason = String.format("已承诺更高编号的提案: %d", promisedProposalNumber);
System.out.printf("Acceptor %s 拒绝Prepare: %s%n", acceptorId, reason);
return new PromiseResponse(false, null, acceptorId, reason);
}
// 承诺不再接受编号小于n的提案
promisedProposalNumber = proposalNumber;
System.out.printf("Acceptor %s 承诺提案: n=%d%n", acceptorId, proposalNumber);
return new PromiseResponse(true, acceptedProposal, acceptorId, "Promise成功");
}
}
/**
* 处理Accept请求
*/
public AcceptResponse accept(Proposal proposal) {
synchronized (lock) {
long proposalNumber = proposal.getProposalNumber();
System.out.printf("Acceptor %s 收到Accept请求: %s%n", acceptorId, proposal);
// 检查是否违反了之前的承诺
if (promisedProposalNumber != null && proposalNumber < promisedProposalNumber) {
String reason = String.format("违反承诺: 承诺编号=%d, 请求编号=%d",
promisedProposalNumber, proposalNumber);
System.out.printf("Acceptor %s 拒绝Accept: %s%n", acceptorId, reason);
return new AcceptResponse(false, proposal, acceptorId, reason);
}
// 接受提案
acceptedProposal = proposal;
promisedProposalNumber = proposalNumber;
System.out.printf("Acceptor %s 接受提案: %s%n", acceptorId, proposal);
return new AcceptResponse(true, proposal, acceptorId, "Accept成功");
}
}
public String getAcceptorId() {
return acceptorId;
}
public Proposal getAcceptedProposal() {
synchronized (lock) {
return acceptedProposal;
}
}
}
/**
* Learner实现
*/
public static class Learner {
private final String learnerId;
private final List<Acceptor> acceptors;
private final int majoritySize;
public Learner(String learnerId, List<Acceptor> acceptors) {
this.learnerId = learnerId;
this.acceptors = acceptors;
this.majoritySize = (acceptors.size() / 2) + 1;
}
/**
* 学习已达成共识的值
*/
public String learn() {
Map<Proposal, Integer> proposalCounts = new HashMap<>();
for (Acceptor acceptor : acceptors) {
Proposal accepted = acceptor.getAcceptedProposal();
if (accepted != null) {
proposalCounts.put(accepted, proposalCounts.getOrDefault(accepted, 0) + 1);
}
}
// 查找获得多数接受的提案
for (Map.Entry<Proposal, Integer> entry : proposalCounts.entrySet()) {
if (entry.getValue() >= majoritySize) {
Proposal consensusProposal = entry.getKey();
System.out.printf("Learner %s 学习到共识值: %s%n",
learnerId, consensusProposal.getValue());
return consensusProposal.getValue();
}
}
System.out.printf("Learner %s 未发现共识值%n", learnerId);
return null;
}
}
}
/**
* Paxos演示程序
*/
class PaxosDemo {
public static void main(String[] args) throws Exception {
System.out.println("=== Paxos算法演示 ===");
// 创建5个Acceptor
List<PaxosAlgorithm.Acceptor> acceptors = Arrays.asList(
new PaxosAlgorithm.Acceptor("A1"),
new PaxosAlgorithm.Acceptor("A2"),
new PaxosAlgorithm.Acceptor("A3"),
new PaxosAlgorithm.Acceptor("A4"),
new PaxosAlgorithm.Acceptor("A5")
);
// 创建Proposer
PaxosAlgorithm.Proposer proposer1 = new PaxosAlgorithm.Proposer("P1", acceptors);
PaxosAlgorithm.Proposer proposer2 = new PaxosAlgorithm.Proposer("P2", acceptors);
// 创建Learner
PaxosAlgorithm.Learner learner = new PaxosAlgorithm.Learner("L1", acceptors);
try {
// 测试单个提议
System.out.println("\n--- 测试1: 单个提议 ---");
String result1 = proposer1.propose("value_A").get();
System.out.println("提议结果: " + result1);
String learned1 = learner.learn();
System.out.println("学习结果: " + learned1);
// 测试并发提议
System.out.println("\n--- 测试2: 并发提议 ---");
CompletableFuture<String> future1 = proposer1.propose("value_B");
CompletableFuture<String> future2 = proposer2.propose("value_C");
String result2 = future1.get();
String result3 = future2.get();
System.out.println("Proposer1结果: " + result2);
System.out.println("Proposer2结果: " + result3);
String learned2 = learner.learn();
System.out.println("最终共识值: " + learned2);
} finally {
proposer1.shutdown();
proposer2.shutdown();
}
}
}
|