From 7a01c9c62df8ae26f1455d4da141b5d6ac6bb5bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Roth?= Date: Thu, 25 Jul 2024 11:59:38 +0200 Subject: [PATCH] etcd: implement batch operations - cache the operations internally in a list - Write() applies the list to etcd --- database/etcddb/batch.go | 27 +++++++++++++++++++++++---- database/etcddb/storage.go | 2 +- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/database/etcddb/batch.go b/database/etcddb/batch.go index e50c1162..24b83de9 100644 --- a/database/etcddb/batch.go +++ b/database/etcddb/batch.go @@ -6,7 +6,8 @@ import ( ) type EtcDBatch struct { - db *clientv3.Client + s *EtcDStorage + ops []clientv3.Op } type WriteOptions struct { @@ -14,17 +15,35 @@ type WriteOptions struct { Sync bool } -func (b *EtcDBatch) Put(key, value []byte) (err error) { - _, err = b.db.Put(Ctx, string(key), string(value)) +func (b *EtcDBatch) Put(key []byte, value []byte) (err error) { + b.ops = append(b.ops, clientv3.OpPut(string(key), string(value))) return } func (b *EtcDBatch) Delete(key []byte) (err error) { - _, err = b.db.Delete(Ctx, string(key)) + b.ops = append(b.ops, clientv3.OpDelete(string(key))) return } func (b *EtcDBatch) Write() (err error) { + kv := clientv3.NewKV(b.s.db) + + batchSize := 128 + for i := 0; i < len(b.ops); i += batchSize { + txn := kv.Txn(Ctx) + end := i + batchSize + if end > len(b.ops) { + end = len(b.ops) + } + + batch := b.ops[i:end] + txn.Then(batch...) + _, err = txn.Commit() + if err != nil { + panic(err) + } + } + return } diff --git a/database/etcddb/storage.go b/database/etcddb/storage.go index 221b9ebd..b1faa885 100644 --- a/database/etcddb/storage.go +++ b/database/etcddb/storage.go @@ -163,7 +163,7 @@ func (s *EtcDStorage) CreateBatch() database.Batch { return nil } return &EtcDBatch{ - db: s.db, + s: s, } }