首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Atomikos详解:数据库XA规范与Atomikos使用与源码分析

  • 25-03-05 20:46
  • 3475
  • 11767
blog.csdn.net

文章目录

  • 一、认识2PC - 两阶段提交
    • 1、理论
    • 2、手撸XA-两阶段提交
      • (1)时序图
      • (2)代码实例
    • 3、认识JTA
    • 4、今天的主角:Atomikos
    • 5、2PC存在的问题
  • 二、Atomikos使用
    • 1、依赖+配置
    • 2、定义AtomikosDataSourceBean数据源
    • 3、定义事务管理器JtaTransactionManager
    • 4、MyBatis配置
    • 5、验证
  • 三、Atomikos源码分析
    • 1、@Transactional入口:TransactionInterceptor创建事务流程
    • 2、启动事务
    • 3、小总结:启动全局事务流程图
    • 4、分支事务,业务流程执行过程
    • 5、事务提交与回滚

一、认识2PC - 两阶段提交

1、理论

理论性的东西,懒得再打一遍了,贴在这了:
分布式事务详解【分布式事务的几种解决方案】彻底搞懂分布式事务

关键的两张图:
下图展示了2PC的两个阶段,分成功和失败两个情况说明:
成功情况:
在这里插入图片描述

失败情况:
在这里插入图片描述

2、手撸XA-两阶段提交

(1)时序图

在这里插入图片描述

(2)代码实例

import com.mysql.cj.jdbc.JdbcConnection;
import com.mysql.cj.jdbc.MysqlXAConnection;
import com.mysql.cj.jdbc.MysqlXid;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

@SpringBootTest
public class MysqlXaTest {

    @Test
    public void testXa() {
        try {
            //获取员工库的连接以及资源管理器
            JdbcConnection employeeConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/employee", "root", "rootroot");
            MysqlXAConnection employeeXAConnection = new MysqlXAConnection(employeeConnection, true);
            XAResource employeeXaResource = employeeXAConnection.getXAResource();

            //获取的员工薪资库的连接以及资源管理器
            JdbcConnection salaryConnection = (JdbcConnection) DriverManager.getConnection("jdbc:mysql://localhost:3306/salary", "root", "rootroot");
            MysqlXAConnection salaryXAConnection = new MysqlXAConnection(salaryConnection, true);
            XAResource salaryXaResource = salaryXAConnection.getXAResource();


            // 全局事务id
            byte[] gtrid = "g00003".getBytes();
            // 分支事务id
            byte[] bqual = "b00001".getBytes();
            // 标识,一般是个固定值
            int formatId = 1;

            //开启员工插入的分支事务
            Xid employeeXid = new MysqlXid(gtrid, bqual, formatId);
            employeeXaResource.start(employeeXid, XAResource.TMNOFLAGS);
            PreparedStatement preparedStatement = employeeConnection.prepareStatement("insert into employee (name, sex, level) values ('小10', '女', '7')");
            preparedStatement.execute();
            employeeXaResource.end(employeeXid, XAResource.TMSUCCESS);

            //开启员工薪资的分支事务
            byte[] salaryBqual = "b00002".getBytes();
            Xid salaryXid = new MysqlXid(gtrid, salaryBqual, formatId);
            salaryXaResource.start(salaryXid, XAResource.TMNOFLAGS);
            PreparedStatement salaryPreparedStatement = salaryConnection.prepareStatement("insert into employee_salary (employee_id, salary) values ('12', 7000)");
            salaryPreparedStatement.execute();
            salaryXaResource.end(salaryXid, XAResource.TMSUCCESS);

            //第一阶段-准备阶段
            int employeePrepareResult = employeeXaResource.prepare(employeeXid);
            int salaryPrepareResult = salaryXaResource.prepare(salaryXid);

            //第二阶段-根据准备阶段的结果。判断是要执行commit还是rollback
            if (employeePrepareResult == XAResource.XA_OK && salaryPrepareResult == XAResource.XA_OK) {
                employeeXaResource.commit(employeeXid, false);
                salaryXaResource.commit(salaryXid, false);
            } else {
                employeeXaResource.rollback(employeeXid);
                salaryXaResource.rollback(salaryXid);
            }
        } catch (SQLException | XAException e) {
            throw new RuntimeException(e);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

3、认识JTA

JTA(Java Transaction API):是Java平台上一个标准API,用于管理和控制分布式事务的执行流程。

核心类:
javax.transaction.UserTransaction:暴露给应用使用,用来启动、提交、回滚事务。
javax.transaction.TransactionManager:提供给事务管理器的接口,用于协调和控制分布式事务的执行过程。
javax.transaction.XAResource:表示一个资源管理器,用于管理和操作资源。
javax.transaction.Xid:用于唯一标识一个分布式事务。

4、今天的主角:Atomikos

Atomikos是一个开源的事务管理器,用于管理和控制分布式事务的执行过程。提供了一个可靠的、高性能的事务管理解决方案,可以与多种应用程序和数据库集成。

简单理解就是,Atomikos是可以集成在我们Java代码里面,和我们的业务代码绑定到同一个Java进程里面的一个事务管理器的框架,可以帮助我们业务程序去自行实现分布式事务。

Atomikos特点:支持分布式事务、支持多种web服务器、支持多种数据库、支持XA协议、提供高性能的事务管理。

Atomikos可以解决,在同一个应用下,连接多个数据库,实现分布式事务。

5、2PC存在的问题

1、TM单点问题。TM挂掉之后,无法回滚和提交。
2、资源锁定的问题。资源锁定之后,TM挂掉无法回滚和提交。
3、性能瓶颈。资源锁定时间长。
4、数据不一致问题。commit时成功状态不一致就会造成数据不一致。

在这里插入图片描述

二、Atomikos使用

1、依赖+配置

<dependency>
    <groupId>org.springframework.bootgroupId>
    <artifactId>spring-boot-starter-jta-atomikosartifactId>
dependency>
  • 1
  • 2
  • 3
  • 4
server.port=8080

spring.employee-datasource.driverClassName = com.mysql.jdbc.Driver
spring.employee-datasource.jdbc-url = jdbc:mysql://localhost:3306/employee?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.employee-datasource.username = root
spring.employee-datasource.password = rootroot

spring.salary-datasource.driverClassName = com.mysql.jdbc.Driver
spring.salary-datasource.jdbc-url = jdbc:mysql://localhost:3306/salary?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
spring.salary-datasource.username = root
spring.salary-datasource.password = rootroot

logging.level.com.atomikos = debug
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、定义AtomikosDataSourceBean数据源

import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
public class AtomikosDataSourceConfig {

    @Value("${spring.employee-datasource.jdbc-url}")
    private String employeeUrl;

    @Value("${spring.employee-datasource.username}")
    private String employeeUser;

    @Value("${spring.employee-datasource.password}")
    private String employeePassword;

    @Value("${spring.salary-datasource.jdbc-url}")
    private String salaryUrl;

    @Value("${spring.salary-datasource.username}")
    private String salaryUser;

    @Value("${spring.salary-datasource.password}")
    private String salaryPassword;

    /**
     * 定义两个数据源,分别对应两个数据库
     */
    @Bean(name = "employeeDataSource")
    public DataSource employeeDataSource(){
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("employeeDataSource");
        atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

        Properties properties = new Properties();
        properties.setProperty("URL", employeeUrl);
        properties.setProperty("user", employeeUser);
        properties.setProperty("password", employeePassword);
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }

    /**
     * 定义两个数据源,分别对应两个数据库
     */
    @Bean(name = "salaryDataSource")
    public DataSource salaryDataSource(){
        AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
        atomikosDataSourceBean.setUniqueResourceName("salaryDataSource");
        atomikosDataSourceBean.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");

        Properties properties = new Properties();
        properties.setProperty("URL", salaryUrl);
        properties.setProperty("user", salaryUser);
        properties.setProperty("password", salaryPassword);
        atomikosDataSourceBean.setXaProperties(properties);
        return atomikosDataSourceBean;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

3、定义事务管理器JtaTransactionManager

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class AtomikosConfig {

    // JTA的事务管理
    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() {
        return new UserTransactionImp();
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        return new UserTransactionManager();
    }

    /**
     * 事务管理器
     */
    @Bean(name = "platformTransactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() {
        UserTransaction userTransaction = userTransaction();
        TransactionManager transactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTransaction, transactionManager);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

4、MyBatis配置

import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao", sqlSessionFactoryRef = "sqlSessionFactoryEmployee")
public class EmployeeMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactoryEmployee(@Qualifier("employeeDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
import lombok.SneakyThrows;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.example.distributetransaction.dao1", sqlSessionFactoryRef = "sqlSessionFactorySalary")
public class SalaryMybatisConfig {

    @SneakyThrows
    @Bean
    public SqlSessionFactory sqlSessionFactorySalary(@Qualifier("salaryDataSource") DataSource dataSource) throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

5、验证

@Transactional(rollbackFor = Exception.class)
public String join(EmployeeEntity employeeEntity) {
    //第一步,插入员工基础信息
    employeeDao.insertEmployee(employeeEntity);
    //第二步,插入员工薪资
    employeeSalaryDao.insertEmployeeSalary(employeeEntity.getId(), employeeEntity.getSalary());

    int i = 1 / 0;
    return "员工入职成功";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

三、Atomikos源码分析

1、@Transactional入口:TransactionInterceptor创建事务流程

  • (1)Spring事务入口:@Transactional
  • (2)TransactionInterceptor#invoke:Spring事务的代理拦截方法
  • (3)TransactionAspectSupport#determineTransactionManager:确定事务管理器=>我们创建的JtaTransactionManager
  • (4)TransactionAspectSupport#createTransactionIfNecessary:创建事务
  • (5)AbstractPlatformTransactionManager#getTransaction:获取事务
  • (6)JtaTransactionManager#doGetTransaction:获取事务,拿到JtaTransactionObject,里面封装了UserTransactionImp
  • (7)JtaTransactionManager获取我们配置的UserTransactionImp
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述
JtaTransactionManager#doGetTransaction:获取事务
在这里插入图片描述
在这里插入图片描述

2、启动事务

  • (1)从AbstractPlatformTransactionManager#handleExistingTransaction调用AbstractPlatformTransactionManager#startTransaction开启事务
  • (2)调用JtaTransactionManager#doBegin开启事务
  • (3)调用JtaTransactionManager#doJtaBegin开启事务
  • (4)调用UserTransactionImp#begin开启事务
  • (5)最终调用的是UserTransactionManager#begin开启事务
  • (6)调用TransactionManagerImp#begin()开启事务
  • (7)调用CompositeTransactionManagerImp#createCompositeTransaction创建分布式事务
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

3、小总结:启动全局事务流程图

在这里插入图片描述

4、分支事务,业务流程执行过程

在这里插入图片描述

5、事务提交与回滚

在这里插入图片描述
在这里插入图片描述

文章知识点与官方知识档案匹配,可进一步学习相关知识
Java技能树使用JDBC操作数据库数据库操作150170 人正在系统学习中
注:本文转载自blog.csdn.net的秃了也弱了。的文章"https://feixiang.blog.csdn.net/article/details/136455450"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top