1 Star 5 Fork 2

huangpeizhi2018/rosedb

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
txn_api.go 11.49 KB
一键复制 编辑 原始数据 按行查看 历史
Nang Feng 提交于 2021-10-13 20:59 . add redis-cli command
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
package rosedb
import (
"bytes"
"encoding/binary"
"github.com/roseduan/rosedb/storage"
"github.com/roseduan/rosedb/utils"
"time"
)
// Set see db_str.go:Set
func (tx *Txn) Set(key, value interface{}) (err error) {
encKey, encVal, err := tx.db.encode(key, value)
if err != nil {
return err
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
e := storage.NewEntryWithTxn(encKey, encVal, nil, String, StringSet, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
return
}
// SetNx see db_str.go:SetNx
func (tx *Txn) SetNx(key, value interface{}) (ok bool, err error) {
encKey, encVal, err := tx.db.encode(key, value)
if err != nil {
return false, err
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
if tx.StrExists(encKey) {
return
}
if err = tx.Set(key, value); err == nil {
ok = true
}
return
}
// SetEx see db_str.go:SetEx
func (tx *Txn) SetEx(key, value interface{}, duration int64) (err error) {
encKey, encVal, err := tx.db.encode(key, value)
if err != nil {
return err
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
if duration <= 0 {
return ErrInvalidTTL
}
deadline := time.Now().Unix() + duration
e := storage.NewEntryWithTxn(encKey, encVal, nil, String, StringExpire, tx.id)
e.Timestamp = uint64(deadline)
if err = tx.putEntry(e); err != nil {
return
}
return
}
// Get see db_str.go:Get
func (tx *Txn) Get(key, dest interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
var val []byte
if e, ok := tx.strEntries[string(encKey)]; ok {
if e.GetMark() == StringRem {
err = ErrKeyNotExist
return
}
if e.GetMark() == StringExpire && e.Timestamp < uint64(time.Now().Unix()) {
return
}
val = e.Meta.Value
} else {
val, err = tx.db.getVal(encKey)
}
if len(val) > 0 {
err = utils.DecodeValue(val, dest)
}
return
}
// GetSet see db_str.go:GetSet
func (tx *Txn) GetSet(key, value, dest interface{}) (err error) {
err = tx.Get(key, dest)
if err != nil && err != ErrKeyNotExist && err != ErrKeyExpired {
return
}
return tx.Set(key, value)
}
// Append see db_str.go:Append
func (tx *Txn) Append(key interface{}, value string) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
if e, ok := tx.strEntries[string(encKey)]; ok && e.GetMark() != StringRem {
e.Meta.Value = append(e.Meta.Value, value...)
return
}
var existVal []byte
err = tx.Get(key, &existVal)
if err != nil && err != ErrKeyNotExist && err != ErrKeyExpired {
return err
}
existVal = append(existVal, []byte(value)...)
return tx.Set(key, existVal)
}
// StrExists see db_str.go:StrExists
func (tx *Txn) StrExists(key interface{}) (ok bool) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return false
}
if e, ok := tx.strEntries[string(encKey)]; ok && e.GetMark() != StringRem {
return true
}
if tx.db.checkExpired(encKey, String) {
return false
}
ok = tx.db.strIndex.idxList.Exist(encKey)
return
}
// Remove see db_str.go:Remove
func (tx *Txn) Remove(key interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
if err = tx.db.checkKeyValue(encKey, nil); err != nil {
return
}
if _, ok := tx.strEntries[string(encKey)]; ok {
delete(tx.strEntries, string(encKey))
return
}
e := storage.NewEntryWithTxn(encKey, nil, nil, String, StringRem, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
return
}
// LPush see db_list.go:LPush
func (tx *Txn) LPush(key interface{}, values ...interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
for _, v := range values {
var encVal []byte
if encVal, err = utils.EncodeValue(v); err != nil {
return
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
e := storage.NewEntryWithTxn(encKey, encVal, nil, List, ListLPush, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
}
return
}
// RPush see db_list.go:RPush
func (tx *Txn) RPush(key interface{}, values ...interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
for _, v := range values {
var encVal []byte
if encVal, err = utils.EncodeValue(v); err != nil {
return
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
e := storage.NewEntryWithTxn(encKey, encVal, nil, List, ListRPush, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
}
return
}
// HSet see db_hash.go:HSet
func (tx *Txn) HSet(key, field, value interface{}) (err error) {
encKey, encVal, err := tx.db.encode(key, value)
if err != nil {
return err
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
encField, err := utils.EncodeValue(field)
if err != nil {
return err
}
// compare to the old val.
oldVal, err := tx.hGetVal(key, field)
if err != nil {
return err
}
if bytes.Compare(encVal, oldVal) == 0 {
return
}
e := storage.NewEntryWithTxn(encKey, encVal, encField, Hash, HashHSet, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
eKey := tx.encodeKey(encKey, encField, Hash)
tx.keysMap[eKey] = len(tx.writeEntries) - 1
return
}
// HSetNx see db_hash.go:HSetNx
func (tx *Txn) HSetNx(key, field, value interface{}) (err error) {
if oldVal, err := tx.hGetVal(key, field); err == nil && len(oldVal) > 0 {
return err
}
encKey, encVal, err := tx.db.encode(key, value)
if err != nil {
return err
}
if err = tx.db.checkKeyValue(encKey, encVal); err != nil {
return
}
encField, err := utils.EncodeValue(field)
if err != nil {
return err
}
e := storage.NewEntryWithTxn(encKey, encVal, encField, Hash, HashHSet, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
eKey := tx.encodeKey(encKey, encField, Hash)
tx.keysMap[eKey] = len(tx.writeEntries) - 1
return
}
// HGet see db_hash.go:HGet
func (tx *Txn) HGet(key, field, dest interface{}) (err error) {
val, err := tx.hGetVal(key, field)
if err != nil {
return err
}
if len(val) > 0 {
err = utils.DecodeValue(val, dest)
}
return
}
func (tx *Txn) hGetVal(key, field interface{}) (val []byte, err error) {
encKey, encField, err := tx.db.encode(key, field)
if err != nil {
return nil, err
}
eKey := tx.encodeKey(encKey, encField, Hash)
if idx, ok := tx.keysMap[eKey]; ok {
entry := tx.writeEntries[idx]
if entry.GetMark() == HashHDel {
return
}
val = entry.Meta.Value
return
}
if tx.db.checkExpired(encKey, Hash) {
return
}
val = tx.db.hashIndex.indexes.HGet(string(encKey), string(encField))
return
}
// HDel see db_hash.go:HDel
func (tx *Txn) HDel(key interface{}, fields ...interface{}) (err error) {
if key == nil || len(fields) == 0 {
return
}
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
if tx.db.checkExpired(encKey, Hash) {
return
}
for _, field := range fields {
var encField []byte
if encField, err = utils.EncodeValue(field); err != nil {
return
}
eKey := tx.encodeKey(encKey, encField, Hash)
if idx, ok := tx.keysMap[eKey]; ok {
tx.skipIds[idx] = struct{}{}
}
e := storage.NewEntryWithTxn(encKey, nil, encField, Hash, HashHDel, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
tx.keysMap[eKey] = len(tx.writeEntries) - 1
}
return
}
// HExists see db_hash.go:HExists
func (tx *Txn) HExists(key, field interface{}) (ok bool) {
encKey, encFiled, err := tx.db.encode(key, field)
if err != nil {
return false
}
eKey := tx.encodeKey(encKey, encFiled, Hash)
if idx, exist := tx.keysMap[eKey]; exist {
if tx.writeEntries[idx].GetMark() == HashHDel {
return
}
return true
}
if tx.db.checkExpired(encKey, Hash) {
return
}
ok = tx.db.hashIndex.indexes.HExists(string(encKey), string(encFiled))
return
}
// SAdd see db_set.go:SAdd
func (tx *Txn) SAdd(key interface{}, members ...interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
for _, mem := range members {
var encMem []byte
if encMem, err = utils.EncodeValue(mem); err != nil {
return
}
if err = tx.db.checkKeyValue(encKey, encMem); err != nil {
return
}
if !tx.SIsMember(key, mem) {
e := storage.NewEntryWithTxn(encKey, encMem, nil, Set, SetSAdd, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
encKey := tx.encodeKey(encKey, encMem, Set)
tx.keysMap[encKey] = len(tx.writeEntries) - 1
}
}
return
}
// SIsMember see db_set.go:SIsMember
func (tx *Txn) SIsMember(key, member interface{}) (ok bool) {
encKey, encMem, err := tx.db.encode(key, member)
if err != nil {
return
}
eKey := tx.encodeKey(encKey, encMem, Set)
if idx, exist := tx.keysMap[eKey]; exist {
entry := tx.writeEntries[idx]
if entry.GetMark() == SetSRem {
return
}
if bytes.Compare(entry.Meta.Value, encMem) == 0 {
return true
}
}
if tx.db.checkExpired(encKey, Set) {
return
}
ok = tx.db.setIndex.indexes.SIsMember(string(encKey), encMem)
return
}
// SRem see db_set.go:SRem
func (tx *Txn) SRem(key interface{}, members ...interface{}) (err error) {
encKey, err := utils.EncodeKey(key)
if err != nil {
return err
}
if tx.db.checkExpired(encKey, Set) {
return
}
for _, mem := range members {
var encMem []byte
if encMem, err = utils.EncodeValue(mem); err != nil {
return
}
eKey := tx.encodeKey(encKey, encMem, Set)
if idx, ok := tx.keysMap[eKey]; ok {
tx.skipIds[idx] = struct{}{}
}
e := storage.NewEntryWithTxn(encKey, encMem, nil, Set, SetSRem, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
tx.keysMap[eKey] = len(tx.writeEntries) - 1
}
return
}
// ZScore see db_zset.go/ZAdd
func (tx *Txn) ZAdd(key interface{}, score float64, member interface{}) (err error) {
encKey, encMember, err := tx.db.encode(key, member)
if err != nil {
return err
}
ok, oldScore, err := tx.ZScore(key, member)
if err != nil {
return err
}
if ok && oldScore == score {
return
}
extra := []byte(utils.Float64ToStr(score))
e := storage.NewEntryWithTxn(encKey, encMember, extra, ZSet, ZSetZAdd, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
eKey := tx.encodeKey(encKey, encMember, ZSet)
tx.keysMap[eKey] = len(tx.writeEntries) - 1
return
}
// ZScore see db_zset.go/ZScore
func (tx *Txn) ZScore(key, member interface{}) (exist bool, score float64, err error) {
encKey, encMember, err := tx.db.encode(key, member)
if err != nil {
return false, 0, err
}
eKey := tx.encodeKey(encKey, encMember, ZSet)
if idx, ok := tx.keysMap[eKey]; ok {
entry := tx.writeEntries[idx]
if entry.GetMark() == ZSetZRem {
return
}
score, err = utils.StrToFloat64(string(entry.Meta.Extra))
if err != nil {
return
}
}
if tx.db.checkExpired(encKey, ZSet) {
err = ErrKeyExpired
return
}
exist, score = tx.db.zsetIndex.indexes.ZScore(string(encKey), string(encMember))
return
}
// ZRem see db_zset.go/ZRem
func (tx *Txn) ZRem(key, member interface{}) (err error) {
encKey, encMember, err := tx.db.encode(key, member)
if err != nil {
return err
}
if tx.db.checkExpired(encKey, ZSet) {
return
}
eKey := tx.encodeKey(encKey, encMember, ZSet)
if idx, ok := tx.keysMap[eKey]; ok {
tx.skipIds[idx] = struct{}{}
}
e := storage.NewEntryWithTxn(encKey, encMember, nil, ZSet, ZSetZRem, tx.id)
if err = tx.putEntry(e); err != nil {
return
}
tx.keysMap[eKey] = len(tx.writeEntries) - 1
return
}
func (tx *Txn) encodeKey(key, extra []byte, dType DataType) string {
keyLen, extraLen := len(key), len(extra)
buf := make([]byte, keyLen+extraLen+2)
binary.BigEndian.PutUint16(buf[:2], dType)
copy(buf[2:keyLen+2], key)
if extraLen > 0 {
copy(buf[keyLen:keyLen+extraLen+2], extra)
}
return string(buf)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/huangpeizhi2018/rosedb.git
git@gitee.com:huangpeizhi2018/rosedb.git
huangpeizhi2018
rosedb
rosedb
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385