1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
| /**
* InfluxDB数据节点
*/
public class InfluxDBDataNode {
private final String nodeId;
private final String metaNodeAddress;
private final Map<String, Shard> localShards = new ConcurrentHashMap<>();
// 时序数据存储
private final Map<String, TSMFile> tsmFiles = new ConcurrentHashMap<>();
private final WAL wal = new WAL();
// Anti-Entropy相关
private final ScheduledExecutorService antiEntropyScheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<String, Long> lastSyncTime = new ConcurrentHashMap<>();
// 配置参数
private final long antiEntropyInterval = 300000; // 5分钟
private final int maxPointsPerWrite = 10000;
public InfluxDBDataNode(String nodeId, String metaNodeAddress) {
this.nodeId = nodeId;
this.metaNodeAddress = metaNodeAddress;
// 启动Anti-Entropy定时任务
antiEntropyScheduler.scheduleAtFixedRate(
this::performAntiEntropy,
antiEntropyInterval,
antiEntropyInterval,
TimeUnit.MILLISECONDS
);
}
/**
* 写入时序数据点
*/
public CompletableFuture<WriteResult> writePoints(String database, List<Point> points) {
return CompletableFuture.supplyAsync(() -> {
try {
// 按时间分组到不同的分片
Map<String, List<Point>> pointsByShards = groupPointsByShards(database, points);
List<WriteResult> results = new ArrayList<>();
for (Map.Entry<String, List<Point>> entry : pointsByShards.entrySet()) {
String shardId = entry.getKey();
List<Point> shardPoints = entry.getValue();
WriteResult result = writePointsToShard(shardId, shardPoints);
results.add(result);
}
return aggregateWriteResults(results);
} catch (Exception e) {
return new WriteResult(false, 0, e.getMessage());
}
});
}
/**
* 按分片分组数据点
*/
private Map<String, List<Point>> groupPointsByShards(String database, List<Point> points) {
Map<String, List<Point>> result = new HashMap<>();
for (Point point : points) {
String shardId = findShardForPoint(database, point);
result.computeIfAbsent(shardId, k -> new ArrayList<>()).add(point);
}
return result;
}
/**
* 查找数据点对应的分片
*/
private String findShardForPoint(String database, Point point) {
long timestamp = point.getTimestamp();
// 简化实现:基于时间戳找到对应的分片
for (Shard shard : localShards.values()) {
if (shard.getDatabase().equals(database) && shard.containsTime(timestamp)) {
return shard.getId();
}
}
// 如果没有找到分片,创建新的分片
return createShardForTime(database, timestamp);
}
/**
* 为指定时间创建分片
*/
private String createShardForTime(String database, long timestamp) {
// 计算分片时间窗口(例如:每小时一个分片)
long shardDuration = 3600000; // 1小时
long startTime = (timestamp / shardDuration) * shardDuration;
long endTime = startTime + shardDuration;
String shardId = database + "_" + startTime + "_" + endTime;
Shard shard = new Shard(shardId, database, startTime, endTime, Arrays.asList(nodeId));
localShards.put(shardId, shard);
return shardId;
}
/**
* 写入数据点到分片
*/
private WriteResult writePointsToShard(String shardId, List<Point> points) {
try {
Shard shard = localShards.get(shardId);
if (shard == null) {
return new WriteResult(false, 0, "分片不存在: " + shardId);
}
// 首先写入WAL
for (Point point : points) {
wal.writePoint(shardId, point);
}
// 写入内存中的分片
int writtenCount = 0;
for (Point point : points) {
if (shard.addPoint(point)) {
writtenCount++;
}
}
// 检查是否需要刷新到TSM文件
if (shard.shouldFlush()) {
flushShardToTSM(shard);
}
// 异步复制到其他副本
replicateToOtherNodes(shardId, points);
return new WriteResult(true, writtenCount, "写入成功");
} catch (Exception e) {
return new WriteResult(false, 0, "写入失败: " + e.getMessage());
}
}
/**
* 刷新分片到TSM文件
*/
private void flushShardToTSM(Shard shard) {
try {
String fileName = shard.getId() + "_" + System.currentTimeMillis() + ".tsm";
TSMFile tsmFile = new TSMFile(fileName);
// 将分片中的数据写入TSM文件
for (Series series : shard.getAllSeries()) {
tsmFile.writeSeries(series);
}
tsmFile.flush();
tsmFiles.put(fileName, tsmFile);
// 清空分片内存数据
shard.clearMemoryData();
System.out.println("分片 " + shard.getId() + " 刷新到TSM文件: " + fileName);
} catch (Exception e) {
System.err.println("刷新分片到TSM文件失败: " + e.getMessage());
}
}
/**
* 复制到其他节点
*/
private void replicateToOtherNodes(String shardId, List<Point> points) {
Shard shard = localShards.get(shardId);
if (shard == null) {
return;
}
// 获取分片的其他副本节点
List<String> otherNodes = shard.getOwners().stream()
.filter(owner -> !owner.equals(nodeId))
.collect(Collectors.toList());
if (otherNodes.isEmpty()) {
return;
}
// 异步复制到其他节点
CompletableFuture.runAsync(() -> {
for (String nodeId : otherNodes) {
try {
sendPointsToNode(nodeId, shardId, points);
} catch (Exception e) {
System.err.println("复制到节点 " + nodeId + " 失败: " + e.getMessage());
}
}
});
}
/**
* 发送数据点到指定节点
*/
private void sendPointsToNode(String targetNodeId, String shardId, List<Point> points) {
// 模拟网络发送
try {
Thread.sleep(10); // 模拟网络延迟
System.out.println("复制 " + points.size() + " 个数据点到节点 " + targetNodeId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 查询时序数据
*/
public CompletableFuture<QueryResult> queryPoints(String database, String measurement,
long startTime, long endTime,
Map<String, String> tags) {
return CompletableFuture.supplyAsync(() -> {
try {
List<Point> result = new ArrayList<>();
// 查找相关分片
List<Shard> relevantShards = findShardsForTimeRange(database, startTime, endTime);
for (Shard shard : relevantShards) {
List<Point> shardPoints = queryPointsFromShard(shard, measurement, startTime, endTime, tags);
result.addAll(shardPoints);
}
// 按时间排序
result.sort(Comparator.comparing(Point::getTimestamp));
return new QueryResult(true, result, "查询成功");
} catch (Exception e) {
return new QueryResult(false, Collections.emptyList(), "查询失败: " + e.getMessage());
}
});
}
/**
* 查找时间范围内的分片
*/
private List<Shard> findShardsForTimeRange(String database, long startTime, long endTime) {
return localShards.values().stream()
.filter(shard -> shard.getDatabase().equals(database))
.filter(shard -> shard.overlapsTimeRange(startTime, endTime))
.collect(Collectors.toList());
}
/**
* 从分片查询数据点
*/
private List<Point> queryPointsFromShard(Shard shard, String measurement,
long startTime, long endTime,
Map<String, String> tags) {
List<Point> result = new ArrayList<>();
// 从内存数据查询
List<Point> memoryPoints = shard.queryPoints(measurement, startTime, endTime, tags);
result.addAll(memoryPoints);
// 从TSM文件查询
for (TSMFile tsmFile : tsmFiles.values()) {
if (tsmFile.getShardId().equals(shard.getId())) {
List<Point> tsmPoints = tsmFile.queryPoints(measurement, startTime, endTime, tags);
result.addAll(tsmPoints);
}
}
return result;
}
/**
* 执行Anti-Entropy
*/
private void performAntiEntropy() {
System.out.println("节点 " + nodeId + " 开始执行Anti-Entropy");
for (Shard shard : localShards.values()) {
try {
performShardAntiEntropy(shard);
} catch (Exception e) {
System.err.println("分片 " + shard.getId() + " Anti-Entropy失败: " + e.getMessage());
}
}
}
/**
* 对单个分片执行Anti-Entropy
*/
private void performShardAntiEntropy(Shard shard) {
List<String> otherNodes = shard.getOwners().stream()
.filter(owner -> !owner.equals(nodeId))
.collect(Collectors.toList());
if (otherNodes.isEmpty()) {
return;
}
for (String otherNode : otherNodes) {
try {
// 获取其他节点的分片校验和
String otherChecksum = getShardChecksumFromNode(otherNode, shard.getId());
String localChecksum = calculateShardChecksum(shard);
if (!localChecksum.equals(otherChecksum)) {
System.out.println("检测到分片 " + shard.getId() + " 与节点 " + otherNode + " 不一致");
repairShardWithNode(shard, otherNode);
}
} catch (Exception e) {
System.err.println("与节点 " + otherNode + " 进行Anti-Entropy失败: " + e.getMessage());
}
}
}
/**
* 获取其他节点的分片校验和
*/
private String getShardChecksumFromNode(String nodeId, String shardId) {
// 模拟网络请求
return "checksum_" + nodeId + "_" + shardId + "_" + System.currentTimeMillis();
}
/**
* 计算分片校验和
*/
private String calculateShardChecksum(Shard shard) {
// 简化的校验和计算
long pointCount = shard.getPointCount();
long lastModified = shard.getLastModified();
return "checksum_" + shard.getId() + "_" + pointCount + "_" + lastModified;
}
/**
* 修复分片与其他节点的差异
*/
private void repairShardWithNode(Shard shard, String otherNode) {
System.out.println("开始修复分片 " + shard.getId() + " 与节点 " + otherNode + " 的差异");
// 获取差异数据
List<Point> missingPoints = getMissingPointsFromNode(otherNode, shard.getId());
// 应用差异数据
for (Point point : missingPoints) {
shard.addPoint(point);
}
System.out.println("修复完成,添加了 " + missingPoints.size() + " 个数据点");
}
/**
* 从其他节点获取缺失的数据点
*/
private List<Point> getMissingPointsFromNode(String nodeId, String shardId) {
// 模拟从其他节点获取数据
List<Point> points = new ArrayList<>();
// 生成一些模拟数据
for (int i = 0; i < 5; i++) {
Point point = new Point(
"temperature",
Map.of("location", "room" + i),
Map.of("value", 20.0 + i),
System.currentTimeMillis() + i * 1000
);
points.add(point);
}
return points;
}
/**
* 聚合写入结果
*/
private WriteResult aggregateWriteResults(List<WriteResult> results) {
boolean allSuccess = results.stream().allMatch(WriteResult::isSuccess);
int totalWritten = results.stream().mapToInt(WriteResult::getPointsWritten).sum();
if (allSuccess) {
return new WriteResult(true, totalWritten, "所有写入成功");
} else {
String errorMsg = results.stream()
.filter(r -> !r.isSuccess())
.map(WriteResult::getMessage)
.collect(Collectors.joining("; "));
return new WriteResult(false, totalWritten, "部分写入失败: " + errorMsg);
}
}
/**
* 获取节点状态
*/
public DataNodeStatus getStatus() {
long totalPoints = localShards.values().stream()
.mapToLong(Shard::getPointCount)
.sum();
return new DataNodeStatus(
nodeId,
localShards.size(),
totalPoints,
tsmFiles.size(),
wal.getSize()
);
}
/**
* 关闭数据节点
*/
public void shutdown() {
antiEntropyScheduler.shutdown();
// 刷新所有分片
for (Shard shard : localShards.values()) {
if (shard.hasMemoryData()) {
flushShardToTSM(shard);
}
}
// 关闭WAL
wal.close();
// 关闭TSM文件
for (TSMFile tsmFile : tsmFiles.values()) {
tsmFile.close();
}
}
}
/**
* 数据点
*/
class Point {
private final String measurement;
private final Map<String, String> tags;
private final Map<String, Object> fields;
private final long timestamp;
public Point(String measurement, Map<String, String> tags, Map<String, Object> fields, long timestamp) {
this.measurement = measurement;
this.tags = new HashMap<>(tags);
this.fields = new HashMap<>(fields);
this.timestamp = timestamp;
}
// Getters
public String getMeasurement() { return measurement; }
public Map<String, String> getTags() { return new HashMap<>(tags); }
public Map<String, Object> getFields() { return new HashMap<>(fields); }
public long getTimestamp() { return timestamp; }
@Override
public String toString() {
return String.format("Point{measurement='%s', tags=%s, fields=%s, timestamp=%d}",
measurement, tags, fields, timestamp);
}
}
/**
* 分片实现
*/
class Shard {
private final String id;
private final String database;
private final long startTime;
private final long endTime;
private final List<String> owners;
private final Map<String, Series> series = new ConcurrentHashMap<>();
private final AtomicLong pointCount = new AtomicLong(0);
private volatile long lastModified = System.currentTimeMillis();
public Shard(String id, String database, long startTime, long endTime, List<String> owners) {
this.id = id;
this.database = database;
this.startTime = startTime;
this.endTime = endTime;
this.owners = new ArrayList<>(owners);
}
/**
* 添加数据点
*/
public boolean addPoint(Point point) {
if (!containsTime(point.getTimestamp())) {
return false;
}
String seriesKey = generateSeriesKey(point.getMeasurement(), point.getTags());
Series targetSeries = series.computeIfAbsent(seriesKey,
k -> new Series(point.getMeasurement(), point.getTags()));
targetSeries.addPoint(point);
pointCount.incrementAndGet();
lastModified = System.currentTimeMillis();
return true;
}
/**
* 检查时间是否在分片范围内
*/
public boolean containsTime(long timestamp) {
return timestamp >= startTime && timestamp < endTime;
}
/**
* 检查时间范围是否重叠
*/
public boolean overlapsTimeRange(long start, long end) {
return !(end <= startTime || start >= endTime);
}
/**
* 查询数据点
*/
public List<Point> queryPoints(String measurement, long startTime, long endTime, Map<String, String> tags) {
List<Point> result = new ArrayList<>();
for (Series s : series.values()) {
if (s.matches(measurement, tags)) {
List<Point> seriesPoints = s.getPointsInRange(startTime, endTime);
result.addAll(seriesPoints);
}
}
return result;
}
/**
* 检查是否需要刷新
*/
public boolean shouldFlush() {
return pointCount.get() > 100000 || // 超过10万个点
System.currentTimeMillis() - lastModified > 300000; // 超过5分钟
}
/**
* 清空内存数据
*/
public void clearMemoryData() {
series.clear();
pointCount.set(0);
}
/**
* 检查是否有内存数据
*/
public boolean hasMemoryData() {
return !series.isEmpty();
}
/**
* 生成系列键
*/
private String generateSeriesKey(String measurement, Map<String, String> tags) {
StringBuilder sb = new StringBuilder(measurement);
tags.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> sb.append(",").append(entry.getKey()).append("=").append(entry.getValue()));
return sb.toString();
}
// Getters
public String getId() { return id; }
public String getDatabase() { return database; }
public long getStartTime() { return startTime; }
public long getEndTime() { return endTime; }
public List<String> getOwners() { return new ArrayList<>(owners); }
public long getPointCount() { return pointCount.get(); }
public long getLastModified() { return lastModified; }
public Collection<Series> getAllSeries() { return new ArrayList<>(series.values()); }
}
/**
* 时序数据系列
*/
class Series {
private final String measurement;
private final Map<String, String> tags;
private final List<Point> points = new ArrayList<>();
public Series(String measurement, Map<String, String> tags) {
this.measurement = measurement;
this.tags = new HashMap<>(tags);
}
/**
* 添加数据点
*/
public synchronized void addPoint(Point point) {
points.add(point);
// 保持按时间排序
points.sort(Comparator.comparing(Point::getTimestamp));
}
/**
* 检查是否匹配查询条件
*/
public boolean matches(String measurement, Map<String, String> queryTags) {
if (!this.measurement.equals(measurement)) {
return false;
}
for (Map.Entry<String, String> entry : queryTags.entrySet()) {
String tagValue = tags.get(entry.getKey());
if (!entry.getValue().equals(tagValue)) {
return false;
}
}
return true;
}
/**
* 获取时间范围内的数据点
*/
public List<Point> getPointsInRange(long startTime, long endTime) {
return points.stream()
.filter(point -> point.getTimestamp() >= startTime && point.getTimestamp() < endTime)
.collect(Collectors.toList());
}
// Getters
public String getMeasurement() { return measurement; }
public Map<String, String> getTags() { return new HashMap<>(tags); }
public List<Point> getPoints() { return new ArrayList<>(points); }
}
/**
* WAL (Write-Ahead Log)
*/
class WAL {
private final Map<String, List<Point>> walData = new ConcurrentHashMap<>();
public void writePoint(String shardId, Point point) {
walData.computeIfAbsent(shardId, k -> new ArrayList<>()).add(point);
}
public long getSize() {
return walData.values().stream().mapToLong(List::size).sum();
}
public void close() {
walData.clear();
}
}
/**
* TSM文件
*/
class TSMFile {
private final String fileName;
private final String shardId;
private final Map<String, Series> storedSeries = new HashMap<>();
public TSMFile(String fileName) {
this.fileName = fileName;
this.shardId = extractShardIdFromFileName(fileName);
}
public void writeSeries(Series series) {
storedSeries.put(generateSeriesKey(series), series);
}
public void flush() {
// 模拟写入磁盘
System.out.println("TSM文件 " + fileName + " 写入磁盘,包含 " + storedSeries.size() + " 个系列");
}
public List<Point> queryPoints(String measurement, long startTime, long endTime, Map<String, String> tags) {
List<Point> result = new ArrayList<>();
for (Series series : storedSeries.values()) {
if (series.matches(measurement, tags)) {
List<Point> seriesPoints = series.getPointsInRange(startTime, endTime);
result.addAll(seriesPoints);
}
}
return result;
}
public void close() {
storedSeries.clear();
}
private String extractShardIdFromFileName(String fileName) {
return fileName.substring(0, fileName.lastIndexOf('_'));
}
private String generateSeriesKey(Series series) {
StringBuilder sb = new StringBuilder(series.getMeasurement());
series.getTags().entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(entry -> sb.append(",").append(entry.getKey()).append("=").append(entry.getValue()));
return sb.toString();
}
public String getShardId() { return shardId; }
public String getFileName() { return fileName; }
}
/**
* 写入结果
*/
class WriteResult {
private final boolean success;
private final int pointsWritten;
private final String message;
public WriteResult(boolean success, int pointsWritten, String message) {
this.success = success;
this.pointsWritten = pointsWritten;
this.message = message;
}
// Getters
public boolean isSuccess() { return success; }
public int getPointsWritten() { return pointsWritten; }
public String getMessage() { return message; }
}
/**
* 查询结果
*/
class QueryResult {
private final boolean success;
private final List<Point> points;
private final String message;
public QueryResult(boolean success, List<Point> points, String message) {
this.success = success;
this.points = new ArrayList<>(points);
this.message = message;
}
// Getters
public boolean isSuccess() { return success; }
public List<Point> getPoints() { return new ArrayList<>(points); }
public String getMessage() { return message; }
}
/**
* 数据节点状态
*/
class DataNodeStatus {
private final String nodeId;
private final int shardCount;
private final long totalPoints;
private final int tsmFileCount;
private final long walSize;
public DataNodeStatus(String nodeId, int shardCount, long totalPoints, int tsmFileCount, long walSize) {
this.nodeId = nodeId;
this.shardCount = shardCount;
this.totalPoints = totalPoints;
this.tsmFileCount = tsmFileCount;
this.walSize = walSize;
}
// Getters
public String getNodeId() { return nodeId; }
public int getShardCount() { return shardCount; }
public long getTotalPoints() { return totalPoints; }
public int getTsmFileCount() { return tsmFileCount; }
public long getWalSize() { return walSize; }
@Override
public String toString() {
return String.format("DataNodeStatus{nodeId='%s', shards=%d, points=%d, tsmFiles=%d, walSize=%d}",
nodeId, shardCount, totalPoints, tsmFileCount, walSize);
}
}
|