diff --git a/atree/aggregate.go b/atree/aggregate.go
deleted file mode 100644
index c4081ab..0000000
--- a/atree/aggregate.go
+++ /dev/null
@@ -1,325 +0,0 @@
-package atree
-
-import (
-	"fmt"
-	"time"
-
-	"gordenko.dev/dima/diploma"
-	"gordenko.dev/dima/diploma/timeutil"
-)
-
-// AGGREGATE
-
-type InstantAggregator struct {
-	firstHourOfDay int
-	lastDayOfMonth int
-	time2period    func(uint32) uint32
-	currentPeriod  uint32
-	since          uint32
-	until          uint32
-	min            float64
-	max            float64
-	total          float64
-	entries        int
-}
-
-type InstantAggregatorOptions struct {
-	GroupBy        diploma.GroupBy
-	FirstHourOfDay int
-	LastDayOfMonth int
-}
-
-func NewInstantAggregator(opt InstantAggregatorOptions) (*InstantAggregator, error) {
-	s := &InstantAggregator{
-		firstHourOfDay: opt.FirstHourOfDay,
-		lastDayOfMonth: opt.LastDayOfMonth,
-	}
-
-	switch opt.GroupBy {
-	case diploma.GroupByHour:
-		s.time2period = groupByHour
-
-	case diploma.GroupByDay:
-		if s.firstHourOfDay > 0 {
-			s.time2period = s.groupByDayUsingFHD
-		} else {
-			s.time2period = groupByDay
-		}
-
-	case diploma.GroupByMonth:
-		if s.firstHourOfDay > 0 {
-			if s.lastDayOfMonth > 0 {
-				s.time2period = s.groupByMonthUsingFHDAndLDM
-			} else {
-				s.time2period = s.groupByMonthUsingFHD
-			}
-		} else {
-			if s.lastDayOfMonth > 0 {
-				s.time2period = s.groupByMonthUsingLDM
-			} else {
-				s.time2period = groupByMonth
-			}
-		}
-
-	default:
-		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
-	}
-
-	return s, nil
-}
-
-// Data arrives from newest to oldest, i.e. Until is received first.
-// Returns a period-complete flag.
-func (s *InstantAggregator) Feed(timestamp uint32, value float64, p *InstantPeriod) bool {
-	period := s.time2period(timestamp)
-	//fmt.Printf("feed: %s %v, period: %s\n", time.Unix(int64(timestamp), 0), value, time.Unix(int64(period), 0))
-	if s.entries == 0 {
-		s.currentPeriod = period
-		s.since = timestamp
-		s.until = timestamp
-		s.min = value
-		s.max = value
-		s.total = value
-		s.entries = 1
-		return false
-	}
-
-	if period != s.currentPeriod {
-		// the period is complete
-		s.FillPeriod(timestamp, p)
-		s.currentPeriod = period
-		s.since = timestamp
-		s.until = timestamp
-		s.min = value
-		s.max = value
-		s.total = value
-		s.entries = 1
-		return true
-	}
-
-	if value < s.min {
-		s.min = value
-	} else if value > s.max {
-		s.max = value
-	}
-	// for computing AVG
-	s.total += value
-	s.entries++
-	// start of the period
-	s.since = timestamp
-	return false
-}
-
-func (s *InstantAggregator) FillPeriod(prevTimestamp uint32, p *InstantPeriod) bool {
-	if s.entries == 0 {
-		return false
-	}
-
-	//fmt.Printf("FillPeriod: %s, prevTimestamp: %s\n", time.Unix(int64(s.currentPeriod), 0), time.Unix(int64(prevTimestamp), 0))
-	p.Period = s.currentPeriod
-	if prevTimestamp > 0 {
-		p.Since = prevTimestamp
-	} else {
-		p.Since = s.since
-	}
-	p.Until = s.until
-	p.Min = s.min
-	p.Max = s.max
-	p.Avg = s.total / float64(s.entries)
-	return true
-}
-
-func (s *InstantAggregator) groupByDayUsingFHD(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *InstantAggregator) groupByMonthUsingFHD(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *InstantAggregator) groupByMonthUsingLDM(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Day() > s.lastDayOfMonth {
-		tm = tm.AddDate(0, 1, 0)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *InstantAggregator) groupByMonthUsingFHDAndLDM(timestamp uint32) uint32 {
-	// IMPORTANT!
-	// Check the time of day first.
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	if tm.Day() > s.lastDayOfMonth {
-		tm = tm.AddDate(0, 1, 0)
-	}
-	return uint32(tm.Unix())
-}
-
-// CUMULATIVE
-
-type CumulativeAggregator struct {
-	firstHourOfDay int
-	lastDayOfMonth int
-	time2period    func(uint32) uint32
-	currentPeriod  uint32
-	since          uint32
-	until          uint32
-	sinceValue     float64
-	untilValue     float64
-	entries        int
-}
-
-type CumulativeAggregatorOptions struct {
-	GroupBy        diploma.GroupBy
-	FirstHourOfDay int
-	LastDayOfMonth int
-}
-
-func NewCumulativeAggregator(opt CumulativeAggregatorOptions) (*CumulativeAggregator, error) {
-	s := &CumulativeAggregator{
-		firstHourOfDay: opt.FirstHourOfDay,
-		lastDayOfMonth: opt.LastDayOfMonth,
-	}
-
-	switch opt.GroupBy {
-	case diploma.GroupByHour:
-		s.time2period = groupByHour
-
-	case diploma.GroupByDay:
-		if s.firstHourOfDay > 0 {
-			s.time2period = s.groupByDayUsingFHD
-		} else {
-			s.time2period = groupByDay
-		}
-
-	case diploma.GroupByMonth:
-		if s.firstHourOfDay > 0 {
-			if s.lastDayOfMonth > 0 {
-				s.time2period = s.groupByMonthUsingFHDAndLDM
-			} else {
-				s.time2period = s.groupByMonthUsingFHD
-			}
-		} else {
-			if s.lastDayOfMonth > 0 {
-				s.time2period = s.groupByMonthUsingLDM
-			} else {
-				s.time2period = groupByMonth
-			}
-		}
-
-	default:
-		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
-	}
-
-	return s, nil
-}
-
-// Returns a period-complete flag.
-func (s *CumulativeAggregator) Feed(timestamp uint32, value float64, p *CumulativePeriod) bool {
-	period := s.time2period(timestamp)
-	if s.entries == 0 {
-		s.currentPeriod = period
-		s.since = timestamp
-		s.until = timestamp
-		s.sinceValue = value
-		s.untilValue = value
-		s.entries = 1
-		return false
-	}
-
-	if period != s.currentPeriod {
-		// the period is complete
-		s.FillPeriod(timestamp, value, p)
-		s.currentPeriod = period
-		s.since = timestamp
-		s.until = timestamp
-		s.sinceValue = value
-		s.untilValue = value
-		s.entries = 1
-		return true
-	}
-
-	// start of the period
-	s.since = timestamp
-	s.sinceValue = value
-	s.entries++
-	return false
-}
-
-func (s *CumulativeAggregator) FillPeriod(prevTimestamp uint32, value float64, p *CumulativePeriod) bool {
-	if s.entries == 0 {
-		return false
-	}
-	p.Period = s.currentPeriod
-	if prevTimestamp > 0 {
-		p.Since = prevTimestamp
-		p.Total = s.untilValue - value
-	} else {
-		p.Since = s.since
-		p.Total = s.untilValue - s.sinceValue
-	}
-	p.Until = s.until
-	p.EndValue = s.untilValue
-
-	return true
-}
-
-func (s *CumulativeAggregator) groupByDayUsingFHD(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *CumulativeAggregator) groupByMonthUsingFHD(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *CumulativeAggregator) groupByMonthUsingLDM(timestamp uint32) uint32 {
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Day() > s.lastDayOfMonth {
-		tm = tm.AddDate(0, 1, 0)
-	}
-	return uint32(tm.Unix())
-}
-
-func (s *CumulativeAggregator) groupByMonthUsingFHDAndLDM(timestamp uint32) uint32 {
-	// IMPORTANT!
-	// Check the time of day first.
-	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
-	if tm.Hour() < s.firstHourOfDay {
-		tm = tm.AddDate(0, 0, -1)
-	}
-	if tm.Day() > s.lastDayOfMonth {
-		tm = tm.AddDate(0, 1, 0)
-	}
-	return uint32(tm.Unix())
-}
-
-func groupByHour(timestamp uint32) uint32 {
-	return uint32(timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "h").Unix())
-}
-
-func groupByDay(timestamp uint32) uint32 {
-	return uint32(timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d").Unix())
-}
-
-func groupByMonth(timestamp uint32) uint32 {
-	return uint32(timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m").Unix())
-}
diff --git a/atree/io.go b/atree/io.go
index 5bb4492..2d14e40 100644
--- a/atree/io.go
+++ b/atree/io.go
@@ -8,7 +8,7 @@ import (
 	"math"
 	"os"
 
-	octopus "gordenko.dev/dima/diploma"
+	diploma "gordenko.dev/dima/diploma"
 	"gordenko.dev/dima/diploma/atree/redo"
 	"gordenko.dev/dima/diploma/bin"
 )
@@ -74,8 +74,8 @@ func (s *Atree) releaseIndexPage(pageNo uint32) {
 			p.ReferenceCount--
 			return
 		} else {
-			octopus.Abort(
-				octopus.ReferenceCountBug,
+			diploma.Abort(
+				diploma.ReferenceCountBug,
 				fmt.Errorf("call releaseIndexPage on page %d with reference count = %d",
 					pageNo, p.ReferenceCount),
 			)
@@ -98,7 +98,7 @@ func (s *Atree) allocIndexPage() AllocatedPage {
 	} else {
 		s.mutex.Lock()
 		if s.allocatedIndexPagesQty == math.MaxUint32 {
-			octopus.Abort(octopus.MaxAtreeSizeExceeded,
+			diploma.Abort(diploma.MaxAtreeSizeExceeded,
 				errors.New("no space in Atree index"))
 		}
 		s.allocatedIndexPagesQty++
@@ -163,8 +163,8 @@ func (s *Atree) releaseDataPage(pageNo uint32) {
 			p.ReferenceCount--
 			return
 		} else {
-			octopus.Abort(
-				octopus.ReferenceCountBug,
+			diploma.Abort(
+				diploma.ReferenceCountBug,
 				fmt.Errorf("call releaseDataPage on page %d with reference count = %d",
 					pageNo, p.ReferenceCount),
 			)
@@ -186,7 +186,7 @@ func (s *Atree) allocDataPage() AllocatedPage {
 	} else {
 		s.mutex.Lock()
 		if s.allocatedDataPagesQty == math.MaxUint32 {
-			octopus.Abort(octopus.MaxAtreeSizeExceeded,
+			diploma.Abort(diploma.MaxAtreeSizeExceeded,
 				errors.New("no space in Atree index"))
 		}
 		s.allocatedDataPagesQty++
@@ -303,7 +303,7 @@ func (s *Atree) pageWriter() {
 		case <-s.writeSignalCh:
 			err := s.writeTasks()
 			if err != nil {
-				octopus.Abort(octopus.WriteToAtreeFailed, err)
+				diploma.Abort(diploma.WriteToAtreeFailed, err)
 			}
 		}
 	}
diff --git a/atree/select.go b/atree/select.go
index ef002bc..ce2fb2e 100644
--- a/atree/select.go
+++ b/atree/select.go
@@ -3,81 +3,16 @@ package atree
 import (
 	"fmt"
 
-	octopus "gordenko.dev/dima/diploma"
+	"gordenko.dev/dima/diploma"
 )
 
-type IterateAllCumulativeByTreeCursorReq struct {
+type ContinueFullScanReq struct {
 	FracDigits     byte
-	PageNo         uint32
-	EndTimestamp   uint32
-	EndValue       float64
-	ResponseWriter *CumulativeMeasureWriter
-}
-
-func (s *Atree) IterateAllCumulativeByTreeCursor(req IterateAllCumulativeByTreeCursorReq) error {
-	buf, err := s.fetchDataPage(req.PageNo)
-	if err != nil {
-		return err
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     req.PageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Cumulative,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
-
-	var (
-		endTimestamp = req.EndTimestamp
-		endValue     = req.EndValue
-	)
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done {
-			err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-				Timestamp: endTimestamp,
-				Value:     endValue,
-				Total:     endValue,
-			})
-			if err != nil {
-				return err
-			}
-			return nil
-		}
-		err = req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-			Timestamp: endTimestamp,
-			Value:     endValue,
-			Total:     endValue - value,
-		})
-		if err != nil {
-			return err
-		}
-		endTimestamp = timestamp
-		endValue = value
-	}
-}
-
-type ContinueIterateCumulativeByTreeCursorReq struct {
-	FracDigits     byte
-	Since          uint32
-	Until          uint32
+	ResponseWriter AtreeMeasureConsumer
 	LastPageNo     uint32
-	EndTimestamp   uint32
-	EndValue       float64
-	ResponseWriter *CumulativeMeasureWriter
 }
 
-func (s *Atree) ContinueIterateCumulativeByTreeCursor(req ContinueIterateCumulativeByTreeCursorReq) error {
+func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
 	buf, err := s.fetchDataPage(req.LastPageNo)
 	if err != nil {
 		return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
@@ -88,147 +23,7 @@ func (s *Atree) ContinueIterateCumulat
 		PageData:   buf,
 		Atree:      s,
 		FracDigits: req.FracDigits,
-		MetricType: octopus.Cumulative,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
-
-	var (
-		endTimestamp = req.EndTimestamp
-		endValue     = req.EndValue
-	)
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done {
-			err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-				Timestamp: endTimestamp,
-				Value:     endValue,
-				Total:     endValue,
-			})
-			if err != nil {
-				return err
-			}
-			return nil
-		}
-
-		if timestamp <= req.Until {
-			err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-				Timestamp: endTimestamp,
-				Value:     endValue,
-				Total:     endValue - value,
-			})
-			if err != nil {
-				return err
-			}
-			if timestamp < req.Since {
-				return nil
-			}
-		} else {
-			// bug panic
-			panic("continue cumulative but timestamp > req.Until")
-		}
-	}
-}
-
-type FindAndIterateCumulativeByTreeCursorReq struct {
-	FracDigits     byte
-	Since          uint32
-	Until          uint32
-	RootPageNo     uint32
-	ResponseWriter *CumulativeMeasureWriter
-}
-
-func (s *Atree) FindAndIterateCumulativeByTreeCursor(req FindAndIterateCumulativeByTreeCursorReq) error {
-	pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
-	if err != nil {
-		return err
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     pageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Cumulative,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
-
-	var (
-		endTimestamp uint32
-		endValue     float64
-	)
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done {
-			if endTimestamp > 0 {
-				err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-					Timestamp: endTimestamp,
-					Value:     endValue,
-					Total:     endValue,
-				})
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		}
-
-		if timestamp > req.Until {
-			continue
-		}
-
-		if endTimestamp > 0 {
-			err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
-				Timestamp: endTimestamp,
-				Value:     endValue,
-				Total:     endValue - value,
-			})
-			if err != nil {
-				return err
-			}
-		}
-		endTimestamp = timestamp
-		endValue = value
-
-		if timestamp < req.Since {
-			return nil
-		}
-	}
-}
-
-type IterateAllInstantByTreeCursorReq struct {
-	FracDigits     byte
-	PageNo         uint32
-	ResponseWriter *InstantMeasureWriter
-}
-
-func (s *Atree) IterateAllInstantByTreeCursor(req IterateAllInstantByTreeCursorReq) error {
-	buf, err := s.fetchDataPage(req.PageNo)
-	if err != nil {
-		return err
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     req.PageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Instant,
+		MetricType: diploma.Instant,
 	})
 	if err != nil {
 		return err
@@ -240,30 +35,21 @@ func (s *Atree) IterateAllInstantByTreeCursorR
 		if err != nil {
 			return err
 		}
-
 		if done {
 			return nil
 		}
-
-		err = req.ResponseWriter.WriteMeasure(InstantMeasure{
-			Timestamp: timestamp,
-			Value:     value,
-		})
-		if err != nil {
-			return err
-		}
+		req.ResponseWriter.Feed(timestamp, value)
 	}
 }
 
-type ContinueIterateInstantByTreeCursorReq struct {
+type ContinueRangeScanReq struct {
 	FracDigits     byte
-	Since          uint32
-	Until          uint32
+	ResponseWriter AtreeMeasureConsumer
 	LastPageNo     uint32
-	ResponseWriter *InstantMeasureWriter
+	Since          uint32
 }
 
-func (s *Atree) ContinueIterateInstantByTreeCursor(req ContinueIterateInstantByTreeCursorReq) error {
+func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
 	buf, err := s.fetchDataPage(req.LastPageNo)
 	if err != nil {
 		return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
@@ -274,7 +60,7 @@ func (s *Atree) ContinueIterateInstantByT
 		PageData:   buf,
 		Atree:      s,
 		FracDigits: req.FracDigits,
-		MetricType: octopus.Instant,
+		MetricType: diploma.Instant,
 	})
 	if err != nil {
 		return err
@@ -286,333 +72,57 @@ func (s *Atree) ContinueIterateInstantByT
 		if err != nil {
 			return err
 		}
-
 		if done {
-			// - no more records;
 			return nil
 		}
-
-		if timestamp > req.Until {
-			panic("continue instant timestamp > req.Until")
-		}
-
+		req.ResponseWriter.Feed(timestamp, value)
 		if timestamp < req.Since {
 			return nil
 		}
-
-		err = req.ResponseWriter.WriteMeasure(InstantMeasure{
-			Timestamp: timestamp,
-			Value:     value,
-		})
-		if err != nil {
-			return err
-		}
 	}
 }
 
-type FindAndIterateInstantByTreeCursorReq struct {
+type RangeScanReq struct {
 	FracDigits     byte
-	Since          uint32
-	Until          uint32
-	RootPageNo     uint32
-	ResponseWriter *InstantMeasureWriter
-}
-
-func (s *Atree) FindAndIterateInstantByTreeCursor(req FindAndIterateInstantByTreeCursorReq) error {
-	pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
-	if err != nil {
-		return err
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     pageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Instant,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done {
-			return nil
-		}
-
-		if timestamp > req.Until {
-			continue
-		}
-
-		if timestamp < req.Since {
-			return nil
-		}
-
-		err = req.ResponseWriter.WriteMeasure(InstantMeasure{
-			Timestamp: timestamp,
-			Value:     value,
-		})
-		if err != nil {
-			return err
-		}
-	}
-}
-
-type ContinueCollectInstantPeriodsReq struct {
-	FracDigits     byte
-	Aggregator     *InstantAggregator
-	ResponseWriter *InstantPeriodsWriter
-	LastPageNo     uint32
-	Since          uint32
-	Until          uint32
-}
-
-func (s *Atree) ContinueCollectInstantPeriods(req ContinueCollectInstantPeriodsReq) error {
-	buf, err := s.fetchDataPage(req.LastPageNo)
-	if err != nil {
-		return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     req.LastPageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Instant,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
-
-	var period InstantPeriod
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done || timestamp < req.Since {
-			isCompleted := req.Aggregator.FillPeriod(timestamp, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		}
-
-		if timestamp <= req.Until {
-			isCompleted := req.Aggregator.Feed(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-}
-
-type FindInstantPeriodsReq struct {
-	FracDigits     byte
-	ResponseWriter *InstantPeriodsWriter
+	ResponseWriter AtreeMeasureConsumer
 	RootPageNo     uint32
 	Since          uint32
 	Until          uint32
-	GroupBy        octopus.GroupBy
-	FirstHourOfDay int
-	LastDayOfMonth int
 }
 
-func (s *Atree) FindInstantPeriods(req FindInstantPeriodsReq) error {
+func (s *Atree) RangeScan(req RangeScanReq) error {
 	pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
 	if err != nil {
 		return err
 	}
 
-	aggregator, err := NewInstantAggregator(InstantAggregatorOptions{
-		GroupBy:        req.GroupBy,
-		FirstHourOfDay: req.FirstHourOfDay,
-		LastDayOfMonth: req.LastDayOfMonth,
-	})
-	if err != nil {
-		return err
-	}
-
 	cursor, err := NewBackwardCursor(BackwardCursorOptions{
 		PageNo:     pageNo,
 		PageData:   buf,
 		Atree:      s,
 		FracDigits: req.FracDigits,
-		MetricType: octopus.Instant,
+		MetricType: diploma.Instant,
 	})
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
 
-	var period InstantPeriod
-
 	for {
 		timestamp, value, done, err := cursor.Prev()
 		if err != nil {
 			return err
 		}
-
-		if done || timestamp < req.Since {
-			isCompleted := aggregator.FillPeriod(timestamp, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		}
-
-		if timestamp <= req.Until {
-			isCompleted := aggregator.Feed(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-}
-
-type FindCumulativePeriodsReq struct {
-	FracDigits     byte
-	ResponseWriter *CumulativePeriodsWriter
-	RootPageNo     uint32
-	Since          uint32
-	Until          uint32
-	GroupBy        octopus.GroupBy
-	FirstHourOfDay int
-	LastDayOfMonth int
-}
-
-func (s *Atree) FindCumulativePeriods(req FindCumulativePeriodsReq) error {
-	pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
-	if err != nil {
-		return err
-	}
-
-	aggregator, err := NewCumulativeAggregator(CumulativeAggregatorOptions{
-		GroupBy:        req.GroupBy,
-		FirstHourOfDay: req.FirstHourOfDay,
-		LastDayOfMonth: req.LastDayOfMonth,
-	})
-	if err != nil {
-		return err
-	}
-
-	cursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     pageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Cumulative,
-	})
-	if err != nil {
-		return err
-	}
-	defer cursor.Close()
-
-	var period CumulativePeriod
-
-	for {
-		timestamp, value, done, err := cursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done || timestamp < req.Since {
-			isCompleted := aggregator.FillPeriod(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
+		if done {
 			return nil
 		}
 
 		if timestamp <= req.Until {
-			isCompleted := aggregator.Feed(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-		}
-	}
-}
-
-type ContinueCollectCumulativePeriodsReq struct {
-	FracDigits     byte
-	Aggregator     *CumulativeAggregator
-	ResponseWriter *CumulativePeriodsWriter
-	LastPageNo     uint32
-	Since          uint32
-	Until          uint32
-}
-
-func (s *Atree) ContinueCollectCumulativePeriods(req ContinueCollectCumulativePeriodsReq) error {
-	buf, err := s.fetchDataPage(req.LastPageNo)
-	if err != nil {
-		return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
-	}
-
-	treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
-		PageNo:     req.LastPageNo,
-		PageData:   buf,
-		Atree:      s,
-		FracDigits: req.FracDigits,
-		MetricType: octopus.Cumulative,
-	})
-	if err != nil {
-		return err
-	}
-	defer treeCursor.Close()
+			req.ResponseWriter.Feed(timestamp, value)
 
-	var period CumulativePeriod
-
-	for {
-		timestamp, value, done, err := treeCursor.Prev()
-		if err != nil {
-			return err
-		}
-
-		if done || timestamp < req.Since {
-			isCompleted := req.Aggregator.FillPeriod(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
-			}
-			return nil
-		}
-
-		if timestamp <= req.Until {
-			isCompleted := req.Aggregator.Feed(timestamp, value, &period)
-			if isCompleted {
-				err := req.ResponseWriter.WritePeriod(period)
-				if err != nil {
-					return err
-				}
+			if timestamp < req.Since {
+				// - no more records within the requested time range.
+				return nil
 			}
 		}
 	}
 }
diff --git a/atree/writers.go b/atree/writers.go
index 06b59ae..2fd52e0 100644
--- a/atree/writers.go
+++ b/atree/writers.go
@@ -1,306 +1,15 @@
 package atree
 
-import (
-	"bytes"
-	"fmt"
-	"io"
-
-	octopus "gordenko.dev/dima/diploma"
-	"gordenko.dev/dima/diploma/bin"
-	"gordenko.dev/dima/diploma/proto"
-)
-
-// CURRENT VALUE WRITER
-
-type CurrentValue struct {
-	MetricID  uint32
-	Timestamp uint32
-	Value     float64
-}
-
-type CurrentValueWriter struct {
-	arr       []byte
-	responder *ChunkedResponder
-}
-
-func NewCurrentValueWriter(dst io.Writer) *CurrentValueWriter {
-	return &CurrentValueWriter{
-		arr:       make([]byte, 16),
-		responder: NewChunkedResponder(dst),
-	}
-}
-
-func (s *CurrentValueWriter) BufferValue(m CurrentValue) {
-	bin.PutUint32(s.arr[0:], m.MetricID)
-	bin.PutUint32(s.arr[4:], m.Timestamp)
-	bin.PutFloat64(s.arr[8:], m.Value)
-	s.responder.BufferRecord(s.arr)
-}
-
-func (s *CurrentValueWriter) Close() error {
-	return s.responder.Flush()
-}
-
-// INSTANT MEASURE WRITER
-
-type InstantMeasure struct {
-	Timestamp uint32
-	Value     float64
-}
-
-type InstantMeasureWriter struct {
-	arr       []byte
-	responder *ChunkedResponder
-}
-
-func NewInstantMeasureWriter(dst io.Writer) *InstantMeasureWriter {
-	return &InstantMeasureWriter{
-		arr:       make([]byte, 12),
-		responder: NewChunkedResponder(dst),
-	}
-}
-
-func (s *InstantMeasureWriter) BufferMeasure(m InstantMeasure) {
-	bin.PutUint32(s.arr[0:], m.Timestamp)
-	bin.PutFloat64(s.arr[4:], m.Value)
-	s.responder.BufferRecord(s.arr)
-}
-
-func (s *InstantMeasureWriter) WriteMeasure(m InstantMeasure) error {
-	bin.PutUint32(s.arr[0:], m.Timestamp)
-	bin.PutFloat64(s.arr[4:], m.Value)
-	return s.responder.AppendRecord(s.arr)
-}
-
-func (s *InstantMeasureWriter) Close() error {
-	return s.responder.Flush()
-}
-
-// CUMULATIVE MEASURE WRITER
-
-type CumulativeMeasure struct {
-	Timestamp uint32
-	Value     float64
-	Total     float64
-}
-
-type CumulativeMeasureWriter struct {
-	arr       []byte
-	responder *ChunkedResponder
-}
-
-func NewCumulativeMeasureWriter(dst io.Writer) *CumulativeMeasureWriter {
-	return &CumulativeMeasureWriter{
-		arr:       make([]byte, 20),
-		responder: NewChunkedResponder(dst),
-	}
-}
-
-func (s *CumulativeMeasureWriter) BufferMeasure(m CumulativeMeasure) {
-	bin.PutUint32(s.arr[0:], m.Timestamp)
-	bin.PutFloat64(s.arr[4:], m.Value)
-	bin.PutFloat64(s.arr[12:], m.Total)
-	s.responder.BufferRecord(s.arr)
-}
-
-func (s *CumulativeMeasureWriter) WriteMeasure(m CumulativeMeasure) error {
-	bin.PutUint32(s.arr[0:], m.Timestamp)
-	bin.PutFloat64(s.arr[4:], m.Value)
-	bin.PutFloat64(s.arr[12:], m.Total)
-	return s.responder.AppendRecord(s.arr)
-}
-
-func (s *CumulativeMeasureWriter) Close() error {
-	return s.responder.Flush()
-}
-
-// INSTANT AGGREGATE WRITER
-
-type InstantPeriodsWriter struct {
-	aggregateFuncs byte
-	arr            []byte
-	responder      *ChunkedResponder
-}
-
-func NewInstantPeriodsWriter(dst io.Writer, aggregateFuncs byte) *InstantPeriodsWriter {
-	var q int
-	if (aggregateFuncs & octopus.AggregateMin) == octopus.AggregateMin {
-		q++
-	}
-	if (aggregateFuncs & octopus.AggregateMax) == octopus.AggregateMax {
-		q++
-	}
-	if (aggregateFuncs & octopus.AggregateAvg) == octopus.AggregateAvg {
-		q++
-	}
-	return &InstantPeriodsWriter{
-		aggregateFuncs: aggregateFuncs,
-		arr:            make([]byte, 12+q*8),
-		responder:      NewChunkedResponder(dst),
-	}
-}
-
-type InstantPeriod struct {
-	Period uint32
-	Since  uint32
-	Until  uint32
-	Min    float64
-	Max    float64
-	Avg    float64
-}
-
-func (s *InstantPeriodsWriter) BufferMeasure(p InstantPeriod) {
-	s.pack(p)
-	s.responder.BufferRecord(s.arr)
-}
-
-func (s *InstantPeriodsWriter) WritePeriod(p InstantPeriod) error {
-	s.pack(p)
-	return s.responder.AppendRecord(s.arr)
-}
-
-func (s *InstantPeriodsWriter) Close() error {
-	return s.responder.Flush()
-}
-
-func (s *InstantPeriodsWriter) pack(p InstantPeriod) {
-	bin.PutUint32(s.arr[0:], p.Period)
-	bin.PutUint32(s.arr[4:], p.Since)
-	bin.PutUint32(s.arr[8:], p.Until)
-
-	pos := 12
-	if (s.aggregateFuncs & octopus.AggregateMin) == octopus.AggregateMin {
-		bin.PutFloat64(s.arr[pos:], p.Min)
-		pos += 8
-	}
-	if (s.aggregateFuncs & octopus.AggregateMax) == octopus.AggregateMax {
-		bin.PutFloat64(s.arr[pos:], p.Max)
-		pos += 8
-	}
-	if (s.aggregateFuncs & octopus.AggregateAvg) == octopus.AggregateAvg {
-		bin.PutFloat64(s.arr[pos:], p.Avg)
-	}
-}
-
-// CUMULATIVE AGGREGATE WRITER
-
-type CumulativePeriodsWriter struct {
-	arr       []byte
-	responder *ChunkedResponder
-}
-
-func NewCumulativePeriodsWriter(dst io.Writer) *CumulativePeriodsWriter {
-	return &CumulativePeriodsWriter{
-		arr:       make([]byte, 28),
-		responder: NewChunkedResponder(dst),
-	}
-}
-
-type CumulativePeriod struct {
-	Period   uint32
-	Since    uint32
-	Until    uint32
-	EndValue float64
-	Total    float64
-}
-
-func (s *CumulativePeriodsWriter) BufferMeasure(p CumulativePeriod) {
-	s.pack(p)
-	s.responder.BufferRecord(s.arr)
-}
-
-func (s *CumulativePeriodsWriter) WritePeriod(p CumulativePeriod) error {
-	s.pack(p)
-	return s.responder.AppendRecord(s.arr)
-}
-
-func (s *CumulativePeriodsWriter) Close() error {
-	return s.responder.Flush()
-}
-
-func (s *CumulativePeriodsWriter) pack(p CumulativePeriod) {
-	bin.PutUint32(s.arr[0:], p.Period)
-	bin.PutUint32(s.arr[4:], p.Since)
-	bin.PutUint32(s.arr[8:], p.Until)
-	bin.PutFloat64(s.arr[12:], p.EndValue)
-	bin.PutFloat64(s.arr[20:], p.Total)
-}
-
-// CHUNKED RESPONDER
-
-//const headerSize = 3
-
-var endMsg = []byte{
-	proto.RespEndOfValue, // end of stream
-}
-
-type ChunkedResponder struct {
-	recordsQty int
-	buf        *bytes.Buffer
-	dst        io.Writer
-}
-
-func NewChunkedResponder(dst io.Writer) *ChunkedResponder {
-	s := &ChunkedResponder{
-		recordsQty: 0,
-		buf:        bytes.NewBuffer(nil),
-		dst:        dst,
-	}
-
-	s.buf.Write([]byte{
-		proto.RespPartOfValue, // message type
-		0, 0, 0, 0, // records qty
-	})
-	return s
-}
-
-func (s *ChunkedResponder) BufferRecord(rec []byte) {
-	s.buf.Write(rec)
-	s.recordsQty++
-}
-
-func (s *ChunkedResponder) AppendRecord(rec []byte) error {
-	s.buf.Write(rec)
-	s.recordsQty++
-
-	if s.buf.Len() < 1500 {
-		return nil
-	}
-
-	if err := s.sendBuffered(); err != nil {
-		return err
-	}
-
-	s.buf.Write([]byte{
-		proto.RespPartOfValue, // message type
-		0, 0, 0, 0, // records qty
-	})
-	s.recordsQty = 0
-	return nil
-}
-
+type PeriodsWriter interface {
+	Feed(uint32, float64)
+	FeedNoSend(uint32, float64)
+	Close() error
+}
 
-func (s *ChunkedResponder) Flush() error {
-	if s.recordsQty > 0 {
-		if err := s.sendBuffered(); err != nil {
-			return err
-		}
-	}
-	if _, err := s.dst.Write(endMsg); err != nil {
-		return err
-	}
-	return nil
+type WorkerMeasureConsumer interface {
+	FeedNoSend(uint32, float64)
 }
 
-func (s *ChunkedResponder) sendBuffered() (err error) {
-	msg := s.buf.Bytes()
-	bin.PutUint32(msg[1:], uint32(s.recordsQty))
-	n, err := s.dst.Write(msg)
-	if err != nil {
-		return
-	}
-	if n != len(msg) {
-		return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg))
-	}
-	s.buf.Reset()
-	return
+type AtreeMeasureConsumer interface {
+	Feed(uint32, float64)
 }
diff --git a/client/client.go b/client/client.go
index ee6fa1f..1151d6d 100644
--- a/client/client.go
+++ b/client/client.go
@@ -66,13 +66,7 @@ func (s *Connection) mustSuccess(reader *bufreader.BufferedReader) (err error)
 	}
 }
 
-type Metric struct {
-	MetricID   uint32
-	MetricType diploma.MetricType
-	FracDigits byte
-}
-
-func (s *Connection) AddMetric(req Metric) error {
+func (s *Connection) AddMetric(req proto.AddMetricReq) error {
 	arr := []byte{
 		proto.TypeAddMetric,
 		0, 0, 0, 0, //
@@ -87,7 +81,22 @@ func (s *Connection) AddMetric(req proto.AddMetricReq) error {
 	return s.mustSuccess(s.src)
 }
 
-func (s *Connection) GetMetric(metricID uint32) (*Metric, error) {
+//
+// func (s *Connection) UpdateMetric(req Metric) error {
+// 	arr := []byte{
+// 		proto.TypeUpdateMetric,
+// 		0, 0, 0, 0, //
+// 		byte(req.FracDigits),
+// 	}
+// 	bin.PutUint32(arr[1:], req.MetricID)
+
+// 	if _, err := s.conn.Write(arr); err != nil {
+// 		return err
+// 	}
+// 	return s.mustSuccess(s.src)
+// }
+
+func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) {
 	arr := []byte{
 		proto.TypeGetMetric,
 		0, 0, 0, 0,
@@ -110,10 +119,10 @@
 			return nil, fmt.Errorf("read body: %s", err)
 		}
 
-		return &Metric{
+		return &proto.Metric{
 			MetricID:   bin.GetUint32(arr),
 			MetricType: diploma.MetricType(arr[4]),
-			FracDigits: arr[5],
+			FracDigits: int(arr[5]),
 		}, nil
 
 	case proto.RespError:
@@ -137,13 +146,7 @@
 	return s.mustSuccess(s.src)
 }
 
-type AppendMeasureReq struct {
-	MetricID  uint32
-	Timestamp uint32
-	Value     float64
-}
-
-func (s *Connection) AppendMeasure(req AppendMeasureReq) (err error) {
+func (s *Connection) AppendMeasure(req proto.AppendMeasureReq) (err error) {
 	arr := []byte{
 		proto.TypeAppendMeasure,
 		0, 0, 0, 0, // metricID
@@ -160,17 +163,7 @@
 	return s.mustSuccess(s.src)
 }
 
-type AppendMeasuresReq struct {
-	MetricID uint32
-	Measures []Measure
-}
-
-type Measure struct {
-	Timestamp uint32
-	Value     float64
-}
-
-func (s *Connection) AppendMeasures(req AppendMeasuresReq) (err error) {
+func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) {
 	if len(req.Measures) > 65535 {
 		return fmt.Errorf("wrong measures qty: %d", len(req.Measures))
 	}
@@ -191,15 +184,74 @@ func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) {
 	if _, err := s.conn.Write(arr); err != nil {
 		return err
 	}
+
+	//fmt.Printf("encode measures: %d\n", len(req.Measures))
 	return s.mustSuccess(s.src)
 }
 
-type InstantMeasure struct {
-	Timestamp uint32
-	Value     float64
+// type AppendMeasurePerMetricReq struct {
+// 	MetricID uint32
+// 	Measures []Measure
+// }
+
+func (s *Connection) AppendMeasurePerMetric(list []proto.MetricMeasure) (_ []proto.AppendError, err error) {
+	if len(list) > 65535 {
+		return nil, fmt.Errorf("wrong measures qty: %d", len(list))
+	}
+	var (
+		// 3 bytes: 1b message type + 2b records qty
+		fixedSize  = 3
+		recordSize = 16
+		arr        = make([]byte, fixedSize+len(list)*recordSize)
+	)
+	arr[0] = proto.TypeAppendMeasures
+	bin.PutUint16(arr[1:], uint16(len(list)))
+	pos := fixedSize
+	for _, item := range list {
+		bin.PutUint32(arr[pos:], item.MetricID)
+		bin.PutUint32(arr[pos+4:], item.Timestamp)
+		bin.PutFloat64(arr[pos+8:], item.Value)
+		pos += recordSize
+	}
+	if _, err := s.conn.Write(arr); err != nil {
+		return nil, err
+	}
+
+	code, err := s.src.ReadByte()
+	if err != nil {
+		return nil, fmt.Errorf("read response code: %s", err)
+	}
+
+	switch code {
+	case proto.RespValue:
+		var (
+			qty          uint16
+			appendErrors []proto.AppendError
+		)
+		qty, err = bin.ReadUint16(s.src)
+		if err != nil {
+			return
+		}
+		for range qty {
+			var ae proto.AppendError
+			ae.MetricID, err = bin.ReadUint32(s.src)
+			if err != nil {
+				return
+			}
+			ae.ErrorCode, err = bin.ReadUint16(s.src)
+			if err != nil {
+				return
+			}
+			appendErrors = append(appendErrors, ae)
+		}
+		return appendErrors, nil
+
+	default:
+		return nil, fmt.Errorf("unknown response code %d", code)
+	}
 }
 
-func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]InstantMeasure, error) {
+func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMeasure, error) {
 	arr := []byte{
 		proto.TypeListAllInstantMeasures,
 		0, 0, 0, 0, // metricID
@@ -211,7 +263,7 @@
 	}
 
 	var (
-		result []InstantMeasure
+		result []proto.InstantMeasure
 		tmp    = make([]byte, 12)
 	)
@@ -221,6 +273,8 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]InstantMeasure,
 		return nil, fmt.Errorf("read response code: %s", err)
 	}
 
+	//fmt.Printf("code: %d\n", code)
+
 	switch code {
 	case proto.RespPartOfValue:
 		q, err := bin.ReadUint32(s.src)
@@ -228,13 +282,15 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]InstantMeasure,
 			return nil, fmt.Errorf("read records qty: %s", err)
 		}
 
+		//fmt.Printf("q: %d\n", q)
+
 		for i := range int(q) {
 			err = bin.ReadNInto(s.src, tmp)
 			if err != nil {
 				return nil, fmt.Errorf("read record #%d: %s", i, err)
 			}
 
-			result = append(result, InstantMeasure{
+			result = append(result, proto.InstantMeasure{
 				Timestamp: bin.GetUint32(tmp),
 				Value:     bin.GetFloat64(tmp[4:]),
 			})
@@ -252,13 +308,12 @@
 	}
 }
 
-func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]InstantMeasure, error) {
+func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]proto.InstantMeasure, error) {
 	arr := []byte{
 		proto.TypeListInstantMeasures,
 		0, 0, 0, 0, // metricID
 		0, 0, 0, 0, // since
 		0, 0, 0, 0, // until
-		byte(req.FirstHourOfDay),
 	}
 	bin.PutUint32(arr[1:], req.MetricID)
 	bin.PutUint32(arr[5:], req.Since)
@@ -269,7 +324,7 @@
 	}
 
 	var (
-		result []InstantMeasure
+		result []proto.InstantMeasure
 		tmp    = make([]byte, 12)
 	)
@@ -292,7 +347,7 @@
 				return nil, fmt.Errorf("read record #%d: %s", i, err)
 			}
 
-			result = append(result, InstantMeasure{
+			result = append(result, proto.InstantMeasure{
 				Timestamp: bin.GetUint32(tmp),
 				Value:     bin.GetFloat64(tmp[4:]),
 			})
@@ -310,13 +365,7 @@
 	}
 }
 
-type CumulativeMeasure struct {
-	Timestamp uint32
-	Value     float64
-	Total     float64
-}
-
-func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]CumulativeMeasure, error) {
+func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.CumulativeMeasure, error) {
 	arr := []byte{
 		proto.TypeListAllCumulativeMeasures,
 		0, 0, 0, 0, // metricID
@@ -328,7 +377,7 @@
 	}
 
 	var (
-		result []CumulativeMeasure
+		result []proto.CumulativeMeasure
 		tmp    = make([]byte, 20)
 	)
@@ -338,6 +387,8 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]CumulativeMea
 		return nil, fmt.Errorf("read response code: %s", err)
 	}
 
+	//fmt.Printf("code: %d\n", code)
+
 	switch code {
 	case proto.RespPartOfValue:
 		q, err := bin.ReadUint32(s.src)
@@ -351,11 +402,19 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]CumulativeMea
 				return nil, fmt.Errorf("read record #%d: %s", i, err)
 			}
 
-			result = append(result, CumulativeMeasure{
+			//fmt.Printf("tmp: %d\n", tmp)
+
+			result = append(result, proto.CumulativeMeasure{
 				Timestamp: bin.GetUint32(tmp),
 				Value:     bin.GetFloat64(tmp[4:]),
 				Total:     bin.GetFloat64(tmp[12:]),
 			})
+
+			// pretty.PPrintln("measure", CumulativeMeasure{
+			// 	Timestamp: bin.GetUint32(tmp),
+			// 	Value:     bin.GetFloat64(tmp[4:]),
+			// 	Total:     bin.GetFloat64(tmp[12:]),
+			// })
 		}
 
 	case proto.RespEndOfValue:
@@ -370,13 +429,12 @@
 	}
 }
 
-func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq) ([]CumulativeMeasure, error) {
+func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq) ([]proto.CumulativeMeasure, error) {
 	arr := []byte{
 		proto.TypeListCumulativeMeasures,
 		0, 0, 0, 0, // metricID
 		0, 0, 0, 0, // since
 		0, 0, 0, 0, // until
-		byte(req.FirstHourOfDay),
 	}
 	bin.PutUint32(arr[1:], req.MetricID)
 	bin.PutUint32(arr[5:], req.Since)
@@ -387,7 +445,7 @@
 	}
 
 	var (
-		result []CumulativeMeasure
+		result []proto.CumulativeMeasure
 		tmp    = make([]byte, 20)
 	)
@@ -410,7 +468,7 @@
 				return nil, fmt.Errorf("read record #%d: %s", i, err)
 			}
 
-			result = append(result, CumulativeMeasure{
+			result = append(result, proto.CumulativeMeasure{
 				Timestamp: bin.GetUint32(tmp),
 				Value:     bin.GetFloat64(tmp[4:]),
 				Total:     bin.GetFloat64(tmp[12:]),
@@ -429,16 +487,7 @@
 	}
 }
 
-type InstantPeriod struct {
-	Period uint32
-	Since  uint32
-	Until  uint32
-	Min    float64
-	Max    float64
-	Avg    float64
-}
-
-func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]InstantPeriod, error) {
+func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]proto.InstantPeriod, error) {
 	arr := []byte{
 		proto.TypeListInstantPeriods,
 		0, 0, 0, 0, // metricID
@@ -447,11 +496,14 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]Inst
 		byte(req.GroupBy),
 		req.AggregateFuncs,
 		byte(req.FirstHourOfDay),
-		byte(req.LastDayOfMonth),
 	}
 	bin.PutUint32(arr[1:], req.MetricID)
-	bin.PutUint32(arr[5:], req.Since)
-	bin.PutUint32(arr[9:], req.Until)
+	bin.PutUint16(arr[5:], uint16(req.Since.Year))
+	arr[7] = byte(req.Since.Month)
+	arr[8] = byte(req.Since.Day)
+	bin.PutUint16(arr[9:], uint16(req.Until.Year))
+	arr[11] = byte(req.Until.Month)
+	arr[12] = byte(req.Until.Day)
 
 	if _, err := s.conn.Write(arr); err != nil {
 		return nil, err
@@ -469,7 +521,7 @@
 	}
 
 	var (
-		result []InstantPeriod
+		result []proto.InstantPeriod
 		// 12 bytes - period, since, until
 		// q * 8 bytes - min, max, avg
 		tmp = make([]byte, 12+q*8)
 	)
@@ -481,6 +533,8 @@
 		return nil, fmt.Errorf("read response code: %s", err)
 	}
 
+	//fmt.Printf("code: %d\n", code)
+
 	switch code {
 	case proto.RespPartOfValue:
 		q, err := bin.ReadUint32(s.src)
@@ -495,7 +549,7 @@
 			}
 
 			var (
-				p = InstantPeriod{
+				p = proto.InstantPeriod{
 					Period: bin.GetUint32(tmp[0:]),
 					Since:  bin.GetUint32(tmp[4:]),
 					Until:  bin.GetUint32(tmp[8:]),
@@ -530,15 +584,7 @@
 	}
 }
 
-type CumulativePeriod struct {
-	Period   uint32
-	Since    uint32
-	Until    uint32
-	EndValue float64
-	Total    float64
-}
-
-func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) ([]CumulativePeriod, error) {
+func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) ([]proto.CumulativePeriod, error) {
 	arr := []byte{
 		proto.TypeListCumulativePeriods,
 		0, 0, 0, 0, // metricID
@@ -546,18 +592,21 @@
 		0, 0, 0, 0, // until
 		byte(req.GroupBy),
 		byte(req.FirstHourOfDay),
-		byte(req.LastDayOfMonth),
 	}
 	bin.PutUint32(arr[1:], req.MetricID)
-	bin.PutUint32(arr[5:], req.Since)
-	bin.PutUint32(arr[9:], req.Until)
+	bin.PutUint16(arr[5:], uint16(req.Since.Year))
+	arr[7] = byte(req.Since.Month)
+	arr[8] = byte(req.Since.Day)
+	bin.PutUint16(arr[9:], uint16(req.Until.Year))
+	arr[11] = byte(req.Until.Month)
+	arr[12] = byte(req.Until.Day)
 
 	if _, err := s.conn.Write(arr); err != nil {
 		return nil, err
 	}
 
 	var (
-		result []CumulativePeriod
+		result []proto.CumulativePeriod
 		tmp    = make([]byte, 28)
 	)
@@ -579,7 +628,7 @@
 		if err != nil {
 			return nil, fmt.Errorf("read record #%d: %s", i, err)
 		}
-		result = append(result, CumulativePeriod{
+		result = append(result, proto.CumulativePeriod{
 			Period:   bin.GetUint32(tmp[0:]),
 			Since:    bin.GetUint32(tmp[4:]),
 			Until:    bin.GetUint32(tmp[8:]),
@@ -600,13 +649,7 @@
 	}
 }
 
-type CurrentValue struct {
-	MetricID  uint32
-	Timestamp uint32
-	Value     float64
-}
-
-func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]CurrentValue, error) {
+func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]proto.CurrentValue, error) {
 	arr := make([]byte, 3+metricKeySize*len(metricIDs))
 	arr[0] = proto.TypeListCurrentValues
@@ -623,7 +666,7 @@
 	}
 
 	var (
-		result []CurrentValue
+		result []proto.CurrentValue
 		tmp    = make([]byte, 16)
 	)
@@ -646,7 +689,7 @@
 			return nil, fmt.Errorf("read record #%d: %s", i, err)
 		}
 
-		result = append(result, CurrentValue{
+		result = append(result, proto.CurrentValue{
 			MetricID:  bin.GetUint32(tmp),
 			Timestamp: bin.GetUint32(tmp[4:]),
 			Value:     bin.GetFloat64(tmp[8:]),
@@ -680,55 +723,6 @@
 	return s.mustSuccess(s.src)
 }
 
-type RangeTotalResp struct {
-	Since      uint32
-	SinceValue float64
-	Until      uint32
-	UntilValue float64
-}
-
-func (s *Connection) RangeTotal(req proto.RangeTotalReq) (*RangeTotalResp, error) {
-	arr := []byte{
-		proto.TypeGetMetric,
-		0, 0, 0, 0,
-		0, 0, 0, 0,
-		0, 0, 0, 0,
-	}
-	bin.PutUint32(arr[1:], req.MetricID)
-	bin.PutUint32(arr[5:], req.Since)
-	bin.PutUint32(arr[9:], req.MetricID)
-
-	if _, err := s.conn.Write(arr); err != nil {
-		return nil, err
-	}
-
-	code, err := s.src.ReadByte()
-	if err != nil {
-		return nil, fmt.Errorf("read response code: %s", err)
-	}
-
-	switch code {
-	case proto.RespValue:
-		arr, err := s.src.ReadN(24)
-		if err != nil {
-			return nil, fmt.Errorf("read body: %s", err)
-		}
-
-		return &RangeTotalResp{
-			Since:      bin.GetUint32(arr),
-			SinceValue: bin.GetFloat64(arr[4:]),
-			Until:      bin.GetUint32(arr[12:]),
-			UntilValue: bin.GetFloat64(arr[16:]),
-		}, nil
-
-	case proto.RespError:
-		return nil, s.onError()
-
-	default:
-		return nil, fmt.Errorf("unknown reponse code %d", code)
-	}
-}
-
 func (s *Connection) onError() error {
 	errorCode, err := bin.ReadUint16(s.src)
 	if err != nil {
diff --git a/database/api.go b/database/api.go
index 70782a5..98940e4 100644
--- a/database/api.go
+++ b/database/api.go
@@ -13,6 +13,7 @@ import (
 	"gordenko.dev/dima/diploma/chunkenc"
 	"gordenko.dev/dima/diploma/conbuf"
 	"gordenko.dev/dima/diploma/proto"
+	"gordenko.dev/dima/diploma/transform"
 	"gordenko.dev/dima/diploma/txlog"
 )
@@ -326,6 +327,9 @@ type FilledPage struct {
 }
 
 type tryAppendMeasureResult struct {
+	MetricID   uint32
+	Timestamp  uint32
+	Value      float64
 	FilledPage *FilledPage
 	ResultCode byte
 }
@@ -441,7 +445,6 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
 	)
 
 	for idx, measure := range req.Measures {
-		//fmt.Printf("%d %v\n", measure.Timestamp, measure.Value)
 		if since == 0 {
 			since = measure.Timestamp
 		} else {
@@ -470,7 +473,6 @@
 				)
 				<-waitCh
 			}
-			//fmt.Printf("m.Value: %v < untilValue: %v\n", measure.Value, untilValue)
 			return proto.ErrNonMonotonicValue
 		}
 	}
@@ -593,23 +595,27 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
 	result := <-resultCh
 
 	switch result.ResultCode {
-	case Succeed:
-		var (
-			freeDataPages  []uint32
-			freeIndexPages []uint32
-		)
-		if result.RootPageNo > 0 {
-			pageLists, err := s.atree.GetAllPages(result.RootPageNo)
-			if err != nil {
-				diploma.Abort(diploma.FailedAtreeRequest, err)
-			}
-			freeDataPages = pageLists.DataPages
-			freeIndexPages = pageLists.IndexPages
+	case NoMeasuresToDelete:
+		// ok
+
+	case DeleteFromAtreeNotNeeded:
+		// record the deletion in the TransactionLog
+		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
+			MetricID: req.MetricID,
+		})
+		<-waitCh
+
+	case DeleteFromAtreeRequired:
+		// collect the numbers of all data and index pages of the metric (like a REDO log record).
+		pageLists, err := s.atree.GetAllPages(req.MetricID)
+		if err != nil {
+			diploma.Abort(diploma.FailedAtreeRequest, err)
 		}
+		// record the deletion in the TransactionLog
 		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
 			MetricID:       req.MetricID,
-			FreeDataPages:  freeDataPages,
-			FreeIndexPages: freeIndexPages,
+			FreeDataPages:  pageLists.DataPages,
+			FreeIndexPages: pageLists.IndexPages,
 		})
 		<-waitCh
@@ -624,98 +630,32 @@
 
 // SELECT
 
-type instantMeasuresResult struct {
+type fullScanResult struct {
 	ResultCode byte
 	FracDigits byte
-	PageNo     uint32
+	LastPageNo uint32
 }
 
 func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error {
-	resultCh := make(chan instantMeasuresResult, 1)
+	responseWriter := transform.NewInstantMeasureWriter(conn, 0)
 
-	responseWriter := atree.NewInstantMeasureWriter(conn)
-
-	s.appendJobToWorkerQueue(tryListAllInstantMeasuresReq{
+	return s.fullScan(fullScanReq{
 		MetricID:       req.MetricID,
+		MetricType:     diploma.Instant,
+		Conn:           conn,
 		ResponseWriter: responseWriter,
-		ResultCh:       resultCh,
 	})
-
-	result := <-resultCh
-
-	switch result.ResultCode {
-	case QueryDone:
-		responseWriter.Close()
-
-	case UntilFound:
-		err := s.atree.IterateAllInstantByTreeCursor(atree.IterateAllInstantByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			PageNo:         result.PageNo,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
-
-	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
-
-	default:
-		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
-	}
-	return nil
 }
 
 func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error {
-	resultCh := make(chan cumulativeMeasuresResult, 1)
+	responseWriter := transform.NewCumulativeMeasureWriter(conn, 0)
 
-	responseWriter := atree.NewCumulativeMeasureWriter(conn)
-
-	s.appendJobToWorkerQueue(tryListAllCumulativeMeasuresReq{
+	return s.fullScan(fullScanReq{
 		MetricID:       req.MetricID,
+		MetricType:     diploma.Cumulative,
+		Conn:           conn,
 		ResponseWriter: responseWriter,
-		ResultCh:       resultCh,
 	})
-
-	result := <-resultCh
-
-	switch result.ResultCode {
-	case QueryDone:
-		responseWriter.Close()
-
-	case UntilFound:
-		err := s.atree.IterateAllCumulativeByTreeCursor(atree.IterateAllCumulativeByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			PageNo:         result.PageNo,
-			EndTimestamp:   result.EndTimestamp,
-			EndValue:       result.EndValue,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
-
-	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
-
-	default:
-		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
-	}
-	return nil
 }
 
 func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error {
@@ -724,182 +664,116 @@ func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasu
 		return nil
 	}
 
-	var since, until uint32
-	if req.FirstHourOfDay > 0 {
-		since, until = correctToFHD(req.Since, req.Until, req.FirstHourOfDay)
-	} else {
-		since = req.Since
-		until = req.Until
-	}
-
-	resultCh := make(chan instantMeasuresResult, 1)
-	responseWriter := atree.NewInstantMeasureWriter(conn)
+	responseWriter := transform.NewInstantMeasureWriter(conn, req.Since)
 
-	s.appendJobToWorkerQueue(tryListInstantMeasuresReq{
+	return s.rangeScan(rangeScanReq{
 		MetricID:       req.MetricID,
-		Since:          since,
-		Until:          until,
+		MetricType:     diploma.Instant,
+		Since:          req.Since,
+		Until:          req.Until - 1,
+		Conn:           conn,
 		ResponseWriter: responseWriter,
-		ResultCh:       resultCh,
 	})
-
-	result := <-resultCh
-
-	switch result.ResultCode {
-	case QueryDone:
-		responseWriter.Close()
-
-	case UntilFound:
-		err := s.atree.ContinueIterateInstantByTreeCursor(atree.ContinueIterateInstantByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			Since:          since,
-			Until:          until,
-			LastPageNo:     result.PageNo,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case UntilNotFound:
-		err := s.atree.FindAndIterateInstantByTreeCursor(atree.FindAndIterateInstantByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			Since:          since,
-			Until:          until,
-			RootPageNo:     result.PageNo,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
-
-	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
-
-	default:
-		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
-	}
-	return nil
-}
-
-type cumulativeMeasuresResult struct {
-	ResultCode   byte
-	FracDigits   byte
-	PageNo       uint32
-	EndTimestamp uint32
-	EndValue     float64
 }
 
 func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error {
-	resultCh := make(chan cumulativeMeasuresResult, 1)
-	responseWriter := atree.NewCumulativeMeasureWriter(conn)
+	if req.Since > req.Until {
+		reply(conn, proto.ErrInvalidRange)
+		return nil
+	}
 
-	s.appendJobToWorkerQueue(tryListCumulativeMeasuresReq{
+	responseWriter := transform.NewCumulativeMeasureWriter(conn, req.Since)
+
+	return s.rangeScan(rangeScanReq{
 		MetricID:       req.MetricID,
+		MetricType:     diploma.Cumulative,
 		Since:          req.Since,
-		Until:          req.Until,
+		Until:          req.Until - 1,
+		Conn:           conn,
 		ResponseWriter: responseWriter,
-		ResultCh:       resultCh,
 	})
-
-	result := <-resultCh
-
-	switch result.ResultCode {
-	case QueryDone:
-		responseWriter.Close()
-
-	case UntilFound:
-		err := s.atree.ContinueIterateCumulativeByTreeCursor(atree.ContinueIterateCumulativeByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			Since:          req.Since,
-			Until:          req.Until,
-			LastPageNo:     result.PageNo,
-			EndTimestamp:   result.EndTimestamp,
-			EndValue:       result.EndValue,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case UntilNotFound:
-		err := s.atree.FindAndIterateCumulativeByTreeCursor(atree.FindAndIterateCumulativeByTreeCursorReq{
-			FracDigits:     result.FracDigits,
-			Since:          req.Since,
-			Until:          req.Until,
-			RootPageNo:     result.PageNo,
-			ResponseWriter: responseWriter,
-		})
-		s.metricRUnlock(req.MetricID)
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
-
-	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
-
-	default:
-		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
-	}
-	return nil
 }
 
-type instantPeriodsResult struct {
+type rangeScanResult struct {
 	ResultCode byte
 	FracDigits byte
-	PageNo     uint32
+	RootPageNo uint32
+	LastPageNo uint32
 }
 
 func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error {
-	var (
-		since = req.Since
-		until = req.Until
-	)
-	if req.FirstHourOfDay > 0 {
-		since, until = correctToFHD(since, until, req.FirstHourOfDay)
+	since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay)
+	if since.After(until) {
+		reply(conn, proto.ErrInvalidRange)
+		return nil
 	}
-	if req.LastDayOfMonth > 0 {
-		// fix
+	responseWriter, err := transform.NewInstantPeriodsWriter(transform.InstantPeriodsWriterOptions{
+		Dst:            conn,
+		GroupBy:        req.GroupBy,
+		AggregateFuncs: req.AggregateFuncs,
+		FirstHourOfDay: req.FirstHourOfDay,
+	})
+	if err != nil {
+		reply(conn, proto.ErrUnexpected)
+		return nil
 	}
 
-	resultCh := make(chan instantPeriodsResult, 1)
+	return s.rangeScan(rangeScanReq{
+		MetricID:       req.MetricID,
+		MetricType:     diploma.Instant,
+		Since:          uint32(since.Unix()),
+		Until:          uint32(until.Unix()),
+		Conn:           conn,
+		ResponseWriter: responseWriter,
+	})
+}
 
-	aggregator, err := atree.NewInstantAggregator(atree.InstantAggregatorOptions{
+func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error {
+	since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay)
+	if since.After(until) {
+		reply(conn, proto.ErrInvalidRange)
+		return nil
+	}
+
+	responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{
+		Dst:            conn,
 		GroupBy:        req.GroupBy,
 		FirstHourOfDay: req.FirstHourOfDay,
-		LastDayOfMonth: req.LastDayOfMonth,
 	})
 	if err != nil {
 		reply(conn, proto.ErrUnexpected)
 		return nil
 	}
 
-	responseWriter := atree.NewInstantPeriodsWriter(conn, req.AggregateFuncs)
+	return s.rangeScan(rangeScanReq{
+		MetricID:       req.MetricID,
+		MetricType:     diploma.Cumulative,
+		Since:          uint32(since.Unix()),
+		Until:          uint32(until.Unix()),
+		Conn:           conn,
+		ResponseWriter: responseWriter,
+	})
+}
+
+type rangeScanReq struct {
+	MetricID       uint32
+	MetricType     diploma.MetricType
+	Since          uint32
+	Until          uint32
+	Conn           io.Writer
+	ResponseWriter atree.PeriodsWriter
+}
+
+func (s *Database) rangeScan(req rangeScanReq) error {
+	resultCh := make(chan rangeScanResult, 1)
 
-	s.appendJobToWorkerQueue(tryListInstantPeriodsReq{
+	s.appendJobToWorkerQueue(tryRangeScanReq{
 		MetricID:       req.MetricID,
 		Since:          req.Since,
 		Until:          req.Until,
-		Aggregator:     aggregator,
-		ResponseWriter: responseWriter,
+		MetricType:     req.MetricType,
+		ResponseWriter: req.ResponseWriter,
 		ResultCh:       resultCh,
 	})
@@ -907,49 +781,44 @@
 	result := <-resultCh
 
 	switch result.ResultCode {
 	case QueryDone:
-		responseWriter.Close()
+		req.ResponseWriter.Close()
 
 	case UntilFound:
-		err := s.atree.ContinueCollectInstantPeriods(atree.ContinueCollectInstantPeriodsReq{
+		err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{
 			FracDigits:     result.FracDigits,
-			Aggregator:     aggregator,
-			ResponseWriter: responseWriter,
-			LastPageNo:     result.PageNo,
+			ResponseWriter: req.ResponseWriter,
+			LastPageNo:     result.LastPageNo,
 			Since:          req.Since,
-			Until:          req.Until,
 		})
 		s.metricRUnlock(req.MetricID)
 		if err != nil {
-			reply(conn, proto.ErrUnexpected)
+			reply(req.Conn, proto.ErrUnexpected)
 		} else {
-			responseWriter.Close()
+			req.ResponseWriter.Close()
 		}
 
 	case UntilNotFound:
-		err := s.atree.FindInstantPeriods(atree.FindInstantPeriodsReq{
+		err := s.atree.RangeScan(atree.RangeScanReq{
 			FracDigits:     result.FracDigits,
-			ResponseWriter: responseWriter,
-			RootPageNo:     result.PageNo,
+			ResponseWriter: req.ResponseWriter,
+			RootPageNo:     result.RootPageNo,
 			Since:          req.Since,
 			Until:          req.Until,
-			GroupBy:        req.GroupBy,
-			FirstHourOfDay: req.FirstHourOfDay,
-			LastDayOfMonth: req.LastDayOfMonth,
 		})
 		s.metricRUnlock(req.MetricID)
-
 		if err != nil {
-			reply(conn, proto.ErrUnexpected)
+			reply(req.Conn, proto.ErrUnexpected)
 		} else {
-			responseWriter.Close()
+			req.ResponseWriter.Close()
 		}
 
 	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
+		reply(req.Conn, proto.ErrNoMetric)
 
 	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
+		reply(req.Conn, proto.ErrWrongMetricType)
 
 	default:
 		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
@@ -957,33 +826,20 @@
 	return nil
 }
 
-type cumulativePeriodsResult struct {
-	ResultCode byte
-	FracDigits byte
-	PageNo     uint32
+type fullScanReq struct {
+	MetricID       uint32
+	MetricType     diploma.MetricType
+	Conn           io.Writer
+	ResponseWriter atree.PeriodsWriter
 }
 
-func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error {
-	resultCh := make(chan cumulativePeriodsResult, 1)
-
-	aggregator, err := atree.NewCumulativeAggregator(atree.CumulativeAggregatorOptions{
-		GroupBy:        req.GroupBy,
-		FirstHourOfDay: req.FirstHourOfDay,
-		LastDayOfMonth: req.LastDayOfMonth,
-	})
-	if err != nil {
-		reply(conn, proto.ErrUnexpected)
-		return nil
-	}
+func (s *Database) fullScan(req fullScanReq) error {
+	resultCh := make(chan fullScanResult, 1)
 
-	responseWriter := atree.NewCumulativePeriodsWriter(conn)
-
-	s.appendJobToWorkerQueue(tryListCumulativePeriodsReq{
+	s.appendJobToWorkerQueue(tryFullScanReq{
 		MetricID:       req.MetricID,
-		Since:          req.Since,
-		Until:          req.Until,
-		Aggregator:     aggregator,
-		ResponseWriter: responseWriter,
+		MetricType:     req.MetricType,
+		ResponseWriter: req.ResponseWriter,
 		ResultCh:       resultCh,
 	})
@@ -991,49 +847,26 @@
 	result := <-resultCh
 
 	switch result.ResultCode {
 	case QueryDone:
-		responseWriter.Close()
+		req.ResponseWriter.Close()
 
 	case UntilFound:
-		err := s.atree.ContinueCollectCumulativePeriods(atree.ContinueCollectCumulativePeriodsReq{
-			FracDigits:     result.FracDigits,
-			Aggregator:     aggregator,
-			ResponseWriter: responseWriter,
-			LastPageNo:     result.PageNo,
-			Since:          req.Since,
-			Until:          req.Until,
-		})
-		s.metricRUnlock(req.MetricID)
-
-		if err != nil {
-			reply(conn, proto.ErrUnexpected)
-		} else {
-			responseWriter.Close()
-		}
-
-	case UntilNotFound:
-		err := s.atree.FindCumulativePeriods(atree.FindCumulativePeriodsReq{
+		err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{
 			FracDigits:     result.FracDigits,
-			ResponseWriter: responseWriter,
-			RootPageNo:     result.PageNo,
-			Since:          req.Since,
-			Until:          req.Until,
-			GroupBy:        req.GroupBy,
-			FirstHourOfDay: req.FirstHourOfDay,
-			LastDayOfMonth: req.LastDayOfMonth,
+			ResponseWriter: req.ResponseWriter,
+			LastPageNo:     result.LastPageNo,
 		})
 		s.metricRUnlock(req.MetricID)
-
 		if err != nil {
-			reply(conn, proto.ErrUnexpected)
+			reply(req.Conn, proto.ErrUnexpected)
 		} else {
-			responseWriter.Close()
+			req.ResponseWriter.Close()
 		}
 
 	case NoMetric:
-		reply(conn, proto.ErrNoMetric)
+		reply(req.Conn, proto.ErrNoMetric)
 
 	case WrongMetricType:
-		reply(conn, proto.ErrWrongMetricType)
+		reply(req.Conn, proto.ErrWrongMetricType)
 
 	default:
 		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
@@ -1042,7 +875,7 @@
 
 func (s *Database) ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error {
responseWriter := atree.NewCurrentValueWriter(conn) + responseWriter := transform.NewCurrentValueWriter(conn) defer responseWriter.Close() resultCh := make(chan struct{}) diff --git a/database/helpers.go b/database/helpers.go index 8fbcf89..d8500cd 100644 --- a/database/helpers.go +++ b/database/helpers.go @@ -5,8 +5,32 @@ import ( "io/fs" "os" "time" + + "gordenko.dev/dima/diploma" + "gordenko.dev/dima/diploma/proto" ) +func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.GroupBy, firstHourOfDay int) (s time.Time, u time.Time) { + switch groupBy { + case diploma.GroupByHour, diploma.GroupByDay: + s = time.Date(since.Year, since.Month, since.Day, 0, 0, 0, 0, time.Local) + u = time.Date(until.Year, until.Month, until.Day, 0, 0, 0, 0, time.Local) + + case diploma.GroupByMonth: + s = time.Date(since.Year, since.Month, 1, 0, 0, 0, 0, time.Local) + u = time.Date(until.Year, until.Month, 1, 0, 0, 0, 0, time.Local) + } + + if firstHourOfDay > 0 { + duration := time.Duration(firstHourOfDay) * time.Hour + s = s.Add(duration) + u = u.Add(duration) + } + + u = u.Add(-1 * time.Second) + return +} + func isFileExist(fileName string) (bool, error) { _, err := os.Stat(fileName) if err != nil { diff --git a/database/proc.go b/database/proc.go index 5d7e330..1452d95 100644 --- a/database/proc.go +++ b/database/proc.go @@ -7,22 +7,27 @@ import ( "gordenko.dev/dima/diploma/atree" "gordenko.dev/dima/diploma/chunkenc" "gordenko.dev/dima/diploma/conbuf" - "gordenko.dev/dima/diploma/enc" + "gordenko.dev/dima/diploma/transform" "gordenko.dev/dima/diploma/txlog" ) const ( - QueryDone = 1 - UntilFound = 2 - UntilNotFound = 3 - NoMetric = 4 - MetricDuplicate = 5 - Succeed = 6 - NewPage = 7 - ExpiredMeasure = 8 - NonMonotonicValue = 9 - CanAppend = 10 - WrongMetricType = 11 + QueryDone = 1 + UntilFound = 2 + UntilNotFound = 3 + RangeFound = 15 + NoMeasures = 16 + NoMetric = 4 + MetricDuplicate = 5 + Succeed = 6 + NewPage = 7 + ExpiredMeasure = 8 + NonMonotonicValue = 9 + CanAppend = 10 + WrongMetricType = 11 + NoMeasuresToDelete = 12 + DeleteFromAtreeNotNeeded = 13 + DeleteFromAtreeRequired = 14 ) func (s *Database) worker() { @@ -92,23 +97,11 @@ func (s *Database) DoWork() { case tryListCurrentValuesReq: s.tryListCurrentValues(req) // all metrics only - case tryListCumulativePeriodsReq: - s.tryListCumulativePeriods(req) + case tryRangeScanReq: + s.tryRangeScan(req) - case tryListInstantPeriodsReq: - s.tryListInstantPeriods(req) - - case tryListCumulativeMeasuresReq: - s.tryListCumulativeMeasures(req) - - case tryListInstantMeasuresReq: - s.tryListInstantMeasures(req) - - case tryListAllInstantMeasuresReq: - s.tryListAllInstantMeasures(req) - - case tryListAllCumulativeMeasuresReq: - s.tryListAllCumulativeMeasures(req) + case tryFullScanReq: + s.tryFullScan(req) case tryAddMetricReq: s.tryAddMetric(req) @@ -139,23 +132,11 @@ func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntr for _, untyped := range lockEntry.WaitQueue { var rLockRequired bool switch req := untyped.(type) { - case tryListCumulativePeriodsReq: - rLockRequired = s.startListCumulativePeriods(metric, req) - - case tryListInstantPeriodsReq: - rLockRequired = s.startListInstantPeriods(metric, req) - - case tryListCumulativeMeasuresReq: - rLockRequired = s.startListCumulativeMeasures(metric, req) - - case tryListInstantMeasuresReq: - rLockRequired = s.startListInstantMeasures(metric, req) - - case tryListAllInstantMeasuresReq: - rLockRequired = s.startListAllInstantMeasures(metric, req) + case 
tryRangeScanReq: + rLockRequired = s.startRangeScan(metric, req) - case tryListAllCumulativeMeasuresReq: - rLockRequired = s.startListAllCumulativeMeasures(metric, req) + case tryFullScanReq: + rLockRequired = s.startFullScan(metric, req) case tryGetMetricReq: s.tryGetMetric(req) @@ -168,9 +149,7 @@ func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntr lockEntry.RLocks++ } } - lockEntry.WaitQueue = nil - if lockEntry.RLocks > 0 { lockEntry.WaitQueue = modificationReqs } else { @@ -315,6 +294,13 @@ func (s *Database) tryDeleteMeasures(req tryDeleteMeasuresReq) { return } + if metric.Since == 0 || (req.Since > 0 && metric.Until < req.Since) { + req.ResultCh <- tryDeleteMeasuresResult{ + ResultCode: NoMeasuresToDelete, + } + return + } + lockEntry, ok := s.metricLockEntries[req.MetricID] if ok { lockEntry.WaitQueue = append(lockEntry.WaitQueue, req) @@ -327,516 +312,16 @@ func (s *Database) startDeleteMeasures(metric *_metric, req tryDeleteMeasuresReq s.metricLockEntries[req.MetricID] = &metricLockEntry{ XLock: true, } - req.ResultCh <- tryDeleteMeasuresResult{ - ResultCode: Succeed, - RootPageNo: metric.RootPageNo, - } -} - -// SELECT - -type tryListAllInstantMeasuresReq struct { - MetricID uint32 - ResponseWriter *atree.InstantMeasureWriter - ResultCh chan instantMeasuresResult -} - -func (s *Database) tryListAllInstantMeasures(req tryListAllInstantMeasuresReq) { - metric, ok := s.metrics[req.MetricID] - if !ok { - req.ResultCh <- instantMeasuresResult{ - ResultCode: NoMetric, - } - return - } - - if metric.MetricType != diploma.Instant { - req.ResultCh <- instantMeasuresResult{ - ResultCode: WrongMetricType, - } - return - } - - lockEntry, ok := s.metricLockEntries[req.MetricID] - if ok { - if lockEntry.XLock { - lockEntry.WaitQueue = append(lockEntry.WaitQueue, req) - return - } - } - - if s.startListAllInstantMeasures(metric, req) { - if lockEntry != nil { - lockEntry.RLocks++ - } else { - s.metricLockEntries[req.MetricID] = &metricLockEntry{ - RLocks: 1, - } - } - } -} - -func (s *Database) startListAllInstantMeasures(metric *_metric, req tryListAllInstantMeasuresReq) bool { - if metric.Since == 0 { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor( - metric.TimestampsBuf, - metric.Timestamps.Size(), - ) - - valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) - - var ( - value float64 - timestamp uint32 - done bool - - buffered int - ) - - for { - timestamp, done = timestampDecompressor.NextValue() - if done { - break - } - - value, done = valueDecompressor.NextValue() - if done { - diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) - } - - req.ResponseWriter.BufferMeasure(atree.InstantMeasure{ - Timestamp: timestamp, - Value: value, - }) - - buffered++ - } - - if metric.LastPageNo > 0 { - req.ResultCh <- instantMeasuresResult{ - ResultCode: UntilFound, - PageNo: metric.LastPageNo, - FracDigits: metric.FracDigits, - } - return true - } else { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } -} - -type tryListInstantMeasuresReq struct { - MetricID uint32 - Since uint32 - Until uint32 - ResponseWriter *atree.InstantMeasureWriter - ResultCh chan instantMeasuresResult -} - -func (s *Database) tryListInstantMeasures(req tryListInstantMeasuresReq) { - metric, ok := s.metrics[req.MetricID] - if !ok { - req.ResultCh <-
instantMeasuresResult{ - ResultCode: NoMetric, - } - return - } - - if metric.MetricType != diploma.Instant { - req.ResultCh <- instantMeasuresResult{ - ResultCode: WrongMetricType, - } - return - } - - lockEntry, ok := s.metricLockEntries[req.MetricID] - if ok { - if lockEntry.XLock { - lockEntry.WaitQueue = append(lockEntry.WaitQueue, req) - return - } - } - if s.startListInstantMeasures(metric, req) { - if lockEntry != nil { - lockEntry.RLocks++ - } else { - s.metricLockEntries[req.MetricID] = &metricLockEntry{ - RLocks: 1, - } - } - } -} - -func (s *Database) startListInstantMeasures(metric *_metric, req tryListInstantMeasuresReq) bool { - if metric.Since == 0 { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - if req.Since > metric.Until { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - if req.Until < metric.Since { - if metric.RootPageNo > 0 { - req.ResultCh <- instantMeasuresResult{ - ResultCode: UntilNotFound, - PageNo: metric.RootPageNo, - FracDigits: metric.FracDigits, - } - return true - } else { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - } - - timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor( - metric.TimestampsBuf, - metric.Timestamps.Size(), - ) - - valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) - - var ( - value float64 - timestamp uint32 - done bool - ) - - for { - timestamp, done = timestampDecompressor.NextValue() - if done { - break - } - - value, done = valueDecompressor.NextValue() - if done { - diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) - } - - if timestamp <= req.Until { - if timestamp < req.Since { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - req.ResponseWriter.BufferMeasure(atree.InstantMeasure{ - Timestamp: timestamp, - Value: value, - }) - - if timestamp == req.Since { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - } - } - - if metric.LastPageNo > 0 { - req.ResultCh <- instantMeasuresResult{ - ResultCode: UntilFound, - PageNo: metric.LastPageNo, - FracDigits: metric.FracDigits, - } - return true - } else { - req.ResultCh <- instantMeasuresResult{ - ResultCode: QueryDone, - } - return false - } -} - -// CUMULATIVE - -type tryListAllCumulativeMeasuresReq struct { - MetricID uint32 - ResponseWriter *atree.CumulativeMeasureWriter - ResultCh chan cumulativeMeasuresResult -} - -func (s *Database) tryListAllCumulativeMeasures(req tryListAllCumulativeMeasuresReq) { - metric, ok := s.metrics[req.MetricID] - if !ok { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: NoMetric, - } - return - } - if metric.MetricType != diploma.Cumulative { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: WrongMetricType, - } - return - } - - lockEntry, ok := s.metricLockEntries[req.MetricID] - if ok { - if lockEntry.XLock { - lockEntry.WaitQueue = append(lockEntry.WaitQueue, req) - return - } - } - if s.startListAllCumulativeMeasures(metric, req) { - if lockEntry != nil { - lockEntry.RLocks++ - } else { - s.metricLockEntries[req.MetricID] = &metricLockEntry{ - RLocks: 1, - } - } - } -} - -func (s *Database) startListAllCumulativeMeasures(metric *_metric, req tryListAllCumulativeMeasuresReq) bool { - if metric.Since == 0 { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - 
timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor( - metric.TimestampsBuf, - metric.Timestamps.Size(), - ) - - valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) - - endTimestamp, done := timestampDecompressor.NextValue() - if done { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - endValue, done := valueDecompressor.NextValue() - if done { - diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) - } - - for { - timestamp, done := timestampDecompressor.NextValue() - if done { - break - } - - value, done := valueDecompressor.NextValue() - if done { - diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) - } - - req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{ - Timestamp: endTimestamp, - Value: endValue, - Total: endValue - value, - }) - endTimestamp = timestamp - endValue = value - } - - if metric.LastPageNo > 0 { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: UntilFound, - PageNo: metric.LastPageNo, - FracDigits: metric.FracDigits, - EndTimestamp: endTimestamp, - EndValue: endValue, - } - return true - } else { - req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{ - Timestamp: endTimestamp, - Value: endValue, - Total: endValue, - }) - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } -} - -type tryListCumulativeMeasuresReq struct { - MetricID uint32 - Since uint32 - Until uint32 - ResponseWriter *atree.CumulativeMeasureWriter - ResultCh chan cumulativeMeasuresResult -} - -func (s *Database) tryListCumulativeMeasures(req tryListCumulativeMeasuresReq) { - metric, ok := s.metrics[req.MetricID] - if !ok { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: NoMetric, - } - return - } - if metric.MetricType != diploma.Cumulative { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: WrongMetricType, - } - return - } - - lockEntry, ok := s.metricLockEntries[req.MetricID] - if ok { - if lockEntry.XLock { - lockEntry.WaitQueue = append(lockEntry.WaitQueue, req) - return - } - } - if s.startListCumulativeMeasures(metric, req) { - if lockEntry != nil { - lockEntry.RLocks++ - } else { - s.metricLockEntries[req.MetricID] = &metricLockEntry{ - RLocks: 1, - } - } - } -} - -func (s *Database) startListCumulativeMeasures(metric *_metric, req tryListCumulativeMeasuresReq) bool { - if metric.Since == 0 { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - if req.Since > metric.Until { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - if req.Until < metric.Since { - // search in tree - if metric.RootPageNo > 0 { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: UntilNotFound, - PageNo: metric.RootPageNo, - FracDigits: metric.FracDigits, - } - return true - } else { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - } - - timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor( - metric.TimestampsBuf, - metric.Timestamps.Size(), - ) - - valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) - - var ( - endTimestamp uint32 - endValue float64 - ) - - for { - timestamp, done := timestampDecompressor.NextValue() - if done { - break - } - - value, done := valueDecompressor.NextValue() - if done { - 
diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) - } - - if timestamp > req.Until { - continue - } - - if timestamp < req.Since { - req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{ - Timestamp: endTimestamp, - Value: endValue, - Total: endValue - value, - }) - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, - } - return false - } - - if endTimestamp > 0 { - req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{ - Timestamp: endTimestamp, - Value: endValue, - Total: endValue - value, - }) - } - endTimestamp = timestamp - endValue = value - } - - if metric.LastPageNo > 0 { - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: UntilFound, - PageNo: metric.LastPageNo, - FracDigits: metric.FracDigits, - EndTimestamp: endTimestamp, - EndValue: endValue, + if metric.RootPageNo > 0 { + req.ResultCh <- tryDeleteMeasuresResult{ + ResultCode: DeleteFromAtreeRequired, + RootPageNo: metric.RootPageNo, } - return true } else { - req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{ - Timestamp: endTimestamp, - Value: endValue, - Total: endValue, - }) - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: QueryDone, + req.ResultCh <- tryDeleteMeasuresResult{ + ResultCode: DeleteFromAtreeNotNeeded, } - return false } } @@ -851,6 +336,7 @@ func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) { metric, ok := s.metrics[req.MetricID] if !ok { req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, ResultCode: NoMetric, } return @@ -869,6 +355,7 @@ func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) { func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, lockEntry *metricLockEntry) { if req.Timestamp <= metric.Until { req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, ResultCode: ExpiredMeasure, } return @@ -876,6 +363,7 @@ func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, if metric.MetricType == diploma.Cumulative && req.Value < metric.UntilValue { req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, ResultCode: NonMonotonicValue, } return @@ -895,6 +383,9 @@ func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, } } req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, + Timestamp: req.Timestamp, + Value: req.Value, ResultCode: CanAppend, } } else { @@ -911,6 +402,9 @@ func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, } req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, + Timestamp: req.Timestamp, + Value: req.Value, ResultCode: NewPage, FilledPage: &FilledPage{ Since: metric.Since, @@ -959,7 +453,6 @@ func (s *Database) appendMeasure(rec txlog.AppendedMeasure) { metric.Timestamps.Append(rec.Timestamp) metric.Values.Append(rec.Value) - metric.Until = rec.Timestamp metric.UntilValue = rec.Value @@ -1003,7 +496,6 @@ func (s *Database) appendMeasures(extended txlog.AppendedMeasuresExtended) { metric.Timestamps.Append(measure.Timestamp) metric.Values.Append(measure.Value) - metric.Until = measure.Timestamp metric.UntilValue = measure.Value } @@ -1042,7 +534,6 @@ func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWith } metric.LastPageNo = rec.DataPageNo - // delete free pages if rec.IsDataPageReused { s.dataFreeList.DeletePages([]uint32{ rec.DataPageNo, @@ -1147,25 +638,25 @@ func (s *Database) startAppendMeasures(metric *_metric, req tryAppendMeasuresReq } } -type tryListCumulativePeriodsReq struct { +type tryRangeScanReq struct { MetricID uint32 
Since uint32 Until uint32 - Aggregator *atree.CumulativeAggregator - ResponseWriter *atree.CumulativePeriodsWriter - ResultCh chan cumulativePeriodsResult + MetricType diploma.MetricType + ResponseWriter atree.WorkerMeasureConsumer + ResultCh chan rangeScanResult } -func (s *Database) tryListCumulativePeriods(req tryListCumulativePeriodsReq) { +func (s *Database) tryRangeScan(req tryRangeScanReq) { metric, ok := s.metrics[req.MetricID] if !ok { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: NoMetric, } return } - if metric.MetricType != diploma.Cumulative { - req.ResultCh <- cumulativePeriodsResult{ + if metric.MetricType != req.MetricType { + req.ResultCh <- rangeScanResult{ ResultCode: WrongMetricType, } return @@ -1179,7 +670,7 @@ func (s *Database) tryListCumulativePeriods(req tryListCumulativePeriodsReq) { } } - if s.startListCumulativePeriods(metric, req) { + if s.startRangeScan(metric, req) { if lockEntry != nil { lockEntry.RLocks++ } else { @@ -1190,16 +681,16 @@ func (s *Database) tryListCumulativePeriods(req tryListCumulativePeriodsReq) { } } -func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumulativePeriodsReq) bool { +func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool { if metric.Since == 0 { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: QueryDone, } return false } if req.Since > metric.Until { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: QueryDone, } return false @@ -1207,14 +698,14 @@ func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumula if req.Until < metric.Since { if metric.RootPageNo > 0 { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: UntilNotFound, - PageNo: metric.RootPageNo, + RootPageNo: metric.RootPageNo, FracDigits: metric.FracDigits, } return true } else { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: QueryDone, } return false @@ -1226,14 +717,12 @@ func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumula metric.Timestamps.Size(), ) - valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor( + valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( metric.ValuesBuf, metric.Values.Size(), metric.FracDigits, ) - var period atree.CumulativePeriod - for { timestamp, done := timestampDecompressor.NextValue() if done { @@ -1245,67 +734,49 @@ func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumula diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) } - if timestamp > req.Until { - continue - } - - if timestamp < req.Since { - isCompleted := req.Aggregator.FillPeriod(timestamp, value, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) - } - req.ResultCh <- cumulativePeriodsResult{ - ResultCode: QueryDone, - } - return false - } - if timestamp <= req.Until { - isCompleted := req.Aggregator.Feed(timestamp, value, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) + req.ResponseWriter.FeedNoSend(timestamp, value) + if timestamp < req.Since { + req.ResultCh <- rangeScanResult{ + ResultCode: QueryDone, + } + return false } } } if metric.LastPageNo > 0 { - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: UntilFound, - PageNo: metric.LastPageNo, + LastPageNo: metric.LastPageNo, FracDigits: metric.FracDigits, } return true } else { - 
isCompleted := req.Aggregator.FillPeriod(metric.Since, metric.SinceValue, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) - } - req.ResultCh <- cumulativePeriodsResult{ + req.ResultCh <- rangeScanResult{ ResultCode: QueryDone, } return false } } -type tryListInstantPeriodsReq struct { +type tryFullScanReq struct { MetricID uint32 - Since uint32 - Until uint32 - Aggregator *atree.InstantAggregator - ResponseWriter *atree.InstantPeriodsWriter - ResultCh chan instantPeriodsResult + MetricType diploma.MetricType + ResponseWriter atree.WorkerMeasureConsumer + ResultCh chan fullScanResult } -func (s *Database) tryListInstantPeriods(req tryListInstantPeriodsReq) { +func (s *Database) tryFullScan(req tryFullScanReq) { metric, ok := s.metrics[req.MetricID] if !ok { - req.ResultCh <- instantPeriodsResult{ + req.ResultCh <- fullScanResult{ ResultCode: NoMetric, } return } - if metric.MetricType != diploma.Instant { - req.ResultCh <- instantPeriodsResult{ + if metric.MetricType != req.MetricType { + req.ResultCh <- fullScanResult{ ResultCode: WrongMetricType, } return @@ -1318,7 +789,8 @@ func (s *Database) tryListInstantPeriods(req tryListInstantPeriodsReq) { return } } - if s.startListInstantPeriods(metric, req) { + + if s.startFullScan(metric, req) { if lockEntry != nil { lockEntry.RLocks++ } else { @@ -1329,94 +801,45 @@ func (s *Database) tryListInstantPeriods(req tryListInstantPeriodsReq) { } } -func (*Database) startListInstantPeriods(metric *_metric, req tryListInstantPeriodsReq) bool { +func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool { if metric.Since == 0 { - req.ResultCh <- instantPeriodsResult{ - ResultCode: QueryDone, - } - return false - } - - if req.Since > metric.Until { - req.ResultCh <- instantPeriodsResult{ + req.ResultCh <- fullScanResult{ ResultCode: QueryDone, } return false } - if req.Until < metric.Since { - // search in tree - if metric.RootPageNo > 0 { - req.ResultCh <- instantPeriodsResult{ - ResultCode: UntilNotFound, - PageNo: metric.RootPageNo, - FracDigits: metric.FracDigits, - } - return true - } else { - req.ResultCh <- instantPeriodsResult{ - ResultCode: QueryDone, - } - return false - } - } - timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor( metric.TimestampsBuf, metric.Timestamps.Size(), ) - valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( metric.ValuesBuf, metric.Values.Size(), metric.FracDigits, ) - var period atree.InstantPeriod - for { timestamp, done := timestampDecompressor.NextValue() if done { break } - value, done := valueDecompressor.NextValue() if done { diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug) } - - if timestamp <= req.Until { - if timestamp < req.Since { - isCompleted := req.Aggregator.FillPeriod(timestamp, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) - } - req.ResultCh <- instantPeriodsResult{ - ResultCode: QueryDone, - } - return false - } - - isCompleted := req.Aggregator.Feed(timestamp, value, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) - } - } + req.ResponseWriter.FeedNoSend(timestamp, value) } if metric.LastPageNo > 0 { - req.ResultCh <- instantPeriodsResult{ + req.ResultCh <- fullScanResult{ ResultCode: UntilFound, - PageNo: metric.LastPageNo, + LastPageNo: metric.LastPageNo, FracDigits: metric.FracDigits, } return true } else { - isCompleted := req.Aggregator.FillPeriod(metric.Since, &period) - if isCompleted { - req.ResponseWriter.WritePeriod(period) - } - req.ResultCh <- 
instantPeriodsResult{ + req.ResultCh <- fullScanResult{ ResultCode: QueryDone, } return false @@ -1425,7 +848,7 @@ func (*Database) startListInstantPeriods(metric *_metric, req tryListInstantPeri type tryListCurrentValuesReq struct { MetricIDs []uint32 - ResponseWriter *atree.CurrentValueWriter + ResponseWriter *transform.CurrentValueWriter ResultCh chan struct{} } @@ -1433,7 +856,7 @@ func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) { for _, metricID := range req.MetricIDs { metric, ok := s.metrics[metricID] if ok { - req.ResponseWriter.BufferValue(atree.CurrentValue{ + req.ResponseWriter.BufferValue(transform.CurrentValue{ MetricID: metricID, Timestamp: metric.Until, Value: metric.UntilValue, @@ -1443,6 +866,8 @@ func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) { req.ResultCh <- struct{}{} } +/////////////////////////////////////////////////////// + func (s *Database) applyChanges(req txlog.Changes) { for _, untyped := range req.Records { switch rec := untyped.(type) { @@ -1463,9 +888,6 @@ func (s *Database) applyChanges(req txlog.Changes) { case txlog.DeletedMeasures: s.deleteMeasures(rec) - - case txlog.DeletedMeasuresSince: - s.deleteMeasuresSince(rec) } } @@ -1556,36 +978,17 @@ func (s *Database) deleteMetric(rec txlog.DeletedMetric) { switch req := untyped.(type) { case tryAppendMeasureReq: req.ResultCh <- tryAppendMeasureResult{ + MetricID: req.MetricID, ResultCode: NoMetric, } - case tryListCumulativePeriodsReq: - req.ResultCh <- cumulativePeriodsResult{ - ResultCode: NoMetric, - } - - case tryListInstantPeriodsReq: - req.ResultCh <- instantPeriodsResult{ - ResultCode: NoMetric, - } - - case tryListCumulativeMeasuresReq: - req.ResultCh <- cumulativeMeasuresResult{ + case tryRangeScanReq: + req.ResultCh <- rangeScanResult{ ResultCode: NoMetric, } - case tryListInstantMeasuresReq: - req.ResultCh <- instantMeasuresResult{ - ResultCode: NoMetric, - } - - case tryListAllCumulativeMeasuresReq: - req.ResultCh <- cumulativeMeasuresResult{ - ResultCode: NoMetric, - } - - case tryListAllInstantMeasuresReq: - req.ResultCh <- instantMeasuresResult{ + case tryFullScanReq: + req.ResultCh <- fullScanResult{ ResultCode: NoMetric, } @@ -1618,11 +1021,9 @@ func (s *Database) deleteMetric(rec txlog.DeletedMetric) { if len(rec.FreeDataPages) > 0 { s.dataFreeList.AddPages(rec.FreeDataPages) - s.atree.DeleteDataPages(rec.FreeDataPages) } if len(rec.FreeIndexPages) > 0 { s.indexFreeList.AddPages(rec.FreeIndexPages) - s.atree.DeleteIndexPages(rec.FreeIndexPages) } if len(addMetricReqs) > 0 { @@ -1650,79 +1051,13 @@ func (s *Database) deleteMeasures(rec txlog.DeletedMeasures) { fmt.Errorf("deleteMeasures: xlock not set for the metric %d", rec.MetricID)) } - metric.DeleteMeasures() lockEntry.XLock = false - // - if len(rec.FreeDataPages) > 0 { - s.dataFreeList.AddPages(rec.FreeDataPages) - s.atree.DeleteDataPages(rec.FreeDataPages) - } - if len(rec.FreeDataPages) > 0 { - s.indexFreeList.AddPages(rec.FreeIndexPages) - s.atree.DeleteIndexPages(rec.FreeIndexPages) - } - s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry) -} - -func (s *Database) deleteMeasuresSince(rec txlog.DeletedMeasuresSince) { - metric, ok := s.metrics[rec.MetricID] - if !ok { - diploma.Abort(diploma.NoMetricBug, - fmt.Errorf("deleteMeasuresSince: metric %d not found", - rec.MetricID)) - } - - lockEntry, ok := s.metricLockEntries[rec.MetricID] - if !ok { - diploma.Abort(diploma.NoLockEntryBug, - fmt.Errorf("deleteMeasuresSince: lockEntry not found for the metric %d", - rec.MetricID)) - } - - if 
!lockEntry.XLock { - diploma.Abort(diploma.NoXLockBug, - fmt.Errorf("deleteMeasuresSince: xlock not set for the metric %d", - rec.MetricID)) - } - - if rec.IsRootChanged && rec.RootPageNo == 0 { - metric.DeleteMeasures() - } else { - metric.TimestampsBuf = conbuf.NewFromBuffer(rec.TimestampsBuf) - metric.ValuesBuf = conbuf.NewFromBuffer(rec.ValuesBuf) - - metric.Timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor( - metric.TimestampsBuf, len(rec.TimestampsBuf)) - - if metric.MetricType == diploma.Cumulative { - metric.Values = chunkenc.NewReverseCumulativeDeltaCompressor( - metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits) - } else { - metric.Values = chunkenc.NewReverseInstantDeltaCompressor( - metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits) - } - - metric.Since, metric.Until = enc.GetTimeRange(rec.TimestampsBuf) - metric.SinceValue, metric.UntilValue = enc.GetValueBounds( - rec.ValuesBuf, metric.MetricType, metric.FracDigits, - ) - - if rec.IsRootChanged { - metric.RootPageNo = rec.RootPageNo - } - metric.LastPageNo = rec.LastPageNo - } - - lockEntry.XLock = false - if len(rec.FreeDataPages) > 0 { s.dataFreeList.AddPages(rec.FreeDataPages) - s.atree.DeleteDataPages(rec.FreeDataPages) } - if len(rec.FreeDataPages) > 0 { + if len(rec.FreeIndexPages) > 0 { s.indexFreeList.AddPages(rec.FreeIndexPages) - s.atree.DeleteIndexPages(rec.FreeIndexPages) } s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry) } diff --git a/examples/loadtest/loadtest.go b/examples/loadtest/loadtest.go index 7ab57ef..1de19ac 100644 --- a/examples/loadtest/loadtest.go +++ b/examples/loadtest/loadtest.go @@ -333,11 +333,22 @@ func execQuery(conn *client.Connection, queryGenerator *RandomQueryGenerator, st } case listInstantPeriods: + since := time.Unix(int64(recipe.Since), 0) + until := time.Unix(int64(recipe.Until), 0) + // t1 := time.Now() _, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ - MetricID: recipe.MetricID, - Since: recipe.Since, - Until: recipe.Until, + MetricID: recipe.MetricID, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, GroupBy: recipe.GroupBy, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, }) @@ -354,12 +365,23 @@ func execQuery(conn *client.Connection, queryGenerator *RandomQueryGenerator, st } case listCumulativePeriods: + since := time.Unix(int64(recipe.Since), 0) + until := time.Unix(int64(recipe.Until), 0) + // t1 := time.Now() _, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: recipe.MetricID, - Since: recipe.Since, - Until: recipe.Until, - GroupBy: recipe.GroupBy, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, + GroupBy: recipe.GroupBy, }) elapsedTime = time.Since(t1) stat.ElapsedTime += elapsedTime diff --git a/examples/requests/generate.go b/examples/requests/generate.go index d921e0f..34abc64 100644 --- a/examples/requests/generate.go +++ b/examples/requests/generate.go @@ -4,12 +4,12 @@ import ( "math/rand" "time" - "gordenko.dev/dima/diploma/client" + "gordenko.dev/dima/diploma/proto" ) -func GenerateCumulativeMeasures(days int) []client.Measure { +func GenerateCumulativeMeasures(days int) []proto.Measure { var ( - measures []client.Measure + measures []proto.Measure minutes = []int{14, 29, 44, 59} hoursPerDay = 24 totalHours = days *
hoursPerDay @@ -31,7 +31,7 @@ func GenerateCumulativeMeasures(days int) []client.Measure { time.Local, ) - measure := client.Measure{ + measure := proto.Measure{ Timestamp: uint32(measureTime.Unix()), Value: totalValue, } @@ -43,9 +43,9 @@ func GenerateCumulativeMeasures(days int) []client.Measure { return measures } -func GenerateInstantMeasures(days int, baseValue float64) []client.Measure { +func GenerateInstantMeasures(days int, baseValue float64) []proto.Measure { var ( - measures []client.Measure + measures []proto.Measure minutes = []int{14, 29, 44, 59} hoursPerDay = 24 totalHours = days * hoursPerDay @@ -70,7 +70,7 @@ func GenerateInstantMeasures(days int, baseValue float64) []client.Measure { fluctuation := baseValue * 0.1 value := baseValue + (rand.Float64()*2-1)*fluctuation - measure := client.Measure{ + measure := proto.Measure{ Timestamp: uint32(measureTime.Unix()), Value: value, } diff --git a/examples/requests/requests.go b/examples/requests/requests.go index 60213bb..3aef7d8 100644 --- a/examples/requests/requests.go +++ b/examples/requests/requests.go @@ -14,7 +14,7 @@ func sendRequests(conn *client.Connection) { var ( instantMetricID uint32 = 10000 cumulativeMetricID uint32 = 10001 - fracDigits byte = 2 + fracDigits int = 2 err error ) @@ -23,7 +23,7 @@ func sendRequests(conn *client.Connection) { // ADD INSTANT METRIC - err = conn.AddMetric(client.Metric{ + err = conn.AddMetric(proto.AddMetricReq{ MetricID: instantMetricID, MetricType: diploma.Instant, FracDigits: fracDigits, @@ -53,7 +53,7 @@ GetMetric: instantMeasures := GenerateInstantMeasures(62, 220) - err = conn.AppendMeasures(client.AppendMeasuresReq{ + err = conn.AppendMeasures(proto.AppendMeasuresReq{ MetricID: instantMetricID, Measures: instantMeasures, }) @@ -103,9 +103,17 @@ GetMetric: since = until.Add(-24 * time.Hour) instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ - MetricID: instantMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), + MetricID: instantMetricID, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, GroupBy: diploma.GroupByHour, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, }) @@ -124,9 +132,17 @@ GetMetric: since = until.AddDate(0, 0, -7) instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ - MetricID: instantMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), + MetricID: instantMetricID, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, GroupBy: diploma.GroupByDay, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, }) @@ -145,9 +161,17 @@ GetMetric: since = until.AddDate(0, 0, -62) instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ - MetricID: instantMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), + MetricID: instantMetricID, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, GroupBy: diploma.GroupByMonth, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, }) @@ -182,7 +206,7 @@ GetMetric: // ADD CUMULATIVE METRIC - err = 
conn.AddMetric(client.Metric{ + err = conn.AddMetric(proto.AddMetricReq{ MetricID: cumulativeMetricID, MetricType: diploma.Cumulative, FracDigits: fracDigits, @@ -212,7 +236,7 @@ GetMetric: cumulativeMeasures := GenerateCumulativeMeasures(62) - err = conn.AppendMeasures(client.AppendMeasuresReq{ + err = conn.AppendMeasures(proto.AppendMeasuresReq{ MetricID: cumulativeMetricID, Measures: cumulativeMeasures, }) @@ -264,9 +288,17 @@ GetMetric: cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: cumulativeMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), - GroupBy: diploma.GroupByHour, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, + GroupBy: diploma.GroupByHour, }) if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) @@ -284,9 +316,17 @@ GetMetric: cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: cumulativeMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), - GroupBy: diploma.GroupByDay, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, + GroupBy: diploma.GroupByDay, }) if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) @@ -304,9 +344,17 @@ GetMetric: cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: cumulativeMetricID, - Since: uint32(since.Unix()), - Until: uint32(until.Unix()), - GroupBy: diploma.GroupByMonth, + Since: proto.TimeBound{ + Year: since.Year(), + Month: since.Month(), + Day: since.Day(), + }, + Until: proto.TimeBound{ + Year: until.Year(), + Month: until.Month(), + Day: until.Day(), + }, + GroupBy: diploma.GroupByMonth, }) if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) diff --git a/proto/proto.go b/proto/proto.go index 1a44cfb..a30af18 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -2,24 +2,26 @@ package proto import ( "fmt" + "time" - octopus "gordenko.dev/dima/diploma" + "gordenko.dev/dima/diploma" "gordenko.dev/dima/diploma/bin" "gordenko.dev/dima/diploma/bufreader" ) const ( - TypeDeleteMeasures byte = 1 - TypeListCurrentValues byte = 2 - TypeListInstantMeasures byte = 3 - TypeListCumulativeMeasures byte = 33 - TypeListInstantPeriods byte = 4 - TypeListCumulativePeriods byte = 44 - TypeGetMetric byte = 5 - TypeAddMetric byte = 6 + TypeDeleteMeasures byte = 1 + TypeListCurrentValues byte = 2 + TypeListInstantMeasures byte = 3 + TypeListCumulativeMeasures byte = 33 + + TypeListInstantPeriods byte = 4 + TypeListCumulativePeriods byte = 44 + TypeGetMetric byte = 5 + TypeAddMetric byte = 6 + TypeListAllInstantMeasures byte = 8 TypeListAllCumulativeMeasures byte = 88 - TypeRangeTotal byte = 9 TypeAppendMeasure byte = 10 TypeAppendMeasures byte = 11 TypeDeleteMetric byte = 12 @@ -66,164 +68,144 @@ func ErrorCodeToText(code uint16) string { } } -type GetMetricReq struct { - MetricID uint32 -} - -type ListCurrentValuesReq struct { - MetricIDs []uint32 -} - -type AddMetricReq struct { - MetricID uint32 - MetricType octopus.MetricType - FracDigits int -} +// common -type UpdateMetricReq struct { +type Metric struct { MetricID uint32 - MetricType octopus.MetricType + MetricType diploma.MetricType FracDigits int } -type DeleteMetricReq struct { - MetricID uint32 +type 
AppendError struct { + MetricID uint32 + ErrorCode uint16 } -type DeleteMeasuresReq struct { - MetricID uint32 - Since uint32 // timestamp (optional) +type TimeBound struct { + Year int + Month time.Month + Day int } -type AppendMeasureReq struct { - MetricID uint32 +type CumulativeMeasure struct { Timestamp uint32 Value float64 + Total float64 } -type ListAllInstantMetricMeasuresReq struct { - MetricID uint32 +type CumulativePeriod struct { + Period uint32 + Since uint32 + Until uint32 + EndValue float64 + Total float64 } -type ListAllCumulativeMeasuresReq struct { - MetricID uint32 +type InstantMeasure struct { + Timestamp uint32 + Value float64 } -type ListInstantMeasuresReq struct { - MetricID uint32 - Since uint32 - Until uint32 - FirstHourOfDay int +type InstantPeriod struct { + Period uint32 + Since uint32 + Until uint32 + Min float64 + Max float64 + Avg float64 } -type ListCumulativeMeasuresReq struct { - MetricID uint32 - Since uint32 - Until uint32 - FirstHourOfDay int +type CurrentValue struct { + MetricID uint32 + Timestamp uint32 + Value float64 } -type ListInstantPeriodsReq struct { - MetricID uint32 - Since uint32 - Until uint32 - GroupBy octopus.GroupBy - AggregateFuncs byte - FirstHourOfDay int - LastDayOfMonth int -} +// API reqs -type ListCumulativePeriodsReq struct { - MetricID uint32 - Since uint32 - Until uint32 - GroupBy octopus.GroupBy - FirstHourOfDay int - LastDayOfMonth int +type GetMetricReq struct { + MetricID uint32 } -type Metric struct { - MetricID uint32 - MetricType octopus.MetricType - FracDigits int +func ReadGetMetricReq(r *bufreader.BufferedReader) (m GetMetricReq, err error) { + m.MetricID, err = bin.ReadUint32(r) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return + } + return } -type RangeTotalReq struct { - MetricID uint32 - Since uint32 - Until uint32 +type ListCurrentValuesReq struct { + MetricIDs []uint32 } -func PackAddMetricReq(req AddMetricReq) []byte { - arr := []byte{ - TypeAddMetric, - 0, 0, 0, 0, // - byte(req.MetricType), - byte(req.FracDigits), +func ReadListCurrentValuesReq(r *bufreader.BufferedReader) (m ListCurrentValuesReq, err error) { + qty, err := bin.ReadUint16(r) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return } - bin.PutUint32(arr[1:], req.MetricID) - return arr -} -func PackDeleteMetricReq(req DeleteMetricReq) []byte { - arr := []byte{ - TypeDeleteMetric, - 0, 0, 0, 0, // metricID + for i := range int(qty) { + var metricID uint32 + metricID, err = bin.ReadUint32(r) + if err != nil { + err = fmt.Errorf("read metricID (#%d): %s", i, err) + return + } + m.MetricIDs = append(m.MetricIDs, metricID) } - bin.PutUint32(arr[1:], req.MetricID) - return arr + return } -func PackAppendMeasure(req AppendMeasureReq) []byte { - arr := []byte{ - TypeAppendMeasure, - 0, 0, 0, 0, // metricID - 0, 0, 0, 0, // timestamp - 0, 0, 0, 0, 0, 0, 0, 0, // value - } - bin.PutUint32(arr[1:], req.MetricID) - bin.PutUint32(arr[5:], uint32(req.Timestamp)) - bin.PutFloat64(arr[9:], req.Value) - return arr +type AddMetricReq struct { + MetricID uint32 + MetricType diploma.MetricType + FracDigits int } -func PackDeleteMeasuresReq(req DeleteMeasuresReq) []byte { - arr := []byte{ - TypeDeleteMeasures, - 0, 0, 0, 0, // metricID - 0, 0, 0, 0, // since +func ReadAddMetricReq(r *bufreader.BufferedReader) (m AddMetricReq, err error) { + arr, err := r.ReadN(6) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return } - bin.PutUint32(arr[1:], req.MetricID) - bin.PutUint32(arr[5:], uint32(req.Since)) - return arr + return 
UnpackAddMetricReq(arr), nil } -// UNPACK reqs - func UnpackAddMetricReq(arr []byte) (m AddMetricReq) { m.MetricID = bin.GetUint32(arr) - m.MetricType = octopus.MetricType(arr[4]) + m.MetricType = diploma.MetricType(arr[4]) m.FracDigits = int(arr[5]) return } -func UnpackUpdateMetricReq(arr []byte) (m UpdateMetricReq) { - m.MetricID = bin.GetUint32(arr) - m.MetricType = octopus.MetricType(arr[4]) - m.FracDigits = int(arr[5]) - return +type DeleteMetricReq struct { + MetricID uint32 } -func UnpackDeleteMetricReq(arr []byte) (m DeleteMetricReq) { - m.MetricID = bin.GetUint32(arr) +func ReadDeleteMetricReq(r *bufreader.BufferedReader) (m DeleteMetricReq, err error) { + m.MetricID, err = bin.ReadUint32(r) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return + } return } -func UnpackAppendMeasureReq(arr []byte) (m AppendMeasureReq) { - m.MetricID = bin.GetUint32(arr) - m.Timestamp = bin.GetUint32(arr[4:]) - m.Value = bin.GetFloat64(arr[8:]) - return +type DeleteMeasuresReq struct { + MetricID uint32 + Since uint32 // timestamp (optional) +} + +func ReadDeleteMeasuresReq(r *bufreader.BufferedReader) (m DeleteMeasuresReq, err error) { + arr, err := r.ReadN(8) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return + } + return UnpackDeleteMeasuresReq(arr), nil } func UnpackDeleteMeasuresReq(arr []byte) (m DeleteMeasuresReq) { @@ -232,104 +214,93 @@ func UnpackDeleteMeasuresReq(arr []byte) (m DeleteMeasuresReq) { return } -func UnpackListInstantMeasuresReq(arr []byte) (m ListInstantMeasuresReq) { - m.MetricID = bin.GetUint32(arr[0:]) - m.Since = bin.GetUint32(arr[4:]) - m.Until = bin.GetUint32(arr[8:]) - m.FirstHourOfDay = int(arr[12]) - return +type AppendMeasureReq struct { + MetricID uint32 + Timestamp uint32 + Value float64 } -func UnpackListCumulativeMeasuresReq(arr []byte) (m ListCumulativeMeasuresReq) { - m.MetricID = bin.GetUint32(arr) - m.Since = bin.GetUint32(arr[4:]) - m.Until = bin.GetUint32(arr[8:]) - m.FirstHourOfDay = int(arr[12]) - return +func ReadAppendMeasureReq(r *bufreader.BufferedReader) (m AppendMeasureReq, err error) { + arr, err := r.ReadN(16) + if err != nil { + err = fmt.Errorf("read req: %s", err) + return + } + return UnpackAppendMeasureReq(arr), nil } -func UnpackListInstantPeriodsReq(arr []byte) (m ListInstantPeriodsReq) { +func UnpackAppendMeasureReq(arr []byte) (m AppendMeasureReq) { m.MetricID = bin.GetUint32(arr) - m.Since = bin.GetUint32(arr[4:]) - m.Until = bin.GetUint32(arr[8:]) - m.GroupBy = octopus.GroupBy(arr[12]) - m.AggregateFuncs = arr[13] - m.FirstHourOfDay = int(arr[14]) - m.LastDayOfMonth = int(arr[15]) + m.Timestamp = bin.GetUint32(arr[4:]) + m.Value = bin.GetFloat64(arr[8:]) return } -func UnpackListCumulativePeriodsReq(arr []byte) (m ListCumulativePeriodsReq) { - m.MetricID = bin.GetUint32(arr[0:]) - m.Since = bin.GetUint32(arr[4:]) - m.Until = bin.GetUint32(arr[8:]) - m.GroupBy = octopus.GroupBy(arr[12]) - m.FirstHourOfDay = int(arr[13]) - m.LastDayOfMonth = int(arr[14]) - return +type AppendMeasuresReq struct { + MetricID uint32 + Measures []Measure } -func UnpackRangeTotalReq(arr []byte) (m RangeTotalReq) { - m.MetricID = bin.GetUint32(arr) - m.Since = bin.GetUint32(arr[4:]) - m.Until = bin.GetUint32(arr[8:]) - return +type Measure struct { + Timestamp uint32 + Value float64 } -// READ reqs - -func ReadGetMetricReq(r *bufreader.BufferedReader) (m GetMetricReq, err error) { - m.MetricID, err = bin.ReadUint32(r) +func ReadAppendMeasuresReq(r *bufreader.BufferedReader) (m AppendMeasuresReq, err error) { + prefix, err := 
bin.ReadN(r, 6) // metricID + measures qty if err != nil { - err = fmt.Errorf("read req: %s", err) + err = fmt.Errorf("read prefix: %s", err) return } - return -} -func ReadAddMetricReq(r *bufreader.BufferedReader) (m AddMetricReq, err error) { - arr, err := r.ReadN(6) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return - } - return UnpackAddMetricReq(arr), nil -} + m.MetricID = bin.GetUint32(prefix[0:]) + qty := bin.GetUint16(prefix[4:]) -func ReadUpdateMetricReq(r *bufreader.BufferedReader) (m UpdateMetricReq, err error) { - arr, err := r.ReadN(6) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return + for i := range int(qty) { + var measure Measure + measure.Timestamp, err = bin.ReadUint32(r) + if err != nil { + err = fmt.Errorf("read timestamp (#%d): %s", i, err) + return + } + measure.Value, err = bin.ReadFloat64(r) + if err != nil { + err = fmt.Errorf("read value (#%d): %s", i, err) + return + } + m.Measures = append(m.Measures, measure) } - return UnpackUpdateMetricReq(arr), nil + return } -func ReadDeleteMetricReq(r *bufreader.BufferedReader) (m DeleteMetricReq, err error) { - m.MetricID, err = bin.ReadUint32(r) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return - } - return +type MetricMeasure struct { + MetricID uint32 + Timestamp uint32 + Value float64 } -func ReadAppendMeasureReq(r *bufreader.BufferedReader) (m AppendMeasureReq, err error) { - arr, err := r.ReadN(16) +func ReadAppendMeasurePerMetricReq(r *bufreader.BufferedReader) (measures []MetricMeasure, err error) { + qty, err := bin.ReadUint16(r) if err != nil { - err = fmt.Errorf("read req: %s", err) return } - return UnpackAppendMeasureReq(arr), nil + var tmp = make([]byte, 16) + for range int(qty) { + err = bin.ReadNInto(r, tmp) + if err != nil { + return + } + measures = append(measures, MetricMeasure{ + MetricID: bin.GetUint32(tmp[0:]), + Timestamp: bin.GetUint32(tmp[4:]), + Value: bin.GetFloat64(tmp[8:]), + }) + } + return } -func ReadDeleteMeasuresReq(r *bufreader.BufferedReader) (m DeleteMeasuresReq, err error) { - arr, err := r.ReadN(8) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return - } - return UnpackDeleteMeasuresReq(arr), nil +type ListAllInstantMetricMeasuresReq struct { + MetricID uint32 } func ReadListAllInstantMeasuresReq(r *bufreader.BufferedReader) (m ListAllInstantMetricMeasuresReq, err error) { @@ -341,6 +312,10 @@ func ReadListAllInstantMeasuresReq(r *bufreader.BufferedReader) (m ListAllInstan return } +type ListAllCumulativeMeasuresReq struct { + MetricID uint32 +} + func ReadListAllCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListAllCumulativeMeasuresReq, err error) { m.MetricID, err = bin.ReadUint32(r) if err != nil { @@ -350,8 +325,14 @@ func ReadListAllCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListAllCum return } +type ListInstantMeasuresReq struct { + MetricID uint32 + Since uint32 + Until uint32 +} + func ReadListInstantMeasuresReq(r *bufreader.BufferedReader) (m ListInstantMeasuresReq, err error) { - arr, err := r.ReadN(13) + arr, err := r.ReadN(12) if err != nil { err = fmt.Errorf("read req: %s", err) return @@ -359,8 +340,21 @@ func ReadListInstantMeasuresReq(r *bufreader.BufferedReader) (m ListInstantMeasu return UnpackListInstantMeasuresReq(arr), nil } +func UnpackListInstantMeasuresReq(arr []byte) (m ListInstantMeasuresReq) { + m.MetricID = bin.GetUint32(arr[0:]) + m.Since = bin.GetUint32(arr[4:]) + m.Until = bin.GetUint32(arr[8:]) + return +} + +type ListCumulativeMeasuresReq struct { + MetricID uint32 + 
Since uint32 + Until uint32 +} + func ReadListCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListCumulativeMeasuresReq, err error) { - arr, err := r.ReadN(13) + arr, err := r.ReadN(12) if err != nil { err = fmt.Errorf("read req: %s", err) return @@ -368,106 +362,79 @@ func ReadListCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListCumulativ return UnpackListCumulativeMeasuresReq(arr), nil } -func ReadListInstantPeriodsReq(r *bufreader.BufferedReader) (m ListInstantPeriodsReq, err error) { - arr, err := r.ReadN(16) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return - } - return UnpackListInstantPeriodsReq(arr), nil +func UnpackListCumulativeMeasuresReq(arr []byte) (m ListCumulativeMeasuresReq) { + m.MetricID = bin.GetUint32(arr) + m.Since = bin.GetUint32(arr[4:]) + m.Until = bin.GetUint32(arr[8:]) + return } -func ReadListCumulativePeriodsReq(r *bufreader.BufferedReader) (m ListCumulativePeriodsReq, err error) { - arr, err := r.ReadN(15) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return - } - return UnpackListCumulativePeriodsReq(arr), nil +type ListInstantPeriodsReq struct { + MetricID uint32 + Since TimeBound + Until TimeBound + GroupBy diploma.GroupBy + AggregateFuncs byte + FirstHourOfDay int } -func ReadRangeTotalReq(r *bufreader.BufferedReader) (m RangeTotalReq, err error) { - arr, err := r.ReadN(12) +func ReadListInstantPeriodsReq(r *bufreader.BufferedReader) (m ListInstantPeriodsReq, err error) { + arr, err := r.ReadN(15) if err != nil { err = fmt.Errorf("read req: %s", err) return } - return UnpackRangeTotalReq(arr), nil + return UnpackListInstantPeriodsReq(arr), nil } -func ReadListCurrentValuesReq(r *bufreader.BufferedReader) (m ListCurrentValuesReq, err error) { - qty, err := bin.ReadUint16(r) - if err != nil { - err = fmt.Errorf("read req: %s", err) - return +func UnpackListInstantPeriodsReq(arr []byte) (m ListInstantPeriodsReq) { + m.MetricID = bin.GetUint32(arr) + m.Since = TimeBound{ + Year: int(bin.GetUint16(arr[4:])), + Month: time.Month(arr[6]), + Day: int(arr[7]), } - - for i := range int(qty) { - var metricID uint32 - metricID, err = bin.ReadUint32(r) - if err != nil { - err = fmt.Errorf("read metricID (#%d): %s", i, err) - return - } - m.MetricIDs = append(m.MetricIDs, metricID) + m.Until = TimeBound{ + Year: int(bin.GetUint16(arr[8:])), + Month: time.Month(arr[10]), + Day: int(arr[11]), } + m.GroupBy = diploma.GroupBy(arr[12]) + m.AggregateFuncs = arr[13] + m.FirstHourOfDay = int(arr[14]) return } -type AppendMeasuresReq struct { - MetricID uint32 - Measures []Measure -} - -type Measure struct { - Timestamp uint32 - Value float64 -} - -func PackAppendMeasures(req AppendMeasuresReq) []byte { - if len(req.Measures) > 65535 { - panic(fmt.Errorf("wrong measures qty: %d", len(req.Measures))) - } - var ( - prefixSize = 7 - recordSize = 12 - arr = make([]byte, prefixSize+len(req.Measures)*recordSize) - ) - arr[0] = TypeAppendMeasures - bin.PutUint32(arr[1:], req.MetricID) - bin.PutUint16(arr[5:], uint16(len(req.Measures))) - pos := prefixSize - for _, measure := range req.Measures { - bin.PutUint32(arr[pos:], measure.Timestamp) - bin.PutFloat64(arr[pos+4:], measure.Value) - pos += recordSize - } - return arr +type ListCumulativePeriodsReq struct { + MetricID uint32 + Since TimeBound + Until TimeBound + GroupBy diploma.GroupBy + FirstHourOfDay int } -func ReadAppendMeasuresReq(r *bufreader.BufferedReader) (m AppendMeasuresReq, err error) { - prefix, err := bin.ReadN(r, 6) // metricID + measures qty +func 
ReadListCumulativePeriodsReq(r *bufreader.BufferedReader) (m ListCumulativePeriodsReq, err error) { + arr, err := r.ReadN(14) if err != nil { - err = fmt.Errorf("read prefix: %s", err) + err = fmt.Errorf("read req: %s", err) return } + return UnpackListCumulativePeriodsReq(arr), nil +} - m.MetricID = bin.GetUint32(prefix[0:]) - qty := bin.GetUint16(prefix[4:]) - - for i := range int(qty) { - var measure Measure - measure.Timestamp, err = bin.ReadUint32(r) - if err != nil { - err = fmt.Errorf("read timestamp (#%d): %s", i, err) - return - } - measure.Value, err = bin.ReadFloat64(r) - if err != nil { - err = fmt.Errorf("read value (#%d): %s", i, err) - return - } - m.Measures = append(m.Measures, measure) +func UnpackListCumulativePeriodsReq(arr []byte) (m ListCumulativePeriodsReq) { + m.MetricID = bin.GetUint32(arr[0:]) + m.Since = TimeBound{ + Year: int(bin.GetUint16(arr[4:])), + Month: time.Month(arr[6]), + Day: int(arr[7]), } + m.Until = TimeBound{ + Year: int(bin.GetUint16(arr[8:])), + Month: time.Month(arr[10]), + Day: int(arr[11]), + } + m.GroupBy = diploma.GroupBy(arr[12]) + m.FirstHourOfDay = int(arr[13]) return } diff --git a/transform/aggregate.go b/transform/aggregate.go new file mode 100644 index 0000000..c1adc30 --- /dev/null +++ b/transform/aggregate.go @@ -0,0 +1,408 @@ +package transform + +import ( + "errors" + "fmt" + "io" + "time" + + "gordenko.dev/dima/diploma" + "gordenko.dev/dima/diploma/bin" + "gordenko.dev/dima/diploma/timeutil" +) + +// INSTANT + +type InstantPeriodsWriterOptions struct { + Dst io.Writer + GroupBy diploma.GroupBy + AggregateFuncs byte + FirstHourOfDay int +} + +type InstantPeriodsWriter struct { + aggregateFuncs byte + arr []byte + responder *ChunkedResponder + groupBy diploma.GroupBy + firstHourOfDay int + time2period func(uint32) time.Time + currentPeriod time.Time + lastTimestamp uint32 + endTimestamp uint32 // timestamp of the reading at the end of the period + min float64 + max float64 + total float64 + entries int +} + +func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWriter, error) { + if opt.Dst == nil { + return nil, errors.New("Dst option is required") + } + if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 { + return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay) + } + // Count q up front so the array used to pack periods can be preallocated + var q int + if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin { + q++ + } + if (opt.AggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax { + q++ + } + if (opt.AggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg { + q++ + } + + if q == 0 { + return nil, errors.New("AggregateFuncs option is required") + } + + // 12 bytes: period, since, until + // 8 bytes: the size of one float64 + s := &InstantPeriodsWriter{ + aggregateFuncs: opt.AggregateFuncs, + arr: make([]byte, 12+q*8), + responder: NewChunkedResponder(opt.Dst), + groupBy: opt.GroupBy, + firstHourOfDay: opt.FirstHourOfDay, + } + + switch opt.GroupBy { + case diploma.GroupByHour: + s.time2period = groupByHour + + case diploma.GroupByDay: + if s.firstHourOfDay > 0 { + s.time2period = s.groupByDayUsingFHD + } else { + s.time2period = groupByDay + } + + case diploma.GroupByMonth: + if s.firstHourOfDay > 0 { + s.time2period = s.groupByMonthUsingFHD + } else { + s.time2period = groupByMonth + } + + default: + return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy) + } + return s, nil +}
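+
+// Illustrative usage sketch ("conn" here stands for any io.Writer, e.g. a
+// net.Conn): a writer that streams daily MIN/MAX/AVG aggregates for an
+// accounting day that starts at 06:00:
+//
+//	w, err := NewInstantPeriodsWriter(InstantPeriodsWriterOptions{
+//		Dst:            conn,
+//		GroupBy:        diploma.GroupByDay,
+//		AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg,
+//		FirstHourOfDay: 6,
+//	})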
+ +func (s *InstantPeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time { + tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d") + if tm.Hour() < s.firstHourOfDay { + tm = tm.AddDate(0, 0, -1) + } + return tm +} + +func (s *InstantPeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time { + tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m") + if tm.Hour() < s.firstHourOfDay { + tm = tm.AddDate(0, 0, -1) + } + return tm +} + +func (s *InstantPeriodsWriter) Feed(timestamp uint32, value float64) { + s.feed(timestamp, value, false) +} + +func (s *InstantPeriodsWriter) FeedNoSend(timestamp uint32, value float64) { + s.feed(timestamp, value, true) +} + +func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) { + if s.entries > 0 { + period := s.time2period(timestamp) + if period != s.currentPeriod { + // close out the current period + // it is complete now + s.packPeriod(timestamp) + if isBuffer { + s.responder.BufferRecord(s.arr) + } else { + s.responder.AppendRecord(s.arr) + } + // then step one period back + s.decrementPeriod() + //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) + //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) + for period.Before(s.currentPeriod) { + // insert a blank period + s.packBlankPeriod() + if isBuffer { + s.responder.BufferRecord(s.arr) + } else { + s.responder.AppendRecord(s.arr) + } + s.decrementPeriod() + //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) + //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) + //return + } + s.endTimestamp = timestamp + s.min = value + s.max = value + s.total = value + s.entries = 1 + } else { + if value < s.min { + s.min = value + } else if value > s.max { + s.max = value + } + // accumulate the sum for AVG + s.total += value + s.entries++ + } + } else { + s.endTimestamp = timestamp + s.min = value + s.max = value + s.total = value + s.entries = 1 + s.currentPeriod = s.time2period(timestamp) + } + s.lastTimestamp = timestamp +} + +func (s *InstantPeriodsWriter) decrementPeriod() { + switch s.groupBy { + case diploma.GroupByHour: + s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour) + //fmt.Println("decrement") + case diploma.GroupByDay: + s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1) + case diploma.GroupByMonth: + s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0) + } +} + +func (s *InstantPeriodsWriter) packBlankPeriod() { + //period := s.currentPeriod.Format("2006-01-02 15:04:05") + //since := "0" + //until := "0" + //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0) + bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) + for i := 4; i < len(s.arr); i++ { + s.arr[i] = 0 + } +} + +func (s *InstantPeriodsWriter) Close() (err error) { + if s.entries > 0 { + s.packPeriod(s.lastTimestamp) + s.responder.AppendRecord(s.arr) + } + return s.responder.Flush() +} + +func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) { + bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) + bin.PutUint32(s.arr[4:], timestamp) + bin.PutUint32(s.arr[8:], s.endTimestamp) + + pos := 12 + if (s.aggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin { + bin.PutFloat64(s.arr[pos:], s.min) + pos += 8 + } + if (s.aggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax { + bin.PutFloat64(s.arr[pos:], s.max) + pos += 8 + } + if (s.aggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg { + bin.PutFloat64(s.arr[pos:], s.total/float64(s.entries)) + } +}
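+
+// Wire layout of one packed period record, sizes in bytes:
+//
+//	period(4) since(4) until(4) [min(8)] [max(8)] [avg(8)]
+//
+// Each float is written only when the matching bit is set in aggregateFuncs,
+// so a record is 12 to 36 bytes long; a blank period keeps the period field
+// and zeroes everything after it.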
+/*
+The idea behind splitting into periods: for each period I keep the single
+last value. The start of a period is the end of the previous one. If the
+previous period is not strictly adjacent but separated by gaps, blank
+periods are inserted in place of the gaps. The advantage of this approach
+is that the real value at the end of each period is always reported.
+*/
+
+type CumulativePeriodsWriter struct {
+	arr            []byte
+	responder      *ChunkedResponder
+	firstHourOfDay int
+	currentPeriod  time.Time
+	groupBy        diploma.GroupBy
+	time2period    func(uint32) time.Time
+	endTimestamp   uint32
+	endValue       float64
+	lastTimestamp  uint32
+	lastValue      float64
+}
+
+type CumulativePeriodsWriterOptions struct {
+	Dst            io.Writer
+	GroupBy        diploma.GroupBy
+	FirstHourOfDay int
+}
+
+func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*CumulativePeriodsWriter, error) {
+	if opt.Dst == nil {
+		return nil, errors.New("Dst option is required")
+	}
+	if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
+		return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay)
+	}
+
+	s := &CumulativePeriodsWriter{
+		arr:            make([]byte, 28),
+		responder:      NewChunkedResponder(opt.Dst),
+		firstHourOfDay: opt.FirstHourOfDay,
+		groupBy:        opt.GroupBy,
+	}
+
+	switch opt.GroupBy {
+	case diploma.GroupByHour:
+		s.time2period = groupByHour
+
+	case diploma.GroupByDay:
+		if s.firstHourOfDay > 0 {
+			s.time2period = s.groupByDayUsingFHD
+		} else {
+			s.time2period = groupByDay
+		}
+
+	case diploma.GroupByMonth:
+		if s.firstHourOfDay > 0 {
+			s.time2period = s.groupByMonthUsingFHD
+		} else {
+			s.time2period = groupByMonth
+		}
+
+	default:
+		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
+	}
+	return s, nil
+}
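
A caller that iterates stored measures from newest to oldest might drive this writer like so (a sketch; iterateNewestToOldest, conn and metricID are hypothetical names):

	// Sketch: streaming daily cumulative periods for one metric.
	w, err := NewCumulativePeriodsWriter(CumulativePeriodsWriterOptions{
		Dst:            conn, // e.g. a net.Conn; hypothetical here
		GroupBy:        diploma.GroupByDay,
		FirstHourOfDay: 6, // a "day" runs from 06:00 to 06:00
	})
	if err != nil {
		return err
	}
	// iterateNewestToOldest is a hypothetical cursor over stored measures.
	iterateNewestToOldest(metricID, func(ts uint32, value float64) {
		w.Feed(ts, value)
	})
	return w.Close() // flushes the oldest, still-open period and the end marker
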
+func (s *CumulativePeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time {
+	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
+	if tm.Hour() < s.firstHourOfDay {
+		tm = tm.AddDate(0, 0, -1)
+	}
+	return tm
+}
+
+func (s *CumulativePeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time {
+	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
+	if tm.Hour() < s.firstHourOfDay {
+		tm = tm.AddDate(0, 0, -1)
+	}
+	return tm
+}
+
+func (s *CumulativePeriodsWriter) Feed(timestamp uint32, value float64) {
+	s.feed(timestamp, value, false)
+}
+
+func (s *CumulativePeriodsWriter) FeedNoSend(timestamp uint32, value float64) {
+	s.feed(timestamp, value, true)
+}
+
+func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) {
+	if s.endTimestamp > 0 {
+		period := s.time2period(timestamp)
+		if period != s.currentPeriod {
+			// Close the completed period.
+			s.packPeriod(timestamp, value)
+			if isBuffer {
+				s.responder.BufferRecord(s.arr)
+			} else {
+				s.responder.AppendRecord(s.arr)
+			}
+			s.decrementPeriod()
+			// Insert blank periods in place of gaps.
+			for period.Before(s.currentPeriod) {
+				s.packBlankPeriod()
+				if isBuffer {
+					s.responder.BufferRecord(s.arr)
+				} else {
+					s.responder.AppendRecord(s.arr)
+				}
+				s.decrementPeriod()
+			}
+			s.endTimestamp = timestamp
+			s.endValue = value
+		}
+	} else {
+		s.endTimestamp = timestamp
+		s.endValue = value
+		s.currentPeriod = s.time2period(timestamp)
+	}
+	s.lastTimestamp = timestamp
+	s.lastValue = value
+}
+
+func (s *CumulativePeriodsWriter) decrementPeriod() {
+	switch s.groupBy {
+	case diploma.GroupByHour:
+		s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
+	case diploma.GroupByDay:
+		s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
+	case diploma.GroupByMonth:
+		s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0)
+	}
+}
+
+func (s *CumulativePeriodsWriter) packBlankPeriod() {
+	// A blank period carries only the period start; every other field is zero.
+	bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
+	for i := 4; i < len(s.arr); i++ {
+		s.arr[i] = 0
+	}
+}
+
+func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
+	// until is always endTimestamp.
+	bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
+	bin.PutUint32(s.arr[4:], start)
+	bin.PutUint32(s.arr[8:], s.endTimestamp)
+	bin.PutFloat64(s.arr[12:], startValue)
+	bin.PutFloat64(s.arr[20:], s.endValue)
+}
+
+func (s *CumulativePeriodsWriter) Close() error {
+	if s.endTimestamp > 0 {
+		if s.lastTimestamp != s.endTimestamp {
+			s.packPeriod(s.lastTimestamp, s.lastValue)
+		} else {
+			s.packPeriod(s.endTimestamp, s.endValue)
+		}
+		s.responder.AppendRecord(s.arr)
+	}
+	return s.responder.Flush()
+}
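
The completed cumulative-period record is always 28 bytes, so decoding it is symmetric (a sketch; bin.GetFloat64 is assumed as above, and rec is one record sliced out of the response):

	// Sketch: decoding one 28-byte cumulative-period record.
	// Layout: period u32 | since u32 | until u32 | startValue f64 | endValue f64.
	period := bin.GetUint32(rec[0:])
	since := bin.GetUint32(rec[4:])
	until := bin.GetUint32(rec[8:])
	startValue := bin.GetFloat64(rec[12:])
	endValue := bin.GetFloat64(rec[20:])
	consumed := endValue - startValue // consumption within the period
	_, _, _, _ = period, since, until, consumed
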
diff --git a/transform/raw.go b/transform/raw.go
new file mode 100644
index 0000000..fa692e6
--- /dev/null
+++ b/transform/raw.go
@@ -0,0 +1,147 @@
+package transform
+
+import (
+	"io"
+
+	"gordenko.dev/dima/diploma/bin"
+)
+
+// CURRENT VALUE WRITER
+
+type CurrentValue struct {
+	MetricID  uint32
+	Timestamp uint32
+	Value     float64
+}
+
+type CurrentValueWriter struct {
+	arr       []byte
+	responder *ChunkedResponder
+}
+
+func NewCurrentValueWriter(dst io.Writer) *CurrentValueWriter {
+	return &CurrentValueWriter{
+		arr:       make([]byte, 16),
+		responder: NewChunkedResponder(dst),
+	}
+}
+
+func (s *CurrentValueWriter) BufferValue(m CurrentValue) {
+	bin.PutUint32(s.arr[0:], m.MetricID)
+	bin.PutUint32(s.arr[4:], m.Timestamp)
+	bin.PutFloat64(s.arr[8:], m.Value)
+	s.responder.BufferRecord(s.arr)
+}
+
+func (s *CurrentValueWriter) Close() error {
+	return s.responder.Flush()
+}
+
+// INSTANT MEASURE WRITER
+
+type InstantMeasure struct {
+	Timestamp uint32
+	Value     float64
+}
+
+type InstantMeasureWriter struct {
+	arr       []byte
+	responder *ChunkedResponder
+	since     uint32
+}
+
+func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter {
+	// 12 bytes: timestamp, value.
+	return &InstantMeasureWriter{
+		arr:       make([]byte, 12),
+		responder: NewChunkedResponder(dst),
+		since:     since,
+	}
+}
+
+func (s *InstantMeasureWriter) Feed(timestamp uint32, value float64) {
+	s.feed(timestamp, value, false)
+}
+
+func (s *InstantMeasureWriter) FeedNoSend(timestamp uint32, value float64) {
+	s.feed(timestamp, value, true)
+}
+
+func (s *InstantMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) {
+	if timestamp < s.since {
+		return
+	}
+	bin.PutUint32(s.arr[0:], timestamp)
+	bin.PutFloat64(s.arr[4:], value)
+	if isBuffer {
+		s.responder.BufferRecord(s.arr)
+	} else {
+		s.responder.AppendRecord(s.arr)
+	}
+}
+
+func (s *InstantMeasureWriter) Close() error {
+	return s.responder.Flush()
+}
+
+// CUMULATIVE MEASURE WRITER
+
+type CumulativeMeasure struct {
+	Timestamp uint32
+	Value     float64
+	Total     float64
+}
+
+type CumulativeMeasureWriter struct {
+	arr          []byte
+	responder    *ChunkedResponder
+	since        uint32
+	endTimestamp uint32
+	endValue     float64
+}
+
+func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter {
+	// 20 bytes: timestamp, value, total.
+	return &CumulativeMeasureWriter{
+		arr:       make([]byte, 20),
+		responder: NewChunkedResponder(dst),
+		since:     since,
+	}
+}
+
+func (s *CumulativeMeasureWriter) Feed(timestamp uint32, value float64) {
+	s.feed(timestamp, value, false)
+}
+
+func (s *CumulativeMeasureWriter) FeedNoSend(timestamp uint32, value float64) {
+	s.feed(timestamp, value, true)
+}
+
+func (s *CumulativeMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) {
+	if s.endTimestamp > 0 {
+		// Records arrive newest first, so the consumed total is the
+		// difference between the newer reading and the current one.
+		s.pack(s.endValue - value)
+		if isBuffer {
+			s.responder.BufferRecord(s.arr)
+		} else {
+			s.responder.AppendRecord(s.arr)
+		}
+	}
+	s.endTimestamp = timestamp
+	s.endValue = value
+}
+
+func (s *CumulativeMeasureWriter) pack(total float64) {
+	bin.PutUint32(s.arr[0:], s.endTimestamp)
+	bin.PutFloat64(s.arr[4:], s.endValue)
+	bin.PutFloat64(s.arr[12:], total)
+}
+
+func (s *CumulativeMeasureWriter) Close() error {
+	if s.endTimestamp >= s.since {
+		// endTimestamp is inside the requested range. There are no older
+		// readings, so the timestamp is emitted with a zero total.
+		s.pack(0)
+		// If endTimestamp < since there is nothing to do: the accumulated
+		// total has already been emitted.
+	}
+	return s.responder.Flush()
+}
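
Because readings are fed newest first, each emitted total is the counter growth between two adjacent readings, and Close emits the oldest reading with a zero total. A tiny worked example with hypothetical values (conn stands in for any io.Writer):

	// Sketch: three readings fed newest-to-oldest.
	w := NewCumulativeMeasureWriter(conn, 0)
	w.Feed(300, 125.0) // first reading: remembered, nothing emitted yet
	w.Feed(200, 110.0) // emits (ts=300, value=125.0, total=15.0)
	w.Feed(100, 100.0) // emits (ts=200, value=110.0, total=10.0)
	err := w.Close()   // emits (ts=100, value=100.0, total=0) plus the end marker
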
diff --git a/transform/responder.go b/transform/responder.go
new file mode 100644
index 0000000..da2e19c
--- /dev/null
+++ b/transform/responder.go
@@ -0,0 +1,105 @@
+package transform
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+
+	"gordenko.dev/dima/diploma/bin"
+	"gordenko.dev/dima/diploma/proto"
+)
+
+// CHUNKED RESPONDER
+
+var endMsg = []byte{
+	proto.RespEndOfValue, // end of stream
+}
+
+type ChunkedResponder struct {
+	recordsQty int
+	buf        *bytes.Buffer
+	dst        io.Writer
+}
+
+func NewChunkedResponder(dst io.Writer) *ChunkedResponder {
+	s := &ChunkedResponder{
+		recordsQty: 0,
+		buf:        bytes.NewBuffer(nil),
+		dst:        dst,
+	}
+
+	s.buf.Write([]byte{
+		proto.RespPartOfValue, // message type
+		0, 0, 0, 0, // records qty
+	})
+	return s
+}
+
+func (s *ChunkedResponder) BufferRecord(rec []byte) {
+	s.buf.Write(rec)
+	s.recordsQty++
+}
+
+func (s *ChunkedResponder) AppendRecord(rec []byte) error {
+	s.buf.Write(rec)
+	s.recordsQty++
+
+	// Flush once the buffer exceeds a typical MTU-sized chunk.
+	if s.buf.Len() < 1500 {
+		return nil
+	}
+
+	if err := s.sendBuffered(); err != nil {
+		return err
+	}
+
+	s.buf.Write([]byte{
+		proto.RespPartOfValue, // message type
+		0, 0, 0, 0, // records qty
+	})
+	s.recordsQty = 0
+	return nil
+}
+
+func (s *ChunkedResponder) Flush() error {
+	if s.recordsQty > 0 {
+		if err := s.sendBuffered(); err != nil {
+			return err
+		}
+	}
+
+	if _, err := s.dst.Write(endMsg); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *ChunkedResponder) sendBuffered() (err error) {
+	msg := s.buf.Bytes()
+	// Patch the records qty into the already-written message header.
+	bin.PutUint32(msg[1:], uint32(s.recordsQty))
+
+	n, err := s.dst.Write(msg)
+	if err != nil {
+		return
+	}
+
+	if n != len(msg) {
+		return fmt.Errorf("incomplete write: %d bytes instead of %d", n, len(msg))
+	}
+
+	s.buf.Reset()
+	return
+}
+
+// For aggregation, a period-mapping function is defined and the actual
+// periods are emitted.
+
+// By default net/http.Server uses 4KB buffers, which are flushed to the
+// client with chunked responses. These buffers may cause visible overhead
+// for responses exceeding a few megabytes, so allocate 64KB buffers:
+// bw: bufio.NewWriterSize(w, 64*1024)
diff --git a/transform/transform.go b/transform/transform.go
new file mode 100644
index 0000000..f19826b
--- /dev/null
+++ b/transform/transform.go
@@ -0,0 +1,19 @@
+package transform
+
+import (
+	"time"
+
+	"gordenko.dev/dima/diploma/timeutil"
+)
+
+func groupByHour(timestamp uint32) time.Time {
+	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "h")
+}
+
+func groupByDay(timestamp uint32) time.Time {
+	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
+}
+
+func groupByMonth(timestamp uint32) time.Time {
+	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
+}
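
On the receiving side, the framing produced by ChunkedResponder (a RespPartOfValue byte, a uint32 record count, the records, then a single RespEndOfValue byte) can be consumed with a loop of this shape. This is a sketch only: the record size depends on the query type, and a ReadByte method on bufreader.BufferedReader is an assumption.

	// Sketch: reading a chunked response stream.
	func readChunkedResponse(r *bufreader.BufferedReader, recordSize int, handle func([]byte)) error {
		for {
			msgType, err := r.ReadByte() // ReadByte is an assumption
			if err != nil {
				return err
			}
			switch msgType {
			case proto.RespPartOfValue:
				hdr, err := r.ReadN(4)
				if err != nil {
					return err
				}
				qty := int(bin.GetUint32(hdr))
				for i := 0; i < qty; i++ {
					rec, err := r.ReadN(recordSize)
					if err != nil {
						return err
					}
					handle(rec)
				}
			case proto.RespEndOfValue:
				return nil // end of stream
			default:
				return fmt.Errorf("unexpected message type: %d", msgType)
			}
		}
	}
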