mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-06-01 04:40:38 +00:00
Update Go AWS SDK to the latest version
This commit is contained in:
committed by
Andrey Smirnov
parent
d08be990ef
commit
94a72b23ff
+2206
-72
File diff suppressed because it is too large
Load Diff
+94
@@ -0,0 +1,94 @@
|
||||
// +build integration
|
||||
|
||||
package kinesis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
)
|
||||
|
||||
func TestInteg_SubscribeToShard(t *testing.T) {
|
||||
desc, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
|
||||
StreamName: &streamName,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v", err)
|
||||
}
|
||||
|
||||
cons, err := svc.DescribeStreamConsumer(
|
||||
&kinesis.DescribeStreamConsumerInput{
|
||||
StreamARN: desc.StreamDescription.StreamARN,
|
||||
ConsumerName: &consumerName,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v", err)
|
||||
}
|
||||
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancelFn()
|
||||
|
||||
var recordsRx int32
|
||||
var ignoredCount int32
|
||||
var goodCount int32
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(desc.StreamDescription.Shards))
|
||||
for i, shard := range desc.StreamDescription.Shards {
|
||||
go func(idx int, s *kinesis.Shard) {
|
||||
defer wg.Done()
|
||||
params := &kinesis.SubscribeToShardInput{
|
||||
ConsumerARN: cons.ConsumerDescription.ConsumerARN,
|
||||
StartingPosition: &kinesis.StartingPosition{
|
||||
Type: aws.String(kinesis.ShardIteratorTypeAtTimestamp),
|
||||
Timestamp: &startingTimestamp,
|
||||
},
|
||||
ShardId: s.ShardId,
|
||||
}
|
||||
|
||||
sub, err := svc.SubscribeToShardWithContext(ctx, params)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v, %v", err, *s.ShardId)
|
||||
}
|
||||
defer sub.EventStream.Close()
|
||||
|
||||
Loop:
|
||||
for event := range sub.EventStream.Events() {
|
||||
switch e := event.(type) {
|
||||
case *kinesis.SubscribeToShardEvent:
|
||||
if len(e.Records) == 0 {
|
||||
atomic.AddInt32(&ignoredCount, 1)
|
||||
} else {
|
||||
atomic.AddInt32(&goodCount, 1)
|
||||
for _, r := range e.Records {
|
||||
if len(r.Data) == 0 {
|
||||
t.Fatalf("expect data in record, got none")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
newCount := atomic.AddInt32(&recordsRx, int32(len(e.Records)))
|
||||
if int(newCount) >= len(records) {
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := sub.EventStream.Err(); err != nil {
|
||||
t.Fatalf("expect no error, %v, %v", err, *s.ShardId)
|
||||
}
|
||||
}(i, shard)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if e, a := len(records), int(recordsRx); e != a {
|
||||
t.Errorf("expected to read %v records, got %v", e, a)
|
||||
}
|
||||
|
||||
t.Log("Ignored", ignoredCount, "empty events, non-empty", goodCount)
|
||||
}
|
||||
+343
@@ -0,0 +1,343 @@
|
||||
// +build integration
|
||||
|
||||
package kinesis_test
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/awstesting/integration"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
var (
|
||||
skipTLSVerify bool
|
||||
hUsage string
|
||||
endpoint string
|
||||
streamName string
|
||||
consumerName string
|
||||
numRecords int
|
||||
recordSize int
|
||||
debugEventStream bool
|
||||
mode string
|
||||
|
||||
svc *kinesis.Kinesis
|
||||
records []*kinesis.PutRecordsRequestEntry
|
||||
|
||||
startingTimestamp time.Time
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.StringVar(
|
||||
&mode, "mode", "all",
|
||||
"Sets the mode to run in, (test,create,cleanup,all).",
|
||||
)
|
||||
flag.BoolVar(
|
||||
&skipTLSVerify, "skip-verify", false,
|
||||
"Skips verification of TLS certificate.",
|
||||
)
|
||||
flag.StringVar(
|
||||
&hUsage, "http", "default",
|
||||
"The HTTP `version` to use for the connection. (default,1,2)",
|
||||
)
|
||||
flag.StringVar(
|
||||
&endpoint, "endpoint", "",
|
||||
"Overrides SDK `URL` endpoint for tests.",
|
||||
)
|
||||
flag.StringVar(
|
||||
&streamName, "stream", fmt.Sprintf("awsdkgo-s%v", UniqueID()),
|
||||
"The `name` of the stream to test against.",
|
||||
)
|
||||
flag.StringVar(
|
||||
&consumerName, "consumer", fmt.Sprintf("awsdkgo-c%v", UniqueID()),
|
||||
"The `name` of the stream to test against.",
|
||||
)
|
||||
flag.IntVar(
|
||||
&numRecords, "records", 20,
|
||||
"The `number` of records per PutRecords to test with.",
|
||||
)
|
||||
flag.IntVar(
|
||||
&recordSize, "record-size", 500,
|
||||
"The size in `bytes` of each record.",
|
||||
)
|
||||
flag.BoolVar(
|
||||
&debugEventStream, "debug-eventstream", false,
|
||||
"Enables debugging of the EventStream messages",
|
||||
)
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
|
||||
svc = createClient()
|
||||
|
||||
startingTimestamp = time.Now().Add(-time.Minute)
|
||||
|
||||
switch mode {
|
||||
case "create", "all":
|
||||
if err := createStream(streamName); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := createStreamConsumer(streamName, consumerName); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println("Stream Ready:", streamName, consumerName)
|
||||
|
||||
if mode != "all" {
|
||||
break
|
||||
}
|
||||
fallthrough
|
||||
case "test":
|
||||
records = createRecords(numRecords, recordSize)
|
||||
if err := putRecords(streamName, records, svc); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
|
||||
var exitCode int
|
||||
defer func() {
|
||||
os.Exit(exitCode)
|
||||
}()
|
||||
|
||||
exitCode = m.Run()
|
||||
|
||||
if mode != "all" {
|
||||
break
|
||||
}
|
||||
fallthrough
|
||||
case "cleanup":
|
||||
if err := cleanupStreamConsumer(streamName, consumerName); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := cleanupStream(streamName); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
default:
|
||||
fmt.Fprintf(os.Stderr, "unknown mode, %v", mode)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func createClient() *kinesis.Kinesis {
|
||||
ts := &http.Transport{}
|
||||
|
||||
if skipTLSVerify {
|
||||
ts.TLSClientConfig = &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
http2.ConfigureTransport(ts)
|
||||
switch hUsage {
|
||||
case "default":
|
||||
// Restore H2 optional support since the Transport/TLSConfig was
|
||||
// modified.
|
||||
http2.ConfigureTransport(ts)
|
||||
case "1":
|
||||
// Do nothing. Without usign ConfigureTransport h2 won't be available.
|
||||
ts.TLSClientConfig.NextProtos = []string{"http/1.1"}
|
||||
case "2":
|
||||
// Force the TLS ALPN (NextProto) to H2 only.
|
||||
ts.TLSClientConfig.NextProtos = []string{http2.NextProtoTLS}
|
||||
default:
|
||||
panic("unknown h usage, " + hUsage)
|
||||
}
|
||||
|
||||
sess := integration.SessionWithDefaultRegion("us-west-2")
|
||||
cfg := &aws.Config{
|
||||
HTTPClient: &http.Client{
|
||||
Transport: ts,
|
||||
},
|
||||
}
|
||||
if debugEventStream {
|
||||
cfg.LogLevel = aws.LogLevel(
|
||||
sess.Config.LogLevel.Value() | aws.LogDebugWithEventStreamBody)
|
||||
}
|
||||
|
||||
return kinesis.New(sess, cfg)
|
||||
}
|
||||
|
||||
func createStream(name string) error {
|
||||
descParams := &kinesis.DescribeStreamInput{
|
||||
StreamName: &name,
|
||||
}
|
||||
|
||||
_, err := svc.DescribeStream(descParams)
|
||||
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == kinesis.ErrCodeResourceNotFoundException {
|
||||
_, err := svc.CreateStream(&kinesis.CreateStreamInput{
|
||||
ShardCount: aws.Int64(100),
|
||||
StreamName: &name,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stream, %v", err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to describe stream, %v", err)
|
||||
}
|
||||
|
||||
if err := svc.WaitUntilStreamExists(descParams); err != nil {
|
||||
return fmt.Errorf("failed to wait for stream to exist, %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func cleanupStream(name string) error {
|
||||
_, err := svc.DeleteStream(&kinesis.DeleteStreamInput{
|
||||
StreamName: &name,
|
||||
EnforceConsumerDeletion: aws.Bool(true),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete stream, %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createStreamConsumer(streamName, consumerName string) error {
|
||||
desc, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
|
||||
StreamName: &streamName,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to describe stream, %s, %v", streamName, err)
|
||||
}
|
||||
|
||||
descParams := &kinesis.DescribeStreamConsumerInput{
|
||||
StreamARN: desc.StreamDescription.StreamARN,
|
||||
ConsumerName: &consumerName,
|
||||
}
|
||||
_, err = svc.DescribeStreamConsumer(descParams)
|
||||
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == kinesis.ErrCodeResourceNotFoundException {
|
||||
_, err := svc.RegisterStreamConsumer(
|
||||
&kinesis.RegisterStreamConsumerInput{
|
||||
ConsumerName: aws.String(consumerName),
|
||||
StreamARN: desc.StreamDescription.StreamARN,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create stream consumer %s, %v",
|
||||
consumerName, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("failed to describe stream consumer %s, %v",
|
||||
consumerName, err)
|
||||
}
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
resp, err := svc.DescribeStreamConsumer(descParams)
|
||||
if err != nil || aws.StringValue(resp.ConsumerDescription.ConsumerStatus) != kinesis.ConsumerStatusActive {
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to wait for consumer to exist, %v, %v",
|
||||
*descParams.StreamARN, *descParams.ConsumerName)
|
||||
}
|
||||
|
||||
func cleanupStreamConsumer(streamName, consumerName string) error {
|
||||
desc, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
|
||||
StreamName: &streamName,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to describe stream, %s, %v",
|
||||
streamName, err)
|
||||
}
|
||||
|
||||
descCons, err := svc.DescribeStreamConsumer(&kinesis.DescribeStreamConsumerInput{
|
||||
StreamARN: desc.StreamDescription.StreamARN,
|
||||
ConsumerName: &consumerName,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to describe stream consumer, %s, %v",
|
||||
consumerName, err)
|
||||
}
|
||||
|
||||
_, err = svc.DeregisterStreamConsumer(
|
||||
&kinesis.DeregisterStreamConsumerInput{
|
||||
ConsumerName: descCons.ConsumerDescription.ConsumerName,
|
||||
ConsumerARN: descCons.ConsumerDescription.ConsumerARN,
|
||||
StreamARN: desc.StreamDescription.StreamARN,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete stream consumer, %s %v",
|
||||
consumerName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createRecords(num, size int) []*kinesis.PutRecordsRequestEntry {
|
||||
var err error
|
||||
data, err := loadRandomData(num, size)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "unable to read random data, %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
records := make([]*kinesis.PutRecordsRequestEntry, len(data))
|
||||
for i, td := range data {
|
||||
records[i] = &kinesis.PutRecordsRequestEntry{
|
||||
Data: td,
|
||||
PartitionKey: aws.String(UniqueID()),
|
||||
}
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
func putRecords(stream string, records []*kinesis.PutRecordsRequestEntry, svc *kinesis.Kinesis) error {
|
||||
resp, err := svc.PutRecords(&kinesis.PutRecordsInput{
|
||||
StreamName: &stream,
|
||||
Records: records,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to put records to stream %s, %v", stream, err)
|
||||
}
|
||||
|
||||
if v := aws.Int64Value(resp.FailedRecordCount); v != 0 {
|
||||
return fmt.Errorf("failed to put records to stream %s, %d failed",
|
||||
stream, v)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadRandomData(m, n int) ([][]byte, error) {
|
||||
data := make([]byte, m*n)
|
||||
|
||||
_, err := rand.Read(data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
parts := make([][]byte, m)
|
||||
|
||||
for i := 0; i < m; i++ {
|
||||
mod := (i % m)
|
||||
parts[i] = data[mod*n : (mod+1)*n]
|
||||
}
|
||||
|
||||
return parts, nil
|
||||
}
|
||||
|
||||
// UniqueID returns a unique UUID-like identifier for use in generating
|
||||
// resources for integration tests.
|
||||
func UniqueID() string {
|
||||
uuid := make([]byte, 16)
|
||||
io.ReadFull(crand.Reader, uuid)
|
||||
return fmt.Sprintf("%x", uuid)
|
||||
}
|
||||
+5
-2
@@ -13,10 +13,13 @@ const (
|
||||
// ErrCodeExpiredNextTokenException for service response error code
|
||||
// "ExpiredNextTokenException".
|
||||
//
|
||||
// The pagination token passed to the ListShards operation is expired. For more
|
||||
// information, see ListShardsInput$NextToken.
|
||||
// The pagination token passed to the operation is expired.
|
||||
ErrCodeExpiredNextTokenException = "ExpiredNextTokenException"
|
||||
|
||||
// ErrCodeInternalFailureException for service response error code
|
||||
// "InternalFailureException".
|
||||
ErrCodeInternalFailureException = "InternalFailureException"
|
||||
|
||||
// ErrCodeInvalidArgumentException for service response error code
|
||||
// "InvalidArgumentException".
|
||||
//
|
||||
|
||||
+314
@@ -0,0 +1,314 @@
|
||||
// Code generated by private/model/cli/gen-api/main.go. DO NOT EDIT.
|
||||
|
||||
// +build go1.6
|
||||
|
||||
package kinesis
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/corehandlers"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/awstesting/unit"
|
||||
"github.com/aws/aws-sdk-go/private/protocol"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamtest"
|
||||
"github.com/aws/aws-sdk-go/private/protocol/jsonrpc"
|
||||
)
|
||||
|
||||
var _ time.Time
|
||||
var _ awserr.Error
|
||||
|
||||
func TestSubscribeToShard_Read(t *testing.T) {
|
||||
expectEvents, eventMsgs := mockSubscribeToShardReadEvents()
|
||||
sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
|
||||
eventstreamtest.ServeEventStream{
|
||||
T: t,
|
||||
Events: eventMsgs,
|
||||
},
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v", err)
|
||||
}
|
||||
defer cleanupFn()
|
||||
|
||||
svc := New(sess)
|
||||
resp, err := svc.SubscribeToShard(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error got, %v", err)
|
||||
}
|
||||
defer resp.EventStream.Close()
|
||||
// Trim off response output type pseudo event so only event messages remain.
|
||||
expectEvents = expectEvents[1:]
|
||||
|
||||
var i int
|
||||
for event := range resp.EventStream.Events() {
|
||||
if event == nil {
|
||||
t.Errorf("%d, expect event, got nil", i)
|
||||
}
|
||||
if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a)
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
if err := resp.EventStream.Err(); err != nil {
|
||||
t.Errorf("expect no error, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeToShard_ReadClose(t *testing.T) {
|
||||
_, eventMsgs := mockSubscribeToShardReadEvents()
|
||||
sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
|
||||
eventstreamtest.ServeEventStream{
|
||||
T: t,
|
||||
Events: eventMsgs,
|
||||
},
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v", err)
|
||||
}
|
||||
defer cleanupFn()
|
||||
|
||||
svc := New(sess)
|
||||
resp, err := svc.SubscribeToShard(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error got, %v", err)
|
||||
}
|
||||
|
||||
resp.EventStream.Close()
|
||||
<-resp.EventStream.Events()
|
||||
|
||||
if err := resp.EventStream.Err(); err != nil {
|
||||
t.Errorf("expect no error, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSubscribeToShard_Read(b *testing.B) {
|
||||
_, eventMsgs := mockSubscribeToShardReadEvents()
|
||||
var buf bytes.Buffer
|
||||
encoder := eventstream.NewEncoder(&buf)
|
||||
for _, msg := range eventMsgs {
|
||||
if err := encoder.Encode(msg); err != nil {
|
||||
b.Fatalf("failed to encode message, %v", err)
|
||||
}
|
||||
}
|
||||
stream := &loopReader{source: bytes.NewReader(buf.Bytes())}
|
||||
|
||||
sess := unit.Session
|
||||
svc := New(sess, &aws.Config{
|
||||
Endpoint: aws.String("https://example.com"),
|
||||
DisableParamValidation: aws.Bool(true),
|
||||
})
|
||||
svc.Handlers.Send.Swap(corehandlers.SendHandler.Name,
|
||||
request.NamedHandler{Name: "mockSend",
|
||||
Fn: func(r *request.Request) {
|
||||
r.HTTPResponse = &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Header: http.Header{},
|
||||
Body: ioutil.NopCloser(stream),
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
resp, err := svc.SubscribeToShard(nil)
|
||||
if err != nil {
|
||||
b.Fatalf("failed to create request, %v", err)
|
||||
}
|
||||
defer resp.EventStream.Close()
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err = resp.EventStream.Err(); err != nil {
|
||||
b.Fatalf("expect no error, got %v", err)
|
||||
}
|
||||
event := <-resp.EventStream.Events()
|
||||
if event == nil {
|
||||
b.Fatalf("expect event, got nil, %v, %d", resp.EventStream.Err(), i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func mockSubscribeToShardReadEvents() (
|
||||
[]SubscribeToShardEventStreamEvent,
|
||||
[]eventstream.Message,
|
||||
) {
|
||||
expectEvents := []SubscribeToShardEventStreamEvent{
|
||||
&SubscribeToShardOutput{},
|
||||
&SubscribeToShardEvent{
|
||||
ContinuationSequenceNumber: aws.String("string value goes here"),
|
||||
MillisBehindLatest: aws.Int64(1234),
|
||||
Records: []*Record{
|
||||
{
|
||||
ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
|
||||
Data: []byte("blob value goes here"),
|
||||
EncryptionType: aws.String("string value goes here"),
|
||||
PartitionKey: aws.String("string value goes here"),
|
||||
SequenceNumber: aws.String("string value goes here"),
|
||||
},
|
||||
{
|
||||
ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
|
||||
Data: []byte("blob value goes here"),
|
||||
EncryptionType: aws.String("string value goes here"),
|
||||
PartitionKey: aws.String("string value goes here"),
|
||||
SequenceNumber: aws.String("string value goes here"),
|
||||
},
|
||||
{
|
||||
ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
|
||||
Data: []byte("blob value goes here"),
|
||||
EncryptionType: aws.String("string value goes here"),
|
||||
PartitionKey: aws.String("string value goes here"),
|
||||
SequenceNumber: aws.String("string value goes here"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var marshalers request.HandlerList
|
||||
marshalers.PushBackNamed(jsonrpc.BuildHandler)
|
||||
payloadMarshaler := protocol.HandlerPayloadMarshal{
|
||||
Marshalers: marshalers,
|
||||
}
|
||||
_ = payloadMarshaler
|
||||
|
||||
eventMsgs := []eventstream.Message{
|
||||
{
|
||||
Headers: eventstream.Headers{
|
||||
eventstreamtest.EventMessageTypeHeader,
|
||||
{
|
||||
Name: eventstreamapi.EventTypeHeader,
|
||||
Value: eventstream.StringValue("initial-response"),
|
||||
},
|
||||
},
|
||||
Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]),
|
||||
},
|
||||
{
|
||||
Headers: eventstream.Headers{
|
||||
eventstreamtest.EventMessageTypeHeader,
|
||||
{
|
||||
Name: eventstreamapi.EventTypeHeader,
|
||||
Value: eventstream.StringValue("SubscribeToShardEvent"),
|
||||
},
|
||||
},
|
||||
Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]),
|
||||
},
|
||||
}
|
||||
|
||||
return expectEvents, eventMsgs
|
||||
}
|
||||
func TestSubscribeToShard_ReadException(t *testing.T) {
|
||||
expectEvents := []SubscribeToShardEventStreamEvent{
|
||||
&SubscribeToShardOutput{},
|
||||
&InternalFailureException{
|
||||
Message_: aws.String("string value goes here"),
|
||||
},
|
||||
}
|
||||
|
||||
var marshalers request.HandlerList
|
||||
marshalers.PushBackNamed(jsonrpc.BuildHandler)
|
||||
payloadMarshaler := protocol.HandlerPayloadMarshal{
|
||||
Marshalers: marshalers,
|
||||
}
|
||||
|
||||
eventMsgs := []eventstream.Message{
|
||||
{
|
||||
Headers: eventstream.Headers{
|
||||
eventstreamtest.EventMessageTypeHeader,
|
||||
{
|
||||
Name: eventstreamapi.EventTypeHeader,
|
||||
Value: eventstream.StringValue("initial-response"),
|
||||
},
|
||||
},
|
||||
Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]),
|
||||
},
|
||||
{
|
||||
Headers: eventstream.Headers{
|
||||
eventstreamtest.EventExceptionTypeHeader,
|
||||
{
|
||||
Name: eventstreamapi.ExceptionTypeHeader,
|
||||
Value: eventstream.StringValue("InternalFailureException"),
|
||||
},
|
||||
},
|
||||
Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]),
|
||||
},
|
||||
}
|
||||
|
||||
sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
|
||||
eventstreamtest.ServeEventStream{
|
||||
T: t,
|
||||
Events: eventMsgs,
|
||||
},
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error, %v", err)
|
||||
}
|
||||
defer cleanupFn()
|
||||
|
||||
svc := New(sess)
|
||||
resp, err := svc.SubscribeToShard(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("expect no error got, %v", err)
|
||||
}
|
||||
|
||||
defer resp.EventStream.Close()
|
||||
|
||||
<-resp.EventStream.Events()
|
||||
|
||||
err = resp.EventStream.Err()
|
||||
if err == nil {
|
||||
t.Fatalf("expect err, got none")
|
||||
}
|
||||
|
||||
expectErr := &InternalFailureException{
|
||||
Message_: aws.String("string value goes here"),
|
||||
}
|
||||
aerr, ok := err.(awserr.Error)
|
||||
if !ok {
|
||||
t.Errorf("expect exception, got %T, %#v", err, err)
|
||||
}
|
||||
if e, a := expectErr.Code(), aerr.Code(); e != a {
|
||||
t.Errorf("expect %v, got %v", e, a)
|
||||
}
|
||||
if e, a := expectErr.Message(), aerr.Message(); e != a {
|
||||
t.Errorf("expect %v, got %v", e, a)
|
||||
}
|
||||
|
||||
if e, a := expectErr, aerr; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expect %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
var _ awserr.Error = (*InternalFailureException)(nil)
|
||||
var _ awserr.Error = (*KMSAccessDeniedException)(nil)
|
||||
var _ awserr.Error = (*KMSDisabledException)(nil)
|
||||
var _ awserr.Error = (*KMSInvalidStateException)(nil)
|
||||
var _ awserr.Error = (*KMSNotFoundException)(nil)
|
||||
var _ awserr.Error = (*KMSOptInRequired)(nil)
|
||||
var _ awserr.Error = (*KMSThrottlingException)(nil)
|
||||
var _ awserr.Error = (*ResourceInUseException)(nil)
|
||||
var _ awserr.Error = (*ResourceNotFoundException)(nil)
|
||||
|
||||
type loopReader struct {
|
||||
source *bytes.Reader
|
||||
}
|
||||
|
||||
func (c *loopReader) Read(p []byte) (int, error) {
|
||||
if c.source.Len() == 0 {
|
||||
c.source.Seek(0, 0)
|
||||
}
|
||||
|
||||
return c.source.Read(p)
|
||||
}
|
||||
+58
@@ -0,0 +1,58 @@
|
||||
// Code generated by private/model/cli/gen-api/main.go. DO NOT EDIT.
|
||||
|
||||
// +build go1.10,integration
|
||||
|
||||
package kinesis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/awstesting/integration"
|
||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||
)
|
||||
|
||||
var _ aws.Config
|
||||
var _ awserr.Error
|
||||
var _ request.Request
|
||||
|
||||
func TestInteg_00_ListStreams(t *testing.T) {
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancelFn()
|
||||
|
||||
sess := integration.SessionWithDefaultRegion("us-west-2")
|
||||
svc := kinesis.New(sess)
|
||||
params := &kinesis.ListStreamsInput{}
|
||||
_, err := svc.ListStreamsWithContext(ctx, params)
|
||||
if err != nil {
|
||||
t.Errorf("expect no error, got %v", err)
|
||||
}
|
||||
}
|
||||
func TestInteg_01_DescribeStream(t *testing.T) {
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancelFn()
|
||||
|
||||
sess := integration.SessionWithDefaultRegion("us-west-2")
|
||||
svc := kinesis.New(sess)
|
||||
params := &kinesis.DescribeStreamInput{
|
||||
StreamName: aws.String("bogus-stream-name"),
|
||||
}
|
||||
_, err := svc.DescribeStreamWithContext(ctx, params)
|
||||
if err == nil {
|
||||
t.Fatalf("expect request to fail")
|
||||
}
|
||||
aerr, ok := err.(awserr.RequestFailure)
|
||||
if !ok {
|
||||
t.Fatalf("expect awserr, was %T", err)
|
||||
}
|
||||
if len(aerr.Code()) == 0 {
|
||||
t.Errorf("expect non-empty error code")
|
||||
}
|
||||
if v := aerr.Code(); v == request.ErrCodeSerialization {
|
||||
t.Errorf("expect API error code got serialization failure")
|
||||
}
|
||||
}
|
||||
+23
@@ -76,6 +76,10 @@ type KinesisAPI interface {
|
||||
DeleteStreamWithContext(aws.Context, *kinesis.DeleteStreamInput, ...request.Option) (*kinesis.DeleteStreamOutput, error)
|
||||
DeleteStreamRequest(*kinesis.DeleteStreamInput) (*request.Request, *kinesis.DeleteStreamOutput)
|
||||
|
||||
DeregisterStreamConsumer(*kinesis.DeregisterStreamConsumerInput) (*kinesis.DeregisterStreamConsumerOutput, error)
|
||||
DeregisterStreamConsumerWithContext(aws.Context, *kinesis.DeregisterStreamConsumerInput, ...request.Option) (*kinesis.DeregisterStreamConsumerOutput, error)
|
||||
DeregisterStreamConsumerRequest(*kinesis.DeregisterStreamConsumerInput) (*request.Request, *kinesis.DeregisterStreamConsumerOutput)
|
||||
|
||||
DescribeLimits(*kinesis.DescribeLimitsInput) (*kinesis.DescribeLimitsOutput, error)
|
||||
DescribeLimitsWithContext(aws.Context, *kinesis.DescribeLimitsInput, ...request.Option) (*kinesis.DescribeLimitsOutput, error)
|
||||
DescribeLimitsRequest(*kinesis.DescribeLimitsInput) (*request.Request, *kinesis.DescribeLimitsOutput)
|
||||
@@ -87,6 +91,10 @@ type KinesisAPI interface {
|
||||
DescribeStreamPages(*kinesis.DescribeStreamInput, func(*kinesis.DescribeStreamOutput, bool) bool) error
|
||||
DescribeStreamPagesWithContext(aws.Context, *kinesis.DescribeStreamInput, func(*kinesis.DescribeStreamOutput, bool) bool, ...request.Option) error
|
||||
|
||||
DescribeStreamConsumer(*kinesis.DescribeStreamConsumerInput) (*kinesis.DescribeStreamConsumerOutput, error)
|
||||
DescribeStreamConsumerWithContext(aws.Context, *kinesis.DescribeStreamConsumerInput, ...request.Option) (*kinesis.DescribeStreamConsumerOutput, error)
|
||||
DescribeStreamConsumerRequest(*kinesis.DescribeStreamConsumerInput) (*request.Request, *kinesis.DescribeStreamConsumerOutput)
|
||||
|
||||
DescribeStreamSummary(*kinesis.DescribeStreamSummaryInput) (*kinesis.DescribeStreamSummaryOutput, error)
|
||||
DescribeStreamSummaryWithContext(aws.Context, *kinesis.DescribeStreamSummaryInput, ...request.Option) (*kinesis.DescribeStreamSummaryOutput, error)
|
||||
DescribeStreamSummaryRequest(*kinesis.DescribeStreamSummaryInput) (*request.Request, *kinesis.DescribeStreamSummaryOutput)
|
||||
@@ -115,6 +123,13 @@ type KinesisAPI interface {
|
||||
ListShardsWithContext(aws.Context, *kinesis.ListShardsInput, ...request.Option) (*kinesis.ListShardsOutput, error)
|
||||
ListShardsRequest(*kinesis.ListShardsInput) (*request.Request, *kinesis.ListShardsOutput)
|
||||
|
||||
ListStreamConsumers(*kinesis.ListStreamConsumersInput) (*kinesis.ListStreamConsumersOutput, error)
|
||||
ListStreamConsumersWithContext(aws.Context, *kinesis.ListStreamConsumersInput, ...request.Option) (*kinesis.ListStreamConsumersOutput, error)
|
||||
ListStreamConsumersRequest(*kinesis.ListStreamConsumersInput) (*request.Request, *kinesis.ListStreamConsumersOutput)
|
||||
|
||||
ListStreamConsumersPages(*kinesis.ListStreamConsumersInput, func(*kinesis.ListStreamConsumersOutput, bool) bool) error
|
||||
ListStreamConsumersPagesWithContext(aws.Context, *kinesis.ListStreamConsumersInput, func(*kinesis.ListStreamConsumersOutput, bool) bool, ...request.Option) error
|
||||
|
||||
ListStreams(*kinesis.ListStreamsInput) (*kinesis.ListStreamsOutput, error)
|
||||
ListStreamsWithContext(aws.Context, *kinesis.ListStreamsInput, ...request.Option) (*kinesis.ListStreamsOutput, error)
|
||||
ListStreamsRequest(*kinesis.ListStreamsInput) (*request.Request, *kinesis.ListStreamsOutput)
|
||||
@@ -138,6 +153,10 @@ type KinesisAPI interface {
|
||||
PutRecordsWithContext(aws.Context, *kinesis.PutRecordsInput, ...request.Option) (*kinesis.PutRecordsOutput, error)
|
||||
PutRecordsRequest(*kinesis.PutRecordsInput) (*request.Request, *kinesis.PutRecordsOutput)
|
||||
|
||||
RegisterStreamConsumer(*kinesis.RegisterStreamConsumerInput) (*kinesis.RegisterStreamConsumerOutput, error)
|
||||
RegisterStreamConsumerWithContext(aws.Context, *kinesis.RegisterStreamConsumerInput, ...request.Option) (*kinesis.RegisterStreamConsumerOutput, error)
|
||||
RegisterStreamConsumerRequest(*kinesis.RegisterStreamConsumerInput) (*request.Request, *kinesis.RegisterStreamConsumerOutput)
|
||||
|
||||
RemoveTagsFromStream(*kinesis.RemoveTagsFromStreamInput) (*kinesis.RemoveTagsFromStreamOutput, error)
|
||||
RemoveTagsFromStreamWithContext(aws.Context, *kinesis.RemoveTagsFromStreamInput, ...request.Option) (*kinesis.RemoveTagsFromStreamOutput, error)
|
||||
RemoveTagsFromStreamRequest(*kinesis.RemoveTagsFromStreamInput) (*request.Request, *kinesis.RemoveTagsFromStreamOutput)
|
||||
@@ -154,6 +173,10 @@ type KinesisAPI interface {
|
||||
StopStreamEncryptionWithContext(aws.Context, *kinesis.StopStreamEncryptionInput, ...request.Option) (*kinesis.StopStreamEncryptionOutput, error)
|
||||
StopStreamEncryptionRequest(*kinesis.StopStreamEncryptionInput) (*request.Request, *kinesis.StopStreamEncryptionOutput)
|
||||
|
||||
SubscribeToShard(*kinesis.SubscribeToShardInput) (*kinesis.SubscribeToShardOutput, error)
|
||||
SubscribeToShardWithContext(aws.Context, *kinesis.SubscribeToShardInput, ...request.Option) (*kinesis.SubscribeToShardOutput, error)
|
||||
SubscribeToShardRequest(*kinesis.SubscribeToShardInput) (*request.Request, *kinesis.SubscribeToShardOutput)
|
||||
|
||||
UpdateShardCount(*kinesis.UpdateShardCountInput) (*kinesis.UpdateShardCountOutput, error)
|
||||
UpdateShardCountWithContext(aws.Context, *kinesis.UpdateShardCountInput, ...request.Option) (*kinesis.UpdateShardCountOutput, error)
|
||||
UpdateShardCountRequest(*kinesis.UpdateShardCountInput) (*request.Request, *kinesis.UpdateShardCountOutput)
|
||||
|
||||
+6
-2
@@ -29,8 +29,9 @@ var initRequest func(*request.Request)
|
||||
|
||||
// Service information constants
|
||||
const (
|
||||
ServiceName = "kinesis" // Service endpoint prefix API calls made to.
|
||||
EndpointsID = ServiceName // Service ID for Regions and Endpoints metadata.
|
||||
ServiceName = "kinesis" // Name of service.
|
||||
EndpointsID = ServiceName // ID to lookup a service endpoint with.
|
||||
ServiceID = "Kinesis" // ServiceID is a unique identifer of a specific service.
|
||||
)
|
||||
|
||||
// New creates a new instance of the Kinesis client with a session.
|
||||
@@ -55,6 +56,7 @@ func newClient(cfg aws.Config, handlers request.Handlers, endpoint, signingRegio
|
||||
cfg,
|
||||
metadata.ClientInfo{
|
||||
ServiceName: ServiceName,
|
||||
ServiceID: ServiceID,
|
||||
SigningName: signingName,
|
||||
SigningRegion: signingRegion,
|
||||
Endpoint: endpoint,
|
||||
@@ -73,6 +75,8 @@ func newClient(cfg aws.Config, handlers request.Handlers, endpoint, signingRegio
|
||||
svc.Handlers.UnmarshalMeta.PushBackNamed(jsonrpc.UnmarshalMetaHandler)
|
||||
svc.Handlers.UnmarshalError.PushBackNamed(jsonrpc.UnmarshalErrorHandler)
|
||||
|
||||
svc.Handlers.UnmarshalStream.PushBackNamed(jsonrpc.UnmarshalHandler)
|
||||
|
||||
// Run custom client initialization if present
|
||||
if initClient != nil {
|
||||
initClient(svc.Client)
|
||||
|
||||
Reference in New Issue
Block a user