package database

import (
	"errors"
	"fmt"
	"io"
	"net"

	"gordenko.dev/dima/diploma"
	"gordenko.dev/dima/diploma/atree"
	"gordenko.dev/dima/diploma/bin"
	"gordenko.dev/dima/diploma/bufreader"
	"gordenko.dev/dima/diploma/chunkenc"
	"gordenko.dev/dima/diploma/conbuf"
	"gordenko.dev/dima/diploma/proto"
	"gordenko.dev/dima/diploma/transform"
	"gordenko.dev/dima/diploma/txlog"
)

var (
	// ErrNoValueBug reports a measure that has a timestamp but no value.
	ErrNoValueBug = errors.New("has timestamp but no value")
	// ErrWrongResultCodeBug is passed to diploma.Abort when a worker job
	// answers with a result code the caller does not handle.
	ErrWrongResultCodeBug = errors.New("bug: wrong result code")

	// successMsg is the one-byte success response, shared by every reply.
	successMsg = []byte{
		proto.RespSuccess,
	}
)

// reply writes a status response to conn: the single byte proto.RespSuccess
// when errcode is 0, otherwise proto.RespError followed by errcode encoded
// into two bytes with bin.PutUint16.
func reply(conn io.Writer, errcode uint16) {
	answer := successMsg
	if errcode != 0 {
		answer = []byte{
			proto.RespError,
			0, 0,
		}
		bin.PutUint16(answer[1:], errcode)
	}
	// Best effort: reply has no error result, so a failed write cannot be
	// reported to the caller from here.
	_, _ = conn.Write(answer)
}

// handleTCPConn serves requests from conn one after another until
// processRequest returns an error, then closes the connection. A bare io.EOF
// is not logged; every other error is printed through s.logger.
func (s *Database) handleTCPConn(conn net.Conn) {
	defer conn.Close()

	r := bufreader.New(conn, 128)

	for {
		err := s.processRequest(conn, r)
		if err != nil {
			// Direct comparison on purpose: processRequest returns an
			// unwrapped io.EOF only when reading the message type byte hits
			// EOF. An EOF met while reading a request body arrives wrapped
			// and is still logged.
			if err != io.EOF {
				s.logger.Println(err)
			}
			return
		}
	}
}

// processRequest reads one message type byte from r, decodes the matching
// request with the proto package and dispatches it to the corresponding
// Database method.
//
// Methods that return a uint16 code have that code sent back with reply;
// methods that take conn write their own response. A non-nil error is
// returned when reading or decoding fails, when a handler returns an error,
// or when the message type is unknown. io.EOF from reading the message type
// byte is returned unwrapped; all other errors are wrapped with %w.
func (s *Database) processRequest(conn net.Conn, r *bufreader.BufferedReader) (err error) {
	messageType, err := r.ReadByte()
	if err != nil {
		if err != io.EOF {
			return fmt.Errorf("read messageType: %w", err)
		}
		return err
	}

	switch messageType {
	case proto.TypeGetMetric:
		req, err := proto.ReadGetMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadGetMetricReq: %w", err)
		}
		if err = s.GetMetric(conn, req); err != nil {
			return fmt.Errorf("GetMetric: %w", err)
		}

	case proto.TypeAddMetric:
		req, err := proto.ReadAddMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAddMetricReq: %w", err)
		}
		reply(conn, s.AddMetric(req))

	case proto.TypeDeleteMetric:
		req, err := proto.ReadDeleteMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadDeleteMetricReq: %w", err)
		}
		reply(conn, s.DeleteMetric(req))

	case proto.TypeAppendMeasure:
		req, err := proto.ReadAppendMeasureReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAppendMeasureReq: %w", err)
		}
		//fmt.Println("append measure", req.MetricID, conn.RemoteAddr().String())
		reply(conn, s.AppendMeasure(req))

	case proto.TypeAppendMeasures:
		req, err := proto.ReadAppendMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAppendMeasuresReq: %w", err)
		}
		//fmt.Println("append measure", req.MetricID, conn.RemoteAddr().String())
		reply(conn, s.AppendMeasures(req))

	case proto.TypeListInstantMeasures:
		req, err := proto.ReadListInstantMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListInstantMeasuresReq: %w", err)
		}
		if err = s.ListInstantMeasures(conn, req); err != nil {
			return fmt.Errorf("ListInstantMeasures: %w", err)
		}

	case proto.TypeListCumulativeMeasures:
		req, err := proto.ReadListCumulativeMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListCumulativeMeasuresReq: %w", err)
		}
		if err = s.ListCumulativeMeasures(conn, req); err != nil {
			return fmt.Errorf("ListCumulativeMeasures: %w", err)
		}

	case proto.TypeListInstantPeriods:
		req, err := proto.ReadListInstantPeriodsReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListInstantPeriodsReq: %w", err)
		}
		if err = s.ListInstantPeriods(conn, req); err != nil {
			return fmt.Errorf("ListInstantPeriods: %w", err)
		}

	case proto.TypeListCumulativePeriods:
		req, err := proto.ReadListCumulativePeriodsReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListCumulativePeriodsReq: %w", err)
		}
		if err = s.ListCumulativePeriods(conn, req); err != nil {
			return fmt.Errorf("ListCumulativePeriods: %w", err)
		}

	case proto.TypeListCurrentValues:
		req, err := proto.ReadListCurrentValuesReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListCurrentValuesReq: %w", err)
		}
		if err = s.ListCurrentValues(conn, req); err != nil {
			return fmt.Errorf("ListCurrentValues: %w", err)
		}

	case proto.TypeDeleteMeasures:
		req, err := proto.ReadDeleteMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadDeleteMeasuresReq: %w", err)
		}
		reply(conn, s.DeleteMeasures(req))

	case proto.TypeListAllInstantMeasures:
		req, err := proto.ReadListAllInstantMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListAllInstantMeasuresReq: %w", err)
		}
		if err = s.ListAllInstantMeasures(conn, req); err != nil {
			return fmt.Errorf("ListAllInstantMeasures: %w", err)
		}

	case proto.TypeListAllCumulativeMeasures:
		req, err := proto.ReadListAllCumulativeMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListAllCumulativeMeasuresReq: %w", err)
		}
		if err = s.ListAllCumulativeMeasures(conn, req); err != nil {
			return fmt.Errorf("ListAllCumulativeMeasures: %w", err)
		}

	default:
		return fmt.Errorf("unknown messageType: %d", messageType)
	}
	return nil
}

// API

// AddMetric validates req, submits a tryAddMetricReq job through
// appendJobToWorkerQueue and, when the job answers Succeed, records the new
// metric with s.txlog.WriteAddedMetric and waits on the channel that call
// returns.
//
// It returns 0 on success, or one of proto.ErrEmptyMetricID,
// proto.ErrWrongFracDigits, proto.ErrWrongMetricType or proto.ErrDuplicate.
// Any other result code is handed to diploma.Abort.
func (s *Database) AddMetric(req proto.AddMetricReq) uint16 {
	// Validation
	if req.MetricID == 0 {
		return proto.ErrEmptyMetricID
	}
	if byte(req.FracDigits) > diploma.MaxFracDigits {
		return proto.ErrWrongFracDigits
	}
	switch req.MetricType {
	case diploma.Cumulative, diploma.Instant:
		// ok
	default:
		return proto.ErrWrongMetricType
	}

	resultCh := make(chan byte, 1)

	s.appendJobToWorkerQueue(tryAddMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})

	resultCode := <-resultCh

	switch resultCode {
	case Succeed:
		waitCh := s.txlog.WriteAddedMetric(txlog.AddedMetric{
			MetricID:   req.MetricID,
			MetricType: req.MetricType,
			FracDigits: req.FracDigits,
		})
		<-waitCh

	case MetricDuplicate:
		return proto.ErrDuplicate

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}

// Metric is the answer delivered on the ResultCh of a tryGetMetricReq job.
type Metric struct {
	MetricType diploma.MetricType
	FracDigits byte
	ResultCode byte
}

// GetMetric submits a tryGetMetricReq job and writes the outcome to conn.
//
// On Succeed it writes proto.RespValue, the 4-byte metric ID (encoded with
// bin.PutUint32), the metric type and the fraction digits, and returns the
// write error if there is one. On NoMetric it replies proto.ErrNoMetric and
// returns nil. Any other result code is handed to diploma.Abort.
func (s *Database) GetMetric(conn io.Writer, req proto.GetMetricReq) error {
	resultCh := make(chan Metric, 1)

	s.appendJobToWorkerQueue(tryGetMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})

	result := <-resultCh

	switch result.ResultCode {
	case Succeed:
		answer := []byte{
			proto.RespValue,
			0, 0, 0, 0, // metricID
			byte(result.MetricType),
			result.FracDigits,
		}
		bin.PutUint32(answer[1:], req.MetricID)

		_, err := conn.Write(answer)
		if err != nil {
			return err
		}

	case NoMetric:
		reply(conn, proto.ErrNoMetric)

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return nil
}

// tryDeleteMetricResult is the answer delivered on the ResultCh of a
// tryDeleteMetricReq job.
type tryDeleteMetricResult struct {
	ResultCode byte
	RootPageNo uint32
}

// DeleteMetric submits a tryDeleteMetricReq job. On Succeed it collects the
// metric's data and index pages from the atree (only when RootPageNo > 0) and
// records the deletion, together with those page lists, through
// s.txlog.WriteDeletedMetric, waiting on the returned channel.
//
// It returns 0 on success or proto.ErrNoMetric. An atree failure or an
// unexpected result code is handed to diploma.Abort.
func (s *Database) DeleteMetric(req proto.DeleteMetricReq) uint16 {
	resultCh := make(chan tryDeleteMetricResult, 1)

	s.appendJobToWorkerQueue(tryDeleteMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})

	result := <-resultCh

	switch result.ResultCode {
	case Succeed:
		var (
			freeDataPages  []uint32
			freeIndexPages []uint32
		)
		if result.RootPageNo > 0 {
			pageLists, err := s.atree.GetAllPages(result.RootPageNo)
			if err != nil {
				diploma.Abort(diploma.FailedAtreeRequest, err)
			}
			freeDataPages = pageLists.DataPages
			freeIndexPages = pageLists.IndexPages
		}
		waitCh := s.txlog.WriteDeletedMetric(txlog.DeletedMetric{
			MetricID:       req.MetricID,
			FreeDataPages:  freeDataPages,
			FreeIndexPages: freeIndexPages,
		})
		<-waitCh

	case NoMetric:
		return proto.ErrNoMetric

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}

// FilledPage carries the contents of a data page handed back by the worker
// with a NewPage result; AppendMeasure forwards its fields to
// atree.AppendDataPage.
type FilledPage struct {
	Since            uint32
	RootPageNo       uint32
	PrevPageNo       uint32
	TimestampsChunks [][]byte
	TimestampsSize   uint16
	ValuesChunks     [][]byte
	ValuesSize       uint16
}

// tryAppendMeasureResult is the answer delivered on the ResultCh of a
// tryAppendMeasureReq job. FilledPage is read only for the NewPage code.
type tryAppendMeasureResult struct {
	MetricID   uint32
	Timestamp  uint32
	Value      float64
	FilledPage *FilledPage
	ResultCode byte
}

// AppendMeasure submits a tryAppendMeasureReq job for a single measure.
//
// On CanAppend the measure is recorded with s.txlog.WriteAppendMeasure. On
// NewPage the filled page is first written with s.atree.AppendDataPage and
// the resulting report is recorded with
// s.txlog.WriteAppendedMeasureWithOverflow. Both paths wait on the channel
// returned by the txlog call.
//
// It returns 0 on success, or proto.ErrNoMetric, proto.ErrExpiredMeasure or
// proto.ErrNonMonotonicValue. An atree failure or an unexpected result code
// is handed to diploma.Abort.
func (s *Database) AppendMeasure(req proto.AppendMeasureReq) uint16 {
	resultCh := make(chan tryAppendMeasureResult, 1)

	s.appendJobToWorkerQueue(tryAppendMeasureReq{
		MetricID:  req.MetricID,
		Timestamp: req.Timestamp,
		Value:     req.Value,
		ResultCh:  resultCh,
	})

	result := <-resultCh

	switch result.ResultCode {
	case CanAppend:
		waitCh := s.txlog.WriteAppendMeasure(txlog.AppendedMeasure{
			MetricID:  req.MetricID,
			Timestamp: req.Timestamp,
			Value:     req.Value,
		})
		<-waitCh

	case NewPage:
		filled := result.FilledPage
		report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
			MetricID:         req.MetricID,
			Timestamp:        req.Timestamp,
			Value:            req.Value,
			Since:            filled.Since,
			RootPageNo:       filled.RootPageNo,
			PrevPageNo:       filled.PrevPageNo,
			TimestampsChunks: filled.TimestampsChunks,
			TimestampsSize:   filled.TimestampsSize,
			ValuesChunks:     filled.ValuesChunks,
			ValuesSize:       filled.ValuesSize,
		})
		if err != nil {
			diploma.Abort(diploma.WriteToAtreeFailed, err)
		}
		waitCh := s.txlog.WriteAppendedMeasureWithOverflow(
			txlog.AppendedMeasureWithOverflow{
				MetricID:         req.MetricID,
				Timestamp:        req.Timestamp,
				Value:            req.Value,
				IsDataPageReused: report.IsDataPageReused,
				DataPageNo:       report.DataPageNo,
				IsRootChanged:    report.IsRootChanged,
				RootPageNo:       report.NewRootPageNo,
				ReusedIndexPages: report.ReusedIndexPages,
			},
			report.FileName,
			false,
		)
		<-waitCh

	case NoMetric:
		return proto.ErrNoMetric

	case ExpiredMeasure:
		return proto.ErrExpiredMeasure

	case NonMonotonicValue:
		return proto.ErrNonMonotonicValue

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}

// tryAppendMeasuresResult is the answer delivered on the ResultCh of a
// tryAppendMeasuresReq job. AppendMeasures uses it as the starting state
// (page numbers, buffers, compressors and last timestamp/value) for the
// batch.
type tryAppendMeasuresResult struct {
	ResultCode    byte
	MetricType    diploma.MetricType
	FracDigits    byte
	Since         uint32
	Until         uint32
	UntilValue    float64
	RootPageNo    uint32
	PrevPageNo    uint32
	TimestampsBuf *conbuf.ContinuousBuffer
	ValuesBuf     *conbuf.ContinuousBuffer
	Timestamps    diploma.TimestampCompressor
	Values        diploma.ValueCompressor
}

// AppendMeasures appends the batch req.Measures to one metric.
//
// For each measure it checks ordering against the previous timestamp (and,
// for cumulative metrics, against the previous value), then either appends it
// to the current compressors or, when the page payload would exceed
// atree.DataPagePayloadSize, flushes the pending measures to the txlog,
// writes the full page with s.atree.AppendDataPage, records the overflow in
// the txlog and starts fresh buffers and compressors seeded with that
// measure.
//
// It returns 0 on success, proto.ErrNoMetric, or, after flushing whatever was
// already accepted, proto.ErrExpiredMeasure / proto.ErrNonMonotonicValue for
// the first offending measure. An atree failure or an unexpected result code
// is handed to diploma.Abort.
func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
	resultCh := make(chan tryAppendMeasuresResult, 1)

	s.appendJobToWorkerQueue(tryAppendMeasuresReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})

	result := <-resultCh

	switch result.ResultCode {
	case CanAppend:
		var (
			rootPageNo    = result.RootPageNo
			prevPageNo    = result.PrevPageNo
			timestampsBuf = result.TimestampsBuf
			valuesBuf     = result.ValuesBuf
			timestamps    = result.Timestamps
			values        = result.Values
			since         = result.Since
			until         = result.Until
			untilValue    = result.UntilValue
			//
			toAppendMeasures []proto.Measure
		)

		// flushPending records the measures accepted so far in the txlog and
		// waits on the returned channel; it does nothing when none are
		// pending. flag is passed through as the trailing bool of
		// WriteAppendMeasures, whose meaning is defined by txlog and not
		// visible in this file.
		flushPending := func(flag bool) {
			if len(toAppendMeasures) == 0 {
				return
			}
			waitCh := s.txlog.WriteAppendMeasures(
				txlog.AppendedMeasures{
					MetricID: req.MetricID,
					Measures: toAppendMeasures,
				},
				flag,
			)
			<-waitCh
			toAppendMeasures = nil
		}

		for idx, measure := range req.Measures {
			if since == 0 {
				since = measure.Timestamp
			} else {
				// NOTE(review): if the previous iteration took the overflow
				// path with a `true` trailing flag and this measure is then
				// rejected, nothing is pending, so no txlog call with `false`
				// follows. Confirm against txlog whether that matters.
				if measure.Timestamp <= until {
					flushPending(false)
					return proto.ErrExpiredMeasure
				}
				if result.MetricType == diploma.Cumulative && measure.Value < untilValue {
					flushPending(false)
					return proto.ErrNonMonotonicValue
				}
			}

			extraSpace := timestamps.CalcRequiredSpace(measure.Timestamp) +
				values.CalcRequiredSpace(measure.Value)

			totalSpace := timestamps.Size() + values.Size() + extraSpace

			if totalSpace <= atree.DataPagePayloadSize {
				timestamps.Append(measure.Timestamp)
				values.Append(measure.Value)
				toAppendMeasures = append(toAppendMeasures, measure)
			} else {
				flushPending(true)

				//fmt.Printf("APPEND DATA PAGE %d, %v\n", measure.Timestamp, measure.Value)
				report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
					MetricID:         req.MetricID,
					Timestamp:        measure.Timestamp,
					Value:            measure.Value,
					Since:            since,
					RootPageNo:       rootPageNo,
					PrevPageNo:       prevPageNo,
					TimestampsChunks: timestampsBuf.Chunks(),
					TimestampsSize:   uint16(timestamps.Size()),
					ValuesChunks:     valuesBuf.Chunks(),
					ValuesSize:       uint16(values.Size()),
				})
				if err != nil {
					diploma.Abort(diploma.WriteToAtreeFailed, err)
				}

				prevPageNo = report.DataPageNo
				if report.IsRootChanged {
					rootPageNo = report.NewRootPageNo
				}

				// The trailing bool is true while more measures of this
				// batch remain after the current one.
				waitCh := s.txlog.WriteAppendedMeasureWithOverflow(
					txlog.AppendedMeasureWithOverflow{
						MetricID:         req.MetricID,
						Timestamp:        measure.Timestamp,
						Value:            measure.Value,
						IsDataPageReused: report.IsDataPageReused,
						DataPageNo:       report.DataPageNo,
						IsRootChanged:    report.IsRootChanged,
						RootPageNo:       report.NewRootPageNo,
						ReusedIndexPages: report.ReusedIndexPages,
					},
					report.FileName,
					(idx+1) < len(req.Measures),
				)
				<-waitCh

				// Start a new page: fresh buffers and compressors, seeded
				// with the measure that did not fit.
				timestampsBuf = conbuf.New(nil)
				valuesBuf = conbuf.New(nil)

				timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
					timestampsBuf, 0)
				if result.MetricType == diploma.Cumulative {
					values = chunkenc.NewReverseCumulativeDeltaCompressor(
						valuesBuf, 0, result.FracDigits)
				} else {
					values = chunkenc.NewReverseInstantDeltaCompressor(
						valuesBuf, 0, result.FracDigits)
				}

				timestamps.Append(measure.Timestamp)
				values.Append(measure.Value)

				since = measure.Timestamp
			}

			until = measure.Timestamp
			untilValue = measure.Value
		}

		flushPending(false)

	case NoMetric:
		return proto.ErrNoMetric

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}

// tryDeleteMeasuresResult is the answer delivered on the ResultCh of a
// tryDeleteMeasuresReq job.
type tryDeleteMeasuresResult struct {
	ResultCode byte
	RootPageNo uint32
}

// DeleteMeasures submits a tryDeleteMeasuresReq job and records the deletion
// in the txlog according to the result code:
//
//   - NoMeasuresToDelete: nothing is written.
//   - DeleteFromAtreeNotNeeded: the deletion is recorded without page lists.
//   - DeleteFromAtreeRequired: all data and index pages under RootPageNo are
//     collected from the atree and recorded as free pages.
//
// It returns 0 on success or proto.ErrNoMetric. An atree failure or an
// unexpected result code is handed to diploma.Abort.
func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
	resultCh := make(chan tryDeleteMeasuresResult, 1)

	s.appendJobToWorkerQueue(tryDeleteMeasuresReq{
		MetricID: req.MetricID,
		Since:    req.Since,
		ResultCh: resultCh,
	})

	result := <-resultCh

	switch result.ResultCode {
	case NoMeasuresToDelete:
		// ok

	case DeleteFromAtreeNotNeeded:
		// record the deletion in the TransactionLog
		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
			MetricID: req.MetricID,
		})
		<-waitCh

	case DeleteFromAtreeRequired:
		// collect the numbers of all data and index pages of the metric
		// (a kind of REDO log record).
		pageLists, err := s.atree.GetAllPages(result.RootPageNo)
		if err != nil {
			diploma.Abort(diploma.FailedAtreeRequest, err)
		}
		// record the deletion in the TransactionLog
		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
			MetricID:       req.MetricID,
			FreeDataPages:  pageLists.DataPages,
			FreeIndexPages: pageLists.IndexPages,
		})
		<-waitCh

	case NoMetric:
		return proto.ErrNoMetric

	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}

// SELECT

// fullScanResult is the answer delivered on the ResultCh of a tryFullScanReq
// job.
type fullScanResult struct {
	ResultCode byte
	FracDigits byte
	LastPageNo uint32
}

// ListAllInstantMeasures runs a full scan of an instant metric, writing the
// measures to conn through a transform.NewInstantMeasureWriter created with a
// second argument of 0.
//
// NOTE(review): the request type is named ListAllInstantMetricMeasuresReq,
// unlike the sibling ListAll*MeasuresReq names — confirm it matches what
// proto.ReadListAllInstantMeasuresReq returns.
func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error {
	responseWriter := transform.NewInstantMeasureWriter(conn, 0)

	return s.fullScan(fullScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Instant,
		Conn:           conn,
		ResponseWriter: responseWriter,
	})
}

// ListAllCumulativeMeasures runs a full scan of a cumulative metric, writing
// the measures to conn through a transform.NewCumulativeMeasureWriter created
// with a second argument of 0.
func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error {
	responseWriter := transform.NewCumulativeMeasureWriter(conn, 0)

	return s.fullScan(fullScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Cumulative,
		Conn:           conn,
		ResponseWriter: responseWriter,
	})
}

// ListInstantMeasures runs a range scan of an instant metric between
// req.Since and req.Until. When req.Since > req.Until it replies
// proto.ErrInvalidRange and returns nil without scanning.
func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error {
	if req.Since > req.Until {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}

	responseWriter := transform.NewInstantMeasureWriter(conn, req.Since)

	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Instant,
		Since:          req.Since,
		Until:          req.Until,
		Conn:           conn,
		ResponseWriter: responseWriter,
	})
}

// ListCumulativeMeasures runs a range scan of a cumulative metric between
// req.Since and req.Until. When req.Since > req.Until it replies
// proto.ErrInvalidRange and returns nil without scanning.
func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error {
	if req.Since > req.Until {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}

	responseWriter := transform.NewCumulativeMeasureWriter(conn, req.Since)

	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Cumulative,
		Since:          req.Since,
		Until:          req.Until,
		Conn:           conn,
		ResponseWriter: responseWriter,
	})
}
type rangeScanResult struct { ResultCode byte FracDigits byte RootPageNo uint32 LastPageNo uint32 } func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error { since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay) if since.After(until) { reply(conn, proto.ErrInvalidRange) return nil } responseWriter, err := transform.NewInstantPeriodsWriter(transform.InstantPeriodsWriterOptions{ Dst: conn, GroupBy: req.GroupBy, AggregateFuncs: req.AggregateFuncs, FirstHourOfDay: req.FirstHourOfDay, }) if err != nil { reply(conn, proto.ErrUnexpected) return nil } return s.rangeScan(rangeScanReq{ MetricID: req.MetricID, MetricType: diploma.Instant, Since: uint32(since.Unix()), Until: uint32(until.Unix()), Conn: conn, ResponseWriter: responseWriter, }) } func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error { since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay) if since.After(until) { reply(conn, proto.ErrInvalidRange) return nil } responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{ Dst: conn, GroupBy: req.GroupBy, Since: uint32(since.Unix()), FirstHourOfDay: req.FirstHourOfDay, }) if err != nil { reply(conn, proto.ErrUnexpected) return nil } return s.rangeScan(rangeScanReq{ MetricID: req.MetricID, MetricType: diploma.Cumulative, Since: uint32(since.Unix()), Until: uint32(until.Unix()), Conn: conn, ResponseWriter: responseWriter, }) } type rangeScanReq struct { MetricID uint32 MetricType diploma.MetricType Since uint32 Until uint32 Conn io.Writer ResponseWriter diploma.MeasureConsumer } func (s *Database) rangeScan(req rangeScanReq) error { resultCh := make(chan rangeScanResult, 1) s.appendJobToWorkerQueue(tryRangeScanReq{ MetricID: req.MetricID, Since: req.Since, Until: req.Until, MetricType: req.MetricType, ResponseWriter: req.ResponseWriter, ResultCh: resultCh, }) result := 
<-resultCh switch result.ResultCode { case QueryDone: req.ResponseWriter.Close() case UntilFound: err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{ MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, LastPageNo: result.LastPageNo, Since: req.Since, }) s.metricRUnlock(req.MetricID) if err != nil { reply(req.Conn, proto.ErrUnexpected) } else { req.ResponseWriter.Close() } case UntilNotFound: err := s.atree.RangeScan(atree.RangeScanReq{ MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, RootPageNo: result.RootPageNo, Since: req.Since, Until: req.Until, }) s.metricRUnlock(req.MetricID) if err != nil { reply(req.Conn, proto.ErrUnexpected) } else { req.ResponseWriter.Close() } case NoMetric: reply(req.Conn, proto.ErrNoMetric) case WrongMetricType: reply(req.Conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } type fullScanReq struct { MetricID uint32 MetricType diploma.MetricType Conn io.Writer ResponseWriter diploma.MeasureConsumer } func (s *Database) fullScan(req fullScanReq) error { resultCh := make(chan fullScanResult, 1) s.appendJobToWorkerQueue(tryFullScanReq{ MetricID: req.MetricID, MetricType: req.MetricType, ResponseWriter: req.ResponseWriter, ResultCh: resultCh, }) result := <-resultCh switch result.ResultCode { case QueryDone: req.ResponseWriter.Close() case UntilFound: err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{ MetricType: req.MetricType, FracDigits: result.FracDigits, ResponseWriter: req.ResponseWriter, LastPageNo: result.LastPageNo, }) s.metricRUnlock(req.MetricID) if err != nil { reply(req.Conn, proto.ErrUnexpected) } else { req.ResponseWriter.Close() } case NoMetric: reply(req.Conn, proto.ErrNoMetric) case WrongMetricType: reply(req.Conn, proto.ErrWrongMetricType) default: diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug) } return nil } func (s *Database) 
ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error { responseWriter := transform.NewCurrentValueWriter(conn) defer responseWriter.Close() resultCh := make(chan struct{}) s.appendJobToWorkerQueue(tryListCurrentValuesReq{ MetricIDs: req.MetricIDs, ResponseWriter: responseWriter, ResultCh: resultCh, }) <-resultCh return nil }