etcd: implement batch operations

- cache the operations internally in a list
- Write() applies the list to etcd
This commit is contained in:
André Roth
2024-07-25 11:59:38 +02:00
parent 9768ecef22
commit 7a01c9c62d
2 changed files with 24 additions and 5 deletions

View File

@@ -6,7 +6,8 @@ import (
)
type EtcDBatch struct {
db *clientv3.Client
s *EtcDStorage
ops []clientv3.Op
}
type WriteOptions struct {
@@ -14,17 +15,35 @@ type WriteOptions struct {
Sync bool
}
func (b *EtcDBatch) Put(key, value []byte) (err error) {
_, err = b.db.Put(Ctx, string(key), string(value))
func (b *EtcDBatch) Put(key []byte, value []byte) (err error) {
b.ops = append(b.ops, clientv3.OpPut(string(key), string(value)))
return
}
func (b *EtcDBatch) Delete(key []byte) (err error) {
_, err = b.db.Delete(Ctx, string(key))
b.ops = append(b.ops, clientv3.OpDelete(string(key)))
return
}
func (b *EtcDBatch) Write() (err error) {
kv := clientv3.NewKV(b.s.db)
batchSize := 128
for i := 0; i < len(b.ops); i += batchSize {
txn := kv.Txn(Ctx)
end := i + batchSize
if end > len(b.ops) {
end = len(b.ops)
}
batch := b.ops[i:end]
txn.Then(batch...)
_, err = txn.Commit()
if err != nil {
panic(err)
}
}
return
}

View File

@@ -163,7 +163,7 @@ func (s *EtcDStorage) CreateBatch() database.Batch {
return nil
}
return &EtcDBatch{
db: s.db,
s: s,
}
}