diff --git a/database/etcddb/database.go b/database/etcddb/database.go index 761c81fe..bdc94de7 100644 --- a/database/etcddb/database.go +++ b/database/etcddb/database.go @@ -32,7 +32,7 @@ func NewDB(url string) (database.Storage, error) { if err != nil { return nil, err } - return &EtcDStorage{url, cli}, nil + return &EtcDStorage{url, cli, ""}, nil } func NewOpenDB(url string) (database.Storage, error) { diff --git a/database/etcddb/storage.go b/database/etcddb/storage.go index c36c3bea..221b9ebd 100644 --- a/database/etcddb/storage.go +++ b/database/etcddb/storage.go @@ -2,21 +2,38 @@ package etcddb import ( "github.com/aptly-dev/aptly/database" + "github.com/pborman/uuid" clientv3 "go.etcd.io/etcd/client/v3" + + "fmt" ) type EtcDStorage struct { - url string - db *clientv3.Client + url string + db *clientv3.Client + tmpPrefix string // prefix for temporary DBs } // CreateTemporary creates new DB of the same type in temp dir func (s *EtcDStorage) CreateTemporary() (database.Storage, error) { - return s, nil + tmp := uuid.NewRandom().String() + return &EtcDStorage{ + url: s.url, + db: s.db, + tmpPrefix: tmp, + }, nil +} + +func (s *EtcDStorage) applyPrefix(key []byte) []byte { + if len(s.tmpPrefix) != 0 { + return append([]byte(s.tmpPrefix+"/"), key...) + } + return key } // Get key value from etcd func (s *EtcDStorage) Get(key []byte) (value []byte, err error) { + key = s.applyPrefix(key) getResp, err := s.db.Get(Ctx, string(key)) if err != nil { return @@ -33,6 +50,7 @@ func (s *EtcDStorage) Get(key []byte) (value []byte, err error) { // Put saves key to etcd, if key has the same value in DB already, it is not saved func (s *EtcDStorage) Put(key []byte, value []byte) (err error) { + key = s.applyPrefix(key) _, err = s.db.Put(Ctx, string(key), string(value)) if err != nil { return @@ -42,6 +60,7 @@ func (s *EtcDStorage) Put(key []byte, value []byte) (err error) { // Delete removes key from etcd func (s *EtcDStorage) Delete(key []byte) (err error) { + key = s.applyPrefix(key) _, err = s.db.Delete(Ctx, string(key)) if err != nil { return @@ -51,6 +70,7 @@ func (s *EtcDStorage) Delete(key []byte) (err error) { // KeysByPrefix returns all keys that start with prefix func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte { + prefix = s.applyPrefix(prefix) result := make([][]byte, 0, 20) getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) if err != nil { @@ -67,6 +87,7 @@ func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte { // FetchByPrefix returns all values with keys that start with prefix func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte { + prefix = s.applyPrefix(prefix) result := make([][]byte, 0, 20) getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) if err != nil { @@ -83,6 +104,7 @@ func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte { // HasPrefix checks whether it can find any key with given prefix and returns true if one exists func (s *EtcDStorage) HasPrefix(prefix []byte) bool { + prefix = s.applyPrefix(prefix) getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) if err != nil { return false @@ -96,6 +118,7 @@ func (s *EtcDStorage) HasPrefix(prefix []byte) bool { // ProcessByPrefix iterates through all entries where key starts with prefix and calls // StorageProcessor on key value pair func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error { + prefix = s.applyPrefix(prefix) getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) if err != nil { return err @@ -112,6 +135,10 @@ func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProces // Close finishes etcd connect func (s *EtcDStorage) Close() error { + // do not close temporary db + if len(s.tmpPrefix) != 0 { + return nil + } if s.db == nil { return nil } @@ -132,6 +159,9 @@ func (s *EtcDStorage) Open() error { // CreateBatch creates a Batch object func (s *EtcDStorage) CreateBatch() database.Batch { + if s.db == nil { + return nil + } return &EtcDBatch{ db: s.db, } @@ -147,13 +177,25 @@ func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) { return &transaction{t: kvc}, nil } -// CompactDB compacts database by merging layers +// CompactDB does nothing for etcd func (s *EtcDStorage) CompactDB() error { return nil } -// Drop removes all the etcd files (DANGEROUS!) +// Drop removes only temporary DBs with etcd (i.e. remove all prefixed keys) func (s *EtcDStorage) Drop() error { + if len(s.tmpPrefix) != 0 { + getResp, err := s.db.Get(Ctx, s.tmpPrefix, clientv3.WithPrefix()) + if err != nil { + return nil + } + for _, kv := range getResp.Kvs { + _, err = s.db.Delete(Ctx, string(kv.Key)) + if err != nil { + return fmt.Errorf("cannot delete tempdb entry: %s", kv.Key) + } + } + } return nil }