package database

import (
	"fmt"

	"gordenko.dev/dima/diploma"
	"gordenko.dev/dima/diploma/atree"
	"gordenko.dev/dima/diploma/chunkenc"
	"gordenko.dev/dima/diploma/conbuf"
	"gordenko.dev/dima/diploma/enc"
	"gordenko.dev/dima/diploma/txlog"
)

// Result codes delivered to callers through the ResultCh of a request.
const (
	// QueryDone: a read was answered completely from the in-memory buffer.
	QueryDone = 1
	// UntilFound: the buffer was consumed; the read continues from the
	// returned page (metric.LastPageNo).
	UntilFound = 2
	// UntilNotFound: the requested range ends before the buffered data; the
	// read continues from the returned page (metric.RootPageNo).
	UntilNotFound = 3
	// NoMetric: the metric is not registered.
	NoMetric = 4
	// MetricDuplicate: the metric is already registered.
	MetricDuplicate = 5
	// Succeed: the request was accepted.
	Succeed = 6
	// NewPage: the measure does not fit into the current data page.
	NewPage = 7
	// ExpiredMeasure: the timestamp is not newer than the last stored one.
	ExpiredMeasure = 8
	// NonMonotonicValue: a cumulative value is lower than the last stored one.
	NonMonotonicValue = 9
	// CanAppend: the caller may proceed with the append.
	CanAppend = 10
	// WrongMetricType: the metric type does not match the request kind.
	WrongMetricType = 11
)

// worker runs DoWork each time a value is received from workerSignalCh.
// It never returns.
func (s *Database) worker() {
	for {
		<-s.workerSignalCh
		s.DoWork()
	}
}

// DoWork takes the pending read-lock releases and the pending request queue
// (both swapped out under s.mutex), releases the read locks and then
// dispatches every queued request to its handler.
func (s *Database) DoWork() {
	s.mutex.Lock()
	rLocksToRelease := s.rLocksToRelease
	workerQueue := s.workerQueue
	s.rLocksToRelease = nil
	s.workerQueue = nil
	s.mutex.Unlock()

	for _, metricID := range rLocksToRelease {
		lockEntry, ok := s.metricLockEntries[metricID]
		if !ok {
			diploma.Abort(diploma.NoLockEntryBug,
				fmt.Errorf("drainQueues: lockEntry not found for the metric %d", metricID))
		}
		if lockEntry.XLock {
			diploma.Abort(diploma.XLockBug,
				fmt.Errorf("drainQueues: xlock is set for the metric %d", metricID))
		}
		if lockEntry.RLocks <= 0 {
			diploma.Abort(diploma.NoRLockBug,
				fmt.Errorf("drainQueues: rlock not set for the metric %d", metricID))
		}
		lockEntry.RLocks--

		if len(lockEntry.WaitQueue) > 0 {
			metric, ok := s.metrics[metricID]
			if !ok {
				diploma.Abort(diploma.NoMetricBug,
					fmt.Errorf("drainQueues: metric %d not found", metricID))
			}
			s.processMetricQueue(metricID, metric, lockEntry)
		} else if lockEntry.RLocks == 0 {
			// Nobody holds or waits for the lock any more.
			delete(s.metricLockEntries, metricID)
		}
	}

	for _, untyped := range workerQueue {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			s.tryAppendMeasure(req)

		case tryAppendMeasuresReq:
			s.tryAppendMeasures(req)

		case txlog.Changes:
			s.applyChanges(req) // all metrics only

		case tryListCurrentValuesReq:
			s.tryListCurrentValues(req) // all metrics only

		case tryListCumulativePeriodsReq:
			s.tryListCumulativePeriods(req)

		case tryListInstantPeriodsReq:
			s.tryListInstantPeriods(req)

		case tryListCumulativeMeasuresReq:
			s.tryListCumulativeMeasures(req)

		case tryListInstantMeasuresReq:
			s.tryListInstantMeasures(req)

		case tryListAllInstantMeasuresReq:
			s.tryListAllInstantMeasures(req)

		case tryListAllCumulativeMeasuresReq:
			s.tryListAllCumulativeMeasures(req)

		case tryAddMetricReq:
			s.tryAddMetric(req)

		case tryDeleteMetricReq:
			s.tryDeleteMetric(req)

		case tryDeleteMeasuresReq:
			s.tryDeleteMeasures(req)

		case tryGetMetricReq:
			s.tryGetMetric(req)

		default:
			diploma.Abort(diploma.UnknownWorkerQueueItemBug,
				fmt.Errorf("bug: unknown worker queue item type %T", req))
		}
	}
}

// processMetricQueue drains the wait queue of lockEntry.
//
// Read requests are started immediately and take a read lock when their start
// function reports that one is required. All other requests are treated as
// modifications: they are put back into the queue while read locks are held,
// otherwise they are started one at a time; as soon as a lock entry exists
// for the metric after a start, the remaining modifications are appended to
// that entry's queue.
func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
	if len(lockEntry.WaitQueue) == 0 {
		return
	}

	var modificationReqs []any

	for _, untyped := range lockEntry.WaitQueue {
		var rLockRequired bool

		switch req := untyped.(type) {
		case tryListCumulativePeriodsReq:
			rLockRequired = s.startListCumulativePeriods(metric, req)

		case tryListInstantPeriodsReq:
			rLockRequired = s.startListInstantPeriods(metric, req)

		case tryListCumulativeMeasuresReq:
			rLockRequired = s.startListCumulativeMeasures(metric, req)

		case tryListInstantMeasuresReq:
			rLockRequired = s.startListInstantMeasures(metric, req)

		case tryListAllInstantMeasuresReq:
			rLockRequired = s.startListAllInstantMeasures(metric, req)

		case tryListAllCumulativeMeasuresReq:
			rLockRequired = s.startListAllCumulativeMeasures(metric, req)

		case tryGetMetricReq:
			s.tryGetMetric(req)

		default:
			modificationReqs = append(modificationReqs, untyped)
		}

		if rLockRequired {
			lockEntry.RLocks++
		}
	}

	lockEntry.WaitQueue = nil

	if lockEntry.RLocks > 0 {
		// Readers are active: modifications keep waiting.
		lockEntry.WaitQueue = modificationReqs
		return
	}

	for idx, untyped := range modificationReqs {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			s.startAppendMeasure(metric, req, nil)

		case tryAppendMeasuresReq:
			s.startAppendMeasures(metric, req, nil)

		case tryDeleteMetricReq:
			s.startDeleteMetric(metric, req)

		case tryDeleteMeasuresReq:
			s.startDeleteMeasures(metric, req)

		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}

		// NOTE(review): the entry found here may still be the one passed in
		// (holding no lock) when the request above was rejected without
		// taking a lock; the remaining requests are then parked on it.
		// Confirm that something wakes them up.
		current, ok := s.metricLockEntries[metricID]
		if ok {
			if rest := modificationReqs[idx+1:]; len(rest) > 0 {
				current.WaitQueue = append(current.WaitQueue, rest...)
			}
			break
		}
	}
}

// tryAddMetricReq asks for permission to register a new metric.
type tryAddMetricReq struct {
	MetricID uint32
	ResultCh chan byte
}

// tryAddMetric answers MetricDuplicate when the metric exists, queues the
// request when a lock entry exists for the metric, and otherwise takes the
// exclusive lock and answers Succeed.
func (s *Database) tryAddMetric(req tryAddMetricReq) {
	if _, ok := s.metrics[req.MetricID]; ok {
		req.ResultCh <- MetricDuplicate
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock: true,
	}
	req.ResultCh <- Succeed
}

// processTryAddMetricReqsImmediatelyAfterDelete grants the exclusive lock to
// the first request and queues the remaining ones behind it.
func (s *Database) processTryAddMetricReqsImmediatelyAfterDelete(reqs []tryAddMetricReq) {
	if len(reqs) == 0 {
		return
	}

	var (
		first     = reqs[0]
		waitQueue []any
	)
	for _, queued := range reqs[1:] {
		waitQueue = append(waitQueue, queued)
	}

	s.metricLockEntries[first.MetricID] = &metricLockEntry{
		XLock:     true,
		WaitQueue: waitQueue,
	}
	first.ResultCh <- Succeed
}

// tryGetMetricReq asks for the description of a metric.
type tryGetMetricReq struct {
	MetricID uint32
	ResultCh chan Metric
}

// tryGetMetric answers with the metric type and fraction digits, or with
// NoMetric when the metric is not registered.
func (s *Database) tryGetMetric(req tryGetMetricReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- Metric{
			ResultCode: NoMetric,
		}
		return
	}

	req.ResultCh <- Metric{
		ResultCode: Succeed,
		MetricType: metric.MetricType,
		FracDigits: metric.FracDigits,
	}
}

// tryDeleteMetricReq asks for permission to delete a metric.
type tryDeleteMetricReq struct {
	MetricID uint32
	ResultCh chan tryDeleteMetricResult
}

// tryDeleteMetric answers NoMetric for an unknown metric, queues the request
// when a lock entry exists for the metric, and otherwise starts the deletion.
func (s *Database) tryDeleteMetric(req tryDeleteMetricReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryDeleteMetricResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
	} else {
		s.startDeleteMetric(metric, req)
	}
}

// startDeleteMetric installs a fresh exclusive lock entry for the metric and
// answers Succeed together with the metric's root page number.
func (s *Database) startDeleteMetric(metric *_metric, req tryDeleteMetricReq) {
	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock: true,
	}
	req.ResultCh <- tryDeleteMetricResult{
		ResultCode: Succeed,
		RootPageNo: metric.RootPageNo,
	}
}

// tryDeleteMeasuresReq asks for permission to delete measures of a metric.
type tryDeleteMeasuresReq struct {
	MetricID uint32
	Since    uint32
	ResultCh chan tryDeleteMeasuresResult
}

// tryDeleteMeasures answers NoMetric for an unknown metric, queues the
// request when a lock entry exists for the metric, and otherwise starts the
// deletion.
func (s *Database) tryDeleteMeasures(req tryDeleteMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryDeleteMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
	} else {
		s.startDeleteMeasures(metric, req)
	}
}

// startDeleteMeasures installs a fresh exclusive lock entry for the metric
// and answers Succeed together with the metric's root page number.
func (s *Database) startDeleteMeasures(metric *_metric, req tryDeleteMeasuresReq) {
	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock: true,
	}
	req.ResultCh <- tryDeleteMeasuresResult{
		ResultCode: Succeed,
		RootPageNo: metric.RootPageNo,
	}
}

// SELECT

// tryListAllInstantMeasuresReq asks for every measure of an instant metric.
type tryListAllInstantMeasuresReq struct {
	MetricID       uint32
	ResponseWriter *atree.InstantMeasureWriter
	ResultCh       chan instantMeasuresResult
}

// tryListAllInstantMeasures validates the metric, queues the request while
// the metric is exclusively locked, and otherwise starts the read, taking a
// read lock when the read has to continue outside the worker.
func (s *Database) tryListAllInstantMeasures(req tryListAllInstantMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListAllInstantMeasures(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListAllInstantMeasures buffers every measure held in the metric's
// in-memory chunk into the response writer. It returns true (a read lock is
// required) when the read must continue from metric.LastPageNo.
func (s *Database) startListAllInstantMeasures(metric *_metric, req tryListAllInstantMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		req.ResponseWriter.BufferMeasure(atree.InstantMeasure{
			Timestamp: timestamp,
			Value:     value,
		})
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: UntilFound,
			PageNo:     metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	req.ResultCh <- instantMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}

// tryListInstantMeasuresReq asks for the measures of an instant metric within
// [Since, Until].
type tryListInstantMeasuresReq struct {
	MetricID       uint32
	Since          uint32
	Until          uint32
	ResponseWriter *atree.InstantMeasureWriter
	ResultCh       chan instantMeasuresResult
}

// tryListInstantMeasures validates the metric, queues the request while the
// metric is exclusively locked, and otherwise starts the read, taking a read
// lock when the read has to continue outside the worker.
func (s *Database) tryListInstantMeasures(req tryListInstantMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListInstantMeasures(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListInstantMeasures buffers the in-memory measures that fall within
// [req.Since, req.Until]. It returns true (a read lock is required) when the
// read must continue from a page returned in the result.
func (s *Database) startListInstantMeasures(metric *_metric, req tryListInstantMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Since > metric.Until {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Until < metric.Since {
		// The requested range ends before the buffered data.
		if metric.RootPageNo > 0 {
			req.ResultCh <- instantMeasuresResult{
				ResultCode: UntilNotFound,
				PageNo:     metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		}
		req.ResultCh <- instantMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if timestamp > req.Until {
			continue
		}

		if timestamp < req.Since {
			req.ResultCh <- instantMeasuresResult{
				ResultCode: QueryDone,
			}
			return false
		}

		req.ResponseWriter.BufferMeasure(atree.InstantMeasure{
			Timestamp: timestamp,
			Value:     value,
		})

		if timestamp == req.Since {
			req.ResultCh <- instantMeasuresResult{
				ResultCode: QueryDone,
			}
			return false
		}
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: UntilFound,
			PageNo:     metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	req.ResultCh <- instantMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}

// CUMULATIVE

// tryListAllCumulativeMeasuresReq asks for every measure of a cumulative
// metric.
type tryListAllCumulativeMeasuresReq struct {
	MetricID       uint32
	ResponseWriter *atree.CumulativeMeasureWriter
	ResultCh       chan cumulativeMeasuresResult
}

// tryListAllCumulativeMeasures validates the metric, queues the request while
// the metric is exclusively locked, and otherwise starts the read, taking a
// read lock when the read has to continue outside the worker.
func (s *Database) tryListAllCumulativeMeasures(req tryListAllCumulativeMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListAllCumulativeMeasures(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListAllCumulativeMeasures writes every in-memory measure together with
// its increment over the next decoded measure. The last decoded measure is
// either handed over in the result (EndTimestamp/EndValue) when the read
// continues from metric.LastPageNo, or written with Total equal to its value.
// It returns true when a read lock is required.
func (s *Database) startListAllCumulativeMeasures(metric *_metric, req tryListAllCumulativeMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	endTimestamp, done := timestampDecompressor.NextValue()
	if done {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	endValue, done := valueDecompressor.NextValue()
	if done {
		diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
	}

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
			Timestamp: endTimestamp,
			Value:     endValue,
			Total:     endValue - value,
		})
		endTimestamp = timestamp
		endValue = value
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode:   UntilFound,
			PageNo:       metric.LastPageNo,
			FracDigits:   metric.FracDigits,
			EndTimestamp: endTimestamp,
			EndValue:     endValue,
		}
		return true
	}

	req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
		Timestamp: endTimestamp,
		Value:     endValue,
		Total:     endValue,
	})
	req.ResultCh <- cumulativeMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}

// tryListCumulativeMeasuresReq asks for the measures of a cumulative metric
// within [Since, Until].
type tryListCumulativeMeasuresReq struct {
	MetricID       uint32
	Since          uint32
	Until          uint32
	ResponseWriter *atree.CumulativeMeasureWriter
	ResultCh       chan cumulativeMeasuresResult
}

// tryListCumulativeMeasures validates the metric, queues the request while
// the metric is exclusively locked, and otherwise starts the read, taking a
// read lock when the read has to continue outside the worker.
func (s *Database) tryListCumulativeMeasures(req tryListCumulativeMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListCumulativeMeasures(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListCumulativeMeasures writes the in-memory measures that fall within
// [req.Since, req.Until], each with its increment over the next decoded
// measure. It returns true (a read lock is required) when the read must
// continue from a page returned in the result.
func (s *Database) startListCumulativeMeasures(metric *_metric, req tryListCumulativeMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Since > metric.Until {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Until < metric.Since {
		// search in tree
		if metric.RootPageNo > 0 {
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: UntilNotFound,
				PageNo:     metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		}
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	var (
		endTimestamp uint32
		endValue     float64
	)

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if timestamp > req.Until {
			continue
		}

		if timestamp < req.Since {
			// Flush the pending measure only if one was found inside the
			// range; otherwise a zero-timestamp measure would be emitted.
			if endTimestamp > 0 {
				req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
					Timestamp: endTimestamp,
					Value:     endValue,
					Total:     endValue - value,
				})
			}
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: QueryDone,
			}
			return false
		}

		if endTimestamp > 0 {
			req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
				Timestamp: endTimestamp,
				Value:     endValue,
				Total:     endValue - value,
			})
		}
		endTimestamp = timestamp
		endValue = value
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode:   UntilFound,
			PageNo:       metric.LastPageNo,
			FracDigits:   metric.FracDigits,
			EndTimestamp: endTimestamp,
			EndValue:     endValue,
		}
		return true
	}

	req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
		Timestamp: endTimestamp,
		Value:     endValue,
		Total:     endValue,
	})
	req.ResultCh <- cumulativeMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}

// tryAppendMeasureReq asks for permission to append a single measure.
type tryAppendMeasureReq struct {
	MetricID  uint32
	Timestamp uint32
	Value     float64
	ResultCh  chan tryAppendMeasureResult
}

// tryAppendMeasure answers NoMetric for an unknown metric, queues the request
// while the metric is exclusively locked, and otherwise starts the append.
func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryAppendMeasureResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}
	s.startAppendMeasure(metric, req, lockEntry)
}

// startAppendMeasure validates the measure against the last stored one and
// then either grants the append under a read lock (CanAppend) when the measure
// fits into atree.DataPagePayloadSize, or takes the exclusive lock and answers
// NewPage with a description of the filled page. When a new page is needed
// while read locks are held, the request is queued instead.
//
// lockEntry is the metric's current lock entry, or nil when a new entry has
// to be created.
func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, lockEntry *metricLockEntry) {
	if req.Timestamp <= metric.Until {
		req.ResultCh <- tryAppendMeasureResult{
			ResultCode: ExpiredMeasure,
		}
		return
	}

	if metric.MetricType == diploma.Cumulative && req.Value < metric.UntilValue {
		req.ResultCh <- tryAppendMeasureResult{
			ResultCode: NonMonotonicValue,
		}
		return
	}

	extraSpace := metric.Timestamps.CalcRequiredSpace(req.Timestamp) +
		metric.Values.CalcRequiredSpace(req.Value)

	totalSpace := metric.Timestamps.Size() + metric.Values.Size() + extraSpace

	if totalSpace <= atree.DataPagePayloadSize {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
		req.ResultCh <- tryAppendMeasureResult{
			ResultCode: CanAppend,
		}
		return
	}

	if lockEntry != nil {
		if lockEntry.RLocks > 0 {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
		lockEntry.XLock = true
	} else {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{
			XLock: true,
		}
	}

	req.ResultCh <- tryAppendMeasureResult{
		ResultCode: NewPage,
		FilledPage: &FilledPage{
			Since:            metric.Since,
			RootPageNo:       metric.RootPageNo,
			PrevPageNo:       metric.LastPageNo,
			TimestampsChunks: metric.TimestampsBuf.Chunks(),
			TimestampsSize:   uint16(metric.Timestamps.Size()),
			ValuesChunks:     metric.ValuesBuf.Chunks(),
			ValuesSize:       uint16(metric.Values.Size()),
		},
	}
}

// appendMeasure applies an AppendedMeasure record: it appends the measure to
// the metric's in-memory chunk and releases the read lock that was taken when
// the append was granted.
func (s *Database) appendMeasure(rec txlog.AppendedMeasure) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasure: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasure: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if lockEntry.XLock {
		diploma.Abort(diploma.XLockBug,
			fmt.Errorf("appendMeasure: xlock is set for the metric %d",
				rec.MetricID))
	}

	if lockEntry.RLocks <= 0 {
		diploma.Abort(diploma.NoRLockBug,
			fmt.Errorf("appendMeasure: rlock not set for the metric %d",
				rec.MetricID))
	}

	if metric.Since == 0 {
		metric.Since = rec.Timestamp
		metric.SinceValue = rec.Value
	}

	metric.Timestamps.Append(rec.Timestamp)
	metric.Values.Append(rec.Value)

	metric.Until = rec.Timestamp
	metric.UntilValue = rec.Value

	lockEntry.RLocks--
	if len(lockEntry.WaitQueue) > 0 {
		s.processMetricQueue(rec.MetricID, metric, lockEntry)
	} else if lockEntry.RLocks == 0 {
		delete(s.metricLockEntries, rec.MetricID)
	}
}

// appendMeasures applies an AppendedMeasuresExtended record: it appends every
// measure of the record to the metric's in-memory chunk and, unless the
// record asks to hold the lock, releases the exclusive lock.
func (s *Database) appendMeasures(extended txlog.AppendedMeasuresExtended) {
	rec := extended.Record

	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasures: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}

	for _, measure := range rec.Measures {
		if metric.Since == 0 {
			metric.Since = measure.Timestamp
			metric.SinceValue = measure.Value
		}

		metric.Timestamps.Append(measure.Timestamp)
		metric.Values.Append(measure.Value)

		metric.Until = measure.Timestamp
		metric.UntilValue = measure.Value
	}

	if !extended.HoldLock {
		lockEntry.XLock = false
		s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
	}
}

// appendMeasureAfterOverflow applies an AppendedMeasureWithOverflowExtended
// record: it reinitializes the in-memory chunk with the new measure, updates
// the metric's page numbers, removes reused pages from the free lists and,
// unless the record asks to hold the lock, releases the exclusive lock.
func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWithOverflowExtended) {
	rec := extended.Record

	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasureAfterOverflow: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasureAfterOverflow: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasureAfterOverflow: xlock not set for the metric %d",
				rec.MetricID))
	}

	metric.ReinitBy(rec.Timestamp, rec.Value)
	if rec.IsRootChanged {
		metric.RootPageNo = rec.RootPageNo
	}
	metric.LastPageNo = rec.DataPageNo

	// delete free pages
	if rec.IsDataPageReused {
		s.dataFreeList.DeletePages([]uint32{
			rec.DataPageNo,
		})
	}
	if len(rec.ReusedIndexPages) > 0 {
		s.indexFreeList.DeletePages(rec.ReusedIndexPages)
	}

	if !extended.HoldLock {
		lockEntry.XLock = false
		s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
	}
}

// tryAppendMeasuresReq asks for permission to append a batch of measures.
type tryAppendMeasuresReq struct {
	MetricID uint32
	ResultCh chan tryAppendMeasuresResult
}

// tryAppendMeasures answers NoMetric for an unknown metric, queues the
// request while the metric is exclusively locked, and otherwise starts the
// batch append.
func (s *Database) tryAppendMeasures(req tryAppendMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryAppendMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}
	s.startAppendMeasures(metric, req, lockEntry)
}

// startAppendMeasures takes the exclusive lock (or queues the request while
// read locks are held) and answers CanAppend with the metric state plus
// compressors the caller can append to: copies of the current buffers when
// the metric already holds measures, fresh empty buffers otherwise.
//
// lockEntry is the metric's current lock entry, or nil when a new entry has
// to be created.
func (s *Database) startAppendMeasures(metric *_metric, req tryAppendMeasuresReq, lockEntry *metricLockEntry) {
	if lockEntry != nil {
		if lockEntry.RLocks > 0 {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
		lockEntry.XLock = true
	} else {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{
			XLock: true,
		}
	}

	var (
		timestampsBuf *conbuf.ContinuousBuffer
		valuesBuf     *conbuf.ContinuousBuffer
		timestamps    diploma.TimestampCompressor
		values        diploma.ValueCompressor
	)

	if metric.Since > 0 {
		timestampsBuf = metric.TimestampsBuf.Copy()
		valuesBuf = metric.ValuesBuf.Copy()

		timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			timestampsBuf, metric.Timestamps.Size())

		if metric.MetricType == diploma.Cumulative {
			values = chunkenc.NewReverseCumulativeDeltaCompressor(
				valuesBuf, metric.Values.Size(), metric.FracDigits)
		} else {
			values = chunkenc.NewReverseInstantDeltaCompressor(
				valuesBuf, metric.Values.Size(), metric.FracDigits)
		}
	} else {
		timestampsBuf = conbuf.New(nil)
		valuesBuf = conbuf.New(nil)

		timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			timestampsBuf, 0)

		if metric.MetricType == diploma.Cumulative {
			values = chunkenc.NewReverseCumulativeDeltaCompressor(
				valuesBuf, 0, metric.FracDigits)
		} else {
			values = chunkenc.NewReverseInstantDeltaCompressor(
				valuesBuf, 0, metric.FracDigits)
		}
	}

	req.ResultCh <- tryAppendMeasuresResult{
		ResultCode:    CanAppend,
		MetricType:    metric.MetricType,
		FracDigits:    metric.FracDigits,
		Since:         metric.Since,
		Until:         metric.Until,
		UntilValue:    metric.UntilValue,
		RootPageNo:    metric.RootPageNo,
		PrevPageNo:    metric.LastPageNo,
		TimestampsBuf: timestampsBuf,
		ValuesBuf:     valuesBuf,
		Timestamps:    timestamps,
		Values:        values,
	}
}

// tryListCumulativePeriodsReq asks for aggregated periods of a cumulative
// metric within [Since, Until].
type tryListCumulativePeriodsReq struct {
	MetricID       uint32
	Since          uint32
	Until          uint32
	Aggregator     *atree.CumulativeAggregator
	ResponseWriter *atree.CumulativePeriodsWriter
	ResultCh       chan cumulativePeriodsResult
}

// tryListCumulativePeriods validates the metric, queues the request while the
// metric is exclusively locked, and otherwise starts the read, taking a read
// lock when the read has to continue outside the worker.
func (s *Database) tryListCumulativePeriods(req tryListCumulativePeriodsReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListCumulativePeriods(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListCumulativePeriods feeds the in-memory measures that fall within
// [req.Since, req.Until] to the aggregator and writes every period the
// aggregator reports as completed. It returns true (a read lock is required)
// when the read must continue from a page returned in the result.
func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumulativePeriodsReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Since > metric.Until {
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Until < metric.Since {
		if metric.RootPageNo > 0 {
			req.ResultCh <- cumulativePeriodsResult{
				ResultCode: UntilNotFound,
				PageNo:     metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		}
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	var period atree.CumulativePeriod

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if timestamp > req.Until {
			continue
		}

		if timestamp < req.Since {
			isCompleted := req.Aggregator.FillPeriod(timestamp, value, &period)
			if isCompleted {
				req.ResponseWriter.WritePeriod(period)
			}
			req.ResultCh <- cumulativePeriodsResult{
				ResultCode: QueryDone,
			}
			return false
		}

		// Here req.Since <= timestamp <= req.Until.
		isCompleted := req.Aggregator.Feed(timestamp, value, &period)
		if isCompleted {
			req.ResponseWriter.WritePeriod(period)
		}
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: UntilFound,
			PageNo:     metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	isCompleted := req.Aggregator.FillPeriod(metric.Since, metric.SinceValue, &period)
	if isCompleted {
		req.ResponseWriter.WritePeriod(period)
	}
	req.ResultCh <- cumulativePeriodsResult{
		ResultCode: QueryDone,
	}
	return false
}

// tryListInstantPeriodsReq asks for aggregated periods of an instant metric
// within [Since, Until].
type tryListInstantPeriodsReq struct {
	MetricID       uint32
	Since          uint32
	Until          uint32
	Aggregator     *atree.InstantAggregator
	ResponseWriter *atree.InstantPeriodsWriter
	ResultCh       chan instantPeriodsResult
}

// tryListInstantPeriods validates the metric, queues the request while the
// metric is exclusively locked, and otherwise starts the read, taking a read
// lock when the read has to continue outside the worker.
func (s *Database) tryListInstantPeriods(req tryListInstantPeriodsReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- instantPeriodsResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantPeriodsResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok && lockEntry.XLock {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
		return
	}

	if s.startListInstantPeriods(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

// startListInstantPeriods feeds the in-memory measures that fall within
// [req.Since, req.Until] to the aggregator and writes every period the
// aggregator reports as completed. It returns true (a read lock is required)
// when the read must continue from a page returned in the result.
func (*Database) startListInstantPeriods(metric *_metric, req tryListInstantPeriodsReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- instantPeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Since > metric.Until {
		req.ResultCh <- instantPeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Until < metric.Since {
		// search in tree
		if metric.RootPageNo > 0 {
			req.ResultCh <- instantPeriodsResult{
				ResultCode: UntilNotFound,
				PageNo:     metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		}
		req.ResultCh <- instantPeriodsResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	var period atree.InstantPeriod

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if timestamp > req.Until {
			continue
		}

		if timestamp < req.Since {
			isCompleted := req.Aggregator.FillPeriod(timestamp, &period)
			if isCompleted {
				req.ResponseWriter.WritePeriod(period)
			}
			req.ResultCh <- instantPeriodsResult{
				ResultCode: QueryDone,
			}
			return false
		}

		isCompleted := req.Aggregator.Feed(timestamp, value, &period)
		if isCompleted {
			req.ResponseWriter.WritePeriod(period)
		}
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- instantPeriodsResult{
			ResultCode: UntilFound,
			PageNo:     metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	isCompleted := req.Aggregator.FillPeriod(metric.Since, &period)
	if isCompleted {
		req.ResponseWriter.WritePeriod(period)
	}
	req.ResultCh <- instantPeriodsResult{
		ResultCode: QueryDone,
	}
	return false
}

// tryListCurrentValuesReq asks for the latest value of several metrics.
type tryListCurrentValuesReq struct {
	MetricIDs      []uint32
	ResponseWriter *atree.CurrentValueWriter
	ResultCh       chan struct{}
}

// tryListCurrentValues buffers the last timestamp and value of every
// requested metric that exists (unknown IDs are skipped) and then signals
// completion on ResultCh.
func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) {
	for _, metricID := range req.MetricIDs {
		metric, ok := s.metrics[metricID]
		if !ok {
			continue
		}
		req.ResponseWriter.BufferValue(atree.CurrentValue{
			MetricID:  metricID,
			Timestamp: metric.Until,
			Value:     metric.UntilValue,
		})
	}
	req.ResultCh <- struct{}{}
}

// applyChanges applies every record of a transaction-log batch to the
// in-memory state, dumps a snapshot when the batch forces one or carries an
// ExitWaitGroup, closes WaitCh and finally marks the ExitWaitGroup (if any)
// as done.
func (s *Database) applyChanges(req txlog.Changes) {
	for _, untyped := range req.Records {
		switch rec := untyped.(type) {
		case txlog.AddedMetric:
			s.addMetric(rec)

		case txlog.DeletedMetric:
			s.deleteMetric(rec)

		case txlog.AppendedMeasure:
			s.appendMeasure(rec)

		case txlog.AppendedMeasuresExtended:
			s.appendMeasures(rec)

		case txlog.AppendedMeasureWithOverflowExtended:
			s.appendMeasureAfterOverflow(rec)

		case txlog.DeletedMeasures:
			s.deleteMeasures(rec)

		case txlog.DeletedMeasuresSince:
			s.deleteMeasuresSince(rec)
		}
	}

	if req.ForceSnapshot || req.ExitWaitGroup != nil {
		s.dumpSnapshot(req.LogNumber)
	}

	close(req.WaitCh)

	if req.ExitWaitGroup != nil {
		req.ExitWaitGroup.Done()
	}
}

// addMetric applies an AddedMetric record: it registers the metric with empty
// buffers and releases the exclusive lock taken by tryAddMetric. Add requests
// that queued up behind that lock are answered with MetricDuplicate, since
// the metric now exists.
func (s *Database) addMetric(rec txlog.AddedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if ok {
		diploma.Abort(diploma.MetricAddedBug,
			fmt.Errorf("addMetric: metric %d already added",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("addMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("addMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var (
		values        diploma.ValueCompressor
		timestampsBuf = conbuf.New(nil)
		valuesBuf     = conbuf.New(nil)
	)

	if rec.MetricType == diploma.Cumulative {
		values = chunkenc.NewReverseCumulativeDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	} else {
		values = chunkenc.NewReverseInstantDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	}

	s.metrics[rec.MetricID] = &_metric{
		MetricType:    rec.MetricType,
		FracDigits:    byte(rec.FracDigits),
		TimestampsBuf: timestampsBuf,
		ValuesBuf:     valuesBuf,
		Timestamps:    chunkenc.NewReverseTimeDeltaOfDeltaCompressor(timestampsBuf, 0),
		Values:        values,
	}

	// Answer the waiters before the entry is dropped; otherwise they would
	// block on ResultCh forever. Only add requests can be queued while the
	// metric does not exist: every other handler answers NoMetric first.
	for _, untyped := range lockEntry.WaitQueue {
		switch req := untyped.(type) {
		case tryAddMetricReq:
			req.ResultCh <- MetricDuplicate

		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}
	}

	lockEntry.XLock = false
	delete(s.metricLockEntries, rec.MetricID)
}

// deleteMetric applies a DeletedMetric record: it answers every request that
// waited for the metric (NoMetric, except add requests, which are re-run
// afterwards), removes the metric and its lock entry, and returns the freed
// pages to the free lists.
func (s *Database) deleteMetric(rec txlog.DeletedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMetric: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var addMetricReqs []tryAddMetricReq

	for _, untyped := range lockEntry.WaitQueue {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			req.ResultCh <- tryAppendMeasureResult{
				ResultCode: NoMetric,
			}

		case tryAppendMeasuresReq:
			// Batch appends queue behind the exclusive lock as well.
			req.ResultCh <- tryAppendMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryListCumulativePeriodsReq:
			req.ResultCh <- cumulativePeriodsResult{
				ResultCode: NoMetric,
			}

		case tryListInstantPeriodsReq:
			req.ResultCh <- instantPeriodsResult{
				ResultCode: NoMetric,
			}

		case tryListCumulativeMeasuresReq:
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryListInstantMeasuresReq:
			req.ResultCh <- instantMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryListAllCumulativeMeasuresReq:
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryListAllInstantMeasuresReq:
			req.ResultCh <- instantMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryAddMetricReq:
			addMetricReqs = append(addMetricReqs, req)

		case tryDeleteMetricReq:
			req.ResultCh <- tryDeleteMetricResult{
				ResultCode: NoMetric,
			}

		case tryDeleteMeasuresReq:
			req.ResultCh <- tryDeleteMeasuresResult{
				ResultCode: NoMetric,
			}

		case tryGetMetricReq:
			req.ResultCh <- Metric{
				ResultCode: NoMetric,
			}

		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}
	}

	delete(s.metrics, rec.MetricID)
	delete(s.metricLockEntries, rec.MetricID)

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
	}

	if len(addMetricReqs) > 0 {
		s.processTryAddMetricReqsImmediatelyAfterDelete(addMetricReqs)
	}
}

// deleteMeasures applies a DeletedMeasures record: it clears the metric's
// measures, returns the freed data and index pages to the free lists and
// releases the exclusive lock.
func (s *Database) deleteMeasures(rec txlog.DeletedMeasures) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMeasures: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}

	metric.DeleteMeasures()
	lockEntry.XLock = false

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	// Guard on the index pages themselves, not on the data pages.
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
	}

	s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}

// deleteMeasuresSince applies a DeletedMeasuresSince record: it either clears
// the metric's measures (when the root changed to page 0) or rebuilds the
// in-memory chunk from the buffers carried by the record, then clears the
// exclusive lock flag and returns the freed pages to the free lists.
func (s *Database) deleteMeasuresSince(rec txlog.DeletedMeasuresSince) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMeasuresSince: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMeasuresSince: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMeasuresSince: xlock not set for the metric %d",
				rec.MetricID))
	}

	if rec.IsRootChanged && rec.RootPageNo == 0 {
		metric.DeleteMeasures()
	} else {
		metric.TimestampsBuf = conbuf.NewFromBuffer(rec.TimestampsBuf)
		metric.ValuesBuf = conbuf.NewFromBuffer(rec.ValuesBuf)

		metric.Timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			metric.TimestampsBuf, len(rec.TimestampsBuf))

		if metric.MetricType == diploma.Cumulative {
			metric.Values = chunkenc.NewReverseCumulativeDeltaCompressor(
				metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits)
		} else {
			metric.Values = chunkenc.NewReverseInstantDeltaCompressor(
				metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits)
		}

		metric.Since, metric.Until = enc.GetTimeRange(rec.TimestampsBuf)
		metric.SinceValue, metric.UntilValue = enc.GetValueBounds(
			rec.ValuesBuf, metric.MetricType, metric.FracDigits,
		)

		if rec.IsRootChanged {
			metric.RootPageNo = rec.RootPageNo
		}
		metric.LastPageNo = rec.LastPageNo
	}

	lockEntry.XLock = false

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	// Guard on the index pages themselves, not on the data pages.
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
} s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry) } func (s *Database) doAfterReleaseXLock(metricID uint32, metric *_metric, lockEntry *metricLockEntry) { if len(lockEntry.WaitQueue) == 0 { delete(s.metricLockEntries, metricID) } else { s.processMetricQueue(metricID, metric, lockEntry) } }