Add mutex on LinkFromPool to fix #1449

This fixes the race condition that happens when you call publish
concurrently. It adds a valuable test that reproduces the error almost
deterministically, it's hard to say always but I have run this in loop
100 times and it reproduces the error consistently without the patch and
after the patch it works consistently.
This commit is contained in:
Agustin Henze
2025-08-19 14:37:52 +02:00
parent 8ca4cb8dcb
commit 3608c137a0
3 changed files with 318 additions and 8 deletions

View File

@@ -22,6 +22,26 @@ type PublishedStorage struct {
verifyMethod uint
}
// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool
var (
fileLockMutex sync.Mutex
fileLocks = make(map[string]*sync.Mutex)
)
// getFileLock returns a mutex for a specific file path to prevent concurrent modifications
func getFileLock(filePath string) *sync.Mutex {
fileLockMutex.Lock()
defer fileLockMutex.Unlock()
if mutex, exists := fileLocks[filePath]; exists {
return mutex
}
mutex := &sync.Mutex{}
fileLocks[filePath] = mutex
return mutex
}
// Check interfaces
var (
_ aptly.PublishedStorage = (*PublishedStorage)(nil)
@@ -136,6 +156,12 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
baseName := filepath.Base(fileName)
poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName))
destinationPath := filepath.Join(poolPath, baseName)
// Acquire file-specific lock to prevent concurrent access to the same file
fileLock := getFileLock(destinationPath)
fileLock.Lock()
defer fileLock.Unlock()
var localSourcePool aptly.LocalPackagePool
if storage.linkMethod != LinkMethodCopy {
@@ -154,7 +180,7 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
var dstStat os.FileInfo
dstStat, err = os.Stat(filepath.Join(poolPath, baseName))
dstStat, err = os.Stat(destinationPath)
if err == nil {
// already exists, check source file
@@ -173,7 +199,7 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
} else {
// if source and destination have the same checksums, no need to copy
var dstMD5 string
dstMD5, err = utils.MD5ChecksumForFile(filepath.Join(poolPath, baseName))
dstMD5, err = utils.MD5ChecksumForFile(destinationPath)
if err != nil {
return err
@@ -204,11 +230,11 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
// source and destination have different inodes, if !forced, this is fatal error
if !force {
return fmt.Errorf("error linking file to %s: file already exists and is different", filepath.Join(poolPath, baseName))
return fmt.Errorf("error linking file to %s: file already exists and is different", destinationPath)
}
// forced, so remove destination
err = os.Remove(filepath.Join(poolPath, baseName))
err = os.Remove(destinationPath)
if err != nil {
return err
}
@@ -223,7 +249,7 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
}
var dst *os.File
dst, err = os.Create(filepath.Join(poolPath, baseName))
dst, err = os.Create(destinationPath)
if err != nil {
_ = r.Close()
return err
@@ -244,9 +270,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
err = dst.Close()
} else if storage.linkMethod == LinkMethodSymLink {
err = localSourcePool.Symlink(sourcePath, filepath.Join(poolPath, baseName))
err = localSourcePool.Symlink(sourcePath, destinationPath)
} else {
err = localSourcePool.Link(sourcePath, filepath.Join(poolPath, baseName))
err = localSourcePool.Link(sourcePath, destinationPath)
}
return err