1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
| import java.io.Serializable;
import java.util.*;
import java.util.concurrent.*;
/**
 * Transaction ID used by the ZAB protocol.
 *
 * <p>A ZXID is a pair of an election epoch and a per-epoch transaction counter. In the packed
 * {@code long} form produced by {@link #toLong()} and read by {@link #fromLong(long)}, the epoch
 * occupies the high 32 bits and the counter the low 32 bits. Both fields are final.
 *
 * <p>Ordering ({@link #compareTo(ZXID)}) is by epoch first, then by counter, and agrees with
 * {@link #equals(Object)}.
 */
public class ZXID implements Comparable<ZXID>, Serializable {
    /** Number of bits the epoch is shifted by in the packed form. */
    private static final int EPOCH_SHIFT = 32;
    /** Mask selecting the low 32 bits (the counter) of the packed form. */
    private static final long COUNTER_MASK = 0xFFFFFFFFL;

    private final long epoch;   // election round (high 32 bits of the packed form)
    private final long counter; // transaction counter (low 32 bits of the packed form)

    /**
     * Creates a ZXID from its two components. The values are stored as given; no range check
     * is performed here.
     *
     * @param epoch   election round
     * @param counter transaction counter within the epoch
     */
    public ZXID(long epoch, long counter) {
        this.epoch = epoch;
        this.counter = counter;
    }

    /**
     * Builds a ZXID from its packed {@code long} form.
     *
     * <p>The epoch is extracted with a signed shift, so a packed value whose top bit is set
     * yields a negative epoch. The counter is always in {@code [0, 2^32 - 1]}.
     *
     * @param zxid packed value (epoch in the high 32 bits, counter in the low 32 bits)
     * @return the decoded ZXID
     */
    public static ZXID fromLong(long zxid) {
        long epochPart = zxid >> EPOCH_SHIFT;
        long counterPart = zxid & COUNTER_MASK;
        return new ZXID(epochPart, counterPart);
    }

    /**
     * Packs this ZXID into a single {@code long}.
     *
     * <p>Only the low 32 bits of the counter are used. Without the mask, a counter outside the
     * 32-bit range (for example after {@link #next()} rolls over, or a negative counter) would
     * be OR-ed into the epoch bits and silently change the epoch of the packed value.
     *
     * @return {@code (epoch << 32) | (counter & 0xFFFFFFFF)}
     */
    public long toLong() {
        return (epoch << EPOCH_SHIFT) | (counter & COUNTER_MASK);
    }

    /**
     * Returns the ZXID that follows this one in the same epoch.
     *
     * <p>NOTE(review): the counter is incremented without a rollover check, so calling this on
     * a ZXID whose counter is {@code 0xFFFFFFFF} produces a counter that no longer fits in the
     * 32-bit field of the packed form. Callers presumably need to start a new epoch before
     * that happens; confirm against the caller.
     *
     * @return a new ZXID with the same epoch and {@code counter + 1}
     */
    public ZXID next() {
        return new ZXID(epoch, counter + 1);
    }

    /**
     * Returns the first ZXID of the given epoch, i.e. the one with counter 0.
     *
     * @param epoch the new epoch
     * @return a ZXID with the given epoch and a zero counter
     */
    public static ZXID newEpoch(long epoch) {
        return new ZXID(epoch, 0);
    }

    /**
     * Compares by epoch first and by counter when the epochs are equal.
     *
     * @param other the ZXID to compare with; must not be null
     */
    @Override
    public int compareTo(ZXID other) {
        int byEpoch = Long.compare(this.epoch, other.epoch);
        return byEpoch != 0 ? byEpoch : Long.compare(this.counter, other.counter);
    }

    /** Two ZXIDs are equal when they have the same runtime class, epoch and counter. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        ZXID that = (ZXID) obj;
        return epoch == that.epoch && counter == that.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(epoch, counter);
    }

    /**
     * Formats as {@code 0x}, the epoch in hex, then the counter in hex zero-padded to at least
     * eight digits.
     */
    @Override
    public String toString() {
        return String.format("0x%x%08x", epoch, counter);
    }

    /** @return the election epoch */
    public long getEpoch() {
        return epoch;
    }

    /** @return the transaction counter */
    public long getCounter() {
        return counter;
    }
}
/**
 * A ZAB transaction proposal: a payload tagged with its ZXID, a type string and the wall-clock
 * time at which the proposal object was created.
 *
 * <p>The payload is defensively copied on the way in and on the way out, so callers cannot
 * modify the bytes held by this object.
 */
public class Proposal implements Serializable {
    private final ZXID zxid;
    private final byte[] data;
    private final String type;
    private final long timestamp;

    /**
     * Creates a proposal and stamps it with {@link System#currentTimeMillis()}.
     *
     * @param zxid transaction ID of the proposal
     * @param data payload; copied, must not be null
     * @param type type label of the proposal
     * @throws NullPointerException if {@code data} is null
     */
    public Proposal(ZXID zxid, byte[] data, String type) {
        // Fail fast with a named NPE instead of an anonymous one from data.clone().
        Objects.requireNonNull(data, "data");
        this.zxid = zxid;
        this.data = data.clone();
        this.type = type;
        this.timestamp = System.currentTimeMillis();
    }

    /** @return the transaction ID of this proposal */
    public ZXID getZxid() {
        return zxid;
    }

    /** @return a fresh copy of the payload */
    public byte[] getData() {
        return data.clone();
    }

    /** @return the type label */
    public String getType() {
        return type;
    }

    /** @return creation time in milliseconds, as read from {@link System#currentTimeMillis()} */
    public long getTimestamp() {
        return timestamp;
    }

    /** Shows the ZXID, the type and the payload size; the payload bytes are not printed. */
    @Override
    public String toString() {
        return String.format("Proposal{zxid=%s, type='%s', dataSize=%d}",
                zxid, type, data.length);
    }
}
/**
 * ZAB message types, grouped into election, synchronization and broadcast messages.
 */
public enum ZabMessageType {

    // ---- Election ----

    /** Looking for a Leader. */
    LOOKING,
    /** Following a Leader. */
    FOLLOWING,
    /** Acting as the Leader. */
    LEADING,
    /** Observer mode. */
    OBSERVING,

    // ---- Synchronization ----

    /** Leader information. */
    LEADERINFO,
    /** Epoch acknowledgement. */
    ACKEPOCH,
    /** Diff synchronization. */
    DIFF,
    /** Snapshot synchronization. */
    SNAP,
    /** New-Leader notification. */
    NEWLEADER,
    /** Synchronization complete. */
    UPTODATE,

    // ---- Broadcast ----

    /** Transaction proposal. */
    PROPOSAL,
    /** Acknowledgement. */
    ACK,
    /** Commit. */
    COMMIT,
    /** Heartbeat. */
    PING,
    /** Revalidation. */
    REVALIDATE
}
/**
 * A ZAB protocol message: a message type, the sender's ID, a ZXID, an epoch and an optional
 * byte payload, stamped with the wall-clock time at which the object was created.
 *
 * <p>The payload may be null. A non-null payload is copied on construction and on every read.
 */
public class ZabMessage implements Serializable {
    private final ZabMessageType type;
    private final String senderId;
    private final ZXID zxid;
    private final long epoch;
    private final byte[] data;
    private final long timestamp;

    /**
     * Creates a message without a payload; same as passing {@code null} for the data.
     *
     * @param type     message type
     * @param senderId ID of the sender
     * @param zxid     ZXID carried by the message
     * @param epoch    epoch carried by the message
     */
    public ZabMessage(ZabMessageType type, String senderId, ZXID zxid, long epoch) {
        this(type, senderId, zxid, epoch, null);
    }

    /**
     * Creates a message and stamps it with {@link System#currentTimeMillis()}.
     *
     * @param type     message type
     * @param senderId ID of the sender
     * @param zxid     ZXID carried by the message
     * @param epoch    epoch carried by the message
     * @param data     payload, or null for none; a non-null array is copied
     */
    public ZabMessage(ZabMessageType type, String senderId, ZXID zxid, long epoch, byte[] data) {
        this.type = type;
        this.senderId = senderId;
        this.zxid = zxid;
        this.epoch = epoch;
        this.data = copyOrNull(data);
        this.timestamp = System.currentTimeMillis();
    }

    /** Returns a copy of the given array, or null when the array itself is null. */
    private static byte[] copyOrNull(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return bytes.clone();
    }

    /** @return the message type */
    public ZabMessageType getType() {
        return type;
    }

    /** @return the ID of the sender */
    public String getSenderId() {
        return senderId;
    }

    /** @return the ZXID carried by the message */
    public ZXID getZxid() {
        return zxid;
    }

    /** @return the epoch carried by the message */
    public long getEpoch() {
        return epoch;
    }

    /** @return a fresh copy of the payload, or null when the message has none */
    public byte[] getData() {
        return copyOrNull(data);
    }

    /** @return creation time in milliseconds, as read from {@link System#currentTimeMillis()} */
    public long getTimestamp() {
        return timestamp;
    }

    /** Shows type, sender, ZXID and epoch; the payload and timestamp are not printed. */
    @Override
    public String toString() {
        final String template = "ZabMessage{type=%s, sender='%s', zxid=%s, epoch=%d}";
        return String.format(template, type, senderId, zxid, epoch);
    }
}
|