publish: lock pool on non MultiDist publish

* revert mutex on LinkFromPool
* add tests
This commit is contained in:
André Roth
2026-01-03 23:25:26 +01:00
parent 4445839adc
commit e52962955b
7 changed files with 811 additions and 320 deletions
+32
View File
@@ -298,6 +298,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
multiDist = *b.MultiDist
}
// Non-MultiDist publishes share a single pool/ directory under the
// prefix. Lock at the prefix level so that concurrent publish/drop
// operations on sibling distributions cannot race during cleanup.
if !multiDist {
storagePrefix := prefix
if storage != "" {
storagePrefix = storage + ":" + prefix
}
resources = append(resources, deb.PrefixPoolLockKey(storagePrefix))
}
taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"",
b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`))
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) {
@@ -494,6 +506,13 @@ func apiPublishUpdateSwitch(c *gin.Context) {
return
}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
// Field mutations and fresh DB load are deferred to inside the task so
// they always operate on a consistent state after the lock is held.
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
@@ -614,6 +633,12 @@ func apiPublishDrop(c *gin.Context) {
}
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that a drop cannot race
// with a concurrent update or drop of a sibling distribution during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
taskCollectionFactory := context.NewCollectionFactory()
@@ -1123,6 +1148,13 @@ func apiPublishUpdate(c *gin.Context) {
resources := []string{string(published.Key())}
// Non-MultiDist distributions share a single pool/ directory under the
// prefix. Acquire the prefix-level pool lock so that concurrent updates
// on sibling distributions are serialised and cannot race during cleanup.
if !published.MultiDist {
resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix()))
}
// Lock source repos / snapshots the same way apiPublishUpdateSwitch does,
// because published.Update() reads from them and concurrent modification
// would produce an inconsistent view.
+763
View File
@@ -0,0 +1,763 @@
package api
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"sync"
"time"
"github.com/aptly-dev/aptly/aptly"
ctx "github.com/aptly-dev/aptly/context"
"github.com/aptly-dev/aptly/deb"
"github.com/gin-gonic/gin"
"github.com/smira/flag"
. "gopkg.in/check.v1"
)
// PublishedFileMissingSuite reproduces the exact bug where:
// - Package import succeeds
// - Metadata is updated (Packages.gz shows the package)
// - Publish reports success
// - BUT the .deb file is missing from the published pool directory
// - Result: apt-get returns 404 when trying to download the package
type PublishedFileMissingSuite struct {
context *ctx.AptlyContext
flags *flag.FlagSet
configFile *os.File
router http.Handler
tempDir string
poolPath string
publicPath string
}
var _ = Suite(&PublishedFileMissingSuite{})
func (s *PublishedFileMissingSuite) SetUpSuite(c *C) {
aptly.Version = "publishedFileMissingTest"
tempDir, err := os.MkdirTemp("", "aptly-published-missing-test")
c.Assert(err, IsNil)
s.tempDir = tempDir
s.poolPath = filepath.Join(tempDir, "pool")
s.publicPath = filepath.Join(tempDir, "public")
file, err := os.CreateTemp("", "aptly-published-missing-config")
c.Assert(err, IsNil)
s.configFile = file
config := gin.H{
"rootDir": tempDir,
"downloadDir": filepath.Join(tempDir, "download"),
"architectures": []string{"amd64"},
"dependencyFollowSuggests": false,
"dependencyFollowRecommends": false,
"gpgDisableSign": true,
"gpgDisableVerify": true,
"gpgProvider": "internal",
"skipLegacyPool": true,
"enableMetricsEndpoint": false,
}
jsonString, err := json.Marshal(config)
c.Assert(err, IsNil)
_, err = file.Write(jsonString)
c.Assert(err, IsNil)
flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError)
flags.Bool("no-lock", true, "disable database locking for test")
flags.Int("db-open-attempts", 3, "dummy")
flags.String("config", s.configFile.Name(), "config file")
flags.String("architectures", "", "dummy")
s.flags = flags
context, err := ctx.NewContext(s.flags)
c.Assert(err, IsNil)
s.context = context
s.router = Router(context)
}
func (s *PublishedFileMissingSuite) TearDownSuite(c *C) {
if s.configFile != nil {
_ = os.Remove(s.configFile.Name())
}
if s.context != nil {
s.context.Shutdown()
}
if s.tempDir != "" {
_ = os.RemoveAll(s.tempDir)
}
}
func (s *PublishedFileMissingSuite) SetUpTest(c *C) {
collectionFactory := s.context.NewCollectionFactory()
localRepoCollection := collectionFactory.LocalRepoCollection()
_ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error {
_ = localRepoCollection.Drop(repo)
return nil
})
publishedCollection := collectionFactory.PublishedRepoCollection()
_ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error {
_ = publishedCollection.Remove(s.context, published.Storage, published.Prefix,
published.Distribution, collectionFactory, nil, true, true)
return nil
})
}
func (s *PublishedFileMissingSuite) TearDownTest(c *C) {
s.SetUpTest(c)
}
func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder {
w := httptest.NewRecorder()
var req *http.Request
var err error
if body != nil {
req, err = http.NewRequest(method, url, bytes.NewReader(body))
} else {
req, err = http.NewRequest(method, url, nil)
}
c.Assert(err, IsNil)
req.Header.Add("Content-Type", "application/json")
s.router.ServeHTTP(w, req)
return w
}
func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) {
uploadPath := s.context.UploadPath()
uploadDir := filepath.Join(uploadPath, uploadID)
err := os.MkdirAll(uploadDir, 0755)
c.Assert(err, IsNil)
tempDir, err := os.MkdirTemp("", "deb-build")
c.Assert(err, IsNil)
defer func() { _ = os.RemoveAll(tempDir) }()
debianDir := filepath.Join(tempDir, "DEBIAN")
err = os.MkdirAll(debianDir, 0755)
c.Assert(err, IsNil)
controlContent := fmt.Sprintf(`Package: %s
Version: %s
Section: libs
Priority: optional
Architecture: amd64
Maintainer: Test <test@example.com>
Description: Test package
Test package for published file missing bug.
`, packageName, version)
err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644)
c.Assert(err, IsNil)
usrDir := filepath.Join(tempDir, "usr", "lib")
err = os.MkdirAll(usrDir, 0755)
c.Assert(err, IsNil)
err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644)
c.Assert(err, IsNil)
debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile)
err = cmd.Run()
c.Assert(err, IsNil)
}
// TestPublishedFileGoMissing reproduces the exact production bug
func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) {
c.Log("=== Reproducing: Package in metadata but 404 on download ===")
// Create and publish a repository
repoName := "test-repo"
distribution := "bullseye"
createBody, _ := json.Marshal(gin.H{
"Name": repoName,
"DefaultDistribution": distribution,
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String()))
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": distribution,
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repoName},
},
"Signing": gin.H{"Skip": true},
})
resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String()))
// Create package
packageName := "hrt-libblobbyclient1"
version := "20250926.152427+hrtdeb11"
uploadID := "test-upload-1"
s.createDebPackage(c, uploadID, packageName, version)
// Add package
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String()))
// Update publish
updateBody, _ := json.Marshal(gin.H{
"Signing": gin.H{"Skip": true},
"ForceOverwrite": true,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String()))
// Now check if the file is actually accessible in the published location
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
// Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb
expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
c.Logf("Checking for published file at: %s", expectedPath)
fileInfo, err := os.Stat(expectedPath)
fileExists := err == nil
c.Logf("File exists: %v", fileExists)
if fileExists {
c.Logf("File size: %d bytes", fileInfo.Size())
}
// Check metadata
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil)
var packages []string
err = json.Unmarshal(resp.Body.Bytes(), &packages)
c.Assert(err, IsNil)
c.Logf("Packages in metadata: %d", len(packages))
// THE BUG: Metadata says package exists, but file is missing from published location
if len(packages) > 0 && !fileExists {
c.Logf("★★★ BUG REPRODUCED! ★★★")
c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath)
c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]")
c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!")
}
c.Assert(fileExists, Equals, true, Commentf(
"Published file should exist at %s when package is in metadata", expectedPath))
}
// TestConcurrentPublishRace tries to trigger the race with concurrent publishes
func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) {
c.Log("=== Testing concurrent publish race condition ===")
const numIterations = 4
for iteration := 0; iteration < numIterations; iteration++ {
c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations)
// Create repo
repoName := fmt.Sprintf("race-repo-%d", iteration)
distribution := fmt.Sprintf("dist-%d", iteration)
createBody, _ := json.Marshal(gin.H{
"Name": repoName,
"DefaultDistribution": distribution,
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201)
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": distribution,
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repoName},
},
"Signing": gin.H{"Skip": true},
})
resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody)
c.Assert(resp.Code, Equals, 201)
// Create multiple packages
var wg sync.WaitGroup
numPackages := 5
for i := 0; i < numPackages; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx)
version := "1.0.0"
uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx)
s.createDebPackage(c, uploadID, packageName, version)
// Add package
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil)
c.Logf("Package %d add: %d", idx, resp.Code)
// Small delay
time.Sleep(time.Duration(5+idx*2) * time.Millisecond)
// Publish
updateBody, _ := json.Marshal(gin.H{
"Signing": gin.H{"Skip": true},
"ForceOverwrite": true,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody)
c.Logf("Publish %d: %d", idx, resp.Code)
}(i)
}
wg.Wait()
time.Sleep(100 * time.Millisecond)
// Check all packages
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil)
var packages []string
err := json.Unmarshal(resp.Body.Bytes(), &packages)
c.Assert(err, IsNil)
// Check published files
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
missingFiles := []string{}
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("pkg-%d-%d", iteration, i)
version := "1.0.0"
// Calculate pool path
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, expectedPath)
}
}
if len(missingFiles) > 0 {
c.Logf("★★★ BUG DETECTED in iteration %d/%d! ★★★", iteration+1, numIterations)
c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles))
for i, f := range missingFiles {
c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f)
}
c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles))
} else {
c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages)
}
}
c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations)
}
// TestIdenticalPackageRace tests the specific case of identical SHA256 packages
func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) {
c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===")
const numIterations = 4
packageName := "shared-package"
for iter := 0; iter < numIterations; iter++ {
c.Logf("Iteration %d/%d", iter+1, numIterations)
// Create two repos that will get the SAME package (unique per iteration)
repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)}
dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)}
for i := range repos {
createBody, _ := json.Marshal(gin.H{
"Name": repos[i],
"DefaultDistribution": dists[i],
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201)
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "local",
"Distribution": dists[i],
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Component": "main", "Name": repos[i]},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
})
resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody)
c.Assert(resp.Code, Equals, 201)
}
// Create IDENTICAL package file with UNIQUE VERSION per iteration
version := fmt.Sprintf("1.0.%d", iter)
uploadID1 := fmt.Sprintf("identical-upload-1-%d", iter)
uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter)
s.createDebPackage(c, uploadID1, packageName, version)
// Copy to second upload (same SHA256)
uploadPath := s.context.UploadPath()
src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
destDir := filepath.Join(uploadPath, uploadID2)
err := os.MkdirAll(destDir, 0755)
c.Assert(err, IsNil)
dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
srcData, readErr := os.ReadFile(src)
c.Assert(readErr, IsNil)
err = os.WriteFile(dest, srcData, 0644)
c.Assert(err, IsNil)
// Race: add and publish both simultaneously
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
//time.Sleep(5 * time.Millisecond)
c.Logf("[iter %d] Import A", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil)
c.Logf("[iter %d] Import A complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish A", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
}()
go func() {
defer wg.Done()
//time.Sleep(7 * time.Millisecond)
c.Logf("[iter %d] Import B", iter)
resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil)
c.Logf("[iter %d] Import B complete: %d", iter, resp.Code)
updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
c.Logf("[iter %d] Publish B", iter)
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
}()
//go func() {
//defer wg.Done()
//time.Sleep(15 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish A", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody)
//c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code)
//}()
//go func() {
//defer wg.Done()
//time.Sleep(18 * time.Millisecond)
//updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true})
//c.Logf("[iter %d] Publish B", iter)
//resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody)
//c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code)
//}()
wg.Wait()
time.Sleep(200 * time.Millisecond)
c.Logf("[iter %d] All operations complete", iter)
// Check the shared pool location
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
poolSubdir := string(packageName[0])
sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
fileInfo, err := os.Stat(sharedPoolPath)
fileExists := err == nil
if fileExists {
c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size())
} else {
c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err)
}
// Check metadata
var packagesA, packagesB []string
resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil)
err = json.Unmarshal(resp.Body.Bytes(), &packagesA)
c.Assert(err, IsNil)
resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil)
err = json.Unmarshal(resp.Body.Bytes(), &packagesB)
c.Assert(err, IsNil)
c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB))
// THE BUG: Both repos show packages in metadata, but the shared pool file is missing
if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists {
c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1)
c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB))
c.Logf("Shared pool file exists: %v", fileExists)
c.Logf("Pool path: %s", sharedPoolPath)
// List what files ARE in the pool directory
poolDir := filepath.Dir(sharedPoolPath)
if entries, err := os.ReadDir(poolDir); err == nil {
c.Logf("Files in pool directory %s:", poolDir)
for _, entry := range entries {
c.Logf(" - %s", entry.Name())
}
}
c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1)
}
}
c.Logf("All %d iterations passed - bug not reproduced", numIterations)
}
// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug:
// Multiple snapshots are published concurrently to the SAME prefix but different distributions.
// Example from production logs:
// - trixie-pgdg published to "external/postgres-auto/trixie"
// - bullseye-pgdg published to "external/postgres-auto/bullseye"
// Both share the same pool directory, causing cleanup race conditions.
func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) {
const numIterations = 4
for iter := 0; iter < numIterations; iter++ {
c.Logf("--- Iteration %d/%d ---", iter+1, numIterations)
// Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg)
repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter)
repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter)
// Create trixie repo
createBody, _ := json.Marshal(gin.H{
"Name": repoTrixie,
"DefaultDistribution": "trixie",
"DefaultComponent": "main",
})
resp := s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo"))
// Create bullseye repo
createBody, _ = json.Marshal(gin.H{
"Name": repoBullseye,
"DefaultDistribution": "bullseye",
"DefaultComponent": "main",
})
resp = s.httpRequest(c, "POST", "/api/repos", createBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo"))
// Add packages to both repos
numPackages := 3
// Add packages to trixie repo
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i)
s.createDebPackage(c, uploadID, packageName, version)
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie"))
}
// Add packages to bullseye repo
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i)
s.createDebPackage(c, uploadID, packageName, version)
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil)
c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to bullseye"))
}
// Create snapshots from both repos
snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie)
snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye)
createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot"))
createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody)
c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot"))
// Publish both snapshots CONCURRENTLY to the SAME prefix
// This mimics production where both are published to "external/postgres-auto"
// Use the SAME prefix across all iterations to trigger the race more aggressively
sharedPrefix := "postgres-auto"
var wg sync.WaitGroup
var trixiePublishCode, bullseyePublishCode int
wg.Add(2)
// Publish or update trixie snapshot
go func() {
defer wg.Done()
var resp *httptest.ResponseRecorder
if iter == 0 {
// First iteration: CREATE
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "snapshot",
"Distribution": "trixie",
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Name": snapshotTrixie},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false, // Force cleanup to run
})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody)
} else {
// Subsequent iterations: UPDATE (this is what happens in production)
updateBody, _ := json.Marshal(gin.H{
"Snapshots": []gin.H{
{"Component": "main", "Name": snapshotTrixie},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody)
}
trixiePublishCode = resp.Code
c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code)
}()
// Publish or update bullseye snapshot
go func() {
defer wg.Done()
var resp *httptest.ResponseRecorder
if iter == 0 {
// First iteration: CREATE
publishBody, _ := json.Marshal(gin.H{
"SourceKind": "snapshot",
"Distribution": "bullseye",
"Architectures": []string{"amd64"},
"Sources": []gin.H{
{"Name": snapshotBullseye},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody)
} else {
// Subsequent iterations: UPDATE
updateBody, _ := json.Marshal(gin.H{
"Snapshots": []gin.H{
{"Component": "main", "Name": snapshotBullseye},
},
"Signing": gin.H{"Skip": true},
"SkipBz2": true,
"ForceOverwrite": true,
"SkipCleanup": false,
})
resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody)
}
bullseyePublishCode = resp.Code
c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code)
}()
wg.Wait()
time.Sleep(50 * time.Millisecond)
// Verify publishes succeeded (201 for create, 200 for update)
expectedCode := 201
if iter > 0 {
expectedCode = 200
}
c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed"))
c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed"))
// Verify ALL package files exist in the published pool
publishedStorage := s.context.GetPublishedStorage("")
publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath()
missingFiles := []string{}
expectedFiles := []string{}
// Check trixie packages
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
expectedFiles = append(expectedFiles, expectedPath)
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath)))
}
}
// Check bullseye packages
for i := 0; i < numPackages; i++ {
packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i)
version := fmt.Sprintf("17.0.%d", iter)
poolSubdir := string(packageName[0])
expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName,
fmt.Sprintf("%s_%s_amd64.deb", packageName, version))
expectedFiles = append(expectedFiles, expectedPath)
if _, err := os.Stat(expectedPath); os.IsNotExist(err) {
missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath)))
}
}
// BUG: Files from one distribution are deleted by the other's cleanup
if len(missingFiles) > 0 {
c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations)
c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles))
for i, f := range missingFiles {
c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f)
}
c.Logf("\nThis reproduces the exact production bug where:")
c.Logf(" 1. Mirror updates complete successfully")
c.Logf(" 2. Snapshots are created")
c.Logf(" 3. Both snapshots publish to same prefix (different distributions)")
c.Logf(" 4. Cleanup from one publish DELETES files from the other")
c.Logf(" 5. Result: apt-get returns 404 when downloading packages")
// List what's actually in the pool
poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main")
if entries, err := os.ReadDir(poolDir); err == nil {
c.Logf("\nActual pool directory contents (%s):", poolDir)
for _, entry := range entries {
c.Logf(" - %s/", entry.Name())
}
}
c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool",
iter+1, numIterations, len(missingFiles))
} else {
c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles))
}
}
c.Logf("✓ All %d iterations passed - no files missing", numIterations)
}
+9
View File
@@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte {
return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution)
}
// PrefixPoolLockKey returns the task-queue resource key that serialises all
// publish operations sharing the same pool directory under storagePrefix.
// It must be held whenever a non-MultiDist publish may read or clean the
// shared pool, to prevent concurrent cleanup runs from deleting each other's
// files. See docs/Resource-Locking.md for the full key-namespace table.
func PrefixPoolLockKey(storagePrefix string) string {
return "P" + storagePrefix
}
// RefKey is a unique id for package reference list
func (p *PublishedRepo) RefKey(component string) []byte {
return []byte("E" + p.UUID + component)
+7 -2
View File
@@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) {
snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3")
_ = s.snapshotCollection.Add(snap3)
// Ensure that adding a second publish point with matching files doesn't give duplicate results.
// When a second publish point references the same package (snap3 is a clone of snap2,
// both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by
// package ref so the file appears only once. StrSlicesSubstract handles a single entry
// correctly, so no duplicate is needed for cleanup safety.
repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false)
c.Check(err, IsNil)
c.Check(s.collection.Add(repo3), IsNil)
@@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) {
"a/alien-arena/alien-arena-common_7.40-2_i386.deb",
"a/alien-arena/mars-invaders_7.40-2_i386.deb",
},
"main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"},
"main": {
"a/alien-arena/lonely-strangers_7.40-2_i386.deb",
},
})
}
-283
View File
@@ -1,283 +0,0 @@
package files
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/aptly-dev/aptly/aptly"
"github.com/aptly-dev/aptly/utils"
. "gopkg.in/check.v1"
)
type LinkFromPoolConcurrencySuite struct {
root string
poolDir string
storage *PublishedStorage
pool *PackagePool
cs aptly.ChecksumStorage
testFile string
testContent []byte
testChecksums utils.ChecksumInfo
srcPoolPath string
}
var _ = Suite(&LinkFromPoolConcurrencySuite{})
func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) {
s.root = c.MkDir()
s.poolDir = filepath.Join(s.root, "pool")
publishDir := filepath.Join(s.root, "public")
// Create package pool and published storage
s.pool = NewPackagePool(s.poolDir, true)
s.storage = NewPublishedStorage(publishDir, "copy", "checksum")
s.cs = NewMockChecksumStorage()
// Create test file content
s.testContent = []byte("test package content for concurrency testing")
s.testFile = filepath.Join(s.root, "test-package.deb")
err := os.WriteFile(s.testFile, s.testContent, 0644)
c.Assert(err, IsNil)
// Calculate checksums
md5sum, err := utils.MD5ChecksumForFile(s.testFile)
c.Assert(err, IsNil)
s.testChecksums = utils.ChecksumInfo{
Size: int64(len(s.testContent)),
MD5: md5sum,
}
// Import the test file into the pool
s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs)
c.Assert(err, IsNil)
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) {
// Test concurrent LinkFromPool operations to ensure no race conditions
concurrency := 5000
iterations := 10
for iter := 0; iter < iterations; iter++ {
c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency)
destPath := fmt.Sprintf("main/t/test%d", iter)
var wg sync.WaitGroup
errors := make(chan error, concurrency)
successes := make(chan struct{}, concurrency)
start := time.Now()
// Launch concurrent LinkFromPool operations
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// Use force=true to test the most vulnerable code path (remove-then-create)
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
true, // force - this triggers vulnerable remove-then-create pattern
)
if err != nil {
errors <- fmt.Errorf("goroutine %d failed: %v", id, err)
} else {
successes <- struct{}{}
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
close(successes)
// Count results
errorCount := 0
successCount := 0
var firstError error
for err := range errors {
errorCount++
if firstError == nil {
firstError = err
}
c.Logf("Race condition error: %v", err)
}
for range successes {
successCount++
}
c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration)
// Assert no race conditions occurred
if errorCount > 0 {
c.Fatalf("Race condition detected in iteration %d! "+
"Errors: %d out of %d operations (%.1f%% failure rate). "+
"First error: %v. "+
"This indicates the fix is not working properly.",
iter+1, errorCount, concurrency,
float64(errorCount)/float64(concurrency)*100, firstError)
}
// Verify the final file exists and has correct content
finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb")
_, err := os.Stat(finalFile)
c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations"))
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil, Commentf("Should be able to read final file"))
c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations"))
c.Logf("✓ Iteration %d: No race conditions detected", iter+1)
}
c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions",
concurrency*iterations, iterations)
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) {
// Test concurrent operations on different files to ensure no blocking
concurrency := 10
var wg sync.WaitGroup
errors := make(chan error, concurrency)
start := time.Now()
// Launch concurrent operations on different destination files
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
destPath := fmt.Sprintf("main/t/test-file-%d", id)
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
false, // force
)
if err != nil {
errors <- fmt.Errorf("goroutine %d failed: %v", id, err)
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
// Count errors
errorCount := 0
for err := range errors {
errorCount++
c.Logf("Error: %v", err)
}
c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files"))
c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration)
// Verify all files were created correctly
for i := 0; i < concurrency; i++ {
finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb")
_, err := os.Stat(finalFile)
c.Assert(err, IsNil, Commentf("File %d should exist", i))
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil, Commentf("Should be able to read file %d", i))
c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i))
}
}
func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) {
// Test that when force=false, concurrent operations fail gracefully without corruption
concurrency := 20
destPath := "main/t/single-dest"
var wg sync.WaitGroup
errors := make(chan error, concurrency)
successes := make(chan struct{}, concurrency)
// First, create the file so subsequent operations will conflict
err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false)
c.Assert(err, IsNil)
start := time.Now()
// Launch concurrent operations that should mostly fail
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
err := s.storage.LinkFromPool(
"", // publishedPrefix
destPath, // publishedRelPath
"test-package.deb", // fileName
s.pool, // sourcePool
s.srcPoolPath, // sourcePath
s.testChecksums, // sourceChecksums
false, // force=false - should fail if file exists and is same
)
if err != nil {
errors <- err
} else {
successes <- struct{}{}
}
}(i)
}
// Wait for completion
wg.Wait()
duration := time.Since(start)
close(errors)
close(successes)
errorCount := 0
successCount := 0
for range errors {
errorCount++
}
for range successes {
successCount++
}
c.Logf("Results with force=false: %d successes, %d errors, took %v", successCount, errorCount, duration)
// With force=false and identical files, operations should succeed (file already exists with same content)
// No race conditions should cause crashes or corruption
c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed"))
// Verify the file still exists and has correct content
finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb")
content, err := os.ReadFile(finalFile)
c.Assert(err, IsNil)
c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access"))
}
-25
View File
@@ -26,26 +26,6 @@ type PublishedStorage struct {
verifyMethod uint
}
// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool
var (
fileLockMutex sync.Mutex
fileLocks = make(map[string]*sync.Mutex)
)
// getFileLock returns a mutex for a specific file path to prevent concurrent modifications
func getFileLock(filePath string) *sync.Mutex {
fileLockMutex.Lock()
defer fileLockMutex.Unlock()
if mutex, exists := fileLocks[filePath]; exists {
return mutex
}
mutex := &sync.Mutex{}
fileLocks[filePath] = mutex
return mutex
}
// Check interfaces
var (
_ aptly.PublishedStorage = (*PublishedStorage)(nil)
@@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName))
destinationPath := filepath.Join(poolPath, baseName)
// Acquire file-specific lock to prevent concurrent access to the same file
fileLock := getFileLock(destinationPath)
fileLock.Lock()
defer fileLock.Unlock()
var localSourcePool aptly.LocalPackagePool
if storage.linkMethod != LinkMethodCopy {
pp, ok := sourcePool.(aptly.LocalPackagePool)
-10
View File
@@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) {
c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true)
}
func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) {
a := getFileLock(filepath.Join(s.root, "a"))
b := getFileLock(filepath.Join(s.root, "a"))
c.Check(a == b, Equals, true)
c1 := getFileLock(filepath.Join(s.root, "c1"))
c2 := getFileLock(filepath.Join(s.root, "c2"))
c.Check(c1 == c2, Equals, false)
}
func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) {
storage := NewPublishedStorage(s.root, "", "")