You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
372 lines
9.1 KiB
372 lines
9.1 KiB
package transform
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"gordenko.dev/dima/diploma"
|
|
"gordenko.dev/dima/diploma/bin"
|
|
"gordenko.dev/dima/diploma/timeutil"
|
|
)
|
|
|
|
// INSTANT
|
|
|
|
type InstantPeriodsWriterOptions struct {
|
|
Dst io.Writer
|
|
GroupBy diploma.GroupBy
|
|
Since uint32
|
|
AggregateFuncs byte
|
|
FirstHourOfDay int
|
|
}
|
|
|
|
type InstantPeriodsWriter struct {
|
|
aggregateFuncs byte
|
|
arr []byte
|
|
responder *ChunkedResponder
|
|
groupBy diploma.GroupBy
|
|
since uint32
|
|
firstHourOfDay int
|
|
time2period func(uint32) time.Time
|
|
currentPeriod time.Time
|
|
lastTimestamp uint32
|
|
endTimestamp uint32
|
|
min float64
|
|
max float64
|
|
total float64
|
|
entries int
|
|
}
|
|
|
|
func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWriter, error) {
|
|
if opt.Dst == nil {
|
|
return nil, errors.New("Dst option is required")
|
|
}
|
|
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
|
|
return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay)
|
|
}
|
|
var q int
|
|
if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin {
|
|
q++
|
|
}
|
|
if (opt.AggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax {
|
|
q++
|
|
}
|
|
if (opt.AggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg {
|
|
q++
|
|
}
|
|
|
|
if q == 0 {
|
|
return nil, errors.New("AggregateFuncs option is required")
|
|
}
|
|
|
|
s := &InstantPeriodsWriter{
|
|
aggregateFuncs: opt.AggregateFuncs,
|
|
arr: make([]byte, 12+q*8),
|
|
responder: NewChunkedResponder(opt.Dst),
|
|
groupBy: opt.GroupBy,
|
|
since: opt.Since,
|
|
firstHourOfDay: opt.FirstHourOfDay,
|
|
}
|
|
|
|
switch opt.GroupBy {
|
|
case diploma.GroupByHour:
|
|
s.time2period = groupByHour
|
|
|
|
case diploma.GroupByDay:
|
|
if s.firstHourOfDay > 0 {
|
|
s.time2period = s.groupByDayUsingFHD
|
|
} else {
|
|
s.time2period = groupByDay
|
|
}
|
|
|
|
case diploma.GroupByMonth:
|
|
if s.firstHourOfDay > 0 {
|
|
s.time2period = s.groupByMonthUsingFHD
|
|
} else {
|
|
s.time2period = groupByMonth
|
|
}
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time {
|
|
tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
|
|
if tm.Hour() < s.firstHourOfDay {
|
|
tm = tm.AddDate(0, 0, -1)
|
|
}
|
|
return tm
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time {
|
|
tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
|
|
if tm.Hour() < s.firstHourOfDay {
|
|
tm = tm.AddDate(0, 0, -1)
|
|
}
|
|
return tm
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) Feed(timestamp uint32, value float64) {
|
|
s.feed(timestamp, value, false)
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) FeedNoSend(timestamp uint32, value float64) {
|
|
s.feed(timestamp, value, true)
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) {
|
|
if s.entries > 0 {
|
|
period := s.time2period(timestamp)
|
|
if period != s.currentPeriod {
|
|
s.packPeriod(timestamp)
|
|
if isBuffer {
|
|
s.responder.BufferRecord(s.arr)
|
|
} else {
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
s.decrementPeriod()
|
|
for period.Before(s.currentPeriod) {
|
|
s.packBlankPeriod()
|
|
if isBuffer {
|
|
s.responder.BufferRecord(s.arr)
|
|
} else {
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
s.decrementPeriod()
|
|
}
|
|
s.endTimestamp = timestamp
|
|
s.min = value
|
|
s.max = value
|
|
s.total = value
|
|
s.entries = 1
|
|
} else {
|
|
if value < s.min {
|
|
s.min = value
|
|
} else if value > s.max {
|
|
s.max = value
|
|
}
|
|
s.total += value
|
|
s.entries++
|
|
}
|
|
} else {
|
|
s.endTimestamp = timestamp
|
|
s.min = value
|
|
s.max = value
|
|
s.total = value
|
|
s.entries = 1
|
|
s.currentPeriod = s.time2period(timestamp)
|
|
}
|
|
s.lastTimestamp = timestamp
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) decrementPeriod() {
|
|
switch s.groupBy {
|
|
case diploma.GroupByHour:
|
|
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
|
|
case diploma.GroupByDay:
|
|
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
|
|
case diploma.GroupByMonth:
|
|
s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0)
|
|
}
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) packBlankPeriod() {
|
|
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
|
|
for i := 4; i < len(s.arr); i++ {
|
|
s.arr[i] = 0
|
|
}
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) Close() (err error) {
|
|
if s.entries > 0 {
|
|
if s.lastTimestamp >= s.since {
|
|
s.packPeriod(s.lastTimestamp)
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
}
|
|
return s.responder.Flush()
|
|
}
|
|
|
|
func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) {
|
|
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
|
|
bin.PutUint32(s.arr[4:], timestamp)
|
|
bin.PutUint32(s.arr[8:], s.endTimestamp)
|
|
|
|
pos := 12
|
|
if (s.aggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin {
|
|
bin.PutFloat64(s.arr[pos:], s.min)
|
|
pos += 8
|
|
}
|
|
if (s.aggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax {
|
|
bin.PutFloat64(s.arr[pos:], s.max)
|
|
pos += 8
|
|
}
|
|
if (s.aggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg {
|
|
bin.PutFloat64(s.arr[pos:], s.total/float64(s.entries))
|
|
}
|
|
}
|
|
|
|
type CumulativePeriodsWriter struct {
|
|
arr []byte
|
|
responder *ChunkedResponder
|
|
since uint32
|
|
firstHourOfDay int
|
|
currentPeriod time.Time
|
|
groupBy diploma.GroupBy
|
|
time2period func(uint32) time.Time
|
|
endTimestamp uint32
|
|
endValue float64
|
|
lastTimestamp uint32
|
|
lastValue float64
|
|
}
|
|
|
|
type CumulativePeriodsWriterOptions struct {
|
|
Dst io.Writer
|
|
GroupBy diploma.GroupBy
|
|
Since uint32
|
|
FirstHourOfDay int
|
|
}
|
|
|
|
func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*CumulativePeriodsWriter, error) {
|
|
if opt.Dst == nil {
|
|
return nil, errors.New("Dst option is required")
|
|
}
|
|
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
|
|
return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay)
|
|
}
|
|
|
|
s := &CumulativePeriodsWriter{
|
|
arr: make([]byte, 28),
|
|
responder: NewChunkedResponder(opt.Dst),
|
|
since: opt.Since,
|
|
firstHourOfDay: opt.FirstHourOfDay,
|
|
groupBy: opt.GroupBy,
|
|
}
|
|
|
|
s.time2period = func(timestamp uint32) time.Time {
|
|
return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "h")
|
|
}
|
|
|
|
switch opt.GroupBy {
|
|
case diploma.GroupByHour:
|
|
s.time2period = groupByHour
|
|
|
|
case diploma.GroupByDay:
|
|
if s.firstHourOfDay > 0 {
|
|
s.time2period = s.groupByDayUsingFHD
|
|
} else {
|
|
s.time2period = groupByDay
|
|
}
|
|
|
|
case diploma.GroupByMonth:
|
|
if s.firstHourOfDay > 0 {
|
|
s.time2period = s.groupByMonthUsingFHD
|
|
} else {
|
|
s.time2period = groupByMonth
|
|
}
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time {
|
|
tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
|
|
if tm.Hour() < s.firstHourOfDay {
|
|
tm = tm.AddDate(0, 0, -1)
|
|
}
|
|
return tm
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time {
|
|
tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
|
|
if tm.Hour() < s.firstHourOfDay {
|
|
tm = tm.AddDate(0, 0, -1)
|
|
}
|
|
return tm
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) Feed(timestamp uint32, value float64) {
|
|
s.feed(timestamp, value, false)
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) FeedNoSend(timestamp uint32, value float64) {
|
|
s.feed(timestamp, value, true)
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) {
|
|
if s.endTimestamp > 0 {
|
|
period := s.time2period(timestamp)
|
|
if period != s.currentPeriod {
|
|
s.packPeriod(timestamp, value)
|
|
if isBuffer {
|
|
s.responder.BufferRecord(s.arr)
|
|
} else {
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
s.decrementPeriod()
|
|
for period.Before(s.currentPeriod) {
|
|
// вставляю пустышку
|
|
s.packBlankPeriod()
|
|
if isBuffer {
|
|
s.responder.BufferRecord(s.arr)
|
|
} else {
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
s.decrementPeriod()
|
|
}
|
|
s.endTimestamp = timestamp
|
|
s.endValue = value
|
|
}
|
|
} else {
|
|
s.endTimestamp = timestamp
|
|
s.endValue = value
|
|
s.currentPeriod = s.time2period(timestamp)
|
|
}
|
|
s.lastTimestamp = timestamp
|
|
s.lastValue = value
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) decrementPeriod() {
|
|
switch s.groupBy {
|
|
case diploma.GroupByHour:
|
|
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
|
|
case diploma.GroupByDay:
|
|
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
|
|
case diploma.GroupByMonth:
|
|
s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0)
|
|
}
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) packBlankPeriod() {
|
|
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
|
|
for i := 4; i < len(s.arr); i++ {
|
|
s.arr[i] = 0
|
|
}
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
|
|
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
|
|
bin.PutUint32(s.arr[4:], start)
|
|
bin.PutUint32(s.arr[8:], s.endTimestamp)
|
|
bin.PutFloat64(s.arr[12:], startValue)
|
|
bin.PutFloat64(s.arr[20:], s.endValue)
|
|
}
|
|
|
|
func (s *CumulativePeriodsWriter) Close() error {
|
|
if s.endTimestamp > 0 {
|
|
if s.endTimestamp >= s.since {
|
|
if s.lastTimestamp != s.endTimestamp {
|
|
s.packPeriod(s.lastTimestamp, s.lastValue)
|
|
} else {
|
|
s.packPeriod(s.endTimestamp, s.endValue)
|
|
}
|
|
s.responder.AppendRecord(s.arr)
|
|
}
|
|
}
|
|
return s.responder.Flush()
|
|
}
|
|
|