Add support for Azure package pools

This adds support for storing packages directly on Azure, with no truly
"local" (on-disk) repo used. The existing Azure PublishedStorage
implementation was refactored to move the shared code to a separate
context struct, which can then be re-used by the new PackagePool. In
addition, the files package's mockChecksumStorage was made public so
that it could be used in the Azure PackagePool tests as well.

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
This commit is contained in:
Ryan Gonzalez
2022-05-17 08:52:59 -05:00
committed by André Roth
parent 810df17009
commit f9325fbc91
16 changed files with 820 additions and 148 deletions
+16 -99
View File
@@ -2,11 +2,8 @@ package azure
import (
"context"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"time"
@@ -21,6 +18,7 @@ import (
type PublishedStorage struct {
container azblob.ContainerURL
prefix string
az *azContext
pathCache map[string]map[string]string
}
@@ -31,33 +29,17 @@ var (
// NewPublishedStorage creates published storage from Azure storage credentials
func NewPublishedStorage(accountName, accountKey, container, prefix, endpoint string) (*PublishedStorage, error) {
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
azctx, err := newAzContext(accountName, accountKey, container, prefix, endpoint)
if err != nil {
return nil, err
}
if endpoint == "" {
endpoint = fmt.Sprintf("https://%s.blob.core.windows.net", accountName)
}
url, err := url.Parse(fmt.Sprintf("%s/%s", endpoint, container))
if err != nil {
return nil, err
}
containerURL := azblob.NewContainerURL(*url, azblob.NewPipeline(credential, azblob.PipelineOptions{}))
result := &PublishedStorage{
container: containerURL,
prefix: prefix,
}
return result, nil
return &PublishedStorage{az: azctx}, nil
}
// String
func (storage *PublishedStorage) String() string {
return fmt.Sprintf("Azure: %s/%s", storage.container, storage.prefix)
return storage.az.String()
}
// MkDir creates directory recursively under public path
@@ -84,7 +66,7 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
}
defer source.Close()
err = storage.putFile(path, source, sourceMD5)
err = storage.az.putFile(storage.az.blobURL(path), source, sourceMD5)
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s", sourceFilename, storage))
}
@@ -92,36 +74,6 @@ func (storage *PublishedStorage) PutFile(path string, sourceFilename string) err
return err
}
// putFile uploads file-like object to
func (storage *PublishedStorage) putFile(path string, source io.Reader, sourceMD5 string) error {
path = filepath.Join(storage.prefix, path)
blob := storage.container.NewBlockBlobURL(path)
uploadOptions := azblob.UploadStreamToBlockBlobOptions{
BufferSize: 4 * 1024 * 1024,
MaxBuffers: 8,
}
if len(sourceMD5) > 0 {
decodedMD5, err := hex.DecodeString(sourceMD5)
if err != nil {
return err
}
uploadOptions.BlobHTTPHeaders = azblob.BlobHTTPHeaders{
ContentMD5: decodedMD5,
}
}
_, err := azblob.UploadStreamToBlockBlob(
context.Background(),
source,
blob,
uploadOptions,
)
return err
}
// RemoveDirs removes directory structure under public path
func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error {
filelist, err := storage.Filelist(path)
@@ -130,7 +82,7 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
}
for _, filename := range filelist {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path, filename))
blob := storage.az.blobURL(filepath.Join(path, filename))
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
if err != nil {
return fmt.Errorf("error deleting path %s from %s: %s", filename, storage, err)
@@ -142,7 +94,7 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
// Remove removes single file under public path
func (storage *PublishedStorage) Remove(path string) error {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path))
blob := storage.az.blobURL(path)
_, err := blob.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("error deleting %s from %s: %s", path, storage, err))
@@ -163,14 +115,14 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
relFilePath := filepath.Join(publishedRelPath, fileName)
prefixRelFilePath := filepath.Join(publishedPrefix, relFilePath)
poolPath := filepath.Join(storage.prefix, prefixRelFilePath)
poolPath := storage.az.blobPath(fileName)
if storage.pathCache == nil {
storage.pathCache = make(map[string]map[string]string)
}
pathCache := storage.pathCache[publishedPrefix]
if pathCache == nil {
paths, md5s, err := storage.internalFilelist(publishedPrefix)
paths, md5s, err := storage.az.internalFilelist(publishedPrefix, nil)
if err != nil {
return fmt.Errorf("error caching paths under prefix: %s", err)
}
@@ -206,7 +158,7 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
}
defer source.Close()
err = storage.putFile(prefixRelFilePath, source, sourceMD5)
err = storage.az.putFile(storage.az.blobURL(relPath), source, sourceMD5)
if err == nil {
pathCache[relFilePath] = sourceMD5
} else {
@@ -216,43 +168,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
return err
}
func (storage *PublishedStorage) internalFilelist(prefix string) (paths []string, md5s []string, err error) {
const delimiter = "/"
paths = make([]string, 0, 1024)
md5s = make([]string, 0, 1024)
prefix = filepath.Join(storage.prefix, prefix)
if prefix != "" {
prefix += delimiter
}
for marker := (azblob.Marker{}); marker.NotDone(); {
listBlob, err := storage.container.ListBlobsFlatSegment(
context.Background(), marker, azblob.ListBlobsSegmentOptions{
Prefix: prefix,
MaxResults: 1000,
Details: azblob.BlobListingDetails{Metadata: true}})
if err != nil {
return nil, nil, fmt.Errorf("error listing under prefix %s in %s: %s", prefix, storage, err)
}
marker = listBlob.NextMarker
for _, blob := range listBlob.Segment.BlobItems {
if prefix == "" {
paths = append(paths, blob.Name)
} else {
paths = append(paths, blob.Name[len(prefix):])
}
md5s = append(md5s, fmt.Sprintf("%x", blob.Properties.ContentMD5))
}
}
return paths, md5s, nil
}
// Filelist returns list of files under prefix
func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) {
paths, _, err := storage.internalFilelist(prefix)
paths, _, err := storage.az.internalFilelist(prefix, nil)
return paths, err
}
@@ -260,8 +178,8 @@ func (storage *PublishedStorage) Filelist(prefix string) ([]string, error) {
func (storage *PublishedStorage) internalCopyOrMoveBlob(src, dst string, metadata azblob.Metadata, move bool) error {
const leaseDuration = 30
dstBlobURL := storage.container.NewBlobURL(filepath.Join(storage.prefix, dst))
srcBlobURL := storage.container.NewBlobURL(filepath.Join(storage.prefix, src))
dstBlobURL := storage.az.blobURL(dst)
srcBlobURL := storage.az.blobURL(src)
leaseResp, err := srcBlobURL.AcquireLease(context.Background(), "", leaseDuration, azblob.ModifiedAccessConditions{})
if err != nil || leaseResp.StatusCode() != http.StatusCreated {
return fmt.Errorf("error acquiring lease on source blob %s", srcBlobURL)
@@ -332,11 +250,10 @@ func (storage *PublishedStorage) HardLink(src string, dst string) error {
// FileExists returns true if path exists
func (storage *PublishedStorage) FileExists(path string) (bool, error) {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path))
blob := storage.az.blobURL(path)
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
storageError, ok := err.(azblob.StorageError)
if ok && string(storageError.ServiceCode()) == string(azblob.StorageErrorCodeBlobNotFound) {
if isBlobNotFound(err) {
return false, nil
}
return false, err
@@ -349,7 +266,7 @@ func (storage *PublishedStorage) FileExists(path string) (bool, error) {
// ReadLink returns the symbolic link pointed to by path.
// This simply reads text file created with SymLink
func (storage *PublishedStorage) ReadLink(path string) (string, error) {
blob := storage.container.NewBlobURL(filepath.Join(storage.prefix, path))
blob := storage.az.blobURL(path)
resp, err := blob.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
if err != nil {
return "", err