import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
 * Core implementation of a TCC (Try-Confirm-Cancel) transaction manager.
 *
 * <p>A transaction runs the Try operation of every action in parallel. When every Try
 * succeeds the manager runs Confirm on each participant, otherwise it runs Cancel on the
 * participants whose Try succeeded. Progress is written to the {@link TransactionStore},
 * and a background task periodically re-drives stored transactions that exceeded the
 * configured timeout.
 *
 * <p>Confirm and Cancel may be invoked more than once for the same participant (in-call
 * retries and scheduled recovery), so participants are expected to tolerate repeated
 * calls — NOTE(review): confirm that all registered participants are idempotent.
 */
@Slf4j
public class TCCTransactionManager {

    // Store used to persist transaction records.
    private final TransactionStore transactionStore;

    // Registered participants, keyed by participant name.
    private final Map<String, TCCParticipant> participants = new ConcurrentHashMap<>();

    // Pool that runs the individual participant Try/Confirm/Cancel calls.
    private final ExecutorService executorService;

    // Pool that runs the per-transaction coordinator. It is deliberately separate from
    // executorService: a coordinator blocks while waiting for participant tasks, and if both
    // shared one fixed pool the coordinators could occupy every thread and starve the very
    // tasks they are waiting for.
    private final ExecutorService coordinatorExecutor;

    // Transaction timeout in milliseconds.
    private final long transactionTimeout;

    // Retry policy for Confirm/Cancel calls and for scheduled recovery.
    private final RetryPolicy retryPolicy;

    // Scheduler for recovery work.
    private final ScheduledExecutorService recoveryScheduler;

    // Ids of transactions that currently have a recovery attempt queued or running, so that
    // the periodic sweep does not start a second retry chain for the same transaction.
    private final Set<String> recoveriesInFlight = ConcurrentHashMap.newKeySet();

    /**
     * Creates the manager and starts the periodic recovery task.
     *
     * @param transactionStore   store used to persist transaction records; must not be null
     * @param threadPoolSize     size of the participant pool; the coordinator pool is created
     *                           with the same size
     * @param transactionTimeout per-phase wait limit and recovery timeout, in milliseconds
     * @throws NullPointerException     if {@code transactionStore} is null
     * @throws IllegalArgumentException if {@code threadPoolSize} is not positive
     */
    public TCCTransactionManager(TransactionStore transactionStore,
                                 int threadPoolSize,
                                 long transactionTimeout) {
        this.transactionStore = Objects.requireNonNull(transactionStore, "transactionStore");
        this.executorService = Executors.newFixedThreadPool(threadPoolSize);
        this.coordinatorExecutor = Executors.newFixedThreadPool(threadPoolSize);
        this.transactionTimeout = transactionTimeout;
        this.retryPolicy = new ExponentialBackoffRetryPolicy();
        this.recoveryScheduler = Executors.newScheduledThreadPool(2);
        // Start the periodic recovery task.
        startRecoveryTask();
    }

    /**
     * Registers a TCC participant under the given name, replacing any previous registration.
     *
     * @param name        name that actions use to refer to the participant
     * @param participant the participant implementation
     */
    public void registerParticipant(String name, TCCParticipant participant) {
        participants.put(name, participant);
        log.info("Registered TCC participant: {}", name);
    }

    /**
     * Executes a TCC transaction asynchronously.
     *
     * <p>The transaction record is saved synchronously on the calling thread; the Try and
     * Confirm/Cancel phases run on the coordinator pool.
     *
     * @param transaction the transaction describing the actions to run
     * @param <T>         result type of the transaction
     * @return a future that completes with {@code transaction.getResult()} once every Try has
     *         succeeded and Confirm has been carried out or handed to recovery, and completes
     *         exceptionally with a {@link TCCTransactionException} otherwise
     */
    public <T> CompletableFuture<T> executeTransaction(TCCTransaction<T> transaction) {
        String transactionId = generateTransactionId();
        TransactionContext context = new TransactionContext(transactionId);

        // Create and persist the transaction record.
        TransactionRecord record = new TransactionRecord();
        record.setTransactionId(transactionId);
        record.setStatus(TransactionStatus.TRYING);
        record.setCreateTime(System.currentTimeMillis());
        record.setParticipants(new ArrayList<>());
        transactionStore.save(record);

        return CompletableFuture.supplyAsync(
                () -> coordinate(transaction, context, record), coordinatorExecutor);
    }

    /**
     * Drives one transaction through Try and then Confirm or Cancel.
     *
     * @return the transaction result when every Try succeeded
     * @throws TCCTransactionException if any Try failed (after Cancel was run) or if the
     *                                 coordination itself failed
     */
    private <T> T coordinate(TCCTransaction<T> transaction,
                             TransactionContext context,
                             TransactionRecord record) {
        String transactionId = record.getTransactionId();
        try {
            // Expose the context to code running on this coordinator thread.
            TransactionContextHolder.set(context);

            log.info("Starting Try phase for transaction: {}", transactionId);
            List<ParticipantRecord> tryResults = executeTryPhase(transaction, context, record);

            if (allTrySuccess(tryResults)) {
                log.info("Try phase succeeded, starting Confirm phase: {}", transactionId);
                record.setStatus(TransactionStatus.CONFIRMING);
                transactionStore.update(record);
                if (executeConfirmPhase(tryResults, context, record)) {
                    record.setStatus(TransactionStatus.CONFIRMED);
                    transactionStore.update(record);
                    log.info("Transaction confirmed successfully: {}", transactionId);
                } else {
                    // The decision to commit is already made; leave the record in CONFIRMING
                    // and keep retrying in the background instead of claiming CONFIRMED.
                    scheduleRetry(record, TransactionStatus.CONFIRMING);
                }
                return transaction.getResult();
            }

            log.info("Try phase failed, starting Cancel phase: {}", transactionId);
            record.setStatus(TransactionStatus.CANCELING);
            transactionStore.update(record);
            if (executeCancelPhase(tryResults, context, record)) {
                record.setStatus(TransactionStatus.CANCELED);
                transactionStore.update(record);
                log.info("Transaction canceled successfully: {}", transactionId);
            } else {
                // Leave the record in CANCELING and keep retrying in the background.
                scheduleRetry(record, TransactionStatus.CANCELING);
            }
        } catch (Exception e) {
            log.error("Transaction failed: {}", transactionId, e);
            handleTransactionFailure(record, context, e);
            throw new TCCTransactionException("Transaction execution failed", e);
        } finally {
            TransactionContextHolder.clear();
        }
        // Raised outside the try block on purpose: this is the expected outcome of a failed
        // Try phase, and routing it through the catch-all above would overwrite the CANCELED
        // status that was just persisted.
        throw new TCCTransactionException("Transaction failed in Try phase");
    }

    /**
     * Runs the Try operation of every action in parallel and waits for all of them.
     *
     * <p>Every Try that has finished is added to the record and persisted, even when the wait
     * itself fails, so that a later Cancel can compensate it. A Try still running when the
     * wait times out is not recorded.
     *
     * @return one record per action, each marked TRY_SUCCESS or TRY_FAILED
     * @throws TCCTransactionException if waiting timed out or failed, or the record could not
     *                                 be persisted
     */
    private List<ParticipantRecord> executeTryPhase(TCCTransaction<?> transaction,
                                                    TransactionContext context,
                                                    TransactionRecord record) {
        List<CompletableFuture<ParticipantRecord>> futures = new ArrayList<>();
        for (TCCAction action : transaction.getActions()) {
            futures.add(CompletableFuture.supplyAsync(() -> runTry(action, context), executorService));
        }

        // Wait for all Try operations to finish.
        Exception failure = null;
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(transactionTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failure = e;
        } catch (Exception e) {
            failure = e;
        }

        // Collect whatever has completed, including after a failed wait.
        List<ParticipantRecord> results = new ArrayList<>();
        for (CompletableFuture<ParticipantRecord> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                ParticipantRecord result = future.join();
                results.add(result);
                record.getParticipants().add(result);
            }
        }
        try {
            transactionStore.update(record);
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            log.error("Error waiting for Try phase completion", failure);
            throw new TCCTransactionException("Try phase execution timeout or failed", failure);
        }
        return results;
    }

    /**
     * Runs the Try operation of a single action and captures the outcome in a record.
     * Exceptions from the participant are recorded as TRY_FAILED, not propagated.
     */
    private ParticipantRecord runTry(TCCAction action, TransactionContext context) {
        ParticipantRecord participant = new ParticipantRecord();
        participant.setParticipantName(action.getParticipantName());
        participant.setActionId(UUID.randomUUID().toString());
        try {
            TCCParticipant tccParticipant = requireParticipant(action.getParticipantName());
            Object result = tccParticipant.doTry(context, action.getRequest());
            participant.setTryResult(result);
            participant.setStatus(ParticipantStatus.TRY_SUCCESS);
            participant.setTryTime(System.currentTimeMillis());
            log.info("Try succeeded for participant: {}", action.getParticipantName());
        } catch (Exception e) {
            participant.setStatus(ParticipantStatus.TRY_FAILED);
            participant.setErrorMessage(e.getMessage());
            log.error("Try failed for participant: {}", action.getParticipantName(), e);
        }
        return participant;
    }

    /**
     * Looks up a registered participant.
     *
     * @throws IllegalStateException if no participant is registered under {@code name}
     */
    private TCCParticipant requireParticipant(String name) {
        TCCParticipant participant = participants.get(name);
        if (participant == null) {
            throw new IllegalStateException("Participant not found: " + name);
        }
        return participant;
    }

    /**
     * Runs Confirm, with retries, on every participant whose status is TRY_SUCCESS.
     *
     * @return {@code true} if every Confirm finished within the timeout and the record was
     *         persisted; {@code false} if the phase must be retried by the caller
     */
    private boolean executeConfirmPhase(List<ParticipantRecord> participantRecords,
                                        TransactionContext context,
                                        TransactionRecord record) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (ParticipantRecord participant : participantRecords) {
            // Participants already CONFIRMED by an earlier attempt are skipped here.
            if (participant.getStatus() != ParticipantStatus.TRY_SUCCESS) {
                continue;
            }
            futures.add(CompletableFuture.runAsync(() -> executeWithRetry(() -> {
                TCCParticipant tccParticipant = requireParticipant(participant.getParticipantName());
                tccParticipant.doConfirm(context, participant.getTryResult());
                participant.setStatus(ParticipantStatus.CONFIRMED);
                participant.setConfirmTime(System.currentTimeMillis());
                log.info("Confirm succeeded for participant: {}", participant.getParticipantName());
            }, "Confirm", participant.getParticipantName()), executorService));
        }
        return awaitPhase(futures, record, "Confirm");
    }

    /**
     * Runs Cancel, with retries, on every participant whose status is TRY_SUCCESS.
     *
     * @return {@code true} if every Cancel finished within the timeout and the record was
     *         persisted; {@code false} if the phase must be retried by the caller
     */
    private boolean executeCancelPhase(List<ParticipantRecord> participantRecords,
                                       TransactionContext context,
                                       TransactionRecord record) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (ParticipantRecord participant : participantRecords) {
            // Only participants that reserved something (Try succeeded) need compensation.
            if (participant.getStatus() != ParticipantStatus.TRY_SUCCESS) {
                continue;
            }
            futures.add(CompletableFuture.runAsync(() -> executeWithRetry(() -> {
                TCCParticipant tccParticipant = requireParticipant(participant.getParticipantName());
                tccParticipant.doCancel(context, participant.getTryResult());
                participant.setStatus(ParticipantStatus.CANCELED);
                participant.setCancelTime(System.currentTimeMillis());
                log.info("Cancel succeeded for participant: {}", participant.getParticipantName());
            }, "Cancel", participant.getParticipantName()), executorService));
        }
        return awaitPhase(futures, record, "Cancel");
    }

    /**
     * Waits for all participant calls of a Confirm/Cancel phase and persists the record.
     *
     * @param phase phase name used in the log message ("Confirm" or "Cancel")
     * @return {@code true} on success; {@code false} if waiting timed out, a participant call
     *         failed after its retries, or the record could not be persisted
     */
    private boolean awaitPhase(List<CompletableFuture<Void>> futures,
                               TransactionRecord record,
                               String phase) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(transactionTimeout, TimeUnit.MILLISECONDS);
            transactionStore.update(record);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Error during " + phase + " phase", e);
            return false;
        } catch (Exception e) {
            log.error("Error during " + phase + " phase", e);
            return false;
        }
    }

    /**
     * Runs {@code action}, retrying after a failure with the delays given by the retry policy.
     *
     * @param action      the call to execute
     * @param phase       phase name used in log and exception messages
     * @param participant participant name used in log messages
     * @throws TCCTransactionException if the action still fails after the last retry, or the
     *                                 thread is interrupted while waiting between attempts
     */
    private void executeWithRetry(Runnable action, String phase, String participant) {
        // Clamp so that a negative policy value still yields one attempt; without this the
        // loop body would never run and the action would be reported as successful.
        int maxRetries = Math.max(0, retryPolicy.getMaxRetries());
        long delay = retryPolicy.getInitialDelay();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                action.run();
                return; // Succeeded.
            } catch (Exception e) {
                if (attempt == maxRetries) {
                    log.error("{} failed after {} retries for participant: {}",
                            phase, maxRetries, participant, e);
                    throw new TCCTransactionException(phase + " failed after max retries", e);
                }
                log.warn("{} failed, retrying... (attempt {}/{}) for participant: {}",
                        phase, attempt + 1, maxRetries, participant);
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new TCCTransactionException("Retry interrupted", ie);
                }
                delay = retryPolicy.getNextDelay(delay);
            }
        }
    }

    /**
     * Returns whether every Try result is TRY_SUCCESS (true for an empty list).
     */
    private boolean allTrySuccess(List<ParticipantRecord> results) {
        return results.stream().allMatch(r -> r.getStatus() == ParticipantStatus.TRY_SUCCESS);
    }

    /**
     * Records an unexpected coordination failure and schedules the matching recovery.
     *
     * <p>The status the transaction had when it failed decides what happens next, mirroring
     * {@link #recoverPendingTransactions()}: TRYING and CANCELING roll back via Cancel,
     * CONFIRMING rolls forward via Confirm, and any other status is marked FAILED. In the
     * CONFIRMING case the caller still receives the failure exception even though the
     * transaction will be confirmed in the background. Errors raised here are logged and
     * never propagated, so the original failure stays the one reported to the caller.
     */
    private void handleTransactionFailure(TransactionRecord record,
                                          TransactionContext context,
                                          Exception e) {
        // Read the status before it is overwritten; the recovery decision depends on it.
        TransactionStatus statusAtFailure = record.getStatus();
        TransactionStatus recoveryTarget = null;
        if (statusAtFailure == TransactionStatus.TRYING
                || statusAtFailure == TransactionStatus.CANCELING) {
            recoveryTarget = TransactionStatus.CANCELING;
        } else if (statusAtFailure == TransactionStatus.CONFIRMING) {
            recoveryTarget = TransactionStatus.CONFIRMING;
        }

        try {
            record.setErrorMessage(e.getMessage());
            // Keep an in-progress status while recovery is outstanding so the stored record
            // reflects that work remains; only a transaction with nothing left to recover is
            // marked FAILED.
            record.setStatus(recoveryTarget != null ? recoveryTarget : TransactionStatus.FAILED);
            transactionStore.update(record);
        } catch (Exception ex) {
            log.error("Failed to handle transaction failure", ex);
        }

        if (recoveryTarget != null) {
            try {
                // Scheduled even if the store update above failed: compensation matters more
                // than bookkeeping.
                scheduleRetry(record, recoveryTarget);
            } catch (Exception ex) {
                log.error("Failed to handle transaction failure", ex);
            }
        }
    }

    /**
     * Schedules a recovery attempt after the retry policy's initial delay. Does nothing if an
     * attempt for the same transaction is already queued or running.
     */
    private void scheduleRetry(TransactionRecord record, TransactionStatus targetStatus) {
        String key = String.valueOf(record.getTransactionId());
        if (!recoveriesInFlight.add(key)) {
            return;
        }
        try {
            recoveryScheduler.schedule(
                    () -> runRecovery(record, targetStatus, key),
                    retryPolicy.getInitialDelay(), TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            // Nothing was queued (for example the scheduler is shut down); release the slot.
            recoveriesInFlight.remove(key);
            throw e;
        }
    }

    /**
     * Body of a scheduled recovery attempt: runs the recovery and, if the phase still did not
     * complete, queues the next attempt.
     */
    private void runRecovery(TransactionRecord record, TransactionStatus targetStatus, String key) {
        boolean needsAnotherAttempt = false;
        try {
            needsAnotherAttempt = !recoverTransaction(record, targetStatus);
        } catch (Exception e) {
            log.error("Failed to recover transaction: {}", record.getTransactionId(), e);
        } finally {
            recoveriesInFlight.remove(key);
        }
        if (needsAnotherAttempt) {
            try {
                // Confirm/Cancel must eventually succeed, so keep retrying.
                scheduleRetry(record, targetStatus);
            } catch (Exception e) {
                log.error("Failed to recover transaction: {}", record.getTransactionId(), e);
            }
        }
    }

    /**
     * Re-drives a transaction toward {@code targetStatus} and, on success, persists the
     * terminal status so the transaction is no longer treated as unfinished.
     *
     * @return {@code true} if the phase completed (or there was nothing to do); {@code false}
     *         if it must be retried
     */
    private boolean recoverTransaction(TransactionRecord record, TransactionStatus targetStatus) {
        TransactionContext context = new TransactionContext(record.getTransactionId());
        List<ParticipantRecord> participantRecords = record.getParticipants();
        if (participantRecords == null) {
            participantRecords = Collections.emptyList();
        }

        if (targetStatus == TransactionStatus.CONFIRMING) {
            if (!executeConfirmPhase(participantRecords, context, record)) {
                return false;
            }
            record.setStatus(TransactionStatus.CONFIRMED);
        } else if (targetStatus == TransactionStatus.CANCELING) {
            if (!executeCancelPhase(participantRecords, context, record)) {
                return false;
            }
            record.setStatus(TransactionStatus.CANCELED);
        } else {
            return true;
        }
        transactionStore.update(record);
        return true;
    }

    /**
     * Starts the periodic recovery task: first run after 30 seconds, then 60 seconds after
     * each run finishes.
     */
    private void startRecoveryTask() {
        recoveryScheduler.scheduleWithFixedDelay(() -> {
            try {
                recoverPendingTransactions();
            } catch (Exception e) {
                // Swallowed on purpose: an exception escaping a periodic task cancels all of
                // its future runs.
                log.error("Error in recovery task", e);
            }
        }, 30, 60, TimeUnit.SECONDS);
    }

    /**
     * Schedules recovery for every pending transaction from the store that has timed out.
     */
    private void recoverPendingTransactions() {
        List<TransactionRecord> pendingTransactions = transactionStore.findPendingTransactions();
        if (pendingTransactions == null) {
            return;
        }
        for (TransactionRecord record : pendingTransactions) {
            if (!isTimeout(record)) {
                continue;
            }
            log.info("Recovering timeout transaction: {}", record.getTransactionId());
            TransactionStatus status = record.getStatus();
            if (status == TransactionStatus.TRYING || status == TransactionStatus.CANCELING) {
                // Timed out during Try, or Cancel unfinished: (continue to) cancel.
                scheduleRetry(record, TransactionStatus.CANCELING);
            } else if (status == TransactionStatus.CONFIRMING) {
                // Confirm unfinished: continue to confirm.
                scheduleRetry(record, TransactionStatus.CONFIRMING);
            }
        }
    }

    /**
     * Returns whether more than the transaction timeout has elapsed since the record's
     * creation time.
     */
    private boolean isTimeout(TransactionRecord record) {
        return System.currentTimeMillis() - record.getCreateTime() > transactionTimeout;
    }

    /**
     * Generates a transaction id of the form {@code TCC-<epoch millis>-<random UUID>}.
     */
    private String generateTransactionId() {
        return "TCC-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
    }

    /**
     * Shuts the manager down: stops accepting work on all pools, then waits up to 60 seconds
     * per pool for running work before forcing termination. If the calling thread is
     * interrupted while waiting, all pools are forced down and the interrupt flag is restored.
     */
    public void shutdown() {
        coordinatorExecutor.shutdown();
        executorService.shutdown();
        recoveryScheduler.shutdown();
        try {
            if (!coordinatorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
                coordinatorExecutor.shutdownNow();
            }
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
            if (!recoveryScheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                recoveryScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            coordinatorExecutor.shutdownNow();
            executorService.shutdownNow();
            recoveryScheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}