package s3

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/smira/aptly/aptly"
	"github.com/smira/aptly/files"
)

// PublishedStorage abstract file system with published files (actually hosted on S3)
type PublishedStorage struct {
	s3               *s3.S3
	config           *aws.Config
	bucket           string
	acl              string
	prefix           string
	storageClass     string
	encryptionMethod string
	// plusWorkaround duplicates every key containing '+' under a ' '-variant
	// key, to work around clients that URL-decode '+' as space.
	plusWorkaround bool
	// disableMultiDel falls back to one DeleteObject call per key for
	// S3-compatible services lacking the multi-object delete API.
	disableMultiDel bool
	// pathCache maps relative published path -> MD5, lazily filled by
	// LinkFromPool from a bucket listing.
	pathCache map[string]string
}

// Check interface
var (
	_ aptly.PublishedStorage = (*PublishedStorage)(nil)
)

// NewPublishedStorageRaw creates published storage from raw aws credentials
func NewPublishedStorageRaw(
	bucket, defaultACL, prefix, storageClass, encryptionMethod string,
	plusWorkaround, disabledMultiDel bool,
	config *aws.Config,
) (*PublishedStorage, error) {
	if defaultACL == "" {
		defaultACL = "private"
	}

	// STANDARD is the S3 default storage class; sending it explicitly is redundant
	if storageClass == "STANDARD" {
		storageClass = ""
	}

	// session.NewSession (unlike the deprecated session.New) reports
	// configuration errors instead of swallowing them; we already return error
	sess, err := session.NewSession(config)
	if err != nil {
		return nil, err
	}

	result := &PublishedStorage{
		s3:               s3.New(sess),
		bucket:           bucket,
		config:           config,
		acl:              defaultACL,
		prefix:           prefix,
		storageClass:     storageClass,
		encryptionMethod: encryptionMethod,
		plusWorkaround:   plusWorkaround,
		disableMultiDel:  disabledMultiDel,
	}

	return result, nil
}

// NewPublishedStorage creates new instance of PublishedStorage with specified S3 access
// keys, region and bucket name
func NewPublishedStorage(accessKey, secretKey, sessionToken, region, endpoint, bucket, defaultACL, prefix,
	storageClass, encryptionMethod string, plusWorkaround, disableMultiDel bool) (*PublishedStorage, error) {

	config := &aws.Config{
		HTTPClient: RetryingClient,
		Region:     aws.String(region),
	}

	// custom endpoint implies an S3-compatible service that usually needs
	// path-style addressing (bucket in the path, not the hostname)
	if endpoint != "" {
		config = config.WithEndpoint(endpoint).WithS3ForcePathStyle(true)
	}

	// no explicit access key: fall back to the SDK default credential chain
	if accessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
	}

	return NewPublishedStorageRaw(bucket, defaultACL, prefix, storageClass,
		encryptionMethod, plusWorkaround, disableMultiDel, config)
}

// String returns a human-readable description of the storage location.
func (storage *PublishedStorage) String() string {
	return fmt.Sprintf("S3: %s:%s/%s", *storage.config.Region, storage.bucket, storage.prefix)
}

// MkDir creates directory recursively under public path
func (storage *PublishedStorage) MkDir(path string) error {
	// no op for S3: keys are flat, "directories" don't exist
	return nil
}

// PutFile puts file into published storage at specified path
func (storage *PublishedStorage) PutFile(path string, sourceFilename string) error {
	source, err := os.Open(sourceFilename)
	if err != nil {
		return err
	}
	defer source.Close()

	params := &s3.PutObjectInput{
		Bucket: aws.String(storage.bucket),
		Key:    aws.String(filepath.Join(storage.prefix, path)),
		Body:   source,
		ACL:    aws.String(storage.acl),
	}
	if storage.storageClass != "" {
		params.StorageClass = aws.String(storage.storageClass)
	}
	if storage.encryptionMethod != "" {
		params.ServerSideEncryption = aws.String(storage.encryptionMethod)
	}

	_, err = storage.s3.PutObject(params)
	if err != nil {
		return fmt.Errorf("error uploading %s to %s: %s", sourceFilename, storage, err)
	}

	// upload a space-for-plus duplicate so clients that decode '+' as ' '
	// still find the object
	if storage.plusWorkaround && strings.Contains(path, "+") {
		return storage.PutFile(strings.Replace(path, "+", " ", -1), sourceFilename)
	}
	return nil
}

// Remove removes single file under public path
func (storage *PublishedStorage) Remove(path string) error {
	params := &s3.DeleteObjectInput{
		Bucket: aws.String(storage.bucket),
		// BUG FIX: key must include storage.prefix, consistent with PutFile,
		// RemoveDirs and RenameFile; the bare path deleted the wrong key
		// whenever a prefix was configured
		Key: aws.String(filepath.Join(storage.prefix, path)),
	}
	_, err := storage.s3.DeleteObject(params)
	if err != nil {
		return fmt.Errorf("error deleting %s from %s: %s", path, storage, err)
	}

	if storage.plusWorkaround && strings.Contains(path, "+") {
		// try to remove workaround version, but don't care about result
		_ = storage.Remove(strings.Replace(path, "+", " ", -1))
	}
	return nil
}

// RemoveDirs removes directory structure under public path
func (storage *PublishedStorage) RemoveDirs(path string, progress aptly.Progress) error {
	// S3 multi-object delete accepts at most 1000 keys per request
	const page = 1000

	filelist, _, err := storage.internalFilelist(path, false)
	if err != nil {
		return err
	}

	if storage.disableMultiDel {
		// one DeleteObject per key for services without multi-delete support
		for i := range filelist {
			params := &s3.DeleteObjectInput{
				Bucket: aws.String(storage.bucket),
				Key:    aws.String(filepath.Join(storage.prefix, path, filelist[i])),
			}
			_, err := storage.s3.DeleteObject(params)
			if err != nil {
				return fmt.Errorf("error deleting path %s from %s: %s", filelist[i], storage, err)
			}
		}
	} else {
		// batch keys into pages and delete each page with one request
		numParts := (len(filelist) + page - 1) / page

		for i := 0; i < numParts; i++ {
			var part []string
			if i == numParts-1 {
				part = filelist[i*page:]
			} else {
				part = filelist[i*page : (i+1)*page]
			}

			paths := make([]*s3.ObjectIdentifier, len(part))
			for j := range part {
				paths[j] = &s3.ObjectIdentifier{
					Key: aws.String(filepath.Join(storage.prefix, path, part[j])),
				}
			}

			params := &s3.DeleteObjectsInput{
				Bucket: aws.String(storage.bucket),
				Delete: &s3.Delete{
					Objects: paths,
					Quiet:   aws.Bool(true),
				},
			}

			_, err := storage.s3.DeleteObjects(params)
			if err != nil {
				return fmt.Errorf("error deleting multiple paths from %s: %s", storage, err)
			}
		}
	}

	return nil
}

// LinkFromPool links package file from pool to dist's pool location
//
// publishedDirectory is desired location in pool (like prefix/pool/component/liba/libav/)
// sourcePool is instance of aptly.PackagePool
// sourcePath is filepath to package file in package pool
//
// LinkFromPool returns relative path for the published file to be included in package index
func (storage *PublishedStorage) LinkFromPool(publishedDirectory string, sourcePool aptly.PackagePool,
	sourcePath, sourceMD5 string, force bool) error {
	// verify that package pool is local pool in filesystem
	_ = sourcePool.(*files.PackagePool)

	baseName := filepath.Base(sourcePath)
	relPath := filepath.Join(publishedDirectory, baseName)
	poolPath := filepath.Join(storage.prefix, relPath)

	// lazily build path -> MD5 cache from a full bucket listing so repeated
	// links avoid per-file HEAD requests
	if storage.pathCache == nil {
		paths, md5s, err := storage.internalFilelist(storage.prefix, true)
		if err != nil {
			return fmt.Errorf("error caching paths under prefix: %s", err)
		}

		storage.pathCache = make(map[string]string, len(paths))
		for i := range paths {
			storage.pathCache[paths[i]] = md5s[i]
		}
	}

	destinationMD5, exists := storage.pathCache[relPath]
	if exists {
		if destinationMD5 == sourceMD5 {
			// identical file already published — nothing to do
			return nil
		}
		if !force {
			return fmt.Errorf("error putting file to %s: file already exists and is different: %s", poolPath, storage)
		}
	}

	err := storage.PutFile(relPath, sourcePath)
	if err == nil {
		storage.pathCache[relPath] = sourceMD5
	}

	return err
}

// Filelist returns list of files under prefix
func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) {
	paths, _, err := storage.internalFilelist(prefix, true)
	return paths, err
}

// internalFilelist lists all keys under prefix, returning paths relative to
// the prefix and their MD5 sums (ETags, valid as MD5 for non-multipart
// uploads). With hidePlusWorkaround, space-variant duplicate keys created by
// the plus workaround are omitted.
func (storage *PublishedStorage) internalFilelist(prefix string, hidePlusWorkaround bool) (paths []string, md5s []string, err error) {
	paths = make([]string, 0, 1024)
	md5s = make([]string, 0, 1024)
	marker := ""
	prefix = filepath.Join(storage.prefix, prefix)
	if prefix != "" {
		prefix += "/"
	}
	for {
		params := &s3.ListObjectsInput{
			Bucket:  aws.String(storage.bucket),
			Prefix:  aws.String(prefix),
			MaxKeys: aws.Int64(1000),
		}
		// BUG FIX: the continuation marker was computed but never sent, so a
		// truncated listing re-fetched the first page forever
		if marker != "" {
			params.Marker = aws.String(marker)
		}

		contents, err := storage.s3.ListObjects(params)
		if err != nil {
			return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, storage, err)
		}

		lastKey := ""
		for _, key := range contents.Contents {
			lastKey = *key.Key
			if storage.plusWorkaround && hidePlusWorkaround && strings.Contains(lastKey, " ") {
				// if we use plusWorkaround, we want to hide those duplicates
				// from listing
				continue
			}

			if prefix == "" {
				paths = append(paths, *key.Key)
			} else {
				paths = append(paths, (*key.Key)[len(prefix):])
			}
			md5s = append(md5s, strings.Replace(*key.ETag, "\"", "", -1))
		}

		if contents.IsTruncated != nil && *contents.IsTruncated {
			marker = *contents.NextMarker
			if marker == "" {
				// From the s3 docs: If response does not include the
				// NextMarker and it is truncated, you can use the value of the
				// last Key in the response as the marker in the subsequent
				// request to get the next set of object keys.
				marker = lastKey
			}
		} else {
			break
		}
	}

	return paths, md5s, nil
}

// RenameFile renames (moves) file
func (storage *PublishedStorage) RenameFile(oldName, newName string) error {
	// S3 has no rename: server-side copy to the new key, then delete the old one
	source := fmt.Sprintf("/%s/%s", storage.bucket, filepath.Join(storage.prefix, oldName))

	params := &s3.CopyObjectInput{
		Bucket:     aws.String(storage.bucket),
		CopySource: aws.String(source),
		Key:        aws.String(filepath.Join(storage.prefix, newName)),
		ACL:        aws.String(storage.acl),
	}

	_, err := storage.s3.CopyObject(params)
	if err != nil {
		return fmt.Errorf("error copying %s -> %s in %s: %s", oldName, newName, storage, err)
	}

	return storage.Remove(oldName)
}