1 Star 0 Fork 22

penggy/dm

forked from springrain/dm 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
n.go 17.73 KB
一键复制 编辑 原始数据 按行查看 历史
springrain 提交于 2022-09-21 12:05 . v1.8.7 来自 达梦8.1.2.128
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867
/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"fmt"
"sync/atomic"
"gitee.com/chunanyong/dm/parser"
"golang.org/x/text/encoding"
)
// DmConnection is the connection object of this driver. The exported methods
// in this file (Begin, BeginTx, Commit, Rollback, Close, Ping, Exec, Query,
// Prepare, ResetSession, CheckNamedValue and their Context variants) either
// call the matching lower-case implementation directly or route the call
// through dc.filterChain when filters are registered.
type DmConnection struct {
// Embedded; presumably supplies the filterChain used by the exported wrappers — confirm.
filterable
// Connector holding the configuration read throughout this file
// (autoCommit, stmtPoolMaxSize, escapeProcess, compatibleMode, ...).
dmConnector *DmConnector
// Access is the object every server call in this file goes through
// (Commit, Rollback, Close and the Dm_build_* calls).
Access *dm_build_1312
// Statements tracked for this connection; iterated by reconnect, close and
// resetSession. Presumably keyed by statement id/handle — confirm.
stmtMap map[int32]*DmStatement
// Allocated by init only when dmConnector.stmtPoolMaxSize > 0.
stmtPool []stmtPoolInfo
// Execution info of the most recent statement run on this connection.
lastExecInfo *execRetInfo
lexer *parser.Lexer
encode encoding.Encoding
encodeBuffer *bytes.Buffer
transformReaderDst []byte
transformReaderSrc []byte
// Fallback encoding returned by getServerEncoding when the connector sets no charCode.
serverEncoding string
GlobalServerSeries int
ServerVersion string
Malini2 bool
Execute2 bool
LobEmptyCompOrcl bool
// Isolation level last stored by beginTx; -1 after init/reset.
IsoLevel int32
ReadOnly bool
NewLobFlag bool
sslEncrypt int
MaxRowSize int32
DDLAutoCommit bool
BackslashEscape bool
SvrStat int32
SvrMode int32
// When true, executeInner runs the SQL through execOpt before executing it.
ConstParaOpt bool
DbTimezone int16
LifeTimeRemainder int16
InstanceName string
Schema string
LastLoginIP string
LastLoginTime string
FailedAttempts int32
LoginWarningID int32
GraceTimeRemainder int32
Guid string
DbName string
StandbyHost string
StandbyPort int32
StandbyCount int32
// NOTE(review): both SessionID and sessionID exist; only sessionID is
// written in this file (set to -1 by init/reset).
SessionID int64
OracleDateLanguage byte
FormatDate string
FormatTimestamp string
FormatTimestampTZ string
FormatTime string
FormatTimeTZ string
Local bool
MsgVersion int32
TrxStatus int32
dscControl bool
// Set by setTrxFinish and after a successful commit/rollback.
trxFinish bool
sessionID int64
// Cleared by beginTx; restored from dmConnector.autoCommit when commit or rollback returns.
autoCommit bool
// Copied from the converter by checkNamedValue.
isBatch bool
// True while a context handed to watchCancel is being watched.
watching bool
// Channel on which watchCancel hands a context to the watcher goroutine.
watcher chan<- context.Context
// Closed by close(); unblocks finish and stops the watcher goroutine.
closech chan struct{}
// Channel on which finish tells the watcher goroutine the call completed.
finished chan<- struct{}
// Error recorded by cancel.
canceled atomicError
// Set exactly once by close(); read by checkClosed.
closed atomicBool
}
// setTrxFinish updates conn.trxFinish from a status word. The status is masked
// with Dm_build_125; trxFinish becomes true when the masked value equals
// Dm_build_122, Dm_build_123 or Dm_build_124, and false otherwise.
func (conn *DmConnection) setTrxFinish(status int32) {
	masked := status & Dm_build_125
	conn.trxFinish = masked == Dm_build_122 ||
		masked == Dm_build_123 ||
		masked == Dm_build_124
}
// init prepares a freshly created connection: it allocates the statement
// containers, applies the per-session defaults and then sets the values that
// are only assigned once per connection object.
func (dmConn *DmConnection) init() {
	// The statement pool is only allocated when pooling is enabled.
	if dmConn.dmConnector.stmtPoolMaxSize > 0 {
		dmConn.stmtPool = make([]stmtPoolInfo, 0, dmConn.dmConnector.stmtPoolMaxSize)
	}
	dmConn.stmtMap = make(map[int32]*DmStatement)

	// Session defaults are shared with reset(); calling it here keeps the two
	// lists from drifting apart.
	dmConn.reset()

	dmConn.OracleDateLanguage = byte(Locale)
	dmConn.lastExecInfo = NewExceInfo()
	dmConn.MsgVersion = Dm_build_6
	dmConn.idGenerator = dmConnIDGenerator
}
// reset restores the per-session fields to their default values.
func (dmConn *DmConnection) reset() {
	// Zeroed numeric fields.
	dmConn.DbTimezone, dmConn.GlobalServerSeries, dmConn.MaxRowSize = 0, 0, 0

	// -1 marks "not set yet".
	dmConn.IsoLevel, dmConn.sessionID = -1, -1

	// Flags that start disabled.
	dmConn.LobEmptyCompOrcl, dmConn.ReadOnly = false, false
	dmConn.DDLAutoCommit, dmConn.ConstParaOpt = false, false

	// Flags that start enabled.
	dmConn.Malini2, dmConn.NewLobFlag, dmConn.Execute2 = true, true, true

	dmConn.serverEncoding = ENCODING_GB18030
	dmConn.TrxStatus = Dm_build_73
}
// checkClosed returns driver.ErrBadConn once the connection has been marked
// closed, and nil while it is still usable.
func (dc *DmConnection) checkClosed() error {
	if !dc.closed.IsSet() {
		return nil
	}
	return driver.ErrBadConn
}
// executeInner runs a parameterless SQL text directly (without a separate
// prepare step) and returns either a *DmRows (when the execution produced a
// result set) or a *DmResult.
//
// execType selects the execution mode: Dm_build_90 is used by the exec paths
// of this file and Dm_build_89 by the query paths. For Dm_build_90 the
// statement is closed when the function returns; for other types the
// statement stays open on success because the returned rows depend on it.
// On every error path the statement is closed before returning.
func (dc *DmConnection) executeInner(query string, execType int16) (interface{}, error) {
	stmt, err := NewDmStmt(dc, query)
	if err != nil {
		return nil, err
	}
	if execType == Dm_build_90 {
		defer stmt.close()
	}
	stmt.innerUsed = true

	if stmt.dmConn.dmConnector.escapeProcess {
		stmt.nativeSql, err = stmt.dmConn.escape(stmt.nativeSql, stmt.dmConn.dmConnector.keyWords)
		if err != nil {
			stmt.close()
			return nil, err
		}
	}

	var optParamList []OptParameter
	if stmt.dmConn.ConstParaOpt {
		// Constant-parameter optimisation is best effort: when it fails we
		// fall back to executing the un-optimised SQL. The statement must
		// therefore stay open, and the SQL text is restored because the value
		// returned together with an error cannot be relied on.
		plainSQL := stmt.nativeSql
		stmt.nativeSql, optParamList, err = stmt.dmConn.execOpt(stmt.nativeSql, make([]OptParameter, 0), stmt.dmConn.getServerEncoding())
		if err != nil {
			stmt.nativeSql = plainSQL
			optParamList = nil
		}
	}

	// Result-set cache lookup, only for query executions.
	if execType == Dm_build_89 && dc.dmConnector.enRsCache {
		rpv, err := rp.get(stmt, query)
		if err != nil {
			// No deferred close exists for this execType; release the
			// statement here so it is not leaked.
			stmt.close()
			return nil, err
		}
		if rpv != nil {
			stmt.execInfo = rpv.execInfo
			dc.lastExecInfo = rpv.execInfo
			return newDmRows(rpv.getResultSet(stmt)), nil
		}
	}

	var info *execRetInfo
	if len(optParamList) > 0 {
		info, err = dc.Access.Dm_build_1391(stmt, optParamList)
		if err != nil {
			// Optimised execution failed: retry with the caller's SQL text.
			stmt.nativeSql = query
			info, err = dc.Access.Dm_build_1397(stmt, execType)
		}
	} else {
		info, err = dc.Access.Dm_build_1397(stmt, execType)
	}
	if err != nil {
		stmt.close()
		return nil, err
	}

	dc.lastExecInfo = info
	if info.hasResultSet {
		return newDmRows(newInnerRows(0, stmt, info)), nil
	}
	return newDmResult(stmt, info), nil
}
// g2dbIsoLevel translates an isolation level number into the corresponding
// Dm_build_* constant. The accepted inputs 1, 2, 4 and 6 are the numeric
// values of database/sql's LevelReadUncommitted, LevelReadCommitted,
// LevelRepeatableRead and LevelSerializable. Any other input yields -1.
func g2dbIsoLevel(isoLevel int32) int32 {
	if isoLevel == 1 {
		return Dm_build_77
	}
	if isoLevel == 2 {
		return Dm_build_78
	}
	if isoLevel == 4 {
		return Dm_build_79
	}
	if isoLevel == 6 {
		return Dm_build_80
	}
	return -1
}
// Begin starts a transaction. When filters are registered the call is routed
// through the reset filter chain; otherwise dc.begin is called directly.
func (dc *DmConnection) Begin() (driver.Tx, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionBegin(dc)
	}
	return dc.begin()
}
// BeginTx starts a transaction with the given options. When filters are
// registered the call is routed through the reset filter chain; otherwise
// dc.beginTx is called directly.
func (dc *DmConnection) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionBeginTx(dc, ctx, opts)
	}
	return dc.beginTx(ctx, opts)
}
// Commit commits the current transaction. When filters are registered the
// call is routed through the reset filter chain; otherwise dc.commit is
// called directly.
func (dc *DmConnection) Commit() error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionCommit(dc)
	}
	return dc.commit()
}
// Rollback rolls back the current transaction. When filters are registered
// the call is routed through the reset filter chain; otherwise dc.rollback is
// called directly.
func (dc *DmConnection) Rollback() error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionRollback(dc)
	}
	return dc.rollback()
}
// Close closes the connection. When filters are registered the call is routed
// through the reset filter chain; otherwise dc.close is called directly.
func (dc *DmConnection) Close() error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionClose(dc)
	}
	return dc.close()
}
// Ping checks that the connection is usable. When filters are registered the
// call is routed through the reset filter chain; otherwise dc.ping is called
// directly.
func (dc *DmConnection) Ping(ctx context.Context) error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionPing(dc, ctx)
	}
	return dc.ping(ctx)
}
// Exec executes a statement that returns no rows. When filters are registered
// the call is routed through the reset filter chain; otherwise dc.exec is
// called directly.
func (dc *DmConnection) Exec(query string, args []driver.Value) (driver.Result, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionExec(dc, query, args)
	}
	return dc.exec(query, args)
}
// ExecContext is the context-aware variant of Exec. When filters are
// registered the call is routed through the reset filter chain; otherwise
// dc.execContext is called directly.
func (dc *DmConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionExecContext(dc, ctx, query, args)
	}
	return dc.execContext(ctx, query, args)
}
// Query executes a statement that returns rows. When filters are registered
// the call is routed through the reset filter chain; otherwise dc.query is
// called directly.
func (dc *DmConnection) Query(query string, args []driver.Value) (driver.Rows, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionQuery(dc, query, args)
	}
	return dc.query(query, args)
}
// QueryContext is the context-aware variant of Query. When filters are
// registered the call is routed through the reset filter chain; otherwise
// dc.queryContext is called directly.
func (dc *DmConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionQueryContext(dc, ctx, query, args)
	}
	return dc.queryContext(ctx, query, args)
}
// Prepare creates a prepared statement for query. When filters are registered
// the call is routed through the reset filter chain; otherwise dc.prepare is
// called directly.
func (dc *DmConnection) Prepare(query string) (driver.Stmt, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionPrepare(dc, query)
	}
	return dc.prepare(query)
}
// PrepareContext is the context-aware variant of Prepare. When filters are
// registered the call is routed through the reset filter chain; otherwise
// dc.prepareContext is called directly.
func (dc *DmConnection) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionPrepareContext(dc, ctx, query)
	}
	return dc.prepareContext(ctx, query)
}
// ResetSession resets per-use connection state. When filters are registered
// the call is routed through the reset filter chain; otherwise
// dc.resetSession is called directly.
func (dc *DmConnection) ResetSession(ctx context.Context) error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionResetSession(dc, ctx)
	}
	return dc.resetSession(ctx)
}
// CheckNamedValue converts a bound argument in place. When filters are
// registered the call is routed through the reset filter chain; otherwise
// dc.checkNamedValue is called directly.
func (dc *DmConnection) CheckNamedValue(nv *driver.NamedValue) error {
	if len(dc.filterChain.filters) > 0 {
		return dc.filterChain.reset().DmConnectionCheckNamedValue(dc, nv)
	}
	return dc.checkNamedValue(nv)
}
// begin starts a transaction with the default isolation level, read-write,
// using a background context.
func (dc *DmConnection) begin() (*DmConnection, error) {
	// Keyed fields: driver.TxOptions belongs to another package, so a
	// positional literal would break if fields were ever added or reordered.
	return dc.beginTx(context.Background(), driver.TxOptions{
		Isolation: driver.IsolationLevel(sql.LevelDefault),
		ReadOnly:  false,
	})
}
// beginTx switches the connection into manual-commit mode and, when needed,
// pushes a new isolation level to the server.
//
// Isolation handling:
//   - LevelDefault is treated as LevelReadCommitted.
//   - If the requested level equals the level already stored on the
//     connection, or is LevelReadUncommitted, nothing is sent to the server.
//   - LevelReadCommitted and LevelSerializable are stored and sent.
//   - LevelRepeatableRead is accepted only in MySQL-compatible mode, where it
//     is mapped to LevelReadCommitted; otherwise it is rejected.
//   - Every other level is rejected with ECGO_INVALID_TRAN_ISOLATION.
//
// The connection state (autoCommit, ReadOnly, IsoLevel) is only changed once
// the request has been validated, and is put back if the server call fails.
// Otherwise a failed begin would leave the connection in manual-commit mode
// with no transaction object to commit or roll back.
func (dc *DmConnection) beginTx(ctx context.Context, opts driver.TxOptions) (*DmConnection, error) {
	if err := dc.watchCancel(ctx); err != nil {
		return nil, err
	}
	defer dc.finish()

	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	level := sql.IsolationLevel(opts.Isolation)
	if level == sql.LevelDefault {
		level = sql.LevelReadCommitted
	}

	// Validate the request before touching any connection state.
	newIso := dc.IsoLevel
	sendToServer := false
	if dc.IsoLevel != int32(level) {
		switch level {
		case sql.LevelReadUncommitted:
			// Accepted, but the stored level is left as it is.
		case sql.LevelReadCommitted, sql.LevelSerializable:
			newIso = int32(level)
			sendToServer = true
		case sql.LevelRepeatableRead:
			if !dc.CompatibleMysql() {
				return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
			}
			newIso = int32(sql.LevelReadCommitted)
			sendToServer = true
		default:
			return nil, ECGO_INVALID_TRAN_ISOLATION.throw()
		}
	}

	prevAutoCommit, prevReadOnly, prevIsoLevel := dc.autoCommit, dc.ReadOnly, dc.IsoLevel
	dc.autoCommit = false
	dc.ReadOnly = opts.ReadOnly
	if !sendToServer {
		return dc, nil
	}

	dc.IsoLevel = newIso
	if err := dc.Access.Dm_build_1451(dc); err != nil {
		dc.autoCommit, dc.ReadOnly, dc.IsoLevel = prevAutoCommit, prevReadOnly, prevIsoLevel
		return nil, err
	}
	return dc, nil
}
// commit commits the current transaction. On a closed connection it returns
// the checkClosed error. In manual-commit mode the commit is sent through
// dc.Access and trxFinish is set on success. In auto-commit mode the call is
// an error (ECGO_COMMIT_IN_AUTOCOMMIT_MODE) unless the connector's
// alwayseAllowCommit option is set, in which case it is a no-op. Once past
// the closed check, autoCommit is restored from the connector setting on
// return, whatever the outcome.
func (dc *DmConnection) commit() error {
	if err := dc.checkClosed(); err != nil {
		return err
	}
	defer func() {
		dc.autoCommit = dc.dmConnector.autoCommit
	}()

	if dc.autoCommit {
		if !dc.dmConnector.alwayseAllowCommit {
			return ECGO_COMMIT_IN_AUTOCOMMIT_MODE.throw()
		}
		return nil
	}

	if err := dc.Access.Commit(); err != nil {
		return err
	}
	dc.trxFinish = true
	return nil
}
// rollback rolls back the current transaction. On a closed connection it
// returns the checkClosed error. In manual-commit mode the rollback is sent
// through dc.Access and trxFinish is set on success. In auto-commit mode the
// call is an error (ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE) unless the connector's
// alwayseAllowCommit option is set, in which case it is a no-op. Once past
// the closed check, autoCommit is restored from the connector setting on
// return, whatever the outcome.
func (dc *DmConnection) rollback() error {
	if err := dc.checkClosed(); err != nil {
		return err
	}
	defer func() {
		dc.autoCommit = dc.dmConnector.autoCommit
	}()

	if dc.autoCommit {
		if !dc.dmConnector.alwayseAllowCommit {
			return ECGO_ROLLBACK_IN_AUTOCOMMIT_MODE.throw()
		}
		return nil
	}

	if err := dc.Access.Rollback(); err != nil {
		return err
	}
	dc.trxFinish = true
	return nil
}
// reconnect drops the current server session and establishes a new one for
// the same DmConnection object, then re-creates every tracked statement on
// the new session.
//
// Steps: close the access object, mark all tracked statements closed and
// discard their result sets, empty the statement pool, connect again (through
// the group when one is configured, otherwise through the connector), and
// finally re-allocate and, for statements with parameters, re-prepare each
// statement. The first error aborts the procedure and is returned.
func (dc *DmConnection) reconnect() error {
	if err := dc.Access.Close(); err != nil {
		return err
	}

	for _, stmt := range dc.stmtMap {
		stmt.closed = true
		for id := range stmt.rsMap {
			delete(stmt.rsMap, id)
		}
	}

	if dc.stmtPool != nil {
		dc.stmtPool = dc.stmtPool[:0]
	}

	// Tell the connector to reuse this object instead of creating a new one.
	dc.dmConnector.reConnection = dc

	var err error
	if dc.dmConnector.group != nil {
		_, err = dc.dmConnector.group.connect(dc.dmConnector)
	} else {
		_, err = dc.dmConnector.connect(context.Background())
	}
	// Both connect paths must be checked; continuing after a failed connect
	// would run the statement recovery below against a dead session.
	if err != nil {
		return err
	}

	for _, stmt := range dc.stmtMap {
		if err = dc.Access.Dm_build_1369(stmt); err != nil {
			return err
		}
		if stmt.paramCount > 0 {
			if err = stmt.prepare(); err != nil {
				return err
			}
		}
	}
	return nil
}
// cleanup closes the connection and discards the result of close.
func (dc *DmConnection) cleanup() {
	_ = dc.close()
}
// close shuts the connection down. It is idempotent: only the first call does
// any work, later calls return nil immediately.
//
// The first call closes closech (which unblocks finish and stops the watcher
// goroutine), rolls back an open manual-commit transaction, frees all tracked
// statements, releases pooled statement handles and closes the access object.
// All of this is best effort; errors are not reported and close always
// returns nil.
func (dc *DmConnection) close() error {
	if !dc.closed.TrySet(true) {
		return nil
	}

	// Closing a nil channel panics; guard for connections whose watcher
	// channels were never set up.
	if dc.closech != nil {
		close(dc.closech)
	}
	if dc.Access == nil {
		return nil
	}

	// dc.rollback() cannot be used here: it starts with checkClosed, which
	// already fails because the closed flag was set above, so the rollback
	// would never be sent. Issue it directly, and only when a manual-commit
	// transaction can be open.
	if !dc.autoCommit {
		_ = dc.Access.Rollback()
	}

	for _, stmt := range dc.stmtMap {
		stmt.free()
	}

	if dc.stmtPool != nil {
		for _, spi := range dc.stmtPool {
			dc.Access.Dm_build_1374(spi.id)
		}
		dc.stmtPool = nil
	}

	dc.Access.Close()
	return nil
}
// ping verifies the connection by running "select 1" and closing the
// resulting rows. The context is watched for cancellation for the duration
// of the call.
func (dc *DmConnection) ping(ctx context.Context) error {
	err := dc.watchCancel(ctx)
	if err != nil {
		return err
	}
	defer dc.finish()

	rows, err := dc.query("select 1", nil)
	if err == nil {
		return rows.close()
	}
	return err
}
// exec runs a statement that is not expected to return rows.
//
// With arguments, the SQL is prepared, executed with the arguments and the
// temporary statement is closed on return. Without arguments, the SQL is
// executed directly through executeInner; if that execution turns out to
// produce rows instead of a result, ECGO_NOT_EXEC_SQL is returned.
func (dc *DmConnection) exec(query string, args []driver.Value) (*DmResult, error) {
	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	if len(args) > 0 {
		stmt, err := dc.prepare(query)
		// prepare returns a nil statement when it fails before the statement
		// exists, and a non-nil one when only the server-side prepare failed;
		// close whatever was created, but never call close on nil.
		if stmt != nil {
			defer stmt.close()
		}
		if err != nil {
			return nil, err
		}
		dc.lastExecInfo = stmt.execInfo
		return stmt.exec(args)
	}

	r1, err := dc.executeInner(query, Dm_build_90)
	if err != nil {
		return nil, err
	}
	if r2, ok := r1.(*DmResult); ok {
		return r2, nil
	}
	return nil, ECGO_NOT_EXEC_SQL.throw()
}
// execContext is the context-aware variant of exec. The context is watched
// for cancellation for the duration of the call.
//
// With arguments, the SQL is prepared, executed with the arguments and the
// temporary statement is closed on return. Without arguments, the SQL is
// executed directly through executeInner; if that execution turns out to
// produce rows instead of a result, ECGO_NOT_EXEC_SQL is returned.
func (dc *DmConnection) execContext(ctx context.Context, query string, args []driver.NamedValue) (*DmResult, error) {
	if err := dc.watchCancel(ctx); err != nil {
		return nil, err
	}
	defer dc.finish()

	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	if len(args) > 0 {
		stmt, err := dc.prepare(query)
		// prepare returns a nil statement when it fails before the statement
		// exists, and a non-nil one when only the server-side prepare failed;
		// close whatever was created, but never call close on nil.
		if stmt != nil {
			defer stmt.close()
		}
		if err != nil {
			return nil, err
		}
		dc.lastExecInfo = stmt.execInfo
		return stmt.execContext(ctx, args)
	}

	r1, err := dc.executeInner(query, Dm_build_90)
	if err != nil {
		return nil, err
	}
	if r2, ok := r1.(*DmResult); ok {
		return r2, nil
	}
	return nil, ECGO_NOT_EXEC_SQL.throw()
}
// query runs a statement that is expected to return rows.
//
// With arguments, the SQL is prepared and queried with the arguments; the
// statement is marked innerUsed and stays open because the returned rows
// depend on it. Without arguments, the SQL is executed directly through
// executeInner; if that execution does not produce rows, ECGO_NOT_QUERY_SQL
// is returned.
func (dc *DmConnection) query(query string, args []driver.Value) (*DmRows, error) {
	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	if len(args) > 0 {
		stmt, err := dc.prepare(query)
		if err != nil {
			// prepare returns a nil statement when it fails before the
			// statement exists; only close a statement that was created.
			if stmt != nil {
				stmt.close()
			}
			return nil, err
		}
		dc.lastExecInfo = stmt.execInfo
		stmt.innerUsed = true
		return stmt.query(args)
	}

	r1, err := dc.executeInner(query, Dm_build_89)
	if err != nil {
		return nil, err
	}
	if r2, ok := r1.(*DmRows); ok {
		return r2, nil
	}
	return nil, ECGO_NOT_QUERY_SQL.throw()
}
// queryContext is the context-aware variant of query. The context is watched
// for cancellation for the duration of the call.
//
// With arguments, the SQL is prepared and queried with the arguments; the
// statement is marked innerUsed and stays open because the returned rows
// depend on it. Without arguments, the SQL is executed directly through
// executeInner; if that execution does not produce rows, ECGO_NOT_QUERY_SQL
// is returned.
func (dc *DmConnection) queryContext(ctx context.Context, query string, args []driver.NamedValue) (*DmRows, error) {
	if err := dc.watchCancel(ctx); err != nil {
		return nil, err
	}
	defer dc.finish()

	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	if len(args) > 0 {
		stmt, err := dc.prepare(query)
		if err != nil {
			// prepare returns a nil statement when it fails before the
			// statement exists; only close a statement that was created.
			if stmt != nil {
				stmt.close()
			}
			return nil, err
		}
		dc.lastExecInfo = stmt.execInfo
		stmt.innerUsed = true
		return stmt.queryContext(ctx, args)
	}

	r1, err := dc.executeInner(query, Dm_build_89)
	if err != nil {
		return nil, err
	}
	if r2, ok := r1.(*DmRows); ok {
		return r2, nil
	}
	return nil, ECGO_NOT_QUERY_SQL.throw()
}
// prepare creates a statement for query and prepares it.
//
// Return contract: (nil, err) when the connection is closed or the statement
// could not be created; (stmt, err) with a non-nil stmt when the statement
// was created but stmt.prepare failed — the caller then owns that statement
// and is expected to close it; (stmt, nil) on success.
func (dc *DmConnection) prepare(query string) (*DmStatement, error) {
	if err := dc.checkClosed(); err != nil {
		return nil, err
	}
	stmt, err := NewDmStmt(dc, query)
	if err != nil {
		return nil, err
	}
	return stmt, stmt.prepare()
}
// prepareContext is the context-aware variant of prepare. The context is
// watched for cancellation for the duration of the call. On error it returns
// a nil statement.
func (dc *DmConnection) prepareContext(ctx context.Context, query string) (*DmStatement, error) {
	if err := dc.watchCancel(ctx); err != nil {
		return nil, err
	}
	defer dc.finish()

	if err := dc.checkClosed(); err != nil {
		return nil, err
	}

	stmt, err := dc.prepare(query)
	if err != nil {
		// prepare hands back the created statement together with the error
		// when only the prepare step failed. It is dropped here, so it has to
		// be closed or it would leak.
		if stmt != nil {
			stmt.close()
		}
		return nil, err
	}
	return stmt, nil
}
// resetSession clears the inUse flag of every tracked statement. It returns
// the checkClosed error when the connection is already closed. The context
// argument is not used.
func (dc *DmConnection) resetSession(ctx context.Context) error {
	if err := dc.checkClosed(); err != nil {
		return err
	}
	for id := range dc.stmtMap {
		dc.stmtMap[id].inUse = false
	}
	return nil
}
// checkNamedValue converts nv.Value in place with a converter bound to this
// connection and copies the converter's isBatch flag onto the connection.
// nv.Value is overwritten with the conversion output even when an error is
// returned.
func (dc *DmConnection) checkNamedValue(nv *driver.NamedValue) error {
	cvt := converter{dc, false}
	converted, err := cvt.ConvertValue(nv.Value)
	nv.Value = converted
	dc.isBatch = cvt.isBatch
	return err
}
// driverQuery executes query on behalf of the driver itself and returns both
// the statement and the rows built from the execution. The statement is
// flagged innerUsed, and innerExec is set only for the duration of the
// execution. On error both return values are nil and the statement has been
// closed.
func (dc *DmConnection) driverQuery(query string) (*DmStatement, *DmRows, error) {
	stmt, err := NewDmStmt(dc, query)
	if err != nil {
		return nil, nil, err
	}
	stmt.innerUsed = true
	stmt.innerExec = true

	info, err := dc.Access.Dm_build_1397(stmt, Dm_build_89)
	if err != nil {
		// The statement is not handed to the caller on failure, so release it
		// here (executeInner does the same for this call).
		stmt.close()
		return nil, nil, err
	}

	dc.lastExecInfo = info
	stmt.innerExec = false
	return stmt, newDmRows(newInnerRows(0, stmt, info)), nil
}
// getIndexOnEPGroup returns the position in the connector group's epList of
// the entry whose host and port match the connector, or -1 when there is no
// group, no list, or no matching entry.
func (dc *DmConnection) getIndexOnEPGroup() int32 {
	group := dc.dmConnector.group
	if group == nil || group.epList == nil {
		return -1
	}
	for idx, ep := range group.epList {
		if ep.host == dc.dmConnector.host && ep.port == dc.dmConnector.port {
			return int32(idx)
		}
	}
	return -1
}
// getServerEncoding returns the connector's charCode when one is configured,
// and the connection's serverEncoding otherwise.
func (dc *DmConnection) getServerEncoding() string {
	if override := dc.dmConnector.charCode; override != "" {
		return override
	}
	return dc.serverEncoding
}
// lobFetchAll reports whether the connector's lobMode is 2.
func (dc *DmConnection) lobFetchAll() bool {
	mode := dc.dmConnector.lobMode
	return mode == 2
}
// CompatibleOracle reports whether the connector is configured with
// COMPATIBLE_MODE_ORACLE.
func (conn *DmConnection) CompatibleOracle() bool {
	mode := conn.dmConnector.compatibleMode
	return mode == COMPATIBLE_MODE_ORACLE
}
// CompatibleMysql reports whether the connector is configured with
// COMPATIBLE_MODE_MYSQL.
func (conn *DmConnection) CompatibleMysql() bool {
	mode := conn.dmConnector.compatibleMode
	return mode == COMPATIBLE_MODE_MYSQL
}
// cancel records err as the cancellation cause and closes the connection.
// It is called by the watcher goroutine when a watched context is done.
func (conn *DmConnection) cancel(err error) {
	conn.canceled.Set(err)
	// Only report an actual close failure; unconditionally printing the
	// result would write "<nil>" to stdout on every cancellation.
	if closeErr := conn.close(); closeErr != nil {
		fmt.Println(closeErr)
	}
}
// finish tells the watcher goroutine that the watched call has completed.
// It does nothing when no context is being watched or no finished channel
// exists. If the connection is closed while the signal is pending, finish
// returns without clearing the watching flag.
func (conn *DmConnection) finish() {
	if conn.watching && conn.finished != nil {
		select {
		case <-conn.closech:
		case conn.finished <- struct{}{}:
			conn.watching = false
		}
	}
}
// startWatcher creates the watcher and finished channels, stores their send
// sides on the connection and starts a goroutine that, for each context
// received, waits for one of three events: the context is done (the
// connection is then cancelled with the context's error), the call finished,
// or the connection was closed. The goroutine exits when closech is closed.
func (conn *DmConnection) startWatcher() {
	ctxCh := make(chan context.Context, 1)
	doneCh := make(chan struct{})
	conn.watcher = ctxCh
	conn.finished = doneCh

	go func() {
		for {
			// Wait for the next context to watch.
			var active context.Context
			select {
			case <-conn.closech:
				return
			case active = <-ctxCh:
			}

			// Watch it until the call ends one way or another.
			select {
			case <-conn.closech:
				return
			case <-doneCh:
			case <-active.Done():
				conn.cancel(active.Err())
			}
		}
	}()
}
// watchCancel hands ctx to the watcher goroutine so the connection is
// cancelled when ctx is done.
//
// If a context is already being watched, the connection is cleaned up
// (closed) and nil is returned. If ctx is already done, its error is
// returned. If ctx can never be cancelled (Done() is nil) or no watcher
// exists, nothing is watched and nil is returned.
func (conn *DmConnection) watchCancel(ctx context.Context) error {
	if conn.watching {
		conn.cleanup()
		return nil
	}

	err := ctx.Err()
	switch {
	case err != nil:
		return err
	case ctx.Done() == nil:
		return nil
	case conn.watcher == nil:
		return nil
	}

	conn.watching = true
	conn.watcher <- ctx
	return nil
}
// noCopy is embedded into structs that must not be copied after first use.
// It has no behaviour of its own; it exists so that static analysis can flag
// accidental copies.
type noCopy struct{}

// Lock is a no-op.
func (*noCopy) Lock() {}

// Unlock is a no-op. Together with Lock it makes *noCopy satisfy sync.Locker,
// which is what go vet's copylocks check looks for.
func (*noCopy) Unlock() {}

// atomicBool is a boolean that can be read and written atomically.
// The zero value is false.
type atomicBool struct {
	_noCopy noCopy
	value   uint32 // 0 = false, non-zero = true
}

// IsSet returns the current value.
func (ab *atomicBool) IsSet() bool {
	return atomic.LoadUint32(&ab.value) > 0
}

// Set stores value.
func (ab *atomicBool) Set(value bool) {
	var raw uint32
	if value {
		raw = 1
	}
	atomic.StoreUint32(&ab.value, raw)
}

// TrySet stores value and reports whether the stored value actually changed,
// i.e. it returns false when the bool already held value.
func (ab *atomicBool) TrySet(value bool) bool {
	if value {
		return atomic.SwapUint32(&ab.value, 1) == 0
	}
	return atomic.SwapUint32(&ab.value, 0) > 0
}

// atomicError is an error that can be read and written atomically.
// The zero value holds no error.
type atomicError struct {
	_noCopy noCopy
	value   atomic.Value // always holds a *error once Set has been called
}

// Set stores value. A nil value clears the stored error.
func (ae *atomicError) Set(value error) {
	// atomic.Value panics when asked to store nil or a value whose concrete
	// type differs from the previous one. Storing a pointer to the interface
	// keeps the stored type constant (*error) whatever error is passed.
	ae.value.Store(&value)
}

// Value returns the stored error, or nil when none has been set.
func (ae *atomicError) Value() error {
	if p, ok := ae.value.Load().(*error); ok && p != nil {
		return *p
	}
	return nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/penggy/dm.git
git@gitee.com:penggy/dm.git
penggy
dm
dm
master

搜索帮助