From 78172d11d7fca12e8ba8d7396e2f814171db844c Mon Sep 17 00:00:00 2001 From: hudeng Date: Tue, 8 Feb 2022 11:16:16 +0800 Subject: [PATCH] feat: Add etcd database support improve concurrent access and high availability of aptly with the help of the characteristics of etcd --- AUTHORS | 2 + Makefile | 13 +- context/context.go | 8 +- database/etcddb/batch.go | 34 +++++ database/etcddb/database.go | 45 +++++++ database/etcddb/database_test.go | 156 +++++++++++++++++++++++ database/etcddb/storage.go | 163 ++++++++++++++++++++++++ database/etcddb/transaction.go | 55 ++++++++ go.mod | 11 ++ man/aptly.1.ronn.tmpl | 4 + system/t02_config/ConfigShowTest_gold | 3 +- system/t02_config/CreateConfigTest_gold | 5 +- utils/config.go | 1 + utils/config_test.go | 3 +- 14 files changed, 497 insertions(+), 6 deletions(-) create mode 100644 database/etcddb/batch.go create mode 100644 database/etcddb/database.go create mode 100644 database/etcddb/database_test.go create mode 100644 database/etcddb/storage.go create mode 100644 database/etcddb/transaction.go diff --git a/AUTHORS b/AUTHORS index 3962217b..70a75464 100644 --- a/AUTHORS +++ b/AUTHORS @@ -61,3 +61,5 @@ List of contributors, in chronological order: * iofq (https://github.com/iofq) * Noa Resare (https://github.com/nresare) * Ramón N.Rodriguez (https://github.com/runitonmetal) +* Golf Hu (https://github.com/hudeng-go) +* Cookie Fei (https://github.com/wuhuang26) diff --git a/Makefile b/Makefile index 736e1597..217b8c82 100644 --- a/Makefile +++ b/Makefile @@ -12,6 +12,10 @@ COVERAGE_DIR?=$(shell mktemp -d) # Uncomment to update test outputs # CAPTURE := "--capture" +# etcd test env +ETCD_VER=v3.5.2 +DOWNLOAD_URL=https://storage.googleapis.com/etcd + all: modules test bench check system-test # Self-documenting Makefile @@ -22,6 +26,13 @@ help: ## Print this help prepare: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(shell go env GOPATH)/bin $(GOLANGCI_LINT_VERSION) +etcd-prepare: + # etcd test prepare + rm -rf /tmp/etcd-download-test/test-data && mkdir -p /tmp/etcd-download-test/test-data + if [ ! -e /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz ]; then curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz; fi + tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1 + /tmp/etcd-download-test/etcd --data-dir /tmp/etcd-download-test/test-data & + modules: go mod download go mod verify @@ -68,7 +79,7 @@ docker-test: install export APTLY_VERSION=$(VERSION); \ $(PYTHON) system/run.py --long $(TESTS) --coverage-dir $(COVERAGE_DIR) $(CAPTURE) $(TEST) -test: +test: etcd-prepare go test -v ./... -gocheck.v=true -coverprofile=unit.out bench: diff --git a/context/context.go b/context/context.go index b6040f3b..7f17b84d 100644 --- a/context/context.go +++ b/context/context.go @@ -18,6 +18,7 @@ import ( "github.com/aptly-dev/aptly/azure" "github.com/aptly-dev/aptly/console" "github.com/aptly-dev/aptly/database" + "github.com/aptly-dev/aptly/database/etcddb" "github.com/aptly-dev/aptly/database/goleveldb" "github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/files" @@ -288,7 +289,12 @@ func (context *AptlyContext) _database() (database.Storage, error) { if context.database == nil { var err error - context.database, err = goleveldb.NewDB(context.dbPath()) + if context.config().DatabaseEtcd != "" { + context.database, err = etcddb.NewDB(context.config().DatabaseEtcd) + } else { + context.database, err = goleveldb.NewDB(context.dbPath()) + } + if err != nil { return nil, fmt.Errorf("can't instantiate database: %s", err) } diff --git a/database/etcddb/batch.go b/database/etcddb/batch.go new file mode 100644 index 00000000..e50c1162 --- /dev/null +++ b/database/etcddb/batch.go @@ -0,0 +1,34 @@ +package etcddb + +import ( + "github.com/aptly-dev/aptly/database" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type EtcDBatch struct { + db *clientv3.Client +} + +type WriteOptions struct { + NoWriteMerge bool + Sync bool +} + +func (b *EtcDBatch) Put(key, value []byte) (err error) { + _, err = b.db.Put(Ctx, string(key), string(value)) + return +} + +func (b *EtcDBatch) Delete(key []byte) (err error) { + _, err = b.db.Delete(Ctx, string(key)) + return +} + +func (b *EtcDBatch) Write() (err error) { + return +} + +// batch should implement database.Batch +var ( + _ database.Batch = &EtcDBatch{} +) diff --git a/database/etcddb/database.go b/database/etcddb/database.go new file mode 100644 index 00000000..f69eee7e --- /dev/null +++ b/database/etcddb/database.go @@ -0,0 +1,45 @@ +package etcddb + +import ( + "context" + "time" + + "github.com/aptly-dev/aptly/database" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var Ctx = context.TODO() + +func internalOpen(url string) (*clientv3.Client, error) { + cfg := clientv3.Config{ + Endpoints: []string{url}, + DialTimeout: 30 * time.Second, + MaxCallSendMsgSize: 2048 * 1024 * 1024, + MaxCallRecvMsgSize: 2048 * 1024 * 1024, + DialKeepAliveTimeout: 7200 * time.Second, + } + + cli, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + + return cli, nil +} + +func NewDB(url string) (database.Storage, error) { + cli, err := internalOpen(url) + if err != nil { + return nil, err + } + return &EtcDStorage{url, cli}, nil +} + +func NewOpenDB(url string) (database.Storage, error) { + db, err := NewDB(url) + if err != nil { + return nil, err + } + + return db, nil +} diff --git a/database/etcddb/database_test.go b/database/etcddb/database_test.go new file mode 100644 index 00000000..1e601b4e --- /dev/null +++ b/database/etcddb/database_test.go @@ -0,0 +1,156 @@ +package etcddb_test + +import ( + "testing" + + "github.com/aptly-dev/aptly/database" + "github.com/aptly-dev/aptly/database/etcddb" + . "gopkg.in/check.v1" +) + +// Launch gocheck tests +func Test(t *testing.T) { + TestingT(t) +} + +type EtcDDBSuite struct { + url string + db database.Storage +} + +var _ = Suite(&EtcDDBSuite{}) + +func (s *EtcDDBSuite) SetUpTest(c *C) { + var err error + s.db, err = etcddb.NewOpenDB("127.0.0.1:2379") + c.Assert(err, IsNil) +} + +func (s *EtcDDBSuite) TestSetUpTest(c *C) { + var err error + s.db, err = etcddb.NewOpenDB("127.0.0.1:2379") + c.Assert(err, IsNil) +} + +func (s *EtcDDBSuite) TestGetPut(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + var err error + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + result, err := s.db.Get(key) + c.Assert(err, IsNil) + c.Assert(result, DeepEquals, value) +} + +func (s *EtcDDBSuite) TestDelete(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + + err := s.db.Put(key, value) + c.Assert(err, IsNil) + + _, err = s.db.Get(key) + c.Assert(err, IsNil) + + err = s.db.Delete(key) + c.Assert(err, IsNil) + +} + +func (s *EtcDDBSuite) TestByPrefix(c *C) { + //c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{}) + + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + s.db.Put([]byte{0x80, 0x03}, []byte{0x03}) + s.db.Put([]byte{0x80, 0x02}, []byte{0x02}) + c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) + c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + + s.db.Put([]byte{0x90, 0x01}, []byte{0x04}) + c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) + c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + + s.db.Put([]byte{0x00, 0x01}, []byte{0x05}) + c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) + c.Check(s.db.KeysByPrefix([]byte{0x80}), DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + + keys := [][]byte{} + values := [][]byte{} + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + keys = append(keys, append([]byte(nil), k...)) + values = append(values, append([]byte(nil), v...)) + return nil + }), IsNil) + + c.Check(values, DeepEquals, [][]byte{{0x01}, {0x02}, {0x03}}) + c.Check(keys, DeepEquals, [][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}) + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + return database.ErrNotFound + }), Equals, database.ErrNotFound) + + c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error { + return database.ErrNotFound + }), IsNil) + + c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) + c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) +} + +func (s *EtcDDBSuite) TestHasPrefix(c *C) { + //c.Check(s.db.HasPrefix([]byte(nil)), Equals, false) + //c.Check(s.db.HasPrefix([]byte{0x80}), Equals, false) + + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + + c.Check(s.db.HasPrefix([]byte(nil)), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false) +} + +func (s *EtcDDBSuite) TestTransactionCommit(c *C) { + var ( + key = []byte("key") + key2 = []byte("key2") + value = []byte("value") + value2 = []byte("value2") + ) + transaction, err := s.db.OpenTransaction() + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + c.Assert(err, IsNil) + transaction.Put(key2, value2) + v, err := s.db.Get(key) + c.Check(v, DeepEquals, value) + transaction.Delete(key) + + _, err = transaction.Get(key2) + c.Assert(err, IsNil) + + v2, err := transaction.Get(key2) + c.Check(err, IsNil) + c.Check(v2, DeepEquals, value2) + + _, err = transaction.Get(key) + c.Assert(err, IsNil) + + err = transaction.Commit() + c.Check(err, IsNil) + + v2, err = transaction.Get(key2) + c.Check(err, IsNil) + c.Check(v2, DeepEquals, value2) + + _, err = transaction.Get(key) + c.Assert(err, IsNil) +} diff --git a/database/etcddb/storage.go b/database/etcddb/storage.go new file mode 100644 index 00000000..c36c3bea --- /dev/null +++ b/database/etcddb/storage.go @@ -0,0 +1,163 @@ +package etcddb + +import ( + "github.com/aptly-dev/aptly/database" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type EtcDStorage struct { + url string + db *clientv3.Client +} + +// CreateTemporary creates new DB of the same type in temp dir +func (s *EtcDStorage) CreateTemporary() (database.Storage, error) { + return s, nil +} + +// Get key value from etcd +func (s *EtcDStorage) Get(key []byte) (value []byte, err error) { + getResp, err := s.db.Get(Ctx, string(key)) + if err != nil { + return + } + for _, kv := range getResp.Kvs { + value = kv.Value + } + if len(value) == 0 { + err = database.ErrNotFound + return + } + return +} + +// Put saves key to etcd, if key has the same value in DB already, it is not saved +func (s *EtcDStorage) Put(key []byte, value []byte) (err error) { + _, err = s.db.Put(Ctx, string(key), string(value)) + if err != nil { + return + } + return +} + +// Delete removes key from etcd +func (s *EtcDStorage) Delete(key []byte) (err error) { + _, err = s.db.Delete(Ctx, string(key)) + if err != nil { + return + } + return +} + +// KeysByPrefix returns all keys that start with prefix +func (s *EtcDStorage) KeysByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0, 20) + getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) + if err != nil { + return nil + } + for _, ev := range getResp.Kvs { + key := ev.Key + keyc := make([]byte, len(key)) + copy(keyc, key) + result = append(result, key) + } + return result +} + +// FetchByPrefix returns all values with keys that start with prefix +func (s *EtcDStorage) FetchByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0, 20) + getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) + if err != nil { + return nil + } + for _, kv := range getResp.Kvs { + valc := make([]byte, len(kv.Value)) + copy(valc, kv.Value) + result = append(result, kv.Value) + } + + return result +} + +// HasPrefix checks whether it can find any key with given prefix and returns true if one exists +func (s *EtcDStorage) HasPrefix(prefix []byte) bool { + getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) + if err != nil { + return false + } + if getResp.Count != 0 { + return true + } + return false +} + +// ProcessByPrefix iterates through all entries where key starts with prefix and calls +// StorageProcessor on key value pair +func (s *EtcDStorage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error { + getResp, err := s.db.Get(Ctx, string(prefix), clientv3.WithPrefix()) + if err != nil { + return err + } + + for _, kv := range getResp.Kvs { + err := proc(kv.Key, kv.Value) + if err != nil { + return err + } + } + return nil +} + +// Close finishes etcd connect +func (s *EtcDStorage) Close() error { + if s.db == nil { + return nil + } + err := s.db.Close() + s.db = nil + return err +} + +// Reopen tries to open (re-open) the database +func (s *EtcDStorage) Open() error { + if s.db != nil { + return nil + } + var err error + s.db, err = internalOpen(s.url) + return err +} + +// CreateBatch creates a Batch object +func (s *EtcDStorage) CreateBatch() database.Batch { + return &EtcDBatch{ + db: s.db, + } +} + +// OpenTransaction creates new transaction. +func (s *EtcDStorage) OpenTransaction() (database.Transaction, error) { + cli, err := internalOpen(s.url) + if err != nil { + return nil, err + } + kvc := clientv3.NewKV(cli) + return &transaction{t: kvc}, nil +} + +// CompactDB compacts database by merging layers +func (s *EtcDStorage) CompactDB() error { + return nil +} + +// Drop removes all the etcd files (DANGEROUS!) +func (s *EtcDStorage) Drop() error { + return nil +} + +// Check interface +var ( + _ database.Storage = &EtcDStorage{} +) diff --git a/database/etcddb/transaction.go b/database/etcddb/transaction.go new file mode 100644 index 00000000..4b083056 --- /dev/null +++ b/database/etcddb/transaction.go @@ -0,0 +1,55 @@ +package etcddb + +import ( + "github.com/aptly-dev/aptly/database" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/clientv3util" +) + +type transaction struct { + t clientv3.KV +} + +// Get implements database.Reader interface. +func (t *transaction) Get(key []byte) ([]byte, error) { + getResp, err := t.t.Get(Ctx, string(key)) + if err != nil { + return nil, err + } + + var value []byte + for _, kv := range getResp.Kvs { + valc := make([]byte, len(kv.Value)) + copy(valc, kv.Value) + value = valc + } + + return value, nil +} + +// Put implements database.Writer interface. +func (t *transaction) Put(key, value []byte) (err error) { + _, err = t.t.Txn(Ctx). + If().Then(clientv3.OpPut(string(key), string(value))).Commit() + return +} + +// Delete implements database.Writer interface. +func (t *transaction) Delete(key []byte) (err error) { + _, err = t.t.Txn(Ctx). + If(clientv3util.KeyExists(string(key))). + Then(clientv3.OpDelete(string(key))).Commit() + return +} + +func (t *transaction) Commit() (err error) { + return +} + +// Discard is safe to call after Commit(), it would be no-op +func (t *transaction) Discard() { + return +} + +// transaction should implement database.Transaction +var _ database.Transaction = &transaction{} diff --git a/go.mod b/go.mod index 311217cb..d42e38c9 100644 --- a/go.mod +++ b/go.mod @@ -63,11 +63,14 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/cloudflare/circl v1.3.7 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/goccy/go-json v0.10.2 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.1 // indirect @@ -90,10 +93,17 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + go.etcd.io/etcd/api/v3 v3.5.0-rc.0 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.0-rc.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.17.0 // indirect golang.org/x/arch v0.5.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect + google.golang.org/grpc v1.38.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) @@ -104,4 +114,5 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.13.43 github.com/aws/aws-sdk-go-v2/service/s3 v1.40.2 github.com/aws/smithy-go v1.15.0 + go.etcd.io/etcd/client/v3 v3.5.0-rc.0 ) diff --git a/man/aptly.1.ronn.tmpl b/man/aptly.1.ronn.tmpl index d721fe94..f892ef91 100644 --- a/man/aptly.1.ronn.tmpl +++ b/man/aptly.1.ronn.tmpl @@ -28,6 +28,7 @@ Configuration file is stored in JSON format (default values shown below): { "rootDir": "$HOME/.aptly", + "databaseEtcd": "", "downloadConcurrency": 4, "downloadSpeedLimit": 0, "downloadRetries": 0, @@ -119,6 +120,9 @@ Options: the default for downloaded packages (`rootDir`/pool) and the default for published repositories (`rootDir`/public) + * `databaseEtcd`: + the etcd database connection address is empty by default, which means it is not used + * `downloadConcurrency`: is a number of parallel download threads to use when downloading packages diff --git a/system/t02_config/ConfigShowTest_gold b/system/t02_config/ConfigShowTest_gold index 0ae32b67..58c65bf9 100644 --- a/system/t02_config/ConfigShowTest_gold +++ b/system/t02_config/ConfigShowTest_gold @@ -29,5 +29,6 @@ "enableMetricsEndpoint": true, "logLevel": "debug", "logFormat": "default", - "serveInAPIMode": true + "serveInAPIMode": true, + "databaseEtcd": "" } diff --git a/system/t02_config/CreateConfigTest_gold b/system/t02_config/CreateConfigTest_gold index 0b3ed792..2aaf7c64 100644 --- a/system/t02_config/CreateConfigTest_gold +++ b/system/t02_config/CreateConfigTest_gold @@ -29,5 +29,6 @@ "enableMetricsEndpoint": false, "logLevel": "debug", "logFormat": "default", - "serveInAPIMode": false -} \ No newline at end of file + "serveInAPIMode": false, + "databaseEtcd": "" +} diff --git a/utils/config.go b/utils/config.go index acc063d7..e9ebc536 100644 --- a/utils/config.go +++ b/utils/config.go @@ -40,6 +40,7 @@ type ConfigStructure struct { // nolint: maligned LogLevel string `json:"logLevel"` LogFormat string `json:"logFormat"` ServeInAPIMode bool `json:"serveInAPIMode"` + DatabaseEtcd string `json:"databaseEtcd"` } type LocalPoolStorage struct { diff --git a/utils/config_test.go b/utils/config_test.go index 4304ea8b..3036e8a8 100644 --- a/utils/config_test.go +++ b/utils/config_test.go @@ -143,7 +143,8 @@ func (s *ConfigSuite) TestSaveConfig(c *C) { " \"enableMetricsEndpoint\": false,\n"+ " \"logLevel\": \"info\",\n"+ " \"logFormat\": \"json\",\n"+ - " \"serveInAPIMode\": false\n"+ + " \"serveInAPIMode\": false,\n"+ + " \"databaseEtcd\": \"\"\n"+ "}") }