Vendor update goleveldb

There are a number of changes which went in recently that should improve
performance: https://github.com/syndtr/goleveldb/issues/226#issuecomment-477568827
This commit is contained in:
Andrey Smirnov
2019-09-18 00:24:34 +03:00
committed by Oliver Sauder
parent c75ef8546e
commit bb66b2296d
27 changed files with 681 additions and 194 deletions
Generated
+11 -11
View File
@@ -25,7 +25,7 @@
"ast", "ast",
"parser", "parser",
"scanner", "scanner",
"token", "token"
] ]
pruneopts = "" pruneopts = ""
revision = "761fd5fbb34e4c2c138c280395b65b48e4ff5a53" revision = "761fd5fbb34e4c2c138c280395b65b48e4ff5a53"
@@ -69,7 +69,7 @@
"private/protocol/restxml", "private/protocol/restxml",
"private/protocol/xml/xmlutil", "private/protocol/xml/xmlutil",
"service/s3", "service/s3",
"service/sts", "service/sts"
] ]
pruneopts = "" pruneopts = ""
revision = "420cda5d6383f94f7d9c231aa44bad3325181950" revision = "420cda5d6383f94f7d9c231aa44bad3325181950"
@@ -97,7 +97,7 @@
packages = [ packages = [
".", ".",
"binding", "binding",
"render", "render"
] ]
pruneopts = "" pruneopts = ""
revision = "d459835d2b077e44f7c9b453505ee29881d5d12d" revision = "d459835d2b077e44f7c9b453505ee29881d5d12d"
@@ -196,7 +196,7 @@
name = "github.com/ncw/swift" name = "github.com/ncw/swift"
packages = [ packages = [
".", ".",
"swifttest", "swifttest"
] ]
pruneopts = "" pruneopts = ""
revision = "8e9b10220613abdbc2896808ee6b43e411a4fa6c" revision = "8e9b10220613abdbc2896808ee6b43e411a4fa6c"
@@ -258,8 +258,7 @@
revision = "0c531f070014e218b21f3cfca801cc992d52726d" revision = "0c531f070014e218b21f3cfca801cc992d52726d"
[[projects]] [[projects]]
branch = "master" digest = "1:0a9834d471916f392c608d4f13225444d47b6b7a0378dce1aef683d83711179f"
digest = "1:175bc8d87d54849b9b9fc6cd473c5830e090fbf3c2f0ca71c7642a697e5d9237"
name = "github.com/syndtr/goleveldb" name = "github.com/syndtr/goleveldb"
packages = [ packages = [
"leveldb", "leveldb",
@@ -273,10 +272,11 @@
"leveldb/opt", "leveldb/opt",
"leveldb/storage", "leveldb/storage",
"leveldb/table", "leveldb/table",
"leveldb/util", "leveldb/util"
] ]
pruneopts = "" pruneopts = ""
revision = "714f901b98fdb3aa954b4193d8cbd64a28d80cad" revision = "9d007e481048296f09f59bd19bb7ae584563cd95"
version = "v1.0.0"
[[projects]] [[projects]]
digest = "1:a1ca17cff1abec6d4ecdeb03c52338c559affd7f0b2474e928c162986da2d348" digest = "1:a1ca17cff1abec6d4ecdeb03c52338c559affd7f0b2474e928c162986da2d348"
@@ -307,7 +307,7 @@
"openpgp/errors", "openpgp/errors",
"openpgp/packet", "openpgp/packet",
"openpgp/s2k", "openpgp/s2k",
"ssh/terminal", "ssh/terminal"
] ]
pruneopts = "" pruneopts = ""
revision = "b2aa35443fbc700ab74c586ae79b81c171851023" revision = "b2aa35443fbc700ab74c586ae79b81c171851023"
@@ -318,7 +318,7 @@
name = "golang.org/x/sys" name = "golang.org/x/sys"
packages = [ packages = [
"unix", "unix",
"windows", "windows"
] ]
pruneopts = "" pruneopts = ""
revision = "1d206c9fa8975fb4cf00df1dc8bf3283dc24ba0e" revision = "1d206c9fa8975fb4cf00df1dc8bf3283dc24ba0e"
@@ -399,7 +399,7 @@
"golang.org/x/crypto/openpgp/packet", "golang.org/x/crypto/openpgp/packet",
"golang.org/x/crypto/ssh/terminal", "golang.org/x/crypto/ssh/terminal",
"golang.org/x/sys/unix", "golang.org/x/sys/unix",
"gopkg.in/check.v1", "gopkg.in/check.v1"
] ]
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1
+4 -5
View File
@@ -1,13 +1,12 @@
language: go language: go
go: go:
- 1.5 - 1.9.x
- 1.6 - 1.10.x
- 1.7 - 1.11.x
- 1.8
- 1.9
- tip - tip
script: script:
- go vet ./...
- go test -timeout 1h ./... - go test -timeout 1h ./...
- go test -timeout 30m -race -run "TestDB_(Concurrent|GoleveldbIssue74)" ./leveldb - go test -timeout 30m -race -run "TestDB_(Concurrent|GoleveldbIssue74)" ./leveldb
+7
View File
@@ -0,0 +1,7 @@
module github.com/syndtr/goleveldb
require (
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
)
+25
View File
@@ -0,0 +1,25 @@
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-1
View File
@@ -331,7 +331,6 @@ func (r *Cache) delete(n *Node) bool {
return deleted return deleted
} }
} }
return false
} }
// Nodes returns number of 'cache node' in the map. // Nodes returns number of 'cache node' in the map.
+2 -2
View File
@@ -29,7 +29,7 @@ func (bytesComparer) Separator(dst, a, b []byte) []byte {
// Do not shorten if one string is a prefix of the other // Do not shorten if one string is a prefix of the other
} else if c := a[i]; c < 0xff && c+1 < b[i] { } else if c := a[i]; c < 0xff && c+1 < b[i] {
dst = append(dst, a[:i+1]...) dst = append(dst, a[:i+1]...)
dst[i]++ dst[len(dst)-1]++
return dst return dst
} }
return nil return nil
@@ -39,7 +39,7 @@ func (bytesComparer) Successor(dst, b []byte) []byte {
for i, c := range b { for i, c := range b {
if c != 0xff { if c != 0xff {
dst = append(dst, b[:i+1]...) dst = append(dst, b[:i+1]...)
dst[i]++ dst[len(dst)-1]++
return dst return dst
} }
} }
+1 -1
View File
@@ -36,7 +36,7 @@ type Comparer interface {
// by any users of this package. // by any users of this package.
Name() string Name() string
// Bellow are advanced functions used used to reduce the space requirements // Bellow are advanced functions used to reduce the space requirements
// for internal data structures such as index blocks. // for internal data structures such as index blocks.
// Separator appends a sequence of bytes x to dst such that a <= x && x < b, // Separator appends a sequence of bytes x to dst such that a <= x && x < b,
+77 -2
View File
@@ -35,6 +35,7 @@ type DB struct {
// Stats. Need 64-bit alignment. // Stats. Need 64-bit alignment.
cWriteDelay int64 // The cumulative duration of write delays cWriteDelay int64 // The cumulative duration of write delays
cWriteDelayN int32 // The cumulative number of write delays cWriteDelayN int32 // The cumulative number of write delays
inWritePaused int32 // The indicator whether write operation is paused by compaction
aliveSnaps, aliveIters int32 aliveSnaps, aliveIters int32
// Session. // Session.
@@ -181,7 +182,7 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
err = s.recover() err = s.recover()
if err != nil { if err != nil {
if !os.IsNotExist(err) || s.o.GetErrorIfMissing() { if !os.IsNotExist(err) || s.o.GetErrorIfMissing() || s.o.GetReadOnly() {
return return
} }
err = s.create() err = s.create()
@@ -871,6 +872,10 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
@@ -967,7 +972,8 @@ func (db *DB) GetProperty(name string) (value string, err error) {
float64(db.s.stor.writes())/1048576.0) float64(db.s.stor.writes())/1048576.0)
case p == "writedelay": case p == "writedelay":
writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay)) writeDelayN, writeDelay := atomic.LoadInt32(&db.cWriteDelayN), time.Duration(atomic.LoadInt64(&db.cWriteDelay))
value = fmt.Sprintf("DelayN:%d Delay:%s", writeDelayN, writeDelay) paused := atomic.LoadInt32(&db.inWritePaused) == 1
value = fmt.Sprintf("DelayN:%d Delay:%s Paused:%t", writeDelayN, writeDelay, paused)
case p == "sstables": case p == "sstables":
for level, tables := range v.levels { for level, tables := range v.levels {
value += fmt.Sprintf("--- level %d ---\n", level) value += fmt.Sprintf("--- level %d ---\n", level)
@@ -996,6 +1002,75 @@ func (db *DB) GetProperty(name string) (value string, err error) {
return return
} }
// DBStats is database statistics.
type DBStats struct {
WriteDelayCount int32
WriteDelayDuration time.Duration
WritePaused bool
AliveSnapshots int32
AliveIterators int32
IOWrite uint64
IORead uint64
BlockCacheSize int
OpenedTablesCount int
LevelSizes []int64
LevelTablesCounts []int
LevelRead []int64
LevelWrite []int64
LevelDurations []time.Duration
}
// Stats populates s with database statistics.
func (db *DB) Stats(s *DBStats) error {
err := db.ok()
if err != nil {
return err
}
s.IORead = db.s.stor.reads()
s.IOWrite = db.s.stor.writes()
s.WriteDelayCount = atomic.LoadInt32(&db.cWriteDelayN)
s.WriteDelayDuration = time.Duration(atomic.LoadInt64(&db.cWriteDelay))
s.WritePaused = atomic.LoadInt32(&db.inWritePaused) == 1
s.OpenedTablesCount = db.s.tops.cache.Size()
if db.s.tops.bcache != nil {
s.BlockCacheSize = db.s.tops.bcache.Size()
} else {
s.BlockCacheSize = 0
}
s.AliveIterators = atomic.LoadInt32(&db.aliveIters)
s.AliveSnapshots = atomic.LoadInt32(&db.aliveSnaps)
s.LevelDurations = s.LevelDurations[:0]
s.LevelRead = s.LevelRead[:0]
s.LevelWrite = s.LevelWrite[:0]
s.LevelSizes = s.LevelSizes[:0]
s.LevelTablesCounts = s.LevelTablesCounts[:0]
v := db.s.version()
defer v.release()
for level, tables := range v.levels {
duration, read, write := db.compStats.getStat(level)
if len(tables) == 0 && duration == 0 {
continue
}
s.LevelDurations = append(s.LevelDurations, duration)
s.LevelRead = append(s.LevelRead, read)
s.LevelWrite = append(s.LevelWrite, write)
s.LevelSizes = append(s.LevelSizes, tables.size())
s.LevelTablesCounts = append(s.LevelTablesCounts, len(tables))
}
return nil
}
// SizeOf calculates approximate sizes of the given key ranges. // SizeOf calculates approximate sizes of the given key ranges.
// The length of the returned sizes are equal with the length of the given // The length of the returned sizes are equal with the length of the given
// ranges. The returned sizes measure storage space usage, so if the user // ranges. The returned sizes measure storage space usage, so if the user
+39 -11
View File
@@ -640,6 +640,16 @@ func (db *DB) tableNeedCompaction() bool {
return v.needCompaction() return v.needCompaction()
} }
// resumeWrite returns an indicator whether we should resume write operation if enough level0 files are compacted.
func (db *DB) resumeWrite() bool {
v := db.s.version()
defer v.release()
if v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() {
return true
}
return false
}
func (db *DB) pauseCompaction(ch chan<- struct{}) { func (db *DB) pauseCompaction(ch chan<- struct{}) {
select { select {
case ch <- struct{}{}: case ch <- struct{}{}:
@@ -653,6 +663,7 @@ type cCmd interface {
} }
type cAuto struct { type cAuto struct {
// Note for table compaction, an non-empty ackC represents it's a compaction waiting command.
ackC chan<- error ackC chan<- error
} }
@@ -765,8 +776,10 @@ func (db *DB) mCompaction() {
} }
func (db *DB) tCompaction() { func (db *DB) tCompaction() {
var x cCmd var (
var ackQ []cCmd x cCmd
waitQ []cCmd
)
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
@@ -774,9 +787,9 @@ func (db *DB) tCompaction() {
panic(x) panic(x)
} }
} }
for i := range ackQ { for i := range waitQ {
ackQ[i].ack(ErrClosed) waitQ[i].ack(ErrClosed)
ackQ[i] = nil waitQ[i] = nil
} }
if x != nil { if x != nil {
x.ack(ErrClosed) x.ack(ErrClosed)
@@ -795,12 +808,20 @@ func (db *DB) tCompaction() {
return return
default: default:
} }
} else { // Resume write operation as soon as possible.
for i := range ackQ { if len(waitQ) > 0 && db.resumeWrite() {
ackQ[i].ack(nil) for i := range waitQ {
ackQ[i] = nil waitQ[i].ack(nil)
waitQ[i] = nil
}
waitQ = waitQ[:0]
} }
ackQ = ackQ[:0] } else {
for i := range waitQ {
waitQ[i].ack(nil)
waitQ[i] = nil
}
waitQ = waitQ[:0]
select { select {
case x = <-db.tcompCmdC: case x = <-db.tcompCmdC:
case ch := <-db.tcompPauseC: case ch := <-db.tcompPauseC:
@@ -813,7 +834,14 @@ func (db *DB) tCompaction() {
if x != nil { if x != nil {
switch cmd := x.(type) { switch cmd := x.(type) {
case cAuto: case cAuto:
ackQ = append(ackQ, x) if cmd.ackC != nil {
// Check the write pause state before caching it.
if db.resumeWrite() {
x.ack(nil)
} else {
waitQ = append(waitQ, x)
}
}
case cRange: case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default: default:
+4
View File
@@ -142,6 +142,10 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Value() methods), its content should not be
// modified unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// Releasing the snapshot doesn't mean releasing the iterator too, the // Releasing the snapshot doesn't mean releasing the iterator too, the
// iterator would be still valid until released. // iterator would be still valid until released.
+2 -2
View File
@@ -431,7 +431,7 @@ func (h *dbHarness) compactRange(min, max string) {
func (h *dbHarness) sizeOf(start, limit string) int64 { func (h *dbHarness) sizeOf(start, limit string) int64 {
sz, err := h.db.SizeOf([]util.Range{ sz, err := h.db.SizeOf([]util.Range{
{[]byte(start), []byte(limit)}, {Start: []byte(start), Limit: []byte(limit)},
}) })
if err != nil { if err != nil {
h.t.Error("SizeOf: got error: ", err) h.t.Error("SizeOf: got error: ", err)
@@ -1592,7 +1592,7 @@ func TestDB_ClosedIsClosed(t *testing.T) {
_, err = db.GetProperty("leveldb.stats") _, err = db.GetProperty("leveldb.stats")
assertErr(t, err, true) assertErr(t, err, true)
_, err = db.SizeOf([]util.Range{{[]byte("a"), []byte("z")}}) _, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
assertErr(t, err, true) assertErr(t, err, true)
assertErr(t, db.CompactRange(util.Range{}), true) assertErr(t, db.CompactRange(util.Range{}), true)
+4
View File
@@ -69,6 +69,10 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
+1 -1
View File
@@ -84,7 +84,7 @@ func (db *DB) checkAndCleanFiles() error {
var mfds []storage.FileDesc var mfds []storage.FileDesc
for num, present := range tmap { for num, present := range tmap {
if !present { if !present {
mfds = append(mfds, storage.FileDesc{storage.TypeTable, num}) mfds = append(mfds, storage.FileDesc{Type: storage.TypeTable, Num: num})
db.logf("db@janitor table missing @%d", num) db.logf("db@janitor table missing @%d", num)
} }
} }
+4
View File
@@ -89,7 +89,11 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
return false return false
case tLen >= pauseTrigger: case tLen >= pauseTrigger:
delayed = true delayed = true
// Set the write paused flag explicitly.
atomic.StoreInt32(&db.inWritePaused, 1)
err = db.compTriggerWait(db.tcompCmdC) err = db.compTriggerWait(db.tcompCmdC)
// Unset the write paused flag.
atomic.StoreInt32(&db.inWritePaused, 0)
if err != nil { if err != nil {
return false return false
} }
+2 -2
View File
@@ -40,11 +40,11 @@ type IteratorSeeker interface {
Seek(key []byte) bool Seek(key []byte) bool
// Next moves the iterator to the next key/value pair. // Next moves the iterator to the next key/value pair.
// It returns whether the iterator is exhausted. // It returns false if the iterator is exhausted.
Next() bool Next() bool
// Prev moves the iterator to the previous key/value pair. // Prev moves the iterator to the previous key/value pair.
// It returns whether the iterator is exhausted. // It returns false if the iterator is exhausted.
Prev() bool Prev() bool
} }
+4
View File
@@ -397,6 +397,10 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
// DB. And a nil Range.Limit is treated as a key after all keys in // DB. And a nil Range.Limit is treated as a key after all keys in
// the DB. // the DB.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
+13
View File
@@ -158,6 +158,12 @@ type Options struct {
// The default value is 8MiB. // The default value is 8MiB.
BlockCacheCapacity int BlockCacheCapacity int
// BlockCacheEvictRemoved allows enable forced-eviction on cached block belonging
// to removed 'sorted table'.
//
// The default if false.
BlockCacheEvictRemoved bool
// BlockRestartInterval is the number of keys between restart points for // BlockRestartInterval is the number of keys between restart points for
// delta encoding of keys. // delta encoding of keys.
// //
@@ -384,6 +390,13 @@ func (o *Options) GetBlockCacheCapacity() int {
return o.BlockCacheCapacity return o.BlockCacheCapacity
} }
func (o *Options) GetBlockCacheEvictRemoved() bool {
if o == nil {
return false
}
return o.BlockCacheEvictRemoved
}
func (o *Options) GetBlockRestartInterval() int { func (o *Options) GetBlockRestartInterval() int {
if o == nil || o.BlockRestartInterval <= 0 { if o == nil || o.BlockRestartInterval <= 0 {
return DefaultBlockRestartInterval return DefaultBlockRestartInterval
+2 -2
View File
@@ -36,7 +36,7 @@ func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf
func (s *session) newTemp() storage.FileDesc { func (s *session) newTemp() storage.FileDesc {
num := atomic.AddInt64(&s.stTempFileNum, 1) - 1 num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
return storage.FileDesc{storage.TypeTemp, num} return storage.FileDesc{Type: storage.TypeTemp, Num: num}
} }
func (s *session) addFileRef(fd storage.FileDesc, ref int) int { func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
@@ -190,7 +190,7 @@ func (s *session) recordCommited(rec *sessionRecord) {
// Create a new manifest file; need external synchronization. // Create a new manifest file; need external synchronization.
func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
fd := storage.FileDesc{storage.TypeManifest, s.allocFileNum()} fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
writer, err := s.stor.Create(fd) writer, err := s.stor.Create(fd)
if err != nil { if err != nil {
return return
+191 -119
View File
@@ -9,10 +9,12 @@ package storage
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime" "runtime"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -42,6 +44,30 @@ func (lock *fileStorageLock) Unlock() {
} }
} }
type int64Slice []int64
func (p int64Slice) Len() int { return len(p) }
func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err1 := f.Sync(); err == nil {
err = err1
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}
const logSizeThreshold = 1024 * 1024 // 1 MiB const logSizeThreshold = 1024 * 1024 // 1 MiB
// fileStorage is a file-system backed storage. // fileStorage is a file-system backed storage.
@@ -60,7 +86,7 @@ type fileStorage struct {
day int day int
} }
// OpenFile returns a new filesytem-backed storage implementation with the given // OpenFile returns a new filesystem-backed storage implementation with the given
// path. This also acquire a file lock, so any subsequent attempt to open the // path. This also acquire a file lock, so any subsequent attempt to open the
// same path will fail. // same path will fail.
// //
@@ -189,7 +215,8 @@ func (fs *fileStorage) doLog(t time.Time, str string) {
// write // write
fs.buf = append(fs.buf, []byte(str)...) fs.buf = append(fs.buf, []byte(str)...)
fs.buf = append(fs.buf, '\n') fs.buf = append(fs.buf, '\n')
fs.logw.Write(fs.buf) n, _ := fs.logw.Write(fs.buf)
fs.logSize += int64(n)
} }
func (fs *fileStorage) Log(str string) { func (fs *fileStorage) Log(str string) {
@@ -210,7 +237,46 @@ func (fs *fileStorage) log(str string) {
} }
} }
func (fs *fileStorage) SetMeta(fd FileDesc) (err error) { func (fs *fileStorage) setMeta(fd FileDesc) error {
content := fsGenName(fd) + "\n"
// Check and backup old CURRENT file.
currentPath := filepath.Join(fs.path, "CURRENT")
if _, err := os.Stat(currentPath); err == nil {
b, err := ioutil.ReadFile(currentPath)
if err != nil {
fs.log(fmt.Sprintf("backup CURRENT: %v", err))
return err
}
if string(b) == content {
// Content not changed, do nothing.
return nil
}
if err := writeFileSynced(currentPath+".bak", b, 0644); err != nil {
fs.log(fmt.Sprintf("backup CURRENT: %v", err))
return err
}
} else if !os.IsNotExist(err) {
return err
}
path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
if err := writeFileSynced(path, []byte(content), 0644); err != nil {
fs.log(fmt.Sprintf("create CURRENT.%d: %v", fd.Num, err))
return err
}
// Replace CURRENT file.
if err := rename(path, currentPath); err != nil {
fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
return err
}
// Sync root directory.
if err := syncDir(fs.path); err != nil {
fs.log(fmt.Sprintf("syncDir: %v", err))
return err
}
return nil
}
func (fs *fileStorage) SetMeta(fd FileDesc) error {
if !FileDescOk(fd) { if !FileDescOk(fd) {
return ErrInvalidFile return ErrInvalidFile
} }
@@ -223,44 +289,10 @@ func (fs *fileStorage) SetMeta(fd FileDesc) (err error) {
if fs.open < 0 { if fs.open < 0 {
return ErrClosed return ErrClosed
} }
defer func() { return fs.setMeta(fd)
if err != nil {
fs.log(fmt.Sprintf("CURRENT: %v", err))
}
}()
path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return
}
_, err = fmt.Fprintln(w, fsGenName(fd))
if err != nil {
fs.log(fmt.Sprintf("write CURRENT.%d: %v", fd.Num, err))
return
}
if err = w.Sync(); err != nil {
fs.log(fmt.Sprintf("flush CURRENT.%d: %v", fd.Num, err))
return
}
if err = w.Close(); err != nil {
fs.log(fmt.Sprintf("close CURRENT.%d: %v", fd.Num, err))
return
}
if err != nil {
return
}
if err = rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
fs.log(fmt.Sprintf("rename CURRENT.%d: %v", fd.Num, err))
return
}
// Sync root directory.
if err = syncDir(fs.path); err != nil {
fs.log(fmt.Sprintf("syncDir: %v", err))
}
return
} }
func (fs *fileStorage) GetMeta() (fd FileDesc, err error) { func (fs *fileStorage) GetMeta() (FileDesc, error) {
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
if fs.open < 0 { if fs.open < 0 {
@@ -268,7 +300,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
} }
dir, err := os.Open(fs.path) dir, err := os.Open(fs.path)
if err != nil { if err != nil {
return return FileDesc{}, err
} }
names, err := dir.Readdirnames(0) names, err := dir.Readdirnames(0)
// Close the dir first before checking for Readdirnames error. // Close the dir first before checking for Readdirnames error.
@@ -276,94 +308,134 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
fs.log(fmt.Sprintf("close dir: %v", ce)) fs.log(fmt.Sprintf("close dir: %v", ce))
} }
if err != nil { if err != nil {
return return FileDesc{}, err
} }
// Find latest CURRENT file. // Try this in order:
var rem []string // - CURRENT.[0-9]+ ('pending rename' file, descending order)
var pend bool // - CURRENT
var cerr error // - CURRENT.bak
//
// Skip corrupted file or file that point to a missing target file.
type currentFile struct {
name string
fd FileDesc
}
tryCurrent := func(name string) (*currentFile, error) {
b, err := ioutil.ReadFile(filepath.Join(fs.path, name))
if err != nil {
if os.IsNotExist(err) {
err = os.ErrNotExist
}
return nil, err
}
var fd FileDesc
if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd) {
fs.log(fmt.Sprintf("%s: corrupted content: %q", name, b))
err := &ErrCorrupted{
Err: errors.New("leveldb/storage: corrupted or incomplete CURRENT file"),
}
return nil, err
}
if _, err := os.Stat(filepath.Join(fs.path, fsGenName(fd))); err != nil {
if os.IsNotExist(err) {
fs.log(fmt.Sprintf("%s: missing target file: %s", name, fd))
err = os.ErrNotExist
}
return nil, err
}
return &currentFile{name: name, fd: fd}, nil
}
tryCurrents := func(names []string) (*currentFile, error) {
var (
cur *currentFile
// Last corruption error.
lastCerr error
)
for _, name := range names {
var err error
cur, err = tryCurrent(name)
if err == nil {
break
} else if err == os.ErrNotExist {
// Fallback to the next file.
} else if isCorrupted(err) {
lastCerr = err
// Fallback to the next file.
} else {
// In case the error is due to permission, etc.
return nil, err
}
}
if cur == nil {
err := os.ErrNotExist
if lastCerr != nil {
err = lastCerr
}
return nil, err
}
return cur, nil
}
// Try 'pending rename' files.
var nums []int64
for _, name := range names { for _, name := range names {
if strings.HasPrefix(name, "CURRENT") { if strings.HasPrefix(name, "CURRENT.") && name != "CURRENT.bak" {
pend1 := len(name) > 7 i, err := strconv.ParseInt(name[8:], 10, 64)
var pendNum int64 if err == nil {
// Make sure it is valid name for a CURRENT file, otherwise skip it. nums = append(nums, i)
if pend1 {
if name[7] != '.' || len(name) < 9 {
fs.log(fmt.Sprintf("skipping %s: invalid file name", name))
continue
}
var e1 error
if pendNum, e1 = strconv.ParseInt(name[8:], 10, 0); e1 != nil {
fs.log(fmt.Sprintf("skipping %s: invalid file num: %v", name, e1))
continue
}
} }
path := filepath.Join(fs.path, name) }
r, e1 := os.OpenFile(path, os.O_RDONLY, 0) }
if e1 != nil { var (
return FileDesc{}, e1 pendCur *currentFile
} pendErr = os.ErrNotExist
b, e1 := ioutil.ReadAll(r) pendNames []string
if e1 != nil { )
r.Close() if len(nums) > 0 {
return FileDesc{}, e1 sort.Sort(sort.Reverse(int64Slice(nums)))
} pendNames = make([]string, len(nums))
var fd1 FileDesc for i, num := range nums {
if len(b) < 1 || b[len(b)-1] != '\n' || !fsParseNamePtr(string(b[:len(b)-1]), &fd1) { pendNames[i] = fmt.Sprintf("CURRENT.%d", num)
fs.log(fmt.Sprintf("skipping %s: corrupted or incomplete", name)) }
if pend1 { pendCur, pendErr = tryCurrents(pendNames)
rem = append(rem, name) if pendErr != nil && pendErr != os.ErrNotExist && !isCorrupted(pendErr) {
} return FileDesc{}, pendErr
if !pend1 || cerr == nil { }
metaFd, _ := fsParseName(name) }
cerr = &ErrCorrupted{
Fd: metaFd, // Try CURRENT and CURRENT.bak.
Err: errors.New("leveldb/storage: corrupted or incomplete meta file"), curCur, curErr := tryCurrents([]string{"CURRENT", "CURRENT.bak"})
if curErr != nil && curErr != os.ErrNotExist && !isCorrupted(curErr) {
return FileDesc{}, curErr
}
// pendCur takes precedence, but guards against obsolete pendCur.
if pendCur != nil && (curCur == nil || pendCur.fd.Num > curCur.fd.Num) {
curCur = pendCur
}
if curCur != nil {
// Restore CURRENT file to proper state.
if !fs.readOnly && (curCur.name != "CURRENT" || len(pendNames) != 0) {
// Ignore setMeta errors, however don't delete obsolete files if we
// catch error.
if err := fs.setMeta(curCur.fd); err == nil {
// Remove 'pending rename' files.
for _, name := range pendNames {
if err := os.Remove(filepath.Join(fs.path, name)); err != nil {
fs.log(fmt.Sprintf("remove %s: %v", name, err))
} }
} }
} else if pend1 && pendNum != fd1.Num {
fs.log(fmt.Sprintf("skipping %s: inconsistent pending-file num: %d vs %d", name, pendNum, fd1.Num))
rem = append(rem, name)
} else if fd1.Num < fd.Num {
fs.log(fmt.Sprintf("skipping %s: obsolete", name))
if pend1 {
rem = append(rem, name)
}
} else {
fd = fd1
pend = pend1
}
if err := r.Close(); err != nil {
fs.log(fmt.Sprintf("close %s: %v", name, err))
} }
} }
return curCur.fd, nil
} }
// Don't remove any files if there is no valid CURRENT file.
if fd.Zero() { // Nothing found.
if cerr != nil { if isCorrupted(pendErr) {
err = cerr return FileDesc{}, pendErr
} else {
err = os.ErrNotExist
}
return
} }
if !fs.readOnly { return FileDesc{}, curErr
// Rename pending CURRENT file to an effective CURRENT.
if pend {
path := fmt.Sprintf("%s.%d", filepath.Join(fs.path, "CURRENT"), fd.Num)
if err := rename(path, filepath.Join(fs.path, "CURRENT")); err != nil {
fs.log(fmt.Sprintf("CURRENT.%d -> CURRENT: %v", fd.Num, err))
}
}
// Remove obsolete or incomplete pending CURRENT files.
for _, name := range rem {
path := filepath.Join(fs.path, name)
if err := os.Remove(path); err != nil {
fs.log(fmt.Sprintf("remove %s: %v", name, err))
}
}
}
return
} }
func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) { func (fs *fileStorage) List(ft FileType) (fds []FileDesc, err error) {
+242 -18
View File
@@ -8,8 +8,11 @@ package storage
import ( import (
"fmt" "fmt"
"io/ioutil"
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
) )
@@ -53,6 +56,15 @@ var invalidCases = []string{
"100.lop", "100.lop",
} }
func tempDir(t *testing.T) string {
dir, err := ioutil.TempDir("", "goleveldb-")
if err != nil {
t.Fatal(t)
}
t.Log("Using temp-dir:", dir)
return dir
}
func TestFileStorage_CreateFileName(t *testing.T) { func TestFileStorage_CreateFileName(t *testing.T) {
for _, c := range cases { for _, c := range cases {
if name := fsGenName(FileDesc{c.ftype, c.num}); name != c.name { if name := fsGenName(FileDesc{c.ftype, c.num}); name != c.name {
@@ -61,6 +73,224 @@ func TestFileStorage_CreateFileName(t *testing.T) {
} }
} }
func TestFileStorage_MetaSetGet(t *testing.T) {
temp := tempDir(t)
fs, err := OpenFile(temp, false)
if err != nil {
t.Fatal("OpenFile: got error: ", err)
}
for i := 0; i < 10; i++ {
num := rand.Int63()
fd := FileDesc{Type: TypeManifest, Num: num}
w, err := fs.Create(fd)
if err != nil {
t.Fatalf("Create(%d): got error: %v", i, err)
}
w.Write([]byte("TEST"))
w.Close()
if err := fs.SetMeta(fd); err != nil {
t.Fatalf("SetMeta(%d): got error: %v", i, err)
}
rfd, err := fs.GetMeta()
if err != nil {
t.Fatalf("GetMeta(%d): got error: %v", i, err)
}
if fd != rfd {
t.Fatalf("Invalid meta (%d): got '%s', want '%s'", i, rfd, fd)
}
}
os.RemoveAll(temp)
}
func TestFileStorage_Meta(t *testing.T) {
type current struct {
num int64
backup bool
current bool
manifest bool
corrupt bool
}
type testCase struct {
currents []current
notExist bool
corrupt bool
expect int64
}
cases := []testCase{
{
currents: []current{
{num: 2, backup: true, manifest: true},
{num: 1, current: true},
},
expect: 2,
},
{
currents: []current{
{num: 2, backup: true, manifest: true},
{num: 1, current: true, manifest: true},
},
expect: 1,
},
{
currents: []current{
{num: 2, manifest: true},
{num: 3, manifest: true},
{num: 4, current: true, manifest: true},
},
expect: 4,
},
{
currents: []current{
{num: 2, manifest: true},
{num: 3, manifest: true},
{num: 4, current: true, manifest: true, corrupt: true},
},
expect: 3,
},
{
currents: []current{
{num: 2, manifest: true},
{num: 3, manifest: true},
{num: 5, current: true, manifest: true, corrupt: true},
{num: 4, backup: true, manifest: true},
},
expect: 4,
},
{
currents: []current{
{num: 4, manifest: true},
{num: 3, manifest: true},
{num: 2, current: true, manifest: true},
},
expect: 4,
},
{
currents: []current{
{num: 4, manifest: true, corrupt: true},
{num: 3, manifest: true},
{num: 2, current: true, manifest: true},
},
expect: 3,
},
{
currents: []current{
{num: 4, manifest: true, corrupt: true},
{num: 3, manifest: true, corrupt: true},
{num: 2, current: true, manifest: true},
},
expect: 2,
},
{
currents: []current{
{num: 4},
{num: 3, manifest: true},
{num: 2, current: true, manifest: true},
},
expect: 3,
},
{
currents: []current{
{num: 4},
{num: 3, manifest: true},
{num: 6, current: true},
{num: 5, backup: true, manifest: true},
},
expect: 5,
},
{
currents: []current{
{num: 4},
{num: 3},
{num: 6, current: true},
{num: 5, backup: true},
},
notExist: true,
},
{
currents: []current{
{num: 4, corrupt: true},
{num: 3},
{num: 6, current: true},
{num: 5, backup: true},
},
corrupt: true,
},
}
for i, tc := range cases {
t.Logf("Test-%d", i)
temp := tempDir(t)
fs, err := OpenFile(temp, false)
if err != nil {
t.Fatal("OpenFile: got error: ", err)
}
for _, cur := range tc.currents {
var curName string
switch {
case cur.current:
curName = "CURRENT"
case cur.backup:
curName = "CURRENT.bak"
default:
curName = fmt.Sprintf("CURRENT.%d", cur.num)
}
fd := FileDesc{Type: TypeManifest, Num: cur.num}
content := fmt.Sprintf("%s\n", fsGenName(fd))
if cur.corrupt {
content = content[:len(content)-1-rand.Intn(3)]
}
if err := ioutil.WriteFile(filepath.Join(temp, curName), []byte(content), 0644); err != nil {
t.Fatal(err)
}
if cur.manifest {
w, err := fs.Create(fd)
if err != nil {
t.Fatal(err)
}
if _, err := w.Write([]byte("TEST")); err != nil {
t.Fatal(err)
}
w.Close()
}
}
ret, err := fs.GetMeta()
if tc.notExist {
if err != os.ErrNotExist {
t.Fatalf("expect ErrNotExist, got: %v", err)
}
} else if tc.corrupt {
if !isCorrupted(err) {
t.Fatalf("expect ErrCorrupted, got: %v", err)
}
} else {
if err != nil {
t.Fatal(err)
}
if ret.Type != TypeManifest {
t.Fatalf("expecting manifest, got: %s", ret.Type)
}
if ret.Num != tc.expect {
t.Fatalf("invalid num, expect=%d got=%d", tc.expect, ret.Num)
}
fis, err := ioutil.ReadDir(temp)
if err != nil {
t.Fatal(err)
}
for _, fi := range fis {
if strings.HasPrefix(fi.Name(), "CURRENT") {
switch fi.Name() {
case "CURRENT", "CURRENT.bak":
default:
t.Fatalf("found rouge CURRENT file: %s", fi.Name())
}
}
t.Logf("-> %s", fi.Name())
}
}
os.RemoveAll(temp)
}
}
func TestFileStorage_ParseFileName(t *testing.T) { func TestFileStorage_ParseFileName(t *testing.T) {
for _, c := range cases { for _, c := range cases {
for _, name := range append([]string{c.name}, c.oldName...) { for _, name := range append([]string{c.name}, c.oldName...) {
@@ -88,18 +318,15 @@ func TestFileStorage_InvalidFileName(t *testing.T) {
} }
func TestFileStorage_Locking(t *testing.T) { func TestFileStorage_Locking(t *testing.T) {
path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-testrwlock-%d", os.Getuid())) temp := tempDir(t)
if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { defer os.RemoveAll(temp)
t.Fatal("RemoveAll: got error: ", err)
}
defer os.RemoveAll(path)
p1, err := OpenFile(path, false) p1, err := OpenFile(temp, false)
if err != nil { if err != nil {
t.Fatal("OpenFile(1): got error: ", err) t.Fatal("OpenFile(1): got error: ", err)
} }
p2, err := OpenFile(path, false) p2, err := OpenFile(temp, false)
if err != nil { if err != nil {
t.Logf("OpenFile(2): got error: %s (expected)", err) t.Logf("OpenFile(2): got error: %s (expected)", err)
} else { } else {
@@ -110,7 +337,7 @@ func TestFileStorage_Locking(t *testing.T) {
p1.Close() p1.Close()
p3, err := OpenFile(path, false) p3, err := OpenFile(temp, false)
if err != nil { if err != nil {
t.Fatal("OpenFile(3): got error: ", err) t.Fatal("OpenFile(3): got error: ", err)
} }
@@ -134,18 +361,15 @@ func TestFileStorage_Locking(t *testing.T) {
} }
func TestFileStorage_ReadOnlyLocking(t *testing.T) { func TestFileStorage_ReadOnlyLocking(t *testing.T) {
path := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-testrolock-%d", os.Getuid())) temp := tempDir(t)
if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { defer os.RemoveAll(temp)
t.Fatal("RemoveAll: got error: ", err)
}
defer os.RemoveAll(path)
p1, err := OpenFile(path, false) p1, err := OpenFile(temp, false)
if err != nil { if err != nil {
t.Fatal("OpenFile(1): got error: ", err) t.Fatal("OpenFile(1): got error: ", err)
} }
_, err = OpenFile(path, true) _, err = OpenFile(temp, true)
if err != nil { if err != nil {
t.Logf("OpenFile(2): got error: %s (expected)", err) t.Logf("OpenFile(2): got error: %s (expected)", err)
} else { } else {
@@ -154,17 +378,17 @@ func TestFileStorage_ReadOnlyLocking(t *testing.T) {
p1.Close() p1.Close()
p3, err := OpenFile(path, true) p3, err := OpenFile(temp, true)
if err != nil { if err != nil {
t.Fatal("OpenFile(3): got error: ", err) t.Fatal("OpenFile(3): got error: ", err)
} }
p4, err := OpenFile(path, true) p4, err := OpenFile(temp, true)
if err != nil { if err != nil {
t.Fatal("OpenFile(4): got error: ", err) t.Fatal("OpenFile(4): got error: ", err)
} }
_, err = OpenFile(path, false) _, err = OpenFile(temp, false)
if err != nil { if err != nil {
t.Logf("OpenFile(5): got error: %s (expected)", err) t.Logf("OpenFile(5): got error: %s (expected)", err)
} else { } else {
@@ -67,13 +67,25 @@ func isErrInvalid(err error) bool {
if err == os.ErrInvalid { if err == os.ErrInvalid {
return true return true
} }
// Go < 1.8
if syserr, ok := err.(*os.SyscallError); ok && syserr.Err == syscall.EINVAL { if syserr, ok := err.(*os.SyscallError); ok && syserr.Err == syscall.EINVAL {
return true return true
} }
// Go >= 1.8 returns *os.PathError instead
if patherr, ok := err.(*os.PathError); ok && patherr.Err == syscall.EINVAL {
return true
}
return false return false
} }
func syncDir(name string) error { func syncDir(name string) error {
// As per fsync manpage, Linux seems to expect fsync on directory, however
// some system don't support this, so we will ignore syscall.EINVAL.
//
// From fsync(2):
// Calling fsync() does not necessarily ensure that the entry in the
// directory containing the file has also reached disk. For that an
// explicit fsync() on a file descriptor for the directory is also needed.
f, err := os.Open(name) f, err := os.Open(name)
if err != nil { if err != nil {
return err return err
+8
View File
@@ -55,6 +55,14 @@ type ErrCorrupted struct {
Err error Err error
} }
func isCorrupted(err error) bool {
switch err.(type) {
case *ErrCorrupted:
return true
}
return false
}
func (e *ErrCorrupted) Error() string { func (e *ErrCorrupted) Error() string {
if !e.Fd.Zero() { if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
+16 -14
View File
@@ -78,7 +78,7 @@ func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFil
} }
func tableFileFromRecord(r atRecord) *tFile { func tableFileFromRecord(r atRecord) *tFile {
return newTableFile(storage.FileDesc{storage.TypeTable, r.num}, r.size, r.imin, r.imax) return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
} }
// tFiles hold multiple tFile. // tFiles hold multiple tFile.
@@ -290,16 +290,17 @@ func (x *tFilesSortByNum) Less(i, j int) bool {
// Table operations. // Table operations.
type tOps struct { type tOps struct {
s *session s *session
noSync bool noSync bool
cache *cache.Cache evictRemoved bool
bcache *cache.Cache cache *cache.Cache
bpool *util.BufferPool bcache *cache.Cache
bpool *util.BufferPool
} }
// Creates an empty table and returns table writer. // Creates an empty table and returns table writer.
func (t *tOps) create() (*tWriter, error) { func (t *tOps) create() (*tWriter, error) {
fd := storage.FileDesc{storage.TypeTable, t.s.allocFileNum()} fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
fw, err := t.s.stor.Create(fd) fw, err := t.s.stor.Create(fd)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -422,7 +423,7 @@ func (t *tOps) remove(f *tFile) {
} else { } else {
t.s.logf("table@remove removed @%d", f.fd.Num) t.s.logf("table@remove removed @%d", f.fd.Num)
} }
if t.bcache != nil { if t.evictRemoved && t.bcache != nil {
t.bcache.EvictNS(uint64(f.fd.Num)) t.bcache.EvictNS(uint64(f.fd.Num))
} }
}) })
@@ -451,7 +452,7 @@ func newTableOps(s *session) *tOps {
if !s.o.GetDisableBlockCache() { if !s.o.GetDisableBlockCache() {
var bcacher cache.Cacher var bcacher cache.Cacher
if s.o.GetBlockCacheCapacity() > 0 { if s.o.GetBlockCacheCapacity() > 0 {
bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity()) bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
} }
bcache = cache.NewCache(bcacher) bcache = cache.NewCache(bcacher)
} }
@@ -459,11 +460,12 @@ func newTableOps(s *session) *tOps {
bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
} }
return &tOps{ return &tOps{
s: s, s: s,
noSync: s.o.GetNoSync(), noSync: s.o.GetNoSync(),
cache: cache.NewCache(cacher), evictRemoved: s.o.GetBlockCacheEvictRemoved(),
bcache: bcache, cache: cache.NewCache(cacher),
bpool: bpool, bcache: bcache,
bpool: bpool,
} }
} }
+4
View File
@@ -787,6 +787,10 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
// table. And a nil Range.Limit is treated as a key after all keys in // table. And a nil Range.Limit is treated as a key after all keys in
// the table. // the table.
// //
// WARNING: Any slice returned by interator (e.g. slice returned by calling
// Iterator.Key() or Iterator.Key() methods), its content should not be modified
// unless noted otherwise.
//
// The returned iterator is not safe for concurrent use and should be released // The returned iterator is not safe for concurrent use and should be released
// after use. // after use.
// //
+4 -1
View File
@@ -147,7 +147,10 @@ func packFile(fd storage.FileDesc) uint64 {
} }
func unpackFile(x uint64) storage.FileDesc { func unpackFile(x uint64) storage.FileDesc {
return storage.FileDesc{storage.FileType(x) & storage.TypeAll, int64(x >> typeCount)} return storage.FileDesc{
Type: storage.FileType(x) & storage.TypeAll,
Num: int64(x >> typeCount),
}
} }
type emulatedError struct { type emulatedError struct {
+1 -1
View File
@@ -20,7 +20,7 @@ func shorten(str string) string {
return str[:3] + ".." + str[len(str)-3:] return str[:3] + ".." + str[len(str)-3:]
} }
var bunits = [...]string{"", "Ki", "Mi", "Gi"} var bunits = [...]string{"", "Ki", "Mi", "Gi", "Ti"}
func shortenb(bytes int) string { func shortenb(bytes int) string {
i := 0 i := 0
+1 -1
View File
@@ -526,7 +526,7 @@ func main() {
getStat.record(1) getStat.record(1)
if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 { if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 {
err := &errors.ErrCorrupted{Fd: storage.FileDesc{0xff, 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)} err := &errors.ErrCorrupted{Fd: storage.FileDesc{Type: 0xff, Num: 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)}
fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2)
} }