mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-01-12 03:21:33 +00:00
Remove S3 retrying client which is leftover from goamz times.
Also workaround go vet warnings in s3/sever_test.go
This commit is contained in:
@@ -69,8 +69,7 @@ func NewPublishedStorage(accessKey, secretKey, sessionToken, region, endpoint, b
|
||||
storageClass, encryptionMethod string, plusWorkaround, disableMultiDel bool) (*PublishedStorage, error) {
|
||||
|
||||
config := &aws.Config{
|
||||
HTTPClient: RetryingClient,
|
||||
Region: aws.String(region),
|
||||
Region: aws.String(region),
|
||||
}
|
||||
|
||||
if endpoint != "" {
|
||||
|
||||
121
s3/retry.go
121
s3/retry.go
@@ -1,121 +0,0 @@
|
||||
package s3
|
||||
|
||||
// This was taken from github.com/mitchellh/goamz/amz/client.go:
|
||||
|
||||
import (
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RetryableFunc func(*http.Request, *http.Response, error) bool
|
||||
type WaitFunc func(try int)
|
||||
type DeadlineFunc func() time.Time
|
||||
|
||||
type ResilientTransport struct {
|
||||
// Timeout is the maximum amount of time a dial will wait for
|
||||
// a connect to complete.
|
||||
//
|
||||
// The default is no timeout.
|
||||
//
|
||||
// With or without a timeout, the operating system may impose
|
||||
// its own earlier timeout. For instance, TCP timeouts are
|
||||
// often around 3 minutes.
|
||||
DialTimeout time.Duration
|
||||
|
||||
// MaxTries, if non-zero, specifies the number of times we will retry on
|
||||
// failure. Retries are only attempted for temporary network errors or known
|
||||
// safe failures.
|
||||
MaxTries int
|
||||
ShouldRetry RetryableFunc
|
||||
Wait WaitFunc
|
||||
transport *http.Transport
|
||||
}
|
||||
|
||||
// Convenience method for creating an http client
|
||||
func NewClient(rt *ResilientTransport) *http.Client {
|
||||
rt.transport = &http.Transport{
|
||||
Dial: func(netw, addr string) (net.Conn, error) {
|
||||
c, err := net.DialTimeout(netw, addr, rt.DialTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
},
|
||||
DisableKeepAlives: true,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
}
|
||||
// TODO: Would be nice is ResilientTransport allowed clients to initialize
|
||||
// with http.Transport attributes.
|
||||
return &http.Client{
|
||||
Transport: rt,
|
||||
}
|
||||
}
|
||||
|
||||
var retryingTransport = &ResilientTransport{
|
||||
DialTimeout: 15 * time.Second,
|
||||
MaxTries: 3,
|
||||
ShouldRetry: awsRetry,
|
||||
Wait: ExpBackoff,
|
||||
}
|
||||
|
||||
// Exported default client
|
||||
var RetryingClient = NewClient(retryingTransport)
|
||||
|
||||
func (t *ResilientTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return t.tries(req)
|
||||
}
|
||||
|
||||
// Retry a request a maximum of t.MaxTries times.
|
||||
// We'll only retry if the proper criteria are met.
|
||||
// If a wait function is specified, wait that amount of time
|
||||
// In between requests.
|
||||
func (t *ResilientTransport) tries(req *http.Request) (res *http.Response, err error) {
|
||||
for try := 0; try < t.MaxTries; try += 1 {
|
||||
res, err = t.transport.RoundTrip(req)
|
||||
|
||||
if !t.ShouldRetry(req, res, err) {
|
||||
break
|
||||
}
|
||||
if try == (t.MaxTries - 1) {
|
||||
break
|
||||
}
|
||||
if res != nil {
|
||||
res.Body.Close()
|
||||
}
|
||||
if t.Wait != nil {
|
||||
t.Wait(try)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ExpBackoff(try int) {
|
||||
time.Sleep(100 * time.Millisecond *
|
||||
time.Duration(math.Exp2(float64(try))))
|
||||
}
|
||||
|
||||
// Decide if we should retry a request.
|
||||
// In general, the criteria for retrying a request is described here
|
||||
// http://docs.aws.amazon.com/general/latest/gr/api-retries.html
|
||||
func awsRetry(req *http.Request, res *http.Response, err error) bool {
|
||||
retry := false
|
||||
|
||||
// Retry if there's a temporary network error.
|
||||
if neterr, ok := err.(net.Error); ok {
|
||||
if neterr.Temporary() {
|
||||
retry = true
|
||||
}
|
||||
}
|
||||
|
||||
// Retry if we get a 5xx series error.
|
||||
if res != nil {
|
||||
if res.StatusCode >= 500 && res.StatusCode < 600 {
|
||||
retry = true
|
||||
}
|
||||
}
|
||||
|
||||
return retry
|
||||
}
|
||||
@@ -121,7 +121,7 @@ func (srv *Server) URL() string {
|
||||
return srv.url
|
||||
}
|
||||
|
||||
func fatalf(code int, codeStr string, errf string, a ...interface{}) {
|
||||
func fatalError(code int, codeStr string, errf string, a ...interface{}) {
|
||||
panic(&s3Error{
|
||||
statusCode: code,
|
||||
Code: codeStr,
|
||||
@@ -182,7 +182,7 @@ func (srv *Server) serveHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
case "POST":
|
||||
resp = r.post(a)
|
||||
default:
|
||||
fatalf(400, "MethodNotAllowed", "unknown http request method %q", req.Method)
|
||||
fatalError(400, "MethodNotAllowed", "unknown http request method %q", req.Method)
|
||||
}
|
||||
if resp != nil && req.Method != "HEAD" {
|
||||
xmlMarshal(w, resp)
|
||||
@@ -234,7 +234,7 @@ func (srv *Server) resourceForURL(u *url.URL) (r resource) {
|
||||
|
||||
m := pathRegexp.FindStringSubmatch(u.Path)
|
||||
if m == nil {
|
||||
fatalf(404, "InvalidURI", "Couldn't parse the specified URI")
|
||||
fatalError(404, "InvalidURI", "Couldn't parse the specified URI")
|
||||
}
|
||||
bucketName := m[2]
|
||||
objectName := m[4]
|
||||
@@ -256,7 +256,7 @@ func (srv *Server) resourceForURL(u *url.URL) (r resource) {
|
||||
|
||||
}
|
||||
if b.bucket == nil {
|
||||
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
fatalError(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
}
|
||||
objr := objectResource{
|
||||
name: objectName,
|
||||
@@ -278,7 +278,7 @@ func (srv *Server) resourceForURL(u *url.URL) (r resource) {
|
||||
type nullResource struct{}
|
||||
|
||||
func notAllowed() interface{} {
|
||||
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
|
||||
fatalError(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -363,7 +363,7 @@ type Key struct {
|
||||
// http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html
|
||||
func (r bucketResource) get(a *action) interface{} {
|
||||
if r.bucket == nil {
|
||||
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
fatalError(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
}
|
||||
delimiter := a.req.Form.Get("delimiter")
|
||||
marker := a.req.Form.Get("marker")
|
||||
@@ -371,7 +371,7 @@ func (r bucketResource) get(a *action) interface{} {
|
||||
if s := a.req.Form.Get("max-keys"); s != "" {
|
||||
i, err := strconv.Atoi(s)
|
||||
if err != nil || i < 0 {
|
||||
fatalf(400, "invalid value for max-keys: %q", s)
|
||||
fatalError(400, "invalid value for max-keys: %q", s)
|
||||
}
|
||||
maxKeys = i
|
||||
}
|
||||
@@ -466,10 +466,10 @@ func (obj *object) s3Key() Key {
|
||||
func (r bucketResource) delete(a *action) interface{} {
|
||||
b := r.bucket
|
||||
if b == nil {
|
||||
fatalf(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
fatalError(404, "NoSuchBucket", "The specified bucket does not exist")
|
||||
}
|
||||
if len(b.objects) > 0 {
|
||||
fatalf(400, "BucketNotEmpty", "The bucket you tried to delete is not empty")
|
||||
fatalError(400, "BucketNotEmpty", "The bucket you tried to delete is not empty")
|
||||
}
|
||||
delete(a.srv.buckets, b.name)
|
||||
return nil
|
||||
@@ -481,10 +481,10 @@ func (r bucketResource) put(a *action) interface{} {
|
||||
var created bool
|
||||
if r.bucket == nil {
|
||||
if !validBucketName(r.name) {
|
||||
fatalf(400, "InvalidBucketName", "The specified bucket is not valid")
|
||||
fatalError(400, "InvalidBucketName", "The specified bucket is not valid")
|
||||
}
|
||||
if loc := locationConstraint(a); loc == "" {
|
||||
fatalf(400, "InvalidRequets", "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.")
|
||||
fatalError(400, "InvalidRequets", "The unspecified location constraint is incompatible for the region specific endpoint this request was sent to.")
|
||||
}
|
||||
// TODO validate acl
|
||||
r.bucket = &bucket{
|
||||
@@ -496,14 +496,14 @@ func (r bucketResource) put(a *action) interface{} {
|
||||
created = true
|
||||
}
|
||||
if !created && a.srv.config.send409Conflict() {
|
||||
fatalf(409, "BucketAlreadyOwnedByYou", "Your previous request to create the named bucket succeeded and you already own it.")
|
||||
fatalError(409, "BucketAlreadyOwnedByYou", "Your previous request to create the named bucket succeeded and you already own it.")
|
||||
}
|
||||
r.bucket.acl = a.req.Header.Get("x-amz-acl")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bucketResource) post(a *action) interface{} {
|
||||
fatalf(400, "Method", "bucket POST method not available")
|
||||
fatalError(400, "Method", "bucket POST method not available")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -565,7 +565,7 @@ type objectResource struct {
|
||||
func (objr objectResource) get(a *action) interface{} {
|
||||
obj := objr.object
|
||||
if obj == nil {
|
||||
fatalf(404, "NoSuchKey", "The specified key does not exist.")
|
||||
fatalError(404, "NoSuchKey", "The specified key does not exist.")
|
||||
}
|
||||
h := a.w.Header()
|
||||
// add metadata
|
||||
@@ -583,7 +583,7 @@ func (objr objectResource) get(a *action) interface{} {
|
||||
}
|
||||
}
|
||||
if r := a.req.Header.Get("Range"); r != "" {
|
||||
fatalf(400, "NotImplemented", "range unimplemented")
|
||||
fatalError(400, "NotImplemented", "range unimplemented")
|
||||
}
|
||||
// TODO Last-Modified-Since
|
||||
// TODO If-Modified-Since
|
||||
@@ -636,21 +636,21 @@ func (objr objectResource) put(a *action) interface{} {
|
||||
var err error
|
||||
expectHash, err = hex.DecodeString(c)
|
||||
if err != nil || len(expectHash) != md5.Size {
|
||||
fatalf(400, "InvalidDigest", "The Content-MD5 you specified was invalid")
|
||||
fatalError(400, "InvalidDigest", "The Content-MD5 you specified was invalid")
|
||||
}
|
||||
}
|
||||
sum := md5.New()
|
||||
// TODO avoid holding lock while reading data.
|
||||
data, err := ioutil.ReadAll(io.TeeReader(a.req.Body, sum))
|
||||
if err != nil {
|
||||
fatalf(400, "TODO", "read error")
|
||||
fatalError(400, "TODO", "read error")
|
||||
}
|
||||
gotHash := sum.Sum(nil)
|
||||
if expectHash != nil && bytes.Compare(gotHash, expectHash) != 0 {
|
||||
fatalf(400, "BadDigest", "The Content-MD5 you specified did not match what we received")
|
||||
fatalError(400, "BadDigest", "The Content-MD5 you specified did not match what we received")
|
||||
}
|
||||
if a.req.ContentLength >= 0 && int64(len(data)) != a.req.ContentLength {
|
||||
fatalf(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header")
|
||||
fatalError(400, "IncompleteBody", "You did not provide the number of bytes specified by the Content-Length HTTP header")
|
||||
}
|
||||
|
||||
// PUT request has been successful - save data and metadata
|
||||
@@ -673,7 +673,7 @@ func (objr objectResource) delete(a *action) interface{} {
|
||||
}
|
||||
|
||||
func (objr objectResource) post(a *action) interface{} {
|
||||
fatalf(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
|
||||
fatalError(400, "MethodNotAllowed", "The specified method is not allowed against this resource")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -686,14 +686,14 @@ type CreateBucketConfiguration struct {
|
||||
func locationConstraint(a *action) string {
|
||||
var body bytes.Buffer
|
||||
if _, err := io.Copy(&body, a.req.Body); err != nil {
|
||||
fatalf(400, "InvalidRequest", err.Error())
|
||||
fatalError(400, "InvalidRequest", err.Error())
|
||||
}
|
||||
if body.Len() == 0 {
|
||||
return ""
|
||||
}
|
||||
var loc CreateBucketConfiguration
|
||||
if err := xml.NewDecoder(&body).Decode(&loc); err != nil {
|
||||
fatalf(400, "InvalidRequest", err.Error())
|
||||
fatalError(400, "InvalidRequest", err.Error())
|
||||
}
|
||||
return loc.LocationConstraint
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user