分布式事务是指在分布式系统中,由多个独立的服务或数据库参与的事务,确保这些分布式节点上的操作能够一致完成或回滚,从而保证数据的一致性。
一、两阶段提交(2PC)
将事务分为准备阶段和提交阶段。各节点在准备阶段确认操作无误后再提交,在遇到错误时会回滚。在2PC中,有两个核心角色,协调者、参与者。
优点:能保证各参与节点数据的一致性,适用于对数据一致性要求高的场景。
缺点:性能开销大(锁定资源时间较长)、可能会出现“协调者单点故障”问题,导致系统阻塞。
准备阶段:
(1)协调者向所有参与者发送commit请求,询问是否可以提交事务,并等待答复。
(2)各参与者开始准备执行事务,将uodo log和redo log记入事务日志中,并不提交事务。
(3)如果参与者执行成功,则向协调者返回yes,否则返回no。
提交阶段:
(1)协调者向所有参与者发送正式提交事务请求(即:commit请求)。
(2)参与者收到协调者的commit请求后,参与者正式执行事务提交操作,并释放整个事务期间占用的资源。
(3)参与者完成事务提交后,向协调者发送ACK消息。
(4)协调者收到所有参与者反馈的ACK消息后,完成事务。
事务回滚:
如果任意一个参与者在第一阶段返回中止信息,或者由于超时协调者无法获取到所有参与者的信息,那么这个事务将会被回滚。
(1)协调者向所有参与者发送回滚请求(即:rollback请求)。
(2)参与者收到协调者发送的回滚请求后,参与者使用第一阶段中的undo log信息执行回滚操作,并释放在整个事务期间占用的资源。
(3)参与者在执行完回滚操作之后,向协调者发送ACK信息。
(4)协调者受到所有参与者反馈的信息后,取消事务。
基于XAResource实现:
XAResource 是 Java Transaction API (JTA) 中的一个接口,用于支持分布式事务处理。在 MySQL 中XAResource 提供了一种管理和协调事务的方法,以便在多个资源(例如数据库、消息队列等)之间实现一致性和原子性。
public static void main(String[] args) throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection
("jdbc:mysql://localhost:3306/db_order", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection(
(com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection
("jdbc:mysql://localhost:3306/db_storage", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection(
(com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement(
"INSERT into order_tbl(user_id,commodity_code,count,money,status)
VALUES (1001,2001,2,10,1)");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement(
"update stock_tbl set count=count-2 where commodity_code=2001");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false;
//TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1_prepare == XAResource.XA_OK
&& rm2_prepare == XAResource.XA_OK) {
//所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {
//如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm2.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
}