Compare commits

..

No commits in common. 'c97003bc4f1eeedaf2864e9c5ba1ca0c6c6f73c9' and '5e49c66e15493614f750c4757f7e8b4bc820b826' have entirely different histories.

  1. 325
      atree/aggregate.go
  2. 4
      atree/atree.go
  3. 16
      atree/io.go
  4. 532
      atree/select.go
  5. 306
      atree/writers.go
  6. 191
      client/client.go
  7. 441
      database/api.go
  8. 31
      database/helpers.go
  9. 937
      database/proc.go
  10. BIN
      database_linux
  11. BIN
      database_windows
  12. 14
      diploma.go
  13. 30
      examples/loadtest/loadtest.go
  14. 14
      examples/requests/generate.go
  15. 185
      examples/requests/requests.go
  16. 1
      go.mod
  17. 2
      go.sum
  18. BIN
      loadtest_linux
  19. BIN
      loadtest_windows
  20. 467
      proto/proto.go
  21. BIN
      requests_linux
  22. BIN
      requests_windows
  23. 372
      transform/aggregate.go
  24. 143
      transform/raw.go
  25. 89
      transform/responder.go
  26. 19
      transform/transform.go

@ -0,0 +1,325 @@
package atree
import (
"fmt"
"time"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/timeutil"
)
// AGGREGATE
// InstantAggregator folds a newest-to-oldest stream of instant
// (gauge-style) measurements into per-period min/max/avg summaries.
type InstantAggregator struct {
	firstHourOfDay int                 // optional day boundary: hours before this belong to the previous day
	lastDayOfMonth int                 // optional month boundary: days after this belong to the next month
	time2period    func(uint32) uint32 // maps a timestamp to its period key (chosen in the constructor)
	currentPeriod  uint32              // period key currently being accumulated
	since          uint32              // oldest timestamp seen in the current period so far
	until          uint32              // newest timestamp seen in the current period
	min            float64
	max            float64
	total          float64 // running sum, used for the AVG computation
	entries        int     // samples folded into the current period; 0 = nothing accumulated
}
// InstantAggregatorOptions configures NewInstantAggregator.
type InstantAggregatorOptions struct {
	GroupBy        diploma.GroupBy // bucketing granularity: hour, day or month
	FirstHourOfDay int             // optional day boundary (0 = plain midnight)
	LastDayOfMonth int             // optional month boundary (0 = plain calendar month)
}
// NewInstantAggregator builds an aggregator for instant (gauge-style)
// metrics, choosing the period-bucketing function from the GroupBy
// option plus the optional "first hour of day" / "last day of month"
// boundary adjustments. Returns an error for an unknown GroupBy value.
func NewInstantAggregator(opt InstantAggregatorOptions) (*InstantAggregator, error) {
	agg := &InstantAggregator{
		firstHourOfDay: opt.FirstHourOfDay,
		lastDayOfMonth: opt.LastDayOfMonth,
	}
	useFHD := agg.firstHourOfDay > 0
	useLDM := agg.lastDayOfMonth > 0

	switch opt.GroupBy {
	case diploma.GroupByHour:
		agg.time2period = groupByHour
	case diploma.GroupByDay:
		if useFHD {
			agg.time2period = agg.groupByDayUsingFHD
		} else {
			agg.time2period = groupByDay
		}
	case diploma.GroupByMonth:
		switch {
		case useFHD && useLDM:
			agg.time2period = agg.groupByMonthUsingFHDAndLDM
		case useFHD:
			agg.time2period = agg.groupByMonthUsingFHD
		case useLDM:
			agg.time2period = agg.groupByMonthUsingLDM
		default:
			agg.time2period = groupByMonth
		}
	default:
		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
	}
	return agg, nil
}
// Feed consumes one measurement. Measurements arrive newest-to-oldest,
// so the first sample of a period fixes its Until and every later sample
// pushes Since further back. When the incoming sample opens a different
// (older) period, the finished period is written into p and true is
// returned; otherwise false.
func (s *InstantAggregator) Feed(timestamp uint32, value float64, p *InstantPeriod) bool {
	period := s.time2period(timestamp)
	//fmt.Printf("feed: %s %v, period: %s\n", time.Unix(int64(timestamp), 0), value, time.Unix(int64(period), 0))

	if s.entries > 0 && period == s.currentPeriod {
		// Same period: fold the sample into the running statistics.
		if value < s.min {
			s.min = value
		} else if value > s.max {
			s.max = value
		}
		s.total += value // for AVG
		s.entries++
		s.since = timestamp // walking backwards, so this is the period start so far
		return false
	}

	completed := s.entries > 0
	if completed {
		// The sample belongs to an older period: emit the finished one.
		s.FillPeriod(timestamp, p)
	}

	// (Re)start accumulation with this sample.
	s.currentPeriod = period
	s.since = timestamp
	s.until = timestamp
	s.min = value
	s.max = value
	s.total = value
	s.entries = 1
	return completed
}
// FillPeriod copies the period accumulated so far into p and reports
// whether p was filled (false when nothing has been fed yet).
//
// prevTimestamp is the timestamp of the first sample belonging to the
// next (older) period, or 0 when there is none; when non-zero it is used
// as the period's Since boundary instead of the oldest sample actually
// seen inside the period. NOTE(review): this presumably extends the
// period down to the preceding measurement — confirm with callers.
func (s *InstantAggregator) FillPeriod(prevTimestamp uint32, p *InstantPeriod) bool {
	if s.entries == 0 {
		return false
	}
	//fmt.Printf("FillPeriod: %s, prevTimestamp: %s\n", time.Unix(int64(s.currentPeriod), 0), time.Unix(int64(prevTimestamp), 0))
	p.Period = s.currentPeriod
	if prevTimestamp > 0 {
		p.Since = prevTimestamp
	} else {
		p.Since = s.since
	}
	p.Until = s.until
	p.Min = s.min
	p.Max = s.max
	p.Avg = s.total / float64(s.entries)
	return true
}
// groupByDayUsingFHD maps a timestamp to its day period, shifted so the
// day starts at firstHourOfDay instead of midnight.
//
// NOTE(review): timeutil.FirstSecondInPeriod presumably truncates to the
// start of the day; if so tm.Hour() is always 0 and this branch always
// fires for firstHourOfDay > 0. The check likely needs the hour of the
// ORIGINAL timestamp — confirm FirstSecondInPeriod's semantics.
func (s *InstantAggregator) groupByDayUsingFHD(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingFHD maps a timestamp to its month period, adjusted by
// the firstHourOfDay boundary.
//
// NOTE(review): AddDate(0, 0, -1) from the first second of a month lands
// on the previous month's last day, so the resulting period key is not a
// month start — verify this is intentional. Same truncation caveat as
// groupByDayUsingFHD applies to the Hour() check.
func (s *InstantAggregator) groupByMonthUsingFHD(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingLDM maps a timestamp to its month period, shifted so
// that days after lastDayOfMonth are attributed to the next month.
//
// NOTE(review): if FirstSecondInPeriod truncates to the month start,
// tm.Day() is always 1 and the shift never fires — confirm.
func (s *InstantAggregator) groupByMonthUsingLDM(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Day() > s.lastDayOfMonth {
		tm = tm.AddDate(0, 1, 0)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingFHDAndLDM combines both boundary adjustments.
// IMPORTANT: the time-of-day shift is applied first, then the
// day-of-month shift (order matters: the day shift inspects the day
// produced by the hour shift).
//
// NOTE(review): same FirstSecondInPeriod truncation caveats as the
// single-adjustment variants — confirm timeutil semantics.
func (s *InstantAggregator) groupByMonthUsingFHDAndLDM(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	if tm.Day() > s.lastDayOfMonth {
		tm = tm.AddDate(0, 1, 0)
	}
	return uint32(tm.Unix())
}
// CUMULATIVE
// CumulativeAggregator folds a newest-to-oldest stream of cumulative
// (counter-style) measurements into per-period totals.
type CumulativeAggregator struct {
	firstHourOfDay int                 // optional day boundary (see InstantAggregator)
	lastDayOfMonth int                 // optional month boundary
	time2period    func(uint32) uint32 // maps a timestamp to its period key
	currentPeriod  uint32              // period key currently being accumulated
	since          uint32              // oldest timestamp seen in the current period so far
	until          uint32              // newest timestamp seen in the current period
	sinceValue     float64             // counter reading at since
	untilValue     float64             // counter reading at until
	entries        int                 // samples in the current period; 0 = nothing accumulated
}
// CumulativeAggregatorOptions configures NewCumulativeAggregator.
type CumulativeAggregatorOptions struct {
	GroupBy        diploma.GroupBy // bucketing granularity: hour, day or month
	FirstHourOfDay int             // optional day boundary (0 = plain midnight)
	LastDayOfMonth int             // optional month boundary (0 = plain calendar month)
}
// NewCumulativeAggregator builds an aggregator for cumulative
// (counter-style) metrics, choosing the period-bucketing function from
// the GroupBy option plus the optional boundary adjustments. Returns an
// error for an unknown GroupBy value.
func NewCumulativeAggregator(opt CumulativeAggregatorOptions) (*CumulativeAggregator, error) {
	agg := &CumulativeAggregator{
		firstHourOfDay: opt.FirstHourOfDay,
		lastDayOfMonth: opt.LastDayOfMonth,
	}
	useFHD := agg.firstHourOfDay > 0
	useLDM := agg.lastDayOfMonth > 0

	switch opt.GroupBy {
	case diploma.GroupByHour:
		agg.time2period = groupByHour
	case diploma.GroupByDay:
		if useFHD {
			agg.time2period = agg.groupByDayUsingFHD
		} else {
			agg.time2period = groupByDay
		}
	case diploma.GroupByMonth:
		switch {
		case useFHD && useLDM:
			agg.time2period = agg.groupByMonthUsingFHDAndLDM
		case useFHD:
			agg.time2period = agg.groupByMonthUsingFHD
		case useLDM:
			agg.time2period = agg.groupByMonthUsingLDM
		default:
			agg.time2period = groupByMonth
		}
	default:
		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
	}
	return agg, nil
}
// Feed consumes one measurement. Measurements arrive newest-to-oldest,
// so the first sample of a period fixes its Until/untilValue and every
// later sample pushes Since/sinceValue further back. When the incoming
// sample opens a different (older) period, the finished period is
// written into p and true is returned; otherwise false.
func (s *CumulativeAggregator) Feed(timestamp uint32, value float64, p *CumulativePeriod) bool {
	period := s.time2period(timestamp)

	if s.entries > 0 && period == s.currentPeriod {
		// Same period: walking backwards, this sample becomes the
		// earliest one seen so far.
		s.since = timestamp
		s.sinceValue = value
		s.entries++
		return false
	}

	completed := s.entries > 0
	if completed {
		// The sample belongs to an older period: emit the finished one.
		s.FillPeriod(timestamp, value, p)
	}

	// (Re)start accumulation with this sample.
	s.currentPeriod = period
	s.since = timestamp
	s.until = timestamp
	s.sinceValue = value
	s.untilValue = value
	s.entries = 1
	return completed
}
// FillPeriod copies the period accumulated so far into p and reports
// whether p was filled (false when nothing has been fed yet).
//
// prevTimestamp/value describe the first sample of the next (older)
// period, if any: the period is then extended down to that measurement
// and Total is the counter growth from it to the period's end value.
// When prevTimestamp is 0 only the samples actually seen are used.
func (s *CumulativeAggregator) FillPeriod(prevTimestamp uint32, value float64, p *CumulativePeriod) bool {
	if s.entries == 0 {
		return false
	}
	p.Period = s.currentPeriod
	if prevTimestamp > 0 {
		p.Since = prevTimestamp
		p.Total = s.untilValue - value
	} else {
		p.Since = s.since
		p.Total = s.untilValue - s.sinceValue
	}
	p.Until = s.until
	p.EndValue = s.untilValue
	return true
}
// groupByDayUsingFHD maps a timestamp to its day period, shifted so the
// day starts at firstHourOfDay instead of midnight.
//
// NOTE(review): identical to InstantAggregator.groupByDayUsingFHD; same
// caveat — if FirstSecondInPeriod truncates to the day start, tm.Hour()
// is always 0 and the shift always fires. Confirm timeutil semantics.
func (s *CumulativeAggregator) groupByDayUsingFHD(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingFHD maps a timestamp to its month period, adjusted by
// the firstHourOfDay boundary. Same caveats as the instant variant:
// AddDate(0, 0, -1) from a month start lands on the previous month's
// last day, and the Hour() check depends on FirstSecondInPeriod not
// truncating the hour — confirm.
func (s *CumulativeAggregator) groupByMonthUsingFHD(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingLDM maps a timestamp to its month period, shifted so
// that days after lastDayOfMonth are attributed to the next month.
// NOTE(review): if FirstSecondInPeriod truncates to the month start,
// tm.Day() is always 1 and the shift never fires — confirm.
func (s *CumulativeAggregator) groupByMonthUsingLDM(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Day() > s.lastDayOfMonth {
		tm = tm.AddDate(0, 1, 0)
	}
	return uint32(tm.Unix())
}
// groupByMonthUsingFHDAndLDM combines both boundary adjustments.
// IMPORTANT: the time-of-day shift is applied first, then the
// day-of-month shift (the day shift inspects the day produced by the
// hour shift). Same FirstSecondInPeriod truncation caveats as the
// single-adjustment variants.
func (s *CumulativeAggregator) groupByMonthUsingFHDAndLDM(timestamp uint32) uint32 {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	if tm.Day() > s.lastDayOfMonth {
		tm = tm.AddDate(0, 1, 0)
	}
	return uint32(tm.Unix())
}
// groupByHour maps a timestamp to the first second of its hour.
func groupByHour(timestamp uint32) uint32 {
	tm := time.Unix(int64(timestamp), 0)
	return uint32(timeutil.FirstSecondInPeriod(tm, "h").Unix())
}
// groupByDay maps a timestamp to the first second of its day.
func groupByDay(timestamp uint32) uint32 {
	tm := time.Unix(int64(timestamp), 0)
	return uint32(timeutil.FirstSecondInPeriod(tm, "d").Unix())
}
// groupByMonth maps a timestamp to the first second of its month.
func groupByMonth(timestamp uint32) uint32 {
	tm := time.Unix(int64(timestamp), 0)
	return uint32(timeutil.FirstSecondInPeriod(tm, "m").Unix())
}

@ -378,6 +378,10 @@ func (s *Atree) AppendDataPage(req AppendDataPageReq) (_ redo.Report, err error)
return return
} }
// На данний момен схема - наступна. Всі сторінки - data та index - зафіксовані в кеші.
// Отже запис на диск пройде максимально швидко. Після цього ReferenceCount кожної
// сторінки зменшиться на 1. Оскільки на метрику утримується XLock, сторінки мають
// ReferenceCount = 1 (немає інших читачів).
waitCh := make(chan struct{}) waitCh := make(chan struct{})
task := WriteTask{ task := WriteTask{

@ -8,7 +8,7 @@ import (
"math" "math"
"os" "os"
diploma "gordenko.dev/dima/diploma" octopus "gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/atree/redo" "gordenko.dev/dima/diploma/atree/redo"
"gordenko.dev/dima/diploma/bin" "gordenko.dev/dima/diploma/bin"
) )
@ -74,8 +74,8 @@ func (s *Atree) releaseIndexPage(pageNo uint32) {
p.ReferenceCount-- p.ReferenceCount--
return return
} else { } else {
diploma.Abort( octopus.Abort(
diploma.ReferenceCountBug, octopus.ReferenceCountBug,
fmt.Errorf("call releaseIndexPage on page %d with reference count = %d", fmt.Errorf("call releaseIndexPage on page %d with reference count = %d",
pageNo, p.ReferenceCount), pageNo, p.ReferenceCount),
) )
@ -98,7 +98,7 @@ func (s *Atree) allocIndexPage() AllocatedPage {
} else { } else {
s.mutex.Lock() s.mutex.Lock()
if s.allocatedIndexPagesQty == math.MaxUint32 { if s.allocatedIndexPagesQty == math.MaxUint32 {
diploma.Abort(diploma.MaxAtreeSizeExceeded, octopus.Abort(octopus.MaxAtreeSizeExceeded,
errors.New("no space in Atree index")) errors.New("no space in Atree index"))
} }
s.allocatedIndexPagesQty++ s.allocatedIndexPagesQty++
@ -163,8 +163,8 @@ func (s *Atree) releaseDataPage(pageNo uint32) {
p.ReferenceCount-- p.ReferenceCount--
return return
} else { } else {
diploma.Abort( octopus.Abort(
diploma.ReferenceCountBug, octopus.ReferenceCountBug,
fmt.Errorf("call releaseDataPage on page %d with reference count = %d", fmt.Errorf("call releaseDataPage on page %d with reference count = %d",
pageNo, p.ReferenceCount), pageNo, p.ReferenceCount),
) )
@ -186,7 +186,7 @@ func (s *Atree) allocDataPage() AllocatedPage {
} else { } else {
s.mutex.Lock() s.mutex.Lock()
if s.allocatedDataPagesQty == math.MaxUint32 { if s.allocatedDataPagesQty == math.MaxUint32 {
diploma.Abort(diploma.MaxAtreeSizeExceeded, octopus.Abort(octopus.MaxAtreeSizeExceeded,
errors.New("no space in Atree index")) errors.New("no space in Atree index"))
} }
s.allocatedDataPagesQty++ s.allocatedDataPagesQty++
@ -303,7 +303,7 @@ func (s *Atree) pageWriter() {
case <-s.writeSignalCh: case <-s.writeSignalCh:
err := s.writeTasks() err := s.writeTasks()
if err != nil { if err != nil {
diploma.Abort(diploma.WriteToAtreeFailed, err) octopus.Abort(octopus.WriteToAtreeFailed, err)
} }
} }
} }

@ -3,17 +3,81 @@ package atree
import ( import (
"fmt" "fmt"
"gordenko.dev/dima/diploma" octopus "gordenko.dev/dima/diploma"
) )
type ContinueFullScanReq struct { type IterateAllCumulativeByTreeCursorReq struct {
MetricType diploma.MetricType
FracDigits byte FracDigits byte
ResponseWriter diploma.AtreeMeasureConsumer PageNo uint32
EndTimestamp uint32
EndValue float64
ResponseWriter *CumulativeMeasureWriter
}
func (s *Atree) IterateAllCumulativeByTreeCursor(req IterateAllCumulativeByTreeCursorReq) error {
buf, err := s.fetchDataPage(req.PageNo)
if err != nil {
return err
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: req.PageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Cumulative,
})
if err != nil {
return err
}
defer treeCursor.Close()
var (
endTimestamp = req.EndTimestamp
endValue = req.EndValue
)
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done {
err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue,
})
if err != nil {
return err
}
return nil
}
err = req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue - value,
})
if err != nil {
return err
}
endTimestamp = timestamp
endValue = value
}
}
type ContinueIterateCumulativeByTreeCursorReq struct {
FracDigits byte
Since uint32
Until uint32
LastPageNo uint32 LastPageNo uint32
EndTimestamp uint32
EndValue float64
ResponseWriter *CumulativeMeasureWriter
} }
func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error { func (s *Atree) ContinueIterateCumulativeByTreeCursor(req ContinueIterateCumulativeByTreeCursorReq) error {
buf, err := s.fetchDataPage(req.LastPageNo) buf, err := s.fetchDataPage(req.LastPageNo)
if err != nil { if err != nil {
return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err) return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
@ -24,34 +88,182 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
PageData: buf, PageData: buf,
Atree: s, Atree: s,
FracDigits: req.FracDigits, FracDigits: req.FracDigits,
MetricType: req.MetricType, MetricType: octopus.Cumulative,
}) })
if err != nil { if err != nil {
return err return err
} }
defer treeCursor.Close() defer treeCursor.Close()
var (
endTimestamp = req.EndTimestamp
endValue = req.EndValue
)
for { for {
timestamp, value, done, err := treeCursor.Prev() timestamp, value, done, err := treeCursor.Prev()
if err != nil { if err != nil {
return err return err
} }
if done { if done {
err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue,
})
if err != nil {
return err
}
return nil
}
if timestamp <= req.Until {
err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue - value,
})
if err != nil {
return err
}
if timestamp < req.Since {
return nil return nil
} }
req.ResponseWriter.Feed(timestamp, value) } else {
// bug panic
panic("continue cumulative but timestamp > req.Until")
}
} }
} }
type ContinueRangeScanReq struct { type FindAndIterateCumulativeByTreeCursorReq struct {
MetricType diploma.MetricType FracDigits byte
Since uint32
Until uint32
RootPageNo uint32
ResponseWriter *CumulativeMeasureWriter
}
func (s *Atree) FindAndIterateCumulativeByTreeCursor(req FindAndIterateCumulativeByTreeCursorReq) error {
pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
if err != nil {
return err
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: pageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Cumulative,
})
if err != nil {
return err
}
defer treeCursor.Close()
var (
endTimestamp uint32
endValue float64
)
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done {
if endTimestamp > 0 {
err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue,
})
if err != nil {
return err
}
}
return nil
}
if timestamp > req.Until {
continue
}
if endTimestamp > 0 {
err := req.ResponseWriter.WriteMeasure(CumulativeMeasure{
Timestamp: endTimestamp,
Value: endValue,
Total: endValue - value,
})
if err != nil {
return err
}
}
endTimestamp = timestamp
endValue = value
if timestamp < req.Since {
return nil
}
}
}
type IterateAllInstantByTreeCursorReq struct {
FracDigits byte
PageNo uint32
ResponseWriter *InstantMeasureWriter
}
func (s *Atree) IterateAllInstantByTreeCursor(req IterateAllInstantByTreeCursorReq) error {
buf, err := s.fetchDataPage(req.PageNo)
if err != nil {
return err
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: req.PageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Instant,
})
if err != nil {
return err
}
defer treeCursor.Close()
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done {
return nil
}
err = req.ResponseWriter.WriteMeasure(InstantMeasure{
Timestamp: timestamp,
Value: value,
})
if err != nil {
return err
}
}
}
type ContinueIterateInstantByTreeCursorReq struct {
FracDigits byte FracDigits byte
ResponseWriter diploma.AtreeMeasureConsumer
LastPageNo uint32
Since uint32 Since uint32
Until uint32
LastPageNo uint32
ResponseWriter *InstantMeasureWriter
} }
func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error { func (s *Atree) ContinueIterateInstantByTreeCursor(req ContinueIterateInstantByTreeCursorReq) error {
buf, err := s.fetchDataPage(req.LastPageNo) buf, err := s.fetchDataPage(req.LastPageNo)
if err != nil { if err != nil {
return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err) return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
@ -62,7 +274,62 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
PageData: buf, PageData: buf,
Atree: s, Atree: s,
FracDigits: req.FracDigits, FracDigits: req.FracDigits,
MetricType: req.MetricType, MetricType: octopus.Instant,
})
if err != nil {
return err
}
defer treeCursor.Close()
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done {
// - записи закончились;
return nil
}
if timestamp > req.Until {
panic("continue instant timestamp > req.Until")
}
if timestamp < req.Since {
return nil
}
err = req.ResponseWriter.WriteMeasure(InstantMeasure{
Timestamp: timestamp,
Value: value,
})
if err != nil {
return err
}
}
}
type FindAndIterateInstantByTreeCursorReq struct {
FracDigits byte
Since uint32
Until uint32
RootPageNo uint32
ResponseWriter *InstantMeasureWriter
}
func (s *Atree) FindAndIterateInstantByTreeCursor(req FindAndIterateInstantByTreeCursorReq) error {
pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
if err != nil {
return err
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: pageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Instant,
}) })
if err != nil { if err != nil {
return err return err
@ -74,60 +341,279 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
if err != nil { if err != nil {
return err return err
} }
if done { if done {
return nil return nil
} }
req.ResponseWriter.Feed(timestamp, value)
if timestamp > req.Until {
continue
}
if timestamp < req.Since { if timestamp < req.Since {
return nil return nil
} }
err = req.ResponseWriter.WriteMeasure(InstantMeasure{
Timestamp: timestamp,
Value: value,
})
if err != nil {
return err
}
}
}
type ContinueCollectInstantPeriodsReq struct {
FracDigits byte
Aggregator *InstantAggregator
ResponseWriter *InstantPeriodsWriter
LastPageNo uint32
Since uint32
Until uint32
}
func (s *Atree) ContinueCollectInstantPeriods(req ContinueCollectInstantPeriodsReq) error {
buf, err := s.fetchDataPage(req.LastPageNo)
if err != nil {
return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: req.LastPageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Instant,
})
if err != nil {
return err
}
defer treeCursor.Close()
var period InstantPeriod
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done || timestamp < req.Since {
isCompleted := req.Aggregator.FillPeriod(timestamp, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
return nil
}
if timestamp <= req.Until {
isCompleted := req.Aggregator.Feed(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
}
} }
} }
type RangeScanReq struct { type FindInstantPeriodsReq struct {
MetricType diploma.MetricType
FracDigits byte FracDigits byte
ResponseWriter diploma.AtreeMeasureConsumer ResponseWriter *InstantPeriodsWriter
RootPageNo uint32 RootPageNo uint32
Since uint32 Since uint32
Until uint32 Until uint32
GroupBy octopus.GroupBy
FirstHourOfDay int
LastDayOfMonth int
} }
func (s *Atree) RangeScan(req RangeScanReq) error { func (s *Atree) FindInstantPeriods(req FindInstantPeriodsReq) error {
pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until) pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
if err != nil { if err != nil {
return err return err
} }
aggregator, err := NewInstantAggregator(InstantAggregatorOptions{
GroupBy: req.GroupBy,
FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
})
if err != nil {
return err
}
cursor, err := NewBackwardCursor(BackwardCursorOptions{ cursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: pageNo, PageNo: pageNo,
PageData: buf, PageData: buf,
Atree: s, Atree: s,
FracDigits: req.FracDigits, FracDigits: req.FracDigits,
MetricType: req.MetricType, MetricType: octopus.Instant,
}) })
if err != nil { if err != nil {
return err return err
} }
defer cursor.Close() defer cursor.Close()
var period InstantPeriod
for { for {
timestamp, value, done, err := cursor.Prev() timestamp, value, done, err := cursor.Prev()
if err != nil { if err != nil {
return err return err
} }
if done {
if done || timestamp < req.Since {
isCompleted := aggregator.FillPeriod(timestamp, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
return nil return nil
} }
//fmt.Printf("atree range scan: %s, %v\n", time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05"), value) if timestamp <= req.Until {
isCompleted := aggregator.Feed(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
}
}
}
type FindCumulativePeriodsReq struct {
FracDigits byte
ResponseWriter *CumulativePeriodsWriter
RootPageNo uint32
Since uint32
Until uint32
GroupBy octopus.GroupBy
FirstHourOfDay int
LastDayOfMonth int
}
func (s *Atree) FindCumulativePeriods(req FindCumulativePeriodsReq) error {
pageNo, buf, err := s.findDataPage(req.RootPageNo, req.Until)
if err != nil {
return err
}
aggregator, err := NewCumulativeAggregator(CumulativeAggregatorOptions{
GroupBy: req.GroupBy,
FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
})
if err != nil {
return err
}
cursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: pageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Cumulative,
})
if err != nil {
return err
}
defer cursor.Close()
var period CumulativePeriod
for {
timestamp, value, done, err := cursor.Prev()
if err != nil {
return err
}
if done || timestamp < req.Since {
isCompleted := aggregator.FillPeriod(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
return nil
}
if timestamp <= req.Until { if timestamp <= req.Until {
req.ResponseWriter.Feed(timestamp, value) isCompleted := aggregator.Feed(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
}
}
}
if timestamp < req.Since { type ContinueCollectCumulativePeriodsReq struct {
FracDigits byte
Aggregator *CumulativeAggregator
ResponseWriter *CumulativePeriodsWriter
LastPageNo uint32
Since uint32
Until uint32
}
func (s *Atree) ContinueCollectCumulativePeriods(req ContinueCollectCumulativePeriodsReq) error {
buf, err := s.fetchDataPage(req.LastPageNo)
if err != nil {
return fmt.Errorf("fetchDataPage(%d): %s", req.LastPageNo, err)
}
treeCursor, err := NewBackwardCursor(BackwardCursorOptions{
PageNo: req.LastPageNo,
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: octopus.Cumulative,
})
if err != nil {
return err
}
defer treeCursor.Close()
var period CumulativePeriod
for {
timestamp, value, done, err := treeCursor.Prev()
if err != nil {
return err
}
if done || timestamp < req.Since {
isCompleted := req.Aggregator.FillPeriod(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
return nil return nil
} }
if timestamp <= req.Until {
isCompleted := req.Aggregator.Feed(timestamp, value, &period)
if isCompleted {
err := req.ResponseWriter.WritePeriod(period)
if err != nil {
return err
}
}
} }
} }
} }

@ -0,0 +1,306 @@
package atree
import (
"bytes"
"fmt"
"io"
octopus "gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/proto"
)
// CURRENT VALUE WRITER
// CurrentValue is the wire record for a metric's latest measurement.
type CurrentValue struct {
	MetricID  uint32
	Timestamp uint32
	Value     float64
}
// CurrentValueWriter streams CurrentValue records through a
// ChunkedResponder, reusing one scratch buffer per record.
type CurrentValueWriter struct {
	arr       []byte // reusable 16-byte record scratch buffer
	responder *ChunkedResponder
}
// NewCurrentValueWriter returns a writer that streams CurrentValue
// records to dst in the chunked wire format.
func NewCurrentValueWriter(dst io.Writer) *CurrentValueWriter {
	const recordSize = 16 // metricID(4) + timestamp(4) + value(8)
	return &CurrentValueWriter{
		responder: NewChunkedResponder(dst),
		arr:       make([]byte, recordSize),
	}
}
// BufferValue serializes m into the reusable record buffer and queues it
// without triggering a network write (data goes out on Close/Flush).
func (s *CurrentValueWriter) BufferValue(m CurrentValue) {
	bin.PutUint32(s.arr[0:], m.MetricID)
	bin.PutUint32(s.arr[4:], m.Timestamp)
	bin.PutFloat64(s.arr[8:], m.Value)
	s.responder.BufferRecord(s.arr)
}
// Close flushes buffered records and writes the end-of-stream marker.
func (s *CurrentValueWriter) Close() error {
	return s.responder.Flush()
}
// INSTANT MEASURE WRITER
// InstantMeasure is the wire record for one instant (gauge) measurement.
type InstantMeasure struct {
	Timestamp uint32
	Value     float64
}
// InstantMeasureWriter streams InstantMeasure records through a
// ChunkedResponder, reusing one scratch buffer per record.
type InstantMeasureWriter struct {
	arr       []byte // reusable 12-byte record scratch buffer
	responder *ChunkedResponder
}
// NewInstantMeasureWriter returns a writer that streams InstantMeasure
// records to dst in the chunked wire format.
func NewInstantMeasureWriter(dst io.Writer) *InstantMeasureWriter {
	const recordSize = 12 // timestamp(4) + value(8)
	return &InstantMeasureWriter{
		responder: NewChunkedResponder(dst),
		arr:       make([]byte, recordSize),
	}
}
// pack serializes m into the reusable 12-byte record buffer:
// timestamp(4) + value(8). Extracted so BufferMeasure and WriteMeasure
// share one serializer, mirroring the pack helpers of the period writers.
func (s *InstantMeasureWriter) pack(m InstantMeasure) {
	bin.PutUint32(s.arr[0:], m.Timestamp)
	bin.PutFloat64(s.arr[4:], m.Value)
}

// BufferMeasure queues m without triggering a network write.
func (s *InstantMeasureWriter) BufferMeasure(m InstantMeasure) {
	s.pack(m)
	s.responder.BufferRecord(s.arr)
}

// WriteMeasure appends m and sends the accumulated chunk once it grows
// large enough (see ChunkedResponder.AppendRecord).
func (s *InstantMeasureWriter) WriteMeasure(m InstantMeasure) error {
	s.pack(m)
	return s.responder.AppendRecord(s.arr)
}
// Close flushes buffered records and writes the end-of-stream marker.
func (s *InstantMeasureWriter) Close() error {
	return s.responder.Flush()
}
// CUMULATIVE MEASURE WRITER
// CumulativeMeasure is the wire record for one cumulative (counter)
// measurement; Total is the growth relative to the preceding sample.
type CumulativeMeasure struct {
	Timestamp uint32
	Value     float64
	Total     float64
}
// CumulativeMeasureWriter streams CumulativeMeasure records through a
// ChunkedResponder, reusing one scratch buffer per record.
type CumulativeMeasureWriter struct {
	arr       []byte // reusable 20-byte record scratch buffer
	responder *ChunkedResponder
}
// NewCumulativeMeasureWriter returns a writer that streams
// CumulativeMeasure records to dst in the chunked wire format.
func NewCumulativeMeasureWriter(dst io.Writer) *CumulativeMeasureWriter {
	const recordSize = 20 // timestamp(4) + value(8) + total(8)
	return &CumulativeMeasureWriter{
		responder: NewChunkedResponder(dst),
		arr:       make([]byte, recordSize),
	}
}
// pack serializes m into the reusable 20-byte record buffer:
// timestamp(4) + value(8) + total(8). Extracted so BufferMeasure and
// WriteMeasure share one serializer, mirroring the period writers.
func (s *CumulativeMeasureWriter) pack(m CumulativeMeasure) {
	bin.PutUint32(s.arr[0:], m.Timestamp)
	bin.PutFloat64(s.arr[4:], m.Value)
	bin.PutFloat64(s.arr[12:], m.Total)
}

// BufferMeasure queues m without triggering a network write.
func (s *CumulativeMeasureWriter) BufferMeasure(m CumulativeMeasure) {
	s.pack(m)
	s.responder.BufferRecord(s.arr)
}

// WriteMeasure appends m and sends the accumulated chunk once it grows
// large enough (see ChunkedResponder.AppendRecord).
func (s *CumulativeMeasureWriter) WriteMeasure(m CumulativeMeasure) error {
	s.pack(m)
	return s.responder.AppendRecord(s.arr)
}
// Close flushes buffered records and writes the end-of-stream marker.
func (s *CumulativeMeasureWriter) Close() error {
	return s.responder.Flush()
}
// INSTANT AGGREGATE WRITER
// InstantPeriodsWriter streams aggregated InstantPeriod records; the
// record layout (which of min/max/avg are present) is fixed by the
// aggregateFuncs bitmask chosen at construction.
type InstantPeriodsWriter struct {
	aggregateFuncs byte   // bitmask of octopus.AggregateMin/Max/Avg
	arr            []byte // reusable record scratch buffer, sized per bitmask
	responder      *ChunkedResponder
}
// NewInstantPeriodsWriter returns a writer for aggregated instant
// periods. aggregateFuncs selects which of MIN/MAX/AVG appear in each
// record, which determines the fixed record size.
func NewInstantPeriodsWriter(dst io.Writer, aggregateFuncs byte) *InstantPeriodsWriter {
	q := 0
	for _, flag := range []byte{octopus.AggregateMin, octopus.AggregateMax, octopus.AggregateAvg} {
		if (aggregateFuncs & flag) == flag {
			q++
		}
	}
	return &InstantPeriodsWriter{
		aggregateFuncs: aggregateFuncs,
		arr:            make([]byte, 12+q*8), // period(4) + since(4) + until(4) + q floats
		responder:      NewChunkedResponder(dst),
	}
}
// InstantPeriod is one aggregated bucket of instant measurements.
type InstantPeriod struct {
	Period uint32 // period key produced by the aggregator's time2period
	Since  uint32 // earliest timestamp attributed to the bucket
	Until  uint32 // latest timestamp attributed to the bucket
	Min    float64
	Max    float64
	Avg    float64
}
// BufferMeasure queues p without triggering a network write.
// NOTE(review): name is inconsistent with WritePeriod — this buffers a
// period, not a measure; renaming would break callers, so left as-is.
func (s *InstantPeriodsWriter) BufferMeasure(p InstantPeriod) {
	s.pack(p)
	s.responder.BufferRecord(s.arr)
}
// WritePeriod appends p and sends the accumulated chunk once it grows
// large enough (see ChunkedResponder.AppendRecord).
func (s *InstantPeriodsWriter) WritePeriod(p InstantPeriod) error {
	s.pack(p)
	return s.responder.AppendRecord(s.arr)
}
// Close flushes buffered records and writes the end-of-stream marker.
func (s *InstantPeriodsWriter) Close() error {
	return s.responder.Flush()
}
// pack serializes p into the reusable record buffer. The fixed header
// (period, since, until) is followed by the optional aggregate fields in
// MIN, MAX, AVG order, matching the size computed in the constructor.
func (s *InstantPeriodsWriter) pack(p InstantPeriod) {
	bin.PutUint32(s.arr[0:], p.Period)
	bin.PutUint32(s.arr[4:], p.Since)
	bin.PutUint32(s.arr[8:], p.Until)

	pos := 12
	putFloat := func(v float64) {
		bin.PutFloat64(s.arr[pos:], v)
		pos += 8
	}
	if (s.aggregateFuncs & octopus.AggregateMin) == octopus.AggregateMin {
		putFloat(p.Min)
	}
	if (s.aggregateFuncs & octopus.AggregateMax) == octopus.AggregateMax {
		putFloat(p.Max)
	}
	if (s.aggregateFuncs & octopus.AggregateAvg) == octopus.AggregateAvg {
		putFloat(p.Avg)
	}
}
// CUMULATIVE AGGREGATE WRITER
// CumulativePeriodsWriter streams aggregated CumulativePeriod records
// through a ChunkedResponder, reusing one scratch buffer per record.
type CumulativePeriodsWriter struct {
	arr       []byte // reusable 28-byte record scratch buffer
	responder *ChunkedResponder
}
// NewCumulativePeriodsWriter returns a writer for aggregated cumulative
// periods streamed to dst in the chunked wire format.
func NewCumulativePeriodsWriter(dst io.Writer) *CumulativePeriodsWriter {
	const recordSize = 28 // period(4) + since(4) + until(4) + endValue(8) + total(8)
	return &CumulativePeriodsWriter{
		responder: NewChunkedResponder(dst),
		arr:       make([]byte, recordSize),
	}
}
// CumulativePeriod is one aggregated bucket of cumulative measurements.
type CumulativePeriod struct {
	Period   uint32  // period key produced by the aggregator's time2period
	Since    uint32  // earliest timestamp attributed to the bucket
	Until    uint32  // latest timestamp attributed to the bucket
	EndValue float64 // counter reading at the end of the bucket
	Total    float64 // counter growth over the bucket
}
// BufferMeasure queues p without triggering a network write.
// NOTE(review): name is inconsistent with WritePeriod — this buffers a
// period, not a measure; renaming would break callers, so left as-is.
func (s *CumulativePeriodsWriter) BufferMeasure(p CumulativePeriod) {
	s.pack(p)
	s.responder.BufferRecord(s.arr)
}
// WritePeriod appends p and sends the accumulated chunk once it grows
// large enough (see ChunkedResponder.AppendRecord).
func (s *CumulativePeriodsWriter) WritePeriod(p CumulativePeriod) error {
	s.pack(p)
	return s.responder.AppendRecord(s.arr)
}
// Close flushes buffered records and writes the end-of-stream marker.
func (s *CumulativePeriodsWriter) Close() error {
	return s.responder.Flush()
}
// pack serializes p into the reusable 28-byte record buffer:
// period(4) + since(4) + until(4) + endValue(8) + total(8).
func (s *CumulativePeriodsWriter) pack(p CumulativePeriod) {
	bin.PutUint32(s.arr[0:], p.Period)
	bin.PutUint32(s.arr[4:], p.Since)
	bin.PutUint32(s.arr[8:], p.Until)
	bin.PutFloat64(s.arr[12:], p.EndValue)
	bin.PutFloat64(s.arr[20:], p.Total)
}
// CHUNKED RESPONDER
//const headerSize = 3
// endMsg is the single-byte end-of-stream marker sent after the last
// chunk by ChunkedResponder.Flush.
var endMsg = []byte{
	proto.RespEndOfValue, // end of stream
}
// ChunkedResponder accumulates fixed-size records into chunks, each
// prefixed with a RespPartOfValue byte and a 4-byte record count, and
// streams completed chunks to dst.
type ChunkedResponder struct {
	recordsQty int           // records in the chunk currently being built
	buf        *bytes.Buffer // pending chunk: 5-byte header followed by records
	dst        io.Writer
}
// NewChunkedResponder returns a responder with the first chunk header
// already buffered.
func NewChunkedResponder(dst io.Writer) *ChunkedResponder {
	r := &ChunkedResponder{
		buf: bytes.NewBuffer(nil),
		dst: dst,
	}
	// Reserve the chunk header; the records-qty field is patched in by
	// sendBuffered just before the chunk goes out.
	r.buf.Write([]byte{
		proto.RespPartOfValue, // message type
		0, 0, 0, 0, // records qty placeholder
	})
	return r
}
// BufferRecord appends rec to the current chunk without ever sending;
// buffered data goes out on the next AppendRecord-triggered send or on
// Flush.
func (s *ChunkedResponder) BufferRecord(rec []byte) {
	s.buf.Write(rec)
	s.recordsQty++
}
// AppendRecord adds rec to the current chunk and, once the chunk reaches
// 1500 bytes (presumably chosen to approximate an Ethernet MTU — TODO
// confirm), sends it and starts a new chunk with a fresh header.
func (s *ChunkedResponder) AppendRecord(rec []byte) error {
	s.buf.Write(rec)
	s.recordsQty++
	if s.buf.Len() < 1500 {
		return nil
	}
	if err := s.sendBuffered(); err != nil {
		return err
	}
	// Start the next chunk: header first, record counter reset.
	s.buf.Write([]byte{
		proto.RespPartOfValue, // message type
		0, 0, 0, 0, // records qty placeholder
	})
	s.recordsQty = 0
	return nil
}
// Flush sends the last partially-filled chunk (only if it holds any
// records; an empty header-only chunk is discarded) followed by the
// end-of-stream marker.
func (s *ChunkedResponder) Flush() error {
	if s.recordsQty > 0 {
		if err := s.sendBuffered(); err != nil {
			return err
		}
	}
	if _, err := s.dst.Write(endMsg); err != nil {
		return err
	}
	return nil
}
// sendBuffered patches the records-qty field into the chunk header,
// writes the whole chunk to dst and resets the buffer for the next one.
func (s *ChunkedResponder) sendBuffered() error {
	msg := s.buf.Bytes()
	bin.PutUint32(msg[1:], uint32(s.recordsQty))
	written, err := s.dst.Write(msg)
	if err != nil {
		return err
	}
	if written != len(msg) {
		return fmt.Errorf("incomplete write %d bytes instead of %d", written, len(msg))
	}
	s.buf.Reset()
	return nil
}

@ -66,7 +66,13 @@ func (s *Connection) mustSuccess(reader *bufreader.BufferedReader) (err error) {
} }
} }
func (s *Connection) AddMetric(req proto.AddMetricReq) error { type Metric struct {
MetricID uint32
MetricType diploma.MetricType
FracDigits byte
}
func (s *Connection) AddMetric(req Metric) error {
arr := []byte{ arr := []byte{
proto.TypeAddMetric, proto.TypeAddMetric,
0, 0, 0, 0, // 0, 0, 0, 0, //
@ -81,7 +87,7 @@ func (s *Connection) AddMetric(req proto.AddMetricReq) error {
return s.mustSuccess(s.src) return s.mustSuccess(s.src)
} }
func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) { func (s *Connection) GetMetric(metricID uint32) (*Metric, error) {
arr := []byte{ arr := []byte{
proto.TypeGetMetric, proto.TypeGetMetric,
0, 0, 0, 0, 0, 0, 0, 0,
@ -104,10 +110,10 @@ func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) {
return nil, fmt.Errorf("read body: %s", err) return nil, fmt.Errorf("read body: %s", err)
} }
return &proto.Metric{ return &Metric{
MetricID: bin.GetUint32(arr), MetricID: bin.GetUint32(arr),
MetricType: diploma.MetricType(arr[4]), MetricType: diploma.MetricType(arr[4]),
FracDigits: int(arr[5]), FracDigits: arr[5],
}, nil }, nil
case proto.RespError: case proto.RespError:
@ -131,7 +137,13 @@ func (s *Connection) DeleteMetric(metricID uint32) error {
return s.mustSuccess(s.src) return s.mustSuccess(s.src)
} }
func (s *Connection) AppendMeasure(req proto.AppendMeasureReq) (err error) { type AppendMeasureReq struct {
MetricID uint32
Timestamp uint32
Value float64
}
func (s *Connection) AppendMeasure(req AppendMeasureReq) (err error) {
arr := []byte{ arr := []byte{
proto.TypeAppendMeasure, proto.TypeAppendMeasure,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
@ -148,7 +160,17 @@ func (s *Connection) AppendMeasure(req proto.AppendMeasureReq) (err error) {
return s.mustSuccess(s.src) return s.mustSuccess(s.src)
} }
func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) { type AppendMeasuresReq struct {
MetricID uint32
Measures []Measure
}
type Measure struct {
Timestamp uint32
Value float64
}
func (s *Connection) AppendMeasures(req AppendMeasuresReq) (err error) {
if len(req.Measures) > 65535 { if len(req.Measures) > 65535 {
return fmt.Errorf("wrong measures qty: %d", len(req.Measures)) return fmt.Errorf("wrong measures qty: %d", len(req.Measures))
} }
@ -172,7 +194,12 @@ func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) {
return s.mustSuccess(s.src) return s.mustSuccess(s.src)
} }
func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMeasure, error) { type InstantMeasure struct {
Timestamp uint32
Value float64
}
func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]InstantMeasure, error) {
arr := []byte{ arr := []byte{
proto.TypeListAllInstantMeasures, proto.TypeListAllInstantMeasures,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
@ -184,7 +211,7 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
} }
var ( var (
result []proto.InstantMeasure result []InstantMeasure
tmp = make([]byte, 12) tmp = make([]byte, 12)
) )
@ -207,7 +234,7 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.InstantMeasure{ result = append(result, InstantMeasure{
Timestamp: bin.GetUint32(tmp), Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]), Value: bin.GetFloat64(tmp[4:]),
}) })
@ -225,12 +252,13 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
} }
} }
func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]proto.InstantMeasure, error) { func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]InstantMeasure, error) {
arr := []byte{ arr := []byte{
proto.TypeListInstantMeasures, proto.TypeListInstantMeasures,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
0, 0, 0, 0, // since 0, 0, 0, 0, // since
0, 0, 0, 0, // until 0, 0, 0, 0, // until
byte(req.FirstHourOfDay),
} }
bin.PutUint32(arr[1:], req.MetricID) bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint32(arr[5:], req.Since) bin.PutUint32(arr[5:], req.Since)
@ -241,7 +269,7 @@ func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]pr
} }
var ( var (
result []proto.InstantMeasure result []InstantMeasure
tmp = make([]byte, 12) tmp = make([]byte, 12)
) )
@ -264,7 +292,7 @@ func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]pr
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.InstantMeasure{ result = append(result, InstantMeasure{
Timestamp: bin.GetUint32(tmp), Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]), Value: bin.GetFloat64(tmp[4:]),
}) })
@ -282,7 +310,13 @@ func (s *Connection) ListInstantMeasures(req proto.ListInstantMeasuresReq) ([]pr
} }
} }
func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.CumulativeMeasure, error) { type CumulativeMeasure struct {
Timestamp uint32
Value float64
Total float64
}
func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]CumulativeMeasure, error) {
arr := []byte{ arr := []byte{
proto.TypeListAllCumulativeMeasures, proto.TypeListAllCumulativeMeasures,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
@ -294,7 +328,7 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
} }
var ( var (
result []proto.CumulativeMeasure result []CumulativeMeasure
tmp = make([]byte, 20) tmp = make([]byte, 20)
) )
@ -317,7 +351,7 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.CumulativeMeasure{ result = append(result, CumulativeMeasure{
Timestamp: bin.GetUint32(tmp), Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]), Value: bin.GetFloat64(tmp[4:]),
Total: bin.GetFloat64(tmp[12:]), Total: bin.GetFloat64(tmp[12:]),
@ -336,12 +370,13 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
} }
} }
func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq) ([]proto.CumulativeMeasure, error) { func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq) ([]CumulativeMeasure, error) {
arr := []byte{ arr := []byte{
proto.TypeListCumulativeMeasures, proto.TypeListCumulativeMeasures,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
0, 0, 0, 0, // since 0, 0, 0, 0, // since
0, 0, 0, 0, // until 0, 0, 0, 0, // until
byte(req.FirstHourOfDay),
} }
bin.PutUint32(arr[1:], req.MetricID) bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint32(arr[5:], req.Since) bin.PutUint32(arr[5:], req.Since)
@ -352,7 +387,7 @@ func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq)
} }
var ( var (
result []proto.CumulativeMeasure result []CumulativeMeasure
tmp = make([]byte, 20) tmp = make([]byte, 20)
) )
@ -375,7 +410,7 @@ func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq)
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.CumulativeMeasure{ result = append(result, CumulativeMeasure{
Timestamp: bin.GetUint32(tmp), Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]), Value: bin.GetFloat64(tmp[4:]),
Total: bin.GetFloat64(tmp[12:]), Total: bin.GetFloat64(tmp[12:]),
@ -394,7 +429,16 @@ func (s *Connection) ListCumulativeMeasures(req proto.ListCumulativeMeasuresReq)
} }
} }
func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]proto.InstantPeriod, error) { type InstantPeriod struct {
Period uint32
Since uint32
Until uint32
Min float64
Max float64
Avg float64
}
func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]InstantPeriod, error) {
arr := []byte{ arr := []byte{
proto.TypeListInstantPeriods, proto.TypeListInstantPeriods,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
@ -403,14 +447,11 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
byte(req.GroupBy), byte(req.GroupBy),
req.AggregateFuncs, req.AggregateFuncs,
byte(req.FirstHourOfDay), byte(req.FirstHourOfDay),
byte(req.LastDayOfMonth),
} }
bin.PutUint32(arr[1:], req.MetricID) bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint16(arr[5:], uint16(req.Since.Year)) bin.PutUint32(arr[5:], req.Since)
arr[7] = byte(req.Since.Month) bin.PutUint32(arr[9:], req.Until)
arr[8] = byte(req.Since.Day)
bin.PutUint16(arr[9:], uint16(req.Until.Year))
arr[11] = byte(req.Until.Month)
arr[12] = byte(req.Until.Day)
if _, err := s.conn.Write(arr); err != nil { if _, err := s.conn.Write(arr); err != nil {
return nil, err return nil, err
@ -428,7 +469,7 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
} }
var ( var (
result []proto.InstantPeriod result []InstantPeriod
// 12 bytes - period, since, until // 12 bytes - period, since, until
// q * 8 bytes - min, max, avg // q * 8 bytes - min, max, avg
tmp = make([]byte, 12+q*8) tmp = make([]byte, 12+q*8)
@ -454,10 +495,10 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
} }
var ( var (
p = proto.InstantPeriod{ p = InstantPeriod{
Period: bin.GetUint32(tmp[0:]), Period: bin.GetUint32(tmp[0:]),
Start: bin.GetUint32(tmp[4:]), Since: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]), Until: bin.GetUint32(tmp[8:]),
} }
// 12 bytes - period, since, until // 12 bytes - period, since, until
pos = 12 pos = 12
@ -489,7 +530,15 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
} }
} }
func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) ([]proto.CumulativePeriod, error) { type CumulativePeriod struct {
Period uint32
Since uint32
Until uint32
EndValue float64
Total float64
}
func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) ([]CumulativePeriod, error) {
arr := []byte{ arr := []byte{
proto.TypeListCumulativePeriods, proto.TypeListCumulativePeriods,
0, 0, 0, 0, // metricID 0, 0, 0, 0, // metricID
@ -497,21 +546,18 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) (
0, 0, 0, 0, // until 0, 0, 0, 0, // until
byte(req.GroupBy), byte(req.GroupBy),
byte(req.FirstHourOfDay), byte(req.FirstHourOfDay),
byte(req.LastDayOfMonth),
} }
bin.PutUint32(arr[1:], req.MetricID) bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint16(arr[5:], uint16(req.Since.Year)) bin.PutUint32(arr[5:], req.Since)
arr[7] = byte(req.Since.Month) bin.PutUint32(arr[9:], req.Until)
arr[8] = byte(req.Since.Day)
bin.PutUint16(arr[9:], uint16(req.Until.Year))
arr[11] = byte(req.Until.Month)
arr[12] = byte(req.Until.Day)
if _, err := s.conn.Write(arr); err != nil { if _, err := s.conn.Write(arr); err != nil {
return nil, err return nil, err
} }
var ( var (
result []proto.CumulativePeriod result []CumulativePeriod
tmp = make([]byte, 28) tmp = make([]byte, 28)
) )
@ -533,12 +579,12 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) (
if err != nil { if err != nil {
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.CumulativePeriod{ result = append(result, CumulativePeriod{
Period: bin.GetUint32(tmp[0:]), Period: bin.GetUint32(tmp[0:]),
Start: bin.GetUint32(tmp[4:]), Since: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]), Until: bin.GetUint32(tmp[8:]),
StartValue: bin.GetFloat64(tmp[12:]), EndValue: bin.GetFloat64(tmp[12:]),
EndValue: bin.GetFloat64(tmp[20:]), Total: bin.GetFloat64(tmp[20:]),
}) })
} }
@ -554,7 +600,13 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) (
} }
} }
func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]proto.CurrentValue, error) { type CurrentValue struct {
MetricID uint32
Timestamp uint32
Value float64
}
func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]CurrentValue, error) {
arr := make([]byte, 3+metricKeySize*len(metricIDs)) arr := make([]byte, 3+metricKeySize*len(metricIDs))
arr[0] = proto.TypeListCurrentValues arr[0] = proto.TypeListCurrentValues
@ -571,7 +623,7 @@ func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]proto.CurrentValue
} }
var ( var (
result []proto.CurrentValue result []CurrentValue
tmp = make([]byte, 16) tmp = make([]byte, 16)
) )
@ -594,7 +646,7 @@ func (s *Connection) ListCurrentValues(metricIDs []uint32) ([]proto.CurrentValue
return nil, fmt.Errorf("read record #%d: %s", i, err) return nil, fmt.Errorf("read record #%d: %s", i, err)
} }
result = append(result, proto.CurrentValue{ result = append(result, CurrentValue{
MetricID: bin.GetUint32(tmp), MetricID: bin.GetUint32(tmp),
Timestamp: bin.GetUint32(tmp[4:]), Timestamp: bin.GetUint32(tmp[4:]),
Value: bin.GetFloat64(tmp[8:]), Value: bin.GetFloat64(tmp[8:]),
@ -628,6 +680,55 @@ func (s *Connection) DeleteMeasures(req proto.DeleteMeasuresReq) (err error) {
return s.mustSuccess(s.src) return s.mustSuccess(s.src)
} }
type RangeTotalResp struct {
Since uint32
SinceValue float64
Until uint32
UntilValue float64
}
func (s *Connection) RangeTotal(req proto.RangeTotalReq) (*RangeTotalResp, error) {
arr := []byte{
proto.TypeGetMetric,
0, 0, 0, 0,
0, 0, 0, 0,
0, 0, 0, 0,
}
bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint32(arr[5:], req.Since)
bin.PutUint32(arr[9:], req.MetricID)
if _, err := s.conn.Write(arr); err != nil {
return nil, err
}
code, err := s.src.ReadByte()
if err != nil {
return nil, fmt.Errorf("read response code: %s", err)
}
switch code {
case proto.RespValue:
arr, err := s.src.ReadN(24)
if err != nil {
return nil, fmt.Errorf("read body: %s", err)
}
return &RangeTotalResp{
Since: bin.GetUint32(arr),
SinceValue: bin.GetFloat64(arr[4:]),
Until: bin.GetUint32(arr[12:]),
UntilValue: bin.GetFloat64(arr[16:]),
}, nil
case proto.RespError:
return nil, s.onError()
default:
return nil, fmt.Errorf("unknown reponse code %d", code)
}
}
func (s *Connection) onError() error { func (s *Connection) onError() error {
errorCode, err := bin.ReadUint16(s.src) errorCode, err := bin.ReadUint16(s.src)
if err != nil { if err != nil {

@ -13,7 +13,6 @@ import (
"gordenko.dev/dima/diploma/chunkenc" "gordenko.dev/dima/diploma/chunkenc"
"gordenko.dev/dima/diploma/conbuf" "gordenko.dev/dima/diploma/conbuf"
"gordenko.dev/dima/diploma/proto" "gordenko.dev/dima/diploma/proto"
"gordenko.dev/dima/diploma/transform"
"gordenko.dev/dima/diploma/txlog" "gordenko.dev/dima/diploma/txlog"
) )
@ -327,9 +326,6 @@ type FilledPage struct {
} }
type tryAppendMeasureResult struct { type tryAppendMeasureResult struct {
MetricID uint32
Timestamp uint32
Value float64
FilledPage *FilledPage FilledPage *FilledPage
ResultCode byte ResultCode byte
} }
@ -445,6 +441,7 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
) )
for idx, measure := range req.Measures { for idx, measure := range req.Measures {
//fmt.Printf("%d %v\n", measure.Timestamp, measure.Value)
if since == 0 { if since == 0 {
since = measure.Timestamp since = measure.Timestamp
} else { } else {
@ -473,6 +470,7 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
) )
<-waitCh <-waitCh
} }
//fmt.Printf("m.Value: %v < untilValue: %v\n", measure.Value, untilValue)
return proto.ErrNonMonotonicValue return proto.ErrNonMonotonicValue
} }
} }
@ -499,11 +497,10 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
toAppendMeasures = nil toAppendMeasures = nil
} }
//fmt.Printf("APPEND DATA PAGE %d, %v\n", measure.Timestamp, measure.Value)
report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{ report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
MetricID: req.MetricID, MetricID: req.MetricID,
Timestamp: measure.Timestamp, Timestamp: until,
Value: measure.Value, Value: untilValue,
Since: since, Since: since,
RootPageNo: rootPageNo, RootPageNo: rootPageNo,
PrevPageNo: prevPageNo, PrevPageNo: prevPageNo,
@ -596,27 +593,23 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
result := <-resultCh result := <-resultCh
switch result.ResultCode { switch result.ResultCode {
case NoMeasuresToDelete: case Succeed:
// ok var (
freeDataPages []uint32
case DeleteFromAtreeNotNeeded: freeIndexPages []uint32
// регистрирую удаление в TransactionLog )
waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{ if result.RootPageNo > 0 {
MetricID: req.MetricID,
})
<-waitCh
case DeleteFromAtreeRequired:
// собираю номера всех data и index страниц метрики (типа запись REDO лога).
pageLists, err := s.atree.GetAllPages(result.RootPageNo) pageLists, err := s.atree.GetAllPages(result.RootPageNo)
if err != nil { if err != nil {
diploma.Abort(diploma.FailedAtreeRequest, err) diploma.Abort(diploma.FailedAtreeRequest, err)
} }
// регистрирую удаление в TransactionLog freeDataPages = pageLists.DataPages
freeIndexPages = pageLists.IndexPages
}
waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{ waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
MetricID: req.MetricID, MetricID: req.MetricID,
FreeDataPages: pageLists.DataPages, FreeDataPages: freeDataPages,
FreeIndexPages: pageLists.IndexPages, FreeIndexPages: freeIndexPages,
}) })
<-waitCh <-waitCh
@ -631,32 +624,98 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
// SELECT // SELECT
type fullScanResult struct { type instantMeasuresResult struct {
ResultCode byte ResultCode byte
FracDigits byte FracDigits byte
LastPageNo uint32 PageNo uint32
} }
func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error { func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error {
responseWriter := transform.NewInstantMeasureWriter(conn, 0) resultCh := make(chan instantMeasuresResult, 1)
return s.fullScan(fullScanReq{ responseWriter := atree.NewInstantMeasureWriter(conn)
s.appendJobToWorkerQueue(tryListAllInstantMeasuresReq{
MetricID: req.MetricID, MetricID: req.MetricID,
MetricType: diploma.Instant, ResponseWriter: responseWriter,
Conn: conn, ResultCh: resultCh,
})
result := <-resultCh
switch result.ResultCode {
case QueryDone:
responseWriter.Close()
case UntilFound:
err := s.atree.IterateAllInstantByTreeCursor(atree.IterateAllInstantByTreeCursorReq{
FracDigits: result.FracDigits,
PageNo: result.PageNo,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
}) })
s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
case NoMetric:
reply(conn, proto.ErrNoMetric)
case WrongMetricType:
reply(conn, proto.ErrWrongMetricType)
default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
}
return nil
} }
func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error { func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error {
responseWriter := transform.NewCumulativeMeasureWriter(conn, 0) resultCh := make(chan cumulativeMeasuresResult, 1)
responseWriter := atree.NewCumulativeMeasureWriter(conn)
return s.fullScan(fullScanReq{ s.appendJobToWorkerQueue(tryListAllCumulativeMeasuresReq{
MetricID: req.MetricID, MetricID: req.MetricID,
MetricType: diploma.Cumulative,
Conn: conn,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
ResultCh: resultCh,
}) })
result := <-resultCh
switch result.ResultCode {
case QueryDone:
responseWriter.Close()
case UntilFound:
err := s.atree.IterateAllCumulativeByTreeCursor(atree.IterateAllCumulativeByTreeCursorReq{
FracDigits: result.FracDigits,
PageNo: result.PageNo,
EndTimestamp: result.EndTimestamp,
EndValue: result.EndValue,
ResponseWriter: responseWriter,
})
s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
case NoMetric:
reply(conn, proto.ErrNoMetric)
case WrongMetricType:
reply(conn, proto.ErrWrongMetricType)
default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
}
return nil
} }
func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error { func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error {
@ -665,118 +724,182 @@ func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasu
return nil return nil
} }
responseWriter := transform.NewInstantMeasureWriter(conn, req.Since) var since, until uint32
if req.FirstHourOfDay > 0 {
since, until = correctToFHD(req.Since, req.Until, req.FirstHourOfDay)
} else {
since = req.Since
until = req.Until
}
return s.rangeScan(rangeScanReq{ resultCh := make(chan instantMeasuresResult, 1)
responseWriter := atree.NewInstantMeasureWriter(conn)
s.appendJobToWorkerQueue(tryListInstantMeasuresReq{
MetricID: req.MetricID, MetricID: req.MetricID,
MetricType: diploma.Instant, Since: since,
Since: req.Since, Until: until,
Until: req.Until,
Conn: conn,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
ResultCh: resultCh,
}) })
}
func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error { result := <-resultCh
if req.Since > req.Until {
reply(conn, proto.ErrInvalidRange)
return nil
}
responseWriter := transform.NewCumulativeMeasureWriter(conn, req.Since) switch result.ResultCode {
case QueryDone:
responseWriter.Close()
return s.rangeScan(rangeScanReq{ case UntilFound:
MetricID: req.MetricID, err := s.atree.ContinueIterateInstantByTreeCursor(atree.ContinueIterateInstantByTreeCursorReq{
MetricType: diploma.Cumulative, FracDigits: result.FracDigits,
Since: req.Since, Since: since,
Until: req.Until, Until: until,
Conn: conn, LastPageNo: result.PageNo,
ResponseWriter: responseWriter,
})
s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
case UntilNotFound:
err := s.atree.FindAndIterateInstantByTreeCursor(atree.FindAndIterateInstantByTreeCursorReq{
FracDigits: result.FracDigits,
Since: since,
Until: until,
RootPageNo: result.PageNo,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
}) })
s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
case NoMetric:
reply(conn, proto.ErrNoMetric)
case WrongMetricType:
reply(conn, proto.ErrWrongMetricType)
default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
}
return nil
} }
type rangeScanResult struct { type cumulativeMeasuresResult struct {
ResultCode byte ResultCode byte
FracDigits byte FracDigits byte
RootPageNo uint32 PageNo uint32
LastPageNo uint32 EndTimestamp uint32
EndValue float64
} }
func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error { func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error {
since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay) resultCh := make(chan cumulativeMeasuresResult, 1)
if since.After(until) { responseWriter := atree.NewCumulativeMeasureWriter(conn)
reply(conn, proto.ErrInvalidRange)
return nil
}
responseWriter, err := transform.NewInstantPeriodsWriter(transform.InstantPeriodsWriterOptions{ s.appendJobToWorkerQueue(tryListCumulativeMeasuresReq{
Dst: conn, MetricID: req.MetricID,
GroupBy: req.GroupBy, Since: req.Since,
Since: uint32(since.Unix()), Until: req.Until,
AggregateFuncs: req.AggregateFuncs, ResponseWriter: responseWriter,
FirstHourOfDay: req.FirstHourOfDay, ResultCh: resultCh,
}) })
result := <-resultCh
switch result.ResultCode {
case QueryDone:
responseWriter.Close()
case UntilFound:
err := s.atree.ContinueIterateCumulativeByTreeCursor(atree.ContinueIterateCumulativeByTreeCursorReq{
FracDigits: result.FracDigits,
Since: req.Since,
Until: req.Until,
LastPageNo: result.PageNo,
EndTimestamp: result.EndTimestamp,
EndValue: result.EndValue,
ResponseWriter: responseWriter,
})
s.metricRUnlock(req.MetricID)
if err != nil { if err != nil {
reply(conn, proto.ErrUnexpected) reply(conn, proto.ErrUnexpected)
return nil } else {
responseWriter.Close()
} }
return s.rangeScan(rangeScanReq{ case UntilNotFound:
MetricID: req.MetricID, err := s.atree.FindAndIterateCumulativeByTreeCursor(atree.FindAndIterateCumulativeByTreeCursorReq{
MetricType: diploma.Instant, FracDigits: result.FracDigits,
Since: uint32(since.Unix()), Since: req.Since,
Until: uint32(until.Unix()), Until: req.Until,
Conn: conn, RootPageNo: result.PageNo,
ResponseWriter: responseWriter, ResponseWriter: responseWriter,
}) })
} s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error { case NoMetric:
since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay) reply(conn, proto.ErrNoMetric)
if since.After(until) {
reply(conn, proto.ErrInvalidRange) case WrongMetricType:
reply(conn, proto.ErrWrongMetricType)
default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
}
return nil return nil
}
type instantPeriodsResult struct {
ResultCode byte
FracDigits byte
PageNo uint32
}
func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error {
var (
since = req.Since
until = req.Until
)
if req.FirstHourOfDay > 0 {
since, until = correctToFHD(since, until, req.FirstHourOfDay)
} }
responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{ if req.LastDayOfMonth > 0 {
Dst: conn, // fix
}
resultCh := make(chan instantPeriodsResult, 1)
aggregator, err := atree.NewInstantAggregator(atree.InstantAggregatorOptions{
GroupBy: req.GroupBy, GroupBy: req.GroupBy,
Since: uint32(since.Unix()),
FirstHourOfDay: req.FirstHourOfDay, FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
}) })
if err != nil { if err != nil {
reply(conn, proto.ErrUnexpected) reply(conn, proto.ErrUnexpected)
return nil return nil
} }
return s.rangeScan(rangeScanReq{ responseWriter := atree.NewInstantPeriodsWriter(conn, req.AggregateFuncs)
MetricID: req.MetricID,
MetricType: diploma.Cumulative,
Since: uint32(since.Unix()),
Until: uint32(until.Unix()),
Conn: conn,
ResponseWriter: responseWriter,
})
}
type rangeScanReq struct {
MetricID uint32
MetricType diploma.MetricType
Since uint32
Until uint32
Conn io.Writer
ResponseWriter diploma.MeasureConsumer
}
func (s *Database) rangeScan(req rangeScanReq) error {
resultCh := make(chan rangeScanResult, 1)
s.appendJobToWorkerQueue(tryRangeScanReq{ s.appendJobToWorkerQueue(tryListInstantPeriodsReq{
MetricID: req.MetricID, MetricID: req.MetricID,
Since: req.Since, Since: req.Since,
Until: req.Until, Until: req.Until,
MetricType: req.MetricType, Aggregator: aggregator,
ResponseWriter: req.ResponseWriter, ResponseWriter: responseWriter,
ResultCh: resultCh, ResultCh: resultCh,
}) })
@ -784,46 +907,49 @@ func (s *Database) rangeScan(req rangeScanReq) error {
switch result.ResultCode { switch result.ResultCode {
case QueryDone: case QueryDone:
req.ResponseWriter.Close() responseWriter.Close()
case UntilFound: case UntilFound:
err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{ err := s.atree.ContinueCollectInstantPeriods(atree.ContinueCollectInstantPeriodsReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits, FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter, Aggregator: aggregator,
LastPageNo: result.LastPageNo, ResponseWriter: responseWriter,
LastPageNo: result.PageNo,
Since: req.Since, Since: req.Since,
Until: req.Until,
}) })
s.metricRUnlock(req.MetricID) s.metricRUnlock(req.MetricID)
if err != nil { if err != nil {
reply(req.Conn, proto.ErrUnexpected) reply(conn, proto.ErrUnexpected)
} else { } else {
req.ResponseWriter.Close() responseWriter.Close()
} }
case UntilNotFound: case UntilNotFound:
err := s.atree.RangeScan(atree.RangeScanReq{ err := s.atree.FindInstantPeriods(atree.FindInstantPeriodsReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits, FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter, ResponseWriter: responseWriter,
RootPageNo: result.RootPageNo, RootPageNo: result.PageNo,
Since: req.Since, Since: req.Since,
Until: req.Until, Until: req.Until,
GroupBy: req.GroupBy,
FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
}) })
s.metricRUnlock(req.MetricID) s.metricRUnlock(req.MetricID)
if err != nil { if err != nil {
reply(req.Conn, proto.ErrUnexpected) reply(conn, proto.ErrUnexpected)
} else { } else {
req.ResponseWriter.Close() responseWriter.Close()
} }
case NoMetric: case NoMetric:
reply(req.Conn, proto.ErrNoMetric) reply(conn, proto.ErrNoMetric)
case WrongMetricType: case WrongMetricType:
reply(req.Conn, proto.ErrWrongMetricType) reply(conn, proto.ErrWrongMetricType)
default: default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
@ -831,20 +957,33 @@ func (s *Database) rangeScan(req rangeScanReq) error {
return nil return nil
} }
type fullScanReq struct { type cumulativePeriodsResult struct {
MetricID uint32 ResultCode byte
MetricType diploma.MetricType FracDigits byte
Conn io.Writer PageNo uint32
ResponseWriter diploma.MeasureConsumer
} }
func (s *Database) fullScan(req fullScanReq) error { func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error {
resultCh := make(chan fullScanResult, 1) resultCh := make(chan cumulativePeriodsResult, 1)
s.appendJobToWorkerQueue(tryFullScanReq{ aggregator, err := atree.NewCumulativeAggregator(atree.CumulativeAggregatorOptions{
GroupBy: req.GroupBy,
FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
})
if err != nil {
reply(conn, proto.ErrUnexpected)
return nil
}
responseWriter := atree.NewCumulativePeriodsWriter(conn)
s.appendJobToWorkerQueue(tryListCumulativePeriodsReq{
MetricID: req.MetricID, MetricID: req.MetricID,
MetricType: req.MetricType, Since: req.Since,
ResponseWriter: req.ResponseWriter, Until: req.Until,
Aggregator: aggregator,
ResponseWriter: responseWriter,
ResultCh: resultCh, ResultCh: resultCh,
}) })
@ -852,27 +991,49 @@ func (s *Database) fullScan(req fullScanReq) error {
switch result.ResultCode { switch result.ResultCode {
case QueryDone: case QueryDone:
req.ResponseWriter.Close() responseWriter.Close()
case UntilFound: case UntilFound:
err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{ err := s.atree.ContinueCollectCumulativePeriods(atree.ContinueCollectCumulativePeriodsReq{
MetricType: req.MetricType, FracDigits: result.FracDigits,
Aggregator: aggregator,
ResponseWriter: responseWriter,
LastPageNo: result.PageNo,
Since: req.Since,
Until: req.Until,
})
s.metricRUnlock(req.MetricID)
if err != nil {
reply(conn, proto.ErrUnexpected)
} else {
responseWriter.Close()
}
case UntilNotFound:
err := s.atree.FindCumulativePeriods(atree.FindCumulativePeriodsReq{
FracDigits: result.FracDigits, FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter, ResponseWriter: responseWriter,
LastPageNo: result.LastPageNo, RootPageNo: result.PageNo,
Since: req.Since,
Until: req.Until,
GroupBy: req.GroupBy,
FirstHourOfDay: req.FirstHourOfDay,
LastDayOfMonth: req.LastDayOfMonth,
}) })
s.metricRUnlock(req.MetricID) s.metricRUnlock(req.MetricID)
if err != nil { if err != nil {
reply(req.Conn, proto.ErrUnexpected) reply(conn, proto.ErrUnexpected)
} else { } else {
req.ResponseWriter.Close() responseWriter.Close()
} }
case NoMetric: case NoMetric:
reply(req.Conn, proto.ErrNoMetric) reply(conn, proto.ErrNoMetric)
case WrongMetricType: case WrongMetricType:
reply(req.Conn, proto.ErrWrongMetricType) reply(conn, proto.ErrWrongMetricType)
default: default:
diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
@ -881,7 +1042,7 @@ func (s *Database) fullScan(req fullScanReq) error {
} }
func (s *Database) ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error { func (s *Database) ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error {
responseWriter := transform.NewCurrentValueWriter(conn) responseWriter := atree.NewCurrentValueWriter(conn)
defer responseWriter.Close() defer responseWriter.Close()
resultCh := make(chan struct{}) resultCh := make(chan struct{})

@ -5,32 +5,8 @@ import (
"io/fs" "io/fs"
"os" "os"
"time" "time"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/proto"
) )
func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.GroupBy, firstHourOfDay int) (s time.Time, u time.Time) {
switch groupBy {
case diploma.GroupByHour, diploma.GroupByDay:
s = time.Date(since.Year, since.Month, since.Day, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, until.Day, 23, 59, 59, 0, time.Local)
case diploma.GroupByMonth:
s = time.Date(since.Year, since.Month, 1, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, 1, 23, 59, 59, 0, time.Local)
u = u.AddDate(0, 1, 0)
u = u.AddDate(0, 0, -1)
}
if firstHourOfDay > 0 {
duration := time.Duration(firstHourOfDay) * time.Hour
s = s.Add(duration)
u = u.Add(duration)
}
return
}
func isFileExist(fileName string) (bool, error) { func isFileExist(fileName string) (bool, error) {
_, err := os.Stat(fileName) _, err := os.Stat(fileName)
if err != nil { if err != nil {
@ -65,3 +41,10 @@ func (s *Database) metricRUnlock(metricID uint32) {
default: default:
} }
} }
func correctToFHD(since, until uint32, firstHourOfDay int) (uint32, uint32) {
duration := time.Duration(firstHourOfDay) * time.Hour
since = uint32(time.Unix(int64(since), 0).Add(duration).Unix())
until = uint32(time.Unix(int64(until), 0).Add(duration).Unix())
return since, until
}

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

@ -46,20 +46,6 @@ type ValueDecompressor interface {
NextValue() (float64, bool) NextValue() (float64, bool)
} }
type MeasureConsumer interface {
Feed(uint32, float64)
FeedNoSend(uint32, float64)
Close() error
}
type WorkerMeasureConsumer interface {
FeedNoSend(uint32, float64)
}
type AtreeMeasureConsumer interface {
Feed(uint32, float64)
}
type AbortCode int type AbortCode int
const ( const (

@ -333,22 +333,11 @@ func execQuery(conn *client.Connection, queryGenerator *RandomQueryGenerator, st
} }
case listInstantPeriods: case listInstantPeriods:
since := time.Unix(int64(recipe.Since), 0)
until := time.Unix(int64(recipe.Until), 0)
//
t1 := time.Now() t1 := time.Now()
_, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ _, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: recipe.MetricID, MetricID: recipe.MetricID,
Since: proto.TimeBound{ Since: recipe.Since,
Year: since.Year(), Until: recipe.Until,
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: recipe.GroupBy, GroupBy: recipe.GroupBy,
AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg,
}) })
@ -365,22 +354,11 @@ func execQuery(conn *client.Connection, queryGenerator *RandomQueryGenerator, st
} }
case listCumulativePeriods: case listCumulativePeriods:
since := time.Unix(int64(recipe.Since), 0)
until := time.Unix(int64(recipe.Until), 0)
//
t1 := time.Now() t1 := time.Now()
_, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ _, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: recipe.MetricID, MetricID: recipe.MetricID,
Since: proto.TimeBound{ Since: recipe.Since,
Year: since.Year(), Until: recipe.Until,
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: recipe.GroupBy, GroupBy: recipe.GroupBy,
}) })
elapsedTime = time.Since(t1) elapsedTime = time.Since(t1)

@ -4,12 +4,12 @@ import (
"math/rand" "math/rand"
"time" "time"
"gordenko.dev/dima/diploma/proto" "gordenko.dev/dima/diploma/client"
) )
func GenerateCumulativeMeasures(days int) []proto.Measure { func GenerateCumulativeMeasures(days int) []client.Measure {
var ( var (
measures []proto.Measure measures []client.Measure
minutes = []int{14, 29, 44, 59} minutes = []int{14, 29, 44, 59}
hoursPerDay = 24 hoursPerDay = 24
totalHours = days * hoursPerDay totalHours = days * hoursPerDay
@ -31,7 +31,7 @@ func GenerateCumulativeMeasures(days int) []proto.Measure {
time.Local, time.Local,
) )
measure := proto.Measure{ measure := client.Measure{
Timestamp: uint32(measureTime.Unix()), Timestamp: uint32(measureTime.Unix()),
Value: totalValue, Value: totalValue,
} }
@ -43,9 +43,9 @@ func GenerateCumulativeMeasures(days int) []proto.Measure {
return measures return measures
} }
func GenerateInstantMeasures(days int, baseValue float64) []proto.Measure { func GenerateInstantMeasures(days int, baseValue float64) []client.Measure {
var ( var (
measures []proto.Measure measures []client.Measure
minutes = []int{14, 29, 44, 59} minutes = []int{14, 29, 44, 59}
hoursPerDay = 24 hoursPerDay = 24
totalHours = days * hoursPerDay totalHours = days * hoursPerDay
@ -70,7 +70,7 @@ func GenerateInstantMeasures(days int, baseValue float64) []proto.Measure {
fluctuation := baseValue * 0.1 fluctuation := baseValue * 0.1
value := baseValue + (rand.Float64()*2-1)*fluctuation value := baseValue + (rand.Float64()*2-1)*fluctuation
measure := proto.Measure{ measure := client.Measure{
Timestamp: uint32(measureTime.Unix()), Timestamp: uint32(measureTime.Unix()),
Value: value, Value: value,
} }

@ -14,10 +14,8 @@ func sendRequests(conn *client.Connection) {
var ( var (
instantMetricID uint32 = 10000 instantMetricID uint32 = 10000
cumulativeMetricID uint32 = 10001 cumulativeMetricID uint32 = 10001
fracDigits int = 2 fracDigits byte = 2
err error err error
seriesInDays = 62
) )
conn.DeleteMetric(instantMetricID) conn.DeleteMetric(instantMetricID)
@ -25,7 +23,7 @@ func sendRequests(conn *client.Connection) {
// ADD INSTANT METRIC // ADD INSTANT METRIC
err = conn.AddMetric(proto.AddMetricReq{ err = conn.AddMetric(client.Metric{
MetricID: instantMetricID, MetricID: instantMetricID,
MetricType: diploma.Instant, MetricType: diploma.Instant,
FracDigits: fracDigits, FracDigits: fracDigits,
@ -53,9 +51,9 @@ GetMetric:
// APPEND MEASURES // APPEND MEASURES
instantMeasures := GenerateInstantMeasures(seriesInDays, 220) instantMeasures := GenerateInstantMeasures(62, 220)
err = conn.AppendMeasures(proto.AppendMeasuresReq{ err = conn.AppendMeasures(client.AppendMeasuresReq{
MetricID: instantMetricID, MetricID: instantMetricID,
Measures: instantMeasures, Measures: instantMeasures,
}) })
@ -80,10 +78,10 @@ GetMetric:
if err != nil { if err != nil {
log.Fatalf("conn.ListInstantMeasures: %s\n", err) log.Fatalf("conn.ListInstantMeasures: %s\n", err)
} else { } else {
fmt.Printf("\nListInstantMeasures %s %s:\n", fmt.Printf("\nListInstantMeasures %s - %s:\n",
formatTime(since), formatTime(until)) formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
for _, item := range instantList { for _, item := range instantList {
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value) fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
} }
} }
@ -95,102 +93,70 @@ GetMetric:
} else { } else {
fmt.Printf("\nListAllInstantMeasures (last 15 items):\n") fmt.Printf("\nListAllInstantMeasures (last 15 items):\n")
for _, item := range instantList[:15] { for _, item := range instantList[:15] {
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value) fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
} }
} }
// LIST INSTANT PERIODS (group by hour) // LIST INSTANT PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.Add(-24 * time.Hour) since = until.Add(-24 * time.Hour)
instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID, MetricID: instantMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByHour, GroupBy: diploma.GroupByHour,
AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err) log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", fmt.Printf("\nListInstantPeriods (1 day, group by hour):\n")
formatDate(since), formatDate(until))
for _, item := range instantPeriods { for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
} }
} }
// LIST INSTANT PERIODS (group by day) // LIST INSTANT PERIODS (group by day)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -6) since = until.AddDate(0, 0, -7)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID, MetricID: instantMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByDay, GroupBy: diploma.GroupByDay,
AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err) log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", fmt.Printf("\nListInstantPeriods (7 days, group by day):\n")
formatDate(since), formatDate(until))
for _, item := range instantPeriods { for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
} }
} }
// LIST INSTANT PERIODS (group by month) // LIST INSTANT PERIODS (group by month)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -seriesInDays) since = until.AddDate(0, 0, -62)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID, MetricID: instantMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByMonth, GroupBy: diploma.GroupByMonth,
AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg, AggregateFuncs: diploma.AggregateMin | diploma.AggregateMax | diploma.AggregateAvg,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err) log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", fmt.Printf("\nListInstantPeriods (62 days, group by month):\n")
formatMonth(since), formatMonth(until))
for _, item := range instantPeriods { for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
} }
} }
@ -216,7 +182,7 @@ GetMetric:
// ADD CUMULATIVE METRIC // ADD CUMULATIVE METRIC
err = conn.AddMetric(proto.AddMetricReq{ err = conn.AddMetric(client.Metric{
MetricID: cumulativeMetricID, MetricID: cumulativeMetricID,
MetricType: diploma.Cumulative, MetricType: diploma.Cumulative,
FracDigits: fracDigits, FracDigits: fracDigits,
@ -244,9 +210,9 @@ GetMetric:
// APPEND MEASURES // APPEND MEASURES
cumulativeMeasures := GenerateCumulativeMeasures(seriesInDays) cumulativeMeasures := GenerateCumulativeMeasures(62)
err = conn.AppendMeasures(proto.AppendMeasuresReq{ err = conn.AppendMeasures(client.AppendMeasuresReq{
MetricID: cumulativeMetricID, MetricID: cumulativeMetricID,
Measures: cumulativeMeasures, Measures: cumulativeMeasures,
}) })
@ -271,12 +237,11 @@ GetMetric:
if err != nil { if err != nil {
log.Fatalf("conn.ListCumulativeMeasures: %s\n", err) log.Fatalf("conn.ListCumulativeMeasures: %s\n", err)
} else { } else {
fmt.Printf("\nListCumulativeMeasures %s %s:\n", fmt.Printf("\nListCumulativeMeasures %s - %s:\n",
formatTime(since), formatTime(until)) formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
for _, item := range cumulativeList { for _, item := range cumulativeList {
fmt.Printf(" %s => %.2f (%.2f)\n", fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
formatTimestamp(item.Timestamp), item.Value, item.Total)
} }
} }
@ -288,101 +253,67 @@ GetMetric:
} else { } else {
fmt.Printf("\nListAllCumulativeMeasures (last 15 items):\n") fmt.Printf("\nListAllCumulativeMeasures (last 15 items):\n")
for _, item := range cumulativeList[:15] { for _, item := range cumulativeList[:15] {
fmt.Printf(" %s => %.2f (%.2f)\n", fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
formatTimestamp(item.Timestamp), item.Value, item.Total)
} }
} }
//LIST CUMULATIVE PERIODS (group by hour) // LIST CUMULATIVE PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.Add(-24 * time.Hour) since = until.Add(-24 * time.Hour)
cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID, MetricID: cumulativeMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByHour, GroupBy: diploma.GroupByHour,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err) log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListCumulativePeriods (%s − %s, group by hour):\n", fmt.Printf("\nListCumulativePeriods (1 day, group by hour):\n")
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods { for _, item := range cumulativePeriods {
fmt.Printf(" %s => %.2f (%.2f)\n", fmt.Printf(" %s => end value %.2f, total %.2f\n", formatHourPeriod(item.Period), item.EndValue, item.Total)
formatHourPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
} }
} }
// LIST CUMULATIVE PERIODS (group by day) // LIST CUMULATIVE PERIODS (group by day)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -6) since = until.AddDate(0, 0, -7)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID, MetricID: cumulativeMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByDay, GroupBy: diploma.GroupByDay,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err) log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListCumulativePeriods (%s − %s, group by day):\n", fmt.Printf("\nListCumulativePeriods (7 days, group by day):\n")
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods { for _, item := range cumulativePeriods {
fmt.Printf(" %s => %.2f (%.2f)\n", fmt.Printf(" %s => end value %.2f, total %.2f\n", formatDayPeriod(item.Period), item.EndValue, item.Total)
formatDayPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
} }
} }
// LIST CUMULATIVE PERIODS (group by day) // LIST CUMULATIVE PERIODS (group by day)
until = time.Unix(int64(lastTimestamp), 0) until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -seriesInDays) since = until.AddDate(0, 0, -62)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID, MetricID: cumulativeMetricID,
Since: proto.TimeBound{ Since: uint32(since.Unix()),
Year: since.Year(), Until: uint32(until.Unix()),
Month: since.Month(),
Day: since.Day(),
},
Until: proto.TimeBound{
Year: until.Year(),
Month: until.Month(),
Day: until.Day(),
},
GroupBy: diploma.GroupByMonth, GroupBy: diploma.GroupByMonth,
}) })
if err != nil { if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err) log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else { } else {
fmt.Printf("\nListCumulativePeriods (%s − %s, group by month):\n", fmt.Printf("\nListCumulativePeriods (62 days, group by month):\n")
formatMonth(since), formatMonth(until))
for _, item := range cumulativePeriods { for _, item := range cumulativePeriods {
fmt.Printf(" %s => %.2f (%.2f)\n", fmt.Printf(" %s => end value %.2f, total %.2f\n", formatMonthPeriod(item.Period), item.EndValue, item.Total)
formatMonthPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
} }
} }
@ -407,21 +338,11 @@ GetMetric:
} }
} }
func formatMonth(tm time.Time) string { const datetimeLayout = "2006-01-02 15:04:05"
return tm.Format("2006-01")
}
func formatDate(tm time.Time) string {
return tm.Format("2006-01-02")
}
func formatTime(tm time.Time) string {
return tm.Format("2006-01-02 15:04:05")
}
func formatTimestamp(timestamp uint32) string { func formatTime(timestamp uint32) string {
tm := time.Unix(int64(timestamp), 0) tm := time.Unix(int64(timestamp), 0)
return tm.Format("2006-01-02 15:04:05") return tm.Format(datetimeLayout)
} }
func formatHourPeriod(period uint32) string { func formatHourPeriod(period uint32) string {

@ -5,7 +5,6 @@ go 1.24.0
require ( require (
github.com/RoaringBitmap/roaring/v2 v2.5.0 github.com/RoaringBitmap/roaring/v2 v2.5.0
gopkg.in/ini.v1 v1.67.0 gopkg.in/ini.v1 v1.67.0
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69
) )
require ( require (

@ -18,5 +18,3 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69 h1:nyJ3mzTQ46yUeMZCdLyYcs7B5JCS54c67v84miyhq2E=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69/go.mod h1:AxgKDktpqBVyIOhIcP+nlCpK+EsJyjN5kPdqyd8euVU=

Binary file not shown.

Binary file not shown.

@ -2,9 +2,8 @@ package proto
import ( import (
"fmt" "fmt"
"time"
"gordenko.dev/dima/diploma" octopus "gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/bin" "gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/bufreader" "gordenko.dev/dima/diploma/bufreader"
) )
@ -14,14 +13,13 @@ const (
TypeListCurrentValues byte = 2 TypeListCurrentValues byte = 2
TypeListInstantMeasures byte = 3 TypeListInstantMeasures byte = 3
TypeListCumulativeMeasures byte = 33 TypeListCumulativeMeasures byte = 33
TypeListInstantPeriods byte = 4 TypeListInstantPeriods byte = 4
TypeListCumulativePeriods byte = 44 TypeListCumulativePeriods byte = 44
TypeGetMetric byte = 5 TypeGetMetric byte = 5
TypeAddMetric byte = 6 TypeAddMetric byte = 6
TypeListAllInstantMeasures byte = 8 TypeListAllInstantMeasures byte = 8
TypeListAllCumulativeMeasures byte = 88 TypeListAllCumulativeMeasures byte = 88
TypeRangeTotal byte = 9
TypeAppendMeasure byte = 10 TypeAppendMeasure byte = 10
TypeAppendMeasures byte = 11 TypeAppendMeasures byte = 11
TypeDeleteMetric byte = 12 TypeDeleteMetric byte = 12
@ -68,144 +66,164 @@ func ErrorCodeToText(code uint16) string {
} }
} }
// common type GetMetricReq struct {
MetricID uint32
}
type Metric struct { type ListCurrentValuesReq struct {
MetricIDs []uint32
}
type AddMetricReq struct {
MetricID uint32 MetricID uint32
MetricType diploma.MetricType MetricType octopus.MetricType
FracDigits int FracDigits int
} }
type AppendError struct { type UpdateMetricReq struct {
MetricID uint32
MetricType octopus.MetricType
FracDigits int
}
type DeleteMetricReq struct {
MetricID uint32 MetricID uint32
ErrorCode uint16
} }
type TimeBound struct { type DeleteMeasuresReq struct {
Year int MetricID uint32
Month time.Month Since uint32 // timestamp (optional)
Day int
} }
type CumulativeMeasure struct { type AppendMeasureReq struct {
MetricID uint32
Timestamp uint32 Timestamp uint32
Value float64 Value float64
Total float64
} }
type CumulativePeriod struct { type ListAllInstantMetricMeasuresReq struct {
Period uint32 MetricID uint32
Start uint32
End uint32
StartValue float64
EndValue float64
} }
type InstantMeasure struct { type ListAllCumulativeMeasuresReq struct {
Timestamp uint32 MetricID uint32
Value float64
} }
type InstantPeriod struct { type ListInstantMeasuresReq struct {
Period uint32 MetricID uint32
Start uint32 Since uint32
End uint32 Until uint32
Min float64 FirstHourOfDay int
Max float64
Avg float64
} }
type CurrentValue struct { type ListCumulativeMeasuresReq struct {
MetricID uint32 MetricID uint32
Timestamp uint32 Since uint32
Value float64 Until uint32
FirstHourOfDay int
} }
// API reqs type ListInstantPeriodsReq struct {
MetricID uint32
Since uint32
Until uint32
GroupBy octopus.GroupBy
AggregateFuncs byte
FirstHourOfDay int
LastDayOfMonth int
}
type GetMetricReq struct { type ListCumulativePeriodsReq struct {
MetricID uint32 MetricID uint32
Since uint32
Until uint32
GroupBy octopus.GroupBy
FirstHourOfDay int
LastDayOfMonth int
} }
func ReadGetMetricReq(r *bufreader.BufferedReader) (m GetMetricReq, err error) { type Metric struct {
m.MetricID, err = bin.ReadUint32(r) MetricID uint32
if err != nil { MetricType octopus.MetricType
err = fmt.Errorf("read req: %s", err) FracDigits int
return
}
return
} }
type ListCurrentValuesReq struct { type RangeTotalReq struct {
MetricIDs []uint32 MetricID uint32
Since uint32
Until uint32
} }
func ReadListCurrentValuesReq(r *bufreader.BufferedReader) (m ListCurrentValuesReq, err error) { func PackAddMetricReq(req AddMetricReq) []byte {
qty, err := bin.ReadUint16(r) arr := []byte{
if err != nil { TypeAddMetric,
err = fmt.Errorf("read req: %s", err) 0, 0, 0, 0, //
return byte(req.MetricType),
byte(req.FracDigits),
} }
bin.PutUint32(arr[1:], req.MetricID)
return arr
}
for i := range int(qty) { func PackDeleteMetricReq(req DeleteMetricReq) []byte {
var metricID uint32 arr := []byte{
metricID, err = bin.ReadUint32(r) TypeDeleteMetric,
if err != nil { 0, 0, 0, 0, // metricID
err = fmt.Errorf("read metricID (#%d): %s", i, err)
return
}
m.MetricIDs = append(m.MetricIDs, metricID)
} }
return bin.PutUint32(arr[1:], req.MetricID)
return arr
} }
type AddMetricReq struct { func PackAppendMeasure(req AppendMeasureReq) []byte {
MetricID uint32 arr := []byte{
MetricType diploma.MetricType TypeAppendMeasure,
FracDigits int 0, 0, 0, 0, // metricID
0, 0, 0, 0, // timestamp
0, 0, 0, 0, 0, 0, 0, 0, // value
}
bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint32(arr[5:], uint32(req.Timestamp))
bin.PutFloat64(arr[9:], req.Value)
return arr
} }
func ReadAddMetricReq(r *bufreader.BufferedReader) (m AddMetricReq, err error) { func PackDeleteMeasuresReq(req DeleteMeasuresReq) []byte {
arr, err := r.ReadN(6) arr := []byte{
if err != nil { TypeDeleteMeasures,
err = fmt.Errorf("read req: %s", err) 0, 0, 0, 0, // metricID
return 0, 0, 0, 0, // since
} }
return UnpackAddMetricReq(arr), nil bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint32(arr[5:], uint32(req.Since))
return arr
} }
// UNPACK reqs
func UnpackAddMetricReq(arr []byte) (m AddMetricReq) { func UnpackAddMetricReq(arr []byte) (m AddMetricReq) {
m.MetricID = bin.GetUint32(arr) m.MetricID = bin.GetUint32(arr)
m.MetricType = diploma.MetricType(arr[4]) m.MetricType = octopus.MetricType(arr[4])
m.FracDigits = int(arr[5]) m.FracDigits = int(arr[5])
return return
} }
type DeleteMetricReq struct { func UnpackUpdateMetricReq(arr []byte) (m UpdateMetricReq) {
MetricID uint32 m.MetricID = bin.GetUint32(arr)
} m.MetricType = octopus.MetricType(arr[4])
m.FracDigits = int(arr[5])
func ReadDeleteMetricReq(r *bufreader.BufferedReader) (m DeleteMetricReq, err error) {
m.MetricID, err = bin.ReadUint32(r)
if err != nil {
err = fmt.Errorf("read req: %s", err)
return
}
return return
} }
type DeleteMeasuresReq struct { func UnpackDeleteMetricReq(arr []byte) (m DeleteMetricReq) {
MetricID uint32 m.MetricID = bin.GetUint32(arr)
Since uint32 // timestamp (optional) return
} }
func ReadDeleteMeasuresReq(r *bufreader.BufferedReader) (m DeleteMeasuresReq, err error) { func UnpackAppendMeasureReq(arr []byte) (m AppendMeasureReq) {
arr, err := r.ReadN(8) m.MetricID = bin.GetUint32(arr)
if err != nil { m.Timestamp = bin.GetUint32(arr[4:])
err = fmt.Errorf("read req: %s", err) m.Value = bin.GetFloat64(arr[8:])
return return
}
return UnpackDeleteMeasuresReq(arr), nil
} }
func UnpackDeleteMeasuresReq(arr []byte) (m DeleteMeasuresReq) { func UnpackDeleteMeasuresReq(arr []byte) (m DeleteMeasuresReq) {
@ -214,93 +232,104 @@ func UnpackDeleteMeasuresReq(arr []byte) (m DeleteMeasuresReq) {
return return
} }
type AppendMeasureReq struct { func UnpackListInstantMeasuresReq(arr []byte) (m ListInstantMeasuresReq) {
MetricID uint32 m.MetricID = bin.GetUint32(arr[0:])
Timestamp uint32 m.Since = bin.GetUint32(arr[4:])
Value float64 m.Until = bin.GetUint32(arr[8:])
m.FirstHourOfDay = int(arr[12])
return
} }
func ReadAppendMeasureReq(r *bufreader.BufferedReader) (m AppendMeasureReq, err error) { func UnpackListCumulativeMeasuresReq(arr []byte) (m ListCumulativeMeasuresReq) {
arr, err := r.ReadN(16) m.MetricID = bin.GetUint32(arr)
if err != nil { m.Since = bin.GetUint32(arr[4:])
err = fmt.Errorf("read req: %s", err) m.Until = bin.GetUint32(arr[8:])
m.FirstHourOfDay = int(arr[12])
return return
}
return UnpackAppendMeasureReq(arr), nil
} }
func UnpackAppendMeasureReq(arr []byte) (m AppendMeasureReq) { func UnpackListInstantPeriodsReq(arr []byte) (m ListInstantPeriodsReq) {
m.MetricID = bin.GetUint32(arr) m.MetricID = bin.GetUint32(arr)
m.Timestamp = bin.GetUint32(arr[4:]) m.Since = bin.GetUint32(arr[4:])
m.Value = bin.GetFloat64(arr[8:]) m.Until = bin.GetUint32(arr[8:])
m.GroupBy = octopus.GroupBy(arr[12])
m.AggregateFuncs = arr[13]
m.FirstHourOfDay = int(arr[14])
m.LastDayOfMonth = int(arr[15])
return return
} }
type AppendMeasuresReq struct { func UnpackListCumulativePeriodsReq(arr []byte) (m ListCumulativePeriodsReq) {
MetricID uint32 m.MetricID = bin.GetUint32(arr[0:])
Measures []Measure m.Since = bin.GetUint32(arr[4:])
m.Until = bin.GetUint32(arr[8:])
m.GroupBy = octopus.GroupBy(arr[12])
m.FirstHourOfDay = int(arr[13])
m.LastDayOfMonth = int(arr[14])
return
} }
type Measure struct { func UnpackRangeTotalReq(arr []byte) (m RangeTotalReq) {
Timestamp uint32 m.MetricID = bin.GetUint32(arr)
Value float64 m.Since = bin.GetUint32(arr[4:])
m.Until = bin.GetUint32(arr[8:])
return
} }
func ReadAppendMeasuresReq(r *bufreader.BufferedReader) (m AppendMeasuresReq, err error) { // READ reqs
prefix, err := bin.ReadN(r, 6) // metricID + measures qty
func ReadGetMetricReq(r *bufreader.BufferedReader) (m GetMetricReq, err error) {
m.MetricID, err = bin.ReadUint32(r)
if err != nil { if err != nil {
err = fmt.Errorf("read prefix: %s", err) err = fmt.Errorf("read req: %s", err)
return return
} }
return
}
m.MetricID = bin.GetUint32(prefix[0:]) func ReadAddMetricReq(r *bufreader.BufferedReader) (m AddMetricReq, err error) {
qty := bin.GetUint16(prefix[4:]) arr, err := r.ReadN(6)
for i := range int(qty) {
var measure Measure
measure.Timestamp, err = bin.ReadUint32(r)
if err != nil { if err != nil {
err = fmt.Errorf("read timestamp (#%d): %s", i, err) err = fmt.Errorf("read req: %s", err)
return return
} }
measure.Value, err = bin.ReadFloat64(r) return UnpackAddMetricReq(arr), nil
}
func ReadUpdateMetricReq(r *bufreader.BufferedReader) (m UpdateMetricReq, err error) {
arr, err := r.ReadN(6)
if err != nil { if err != nil {
err = fmt.Errorf("read value (#%d): %s", i, err) err = fmt.Errorf("read req: %s", err)
return return
} }
m.Measures = append(m.Measures, measure) return UnpackUpdateMetricReq(arr), nil
}
return
}
type MetricMeasure struct {
MetricID uint32
Timestamp uint32
Value float64
} }
func ReadAppendMeasurePerMetricReq(r *bufreader.BufferedReader) (measures []MetricMeasure, err error) { func ReadDeleteMetricReq(r *bufreader.BufferedReader) (m DeleteMetricReq, err error) {
qty, err := bin.ReadUint16(r) m.MetricID, err = bin.ReadUint32(r)
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err)
return return
} }
var tmp = make([]byte, 16) return
for range int(qty) { }
err = bin.ReadNInto(r, tmp)
func ReadAppendMeasureReq(r *bufreader.BufferedReader) (m AppendMeasureReq, err error) {
arr, err := r.ReadN(16)
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err)
return return
} }
measures = append(measures, MetricMeasure{ return UnpackAppendMeasureReq(arr), nil
MetricID: bin.GetUint32(tmp[0:]),
Timestamp: bin.GetUint32(tmp[4:]),
Value: bin.GetFloat64(tmp[8:]),
})
}
return
} }
type ListAllInstantMetricMeasuresReq struct { func ReadDeleteMeasuresReq(r *bufreader.BufferedReader) (m DeleteMeasuresReq, err error) {
MetricID uint32 arr, err := r.ReadN(8)
if err != nil {
err = fmt.Errorf("read req: %s", err)
return
}
return UnpackDeleteMeasuresReq(arr), nil
} }
func ReadListAllInstantMeasuresReq(r *bufreader.BufferedReader) (m ListAllInstantMetricMeasuresReq, err error) { func ReadListAllInstantMeasuresReq(r *bufreader.BufferedReader) (m ListAllInstantMetricMeasuresReq, err error) {
@ -312,10 +341,6 @@ func ReadListAllInstantMeasuresReq(r *bufreader.BufferedReader) (m ListAllInstan
return return
} }
type ListAllCumulativeMeasuresReq struct {
MetricID uint32
}
func ReadListAllCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListAllCumulativeMeasuresReq, err error) { func ReadListAllCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListAllCumulativeMeasuresReq, err error) {
m.MetricID, err = bin.ReadUint32(r) m.MetricID, err = bin.ReadUint32(r)
if err != nil { if err != nil {
@ -325,14 +350,8 @@ func ReadListAllCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListAllCum
return return
} }
type ListInstantMeasuresReq struct {
MetricID uint32
Since uint32
Until uint32
}
func ReadListInstantMeasuresReq(r *bufreader.BufferedReader) (m ListInstantMeasuresReq, err error) { func ReadListInstantMeasuresReq(r *bufreader.BufferedReader) (m ListInstantMeasuresReq, err error) {
arr, err := r.ReadN(12) arr, err := r.ReadN(13)
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err) err = fmt.Errorf("read req: %s", err)
return return
@ -340,21 +359,8 @@ func ReadListInstantMeasuresReq(r *bufreader.BufferedReader) (m ListInstantMeasu
return UnpackListInstantMeasuresReq(arr), nil return UnpackListInstantMeasuresReq(arr), nil
} }
func UnpackListInstantMeasuresReq(arr []byte) (m ListInstantMeasuresReq) {
m.MetricID = bin.GetUint32(arr[0:])
m.Since = bin.GetUint32(arr[4:])
m.Until = bin.GetUint32(arr[8:])
return
}
type ListCumulativeMeasuresReq struct {
MetricID uint32
Since uint32
Until uint32
}
func ReadListCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListCumulativeMeasuresReq, err error) { func ReadListCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListCumulativeMeasuresReq, err error) {
arr, err := r.ReadN(12) arr, err := r.ReadN(13)
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err) err = fmt.Errorf("read req: %s", err)
return return
@ -362,79 +368,106 @@ func ReadListCumulativeMeasuresReq(r *bufreader.BufferedReader) (m ListCumulativ
return UnpackListCumulativeMeasuresReq(arr), nil return UnpackListCumulativeMeasuresReq(arr), nil
} }
func UnpackListCumulativeMeasuresReq(arr []byte) (m ListCumulativeMeasuresReq) { func ReadListInstantPeriodsReq(r *bufreader.BufferedReader) (m ListInstantPeriodsReq, err error) {
m.MetricID = bin.GetUint32(arr) arr, err := r.ReadN(16)
m.Since = bin.GetUint32(arr[4:]) if err != nil {
m.Until = bin.GetUint32(arr[8:]) err = fmt.Errorf("read req: %s", err)
return return
}
return UnpackListInstantPeriodsReq(arr), nil
} }
type ListInstantPeriodsReq struct { func ReadListCumulativePeriodsReq(r *bufreader.BufferedReader) (m ListCumulativePeriodsReq, err error) {
MetricID uint32 arr, err := r.ReadN(15)
Since TimeBound if err != nil {
Until TimeBound err = fmt.Errorf("read req: %s", err)
GroupBy diploma.GroupBy return
AggregateFuncs byte }
FirstHourOfDay int return UnpackListCumulativePeriodsReq(arr), nil
} }
func ReadListInstantPeriodsReq(r *bufreader.BufferedReader) (m ListInstantPeriodsReq, err error) { func ReadRangeTotalReq(r *bufreader.BufferedReader) (m RangeTotalReq, err error) {
arr, err := r.ReadN(15) arr, err := r.ReadN(12)
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err) err = fmt.Errorf("read req: %s", err)
return return
} }
return UnpackListInstantPeriodsReq(arr), nil return UnpackRangeTotalReq(arr), nil
} }
func UnpackListInstantPeriodsReq(arr []byte) (m ListInstantPeriodsReq) { func ReadListCurrentValuesReq(r *bufreader.BufferedReader) (m ListCurrentValuesReq, err error) {
m.MetricID = bin.GetUint32(arr) qty, err := bin.ReadUint16(r)
m.Since = TimeBound{ if err != nil {
Year: int(bin.GetUint16(arr[4:])), err = fmt.Errorf("read req: %s", err)
Month: time.Month(arr[6]), return
Day: int(arr[7]),
} }
m.Until = TimeBound{
Year: int(bin.GetUint16(arr[8:])), for i := range int(qty) {
Month: time.Month(arr[10]), var metricID uint32
Day: int(arr[11]), metricID, err = bin.ReadUint32(r)
if err != nil {
err = fmt.Errorf("read metricID (#%d): %s", i, err)
return
}
m.MetricIDs = append(m.MetricIDs, metricID)
} }
m.GroupBy = diploma.GroupBy(arr[12])
m.AggregateFuncs = arr[13]
m.FirstHourOfDay = int(arr[14])
return return
} }
type ListCumulativePeriodsReq struct { type AppendMeasuresReq struct {
MetricID uint32 MetricID uint32
Since TimeBound Measures []Measure
Until TimeBound
GroupBy diploma.GroupBy
FirstHourOfDay int
} }
func ReadListCumulativePeriodsReq(r *bufreader.BufferedReader) (m ListCumulativePeriodsReq, err error) { type Measure struct {
arr, err := r.ReadN(14) Timestamp uint32
Value float64
}
func PackAppendMeasures(req AppendMeasuresReq) []byte {
if len(req.Measures) > 65535 {
panic(fmt.Errorf("wrong measures qty: %d", len(req.Measures)))
}
var (
prefixSize = 7
recordSize = 12
arr = make([]byte, prefixSize+len(req.Measures)*recordSize)
)
arr[0] = TypeAppendMeasures
bin.PutUint32(arr[1:], req.MetricID)
bin.PutUint16(arr[5:], uint16(len(req.Measures)))
pos := prefixSize
for _, measure := range req.Measures {
bin.PutUint32(arr[pos:], measure.Timestamp)
bin.PutFloat64(arr[pos+4:], measure.Value)
pos += recordSize
}
return arr
}
func ReadAppendMeasuresReq(r *bufreader.BufferedReader) (m AppendMeasuresReq, err error) {
prefix, err := bin.ReadN(r, 6) // metricID + measures qty
if err != nil { if err != nil {
err = fmt.Errorf("read req: %s", err) err = fmt.Errorf("read prefix: %s", err)
return return
} }
return UnpackListCumulativePeriodsReq(arr), nil
}
func UnpackListCumulativePeriodsReq(arr []byte) (m ListCumulativePeriodsReq) { m.MetricID = bin.GetUint32(prefix[0:])
m.MetricID = bin.GetUint32(arr[0:]) qty := bin.GetUint16(prefix[4:])
m.Since = TimeBound{
Year: int(bin.GetUint16(arr[4:])), for i := range int(qty) {
Month: time.Month(arr[6]), var measure Measure
Day: int(arr[7]), measure.Timestamp, err = bin.ReadUint32(r)
if err != nil {
err = fmt.Errorf("read timestamp (#%d): %s", i, err)
return
}
measure.Value, err = bin.ReadFloat64(r)
if err != nil {
err = fmt.Errorf("read value (#%d): %s", i, err)
return
} }
m.Until = TimeBound{ m.Measures = append(m.Measures, measure)
Year: int(bin.GetUint16(arr[8:])),
Month: time.Month(arr[10]),
Day: int(arr[11]),
} }
m.GroupBy = diploma.GroupBy(arr[12])
m.FirstHourOfDay = int(arr[13])
return return
} }

Binary file not shown.

Binary file not shown.

@ -1,372 +0,0 @@
package transform
import (
"errors"
"fmt"
"io"
"time"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/timeutil"
)
// INSTANT
// InstantPeriodsWriterOptions configures NewInstantPeriodsWriter.
type InstantPeriodsWriterOptions struct {
	Dst            io.Writer       // destination stream for encoded period records (required)
	GroupBy        diploma.GroupBy // period granularity: hour, day or month
	Since          uint32          // lower bound (unix seconds); an older final period is dropped on Close
	AggregateFuncs byte            // bitmask of diploma.Aggregate{Min,Max,Avg}; at least one bit required
	FirstHourOfDay int             // hour (0-23) at which a "day" starts; 0 means calendar day
}
// InstantPeriodsWriter aggregates a stream of instant (gauge-style)
// measures into per-period min/max/avg records and streams them through
// a ChunkedResponder. Samples are fed newest-first (see feed).
type InstantPeriodsWriter struct {
	aggregateFuncs byte                   // bitmask of diploma.Aggregate{Min,Max,Avg}
	arr            []byte                 // scratch record buffer: 12-byte header + 8 bytes per requested func
	responder      *ChunkedResponder      // chunked output stream
	groupBy        diploma.GroupBy        // period granularity
	since          uint32                 // lower time bound (unix seconds)
	firstHourOfDay int                    // custom day-start hour (0 = calendar day)
	time2period    func(uint32) time.Time // maps a timestamp to its period start
	currentPeriod  time.Time              // period currently being aggregated
	lastTimestamp  uint32                 // most recently fed timestamp
	endTimestamp   uint32                 // first-fed (newest) timestamp of the current period
	min            float64                // running minimum of the current period
	max            float64                // running maximum of the current period
	total          float64                // running sum (used for the average)
	entries        int                    // samples folded into the current period; 0 = nothing fed yet
}
// NewInstantPeriodsWriter validates opt and builds a writer that folds a
// newest-first stream of instant measures into per-period aggregate
// records. The record buffer is sized to the number of requested
// aggregate functions (8 bytes each after a 12-byte header).
//
// Returns an error when Dst is missing, FirstHourOfDay is outside 0..23,
// no aggregate function is requested, or GroupBy is unknown.
func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWriter, error) {
	if opt.Dst == nil {
		return nil, errors.New("Dst option is required")
	}
	if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
		return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay)
	}
	// one 8-byte output field per requested aggregate function
	funcsQty := 0
	for _, mask := range []byte{diploma.AggregateMin, diploma.AggregateMax, diploma.AggregateAvg} {
		if opt.AggregateFuncs&mask == mask {
			funcsQty++
		}
	}
	if funcsQty == 0 {
		return nil, errors.New("AggregateFuncs option is required")
	}
	w := &InstantPeriodsWriter{
		aggregateFuncs: opt.AggregateFuncs,
		arr:            make([]byte, 12+funcsQty*8),
		responder:      NewChunkedResponder(opt.Dst),
		groupBy:        opt.GroupBy,
		since:          opt.Since,
		firstHourOfDay: opt.FirstHourOfDay,
	}
	switch opt.GroupBy {
	case diploma.GroupByHour:
		w.time2period = groupByHour
	case diploma.GroupByDay:
		w.time2period = groupByDay
		if w.firstHourOfDay > 0 {
			w.time2period = w.groupByDayUsingFHD
		}
	case diploma.GroupByMonth:
		w.time2period = groupByMonth
		if w.firstHourOfDay > 0 {
			w.time2period = w.groupByMonthUsingFHD
		}
	default:
		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
	}
	return w, nil
}
// groupByDayUsingFHD maps a timestamp to its day period where a "day" is
// considered to start at s.firstHourOfDay rather than midnight: the time
// is truncated to the start of its day and, when that instant's hour
// precedes firstHourOfDay, the period shifts one day back.
// NOTE(review): if timeutil.FirstSecondInPeriod(.., "d") truncates to
// 00:00 then tm.Hour() is always 0 and the shift fires for every
// timestamp whenever firstHourOfDay > 0 — confirm the intended
// semantics against timeutil.
func (s *InstantPeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return tm
}
// groupByMonthUsingFHD maps a timestamp to its month period adjusted for
// a custom first-hour-of-day boundary. Note that subtracting one day
// from the month's first second lands on the previous month's last day —
// presumably downstream uses the result only as a period key; verify.
// NOTE(review): as with groupByDayUsingFHD, tm.Hour() is likely always 0
// after truncation, making the shift unconditional when
// firstHourOfDay > 0 — confirm against timeutil.
func (s *InstantPeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return tm
}
// Feed ingests one sample; the responder may send full chunks right away.
func (s *InstantPeriodsWriter) Feed(timestamp uint32, value float64) {
	const buffering = false
	s.feed(timestamp, value, buffering)
}
// FeedNoSend ingests one sample, buffering output without sending.
func (s *InstantPeriodsWriter) FeedNoSend(timestamp uint32, value float64) {
	const buffering = true
	s.feed(timestamp, value, buffering)
}
// feed folds one sample into the running period. The stream is consumed
// newest-first (timestamps descending): when a sample lands in an
// earlier period, the finished period is packed and emitted, blank
// records are emitted for empty gap periods while stepping backwards,
// and the sample re-primes the aggregates. isBuffer selects
// responder.BufferRecord (never sends) vs AppendRecord (may send a full
// chunk; its error is discarded here).
// NOTE(review): periods are compared with != instead of time.Time.Equal;
// safe only while both values come from the same time2period path.
func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) {
	if s.entries > 0 {
		period := s.time2period(timestamp)
		if period != s.currentPeriod {
			// this sample closed the period: pack it (the sample's
			// timestamp marks the period's far edge) and emit it
			s.packPeriod(timestamp)
			if isBuffer {
				s.responder.BufferRecord(s.arr)
			} else {
				s.responder.AppendRecord(s.arr)
			}
			s.decrementPeriod()
			for period.Before(s.currentPeriod) {
				// emit a blank record for each period with no samples
				s.packBlankPeriod()
				if isBuffer {
					s.responder.BufferRecord(s.arr)
				} else {
					s.responder.AppendRecord(s.arr)
				}
				s.decrementPeriod()
			}
			// re-prime aggregates with the first sample of the new period
			s.endTimestamp = timestamp
			s.min = value
			s.max = value
			s.total = value
			s.entries = 1
		} else {
			// same period: fold the sample into the aggregates
			if value < s.min {
				s.min = value
			} else if value > s.max {
				s.max = value
			}
			s.total += value
			s.entries++
		}
	} else {
		// very first sample: open the initial period
		s.endTimestamp = timestamp
		s.min = value
		s.max = value
		s.total = value
		s.entries = 1
		s.currentPeriod = s.time2period(timestamp)
	}
	s.lastTimestamp = timestamp
}
// decrementPeriod steps currentPeriod one granularity unit back in time.
func (s *InstantPeriodsWriter) decrementPeriod() {
	switch s.groupBy {
	case diploma.GroupByHour:
		s.currentPeriod = s.currentPeriod.Add(-time.Hour)
	case diploma.GroupByDay:
		s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
	case diploma.GroupByMonth:
		s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0)
	}
}
// packBlankPeriod encodes a gap record: only the period start is set,
// every aggregate field is zeroed.
func (s *InstantPeriodsWriter) packBlankPeriod() {
	bin.PutUint32(s.arr[:4], uint32(s.currentPeriod.Unix()))
	tail := s.arr[4:]
	for i := range tail {
		tail[i] = 0
	}
}
// Close emits the final (possibly partial) period — provided its newest
// sample is not older than the requested lower bound — then flushes the
// responder and terminates the stream.
//
// Cleanup: the unused named return value was dropped and the two nested
// conditions were merged; behavior is unchanged. The AppendRecord error
// is still discarded (as in the original); delivery failures surface
// from Flush.
func (s *InstantPeriodsWriter) Close() error {
	if s.entries > 0 && s.lastTimestamp >= s.since {
		s.packPeriod(s.lastTimestamp)
		s.responder.AppendRecord(s.arr)
	}
	return s.responder.Flush()
}
// packPeriod encodes the finished period into s.arr:
// uint32 period start (unix) | uint32 timestamp (the sample that closed
// the period, or the last fed sample when called from Close) |
// uint32 endTimestamp (newest sample in the period) | one float64 per
// aggregate requested in aggregateFuncs, always in Min, Max, Avg order.
func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) {
	bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
	bin.PutUint32(s.arr[4:], timestamp)
	bin.PutUint32(s.arr[8:], s.endTimestamp)
	pos := 12
	if (s.aggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin {
		bin.PutFloat64(s.arr[pos:], s.min)
		pos += 8
	}
	if (s.aggregateFuncs & diploma.AggregateMax) == diploma.AggregateMax {
		bin.PutFloat64(s.arr[pos:], s.max)
		pos += 8
	}
	if (s.aggregateFuncs & diploma.AggregateAvg) == diploma.AggregateAvg {
		// average derived from the running sum and sample count
		bin.PutFloat64(s.arr[pos:], s.total/float64(s.entries))
	}
}
// CumulativePeriodsWriter folds a newest-first stream of cumulative
// (counter-style) samples into fixed 28-byte per-period records and
// streams them through a ChunkedResponder.
type CumulativePeriodsWriter struct {
	arr            []byte                 // 28-byte scratch record buffer
	responder      *ChunkedResponder      // chunked output stream
	since          uint32                 // lower time bound (unix seconds)
	firstHourOfDay int                    // custom day-start hour (0 = calendar day)
	currentPeriod  time.Time              // period currently being assembled
	groupBy        diploma.GroupBy        // period granularity
	time2period    func(uint32) time.Time // maps a timestamp to its period start
	endTimestamp   uint32                 // newest sample of the current period (0 = nothing fed)
	endValue       float64                // counter value at endTimestamp
	lastTimestamp  uint32                 // most recently fed timestamp
	lastValue      float64                // most recently fed value
}
// CumulativePeriodsWriterOptions configures NewCumulativePeriodsWriter.
type CumulativePeriodsWriterOptions struct {
	Dst            io.Writer       // destination stream (required)
	GroupBy        diploma.GroupBy // period granularity: hour, day or month
	Since          uint32          // lower bound (unix seconds); an older final period is dropped
	FirstHourOfDay int             // hour (0-23) at which a "day" starts; 0 = calendar day
}
// NewCumulativePeriodsWriter validates opt and builds a writer that
// folds a newest-first stream of cumulative counter samples into
// per-period (hour/day/month) records.
//
// Returns an error when Dst is missing, FirstHourOfDay is outside 0..23,
// or GroupBy is not a known constant.
//
// Cleanup: the original pre-assigned an hourly time2period closure
// before the switch; every branch overwrites it and the default branch
// returns an error, so that assignment was dead code and was removed.
func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*CumulativePeriodsWriter, error) {
	if opt.Dst == nil {
		return nil, errors.New("Dst option is required")
	}
	if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
		return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay)
	}
	s := &CumulativePeriodsWriter{
		arr:            make([]byte, 28),
		responder:      NewChunkedResponder(opt.Dst),
		since:          opt.Since,
		firstHourOfDay: opt.FirstHourOfDay,
		groupBy:        opt.GroupBy,
	}
	switch opt.GroupBy {
	case diploma.GroupByHour:
		s.time2period = groupByHour
	case diploma.GroupByDay:
		if s.firstHourOfDay > 0 {
			s.time2period = s.groupByDayUsingFHD
		} else {
			s.time2period = groupByDay
		}
	case diploma.GroupByMonth:
		if s.firstHourOfDay > 0 {
			s.time2period = s.groupByMonthUsingFHD
		} else {
			s.time2period = groupByMonth
		}
	default:
		return nil, fmt.Errorf("unknown groupBy %d option", opt.GroupBy)
	}
	return s, nil
}
// groupByDayUsingFHD maps a timestamp to its day period with the day
// boundary at s.firstHourOfDay (duplicate of the InstantPeriodsWriter
// method; only the receiver differs).
// NOTE(review): same caveat as the instant variant — after truncation to
// the day start, tm.Hour() is likely always 0, making the one-day shift
// unconditional when firstHourOfDay > 0; confirm against timeutil.
func (s *CumulativePeriodsWriter) groupByDayUsingFHD(timestamp uint32) time.Time {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return tm
}
// groupByMonthUsingFHD maps a timestamp to its month period adjusted for
// a custom first-hour-of-day (duplicate of the InstantPeriodsWriter
// method; only the receiver differs). Subtracting one day from the
// month's first second lands in the previous month — presumably the
// result is only used as a period key; verify.
func (s *CumulativePeriodsWriter) groupByMonthUsingFHD(timestamp uint32) time.Time {
	tm := timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
	if tm.Hour() < s.firstHourOfDay {
		tm = tm.AddDate(0, 0, -1)
	}
	return tm
}
// Feed ingests one sample; the responder may send full chunks right away.
func (s *CumulativePeriodsWriter) Feed(timestamp uint32, value float64) {
	const buffering = false
	s.feed(timestamp, value, buffering)
}
// FeedNoSend ingests one sample, buffering output without sending.
func (s *CumulativePeriodsWriter) FeedNoSend(timestamp uint32, value float64) {
	const buffering = true
	s.feed(timestamp, value, buffering)
}
// feed consumes one cumulative sample from a newest-first stream. The
// first sample primes endTimestamp/endValue (the newest edge of the
// current period). A sample in an earlier period closes the current one:
// packPeriod records the crossing sample as the period's far edge, blank
// records fill empty gap periods, and the crossing sample becomes the
// new period's edge. isBuffer selects BufferRecord (never sends) vs
// AppendRecord (may send; its error is discarded here).
func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer bool) {
	if s.endTimestamp > 0 {
		period := s.time2period(timestamp)
		if period != s.currentPeriod {
			s.packPeriod(timestamp, value)
			if isBuffer {
				s.responder.BufferRecord(s.arr)
			} else {
				s.responder.AppendRecord(s.arr)
			}
			s.decrementPeriod()
			for period.Before(s.currentPeriod) {
				// insert a blank (gap) record for an empty period
				s.packBlankPeriod()
				if isBuffer {
					s.responder.BufferRecord(s.arr)
				} else {
					s.responder.AppendRecord(s.arr)
				}
				s.decrementPeriod()
			}
			s.endTimestamp = timestamp
			s.endValue = value
		}
	} else {
		// first sample: open the initial period
		s.endTimestamp = timestamp
		s.endValue = value
		s.currentPeriod = s.time2period(timestamp)
	}
	s.lastTimestamp = timestamp
	s.lastValue = value
}
// decrementPeriod steps currentPeriod one granularity unit back in time.
func (s *CumulativePeriodsWriter) decrementPeriod() {
	switch s.groupBy {
	case diploma.GroupByHour:
		s.currentPeriod = s.currentPeriod.Add(-time.Hour)
	case diploma.GroupByDay:
		s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
	case diploma.GroupByMonth:
		s.currentPeriod = s.currentPeriod.AddDate(0, -1, 0)
	}
}
// packBlankPeriod encodes a gap record: only the period start is set,
// the remaining fields are zeroed.
func (s *CumulativePeriodsWriter) packBlankPeriod() {
	bin.PutUint32(s.arr[:4], uint32(s.currentPeriod.Unix()))
	tail := s.arr[4:]
	for i := range tail {
		tail[i] = 0
	}
}
// packPeriod encodes the finished period into s.arr:
// period start | start ts | end ts | start value | end value, where the
// start pair is the sample that closed the period and the end pair is
// the period's newest sample.
func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
	arr := s.arr
	bin.PutUint32(arr, uint32(s.currentPeriod.Unix()))
	bin.PutUint32(arr[4:], start)
	bin.PutUint32(arr[8:], s.endTimestamp)
	bin.PutFloat64(arr[12:], startValue)
	bin.PutFloat64(arr[20:], s.endValue)
}
// Close emits the final partial period (when samples were fed and the
// period's newest edge is inside the requested range) and terminates the
// stream. The period's start pair is the oldest fed sample, unless it
// coincides with the newest edge (single-sample period).
func (s *CumulativePeriodsWriter) Close() error {
	if s.endTimestamp > 0 && s.endTimestamp >= s.since {
		start, startValue := s.endTimestamp, s.endValue
		if s.lastTimestamp != s.endTimestamp {
			start, startValue = s.lastTimestamp, s.lastValue
		}
		s.packPeriod(start, startValue)
		s.responder.AppendRecord(s.arr)
	}
	return s.responder.Flush()
}

@ -1,143 +0,0 @@
package transform
import (
"io"
"gordenko.dev/dima/diploma/bin"
)
// CURRENT VALUE WRITER
// CurrentValue is the most recent measure of a single metric.
type CurrentValue struct {
	MetricID  uint32
	Timestamp uint32 // unix seconds
	Value     float64
}
// CurrentValueWriter streams fixed 16-byte (metricID|timestamp|value)
// records through a ChunkedResponder.
type CurrentValueWriter struct {
	arr       []byte            // 16-byte scratch record buffer
	responder *ChunkedResponder // chunked output stream
}
// NewCurrentValueWriter builds a writer that emits 16-byte current-value
// records to dst.
func NewCurrentValueWriter(dst io.Writer) *CurrentValueWriter {
	w := new(CurrentValueWriter)
	w.arr = make([]byte, 16)
	w.responder = NewChunkedResponder(dst)
	return w
}
// BufferValue encodes m as metricID | timestamp | value and buffers the
// record without triggering a send.
func (s *CurrentValueWriter) BufferValue(m CurrentValue) {
	arr := s.arr
	bin.PutUint32(arr, m.MetricID)
	bin.PutUint32(arr[4:], m.Timestamp)
	bin.PutFloat64(arr[8:], m.Value)
	s.responder.BufferRecord(arr)
}
// Close sends any buffered records and the end-of-stream marker.
func (s *CurrentValueWriter) Close() error {
	return s.responder.Flush()
}
// INSTANT MEASURE WRITER
// InstantMeasure is one reading of an instant (gauge-style) metric.
type InstantMeasure struct {
	Timestamp uint32 // unix seconds
	Value     float64
}
// InstantMeasureWriter streams fixed 12-byte (timestamp|value) records,
// skipping samples older than the requested lower bound.
type InstantMeasureWriter struct {
	arr       []byte            // 12-byte scratch record buffer
	responder *ChunkedResponder // chunked output stream
	since     uint32            // lower time bound (unix seconds)
}
// NewInstantMeasureWriter builds a writer emitting 12-byte
// (timestamp|value) records to dst, dropping samples older than since.
func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter {
	w := new(InstantMeasureWriter)
	w.arr = make([]byte, 12)
	w.responder = NewChunkedResponder(dst)
	w.since = since
	return w
}
// Feed ingests one sample; the responder may send full chunks right away.
func (s *InstantMeasureWriter) Feed(timestamp uint32, value float64) {
	const buffering = false
	s.feed(timestamp, value, buffering)
}
// FeedNoSend ingests one sample, buffering output without sending.
func (s *InstantMeasureWriter) FeedNoSend(timestamp uint32, value float64) {
	const buffering = true
	s.feed(timestamp, value, buffering)
}
// feed encodes one (timestamp, value) record and hands it to the
// responder; samples older than the lower bound are dropped. The error
// returned by AppendRecord is discarded here.
func (s *InstantMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) {
	if timestamp < s.since {
		return // outside the requested range
	}
	bin.PutUint32(s.arr, timestamp)
	bin.PutFloat64(s.arr[4:], value)
	if isBuffer {
		s.responder.BufferRecord(s.arr)
		return
	}
	s.responder.AppendRecord(s.arr)
}
// Close sends any buffered records and the end-of-stream marker.
func (s *InstantMeasureWriter) Close() error {
	return s.responder.Flush()
}
// CUMULATIVE MEASURE WRITER
// CumulativeMeasure is one reading of a cumulative (counter-style)
// metric — presumably mirroring the 20-byte wire record produced by
// CumulativeMeasureWriter; verify against the client-side decoder.
type CumulativeMeasure struct {
	Timestamp uint32  // unix seconds
	Value     float64 // counter reading
	Total     float64 // delta to the neighbouring sample; 0 on the final record
}
// CumulativeMeasureWriter streams fixed 20-byte (timestamp|value|total)
// records. Each record is emitted one step behind the input: a sample is
// held until the next one arrives so that total can be computed as the
// difference between the two counter values.
type CumulativeMeasureWriter struct {
	arr          []byte            // 20-byte scratch record buffer
	responder    *ChunkedResponder // chunked output stream
	since        uint32            // lower time bound for the final record
	endTimestamp uint32            // pending (not yet emitted) sample; 0 = none fed
	endValue     float64           // pending sample's counter value
}
// NewCumulativeMeasureWriter builds a writer emitting 20-byte
// (timestamp|value|total) records to dst; the final record is dropped
// when older than since.
func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter {
	w := new(CumulativeMeasureWriter)
	w.arr = make([]byte, 20)
	w.responder = NewChunkedResponder(dst)
	w.since = since
	return w
}
// Feed ingests one sample; the responder may send full chunks right away.
func (s *CumulativeMeasureWriter) Feed(timestamp uint32, value float64) {
	const buffering = false
	s.feed(timestamp, value, buffering)
}
// FeedNoSend ingests one sample, buffering output without sending.
func (s *CumulativeMeasureWriter) FeedNoSend(timestamp uint32, value float64) {
	const buffering = true
	s.feed(timestamp, value, buffering)
}
// feed consumes one cumulative sample. Records trail the input by one
// step: the previously held sample is packed with
// total = held value - current value and emitted, then the current
// sample becomes the held one. The subtraction direction implies a
// newest-first (descending) input stream, making total the consumption
// over the preceding interval — NOTE(review): confirm against the
// reader that drives this writer. The first sample only primes state;
// AppendRecord's error is discarded here.
func (s *CumulativeMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) {
	if s.endTimestamp > 0 {
		s.pack(s.endValue - value)
		if isBuffer {
			s.responder.BufferRecord(s.arr)
		} else {
			s.responder.AppendRecord(s.arr)
		}
	}
	s.endTimestamp = timestamp
	s.endValue = value
}
// pack encodes the held sample into s.arr as timestamp | value | total.
func (s *CumulativeMeasureWriter) pack(total float64) {
	arr := s.arr
	bin.PutUint32(arr, s.endTimestamp)
	bin.PutFloat64(arr[4:], s.endValue)
	bin.PutFloat64(arr[12:], total)
}
// Close emits the held (oldest) sample with total forced to 0 — there is
// no older sample left to diff against — provided it lies inside the
// requested range, then flushes and terminates the stream.
//
// Fix: guard against an empty stream. Previously, when no samples had
// been fed and since == 0, the zero-valued state passed the range check
// and a bogus all-zero record was emitted. The added endTimestamp > 0
// check mirrors the "anything fed?" guard used by
// CumulativePeriodsWriter.Close.
func (s *CumulativeMeasureWriter) Close() error {
	if s.endTimestamp > 0 && s.endTimestamp >= s.since {
		s.pack(0)
		s.responder.BufferRecord(s.arr)
	}
	return s.responder.Flush()
}

@ -1,89 +0,0 @@
package transform
import (
"bytes"
"fmt"
"io"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/proto"
)
// CHUNKED RESPONDER
// endMsg is the single-byte end-of-stream marker written after the last
// chunk.
var endMsg = []byte{
	proto.RespEndOfValue, // end of stream
}
// ChunkedResponder frames fixed-size records into chunks of the form
// [RespPartOfValue | uint32 records qty | records...], terminated by a
// single RespEndOfValue byte. The qty field is back-patched into the
// buffered header just before a chunk is written.
type ChunkedResponder struct {
	recordsQty int           // records accumulated in the current chunk
	buf        *bytes.Buffer // current chunk, always starts with the 5-byte header
	dst        io.Writer     // destination stream
}
// NewChunkedResponder returns a responder whose buffer is pre-seeded
// with the first chunk's header (message type + zeroed records counter;
// the counter is patched on send).
func NewChunkedResponder(dst io.Writer) *ChunkedResponder {
	r := &ChunkedResponder{
		buf: bytes.NewBuffer(nil),
		dst: dst,
	}
	header := []byte{
		proto.RespPartOfValue, // message type
		0, 0, 0, 0, // records qty placeholder
	}
	r.buf.Write(header)
	return r
}
// BufferRecord appends rec to the current chunk without ever triggering
// a send (unlike AppendRecord); the data leaves on a later AppendRecord
// overflow or on Flush. This is the path used by the *NoSend feeds.
func (s *ChunkedResponder) BufferRecord(rec []byte) {
	s.buf.Write(rec)
	s.recordsQty++
}
// AppendRecord adds rec to the current chunk and, once the chunk reaches
// 1500 bytes (likely chosen near the Ethernet MTU), sends it and seeds
// the header of the next chunk.
func (s *ChunkedResponder) AppendRecord(rec []byte) error {
	s.buf.Write(rec)
	s.recordsQty++
	if s.buf.Len() >= 1500 {
		if err := s.sendBuffered(); err != nil {
			return err
		}
		s.recordsQty = 0
		s.buf.Write([]byte{
			proto.RespPartOfValue, // message type
			0, 0, 0, 0, // records qty placeholder
		})
	}
	return nil
}
// Flush sends the current chunk (when it holds any records) followed by
// the end-of-stream marker.
func (s *ChunkedResponder) Flush() error {
	if s.recordsQty > 0 {
		if err := s.sendBuffered(); err != nil {
			return err
		}
	}
	_, err := s.dst.Write(endMsg)
	return err
}
// sendBuffered patches the records counter into the chunk header, writes
// the whole chunk to dst and resets the buffer on success.
func (s *ChunkedResponder) sendBuffered() error {
	msg := s.buf.Bytes()
	bin.PutUint32(msg[1:], uint32(s.recordsQty))
	n, err := s.dst.Write(msg)
	switch {
	case err != nil:
		return err
	case n != len(msg):
		// defensive: io.Writer must error on short writes, but check anyway
		return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg))
	}
	s.buf.Reset()
	return nil
}

@ -1,19 +0,0 @@
package transform
import (
"time"
"gordenko.dev/dima/diploma/timeutil"
)
// groupByHour maps a unix timestamp to its hour period — presumably the
// first second of the containing hour; the exact semantics live in
// timeutil.FirstSecondInPeriod (period code "h").
func groupByHour(timestamp uint32) time.Time {
	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "h")
}
// groupByDay maps a unix timestamp to its calendar-day period (period
// code "d" in timeutil.FirstSecondInPeriod).
func groupByDay(timestamp uint32) time.Time {
	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "d")
}
// groupByMonth maps a unix timestamp to its calendar-month period
// (period code "m" in timeutil.FirstSecondInPeriod).
func groupByMonth(timestamp uint32) time.Time {
	return timeutil.FirstSecondInPeriod(time.Unix(int64(timestamp), 0), "m")
}
Loading…
Cancel
Save