|
|
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package timeseries implements a time series structure for stats collection.
package timeseries // import "golang.org/x/net/internal/timeseries"
import ( "fmt" "log" "time" )
const ( timeSeriesNumBuckets = 64 minuteHourSeriesNumBuckets = 60 )
var timeSeriesResolutions = []time.Duration{ 1 * time.Second, 10 * time.Second, 1 * time.Minute, 10 * time.Minute, 1 * time.Hour, 6 * time.Hour, 24 * time.Hour, // 1 day
7 * 24 * time.Hour, // 1 week
4 * 7 * 24 * time.Hour, // 4 weeks
16 * 7 * 24 * time.Hour, // 16 weeks
}
var minuteHourSeriesResolutions = []time.Duration{ 1 * time.Second, 1 * time.Minute, }
// An Observable is a kind of data that can be aggregated in a time series.
type Observable interface { Multiply(ratio float64) // Multiplies the data in self by a given ratio
Add(other Observable) // Adds the data from a different observation to self
Clear() // Clears the observation so it can be reused.
CopyFrom(other Observable) // Copies the contents of a given observation to self
}
// Float attaches the methods of Observable to a float64.
type Float float64
// NewFloat returns a Float.
func NewFloat() Observable { f := Float(0) return &f }
// String returns the float as a string.
func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
// Value returns the float's value.
func (f *Float) Value() float64 { return float64(*f) }
func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
func (f *Float) Add(other Observable) { o := other.(*Float) *f += *o }
func (f *Float) Clear() { *f = 0 }
func (f *Float) CopyFrom(other Observable) { o := other.(*Float) *f = *o }
// A Clock tells the current time.
type Clock interface { Time() time.Time }
type defaultClock int
var defaultClockInstance defaultClock
func (defaultClock) Time() time.Time { return time.Now() }
// Information kept per level. Each level consists of a circular list of
// observations. The start of the level may be derived from end and the
// len(buckets) * sizeInMillis.
type tsLevel struct { oldest int // index to oldest bucketed Observable
newest int // index to newest bucketed Observable
end time.Time // end timestamp for this level
size time.Duration // duration of the bucketed Observable
buckets []Observable // collections of observations
provider func() Observable // used for creating new Observable
}
func (l *tsLevel) Clear() { l.oldest = 0 l.newest = len(l.buckets) - 1 l.end = time.Time{} for i := range l.buckets { if l.buckets[i] != nil { l.buckets[i].Clear() l.buckets[i] = nil } } }
func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) { l.size = size l.provider = f l.buckets = make([]Observable, numBuckets) }
// Keeps a sequence of levels. Each level is responsible for storing data at
// a given resolution. For example, the first level stores data at a one
// minute resolution while the second level stores data at a one hour
// resolution.
// Each level is represented by a sequence of buckets. Each bucket spans an
// interval equal to the resolution of the level. New observations are added
// to the last bucket.
type timeSeries struct { provider func() Observable // make more Observable
numBuckets int // number of buckets in each level
levels []*tsLevel // levels of bucketed Observable
lastAdd time.Time // time of last Observable tracked
total Observable // convenient aggregation of all Observable
clock Clock // Clock for getting current time
pending Observable // observations not yet bucketed
pendingTime time.Time // what time are we keeping in pending
dirty bool // if there are pending observations
}
// init initializes a level according to the supplied criteria.
func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) { ts.provider = f ts.numBuckets = numBuckets ts.clock = clock ts.levels = make([]*tsLevel, len(resolutions))
for i := range resolutions { if i > 0 && resolutions[i-1] >= resolutions[i] { log.Print("timeseries: resolutions must be monotonically increasing") break } newLevel := new(tsLevel) newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider) ts.levels[i] = newLevel }
ts.Clear() }
// Clear removes all observations from the time series.
func (ts *timeSeries) Clear() { ts.lastAdd = time.Time{} ts.total = ts.resetObservation(ts.total) ts.pending = ts.resetObservation(ts.pending) ts.pendingTime = time.Time{} ts.dirty = false
for i := range ts.levels { ts.levels[i].Clear() } }
// Add records an observation at the current time.
func (ts *timeSeries) Add(observation Observable) { ts.AddWithTime(observation, ts.clock.Time()) }
// AddWithTime records an observation at the specified time.
func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
smallBucketDuration := ts.levels[0].size
if t.After(ts.lastAdd) { ts.lastAdd = t }
if t.After(ts.pendingTime) { ts.advance(t) ts.mergePendingUpdates() ts.pendingTime = ts.levels[0].end ts.pending.CopyFrom(observation) ts.dirty = true } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) { // The observation is close enough to go into the pending bucket.
// This compensates for clock skewing and small scheduling delays
// by letting the update stay in the fast path.
ts.pending.Add(observation) ts.dirty = true } else { ts.mergeValue(observation, t) } }
// mergeValue inserts the observation at the specified time in the past into all levels.
func (ts *timeSeries) mergeValue(observation Observable, t time.Time) { for _, level := range ts.levels { index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size) if 0 <= index && index < ts.numBuckets { bucketNumber := (level.oldest + index) % ts.numBuckets if level.buckets[bucketNumber] == nil { level.buckets[bucketNumber] = level.provider() } level.buckets[bucketNumber].Add(observation) } } ts.total.Add(observation) }
// mergePendingUpdates applies the pending updates into all levels.
func (ts *timeSeries) mergePendingUpdates() { if ts.dirty { ts.mergeValue(ts.pending, ts.pendingTime) ts.pending = ts.resetObservation(ts.pending) ts.dirty = false } }
// advance cycles the buckets at each level until the latest bucket in
// each level can hold the time specified.
func (ts *timeSeries) advance(t time.Time) { if !t.After(ts.levels[0].end) { return } for i := 0; i < len(ts.levels); i++ { level := ts.levels[i] if !level.end.Before(t) { break }
// If the time is sufficiently far, just clear the level and advance
// directly.
if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) { for _, b := range level.buckets { ts.resetObservation(b) } level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds()) }
for t.After(level.end) { level.end = level.end.Add(level.size) level.newest = level.oldest level.oldest = (level.oldest + 1) % ts.numBuckets ts.resetObservation(level.buckets[level.newest]) }
t = level.end } }
// Latest returns the sum of the num latest buckets from the level.
func (ts *timeSeries) Latest(level, num int) Observable { now := ts.clock.Time() if ts.levels[0].end.Before(now) { ts.advance(now) }
ts.mergePendingUpdates()
result := ts.provider() l := ts.levels[level] index := l.newest
for i := 0; i < num; i++ { if l.buckets[index] != nil { result.Add(l.buckets[index]) } if index == 0 { index = ts.numBuckets } index-- }
return result }
// LatestBuckets returns a copy of the num latest buckets from level.
func (ts *timeSeries) LatestBuckets(level, num int) []Observable { if level < 0 || level > len(ts.levels) { log.Print("timeseries: bad level argument: ", level) return nil } if num < 0 || num >= ts.numBuckets { log.Print("timeseries: bad num argument: ", num) return nil }
results := make([]Observable, num) now := ts.clock.Time() if ts.levels[0].end.Before(now) { ts.advance(now) }
ts.mergePendingUpdates()
l := ts.levels[level] index := l.newest
for i := 0; i < num; i++ { result := ts.provider() results[i] = result if l.buckets[index] != nil { result.CopyFrom(l.buckets[index]) }
if index == 0 { index = ts.numBuckets } index -= 1 } return results }
// ScaleBy updates observations by scaling by factor.
func (ts *timeSeries) ScaleBy(factor float64) { for _, l := range ts.levels { for i := 0; i < ts.numBuckets; i++ { l.buckets[i].Multiply(factor) } }
ts.total.Multiply(factor) ts.pending.Multiply(factor) }
// Range returns the sum of observations added over the specified time range.
// If start or finish times don't fall on bucket boundaries of the same
// level, then return values are approximate answers.
func (ts *timeSeries) Range(start, finish time.Time) Observable { return ts.ComputeRange(start, finish, 1)[0] }
// Recent returns the sum of observations from the last delta.
func (ts *timeSeries) Recent(delta time.Duration) Observable { now := ts.clock.Time() return ts.Range(now.Add(-delta), now) }
// Total returns the total of all observations.
func (ts *timeSeries) Total() Observable { ts.mergePendingUpdates() return ts.total }
// ComputeRange computes a specified number of values into a slice using
// the observations recorded over the specified time period. The return
// values are approximate if the start or finish times don't fall on the
// bucket boundaries at the same level or if the number of buckets spanning
// the range is not an integral multiple of num.
func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable { if start.After(finish) { log.Printf("timeseries: start > finish, %v>%v", start, finish) return nil }
if num < 0 { log.Printf("timeseries: num < 0, %v", num) return nil }
results := make([]Observable, num)
for _, l := range ts.levels { if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) { ts.extract(l, start, finish, num, results) return results } }
// Failed to find a level that covers the desired range. So just
// extract from the last level, even if it doesn't cover the entire
// desired range.
ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
return results }
// RecentList returns the specified number of values in slice over the most
// recent time period of the specified range.
func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable { if delta < 0 { return nil } now := ts.clock.Time() return ts.ComputeRange(now.Add(-delta), now, num) }
// extract returns a slice of specified number of observations from a given
// level over a given range.
func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) { ts.mergePendingUpdates()
srcInterval := l.size dstInterval := finish.Sub(start) / time.Duration(num) dstStart := start srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
srcIndex := 0
// Where should scanning start?
if dstStart.After(srcStart) { advance := dstStart.Sub(srcStart) / srcInterval srcIndex += int(advance) srcStart = srcStart.Add(advance * srcInterval) }
// The i'th value is computed as show below.
// interval = (finish/start)/num
// i'th value = sum of observation in range
// [ start + i * interval,
// start + (i + 1) * interval )
for i := 0; i < num; i++ { results[i] = ts.resetObservation(results[i]) dstEnd := dstStart.Add(dstInterval) for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) { srcEnd := srcStart.Add(srcInterval) if srcEnd.After(ts.lastAdd) { srcEnd = ts.lastAdd }
if !srcEnd.Before(dstStart) { srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets] if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) { // dst completely contains src.
if srcValue != nil { results[i].Add(srcValue) } } else { // dst partially overlaps src.
overlapStart := maxTime(srcStart, dstStart) overlapEnd := minTime(srcEnd, dstEnd) base := srcEnd.Sub(srcStart) fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
used := ts.provider() if srcValue != nil { used.CopyFrom(srcValue) } used.Multiply(fraction) results[i].Add(used) }
if srcEnd.After(dstEnd) { break } } srcIndex++ srcStart = srcStart.Add(srcInterval) } dstStart = dstStart.Add(dstInterval) } }
// resetObservation clears the content so the struct may be reused.
func (ts *timeSeries) resetObservation(observation Observable) Observable { if observation == nil { observation = ts.provider() } else { observation.Clear() } return observation }
// TimeSeries tracks data at granularities from 1 second to 16 weeks.
type TimeSeries struct { timeSeries }
// NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
func NewTimeSeries(f func() Observable) *TimeSeries { return NewTimeSeriesWithClock(f, defaultClockInstance) }
// NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
// assigning timestamps.
func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries { ts := new(TimeSeries) ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock) return ts }
// MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
type MinuteHourSeries struct { timeSeries }
// NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries { return NewMinuteHourSeriesWithClock(f, defaultClockInstance) }
// NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
// assigning timestamps.
func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries { ts := new(MinuteHourSeries) ts.timeSeries.init(minuteHourSeriesResolutions, f, minuteHourSeriesNumBuckets, clock) return ts }
func (ts *MinuteHourSeries) Minute() Observable { return ts.timeSeries.Latest(0, 60) }
func (ts *MinuteHourSeries) Hour() Observable { return ts.timeSeries.Latest(1, 60) }
func minTime(a, b time.Time) time.Time { if a.Before(b) { return a } return b }
func maxTime(a, b time.Time) time.Time { if a.After(b) { return a } return b }
|