1 Star 0 Fork 0

ltotal/seata-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
xa_resource_manager.go 7.68 KB
一键复制 编辑 原始数据 按行查看 历史
ltotal 提交于 2024-05-30 10:15 . 初始化提交
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sql
import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"sync"
"time"
"github.com/bluele/gcache"
"gitee.com/ltotal/seata-go/pkg/datasource/sql/datasource"
"gitee.com/ltotal/seata-go/pkg/datasource/sql/types"
"gitee.com/ltotal/seata-go/pkg/protocol/branch"
"gitee.com/ltotal/seata-go/pkg/rm"
"gitee.com/ltotal/seata-go/pkg/util/log"
)
var branchStatusCache gcache.Cache
type XAConnConf struct {
XaBranchExecutionTimeout time.Duration `json:"xa_branch_execution_timeout" xml:"xa_branch_execution_timeout" koanf:"xa_branch_execution_timeout"`
}
func (cfg *XAConnConf) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.XaBranchExecutionTimeout, prefix+".xa_branch_execution_timeout", time.Minute, "Undo log table name.")
}
type XAConfig struct {
xaConnConf XAConnConf
TwoPhaseHoldTime time.Duration `json:"two_phase_hold_time" yaml:"xa_two_phase_hold_time" koanf:"xa_two_phase_hold_time"`
}
func (cfg *XAConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.TwoPhaseHoldTime, prefix+".two_phase_hold_time", time.Millisecond*1000, "Undo log table name.")
cfg.xaConnConf.RegisterFlagsWithPrefix(prefix, f)
}
func InitXA(config XAConfig) *XAResourceManager {
xaSourceManager := &XAResourceManager{
resourceCache: sync.Map{},
basic: datasource.NewBasicSourceManager(),
rmRemoting: rm.GetRMRemotingInstance(),
config: config,
}
xaConnTimeout = config.xaConnConf.XaBranchExecutionTimeout
branchStatusCache = gcache.New(1024).LRU().Expiration(time.Minute * 10).Build()
rm.GetRmCacheInstance().RegisterResourceManager(xaSourceManager)
go xaSourceManager.xaTwoPhaseTimeoutChecker()
return xaSourceManager
}
type XAResourceManager struct {
config XAConfig
resourceCache sync.Map
basic *datasource.BasicSourceManager
rmRemoting *rm.RMRemoting
}
func (xaManager *XAResourceManager) xaTwoPhaseTimeoutChecker() {
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
xaManager.resourceCache.Range(func(key, value any) bool {
source, ok := value.(*DBResource)
if !ok {
return true
}
if source.IsShouldBeHeld() {
return true
}
source.GetKeeper().Range(func(key, value any) bool {
connectionXA, isConnectionXA := value.(*XAConn)
if !isConnectionXA {
return true
}
if time.Now().Sub(connectionXA.prepareTime) > xaManager.config.TwoPhaseHoldTime {
if err := connectionXA.CloseForce(); err != nil {
log.Errorf("Force close the xa xid:%s physical connection fail", connectionXA.txCtx.XID)
}
}
return true
})
return true
})
}
}
}
func (xaManager *XAResourceManager) GetBranchType() branch.BranchType {
return branch.BranchTypeXA
}
func (xaManager *XAResourceManager) GetCachedResources() *sync.Map {
return &xaManager.resourceCache
}
func (xaManager *XAResourceManager) RegisterResource(res rm.Resource) error {
xaManager.resourceCache.Store(res.GetResourceId(), res)
return xaManager.basic.RegisterResource(res)
}
func (xaManager *XAResourceManager) UnregisterResource(resource rm.Resource) error {
return xaManager.basic.UnregisterResource(resource)
}
func (xaManager *XAResourceManager) xaIDBuilder(xid string, branchId uint64) XAXid {
return XaIdBuild(xid, branchId)
}
func (xaManager *XAResourceManager) finishBranch(ctx context.Context, xaID XAXid, branchResource rm.BranchResource) (*XAConn, error) {
resource, ok := xaManager.resourceCache.Load(branchResource.ResourceId)
if !ok {
err := fmt.Errorf("unknow resource for rollback xa, resourceId: %s", branchResource.ResourceId)
log.Errorf(err.Error())
return nil, err
}
dbResource, ok := resource.(*DBResource)
if !ok {
err := fmt.Errorf("unknow resource for rollback xa, resourceId: %s", branchResource.ResourceId)
log.Errorf(err.Error())
return nil, err
}
connectionProxyXA, err := dbResource.ConnectionForXA(ctx, xaID)
if err != nil {
err := fmt.Errorf("get connection for rollback xa, resourceId: %s", branchResource.ResourceId)
log.Errorf(err.Error())
return nil, err
}
return connectionProxyXA, nil
}
func (xaManager *XAResourceManager) BranchCommit(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
xaID := xaManager.xaIDBuilder(branchResource.Xid, uint64(branchResource.BranchId))
connectionProxyXA, err := xaManager.finishBranch(ctx, xaID, branchResource)
if err != nil {
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, err
}
if err := connectionProxyXA.XaCommit(ctx, xaID); err != nil {
log.Errorf("commit xa, resourceId: %s, err %v", branchResource.ResourceId, err)
setBranchStatus(xaID.String(), branch.BranchStatusPhasetwoCommitted)
return branch.BranchStatusPhasetwoCommitFailedUnretryable, err
}
log.Infof("%s was committed", xaID.String())
return branch.BranchStatusPhasetwoCommitted, nil
}
func (xaManager *XAResourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
xaID := xaManager.xaIDBuilder(branchResource.Xid, uint64(branchResource.BranchId))
connectionProxyXA, err := xaManager.finishBranch(ctx, xaID, branchResource)
if err != nil {
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, err
}
if err = connectionProxyXA.XaRollbackByBranchId(ctx, xaID); err != nil {
log.Errorf("rollback xa, resourceId: %s, err %v", branchResource.ResourceId, err)
setBranchStatus(xaID.String(), branch.BranchStatusPhasetwoRollbacked)
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, err
}
log.Infof("%s was rollback", xaID.String())
return branch.BranchStatusPhasetwoRollbacked, nil
}
func (xaManager *XAResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
return false, nil
}
func (xaManager *XAResourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) {
return xaManager.rmRemoting.BranchRegister(req)
}
func (xaManager *XAResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
return xaManager.rmRemoting.BranchReport(param)
}
func (xaManager *XAResourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (datasource.TableMetaCache, error) {
return xaManager.basic.CreateTableMetaCache(ctx, resID, dbType, db)
}
func branchStatus(xaBranchXid string) (branch.BranchStatus, error) {
tmpBranchStatus, err := branchStatusCache.GetIFPresent(xaBranchXid)
if err != nil {
if errors.Is(err, gcache.KeyNotFoundError) {
return branch.BranchStatusUnknown, nil
}
return branch.BranchStatusUnknown, err
}
branchStatus, isBranchStatus := tmpBranchStatus.(branch.BranchStatus)
if !isBranchStatus {
return branch.BranchStatusUnknown, fmt.Errorf("branchId:%s get result isn't branch status", xaBranchXid)
}
return branchStatus, nil
}
func setBranchStatus(xaBranchXid string, status branch.BranchStatus) {
branchStatusCache.Set(xaBranchXid, status)
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/ltotal/seata-go.git
git@gitee.com:ltotal/seata-go.git
ltotal
seata-go
seata-go
v1.2.1

搜索帮助