спеціалізована СУБД для зберігання та обробки показань датчиків та лічильників
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
diploma/database/proc.go

1736 lines
39 KiB

package database
import (
"fmt"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/atree"
"gordenko.dev/dima/diploma/chunkenc"
"gordenko.dev/dima/diploma/conbuf"
"gordenko.dev/dima/diploma/enc"
"gordenko.dev/dima/diploma/txlog"
)
// Result codes the worker sends back through the ResultCh of a request.
// The numeric values are fixed: QueryDone is 1 and every following code is
// one greater than the previous, up to WrongMetricType = 11.
const (
	// QueryDone: the request was answered completely by the worker.
	QueryDone = iota + 1
	// UntilFound: in-memory data was consumed; continue from the returned
	// PageNo (the metric's LastPageNo).
	UntilFound
	// UntilNotFound: the requested range lies before the in-memory data;
	// search from the returned PageNo (the metric's RootPageNo).
	UntilNotFound
	// NoMetric: the metric ID is not registered.
	NoMetric
	// MetricDuplicate: the metric to add already exists.
	MetricDuplicate
	// Succeed: the requested operation may proceed / has succeeded.
	Succeed
	// NewPage: the measure does not fit into the current data page.
	NewPage
	// ExpiredMeasure: the measure's timestamp is not newer than the
	// metric's latest timestamp.
	ExpiredMeasure
	// NonMonotonicValue: a cumulative metric received a value lower than
	// its latest value.
	NonMonotonicValue
	// CanAppend: the append may proceed.
	CanAppend
	// WrongMetricType: the request does not match the metric's type.
	WrongMetricType
)
// worker loops forever: each value received from workerSignalCh triggers one
// DoWork pass.
func (s *Database) worker() {
	for {
		<-s.workerSignalCh
		s.DoWork()
	}
}
// DoWork performs one worker pass.
//
// It takes the pending read-lock releases and the pending worker queue out of
// the Database while holding s.mutex, then, outside the mutex:
//  1. releases one read lock per listed metric ID, running the metric's wait
//     queue if it is non-empty or dropping the lock entry once it has no
//     readers left;
//  2. dispatches every queued request to its handler by dynamic type.
//
// Broken invariants (missing lock entry, exclusive lock set, no read lock,
// missing metric, unknown queue item) are reported through diploma.Abort.
func (s *Database) DoWork() {
	s.mutex.Lock()
	released, pending := s.rLocksToRelease, s.workerQueue
	s.rLocksToRelease, s.workerQueue = nil, nil
	s.mutex.Unlock()

	for _, id := range released {
		entry, found := s.metricLockEntries[id]
		if !found {
			diploma.Abort(diploma.NoLockEntryBug,
				fmt.Errorf("drainQueues: lockEntry not found for the metric %d",
					id))
		}
		if entry.XLock {
			diploma.Abort(diploma.XLockBug,
				fmt.Errorf("drainQueues: xlock is set for the metric %d",
					id))
		}
		if entry.RLocks <= 0 {
			diploma.Abort(diploma.NoRLockBug,
				fmt.Errorf("drainQueues: rlock not set for the metric %d",
					id))
		}
		entry.RLocks--

		if len(entry.WaitQueue) == 0 {
			// Nobody is waiting: forget the entry once the last reader left.
			if entry.RLocks == 0 {
				delete(s.metricLockEntries, id)
			}
			continue
		}

		metric, found := s.metrics[id]
		if !found {
			diploma.Abort(diploma.NoMetricBug,
				fmt.Errorf("drainQueues: metric %d not found", id))
		}
		s.processMetricQueue(id, metric, entry)
	}

	for _, item := range pending {
		switch req := item.(type) {
		case tryAppendMeasureReq:
			s.tryAppendMeasure(req)
		case tryAppendMeasuresReq:
			s.tryAppendMeasures(req)
		case txlog.Changes:
			s.applyChanges(req) // all metrics only
		case tryListCurrentValuesReq:
			s.tryListCurrentValues(req) // all metrics only
		case tryListCumulativePeriodsReq:
			s.tryListCumulativePeriods(req)
		case tryListInstantPeriodsReq:
			s.tryListInstantPeriods(req)
		case tryListCumulativeMeasuresReq:
			s.tryListCumulativeMeasures(req)
		case tryListInstantMeasuresReq:
			s.tryListInstantMeasures(req)
		case tryListAllInstantMeasuresReq:
			s.tryListAllInstantMeasures(req)
		case tryListAllCumulativeMeasuresReq:
			s.tryListAllCumulativeMeasures(req)
		case tryAddMetricReq:
			s.tryAddMetric(req)
		case tryDeleteMetricReq:
			s.tryDeleteMetric(req)
		case tryDeleteMeasuresReq:
			s.tryDeleteMeasures(req)
		case tryGetMetricReq:
			s.tryGetMetric(req)
		default:
			diploma.Abort(diploma.UnknownWorkerQueueItemBug,
				fmt.Errorf("bug: unknown worker queue item type %T", req))
		}
	}
}
// processMetricQueue runs the requests parked in lockEntry.WaitQueue.
//
// Read requests are started immediately; each one that reports it needs a
// read lock increments lockEntry.RLocks. tryGetMetricReq items are answered
// in place. Every other item is treated as a modification request:
//   - if read locks are held after the first pass, the modification requests
//     are put back into lockEntry.WaitQueue;
//   - otherwise they are started in order until one of them takes a lock;
//     the remaining ones are parked on the lock entry created by that request.
//
// An unknown modification request type is reported through diploma.Abort.
func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
	if len(lockEntry.WaitQueue) == 0 {
		return
	}

	var modificationReqs []any
	for _, untyped := range lockEntry.WaitQueue {
		var rLockRequired bool
		switch req := untyped.(type) {
		case tryListCumulativePeriodsReq:
			rLockRequired = s.startListCumulativePeriods(metric, req)
		case tryListInstantPeriodsReq:
			rLockRequired = s.startListInstantPeriods(metric, req)
		case tryListCumulativeMeasuresReq:
			rLockRequired = s.startListCumulativeMeasures(metric, req)
		case tryListInstantMeasuresReq:
			rLockRequired = s.startListInstantMeasures(metric, req)
		case tryListAllInstantMeasuresReq:
			rLockRequired = s.startListAllInstantMeasures(metric, req)
		case tryListAllCumulativeMeasuresReq:
			rLockRequired = s.startListAllCumulativeMeasures(metric, req)
		case tryGetMetricReq:
			s.tryGetMetric(req)
		default:
			modificationReqs = append(modificationReqs, untyped)
		}
		if rLockRequired {
			lockEntry.RLocks++
		}
	}
	lockEntry.WaitQueue = nil

	if lockEntry.RLocks > 0 {
		// Readers are active: modifications keep waiting on this entry.
		lockEntry.WaitQueue = modificationReqs
		return
	}

	// No lock is held any more. Remove the old entry before starting the
	// modifications: the start* functions are called with a nil lock entry and
	// register a fresh one when they take a lock, so after this delete the
	// presence of an entry in the map means "the request just started took a
	// lock". Without the delete the old, unlocked entry would always be found,
	// requests following a rejected one would be parked on it forever, and an
	// entry with no locks would linger in the map.
	delete(s.metricLockEntries, metricID)

	for idx, untyped := range modificationReqs {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			s.startAppendMeasure(metric, req, nil)
		case tryAppendMeasuresReq:
			s.startAppendMeasures(metric, req, nil)
		case tryDeleteMetricReq:
			s.startDeleteMetric(metric, req)
		case tryDeleteMeasuresReq:
			s.startDeleteMeasures(metric, req)
		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}

		if acquired, ok := s.metricLockEntries[metricID]; ok {
			// The request took a lock: everything after it has to wait.
			acquired.WaitQueue = append(acquired.WaitQueue, modificationReqs[idx+1:]...)
			break
		}
	}
}
// tryAddMetricReq asks the worker whether a metric may be added.
// tryAddMetric answers on ResultCh with MetricDuplicate or Succeed.
type tryAddMetricReq struct {
// MetricID is the ID of the metric to add.
MetricID uint32
// ResultCh receives the result code.
ResultCh chan byte
}
// tryAddMetric handles a request to add a metric.
//
// An existing metric yields MetricDuplicate. If a lock entry already exists
// for the ID the request is parked in its wait queue. Otherwise an exclusive
// lock entry is registered and Succeed is sent.
func (s *Database) tryAddMetric(req tryAddMetricReq) {
	if _, exists := s.metrics[req.MetricID]; exists {
		req.ResultCh <- MetricDuplicate
		return
	}
	if entry, locked := s.metricLockEntries[req.MetricID]; locked {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}
	s.metricLockEntries[req.MetricID] = &metricLockEntry{XLock: true}
	req.ResultCh <- Succeed
}
// processTryAddMetricReqsImmediatelyAfterDelete grants the first of the given
// add-metric requests: it registers an exclusive lock entry for that request's
// metric ID, parks all remaining requests in the entry's wait queue, and sends
// Succeed to the first request. An empty slice is a no-op.
func (s *Database) processTryAddMetricReqsImmediatelyAfterDelete(reqs []tryAddMetricReq) {
	if len(reqs) == 0 {
		return
	}
	first, rest := reqs[0], reqs[1:]

	// Stays nil when there is nothing to park.
	var parked []any
	for i := range rest {
		parked = append(parked, rest[i])
	}

	s.metricLockEntries[first.MetricID] = &metricLockEntry{
		XLock:     true,
		WaitQueue: parked,
	}
	first.ResultCh <- Succeed
}
// tryGetMetricReq asks the worker for the description of a metric.
// tryGetMetric answers on ResultCh with a Metric whose ResultCode is Succeed
// or NoMetric.
type tryGetMetricReq struct {
// MetricID is the ID of the metric to look up.
MetricID uint32
// ResultCh receives the answer.
ResultCh chan Metric
}
// tryGetMetric answers a metric lookup: NoMetric when the ID is unknown,
// otherwise Succeed together with the metric's type and fraction digits.
func (s *Database) tryGetMetric(req tryGetMetricReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- Metric{ResultCode: NoMetric}
		return
	}
	req.ResultCh <- Metric{
		ResultCode: Succeed,
		MetricType: metric.MetricType,
		FracDigits: metric.FracDigits,
	}
}
// tryDeleteMetricReq asks the worker for permission to delete a metric.
// The answer is NoMetric, or Succeed together with the metric's RootPageNo.
type tryDeleteMetricReq struct {
// MetricID is the ID of the metric to delete.
MetricID uint32
// ResultCh receives the answer.
ResultCh chan tryDeleteMetricResult
}
// tryDeleteMetric handles a delete-metric request. An unknown metric yields
// NoMetric. If any lock entry exists for the metric the request is parked in
// its wait queue; otherwise the deletion is started right away.
func (s *Database) tryDeleteMetric(req tryDeleteMetricReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- tryDeleteMetricResult{ResultCode: NoMetric}
		return
	}
	if entry, locked := s.metricLockEntries[req.MetricID]; locked {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}
	s.startDeleteMetric(metric, req)
}
// startDeleteMetric registers a fresh exclusive lock entry for the metric
// (replacing whatever entry was stored under that ID) and replies Succeed
// with the metric's root page number.
func (s *Database) startDeleteMetric(metric *_metric, req tryDeleteMetricReq) {
	exclusive := &metricLockEntry{XLock: true}
	s.metricLockEntries[req.MetricID] = exclusive

	result := tryDeleteMetricResult{
		ResultCode: Succeed,
		RootPageNo: metric.RootPageNo,
	}
	req.ResultCh <- result
}
// tryDeleteMeasuresReq asks the worker for permission to delete measures of a
// metric. The answer is NoMetric, or Succeed together with the metric's
// RootPageNo.
type tryDeleteMeasuresReq struct {
// MetricID is the ID of the metric whose measures are to be deleted.
MetricID uint32
// Since is not read by tryDeleteMeasures/startDeleteMeasures.
// NOTE(review): presumably the lower time bound of the deletion — confirm
// against the caller.
Since uint32
// ResultCh receives the answer.
ResultCh chan tryDeleteMeasuresResult
}
// tryDeleteMeasures handles a delete-measures request. An unknown metric
// yields NoMetric. If any lock entry exists for the metric the request is
// parked in its wait queue; otherwise the deletion is started right away.
func (s *Database) tryDeleteMeasures(req tryDeleteMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- tryDeleteMeasuresResult{ResultCode: NoMetric}
		return
	}
	if entry, locked := s.metricLockEntries[req.MetricID]; locked {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}
	s.startDeleteMeasures(metric, req)
}
// startDeleteMeasures registers a fresh exclusive lock entry for the metric
// (replacing whatever entry was stored under that ID) and replies Succeed
// with the metric's root page number.
func (s *Database) startDeleteMeasures(metric *_metric, req tryDeleteMeasuresReq) {
	exclusive := &metricLockEntry{XLock: true}
	s.metricLockEntries[req.MetricID] = exclusive

	result := tryDeleteMeasuresResult{
		ResultCode: Succeed,
		RootPageNo: metric.RootPageNo,
	}
	req.ResultCh <- result
}
// SELECT
// tryListAllInstantMeasuresReq asks the worker to list every measure of an
// instant metric.
type tryListAllInstantMeasuresReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// ResponseWriter receives the measures decoded from the metric's in-memory
// buffers (via BufferMeasure).
ResponseWriter *atree.InstantMeasureWriter
// ResultCh receives the outcome (NoMetric, WrongMetricType, QueryDone or
// UntilFound).
ResultCh chan instantMeasuresResult
}
// tryListAllInstantMeasures validates a "list all instant measures" request
// and starts it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-instant metric -> WrongMetricType. Under an
// exclusive lock the request is parked in the lock entry's wait queue. When
// the started query reports that it continues outside the worker, one read
// lock is recorded for the metric.
func (s *Database) tryListAllInstantMeasures(req tryListAllInstantMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- instantMeasuresResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantMeasuresResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListAllInstantMeasures(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListAllInstantMeasures buffers every measure held in the metric's
// in-memory buffers into req.ResponseWriter and reports how the query
// continues.
//
// It returns true when the caller must hold a read lock on the metric: the
// metric has a last data page, and UntilFound with that page number is sent.
// It returns false after sending QueryDone, which happens when the metric has
// no data (Since == 0) or no last data page.
//
// A timestamp without a matching value is reported through diploma.Abort.
func (s *Database) startListAllInstantMeasures(metric *_metric, req tryListAllInstantMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		req.ResponseWriter.BufferMeasure(atree.InstantMeasure{
			Timestamp: timestamp,
			Value:     value,
		})
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- instantMeasuresResult{
			ResultCode: UntilFound,
			PageNo:     metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}
	req.ResultCh <- instantMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}
// tryListInstantMeasuresReq asks the worker to list the measures of an
// instant metric whose timestamps lie in [Since, Until] (both bounds are
// inclusive in startListInstantMeasures).
type tryListInstantMeasuresReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// Since is the lower timestamp bound.
Since uint32
// Until is the upper timestamp bound.
Until uint32
// ResponseWriter receives the matching in-memory measures.
ResponseWriter *atree.InstantMeasureWriter
// ResultCh receives the outcome of the request.
ResultCh chan instantMeasuresResult
}
// tryListInstantMeasures validates a ranged "list instant measures" request
// and starts it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-instant metric -> WrongMetricType. Under an
// exclusive lock the request is parked in the lock entry's wait queue. When
// the started query reports that it continues outside the worker, one read
// lock is recorded for the metric.
func (s *Database) tryListInstantMeasures(req tryListInstantMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- instantMeasuresResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantMeasuresResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListInstantMeasures(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListInstantMeasures serves the in-memory part of a ranged query over
// an instant metric and reports how the query continues.
//
// Returns false (after sending QueryDone) when the answer is complete: the
// metric is empty, the range starts after the newest measure, the range lies
// before the in-memory data and there is no root page, the scan reached
// req.Since, or there is no last data page. Returns true when the caller must
// hold a read lock: UntilNotFound with the root page (range entirely before
// the in-memory data) or UntilFound with the last data page (in-memory data
// exhausted).
//
// The scan stops at the first timestamp below req.Since, i.e. it relies on
// the decompressor yielding timestamps in descending order.
func (s *Database) startListInstantMeasures(metric *_metric, req tryListInstantMeasuresReq) bool {
	finished := instantMeasuresResult{ResultCode: QueryDone}

	if metric.Since == 0 || req.Since > metric.Until {
		req.ResultCh <- finished
		return false
	}
	if req.Until < metric.Since {
		if metric.RootPageNo == 0 {
			req.ResultCh <- finished
			return false
		}
		req.ResultCh <- instantMeasuresResult{
			ResultCode: UntilNotFound,
			PageNo:     metric.RootPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	tsDec := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valDec := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	for {
		ts, exhausted := tsDec.NextValue()
		if exhausted {
			break
		}
		val, exhausted := valDec.NextValue()
		if exhausted {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if ts > req.Until {
			continue // newer than the requested range
		}
		if ts < req.Since {
			req.ResultCh <- finished
			return false
		}
		req.ResponseWriter.BufferMeasure(atree.InstantMeasure{
			Timestamp: ts,
			Value:     val,
		})
		if ts == req.Since {
			// Lower bound hit exactly: nothing older can match.
			req.ResultCh <- finished
			return false
		}
	}

	if metric.LastPageNo == 0 {
		req.ResultCh <- finished
		return false
	}
	req.ResultCh <- instantMeasuresResult{
		ResultCode: UntilFound,
		PageNo:     metric.LastPageNo,
		FracDigits: metric.FracDigits,
	}
	return true
}
// CUMULATIVE
// tryListAllCumulativeMeasuresReq asks the worker to list every measure of a
// cumulative metric.
type tryListAllCumulativeMeasuresReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// ResponseWriter receives the measures decoded from the metric's in-memory
// buffers (via WriteMeasure).
ResponseWriter *atree.CumulativeMeasureWriter
// ResultCh receives the outcome of the request.
ResultCh chan cumulativeMeasuresResult
}
// tryListAllCumulativeMeasures validates a "list all cumulative measures"
// request and starts it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-cumulative metric -> WrongMetricType. Under
// an exclusive lock the request is parked in the lock entry's wait queue.
// When the started query reports that it continues outside the worker, one
// read lock is recorded for the metric.
func (s *Database) tryListAllCumulativeMeasures(req tryListAllCumulativeMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListAllCumulativeMeasures(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListAllCumulativeMeasures writes every in-memory measure of a
// cumulative metric to req.ResponseWriter and reports how the query
// continues.
//
// Measures are read pairwise: each measure is written with Total set to the
// difference between its value and the value that follows it in the decoded
// stream. The last decoded measure has no successor in memory:
//   - if the metric has a last data page, it is handed over in the UntilFound
//     result (EndTimestamp/EndValue) and true is returned, meaning the caller
//     must hold a read lock;
//   - otherwise it is written with Total equal to its own value, QueryDone is
//     sent and false is returned.
//
// An empty metric or an empty timestamp stream yields QueryDone and false.
func (s *Database) startListAllCumulativeMeasures(metric *_metric, req tryListAllCumulativeMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: QueryDone}
		return false
	}

	tsDec := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valDec := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	pendingTs, exhausted := tsDec.NextValue()
	if exhausted {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: QueryDone}
		return false
	}
	pendingVal, exhausted := valDec.NextValue()
	if exhausted {
		diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
	}

	for {
		nextTs, exhausted := tsDec.NextValue()
		if exhausted {
			break
		}
		nextVal, exhausted := valDec.NextValue()
		if exhausted {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
			Timestamp: pendingTs,
			Value:     pendingVal,
			Total:     pendingVal - nextVal,
		})
		pendingTs, pendingVal = nextTs, nextVal
	}

	if metric.LastPageNo == 0 {
		req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
			Timestamp: pendingTs,
			Value:     pendingVal,
			Total:     pendingVal,
		})
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: QueryDone}
		return false
	}

	req.ResultCh <- cumulativeMeasuresResult{
		ResultCode:   UntilFound,
		PageNo:       metric.LastPageNo,
		FracDigits:   metric.FracDigits,
		EndTimestamp: pendingTs,
		EndValue:     pendingVal,
	}
	return true
}
// tryListCumulativeMeasuresReq asks the worker to list the measures of a
// cumulative metric whose timestamps lie in [Since, Until].
type tryListCumulativeMeasuresReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// Since is the lower timestamp bound.
Since uint32
// Until is the upper timestamp bound.
Until uint32
// ResponseWriter receives the matching in-memory measures.
ResponseWriter *atree.CumulativeMeasureWriter
// ResultCh receives the outcome of the request.
ResultCh chan cumulativeMeasuresResult
}
// tryListCumulativeMeasures validates a ranged "list cumulative measures"
// request and starts it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-cumulative metric -> WrongMetricType. Under
// an exclusive lock the request is parked in the lock entry's wait queue.
// When the started query reports that it continues outside the worker, one
// read lock is recorded for the metric.
func (s *Database) tryListCumulativeMeasures(req tryListCumulativeMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativeMeasuresResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListCumulativeMeasures(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListCumulativeMeasures serves the in-memory part of a ranged query
// over a cumulative metric and reports how the query continues.
//
// Returns false (after sending QueryDone) when the answer is complete, and
// true when the caller must hold a read lock because the query continues in
// the stored pages: UntilNotFound with the root page when the range lies
// entirely before the in-memory data, or UntilFound with the last data page
// (plus the still-unwritten measure in EndTimestamp/EndValue) when the
// in-memory data is exhausted.
//
// Measures are written pairwise: a measure is emitted once the next decoded
// value is known, with Total set to the difference between the two values.
// The scan stops at the first timestamp below req.Since, i.e. it relies on
// the decompressor yielding timestamps in descending order.
func (s *Database) startListCumulativeMeasures(metric *_metric, req tryListCumulativeMeasuresReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}
	if req.Since > metric.Until {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}
	if req.Until < metric.Since {
		// search in tree
		if metric.RootPageNo > 0 {
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: UntilNotFound,
				PageNo:     metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		}
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valueDecompressor := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	// endTimestamp/endValue hold the in-range measure that has been decoded
	// but not written yet; endTimestamp == 0 means there is none.
	var (
		endTimestamp uint32
		endValue     float64
	)
	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		if timestamp > req.Until {
			continue
		}
		if timestamp < req.Since {
			// Lower bound crossed. Flush the pending measure only if one was
			// actually found inside the range; otherwise a measure with
			// timestamp 0 and a negative total would be emitted.
			if endTimestamp > 0 {
				req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
					Timestamp: endTimestamp,
					Value:     endValue,
					Total:     endValue - value,
				})
			}
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: QueryDone,
			}
			return false
		}
		if endTimestamp > 0 {
			req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
				Timestamp: endTimestamp,
				Value:     endValue,
				Total:     endValue - value,
			})
		}
		endTimestamp = timestamp
		endValue = value
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- cumulativeMeasuresResult{
			ResultCode:   UntilFound,
			PageNo:       metric.LastPageNo,
			FracDigits:   metric.FracDigits,
			EndTimestamp: endTimestamp,
			EndValue:     endValue,
		}
		return true
	}
	// No stored pages: the pending measure is the oldest one, its total is
	// its own value.
	req.ResponseWriter.WriteMeasure(atree.CumulativeMeasure{
		Timestamp: endTimestamp,
		Value:     endValue,
		Total:     endValue,
	})
	req.ResultCh <- cumulativeMeasuresResult{
		ResultCode: QueryDone,
	}
	return false
}
// tryAppendMeasureReq asks the worker whether a single measure may be
// appended to a metric.
type tryAppendMeasureReq struct {
// MetricID is the ID of the target metric.
MetricID uint32
// Timestamp is the timestamp of the measure; it must be greater than the
// metric's latest timestamp to be accepted.
Timestamp uint32
// Value is the value of the measure.
Value float64
// ResultCh receives the decision (NoMetric, ExpiredMeasure,
// NonMonotonicValue, CanAppend or NewPage).
ResultCh chan tryAppendMeasureResult
}
// tryAppendMeasure handles a single-measure append request: NoMetric for an
// unknown metric, parking in the wait queue while the metric is exclusively
// locked, otherwise delegation to startAppendMeasure with the current lock
// entry (nil when there is none).
func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- tryAppendMeasureResult{ResultCode: NoMetric}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}
	s.startAppendMeasure(metric, req, entry)
}
// startAppendMeasure decides how a single measure can be appended.
//
//   - ExpiredMeasure: the timestamp is not greater than metric.Until.
//   - NonMonotonicValue: the metric is cumulative and the value is lower than
//     metric.UntilValue.
//   - CanAppend: the compressed data plus the new measure fit into
//     atree.DataPagePayloadSize; a read lock is taken (lockEntry.RLocks is
//     incremented, or a new entry with one read lock is registered when
//     lockEntry is nil).
//   - NewPage: the measure does not fit; an exclusive lock is taken and the
//     current page content is returned in FilledPage. If read locks are
//     held, the request is parked in lockEntry.WaitQueue instead and no
//     result is sent yet.
func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, lockEntry *metricLockEntry) {
	if req.Timestamp <= metric.Until {
		req.ResultCh <- tryAppendMeasureResult{ResultCode: ExpiredMeasure}
		return
	}
	if metric.MetricType == diploma.Cumulative && req.Value < metric.UntilValue {
		req.ResultCh <- tryAppendMeasureResult{ResultCode: NonMonotonicValue}
		return
	}

	needed := metric.Timestamps.CalcRequiredSpace(req.Timestamp) +
		metric.Values.CalcRequiredSpace(req.Value)
	occupied := metric.Timestamps.Size() + metric.Values.Size() + needed

	if occupied > atree.DataPagePayloadSize {
		// The page overflows: exclusive access is required.
		if lockEntry == nil {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{XLock: true}
		} else {
			if lockEntry.RLocks > 0 {
				lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
				return
			}
			lockEntry.XLock = true
		}
		req.ResultCh <- tryAppendMeasureResult{
			ResultCode: NewPage,
			FilledPage: &FilledPage{
				Since:            metric.Since,
				RootPageNo:       metric.RootPageNo,
				PrevPageNo:       metric.LastPageNo,
				TimestampsChunks: metric.TimestampsBuf.Chunks(),
				TimestampsSize:   uint16(metric.Timestamps.Size()),
				ValuesChunks:     metric.ValuesBuf.Chunks(),
				ValuesSize:       uint16(metric.Values.Size()),
			},
		}
		return
	}

	// The measure fits into the current page.
	if lockEntry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
	} else {
		lockEntry.RLocks++
	}
	req.ResultCh <- tryAppendMeasureResult{ResultCode: CanAppend}
}
// appendMeasure applies an AppendedMeasure log record to the in-memory state.
//
// The metric must exist and must be held by at least one read lock and no
// exclusive lock; anything else is reported through diploma.Abort. The measure
// is appended to the metric's compressors, Until/UntilValue are advanced
// (Since/SinceValue too when the metric was empty), and one read lock is
// released. Afterwards the wait queue is processed if it is non-empty;
// otherwise the lock entry is removed when no read locks remain.
func (s *Database) appendMeasure(rec txlog.AppendedMeasure) {
	id := rec.MetricID

	metric, found := s.metrics[id]
	if !found {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasure: metric %d not found",
				id))
	}
	entry, found := s.metricLockEntries[id]
	if !found {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasure: lockEntry not found for the metric %d",
				id))
	}
	if entry.XLock {
		diploma.Abort(diploma.XLockBug,
			fmt.Errorf("appendMeasure: xlock is set for the metric %d",
				id))
	}
	if entry.RLocks <= 0 {
		diploma.Abort(diploma.NoRLockBug,
			fmt.Errorf("appendMeasure: rlock not set for the metric %d",
				id))
	}

	if metric.Since == 0 {
		metric.Since, metric.SinceValue = rec.Timestamp, rec.Value
	}
	metric.Timestamps.Append(rec.Timestamp)
	metric.Values.Append(rec.Value)
	metric.Until, metric.UntilValue = rec.Timestamp, rec.Value

	entry.RLocks--
	if len(entry.WaitQueue) > 0 {
		s.processMetricQueue(id, metric, entry)
		return
	}
	if entry.RLocks == 0 {
		delete(s.metricLockEntries, id)
	}
}
// appendMeasures applies an AppendedMeasuresExtended log record: every
// measure of the record is appended to the metric's compressors and
// Since/Until bookkeeping is updated.
//
// The metric must exist and be exclusively locked; anything else is reported
// through diploma.Abort. Unless extended.HoldLock is set, the exclusive lock
// is cleared and doAfterReleaseXLock is invoked.
func (s *Database) appendMeasures(extended txlog.AppendedMeasuresExtended) {
	rec := extended.Record
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasures: metric %d not found",
				rec.MetricID))
	}
	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}
	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}

	for _, measure := range rec.Measures {
		if metric.Since == 0 {
			metric.Since = measure.Timestamp
			metric.SinceValue = measure.Value
		}
		metric.Timestamps.Append(measure.Timestamp)
		metric.Values.Append(measure.Value)
		metric.Until = measure.Timestamp
		metric.UntilValue = measure.Value
	}

	if !extended.HoldLock {
		lockEntry.XLock = false
		s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
	}
}
// appendMeasureAfterOverflow applies an AppendedMeasureWithOverflowExtended
// log record.
//
// The metric must exist and be exclusively locked; anything else is reported
// through diploma.Abort. The metric is re-initialised with the record's
// measure (ReinitBy), its root page is updated when the record says it
// changed, and its last page becomes the record's data page. Pages the
// record marks as reused are removed from the data / index free lists.
// Unless extended.HoldLock is set, the exclusive lock is cleared and
// doAfterReleaseXLock is invoked.
func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWithOverflowExtended) {
	rec := extended.Record
	id := rec.MetricID

	metric, found := s.metrics[id]
	if !found {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasureAfterOverflow: metric %d not found",
				id))
	}
	entry, found := s.metricLockEntries[id]
	if !found {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasureAfterOverflow: lockEntry not found for the metric %d",
				id))
	}
	if !entry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasureAfterOverflow: xlock not set for the metric %d",
				id))
	}

	metric.ReinitBy(rec.Timestamp, rec.Value)
	if rec.IsRootChanged {
		metric.RootPageNo = rec.RootPageNo
	}
	metric.LastPageNo = rec.DataPageNo

	// Reused pages are no longer free.
	if rec.IsDataPageReused {
		reused := []uint32{rec.DataPageNo}
		s.dataFreeList.DeletePages(reused)
	}
	if len(rec.ReusedIndexPages) > 0 {
		s.indexFreeList.DeletePages(rec.ReusedIndexPages)
	}

	if extended.HoldLock {
		return
	}
	entry.XLock = false
	s.doAfterReleaseXLock(id, metric, entry)
}
// tryAppendMeasuresReq asks the worker for exclusive access to a metric in
// order to append a batch of measures.
type tryAppendMeasuresReq struct {
// MetricID is the ID of the target metric.
MetricID uint32
// ResultCh receives NoMetric, or CanAppend together with a snapshot of the
// metric state (see startAppendMeasures).
ResultCh chan tryAppendMeasuresResult
}
// tryAppendMeasures handles a batch append request: NoMetric for an unknown
// metric, parking in the wait queue while the metric is exclusively locked,
// otherwise delegation to startAppendMeasures with the current lock entry
// (nil when there is none).
func (s *Database) tryAppendMeasures(req tryAppendMeasuresReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- tryAppendMeasuresResult{ResultCode: NoMetric}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}
	s.startAppendMeasures(metric, req, entry)
}
// startAppendMeasures grants exclusive access for a batch append.
//
// If read locks are held, the request is parked in lockEntry.WaitQueue and
// nothing is sent. Otherwise the exclusive lock is set (on lockEntry, or on a
// newly registered entry when lockEntry is nil) and CanAppend is sent along
// with the metric's parameters and a pair of compressors for the caller to
// work with:
//   - for a metric that already has data, the compressors continue on copies
//     of the metric's buffers at their current sizes;
//   - for an empty metric, they start on fresh buffers at size 0.
//
// The value compressor matches the metric type (cumulative or instant).
func (s *Database) startAppendMeasures(metric *_metric, req tryAppendMeasuresReq, lockEntry *metricLockEntry) {
	if lockEntry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{XLock: true}
	} else {
		if lockEntry.RLocks > 0 {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
		lockEntry.XLock = true
	}

	var (
		tsBuf  *conbuf.ContinuousBuffer
		valBuf *conbuf.ContinuousBuffer
		tsEnc  diploma.TimestampCompressor
		valEnc diploma.ValueCompressor
	)
	if metric.Since == 0 {
		// Empty metric: start from scratch.
		tsBuf = conbuf.New(nil)
		valBuf = conbuf.New(nil)
		tsEnc = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(tsBuf, 0)
		switch metric.MetricType {
		case diploma.Cumulative:
			valEnc = chunkenc.NewReverseCumulativeDeltaCompressor(
				valBuf, 0, metric.FracDigits)
		default:
			valEnc = chunkenc.NewReverseInstantDeltaCompressor(
				valBuf, 0, metric.FracDigits)
		}
	} else {
		// Existing data: continue on copies of the metric's buffers.
		tsBuf = metric.TimestampsBuf.Copy()
		valBuf = metric.ValuesBuf.Copy()
		tsEnc = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			tsBuf, metric.Timestamps.Size())
		switch metric.MetricType {
		case diploma.Cumulative:
			valEnc = chunkenc.NewReverseCumulativeDeltaCompressor(
				valBuf, metric.Values.Size(), metric.FracDigits)
		default:
			valEnc = chunkenc.NewReverseInstantDeltaCompressor(
				valBuf, metric.Values.Size(), metric.FracDigits)
		}
	}

	req.ResultCh <- tryAppendMeasuresResult{
		ResultCode:    CanAppend,
		MetricType:    metric.MetricType,
		FracDigits:    metric.FracDigits,
		Since:         metric.Since,
		Until:         metric.Until,
		UntilValue:    metric.UntilValue,
		RootPageNo:    metric.RootPageNo,
		PrevPageNo:    metric.LastPageNo,
		TimestampsBuf: tsBuf,
		ValuesBuf:     valBuf,
		Timestamps:    tsEnc,
		Values:        valEnc,
	}
}
// tryListCumulativePeriodsReq asks the worker to aggregate the measures of a
// cumulative metric within [Since, Until] into periods.
type tryListCumulativePeriodsReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// Since is the lower timestamp bound.
Since uint32
// Until is the upper timestamp bound.
Until uint32
// Aggregator is fed with the in-memory measures (Feed / FillPeriod).
Aggregator *atree.CumulativeAggregator
// ResponseWriter receives every period the aggregator reports as completed.
ResponseWriter *atree.CumulativePeriodsWriter
// ResultCh receives the outcome of the request.
ResultCh chan cumulativePeriodsResult
}
// tryListCumulativePeriods validates a "list cumulative periods" request and
// starts it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-cumulative metric -> WrongMetricType. Under
// an exclusive lock the request is parked in the lock entry's wait queue.
// When the started query reports that it continues outside the worker, one
// read lock is recorded for the metric.
func (s *Database) tryListCumulativePeriods(req tryListCumulativePeriodsReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- cumulativePeriodsResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Cumulative {
		req.ResultCh <- cumulativePeriodsResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListCumulativePeriods(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListCumulativePeriods serves the in-memory part of a period
// aggregation over a cumulative metric and reports how the query continues.
//
// Returns false (after sending QueryDone) when the answer is complete, and
// true when the caller must hold a read lock because the query continues in
// the stored pages (UntilNotFound with the root page, or UntilFound with the
// last data page).
//
// In-range measures are passed to req.Aggregator.Feed; the first measure
// older than req.Since, or the metric's Since/SinceValue when there are no
// stored pages, is passed to FillPeriod. Whenever the aggregator reports a
// completed period it is written to req.ResponseWriter. The scan relies on
// the decompressor yielding timestamps in descending order.
func (s *Database) startListCumulativePeriods(metric *_metric, req tryListCumulativePeriodsReq) bool {
	finished := cumulativePeriodsResult{ResultCode: QueryDone}

	if metric.Since == 0 || req.Since > metric.Until {
		req.ResultCh <- finished
		return false
	}
	if req.Until < metric.Since {
		if metric.RootPageNo == 0 {
			req.ResultCh <- finished
			return false
		}
		req.ResultCh <- cumulativePeriodsResult{
			ResultCode: UntilNotFound,
			PageNo:     metric.RootPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	tsDec := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valDec := chunkenc.NewReverseCumulativeDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	var period atree.CumulativePeriod
	for {
		ts, exhausted := tsDec.NextValue()
		if exhausted {
			break
		}
		val, exhausted := valDec.NextValue()
		if exhausted {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if ts > req.Until {
			continue // newer than the requested range
		}
		if ts < req.Since {
			if req.Aggregator.FillPeriod(ts, val, &period) {
				req.ResponseWriter.WritePeriod(period)
			}
			req.ResultCh <- finished
			return false
		}
		if req.Aggregator.Feed(ts, val, &period) {
			req.ResponseWriter.WritePeriod(period)
		}
	}

	if metric.LastPageNo == 0 {
		if req.Aggregator.FillPeriod(metric.Since, metric.SinceValue, &period) {
			req.ResponseWriter.WritePeriod(period)
		}
		req.ResultCh <- finished
		return false
	}
	req.ResultCh <- cumulativePeriodsResult{
		ResultCode: UntilFound,
		PageNo:     metric.LastPageNo,
		FracDigits: metric.FracDigits,
	}
	return true
}
// tryListInstantPeriodsReq asks the worker to aggregate the measures of an
// instant metric within [Since, Until] into periods.
type tryListInstantPeriodsReq struct {
// MetricID is the ID of the metric to read.
MetricID uint32
// Since is the lower timestamp bound.
Since uint32
// Until is the upper timestamp bound.
Until uint32
// Aggregator is fed with the in-memory measures (Feed / FillPeriod).
Aggregator *atree.InstantAggregator
// ResponseWriter receives every period the aggregator reports as completed.
ResponseWriter *atree.InstantPeriodsWriter
// ResultCh receives the outcome of the request.
ResultCh chan instantPeriodsResult
}
// tryListInstantPeriods validates a "list instant periods" request and starts
// it unless the metric is exclusively locked.
//
// Unknown metric -> NoMetric; non-instant metric -> WrongMetricType. Under an
// exclusive lock the request is parked in the lock entry's wait queue. When
// the started query reports that it continues outside the worker, one read
// lock is recorded for the metric.
func (s *Database) tryListInstantPeriods(req tryListInstantPeriodsReq) {
	metric, found := s.metrics[req.MetricID]
	if !found {
		req.ResultCh <- instantPeriodsResult{ResultCode: NoMetric}
		return
	}
	if metric.MetricType != diploma.Instant {
		req.ResultCh <- instantPeriodsResult{ResultCode: WrongMetricType}
		return
	}

	entry, hasEntry := s.metricLockEntries[req.MetricID]
	if hasEntry && entry.XLock {
		entry.WaitQueue = append(entry.WaitQueue, req)
		return
	}

	if !s.startListInstantPeriods(metric, req) {
		return
	}
	if entry == nil {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{RLocks: 1}
		return
	}
	entry.RLocks++
}
// startListInstantPeriods serves the in-memory part of a period aggregation
// over an instant metric and reports how the query continues.
//
// Returns false (after sending QueryDone) when the answer is complete, and
// true when the caller must hold a read lock because the query continues in
// the stored pages (UntilNotFound with the root page, or UntilFound with the
// last data page).
//
// In-range measures are passed to req.Aggregator.Feed; the timestamp of the
// first measure older than req.Since, or the metric's Since when there are no
// stored pages, is passed to FillPeriod. Whenever the aggregator reports a
// completed period it is written to req.ResponseWriter. The scan relies on
// the decompressor yielding timestamps in descending order.
func (*Database) startListInstantPeriods(metric *_metric, req tryListInstantPeriodsReq) bool {
	finished := instantPeriodsResult{ResultCode: QueryDone}

	if metric.Since == 0 || req.Since > metric.Until {
		req.ResultCh <- finished
		return false
	}
	if req.Until < metric.Since {
		// search in tree
		if metric.RootPageNo == 0 {
			req.ResultCh <- finished
			return false
		}
		req.ResultCh <- instantPeriodsResult{
			ResultCode: UntilNotFound,
			PageNo:     metric.RootPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	}

	tsDec := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)
	valDec := chunkenc.NewReverseInstantDeltaDecompressor(
		metric.ValuesBuf,
		metric.Values.Size(),
		metric.FracDigits,
	)

	var period atree.InstantPeriod
	for {
		ts, exhausted := tsDec.NextValue()
		if exhausted {
			break
		}
		val, exhausted := valDec.NextValue()
		if exhausted {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if ts > req.Until {
			continue // newer than the requested range
		}
		if ts < req.Since {
			if req.Aggregator.FillPeriod(ts, &period) {
				req.ResponseWriter.WritePeriod(period)
			}
			req.ResultCh <- finished
			return false
		}
		if req.Aggregator.Feed(ts, val, &period) {
			req.ResponseWriter.WritePeriod(period)
		}
	}

	if metric.LastPageNo == 0 {
		if req.Aggregator.FillPeriod(metric.Since, &period) {
			req.ResponseWriter.WritePeriod(period)
		}
		req.ResultCh <- finished
		return false
	}
	req.ResultCh <- instantPeriodsResult{
		ResultCode: UntilFound,
		PageNo:     metric.LastPageNo,
		FracDigits: metric.FracDigits,
	}
	return true
}
// tryListCurrentValuesReq asks the worker for the latest timestamp and value
// of a set of metrics.
type tryListCurrentValuesReq struct {
// MetricIDs lists the metrics to report; unknown IDs are skipped.
MetricIDs []uint32
// ResponseWriter receives one buffered CurrentValue per known metric.
ResponseWriter *atree.CurrentValueWriter
// ResultCh is signalled once all values have been buffered.
ResultCh chan struct{}
}
// tryListCurrentValues buffers the latest timestamp/value (Until/UntilValue)
// of every requested metric that exists, in request order, and then signals
// completion on req.ResultCh. Unknown metric IDs are skipped silently.
func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) {
	for _, id := range req.MetricIDs {
		metric, found := s.metrics[id]
		if !found {
			continue
		}
		req.ResponseWriter.BufferValue(atree.CurrentValue{
			MetricID:  id,
			Timestamp: metric.Until,
			Value:     metric.UntilValue,
		})
	}
	req.ResultCh <- struct{}{}
}
// applyChanges applies the records of a committed change set to the
// in-memory state, in order. Record types not listed in the switch are
// ignored.
//
// After all records are applied: a snapshot is dumped when the change set
// forces one or carries an exit wait group; req.WaitCh is closed; and the
// exit wait group, if any, is marked done.
func (s *Database) applyChanges(req txlog.Changes) {
	for _, record := range req.Records {
		switch rec := record.(type) {
		case txlog.AddedMetric:
			s.addMetric(rec)
		case txlog.DeletedMetric:
			s.deleteMetric(rec)
		case txlog.AppendedMeasure:
			s.appendMeasure(rec)
		case txlog.AppendedMeasuresExtended:
			s.appendMeasures(rec)
		case txlog.AppendedMeasureWithOverflowExtended:
			s.appendMeasureAfterOverflow(rec)
		case txlog.DeletedMeasures:
			s.deleteMeasures(rec)
		case txlog.DeletedMeasuresSince:
			s.deleteMeasuresSince(rec)
		}
	}

	exiting := req.ExitWaitGroup != nil
	if req.ForceSnapshot || exiting {
		s.dumpSnapshot(req.LogNumber)
	}
	close(req.WaitCh)
	if exiting {
		req.ExitWaitGroup.Done()
	}
}
// addMetric applies an AddedMetric log record: it registers a new, empty
// metric with compressors matching its type, releases the exclusive lock that
// was taken for the addition, and answers the requests that were parked on
// that lock.
//
// The metric must not exist yet and its lock entry must exist with the
// exclusive lock set; anything else is reported through diploma.Abort.
//
// Requests parked on the lock entry are add-metric requests that arrived
// while this addition was in flight (tryAddMetric and
// processTryAddMetricReqsImmediatelyAfterDelete put them there). They are
// answered with MetricDuplicate, since the metric now exists; dropping them
// together with the lock entry would leave their senders waiting forever.
func (s *Database) addMetric(rec txlog.AddedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if ok {
		diploma.Abort(diploma.MetricAddedBug,
			fmt.Errorf("addMetric: metric %d already added",
				rec.MetricID))
	}
	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("addMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}
	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("addMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var (
		values        diploma.ValueCompressor
		timestampsBuf = conbuf.New(nil)
		valuesBuf     = conbuf.New(nil)
	)
	if rec.MetricType == diploma.Cumulative {
		values = chunkenc.NewReverseCumulativeDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	} else {
		values = chunkenc.NewReverseInstantDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	}

	s.metrics[rec.MetricID] = &_metric{
		MetricType:    rec.MetricType,
		FracDigits:    byte(rec.FracDigits),
		TimestampsBuf: timestampsBuf,
		ValuesBuf:     valuesBuf,
		Timestamps:    chunkenc.NewReverseTimeDeltaOfDeltaCompressor(timestampsBuf, 0),
		Values:        values,
	}

	lockEntry.XLock = false
	delete(s.metricLockEntries, rec.MetricID)

	// Answer everyone who was waiting for the addition to finish.
	for _, untyped := range lockEntry.WaitQueue {
		switch req := untyped.(type) {
		case tryAddMetricReq:
			req.ResultCh <- MetricDuplicate
		case tryGetMetricReq:
			s.tryGetMetric(req)
		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}
	}
}
// deleteMetric applies a DeletedMetric log record.
//
// The metric must exist and be exclusively locked; anything else is reported
// through diploma.Abort. Every request parked on the metric's lock entry is
// answered with NoMetric, except add-metric requests, which are collected
// and handed to processTryAddMetricReqsImmediatelyAfterDelete once the metric
// is gone. The metric and its lock entry are then removed, and the pages
// listed in the record are added to the free lists and deleted from the
// tree.
//
// Batch append requests (tryAppendMeasuresReq) can be parked behind the
// exclusive lock just like single appends, so they are answered with NoMetric
// as well instead of falling into the "unknown item" abort.
func (s *Database) deleteMetric(rec txlog.DeletedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMetric: metric %d not found",
				rec.MetricID))
	}
	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}
	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var addMetricReqs []tryAddMetricReq
	for _, untyped := range lockEntry.WaitQueue {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			req.ResultCh <- tryAppendMeasureResult{
				ResultCode: NoMetric,
			}
		case tryAppendMeasuresReq:
			req.ResultCh <- tryAppendMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryListCumulativePeriodsReq:
			req.ResultCh <- cumulativePeriodsResult{
				ResultCode: NoMetric,
			}
		case tryListInstantPeriodsReq:
			req.ResultCh <- instantPeriodsResult{
				ResultCode: NoMetric,
			}
		case tryListCumulativeMeasuresReq:
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryListInstantMeasuresReq:
			req.ResultCh <- instantMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryListAllCumulativeMeasuresReq:
			req.ResultCh <- cumulativeMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryListAllInstantMeasuresReq:
			req.ResultCh <- instantMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryAddMetricReq:
			// Re-adding becomes possible once the metric is removed below.
			addMetricReqs = append(addMetricReqs, req)
		case tryDeleteMetricReq:
			req.ResultCh <- tryDeleteMetricResult{
				ResultCode: NoMetric,
			}
		case tryDeleteMeasuresReq:
			req.ResultCh <- tryDeleteMeasuresResult{
				ResultCode: NoMetric,
			}
		case tryGetMetricReq:
			req.ResultCh <- Metric{
				ResultCode: NoMetric,
			}
		default:
			diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
				fmt.Errorf("bug: unknown metric wait queue item type %T", req))
		}
	}

	delete(s.metrics, rec.MetricID)
	delete(s.metricLockEntries, rec.MetricID)

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
	}

	if len(addMetricReqs) > 0 {
		s.processTryAddMetricReqsImmediatelyAfterDelete(addMetricReqs)
	}
}
// deleteMeasures applies a txlog.DeletedMeasures record: it resets the
// metric's in-memory state via metric.DeleteMeasures, clears the exclusive
// lock, releases the pages listed in the record, and then lets
// doAfterReleaseXLock either drop the lock entry or serve queued requests.
//
// Invariants checked through diploma.Abort: the metric must be registered, a
// lock entry must exist for it, and that entry must hold the exclusive lock.
func (s *Database) deleteMeasures(rec txlog.DeletedMeasures) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMeasures: metric %d not found",
				rec.MetricID))
	}
	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}
	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}

	metric.DeleteMeasures()
	lockEntry.XLock = false

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	// Guard on the index page list itself. The previous check looked at
	// FreeDataPages, so index pages were never released when no data pages
	// had been freed.
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
	}

	s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}
// deleteMeasuresSince applies a txlog.DeletedMeasuresSince record.
//
// Invariants checked through diploma.Abort: the metric must be registered, a
// lock entry must exist for it, and that entry must hold the exclusive lock.
//
// When the record reports a changed root with RootPageNo == 0, the metric is
// reset via metric.DeleteMeasures. Otherwise the metric's buffers,
// compressors, time range, value bounds and page numbers are rebuilt from the
// record. Afterwards the exclusive lock is cleared, the pages listed in the
// record are released, and doAfterReleaseXLock either drops the lock entry or
// serves queued requests.
func (s *Database) deleteMeasuresSince(rec txlog.DeletedMeasuresSince) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMeasuresSince: metric %d not found",
				rec.MetricID))
	}
	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMeasuresSince: lockEntry not found for the metric %d",
				rec.MetricID))
	}
	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMeasuresSince: xlock not set for the metric %d",
				rec.MetricID))
	}

	if rec.IsRootChanged && rec.RootPageNo == 0 {
		// NOTE(review): root page 0 presumably means no pages remain for the
		// metric — confirm against the code that produces this record.
		metric.DeleteMeasures()
	} else {
		// Rebuild the in-memory state from the buffers carried by the record.
		metric.TimestampsBuf = conbuf.NewFromBuffer(rec.TimestampsBuf)
		metric.ValuesBuf = conbuf.NewFromBuffer(rec.ValuesBuf)

		metric.Timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			metric.TimestampsBuf, len(rec.TimestampsBuf))

		if metric.MetricType == diploma.Cumulative {
			metric.Values = chunkenc.NewReverseCumulativeDeltaCompressor(
				metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits)
		} else {
			metric.Values = chunkenc.NewReverseInstantDeltaCompressor(
				metric.ValuesBuf, len(rec.ValuesBuf), metric.FracDigits)
		}

		metric.Since, metric.Until = enc.GetTimeRange(rec.TimestampsBuf)
		metric.SinceValue, metric.UntilValue = enc.GetValueBounds(
			rec.ValuesBuf, metric.MetricType, metric.FracDigits,
		)

		// The root page number is only updated when the record says it changed.
		if rec.IsRootChanged {
			metric.RootPageNo = rec.RootPageNo
		}
		metric.LastPageNo = rec.LastPageNo
	}

	lockEntry.XLock = false

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
		s.atree.DeleteDataPages(rec.FreeDataPages)
	}
	// Guard on the index page list itself. The previous check looked at
	// FreeDataPages, so index pages were never released when no data pages
	// had been freed.
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
		s.atree.DeleteIndexPages(rec.FreeIndexPages)
	}

	s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}
// doAfterReleaseXLock finishes the release of an exclusive lock on a metric.
// If requests are waiting in lockEntry.WaitQueue they are handed to
// processMetricQueue; otherwise the lock entry is removed from
// s.metricLockEntries.
func (s *Database) doAfterReleaseXLock(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
	if len(lockEntry.WaitQueue) > 0 {
		s.processMetricQueue(metricID, metric, lockEntry)
		return
	}
	// Nobody is waiting, so the entry is no longer needed.
	delete(s.metricLockEntries, metricID)
}