mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-01-11 03:11:50 +00:00
etcd: implement temporary db support
- temporary db support is implemented with a unique key prefix - prevent closing etcd connection when closing temporary db
This commit is contained in:
@@ -32,7 +32,7 @@ func NewDB(url string) (database.Storage, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &EtcDStorage{url, cli}, nil
|
||||
return &EtcDStorage{url, cli, ""}, nil
|
||||
}
|
||||
|
||||
func NewOpenDB(url string) (database.Storage, error) {
|
||||
|
||||
@@ -2,21 +2,38 @@ package etcddb
|
||||
|
||||
import (
|
||||
"github.com/aptly-dev/aptly/database"
|
||||
"github.com/pborman/uuid"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type EtcDStorage struct {
|
||||
url string
|
||||
db *clientv3.Client
|
||||
url string
|
||||
db *clientv3.Client
|
||||
tmpPrefix string // prefix for temporary DBs
|
||||
}
|
||||
|
||||
// CreateTemporary creates new DB of the same type in temp dir
|
||||
func (s *EtcDStorage) CreateTemporary() (database.Storage, error) {
|
||||
return s, nil
|
||||
tmp := uuid.NewRandom().String()
|
||||
return &EtcDStorage{
|
||||
url: s.url,
|
||||
db: s.db,
|
||||
tmpPrefix: tmp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *EtcDStorage) applyPrefix(key []byte) []byte {
|
||||
if len(s.tmpPrefix) != 0 {
|
||||
return append([]byte(s.tmpPrefix+"/"), key...)
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// Get key value from etcd
|
||||
func (s *EtcDStorage) Get(key []byte) (value []byte, err error) {
|
||||
key = s.applyPrefix(key)
|
||||
getResp, err := s.db.Get(Ctx, string(key))
|
||||
if err != nil {
|
||||
return
|
||||
@@ -33,6 +50,7 @@ func (s *EtcDStorage) Get(key []byte) (value []byte, err error) {
|
||||
|
||||
// Put saves key to etcd, if key has the same value in DB already, it is not saved
|
||||
func (s *EtcDStorage) Put(key []byte, value []byte) (err error) {
|
||||
key = s.applyPrefix(key)
|
||||
_, err = s.db.Put(Ctx, string(key), string(value))
|
||||
if err != nil {
|
||||
return
|
||||
@@ -42,6 +60,7 @@ func (s *EtcDStorage) Put(key []byte, value []byte) (err error) {
|
||||
|
||||
// Delete removes key from etcd
|
||||
func (s *EtcDStorage) Delete(key []byte) (err error) {
|
||||
key = s.applyPrefix(key)
|
||||
_, err = s.db.Delete(Ctx, string(key))
|
||||
if err != nil {
|
||||
return
|
||||
@@ -51,6 +70,7 @@ func (s *EtcDStorage) Delete(key []byte) (err error) {
|
||||
|
||||
// KeysByPrefix returns all keys that start with prefix
|
||||
func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte {
|
||||
prefix = s.applyPrefix(prefix)
|
||||
result := make([][]byte, 0, 20)
|
||||
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
@@ -67,6 +87,7 @@ func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte {
|
||||
|
||||
// FetchByPrefix returns all values with keys that start with prefix
|
||||
func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte {
|
||||
prefix = s.applyPrefix(prefix)
|
||||
result := make([][]byte, 0, 20)
|
||||
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
@@ -83,6 +104,7 @@ func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte {
|
||||
|
||||
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
|
||||
func (s *EtcDStorage) HasPrefix(prefix []byte) bool {
|
||||
prefix = s.applyPrefix(prefix)
|
||||
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return false
|
||||
@@ -96,6 +118,7 @@ func (s *EtcDStorage) HasPrefix(prefix []byte) bool {
|
||||
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
|
||||
// StorageProcessor on key value pair
|
||||
func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error {
|
||||
prefix = s.applyPrefix(prefix)
|
||||
getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -112,6 +135,10 @@ func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProces
|
||||
|
||||
// Close finishes etcd connect
|
||||
func (s *EtcDStorage) Close() error {
|
||||
// do not close temporary db
|
||||
if len(s.tmpPrefix) != 0 {
|
||||
return nil
|
||||
}
|
||||
if s.db == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -132,6 +159,9 @@ func (s *EtcDStorage) Open() error {
|
||||
|
||||
// CreateBatch creates a Batch object
|
||||
func (s *EtcDStorage) CreateBatch() database.Batch {
|
||||
if s.db == nil {
|
||||
return nil
|
||||
}
|
||||
return &EtcDBatch{
|
||||
db: s.db,
|
||||
}
|
||||
@@ -147,13 +177,25 @@ func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) {
|
||||
return &transaction{t: kvc}, nil
|
||||
}
|
||||
|
||||
// CompactDB compacts database by merging layers
|
||||
// CompactDB does nothing for etcd
|
||||
func (s *EtcDStorage) CompactDB() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Drop removes all the etcd files (DANGEROUS!)
|
||||
// Drop removes only temporary DBs with etcd (i.e. remove all prefixed keys)
|
||||
func (s *EtcDStorage) Drop() error {
|
||||
if len(s.tmpPrefix) != 0 {
|
||||
getResp, err := s.db.Get(Ctx, s.tmpPrefix, clientv3.WithPrefix())
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
for _, kv := range getResp.Kvs {
|
||||
_, err = s.db.Delete(Ctx, string(kv.Key))
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot delete tempdb entry: %s", kv.Key)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user