mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-05-30 04:20:53 +00:00
Update vendored deps, including AWS SDK, openpgp, ftp, ...
This commit is contained in:
+32
-8
@@ -60,7 +60,15 @@ func newError(err error, bucket, key *string) Error {
|
||||
}
|
||||
|
||||
func (err *Error) Error() string {
|
||||
return fmt.Sprintf("failed to upload %q to %q:\n%s", err.Key, err.Bucket, err.OrigErr.Error())
|
||||
origErr := ""
|
||||
if err.OrigErr != nil {
|
||||
origErr = ":\n" + err.OrigErr.Error()
|
||||
}
|
||||
return fmt.Sprintf("failed to perform batch operation on %q to %q%s",
|
||||
aws.StringValue(err.Key),
|
||||
aws.StringValue(err.Bucket),
|
||||
origErr,
|
||||
)
|
||||
}
|
||||
|
||||
// NewBatchError will return a BatchError that satisfies the awserr.Error interface.
|
||||
@@ -206,7 +214,7 @@ type BatchDelete struct {
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// if err := batcher.Delete(&s3manager.DeleteObjectsIterator{
|
||||
// if err := batcher.Delete(aws.BackgroundContext(), &s3manager.DeleteObjectsIterator{
|
||||
// Objects: objects,
|
||||
// }); err != nil {
|
||||
// return err
|
||||
@@ -239,7 +247,7 @@ func NewBatchDeleteWithClient(client s3iface.S3API, options ...func(*BatchDelete
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// if err := batcher.Delete(&s3manager.DeleteObjectsIterator{
|
||||
// if err := batcher.Delete(aws.BackgroundContext(), &s3manager.DeleteObjectsIterator{
|
||||
// Objects: objects,
|
||||
// }); err != nil {
|
||||
// return err
|
||||
@@ -312,7 +320,7 @@ func (d *BatchDelete) Delete(ctx aws.Context, iter BatchDeleteIterator) error {
|
||||
}
|
||||
|
||||
if len(input.Delete.Objects) == d.BatchSize || !parity {
|
||||
if err := deleteBatch(d, input, objects); err != nil {
|
||||
if err := deleteBatch(ctx, d, input, objects); err != nil {
|
||||
errs = append(errs, err...)
|
||||
}
|
||||
|
||||
@@ -331,7 +339,7 @@ func (d *BatchDelete) Delete(ctx aws.Context, iter BatchDeleteIterator) error {
|
||||
}
|
||||
|
||||
if input != nil && len(input.Delete.Objects) > 0 {
|
||||
if err := deleteBatch(d, input, objects); err != nil {
|
||||
if err := deleteBatch(ctx, d, input, objects); err != nil {
|
||||
errs = append(errs, err...)
|
||||
}
|
||||
}
|
||||
@@ -351,17 +359,33 @@ func initDeleteObjectsInput(o *s3.DeleteObjectInput) *s3.DeleteObjectsInput {
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
// ErrDeleteBatchFailCode represents an error code which will be returned
|
||||
// only when DeleteObjects.Errors has an error that does not contain a code.
|
||||
ErrDeleteBatchFailCode = "DeleteBatchError"
|
||||
errDefaultDeleteBatchMessage = "failed to delete"
|
||||
)
|
||||
|
||||
// deleteBatch will delete a batch of items in the objects parameters.
|
||||
func deleteBatch(d *BatchDelete, input *s3.DeleteObjectsInput, objects []BatchDeleteObject) []Error {
|
||||
func deleteBatch(ctx aws.Context, d *BatchDelete, input *s3.DeleteObjectsInput, objects []BatchDeleteObject) []Error {
|
||||
errs := []Error{}
|
||||
|
||||
if result, err := d.Client.DeleteObjects(input); err != nil {
|
||||
if result, err := d.Client.DeleteObjectsWithContext(ctx, input); err != nil {
|
||||
for i := 0; i < len(input.Delete.Objects); i++ {
|
||||
errs = append(errs, newError(err, input.Bucket, input.Delete.Objects[i].Key))
|
||||
}
|
||||
} else if len(result.Errors) > 0 {
|
||||
for i := 0; i < len(result.Errors); i++ {
|
||||
errs = append(errs, newError(err, input.Bucket, result.Errors[i].Key))
|
||||
code := ErrDeleteBatchFailCode
|
||||
msg := errDefaultDeleteBatchMessage
|
||||
if result.Errors[i].Message != nil {
|
||||
msg = *result.Errors[i].Message
|
||||
}
|
||||
if result.Errors[i].Code != nil {
|
||||
code = *result.Errors[i].Code
|
||||
}
|
||||
|
||||
errs = append(errs, newError(awserr.New(code, msg, err), input.Bucket, result.Errors[i].Key))
|
||||
}
|
||||
}
|
||||
for _, object := range objects {
|
||||
|
||||
+116
@@ -0,0 +1,116 @@
|
||||
// +build go1.7
|
||||
|
||||
package s3manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
)
|
||||
|
||||
// #1790 bug
|
||||
func TestBatchDeleteContext(t *testing.T) {
|
||||
cases := []struct {
|
||||
objects []BatchDeleteObject
|
||||
size int
|
||||
expected int
|
||||
ctx aws.Context
|
||||
closeAt int
|
||||
errCheck func(error) (string, bool)
|
||||
}{
|
||||
{
|
||||
[]BatchDeleteObject{
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("1"),
|
||||
Bucket: aws.String("bucket1"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("2"),
|
||||
Bucket: aws.String("bucket2"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("3"),
|
||||
Bucket: aws.String("bucket3"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("4"),
|
||||
Bucket: aws.String("bucket4"),
|
||||
},
|
||||
},
|
||||
},
|
||||
1,
|
||||
0,
|
||||
aws.BackgroundContext(),
|
||||
0,
|
||||
func(err error) (string, bool) {
|
||||
batchErr, ok := err.(*BatchError)
|
||||
if !ok {
|
||||
return "not BatchError type", false
|
||||
}
|
||||
|
||||
errs := batchErr.Errors
|
||||
if len(errs) != 4 {
|
||||
return fmt.Sprintf("expected 1, but received %d", len(errs)), false
|
||||
}
|
||||
|
||||
for _, tempErr := range errs {
|
||||
aerr, ok := tempErr.OrigErr.(awserr.Error)
|
||||
if !ok {
|
||||
return "not awserr.Error type", false
|
||||
}
|
||||
|
||||
if code := aerr.Code(); code != request.CanceledErrorCode {
|
||||
return fmt.Sprintf("expected %q, but received %q", request.CanceledErrorCode, code), false
|
||||
}
|
||||
}
|
||||
return "", true
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
count := 0
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
count++
|
||||
}))
|
||||
|
||||
svc := &mockS3Client{S3: buildS3SvcClient(server.URL)}
|
||||
for i, c := range cases {
|
||||
ctx, done := context.WithCancel(c.ctx)
|
||||
defer done()
|
||||
if i == c.closeAt {
|
||||
done()
|
||||
}
|
||||
|
||||
batcher := BatchDelete{
|
||||
Client: svc,
|
||||
BatchSize: c.size,
|
||||
}
|
||||
|
||||
err := batcher.Delete(ctx, &DeleteObjectsIterator{Objects: c.objects})
|
||||
|
||||
if msg, ok := c.errCheck(err); !ok {
|
||||
t.Error(msg)
|
||||
}
|
||||
|
||||
if count != c.expected {
|
||||
t.Errorf("Case %d: expected %d, but received %d", i, c.expected, count)
|
||||
}
|
||||
|
||||
count = 0
|
||||
}
|
||||
}
|
||||
+134
-2
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/awstesting/unit"
|
||||
@@ -309,10 +310,101 @@ func TestBatchDelete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchDeleteError(t *testing.T) {
|
||||
cases := []struct {
|
||||
objects []BatchDeleteObject
|
||||
output s3.DeleteObjectsOutput
|
||||
size int
|
||||
expectedErrCode string
|
||||
expectedErrMessage string
|
||||
}{
|
||||
{
|
||||
[]BatchDeleteObject{
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("1"),
|
||||
Bucket: aws.String("bucket1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
s3.DeleteObjectsOutput{
|
||||
Errors: []*s3.Error{
|
||||
{
|
||||
Code: aws.String("foo code"),
|
||||
Message: aws.String("foo error"),
|
||||
},
|
||||
},
|
||||
},
|
||||
1,
|
||||
"foo code",
|
||||
"foo error",
|
||||
},
|
||||
{
|
||||
[]BatchDeleteObject{
|
||||
{
|
||||
Object: &s3.DeleteObjectInput{
|
||||
Key: aws.String("1"),
|
||||
Bucket: aws.String("bucket1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
s3.DeleteObjectsOutput{
|
||||
Errors: []*s3.Error{
|
||||
{},
|
||||
},
|
||||
},
|
||||
1,
|
||||
ErrDeleteBatchFailCode,
|
||||
errDefaultDeleteBatchMessage,
|
||||
},
|
||||
}
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}))
|
||||
|
||||
index := 0
|
||||
svc := &mockS3Client{
|
||||
S3: buildS3SvcClient(server.URL),
|
||||
deleteObjects: func(input *s3.DeleteObjectsInput) (*s3.DeleteObjectsOutput, error) {
|
||||
output := &cases[index].output
|
||||
index++
|
||||
return output, nil
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
batcher := BatchDelete{
|
||||
Client: svc,
|
||||
BatchSize: c.size,
|
||||
}
|
||||
|
||||
err := batcher.Delete(aws.BackgroundContext(), &DeleteObjectsIterator{Objects: c.objects})
|
||||
if err == nil {
|
||||
t.Errorf("expected error, but received none")
|
||||
}
|
||||
|
||||
berr := err.(*BatchError)
|
||||
|
||||
if len(berr.Errors) != 1 {
|
||||
t.Errorf("expected 1 error, but received %d", len(berr.Errors))
|
||||
}
|
||||
|
||||
aerr := berr.Errors[0].OrigErr.(awserr.Error)
|
||||
if e, a := c.expectedErrCode, aerr.Code(); e != a {
|
||||
t.Errorf("expected %q, but received %q", e, a)
|
||||
}
|
||||
|
||||
if e, a := c.expectedErrMessage, aerr.Message(); e != a {
|
||||
t.Errorf("expected %q, but received %q", e, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type mockS3Client struct {
|
||||
*s3.S3
|
||||
index int
|
||||
objects []*s3.ListObjectsOutput
|
||||
index int
|
||||
objects []*s3.ListObjectsOutput
|
||||
deleteObjects func(*s3.DeleteObjectsInput) (*s3.DeleteObjectsOutput, error)
|
||||
}
|
||||
|
||||
func (client *mockS3Client) ListObjects(input *s3.ListObjectsInput) (*s3.ListObjectsOutput, error) {
|
||||
@@ -321,6 +413,46 @@ func (client *mockS3Client) ListObjects(input *s3.ListObjectsInput) (*s3.ListObj
|
||||
return object, nil
|
||||
}
|
||||
|
||||
func (client *mockS3Client) DeleteObjects(input *s3.DeleteObjectsInput) (*s3.DeleteObjectsOutput, error) {
|
||||
if client.deleteObjects == nil {
|
||||
return client.S3.DeleteObjectsWithContext(aws.BackgroundContext(), input)
|
||||
}
|
||||
|
||||
return client.deleteObjects(input)
|
||||
}
|
||||
|
||||
func (client *mockS3Client) DeleteObjectsWithContext(ctx aws.Context, input *s3.DeleteObjectsInput, opt ...request.Option) (*s3.DeleteObjectsOutput, error) {
|
||||
if client.deleteObjects == nil {
|
||||
return client.S3.DeleteObjectsWithContext(ctx, input)
|
||||
}
|
||||
|
||||
return client.deleteObjects(input)
|
||||
}
|
||||
|
||||
func TestNilOrigError(t *testing.T) {
|
||||
err := Error{
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("key"),
|
||||
}
|
||||
errStr := err.Error()
|
||||
const expected1 = `failed to perform batch operation on "key" to "bucket"`
|
||||
if errStr != expected1 {
|
||||
t.Errorf("Expected %s, but received %s", expected1, errStr)
|
||||
}
|
||||
|
||||
err = Error{
|
||||
OrigErr: errors.New("foo"),
|
||||
Bucket: aws.String("bucket"),
|
||||
Key: aws.String("key"),
|
||||
}
|
||||
errStr = err.Error()
|
||||
const expected2 = "failed to perform batch operation on \"key\" to \"bucket\":\nfoo"
|
||||
if errStr != expected2 {
|
||||
t.Errorf("Expected %s, but received %s", expected2, errStr)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestBatchDeleteList(t *testing.T) {
|
||||
count := 0
|
||||
|
||||
|
||||
+10
-5
@@ -14,8 +14,11 @@ import (
|
||||
//
|
||||
// The request will not be signed, and will not use your AWS credentials.
|
||||
//
|
||||
// A "NotFound" error code will be returned if the bucket does not exist in
|
||||
// the AWS partition the regionHint belongs to.
|
||||
// A "NotFound" error code will be returned if the bucket does not exist in the
|
||||
// AWS partition the regionHint belongs to. If the regionHint parameter is an
|
||||
// empty string GetBucketRegion will fallback to the ConfigProvider's region
|
||||
// config. If the regionHint is empty, and the ConfigProvider does not have a
|
||||
// region value, an error will be returned..
|
||||
//
|
||||
// For example to get the region of a bucket which exists in "eu-central-1"
|
||||
// you could provide a region hint of "us-west-2".
|
||||
@@ -33,9 +36,11 @@ import (
|
||||
// fmt.Printf("Bucket %s is in %s region\n", bucket, region)
|
||||
//
|
||||
func GetBucketRegion(ctx aws.Context, c client.ConfigProvider, bucket, regionHint string, opts ...request.Option) (string, error) {
|
||||
svc := s3.New(c, &aws.Config{
|
||||
Region: aws.String(regionHint),
|
||||
})
|
||||
var cfg aws.Config
|
||||
if len(regionHint) != 0 {
|
||||
cfg.Region = aws.String(regionHint)
|
||||
}
|
||||
svc := s3.New(c, &cfg)
|
||||
return GetBucketRegionWithClient(ctx, svc, bucket, opts...)
|
||||
}
|
||||
|
||||
|
||||
+12
-8
@@ -21,12 +21,15 @@ func testSetupGetBucketRegionServer(region string, statusCode int, incHeader boo
|
||||
}
|
||||
|
||||
var testGetBucketRegionCases = []struct {
|
||||
RespRegion string
|
||||
StatusCode int
|
||||
RespRegion string
|
||||
StatusCode int
|
||||
HintRegion string
|
||||
ExpectReqRegion string
|
||||
}{
|
||||
{"bucket-region", 301},
|
||||
{"bucket-region", 403},
|
||||
{"bucket-region", 200},
|
||||
{"bucket-region", 301, "hint-region", ""},
|
||||
{"bucket-region", 403, "hint-region", ""},
|
||||
{"bucket-region", 200, "hint-region", ""},
|
||||
{"bucket-region", 200, "", "default-region"},
|
||||
}
|
||||
|
||||
func TestGetBucketRegion_Exists(t *testing.T) {
|
||||
@@ -34,11 +37,12 @@ func TestGetBucketRegion_Exists(t *testing.T) {
|
||||
server := testSetupGetBucketRegionServer(c.RespRegion, c.StatusCode, true)
|
||||
|
||||
sess := unit.Session.Copy()
|
||||
sess.Config.Region = aws.String("default-region")
|
||||
sess.Config.Endpoint = aws.String(server.URL)
|
||||
sess.Config.DisableSSL = aws.Bool(true)
|
||||
|
||||
ctx := aws.BackgroundContext()
|
||||
region, err := GetBucketRegion(ctx, sess, "bucket", "region")
|
||||
region, err := GetBucketRegion(ctx, sess, "bucket", c.HintRegion)
|
||||
if err != nil {
|
||||
t.Fatalf("%d, expect no error, got %v", i, err)
|
||||
}
|
||||
@@ -56,7 +60,7 @@ func TestGetBucketRegion_NotExists(t *testing.T) {
|
||||
sess.Config.DisableSSL = aws.Bool(true)
|
||||
|
||||
ctx := aws.BackgroundContext()
|
||||
region, err := GetBucketRegion(ctx, sess, "bucket", "region")
|
||||
region, err := GetBucketRegion(ctx, sess, "bucket", "hint-region")
|
||||
if err == nil {
|
||||
t.Fatalf("expect error, but did not get one")
|
||||
}
|
||||
@@ -74,7 +78,7 @@ func TestGetBucketRegionWithClient(t *testing.T) {
|
||||
server := testSetupGetBucketRegionServer(c.RespRegion, c.StatusCode, true)
|
||||
|
||||
svc := s3.New(unit.Session, &aws.Config{
|
||||
Region: aws.String("region"),
|
||||
Region: aws.String("hint-region"),
|
||||
Endpoint: aws.String(server.URL),
|
||||
DisableSSL: aws.Bool(true),
|
||||
})
|
||||
|
||||
+26
-16
@@ -117,6 +117,9 @@ type UploadInput struct {
|
||||
// The language the content is in.
|
||||
ContentLanguage *string `location:"header" locationName:"Content-Language" type:"string"`
|
||||
|
||||
// The base64-encoded 128-bit MD5 digest of the part data.
|
||||
ContentMD5 *string `location:"header" locationName:"Content-MD5" type:"string"`
|
||||
|
||||
// A standard MIME type describing the format of the object data.
|
||||
ContentType *string `location:"header" locationName:"Content-Type" type:"string"`
|
||||
|
||||
@@ -440,6 +443,8 @@ type uploader struct {
|
||||
|
||||
readerPos int64 // current reader position
|
||||
totalSize int64 // set to -1 if the size is not known
|
||||
|
||||
bufferPool sync.Pool
|
||||
}
|
||||
|
||||
// internal logic for deciding whether to upload a single part or use a
|
||||
@@ -453,7 +458,7 @@ func (u *uploader) upload() (*UploadOutput, error) {
|
||||
}
|
||||
|
||||
// Do one read to determine if we have more than one part
|
||||
reader, _, err := u.nextReader()
|
||||
reader, _, part, err := u.nextReader()
|
||||
if err == io.EOF { // single part
|
||||
return u.singlePart(reader)
|
||||
} else if err != nil {
|
||||
@@ -461,7 +466,7 @@ func (u *uploader) upload() (*UploadOutput, error) {
|
||||
}
|
||||
|
||||
mu := multiuploader{uploader: u}
|
||||
return mu.upload(reader)
|
||||
return mu.upload(reader, part)
|
||||
}
|
||||
|
||||
// init will initialize all default options.
|
||||
@@ -473,6 +478,10 @@ func (u *uploader) init() {
|
||||
u.cfg.PartSize = DefaultUploadPartSize
|
||||
}
|
||||
|
||||
u.bufferPool = sync.Pool{
|
||||
New: func() interface{} { return make([]byte, u.cfg.PartSize) },
|
||||
}
|
||||
|
||||
// Try to get the total size for some optimizations
|
||||
u.initSize()
|
||||
}
|
||||
@@ -484,10 +493,7 @@ func (u *uploader) initSize() {
|
||||
|
||||
switch r := u.in.Body.(type) {
|
||||
case io.Seeker:
|
||||
pos, _ := r.Seek(0, 1)
|
||||
defer r.Seek(pos, 0)
|
||||
|
||||
n, err := r.Seek(0, 2)
|
||||
n, err := aws.SeekerLen(r)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -507,7 +513,7 @@ func (u *uploader) initSize() {
|
||||
// This operation increases the shared u.readerPos counter, but note that it
|
||||
// does not need to be wrapped in a mutex because nextReader is only called
|
||||
// from the main thread.
|
||||
func (u *uploader) nextReader() (io.ReadSeeker, int, error) {
|
||||
func (u *uploader) nextReader() (io.ReadSeeker, int, []byte, error) {
|
||||
type readerAtSeeker interface {
|
||||
io.ReaderAt
|
||||
io.ReadSeeker
|
||||
@@ -529,14 +535,14 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, error) {
|
||||
reader := io.NewSectionReader(r, u.readerPos, n)
|
||||
u.readerPos += n
|
||||
|
||||
return reader, int(n), err
|
||||
return reader, int(n), nil, err
|
||||
|
||||
default:
|
||||
part := make([]byte, u.cfg.PartSize)
|
||||
part := u.bufferPool.Get().([]byte)
|
||||
n, err := readFillBuf(r, part)
|
||||
u.readerPos += int64(n)
|
||||
|
||||
return bytes.NewReader(part[0:n]), n, err
|
||||
return bytes.NewReader(part[0:n]), n, part, err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -586,8 +592,9 @@ type multiuploader struct {
|
||||
|
||||
// keeps track of a single chunk of data being sent to S3.
|
||||
type chunk struct {
|
||||
buf io.ReadSeeker
|
||||
num int64
|
||||
buf io.ReadSeeker
|
||||
part []byte
|
||||
num int64
|
||||
}
|
||||
|
||||
// completedParts is a wrapper to make parts sortable by their part number,
|
||||
@@ -600,7 +607,7 @@ func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].Pa
|
||||
|
||||
// upload will perform a multipart upload using the firstBuf buffer containing
|
||||
// the first chunk of data.
|
||||
func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
|
||||
func (u *multiuploader) upload(firstBuf io.ReadSeeker, firstPart []byte) (*UploadOutput, error) {
|
||||
params := &s3.CreateMultipartUploadInput{}
|
||||
awsutil.Copy(params, u.in)
|
||||
|
||||
@@ -620,7 +627,7 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
|
||||
|
||||
// Send part 1 to the workers
|
||||
var num int64 = 1
|
||||
ch <- chunk{buf: firstBuf, num: num}
|
||||
ch <- chunk{buf: firstBuf, part: firstPart, num: num}
|
||||
|
||||
// Read and queue the rest of the parts
|
||||
for u.geterr() == nil && err == nil {
|
||||
@@ -641,7 +648,8 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
|
||||
|
||||
var reader io.ReadSeeker
|
||||
var nextChunkLen int
|
||||
reader, nextChunkLen, err = u.nextReader()
|
||||
var part []byte
|
||||
reader, nextChunkLen, part, err = u.nextReader()
|
||||
|
||||
if err != nil && err != io.EOF {
|
||||
u.seterr(awserr.New(
|
||||
@@ -658,7 +666,7 @@ func (u *multiuploader) upload(firstBuf io.ReadSeeker) (*UploadOutput, error) {
|
||||
break
|
||||
}
|
||||
|
||||
ch <- chunk{buf: reader, num: num}
|
||||
ch <- chunk{buf: reader, part: part, num: num}
|
||||
}
|
||||
|
||||
// Close the channel, wait for workers, and complete upload
|
||||
@@ -714,6 +722,8 @@ func (u *multiuploader) send(c chunk) error {
|
||||
PartNumber: &c.num,
|
||||
}
|
||||
resp, err := u.cfg.S3.UploadPartWithContext(u.ctx, params, u.cfg.RequestOptions...)
|
||||
// put the byte array back into the pool to conserve memory
|
||||
u.bufferPool.Put(c.part)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user