引言

BASE理论是对CAP理论的实践性补充,它为大规模分布式系统提供了一种更加灵活的数据一致性模型。与ACID理论强调强一致性不同,BASE理论通过牺牲强一致性来获得更好的可用性和分区容错性,是现代NoSQL数据库和微服务架构的理论基础。

BASE理论概述

三大特性定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
┌─────────────────────────────────────┐
│            BASE 理论                │
├─────────────────────────────────────┤
│                                     │
│ BA - Basically Available           │
│      基本可用                       │
│                                     │
│ S - Soft State                     │
│     软状态                         │
│                                     │
│ E - Eventually Consistent          │
│   最终一致性                       │
│                                     │
└─────────────────────────────────────┘

BASE vs ACID对比

流程图表

关系流向:

1
2
3
4
5
A[事务处理模型] → B[ACID模型]
A → C[BASE模型]
B → D[强一致性]
B → E[低可用性]
B → F[垂直扩展]

详细特性分析

1. 基本可用 (Basically Available)

基本可用指系统在出现故障时,仍然能够保证核心功能的可用性,允许损失部分可用性。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 基本可用性管理器
 */
public class BasicAvailabilityManager {
    private final Map<String, ServiceNode> serviceNodes;
    private final CircuitBreaker circuitBreaker;
    private final LoadBalancer loadBalancer;
    private final ExecutorService executor;

    public BasicAvailabilityManager() {
        this.serviceNodes = new ConcurrentHashMap<>();
        this.circuitBreaker = new CircuitBreaker();
        this.loadBalancer = new LoadBalancer();
        this.executor = Executors.newCachedThreadPool();
    }

    /**
     * 处理请求,确保基本可用性
     */
    public CompletableFuture<ServiceResponse> handleRequest(ServiceRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // 1. 检查熔断器状态
                if (circuitBreaker.isOpen()) {
                    return createDegradedResponse(request, "服务熔断中");
                }

                // 2. 选择可用的服务节点
                ServiceNode availableNode = loadBalancer.selectAvailableNode(serviceNodes.values());
                if (availableNode == null) {
                    return createDegradedResponse(request, "暂无可用节点");
                }

                // 3. 执行请求
                ServiceResponse response = availableNode.processRequest(request);

                // 4. 更新熔断器状态
                if (response.isSuccess()) {
                    circuitBreaker.recordSuccess();
                } else {
                    circuitBreaker.recordFailure();
                }

                return response;

            } catch (Exception e) {
                circuitBreaker.recordFailure();
                return createDegradedResponse(request, "服务异常: " + e.getMessage());
            }
        }, executor);
    }

    /**
     * 创建降级响应
     */
    private ServiceResponse createDegradedResponse(ServiceRequest request, String reason) {
        return new ServiceResponse(
            false,
            "降级响应: " + reason,
            Collections.singletonMap("degraded", true)
        );
    }

    public void addServiceNode(String nodeId, ServiceNode node) {
        serviceNodes.put(nodeId, node);
    }

    public void removeServiceNode(String nodeId) {
        serviceNodes.remove(nodeId);
    }
}

/**
 * 熔断器实现
 */
class CircuitBreaker {
    private final int failureThreshold = 5;
    private final long timeoutMs = 60000; // 1分钟
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicBoolean isOpen = new AtomicBoolean(false);
    private volatile long lastFailureTime = 0;

    public boolean isOpen() {
        if (isOpen.get()) {
            // 检查是否可以尝试恢复
            if (System.currentTimeMillis() - lastFailureTime > timeoutMs) {
                isOpen.set(false);
                failureCount.set(0);
                return false;
            }
            return true;
        }
        return false;
    }

    public void recordSuccess() {
        failureCount.set(0);
        isOpen.set(false);
    }

    public void recordFailure() {
        lastFailureTime = System.currentTimeMillis();
        if (failureCount.incrementAndGet() >= failureThreshold) {
            isOpen.set(true);
        }
    }
}

/**
 * 负载均衡器
 */
class LoadBalancer {
    private final Random random = new Random();

    public ServiceNode selectAvailableNode(Collection<ServiceNode> nodes) {
        List<ServiceNode> availableNodes = nodes.stream()
            .filter(ServiceNode::isAvailable)
            .collect(Collectors.toList());

        if (availableNodes.isEmpty()) {
            return null;
        }

        // 简单的随机负载均衡
        return availableNodes.get(random.nextInt(availableNodes.size()));
    }
}

/**
 * 服务节点
 */
class ServiceNode {
    private final String nodeId;
    private final AtomicBoolean available;
    private final Random random = new Random();

    public ServiceNode(String nodeId) {
        this.nodeId = nodeId;
        this.available = new AtomicBoolean(true);
    }

    public ServiceResponse processRequest(ServiceRequest request) {
        if (!available.get()) {
            throw new RuntimeException("节点不可用");
        }

        // 模拟处理时间
        try {
            Thread.sleep(random.nextInt(100) + 50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("处理中断");
        }

        // 模拟偶发故障
        if (random.nextDouble() < 0.1) { // 10% 故障率
            throw new RuntimeException("模拟节点故障");
        }

        return new ServiceResponse(
            true,
            "处理成功",
            Map.of("nodeId", nodeId, "timestamp", System.currentTimeMillis())
        );
    }

    public boolean isAvailable() {
        return available.get();
    }

    public void setAvailable(boolean available) {
        this.available.set(available);
    }

    public String getNodeId() {
        return nodeId;
    }
}

/**
 * 服务请求
 */
class ServiceRequest {
    private final String requestId;
    private final String operation;
    private final Map<String, Object> parameters;

    public ServiceRequest(String requestId, String operation, Map<String, Object> parameters) {
        this.requestId = requestId;
        this.operation = operation;
        this.parameters = parameters;
    }

    // Getters
    public String getRequestId() { return requestId; }
    public String getOperation() { return operation; }
    public Map<String, Object> getParameters() { return parameters; }
}

/**
 * 服务响应
 */
class ServiceResponse {
    private final boolean success;
    private final String message;
    private final Map<String, Object> data;

    public ServiceResponse(boolean success, String message, Map<String, Object> data) {
        this.success = success;
        this.message = message;
        this.data = data;
    }

    // Getters
    public boolean isSuccess() { return success; }
    public String getMessage() { return message; }
    public Map<String, Object> getData() { return data; }
}

// 使用示例
class BasicAvailabilityDemo {
    public static void main(String[] args) throws Exception {
        BasicAvailabilityManager manager = new BasicAvailabilityManager();

        // 添加服务节点
        manager.addServiceNode("node1", new ServiceNode("node1"));
        manager.addServiceNode("node2", new ServiceNode("node2"));
        manager.addServiceNode("node3", new ServiceNode("node3"));

        // 并发测试基本可用性
        List<CompletableFuture<ServiceResponse>> futures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ServiceRequest request = new ServiceRequest(
                "req_" + i,
                "test_operation",
                Map.of("data", "test_data_" + i)
            );
            futures.add(manager.handleRequest(request));
        }

        // 收集结果
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> {
                System.out.println("=== 基本可用性测试结果 ===");
                futures.forEach(future -> {
                    try {
                        ServiceResponse response = future.get();
                        System.out.println("响应: " + response.getMessage() +
                                         ", 成功: " + response.isSuccess());
                    } catch (Exception e) {
                        System.err.println("获取响应失败: " + e.getMessage());
                    }
                });
            }).get();
    }
}

2. 软状态 (Soft State)

软状态指系统中的数据不要求在任何时刻都完全一致,允许存在中间状态。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import java.util.*;
import java.util.concurrent.*;

/**
 * 软状态数据存储
 */
public class SoftStateDataStore {
    private final Map<String, VersionedData> primaryStore;
    private final Map<String, List<VersionedData>> replicationLog;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService replicationExecutor;

    public SoftStateDataStore() {
        this.primaryStore = new ConcurrentHashMap<>();
        this.replicationLog = new ConcurrentHashMap<>();
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.replicationExecutor = Executors.newCachedThreadPool();

        // 启动后台同步任务
        startBackgroundSync();
    }

    /**
     * 写入数据(允许软状态)
     */
    public WriteResult write(String key, String value, WriteOptions options) {
        long timestamp = System.currentTimeMillis();
        String version = generateVersion(timestamp);

        VersionedData data = new VersionedData(value, version, timestamp, WriteStatus.PENDING);

        // 1. 立即写入主存储
        primaryStore.put(key, data);

        // 2. 记录到复制日志
        replicationLog.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(data);

        // 3. 异步复制到其他节点
        if (options.isAsyncReplication()) {
            scheduleAsyncReplication(key, data);
        } else {
            // 同步复制(仍然允许软状态)
            scheduleSyncReplication(key, data);
        }

        return new WriteResult(true, version, WriteStatus.PENDING);
    }

    /**
     * 读取数据(可能返回软状态)
     */
    public ReadResult read(String key, ReadOptions options) {
        VersionedData data = primaryStore.get(key);

        if (data == null) {
            return new ReadResult(false, null, null, ReadStatus.NOT_FOUND);
        }

        ReadStatus status = determineReadStatus(data, options);

        return new ReadResult(true, data.getValue(), data.getVersion(), status);
    }

    /**
     * 确定读取状态
     */
    private ReadStatus determineReadStatus(VersionedData data, ReadOptions options) {
        long currentTime = System.currentTimeMillis();
        long dataAge = currentTime - data.getTimestamp();

        if (data.getStatus() == WriteStatus.COMMITTED) {
            return ReadStatus.CONSISTENT;
        } else if (dataAge < options.getMaxStalenessMs()) {
            return ReadStatus.SOFT_STATE;
        } else {
            return ReadStatus.STALE;
        }
    }

    /**
     * 异步复制调度
     */
    private void scheduleAsyncReplication(String key, VersionedData data) {
        replicationExecutor.submit(() -> {
            try {
                // 模拟网络延迟
                Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));

                // 模拟复制过程
                if (simulateReplication(key, data)) {
                    data.setStatus(WriteStatus.COMMITTED);
                    System.out.println("异步复制成功: " + key + " -> " + data.getVersion());
                } else {
                    data.setStatus(WriteStatus.FAILED);
                    System.out.println("异步复制失败: " + key + " -> " + data.getVersion());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                data.setStatus(WriteStatus.FAILED);
            }
        });
    }

    /**
     * 同步复制调度
     */
    private void scheduleSyncReplication(String key, VersionedData data) {
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(50, 200));
                return simulateReplication(key, data);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }, replicationExecutor)
        .thenAccept(success -> {
            if (success) {
                data.setStatus(WriteStatus.COMMITTED);
            } else {
                data.setStatus(WriteStatus.FAILED);
            }
        });
    }

    /**
     * 模拟复制过程
     */
    private boolean simulateReplication(String key, VersionedData data) {
        // 90% 成功率
        return ThreadLocalRandom.current().nextDouble() < 0.9;
    }

    /**
     * 后台同步任务
     */
    private void startBackgroundSync() {
        // 定期清理已提交的复制日志
        scheduler.scheduleWithFixedDelay(() -> {
            replicationLog.entrySet().removeIf(entry -> {
                entry.getValue().removeIf(data ->
                    data.getStatus() == WriteStatus.COMMITTED &&
                    System.currentTimeMillis() - data.getTimestamp() > 300000 // 5分钟
                );
                return entry.getValue().isEmpty();
            });
        }, 60, 60, TimeUnit.SECONDS);

        // 定期重试失败的复制
        scheduler.scheduleWithFixedDelay(() -> {
            replicationLog.forEach((key, dataList) -> {
                dataList.stream()
                    .filter(data -> data.getStatus() == WriteStatus.FAILED)
                    .forEach(data -> {
                        System.out.println("重试复制: " + key + " -> " + data.getVersion());
                        scheduleAsyncReplication(key, data);
                    });
            });
        }, 30, 30, TimeUnit.SECONDS);
    }

    private String generateVersion(long timestamp) {
        return timestamp + "_" + ThreadLocalRandom.current().nextInt(1000);
    }

    public void shutdown() {
        scheduler.shutdown();
        replicationExecutor.shutdown();
    }
}

/**
 * 版本化数据
 */
class VersionedData {
    private final String value;
    private final String version;
    private final long timestamp;
    private volatile WriteStatus status;

    public VersionedData(String value, String version, long timestamp, WriteStatus status) {
        this.value = value;
        this.version = version;
        this.timestamp = timestamp;
        this.status = status;
    }

    // Getters and setters
    public String getValue() { return value; }
    public String getVersion() { return version; }
    public long getTimestamp() { return timestamp; }
    public WriteStatus getStatus() { return status; }
    public void setStatus(WriteStatus status) { this.status = status; }
}

/**
 * 写入状态枚举
 */
enum WriteStatus {
    PENDING("待处理"),
    COMMITTED("已提交"),
    FAILED("失败");

    private final String description;

    WriteStatus(String description) {
        this.description = description;
    }

    public String getDescription() { return description; }
}

/**
 * 读取状态枚举
 */
enum ReadStatus {
    CONSISTENT("一致"),
    SOFT_STATE("软状态"),
    STALE("过期"),
    NOT_FOUND("未找到");

    private final String description;

    ReadStatus(String description) {
        this.description = description;
    }

    public String getDescription() { return description; }
}

/**
 * 写入选项
 */
class WriteOptions {
    private boolean asyncReplication = true;
    private int replicationFactor = 3;
    private long timeoutMs = 5000;

    public boolean isAsyncReplication() { return asyncReplication; }
    public void setAsyncReplication(boolean asyncReplication) { this.asyncReplication = asyncReplication; }

    public int getReplicationFactor() { return replicationFactor; }
    public void setReplicationFactor(int replicationFactor) { this.replicationFactor = replicationFactor; }

    public long getTimeoutMs() { return timeoutMs; }
    public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; }
}

/**
 * 读取选项
 */
class ReadOptions {
    private long maxStalenessMs = 10000; // 最大容忍的数据延迟
    private boolean allowSoftState = true;

    public long getMaxStalenessMs() { return maxStalenessMs; }
    public void setMaxStalenessMs(long maxStalenessMs) { this.maxStalenessMs = maxStalenessMs; }

    public boolean isAllowSoftState() { return allowSoftState; }
    public void setAllowSoftState(boolean allowSoftState) { this.allowSoftState = allowSoftState; }
}

/**
 * 写入结果
 */
class WriteResult {
    private final boolean success;
    private final String version;
    private final WriteStatus status;

    public WriteResult(boolean success, String version, WriteStatus status) {
        this.success = success;
        this.version = version;
        this.status = status;
    }

    public boolean isSuccess() { return success; }
    public String getVersion() { return version; }
    public WriteStatus getStatus() { return status; }
}

/**
 * 读取结果
 */
class ReadResult {
    private final boolean found;
    private final String value;
    private final String version;
    private final ReadStatus status;

    public ReadResult(boolean found, String value, String version, ReadStatus status) {
        this.found = found;
        this.value = value;
        this.version = version;
        this.status = status;
    }

    public boolean isFound() { return found; }
    public String getValue() { return value; }
    public String getVersion() { return version; }
    public ReadStatus getStatus() { return status; }
}

// 软状态演示
class SoftStateDemo {
    public static void main(String[] args) throws Exception {
        SoftStateDataStore store = new SoftStateDataStore();

        System.out.println("=== 软状态数据存储测试 ===");

        // 写入数据
        WriteOptions writeOpts = new WriteOptions();
        writeOpts.setAsyncReplication(true);

        WriteResult writeResult = store.write("user:1", "Alice", writeOpts);
        System.out.println("写入结果: " + writeResult.isSuccess() +
                         ", 状态: " + writeResult.getStatus().getDescription());

        // 立即读取(可能是软状态)
        ReadOptions readOpts = new ReadOptions();
        readOpts.setMaxStalenessMs(5000);

        ReadResult readResult = store.read("user:1", readOpts);
        System.out.println("读取结果: " + readResult.getValue() +
                         ", 状态: " + readResult.getStatus().getDescription());

        // 等待一段时间后再读取
        Thread.sleep(1000);

        ReadResult readResult2 = store.read("user:1", readOpts);
        System.out.println("延迟读取结果: " + readResult2.getValue() +
                         ", 状态: " + readResult2.getStatus().getDescription());

        store.shutdown();
    }
}

3. 最终一致性 (Eventually Consistent)

最终一致性保证系统在停止接收输入后,经过一段时间后所有节点的数据将达到一致状态。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
import java.util.*;
import java.util.concurrent.*;

/**
 * 最终一致性协调器
 */
public class EventualConsistencyCoordinator {
    private final Map<String, Node> nodes;
    private final VersionVector globalVersionVector;
    private final ScheduledExecutorService scheduler;
    private final ExecutorService syncExecutor;
    private final ConflictResolver conflictResolver;

    public EventualConsistencyCoordinator() {
        this.nodes = new ConcurrentHashMap<>();
        this.globalVersionVector = new VersionVector();
        this.scheduler = Executors.newScheduledThreadPool(3);
        this.syncExecutor = Executors.newCachedThreadPool();
        this.conflictResolver = new ConflictResolver();

        startPeriodicSync();
    }

    /**
     * 添加节点
     */
    public void addNode(String nodeId, Node node) {
        nodes.put(nodeId, node);
        globalVersionVector.increment(nodeId);
    }

    /**
     * 分布式写入操作
     */
    public CompletableFuture<Void> distributedWrite(String key, String value, String sourceNodeId) {
        Node sourceNode = nodes.get(sourceNodeId);
        if (sourceNode == null) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("节点不存在: " + sourceNodeId));
        }

        // 1. 在源节点写入
        long timestamp = System.currentTimeMillis();
        VersionedEntry entry = new VersionedEntry(
            value,
            sourceNodeId,
            globalVersionVector.increment(sourceNodeId),
            timestamp
        );

        sourceNode.write(key, entry);

        // 2. 异步传播到其他节点
        return propagateToOtherNodes(key, entry, sourceNodeId);
    }

    /**
     * 传播到其他节点
     */
    private CompletableFuture<Void> propagateToOtherNodes(String key, VersionedEntry entry, String sourceNodeId) {
        List<CompletableFuture<Void>> propagationFutures = new ArrayList<>();

        nodes.entrySet().stream()
            .filter(e -> !e.getKey().equals(sourceNodeId))
            .forEach(e -> {
                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                    try {
                        // 模拟网络延迟
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 1000));

                        Node targetNode = e.getValue();
                        targetNode.receiveUpdate(key, entry);

                        System.out.println("传播成功: " + key + " -> " + e.getKey());
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException("传播中断", ex);
                    } catch (Exception ex) {
                        System.err.println("传播失败: " + key + " -> " + e.getKey() + ", " + ex.getMessage());
                        throw new RuntimeException("传播失败", ex);
                    }
                }, syncExecutor);

                propagationFutures.add(future);
            });

        return CompletableFuture.allOf(propagationFutures.toArray(new CompletableFuture[0]));
    }

    /**
     * 分布式读取操作
     */
    public CompletableFuture<ConsistentReadResult> distributedRead(String key) {
        List<CompletableFuture<VersionedEntry>> readFutures = new ArrayList<>();

        // 从所有节点读取
        nodes.values().forEach(node -> {
            CompletableFuture<VersionedEntry> future = CompletableFuture.supplyAsync(() -> {
                return node.read(key);
            }, syncExecutor);
            readFutures.add(future);
        });

        return CompletableFuture.allOf(readFutures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<VersionedEntry> entries = readFutures.stream()
                    .map(CompletableFuture::join)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());

                return resolveConsistentValue(key, entries);
            });
    }

    /**
     * 解析一致的值
     */
    private ConsistentReadResult resolveConsistentValue(String key, List<VersionedEntry> entries) {
        if (entries.isEmpty()) {
            return new ConsistentReadResult(null, ConsistencyStatus.NOT_FOUND, Collections.emptyList());
        }

        // 检查是否所有节点都有相同的值
        if (entries.stream().allMatch(e -> e.equals(entries.get(0)))) {
            return new ConsistentReadResult(
                entries.get(0).getValue(),
                ConsistencyStatus.CONSISTENT,
                Collections.singletonList(entries.get(0))
            );
        }

        // 存在冲突,需要解决
        VersionedEntry resolvedEntry = conflictResolver.resolve(entries);
        return new ConsistentReadResult(
            resolvedEntry.getValue(),
            ConsistencyStatus.EVENTUALLY_CONSISTENT,
            entries
        );
    }

    /**
     * 启动定期同步
     */
    private void startPeriodicSync() {
        // 反熵同步 - 定期比较和同步节点间的数据
        scheduler.scheduleWithFixedDelay(() -> {
            performAntiEntropySync();
        }, 10, 30, TimeUnit.SECONDS);

        // 版本向量同步
        scheduler.scheduleWithFixedDelay(() -> {
            syncVersionVectors();
        }, 5, 15, TimeUnit.SECONDS);
    }

    /**
     * 反熵同步
     */
    private void performAntiEntropySync() {
        System.out.println("开始反熵同步...");

        List<String> nodeIds = new ArrayList<>(nodes.keySet());
        for (int i = 0; i < nodeIds.size(); i++) {
            for (int j = i + 1; j < nodeIds.size(); j++) {
                String nodeId1 = nodeIds.get(i);
                String nodeId2 = nodeIds.get(j);

                syncExecutor.submit(() -> syncBetweenNodes(nodeId1, nodeId2));
            }
        }
    }

    /**
     * 节点间同步
     */
    private void syncBetweenNodes(String nodeId1, String nodeId2) {
        Node node1 = nodes.get(nodeId1);
        Node node2 = nodes.get(nodeId2);

        if (node1 == null || node2 == null) return;

        try {
            // 比较两个节点的数据
            Map<String, VersionedEntry> data1 = node1.getAllData();
            Map<String, VersionedEntry> data2 = node2.getAllData();

            // 找出差异并同步
            Set<String> allKeys = new HashSet<>();
            allKeys.addAll(data1.keySet());
            allKeys.addAll(data2.keySet());

            for (String key : allKeys) {
                VersionedEntry entry1 = data1.get(key);
                VersionedEntry entry2 = data2.get(key);

                if (entry1 == null && entry2 != null) {
                    node1.receiveUpdate(key, entry2);
                } else if (entry1 != null && entry2 == null) {
                    node2.receiveUpdate(key, entry1);
                } else if (entry1 != null && entry2 != null) {
                    // 比较版本,同步最新的
                    if (entry1.getVersion() > entry2.getVersion()) {
                        node2.receiveUpdate(key, entry1);
                    } else if (entry2.getVersion() > entry1.getVersion()) {
                        node1.receiveUpdate(key, entry2);
                    }
                    // 如果版本相同但值不同,使用冲突解决策略
                    else if (!entry1.getValue().equals(entry2.getValue())) {
                        VersionedEntry resolved = conflictResolver.resolve(Arrays.asList(entry1, entry2));
                        node1.receiveUpdate(key, resolved);
                        node2.receiveUpdate(key, resolved);
                    }
                }
            }

            System.out.println("节点同步完成: " + nodeId1 + " <-> " + nodeId2);
        } catch (Exception e) {
            System.err.println("节点同步失败: " + nodeId1 + " <-> " + nodeId2 + ", " + e.getMessage());
        }
    }

    /**
     * 同步版本向量
     */
    private void syncVersionVectors() {
        nodes.values().forEach(node -> {
            VersionVector nodeVector = node.getVersionVector();
            globalVersionVector.merge(nodeVector);
        });
    }

    public void shutdown() {
        scheduler.shutdown();
        syncExecutor.shutdown();
    }
}

/**
 * 节点实现
 */
class Node {
    private final String nodeId;
    private final Map<String, VersionedEntry> data;
    private final VersionVector versionVector;

    public Node(String nodeId) {
        this.nodeId = nodeId;
        this.data = new ConcurrentHashMap<>();
        this.versionVector = new VersionVector();
    }

    public void write(String key, VersionedEntry entry) {
        data.put(key, entry);
        versionVector.increment(nodeId);
        System.out.println("节点 " + nodeId + " 写入: " + key + " = " + entry.getValue());
    }

    public VersionedEntry read(String key) {
        return data.get(key);
    }

    public void receiveUpdate(String key, VersionedEntry entry) {
        VersionedEntry existing = data.get(key);

        if (existing == null || entry.getVersion() > existing.getVersion()) {
            data.put(key, entry);
            versionVector.update(entry.getNodeId(), entry.getVersion());
            System.out.println("节点 " + nodeId + " 接收更新: " + key + " = " + entry.getValue());
        }
    }

    public Map<String, VersionedEntry> getAllData() {
        return new HashMap<>(data);
    }

    public VersionVector getVersionVector() {
        return versionVector.copy();
    }

    public String getNodeId() {
        return nodeId;
    }
}

/**
 * 版本化条目
 */
class VersionedEntry {
    private final String value;
    private final String nodeId;
    private final long version;
    private final long timestamp;

    public VersionedEntry(String value, String nodeId, long version, long timestamp) {
        this.value = value;
        this.nodeId = nodeId;
        this.version = version;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof VersionedEntry)) return false;
        VersionedEntry other = (VersionedEntry) obj;
        return Objects.equals(value, other.value) &&
               Objects.equals(nodeId, other.nodeId) &&
               version == other.version;
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, nodeId, version);
    }

    // Getters
    public String getValue() { return value; }
    public String getNodeId() { return nodeId; }
    public long getVersion() { return version; }
    public long getTimestamp() { return timestamp; }
}

/**
 * 版本向量
 */
class VersionVector {
    private final Map<String, Long> vector;

    public VersionVector() {
        this.vector = new ConcurrentHashMap<>();
    }

    public long increment(String nodeId) {
        return vector.compute(nodeId, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public void update(String nodeId, long version) {
        vector.compute(nodeId, (k, v) -> (v == null || version > v) ? version : v);
    }

    public void merge(VersionVector other) {
        other.vector.forEach((nodeId, version) -> {
            update(nodeId, version);
        });
    }

    public VersionVector copy() {
        VersionVector copy = new VersionVector();
        copy.vector.putAll(this.vector);
        return copy;
    }

    public Map<String, Long> getVector() {
        return new HashMap<>(vector);
    }
}

/**
 * 冲突解决器
 */
class ConflictResolver {
    /**
     * 解决冲突 - 使用最后写入获胜策略
     */
    public VersionedEntry resolve(List<VersionedEntry> conflictingEntries) {
        return conflictingEntries.stream()
            .max(Comparator.comparingLong(VersionedEntry::getTimestamp))
            .orElse(conflictingEntries.get(0));
    }
}

/**
 * 一致性状态
 */
enum ConsistencyStatus {
    CONSISTENT("一致"),
    EVENTUALLY_CONSISTENT("最终一致"),
    INCONSISTENT("不一致"),
    NOT_FOUND("未找到");

    private final String description;

    ConsistencyStatus(String description) {
        this.description = description;
    }

    public String getDescription() { return description; }
}

/**
 * 一致读取结果
 */
class ConsistentReadResult {
    private final String value;
    private final ConsistencyStatus status;
    private final List<VersionedEntry> allVersions;

    public ConsistentReadResult(String value, ConsistencyStatus status, List<VersionedEntry> allVersions) {
        this.value = value;
        this.status = status;
        this.allVersions = allVersions;
    }

    public String getValue() { return value; }
    public ConsistencyStatus getStatus() { return status; }
    public List<VersionedEntry> getAllVersions() { return allVersions; }
}

// 最终一致性演示
class EventualConsistencyDemo {
    public static void main(String[] args) throws Exception {
        EventualConsistencyCoordinator coordinator = new EventualConsistencyCoordinator();

        // 添加节点
        coordinator.addNode("node1", new Node("node1"));
        coordinator.addNode("node2", new Node("node2"));
        coordinator.addNode("node3", new Node("node3"));

        System.out.println("=== 最终一致性测试 ===");

        // 并发写入不同节点
        List<CompletableFuture<Void>> writeFutures = Arrays.asList(
            coordinator.distributedWrite("user:1", "Alice_v1", "node1"),
            coordinator.distributedWrite("user:1", "Alice_v2", "node2"),
            coordinator.distributedWrite("user:2", "Bob", "node3")
        );

        // 等待写入完成
        CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture[0])).get();

        // 立即读取(可能不一致)
        ConsistentReadResult result1 = coordinator.distributedRead("user:1").get();
        System.out.println("立即读取: " + result1.getValue() +
                         ", 状态: " + result1.getStatus().getDescription());

        // 等待同步
        System.out.println("等待最终一致性...");
        Thread.sleep(5000);

        // 再次读取(应该一致了)
        ConsistentReadResult result2 = coordinator.distributedRead("user:1").get();
        System.out.println("最终读取: " + result2.getValue() +
                         ", 状态: " + result2.getStatus().getDescription());

        coordinator.shutdown();
    }
}

NoSQL数据库中的BASE实践

典型NoSQL系统分析

1
2
3
4
5
6
7
8
9
┌──────────────────┬──────────────┬──────────────┬──────────────┐
│ 数据库类型       │ 基本可用     │ 软状态       │ 最终一致性   │
├──────────────────┼──────────────┼──────────────┼──────────────┤
│ Cassandra        │ 多副本冗余   │ 版本控制     │ Gossip协议   │
│ DynamoDB         │ 自动扩展     │ 异步复制     │ 读修复       │
│ MongoDB          │ 副本集       │ 写关注       │ 读偏好       │
│ CouchDB          │ 分布式架构   │ MVCC         │ 复制冲突解决 │
│ Riak             │ N/R/W配置    │ 向量时钟     │ Merkle树     │
└──────────────────┴──────────────┴──────────────┴──────────────┘

Cassandra-like系统实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import java.util.*;
import java.util.concurrent.*;

/**
 * 类Cassandra分布式数据库实现
 */
public class CassandraLikeDatabase {
    private final Map<String, DataNode> nodes;
    private final ConsistentHashRing hashRing;
    private final int replicationFactor;
    private final GossipProtocol gossipProtocol;

    public CassandraLikeDatabase(int replicationFactor) {
        this.nodes = new ConcurrentHashMap<>();
        this.hashRing = new ConsistentHashRing();
        this.replicationFactor = replicationFactor;
        this.gossipProtocol = new GossipProtocol();
    }

    /**
     * 添加数据节点
     */
    public void addNode(String nodeId, DataNode node) {
        nodes.put(nodeId, node);
        hashRing.addNode(nodeId);
        gossipProtocol.addNode(nodeId, node);
    }

    /**
     * 写入操作(可调节一致性级别)
     */
    public WriteResponse write(String key, String value, ConsistencyLevel consistencyLevel) {
        // 1. 确定负责的节点
        List<String> responsibleNodes = hashRing.getResponsibleNodes(key, replicationFactor);

        // 2. 根据一致性级别确定需要成功的写入数量
        int requiredWrites = calculateRequiredWrites(responsibleNodes.size(), consistencyLevel);

        // 3. 执行写入
        List<CompletableFuture<Boolean>> writeFutures = new ArrayList<>();
        long timestamp = System.currentTimeMillis();

        for (String nodeId : responsibleNodes) {
            DataNode node = nodes.get(nodeId);
            if (node != null) {
                CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return node.write(key, value, timestamp);
                    } catch (Exception e) {
                        System.err.println("写入节点 " + nodeId + " 失败: " + e.getMessage());
                        return false;
                    }
                });
                writeFutures.add(future);
            }
        }

        // 4. 等待足够的写入成功
        return waitForWrites(writeFutures, requiredWrites, consistencyLevel);
    }

    /**
     * 读取操作(可调节一致性级别)
     */
    public ReadResponse read(String key, ConsistencyLevel consistencyLevel) {
        // 1. 确定负责的节点
        List<String> responsibleNodes = hashRing.getResponsibleNodes(key, replicationFactor);

        // 2. 根据一致性级别确定需要读取的节点数量
        int requiredReads = calculateRequiredReads(responsibleNodes.size(), consistencyLevel);

        // 3. 执行读取
        List<CompletableFuture<DataRecord>> readFutures = new ArrayList<>();

        for (int i = 0; i < Math.min(requiredReads, responsibleNodes.size()); i++) {
            String nodeId = responsibleNodes.get(i);
            DataNode node = nodes.get(nodeId);
            if (node != null) {
                CompletableFuture<DataRecord> future = CompletableFuture.supplyAsync(() -> {
                    try {
                        return node.read(key);
                    } catch (Exception e) {
                        System.err.println("读取节点 " + nodeId + " 失败: " + e.getMessage());
                        return null;
                    }
                });
                readFutures.add(future);
            }
        }

        // 4. 收集读取结果并解决冲突
        return collectAndResolveReads(readFutures, key, consistencyLevel);
    }

    private int calculateRequiredWrites(int totalNodes, ConsistencyLevel level) {
        switch (level) {
            case ONE: return 1;
            case QUORUM: return (totalNodes / 2) + 1;
            case ALL: return totalNodes;
            default: return 1;
        }
    }

    private int calculateRequiredReads(int totalNodes, ConsistencyLevel level) {
        switch (level) {
            case ONE: return 1;
            case QUORUM: return (totalNodes / 2) + 1;
            case ALL: return totalNodes;
            default: return 1;
        }
    }

    private WriteResponse waitForWrites(List<CompletableFuture<Boolean>> futures,
                                      int required, ConsistencyLevel level) {
        try {
            int successCount = 0;
            int completedCount = 0;

            for (CompletableFuture<Boolean> future : futures) {
                try {
                    if (future.get(5, TimeUnit.SECONDS)) {
                        successCount++;
                    }
                } catch (Exception e) {
                    // 写入失败
                }
                completedCount++;

                if (successCount >= required) {
                    return new WriteResponse(true, "写入成功", level);
                }

                if (completedCount - successCount > futures.size() - required) {
                    // 无法达到所需的成功写入数量
                    break;
                }
            }

            return new WriteResponse(false, "写入失败: 成功节点不足", level);
        } catch (Exception e) {
            return new WriteResponse(false, "写入异常: " + e.getMessage(), level);
        }
    }

    private ReadResponse collectAndResolveReads(List<CompletableFuture<DataRecord>> futures,
                                              String key, ConsistencyLevel level) {
        try {
            List<DataRecord> records = new ArrayList<>();

            for (CompletableFuture<DataRecord> future : futures) {
                try {
                    DataRecord record = future.get(5, TimeUnit.SECONDS);
                    if (record != null) {
                        records.add(record);
                    }
                } catch (Exception e) {
                    // 读取失败
                }
            }

            if (records.isEmpty()) {
                return new ReadResponse(false, null, "未找到数据", level);
            }

            // 解决读取冲突(最后写入获胜)
            DataRecord latestRecord = records.stream()
                .max(Comparator.comparingLong(DataRecord::getTimestamp))
                .orElse(records.get(0));

            // 检查数据一致性
            boolean isConsistent = records.stream()
                .allMatch(r -> r.getValue().equals(latestRecord.getValue()));

            String status = isConsistent ? "一致" : "最终一致";

            return new ReadResponse(true, latestRecord.getValue(), status, level);

        } catch (Exception e) {
            return new ReadResponse(false, null, "读取异常: " + e.getMessage(), level);
        }
    }
}

/**
 * 数据节点
 */
class DataNode {
    private final String nodeId;
    private final Map<String, DataRecord> data;
    private final boolean isAvailable;

    public DataNode(String nodeId) {
        this.nodeId = nodeId;
        this.data = new ConcurrentHashMap<>();
        this.isAvailable = true;
    }

    public boolean write(String key, String value, long timestamp) {
        if (!isAvailable) {
            throw new RuntimeException("节点不可用: " + nodeId);
        }

        DataRecord record = new DataRecord(value, timestamp);
        data.put(key, record);
        System.out.println("节点 " + nodeId + " 写入: " + key + " = " + value);
        return true;
    }

    public DataRecord read(String key) {
        if (!isAvailable) {
            throw new RuntimeException("节点不可用: " + nodeId);
        }

        return data.get(key);
    }

    public String getNodeId() { return nodeId; }
    public boolean isAvailable() { return isAvailable; }
}

/**
 * 数据记录
 */
class DataRecord {
    private final String value;
    private final long timestamp;

    public DataRecord(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    public String getValue() { return value; }
    public long getTimestamp() { return timestamp; }
}

/**
 * 一致性级别
 */
enum ConsistencyLevel {
    ONE("只需一个节点"),
    QUORUM("大多数节点"),
    ALL("所有节点");

    private final String description;

    ConsistencyLevel(String description) {
        this.description = description;
    }

    public String getDescription() { return description; }
}

/**
 * 一致性哈希环
 */
class ConsistentHashRing {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    public void addNode(String nodeId) {
        int hash = nodeId.hashCode();
        ring.put(hash, nodeId);
    }

    public List<String> getResponsibleNodes(String key, int replicationFactor) {
        int keyHash = key.hashCode();
        List<String> nodes = new ArrayList<>();

        Map.Entry<Integer, String> entry = ring.ceilingEntry(keyHash);
        if (entry == null) {
            entry = ring.firstEntry();
        }

        Iterator<Map.Entry<Integer, String>> iterator = ring.tailMap(entry.getKey()).entrySet().iterator();
        int count = 0;

        while (count < replicationFactor && iterator.hasNext()) {
            nodes.add(iterator.next().getValue());
            count++;
        }

        // 如果需要更多节点,从头开始
        if (count < replicationFactor) {
            iterator = ring.entrySet().iterator();
            while (count < replicationFactor && iterator.hasNext()) {
                String nodeId = iterator.next().getValue();
                if (!nodes.contains(nodeId)) {
                    nodes.add(nodeId);
                    count++;
                }
            }
        }

        return nodes;
    }
}

/**
 * Gossip协议(简化实现)
 */
class GossipProtocol {
    private final Map<String, DataNode> nodes = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void addNode(String nodeId, DataNode node) {
        nodes.put(nodeId, node);
        startGossip();
    }

    private void startGossip() {
        scheduler.scheduleWithFixedDelay(() -> {
            // 简化的gossip实现:定期检查节点状态
            nodes.forEach((nodeId, node) -> {
                System.out.println("Gossip: 节点 " + nodeId + " 状态: " +
                                 (node.isAvailable() ? "可用" : "不可用"));
            });
        }, 10, 30, TimeUnit.SECONDS);
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

/**
 * 写入响应
 */
class WriteResponse {
    private final boolean success;
    private final String message;
    private final ConsistencyLevel level;

    public WriteResponse(boolean success, String message, ConsistencyLevel level) {
        this.success = success;
        this.message = message;
        this.level = level;
    }

    public boolean isSuccess() { return success; }
    public String getMessage() { return message; }
    public ConsistencyLevel getLevel() { return level; }
}

/**
 * 读取响应
 */
class ReadResponse {
    private final boolean found;
    private final String value;
    private final String status;
    private final ConsistencyLevel level;

    public ReadResponse(boolean found, String value, String status, ConsistencyLevel level) {
        this.found = found;
        this.value = value;
        this.status = status;
        this.level = level;
    }

    public boolean isFound() { return found; }
    public String getValue() { return value; }
    public String getStatus() { return status; }
    public ConsistencyLevel getLevel() { return level; }
}

// Cassandra-like系统演示
class CassandraDemo {
    public static void main(String[] args) throws Exception {
        CassandraLikeDatabase db = new CassandraLikeDatabase(3);

        // 添加节点
        db.addNode("node1", new DataNode("node1"));
        db.addNode("node2", new DataNode("node2"));
        db.addNode("node3", new DataNode("node3"));
        db.addNode("node4", new DataNode("node4"));

        System.out.println("=== Cassandra-like数据库测试 ===");

        // 测试不同一致性级别的写入
        WriteResponse writeResult1 = db.write("user:1", "Alice", ConsistencyLevel.ONE);
        System.out.println("ONE级别写入: " + writeResult1.isSuccess() + " - " + writeResult1.getMessage());

        WriteResponse writeResult2 = db.write("user:2", "Bob", ConsistencyLevel.QUORUM);
        System.out.println("QUORUM级别写入: " + writeResult2.isSuccess() + " - " + writeResult2.getMessage());

        // 测试不同一致性级别的读取
        ReadResponse readResult1 = db.read("user:1", ConsistencyLevel.ONE);
        System.out.println("ONE级别读取: " + readResult1.getValue() + " - " + readResult1.getStatus());

        ReadResponse readResult2 = db.read("user:2", ConsistencyLevel.QUORUM);
        System.out.println("QUORUM级别读取: " + readResult2.getValue() + " - " + readResult2.getStatus());
    }
}

性能优化与监控

BASE系统性能分析

1
2
3
4
5
6
7
8
9
┌──────────────────┬──────────────┬──────────────┬──────────────┐
│ 性能指标         │ ACID系统     │ BASE系统     │ 优化策略      │
├──────────────────┼──────────────┼──────────────┼──────────────┤
│ 写入延迟         │ 高           │ 低           │ 异步复制      │
│ 读取延迟         │ 中等         │ 低           │ 本地读取      │
│ 吞吐量           │ 低           │ 高           │ 分片扩展      │
│ 可用性           │ 中等         │ 高           │ 多副本冗余    │
│ 一致性延迟       │ 无           │ 秒级到分钟级  │ 优化同步频率  │
└──────────────────┴──────────────┴──────────────┴──────────────┘

监控指标实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.time.LocalDateTime;
import java.util.*;

/**
 * BASE系统监控
 */
public class BaseSystemMonitor {
    private final LongAdder totalWrites = new LongAdder();
    private final LongAdder totalReads = new LongAdder();
    private final LongAdder successfulWrites = new LongAdder();
    private final LongAdder successfulReads = new LongAdder();
    private final AtomicLong maxWriteLatency = new AtomicLong(0);
    private final AtomicLong maxReadLatency = new AtomicLong(0);
    private final Map<String, AtomicLong> nodeAvailability = new ConcurrentHashMap<>();
    private final Queue<ConsistencyLagRecord> consistencyLagHistory = new ConcurrentLinkedQueue<>();

    /**
     * 记录写入操作
     */
    public void recordWrite(boolean success, long latencyMs, String nodeId) {
        totalWrites.increment();
        if (success) {
            successfulWrites.increment();
        }

        maxWriteLatency.updateAndGet(current -> Math.max(current, latencyMs));

        // 记录节点可用性
        nodeAvailability.computeIfAbsent(nodeId, k -> new AtomicLong(0))
                        .addAndGet(success ? 1 : 0);
    }

    /**
     * 记录读取操作
     */
    public void recordRead(boolean success, long latencyMs, String consistencyStatus) {
        totalReads.increment();
        if (success) {
            successfulReads.increment();
        }

        maxReadLatency.updateAndGet(current -> Math.max(current, latencyMs));

        // 记录一致性延迟
        if ("最终一致".equals(consistencyStatus)) {
            consistencyLagHistory.offer(new ConsistencyLagRecord(
                LocalDateTime.now(),
                latencyMs,
                consistencyStatus
            ));

            // 保持历史记录在合理范围内
            while (consistencyLagHistory.size() > 1000) {
                consistencyLagHistory.poll();
            }
        }
    }

    /**
     * 生成监控报告
     */
    public MonitoringReport generateReport() {
        long totalWriteCount = totalWrites.sum();
        long totalReadCount = totalReads.sum();

        double writeSuccessRate = totalWriteCount > 0 ?
            (double) successfulWrites.sum() / totalWriteCount : 0.0;
        double readSuccessRate = totalReadCount > 0 ?
            (double) successfulReads.sum() / totalReadCount : 0.0;

        Map<String, Double> nodeAvailabilityRates = new HashMap<>();
        nodeAvailability.forEach((nodeId, count) -> {
            double rate = totalWriteCount > 0 ? (double) count.get() / totalWriteCount : 0.0;
            nodeAvailabilityRates.put(nodeId, rate);
        });

        long avgConsistencyLag = consistencyLagHistory.stream()
            .mapToLong(record -> record.latencyMs)
            .sum() / Math.max(1, consistencyLagHistory.size());

        return new MonitoringReport(
            writeSuccessRate,
            readSuccessRate,
            maxWriteLatency.get(),
            maxReadLatency.get(),
            nodeAvailabilityRates,
            avgConsistencyLag,
            consistencyLagHistory.size()
        );
    }

    /**
     * 一致性延迟记录
     */
    private static class ConsistencyLagRecord {
        final LocalDateTime timestamp;
        final long latencyMs;
        final String status;

        ConsistencyLagRecord(LocalDateTime timestamp, long latencyMs, String status) {
            this.timestamp = timestamp;
            this.latencyMs = latencyMs;
            this.status = status;
        }
    }
}

/**
 * 监控报告
 */
class MonitoringReport {
    private final double writeSuccessRate;
    private final double readSuccessRate;
    private final long maxWriteLatency;
    private final long maxReadLatency;
    private final Map<String, Double> nodeAvailabilityRates;
    private final long avgConsistencyLag;
    private final int consistencyLagSamples;

    public MonitoringReport(double writeSuccessRate, double readSuccessRate,
                          long maxWriteLatency, long maxReadLatency,
                          Map<String, Double> nodeAvailabilityRates,
                          long avgConsistencyLag, int consistencyLagSamples) {
        this.writeSuccessRate = writeSuccessRate;
        this.readSuccessRate = readSuccessRate;
        this.maxWriteLatency = maxWriteLatency;
        this.maxReadLatency = maxReadLatency;
        this.nodeAvailabilityRates = nodeAvailabilityRates;
        this.avgConsistencyLag = avgConsistencyLag;
        this.consistencyLagSamples = consistencyLagSamples;
    }

    @Override
    public String toString() {
        return String.format(
            "监控报告:\n" +
            "写入成功率: %.2f%%\n" +
            "读取成功率: %.2f%%\n" +
            "最大写入延迟: %d ms\n" +
            "最大读取延迟: %d ms\n" +
            "平均一致性延迟: %d ms\n" +
            "一致性延迟样本数: %d\n" +
            "节点可用性: %s",
            writeSuccessRate * 100,
            readSuccessRate * 100,
            maxWriteLatency,
            maxReadLatency,
            avgConsistencyLag,
            consistencyLagSamples,
            nodeAvailabilityRates
        );
    }
}

总结

BASE理论为大规模分布式系统提供了实用的数据一致性模型:

核心要点

  1. 基本可用:通过冗余和降级策略保证核心功能
  2. 软状态:允许数据存在中间状态,提高系统灵活性
  3. 最终一致性:保证数据最终达到一致状态

适用场景

1
2
3
4
5
6
7
BASE理论适用场景:
├─ 大规模Web应用
├─ 内容分发网络
├─ 社交媒体平台
├─ 物联网数据收集
├─ 日志聚合系统
└─ 实时推荐系统

实践建议

  1. 场景选择:根据业务对一致性的要求选择BASE或ACID
  2. 性能优化:通过异步复制和分片提高性能
  3. 监控告警:建立完善的一致性延迟监控
  4. 冲突解决:设计合理的冲突解决策略

BASE理论为现代分布式系统提供了在可用性和一致性之间的最佳平衡点,是构建大规模系统的重要理论基础。

参考资料

  1. Pritchett, D. (2008). BASE: An Acid Alternative
  2. Gilbert, S., & Lynch, N. (2002). Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services
  3. Vogels, W. (2009). Eventually consistent