Grab downloader

This commit is contained in:
Lorenzo Bolla
2021-10-08 10:43:52 +02:00
parent f93bc6ef0f
commit 894192851e
38 changed files with 4240 additions and 1 deletions
+54
View File
@@ -0,0 +1,54 @@
/*
Package bps provides gauges for calculating the Bytes Per Second transfer rate
of data streams.
*/
package bps
import (
"context"
"time"
)
// Gauge is the common interface for all BPS gauges in this package. Given a
// set of samples over time, each gauge type can be used to measure the Bytes
// Per Second transfer rate of a data stream.
//
// All samples must monotonically increase in timestamp and value. Each sample
// should represent the total number of bytes sent in a stream, rather than
// accounting for the number sent since the last sample.
//
// To ensure a gauge can report progress as quickly as possible, take an initial
// sample when your stream first starts.
//
// All gauge implementations are safe for concurrent use.
type Gauge interface {
// Sample adds a new sample of the progress of the monitored stream.
Sample(t time.Time, n int64)
// BPS returns the calculated Bytes Per Second rate of the monitored stream.
BPS() float64
}
// SampleFunc is used by Watch to take periodic samples of a monitored stream.
type SampleFunc func() (n int64)
// Watch will periodically call the given SampleFunc to sample the progress of
// a monitored stream and update the given gauge. SampleFunc should return the
// total number of bytes transferred by the stream since it started.
//
// Watch is a blocking call and should typically be called in a new goroutine.
// To prevent the goroutine from leaking, make sure to cancel the given context
// once the stream is completed or canceled.
func Watch(ctx context.Context, g Gauge, f SampleFunc, interval time.Duration) {
g.Sample(time.Now(), f())
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case now := <-t.C:
g.Sample(now, f())
}
}
}
+81
View File
@@ -0,0 +1,81 @@
package bps
import (
"sync"
"time"
)
// NewSMA returns a gauge that uses a Simple Moving Average with the given
// number of samples to measure the bytes per second of a byte stream.
//
// BPS is computed using the timestamp of the most recent and oldest sample in
// the sample buffer. When a new sample is added, the oldest sample is dropped
// if the sample count exceeds maxSamples.
//
// The gauge does not account for any latency in arrival time of new samples or
// the desired window size. Any variance in the arrival of samples will result
// in a BPS measurement that is correct for the submitted samples, but over a
// varying time window.
//
// maxSamples should be equal to 1 + (window size / sampling interval) where
// window size is the number of seconds over which the moving average is
// smoothed and sampling interval is the number of seconds between each sample.
//
// For example, if you want a five second window, sampling once per second,
// maxSamples should be 1 + 5/1 = 6.
func NewSMA(maxSamples int) Gauge {
if maxSamples < 2 {
panic("sample count must be greater than 1")
}
return &sma{
maxSamples: uint64(maxSamples),
samples: make([]int64, maxSamples),
timestamps: make([]time.Time, maxSamples),
}
}
type sma struct {
mu sync.Mutex
index uint64
maxSamples uint64
sampleCount uint64
samples []int64
timestamps []time.Time
}
func (c *sma) Sample(t time.Time, n int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.timestamps[c.index] = t
c.samples[c.index] = n
c.index = (c.index + 1) % c.maxSamples
// prevent integer overflow in sampleCount. Values greater or equal to
// maxSamples have the same semantic meaning.
c.sampleCount++
if c.sampleCount > c.maxSamples {
c.sampleCount = c.maxSamples
}
}
func (c *sma) BPS() float64 {
c.mu.Lock()
defer c.mu.Unlock()
// we need two samples to start
if c.sampleCount < 2 {
return 0
}
// First sample is always the oldest until ring buffer first overflows
oldest := c.index
if c.sampleCount < c.maxSamples {
oldest = 0
}
newest := (c.index + c.maxSamples - 1) % c.maxSamples
seconds := c.timestamps[newest].Sub(c.timestamps[oldest]).Seconds()
bytes := float64(c.samples[newest] - c.samples[oldest])
return bytes / seconds
}
+55
View File
@@ -0,0 +1,55 @@
package bps
import (
"testing"
"time"
)
type Sample struct {
N int64
Expect float64
}
func getSimpleSamples(sampleCount, rate int) []Sample {
a := make([]Sample, sampleCount)
for i := 1; i < sampleCount; i++ {
a[i] = Sample{N: int64(i * rate), Expect: float64(rate)}
}
return a
}
type SampleSetTest struct {
Gauge Gauge
Interval time.Duration
Samples []Sample
}
func (c *SampleSetTest) Run(t *testing.T) {
ts := time.Unix(0, 0)
for i, sample := range c.Samples {
c.Gauge.Sample(ts, sample.N)
if actual := c.Gauge.BPS(); actual != sample.Expect {
t.Errorf("expected: Gauge.BPS() → %0.2f, got %0.2f in test %d", sample.Expect, actual, i+1)
}
ts = ts.Add(c.Interval)
}
}
func TestSMA_SimpleSteadyCase(t *testing.T) {
test := &SampleSetTest{
Interval: time.Second,
Samples: getSimpleSamples(100000, 3),
}
t.Run("SmallSampleSize", func(t *testing.T) {
test.Gauge = NewSMA(2)
test.Run(t)
})
t.Run("RegularSize", func(t *testing.T) {
test.Gauge = NewSMA(6)
test.Run(t)
})
t.Run("LargeSampleSize", func(t *testing.T) {
test.Gauge = NewSMA(1000)
test.Run(t)
})
}