SpringCloud Alibaba使用Seata处理分布式事务的技巧

网友投稿 337 2023-01-05

SpringCloud Alibaba使用Seata处理分布式事务的技巧

Seata简介

在传统的单体项目中,我们使用@Transactional注解就能实现基本的ACID事务了。

但是前提是:

1) 数据库支持事务(如:mysql的innoDB引擎)

2) 所有业务都在同一个数据库中执行

随着微服务架构的引入,需要对数据库进行分库分表,每个服务拥有自己的数据库,这样传统的事务就不起作用了,那么我们如何保证多个服务中数据的一致性呢?

这样就出现了分布式事务,而Seata就是为微服务架构而生的一种高性能、易于使用的分布式事务解决方案。

Seata 中有三个基础组件:

Transaction Coordinator(TC协调者):维护全局和分支事务的状态,驱动全局提交或回滚。

Transaction Manager(TM事务管理):定义全局事务的范围,开启、提交或回滚一个全局事务。

Resource Manager(RM资源管理):管理分支事务资源,与 TC 通讯并报告分支事务状态,管理本地事务的提交与回滚。

可以这么说一个分布式事务就是全局事务GlobalTransaction,而全局事务是由一个个的分支事务组成的,每个分支事务就是一个本地事务。

Seata的生命周期

TM 要求 TC 生成一个全局事务,并由 TC 生成一个全局事务XID 返回。

XID 通过微服务调用链传播。

RM 向 TC 注册本地事务,将其注册到 ID 为 XID 的全局事务中。

TM 要求 TC 提交或回滚XID 对应的全局事务。

TC 驱动 XID 对应的全局事务对应的所有的分支事务提交或回滚。

Seata安装和配置

安装nacos,本案例使用了nacos作为注册中心

https://github.com/alibaba/nacos/releases

下载nacos,本文使用的是windows版本1.4.0

使用命令行进入bin目录,以单机模式启动nacos

startup -m standalone

安装和配置Seata

http://seata.io/zh-cn/blog/download.html

下载Seata,这里是Windows版本的1.4.0

解压后,进入conf目录,配置file.conf和registry.conf两个文件

file.conf主要是数据库的配置,配置如下

registry.conf 是注册中心的配置

另外conf目录中还需要一个脚本文件:nacos-config.sh 用于对nacos进行初始化配置

在seata1.4.0中是没有的,需要自行创建,内容如下:

#!/usr/bin/env bash

# Copyright 1999-2019 Seata.io Group.

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at、

#

# http://apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt

do

case $opt in

h)

host=$OPTARG

;;

p)

port=$OPTARG

;;

g)

group=$OPTARG

;;

t)

tenant=$OPTARG

;;

u)

username=$OPTARG

;;

w)

password=$OPTARG

;;

?)

echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "

exit 1

;;

esac

done

urlencode() {

for ((i=0; i < ${#1}; i++))

do

char="${1:$i:1}"

case $char in

[a-zA-Z0-9.~_-]) printf $char ;;

*) printf '%%%02X' "'$char" ;;

esac

done

}

if [[ -z ${host} ]]; then

host=localhost

fi

if [[ -z ${port} ]]; then

port=8848

fi

if [[ -z ${group} ]]; then

group="SEATA_GROUP"

fi

if [[ -z ${tenant} ]]; then

tenant=""

fi

if [[ -z ${username} ]]; then

username=""

fi

if [[ -z ${password} ]]; then

password=""

fi

nacosAddr=$host:$port

contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"

echo "set group=$group"

failCount=0

tempLog=$(mktemp -u)

function addConfig() {

curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$(urlencode $1)&group=$group&content=$(urlencode $2)&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null

if [[ -z $(cat "${tempLog}") ]]; then

echo " Please check the cluster status. "

exit 1

fi

if [[ $(cat "${tempLog}") =~ "true" ]]; then

echo "Set $1=$2 successfully "

else

echo "Set $1=$2 failure "

(( failCount++ ))

fi

}

count=0

for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do

(( count++ ))

key=${line%%=*}

value=${line#*=}

addConfig "${key}" "${value}"

done

echo "========================================================================="

echo " Complete initialization parameters, total-count:$count , failure-count:$failCount "

echo "========================================================================="

if [[ ${failCount} -eq 0 ]]; then

echo " Init nacos config finished, please start seata-server. "

else

echo " init nacos config fail. "

fi

在seata的根目录,与conf同级的目录下,还需要config.txt 配置文件,默认也是没有的

只需要对mysql的配置进行修改

完整文件:

transport.type=TCP

transport.server=NIO

transport.heartbeat=true

transport.enableClientBatchSendRequest=true

transport.threadFactory.bossThreadPrefix=NettyBoss

transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker

transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler

transport.threadFactory.shareBossWorker=false

transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector

transport.threadFactory.clientSelectorThreadSize=1

transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread

transport.threadFactory.bossThreadSize=1

transport.threadFactory.workerThreadSize=default

transport.shutdown.wait=3

service.vgroupMapping.my_test_tx_group=default

service.default.grouplist=127.0.0.1:8091

service.enableDegrade=false

service.disableGlobalTransaction=false

client.rm.asyncCommitBufferLimit=10000

client.rm.lock.retryInterval=10

client.rm.lock.retryTimes=30

client.rm.lock.retryPolicyBranchRollbackOnConflict=true

client.rm.reportRetryCount=5

client.rm.tableMetaCheckEnable=false

client.rm.tableMetaCheckerInterval=60000

client.rm.sqlParserType=druid

client.rm.reportSuccessEnable=false

client.rm.sagaBranchRegisterEnable=false

client.rm.tccActionInterceptorOrder=-2147482648

client.tm.commitRetryCount=5

client.tm.rollbackRetryCount=5

client.tm.defaultGlobalTransactionTimeout=60000

client.tm.degradeCheck=false

client.tm.degradeCheckAllowTimes=10

client.tm.degradeCheckPeriod=2000

client.tm.interceptorOrder=-2147482648

store.mode=file

store.lock.mode=file

store.session.mode=file

store.publicKey=xx

store.file.dir=file_store/data

store.file.maxBranchSessionSize=16384

store.file.maxGlobalSessionSize=512

store.file.fileWriteBufferCacheSize=16384

store.file.flushDiskMode=async

store.file.sessionReloadReadSize=100

store.db.datasource=druid

store.db.dbType=mysql

store.db.driverClassName=com.mysql.jdbc.Driver

store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true

store.db.user=root

store.db.password=123456

store.db.minConn=5

store.db.maxConn=30

store.db.globalTable=global_table

store.db.branchTable=branch_table

store.db.queryLimit=100

store.db.lockTable=lock_table

storhttp://e.db.maxWait=5000

store.redis.mode=single

store.redis.single.host=127.0.0.1

store.redis.single.port=6379

store.redis.sentinel.masterName=xx

store.redis.sentinel.sentinelHosts=xx

store.redis.maxConn=10

store.redis.minConn=1

store.redis.maxTotal=100

store.redis.database=0

store.redis.password=xx

store.redis.queryLimit=100

server.recovery.committingRetryPeriod=1000

server.recovery.asynCommittingRetryPeriod=1000

server.recovery.rollbackingRetryPeriod=1000

server.recovery.timeoutRetryPeriod=1000

server.maxCommitRetryTimeout=-1

server.maxRollbackRetryTimeout=-1

server.rollbackRetryTimeoutUnlockEnable=false

server.distributedLockExpireTime=10000

client.undo.dataValidation=true

client.undo.logSerialization=jackson

client.undo.onlyCareUpdateColumns=true

server.undo.logSaveDays=7

server.undo.logDeletePeriod=86400000

client.undo.logTable=undo_log

client.undo.compress.enable=true

client.undo.compress.type=zip

client.undo.compress.threshold=64k

log.exceptionRate=100

transport.serialization=seata

transport.compressor=none

metrics.enabled=false

metrics.registryType=compact

metrics.exporterList=prometheus

metrics.exporterPrometheusPort=9898

在conf目录中,使用Git Bash进入命令行,输入

sh nacos-config.sh 127.0.0.1

这是对Seata进行初始化配置,上图表示所有配置都成功设置了

在nacos中可以看到出现了seata相关的配置

接下来在seata数据库中,新建三个表

drop table if exists `global_table`;

create table `global_table` (

`xid` varchar(128) not null,

`transaction_id` bigint,

`status` tinyint not null,

`application_id` varchar(32),

`transaction_service_group` varchar(32),

`transaction_name` varchar(128),

`timeout` int,

`begin_time` bigint,

`application_http://data` varchar(2000),

`gmt_create` datetime,

`gmt_modified` datetime,

primary key (`xid`),

key `idx_gmt_modified_status` (`gmt_modified`, `status`),

key `idx_transaction_id` (`transaction_id`)

);

drop table if exists `branch_table`;

create table `branch_table` (

`branch_id` bigint not null,

`xid` varchar(128) not null,

`transaction_id` bigint ,

`resource_group_id` varchar(32),

`resource_id` varchar(256) ,

`lock_key` varchar(128) ,

`branch_type` varchar(8) ,

`status` tinyint,

`client_id` varchar(64),

`application_data` varchar(2000),

`gmt_create` datetime,

`gmt_modified` datetime,

primary key (`branch_id`),

key `idx_xid` (`xid`)

);

drop table if exists `lock_table`;

create table `lock_table` (

`row_key` varchar(128) not null,

`xid` varchar(96),

`transaction_id` long ,

`branch_id` long,

`resource_id` varchar(256) ,

`table_name` varchar(32) ,

`pk` varchar(36) ,

`gmt_create` datetime ,

`gmt_modified` datetime,

primary key(`row_key`)

);

在项目相关的数据库中,新建表undo_log 用于记录撤销日志

CREATE TABLE `undo_log` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`branch_id` bigint(20) NOT NULL,

`xid` varchar(100) NOT NULL,

`context` varchar(128) NOT NULL,

`rollback_info` longblob NOT NULL,

`log_status` int(11) NOT NULL,

`log_created` datetime NOT NULL,

`log_modified` datetime NOT NULL,

`ext` varchar(100) DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

最后在bin目录中,启动命令行,执行seata-server.bat 启动Seata服务

项目应用Seata

SpringCloud项目中有两个服务:订单服务和库存服务,基本业务是:

购买商品

插入订单

减少库存

订单详情表

DROP TABLE IF EXISTS `tb_order_detail`;

CREATE TABLE `tb_order_detail` (

`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '订单详情id ',

`order_id` bigint(20) NOT NULL COMMENT '订单id',

`sku_id` bigint(20) NOT NULL COMMENT 'sku商品id',

`num` int(11) NOT NULL COMMENT '购买数量',

`title` varchar(256) NOT NULL COMMENT '商品标题',

`own_spec` varchar(1024) DEFAULT '' COMMENT '商品动态属性键值集',

`price` bigint(20) NOT NULL COMMENT '价格,单位:分',

`image` varchar(128) DEFAULT '' COMMENT '商品图片',

PRIMARY KEY (`id`),

KEY `key_order_id` (`order_id`) USING BTREE

) ENGINE=MyISAM AUTO_INCREMENT=131 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

库存表

DROP TABLE IF EXISTS `tb_stock`;

CREATE TABLE `tb_stock` (

`sku_id` bigint(20) NOT NULL COMMENT '库存对应的商品sku id',

`seckill_stock` int(9) DEFAULT '0' COMMENT '可秒杀库存',

`seckill_total` int(9) DEFAULT '0' COMMENT '秒杀总数量',

`stock` int(9) NOT NULL COMMENT '库存数量',

PRIMARY KEY (`sku_id`)

) ENGINE=MyISAM DEFAULT CHARSET=utf8 COMMENT='库存表,代表库存,秒杀库存等信息';

父项目定义了springboot、springcloud、springcloud-alibaba的版本

org.springframework.boot

spring-boot-starter-parent

2.3.10.RELEASE

org.springframework.cloud

spring-cloud-alibaba-dependencies

0.9.0.RELEASE

pom

import

com.alibaba.cloud

spring-cloud-alibaba-dependencies

2.2.1.RELEASE

pom

import

org.springframework.cloud

spring-cloud-dependencies

Hoxton.SR8

pom

import

子项目的依赖定义了nacos和seata客户端

mysql

mysql-connector-java

runtime

com.baomidou

mybatis-plus-boot-starter

3.3.2

org.springframework.boot

spring-boot-starter-web

org.springframework.cloud

spring-cloud-starter-alibaba-nacos-discovery

com.alibaba.cloud

spring-cloud-starter-alibaba-seata

io.seata

seata-spring-boot-starter

io.seata

seata-spring-boot-starter

1.2.0

子项目配置文件

完整配置

server:

port: 8001

spring:

application:

name: stock-service

cloud:

nacos:

discovery:

server-addr: localhost:8848

alibaba:

seata:

enabled: true

enable-auto-data-source-proxy: true

tx-service-group: my_test_tx_group

registry:

type: nacos

nacos:

application: seata-server

server-addr: 127.0.0.1:8848

username: nacos

password: nacos

config:

type: nacos

nacos:

server-addr: 127.0.0.1:8848

group: SEATA_GROUP

username: nacos

password: nacos

service:

vgroup-mapping:

my_test_tx_group: default

disable-global-transaction: false

client:

rm:

report-success-enable: false

datasource:

driver-class-name: com.mysql.cj.jdbc.Driver

url: jdbc:mysql://localhost:3306/eshop?serverTimezone=UTC&useUnicode=true&useSSL=false&characterEncoding=utf8

username: root

password: 123456

库存服务定义了减库存的方法

@RestController

public class StockController {

@Autowired

private IStockService stockService;

@PutMapping("/stock")

public ResponseEntity reduceSkuStock(@RequestParam("skuId")Long skuId,

@RequestParam("number")Integer number){

Stock stock = stockService.getById(skuId);

if(stock.getStock() < number){

throw new RuntimeException("库存不足,SkuId:" + skuId);

}

stock.setStock(stock.getStock() - number);

stockService.updateById(stock);

return ResponseEntity.ok(stock);

}

}

订单服务在插入订单后,使用Feign调用了减库存的服务

@Service

public class OrderDetailServiceImpl extends ServiceImpl implements IOrderDetailService {

//库存服务Feign

@Autowired

private StockFeignClient stockFeignClient;

// @Transactional

@GlobalTransactional(rollbackFor = {Exception.class})

@Override

public void makeOrder(OrderDetail orderDetail) {

this.save(orderDetail); //保存订单

int x = 11 / 0; //抛出异常

//减库存

stockFeignClient.reduceSkuStock(orderDetail.getSkuId(),orderDetail.getNum());

}

}

插订单和减库存属于两个服务,传统的@Transactional已经不能保证它们的原子性了

这里使用了Seata提供的@GlobalTransactional全局事务注解,出现任何异常后都能实现业务回滚。

测试用例:

@RunWith(SpringRunner.class)

@SpringBootTest

public class OrderServiceApplicationTests {

@Autowired

private IOrderDetailService orderDetailService;

@Test

public void testOrder() {

OrderDetail orderDetail = new OrderDetail();

orderDetail.setNum(100);

orderDetail.setOrderId(9999L);

orderDetail.setPrice(9999L);

orderDetail.setSkuId(27359021728L);

orderDetail.setTitle(UUID.randomUUID().toString());

orderDetailService.makeOrder(orderDetail);

}

}

运行后看到启动了全局事务,发生异常后,两个服务也都能成功回滚。

以上就是SpringCloud Alibaba使用Seata 分布式事务的详细内容,更多关于SpringCloud Alibaba分布式事务的资料请关注我们其它相关文章!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:调用api接口教程(调用api接口教程html)
下一篇:各种api接口网站源码是什么(各种api接口网站源码是什么)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~