feat: database backend add ssdb support

Change-Id: I054c5fc9b02f613601781de8613d684faa0ea7f2
This commit is contained in:
hudeng
2022-09-22 16:08:27 +08:00
committed by André Roth
parent ab18da351d
commit b3b6ce3539
7 changed files with 826 additions and 0 deletions

View File

@@ -5,11 +5,13 @@ import (
gocontext "context"
"fmt"
"math/rand"
"net/url"
"os"
"os/signal"
"path/filepath"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"syscall"
@@ -21,6 +23,7 @@ import (
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/etcddb"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/aptly-dev/aptly/database/ssdb"
"github.com/aptly-dev/aptly/deb"
"github.com/aptly-dev/aptly/files"
"github.com/aptly-dev/aptly/http"
@@ -29,6 +32,7 @@ import (
"github.com/aptly-dev/aptly/swift"
"github.com/aptly-dev/aptly/task"
"github.com/aptly-dev/aptly/utils"
"github.com/seefan/gossdb/v2/conf"
"github.com/smira/commander"
"github.com/smira/flag"
)
@@ -301,6 +305,21 @@ func (context *AptlyContext) _database() (database.Storage, error) {
context.database, err = goleveldb.NewDB(dbPath)
case "etcd":
context.database, err = etcddb.NewDB(context.config().DatabaseBackend.URL)
case "ssdb":
var cfg conf.Config
u, e := url.Parse(context.config().DatabaseBackend.URL)
if e != nil {
return nil, e
}
cfg.Port, e = strconv.Atoi(u.Port())
cfg.Host = strings.Split(u.Host, ":")[0]
if e != nil {
return nil, e
}
password, _ := u.User.Password()
cfg.Password = password
context.database, err = ssdb.NewOpenDB(&cfg)
default:
context.database, err = goleveldb.NewDB(context.dbPath())
}

129
database/ssdb/batch.go Normal file
View File

@@ -0,0 +1,129 @@
package ssdb
import (
"fmt"
"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)
const (
delOpt = "del"
)
type bWriteData struct {
key []byte
value []byte
opts string
err error
}
type Batch struct {
cfg *conf.Config
// key-value chan
w chan bWriteData
p map[string]interface{}
d []string
db *pool.Client
}
// func internalOpenBatch...
func internalOpenBatch(t database.Storage) *Batch {
b := &Batch{
w: make(chan bWriteData),
p: make(map[string]interface{}),
}
b.run()
return b
}
func (b *Batch) run() {
go func() {
for {
select {
case w, ok := <-b.w:
{
if !ok {
ssdbLog("ssdb batch write chan closed")
return
}
if w.opts == "write" {
ssdbLog("ssdb batch write")
var err error
if len(b.p) > 0 && len(b.d) == 0 {
err = b.db.MultiSet(b.p)
ssdbLog("ssdb batch set errinfo: ", err)
} else if len(b.d) > 0 && len(b.p) == 0 {
err = b.db.MultiDel(b.d...)
ssdbLog("ssdb batch del errinfo: ", err)
} else if len(b.p) == 0 && len(b.d) == 0 {
err = nil
} else {
err = fmt.Errorf("ssdb batch does not support both put and delete operations")
}
ssdbLog("ssdb batch write errinfo: ", err)
b.w <- bWriteData{
err: err,
}
ssdbLog("ssdb batch write end")
} else {
ssdbLog("ssdb batch", w.opts)
if w.opts == "put" {
b.p[string(w.key)] = w.value
} else if w.opts == delOpt {
b.d = append(b.d, string(w.key))
}
}
}
}
}
}()
}
func (b *Batch) stop() {
ssdbLog("ssdb batch stop")
close(b.w)
}
func (b *Batch) Put(key, value []byte) (err error) {
// err = b.db.Set(string(key), string(value))
w := bWriteData{
key: key,
value: value,
opts: "put",
}
b.w <- w
return nil
}
func (b *Batch) Delete(key []byte) (err error) {
/* err = b.db.Del(string(key))
return */
w := bWriteData{
key: key,
opts: delOpt,
}
b.w <- w
return nil
}
func (b *Batch) Write() (err error) {
defer b.stop()
w := bWriteData{
opts: "write",
}
b.w <- w
result := <-b.w
return result.err
}
// batch should implement database.Batch
var (
_ database.Batch = &Batch{}
)

62
database/ssdb/database.go Normal file
View File

@@ -0,0 +1,62 @@
package ssdb
import (
"os"
"strconv"
"github.com/aptly-dev/aptly/database"
"github.com/seefan/gossdb/v2"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)
var defaultBufSize = 102400
var defaultPoolSize = 1
func internalOpen(cfg *conf.Config) (*pool.Client, error) {
ssdbLog("internalOpen")
cfg.ReadBufferSize = defaultBufSize
cfg.WriteBufferSize = defaultBufSize
cfg.MaxPoolSize = defaultPoolSize
cfg.PoolSize = defaultPoolSize
cfg.MinPoolSize = defaultPoolSize
cfg.MaxWaitSize = 100 * defaultPoolSize
cfg.RetryEnabled = true
//override by env
if os.Getenv("SSDB_READBUFFERSIZE") != "" {
readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
if err != nil {
cfg.ReadBufferSize = readBufSize
}
}
if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
if err != nil {
cfg.WriteBufferSize = writeBufSize
}
}
var cfgs = []*conf.Config{cfg}
err := gossdb.Start(cfgs...)
if err != nil {
return nil, err
}
return gossdb.NewClient()
}
func NewDB(cfg *conf.Config) (database.Storage, error) {
return &Storage{cfg: cfg}, nil
}
func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
db, err := NewDB(cfg)
if err != nil {
return nil, err
}
return db, db.Open()
}

View File

@@ -0,0 +1,233 @@
package ssdb_test
import (
"fmt"
"testing"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/ssdb"
"github.com/seefan/gossdb/v2/conf"
. "gopkg.in/check.v1"
)
// Launch gocheck tests
func Test(t *testing.T) {
TestingT(t)
}
type SSDBSuite struct {
cfg *conf.Config
db database.Storage
}
var _ = Suite(&SSDBSuite{cfg: &conf.Config{
Host: "127.0.0.1",
Port: 8888,
}})
func (s *SSDBSuite) SetUpTest(c *C) {
var err error
s.db, err = ssdb.NewOpenDB(s.cfg)
c.Assert(err, IsNil)
}
func (s *SSDBSuite) TestSetUpTest(c *C) {
var err error
s.db, err = ssdb.NewOpenDB(s.cfg)
c.Assert(err, IsNil)
}
func (s *SSDBSuite) TestGetPut(c *C) {
var (
key = []byte("key")
value = []byte("value")
)
var err error
err = s.db.Put(key, value)
c.Assert(err, IsNil)
result, err := s.db.Get(key)
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, value)
}
func (s *SSDBSuite) TestTemporaryDelete(c *C) {
fmt.Println("TestTemporaryDelete")
var (
key = []byte("key")
value = []byte("value")
)
temp, err := s.db.CreateTemporary()
c.Assert(err, IsNil)
c.Check(temp.HasPrefix([]byte(nil)), Equals, false)
err = temp.Put(key, value)
c.Assert(err, IsNil)
c.Check(temp.HasPrefix([]byte(nil)), Equals, true)
c.Assert(temp.Close(), IsNil)
c.Assert(temp.Drop(), IsNil)
}
func (s *SSDBSuite) TestDelete(c *C) {
var (
key = []byte("key")
value = []byte("value")
)
err := s.db.Put(key, value)
c.Assert(err, IsNil)
_, err = s.db.Get(key)
c.Assert(err, IsNil)
err = s.db.Delete(key)
c.Assert(err, IsNil)
}
func (s *SSDBSuite) TestByPrefix(c *C) {
//c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{})
s.db.Put([]byte{0x80, 0x01}, []byte{0x01})
s.db.Put([]byte{0x80, 0x03}, []byte{0x03})
s.db.Put([]byte{0x80, 0x02}, []byte{0x02})
c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}}))
c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}))
s.db.Put([]byte{0x90, 0x01}, []byte{0x04})
c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}}))
c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}))
s.db.Put([]byte{0x00, 0x01}, []byte{0x05})
c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}}))
c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}))
keys := [][]byte{}
values := [][]byte{}
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
keys = append(keys, append([]byte(nil), k...))
values = append(values, append([]byte(nil), v...))
return nil
}), IsNil)
c.Check(len(values), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}}))
c.Check(len(keys), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}}))
c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error {
return database.ErrNotFound
}), Equals, database.ErrNotFound)
c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error {
return database.ErrNotFound
}), IsNil)
c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{})
}
func (s *SSDBSuite) TestHasPrefix(c *C) {
s.db.Put([]byte{0x80, 0x01}, []byte{0x01})
//c.Check(s.db.HasPrefix([]byte("")), Equals, true)
c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true)
c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false)
}
func (s *SSDBSuite) TestTransactionCommit(c *C) {
var (
key = []byte("key")
key2 = []byte("key2")
value = []byte("value")
value2 = []byte("value2")
)
s.db.Delete(key)
s.db.Delete(key2)
transaction, err := s.db.OpenTransaction()
c.Assert(err, IsNil)
defer transaction.Discard()
err = s.db.Put(key, value)
c.Assert(err, IsNil)
v, err := s.db.Get(key)
c.Assert(err, IsNil)
c.Check(v, DeepEquals, value)
err = transaction.Put(key2, value2)
c.Assert(err, IsNil)
v, err = transaction.Get(key2)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value2)
_, err = s.db.Get(key2)
c.Assert(err, ErrorMatches, "key not found")
err = transaction.Delete(key)
c.Assert(err, IsNil)
_, err = transaction.Get(key)
c.Assert(err, ErrorMatches, "key not found")
v, err = s.db.Get(key)
c.Assert(err, IsNil)
c.Check(v, DeepEquals, value)
err = transaction.Commit()
c.Check(err, IsNil)
v, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value2)
_, err = s.db.Get(key)
c.Assert(err, ErrorMatches, "key not found")
}
func (s *SSDBSuite) TestBatch(c *C) {
var (
key = []byte("bkey")
key2 = []byte("bkey2")
value = []byte("bvalue")
value2 = []byte("bvalue2")
)
err := s.db.Put(key, value)
c.Check(err, IsNil)
batch := s.db.CreateBatch()
batch.Put(key2, value2)
v, err := s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
err = batch.Write()
c.Check(err, IsNil)
v, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value2)
batch = s.db.CreateBatch()
batch.Delete(key)
batch.Delete(key2)
c.Check(err, IsNil)
v, err = s.db.Get(key)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value)
c.Check(err, IsNil)
v, err = s.db.Get(key2)
c.Check(err, IsNil)
c.Check(v, DeepEquals, value2)
err = batch.Write()
c.Check(err, IsNil)
_, err = s.db.Get(key2)
c.Check(err, ErrorMatches, "key not found")
_, err = s.db.Get(key)
c.Check(err, ErrorMatches, "key not found")
}

12
database/ssdb/log.go Normal file
View File

@@ -0,0 +1,12 @@
package ssdb
import (
"fmt"
"os"
)
func ssdbLog(a ...interface{}) {
if os.Getenv("SSDB_DEBUG") != "" {
fmt.Println(a...)
}
}

183
database/ssdb/storage.go Normal file
View File

@@ -0,0 +1,183 @@
package ssdb
import (
"os"
"github.com/aptly-dev/aptly/database"
"github.com/aptly-dev/aptly/database/goleveldb"
"github.com/seefan/gossdb/v2"
"github.com/seefan/gossdb/v2/conf"
"github.com/seefan/gossdb/v2/pool"
)
type Storage struct {
cfg *conf.Config
db *pool.Client
}
// CreateTemporary creates new DB of the same type in temp dir
func (s *Storage) CreateTemporary() (database.Storage, error) {
// use leveldb as temp db
tmpPath := os.Getenv("SSDB_TMPDB_PATH")
if tmpPath == "" {
tmpPath = "/tmp/ssdb_tmpdb_path"
}
gdb, err := goleveldb.NewDB(tmpPath)
if err != nil {
return nil, err
}
return gdb.CreateTemporary()
}
// Get key value from ssdb
func (s *Storage) Get(key []byte) (value []byte, err error) {
// ssdbLog("ssdb origin db get key:", string(key))
getResp, err := s.db.Get(string(key))
if err != nil {
return
}
value = getResp.Bytes()
if len(value) == 0 {
err = database.ErrNotFound
return
}
return
}
// Put saves key to ssdb, if key has the same value in DB already, it is not saved
func (s *Storage) Put(key []byte, value []byte) (err error) {
//ssdbLog("ssdb origin db put key:", string(key), " value: ", string(value))
err = s.db.Set(string(key), value)
if err != nil {
return
}
return
}
// Delete removes key from ssdb
func (s *Storage) Delete(key []byte) (err error) {
//ssdbLog("ssdb origin db del key:", string(key))
err = s.db.Del(string(key))
if err != nil {
return
}
return
}
// KeysByPrefix returns all keys that start with prefix
func (s *Storage) KeysByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0)
getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1)
if err != nil {
return nil
}
for _, ev := range getResp {
key := []byte(ev)
keyc := make([]byte, len(key))
copy(keyc, key)
result = append(result, key)
}
return result
}
// FetchByPrefix returns all values with keys that start with prefix
func (s *Storage) FetchByPrefix(prefix []byte) [][]byte {
result := make([][]byte, 0)
getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1)
if err != nil {
return nil
}
for _, ev := range getResp {
value := ev.Bytes()
valuec := make([]byte, len(value))
copy(valuec, value)
result = append(result, valuec)
}
return result
}
// HasPrefix checks whether it can find any key with given prefix and returns true if one exists
func (s *Storage) HasPrefix(prefix []byte) bool {
//ssdbLog("HasPrefix", string(prefix), string(prefix)+"}")
getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1)
if err != nil {
return false
}
//ssdbLog("HasPrefix", len(getResp))
if len(getResp) > 0 {
return true
}
return false
}
// ProcessByPrefix iterates through all entries where key starts with prefix and calls
// StorageProcessor on key value pair
func (s *Storage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error {
getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1)
if err != nil {
return err
}
for k, v := range getResp {
err := proc([]byte(k), v.Bytes())
if err != nil {
return err
}
}
return nil
}
// Close finishes ssdb connect
func (s *Storage) Close() error {
ssdbLog("ssdb close")
if s.db != nil {
s.db.Close()
s.db = nil
}
gossdb.Shutdown()
return nil
}
// Reopen tries to open (re-open) the database
func (s *Storage) Open() error {
ssdbLog("ssdb open")
if s.db != nil && s.db.IsOpen() {
ssdbLog("ssdb opened")
return nil
}
var err error
s.db, err = internalOpen(s.cfg)
return err
}
// CreateBatch creates a Batch object
func (s *Storage) CreateBatch() database.Batch {
Batch := internalOpenBatch(s)
Batch.cfg = s.cfg
Batch.db = s.db
return Batch
}
// OpenTransaction creates new transaction.
func (s *Storage) OpenTransaction() (database.Transaction, error) {
return internalOpenTransaction(s)
}
// CompactDB compacts database by merging layers
func (s *Storage) CompactDB() error {
return nil
}
// Drop removes all the ssdb files (DANGEROUS!)
func (s *Storage) Drop() error {
return nil
}
// Check interface
var (
_ database.Storage = &Storage{}
)

View File

@@ -0,0 +1,188 @@
package ssdb
import (
"fmt"
"github.com/aptly-dev/aptly/database"
)
type trWriteData struct {
key []byte
value []byte
opts string
err error
}
type trReadData struct {
kv []byte
err error
}
type transaction struct {
// for key-value-operation chan
w chan trWriteData
// key read chan
r chan trReadData
q map[string]trWriteData
t database.Storage
}
// func internalOpenTransaction...
func internalOpenTransaction(t database.Storage) (*transaction, error) {
tr := &transaction{
w: make(chan trWriteData),
r: make(chan trReadData),
q: make(map[string]trWriteData),
t: t,
}
return tr, tr.run()
}
// func run...
func (t *transaction) run() error {
go func() {
for {
select {
case w, ok := <-t.w:
{
if !ok {
ssdbLog("ssdb transaction write chan closed")
return
}
if w.opts == "commit" {
ssdbLog("ssdb transaction commit")
var errs []error
for _, vo := range t.q {
if vo.opts == "put" {
err := t.t.Put(vo.key, vo.value)
if err != nil {
//ssdbLog(err)
errs = append(errs, err)
}
}
if vo.opts == delOpt {
err := t.t.Delete(vo.key)
if err != nil {
errs = append(errs, err)
}
}
}
if len(errs) == 0 {
t.w <- trWriteData{
err: nil,
}
} else {
t.w <- trWriteData{
err: fmt.Errorf("ssdb transaction write errs: %v", errs),
}
}
ssdbLog("ssdb transaction commit end")
} else {
ssdbLog("ssdb transaction", w.opts)
//ssdbLog("ssdb r transaction", w.opts, "key: ", string(w.key), "value: ", string(w.value))
t.q[string(w.key)] = w
}
}
case r, ok := <-t.r:
{
if !ok {
ssdbLog("ssdb transaction read chan closed")
return
}
if rData, ok := t.q[string(r.kv)]; ok {
if rData.opts == delOpt {
// del return not found error
t.r <- trReadData{
kv: nil,
err: database.ErrNotFound,
}
} else {
t.r <- trReadData{
kv: rData.value,
err: nil,
}
}
} else {
v, err := t.t.Get(r.kv)
t.r <- trReadData{
kv: v,
err: err,
}
}
}
}
}
}()
return nil
}
// Get implements database.Reader interface.
func (t *transaction) Get(key []byte) ([]byte, error) {
keyc := make([]byte, len(key))
copy(keyc, key)
r := trReadData{
kv: keyc,
err: nil,
}
t.r <- r
result := <-t.r
return result.kv, result.err
}
// Put implements database.Writer interface.
func (t *transaction) Put(key, value []byte) error {
//ssdbLog("golf*********************ssdb put")
//ssdbLog("ssdb transaction db put key:", string(key), " value: ", string(value))
keyc := make([]byte, len(key))
copy(keyc, key)
valuec := make([]byte, len(value))
copy(valuec, value)
w := trWriteData{
key: keyc,
value: valuec,
opts: "put",
}
t.w <- w
return nil
}
// Delete implements database.Writer interface.
func (t *transaction) Delete(key []byte) error {
//return t.t.Delete(key)
//ssdbLog("golf*********************ssdb del")
keyc := make([]byte, len(key))
copy(keyc, key)
w := trWriteData{
key: keyc,
opts: delOpt,
}
t.w <- w
return nil
}
func (t *transaction) Commit() error {
w := trWriteData{
opts: "commit",
}
t.w <- w
result := <-t.w
return result.err
}
// Discard is safe to call after Commit(), it would be no-op
func (t *transaction) Discard() {
ssdbLog("ssdb transaction stop")
close(t.r)
close(t.w)
}
// transaction should implement database.Transaction
var _ database.Transaction = &transaction{}