1 Star 0 Fork 0

李易 / gorm-dm8-dialect

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
m.go 18.11 KB
一键复制 编辑 原始数据 按行查看 历史
zhenyong.li 提交于 2024-01-19 03:54 . update driver to embed dm dir
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878
/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"fmt"
"sync"
"sync/atomic"
"gitee.com/java-dev101/gorm-dm8-dialect/dm/parser"
"gitee.com/java-dev101/gorm-dm8-dialect/dm/util"
"golang.org/x/text/encoding"
)
type DmConnection struct {
filterable
mu sync.Mutex
dmConnector *DmConnector
Access *dm_build_414
stmtMap map[int32]*DmStatement
lastExecInfo *execRetInfo
lexer *parser.Lexer
encode encoding.Encoding
encodeBuffer *bytes.Buffer
transformReaderDst []byte
transformReaderSrc []byte
serverEncoding string
GlobalServerSeries int
ServerVersion string
Malini2 bool
Execute2 bool
LobEmptyCompOrcl bool
IsoLevel int32
ReadOnly bool
NewLobFlag bool
sslEncrypt int
MaxRowSize int32
DDLAutoCommit bool
BackslashEscape bool
SvrStat int32
SvrMode int32
ConstParaOpt bool
DbTimezone int16
LifeTimeRemainder int16
InstanceName string
Schema string
LastLoginIP string
LastLoginTime string
FailedAttempts int32
LoginWarningID int32
GraceTimeRemainder int32
Guid string
DbName string
StandbyHost string
StandbyPort int32
StandbyCount int32
SessionID int64
OracleDateLanguage byte
FormatDate string
FormatTimestamp string
FormatTimestampTZ string
FormatTime string
FormatTimeTZ string
Local bool
MsgVersion int32
TrxStatus int32
dscControl bool
trxFinish bool
autoCommit bool
isBatch bool
watching bool
watcher chan<- context.Context
closech chan struct{}
finished chan<- struct{}
canceled atomicError
closed atomicBool
}
func (conn *DmConnection) setTrxFinish(status int32) {
switch status & Dm_build_814 {
case Dm_build_811, Dm_build_812, Dm_build_813:
conn.trxFinish = true
default:
conn.trxFinish = false
}
}
func (dmConn *DmConnection) init() {
dmConn.stmtMap = make(map[int32]*DmStatement)
dmConn.DbTimezone = 0
dmConn.GlobalServerSeries = 0
dmConn.MaxRowSize = 0
dmConn.LobEmptyCompOrcl = false
dmConn.ReadOnly = false
dmConn.DDLAutoCommit = false
dmConn.ConstParaOpt = false
dmConn.IsoLevel = -1
dmConn.Malini2 = true
dmConn.NewLobFlag = true
dmConn.Execute2 = true
dmConn.serverEncoding = ENCODING_GB18030
dmConn.TrxStatus = Dm_build_762
dmConn.setTrxFinish(dmConn.TrxStatus)
dmConn.OracleDateLanguage = byte(Locale)
dmConn.lastExecInfo = NewExceInfo()
dmConn.MsgVersion = Dm_build_695
dmConn.idGenerator = dmConnIDGenerator
}
func (dmConn *DmConnection) reset() {
dmConn.DbTimezone = 0
dmConn.GlobalServerSeries = 0
dmConn.MaxRowSize = 0
dmConn.LobEmptyCompOrcl = false
dmConn.ReadOnly = false
dmConn.DDLAutoCommit = false
dmConn.ConstParaOpt = false
dmConn.IsoLevel = -1
dmConn.Malini2 = true
dmConn.NewLobFlag = true
dmConn.Execute2 = true
dmConn.serverEncoding = ENCODING_GB18030
dmConn.TrxStatus = Dm_build_762
dmConn.setTrxFinish(dmConn.TrxStatus)
}
func (dc *DmConnection) checkClosed() error {
if dc.closed.IsSet() {
return driver.ErrBadConn
}
return nil
}
func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
stmt, err := NewDmStmt(dc, query)
if err != nil {
return nil, err
}
if execType == Dm_build_779 {
defer stmt.close()
}
stmt.innerUsed = true
if stmt.dmConn.dmConnector.escapeProcess {
stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
if err != nil {
stmt.close()
return nil, err
}
}
var optParamList []OptParameter
if stmt.dmConn.ConstParaOpt {
optParamList = make([]OptParameter, 0)
stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, optParamList, stmt.dmConn.getServerEncoding())
if err != nil {
stmt.close()
optParamList = nil
}
}
if execType == Dm_build_778 && dc.dmConnector.enRsCache {
rpv, err := rp.get(stmt, query)
if err != nil {
return nil, err
}
if rpv != nil {
stmt.execInfo = rpv.execInfo
dc.lastExecInfo = rpv.execInfo
return newDmRows(rpv.getResultSet(stmt)), nil
}
}
var info *execRetInfo
if optParamList != nil && len(optParamList) > 0 {
info, err = dc.Access.Dm_build_494(stmt, optParamList)
if err != nil {
stmt.nativeSql = query
info, err = dc.Access.Dm_build_500(stmt, execType)
}
} else {
info, err = dc.Access.Dm_build_500(stmt, execType)
}
if err != nil {
stmt.close()
return nil, err
}
dc.lastExecInfo = info
if execType == Dm_build_778 && info.hasResultSet {
return newDmRows(newInnerRows(0, stmt, info)), nil
} else {
return newDmResult(stmt, info), nil
}
}
func g2dbIsoLevel(isoLevel int32) int32 {
switch isoLevel {
case 1:
return Dm_build_766
case 2:
return Dm_build_767
case 4:
return Dm_build_768
case 6:
return Dm_build_769
default:
return -1
}
}
func (dc *DmConnection) Begin() (driver.Tx, error) {
if len(dc.filterChain.filters) == 0 {
return dc.begin()
} else {
return dc.filterChain.reset().DmConnectionBegin(dc)
}
}
func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
if len(dc.filterChain.filters) == 0 {
return dc.beginTx(ctx, opts)
}
return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
}
func (dc *DmConnection) Commit() error {
if len(dc.filterChain.filters) == 0 {
return dc.commit()
} else {
return dc.filterChain.reset().DmConnectionCommit(dc)
}
}
func (dc *DmConnection) Rollback() error {
if len(dc.filterChain.filters) == 0 {
return dc.rollback()
} else {
return dc.filterChain.reset().DmConnectionRollback(dc)
}
}
func (dc *DmConnection) Close() error {
if len(dc.filterChain.filters) == 0 {
return dc.close()
} else {
return dc.filterChain.reset().DmConnectionClose(dc)
}
}
func (dc *DmConnection) Ping(ctx context.Context) error {
if len(dc.filterChain.filters) == 0 {
return dc.ping(ctx)
} else {
return dc.filterChain.reset().DmConnectionPing(dc, ctx)
}
}
func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
if len(dc.filterChain.filters) == 0 {
return dc.exec(query, args)
}
return dc.filterChain.reset().DmConnectionExec(dc, query, args)
}
func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
if len(dc.filterChain.filters) == 0 {
return dc.execContext(ctx, query, args)
}
return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
}
func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
if len(dc.filterChain.filters) == 0 {
return dc.query(query, args)
}
return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
}
func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
if len(dc.filterChain.filters) == 0 {
return dc.queryContext(ctx, query, args)
}
return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
}
func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
if len(dc.filterChain.filters) == 0 {
return dc.prepare(query)
}
return dc.filterChain.reset().DmConnectionPrepare(dc, query)
}
func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
if len(dc.filterChain.filters) == 0 {
return dc.prepareContext(ctx, query)
}
return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
}
func (dc *DmConnection) ResetSession(ctx context.Context) error {
if len(dc.filterChain.filters) == 0 {
return dc.resetSession(ctx)
}
if err := dc.filterChain.reset().DmConnectionResetSession(dc, ctx); err != nil {
return driver.ErrBadConn
} else {
return nil
}
}
func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
if len(dc.filterChain.filters) == 0 {
return dc.checkNamedValue(nv)
}
return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
}
func (dc *DmConnection) begin() (*DmConnection, error) {
return dc.beginTx(context.Background(), driver.TxOptions{driver.IsolationLevel(sql.LevelDefault), false})
}
func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
dc.autoCommit = false
if sql.IsolationLevel(opts.Isolation) == sql.LevelDefault {
opts.Isolation = driver.IsolationLevel(sql.LevelReadCommitted)
}
if dc.ReadOnly != opts.ReadOnly {
dc.ReadOnly = opts.ReadOnly
var readonly = 0
if opts.ReadOnly {
readonly = 1
}
dc.exec(fmt.Sprintf("SP_SET_SESSION_READONLY(%d)", readonly), nil)
}
if dc.IsoLevel != int32(opts.Isolation) {
switch sql.IsolationLevel(opts.Isolation) {
case sql.LevelDefault, sql.LevelReadUncommitted:
return dc, nil
case sql.LevelReadCommitted, sql.LevelSerializable:
dc.IsoLevel = int32(opts.Isolation)
case sql.LevelRepeatableRead:
if dc.CompatibleMysql() {
dc.IsoLevel = int32(sql.LevelReadCommitted)
} else {
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
}
default:
return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
}
err = dc.Access.Dm_build_554(dc)
if err != nil {
return nil, err
}
}
return dc, nil
}
func (dc *DmConnection) commit() error {
err := dc.checkClosed()
if err != nil {
return err
}
defer func() {
dc.autoCommit = dc.dmConnector.autoCommit
if dc.ReadOnly {
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
}
}()
if !dc.autoCommit {
err = dc.Access.Commit()
if err != nil {
return err
}
dc.trxFinish = true
return nil
} else if !dc.dmConnector.alwayseAllowCommit {
return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
}
return nil
}
func (dc *DmConnection) rollback() error {
err := dc.checkClosed()
if err != nil {
return err
}
defer func() {
dc.autoCommit = dc.dmConnector.autoCommit
if dc.ReadOnly {
dc.exec("SP_SET_SESSION_READONLY(0)", nil)
}
}()
if !dc.autoCommit {
err = dc.Access.Rollback()
if err != nil {
return err
}
dc.trxFinish = true
return nil
} else if !dc.dmConnector.alwayseAllowCommit {
return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
}
return nil
}
func (dc *DmConnection) reconnect() error {
err := dc.Access.Close()
if err != nil {
return err
}
for _, stmt := range dc.stmtMap {
for id, rs := range stmt.rsMap {
rs.Close()
delete(stmt.rsMap, id)
}
}
var newConn *DmConnection
if dc.dmConnector.group != nil {
if newConn, err = dc.dmConnector.group.connect(dc.dmConnector); err != nil {
return err
}
} else {
newConn, err = dc.dmConnector.connect(context.Background())
}
oldMap := dc.stmtMap
newConn.mu = dc.mu
newConn.filterable = dc.filterable
*dc = *newConn
for _, stmt := range oldMap {
if stmt.closed {
continue
}
err = dc.Access.Dm_build_472(stmt)
if err != nil {
stmt.free()
continue
}
if stmt.prepared || stmt.paramCount > 0 {
if err = stmt.prepare(); err != nil {
continue
}
}
dc.stmtMap[stmt.id] = stmt
}
return nil
}
func (dc *DmConnection) cleanup() {
dc.close()
}
func (dc *DmConnection) close() error {
if !dc.closed.TrySet(true) {
return nil
}
util.AbsorbPanic(func() {
close(dc.closech)
})
if dc.Access == nil {
return nil
}
dc.rollback()
for _, stmt := range dc.stmtMap {
stmt.free()
}
dc.Access.Close()
return nil
}
func (dc *DmConnection) ping(ctx context.Context) error {
if err := dc.watchCancel(ctx); err != nil {
return err
}
defer dc.finish()
rows, err := dc.query("select 1", nil)
if err != nil {
return err
}
return rows.close()
}
func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
defer stmt.close()
if err != nil {
return nil, err
}
dc.lastExecInfo = stmt.execInfo
return stmt.exec(args)
} else {
r1, err := dc.executeInner(query, Dm_build_779)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmResult); ok {
return r2, nil
} else {
return nil, ECGO_NOT_EXEC_SQL.throw()
}
}
}
func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
defer stmt.close()
if err != nil {
return nil, err
}
dc.lastExecInfo = stmt.execInfo
dargs, err := namedValueToValue(stmt, args)
if err != nil {
return nil, err
}
return stmt.exec(dargs)
} else {
r1, err := dc.executeInner(query, Dm_build_779)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmResult); ok {
return r2, nil
} else {
return nil, ECGO_NOT_EXEC_SQL.throw()
}
}
}
func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
stmt.close()
return nil, err
}
dc.lastExecInfo = stmt.execInfo
stmt.innerUsed = true
return stmt.query(args)
} else {
r1, err := dc.executeInner(query, Dm_build_778)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmRows); ok {
return r2, nil
} else {
return nil, ECGO_NOT_QUERY_SQL.throw()
}
}
}
func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return nil, err
}
if args != nil && len(args) > 0 {
stmt, err := dc.prepare(query)
if err != nil {
if stmt != nil {
stmt.close()
}
return nil, err
}
dc.lastExecInfo = stmt.execInfo
stmt.innerUsed = true
dargs, err := namedValueToValue(stmt, args)
if err != nil {
return nil, err
}
return stmt.query(dargs)
} else {
r1, err := dc.executeInner(query, Dm_build_778)
if err != nil {
return nil, err
}
if r2, ok := r1.(*DmRows); ok {
return r2, nil
} else {
return nil, ECGO_NOT_QUERY_SQL.throw()
}
}
}
func (dc *DmConnection) prepare(query string) (stmt *DmStatement, err error) {
if err = dc.checkClosed(); err != nil {
return
}
if stmt, err = NewDmStmt(dc, query); err != nil {
return
}
err = stmt.prepare()
return
}
func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
if err := dc.watchCancel(ctx); err != nil {
return nil, err
}
defer dc.finish()
return dc.prepare(query)
}
func (dc *DmConnection) resetSession(ctx context.Context) error {
if err := dc.watchCancel(ctx); err != nil {
return err
}
defer dc.finish()
err := dc.checkClosed()
if err != nil {
return err
}
return nil
}
func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
var err error
var cvt = converter{dc, false}
nv.Value, err = cvt.ConvertValue(nv.Value)
dc.isBatch = cvt.isBatch
return err
}
func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
stmt, err := NewDmStmt(dc, query)
if err != nil {
return nil, nil, err
}
stmt.innerUsed = true
stmt.innerExec = true
info, err := dc.Access.Dm_build_500(stmt, Dm_build_778)
if err != nil {
return nil, nil, err
}
dc.lastExecInfo = info
stmt.innerExec = false
return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
}
func (dc *DmConnection) getIndexOnEPGroup() int32 {
if dc.dmConnector.group == nil || dc.dmConnector.group.epList == nil {
return -1
}
for i := 0; i < len(dc.dmConnector.group.epList); i++ {
ep := dc.dmConnector.group.epList[i]
if dc.dmConnector.host == ep.host && dc.dmConnector.port == ep.port {
return int32(i)
}
}
return -1
}
func (dc *DmConnection) getServerEncoding() string {
if dc.dmConnector.charCode != "" {
return dc.dmConnector.charCode
}
return dc.serverEncoding
}
func (dc *DmConnection) lobFetchAll() bool {
return dc.dmConnector.lobMode == 2
}
func (conn *DmConnection) CompatibleOracle() bool {
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_ORACLE
}
func (conn *DmConnection) CompatibleMysql() bool {
return conn.dmConnector.compatibleMode == COMPATIBLE_MODE_MYSQL
}
func (conn *DmConnection) cancel(err error) {
conn.canceled.Set(err)
conn.close()
}
func (conn *DmConnection) finish() {
if !conn.watching || conn.finished == nil {
return
}
select {
case conn.finished <- struct{}{}:
conn.watching = false
case <-conn.closech:
}
}
func (conn *DmConnection) startWatcher() {
watcher := make(chan context.Context, 1)
conn.watcher = watcher
finished := make(chan struct{})
conn.finished = finished
go func() {
for {
var ctx context.Context
select {
case ctx = <-watcher:
case <-conn.closech:
return
}
select {
case <-ctx.Done():
conn.cancel(ctx.Err())
case <-finished:
case <-conn.closech:
return
}
}
}()
}
func (conn *DmConnection) watchCancel(ctx context.Context) error {
if conn.watching {
conn.cleanup()
return nil
}
if err := ctx.Err(); err != nil {
return err
}
if ctx.Done() == nil {
return nil
}
if conn.watcher == nil {
return nil
}
conn.watching = true
conn.watcher <- ctx
return nil
}
type noCopy struct{}
func (*noCopy) Lock() {}
type atomicBool struct {
_noCopy noCopy
value uint32
}
func (ab *atomicBool) IsSet() bool {
return atomic.LoadUint32(&ab.value) > 0
}
func (ab *atomicBool) Set(value bool) {
if value {
atomic.StoreUint32(&ab.value, 1)
} else {
atomic.StoreUint32(&ab.value, 0)
}
}
func (ab *atomicBool) TrySet(value bool) bool {
if value {
return atomic.SwapUint32(&ab.value, 1) == 0
}
return atomic.SwapUint32(&ab.value, 0) > 0
}
type atomicError struct {
_noCopy noCopy
value atomic.Value
}
func (ae *atomicError) Set(value error) {
ae.value.Store(value)
}
func (ae *atomicError) Value() error {
if v := ae.value.Load(); v != nil {
return v.(error)
}
return nil
}
Go
1
https://gitee.com/java-dev101/gorm-dm8-dialect.git
git@gitee.com:java-dev101/gorm-dm8-dialect.git
java-dev101
gorm-dm8-dialect
gorm-dm8-dialect
v1.0.0

搜索帮助