Fix S3 published storage to use new PackagePool interface

Change PackagePool to return Seeker interface from Open call.
This commit is contained in:
Andrey Smirnov
2017-04-06 21:49:35 +03:00
parent 7bad358408
commit f4ff8d957f
4 changed files with 71 additions and 32 deletions

View File

@@ -4,24 +4,46 @@ package aptly
import ( import (
"io" "io"
"os"
"github.com/smira/aptly/utils" "github.com/smira/aptly/utils"
) )
// ReadSeekerCloser = ReadSeeker + Closer
type ReadSeekerCloser interface {
io.ReadSeeker
io.Closer
}
// PackagePool is abstraction of package pool storage. // PackagePool is abstraction of package pool storage.
// //
// PackagePool stores all the package files, deduplicating them. // PackagePool stores all the package files, deduplicating them.
type PackagePool interface { type PackagePool interface {
// Path returns full path to package file in pool given any name and hash of file contents // Import copies file into package pool
Path(filename string, checksums utils.ChecksumInfo) (string, error) //
// RelativePath returns path relative to pool's root for package files given checksums and original filename // - srcPath is full path to source file as it is now
RelativePath(filename string, checksums utils.ChecksumInfo) (string, error) // - basename is desired human-readable name (canonical filename)
// - checksums are used to calculate file placement
// - move indicates whether srcPath can be removed
Import(srcPath, basename string, checksums *utils.ChecksumInfo, move bool) (path string, err error)
// LegacyPath returns legacy (pre 1.1) path to package file (relative to root)
LegacyPath(filename string, checksums *utils.ChecksumInfo) (string, error)
// Stat returns Unix stat(2) info
Stat(path string) (os.FileInfo, error)
// Open returns ReadSeekerCloser to access the file
Open(path string) (ReadSeekerCloser, error)
// FilepathList returns file paths of all the files in the pool // FilepathList returns file paths of all the files in the pool
FilepathList(progress Progress) ([]string, error) FilepathList(progress Progress) ([]string, error)
// Remove deletes file in package pool and returns its size // Remove deletes file in package pool and returns its size
Remove(path string) (size int64, err error) Remove(path string) (size int64, err error)
// Import copies file into package pool }
Import(path string, checksums utils.ChecksumInfo) error
// LocalPackagePool is implemented by PackagePools residing on the same filesystem
type LocalPackagePool interface {
// GenerateTempPath generates temporary path for download (which is fast to import into package pool later on)
GenerateTempPath(filename string) (string, error)
// Link generates hardlink to destination path
Link(path, dstPath string) error
} }
// PublishedStorage is abstraction of filesystem storing all published repositories // PublishedStorage is abstraction of filesystem storing all published repositories

View File

@@ -240,7 +240,7 @@ func (pool *PackagePool) Import(srcPath, basename string, checksums *utils.Check
} }
// Open returns io.ReadCloser to access the file // Open returns ReadSeekerCloser to access the file
func (pool *PackagePool) Open(path string) (io.ReadCloser, error) { func (pool *PackagePool) Open(path string) (aptly.ReadSeekerCloser, error) {
return os.Open(filepath.Join(pool.rootPath, path)) return os.Open(filepath.Join(pool.rootPath, path))
} }

View File

@@ -2,6 +2,7 @@ package s3
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@@ -12,8 +13,8 @@ import (
"github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3"
"github.com/pkg/errors"
"github.com/smira/aptly/aptly" "github.com/smira/aptly/aptly"
"github.com/smira/aptly/files"
"github.com/smira/aptly/utils" "github.com/smira/aptly/utils"
"github.com/smira/go-aws-auth" "github.com/smira/go-aws-auth"
) )
@@ -135,6 +136,17 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
} }
defer source.Close() defer source.Close()
err = storage.putFile(path, source)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s", sourceFilename, storage))
}
return err
}
// putFile uploads file-like object to
func (storage *PublishedStorage) putFile(path string, source io.ReadSeeker) error {
params := &s3.PutObjectInput{ params := &s3.PutObjectInput{
Bucket: aws.String(storage.bucket), Bucket: aws.String(storage.bucket),
Key: aws.String(filepath.Join(storage.prefix, path)), Key: aws.String(filepath.Join(storage.prefix, path)),
@@ -148,13 +160,18 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
params.ServerSideEncryption = aws.String(storage.encryptionMethod) params.ServerSideEncryption = aws.String(storage.encryptionMethod)
} }
_, err = storage.s3.PutObject(params) _, err := storage.s3.PutObject(params)
if err != nil { if err != nil {
return fmt.Errorf("error uploading %s to %s: %s", sourceFilename, storage, err) return err
} }
if storage.plusWorkaround && strings.Index(path, "+") != -1 { if storage.plusWorkaround && strings.Index(path, "+") != -1 {
return storage.PutFile(strings.Replace(path, "+", " ", -1), sourceFilename) _, err = source.Seek(0, 0)
if err != nil {
return err
}
return storage.putFile(strings.Replace(path, "+", " ", -1), source)
} }
return nil return nil
} }
@@ -167,7 +184,7 @@ func (storage *PublishedStorage) Remove(path string) error {
} }
_, err := storage.s3.DeleteObject(params) _, err := storage.s3.DeleteObject(params)
if err != nil { if err != nil {
return fmt.Errorf("error deleting %s from %s: %s", path, storage, err) return errors.Wrap(err, fmt.Sprintf("error deleting %s from %s", path, storage))
} }
if storage.plusWorkaround && strings.Index(path, "+") != -1 { if storage.plusWorkaround && strings.Index(path, "+") != -1 {
@@ -242,21 +259,15 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress
// LinkFromPool returns relative path for the published file to be included in package index // LinkFromPool returns relative path for the published file to be included in package index
func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourcePool aptly.PackagePool, func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourcePool aptly.PackagePool,
sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error { sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error {
// verify that package pool is local pool in filesystem
_ = sourcePool.(*files.PackagePool)
baseName := filepath.Base(sourcePath) baseName := filepath.Base(sourcePath)
relPath := filepath.Join(publishedDirectory, baseName) relPath := filepath.Join(publishedDirectory, baseName)
poolPath := filepath.Join(storage.prefix, relPath) poolPath := filepath.Join(storage.prefix, relPath)
var (
err error
)
if storage.pathCache == nil { if storage.pathCache == nil {
paths, md5s, err := storage.internalFilelist(storage.prefix, true) paths, md5s, err := storage.internalFilelist(storage.prefix, true)
if err != nil { if err != nil {
return fmt.Errorf("error caching paths under prefix: %s", err) return errors.Wrap(err, "error caching paths under prefix")
} }
storage.pathCache = make(map[string]string, len(paths)) storage.pathCache = make(map[string]string, len(paths))
@@ -284,7 +295,13 @@ func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourceP
} }
} }
err = storage.PutFile(relPath, sourcePath) source, err := sourcePool.Open(sourcePath)
if err != nil {
return err
}
defer source.Close()
err = storage.putFile(relPath, source)
if err == nil { if err == nil {
storage.pathCache[relPath] = sourceMD5 storage.pathCache[relPath] = sourceMD5
} }

View File

@@ -3,7 +3,6 @@ package s3
import ( import (
"bytes" "bytes"
"io/ioutil" "io/ioutil"
"os"
"path/filepath" "path/filepath"
. "gopkg.in/check.v1" . "gopkg.in/check.v1"
@@ -219,40 +218,41 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) {
root := c.MkDir() root := c.MkDir()
pool := files.NewPackagePool(root) pool := files.NewPackagePool(root)
sourcePath := filepath.Join(root, "pool/c1/df/mars-invaders_1.03.deb") tmpFile1 := filepath.Join(c.MkDir(), "mars-invaders_1.03.deb")
err := os.MkdirAll(filepath.Dir(sourcePath), 0755) err := ioutil.WriteFile(tmpFile1, []byte("Contents"), 0644)
c.Assert(err, IsNil) c.Assert(err, IsNil)
cksum1 := utils.ChecksumInfo{MD5: "c1df1da7a1ce305a3b60af9d5733ac1d"}
err = ioutil.WriteFile(sourcePath, []byte("Contents"), 0644) tmpFile2 := filepath.Join(c.MkDir(), "mars-invaders_1.03.deb")
err = ioutil.WriteFile(tmpFile2, []byte("Spam"), 0644)
c.Assert(err, IsNil) c.Assert(err, IsNil)
cksum2 := utils.ChecksumInfo{MD5: "e9dfd31cc505d51fc26975250750deab"}
sourcePath2 := filepath.Join(root, "pool/e9/df/mars-invaders_1.03.deb") src1, err := pool.Import(tmpFile1, "mars-invaders_1.03.deb", &cksum1, true)
err = os.MkdirAll(filepath.Dir(sourcePath2), 0755)
c.Assert(err, IsNil) c.Assert(err, IsNil)
src2, err := pool.Import(tmpFile2, "mars-invaders_1.03.deb", &cksum2, true)
err = ioutil.WriteFile(sourcePath2, []byte("Spam"), 0644)
c.Assert(err, IsNil) c.Assert(err, IsNil)
// first link from pool // first link from pool
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, utils.ChecksumInfo{MD5: "c1df1da7a1ce305a3b60af9d5733ac1d"}, false) err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, src1, cksum1, false)
c.Check(err, IsNil) c.Check(err, IsNil)
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// duplicate link from pool // duplicate link from pool
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, utils.ChecksumInfo{MD5: "c1df1da7a1ce305a3b60af9d5733ac1d"}, false) err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, src1, cksum1, false)
c.Check(err, IsNil) c.Check(err, IsNil)
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// link from pool with conflict // link from pool with conflict
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, utils.ChecksumInfo{MD5: "e9dfd31cc505d51fc26975250750deab"}, false) err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, src2, cksum2, false)
c.Check(err, ErrorMatches, ".*file already exists and is different.*") c.Check(err, ErrorMatches, ".*file already exists and is different.*")
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// link from pool with conflict and force // link from pool with conflict and force
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, utils.ChecksumInfo{MD5: "e9dfd31cc505d51fc26975250750deab"}, true) err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, src2, cksum2, true)
c.Check(err, IsNil) c.Check(err, IsNil)
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam")) c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam"))