Batch writes/deletes in LevelDB.

This commit is contained in:
Andrey Smirnov
2014-02-11 21:11:15 +04:00
parent 6b02b18c8e
commit bff299d268
2 changed files with 66 additions and 1 deletions

View File

@@ -22,10 +22,13 @@ type Storage interface {
KeysByPrefix(prefix []byte) [][]byte
FetchByPrefix(prefix []byte) [][]byte
Close() error
StartBatch()
FinishBatch() error
}
type levelDB struct {
db *leveldb.DB
db *leveldb.DB
batch *leveldb.Batch
}
// Check interface
@@ -59,10 +62,18 @@ func (l *levelDB) Get(key []byte) ([]byte, error) {
}
func (l *levelDB) Put(key []byte, value []byte) error {
if l.batch != nil {
l.batch.Put(key, value)
return nil
}
return l.db.Put(key, value, nil)
}
func (l *levelDB) Delete(key []byte) error {
if l.batch != nil {
l.batch.Delete(key)
return nil
}
return l.db.Delete(key, nil)
}
@@ -106,3 +117,19 @@ func (l *levelDB) FetchByPrefix(prefix []byte) [][]byte {
func (l *levelDB) Close() error {
return l.db.Close()
}
func (l *levelDB) StartBatch() {
if l.batch != nil {
panic("batch already started")
}
l.batch = new(leveldb.Batch)
}
func (l *levelDB) FinishBatch() error {
if l.batch == nil {
panic("no batch")
}
err := l.db.Write(l.batch, nil)
l.batch = nil
return err
}

View File

@@ -84,3 +84,41 @@ func (s *LevelDBSuite) TestByPrefix(c *C) {
c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
}
func (s *LevelDBSuite) TestBatch(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)
err := s.db.Put(key, value)
c.Assert(err, IsNil)
s.db.StartBatch()
s.db.Put(key2, value2)
s.db.Delete(key)
v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
err = s.db.FinishBatch()
c.Check(err, IsNil)
v2, err := s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
c.Check(func() { s.db.FinishBatch() }, Panics, "no batch")
s.db.StartBatch()
c.Check(func() { s.db.StartBatch() }, Panics, "batch already started")
}