mirror of
https://github.com/aptly-dev/aptly.git
synced 2026-04-20 19:38:39 +00:00
Revert "use new azure-sdk"
This reverts commit e2cbd637b8.
# Conflicts:
# azure/public.go
# go.sum
# Conflicts:
# go.mod
# go.sum
This commit is contained in:
@@ -5,35 +5,28 @@ package azure
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"net/url"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
||||||
"github.com/aptly-dev/aptly/aptly"
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
)
|
)
|
||||||
|
|
||||||
func isBlobNotFound(err error) bool {
|
func isBlobNotFound(err error) bool {
|
||||||
var respErr *azcore.ResponseError
|
storageError, ok := err.(azblob.StorageError)
|
||||||
if errors.As(err, &respErr) {
|
return ok && storageError.ServiceCode() == azblob.ServiceCodeBlobNotFound
|
||||||
return respErr.StatusCode == 404 // BlobNotFound
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type azContext struct {
|
type azContext struct {
|
||||||
client *azblob.Client
|
container azblob.ContainerURL
|
||||||
container string
|
|
||||||
prefix string
|
prefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAzContext(accountName, accountKey, container, prefix, endpoint string) (*azContext, error) {
|
func newAzContext(accountName, accountKey, container, prefix, endpoint string) (*azContext, error) {
|
||||||
cred, err := azblob.NewSharedKeyCredential(accountName, accountKey)
|
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -42,14 +35,15 @@ func newAzContext(accountName, accountKey, container, prefix, endpoint string) (
|
|||||||
endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
|
endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceClient, err := azblob.NewClientWithSharedKeyCredential(endpoint, cred, nil)
|
url, err := url.Parse(fmt.Sprintf("%s/%s", endpoint, container))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{}))
|
||||||
|
|
||||||
result := &azContext{
|
result := &azContext{
|
||||||
client: serviceClient,
|
container: containerURL,
|
||||||
container: container,
|
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,6 +54,10 @@ func (az *azContext) blobPath(path string) string {
|
|||||||
return filepath.Join(az.prefix, path)
|
return filepath.Join(az.prefix, path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (az *azContext) blobURL(path string) azblob.BlobURL {
|
||||||
|
return az.container.NewBlobURL(az.blobPath(path))
|
||||||
|
}
|
||||||
|
|
||||||
func (az *azContext) internalFilelist(prefix string, progress aptly.Progress) (paths []string, md5s []string, err error) {
|
func (az *azContext) internalFilelist(prefix string, progress aptly.Progress) (paths []string, md5s []string, err error) {
|
||||||
const delimiter = "/"
|
const delimiter = "/"
|
||||||
paths = make([]string, 0, 1024)
|
paths = make([]string, 0, 1024)
|
||||||
@@ -69,33 +67,27 @@ func (az *azContext) internalFilelist(prefix string, progress aptly.Progress) (p
|
|||||||
prefix += delimiter
|
prefix += delimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
for marker := (azblob.Marker{}); marker.NotDone(); {
|
||||||
maxResults := int32(1)
|
listBlob, err := az.container.ListBlobsFlatSegment(
|
||||||
pager := az.client.NewListBlobsFlatPager(az.container, &azblob.ListBlobsFlatOptions{
|
context.Background(), marker, azblob.ListBlobsSegmentOptions{
|
||||||
Prefix: &prefix,
|
Prefix: prefix,
|
||||||
MaxResults: &maxResults,
|
MaxResults: 1,
|
||||||
Include: azblob.ListBlobsInclude{Metadata: true},
|
Details: azblob.BlobListingDetails{Metadata: true}})
|
||||||
})
|
|
||||||
|
|
||||||
// Iterate over each page
|
|
||||||
for pager.More() {
|
|
||||||
page, err := pager.NextPage(ctx)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, az, err)
|
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, az, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, blob := range page.Segment.BlobItems {
|
marker = listBlob.NextMarker
|
||||||
if prefix == "" {
|
|
||||||
paths = append(paths, *blob.Name)
|
|
||||||
} else {
|
|
||||||
name := *blob.Name
|
|
||||||
paths = append(paths, name[len(prefix):])
|
|
||||||
}
|
|
||||||
b := *blob
|
|
||||||
md5 := b.Properties.ContentMD5
|
|
||||||
md5s = append(md5s, fmt.Sprintf("%x", md5))
|
|
||||||
|
|
||||||
|
for _, blob := range listBlob.Segment.BlobItems {
|
||||||
|
if prefix == "" {
|
||||||
|
paths = append(paths, blob.Name)
|
||||||
|
} else {
|
||||||
|
paths = append(paths, blob.Name[len(prefix):])
|
||||||
|
}
|
||||||
|
md5s = append(md5s, fmt.Sprintf("%x", blob.Properties.ContentMD5))
|
||||||
}
|
}
|
||||||
|
|
||||||
if progress != nil {
|
if progress != nil {
|
||||||
time.Sleep(time.Duration(500) * time.Millisecond)
|
time.Sleep(time.Duration(500) * time.Millisecond)
|
||||||
progress.AddBar(1)
|
progress.AddBar(1)
|
||||||
@@ -105,27 +97,28 @@ func (az *azContext) internalFilelist(prefix string, progress aptly.Progress) (p
|
|||||||
return paths, md5s, nil
|
return paths, md5s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (az *azContext) putFile(blobName string, source io.Reader, sourceMD5 string) error {
|
func (az *azContext) putFile(blob azblob.BlobURL, source io.Reader, sourceMD5 string) error {
|
||||||
uploadOptions := &azblob.UploadFileOptions{
|
uploadOptions := azblob.UploadStreamToBlockBlobOptions{
|
||||||
BlockSize: 4 * 1024 * 1024,
|
BufferSize: 4 * 1024 * 1024,
|
||||||
Concurrency: 8,
|
MaxBuffers: 8,
|
||||||
}
|
}
|
||||||
|
|
||||||
path := az.blobPath(blobName)
|
|
||||||
if len(sourceMD5) > 0 {
|
if len(sourceMD5) > 0 {
|
||||||
decodedMD5, err := hex.DecodeString(sourceMD5)
|
decodedMD5, err := hex.DecodeString(sourceMD5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
uploadOptions.HTTPHeaders = &blob.HTTPHeaders{
|
uploadOptions.BlobHTTPHeaders = azblob.BlobHTTPHeaders{
|
||||||
BlobContentMD5: decodedMD5,
|
ContentMD5: decodedMD5,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
_, err := azblob.UploadStreamToBlockBlob(
|
||||||
if file, ok := source.(*os.File); ok {
|
context.Background(),
|
||||||
_, err = az.client.UploadFile(context.TODO(), az.container, path, file, uploadOptions)
|
source,
|
||||||
}
|
blob.ToBlockBlobURL(),
|
||||||
|
uploadOptions,
|
||||||
|
)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/aptly-dev/aptly/aptly"
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
"github.com/aptly-dev/aptly/utils"
|
"github.com/aptly-dev/aptly/utils"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@@ -40,7 +41,10 @@ func (pool *PackagePool) buildPoolPath(filename string, checksums *utils.Checksu
|
|||||||
return filepath.Join(hash[0:2], hash[2:4], hash[4:32]+"_"+filename)
|
return filepath.Join(hash[0:2], hash[2:4], hash[4:32]+"_"+filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *PackagePool) ensureChecksums(poolPath string, checksumStorage aptly.ChecksumStorage) (*utils.ChecksumInfo, error) {
|
func (pool *PackagePool) ensureChecksums(
|
||||||
|
poolPath string,
|
||||||
|
checksumStorage aptly.ChecksumStorage,
|
||||||
|
) (*utils.ChecksumInfo, error) {
|
||||||
targetChecksums, err := checksumStorage.Get(poolPath)
|
targetChecksums, err := checksumStorage.Get(poolPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -48,7 +52,8 @@ func (pool *PackagePool) ensureChecksums(poolPath string, checksumStorage aptly.
|
|||||||
|
|
||||||
if targetChecksums == nil {
|
if targetChecksums == nil {
|
||||||
// we don't have checksums stored yet for this file
|
// we don't have checksums stored yet for this file
|
||||||
download, err := pool.az.client.DownloadStream(context.Background(), pool.az.container, poolPath, nil)
|
blob := pool.az.blobURL(poolPath)
|
||||||
|
download, err := blob.Download(context.Background(), 0, 0, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isBlobNotFound(err) {
|
if isBlobNotFound(err) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -58,7 +63,7 @@ func (pool *PackagePool) ensureChecksums(poolPath string, checksumStorage aptly.
|
|||||||
}
|
}
|
||||||
|
|
||||||
targetChecksums = &utils.ChecksumInfo{}
|
targetChecksums = &utils.ChecksumInfo{}
|
||||||
*targetChecksums, err = utils.ChecksumsForReader(download.Body)
|
*targetChecksums, err = utils.ChecksumsForReader(download.Body(azblob.RetryReaderOptions{}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "error checksumming blob at %s", poolPath)
|
return nil, errors.Wrapf(err, "error checksumming blob at %s", poolPath)
|
||||||
}
|
}
|
||||||
@@ -87,49 +92,46 @@ func (pool *PackagePool) LegacyPath(_ string, _ *utils.ChecksumInfo) (string, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *PackagePool) Size(path string) (int64, error) {
|
func (pool *PackagePool) Size(path string) (int64, error) {
|
||||||
serviceClient := pool.az.client.ServiceClient()
|
blob := pool.az.blobURL(path)
|
||||||
containerClient := serviceClient.NewContainerClient(pool.az.container)
|
props, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
|
||||||
blobClient := containerClient.NewBlobClient(path)
|
|
||||||
|
|
||||||
props, err := blobClient.GetProperties(context.TODO(), nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Wrapf(err, "error examining %s from %s", path, pool)
|
return 0, errors.Wrapf(err, "error examining %s from %s", path, pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
return *props.ContentLength, nil
|
return props.ContentLength(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *PackagePool) Open(path string) (aptly.ReadSeekerCloser, error) {
|
func (pool *PackagePool) Open(path string) (aptly.ReadSeekerCloser, error) {
|
||||||
|
blob := pool.az.blobURL(path)
|
||||||
|
|
||||||
temp, err := os.CreateTemp("", "blob-download")
|
temp, err := os.CreateTemp("", "blob-download")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "error creating tempfile for %s", path)
|
return nil, errors.Wrap(err, "error creating temporary file for blob download")
|
||||||
}
|
}
|
||||||
|
|
||||||
defer os.Remove(temp.Name())
|
defer os.Remove(temp.Name())
|
||||||
|
|
||||||
_, err = pool.az.client.DownloadFile(context.TODO(), pool.az.container, path, temp, nil)
|
err = azblob.DownloadBlobToFile(context.Background(), blob, 0, 0, temp, azblob.DownloadFromBlobOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "error downloading blob %s", path)
|
return nil, errors.Wrapf(err, "error downloading blob at %s", path)
|
||||||
}
|
}
|
||||||
|
|
||||||
return temp, nil
|
return temp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *PackagePool) Remove(path string) (int64, error) {
|
func (pool *PackagePool) Remove(path string) (int64, error) {
|
||||||
serviceClient := pool.az.client.ServiceClient()
|
blob := pool.az.blobURL(path)
|
||||||
containerClient := serviceClient.NewContainerClient(pool.az.container)
|
props, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
|
||||||
blobClient := containerClient.NewBlobClient(path)
|
|
||||||
|
|
||||||
props, err := blobClient.GetProperties(context.TODO(), nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Wrapf(err, "error examining %s from %s", path, pool)
|
return 0, errors.Wrapf(err, "error getting props of %s from %s", path, pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = pool.az.client.DeleteBlob(context.Background(), pool.az.container, path, nil)
|
_, err = blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, errors.Wrapf(err, "error deleting %s from %s", path, pool)
|
return 0, errors.Wrapf(err, "error deleting %s from %s", path, pool)
|
||||||
}
|
}
|
||||||
|
|
||||||
return *props.ContentLength, nil
|
return props.ContentLength(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *PackagePool) Import(srcPath, basename string, checksums *utils.ChecksumInfo, _ bool, checksumStorage aptly.ChecksumStorage) (string, error) {
|
func (pool *PackagePool) Import(srcPath, basename string, checksums *utils.ChecksumInfo, _ bool, checksumStorage aptly.ChecksumStorage) (string, error) {
|
||||||
@@ -143,6 +145,7 @@ func (pool *PackagePool) Import(srcPath, basename string, checksums *utils.Check
|
|||||||
}
|
}
|
||||||
|
|
||||||
path := pool.buildPoolPath(basename, checksums)
|
path := pool.buildPoolPath(basename, checksums)
|
||||||
|
blob := pool.az.blobURL(path)
|
||||||
targetChecksums, err := pool.ensureChecksums(path, checksumStorage)
|
targetChecksums, err := pool.ensureChecksums(path, checksumStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
@@ -158,7 +161,7 @@ func (pool *PackagePool) Import(srcPath, basename string, checksums *utils.Check
|
|||||||
}
|
}
|
||||||
defer source.Close()
|
defer source.Close()
|
||||||
|
|
||||||
err = pool.az.putFile(path, source, checksums.MD5)
|
err = pool.az.putFile(blob, source, checksums.MD5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/aptly-dev/aptly/aptly"
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
"github.com/aptly-dev/aptly/files"
|
"github.com/aptly-dev/aptly/files"
|
||||||
"github.com/aptly-dev/aptly/utils"
|
"github.com/aptly-dev/aptly/utils"
|
||||||
@@ -50,10 +50,8 @@ func (s *PackagePoolSuite) SetUpTest(c *C) {
|
|||||||
|
|
||||||
s.pool, err = NewPackagePool(s.accountName, s.accountKey, container, "", s.endpoint)
|
s.pool, err = NewPackagePool(s.accountName, s.accountKey, container, "", s.endpoint)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
publicAccessType := azblob.PublicAccessTypeContainer
|
cnt := s.pool.az.container
|
||||||
_, err = s.pool.az.client.CreateContainer(context.TODO(), s.pool.az.container, &azblob.CreateContainerOptions{
|
_, err = cnt.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessContainer)
|
||||||
Access: &publicAccessType,
|
|
||||||
})
|
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
s.prefixedPool, err = NewPackagePool(s.accountName, s.accountKey, container, prefix, s.endpoint)
|
s.prefixedPool, err = NewPackagePool(s.accountName, s.accountKey, container, prefix, s.endpoint)
|
||||||
|
|||||||
125
azure/public.go
125
azure/public.go
@@ -3,21 +3,20 @@ package azure
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/lease"
|
|
||||||
"github.com/aptly-dev/aptly/aptly"
|
"github.com/aptly-dev/aptly/aptly"
|
||||||
"github.com/aptly-dev/aptly/utils"
|
"github.com/aptly-dev/aptly/utils"
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PublishedStorage abstract file system with published files (actually hosted on Azure)
|
// PublishedStorage abstract file system with published files (actually hosted on Azure)
|
||||||
type PublishedStorage struct {
|
type PublishedStorage struct {
|
||||||
|
container azblob.ContainerURL
|
||||||
prefix string
|
prefix string
|
||||||
az *azContext
|
az *azContext
|
||||||
pathCache map[string]map[string]string
|
pathCache map[string]map[string]string
|
||||||
@@ -67,7 +66,7 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
|
|||||||
}
|
}
|
||||||
defer source.Close()
|
defer source.Close()
|
||||||
|
|
||||||
err = storage.az.putFile(path, source, sourceMD5)
|
err = storage.az.putFile(storage.az.blobURL(path), source, sourceMD5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s", sourceFilename, storage))
|
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s", sourceFilename, storage))
|
||||||
}
|
}
|
||||||
@@ -77,15 +76,14 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
|
|||||||
|
|
||||||
// RemoveDirs removes directory structure under public path
|
// RemoveDirs removes directory structure under public path
|
||||||
func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error {
|
func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error {
|
||||||
path = storage.az.blobPath(path)
|
|
||||||
filelist, err := storage.Filelist(path)
|
filelist, err := storage.Filelist(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, filename := range filelist {
|
for _, filename := range filelist {
|
||||||
blob := filepath.Join(path, filename)
|
blob := storage.az.blobURL(filepath.Join(path, filename))
|
||||||
_, err := storage.az.client.DeleteBlob(context.Background(), storage.az.container, blob, nil)
|
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error deleting path %s from %s: %s", filename, storage, err)
|
return fmt.Errorf("error deleting path %s from %s: %s", filename, storage, err)
|
||||||
}
|
}
|
||||||
@@ -96,8 +94,8 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
|
|||||||
|
|
||||||
// Remove removes single file under public path
|
// Remove removes single file under public path
|
||||||
func (storage *PublishedStorage) Remove(path string) error {
|
func (storage *PublishedStorage) Remove(path string) error {
|
||||||
path = storage.az.blobPath(path)
|
blob := storage.az.blobURL(path)
|
||||||
_, err := storage.az.client.DeleteBlob(context.Background(), storage.az.container, path, nil)
|
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, fmt.Sprintf("error deleting %s from %s: %s", path, storage, err))
|
err = errors.Wrap(err, fmt.Sprintf("error deleting %s from %s: %s", path, storage, err))
|
||||||
}
|
}
|
||||||
@@ -116,8 +114,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
|
|||||||
sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error {
|
sourcePath string, sourceChecksums utils.ChecksumInfo, force bool) error {
|
||||||
|
|
||||||
relFilePath := filepath.Join(publishedRelPath, fileName)
|
relFilePath := filepath.Join(publishedRelPath, fileName)
|
||||||
prefixRelFilePath := filepath.Join(publishedPrefix, relFilePath)
|
// prefixRelFilePath := filepath.Join(publishedPrefix, relFilePath)
|
||||||
poolPath := storage.az.blobPath(prefixRelFilePath)
|
// FIXME: check how to integrate publishedPrefix:
|
||||||
|
poolPath := storage.az.blobPath(fileName)
|
||||||
|
|
||||||
if storage.pathCache == nil {
|
if storage.pathCache == nil {
|
||||||
storage.pathCache = make(map[string]map[string]string)
|
storage.pathCache = make(map[string]map[string]string)
|
||||||
@@ -160,7 +159,7 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
|
|||||||
}
|
}
|
||||||
defer source.Close()
|
defer source.Close()
|
||||||
|
|
||||||
err = storage.az.putFile(relFilePath, source, sourceMD5)
|
err = storage.az.putFile(storage.az.blobURL(relFilePath), source, sourceMD5)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
pathCache[relFilePath] = sourceMD5
|
pathCache[relFilePath] = sourceMD5
|
||||||
} else {
|
} else {
|
||||||
@@ -177,58 +176,57 @@ func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Internal copy or move implementation
|
// Internal copy or move implementation
|
||||||
func (storage *PublishedStorage) internalCopyOrMoveBlob(src, dst string, metadata map[string]*string, move bool) error {
|
func (storage *PublishedStorage) internalCopyOrMoveBlob(src, dst string, metadata azblob.Metadata, move bool) error {
|
||||||
const leaseDuration = 30
|
const leaseDuration = 30
|
||||||
leaseID := uuid.NewString()
|
|
||||||
|
|
||||||
serviceClient := storage.az.client.ServiceClient()
|
dstBlobURL := storage.az.blobURL(dst)
|
||||||
containerClient := serviceClient.NewContainerClient(storage.az.container)
|
srcBlobURL := storage.az.blobURL(src)
|
||||||
srcBlobClient := containerClient.NewBlobClient(src)
|
leaseResp, err := srcBlobURL.AcquireLease(context.Background(), "", leaseDuration, azblob.ModifiedAccessConditions{})
|
||||||
blobLeaseClient, err := lease.NewBlobClient(srcBlobClient, &lease.BlobClientOptions{LeaseID: to.Ptr(leaseID)})
|
if err != nil || leaseResp.StatusCode() != http.StatusCreated {
|
||||||
if err != nil {
|
return fmt.Errorf("error acquiring lease on source blob %s", srcBlobURL)
|
||||||
return fmt.Errorf("error acquiring lease on source blob %s", src)
|
|
||||||
}
|
}
|
||||||
|
defer srcBlobURL.BreakLease(context.Background(), azblob.LeaseBreakNaturally, azblob.ModifiedAccessConditions{})
|
||||||
|
srcBlobLeaseID := leaseResp.LeaseID()
|
||||||
|
|
||||||
_, err = blobLeaseClient.AcquireLease(context.Background(), leaseDuration, nil)
|
copyResp, err := dstBlobURL.StartCopyFromURL(
|
||||||
if err != nil {
|
context.Background(),
|
||||||
return fmt.Errorf("error acquiring lease on source blob %s", src)
|
srcBlobURL.URL(),
|
||||||
}
|
metadata,
|
||||||
defer blobLeaseClient.BreakLease(context.Background(), &lease.BlobBreakOptions{BreakPeriod: to.Ptr(int32(60))})
|
azblob.ModifiedAccessConditions{},
|
||||||
|
azblob.BlobAccessConditions{},
|
||||||
dstBlobClient := containerClient.NewBlobClient(dst)
|
azblob.DefaultAccessTier,
|
||||||
copyResp, err := dstBlobClient.StartCopyFromURL(context.Background(), srcBlobClient.URL(), &blob.StartCopyFromURLOptions{
|
nil)
|
||||||
Metadata: metadata,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error copying %s -> %s in %s: %s", src, dst, storage, err)
|
return fmt.Errorf("error copying %s -> %s in %s: %s", src, dst, storage, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
copyStatus := *copyResp.CopyStatus
|
copyStatus := copyResp.CopyStatus()
|
||||||
for {
|
for {
|
||||||
if copyStatus == blob.CopyStatusTypeSuccess {
|
if copyStatus == azblob.CopyStatusSuccess {
|
||||||
if move {
|
if move {
|
||||||
_, err := storage.az.client.DeleteBlob(context.Background(), storage.az.container, src, &blob.DeleteOptions{
|
_, err = srcBlobURL.Delete(
|
||||||
AccessConditions: &blob.AccessConditions{
|
context.Background(),
|
||||||
LeaseAccessConditions: &blob.LeaseAccessConditions{
|
azblob.DeleteSnapshotsOptionNone,
|
||||||
LeaseID: &leaseID,
|
azblob.BlobAccessConditions{
|
||||||
},
|
LeaseAccessConditions: azblob.LeaseAccessConditions{LeaseID: srcBlobLeaseID},
|
||||||
},
|
})
|
||||||
})
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
} else if copyStatus == blob.CopyStatusTypePending {
|
} else if copyStatus == azblob.CopyStatusPending {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
getMetadata, err := dstBlobClient.GetProperties(context.TODO(), nil)
|
blobPropsResp, err := dstBlobURL.GetProperties(
|
||||||
|
context.Background(),
|
||||||
|
azblob.BlobAccessConditions{LeaseAccessConditions: azblob.LeaseAccessConditions{LeaseID: srcBlobLeaseID}},
|
||||||
|
azblob.ClientProvidedKeyOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error getting copy progress %s", dst)
|
return fmt.Errorf("error getting destination blob properties %s", dstBlobURL)
|
||||||
}
|
}
|
||||||
copyStatus = *getMetadata.CopyStatus
|
copyStatus = blobPropsResp.CopyStatus()
|
||||||
|
|
||||||
_, err = blobLeaseClient.RenewLease(context.Background(), nil)
|
_, err = srcBlobURL.RenewLease(context.Background(), srcBlobLeaseID, azblob.ModifiedAccessConditions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error renewing source blob lease %s", src)
|
return fmt.Errorf("error renewing source blob lease %s", srcBlobURL)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return fmt.Errorf("error copying %s -> %s in %s: %s", dst, src, storage, copyStatus)
|
return fmt.Errorf("error copying %s -> %s in %s: %s", dst, src, storage, copyStatus)
|
||||||
@@ -243,9 +241,7 @@ func (storage *PublishedStorage) RenameFile(oldName, newName string) error {
|
|||||||
|
|
||||||
// SymLink creates a copy of src file and adds link information as meta data
|
// SymLink creates a copy of src file and adds link information as meta data
|
||||||
func (storage *PublishedStorage) SymLink(src string, dst string) error {
|
func (storage *PublishedStorage) SymLink(src string, dst string) error {
|
||||||
metadata := make(map[string]*string)
|
return storage.internalCopyOrMoveBlob(src, dst, azblob.Metadata{"SymLink": src}, false /* move */)
|
||||||
metadata["SymLink"] = &src
|
|
||||||
return storage.internalCopyOrMoveBlob(src, dst, metadata, false /* do not remove src */)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// HardLink using symlink functionality as hard links do not exist
|
// HardLink using symlink functionality as hard links do not exist
|
||||||
@@ -255,33 +251,28 @@ func (storage *PublishedStorage) HardLink(src string, dst string) error {
|
|||||||
|
|
||||||
// FileExists returns true if path exists
|
// FileExists returns true if path exists
|
||||||
func (storage *PublishedStorage) FileExists(path string) (bool, error) {
|
func (storage *PublishedStorage) FileExists(path string) (bool, error) {
|
||||||
serviceClient := storage.az.client.ServiceClient()
|
blob := storage.az.blobURL(path)
|
||||||
containerClient := serviceClient.NewContainerClient(storage.az.container)
|
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
|
||||||
blobClient := containerClient.NewBlobClient(path)
|
|
||||||
_, err := blobClient.GetProperties(context.Background(), nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if isBlobNotFound(err) {
|
if isBlobNotFound(err) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
return false, fmt.Errorf("error checking if blob %s exists: %v", path, err)
|
return false, err
|
||||||
|
} else if resp.StatusCode() == http.StatusOK {
|
||||||
|
return true, nil
|
||||||
}
|
}
|
||||||
return true, nil
|
return false, fmt.Errorf("error checking if blob %s exists %d", blob, resp.StatusCode())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadLink returns the symbolic link pointed to by path.
|
// ReadLink returns the symbolic link pointed to by path.
|
||||||
// This simply reads text file created with SymLink
|
// This simply reads text file created with SymLink
|
||||||
func (storage *PublishedStorage) ReadLink(path string) (string, error) {
|
func (storage *PublishedStorage) ReadLink(path string) (string, error) {
|
||||||
serviceClient := storage.az.client.ServiceClient()
|
blob := storage.az.blobURL(path)
|
||||||
containerClient := serviceClient.NewContainerClient(storage.az.container)
|
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
|
||||||
blobClient := containerClient.NewBlobClient(path)
|
|
||||||
props, err := blobClient.GetProperties(context.Background(), nil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to get blob properties: %v", err)
|
return "", err
|
||||||
|
} else if resp.StatusCode() != http.StatusOK {
|
||||||
|
return "", fmt.Errorf("error checking if blob %s exists %d", blob, resp.StatusCode())
|
||||||
}
|
}
|
||||||
|
return resp.NewMetadata()["SymLink"], nil
|
||||||
metadata := props.Metadata
|
|
||||||
if originalBlob, exists := metadata["original_blob"]; exists {
|
|
||||||
return *originalBlob, nil
|
|
||||||
}
|
|
||||||
return "", fmt.Errorf("error reading link %s: %v", path, err)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,11 +7,8 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"bytes"
|
|
||||||
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
"github.com/Azure/azure-storage-blob-go/azblob"
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
||||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
||||||
"github.com/aptly-dev/aptly/files"
|
"github.com/aptly-dev/aptly/files"
|
||||||
"github.com/aptly-dev/aptly/utils"
|
"github.com/aptly-dev/aptly/utils"
|
||||||
. "gopkg.in/check.v1"
|
. "gopkg.in/check.v1"
|
||||||
@@ -69,10 +66,8 @@ func (s *PublishedStorageSuite) SetUpTest(c *C) {
|
|||||||
|
|
||||||
s.storage, err = NewPublishedStorage(s.accountName, s.accountKey, container, "", s.endpoint)
|
s.storage, err = NewPublishedStorage(s.accountName, s.accountKey, container, "", s.endpoint)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
publicAccessType := azblob.PublicAccessTypeContainer
|
cnt := s.storage.az.container
|
||||||
_, err = s.storage.az.client.CreateContainer(context.Background(), s.storage.az.container, &azblob.CreateContainerOptions{
|
_, err = cnt.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessContainer)
|
||||||
Access: &publicAccessType,
|
|
||||||
})
|
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
|
|
||||||
s.prefixedStorage, err = NewPublishedStorage(s.accountName, s.accountKey, container, prefix, s.endpoint)
|
s.prefixedStorage, err = NewPublishedStorage(s.accountName, s.accountKey, container, prefix, s.endpoint)
|
||||||
@@ -80,39 +75,41 @@ func (s *PublishedStorageSuite) SetUpTest(c *C) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublishedStorageSuite) TearDownTest(c *C) {
|
func (s *PublishedStorageSuite) TearDownTest(c *C) {
|
||||||
_, err := s.storage.az.client.DeleteContainer(context.Background(), s.storage.az.container, nil)
|
cnt := s.storage.az.container
|
||||||
|
_, err := cnt.Delete(context.Background(), azblob.ContainerAccessConditions{})
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublishedStorageSuite) GetFile(c *C, path string) []byte {
|
func (s *PublishedStorageSuite) GetFile(c *C, path string) []byte {
|
||||||
resp, err := s.storage.az.client.DownloadStream(context.Background(), s.storage.az.container, path, nil)
|
blob := s.storage.az.container.NewBlobURL(path)
|
||||||
|
resp, err := blob.Download(context.Background(), 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
data, err := ioutil.ReadAll(resp.Body)
|
body := resp.Body(azblob.RetryReaderOptions{MaxRetryRequests: 3})
|
||||||
|
data, err := ioutil.ReadAll(body)
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublishedStorageSuite) AssertNoFile(c *C, path string) {
|
func (s *PublishedStorageSuite) AssertNoFile(c *C, path string) {
|
||||||
serviceClient := s.storage.az.client.ServiceClient()
|
_, err := s.storage.az.container.NewBlobURL(path).GetProperties(
|
||||||
containerClient := serviceClient.NewContainerClient(s.storage.az.container)
|
context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
|
||||||
blobClient := containerClient.NewBlobClient(path)
|
|
||||||
_, err := blobClient.GetProperties(context.Background(), nil)
|
|
||||||
c.Assert(err, NotNil)
|
c.Assert(err, NotNil)
|
||||||
|
storageError, ok := err.(azblob.StorageError)
|
||||||
storageError, ok := err.(*azcore.ResponseError)
|
|
||||||
c.Assert(ok, Equals, true)
|
c.Assert(ok, Equals, true)
|
||||||
c.Assert(storageError.StatusCode, Equals, 404)
|
c.Assert(string(storageError.ServiceCode()), Equals, string(string(azblob.StorageErrorCodeBlobNotFound)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublishedStorageSuite) PutFile(c *C, path string, data []byte) {
|
func (s *PublishedStorageSuite) PutFile(c *C, path string, data []byte) {
|
||||||
hash := md5.Sum(data)
|
hash := md5.Sum(data)
|
||||||
uploadOptions := &azblob.UploadStreamOptions{
|
_, err := azblob.UploadBufferToBlockBlob(
|
||||||
HTTPHeaders: &blob.HTTPHeaders{
|
context.Background(),
|
||||||
BlobContentMD5: hash[:],
|
data,
|
||||||
},
|
s.storage.az.container.NewBlockBlobURL(path),
|
||||||
}
|
azblob.UploadToBlockBlobOptions{
|
||||||
reader := bytes.NewReader(data)
|
BlobHTTPHeaders: azblob.BlobHTTPHeaders{
|
||||||
_, err := s.storage.az.client.UploadStream(context.Background(), s.storage.az.container, path, reader, uploadOptions)
|
ContentMD5: hash[:],
|
||||||
|
},
|
||||||
|
})
|
||||||
c.Assert(err, IsNil)
|
c.Assert(err, IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -333,7 +330,7 @@ func (s *PublishedStorageSuite) TestLinkFromPool(c *C) {
|
|||||||
|
|
||||||
// 2nd link from pool, providing wrong path for source file
|
// 2nd link from pool, providing wrong path for source file
|
||||||
//
|
//
|
||||||
// this test should check that file already exists in Azure and skip upload (which would fail if not skipped)
|
// this test should check that file already exists in S3 and skip upload (which would fail if not skipped)
|
||||||
s.prefixedStorage.pathCache = nil
|
s.prefixedStorage.pathCache = nil
|
||||||
err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false)
|
err = s.prefixedStorage.LinkFromPool("", filepath.Join("pool", "main", "m/mars-invaders"), "mars-invaders_1.03.deb", pool, "wrong-looks-like-pathcache-doesnt-work", cksum1, false)
|
||||||
c.Check(err, IsNil)
|
c.Check(err, IsNil)
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ func (context *AptlyContext) config() *utils.ConfigStructure {
|
|||||||
configLocations := []string{homeLocation, "/usr/local/etc/aptly.conf", "/etc/aptly.conf"}
|
configLocations := []string{homeLocation, "/usr/local/etc/aptly.conf", "/etc/aptly.conf"}
|
||||||
|
|
||||||
for _, configLocation := range configLocations {
|
for _, configLocation := range configLocations {
|
||||||
|
// FIXME: check if exists, check if readable
|
||||||
err = utils.LoadConfig(configLocation, &utils.Config)
|
err = utils.LoadConfig(configLocation, &utils.Config)
|
||||||
if os.IsPermission(err) || os.IsNotExist(err) {
|
if os.IsPermission(err) || os.IsNotExist(err) {
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -598,6 +598,7 @@ func (l *PackageList) Filter(options FilterOptions) (*PackageList, error) {
|
|||||||
//
|
//
|
||||||
// when follow-all-variants is enabled, we need to try to expand anyway,
|
// when follow-all-variants is enabled, we need to try to expand anyway,
|
||||||
// as even if dependency is satisfied now, there might be other ways to satisfy dependency
|
// as even if dependency is satisfied now, there might be other ways to satisfy dependency
|
||||||
|
// FIXME: do not search twice
|
||||||
if result.Search(dep, false, true) != nil {
|
if result.Search(dep, false, true) != nil {
|
||||||
if options.DependencyOptions&DepVerboseResolve == DepVerboseResolve && options.Progress != nil {
|
if options.DependencyOptions&DepVerboseResolve == DepVerboseResolve && options.Progress != nil {
|
||||||
options.Progress.ColoredPrintf("@{y}Already satisfied dependency@|: %s with %s", &dep, result.Search(dep, true, true))
|
options.Progress.ColoredPrintf("@{y}Already satisfied dependency@|: %s with %s", &dep, result.Search(dep, true, true))
|
||||||
|
|||||||
Reference in New Issue
Block a user