diff --git a/api/publish.go b/api/publish.go index 86fa3f8f..7aa5afca 100644 --- a/api/publish.go +++ b/api/publish.go @@ -298,6 +298,18 @@ func apiPublishRepoOrSnapshot(c *gin.Context) { multiDist = *b.MultiDist } + // Non-MultiDist publishes share a single pool/ directory under the + // prefix. Lock at the prefix level so that concurrent publish/drop + // operations on sibling distributions cannot race during cleanup. + if !multiDist { + storagePrefix := prefix + if storage != "" { + storagePrefix = storage + ":" + prefix + } + + resources = append(resources, deb.PrefixPoolLockKey(storagePrefix)) + } + taskName := fmt.Sprintf("Publish %s repository %s/%s with components \"%s\" and sources \"%s\"", b.SourceKind, param, b.Distribution, strings.Join(components, `", "`), strings.Join(names, `", "`)) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, detail *task.Detail) (*task.ProcessReturnValue, error) { @@ -494,6 +506,13 @@ func apiPublishUpdateSwitch(c *gin.Context) { return } + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } + // Field mutations and fresh DB load are deferred to inside the task so // they always operate on a consistent state after the lock is held. taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) @@ -614,6 +633,12 @@ func apiPublishDrop(c *gin.Context) { } resources := []string{string(published.Key())} + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that a drop cannot race + // with a concurrent update or drop of a sibling distribution during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } taskName := fmt.Sprintf("Delete published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution) maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) { taskCollectionFactory := context.NewCollectionFactory() @@ -1123,6 +1148,13 @@ func apiPublishUpdate(c *gin.Context) { resources := []string{string(published.Key())} + // Non-MultiDist distributions share a single pool/ directory under the + // prefix. Acquire the prefix-level pool lock so that concurrent updates + // on sibling distributions are serialised and cannot race during cleanup. + if !published.MultiDist { + resources = append(resources, deb.PrefixPoolLockKey(published.StoragePrefix())) + } + // Lock source repos / snapshots the same way apiPublishUpdateSwitch does, // because published.Update() reads from them and concurrent modification // would produce an inconsistent view. diff --git a/api/published_file_missing_test.go b/api/published_file_missing_test.go new file mode 100644 index 00000000..ecd1f347 --- /dev/null +++ b/api/published_file_missing_test.go @@ -0,0 +1,763 @@ +package api + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "sync" + "time" + + "github.com/aptly-dev/aptly/aptly" + ctx "github.com/aptly-dev/aptly/context" + "github.com/aptly-dev/aptly/deb" + "github.com/gin-gonic/gin" + "github.com/smira/flag" + + . "gopkg.in/check.v1" +) + +// PublishedFileMissingSuite reproduces the exact bug where: +// - Package import succeeds +// - Metadata is updated (Packages.gz shows the package) +// - Publish reports success +// - BUT the .deb file is missing from the published pool directory +// - Result: apt-get returns 404 when trying to download the package +type PublishedFileMissingSuite struct { + context *ctx.AptlyContext + flags *flag.FlagSet + configFile *os.File + router http.Handler + tempDir string + poolPath string + publicPath string +} + +var _ = Suite(&PublishedFileMissingSuite{}) + +func (s *PublishedFileMissingSuite) SetUpSuite(c *C) { + aptly.Version = "publishedFileMissingTest" + + tempDir, err := os.MkdirTemp("", "aptly-published-missing-test") + c.Assert(err, IsNil) + s.tempDir = tempDir + s.poolPath = filepath.Join(tempDir, "pool") + s.publicPath = filepath.Join(tempDir, "public") + + file, err := os.CreateTemp("", "aptly-published-missing-config") + c.Assert(err, IsNil) + s.configFile = file + + config := gin.H{ + "rootDir": tempDir, + "downloadDir": filepath.Join(tempDir, "download"), + "architectures": []string{"amd64"}, + "dependencyFollowSuggests": false, + "dependencyFollowRecommends": false, + "gpgDisableSign": true, + "gpgDisableVerify": true, + "gpgProvider": "internal", + "skipLegacyPool": true, + "enableMetricsEndpoint": false, + } + + jsonString, err := json.Marshal(config) + c.Assert(err, IsNil) + _, err = file.Write(jsonString) + c.Assert(err, IsNil) + + flags := flag.NewFlagSet("publishedFileMissingTestFlags", flag.ContinueOnError) + flags.Bool("no-lock", true, "disable database locking for test") + flags.Int("db-open-attempts", 3, "dummy") + flags.String("config", s.configFile.Name(), "config file") + flags.String("architectures", "", "dummy") + s.flags = flags + + context, err := ctx.NewContext(s.flags) + c.Assert(err, IsNil) + + s.context = context + s.router = Router(context) +} + +func (s *PublishedFileMissingSuite) TearDownSuite(c *C) { + if s.configFile != nil { + _ = os.Remove(s.configFile.Name()) + } + if s.context != nil { + s.context.Shutdown() + } + if s.tempDir != "" { + _ = os.RemoveAll(s.tempDir) + } +} + +func (s *PublishedFileMissingSuite) SetUpTest(c *C) { + collectionFactory := s.context.NewCollectionFactory() + + localRepoCollection := collectionFactory.LocalRepoCollection() + _ = localRepoCollection.ForEach(func(repo *deb.LocalRepo) error { + _ = localRepoCollection.Drop(repo) + return nil + }) + + publishedCollection := collectionFactory.PublishedRepoCollection() + _ = publishedCollection.ForEach(func(published *deb.PublishedRepo) error { + _ = publishedCollection.Remove(s.context, published.Storage, published.Prefix, + published.Distribution, collectionFactory, nil, true, true) + return nil + }) +} + +func (s *PublishedFileMissingSuite) TearDownTest(c *C) { + s.SetUpTest(c) +} + +func (s *PublishedFileMissingSuite) httpRequest(c *C, method string, url string, body []byte) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + var req *http.Request + var err error + + if body != nil { + req, err = http.NewRequest(method, url, bytes.NewReader(body)) + } else { + req, err = http.NewRequest(method, url, nil) + } + c.Assert(err, IsNil) + req.Header.Add("Content-Type", "application/json") + s.router.ServeHTTP(w, req) + return w +} + +func (s *PublishedFileMissingSuite) createDebPackage(c *C, uploadID, packageName, version string) { + uploadPath := s.context.UploadPath() + uploadDir := filepath.Join(uploadPath, uploadID) + err := os.MkdirAll(uploadDir, 0755) + c.Assert(err, IsNil) + + tempDir, err := os.MkdirTemp("", "deb-build") + c.Assert(err, IsNil) + defer func() { _ = os.RemoveAll(tempDir) }() + + debianDir := filepath.Join(tempDir, "DEBIAN") + err = os.MkdirAll(debianDir, 0755) + c.Assert(err, IsNil) + + controlContent := fmt.Sprintf(`Package: %s +Version: %s +Section: libs +Priority: optional +Architecture: amd64 +Maintainer: Test +Description: Test package + Test package for published file missing bug. +`, packageName, version) + + err = os.WriteFile(filepath.Join(debianDir, "control"), []byte(controlContent), 0644) + c.Assert(err, IsNil) + + usrDir := filepath.Join(tempDir, "usr", "lib") + err = os.MkdirAll(usrDir, 0755) + c.Assert(err, IsNil) + err = os.WriteFile(filepath.Join(usrDir, "lib.so"), []byte("library"), 0644) + c.Assert(err, IsNil) + + debFile := filepath.Join(uploadDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + cmd := exec.Command("dpkg-deb", "--build", tempDir, debFile) + err = cmd.Run() + c.Assert(err, IsNil) +} + +// TestPublishedFileGoMissing reproduces the exact production bug +func (s *PublishedFileMissingSuite) TestPublishedFileGoMissing(c *C) { + c.Log("=== Reproducing: Package in metadata but 404 on download ===") + + // Create and publish a repository + repoName := "test-repo" + distribution := "bullseye" + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create repo: %s", resp.Body.String())) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/hrt", publishBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to publish: %s", resp.Body.String())) + + // Create package + packageName := "hrt-libblobbyclient1" + version := "20250926.152427+hrtdeb11" + uploadID := "test-upload-1" + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package: %s", resp.Body.String())) + + // Update publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/hrt/%s", distribution), updateBody) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to update publish: %s", resp.Body.String())) + + // Now check if the file is actually accessible in the published location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + // Expected file path: hrt/pool/main/h/hrt-libblobbyclient1/hrt-libblobbyclient1_20250926.152427+hrtdeb11_amd64.deb + expectedPath := filepath.Join(publicPath, "hrt", "pool", "main", "h", packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + c.Logf("Checking for published file at: %s", expectedPath) + + fileInfo, err := os.Stat(expectedPath) + fileExists := err == nil + + c.Logf("File exists: %v", fileExists) + if fileExists { + c.Logf("File size: %d bytes", fileInfo.Size()) + } + + // Check metadata + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err = json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + c.Logf("Packages in metadata: %d", len(packages)) + + // THE BUG: Metadata says package exists, but file is missing from published location + if len(packages) > 0 && !fileExists { + c.Logf("★★★ BUG REPRODUCED! ★★★") + c.Logf("Metadata shows %d package(s) but file is missing at: %s", len(packages), expectedPath) + c.Logf("This is exactly what causes: 404 Not Found [IP: 10.20.72.62 3142]") + + c.Fatal("BUG CONFIRMED: Package in metadata but missing from published directory!") + } + + c.Assert(fileExists, Equals, true, Commentf( + "Published file should exist at %s when package is in metadata", expectedPath)) +} + +// TestConcurrentPublishRace tries to trigger the race with concurrent publishes +func (s *PublishedFileMissingSuite) TestConcurrentPublishRace(c *C) { + c.Log("=== Testing concurrent publish race condition ===") + + const numIterations = 4 + + for iteration := 0; iteration < numIterations; iteration++ { + c.Logf("--- Iteration %d/%d ---", iteration+1, numIterations) + + // Create repo + repoName := fmt.Sprintf("race-repo-%d", iteration) + distribution := fmt.Sprintf("dist-%d", iteration) + + createBody, _ := json.Marshal(gin.H{ + "Name": repoName, + "DefaultDistribution": distribution, + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": distribution, + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repoName}, + }, + "Signing": gin.H{"Skip": true}, + }) + resp = s.httpRequest(c, "POST", "/api/publish/concurrent", publishBody) + c.Assert(resp.Code, Equals, 201) + + // Create multiple packages + var wg sync.WaitGroup + numPackages := 5 + + for i := 0; i < numPackages; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + packageName := fmt.Sprintf("pkg-%d-%d", iteration, idx) + version := "1.0.0" + uploadID := fmt.Sprintf("upload-%d-%d", iteration, idx) + + s.createDebPackage(c, uploadID, packageName, version) + + // Add package + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoName, uploadID), nil) + c.Logf("Package %d add: %d", idx, resp.Code) + + // Small delay + time.Sleep(time.Duration(5+idx*2) * time.Millisecond) + + // Publish + updateBody, _ := json.Marshal(gin.H{ + "Signing": gin.H{"Skip": true}, + "ForceOverwrite": true, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/concurrent/%s", distribution), updateBody) + c.Logf("Publish %d: %d", idx, resp.Code) + }(i) + } + + wg.Wait() + time.Sleep(100 * time.Millisecond) + + // Check all packages + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repoName), nil) + var packages []string + err := json.Unmarshal(resp.Body.Bytes(), &packages) + c.Assert(err, IsNil) + + // Check published files + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("pkg-%d-%d", iteration, i) + version := "1.0.0" + + // Calculate pool path + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, "concurrent", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, expectedPath) + } + } + + if len(missingFiles) > 0 { + c.Logf("★★★ BUG DETECTED in iteration %d/%d! ★★★", iteration+1, numIterations) + c.Logf("Metadata shows %d packages, but %d files are MISSING:", len(packages), len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" [iter %d] File MISSING %d/%d: %s", iteration+1, i+1, len(missingFiles), f) + } + + c.Fatalf("BUG REPRODUCED in iteration %d/%d: %d published files missing", iteration+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iteration+1, numIterations, numPackages) + } + } + + c.Logf("All %d iterations passed - bug not reproduced with current timing", numIterations) +} + +// TestIdenticalPackageRace tests the specific case of identical SHA256 packages +func (s *PublishedFileMissingSuite) TestIdenticalPackageRace(c *C) { + c.Log("=== AGGRESSIVE test: identical package (same SHA256) race ===") + + const numIterations = 4 + packageName := "shared-package" + + for iter := 0; iter < numIterations; iter++ { + c.Logf("Iteration %d/%d", iter+1, numIterations) + + // Create two repos that will get the SAME package (unique per iteration) + repos := []string{fmt.Sprintf("identical-a-%d", iter), fmt.Sprintf("identical-b-%d", iter)} + dists := []string{fmt.Sprintf("dist-a-%d", iter), fmt.Sprintf("dist-b-%d", iter)} + + for i := range repos { + createBody, _ := json.Marshal(gin.H{ + "Name": repos[i], + "DefaultDistribution": dists[i], + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201) + + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "local", + "Distribution": dists[i], + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Component": "main", "Name": repos[i]}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + }) + resp = s.httpRequest(c, "POST", "/api/publish/identical", publishBody) + c.Assert(resp.Code, Equals, 201) + } + + // Create IDENTICAL package file with UNIQUE VERSION per iteration + version := fmt.Sprintf("1.0.%d", iter) + uploadID1 := fmt.Sprintf("identical-upload-1-%d", iter) + uploadID2 := fmt.Sprintf("identical-upload-2-%d", iter) + + s.createDebPackage(c, uploadID1, packageName, version) + + // Copy to second upload (same SHA256) + uploadPath := s.context.UploadPath() + src := filepath.Join(uploadPath, uploadID1, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + destDir := filepath.Join(uploadPath, uploadID2) + err := os.MkdirAll(destDir, 0755) + c.Assert(err, IsNil) + dest := filepath.Join(destDir, fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + srcData, readErr := os.ReadFile(src) + c.Assert(readErr, IsNil) + err = os.WriteFile(dest, srcData, 0644) + c.Assert(err, IsNil) + + // Race: add and publish both simultaneously + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + //time.Sleep(5 * time.Millisecond) + c.Logf("[iter %d] Import A", iter) + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[0], uploadID1), nil) + c.Logf("[iter %d] Import A complete: %d", iter, resp.Code) + + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + c.Logf("[iter %d] Publish A", iter) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) + c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code) + }() + + go func() { + defer wg.Done() + //time.Sleep(7 * time.Millisecond) + c.Logf("[iter %d] Import B", iter) + resp := s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repos[1], uploadID2), nil) + c.Logf("[iter %d] Import B complete: %d", iter, resp.Code) + + updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + c.Logf("[iter %d] Publish B", iter) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) + c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code) + }() + + //go func() { + //defer wg.Done() + //time.Sleep(15 * time.Millisecond) + //updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + //c.Logf("[iter %d] Publish A", iter) + //resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[0]), updateBody) + //c.Logf("[iter %d] Publish A complete: %d", iter, resp.Code) + //}() + + //go func() { + //defer wg.Done() + //time.Sleep(18 * time.Millisecond) + //updateBody, _ := json.Marshal(gin.H{"Signing": gin.H{"Skip": true}, "ForceOverwrite": true, "SkipBz2": true}) + //c.Logf("[iter %d] Publish B", iter) + //resp := s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/identical/%s", dists[1]), updateBody) + //c.Logf("[iter %d] Publish B complete: %d", iter, resp.Code) + //}() + + wg.Wait() + time.Sleep(200 * time.Millisecond) + c.Logf("[iter %d] All operations complete", iter) + + // Check the shared pool location + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + poolSubdir := string(packageName[0]) + sharedPoolPath := filepath.Join(publicPath, "identical", "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + fileInfo, err := os.Stat(sharedPoolPath) + fileExists := err == nil + + if fileExists { + c.Logf("[iter %d] File EXISTS at %s (size: %d)", iter, sharedPoolPath, fileInfo.Size()) + } else { + c.Logf("[iter %d] File MISSING at %s (error: %v)", iter, sharedPoolPath, err) + } + + // Check metadata + var packagesA, packagesB []string + resp := s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[0]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesA) + c.Assert(err, IsNil) + resp = s.httpRequest(c, "GET", fmt.Sprintf("/api/repos/%s/packages", repos[1]), nil) + err = json.Unmarshal(resp.Body.Bytes(), &packagesB) + c.Assert(err, IsNil) + + c.Logf("[iter %d] Packages in metadata: A=%d, B=%d", iter, len(packagesA), len(packagesB)) + + // THE BUG: Both repos show packages in metadata, but the shared pool file is missing + if (len(packagesA) > 0 || len(packagesB) > 0) && !fileExists { + c.Logf("★★★ BUG REPRODUCED in iteration %d! ★★★", iter+1) + c.Logf("Packages in metadata A: %d, B: %d", len(packagesA), len(packagesB)) + c.Logf("Shared pool file exists: %v", fileExists) + c.Logf("Pool path: %s", sharedPoolPath) + + // List what files ARE in the pool directory + poolDir := filepath.Dir(sharedPoolPath) + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("Files in pool directory %s:", poolDir) + for _, entry := range entries { + c.Logf(" - %s", entry.Name()) + } + } + + c.Fatalf("Metadata shows packages but shared pool file is missing (iteration %d)", iter+1) + } + } + + c.Logf("All %d iterations passed - bug not reproduced", numIterations) +} + +// TestConcurrentSnapshotPublishToSamePrefix reproduces the EXACT production bug: +// Multiple snapshots are published concurrently to the SAME prefix but different distributions. +// Example from production logs: +// - trixie-pgdg published to "external/postgres-auto/trixie" +// - bullseye-pgdg published to "external/postgres-auto/bullseye" +// Both share the same pool directory, causing cleanup race conditions. +func (s *PublishedFileMissingSuite) TestConcurrentSnapshotPublishToSamePrefix(c *C) { + const numIterations = 4 + + for iter := 0; iter < numIterations; iter++ { + c.Logf("--- Iteration %d/%d ---", iter+1, numIterations) + + // Create two repos with different packages (simulating trixie-pgdg and bullseye-pgdg) + repoTrixie := fmt.Sprintf("trixie-pgdg-%d", iter) + repoBullseye := fmt.Sprintf("bullseye-pgdg-%d", iter) + + // Create trixie repo + createBody, _ := json.Marshal(gin.H{ + "Name": repoTrixie, + "DefaultDistribution": "trixie", + "DefaultComponent": "main", + }) + resp := s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie repo")) + + // Create bullseye repo + createBody, _ = json.Marshal(gin.H{ + "Name": repoBullseye, + "DefaultDistribution": "bullseye", + "DefaultComponent": "main", + }) + resp = s.httpRequest(c, "POST", "/api/repos", createBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye repo")) + + // Add packages to both repos + numPackages := 3 + + // Add packages to trixie repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("trixie-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoTrixie, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to trixie")) + } + + // Add packages to bullseye repo + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + uploadID := fmt.Sprintf("bullseye-upload-%d-%d", iter, i) + + s.createDebPackage(c, uploadID, packageName, version) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/file/%s?noRemove=0", repoBullseye, uploadID), nil) + c.Assert(resp.Code, Equals, 200, Commentf("Failed to add package to bullseye")) + } + + // Create snapshots from both repos + snapshotTrixie := fmt.Sprintf("%s-snap", repoTrixie) + snapshotBullseye := fmt.Sprintf("%s-snap", repoBullseye) + + createSnapshotBody, _ := json.Marshal(gin.H{"Name": snapshotTrixie}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoTrixie), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create trixie snapshot")) + + createSnapshotBody, _ = json.Marshal(gin.H{"Name": snapshotBullseye}) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/repos/%s/snapshots", repoBullseye), createSnapshotBody) + c.Assert(resp.Code, Equals, 201, Commentf("Failed to create bullseye snapshot")) + + // Publish both snapshots CONCURRENTLY to the SAME prefix + // This mimics production where both are published to "external/postgres-auto" + // Use the SAME prefix across all iterations to trigger the race more aggressively + sharedPrefix := "postgres-auto" + + var wg sync.WaitGroup + var trixiePublishCode, bullseyePublishCode int + + wg.Add(2) + + // Publish or update trixie snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "trixie", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, // Force cleanup to run + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE (this is what happens in production) + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotTrixie}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/trixie", sharedPrefix), updateBody) + } + trixiePublishCode = resp.Code + c.Logf("[iter %d] Trixie publish/update completed: %d", iter, resp.Code) + }() + + // Publish or update bullseye snapshot + go func() { + defer wg.Done() + + var resp *httptest.ResponseRecorder + if iter == 0 { + // First iteration: CREATE + publishBody, _ := json.Marshal(gin.H{ + "SourceKind": "snapshot", + "Distribution": "bullseye", + "Architectures": []string{"amd64"}, + "Sources": []gin.H{ + {"Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "POST", fmt.Sprintf("/api/publish/%s", sharedPrefix), publishBody) + } else { + // Subsequent iterations: UPDATE + updateBody, _ := json.Marshal(gin.H{ + "Snapshots": []gin.H{ + {"Component": "main", "Name": snapshotBullseye}, + }, + "Signing": gin.H{"Skip": true}, + "SkipBz2": true, + "ForceOverwrite": true, + "SkipCleanup": false, + }) + resp = s.httpRequest(c, "PUT", fmt.Sprintf("/api/publish/%s/bullseye", sharedPrefix), updateBody) + } + bullseyePublishCode = resp.Code + c.Logf("[iter %d] Bullseye publish/update completed: %d", iter, resp.Code) + }() + + wg.Wait() + time.Sleep(50 * time.Millisecond) + + // Verify publishes succeeded (201 for create, 200 for update) + expectedCode := 201 + if iter > 0 { + expectedCode = 200 + } + c.Assert(trixiePublishCode, Equals, expectedCode, Commentf("Trixie publish/update should succeed")) + c.Assert(bullseyePublishCode, Equals, expectedCode, Commentf("Bullseye publish/update should succeed")) + + // Verify ALL package files exist in the published pool + publishedStorage := s.context.GetPublishedStorage("") + publicPath := publishedStorage.(aptly.FileSystemPublishedStorage).PublicPath() + + missingFiles := []string{} + expectedFiles := []string{} + + // Check trixie packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-trixie-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("TRIXIE: %s", filepath.Base(expectedPath))) + } + } + + // Check bullseye packages + for i := 0; i < numPackages; i++ { + packageName := fmt.Sprintf("postgresql-17-bullseye-pkg%d", i) + version := fmt.Sprintf("17.0.%d", iter) + + poolSubdir := string(packageName[0]) + expectedPath := filepath.Join(publicPath, sharedPrefix, "pool", "main", poolSubdir, packageName, + fmt.Sprintf("%s_%s_amd64.deb", packageName, version)) + + expectedFiles = append(expectedFiles, expectedPath) + if _, err := os.Stat(expectedPath); os.IsNotExist(err) { + missingFiles = append(missingFiles, fmt.Sprintf("BULLSEYE: %s", filepath.Base(expectedPath))) + } + } + + // BUG: Files from one distribution are deleted by the other's cleanup + if len(missingFiles) > 0 { + c.Logf("★★★ BUG REPRODUCED in iteration %d/%d! ★★★", iter+1, numIterations) + c.Logf("Both publishes to prefix '%s' succeeded, but %d files are MISSING:", sharedPrefix, len(missingFiles)) + for i, f := range missingFiles { + c.Logf(" Missing file %d/%d: %s", i+1, len(missingFiles), f) + } + + c.Logf("\nThis reproduces the exact production bug where:") + c.Logf(" 1. Mirror updates complete successfully") + c.Logf(" 2. Snapshots are created") + c.Logf(" 3. Both snapshots publish to same prefix (different distributions)") + c.Logf(" 4. Cleanup from one publish DELETES files from the other") + c.Logf(" 5. Result: apt-get returns 404 when downloading packages") + + // List what's actually in the pool + poolDir := filepath.Join(publicPath, sharedPrefix, "pool", "main") + if entries, err := os.ReadDir(poolDir); err == nil { + c.Logf("\nActual pool directory contents (%s):", poolDir) + for _, entry := range entries { + c.Logf(" - %s/", entry.Name()) + } + } + + c.Fatalf("BUG CONFIRMED (iteration %d/%d): %d files missing from shared pool", + iter+1, numIterations, len(missingFiles)) + } else { + c.Logf("[iter %d/%d] All %d files present - OK", iter+1, numIterations, len(expectedFiles)) + } + } + c.Logf("✓ All %d iterations passed - no files missing", numIterations) +} diff --git a/deb/publish.go b/deb/publish.go index cc4c2a0f..dbaa81a7 100644 --- a/deb/publish.go +++ b/deb/publish.go @@ -612,6 +612,15 @@ func (p *PublishedRepo) Key() []byte { return []byte("U" + p.StoragePrefix() + ">>" + p.Distribution) } +// PrefixPoolLockKey returns the task-queue resource key that serialises all +// publish operations sharing the same pool directory under storagePrefix. +// It must be held whenever a non-MultiDist publish may read or clean the +// shared pool, to prevent concurrent cleanup runs from deleting each other's +// files. See docs/Resource-Locking.md for the full key-namespace table. +func PrefixPoolLockKey(storagePrefix string) string { + return "P" + storagePrefix +} + // RefKey is a unique id for package reference list func (p *PublishedRepo) RefKey(component string) []byte { return []byte("E" + p.UUID + component) diff --git a/deb/publish_test.go b/deb/publish_test.go index 125397b2..933f9ef6 100644 --- a/deb/publish_test.go +++ b/deb/publish_test.go @@ -873,7 +873,10 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { snap3 := NewSnapshotFromRefList("snap3", []*Snapshot{}, s.snap2.RefList(), "desc3") _ = s.snapshotCollection.Add(snap3) - // Ensure that adding a second publish point with matching files doesn't give duplicate results. + // When a second publish point references the same package (snap3 is a clone of snap2, + // both containing p3/lonely-strangers), listReferencedFilesByComponent deduplicates by + // package ref so the file appears only once. StrSlicesSubstract handles a single entry + // correctly, so no duplicate is needed for cleanup safety. repo3, err := NewPublishedRepo("", "", "anaconda-2", []string{}, []string{"main"}, []interface{}{snap3}, s.factory, false) c.Check(err, IsNil) c.Check(s.collection.Add(repo3), IsNil) @@ -888,7 +891,9 @@ func (s *PublishedRepoCollectionSuite) TestListReferencedFiles(c *C) { "a/alien-arena/alien-arena-common_7.40-2_i386.deb", "a/alien-arena/mars-invaders_7.40-2_i386.deb", }, - "main": {"a/alien-arena/lonely-strangers_7.40-2_i386.deb"}, + "main": { + "a/alien-arena/lonely-strangers_7.40-2_i386.deb", + }, }) } diff --git a/files/linkfrompool_concurrency_test.go b/files/linkfrompool_concurrency_test.go deleted file mode 100644 index 0acbab3f..00000000 --- a/files/linkfrompool_concurrency_test.go +++ /dev/null @@ -1,283 +0,0 @@ -package files - -import ( - "fmt" - "os" - "path/filepath" - "sync" - "time" - - "github.com/aptly-dev/aptly/aptly" - "github.com/aptly-dev/aptly/utils" - - . "gopkg.in/check.v1" -) - -type LinkFromPoolConcurrencySuite struct { - root string - poolDir string - storage *PublishedStorage - pool *PackagePool - cs aptly.ChecksumStorage - testFile string - testContent []byte - testChecksums utils.ChecksumInfo - srcPoolPath string -} - -var _ = Suite(&LinkFromPoolConcurrencySuite{}) - -func (s *LinkFromPoolConcurrencySuite) SetUpTest(c *C) { - s.root = c.MkDir() - s.poolDir = filepath.Join(s.root, "pool") - publishDir := filepath.Join(s.root, "public") - - // Create package pool and published storage - s.pool = NewPackagePool(s.poolDir, true) - s.storage = NewPublishedStorage(publishDir, "copy", "checksum") - s.cs = NewMockChecksumStorage() - - // Create test file content - s.testContent = []byte("test package content for concurrency testing") - s.testFile = filepath.Join(s.root, "test-package.deb") - - err := os.WriteFile(s.testFile, s.testContent, 0644) - c.Assert(err, IsNil) - - // Calculate checksums - md5sum, err := utils.MD5ChecksumForFile(s.testFile) - c.Assert(err, IsNil) - - s.testChecksums = utils.ChecksumInfo{ - Size: int64(len(s.testContent)), - MD5: md5sum, - } - - // Import the test file into the pool - s.srcPoolPath, err = s.pool.Import(s.testFile, "test-package.deb", &s.testChecksums, false, s.cs) - c.Assert(err, IsNil) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrency(c *C) { - // Test concurrent LinkFromPool operations to ensure no race conditions - concurrency := 5000 - iterations := 10 - - for iter := 0; iter < iterations; iter++ { - c.Logf("Iteration %d: Testing concurrent LinkFromPool with %d goroutines", iter+1, concurrency) - - destPath := fmt.Sprintf("main/t/test%d", iter) - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - start := time.Now() - - // Launch concurrent LinkFromPool operations - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - // Use force=true to test the most vulnerable code path (remove-then-create) - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - true, // force - this triggers vulnerable remove-then-create pattern - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - // Count results - errorCount := 0 - successCount := 0 - var firstError error - - for err := range errors { - errorCount++ - if firstError == nil { - firstError = err - } - c.Logf("Race condition error: %v", err) - } - - for range successes { - successCount++ - } - - c.Logf("Results: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // Assert no race conditions occurred - if errorCount > 0 { - c.Fatalf("Race condition detected in iteration %d! "+ - "Errors: %d out of %d operations (%.1f%% failure rate). "+ - "First error: %v. "+ - "This indicates the fix is not working properly.", - iter+1, errorCount, concurrency, - float64(errorCount)/float64(concurrency)*100, firstError) - } - - // Verify the final file exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("Final file should exist after concurrent operations")) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read final file")) - c.Assert(content, DeepEquals, s.testContent, Commentf("File content should be intact after concurrent operations")) - - c.Logf("✓ Iteration %d: No race conditions detected", iter+1) - } - - c.Logf("SUCCESS: Handled %d total concurrent operations across %d iterations with no race conditions", - concurrency*iterations, iterations) -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolConcurrencyDifferentFiles(c *C) { - // Test concurrent operations on different files to ensure no blocking - concurrency := 10 - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - - start := time.Now() - - // Launch concurrent operations on different destination files - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - destPath := fmt.Sprintf("main/t/test-file-%d", id) - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force - ) - - if err != nil { - errors <- fmt.Errorf("goroutine %d failed: %v", id, err) - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - - // Count errors - errorCount := 0 - for err := range errors { - errorCount++ - c.Logf("Error: %v", err) - } - - c.Assert(errorCount, Equals, 0, Commentf("No errors should occur when linking to different files")) - c.Logf("SUCCESS: %d concurrent operations on different files completed in %v", concurrency, duration) - - // Verify all files were created correctly - for i := 0; i < concurrency; i++ { - finalFile := filepath.Join(s.storage.rootPath, fmt.Sprintf("main/t/test-file-%d", i), "test-package.deb") - _, err := os.Stat(finalFile) - c.Assert(err, IsNil, Commentf("File %d should exist", i)) - - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil, Commentf("Should be able to read file %d", i)) - c.Assert(content, DeepEquals, s.testContent, Commentf("File %d content should be correct", i)) - } -} - -func (s *LinkFromPoolConcurrencySuite) TestLinkFromPoolWithoutForceNoConcurrencyIssues(c *C) { - // Test that when force=false, concurrent operations fail gracefully without corruption - concurrency := 20 - destPath := "main/t/single-dest" - - var wg sync.WaitGroup - errors := make(chan error, concurrency) - successes := make(chan struct{}, concurrency) - - // First, create the file so subsequent operations will conflict - err := s.storage.LinkFromPool("", destPath, "test-package.deb", s.pool, s.srcPoolPath, s.testChecksums, false) - c.Assert(err, IsNil) - - start := time.Now() - - // Launch concurrent operations that should mostly fail - for i := 0; i < concurrency; i++ { - wg.Add(1) - go func(id int) { - defer wg.Done() - - err := s.storage.LinkFromPool( - "", // publishedPrefix - destPath, // publishedRelPath - "test-package.deb", // fileName - s.pool, // sourcePool - s.srcPoolPath, // sourcePath - s.testChecksums, // sourceChecksums - false, // force=false - should fail if file exists and is same - ) - - if err != nil { - errors <- err - } else { - successes <- struct{}{} - } - }(i) - } - - // Wait for completion - wg.Wait() - duration := time.Since(start) - - close(errors) - close(successes) - - errorCount := 0 - successCount := 0 - - for range errors { - errorCount++ - } - - for range successes { - successCount++ - } - - c.Logf("Results with force=false: %d successes, %d errors, took %v", successCount, errorCount, duration) - - // With force=false and identical files, operations should succeed (file already exists with same content) - // No race conditions should cause crashes or corruption - c.Assert(errorCount, Equals, 0, Commentf("With identical files and force=false, operations should succeed")) - - // Verify the file still exists and has correct content - finalFile := filepath.Join(s.storage.rootPath, destPath, "test-package.deb") - content, err := os.ReadFile(finalFile) - c.Assert(err, IsNil) - c.Assert(content, DeepEquals, s.testContent, Commentf("File should not be corrupted by concurrent access")) -} diff --git a/files/public.go b/files/public.go index a2d3f815..dea35ea8 100644 --- a/files/public.go +++ b/files/public.go @@ -26,26 +26,6 @@ type PublishedStorage struct { verifyMethod uint } -// Global mutex map to prevent concurrent access to the same destinationPath in LinkFromPool -var ( - fileLockMutex sync.Mutex - fileLocks = make(map[string]*sync.Mutex) -) - -// getFileLock returns a mutex for a specific file path to prevent concurrent modifications -func getFileLock(filePath string) *sync.Mutex { - fileLockMutex.Lock() - defer fileLockMutex.Unlock() - - if mutex, exists := fileLocks[filePath]; exists { - return mutex - } - - mutex := &sync.Mutex{} - fileLocks[filePath] = mutex - return mutex -} - // Check interfaces var ( _ aptly.PublishedStorage = (*PublishedStorage)(nil) @@ -172,11 +152,6 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath, poolPath := filepath.Join(storage.rootPath, publishedPrefix, publishedRelPath, filepath.Dir(fileName)) destinationPath := filepath.Join(poolPath, baseName) - // Acquire file-specific lock to prevent concurrent access to the same file - fileLock := getFileLock(destinationPath) - fileLock.Lock() - defer fileLock.Unlock() - var localSourcePool aptly.LocalPackagePool if storage.linkMethod != LinkMethodCopy { pp, ok := sourcePool.(aptly.LocalPackagePool) diff --git a/files/public_test.go b/files/public_test.go index 058bd8c4..9f9198f3 100644 --- a/files/public_test.go +++ b/files/public_test.go @@ -632,16 +632,6 @@ func (s *DiskFullNoRootSuite) TestLinkFromPoolCopySyncErrorIsReturned(c *C) { c.Check(strings.Contains(err.Error(), "error syncing file"), Equals, true) } -func (s *DiskFullNoRootSuite) TestGetFileLockReusesMutex(c *C) { - a := getFileLock(filepath.Join(s.root, "a")) - b := getFileLock(filepath.Join(s.root, "a")) - c.Check(a == b, Equals, true) - - c1 := getFileLock(filepath.Join(s.root, "c1")) - c2 := getFileLock(filepath.Join(s.root, "c2")) - c.Check(c1 == c2, Equals, false) -} - func (s *DiskFullNoRootSuite) TestPutFileFailsIfDestinationDirMissing(c *C) { storage := NewPublishedStorage(s.root, "", "")