1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import threading
import time
from enum import Enum
class IsolationLevel(Enum):
    """Transaction isolation levels supported by the isolation manager.

    Members are listed from the weakest level to the strictest one. Each
    member's value is the level's Chinese display name.
    """

    READ_UNCOMMITTED = "读未提交"  # "read uncommitted"
    READ_COMMITTED = "读已提交"  # "read committed"
    REPEATABLE_READ = "可重复读"  # "repeatable read"
    SERIALIZABLE = "序列化"  # "serializable"
class Lock:
    """A lock held on a single resource.

    Attributes:
        type: Lock mode string; the rest of the file uses ``"SHARED"`` and
            ``"EXCLUSIVE"``.
        resource: Identifier of the locked resource.
        owner: Identifier of the transaction that owns the lock.
        timestamp: ``time.time()`` value captured when the lock was created.
    """

    def __init__(self, lock_type, resource, owner):
        """Record the lock mode, target resource, owner and creation time."""
        self.type, self.resource, self.owner = lock_type, resource, owner
        self.timestamp = time.time()
class IsolationManager:
    """Lock-based isolation manager.

    Grants shared/exclusive locks on named resources to transactions and
    performs reads according to the configured isolation level. Lock
    acquisition never blocks: a conflicting request returns ``False`` and the
    resource is recorded in ``waiting_transactions`` so the caller can retry.

    All lock bookkeeping is guarded by ``lock_table`` (a non-reentrant
    ``threading.Lock``); helpers whose docstring says "caller must hold
    ``lock_table``" must never try to take it again.
    """

    def __init__(self, isolation_level=IsolationLevel.READ_COMMITTED):
        """Create a manager that serves reads at ``isolation_level``."""
        self.isolation_level = isolation_level
        # resource -> Lock; ``Lock.owner`` is the primary (earliest) holder.
        self.locks = {}
        # resource -> list of every tx_id holding the lock, in grant order.
        # A SHARED lock may have several holders; EXCLUSIVE has exactly one.
        self.lock_holders = {}
        self.waiting_transactions = {}  # tx_id -> [resources]
        self.lock_table = threading.Lock()
        # Placeholder for multi-version concurrency control; not used by
        # any method of this class.
        self.version_store = {}

    # ------------------------------------------------------------------
    # Internal bookkeeping (caller must hold ``lock_table``)
    # ------------------------------------------------------------------
    def _holders_of(self, resource):
        """Return the mutable holder list of a resource present in ``locks``.

        Caller must hold ``lock_table``.
        """
        # Tolerate a lock that was put into ``self.locks`` without a holder
        # list by seeding the list from the lock's recorded owner.
        return self.lock_holders.setdefault(
            resource, [self.locks[resource].owner]
        )

    def _record_wait(self, tx_id, resource):
        """Note that ``tx_id`` is waiting for ``resource`` (no duplicates).

        Caller must hold ``lock_table``.
        """
        pending = self.waiting_transactions.setdefault(tx_id, [])
        if resource not in pending:
            pending.append(resource)
        print(f"事务 {tx_id} 等待锁: {resource}")

    def _clear_wait(self, tx_id, resource):
        """Drop a stale wait entry once ``tx_id`` holds ``resource``.

        Caller must hold ``lock_table``.
        """
        pending = self.waiting_transactions.get(tx_id)
        if pending and resource in pending:
            pending.remove(resource)

    def _release_locked(self, tx_id, resource):
        """Remove ``tx_id`` from the holders of ``resource``.

        Caller must hold ``lock_table``. Does nothing when ``tx_id`` does not
        hold the lock.
        """
        existing_lock = self.locks.get(resource)
        if existing_lock is None:
            return
        holders = self._holders_of(resource)
        if tx_id not in holders:
            return

        holders.remove(tx_id)
        print(f"事务 {tx_id} 释放锁: {resource}")

        if holders:
            # Other readers still share the lock: keep it alive and hand the
            # primary ownership to the earliest remaining holder.
            if existing_lock.owner == tx_id:
                existing_lock.owner = holders[0]
            if len(holders) == 1:
                # The sole remaining holder may be waiting to upgrade.
                self._check_waiting_transactions(resource, holders)
            return

        del self.locks[resource]
        del self.lock_holders[resource]
        # Check the waiting queue now that the resource is free.
        self._check_waiting_transactions(resource)

    def _check_waiting_transactions(self, resource, candidates=None):
        """Wake transactions that were waiting for ``resource``.

        "Waking" means removing the resource from the transaction's wait list
        and printing a notice; the transaction is expected to retry.

        Caller must hold ``lock_table``.

        Args:
            resource: The resource that became (more) available.
            candidates: Optional collection of tx_ids to restrict the wake-up
                to; ``None`` considers every waiting transaction.
        """
        for tx_id, waiting_resources in self.waiting_transactions.items():
            if candidates is not None and tx_id not in candidates:
                continue
            if resource in waiting_resources:
                waiting_resources.remove(resource)
                print(f"唤醒等待事务 {tx_id}")

    def _holds_lock(self, tx_id, resource):
        """Return True if ``tx_id`` currently holds any lock on ``resource``."""
        with self.lock_table:
            return (
                resource in self.locks
                and tx_id in self._holders_of(resource)
            )

    # ------------------------------------------------------------------
    # Public locking API
    # ------------------------------------------------------------------
    def acquire_lock(self, tx_id, resource, lock_type):
        """Try to acquire a lock without blocking.

        Args:
            tx_id: Identifier of the requesting transaction.
            resource: Identifier of the resource to lock.
            lock_type: ``"SHARED"`` or ``"EXCLUSIVE"``.

        Returns:
            ``True`` if the lock was granted (or was already held in a
            sufficient mode, or was upgraded); ``False`` on conflict, in
            which case the resource is added to
            ``waiting_transactions[tx_id]``.
        """
        with self.lock_table:
            existing_lock = self.locks.get(resource)

            # Resource is free: grant a brand-new lock.
            if existing_lock is None:
                self.locks[resource] = Lock(lock_type, resource, tx_id)
                self.lock_holders[resource] = [tx_id]
                self._clear_wait(tx_id, resource)
                print(f"事务 {tx_id} 获取{lock_type}锁: {resource}")
                return True

            holders = self._holders_of(resource)

            # The transaction already holds this lock: maybe upgrade.
            if tx_id in holders:
                wants_upgrade = (
                    existing_lock.type == "SHARED"
                    and lock_type == "EXCLUSIVE"
                )
                if wants_upgrade:
                    if len(holders) > 1:
                        # Upgrading while other readers still share the
                        # lock would let a writer run next to readers.
                        self._record_wait(tx_id, resource)
                        return False
                    existing_lock.type = "EXCLUSIVE"
                    print(f"事务 {tx_id} 锁升级: {resource}")
                self._clear_wait(tx_id, resource)
                return True

            # SHARED is compatible with SHARED: register the extra holder so
            # the lock survives until every reader has released it.
            if existing_lock.type == "SHARED" and lock_type == "SHARED":
                holders.append(tx_id)
                self._clear_wait(tx_id, resource)
                print(f"事务 {tx_id} 获取共享锁: {resource}")
                return True

            # Conflict: remember what the transaction is waiting for.
            self._record_wait(tx_id, resource)
            return False

    def release_lock(self, tx_id, resource):
        """Release ``tx_id``'s lock on ``resource``; no-op if it holds none."""
        with self.lock_table:
            self._release_locked(tx_id, resource)

    def release_all(self, tx_id):
        """Release every lock held by ``tx_id`` and forget its pending waits.

        This also covers the ``"<resource>_range"`` locks taken internally by
        serializable reads, which callers do not see individually.
        """
        with self.lock_table:
            held = [
                resource
                for resource in self.locks
                if tx_id in self._holders_of(resource)
            ]
            for resource in held:
                self._release_locked(tx_id, resource)
            self.waiting_transactions.pop(tx_id, None)

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------
    def read_with_isolation(self, tx_id, resource):
        """Read ``resource`` according to the configured isolation level.

        Returns:
            A descriptive string for the read, or ``None`` when a required
            lock could not be obtained (or the level is not recognised).
        """
        if self.isolation_level == IsolationLevel.READ_UNCOMMITTED:
            return self._read_uncommitted(resource)
        elif self.isolation_level == IsolationLevel.READ_COMMITTED:
            return self._read_committed(tx_id, resource)
        elif self.isolation_level == IsolationLevel.REPEATABLE_READ:
            return self._repeatable_read(tx_id, resource)
        elif self.isolation_level == IsolationLevel.SERIALIZABLE:
            return self._serializable_read(tx_id, resource)
        return None

    def _read_uncommitted(self, resource):
        """Read uncommitted: return the current value without any lock."""
        return f"读取{resource}的当前值"

    def _read_committed(self, tx_id, resource):
        """Read committed: hold a shared lock only for the duration of the read.

        A lock the transaction already held before the read (for example its
        own EXCLUSIVE write lock) is left in place; only a lock taken by this
        read is released afterwards.
        """
        # Only tx_id itself can change whether tx_id holds the lock, so this
        # check stays valid until acquire_lock below, assuming a transaction
        # issues its operations from a single thread.
        already_held = self._holds_lock(tx_id, resource)
        if not self.acquire_lock(tx_id, resource, "SHARED"):
            return None
        value = f"读取{resource}的已提交值"
        if not already_held:
            self.release_lock(tx_id, resource)
        return value

    def _repeatable_read(self, tx_id, resource):
        """Repeatable read: take a shared lock and keep it after the read.

        The lock stays held until the caller releases it.
        """
        if self.acquire_lock(tx_id, resource, "SHARED"):
            return f"读取{resource}的快照值"
        return None

    def _serializable_read(self, tx_id, resource):
        """Serializable: shared lock on the item plus a shared range lock.

        The range lock is taken on ``"<resource>_range"`` to guard against
        phantom reads. Both locks stay held after a successful read. If the
        range lock is denied, an item lock taken by this call is released
        again so a failed read leaves no orphaned lock behind.
        """
        already_held = self._holds_lock(tx_id, resource)
        if not self.acquire_lock(tx_id, resource, "SHARED"):
            return None
        range_resource = f"{resource}_range"
        if self.acquire_lock(tx_id, range_resource, "SHARED"):
            return f"读取{resource}的序列化值"
        if not already_held:
            self.release_lock(tx_id, resource)
        return None
class ConcurrentTransactionDemo:
    """Demo harness that runs simulated transactions against a shared store.

    Each transaction is a list of operation dicts with a ``'type'`` key
    (``'read'`` or ``'write'``, matched case-insensitively), a ``'resource'``
    key and, for writes, a ``'value'`` key.
    """

    def __init__(self, isolation_level=IsolationLevel.READ_COMMITTED):
        """Create the demo with its own isolation manager and sample data."""
        self.isolation_manager = IsolationManager(isolation_level)
        self.data_store = {"account_1": 1000, "account_2": 2000}
        # tx_id -> resources locked by a transaction that is still running.
        self.transaction_locks = {}

    def _lock_and_track(self, tx_id, resource, lock_type):
        """Try to lock ``resource``; remember it for release on success.

        Returns:
            ``True`` if the isolation manager granted the lock.
        """
        granted = self.isolation_manager.acquire_lock(
            tx_id, resource, lock_type)
        if granted:
            held = self.transaction_locks.setdefault(tx_id, [])
            # A resource read and then written must be released only once.
            if resource not in held:
                held.append(resource)
        return granted

    def simulate_transaction(self, tx_id, operations, delay=0.1):
        """Run ``operations`` as transaction ``tx_id``.

        Args:
            tx_id: Identifier of the transaction.
            operations: Iterable of operation dicts (see class docstring).
            delay: Seconds to sleep before each operation, to simulate work
                and interleave concurrent transactions.

        An operation whose lock is not granted is skipped. Any exception is
        reported and ends the transaction. All locks tracked for the
        transaction are released at the end in every case.
        """
        print(f"\n=== 事务 {tx_id} 开始 ===")
        self.transaction_locks[tx_id] = []
        try:
            for operation in operations:
                time.sleep(delay)  # simulate operation latency
                # Match case-insensitively: callers in this file pass
                # lowercase 'read', which an exact 'READ' test never matches.
                op_type = str(operation['type']).lower()
                if op_type == 'read':
                    resource = operation['resource']
                    if self._lock_and_track(tx_id, resource, "SHARED"):
                        value = self.isolation_manager.read_with_isolation(
                            tx_id, resource)
                        print(f"事务 {tx_id} 读取 {resource}: {value}")
                elif op_type == 'write':
                    resource = operation['resource']
                    if self._lock_and_track(tx_id, resource, "EXCLUSIVE"):
                        self.data_store[resource] = operation['value']
                        print(
                            f"事务 {tx_id} 写入 {resource}: "
                            f"{operation['value']}")
                else:
                    # Report instead of silently ignoring a mistyped op.
                    print(f"事务 {tx_id} 未知操作类型: {operation['type']}")
        except Exception as e:
            # Boundary of the simulated transaction: report and fall through
            # to lock release so one bad operation cannot leak locks.
            print(f"事务 {tx_id} 异常: {e}")
        finally:
            # Release every tracked lock and drop the finished transaction's
            # entry so it does not claim locks it no longer holds.
            for resource in self.transaction_locks.pop(tx_id, []):
                self.isolation_manager.release_lock(tx_id, resource)
            print(f"事务 {tx_id} 完成")
# Isolation test
def test_isolation():
    """Run two concurrent transactions under READ_COMMITTED and print the result."""
    print("=== 测试隔离性 ===")

    # Exercise the read-committed isolation level.
    demo = ConcurrentTransactionDemo(IsolationLevel.READ_COMMITTED)

    # One entry per transaction: (tx_id, operations, per-operation delay).
    plans = (
        (
            "TX1",
            [
                {'type': 'read', 'resource': 'account_1'},
                {'type': 'write', 'resource': 'account_1', 'value': 1500},
                {'type': 'read', 'resource': 'account_2'},
            ],
            0.2,
        ),
        (
            "TX2",
            [
                {'type': 'read', 'resource': 'account_1'},
                {'type': 'write', 'resource': 'account_2', 'value': 2500},
                {'type': 'read', 'resource': 'account_1'},
            ],
            0.15,
        ),
    )

    # Start every transaction on its own thread, then wait for all of them.
    workers = [
        threading.Thread(target=demo.simulate_transaction, args=plan)
        for plan in plans
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

    print(f"\n最终数据状态: {demo.data_store}")
# Run the test.
# NOTE(review): this call executes whenever the module is loaded, including on
# import, not only when the file is run as a script.
test_isolation()
|