package transform import ( "errors" "fmt" "io" "time" "gordenko.dev/dima/diploma" "gordenko.dev/dima/diploma/bin" "gordenko.dev/dima/diploma/timeutil" ) // INSTANT type InstantPeriodsWriterOptions struct { Dst io.Writer GroupBy diploma.GroupBy AggregateFuncs byte FirstHourOfDay int } type InstantPeriodsWriter struct { aggregateFuncs byte arr []byte responder *ChunkedResponder groupBy diploma.GroupBy firstHourOfDay int time2period func(uint32) time.Time currentPeriod time.Time lastTimestamp uint32 endTimestamp uint32 // время показания на конец периода min float64 max float64 total float64 entries int } func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWriter, error) { if opt.Dst == nil { return nil, errors.New("Dst option is required") } if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 { return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay) } // Считаю q, чтобы заранее выделить массив для упаковки периодов var q int if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin { q++ } if (opt.AggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax { q++ } if (opt.AggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg { q++ } if q == 0 { return nil, errors.New("AggregateFuncs option is required") } // 12 - это period, since, until // 8 - это размер float64 s := &InstantPeriodsWriter{ aggregateFuncs: opt.AggregateFuncs, arr: make([]byte, 12+q*8), responder: NewChunkedResponder(opt.Dst), groupBy: opt.GroupBy, firstHourOfDay: opt.FirstHourOfDay, } switch opt.GroupBy { case diploma.GroupByHour: s.time2period = groupByHour case diploma.GroupByDay: if s.firstHourOfDay > 0 { s.time2period = s.groupByDayUsingFHD } else { s.time2period = groupByDay } case diploma.GroupByMonth: if s.firstHourOfDay > 0 { s.time2period = s.groupByMonthUsingFHD } else { s.time2period = groupByMonth } default: return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy) } return s, nil } func (s *InstantPeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time { tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d") if tm.Hour() < s.firstHourOfDay { tm = tm.AddDate(0, 0, -1) } return tm } func (s *InstantPeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time { tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m") if tm.Hour() < s.firstHourOfDay { tm = tm.AddDate(0, 0, -1) } return tm } func (s *InstantPeriodsWriter) Feed(timestamp uint32, value float64) { s.feed(timestamp, value, false) } func (s *InstantPeriodsWriter) FeedNoSend(timestamp uint32, value float64) { s.feed(timestamp, value, true) } func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) { if s.entries > 0 { period := s.time2period(timestamp) if period != s.currentPeriod { // закрываю период // готовый период s.packPeriod(timestamp) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } // затем s.decrementPeriod() //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) for period.Before(s.currentPeriod) { // вставляю пустышку s.packBlankPeriod() if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } s.decrementPeriod() //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) //return } s.endTimestamp = timestamp s.min = value s.max = value s.total = value s.entries = 1 } else { if value < s.min { s.min = value } else if value > s.max { s.max = value } // для подсчета AVG s.total += value s.entries++ } } else { s.endTimestamp = timestamp s.min = value s.max = value s.total = value s.entries = 1 s.currentPeriod = s.time2period(timestamp) } s.lastTimestamp = timestamp } func (s *InstantPeriodsWriter) decrementPeriod() { switch s.groupBy { case diploma.GroupByHour: s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour) //fmt.Println("decrement") case diploma.GroupByDay: s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1) case diploma.GroupByMonth: s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0) } } func (s *InstantPeriodsWriter) packBlankPeriod() { //period := s.currentPeriod.Format("2006-01-02 15:04:05") //since := "0" //until := "0" //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0) // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) for i := 4; i < len(s.arr); i++ { s.arr[i] = 0 } } func (s *InstantPeriodsWriter) Close() (err error) { if s.entries > 0 { s.packPeriod(s.lastTimestamp) s.responder.AppendRecord(s.arr) } return s.responder.Flush() } func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) { bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) bin.PutUint32(s.arr[4:], timestamp) bin.PutUint32(s.arr[8:], s.endTimestamp) pos := 12 if (s.aggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin { bin.PutFloat64(s.arr[pos:], s.min) pos += 8 } if (s.aggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax { bin.PutFloat64(s.arr[pos:], s.max) pos += 8 } if (s.aggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg { bin.PutFloat64(s.arr[pos:], s.total/float64(s.entries)) } } /* Идея с разбивкой на периоды: Для каждого периода нахожу одно последнее значение. Начало периода - это конец предыдущего. Если предыдущий не строго предыдущий, а с пропусками - на место пропусков вставляю пустышки. Плюс такого решения - я всегда показываю реальное значение на конец периода. */ type CumulativePeriodsWriter struct { arr []byte responder *ChunkedResponder firstHourOfDay int currentPeriod time.Time groupBy diploma.GroupBy time2period func(uint32) time.Time endTimestamp uint32 endValue float64 lastTimestamp uint32 lastValue float64 } type CumulativePeriodsWriterOptions struct { Dst io.Writer GroupBy diploma.GroupBy FirstHourOfDay int } func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*CumulativePeriodsWriter, error) { if opt.Dst == nil { return nil, errors.New("Dst option is required") } // Считаю q, чтобы заранее выделить массив для упаковки периодов if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 { return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay) } s := &CumulativePeriodsWriter{ arr: make([]byte, 28), responder: NewChunkedResponder(opt.Dst), firstHourOfDay: opt.FirstHourOfDay, groupBy: opt.GroupBy, } s.time2period = func(timestamp uint32) time.Time { return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "h") } switch opt.GroupBy { case diploma.GroupByHour: s.time2period = groupByHour case diploma.GroupByDay: if s.firstHourOfDay > 0 { s.time2period = s.groupByDayUsingFHD } else { s.time2period = groupByDay } case diploma.GroupByMonth: if s.firstHourOfDay > 0 { s.time2period = s.groupByMonthUsingFHD } else { s.time2period = groupByMonth } default: return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy) } return s, nil } func (s *CumulativePeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time { tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d") if tm.Hour() < s.firstHourOfDay { tm = tm.AddDate(0, 0, -1) } return tm } func (s *CumulativePeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time { tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m") if tm.Hour() < s.firstHourOfDay { tm = tm.AddDate(0, 0, -1) } return tm } func (s *CumulativePeriodsWriter) Feed(timestamp uint32, value float64) { s.feed(timestamp, value, false) } func (s *CumulativePeriodsWriter) FeedNoSend(timestamp uint32, value float64) { s.feed(timestamp, value, true) } func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) { if s.endTimestamp > 0 { period := s.time2period(timestamp) if period != s.currentPeriod { // закрываю период s.packPeriod(timestamp, value) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } // затем s.decrementPeriod() //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) for period.Before(s.currentPeriod) { // вставляю пустышку s.packBlankPeriod() if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } s.decrementPeriod() //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) //return } s.endTimestamp = timestamp s.endValue = value } } else { s.endTimestamp = timestamp s.endValue = value s.currentPeriod = s.time2period(timestamp) } s.lastTimestamp = timestamp s.lastValue = value } func (s *CumulativePeriodsWriter) decrementPeriod() { switch s.groupBy { case diploma.GroupByHour: s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour) //fmt.Println("decrement") case diploma.GroupByDay: s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1) case diploma.GroupByMonth: s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0) } } func (s *CumulativePeriodsWriter) packBlankPeriod() { //period := s.currentPeriod.Format("2006-01-02 15:04:05") //since := "0" //until := "0" //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0) // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) for i := 4; i < len(s.arr); i++ { s.arr[i] = 0 } } func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) { //period := s.currentPeriod.Format("2006-01-02 15:04:05") //since := time.Unix(int64(start), 0).Format("2006-01-02 15:04:05") //until := time.Unix(int64(s.endTimestamp), 0).Format("2006-01-02 15:04:05") //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, startValue, s.endValue) // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) bin.PutUint32(s.arr[4:], start) bin.PutUint32(s.arr[8:], s.endTimestamp) bin.PutFloat64(s.arr[12:], startValue) bin.PutFloat64(s.arr[20:], s.endValue) } func (s *CumulativePeriodsWriter) Close() error { if s.endTimestamp > 0 { if s.lastTimestamp != s.endTimestamp { s.packPeriod(s.lastTimestamp, s.lastValue) } else { s.packPeriod(s.endTimestamp, s.endValue) } s.responder.AppendRecord(s.arr) } return s.responder.Flush() }