diff --git a/AUTHORS b/AUTHORS index 3d7f29ae..b76f85c1 100644 --- a/AUTHORS +++ b/AUTHORS @@ -19,3 +19,4 @@ List of contributors, in chronological order: * Paul Krohn (https://github.com/paul-krohn) * Vincent Bernat (https://github.com/vincentbernat) * x539 (https://github.com/x539) +* Phil Frost (https://github.com/bitglue) \ No newline at end of file diff --git a/Gomfile b/Gomfile index 084b4769..7c89aa3d 100644 --- a/Gomfile +++ b/Gomfile @@ -1,12 +1,14 @@ gom 'github.com/AlekSi/pointer', :commit => '5f6d527dae3d678b46fbb20331ddf44e2b841943' gom 'github.com/awalterschulze/gographviz', :commit => '20d1f693416d9be045340150094aa42035a41c9e' +gom 'github.com/aws/aws-sdk-go', :commit => 'a170e9cb76475a0da7c0326a13cc2b39e9244b3b' gom 'github.com/cheggaaa/pb', :commit => '2c1b74620cc58a81ac152ee2d322e28c806d81ed' gom 'github.com/DisposaBoy/JsonConfigReader', :commit => '33a99fdf1d5ee1f79b5077e9c06f955ad356d5f4' gom 'github.com/gin-gonic/gin', :commit => 'b1758d3bfa09e61ddbc1c9a627e936eec6a170de' +gom 'github.com/go-ini/ini', :commit => 'afbd495e5aaea13597b5e14fe514ddeaa4d76fc3' gom 'github.com/jlaffaye/ftp', :commit => 'fec71e62e457557fbe85cefc847a048d57815d76' +gom 'github.com/jmespath/go-jmespath', :commit => '0b12d6b521d83fc7f755e7cfc1b1fbdd35a01a74' gom 'github.com/julienschmidt/httprouter', :commit => '46807412fe50aaceb73bb57061c2230fd26a1640' gom 'github.com/mattn/go-shellwords', :commit => 'c7ca6f94add751566a61cf2199e1de78d4c3eee4' -gom 'github.com/mitchellh/goamz/s3', :commit => 'caaaea8b30ee15616494ee68abd5d8ebbbef05cf' gom 'github.com/mkrautz/goar', :commit => '282caa8bd9daba480b51f1d5a988714913b97aad' gom 'github.com/mxk/go-flowrate/flowrate', :commit => 'cca7078d478f8520f85629ad7c68962d31ed7682' gom 'github.com/ncw/swift', :commit => '384ef27c70645e285f8bb9d02276bf654d06027e' diff --git a/context/context.go b/context/context.go index 609858db..75b7a7c4 100644 --- a/context/context.go +++ b/context/context.go @@ -321,7 +321,8 @@ func (context *AptlyContext) 
GetPublishedStorage(name string) aptly.PublishedSto } var err error - publishedStorage, err = s3.NewPublishedStorage(params.AccessKeyID, params.SecretAccessKey, + publishedStorage, err = s3.NewPublishedStorage( + params.AccessKeyID, params.SecretAccessKey, params.SessionToken, params.Region, params.Endpoint, params.Bucket, params.ACL, params.Prefix, params.StorageClass, params.EncryptionMethod, params.PlusWorkaround, params.DisableMultiDel) if err != nil { diff --git a/man/aptly.1 b/man/aptly.1 index d7992777..f7926322 100644 --- a/man/aptly.1 +++ b/man/aptly.1 @@ -55,6 +55,7 @@ Configuration file is stored in JSON format (default values shown below): "endpoint": "", "awsAccessKeyID": "", "awsSecretAccessKey": "", + "awsSessionToken": "", "prefix": "", "acl": "public\-read", "storageClass": "", diff --git a/s3/public.go b/s3/public.go index 4482ce15..16b0aaa0 100644 --- a/s3/public.go +++ b/s3/public.go @@ -2,11 +2,12 @@ package s3 import ( "fmt" - "github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/s3" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" "github.com/smira/aptly/aptly" "github.com/smira/aptly/files" - "net/http" "os" "path/filepath" "strings" @@ -15,8 +16,9 @@ import ( // PublishedStorage abstract file system with published files (actually hosted on S3) type PublishedStorage struct { s3 *s3.S3 - bucket *s3.Bucket - acl s3.ACL + config *aws.Config + bucket string + acl string prefix string storageClass string encryptionMethod string @@ -31,8 +33,11 @@ var ( ) // NewPublishedStorageRaw creates published storage from raw aws credentials -func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL, prefix, - storageClass, encryptionMethod string, plusWorkaround, disabledMultiDel bool) (*PublishedStorage, error) { +func NewPublishedStorageRaw( + bucket, defaultACL, prefix, storageClass, encryptionMethod string, + 
plusWorkaround, disabledMultiDel bool, + config *aws.Config, +) (*PublishedStorage, error) { if defaultACL == "" { defaultACL = "private" } @@ -41,9 +46,13 @@ func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL storageClass = "" } + sess := session.New(config) + result := &PublishedStorage{ - s3: s3.New(auth, region), - acl: s3.ACL(defaultACL), + s3: s3.New(sess), + bucket: bucket, + config: config, + acl: defaultACL, prefix: prefix, storageClass: storageClass, encryptionMethod: encryptionMethod, @@ -51,48 +60,34 @@ func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL disableMultiDel: disabledMultiDel, } - result.s3.HTTPClient = func() *http.Client { - return RetryingClient - } - result.bucket = result.s3.Bucket(bucket) - return result, nil } // NewPublishedStorage creates new instance of PublishedStorage with specified S3 access // keys, region and bucket name -func NewPublishedStorage(accessKey, secretKey, region, endpoint, bucket, defaultACL, prefix, +func NewPublishedStorage(accessKey, secretKey, sessionToken, region, endpoint, bucket, defaultACL, prefix, storageClass, encryptionMethod string, plusWorkaround, disableMultiDel bool) (*PublishedStorage, error) { - auth, err := aws.GetAuth(accessKey, secretKey) - if err != nil { - return nil, err + + config := &aws.Config{ + HTTPClient: RetryingClient, + Region: aws.String(region), } - var awsRegion aws.Region - - if endpoint == "" { - var ok bool - - awsRegion, ok = aws.Regions[region] - if !ok { - return nil, fmt.Errorf("unknown region: %#v", region) - } - } else { - awsRegion = aws.Region{ - Name: region, - S3Endpoint: endpoint, - S3LocationConstraint: true, - S3LowercaseBucket: true, - } + if endpoint != "" { + config = config.WithEndpoint(endpoint).WithS3ForcePathStyle(true) } - return NewPublishedStorageRaw(auth, awsRegion, bucket, defaultACL, prefix, storageClass, encryptionMethod, - plusWorkaround, disableMultiDel) + if accessKey != "" { + 
config.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, sessionToken) + } + + return NewPublishedStorageRaw(bucket, defaultACL, prefix, storageClass, + encryptionMethod, plusWorkaround, disableMultiDel, config) } // String func (storage *PublishedStorage) String() string { - return fmt.Sprintf("S3: %s:%s/%s", storage.s3.Region.Name, storage.bucket.Name, storage.prefix) + return fmt.Sprintf("S3: %s:%s/%s", *storage.config.Region, storage.bucket, storage.prefix) } // MkDir creates directory recursively under public path @@ -106,7 +101,6 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err var ( source *os.File err error - fi os.FileInfo ) source, err = os.Open(sourceFilename) if err != nil { @@ -114,22 +108,20 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err } defer source.Close() - fi, err = source.Stat() - if err != nil { - return err - } - - headers := map[string][]string{ - "Content-Type": {"binary/octet-stream"}, + params := &s3.PutObjectInput{ + Bucket: aws.String(storage.bucket), + Key: aws.String(filepath.Join(storage.prefix, path)), + Body: source, + ACL: aws.String(storage.acl), } if storage.storageClass != "" { - headers["x-amz-storage-class"] = []string{storage.storageClass} + params.StorageClass = aws.String(storage.storageClass) } if storage.encryptionMethod != "" { - headers["x-amz-server-side-encryption"] = []string{storage.encryptionMethod} + params.ServerSideEncryption = aws.String(storage.encryptionMethod) } - err = storage.bucket.PutReaderHeader(filepath.Join(storage.prefix, path), source, fi.Size(), headers, storage.acl) + _, err = storage.s3.PutObject(params) if err != nil { return fmt.Errorf("error uploading %s to %s: %s", sourceFilename, storage, err) } @@ -142,7 +134,11 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err // Remove removes single file under public path func (storage *PublishedStorage) Remove(path string) error 
{ - err := storage.bucket.Del(filepath.Join(storage.prefix, path)) + params := &s3.DeleteObjectInput{ + Bucket: aws.String(storage.bucket), + Key: aws.String(path), + } + _, err := storage.s3.DeleteObject(params) if err != nil { return fmt.Errorf("error deleting %s from %s: %s", path, storage, err) } @@ -165,7 +161,11 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress if storage.disableMultiDel { for i := range filelist { - err = storage.bucket.Del(filepath.Join(storage.prefix, path, filelist[i])) + params := &s3.DeleteObjectInput{ + Bucket: aws.String(storage.bucket), + Key: aws.String(filepath.Join(storage.prefix, path, filelist[i])), + } + _, err := storage.s3.DeleteObject(params) if err != nil { return fmt.Errorf("error deleting path %s from %s: %s", filelist[i], storage, err) } @@ -180,13 +180,23 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress } else { part = filelist[i*page : (i+1)*page] } - paths := make([]string, len(part)) + paths := make([]*s3.ObjectIdentifier, len(part)) for i := range part { - paths[i] = filepath.Join(storage.prefix, path, part[i]) + paths[i] = &s3.ObjectIdentifier{ + Key: aws.String(filepath.Join(storage.prefix, path, part[i])), + } } - err = storage.bucket.MultiDel(paths) + params := &s3.DeleteObjectsInput{ + Bucket: aws.String(storage.bucket), + Delete: &s3.Delete{ + Objects: paths, + Quiet: aws.Bool(true), + }, + } + + _, err := storage.s3.DeleteObjects(params) if err != nil { return fmt.Errorf("error deleting multiple paths from %s: %s", storage, err) } @@ -265,13 +275,19 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro prefix += "/" } for { - contents, err := storage.bucket.List(prefix, "", marker, 1000) + params := &s3.ListObjectsInput{ + Bucket: aws.String(storage.bucket), + Prefix: aws.String(prefix), + MaxKeys: aws.Int64(1000), + } + + contents, err := storage.s3.ListObjects(params) if err != nil { return nil, nil, 
fmt.Errorf("error listing under prefix %s in %s: %s", prefix, storage, err) } lastKey := "" for _, key := range contents.Contents { - lastKey = key.Key + lastKey = *key.Key if storage.plusWorkaround && hidePlusWorkaround && strings.Index(lastKey, " ") != -1 { // if we use plusWorkaround, we want to hide those duplicates /// from listing @@ -279,14 +295,14 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro } if prefix == "" { - paths = append(paths, key.Key) + paths = append(paths, *key.Key) } else { - paths = append(paths, key.Key[len(prefix):]) + paths = append(paths, (*key.Key)[len(prefix):]) } - md5s = append(md5s, strings.Replace(key.ETag, "\"", "", -1)) + md5s = append(md5s, strings.Replace(*key.ETag, "\"", "", -1)) } - if contents.IsTruncated { - marker = contents.NextMarker + if contents.IsTruncated != nil && *contents.IsTruncated { + marker = *contents.NextMarker if marker == "" { // From the s3 docs: If response does not include the // NextMarker and it is truncated, you can use the value of the @@ -304,7 +320,16 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro // RenameFile renames (moves) file func (storage *PublishedStorage) RenameFile(oldName, newName string) error { - err := storage.bucket.Copy(filepath.Join(storage.prefix, oldName), filepath.Join(storage.prefix, newName), storage.acl) + source := fmt.Sprintf("/%s/%s", storage.bucket, filepath.Join(storage.prefix, oldName)) + + params := &s3.CopyObjectInput{ + Bucket: aws.String(storage.bucket), + CopySource: aws.String(source), + Key: aws.String(filepath.Join(storage.prefix, newName)), + ACL: aws.String(storage.acl), + } + + _, err := storage.s3.CopyObject(params) if err != nil { return fmt.Errorf("error copying %s -> %s in %s: %s", oldName, newName, storage, err) } diff --git a/s3/public_test.go b/s3/public_test.go index f19945b8..1a234768 100644 --- a/s3/public_test.go +++ b/s3/public_test.go @@ -1,18 +1,21 @@ package s3 import ( - 
"github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/s3/s3test" - "github.com/smira/aptly/files" + "bytes" "io/ioutil" "os" "path/filepath" . "gopkg.in/check.v1" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3" + + "github.com/smira/aptly/files" ) type PublishedStorageSuite struct { - srv *s3test.Server + srv *Server storage, prefixedStorage *PublishedStorage } @@ -20,18 +23,16 @@ var _ = Suite(&PublishedStorageSuite{}) func (s *PublishedStorageSuite) SetUpTest(c *C) { var err error - s.srv, err = s3test.NewServer(&s3test.Config{}) + s.srv, err = NewServer(&Config{}) c.Assert(err, IsNil) c.Assert(s.srv, NotNil) - auth, _ := aws.GetAuth("aa", "bb") - s.storage, err = NewPublishedStorageRaw(auth, aws.Region{Name: "test-1", S3Endpoint: s.srv.URL(), S3LocationConstraint: true}, "test", "", "", "", "", false, true) + s.storage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "", "", "", false, true) + c.Assert(err, IsNil) + s.prefixedStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "lala", "", "", false, true) c.Assert(err, IsNil) - s.prefixedStorage, err = NewPublishedStorageRaw(auth, aws.Region{Name: "test-1", S3Endpoint: s.srv.URL(), S3LocationConstraint: true}, "test", "", "lala", "", "", false, true) - c.Assert(err, IsNil) - - err = s.storage.s3.Bucket("test").PutBucket("private") + _, err = s.storage.s3.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String("test")}) c.Assert(err, IsNil) } @@ -39,10 +40,37 @@ func (s *PublishedStorageSuite) TearDownTest(c *C) { s.srv.Quit() } -func (s *PublishedStorageSuite) TestNewPublishedStorage(c *C) { - stor, err := NewPublishedStorage("aa", "bbb", "", "", "", "", "", "", "", false, false) - c.Check(stor, IsNil) - c.Check(err, ErrorMatches, "unknown region: .*") +func (s *PublishedStorageSuite) GetFile(c *C, path string) []byte { + resp, err := s.storage.s3.GetObject(&s3.GetObjectInput{ + Bucket: 
aws.String(s.storage.bucket), + Key: aws.String(path), + }) + c.Assert(err, IsNil) + + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + c.Assert(err, IsNil) + + return body +} + +func (s *PublishedStorageSuite) AssertNoFile(c *C, path string) { + _, err := s.storage.s3.HeadObject(&s3.HeadObjectInput{ + Bucket: aws.String(s.storage.bucket), + Key: aws.String(path), + }) + c.Assert(err, ErrorMatches, ".*\n.*status code: 404.*") +} + +func (s *PublishedStorageSuite) PutFile(c *C, path string, data []byte) { + _, err := s.storage.s3.PutObject(&s3.PutObjectInput{ + Bucket: aws.String(s.storage.bucket), + Key: aws.String(path), + Body: bytes.NewReader(data), + ContentType: aws.String("binary/octet-stream"), + ACL: aws.String("private"), + }) + c.Assert(err, IsNil) } func (s *PublishedStorageSuite) TestPutFile(c *C) { @@ -53,16 +81,12 @@ func (s *PublishedStorageSuite) TestPutFile(c *C) { err = s.storage.PutFile("a/b.txt", filepath.Join(dir, "a")) c.Check(err, IsNil) - data, err := s.storage.bucket.Get("a/b.txt") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("welcome to s3!")) + c.Check(s.GetFile(c, "a/b.txt"), DeepEquals, []byte("welcome to s3!")) err = s.prefixedStorage.PutFile("a/b.txt", filepath.Join(dir, "a")) c.Check(err, IsNil) - data, err = s.storage.bucket.Get("lala/a/b.txt") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("welcome to s3!")) + c.Check(s.GetFile(c, "lala/a/b.txt"), DeepEquals, []byte("welcome to s3!")) } func (s *PublishedStorageSuite) TestPutFilePlusWorkaround(c *C) { @@ -75,20 +99,15 @@ func (s *PublishedStorageSuite) TestPutFilePlusWorkaround(c *C) { err = s.storage.PutFile("a/b+c.txt", filepath.Join(dir, "a")) c.Check(err, IsNil) - data, err := s.storage.bucket.Get("a/b+c.txt") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("welcome to s3!")) + c.Check(s.GetFile(c, "a/b+c.txt"), DeepEquals, []byte("welcome to s3!")) - data, err = s.storage.bucket.Get("a/b c.txt") - c.Check(err, IsNil) - c.Check(data, 
DeepEquals, []byte("welcome to s3!")) + c.Check(s.GetFile(c, "a/b c.txt"), DeepEquals, []byte("welcome to s3!")) } func (s *PublishedStorageSuite) TestFilelist(c *C) { paths := []string{"a", "b", "c", "testa", "test/a", "test/b", "lala/a", "lala/b", "lala/c"} for _, path := range paths { - err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private") - c.Check(err, IsNil) + s.PutFile(c, path, []byte("test")) } list, err := s.storage.Filelist("") @@ -114,8 +133,7 @@ func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) { paths := []string{"a", "b", "c", "testa", "test/a+1", "test/a 1", "lala/a+b", "lala/a b", "lala/c"} for _, path := range paths { - err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private") - c.Check(err, IsNil) + s.PutFile(c, path, []byte("test")) } list, err := s.storage.Filelist("") @@ -136,40 +154,30 @@ func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) { } func (s *PublishedStorageSuite) TestRemove(c *C) { - err := s.storage.bucket.Put("a/b", []byte("test"), "binary/octet-stream", "private") + s.PutFile(c, "a/b", []byte("test")) + + err := s.storage.Remove("a/b") c.Check(err, IsNil) - err = s.storage.Remove("a/b") - c.Check(err, IsNil) - - _, err = s.storage.bucket.Get("a/b") - c.Check(err, ErrorMatches, "The specified key does not exist.") + s.AssertNoFile(c, "a/b") } func (s *PublishedStorageSuite) TestRemovePlusWorkaround(c *C) { s.storage.plusWorkaround = true - err := s.storage.bucket.Put("a/b+c", []byte("test"), "binary/octet-stream", "private") + s.PutFile(c, "a/b+c", []byte("test")) + s.PutFile(c, "a/b", []byte("test")) + + err := s.storage.Remove("a/b+c") c.Check(err, IsNil) - err = s.storage.bucket.Put("a/b", []byte("test"), "binary/octet-stream", "private") - c.Check(err, IsNil) - - err = s.storage.Remove("a/b+c") - c.Check(err, IsNil) - - _, err = s.storage.bucket.Get("a/b+c") - c.Check(err, ErrorMatches, "The specified key does not exist.") - - _, err = 
s.storage.bucket.Get("a/b c") - c.Check(err, ErrorMatches, "The specified key does not exist.") + s.AssertNoFile(c, "a/b+c") + s.AssertNoFile(c, "a/b c") err = s.storage.Remove("a/b") c.Check(err, IsNil) - _, err = s.storage.bucket.Get("a/b") - c.Check(err, ErrorMatches, "The specified key does not exist.") - + s.AssertNoFile(c, "a/b") } func (s *PublishedStorageSuite) TestRemoveDirs(c *C) { @@ -177,8 +185,7 @@ func (s *PublishedStorageSuite) TestRemoveDirs(c *C) { paths := []string{"a", "b", "c", "testa", "test/a+1", "test/a 1", "lala/a+b", "lala/a b", "lala/c"} for _, path := range paths { - err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private") - c.Check(err, IsNil) + s.PutFile(c, path, []byte("test")) } err := s.storage.RemoveDirs("test", nil) @@ -192,8 +199,7 @@ func (s *PublishedStorageSuite) TestRemoveDirs(c *C) { func (s *PublishedStorageSuite) TestRemoveDirsPlusWorkaround(c *C) { paths := []string{"a", "b", "c", "testa", "test/a", "test/b", "lala/a", "lala/b", "lala/c"} for _, path := range paths { - err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private") - c.Check(err, IsNil) + s.PutFile(c, path, []byte("test")) } err := s.storage.RemoveDirs("test", nil) @@ -230,31 +236,23 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) { err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, "c1df1da7a1ce305a3b60af9d5733ac1d", false) c.Check(err, IsNil) - data, err := s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("Contents")) + c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // duplicate link from pool err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, "c1df1da7a1ce305a3b60af9d5733ac1d", false) c.Check(err, IsNil) - data, err = 
s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("Contents")) + c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, "e9dfd31cc505d51fc26975250750deab", false) c.Check(err, ErrorMatches, ".*file already exists and is different.*") - data, err = s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("Contents")) + c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents")) // link from pool with conflict and force err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, "e9dfd31cc505d51fc26975250750deab", true) c.Check(err, IsNil) - data, err = s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb") - c.Check(err, IsNil) - c.Check(data, DeepEquals, []byte("Spam")) + c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam")) } diff --git a/s3/server_test.go b/s3/server_test.go new file mode 100644 index 00000000..668e289b --- /dev/null +++ b/s3/server_test.go @@ -0,0 +1,699 @@ +package s3 + +import ( + "bytes" + "crypto/md5" + "encoding/hex" + "encoding/xml" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "regexp" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +const debug = true + +type s3Error struct { + statusCode int + XMLName struct{} `xml:"Error"` + Code string + Message string + BucketName string + RequestId string + HostId string +} + +type action struct { + srv *Server + w http.ResponseWriter + req *http.Request + reqId string +} + +// Config controls the internal behaviour of the Server. 
A nil config is the default +// and behaves as if all configurations assume their default behaviour. Once passed +// to NewServer, the configuration must not be modified. +type Config struct { + // Send409Conflict controls how the Server will respond to calls to PUT on a + // previously existing bucket. The default is false, and corresponds to the + // us-east-1 s3 endpoint. Setting this value to true emulates the behaviour of + // all other regions. + // http://docs.amazonwebservices.com/AmazonS3/latest/API/ErrorResponses.html + Send409Conflict bool +} + +func (c *Config) send409Conflict() bool { + if c != nil { + return c.Send409Conflict + } + return false +} + +// Server is a fake S3 server for testing purposes. +// All of the data for the server is kept in memory. +type Server struct { + url string + reqId int + listener net.Listener + mu sync.Mutex + buckets map[string]*bucket + config *Config +} + +type bucket struct { + name string + acl string + ctime time.Time + objects map[string]*object +} + +type object struct { + name string + mtime time.Time + meta http.Header // metadata to return with requests. + checksum []byte // also held as Content-MD5 in meta. + data []byte +} + +// A resource encapsulates the subject of an HTTP request. +// The resource referred to may or may not exist +// when the request is made. +type resource interface { + put(a *action) interface{} + get(a *action) interface{} + post(a *action) interface{} + delete(a *action) interface{} +} + +func NewServer(config *Config) (*Server, error) { + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, fmt.Errorf("cannot listen on localhost: %v", err) + } + srv := &Server{ + listener: l, + url: "http://" + l.Addr().String(), + buckets: make(map[string]*bucket), + config: config, + } + go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + srv.serveHTTP(w, req) + })) + return srv, nil +} + +// Quit closes down the server.
+func (srv *Server) Quit() { + srv.listener.Close() +} + +// URL returns a URL for the server. +func (srv *Server) URL() string { + return srv.url +} + +func fatalf(code int, codeStr string, errf string, a ...interface{}) { + panic(&s3Error{ + statusCode: code, + Code: codeStr, + Message: fmt.Sprintf(errf, a...), + }) +} + +// serveHTTP serves the S3 protocol. +func (srv *Server) serveHTTP(w http.ResponseWriter, req *http.Request) { + // ignore error from ParseForm as it's usually spurious. + req.ParseForm() + + srv.mu.Lock() + defer srv.mu.Unlock() + + if debug { + log.Printf("s3test %q %q", req.Method, req.URL) + } + a := &action{ + srv: srv, + w: w, + req: req, + reqId: fmt.Sprintf("%09X", srv.reqId), + } + srv.reqId++ + + var r resource + defer func() { + switch err := recover().(type) { + case *s3Error: + switch r := r.(type) { + case objectResource: + err.BucketName = r.bucket.name + case bucketResource: + err.BucketName = r.name + } + err.RequestId = a.reqId + // TODO HostId + w.Header().Set("Content-Type", `xml version="1.0" encoding="UTF-8"`) + w.WriteHeader(err.statusCode) + xmlMarshal(w, err) + case nil: + default: + panic(err) + } + }() + + r = srv.resourceForURL(req.URL) + + var resp interface{} + switch req.Method { + case "PUT": + resp = r.put(a) + case "GET", "HEAD": + resp = r.get(a) + case "DELETE": + resp = r.delete(a) + case "POST": + resp = r.post(a) + default: + fatalf(400, "MethodNotAllowed", "unknown http request method %q", req.Method) + } + if resp != nil && req.Method != "HEAD" { + xmlMarshal(w, resp) + } +} + +// xmlMarshal is the same as xml.Marshal except that +// it panics on error. The marshalling should not fail, +// but we want to know if it does. +func xmlMarshal(w io.Writer, x interface{}) { + if err := xml.NewEncoder(w).Encode(x); err != nil { + panic(fmt.Errorf("error marshalling %#v: %v", x, err)) + } +} + +// In a fully implemented test server, each of these would have +// its own resource type. 
+var unimplementedBucketResourceNames = map[string]bool{ + "acl": true, + "lifecycle": true, + "policy": true, + "location": true, + "logging": true, + "notification": true, + "versions": true, + "requestPayment": true, + "versioning": true, + "website": true, + "uploads": true, +} + +var unimplementedObjectResourceNames = map[string]bool{ + "uploadId": true, + "acl": true, + "torrent": true, + "uploads": true, +} + +var pathRegexp = regexp.MustCompile("/(([^/]+)(/(.*))?)?") + +// resourceForURL returns a resource object for the given URL. +func (srv *Server) resourceForURL(u *url.URL) (r resource) { + + if u.Path == "/" { + return serviceResource{ + buckets: srv.buckets, + } + } + + m := pathRegexp.FindStringSubmatch(u.Path) + if m == nil { + fatalf(404, "InvalidURI", "Couldn't parse the specified URI") + } + bucketName := m[2] + objectName := m[4] + if bucketName == "" { + return nullResource{} // root + } + b := bucketResource{ + name: bucketName, + bucket: srv.buckets[bucketName], + } + q := u.Query() + if objectName == "" { + for name := range q { + if unimplementedBucketResourceNames[name] { + return nullResource{} + } + } + return b + + } + if b.bucket == nil { + fatalf(404, "NoSuchBucket", "The specified bucket does not exist") + } + objr := objectResource{ + name: objectName, + version: q.Get("versionId"), + bucket: b.bucket, + } + for name := range q { + if unimplementedObjectResourceNames[name] { + return nullResource{} + } + } + if obj := objr.bucket.objects[objr.name]; obj != nil { + objr.object = obj + } + return objr +} + +// nullResource has error stubs for all resource methods. 
+type nullResource struct{} + +func notAllowed() interface{} { + fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource") + return nil +} + +func (nullResource) put(a *action) interface{} { return notAllowed() } +func (nullResource) get(a *action) interface{} { return notAllowed() } +func (nullResource) post(a *action) interface{} { return notAllowed() } +func (nullResource) delete(a *action) interface{} { return notAllowed() } + +const timeFormat = "2006-01-02T15:04:05Z" + +type serviceResource struct { + buckets map[string]*bucket +} + +func (serviceResource) put(a *action) interface{} { return notAllowed() } +func (serviceResource) post(a *action) interface{} { return notAllowed() } +func (serviceResource) delete(a *action) interface{} { return notAllowed() } + +// GET on an s3 service lists the buckets. +// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTServiceGET.html +func (r serviceResource) get(a *action) interface{} { + type respBucket struct { + Name string + } + + type response struct { + Buckets []respBucket `xml:">Bucket"` + } + + resp := response{} + + for _, bucketPtr := range r.buckets { + bkt := respBucket{ + Name: bucketPtr.name, + } + resp.Buckets = append(resp.Buckets, bkt) + } + + return &resp +} + +type bucketResource struct { + name string + bucket *bucket // non-nil if the bucket already exists. +} + +type Owner struct { + ID string + DisplayName string +} + +// The ListResp type holds the results of a List bucket operation. +type ListResp struct { + Name string + Prefix string + Delimiter string + Marker string + NextMarker string + MaxKeys int + // IsTruncated is true if the results have been truncated because + // there are more keys and prefixes than can fit in MaxKeys. + // N.B. 
this is the opposite sense to that documented (incorrectly) in + // http://goo.gl/YjQTc + IsTruncated bool + Contents []Key + CommonPrefixes []string `xml:">Prefix"` +} + +// The Key type represents an item stored in an S3 bucket. +type Key struct { + Key string + LastModified string + Size int64 + // ETag gives the hex-encoded MD5 sum of the contents, + // surrounded with double-quotes. + ETag string + StorageClass string + Owner Owner +} + +// GET on a bucket lists the objects in the bucket. +// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html +func (r bucketResource) get(a *action) interface{} { + if r.bucket == nil { + fatalf(404, "NoSuchBucket", "The specified bucket does not exist") + } + delimiter := a.req.Form.Get("delimiter") + marker := a.req.Form.Get("marker") + maxKeys := -1 + if s := a.req.Form.Get("max-keys"); s != "" { + i, err := strconv.Atoi(s) + if err != nil || i < 0 { + fatalf(400, "invalid value for max-keys: %q", s) + } + maxKeys = i + } + prefix := a.req.Form.Get("prefix") + a.w.Header().Set("Content-Type", "application/xml") + + if a.req.Method == "HEAD" { + return nil + } + + var objs orderedObjects + + // first get all matching objects and arrange them in alphabetical order. 
+ for name, obj := range r.bucket.objects { + if strings.HasPrefix(name, prefix) { + objs = append(objs, obj) + } + } + sort.Sort(objs) + + if maxKeys <= 0 { + maxKeys = 1000 + } + resp := &ListResp{ + Name: r.bucket.name, + Prefix: prefix, + Delimiter: delimiter, + Marker: marker, + MaxKeys: maxKeys, + } + + var prefixes []string + for _, obj := range objs { + if !strings.HasPrefix(obj.name, prefix) { + continue + } + name := obj.name + isPrefix := false + if delimiter != "" { + if i := strings.Index(obj.name[len(prefix):], delimiter); i >= 0 { + name = obj.name[:len(prefix)+i+len(delimiter)] + if prefixes != nil && prefixes[len(prefixes)-1] == name { + continue + } + isPrefix = true + } + } + if name <= marker { + continue + } + if len(resp.Contents)+len(prefixes) >= maxKeys { + resp.IsTruncated = true + break + } + if isPrefix { + prefixes = append(prefixes, name) + } else { + // Contents contains only keys not found in CommonPrefixes + resp.Contents = append(resp.Contents, obj.s3Key()) + } + } + resp.CommonPrefixes = prefixes + return resp +} + +// orderedObjects holds a slice of objects that can be sorted +// by name. +type orderedObjects []*object + +func (s orderedObjects) Len() int { + return len(s) +} +func (s orderedObjects) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s orderedObjects) Less(i, j int) bool { + return s[i].name < s[j].name +} + +func (obj *object) s3Key() Key { + return Key{ + Key: obj.name, + LastModified: obj.mtime.Format(timeFormat), + Size: int64(len(obj.data)), + ETag: fmt.Sprintf(`"%x"`, obj.checksum), + // TODO StorageClass + // TODO Owner + } +} + +// DELETE on a bucket deletes the bucket if it's empty.
+func (r bucketResource) delete(a *action) interface{} { + b := r.bucket + if b == nil { + fatalf(404, "NoSuchBucket", "The specified bucket does not exist") + } + if len(b.objects) > 0 { + fatalf(400, "BucketNotEmpty", "The bucket you tried to delete is not empty") + } + delete(a.srv.buckets, b.name) + return nil +} + +// PUT on a bucket creates the bucket. +// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketPUT.html +func (r bucketResource) put(a *action) interface{} { + var created bool + if r.bucket == nil { + if !validBucketName(r.name) { + fatalf(400, "InvalidBucketName", "The specified bucket is not valid") + } + if loc := locationConstraint(a); loc == "" { + fatalf(400, "InvalidRequets", "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.") + } + // TODO validate acl + r.bucket = &bucket{ + name: r.name, + // TODO default acl + objects: make(map[string]*object), + } + a.srv.buckets[r.name] = r.bucket + created = true + } + if !created && a.srv.config.send409Conflict() { + fatalf(409, "BucketAlreadyOwnedByYou", "Your previous request to create the named bucket succeeded and you already own it.") + } + r.bucket.acl = a.req.Header.Get("x-amz-acl") + return nil +} + +func (bucketResource) post(a *action) interface{} { + fatalf(400, "Method", "bucket POST method not available") + return nil +} + +// validBucketName returns whether name is a valid bucket name. +// Here are the rules, from: +// http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/BucketRestrictions.html +// +// Can contain lowercase letters, numbers, periods (.), underscores (_), +// and dashes (-). You can use uppercase letters for buckets only in the +// US Standard region. 
+// +// Must start with a number or letter +// +// Must be between 3 and 255 characters long +// +// There's one extra rule (Must not be formatted as an IP address (e.g., 192.168.5.4) +// but the real S3 server does not seem to check that rule, so we will not +// check it either. +// +func validBucketName(name string) bool { + if len(name) < 3 || len(name) > 255 { + return false + } + r := name[0] + if !(r >= '0' && r <= '9' || r >= 'a' && r <= 'z') { + return false + } + for _, r := range name { + switch { + case r >= '0' && r <= '9': + case r >= 'a' && r <= 'z': + case r == '_' || r == '-': + case r == '.': + default: + return false + } + } + return true +} + +var responseParams = map[string]bool{ + "content-type": true, + "content-language": true, + "expires": true, + "cache-control": true, + "content-disposition": true, + "content-encoding": true, +} + +type objectResource struct { + name string + version string + bucket *bucket // always non-nil. + object *object // may be nil. +} + +// GET on an object gets the contents of the object. +// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTObjectGET.html +func (objr objectResource) get(a *action) interface{} { + obj := objr.object + if obj == nil { + fatalf(404, "NoSuchKey", "The specified key does not exist.") + } + h := a.w.Header() + // add metadata + for name, d := range obj.meta { + h[name] = d + } + // override header values in response to request parameters. + for name, vals := range a.req.Form { + if strings.HasPrefix(name, "response-") { + name = name[len("response-"):] + if !responseParams[name] { + continue + } + h.Set(name, vals[0]) + } + } + if r := a.req.Header.Get("Range"); r != "" { + fatalf(400, "NotImplemented", "range unimplemented") + } + // TODO Last-Modified-Since + // TODO If-Modified-Since + // TODO If-Unmodified-Since + // TODO If-Match + // TODO If-None-Match + // TODO Connection: close ?? 
+ // TODO x-amz-request-id + h.Set("Content-Length", fmt.Sprint(len(obj.data))) + h.Set("ETag", hex.EncodeToString(obj.checksum)) + h.Set("Last-Modified", obj.mtime.UTC().Format(http.TimeFormat)) + if a.req.Method == "HEAD" { + return nil + } + // TODO avoid holding the lock when writing data. + _, err := a.w.Write(obj.data) + if err != nil { + // we can't do much except just log the fact. + log.Printf("error writing data: %v", err) + } + return nil +} + +var metaHeaders = map[string]bool{ + "Content-MD5": true, + "x-amz-acl": true, + "Content-Type": true, + "Content-Encoding": true, + "Content-Disposition": true, +} + +// PUT on an object creates the object. +func (objr objectResource) put(a *action) interface{} { + // TODO Cache-Control header + // TODO Expires header + // TODO x-amz-server-side-encryption + // TODO x-amz-storage-class + + // TODO is this correct, or should we erase all previous metadata? + obj := objr.object + if obj == nil { + obj = &object{ + name: objr.name, + meta: make(http.Header), + } + } + + var expectHash []byte + if c := a.req.Header.Get("Content-MD5"); c != "" { + var err error + expectHash, err = hex.DecodeString(c) + if err != nil || len(expectHash) != md5.Size { + fatalf(400, "InvalidDigest", "The Content-MD5 you specified was invalid") + } + } + sum := md5.New() + // TODO avoid holding lock while reading data. 
+ data, err := ioutil.ReadAll(io.TeeReader(a.req.Body, sum)) + if err != nil { + fatalf(400, "TODO", "read error") + } + gotHash := sum.Sum(nil) + if expectHash != nil && bytes.Compare(gotHash, expectHash) != 0 { + fatalf(400, "BadDigest", "The Content-MD5 you specified did not match what we received") + } + if a.req.ContentLength >= 0 && int64(len(data)) != a.req.ContentLength { + fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header") + } + + // PUT request has been successful - save data and metadata + for key, values := range a.req.Header { + key = http.CanonicalHeaderKey(key) + if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") { + obj.meta[key] = values + } + } + obj.data = data + obj.checksum = gotHash + obj.mtime = time.Now() + objr.bucket.objects[objr.name] = obj + return nil +} + +func (objr objectResource) delete(a *action) interface{} { + delete(objr.bucket.objects, objr.name) + return nil +} + +func (objr objectResource) post(a *action) interface{} { + fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource") + return nil +} + +type CreateBucketConfiguration struct { + LocationConstraint string +} + +// locationConstraint parses the request body (if present). +// If there is no body, an empty string will be returned. 
+func locationConstraint(a *action) string { + var body bytes.Buffer + if _, err := io.Copy(&body, a.req.Body); err != nil { + fatalf(400, "InvalidRequest", err.Error()) + } + if body.Len() == 0 { + return "" + } + var loc CreateBucketConfiguration + if err := xml.NewDecoder(&body).Decode(&loc); err != nil { + fatalf(400, "InvalidRequest", err.Error()) + } + return loc.LocationConstraint +} diff --git a/utils/config.go b/utils/config.go index dfc7aa25..ac3a9f08 100644 --- a/utils/config.go +++ b/utils/config.go @@ -32,6 +32,7 @@ type S3PublishRoot struct { Endpoint string `json:"endpoint"` AccessKeyID string `json:"awsAccessKeyID"` SecretAccessKey string `json:"awsSecretAccessKey"` + SessionToken string `json:"awsSessionToken"` Prefix string `json:"prefix"` ACL string `json:"acl"` StorageClass string `json:"storageClass"` diff --git a/utils/config_test.go b/utils/config_test.go index 23036f4f..b98b4b4a 100644 --- a/utils/config_test.go +++ b/utils/config_test.go @@ -69,6 +69,7 @@ func (s *ConfigSuite) TestSaveConfig(c *C) { " \"endpoint\": \"\",\n"+ " \"awsAccessKeyID\": \"\",\n"+ " \"awsSecretAccessKey\": \"\",\n"+ + " \"awsSessionToken\": \"\",\n"+ " \"prefix\": \"\",\n"+ " \"acl\": \"\",\n"+ " \"storageClass\": \"\",\n"+