feat: Add etcd database support

improve concurrent access and high availability of aptly with the help of the characteristics of etcd
This commit is contained in:
hudeng
2022-02-08 11:16:16 +08:00
committed by André Roth
parent f42ff697d4
commit 78172d11d7
14 changed files with 497 additions and 6 deletions

34
database/etcddb/batch.go Normal file
View File

@@ -0,0 +1,34 @@
package etcddb
import (
"github.com/aptly-dev/aptly/database"
clientv3 "go.etcd.io/etcd/client/v3"
)
type EtcDBatch struct {
db *clientv3.Client
}
type WriteOptions struct {
NoWriteMerge bool
Sync bool
}
func (b *EtcDBatch) Put(key, value []byte) (err error) {
_, err = b.db.Put(Ctx, string(key), string(value))
return
}
func (b *EtcDBatch) Delete(key []byte) (err error) {
_, err = b.db.Delete(Ctx, string(key))
return
}
func (b *EtcDBatch) Write() (err error) {
return
}
// batch should implement database.Batch
var (
_ database.Batch = &EtcDBatch{}
)

View File

@@ -0,0 +1,45 @@
package etcddb
import (
"context"
"time"
"github.com/aptly-dev/aptly/database"
clientv3 "go.etcd.io/etcd/client/v3"
)
var Ctx = context.TODO()
func internalOpen(url string) (*clientv3.Client, error) {
cfg := clientv3.Config{
Endpoints: []string{url},
DialTimeout: 30 * time.Second,
MaxCallSendMsgSize: 2048 * 1024 * 1024,
MaxCallRecvMsgSize: 2048 * 1024 * 1024,
DialKeepAliveTimeout: 7200 * time.Second,
}
cli, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return cli, nil
}
func NewDB(url string) (database.Storage, error) {
cli, err := internalOpen(url)
if err != nil {
return nil, err
}
return &EtcDStorage{url, cli}, nil
}
func NewOpenDB(url string) (database.Storage, error) {
db, err := NewDB(url)
if err != nil {
return nil, err
}
return db, nil
}

View File

@@ -0,0 +1,156 @@
package etcddb_test
import (
"testing"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/etcddb"
. "gopkg.in/check.v1"
)
// Launch gocheck tests
func Test(t *testing.T) {
TestingT(t)
}
type EtcDDBSuite struct {
url string
db database.Storage
}
var _ = Suite(&EtcDDBSuite{})
func (s *EtcDDBSuite) SetUpTest(c *C) {
var err error
s.db, err = etcddb.NewOpenDB("127.0.0.1:2379")
c.Assert(err, IsNil)
}
func (s *EtcDDBSuite) TestSetUpTest(c *C) {
var err error
s.db, err = etcddb.NewOpenDB("127.0.0.1:2379")
c.Assert(err, IsNil)
}
func (s *EtcDDBSuite) TestGetPut(c *C) {
var (
key = []byte("key")
value = []byte("value")
)
var err error
err = s.db.Put(key, value)
c.Assert(err, IsNil)
result, err := s.db.Get(key)
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, value)
}
func (s *EtcDDBSuite) TestDelete(c *C) {
var (
key = []byte("key")
value = []byte("value")
)
err := s.db.Put(key, value)
c.Assert(err, IsNil)
_, err = s.db.Get(key)
c.Assert(err, IsNil)
err = s.db.Delete(key)
c.Assert(err, IsNil)
}
func (s *EtcDDBSuite) TestByPrefix(c *C) {
//c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{})
s.db.Put([]byte{0x80, 0x01}, []byte{0x01})
s.db.Put([]byte{0x80, 0x03}, []byte{0x03})
s.db.Put([]byte{0x80, 0x02}, []byte{0x02})
c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
s.db.Put([]byte{0x90, 0x01}, []byte{0x04})
c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
s.db.Put([]byte{0x00, 0x01}, []byte{0x05})
c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
keys := [][]byte{}
values := [][]byte{}
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
keys = append(keys, append([]byte(nil), k...))
values = append(values, append([]byte(nil), v...))
return nil
}), IsNil)
c.Check(values, DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}})
c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
return database.ErrNotFound
}), Equals, database.ErrNotFound)
c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
return database.ErrNotFound
}), IsNil)
c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
}
func (s *EtcDDBSuite) TestHasPrefix(c *C) {
//c.Check(s.db.HasPrefix([]byte(nil)), Equals, false)
//c.Check(s.db.HasPrefix([]byte{0x80}), Equals, false)
s.db.Put([]byte{0x80, 0x01}, []byte{0x01})
c.Check(s.db.HasPrefix([]byte(nil)), Equals, true)
c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true)
c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false)
}
func (s *EtcDDBSuite) TestTransactionCommit(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)
transaction, err := s.db.OpenTransaction()
err = s.db.Put(key, value)
c.Assert(err, IsNil)
c.Assert(err, IsNil)
transaction.Put(key2, value2)
v, err := s.db.Get(key)
c.Check(v, DeepEquals, value)
transaction.Delete(key)
_, err = transaction.Get(key2)
c.Assert(err, IsNil)
v2, err := transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = transaction.Get(key)
c.Assert(err, IsNil)
err = transaction.Commit()
c.Check(err, IsNil)
v2, err = transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v2, DeepEquals, value2)
_, err = transaction.Get(key)
c.Assert(err, IsNil)
}

163
database/etcddb/storage.go Normal file
View File

@@ -0,0 +1,163 @@
package etcddb
import (
"github.com/aptly-dev/aptly/database"
clientv3 "go.etcd.io/etcd/client/v3"
)
type EtcDStorage struct {
url string
db *clientv3.Client
}
// CreateTemporary creates new DB of the same type in temp dir
func (s *EtcDStorage) CreateTemporary() (database.Storage, error) {
return s, nil
}
// Get key value from etcd
func (s *EtcDStorage) Get(key []byte) (value []byte, err error) {
getResp, err := s.db.Get(Ctx, string(key))
if err != nil {
return
}
for _, kv := range getResp.Kvs {
value = kv.Value
}
if len(value) == 0 {
err = database.ErrNotFound
return
}
return
}
// Put saves key to etcd, if key has the same value in DB already, it is not saved
func (s *EtcDStorage) Put(key []byte, value []byte) (err error) {
_, err = s.db.Put(Ctx, string(key), string(value))
if err != nil {
return
}
return
}
// Delete removes key from etcd
func (s *EtcDStorage) Delete(key []byte) (err error) {
_, err = s.db.Delete(Ctx, string(key))
if err != nil {
return
}
return
}
// KeysByPrefix returns all keys that start with prefix
func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
if err != nil {
return nil
}
for _, ev := range getResp.Kvs {
key := ev.Key
keyc := make([]byte, len(key))
copy(keyc, key)
result = append(result, key)
}
return result
}
// FetchByPrefix returns all values with keys that start with prefix
func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0, 20)
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
if err != nil {
return nil
}
for _, kv := range getResp.Kvs {
valc := make([]byte, len(kv.Value))
copy(valc, kv.Value)
result = append(result, kv.Value)
}
return result
}
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
func (s *EtcDStorage) HasPrefix(prefix []byte) bool {
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
if err != nil {
return false
}
if getResp.Count != 0 {
return true
}
return false
}
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
// StorageProcessor on key value pair
func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error {
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range getResp.Kvs {
err := proc(kv.Key, kv.Value)
if err != nil {
return err
}
}
return nil
}
// Close finishes etcd connect
func (s *EtcDStorage) Close() error {
if s.db == nil {
return nil
}
err := s.db.Close()
s.db = nil
return err
}
// Reopen tries to open (re-open) the database
func (s *EtcDStorage) Open() error {
if s.db != nil {
return nil
}
var err error
s.db, err = internalOpen(s.url)
return err
}
// CreateBatch creates a Batch object
func (s *EtcDStorage) CreateBatch() database.Batch {
return &EtcDBatch{
db: s.db,
}
}
// OpenTransaction creates new transaction.
func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) {
cli, err := internalOpen(s.url)
if err != nil {
return nil, err
}
kvc := clientv3.NewKV(cli)
return &transaction{t: kvc}, nil
}
// CompactDB compacts database by merging layers
func (s *EtcDStorage) CompactDB() error {
return nil
}
// Drop removes all the etcd files (DANGEROUS!)
func (s *EtcDStorage) Drop() error {
return nil
}
// Check interface
var (
_ database.Storage = &EtcDStorage{}
)

View File

@@ -0,0 +1,55 @@
package etcddb
import (
"github.com/aptly-dev/aptly/database"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
)
type transaction struct {
t clientv3.KV
}
// Get implements database.Reader interface.
func (t *transaction) Get(key []byte) ([]byte, error) {
getResp, err := t.t.Get(Ctx, string(key))
if err != nil {
return nil, err
}
var value []byte
for _, kv := range getResp.Kvs {
valc := make([]byte, len(kv.Value))
copy(valc, kv.Value)
value = valc
}
return value, nil
}
// Put implements database.Writer interface.
func (t *transaction) Put(key, value []byte) (err error) {
_, err = t.t.Txn(Ctx).
If().Then(clientv3.OpPut(string(key), string(value))).Commit()
return
}
// Delete implements database.Writer interface.
func (t *transaction) Delete(key []byte) (err error) {
_, err = t.t.Txn(Ctx).
If(clientv3util.KeyExists(string(key))).
Then(clientv3.OpDelete(string(key))).Commit()
return
}
func (t *transaction) Commit() (err error) {
return
}
// Discard is safe to call after Commit(), it would be no-op
func (t *transaction) Discard() {
return
}
// transaction should implement database.Transaction
var _ database.Transaction = &transaction{}