Merge branch 'bitglue-aws_sdk' #344

This commit is contained in:
Andrey Smirnov
2016-02-09 12:00:16 +03:00
9 changed files with 860 additions and 131 deletions
+1
View File
@@ -19,3 +19,4 @@ List of contributors, in chronological order:
* Paul Krohn (https://github.com/paul-krohn)
* Vincent Bernat (https://github.com/vincentbernat)
* x539 (https://github.com/x539)
* Phil Frost (https://github.com/bitglue)
+3 -1
View File
@@ -1,12 +1,14 @@
gom 'github.com/AlekSi/pointer', :commit => '5f6d527dae3d678b46fbb20331ddf44e2b841943'
gom 'github.com/awalterschulze/gographviz', :commit => '20d1f693416d9be045340150094aa42035a41c9e'
gom 'github.com/aws/aws-sdk-go', :commit => 'a170e9cb76475a0da7c0326a13cc2b39e9244b3b'
gom 'github.com/cheggaaa/pb', :commit => '2c1b74620cc58a81ac152ee2d322e28c806d81ed'
gom 'github.com/DisposaBoy/JsonConfigReader', :commit => '33a99fdf1d5ee1f79b5077e9c06f955ad356d5f4'
gom 'github.com/gin-gonic/gin', :commit => 'b1758d3bfa09e61ddbc1c9a627e936eec6a170de'
gom 'github.com/go-ini/ini', :commit => 'afbd495e5aaea13597b5e14fe514ddeaa4d76fc3'
gom 'github.com/jlaffaye/ftp', :commit => 'fec71e62e457557fbe85cefc847a048d57815d76'
gom 'github.com/jmespath/go-jmespath', :commit => '0b12d6b521d83fc7f755e7cfc1b1fbdd35a01a74'
gom 'github.com/julienschmidt/httprouter', :commit => '46807412fe50aaceb73bb57061c2230fd26a1640'
gom 'github.com/mattn/go-shellwords', :commit => 'c7ca6f94add751566a61cf2199e1de78d4c3eee4'
gom 'github.com/mitchellh/goamz/s3', :commit => 'caaaea8b30ee15616494ee68abd5d8ebbbef05cf'
gom 'github.com/mkrautz/goar', :commit => '282caa8bd9daba480b51f1d5a988714913b97aad'
gom 'github.com/mxk/go-flowrate/flowrate', :commit => 'cca7078d478f8520f85629ad7c68962d31ed7682'
gom 'github.com/ncw/swift', :commit => '384ef27c70645e285f8bb9d02276bf654d06027e'
+2 -1
View File
@@ -321,7 +321,8 @@ func (context *AptlyContext) GetPublishedStorage(name string) aptly.PublishedSto
}
var err error
publishedStorage, err = s3.NewPublishedStorage(params.AccessKeyID, params.SecretAccessKey,
publishedStorage, err = s3.NewPublishedStorage(
params.AccessKeyID, params.SecretAccessKey, params.SessionToken,
params.Region, params.Endpoint, params.Bucket, params.ACL, params.Prefix, params.StorageClass,
params.EncryptionMethod, params.PlusWorkaround, params.DisableMultiDel)
if err != nil {
+1
View File
@@ -55,6 +55,7 @@ Configuration file is stored in JSON format (default values shown below):
"endpoint": "",
"awsAccessKeyID": "",
"awsSecretAccessKey": "",
"awsSessionToken": "",
"prefix": "",
"acl": "public\-read",
"storageClass": "",
+86 -61
View File
@@ -2,11 +2,12 @@ package s3
import (
"fmt"
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/smira/aptly/aptly"
"github.com/smira/aptly/files"
"net/http"
"os"
"path/filepath"
"strings"
@@ -15,8 +16,9 @@ import (
// PublishedStorage abstract file system with published files (actually hosted on S3)
type PublishedStorage struct {
s3 *s3.S3
bucket *s3.Bucket
acl s3.ACL
config *aws.Config
bucket string
acl string
prefix string
storageClass string
encryptionMethod string
@@ -31,8 +33,11 @@ var (
)
// NewPublishedStorageRaw creates published storage from raw aws credentials
func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL, prefix,
storageClass, encryptionMethod string, plusWorkaround, disabledMultiDel bool) (*PublishedStorage, error) {
func NewPublishedStorageRaw(
bucket, defaultACL, prefix, storageClass, encryptionMethod string,
plusWorkaround, disabledMultiDel bool,
config *aws.Config,
) (*PublishedStorage, error) {
if defaultACL == "" {
defaultACL = "private"
}
@@ -41,9 +46,13 @@ func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL
storageClass = ""
}
sess := session.New(config)
result := &PublishedStorage{
s3: s3.New(auth, region),
acl: s3.ACL(defaultACL),
s3: s3.New(sess),
bucket: bucket,
config: config,
acl: defaultACL,
prefix: prefix,
storageClass: storageClass,
encryptionMethod: encryptionMethod,
@@ -51,48 +60,34 @@ func NewPublishedStorageRaw(auth aws.Auth, region aws.Region, bucket, defaultACL
disableMultiDel: disabledMultiDel,
}
result.s3.HTTPClient = func() *http.Client {
return RetryingClient
}
result.bucket = result.s3.Bucket(bucket)
return result, nil
}
// NewPublishedStorage creates new instance of PublishedStorage with specified S3 access
// keys, region and bucket name
func NewPublishedStorage(accessKey, secretKey, region, endpoint, bucket, defaultACL, prefix,
func NewPublishedStorage(accessKey, secretKey, sessionToken, region, endpoint, bucket, defaultACL, prefix,
storageClass, encryptionMethod string, plusWorkaround, disableMultiDel bool) (*PublishedStorage, error) {
auth, err := aws.GetAuth(accessKey, secretKey)
if err != nil {
return nil, err
config := &aws.Config{
HTTPClient: RetryingClient,
Region: aws.String(region),
}
var awsRegion aws.Region
if endpoint == "" {
var ok bool
awsRegion, ok = aws.Regions[region]
if !ok {
return nil, fmt.Errorf("unknown region: %#v", region)
}
} else {
awsRegion = aws.Region{
Name: region,
S3Endpoint: endpoint,
S3LocationConstraint: true,
S3LowercaseBucket: true,
}
if endpoint != "" {
config = config.WithEndpoint(endpoint).WithS3ForcePathStyle(true)
}
return NewPublishedStorageRaw(auth, awsRegion, bucket, defaultACL, prefix, storageClass, encryptionMethod,
plusWorkaround, disableMultiDel)
if accessKey != "" {
config.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
}
return NewPublishedStorageRaw(bucket, defaultACL, prefix, storageClass,
encryptionMethod, plusWorkaround, disableMultiDel, config)
}
// String
func (storage *PublishedStorage) String() string {
return fmt.Sprintf("S3: %s:%s/%s", storage.s3.Region.Name, storage.bucket.Name, storage.prefix)
return fmt.Sprintf("S3: %s:%s/%s", *storage.config.Region, storage.bucket, storage.prefix)
}
// MkDir creates directory recursively under public path
@@ -106,7 +101,6 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
var (
source *os.File
err error
fi os.FileInfo
)
source, err = os.Open(sourceFilename)
if err != nil {
@@ -114,22 +108,20 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
}
defer source.Close()
fi, err = source.Stat()
if err != nil {
return err
}
headers := map[string][]string{
"Content-Type": {"binary/octet-stream"},
params := &s3.PutObjectInput{
Bucket: aws.String(storage.bucket),
Key: aws.String(filepath.Join(storage.prefix, path)),
Body: source,
ACL: aws.String(storage.acl),
}
if storage.storageClass != "" {
headers["x-amz-storage-class"] = []string{storage.storageClass}
params.StorageClass = aws.String(storage.storageClass)
}
if storage.encryptionMethod != "" {
headers["x-amz-server-side-encryption"] = []string{storage.encryptionMethod}
params.ServerSideEncryption = aws.String(storage.encryptionMethod)
}
err = storage.bucket.PutReaderHeader(filepath.Join(storage.prefix, path), source, fi.Size(), headers, storage.acl)
_, err = storage.s3.PutObject(params)
if err != nil {
return fmt.Errorf("error uploading %s to %s: %s", sourceFilename, storage, err)
}
@@ -142,7 +134,11 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
// Remove removes single file under public path
func (storage *PublishedStorage) Remove(path string) error {
err := storage.bucket.Del(filepath.Join(storage.prefix, path))
params := &s3.DeleteObjectInput{
Bucket: aws.String(storage.bucket),
Key: aws.String(path),
}
_, err := storage.s3.DeleteObject(params)
if err != nil {
return fmt.Errorf("error deleting %s from %s: %s", path, storage, err)
}
@@ -165,7 +161,11 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress
if storage.disableMultiDel {
for i := range filelist {
err = storage.bucket.Del(filepath.Join(storage.prefix, path, filelist[i]))
params := &s3.DeleteObjectInput{
Bucket: aws.String(storage.bucket),
Key: aws.String(filepath.Join(storage.prefix, path, filelist[i])),
}
_, err := storage.s3.DeleteObject(params)
if err != nil {
return fmt.Errorf("error deleting path %s from %s: %s", filelist[i], storage, err)
}
@@ -180,13 +180,23 @@ func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress
} else {
part = filelist[i*page : (i+1)*page]
}
paths := make([]string, len(part))
paths := make([]*s3.ObjectIdentifier, len(part))
for i := range part {
paths[i] = filepath.Join(storage.prefix, path, part[i])
paths[i] = &s3.ObjectIdentifier{
Key: aws.String(filepath.Join(storage.prefix, path, part[i])),
}
}
err = storage.bucket.MultiDel(paths)
params := &s3.DeleteObjectsInput{
Bucket: aws.String(storage.bucket),
Delete: &s3.Delete{
Objects: paths,
Quiet: aws.Bool(true),
},
}
_, err := storage.s3.DeleteObjects(params)
if err != nil {
return fmt.Errorf("error deleting multiple paths from %s: %s", storage, err)
}
@@ -265,13 +275,19 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro
prefix += "/"
}
for {
contents, err := storage.bucket.List(prefix, "", marker, 1000)
params := &s3.ListObjectsInput{
Bucket: aws.String(storage.bucket),
Prefix: aws.String(prefix),
MaxKeys: aws.Int64(1000),
}
contents, err := storage.s3.ListObjects(params)
if err != nil {
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, storage, err)
}
lastKey := ""
for _, key := range contents.Contents {
lastKey = key.Key
lastKey = *key.Key
if storage.plusWorkaround && hidePlusWorkaround && strings.Index(lastKey, " ") != -1 {
// if we use plusWorkaround, we want to hide those duplicates
/// from listing
@@ -279,14 +295,14 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro
}
if prefix == "" {
paths = append(paths, key.Key)
paths = append(paths, *key.Key)
} else {
paths = append(paths, key.Key[len(prefix):])
paths = append(paths, (*key.Key)[len(prefix):])
}
md5s = append(md5s, strings.Replace(key.ETag, "\"", "", -1))
md5s = append(md5s, strings.Replace(*key.ETag, "\"", "", -1))
}
if contents.IsTruncated {
marker = contents.NextMarker
if contents.IsTruncated != nil && *contents.IsTruncated {
marker = *contents.NextMarker
if marker == "" {
// From the s3 docs: If response does not include the
// NextMarker and it is truncated, you can use the value of the
@@ -304,7 +320,16 @@ func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaro
// RenameFile renames (moves) file
func (storage *PublishedStorage) RenameFile(oldName, newName string) error {
err := storage.bucket.Copy(filepath.Join(storage.prefix, oldName), filepath.Join(storage.prefix, newName), storage.acl)
source := fmt.Sprintf("/%s/%s", storage.bucket, filepath.Join(storage.prefix, oldName))
params := &s3.CopyObjectInput{
Bucket: aws.String(storage.bucket),
CopySource: aws.String(source),
Key: aws.String(filepath.Join(storage.prefix, newName)),
ACL: aws.String(storage.acl),
}
_, err := storage.s3.CopyObject(params)
if err != nil {
return fmt.Errorf("error copying %s -> %s in %s: %s", oldName, newName, storage, err)
}
+66 -68
View File
@@ -1,18 +1,21 @@
package s3
import (
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/s3/s3test"
"github.com/smira/aptly/files"
"bytes"
"io/ioutil"
"os"
"path/filepath"
. "gopkg.in/check.v1"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/smira/aptly/files"
)
type PublishedStorageSuite struct {
srv *s3test.Server
srv *Server
storage, prefixedStorage *PublishedStorage
}
@@ -20,18 +23,16 @@ var _ = Suite(&PublishedStorageSuite{})
func (s *PublishedStorageSuite) SetUpTest(c *C) {
var err error
s.srv, err = s3test.NewServer(&s3test.Config{})
s.srv, err = NewServer(&Config{})
c.Assert(err, IsNil)
c.Assert(s.srv, NotNil)
auth, _ := aws.GetAuth("aa", "bb")
s.storage, err = NewPublishedStorageRaw(auth, aws.Region{Name: "test-1", S3Endpoint: s.srv.URL(), S3LocationConstraint: true}, "test", "", "", "", "", false, true)
s.storage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "", "", "", false, true)
c.Assert(err, IsNil)
s.prefixedStorage, err = NewPublishedStorage("aa", "bb", "", "test-1", s.srv.URL(), "test", "", "lala", "", "", false, true)
c.Assert(err, IsNil)
s.prefixedStorage, err = NewPublishedStorageRaw(auth, aws.Region{Name: "test-1", S3Endpoint: s.srv.URL(), S3LocationConstraint: true}, "test", "", "lala", "", "", false, true)
c.Assert(err, IsNil)
err = s.storage.s3.Bucket("test").PutBucket("private")
_, err = s.storage.s3.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String("test")})
c.Assert(err, IsNil)
}
@@ -39,10 +40,37 @@ func (s *PublishedStorageSuite) TearDownTest(c *C) {
s.srv.Quit()
}
func (s *PublishedStorageSuite) TestNewPublishedStorage(c *C) {
stor, err := NewPublishedStorage("aa", "bbb", "", "", "", "", "", "", "", false, false)
c.Check(stor, IsNil)
c.Check(err, ErrorMatches, "unknown region: .*")
func (s *PublishedStorageSuite) GetFile(c *C, path string) []byte {
resp, err := s.storage.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(s.storage.bucket),
Key: aws.String(path),
})
c.Assert(err, IsNil)
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
c.Assert(err, IsNil)
return body
}
func (s *PublishedStorageSuite) AssertNoFile(c *C, path string) {
_, err := s.storage.s3.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(s.storage.bucket),
Key: aws.String(path),
})
c.Assert(err, ErrorMatches, ".*\n.*status code: 404.*")
}
func (s *PublishedStorageSuite) PutFile(c *C, path string, data []byte) {
_, err := s.storage.s3.PutObject(&s3.PutObjectInput{
Bucket: aws.String(s.storage.bucket),
Key: aws.String(path),
Body: bytes.NewReader(data),
ContentType: aws.String("binary/octet-stream"),
ACL: aws.String("private"),
})
c.Assert(err, IsNil)
}
func (s *PublishedStorageSuite) TestPutFile(c *C) {
@@ -53,16 +81,12 @@ func (s *PublishedStorageSuite) TestPutFile(c *C) {
err = s.storage.PutFile("a/b.txt", filepath.Join(dir, "a"))
c.Check(err, IsNil)
data, err := s.storage.bucket.Get("a/b.txt")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("welcome to s3!"))
c.Check(s.GetFile(c, "a/b.txt"), DeepEquals, []byte("welcome to s3!"))
err = s.prefixedStorage.PutFile("a/b.txt", filepath.Join(dir, "a"))
c.Check(err, IsNil)
data, err = s.storage.bucket.Get("lala/a/b.txt")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("welcome to s3!"))
c.Check(s.GetFile(c, "lala/a/b.txt"), DeepEquals, []byte("welcome to s3!"))
}
func (s *PublishedStorageSuite) TestPutFilePlusWorkaround(c *C) {
@@ -75,20 +99,15 @@ func (s *PublishedStorageSuite) TestPutFilePlusWorkaround(c *C) {
err = s.storage.PutFile("a/b+c.txt", filepath.Join(dir, "a"))
c.Check(err, IsNil)
data, err := s.storage.bucket.Get("a/b+c.txt")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("welcome to s3!"))
c.Check(s.GetFile(c, "a/b+c.txt"), DeepEquals, []byte("welcome to s3!"))
data, err = s.storage.bucket.Get("a/b c.txt")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("welcome to s3!"))
c.Check(s.GetFile(c, "a/b c.txt"), DeepEquals, []byte("welcome to s3!"))
}
func (s *PublishedStorageSuite) TestFilelist(c *C) {
paths := []string{"a", "b", "c", "testa", "test/a", "test/b", "lala/a", "lala/b", "lala/c"}
for _, path := range paths {
err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private")
c.Check(err, IsNil)
s.PutFile(c, path, []byte("test"))
}
list, err := s.storage.Filelist("")
@@ -114,8 +133,7 @@ func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) {
paths := []string{"a", "b", "c", "testa", "test/a+1", "test/a 1", "lala/a+b", "lala/a b", "lala/c"}
for _, path := range paths {
err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private")
c.Check(err, IsNil)
s.PutFile(c, path, []byte("test"))
}
list, err := s.storage.Filelist("")
@@ -136,40 +154,30 @@ func (s *PublishedStorageSuite) TestFilelistPlusWorkaround(c *C) {
}
func (s *PublishedStorageSuite) TestRemove(c *C) {
err := s.storage.bucket.Put("a/b", []byte("test"), "binary/octet-stream", "private")
s.PutFile(c, "a/b", []byte("test"))
err := s.storage.Remove("a/b")
c.Check(err, IsNil)
err = s.storage.Remove("a/b")
c.Check(err, IsNil)
_, err = s.storage.bucket.Get("a/b")
c.Check(err, ErrorMatches, "The specified key does not exist.")
s.AssertNoFile(c, "a/b")
}
func (s *PublishedStorageSuite) TestRemovePlusWorkaround(c *C) {
s.storage.plusWorkaround = true
err := s.storage.bucket.Put("a/b+c", []byte("test"), "binary/octet-stream", "private")
s.PutFile(c, "a/b+c", []byte("test"))
s.PutFile(c, "a/b", []byte("test"))
err := s.storage.Remove("a/b+c")
c.Check(err, IsNil)
err = s.storage.bucket.Put("a/b", []byte("test"), "binary/octet-stream", "private")
c.Check(err, IsNil)
err = s.storage.Remove("a/b+c")
c.Check(err, IsNil)
_, err = s.storage.bucket.Get("a/b+c")
c.Check(err, ErrorMatches, "The specified key does not exist.")
_, err = s.storage.bucket.Get("a/b c")
c.Check(err, ErrorMatches, "The specified key does not exist.")
s.AssertNoFile(c, "a/b+c")
s.AssertNoFile(c, "a/b c")
err = s.storage.Remove("a/b")
c.Check(err, IsNil)
_, err = s.storage.bucket.Get("a/b")
c.Check(err, ErrorMatches, "The specified key does not exist.")
s.AssertNoFile(c, "a/b")
}
func (s *PublishedStorageSuite) TestRemoveDirs(c *C) {
@@ -177,8 +185,7 @@ func (s *PublishedStorageSuite) TestRemoveDirs(c *C) {
paths := []string{"a", "b", "c", "testa", "test/a+1", "test/a 1", "lala/a+b", "lala/a b", "lala/c"}
for _, path := range paths {
err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private")
c.Check(err, IsNil)
s.PutFile(c, path, []byte("test"))
}
err := s.storage.RemoveDirs("test", nil)
@@ -192,8 +199,7 @@ func (s *PublishedStorageSuite) TestRemoveDirs(c *C) {
func (s *PublishedStorageSuite) TestRemoveDirsPlusWorkaround(c *C) {
paths := []string{"a", "b", "c", "testa", "test/a", "test/b", "lala/a", "lala/b", "lala/c"}
for _, path := range paths {
err := s.storage.bucket.Put(path, []byte("test"), "binary/octet-stream", "private")
c.Check(err, IsNil)
s.PutFile(c, path, []byte("test"))
}
err := s.storage.RemoveDirs("test", nil)
@@ -230,31 +236,23 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) {
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, "c1df1da7a1ce305a3b60af9d5733ac1d", false)
c.Check(err, IsNil)
data, err := s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("Contents"))
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// duplicate link from pool
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath, "c1df1da7a1ce305a3b60af9d5733ac1d", false)
c.Check(err, IsNil)
data, err = s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("Contents"))
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// link from pool with conflict
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, "e9dfd31cc505d51fc26975250750deab", false)
c.Check(err, ErrorMatches, ".*file already exists and is different.*")
data, err = s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("Contents"))
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Contents"))
// link from pool with conflict and force
err = s.storage.LinkFromPool(filepath.Join("", "pool", "main", "m/mars-invaders"), pool, sourcePath2, "e9dfd31cc505d51fc26975250750deab", true)
c.Check(err, IsNil)
data, err = s.storage.bucket.Get("pool/main/m/mars-invaders/mars-invaders_1.03.deb")
c.Check(err, IsNil)
c.Check(data, DeepEquals, []byte("Spam"))
c.Check(s.GetFile(c, "pool/main/m/mars-invaders/mars-invaders_1.03.deb"), DeepEquals, []byte("Spam"))
}
+699
View File
@@ -0,0 +1,699 @@
package s3
import (
"bytes"
"crypto/md5"
"encoding/hex"
"encoding/xml"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
)
const debug = true
type s3Error struct {
statusCode int
XMLName struct{} `xml:"Error"`
Code string
Message string
BucketName string
RequestId string
HostId string
}
type action struct {
srv *Server
w http.ResponseWriter
req *http.Request
reqId string
}
// Config controls the internal behaviour of the Server. A nil config is the default
// and behaves as if all configurations assume their default behaviour. Once passed
// to NewServer, the configuration must not be modified.
type Config struct {
// Send409Conflict controls how the Server will respond to calls to PUT on a
// previously existing bucket. The default is false, and corresponds to the
// us-east-1 s3 enpoint. Setting this value to true emulates the behaviour of
// all other regions.
// http://docs.amazonwebservices.com/AmazonS3/latest/API/ErrorResponses.html
Send409Conflict bool
}
func (c *Config) send409Conflict() bool {
if c != nil {
return c.Send409Conflict
}
return false
}
// Server is a fake S3 server for testing purposes.
// All of the data for the server is kept in memory.
type Server struct {
url string
reqId int
listener net.Listener
mu sync.Mutex
buckets map[string]*bucket
config *Config
}
type bucket struct {
name string
acl string
ctime time.Time
objects map[string]*object
}
type object struct {
name string
mtime time.Time
meta http.Header // metadata to return with requests.
checksum []byte // also held as Content-MD5 in meta.
data []byte
}
// A resource encapsulates the subject of an HTTP request.
// The resource referred to may or may not exist
// when the request is made.
type resource interface {
put(a *action) interface{}
get(a *action) interface{}
post(a *action) interface{}
delete(a *action) interface{}
}
func NewServer(config *Config) (*Server, error) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("cannot listen on localhost: %v", err)
}
srv := &Server{
listener: l,
url: "http://" + l.Addr().String(),
buckets: make(map[string]*bucket),
config: config,
}
go http.Serve(l, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
srv.serveHTTP(w, req)
}))
return srv, nil
}
// Quit closes down the server.
func (srv *Server) Quit() {
srv.listener.Close()
}
// URL returns a URL for the server.
func (srv *Server) URL() string {
return srv.url
}
func fatalf(code int, codeStr string, errf string, a ...interface{}) {
panic(&s3Error{
statusCode: code,
Code: codeStr,
Message: fmt.Sprintf(errf, a...),
})
}
// serveHTTP serves the S3 protocol.
func (srv *Server) serveHTTP(w http.ResponseWriter, req *http.Request) {
// ignore error from ParseForm as it's usually spurious.
req.ParseForm()
srv.mu.Lock()
defer srv.mu.Unlock()
if debug {
log.Printf("s3test %q %q", req.Method, req.URL)
}
a := &action{
srv: srv,
w: w,
req: req,
reqId: fmt.Sprintf("%09X", srv.reqId),
}
srv.reqId++
var r resource
defer func() {
switch err := recover().(type) {
case *s3Error:
switch r := r.(type) {
case objectResource:
err.BucketName = r.bucket.name
case bucketResource:
err.BucketName = r.name
}
err.RequestId = a.reqId
// TODO HostId
w.Header().Set("Content-Type", `xml version="1.0" encoding="UTF-8"`)
w.WriteHeader(err.statusCode)
xmlMarshal(w, err)
case nil:
default:
panic(err)
}
}()
r = srv.resourceForURL(req.URL)
var resp interface{}
switch req.Method {
case "PUT":
resp = r.put(a)
case "GET", "HEAD":
resp = r.get(a)
case "DELETE":
resp = r.delete(a)
case "POST":
resp = r.post(a)
default:
fatalf(400, "MethodNotAllowed", "unknown http request method %q", req.Method)
}
if resp != nil && req.Method != "HEAD" {
xmlMarshal(w, resp)
}
}
// xmlMarshal is the same as xml.Marshal except that
// it panics on error. The marshalling should not fail,
// but we want to know if it does.
func xmlMarshal(w io.Writer, x interface{}) {
if err := xml.NewEncoder(w).Encode(x); err != nil {
panic(fmt.Errorf("error marshalling %#v: %v", x, err))
}
}
// In a fully implemented test server, each of these would have
// its own resource type.
var unimplementedBucketResourceNames = map[string]bool{
"acl": true,
"lifecycle": true,
"policy": true,
"location": true,
"logging": true,
"notification": true,
"versions": true,
"requestPayment": true,
"versioning": true,
"website": true,
"uploads": true,
}
var unimplementedObjectResourceNames = map[string]bool{
"uploadId": true,
"acl": true,
"torrent": true,
"uploads": true,
}
var pathRegexp = regexp.MustCompile("/(([^/]+)(/(.*))?)?")
// resourceForURL returns a resource object for the given URL.
func (srv *Server) resourceForURL(u *url.URL) (r resource) {
if u.Path == "/" {
return serviceResource{
buckets: srv.buckets,
}
}
m := pathRegexp.FindStringSubmatch(u.Path)
if m == nil {
fatalf(404, "InvalidURI", "Couldn't parse the specified URI")
}
bucketName := m[2]
objectName := m[4]
if bucketName == "" {
return nullResource{} // root
}
b := bucketResource{
name: bucketName,
bucket: srv.buckets[bucketName],
}
q := u.Query()
if objectName == "" {
for name := range q {
if unimplementedBucketResourceNames[name] {
return nullResource{}
}
}
return b
}
if b.bucket == nil {
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
}
objr := objectResource{
name: objectName,
version: q.Get("versionId"),
bucket: b.bucket,
}
for name := range q {
if unimplementedObjectResourceNames[name] {
return nullResource{}
}
}
if obj := objr.bucket.objects[objr.name]; obj != nil {
objr.object = obj
}
return objr
}
// nullResource has error stubs for all resource methods.
type nullResource struct{}
func notAllowed() interface{} {
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
return nil
}
func (nullResource) put(a *action) interface{} { return notAllowed() }
func (nullResource) get(a *action) interface{} { return notAllowed() }
func (nullResource) post(a *action) interface{} { return notAllowed() }
func (nullResource) delete(a *action) interface{} { return notAllowed() }
const timeFormat = "2006-01-02T15:04:05Z"
type serviceResource struct {
buckets map[string]*bucket
}
func (serviceResource) put(a *action) interface{} { return notAllowed() }
func (serviceResource) post(a *action) interface{} { return notAllowed() }
func (serviceResource) delete(a *action) interface{} { return notAllowed() }
// GET on an s3 service lists the buckets.
// http://docs.aws.amazon.com/AmazonS3/latest/API/RESTServiceGET.html
func (r serviceResource) get(a *action) interface{} {
type respBucket struct {
Name string
}
type response struct {
Buckets []respBucket `xml:">Bucket"`
}
resp := response{}
for _, bucketPtr := range r.buckets {
bkt := respBucket{
Name: bucketPtr.name,
}
resp.Buckets = append(resp.Buckets, bkt)
}
return &resp
}
type bucketResource struct {
name string
bucket *bucket // non-nil if the bucket already exists.
}
type Owner struct {
ID string
DisplayName string
}
// The ListResp type holds the results of a List bucket operation.
type ListResp struct {
Name string
Prefix string
Delimiter string
Marker string
NextMarker string
MaxKeys int
// IsTruncated is true if the results have been truncated because
// there are more keys and prefixes than can fit in MaxKeys.
// N.B. this is the opposite sense to that documented (incorrectly) in
// http://goo.gl/YjQTc
IsTruncated bool
Contents []Key
CommonPrefixes []string `xml:">Prefix"`
}
// The Key type represents an item stored in an S3 bucket.
type Key struct {
Key string
LastModified string
Size int64
// ETag gives the hex-encoded MD5 sum of the contents,
// surrounded with double-quotes.
ETag string
StorageClass string
Owner Owner
}
// GET on a bucket lists the objects in the bucket.
// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html
func (r bucketResource) get(a *action) interface{} {
if r.bucket == nil {
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
}
delimiter := a.req.Form.Get("delimiter")
marker := a.req.Form.Get("marker")
maxKeys := -1
if s := a.req.Form.Get("max-keys"); s != "" {
i, err := strconv.Atoi(s)
if err != nil || i < 0 {
fatalf(400, "invalid value for max-keys: %q", s)
}
maxKeys = i
}
prefix := a.req.Form.Get("prefix")
a.w.Header().Set("Content-Type", "application/xml")
if a.req.Method == "HEAD" {
return nil
}
var objs orderedObjects
// first get all matching objects and arrange them in alphabetical order.
for name, obj := range r.bucket.objects {
if strings.HasPrefix(name, prefix) {
objs = append(objs, obj)
}
}
sort.Sort(objs)
if maxKeys <= 0 {
maxKeys = 1000
}
resp := &ListResp{
Name: r.bucket.name,
Prefix: prefix,
Delimiter: delimiter,
Marker: marker,
MaxKeys: maxKeys,
}
var prefixes []string
for _, obj := range objs {
if !strings.HasPrefix(obj.name, prefix) {
continue
}
name := obj.name
isPrefix := false
if delimiter != "" {
if i := strings.Index(obj.name[len(prefix):], delimiter); i >= 0 {
name = obj.name[:len(prefix)+i+len(delimiter)]
if prefixes != nil && prefixes[len(prefixes)-1] == name {
continue
}
isPrefix = true
}
}
if name <= marker {
continue
}
if len(resp.Contents)+len(prefixes) >= maxKeys {
resp.IsTruncated = true
break
}
if isPrefix {
prefixes = append(prefixes, name)
} else {
// Contents contains only keys not found in CommonPrefixes
resp.Contents = append(resp.Contents, obj.s3Key())
}
}
resp.CommonPrefixes = prefixes
return resp
}
// orderedObjects holds a slice of objects that can be sorted
// by name.
type orderedObjects []*object
func (s orderedObjects) Len() int {
return len(s)
}
func (s orderedObjects) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s orderedObjects) Less(i, j int) bool {
return s[i].name < s[j].name
}
func (obj *object) s3Key() Key {
return Key{
Key: obj.name,
LastModified: obj.mtime.Format(timeFormat),
Size: int64(len(obj.data)),
ETag: fmt.Sprintf(`"%x"`, obj.checksum),
// TODO StorageClass
// TODO Owner
}
}
// DELETE on a bucket deletes the bucket if it's not empty.
func (r bucketResource) delete(a *action) interface{} {
b := r.bucket
if b == nil {
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
}
if len(b.objects) > 0 {
fatalf(400, "BucketNotEmpty", "The bucket you tried to delete is not empty")
}
delete(a.srv.buckets, b.name)
return nil
}
// PUT on a bucket creates the bucket.
// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketPUT.html
func (r bucketResource) put(a *action) interface{} {
var created bool
if r.bucket == nil {
if !validBucketName(r.name) {
fatalf(400, "InvalidBucketName", "The specified bucket is not valid")
}
if loc := locationConstraint(a); loc == "" {
fatalf(400, "InvalidRequets", "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.")
}
// TODO validate acl
r.bucket = &bucket{
name: r.name,
// TODO default acl
objects: make(map[string]*object),
}
a.srv.buckets[r.name] = r.bucket
created = true
}
if !created && a.srv.config.send409Conflict() {
fatalf(409, "BucketAlreadyOwnedByYou", "Your previous request to create the named bucket succeeded and you already own it.")
}
r.bucket.acl = a.req.Header.Get("x-amz-acl")
return nil
}
func (bucketResource) post(a *action) interface{} {
fatalf(400, "Method", "bucket POST method not available")
return nil
}
// validBucketName returns whether name is a valid bucket name.
// Here are the rules, from:
// http://docs.amazonwebservices.com/AmazonS3/2006-03-01/dev/BucketRestrictions.html
//
// Can contain lowercase letters, numbers, periods (.), underscores (_),
// and dashes (-). You can use uppercase letters for buckets only in the
// US Standard region.
//
// Must start with a number or letter
//
// Must be between 3 and 255 characters long
//
// There's one extra rule (Must not be formatted as an IP address (e.g., 192.168.5.4)
// but the real S3 server does not seem to check that rule, so we will not
// check it either.
//
func validBucketName(name string) bool {
if len(name) < 3 || len(name) > 255 {
return false
}
r := name[0]
if !(r >= '0' && r <= '9' || r >= 'a' && r <= 'z') {
return false
}
for _, r := range name {
switch {
case r >= '0' && r <= '9':
case r >= 'a' && r <= 'z':
case r == '_' || r == '-':
case r == '.':
default:
return false
}
}
return true
}
var responseParams = map[string]bool{
"content-type": true,
"content-language": true,
"expires": true,
"cache-control": true,
"content-disposition": true,
"content-encoding": true,
}
type objectResource struct {
name string
version string
bucket *bucket // always non-nil.
object *object // may be nil.
}
// GET on an object gets the contents of the object.
// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTObjectGET.html
func (objr objectResource) get(a *action) interface{} {
obj := objr.object
if obj == nil {
fatalf(404, "NoSuchKey", "The specified key does not exist.")
}
h := a.w.Header()
// add metadata
for name, d := range obj.meta {
h[name] = d
}
// override header values in response to request parameters.
for name, vals := range a.req.Form {
if strings.HasPrefix(name, "response-") {
name = name[len("response-"):]
if !responseParams[name] {
continue
}
h.Set(name, vals[0])
}
}
if r := a.req.Header.Get("Range"); r != "" {
fatalf(400, "NotImplemented", "range unimplemented")
}
// TODO Last-Modified-Since
// TODO If-Modified-Since
// TODO If-Unmodified-Since
// TODO If-Match
// TODO If-None-Match
// TODO Connection: close ??
// TODO x-amz-request-id
h.Set("Content-Length", fmt.Sprint(len(obj.data)))
h.Set("ETag", hex.EncodeToString(obj.checksum))
h.Set("Last-Modified", obj.mtime.UTC().Format(http.TimeFormat))
if a.req.Method == "HEAD" {
return nil
}
// TODO avoid holding the lock when writing data.
_, err := a.w.Write(obj.data)
if err != nil {
// we can't do much except just log the fact.
log.Printf("error writing data: %v", err)
}
return nil
}
var metaHeaders = map[string]bool{
"Content-MD5": true,
"x-amz-acl": true,
"Content-Type": true,
"Content-Encoding": true,
"Content-Disposition": true,
}
// PUT on an object creates the object.
func (objr objectResource) put(a *action) interface{} {
// TODO Cache-Control header
// TODO Expires header
// TODO x-amz-server-side-encryption
// TODO x-amz-storage-class
// TODO is this correct, or should we erase all previous metadata?
obj := objr.object
if obj == nil {
obj = &object{
name: objr.name,
meta: make(http.Header),
}
}
var expectHash []byte
if c := a.req.Header.Get("Content-MD5"); c != "" {
var err error
expectHash, err = hex.DecodeString(c)
if err != nil || len(expectHash) != md5.Size {
fatalf(400, "InvalidDigest", "The Content-MD5 you specified was invalid")
}
}
sum := md5.New()
// TODO avoid holding lock while reading data.
data, err := ioutil.ReadAll(io.TeeReader(a.req.Body, sum))
if err != nil {
fatalf(400, "TODO", "read error")
}
gotHash := sum.Sum(nil)
if expectHash != nil && bytes.Compare(gotHash, expectHash) != 0 {
fatalf(400, "BadDigest", "The Content-MD5 you specified did not match what we received")
}
if a.req.ContentLength >= 0 && int64(len(data)) != a.req.ContentLength {
fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header")
}
// PUT request has been successful - save data and metadata
for key, values := range a.req.Header {
key = http.CanonicalHeaderKey(key)
if metaHeaders[key] || strings.HasPrefix(key, "X-Amz-Meta-") {
obj.meta[key] = values
}
}
obj.data = data
obj.checksum = gotHash
obj.mtime = time.Now()
objr.bucket.objects[objr.name] = obj
return nil
}
func (objr objectResource) delete(a *action) interface{} {
delete(objr.bucket.objects, objr.name)
return nil
}
func (objr objectResource) post(a *action) interface{} {
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
return nil
}
type CreateBucketConfiguration struct {
LocationConstraint string
}
// locationConstraint parses the <CreateBucketConfiguration /> request body (if present).
// If there is no body, an empty string will be returned.
func locationConstraint(a *action) string {
var body bytes.Buffer
if _, err := io.Copy(&body, a.req.Body); err != nil {
fatalf(400, "InvalidRequest", err.Error())
}
if body.Len() == 0 {
return ""
}
var loc CreateBucketConfiguration
if err := xml.NewDecoder(&body).Decode(&loc); err != nil {
fatalf(400, "InvalidRequest", err.Error())
}
return loc.LocationConstraint
}
+1
View File
@@ -32,6 +32,7 @@ type S3PublishRoot struct {
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"awsAccessKeyID"`
SecretAccessKey string `json:"awsSecretAccessKey"`
SessionToken string `json:"awsSessionToken"`
Prefix string `json:"prefix"`
ACL string `json:"acl"`
StorageClass string `json:"storageClass"`
+1
View File
@@ -69,6 +69,7 @@ func (s *ConfigSuite) TestSaveConfig(c *C) {
" \"endpoint\": \"\",\n"+
" \"awsAccessKeyID\": \"\",\n"+
" \"awsSecretAccessKey\": \"\",\n"+
" \"awsSessionToken\": \"\",\n"+
" \"prefix\": \"\",\n"+
" \"acl\": \"\",\n"+
" \"storageClass\": \"\",\n"+