package database import ( "errors" "fmt" "io" "net" "gordenko.dev/dima/diploma" "gordenko.dev/dima/diploma/atree" "gordenko.dev/dima/diploma/bin" "gordenko.dev/dima/diploma/bufreader" "gordenko.dev/dima/diploma/chunkenc" "gordenko.dev/dima/diploma/conbuf" "gordenko.dev/dima/diploma/proto" "gordenko.dev/dima/diploma/txlog" ) var ( ErrNoValueBug = errors.New("has timestamp but no value") ErrWrongResultCodeBug = errors.New("bug: wrong result code") successMsg = []byte{ proto.RespSuccess, } ) func reply(conn io.Writer, errcode uint16) { var answer []byte if errcode == 0 { answer = successMsg } else { answer = []byte{ proto.RespError, 0, 0, } bin.PutUint16(answer[1:], errcode) } _, err := conn.Write(answer) if err != nil { return } } func (s *Database) handleTCPConn(conn net.Conn) { defer conn.Close() r := bufreader.New(conn, 128) for { err := s.processRequest(conn, r) if err != nil { if err != io.EOF { s.logger.Println(err) } return } } } func (s *Database) processRequest(conn net.Conn, r *bufreader.BufferedReader) (err error) { messageType, err := r.ReadByte() if err != nil { if err != io.EOF { return fmt.Errorf("read messageType: %s", err) } else { return err } } switch messageType { case proto.TypeGetMetric: req, err := proto.ReadGetMetricReq(r) if err != nil { return fmt.Errorf("proto.ReadGetMetricReq: %s", err) } if err = s.GetMetric(conn, req); err != nil { return fmt.Errorf("GetMetric: %s", err) } case proto.TypeAddMetric: req, err := proto.ReadAddMetricReq(r) if err != nil { return fmt.Errorf("proto.ReadAddMetricReq: %s", err) } reply(conn, s.AddMetric(req)) case proto.TypeDeleteMetric: req, err := proto.ReadDeleteMetricReq(r) if err != nil { return fmt.Errorf("proto.ReadDeleteMetricReq: %s", err) } reply(conn, s.DeleteMetric(req)) case proto.TypeAppendMeasure: req, err := proto.ReadAppendMeasureReq(r) if err != nil { return fmt.Errorf("proto.ReadAppendMeasureReq: %s", err) } //fmt.Println("append measure", req.MetricID, conn.RemoteAddr().String()) reply(conn, s.AppendMeasure(req)) case proto.TypeAppendMeasures: req, err := proto.ReadAppendMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadAppendMeasuresReq: %s", err) } //fmt.Println("append measure", req.MetricID, conn.RemoteAddr().String()) reply(conn, s.AppendMeasures(req)) case proto.TypeListInstantMeasures: req, err := proto.ReadListInstantMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadListInstantMeasuresReq: %s", err) } if err = s.ListInstantMeasures(conn, req); err != nil { return fmt.Errorf("ListInstantMeasures: %s", err) } case proto.TypeListCumulativeMeasures: req, err := proto.ReadListCumulativeMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadListCumulativeMeasuresReq: %s", err) } if err = s.ListCumulativeMeasures(conn, req); err != nil { return fmt.Errorf("ListCumulativeMeasures: %s", err) } case proto.TypeListInstantPeriods: req, err := proto.ReadListInstantPeriodsReq(r) if err != nil { return fmt.Errorf("proto.ReadListInstantPeriodsReq: %s", err) } if err = s.ListInstantPeriods(conn, req); err != nil { return fmt.Errorf("ListInstantPeriods: %s", err) } case proto.TypeListCumulativePeriods: req, err := proto.ReadListCumulativePeriodsReq(r) if err != nil { return fmt.Errorf("proto.ReadListCumulativePeriodsReq: %s", err) } if err = s.ListCumulativePeriods(conn, req); err != nil { return fmt.Errorf("ListCumulativePeriods: %s", err) } case proto.TypeListCurrentValues: req, err := proto.ReadListCurrentValuesReq(r) if err != nil { return fmt.Errorf("proto.ListCurrentValuesReq: %s", err) } if err = s.ListCurrentValues(conn, req); err != nil { return fmt.Errorf("ListCurrentValues: %s", err) } case proto.TypeDeleteMeasures: req, err := proto.ReadDeleteMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadDeleteMeasuresReq: %s", err) } reply(conn, s.DeleteMeasures(req)) case proto.TypeListAllInstantMeasures: req, err := proto.ReadListAllInstantMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadListAllInstantMeasuresReq: %s", err) } if err = s.ListAllInstantMeasures(conn, req); err != nil { return fmt.Errorf("ListAllInstantMeasures: %s", err) } case proto.TypeListAllCumulativeMeasures: req, err := proto.ReadListAllCumulativeMeasuresReq(r) if err != nil { return fmt.Errorf("proto.ReadListAllCumulativeMeasuresReq: %s", err) } if err = s.ListAllCumulativeMeasures(conn, req); err != nil { return fmt.Errorf("ListAllCumulativeMeasures: %s", err) } default: return fmt.Errorf("unknown messageType: %d", messageType) } return } // API func (s *Database) AddMetric(req proto.AddMetricReq) uint16 { // Валидация if req.MetricID == 0 { return proto.ErrEmptyMetricID } if byte(req.FracDigits) > diploma.MaxFracDigits { return proto.ErrWrongFracDigits } switch req.MetricType { case diploma.Cumulative, diploma.Instant: // ok default: return proto.ErrWrongMetricType } resultCh := make(chan byte, 1) s.appendJobToWorkerQueue(tryAddMetricReq{ MetricID: req.MetricID, ResultCh: resultCh, }) resultCode := <-resultCh switch resultCode { case Succeed: waitCh := s.txlog.WriteAddedMetric(txlog.AddedMetric{ MetricID: req.MetricID, MetricType: req.MetricType, FracDigits: req.FracDigits, }) <-waitCh case MetricDuplicate: return proto.ErrDuplicate default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return 0 } type Metric struct { MetricType diploma.MetricType FracDigits byte ResultCode byte } func (s *Database) GetMetric(conn io.Writer, req proto.GetMetricReq) error { resultCh := make(chan Metric, 1) s.appendJobToWorkerQueue(tryGetMetricReq{ MetricID: req.MetricID, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case Succeed: answer := []byte{ proto.RespValue, 0, 0, 0, 0, // metricID byte(result.MetricType), result.FracDigits, } bin.PutUint32(answer[1:], req.MetricID) _, err := conn.Write(answer) if err != nil { return err } case NoMetric: reply(conn, proto.ErrNoMetric) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } type tryDeleteMetricResult struct { ResultCode byte RootPageNo uint32 } func (s *Database) DeleteMetric(req proto.DeleteMetricReq) uint16 { resultCh := make(chan tryDeleteMetricResult, 1) s.appendJobToWorkerQueue(tryDeleteMetricReq{ MetricID: req.MetricID, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case Succeed: var ( freeDataPages []uint32 freeIndexPages []uint32 ) if result.RootPageNo > 0 { pageLists, err := s.atree.GetAllPages(result.RootPageNo) if err != nil { diploma.Abort(diploma.FailedAtreeRequest, err) } freeDataPages = pageLists.DataPages freeIndexPages = pageLists.IndexPages } waitCh := s.txlog.WriteDeletedMetric(txlog.DeletedMetric{ MetricID: req.MetricID, FreeDataPages: freeDataPages, FreeIndexPages: freeIndexPages, }) <-waitCh case NoMetric: return proto.ErrNoMetric default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return 0 } type FilledPage struct { Since uint32 RootPageNo uint32 PrevPageNo uint32 TimestampsChunks [][]byte TimestampsSize uint16 ValuesChunks [][]byte ValuesSize uint16 } type tryAppendMeasureResult struct { FilledPage *FilledPage ResultCode byte } func (s *Database) AppendMeasure(req proto.AppendMeasureReq) uint16 { resultCh := make(chan tryAppendMeasureResult, 1) s.appendJobToWorkerQueue(tryAppendMeasureReq{ MetricID: req.MetricID, Timestamp: req.Timestamp, Value: req.Value, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case CanAppend: waitCh := s.txlog.WriteAppendMeasure(txlog.AppendedMeasure{ MetricID: req.MetricID, Timestamp: req.Timestamp, Value: req.Value, }) <-waitCh case NewPage: filled := result.FilledPage report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{ MetricID: req.MetricID, Timestamp: req.Timestamp, Value: req.Value, Since: filled.Since, RootPageNo: filled.RootPageNo, PrevPageNo: filled.PrevPageNo, TimestampsChunks: filled.TimestampsChunks, TimestampsSize: filled.TimestampsSize, ValuesChunks: filled.ValuesChunks, ValuesSize: filled.ValuesSize, }) if err != nil { diploma.Abort(diploma.WriteToAtreeFailed, err) } waitCh := s.txlog.WriteAppendedMeasureWithOverflow( txlog.AppendedMeasureWithOverflow{ MetricID: req.MetricID, Timestamp: req.Timestamp, Value: req.Value, IsDataPageReused: report.IsDataPageReused, DataPageNo: report.DataPageNo, IsRootChanged: report.IsRootChanged, RootPageNo: report.NewRootPageNo, ReusedIndexPages: report.ReusedIndexPages, }, report.FileName, false, ) <-waitCh case NoMetric: return proto.ErrNoMetric case ExpiredMeasure: return proto.ErrExpiredMeasure case NonMonotonicValue: return proto.ErrNonMonotonicValue default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return 0 } type tryAppendMeasuresResult struct { ResultCode byte MetricType diploma.MetricType FracDigits byte Since uint32 Until uint32 UntilValue float64 RootPageNo uint32 PrevPageNo uint32 TimestampsBuf *conbuf.ContinuousBuffer ValuesBuf *conbuf.ContinuousBuffer Timestamps diploma.TimestampCompressor Values diploma.ValueCompressor } func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 { resultCh := make(chan tryAppendMeasuresResult, 1) s.appendJobToWorkerQueue(tryAppendMeasuresReq{ MetricID: req.MetricID, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case CanAppend: var ( rootPageNo = result.RootPageNo prevPageNo = result.PrevPageNo timestampsBuf = result.TimestampsBuf valuesBuf = result.ValuesBuf timestamps = result.Timestamps values = result.Values since = result.Since until = result.Until untilValue = result.UntilValue // toAppendMeasures []proto.Measure ) for idx, measure := range req.Measures { //fmt.Printf("%d %v\n", measure.Timestamp, measure.Value) if since == 0 { since = measure.Timestamp } else { if measure.Timestamp <= until { if len(toAppendMeasures) > 0 { waitCh := s.txlog.WriteAppendMeasures( txlog.AppendedMeasures{ MetricID: req.MetricID, Measures: toAppendMeasures, }, false, ) <-waitCh } return proto.ErrExpiredMeasure } if result.MetricType == diploma.Cumulative && measure.Value < untilValue { if len(toAppendMeasures) > 0 { waitCh := s.txlog.WriteAppendMeasures( txlog.AppendedMeasures{ MetricID: req.MetricID, Measures: toAppendMeasures, }, false, ) <-waitCh } //fmt.Printf("m.Value: %v < untilValue: %v\n", measure.Value, untilValue) return proto.ErrNonMonotonicValue } } extraSpace := timestamps.CalcRequiredSpace(measure.Timestamp) + values.CalcRequiredSpace(measure.Value) totalSpace := timestamps.Size() + values.Size() + extraSpace if totalSpace <= atree.DataPagePayloadSize { timestamps.Append(measure.Timestamp) values.Append(measure.Value) toAppendMeasures = append(toAppendMeasures, measure) } else { if len(toAppendMeasures) > 0 { waitCh := s.txlog.WriteAppendMeasures( txlog.AppendedMeasures{ MetricID: req.MetricID, Measures: toAppendMeasures, }, true, ) <-waitCh toAppendMeasures = nil } report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{ MetricID: req.MetricID, Timestamp: until, Value: untilValue, Since: since, RootPageNo: rootPageNo, PrevPageNo: prevPageNo, TimestampsChunks: timestampsBuf.Chunks(), TimestampsSize: uint16(timestamps.Size()), ValuesChunks: valuesBuf.Chunks(), ValuesSize: uint16(values.Size()), }) if err != nil { diploma.Abort(diploma.WriteToAtreeFailed, err) } prevPageNo = report.DataPageNo if report.IsRootChanged { rootPageNo = report.NewRootPageNo } waitCh := s.txlog.WriteAppendedMeasureWithOverflow( txlog.AppendedMeasureWithOverflow{ MetricID: req.MetricID, Timestamp: measure.Timestamp, Value: measure.Value, IsDataPageReused: report.IsDataPageReused, DataPageNo: report.DataPageNo, IsRootChanged: report.IsRootChanged, RootPageNo: report.NewRootPageNo, ReusedIndexPages: report.ReusedIndexPages, }, report.FileName, (idx+1) < len(req.Measures), ) <-waitCh timestampsBuf = conbuf.New(nil) valuesBuf = conbuf.New(nil) timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor( timestampsBuf, 0) if result.MetricType == diploma.Cumulative { values = chunkenc.NewReverseCumulativeDeltaCompressor( valuesBuf, 0, result.FracDigits) } else { values = chunkenc.NewReverseInstantDeltaCompressor( valuesBuf, 0, result.FracDigits) } timestamps.Append(measure.Timestamp) values.Append(measure.Value) since = measure.Timestamp } until = measure.Timestamp untilValue = measure.Value } if len(toAppendMeasures) > 0 { waitCh := s.txlog.WriteAppendMeasures( txlog.AppendedMeasures{ MetricID: req.MetricID, Measures: toAppendMeasures, }, false, ) <-waitCh } case NoMetric: return proto.ErrNoMetric default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return 0 } type tryDeleteMeasuresResult struct { ResultCode byte RootPageNo uint32 } func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 { resultCh := make(chan tryDeleteMeasuresResult, 1) s.appendJobToWorkerQueue(tryDeleteMeasuresReq{ MetricID: req.MetricID, Since: req.Since, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case Succeed: var ( freeDataPages []uint32 freeIndexPages []uint32 ) if result.RootPageNo > 0 { pageLists, err := s.atree.GetAllPages(result.RootPageNo) if err != nil { diploma.Abort(diploma.FailedAtreeRequest, err) } freeDataPages = pageLists.DataPages freeIndexPages = pageLists.IndexPages } waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{ MetricID: req.MetricID, FreeDataPages: freeDataPages, FreeIndexPages: freeIndexPages, }) <-waitCh case NoMetric: return proto.ErrNoMetric default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return 0 } // SELECT type instantMeasuresResult struct { ResultCode byte FracDigits byte PageNo uint32 } func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error { resultCh := make(chan instantMeasuresResult, 1) responseWriter := atree.NewInstantMeasureWriter(conn) s.appendJobToWorkerQueue(tryListAllInstantMeasuresReq{ MetricID: req.MetricID, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.IterateAllInstantByTreeCursor(atree.IterateAllInstantByTreeCursorReq{ FracDigits: result.FracDigits, PageNo: result.PageNo, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error { resultCh := make(chan cumulativeMeasuresResult, 1) responseWriter := atree.NewCumulativeMeasureWriter(conn) s.appendJobToWorkerQueue(tryListAllCumulativeMeasuresReq{ MetricID: req.MetricID, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.IterateAllCumulativeByTreeCursor(atree.IterateAllCumulativeByTreeCursorReq{ FracDigits: result.FracDigits, PageNo: result.PageNo, EndTimestamp: result.EndTimestamp, EndValue: result.EndValue, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error { if req.Since > req.Until { reply(conn, proto.ErrInvalidRange) return nil } var ( since = req.Since until = req.Until ) if req.FirstHourOfDay > 0 { since, until = correctToFHD(req.Since, req.Until, req.FirstHourOfDay) } resultCh := make(chan instantMeasuresResult, 1) responseWriter := atree.NewInstantMeasureWriter(conn) s.appendJobToWorkerQueue(tryListInstantMeasuresReq{ MetricID: req.MetricID, Since: since, Until: until, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.ContinueIterateInstantByTreeCursor(atree.ContinueIterateInstantByTreeCursorReq{ FracDigits: result.FracDigits, Since: since, Until: until, LastPageNo: result.PageNo, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case UntilNotFound: err := s.atree.FindAndIterateInstantByTreeCursor(atree.FindAndIterateInstantByTreeCursorReq{ FracDigits: result.FracDigits, Since: since, Until: until, RootPageNo: result.PageNo, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } type cumulativeMeasuresResult struct { ResultCode byte FracDigits byte PageNo uint32 EndTimestamp uint32 EndValue float64 } func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error { if req.Since > req.Until { reply(conn, proto.ErrInvalidRange) return nil } var ( since = req.Since until = req.Until ) if req.FirstHourOfDay > 0 { since, until = correctToFHD(since, until, req.FirstHourOfDay) } resultCh := make(chan cumulativeMeasuresResult, 1) responseWriter := atree.NewCumulativeMeasureWriter(conn) s.appendJobToWorkerQueue(tryListCumulativeMeasuresReq{ MetricID: req.MetricID, Since: since, Until: until, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.ContinueIterateCumulativeByTreeCursor(atree.ContinueIterateCumulativeByTreeCursorReq{ FracDigits: result.FracDigits, Since: since, Until: until, LastPageNo: result.PageNo, EndTimestamp: result.EndTimestamp, EndValue: result.EndValue, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case UntilNotFound: err := s.atree.FindAndIterateCumulativeByTreeCursor(atree.FindAndIterateCumulativeByTreeCursorReq{ FracDigits: result.FracDigits, Since: since, Until: until, RootPageNo: result.PageNo, ResponseWriter: responseWriter, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } type instantPeriodsResult struct { ResultCode byte FracDigits byte PageNo uint32 } func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error { if req.Since > req.Until { reply(conn, proto.ErrInvalidRange) return nil } var ( since = req.Since until = req.Until ) if req.FirstHourOfDay > 0 { since, until = correctToFHD(since, until, req.FirstHourOfDay) } resultCh := make(chan instantPeriodsResult, 1) aggregator, err := atree.NewInstantAggregator(atree.InstantAggregatorOptions{ GroupBy: req.GroupBy, FirstHourOfDay: req.FirstHourOfDay, }) if err != nil { reply(conn, proto.ErrUnexpected) return nil } responseWriter := atree.NewInstantPeriodsWriter(conn, req.AggregateFuncs) s.appendJobToWorkerQueue(tryListInstantPeriodsReq{ MetricID: req.MetricID, Since: since, Until: until, Aggregator: aggregator, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.ContinueCollectInstantPeriods(atree.ContinueCollectInstantPeriodsReq{ FracDigits: result.FracDigits, Aggregator: aggregator, ResponseWriter: responseWriter, LastPageNo: result.PageNo, Since: since, Until: until, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case UntilNotFound: err := s.atree.FindInstantPeriods(atree.FindInstantPeriodsReq{ FracDigits: result.FracDigits, ResponseWriter: responseWriter, RootPageNo: result.PageNo, Since: since, Until: until, GroupBy: req.GroupBy, FirstHourOfDay: req.FirstHourOfDay, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } type cumulativePeriodsResult struct { ResultCode byte FracDigits byte PageNo uint32 } func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error { if req.Since > req.Until { reply(conn, proto.ErrInvalidRange) return nil } var ( since = req.Since until = req.Until ) if req.FirstHourOfDay > 0 { since, until = correctToFHD(since, until, req.FirstHourOfDay) } resultCh := make(chan cumulativePeriodsResult, 1) aggregator, err := atree.NewCumulativeAggregator(atree.CumulativeAggregatorOptions{ GroupBy: req.GroupBy, FirstHourOfDay: req.FirstHourOfDay, }) if err != nil { reply(conn, proto.ErrUnexpected) return nil } responseWriter := atree.NewCumulativePeriodsWriter(conn) s.appendJobToWorkerQueue(tryListCumulativePeriodsReq{ MetricID: req.MetricID, Since: since, Until: until, Aggregator: aggregator, ResponseWriter: responseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: responseWriter.Close() case UntilFound: err := s.atree.ContinueCollectCumulativePeriods(atree.ContinueCollectCumulativePeriodsReq{ FracDigits: result.FracDigits, Aggregator: aggregator, ResponseWriter: responseWriter, LastPageNo: result.PageNo, Since: since, Until: until, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case UntilNotFound: err := s.atree.FindCumulativePeriods(atree.FindCumulativePeriodsReq{ FracDigits: result.FracDigits, ResponseWriter: responseWriter, RootPageNo: result.PageNo, Since: since, Until: until, GroupBy: req.GroupBy, FirstHourOfDay: req.FirstHourOfDay, }) s.metricRUnlock(req.MetricID) if err != nil { reply(conn, proto.ErrUnexpected) } else { responseWriter.Close() } case NoMetric: reply(conn, proto.ErrNoMetric) case WrongMetricType: reply(conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } func (s *Database) ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error { responseWriter := atree.NewCurrentValueWriter(conn) defer responseWriter.Close() resultCh := make(chan struct{}) s.appendJobToWorkerQueue(tryListCurrentValuesReq{ MetricIDs: req.MetricIDs, ResponseWriter: responseWriter, ResultCh: resultCh, }) <-resultCh return nil }