mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-22 08:10:52 +00:00
s3: fix pathCache race condition
Make sure pathCache is properly locked for concurrent access. Add RWMutex to the PublishedStorage struct: - Cache initialization Read-lock to test for nil, then write-lock with a second nil check before populating - Cache reads RLock/RUnlock, allowing concurrent readers - Cache writes / deletes Lock/Unlock
This commit is contained in:
@@ -14,7 +14,8 @@ import (
|
||||
// @Router /api/s3 [get]
|
||||
func apiS3List(c *gin.Context) {
|
||||
keys := []string{}
|
||||
for k := range context.Config().S3PublishRoots {
|
||||
s3Roots := context.Config().S3PublishRoots
|
||||
for k := range s3Roots {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
c.JSON(200, keys)
|
||||
|
||||
+31
-8
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/aptly-dev/aptly/aptly"
|
||||
"github.com/aptly-dev/aptly/utils"
|
||||
@@ -51,6 +52,7 @@ type PublishedStorage struct {
|
||||
plusWorkaround bool
|
||||
disableMultiDel bool
|
||||
pathCache map[string]string
|
||||
pathCacheMutex sync.RWMutex
|
||||
|
||||
// True if the bucket encrypts objects by default.
|
||||
encryptByDefault bool
|
||||
@@ -251,7 +253,9 @@ func (storage *PublishedStorage) Remove(path string) error {
|
||||
_ = storage.Remove(strings.Replace(path, "+", " ", -1))
|
||||
}
|
||||
|
||||
storage.pathCacheMutex.Lock()
|
||||
delete(storage.pathCache, path)
|
||||
storage.pathCacheMutex.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -280,7 +284,9 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deleting path %s from %s: %s", filelist[i], storage, err)
|
||||
}
|
||||
storage.pathCacheMutex.Lock()
|
||||
delete(storage.pathCache, filepath.Join(path, filelist[i]))
|
||||
storage.pathCacheMutex.Unlock()
|
||||
}
|
||||
} else {
|
||||
numParts := (len(filelist) + page - 1) / page
|
||||
@@ -313,9 +319,11 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deleting multiple paths from %s: %s", storage, err)
|
||||
}
|
||||
storage.pathCacheMutex.Lock()
|
||||
for i := range part {
|
||||
delete(storage.pathCache, filepath.Join(path, part[i]))
|
||||
}
|
||||
storage.pathCacheMutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,20 +345,31 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
|
||||
relPath := filepath.Join(publishedDirectory, fileName)
|
||||
poolPath := filepath.Join(storage.prefix, relPath)
|
||||
|
||||
if storage.pathCache == nil {
|
||||
paths, md5s, err := storage.internalFilelist(filepath.Join(publishedPrefix, "pool"), true)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error caching paths under prefix")
|
||||
}
|
||||
storage.pathCacheMutex.RLock()
|
||||
cacheNil := storage.pathCache == nil
|
||||
storage.pathCacheMutex.RUnlock()
|
||||
|
||||
storage.pathCache = make(map[string]string, len(paths))
|
||||
if cacheNil {
|
||||
storage.pathCacheMutex.Lock()
|
||||
if storage.pathCache == nil {
|
||||
paths, md5s, err := storage.internalFilelist(filepath.Join(publishedPrefix, "pool"), true)
|
||||
if err != nil {
|
||||
storage.pathCacheMutex.Unlock()
|
||||
return errors.Wrap(err, "error caching paths under prefix")
|
||||
}
|
||||
|
||||
for i := range paths {
|
||||
storage.pathCache[filepath.Join(publishedPrefix, "pool", paths[i])] = md5s[i]
|
||||
storage.pathCache = make(map[string]string, len(paths))
|
||||
|
||||
for i := range paths {
|
||||
storage.pathCache[filepath.Join(publishedPrefix, "pool", paths[i])] = md5s[i]
|
||||
}
|
||||
}
|
||||
storage.pathCacheMutex.Unlock()
|
||||
}
|
||||
|
||||
storage.pathCacheMutex.RLock()
|
||||
destinationMD5, exists := storage.pathCache[relPath]
|
||||
storage.pathCacheMutex.RUnlock()
|
||||
sourceMD5 := sourceChecksums.MD5
|
||||
|
||||
if exists {
|
||||
@@ -367,7 +386,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
|
||||
err = errors.Wrap(err, fmt.Sprintf("error verifying MD5 for %s: %s", storage, poolPath))
|
||||
return err
|
||||
}
|
||||
storage.pathCacheMutex.Lock()
|
||||
storage.pathCache[relPath] = destinationMD5
|
||||
storage.pathCacheMutex.Unlock()
|
||||
}
|
||||
|
||||
if destinationMD5 == sourceMD5 {
|
||||
@@ -388,7 +409,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
|
||||
log.Debug().Msgf("S3: LinkFromPool '%s'", relPath)
|
||||
err = storage.putFile(relPath, source, sourceMD5)
|
||||
if err == nil {
|
||||
storage.pathCacheMutex.Lock()
|
||||
storage.pathCache[relPath] = sourceMD5
|
||||
storage.pathCacheMutex.Unlock()
|
||||
} else {
|
||||
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s: %s", sourcePath, storage, poolPath))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user