Refactor database code to support standalone batches, transactions.

This is spin-off of changes from #459.

Transactions are not being used yet, but batches are updated to work
with the new API.

`database/` package was refactored to split abstract interfaces and
implementation via goleveldb. This should make it easier to implement
new database types.
This commit is contained in:
Andrey Smirnov
2019-08-02 00:10:36 +03:00
committed by Andrey Smirnov
parent 26098f6c8d
commit 67e38955ae
23 changed files with 539 additions and 315 deletions
+69
View File
@@ -0,0 +1,69 @@
// Package database provides KV database for meta-information
package database
import "errors"
// Errors for Storage
var (
ErrNotFound = errors.New("key not found")
)
// StorageProcessor is a function to process one single storage entry
type StorageProcessor func(key []byte, value []byte) error
// Reader provides KV read calls
type Reader interface {
Get(key []byte) ([]byte, error)
}
// PrefixReader provides prefixed operations
type PrefixReader interface {
HasPrefix(prefix []byte) bool
ProcessByPrefix(prefix []byte, proc StorageProcessor) error
KeysByPrefix(prefix []byte) [][]byte
FetchByPrefix(prefix []byte) [][]byte
}
// Writer provides KV update/delete calls
type Writer interface {
Put(key []byte, value []byte) error
Delete(key []byte) error
}
// Storage is an interface to KV storage
type Storage interface {
Reader
Writer
PrefixReader
CreateBatch() Batch
OpenTransaction() (Transaction, error)
CreateTemporary() (Storage, error)
Open() error
Close() error
CompactDB() error
Drop() error
}
// Batch provides a way to pack many writes.
type Batch interface {
Writer
// Write closes batch and send accumulated writes to the database
Write() error
}
// Transaction provides isolated atomic way to perform updates.
//
// Transactions might be expensive.
// Transaction should always finish with either Discard() or Commit()
type Transaction interface {
Reader
Writer
Commit() error
Discard()
}
+34
View File
@@ -0,0 +1,34 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/aptly-dev/aptly/database"
)
type batch struct {
db *leveldb.DB
b *leveldb.Batch
}
func (b *batch) Put(key, value []byte) error {
b.b.Put(key, value)
return nil
}
func (b *batch) Delete(key []byte) error {
b.b.Delete(key)
return nil
}
func (b *batch) Write() error {
return b.db.Write(b.b, &opt.WriteOptions{})
}
// batch should implement database.Batch
var (
_ database.Batch = &batch{}
)
+58
View File
@@ -0,0 +1,58 @@
package goleveldb
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldbstorage "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/aptly-dev/aptly/database"
)
func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) {
o := &opt.Options{
Filter: filter.NewBloomFilter(10),
OpenFilesCacheCapacity: 256,
}
if throttleCompaction {
o.CompactionL0Trigger = 32
o.WriteL0PauseTrigger = 96
o.WriteL0SlowdownTrigger = 64
}
return leveldb.OpenFile(path, o)
}
// NewDB creates new instance of DB, but doesn't open it (yet)
func NewDB(path string) (database.Storage, error) {
return &storage{path: path}, nil
}
// NewOpenDB creates new instance of DB and opens it
func NewOpenDB(path string) (database.Storage, error) {
db, err := NewDB(path)
if err != nil {
return nil, err
}
return db, db.Open()
}
// RecoverDB recovers LevelDB database from corruption
func RecoverDB(path string) error {
stor, err := leveldbstorage.OpenFile(path, false)
if err != nil {
return err
}
db, err := leveldb.Recover(stor, nil)
if err != nil {
return err
}
db.Close()
stor.Close()
return nil
}
@@ -1,9 +1,12 @@
package database
package goleveldb_test
import (
"testing"
. "gopkg.in/check.v1"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
)
// Launch gocheck tests
@@ -13,7 +16,7 @@ func Test(t *testing.T) {
type LevelDBSuite struct {
path string
db Storage
db database.Storage
}
var _ = Suite(&LevelDBSuite{})
@@ -22,7 +25,7 @@ func (s *LevelDBSuite) SetUpTest(c *C) {
var err error
s.path = c.MkDir()
s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Assert(err, IsNil)
}
@@ -43,10 +46,10 @@ func (s *LevelDBSuite) TestRecoverDB(c *C) {
err = s.db.Close()
c.Check(err, IsNil)
err = RecoverDB(s.path)
err = goleveldb.RecoverDB(s.path)
c.Check(err, IsNil)
s.db, err = NewOpenDB(s.path)
s.db, err = goleveldb.NewOpenDB(s.path)
c.Check(err, IsNil)
result, err := s.db.Get(key)
@@ -143,11 +146,11 @@ func (s *LevelDBSuite) TestByPrefix(c *C) {
c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
return ErrNotFound
}), Equals, ErrNotFound)
return database.ErrNotFound
}), Equals, database.ErrNotFound)
c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
return ErrNotFound
return database.ErrNotFound
}), IsNil)
c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
@@ -176,9 +179,9 @@ func (s *LevelDBSuite) TestBatch(c *C) {
err := s.db.Put(key, value)
c.Assert(err, IsNil)
s.db.StartBatch()
s.db.Put(key2, value2)
s.db.Delete(key)
batch := s.db.CreateBatch()
batch.Put(key2, value2)
batch.Delete(key)
v, err := s.db.Get(key)
c.Check(err, IsNil)
@@ -187,7 +190,7 @@ func (s *LevelDBSuite) TestBatch(c *C) {
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
err = s.db.FinishBatch()
err = batch.Write()
c.Check(err, IsNil)
v2, err := s.db.Get(key2)
@@ -196,11 +199,87 @@ func (s *LevelDBSuite) TestBatch(c *C) {
_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}
c.Check(func() { s.db.FinishBatch() }, Panics, "no batch")
func (s *LevelDBSuite) TestTransactionCommit(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)
s.db.StartBatch()
c.Check(func() { s.db.StartBatch() }, Panics, "batch already started")
err := s.db.Put(key, value)
c.Assert(err, IsNil)
transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)
v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")
err = transaction.Commit()
c.Check(err, IsNil)
v2, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}
func (s *LevelDBSuite) TestTransactionDiscard(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)
err := s.db.Put(key, value)
c.Assert(err, IsNil)
transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
transaction.Put(key2, value2)
transaction.Delete(key)
v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = transaction.Get(key)
c.Check(err, ErrorMatches, "key not found")
transaction.Discard()
v, err = s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
}
func (s *LevelDBSuite) TestCompactDB(c *C) {
+2
View File
@@ -0,0 +1,2 @@
// Package goleveldb implements database interface via goleveldb
package goleveldb
+180
View File
@@ -0,0 +1,180 @@
package goleveldb
import (
"bytes"
"errors"
"io/ioutil"
"os"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/aptly-dev/aptly/database"
)
type storage struct {
path string
db *leveldb.DB
}
// CreateTemporary creates new DB of the same type in temp dir
func (s *storage) CreateTemporary() (database.Storage, error) {
tempdir, err := ioutil.TempDir("", "aptly")
if err != nil {
return nil, err
}
db, err := internalOpen(tempdir, true)
if err != nil {
return nil, err
}
return &storage{db: db, path: tempdir}, nil
}
// Get key value from database
func (s *storage) Get(key []byte) ([]byte, error) {
value, err := s.db.Get(key, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, database.ErrNotFound
}
return nil, err
}
return value, nil
}
// Put saves key to database, if key has the same value in DB already, it is not saved
func (s *storage) Put(key []byte, value []byte) error {
old, err := s.db.Get(key, nil)
if err != nil {
if err != leveldb.ErrNotFound {
return err
}
} else {
if bytes.Equal(old, value) {
return nil
}
}
return s.db.Put(key, value, nil)
}
// Delete removes key from DB
func (s *storage) Delete(key []byte) error {
return s.db.Delete(key, nil)
}
// KeysByPrefix returns all keys that start with prefix
func (s *storage) KeysByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
iterator := s.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
key := iterator.Key()
keyc := make([]byte, len(key))
copy(keyc, key)
result = append(result, keyc)
}
return result
}
// FetchByPrefix returns all values with keys that start with prefix
func (s *storage) FetchByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
iterator := s.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
val := iterator.Value()
valc := make([]byte, len(val))
copy(valc, val)
result = append(result, valc)
}
return result
}
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
func (s *storage) HasPrefix(prefix []byte) bool {
iterator := s.db.NewIterator(nil, nil)
defer iterator.Release()
return iterator.Seek(prefix) && bytes.HasPrefix(iterator.Key(), prefix)
}
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
// StorageProcessor on key value pair
func (s *storage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error {
iterator := s.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
err := proc(iterator.Key(), iterator.Value())
if err != nil {
return err
}
}
return nil
}
// Close finishes DB work
func (s *storage) Close() error {
if s.db == nil {
return nil
}
err := s.db.Close()
s.db = nil
return err
}
// Reopen tries to open (re-open) the database
func (s *storage) Open() error {
if s.db != nil {
return nil
}
var err error
s.db, err = internalOpen(s.path, false)
return err
}
// CreateBatch creates a Batch object
func (s *storage) CreateBatch() database.Batch {
return &batch{
db: s.db,
b: &leveldb.Batch{},
}
}
// OpenTransaction creates new transaction.
func (s *storage) OpenTransaction() (database.Transaction, error) {
t, err := s.db.OpenTransaction()
if err != nil {
return nil, err
}
return &transaction{t: t}, nil
}
// CompactDB compacts database by merging layers
func (s *storage) CompactDB() error {
return s.db.CompactRange(util.Range{})
}
// Drop removes all the DB files (DANGEROUS!)
func (s *storage) Drop() error {
if s.db != nil {
return errors.New("DB is still open")
}
return os.RemoveAll(s.path)
}
// Check interface
var (
_ database.Storage = &storage{}
)
+60
View File
@@ -0,0 +1,60 @@
package goleveldb
import (
"bytes"
"github.com/aptly-dev/aptly/database"
"github.com/syndtr/goleveldb/leveldb"
)
type transaction struct {
t *leveldb.Transaction
}
// Get implements database.Reader interface.
func (t *transaction) Get(key []byte) ([]byte, error) {
value, err := t.t.Get(key, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, database.ErrNotFound
}
return nil, err
}
return value, nil
}
// Put implements database.Writer interface.
func (t *transaction) Put(key, value []byte) error {
old, err := t.t.Get(key, nil)
if err != nil {
if err != leveldb.ErrNotFound {
return err
}
} else {
if bytes.Equal(old, value) {
return nil
}
}
return t.t.Put(key, value, nil)
}
// Delete implements database.Writer interface.
func (t *transaction) Delete(key []byte) error {
return t.t.Delete(key, nil)
}
// Commit finalizes transaction and commits changes to the stable storage.
func (t *transaction) Commit() error {
return t.t.Commit()
}
// Discard any transaction changes.
//
// Discard is safe to call after Commit(), it would be no-op
func (t *transaction) Discard() {
t.t.Discard()
}
// transaction should implement database.Transaction
var _ database.Transaction = &transaction{}
-267
View File
@@ -1,267 +0,0 @@
// Package database provides KV database for meta-information
package database
import (
"bytes"
"errors"
"io/ioutil"
"os"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Errors for Storage
var (
ErrNotFound = errors.New("key not found")
)
// StorageProcessor is a function to process one single storage entry
type StorageProcessor func(key []byte, value []byte) error
// Storage is an interface to KV storage
type Storage interface {
CreateTemporary() (Storage, error)
Get(key []byte) ([]byte, error)
Put(key []byte, value []byte) error
Delete(key []byte) error
HasPrefix(prefix []byte) bool
ProcessByPrefix(prefix []byte, proc StorageProcessor) error
KeysByPrefix(prefix []byte) [][]byte
FetchByPrefix(prefix []byte) [][]byte
Open() error
Close() error
StartBatch()
FinishBatch() error
CompactDB() error
Drop() error
}
type levelDB struct {
path string
db *leveldb.DB
batch *leveldb.Batch
}
// Check interface
var (
_ Storage = &levelDB{}
)
func internalOpen(path string, throttleCompaction bool) (*leveldb.DB, error) {
o := &opt.Options{
Filter: filter.NewBloomFilter(10),
OpenFilesCacheCapacity: 256,
}
if throttleCompaction {
o.CompactionL0Trigger = 32
o.WriteL0PauseTrigger = 96
o.WriteL0SlowdownTrigger = 64
}
return leveldb.OpenFile(path, o)
}
// NewDB creates new instance of DB, but doesn't open it (yet)
func NewDB(path string) (Storage, error) {
return &levelDB{path: path}, nil
}
// NewOpenDB creates new instance of DB and opens it
func NewOpenDB(path string) (Storage, error) {
db, err := NewDB(path)
if err != nil {
return nil, err
}
return db, db.Open()
}
// RecoverDB recovers LevelDB database from corruption
func RecoverDB(path string) error {
stor, err := storage.OpenFile(path, false)
if err != nil {
return err
}
db, err := leveldb.Recover(stor, nil)
if err != nil {
return err
}
db.Close()
stor.Close()
return nil
}
// CreateTemporary creates new DB of the same type in temp dir
func (l *levelDB) CreateTemporary() (Storage, error) {
tempdir, err := ioutil.TempDir("", "aptly")
if err != nil {
return nil, err
}
db, err := internalOpen(tempdir, true)
if err != nil {
return nil, err
}
return &levelDB{db: db, path: tempdir}, nil
}
// Get key value from database
func (l *levelDB) Get(key []byte) ([]byte, error) {
value, err := l.db.Get(key, nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
return value, nil
}
// Put saves key to database, if key has the same value in DB already, it is not saved
func (l *levelDB) Put(key []byte, value []byte) error {
if l.batch != nil {
l.batch.Put(key, value)
return nil
}
old, err := l.db.Get(key, nil)
if err != nil {
if err != leveldb.ErrNotFound {
return err
}
} else {
if bytes.Equal(old, value) {
return nil
}
}
return l.db.Put(key, value, nil)
}
// Delete removes key from DB
func (l *levelDB) Delete(key []byte) error {
if l.batch != nil {
l.batch.Delete(key)
return nil
}
return l.db.Delete(key, nil)
}
// KeysByPrefix returns all keys that start with prefix
func (l *levelDB) KeysByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
iterator := l.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
key := iterator.Key()
keyc := make([]byte, len(key))
copy(keyc, key)
result = append(result, keyc)
}
return result
}
// FetchByPrefix returns all values with keys that start with prefix
func (l *levelDB) FetchByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
iterator := l.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
val := iterator.Value()
valc := make([]byte, len(val))
copy(valc, val)
result = append(result, valc)
}
return result
}
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
func (l *levelDB) HasPrefix(prefix []byte) bool {
iterator := l.db.NewIterator(nil, nil)
defer iterator.Release()
return iterator.Seek(prefix) && bytes.HasPrefix(iterator.Key(), prefix)
}
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
// StorageProcessor on key value pair
func (l *levelDB) ProcessByPrefix(prefix []byte, proc StorageProcessor) error {
iterator := l.db.NewIterator(nil, nil)
defer iterator.Release()
for ok := iterator.Seek(prefix); ok && bytes.HasPrefix(iterator.Key(), prefix); ok = iterator.Next() {
err := proc(iterator.Key(), iterator.Value())
if err != nil {
return err
}
}
return nil
}
// Close finishes DB work
func (l *levelDB) Close() error {
if l.db == nil {
return nil
}
err := l.db.Close()
l.db = nil
return err
}
// Reopen tries to open (re-open) the database
func (l *levelDB) Open() error {
if l.db != nil {
return nil
}
var err error
l.db, err = internalOpen(l.path, false)
return err
}
// StartBatch starts batch processing of keys
//
// All subsequent Get, Put and Delete would work on batch
func (l *levelDB) StartBatch() {
if l.batch != nil {
panic("batch already started")
}
l.batch = new(leveldb.Batch)
}
// FinishBatch finalizes the batch, saving operations
func (l *levelDB) FinishBatch() error {
if l.batch == nil {
panic("no batch")
}
err := l.db.Write(l.batch, nil)
l.batch = nil
return err
}
// CompactDB compacts database by merging layers
func (l *levelDB) CompactDB() error {
return l.db.CompactRange(util.Range{})
}
// Drop removes all the DB files (DANGEROUS!)
func (l *levelDB) Drop() error {
if l.db != nil {
return errors.New("DB is still open")
}
return os.RemoveAll(l.path)
}