diff --git a/atree/atree.go b/atree/atree.go index d554f91..7159ced 100644 --- a/atree/atree.go +++ b/atree/atree.go @@ -378,10 +378,6 @@ func (s *Atree) AppendDataPage(req AppendDataPageReq) (_ redo.Report, err error) return } - // На данний момен схема - наступна. Всі сторінки - data та index - зафіксовані в кеші. - // Отже запис на диск пройде максимально швидко. Після цього ReferenceCount кожної - // сторінки зменшиться на 1. Оскільки на метрику утримується XLock, сторінки мають - // ReferenceCount = 1 (немає інших читачів). waitCh := make(chan struct{}) task := WriteTask{ diff --git a/atree/select.go b/atree/select.go index ce2fb2e..049837a 100644 --- a/atree/select.go +++ b/atree/select.go @@ -7,8 +7,9 @@ import ( ) type ContinueFullScanReq struct { + MetricType diploma.MetricType FracDigits byte - ResponseWriter AtreeMeasureConsumer + ResponseWriter diploma.AtreeMeasureConsumer LastPageNo uint32 } @@ -23,7 +24,7 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error { PageData: buf, Atree: s, FracDigits: req.FracDigits, - MetricType: diploma.Instant, + MetricType: req.MetricType, }) if err != nil { return err @@ -43,8 +44,9 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error { } type ContinueRangeScanReq struct { + MetricType diploma.MetricType FracDigits byte - ResponseWriter AtreeMeasureConsumer + ResponseWriter diploma.AtreeMeasureConsumer LastPageNo uint32 Since uint32 } @@ -60,7 +62,7 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error { PageData: buf, Atree: s, FracDigits: req.FracDigits, - MetricType: diploma.Instant, + MetricType: req.MetricType, }) if err != nil { return err @@ -83,8 +85,9 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error { } type RangeScanReq struct { + MetricType diploma.MetricType FracDigits byte - ResponseWriter AtreeMeasureConsumer + ResponseWriter diploma.AtreeMeasureConsumer RootPageNo uint32 Since uint32 Until uint32 @@ -101,7 +104,7 @@ func (s *Atree) RangeScan(req RangeScanReq) error { PageData: buf, Atree: s, FracDigits: req.FracDigits, - MetricType: diploma.Instant, + MetricType: req.MetricType, }) if err != nil { return err @@ -117,11 +120,12 @@ func (s *Atree) RangeScan(req RangeScanReq) error { return nil } + //fmt.Printf("atree range scan: %s, %v\n", time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05"), value) + if timestamp <= req.Until { req.ResponseWriter.Feed(timestamp, value) if timestamp < req.Since { - // - записи, удовлетворяющие временным рамкам, закончились. return nil } } diff --git a/atree/writers.go b/atree/writers.go deleted file mode 100644 index 2fd52e0..0000000 --- a/atree/writers.go +++ /dev/null @@ -1,15 +0,0 @@ -package atree - -type PeriodsWriter interface { - Feed(uint32, float64) - FeedNoSend(uint32, float64) - Close() error -} - -type WorkerMeasureConsumer interface { - FeedNoSend(uint32, float64) -} - -type AtreeMeasureConsumer interface { - Feed(uint32, float64) -} diff --git a/client/client.go b/client/client.go index 1151d6d..49c276e 100644 --- a/client/client.go +++ b/client/client.go @@ -81,21 +81,6 @@ func (s *Connection) AddMetric(req proto.AddMetricReq) error { return s.mustSuccess(s.src) } -// -// func (s *Connection) UpdateMetric(req Metric) error { -// arr := []byte{ -// proto.TypeUpdateMetric, -// 0, 0, 0, 0, // -// byte(req.FracDigits), -// } -// bin.PutUint32(arr[1:], req.MetricID) - -// if _, err := s.conn.Write(arr); err != nil { -// return err -// } -// return s.mustSuccess(s.src) -// } - func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) { arr := []byte{ proto.TypeGetMetric, @@ -184,73 +169,9 @@ func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) { if _, err := s.conn.Write(arr); err != nil { return err } - - //fmt.Printf("encode measures: %d\n", len(req.Measures)) return s.mustSuccess(s.src) } -// type AppendMeasurePerMetricReq struct { -// MetricID uint32 -// Measures []Measure -// } - -func (s *Connection) AppendMeasurePerMetric(list []proto.MetricMeasure) (_ []proto.AppendError, err error) { - if len(list) > 65535 { - return nil, fmt.Errorf("wrong measures qty: %d", len(list)) - } - var ( - // 3 bytes: 1b message type + 2b records qty - fixedSize = 3 - recordSize = 16 - arr = make([]byte, fixedSize+len(list)*recordSize) - ) - arr[0] = proto.TypeAppendMeasures - bin.PutUint16(arr[1:], uint16(len(list))) - pos := fixedSize - for _, item := range list { - bin.PutUint32(arr[pos:], item.MetricID) - bin.PutUint32(arr[pos+4:], item.Timestamp) - bin.PutFloat64(arr[pos+8:], item.Value) - pos += recordSize - } - if _, err := s.conn.Write(arr); err != nil { - return nil, err - } - - code, err := s.src.ReadByte() - if err != nil { - return nil, fmt.Errorf("read response code: %s", err) - } - - switch code { - case proto.RespValue: - var ( - qty uint16 - appendErrors []proto.AppendError - ) - qty, err = bin.ReadUint16(s.src) - if err != nil { - return - } - for range qty { - var ae proto.AppendError - ae.MetricID, err = bin.ReadUint32(s.src) - if err != nil { - return - } - ae.ErrorCode, err = bin.ReadUint16(s.src) - if err != nil { - return - } - appendErrors = append(appendErrors, ae) - } - return appendErrors, nil - - default: - return nil, fmt.Errorf("unknown reponse code %d", code) - } -} - func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMeasure, error) { arr := []byte{ proto.TypeListAllInstantMeasures, @@ -273,8 +194,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea return nil, fmt.Errorf("read response code: %s", err) } - //fmt.Printf("code: %d\n", code) - switch code { case proto.RespPartOfValue: q, err := bin.ReadUint32(s.src) @@ -282,8 +201,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea return nil, fmt.Errorf("read records qty: %s", err) } - //fmt.Printf("q: %d\n", q) - for i := range int(q) { err = bin.ReadNInto(s.src, tmp) if err != nil { @@ -387,8 +304,6 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat return nil, fmt.Errorf("read response code: %s", err) } - //fmt.Printf("code: %d\n", code) - switch code { case proto.RespPartOfValue: q, err := bin.ReadUint32(s.src) @@ -402,19 +317,11 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat return nil, fmt.Errorf("read record #%d: %s", i, err) } - //fmt.Printf("tmp: %d\n", tmp) - result = append(result, proto.CumulativeMeasure{ Timestamp: bin.GetUint32(tmp), Value: bin.GetFloat64(tmp[4:]), Total: bin.GetFloat64(tmp[12:]), }) - - // pretty.PPrintln("measure", CumulativeMeasure{ - // Timestamp: bin.GetUint32(tmp), - // Value: bin.GetFloat64(tmp[4:]), - // Total: bin.GetFloat64(tmp[12:]), - // }) } case proto.RespEndOfValue: @@ -533,8 +440,6 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot return nil, fmt.Errorf("read response code: %s", err) } - //fmt.Printf("code: %d\n", code) - switch code { case proto.RespPartOfValue: q, err := bin.ReadUint32(s.src) @@ -551,8 +456,8 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot var ( p = proto.InstantPeriod{ Period: bin.GetUint32(tmp[0:]), - Since: bin.GetUint32(tmp[4:]), - Until: bin.GetUint32(tmp[8:]), + Start: bin.GetUint32(tmp[4:]), + End: bin.GetUint32(tmp[8:]), } // 12 bytes - period, since, until pos = 12 @@ -629,11 +534,11 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) ( return nil, fmt.Errorf("read record #%d: %s", i, err) } result = append(result, proto.CumulativePeriod{ - Period: bin.GetUint32(tmp[0:]), - Since: bin.GetUint32(tmp[4:]), - Until: bin.GetUint32(tmp[8:]), - EndValue: bin.GetFloat64(tmp[12:]), - Total: bin.GetFloat64(tmp[20:]), + Period: bin.GetUint32(tmp[0:]), + Start: bin.GetUint32(tmp[4:]), + End: bin.GetUint32(tmp[8:]), + StartValue: bin.GetFloat64(tmp[12:]), + EndValue: bin.GetFloat64(tmp[20:]), }) } diff --git a/database/api.go b/database/api.go index 98940e4..9174d2e 100644 --- a/database/api.go +++ b/database/api.go @@ -499,10 +499,11 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 { toAppendMeasures = nil } + //fmt.Printf("APPEND DATA PAGE %d, %v\n", measure.Timestamp, measure.Value) report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{ MetricID: req.MetricID, - Timestamp: until, - Value: untilValue, + Timestamp: measure.Timestamp, + Value: measure.Value, Since: since, RootPageNo: rootPageNo, PrevPageNo: prevPageNo, @@ -607,7 +608,7 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 { case DeleteFromAtreeRequired: // собираю номера всех data и index страниц метрики (типа запись REDO лога). - pageLists, err := s.atree.GetAllPages(req.MetricID) + pageLists, err := s.atree.GetAllPages(result.RootPageNo) if err != nil { diploma.Abort(diploma.FailedAtreeRequest, err) } @@ -670,7 +671,7 @@ func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasu MetricID: req.MetricID, MetricType: diploma.Instant, Since: req.Since, - Until: req.Until - 1, + Until: req.Until, Conn: conn, ResponseWriter: responseWriter, }) @@ -688,7 +689,7 @@ func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativ MetricID: req.MetricID, MetricType: diploma.Cumulative, Since: req.Since, - Until: req.Until - 1, + Until: req.Until, Conn: conn, ResponseWriter: responseWriter, }) @@ -739,6 +740,7 @@ func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulative responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{ Dst: conn, GroupBy: req.GroupBy, + Since: uint32(since.Unix()), FirstHourOfDay: req.FirstHourOfDay, }) if err != nil { @@ -762,7 +764,7 @@ type rangeScanReq struct { Since uint32 Until uint32 Conn io.Writer - ResponseWriter atree.PeriodsWriter + ResponseWriter diploma.MeasureConsumer } func (s *Database) rangeScan(req rangeScanReq) error { @@ -785,6 +787,7 @@ func (s *Database) rangeScan(req rangeScanReq) error { case UntilFound: err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{ + MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, LastPageNo: result.LastPageNo, @@ -800,6 +803,7 @@ func (s *Database) rangeScan(req rangeScanReq) error { case UntilNotFound: err := s.atree.RangeScan(atree.RangeScanReq{ + MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, RootPageNo: result.RootPageNo, @@ -830,7 +834,7 @@ type fullScanReq struct { MetricID uint32 MetricType diploma.MetricType Conn io.Writer - ResponseWriter atree.PeriodsWriter + ResponseWriter diploma.MeasureConsumer } func (s *Database) fullScan(req fullScanReq) error { @@ -851,6 +855,7 @@ func (s *Database) fullScan(req fullScanReq) error { case UntilFound: err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{ + MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, LastPageNo: result.LastPageNo, diff --git a/database/helpers.go b/database/helpers.go index d8500cd..acb6d80 100644 --- a/database/helpers.go +++ b/database/helpers.go @@ -14,11 +14,13 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group switch groupBy { case diploma.GroupByHour, diploma.GroupByDay: s = time.Date(since.Year, since.Month, since.Day, 0, 0, 0, 0, time.Local) - u = time.Date(until.Year, until.Month, until.Day, 0, 0, 0, 0, time.Local) + u = time.Date(until.Year, until.Month, until.Day, 23, 59, 59, 0, time.Local) case diploma.GroupByMonth: s = time.Date(since.Year, since.Month, 1, 0, 0, 0, 0, time.Local) - u = time.Date(until.Year, until.Month, 1, 0, 0, 0, 0, time.Local) + u = time.Date(until.Year, until.Month, 1, 23, 59, 59, 0, time.Local) + u = u.AddDate(0, 1, 0) + u = u.AddDate(0, 0, -1) } if firstHourOfDay > 0 { @@ -26,8 +28,6 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group s = s.Add(duration) u = u.Add(duration) } - - u = u.Add(-1 * time.Second) return } @@ -65,10 +65,3 @@ func (s *Database) metricRUnlock(metricID uint32) { default: } } - -func correctToFHD(since, until uint32, firstHourOfDay int) (uint32, uint32) { - duration := time.Duration(firstHourOfDay) * time.Hour - since = uint32(time.Unix(int64(since), 0).Add(duration).Unix()) - until = uint32(time.Unix(int64(until), 0).Add(duration).Unix()) - return since, until -} diff --git a/database/proc.go b/database/proc.go index 1452d95..9cced88 100644 --- a/database/proc.go +++ b/database/proc.go @@ -528,6 +528,8 @@ func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWith rec.MetricID)) } + //fmt.Printf("reinit by: %d, %v\n", rec.Timestamp, rec.Value) + metric.ReinitBy(rec.Timestamp, rec.Value) if rec.IsRootChanged { metric.RootPageNo = rec.RootPageNo @@ -643,7 +645,7 @@ type tryRangeScanReq struct { Since uint32 Until uint32 MetricType diploma.MetricType - ResponseWriter atree.WorkerMeasureConsumer + ResponseWriter diploma.WorkerMeasureConsumer ResultCh chan rangeScanResult } @@ -717,11 +719,20 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool { metric.Timestamps.Size(), ) - valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) + var valueDecompressor diploma.ValueDecompressor + if metric.MetricType == diploma.Cumulative { + valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor( + metric.ValuesBuf, + metric.Values.Size(), + metric.FracDigits, + ) + } else { + valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor( + metric.ValuesBuf, + metric.Values.Size(), + metric.FracDigits, + ) + } for { timestamp, done := timestampDecompressor.NextValue() @@ -763,7 +774,7 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool { type tryFullScanReq struct { MetricID uint32 MetricType diploma.MetricType - ResponseWriter atree.WorkerMeasureConsumer + ResponseWriter diploma.WorkerMeasureConsumer ResultCh chan fullScanResult } @@ -813,12 +824,21 @@ func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool { metric.TimestampsBuf, metric.Timestamps.Size(), ) - valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor( - metric.ValuesBuf, - metric.Values.Size(), - metric.FracDigits, - ) + var valueDecompressor diploma.ValueDecompressor + if metric.MetricType == diploma.Cumulative { + valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor( + metric.ValuesBuf, + metric.Values.Size(), + metric.FracDigits, + ) + } else { + valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor( + metric.ValuesBuf, + metric.Values.Size(), + metric.FracDigits, + ) + } for { timestamp, done := timestampDecompressor.NextValue() if done { diff --git a/database_linux b/database_linux index 48bc8f7..3e41d19 100755 Binary files a/database_linux and b/database_linux differ diff --git a/diploma.go b/diploma.go index 53889c3..d5382db 100644 --- a/diploma.go +++ b/diploma.go @@ -46,6 +46,20 @@ type ValueDecompressor interface { NextValue() (float64, bool) } +type MeasureConsumer interface { + Feed(uint32, float64) + FeedNoSend(uint32, float64) + Close() error +} + +type WorkerMeasureConsumer interface { + FeedNoSend(uint32, float64) +} + +type AtreeMeasureConsumer interface { + Feed(uint32, float64) +} + type AbortCode int const ( diff --git a/examples/requests/requests.go b/examples/requests/requests.go index 3aef7d8..de24599 100644 --- a/examples/requests/requests.go +++ b/examples/requests/requests.go @@ -16,6 +16,8 @@ func sendRequests(conn *client.Connection) { cumulativeMetricID uint32 = 10001 fracDigits int = 2 err error + + seriesInDays = 62 ) conn.DeleteMetric(instantMetricID) @@ -41,17 +43,17 @@ func sendRequests(conn *client.Connection) { log.Fatalf("conn.GetMetric: %s\n", err) } else { fmt.Printf(` -GetMetric: - metricID: %d - metricType: %s - fracDigits: %d -`, + GetMetric: + metricID: %d + metricType: %s + fracDigits: %d + `, iMetric.MetricID, metricTypeToName[iMetric.MetricType], fracDigits) } // APPEND MEASURES - instantMeasures := GenerateInstantMeasures(62, 220) + instantMeasures := GenerateInstantMeasures(seriesInDays, 220) err = conn.AppendMeasures(proto.AppendMeasuresReq{ MetricID: instantMetricID, @@ -78,10 +80,10 @@ GetMetric: if err != nil { log.Fatalf("conn.ListInstantMeasures: %s\n", err) } else { - fmt.Printf("\nListInstantMeasures %s - %s:\n", - formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix()))) + fmt.Printf("\nListInstantMeasures %s − %s:\n", + formatTime(since), formatTime(until)) for _, item := range instantList { - fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value) + fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value) } } @@ -93,13 +95,13 @@ GetMetric: } else { fmt.Printf("\nListAllInstantMeasures (last 15 items):\n") for _, item := range instantList[:15] { - fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value) + fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value) } } // LIST INSTANT PERIODS (group by hour) - until = time.Unix(int64(lastTimestamp+1), 0) + until = time.Unix(int64(lastTimestamp), 0) since = until.Add(-24 * time.Hour) instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ @@ -120,16 +122,19 @@ GetMetric: if err != nil { log.Fatalf("conn.ListInstantPeriods: %s\n", err) } else { - fmt.Printf("\nListInstantPeriods (1 day, group by hour):\n") + fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", + formatDate(since), formatDate(until)) + for _, item := range instantPeriods { - fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatHourPeriod(item.Period), item.Min, item.Max, item.Avg) + fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", + formatHourPeriod(item.Period), item.Min, item.Max, item.Avg) } } // LIST INSTANT PERIODS (group by day) - until = time.Unix(int64(lastTimestamp+1), 0) - since = until.AddDate(0, 0, -7) + until = time.Unix(int64(lastTimestamp), 0) + since = until.AddDate(0, 0, -6) instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ MetricID: instantMetricID, @@ -149,16 +154,19 @@ GetMetric: if err != nil { log.Fatalf("conn.ListInstantPeriods: %s\n", err) } else { - fmt.Printf("\nListInstantPeriods (7 days, group by day):\n") + fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", + formatDate(since), formatDate(until)) + for _, item := range instantPeriods { - fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatDayPeriod(item.Period), item.Min, item.Max, item.Avg) + fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", + formatDayPeriod(item.Period), item.Min, item.Max, item.Avg) } } // LIST INSTANT PERIODS (group by month) - until = time.Unix(int64(lastTimestamp+1), 0) - since = until.AddDate(0, 0, -62) + until = time.Unix(int64(lastTimestamp), 0) + since = until.AddDate(0, 0, -seriesInDays) instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{ MetricID: instantMetricID, @@ -178,9 +186,11 @@ GetMetric: if err != nil { log.Fatalf("conn.ListInstantPeriods: %s\n", err) } else { - fmt.Printf("\nListInstantPeriods (62 days, group by month):\n") + fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n", + formatMonth(since), formatMonth(until)) for _, item := range instantPeriods { - fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg) + fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", + formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg) } } @@ -234,7 +244,9 @@ GetMetric: // APPEND MEASURES - cumulativeMeasures := GenerateCumulativeMeasures(62) + cumulativeMeasures := GenerateCumulativeMeasures(seriesInDays) + + //pretty.PPrintln("CUMULATIVE MEASURES", cumulativeMeasures) err = conn.AppendMeasures(proto.AppendMeasuresReq{ MetricID: cumulativeMetricID, @@ -261,11 +273,12 @@ GetMetric: if err != nil { log.Fatalf("conn.ListCumulativeMeasures: %s\n", err) } else { - fmt.Printf("\nListCumulativeMeasures %s - %s:\n", - formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix()))) + fmt.Printf("\nListCumulativeMeasures %s − %s:\n", + formatTime(since), formatTime(until)) for _, item := range cumulativeList { - fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value) + fmt.Printf(" %s => %.2f (%.2f)\n", + formatTimestamp(item.Timestamp), item.Value, item.Total) } } @@ -277,13 +290,14 @@ GetMetric: } else { fmt.Printf("\nListAllCumulativeMeasures (last 15 items):\n") for _, item := range cumulativeList[:15] { - fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value) + fmt.Printf(" %s => %.2f (%.2f)\n", + formatTimestamp(item.Timestamp), item.Value, item.Total) } } - // LIST CUMULATIVE PERIODS (group by hour) + //LIST CUMULATIVE PERIODS (group by hour) - until = time.Unix(int64(lastTimestamp+1), 0) + until = time.Unix(int64(lastTimestamp), 0) since = until.Add(-24 * time.Hour) cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ @@ -303,16 +317,19 @@ GetMetric: if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) } else { - fmt.Printf("\nListCumulativePeriods (1 day, group by hour):\n") + fmt.Printf("\nListCumulativePeriods (%s − %s, group by hour):\n", + formatDate(since), formatDate(until)) + for _, item := range cumulativePeriods { - fmt.Printf(" %s => end value %.2f, total %.2f\n", formatHourPeriod(item.Period), item.EndValue, item.Total) + fmt.Printf(" %s => %.2f (%.2f)\n", + formatHourPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue) } } // LIST CUMULATIVE PERIODS (group by day) - until = time.Unix(int64(lastTimestamp+1), 0) - since = until.AddDate(0, 0, -7) + until = time.Unix(int64(lastTimestamp), 0) + since = until.AddDate(0, 0, -6) cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: cumulativeMetricID, @@ -331,16 +348,19 @@ GetMetric: if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) } else { - fmt.Printf("\nListCumulativePeriods (7 days, group by day):\n") + fmt.Printf("\nListCumulativePeriods (%s − %s, group by day):\n", + formatDate(since), formatDate(until)) + for _, item := range cumulativePeriods { - fmt.Printf(" %s => end value %.2f, total %.2f\n", formatDayPeriod(item.Period), item.EndValue, item.Total) + fmt.Printf(" %s => %.2f (%.2f)\n", + formatDayPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue) } } // LIST CUMULATIVE PERIODS (group by day) - until = time.Unix(int64(lastTimestamp+1), 0) - since = until.AddDate(0, 0, -62) + until = time.Unix(int64(lastTimestamp), 0) + since = until.AddDate(0, 0, -seriesInDays) cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{ MetricID: cumulativeMetricID, @@ -359,9 +379,12 @@ GetMetric: if err != nil { log.Fatalf("conn.ListCumulativePeriods: %s\n", err) } else { - fmt.Printf("\nListCumulativePeriods (62 days, group by month):\n") + fmt.Printf("\nListCumulativePeriods (%s − %s, group by month):\n", + formatMonth(since), formatMonth(until)) + for _, item := range cumulativePeriods { - fmt.Printf(" %s => end value %.2f, total %.2f\n", formatMonthPeriod(item.Period), item.EndValue, item.Total) + fmt.Printf(" %s => %.2f (%.2f)\n", + formatMonthPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue) } } @@ -386,11 +409,21 @@ GetMetric: } } -const datetimeLayout = "2006-01-02 15:04:05" +func formatMonth(tm time.Time) string { + return tm.Format("2006-01") +} + +func formatDate(tm time.Time) string { + return tm.Format("2006-01-02") +} + +func formatTime(tm time.Time) string { + return tm.Format("2006-01-02 15:04:05") +} -func formatTime(timestamp uint32) string { +func formatTimestamp(timestamp uint32) string { tm := time.Unix(int64(timestamp), 0) - return tm.Format(datetimeLayout) + return tm.Format("2006-01-02 15:04:05") } func formatHourPeriod(period uint32) string { diff --git a/go.mod b/go.mod index 995e343..b6b08a5 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/RoaringBitmap/roaring/v2 v2.5.0 gopkg.in/ini.v1 v1.67.0 + gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69 ) require ( diff --git a/go.sum b/go.sum index 9a5aafa..ee64b10 100644 --- a/go.sum +++ b/go.sum @@ -18,3 +18,5 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69 h1:nyJ3mzTQ46yUeMZCdLyYcs7B5JCS54c67v84miyhq2E= +gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69/go.mod h1:AxgKDktpqBVyIOhIcP+nlCpK+EsJyjN5kPdqyd8euVU= diff --git a/loadtest_linux b/loadtest_linux index 122c54b..86b22c1 100755 Binary files a/loadtest_linux and b/loadtest_linux differ diff --git a/proto/proto.go b/proto/proto.go index a30af18..a6924c1 100644 --- a/proto/proto.go +++ b/proto/proto.go @@ -94,11 +94,11 @@ type CumulativeMeasure struct { } type CumulativePeriod struct { - Period uint32 - Since uint32 - Until uint32 - EndValue float64 - Total float64 + Period uint32 + Start uint32 + End uint32 + StartValue float64 + EndValue float64 } type InstantMeasure struct { @@ -108,8 +108,8 @@ type InstantMeasure struct { type InstantPeriod struct { Period uint32 - Since uint32 - Until uint32 + Start uint32 + End uint32 Min float64 Max float64 Avg float64 diff --git a/requests_linux b/requests_linux index 620b723..dc319df 100755 Binary files a/requests_linux and b/requests_linux differ diff --git a/testdir/12.changes b/testdir/12.changes new file mode 100755 index 0000000..631c65c Binary files /dev/null and b/testdir/12.changes differ diff --git a/testdir/12.snapshot b/testdir/12.snapshot new file mode 100755 index 0000000..cd1af48 Binary files /dev/null and b/testdir/12.snapshot differ diff --git a/testdir/test.data b/testdir/test.data new file mode 100755 index 0000000..90bf55c Binary files /dev/null and b/testdir/test.data differ diff --git a/testdir/test.index b/testdir/test.index new file mode 100755 index 0000000..292af93 Binary files /dev/null and b/testdir/test.index differ diff --git a/transform/aggregate.go b/transform/aggregate.go index c1adc30..c43f6db 100644 --- a/transform/aggregate.go +++ b/transform/aggregate.go @@ -29,7 +29,7 @@ type InstantPeriodsWriter struct { time2period func(uint32) time.Time currentPeriod time.Time lastTimestamp uint32 - endTimestamp uint32 // время показания на конец периода + endTimestamp uint32 min float64 max float64 total float64 @@ -43,7 +43,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 { return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay) } - // Считаю q, чтобы заранее выделить массив для упаковки периодов var q int if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin { q++ @@ -59,8 +58,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr return nil, errors.New("AggregateFuncs option is required") } - // 12 - это period, since, until - // 8 - это размер float64 s := &InstantPeriodsWriter{ aggregateFuncs: opt.AggregateFuncs, arr: make([]byte, 12+q*8), @@ -121,20 +118,14 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo if s.entries > 0 { period := s.time2period(timestamp) if period != s.currentPeriod { - // закрываю период - // готовый период s.packPeriod(timestamp) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } - // затем s.decrementPeriod() - //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) - //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) for period.Before(s.currentPeriod) { - // вставляю пустышку s.packBlankPeriod() if isBuffer { s.responder.BufferRecord(s.arr) @@ -142,9 +133,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo s.responder.AppendRecord(s.arr) } s.decrementPeriod() - //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) - //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) - //return } s.endTimestamp = timestamp s.min = value @@ -157,7 +145,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo } else if value > s.max { s.max = value } - // для подсчета AVG s.total += value s.entries++ } @@ -176,7 +163,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() { switch s.groupBy { case diploma.GroupByHour: s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour) - //fmt.Println("decrement") case diploma.GroupByDay: s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1) case diploma.GroupByMonth: @@ -185,11 +171,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() { } func (s *InstantPeriodsWriter) packBlankPeriod() { - //period := s.currentPeriod.Format("2006-01-02 15:04:05") - //since := "0" - //until := "0" - //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0) - // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) for i := 4; i < len(s.arr); i++ { s.arr[i] = 0 @@ -223,17 +204,10 @@ func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) { } } -/* -Идея с разбивкой на периоды: -Для каждого периода нахожу одно последнее значение. -Начало периода - это конец предыдущего. Если предыдущий не строго предыдущий, -а с пропусками - на место пропусков вставляю пустышки. -Плюс такого решения - я всегда показываю реальное значение на конец периода. -*/ - type CumulativePeriodsWriter struct { arr []byte responder *ChunkedResponder + since uint32 firstHourOfDay int currentPeriod time.Time groupBy diploma.GroupBy @@ -247,6 +221,7 @@ type CumulativePeriodsWriter struct { type CumulativePeriodsWriterOptions struct { Dst io.Writer GroupBy diploma.GroupBy + Since uint32 FirstHourOfDay int } @@ -254,7 +229,6 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative if opt.Dst == nil { return nil, errors.New("Dst option is required") } - // Считаю q, чтобы заранее выделить массив для упаковки периодов if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 { return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay) } @@ -262,6 +236,7 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative s := &CumulativePeriodsWriter{ arr: make([]byte, 28), responder: NewChunkedResponder(opt.Dst), + since: opt.Since, firstHourOfDay: opt.FirstHourOfDay, groupBy: opt.GroupBy, } @@ -322,17 +297,13 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer if s.endTimestamp > 0 { period := s.time2period(timestamp) if period != s.currentPeriod { - // закрываю период s.packPeriod(timestamp, value) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } - // затем s.decrementPeriod() - //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) - //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) for period.Before(s.currentPeriod) { // вставляю пустышку s.packBlankPeriod() @@ -342,9 +313,6 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer s.responder.AppendRecord(s.arr) } s.decrementPeriod() - //fmt.Println(" period: ", period.Format("2006-01-02 15:04:05")) - //fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05")) - //return } s.endTimestamp = timestamp s.endValue = value @@ -362,7 +330,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() { switch s.groupBy { case diploma.GroupByHour: s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour) - //fmt.Println("decrement") case diploma.GroupByDay: s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1) case diploma.GroupByMonth: @@ -371,11 +338,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() { } func (s *CumulativePeriodsWriter) packBlankPeriod() { - //period := s.currentPeriod.Format("2006-01-02 15:04:05") - //since := "0" - //until := "0" - //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0) - // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) for i := 4; i < len(s.arr); i++ { s.arr[i] = 0 @@ -383,11 +345,6 @@ func (s *CumulativePeriodsWriter) packBlankPeriod() { } func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) { - //period := s.currentPeriod.Format("2006-01-02 15:04:05") - //since := time.Unix(int64(start), 0).Format("2006-01-02 15:04:05") - //until := time.Unix(int64(s.endTimestamp), 0).Format("2006-01-02 15:04:05") - //fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, startValue, s.endValue) - // until - это endTimestamp всегда bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix())) bin.PutUint32(s.arr[4:], start) bin.PutUint32(s.arr[8:], s.endTimestamp) @@ -397,12 +354,14 @@ func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) { func (s *CumulativePeriodsWriter) Close() error { if s.endTimestamp > 0 { - if s.lastTimestamp != s.endTimestamp { - s.packPeriod(s.lastTimestamp, s.lastValue) - } else { - s.packPeriod(s.endTimestamp, s.endValue) + if s.endTimestamp >= s.since { + if s.lastTimestamp != s.endTimestamp { + s.packPeriod(s.lastTimestamp, s.lastValue) + } else { + s.packPeriod(s.endTimestamp, s.endValue) + } + s.responder.AppendRecord(s.arr) } - s.responder.AppendRecord(s.arr) } return s.responder.Flush() } diff --git a/transform/raw.go b/transform/raw.go index fa692e6..d9c8af9 100644 --- a/transform/raw.go +++ b/transform/raw.go @@ -51,7 +51,6 @@ type InstantMeasureWriter struct { } func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter { - // 12 - это timestamp, value return &InstantMeasureWriter{ arr: make([]byte, 12), responder: NewChunkedResponder(dst), @@ -101,7 +100,6 @@ type CumulativeMeasureWriter struct { } func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter { - // 20 - это timestamp, value, total return &CumulativeMeasureWriter{ arr: make([]byte, 20), responder: NewChunkedResponder(dst), @@ -138,10 +136,8 @@ func (s *CumulativeMeasureWriter) pack(total float64) { func (s *CumulativeMeasureWriter) Close() error { if s.endTimestamp >= s.since { - // endTimestamp внутри заданного периода. Других показаний нет, - // поэтому время добавляю, но накопленную сумму ставлю 0. s.pack(0) - // Если < since - ничего делать не нужно, ибо накопленная сумма уже добавлена + s.responder.BufferRecord(s.arr) } return s.responder.Flush() } diff --git a/transform/responder.go b/transform/responder.go index da2e19c..8937e3d 100644 --- a/transform/responder.go +++ b/transform/responder.go @@ -70,36 +70,20 @@ func (s *ChunkedResponder) Flush() error { if _, err := s.dst.Write(endMsg); err != nil { return err } - //fmt.Printf("sent endMsg %d\n", endMsg) return nil } func (s *ChunkedResponder) sendBuffered() (err error) { msg := s.buf.Bytes() bin.PutUint32(msg[1:], uint32(s.recordsQty)) - //fmt.Printf("put uint16: %d\n", msg[:3]) - - //fmt.Printf("send %d records\n", s.recordsQty) - - //fmt.Printf("send buffered: %d, qty: %d\n", msg, s.recordsQty) n, err := s.dst.Write(msg) if err != nil { return } - if n != len(msg) { return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg)) } - s.buf.Reset() return } - -// Для Aggregation пишем функцию определения периода и пуляем фактические периоды -// - -// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses. -// These buffers may result in visible overhead for responses exceeding a few megabytes. -// So allocate 64Kb buffers. -// bw: bufio.NewWriterSize(w, 64*1024),