A specialized DBMS for storing and processing sensor and meter readings
diploma/database/proc.go

package database
import (
"fmt"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/atree"
"gordenko.dev/dima/diploma/chunkenc"
"gordenko.dev/dima/diploma/conbuf"
"gordenko.dev/dima/diploma/transform"
"gordenko.dev/dima/diploma/txlog"
)
const (
QueryDone = 1
UntilFound = 2
UntilNotFound = 3
RangeFound = 15
NoMeasures = 16
NoMetric = 4
MetricDuplicate = 5
Succeed = 6
NewPage = 7
ExpiredMeasure = 8
NonMonotonicValue = 9
CanAppend = 10
WrongMetricType = 11
NoMeasuresToDelete = 12
DeleteFromAtreeNotNeeded = 13
DeleteFromAtreeRequired = 14
)
// worker runs DoWork each time a signal arrives on workerSignalCh.
func (s *Database) worker() {
for {
<-s.workerSignalCh
s.DoWork()
}
}
func (s *Database) DoWork() {
s.mutex.Lock()
rLocksToRelease := s.rLocksToRelease
workerQueue := s.workerQueue
s.rLocksToRelease = nil
s.workerQueue = nil
s.mutex.Unlock()
for _, metricID := range rLocksToRelease {
lockEntry, ok := s.metricLockEntries[metricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("DoWork: lockEntry not found for the metric %d",
metricID))
}
if lockEntry.XLock {
diploma.Abort(diploma.XLockBug,
fmt.Errorf("DoWork: xlock is set for the metric %d",
metricID))
}
if lockEntry.RLocks <= 0 {
diploma.Abort(diploma.NoRLockBug,
fmt.Errorf("DoWork: rlock not set for the metric %d",
metricID))
}
lockEntry.RLocks--
if len(lockEntry.WaitQueue) > 0 {
metric, ok := s.metrics[metricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("DoWork: metric %d not found", metricID))
}
s.processMetricQueue(metricID, metric, lockEntry)
} else {
if lockEntry.RLocks == 0 {
delete(s.metricLockEntries, metricID)
}
}
}
for _, untyped := range workerQueue {
switch req := untyped.(type) {
case tryAppendMeasureReq:
s.tryAppendMeasure(req)
case tryAppendMeasuresReq:
s.tryAppendMeasures(req)
case txlog.Changes:
s.applyChanges(req) // all metrics only
case tryListCurrentValuesReq:
s.tryListCurrentValues(req) // all metrics only
case tryRangeScanReq:
s.tryRangeScan(req)
case tryFullScanReq:
s.tryFullScan(req)
case tryAddMetricReq:
s.tryAddMetric(req)
case tryDeleteMetricReq:
s.tryDeleteMetric(req)
case tryDeleteMeasuresReq:
s.tryDeleteMeasures(req)
case tryGetMetricReq:
s.tryGetMetric(req)
default:
diploma.Abort(diploma.UnknownWorkerQueueItemBug,
fmt.Errorf("bug: unknown worker queue item type %T", req))
}
}
}
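/*
Illustration only, not part of package database. DoWork takes ownership of the
pending queues while holding the mutex and then processes the batch without
it, dispatching on the dynamic type of each item. The sketch below shows that
pattern in isolation; the names queue, push, drain and describe are
hypothetical and do not exist in this repository.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for the Database fields used by DoWork.
type queue struct {
	mu    sync.Mutex
	items []any
}

// push appends an item while holding the mutex.
func (q *queue) push(item any) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
}

// drain takes ownership of the pending items and resets the shared slice,
// so the caller can process the batch without holding the mutex.
func (q *queue) drain() []any {
	q.mu.Lock()
	batch := q.items
	q.items = nil
	q.mu.Unlock()
	return batch
}

// describe dispatches on the dynamic type of an item with a type switch.
func describe(item any) string {
	switch v := item.(type) {
	case int:
		return fmt.Sprintf("int %d", v)
	case string:
		return "string " + v
	default:
		return fmt.Sprintf("unknown %T", v)
	}
}

func main() {
	q := &queue{}
	q.push(1)
	q.push("scan")
	for _, item := range q.drain() {
		fmt.Println(describe(item))
	}
	fmt.Println(len(q.drain())) // the queue is empty after the first drain
}
```

Producers are blocked only for the duration of the slice swap, not for the
time it takes to process the batch.
*/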
func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
if len(lockEntry.WaitQueue) == 0 {
return
}
var modificationReqs []any
for _, untyped := range lockEntry.WaitQueue {
var rLockRequired bool
switch req := untyped.(type) {
case tryRangeScanReq:
rLockRequired = s.startRangeScan(metric, req)
case tryFullScanReq:
rLockRequired = s.startFullScan(metric, req)
case tryGetMetricReq:
s.tryGetMetric(req)
default:
modificationReqs = append(modificationReqs, untyped)
}
if rLockRequired {
lockEntry.RLocks++
}
}
lockEntry.WaitQueue = nil
if lockEntry.RLocks > 0 {
lockEntry.WaitQueue = modificationReqs
} else {
for idx, untyped := range modificationReqs {
switch req := untyped.(type) {
case tryAppendMeasureReq:
s.startAppendMeasure(metric, req, nil)
case tryAppendMeasuresReq:
s.startAppendMeasures(metric, req, nil)
case tryDeleteMetricReq:
s.startDeleteMetric(metric, req)
case tryDeleteMeasuresReq:
s.startDeleteMeasures(metric, req)
default:
diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
fmt.Errorf("bug: unknown metric wait queue item type %T", req))
}
lockEntry, ok := s.metricLockEntries[metricID]
if ok {
start := idx + 1
if start < len(modificationReqs) {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, modificationReqs[start:]...)
}
break
}
}
}
}
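/*
Illustration only, not part of package database. The functions in this file
keep one lock entry per metric: readers share the metric, a writer needs it
exclusively, and requests that cannot proceed are parked in a wait queue. The
sketch below is a simplified model under stated assumptions: lockEntry, table,
tryRead, tryWrite and releaseRead are hypothetical names, waiters are plain
strings, and the last reader hands the waiters back to the caller instead of
calling processMetricQueue.

```go
package main

import "fmt"

// lockEntry is a simplified, hypothetical model of metricLockEntry.
type lockEntry struct {
	XLock     bool
	RLocks    int
	WaitQueue []string
}

// table maps a metric ID to its lock entry; a missing entry means "unlocked".
type table map[uint32]*lockEntry

// tryRead grants a shared lock unless an exclusive lock is held.
func (t table) tryRead(id uint32, name string) bool {
	e, ok := t[id]
	if ok && e.XLock {
		e.WaitQueue = append(e.WaitQueue, name)
		return false
	}
	if !ok {
		e = &lockEntry{}
		t[id] = e
	}
	e.RLocks++
	return true
}

// tryWrite grants the exclusive lock only when no entry exists, that is,
// when neither readers nor a writer hold the metric.
func (t table) tryWrite(id uint32, name string) bool {
	if e, ok := t[id]; ok {
		e.WaitQueue = append(e.WaitQueue, name)
		return false
	}
	t[id] = &lockEntry{XLock: true}
	return true
}

// releaseRead drops one shared lock. When the last reader leaves, the entry
// is removed and the parked waiters are returned so the caller can retry them.
func (t table) releaseRead(id uint32) []string {
	e := t[id]
	e.RLocks--
	if e.RLocks > 0 {
		return nil
	}
	delete(t, id)
	return e.WaitQueue
}

func main() {
	locks := table{}
	fmt.Println(locks.tryRead(1, "scan-a"))  // reader is admitted
	fmt.Println(locks.tryWrite(1, "delete")) // writer is parked
	fmt.Println(locks.releaseRead(1))        // waiters to retry
	fmt.Println(locks.tryWrite(1, "delete")) // writer is admitted now
	fmt.Println(locks.tryRead(1, "scan-b"))  // reader is parked
}
```
*/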
type tryAddMetricReq struct {
MetricID uint32
ResultCh chan byte
}
func (s *Database) tryAddMetric(req tryAddMetricReq) {
_, ok := s.metrics[req.MetricID]
if ok {
req.ResultCh <- MetricDuplicate
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
}
req.ResultCh <- Succeed
}
}
func (s *Database) processTryAddMetricReqsImmediatelyAfterDelete(reqs []tryAddMetricReq) {
if len(reqs) == 0 {
return
}
var (
req = reqs[0]
waitQueue []any
)
for _, waiting := range reqs[1:] {
waitQueue = append(waitQueue, waiting)
}
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
WaitQueue: waitQueue,
}
req.ResultCh <- Succeed
}
type tryGetMetricReq struct {
MetricID uint32
ResultCh chan Metric
}
func (s *Database) tryGetMetric(req tryGetMetricReq) {
metric, ok := s.metrics[req.MetricID]
if ok {
req.ResultCh <- Metric{
ResultCode: Succeed,
MetricType: metric.MetricType,
FracDigits: metric.FracDigits,
}
} else {
req.ResultCh <- Metric{
ResultCode: NoMetric,
}
}
}
type tryDeleteMetricReq struct {
MetricID uint32
ResultCh chan tryDeleteMetricResult
}
func (s *Database) tryDeleteMetric(req tryDeleteMetricReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- tryDeleteMetricResult{
ResultCode: NoMetric,
}
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
} else {
s.startDeleteMetric(metric, req)
}
}
func (s *Database) startDeleteMetric(metric *_metric, req tryDeleteMetricReq) {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
}
req.ResultCh <- tryDeleteMetricResult{
ResultCode: Succeed,
RootPageNo: metric.RootPageNo,
}
}
type tryDeleteMeasuresReq struct {
MetricID uint32
Since uint32
ResultCh chan tryDeleteMeasuresResult
}
func (s *Database) tryDeleteMeasures(req tryDeleteMeasuresReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- tryDeleteMeasuresResult{
ResultCode: NoMetric,
}
return
}
if metric.Since == 0 || (req.Since > 0 && metric.Until < req.Since) {
req.ResultCh <- tryDeleteMeasuresResult{
ResultCode: NoMeasuresToDelete,
}
// Nothing to delete: stop here so no lock is taken and no second result is sent.
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
} else {
s.startDeleteMeasures(metric, req)
}
}
func (s *Database) startDeleteMeasures(metric *_metric, req tryDeleteMeasuresReq) {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
}
if metric.RootPageNo > 0 {
req.ResultCh <- tryDeleteMeasuresResult{
ResultCode: DeleteFromAtreeRequired,
RootPageNo: metric.RootPageNo,
}
} else {
req.ResultCh <- tryDeleteMeasuresResult{
ResultCode: DeleteFromAtreeNotNeeded,
}
}
}
type tryAppendMeasureReq struct {
MetricID uint32
Timestamp uint32
Value float64
ResultCh chan tryAppendMeasureResult
}
func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
ResultCode: NoMetric,
}
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
if lockEntry.XLock {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
}
s.startAppendMeasure(metric, req, lockEntry)
}
func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, lockEntry *metricLockEntry) {
if req.Timestamp <= metric.Until {
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
ResultCode: ExpiredMeasure,
}
return
}
if metric.MetricType == diploma.Cumulative && req.Value < metric.UntilValue {
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
ResultCode: NonMonotonicValue,
}
return
}
extraSpace := metric.Timestamps.CalcRequiredSpace(req.Timestamp) +
metric.Values.CalcRequiredSpace(req.Value)
totalSpace := metric.Timestamps.Size() + metric.Values.Size() + extraSpace
if totalSpace <= atree.DataPagePayloadSize {
if lockEntry != nil {
lockEntry.RLocks++
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
RLocks: 1,
}
}
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
Timestamp: req.Timestamp,
Value: req.Value,
ResultCode: CanAppend,
}
} else {
if lockEntry != nil {
if lockEntry.RLocks > 0 {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
lockEntry.XLock = true
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
}
}
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
Timestamp: req.Timestamp,
Value: req.Value,
ResultCode: NewPage,
FilledPage: &FilledPage{
Since: metric.Since,
RootPageNo: metric.RootPageNo,
PrevPageNo: metric.LastPageNo,
TimestampsChunks: metric.TimestampsBuf.Chunks(),
TimestampsSize: uint16(metric.Timestamps.Size()),
ValuesChunks: metric.ValuesBuf.Chunks(),
ValuesSize: uint16(metric.Values.Size()),
},
}
}
}
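/*
Illustration only, not part of package database. startAppendMeasure compares
the bytes already used by the compressed timestamps and values, plus the bytes
the new measure would need, against the data page payload size. If the measure
fits, a shared lock is enough; otherwise the page has to be flushed under an
exclusive lock. The sketch below isolates that decision. The constant
payloadSize is a made-up value (the real one is atree.DataPagePayloadSize,
which is not shown here), and decide and lockFor are hypothetical helpers.

```go
package main

import "fmt"

// payloadSize is a hypothetical page payload capacity in bytes.
const payloadSize = 8176

// decision names the two outcomes of the space check.
type decision string

const (
	canAppend decision = "CanAppend"
	newPage   decision = "NewPage"
)

// decide reports whether a measure needing extra bytes still fits into a
// page that already holds used bytes.
func decide(used, extra int) decision {
	if used+extra <= payloadSize {
		return canAppend
	}
	return newPage
}

// lockFor returns the kind of lock each outcome requires.
func lockFor(d decision) string {
	if d == canAppend {
		return "shared"
	}
	return "exclusive"
}

func main() {
	for _, extra := range []int{176, 177} {
		d := decide(8000, extra)
		fmt.Println(extra, d, lockFor(d))
	}
}
```
*/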
func (s *Database) appendMeasure(rec txlog.AppendedMeasure) {
metric, ok := s.metrics[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("appendMeasure: metric %d not found",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("appendMeasure: lockEntry not found for the metric %d",
rec.MetricID))
}
if lockEntry.XLock {
diploma.Abort(diploma.XLockBug,
fmt.Errorf("appendMeasure: xlock is set for the metric %d",
rec.MetricID))
}
if lockEntry.RLocks <= 0 {
diploma.Abort(diploma.NoRLockBug,
fmt.Errorf("appendMeasure: rlock not set for the metric %d",
rec.MetricID))
}
if metric.Since == 0 {
metric.Since = rec.Timestamp
metric.SinceValue = rec.Value
}
metric.Timestamps.Append(rec.Timestamp)
metric.Values.Append(rec.Value)
metric.Until = rec.Timestamp
metric.UntilValue = rec.Value
lockEntry.RLocks--
if len(lockEntry.WaitQueue) > 0 {
s.processMetricQueue(rec.MetricID, metric, lockEntry)
} else {
if lockEntry.RLocks == 0 {
delete(s.metricLockEntries, rec.MetricID)
}
}
}
func (s *Database) appendMeasures(extended txlog.AppendedMeasuresExtended) {
rec := extended.Record
metric, ok := s.metrics[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("appendMeasures: metric %d not found",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("appendMeasures: lockEntry not found for the metric %d",
rec.MetricID))
}
if !lockEntry.XLock {
diploma.Abort(diploma.NoXLockBug,
fmt.Errorf("appendMeasures: xlock not set for the metric %d",
rec.MetricID))
}
for _, measure := range rec.Measures {
if metric.Since == 0 {
metric.Since = measure.Timestamp
metric.SinceValue = measure.Value
}
metric.Timestamps.Append(measure.Timestamp)
metric.Values.Append(measure.Value)
metric.Until = measure.Timestamp
metric.UntilValue = measure.Value
}
if !extended.HoldLock {
lockEntry.XLock = false
s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}
}
func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWithOverflowExtended) {
rec := extended.Record
metric, ok := s.metrics[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("appendMeasureAfterOverflow: metric %d not found",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("appendMeasureAfterOverflow: lockEntry not found for the metric %d",
rec.MetricID))
}
if !lockEntry.XLock {
diploma.Abort(diploma.NoXLockBug,
fmt.Errorf("appendMeasureAfterOverflow: xlock not set for the metric %d",
rec.MetricID))
}
//fmt.Printf("reinit by: %d, %v\n", rec.Timestamp, rec.Value)
metric.ReinitBy(rec.Timestamp, rec.Value)
if rec.IsRootChanged {
metric.RootPageNo = rec.RootPageNo
}
metric.LastPageNo = rec.DataPageNo
if rec.IsDataPageReused {
s.dataFreeList.DeletePages([]uint32{
rec.DataPageNo,
})
}
if len(rec.ReusedIndexPages) > 0 {
s.indexFreeList.DeletePages(rec.ReusedIndexPages)
}
if !extended.HoldLock {
lockEntry.XLock = false
s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}
}
type tryAppendMeasuresReq struct {
MetricID uint32
ResultCh chan tryAppendMeasuresResult
}
func (s *Database) tryAppendMeasures(req tryAppendMeasuresReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- tryAppendMeasuresResult{
ResultCode: NoMetric,
}
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
if lockEntry.XLock {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
}
s.startAppendMeasures(metric, req, lockEntry)
}
func (s *Database) startAppendMeasures(metric *_metric, req tryAppendMeasuresReq, lockEntry *metricLockEntry) {
if lockEntry != nil {
if lockEntry.RLocks > 0 {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
lockEntry.XLock = true
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
XLock: true,
}
}
var (
timestampsBuf *conbuf.ContinuousBuffer
valuesBuf *conbuf.ContinuousBuffer
timestamps diploma.TimestampCompressor
values diploma.ValueCompressor
)
if metric.Since > 0 {
timestampsBuf = metric.TimestampsBuf.Copy()
valuesBuf = metric.ValuesBuf.Copy()
timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
timestampsBuf, metric.Timestamps.Size())
if metric.MetricType == diploma.Cumulative {
values = chunkenc.NewReverseCumulativeDeltaCompressor(
valuesBuf, metric.Values.Size(), metric.FracDigits)
} else {
values = chunkenc.NewReverseInstantDeltaCompressor(
valuesBuf, metric.Values.Size(), metric.FracDigits)
}
} else {
timestampsBuf = conbuf.New(nil)
valuesBuf = conbuf.New(nil)
timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
timestampsBuf, 0)
if metric.MetricType == diploma.Cumulative {
values = chunkenc.NewReverseCumulativeDeltaCompressor(
valuesBuf, 0, metric.FracDigits)
} else {
values = chunkenc.NewReverseInstantDeltaCompressor(
valuesBuf, 0, metric.FracDigits)
}
}
req.ResultCh <- tryAppendMeasuresResult{
ResultCode: CanAppend,
MetricType: metric.MetricType,
FracDigits: metric.FracDigits,
Since: metric.Since,
Until: metric.Until,
UntilValue: metric.UntilValue,
RootPageNo: metric.RootPageNo,
PrevPageNo: metric.LastPageNo,
TimestampsBuf: timestampsBuf,
ValuesBuf: valuesBuf,
Timestamps: timestamps,
Values: values,
}
}
type tryRangeScanReq struct {
MetricID uint32
Since uint32
Until uint32
MetricType diploma.MetricType
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan rangeScanResult
}
func (s *Database) tryRangeScan(req tryRangeScanReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- rangeScanResult{
ResultCode: NoMetric,
}
return
}
if metric.MetricType != req.MetricType {
req.ResultCh <- rangeScanResult{
ResultCode: WrongMetricType,
}
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
if lockEntry.XLock {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
}
if s.startRangeScan(metric, req) {
if lockEntry != nil {
lockEntry.RLocks++
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
RLocks: 1,
}
}
}
}
func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
if metric.Since == 0 {
req.ResultCh <- rangeScanResult{
ResultCode: QueryDone,
}
return false
}
if req.Since > metric.Until {
req.ResultCh <- rangeScanResult{
ResultCode: QueryDone,
}
return false
}
if req.Until < metric.Since {
if metric.RootPageNo > 0 {
req.ResultCh <- rangeScanResult{
ResultCode: UntilNotFound,
RootPageNo: metric.RootPageNo,
FracDigits: metric.FracDigits,
}
return true
} else {
req.ResultCh <- rangeScanResult{
ResultCode: QueryDone,
}
return false
}
}
timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
metric.TimestampsBuf,
metric.Timestamps.Size(),
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
if done {
break
}
value, done := valueDecompressor.NextValue()
if done {
diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
}
if timestamp <= req.Until {
req.ResponseWriter.FeedNoSend(timestamp, value)
if timestamp < req.Since {
req.ResultCh <- rangeScanResult{
ResultCode: QueryDone,
}
return false
}
}
}
if metric.LastPageNo > 0 {
req.ResultCh <- rangeScanResult{
ResultCode: UntilFound,
LastPageNo: metric.LastPageNo,
FracDigits: metric.FracDigits,
}
return true
} else {
req.ResultCh <- rangeScanResult{
ResultCode: QueryDone,
}
return false
}
}
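/*
Illustration only, not part of package database. startRangeScan reads the
in-memory measures newest-first, skips those newer than req.Until, and stops
once it sees a timestamp below req.Since, because older pages cannot contain
matching measures. The sketch below shows that traversal on a plain slice. It
is a simplified variant: it filters out the first out-of-range measure itself,
whereas the function above passes it to the response writer before stopping.
The names measure and scanNewestFirst are hypothetical.

```go
package main

import "fmt"

// measure is a hypothetical (timestamp, value) pair.
type measure struct {
	Timestamp uint32
	Value     float64
}

// scanNewestFirst walks measures stored newest-first and collects those with
// since <= Timestamp <= until. done is true when a timestamp below since was
// seen, meaning older pages need not be read.
func scanNewestFirst(ms []measure, since, until uint32) (out []measure, done bool) {
	for _, m := range ms {
		if m.Timestamp > until {
			continue // newer than the requested range
		}
		if m.Timestamp < since {
			return out, true // the requested range is fully covered
		}
		out = append(out, m)
	}
	return out, false // the range may continue on older pages
}

func main() {
	ms := []measure{{40, 4}, {30, 3}, {20, 2}, {10, 1}}
	out, done := scanNewestFirst(ms, 20, 30)
	fmt.Println(len(out), done)
	out, done = scanNewestFirst(ms, 5, 100)
	fmt.Println(len(out), done)
}
```
*/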
type tryFullScanReq struct {
MetricID uint32
MetricType diploma.MetricType
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan fullScanResult
}
func (s *Database) tryFullScan(req tryFullScanReq) {
metric, ok := s.metrics[req.MetricID]
if !ok {
req.ResultCh <- fullScanResult{
ResultCode: NoMetric,
}
return
}
if metric.MetricType != req.MetricType {
req.ResultCh <- fullScanResult{
ResultCode: WrongMetricType,
}
return
}
lockEntry, ok := s.metricLockEntries[req.MetricID]
if ok {
if lockEntry.XLock {
lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
return
}
}
if s.startFullScan(metric, req) {
if lockEntry != nil {
lockEntry.RLocks++
} else {
s.metricLockEntries[req.MetricID] = &metricLockEntry{
RLocks: 1,
}
}
}
}
func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool {
if metric.Since == 0 {
req.ResultCh <- fullScanResult{
ResultCode: QueryDone,
}
return false
}
timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
metric.TimestampsBuf,
metric.Timestamps.Size(),
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
if done {
break
}
value, done := valueDecompressor.NextValue()
if done {
diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
}
req.ResponseWriter.FeedNoSend(timestamp, value)
}
if metric.LastPageNo > 0 {
req.ResultCh <- fullScanResult{
ResultCode: UntilFound,
LastPageNo: metric.LastPageNo,
FracDigits: metric.FracDigits,
}
return true
} else {
req.ResultCh <- fullScanResult{
ResultCode: QueryDone,
}
return false
}
}
type tryListCurrentValuesReq struct {
MetricIDs []uint32
ResponseWriter *transform.CurrentValueWriter
ResultCh chan struct{}
}
func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) {
for _, metricID := range req.MetricIDs {
metric, ok := s.metrics[metricID]
if ok {
req.ResponseWriter.BufferValue(transform.CurrentValue{
MetricID: metricID,
Timestamp: metric.Until,
Value: metric.UntilValue,
})
}
}
req.ResultCh <- struct{}{}
}
///////////////////////////////////////////////////////
func (s *Database) applyChanges(req txlog.Changes) {
for _, untyped := range req.Records {
switch rec := untyped.(type) {
case txlog.AddedMetric:
s.addMetric(rec)
case txlog.DeletedMetric:
s.deleteMetric(rec)
case txlog.AppendedMeasure:
s.appendMeasure(rec)
case txlog.AppendedMeasuresExtended:
s.appendMeasures(rec)
case txlog.AppendedMeasureWithOverflowExtended:
s.appendMeasureAfterOverflow(rec)
case txlog.DeletedMeasures:
s.deleteMeasures(rec)
}
}
if req.ForceSnapshot || req.ExitWaitGroup != nil {
s.dumpSnapshot(req.LogNumber)
}
close(req.WaitCh)
if req.ExitWaitGroup != nil {
req.ExitWaitGroup.Done()
}
}
func (s *Database) addMetric(rec txlog.AddedMetric) {
_, ok := s.metrics[rec.MetricID]
if ok {
diploma.Abort(diploma.MetricAddedBug,
fmt.Errorf("addMetric: metric %d already added",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("addMetric: lockEntry not found for the metric %d",
rec.MetricID))
}
if !lockEntry.XLock {
diploma.Abort(diploma.NoXLockBug,
fmt.Errorf("addMetric: xlock not set for the metric %d",
rec.MetricID))
}
var (
values diploma.ValueCompressor
timestampsBuf = conbuf.New(nil)
valuesBuf = conbuf.New(nil)
)
if rec.MetricType == diploma.Cumulative {
values = chunkenc.NewReverseCumulativeDeltaCompressor(
valuesBuf, 0, byte(rec.FracDigits))
} else {
values = chunkenc.NewReverseInstantDeltaCompressor(
valuesBuf, 0, byte(rec.FracDigits))
}
s.metrics[rec.MetricID] = &_metric{
MetricType: rec.MetricType,
FracDigits: byte(rec.FracDigits),
TimestampsBuf: timestampsBuf,
ValuesBuf: valuesBuf,
Timestamps: chunkenc.NewReverseTimeDeltaOfDeltaCompressor(timestampsBuf, 0),
Values: values,
}
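lockEntry.XLock = false
// Within this file, only tryAddMetricReq can be queued while the metric does
// not exist yet (every other request is rejected earlier with NoMetric).
// Answer such requests instead of dropping them together with the lock entry.
for _, untyped := range lockEntry.WaitQueue {
if req, ok := untyped.(tryAddMetricReq); ok {
req.ResultCh <- MetricDuplicate
}
}
delete(s.metricLockEntries, rec.MetricID)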
}
func (s *Database) deleteMetric(rec txlog.DeletedMetric) {
_, ok := s.metrics[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("deleteMetric: metric %d not found",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("deleteMetric: lockEntry not found for the metric %d",
rec.MetricID))
}
if !lockEntry.XLock {
diploma.Abort(diploma.NoXLockBug,
fmt.Errorf("deleteMetric: xlock not set for the metric %d",
rec.MetricID))
}
var addMetricReqs []tryAddMetricReq
if len(lockEntry.WaitQueue) > 0 {
for _, untyped := range lockEntry.WaitQueue {
switch req := untyped.(type) {
case tryAppendMeasureReq:
req.ResultCh <- tryAppendMeasureResult{
MetricID: req.MetricID,
ResultCode: NoMetric,
}
case tryRangeScanReq:
req.ResultCh <- rangeScanResult{
ResultCode: NoMetric,
}
case tryFullScanReq:
req.ResultCh <- fullScanResult{
ResultCode: NoMetric,
}
case tryAddMetricReq:
addMetricReqs = append(addMetricReqs, req)
case tryDeleteMetricReq:
req.ResultCh <- tryDeleteMetricResult{
ResultCode: NoMetric,
}
case tryDeleteMeasuresReq:
req.ResultCh <- tryDeleteMeasuresResult{
ResultCode: NoMetric,
}
case tryGetMetricReq:
req.ResultCh <- Metric{
ResultCode: NoMetric,
}
default:
diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
fmt.Errorf("bug: unknown metric wait queue item type %T", req))
}
}
}
delete(s.metrics, rec.MetricID)
delete(s.metricLockEntries, rec.MetricID)
if len(rec.FreeDataPages) > 0 {
s.dataFreeList.AddPages(rec.FreeDataPages)
}
if len(rec.FreeIndexPages) > 0 {
s.indexFreeList.AddPages(rec.FreeIndexPages)
}
if len(addMetricReqs) > 0 {
s.processTryAddMetricReqsImmediatelyAfterDelete(addMetricReqs)
}
}
func (s *Database) deleteMeasures(rec txlog.DeletedMeasures) {
metric, ok := s.metrics[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoMetricBug,
fmt.Errorf("deleteMeasures: metric %d not found",
rec.MetricID))
}
lockEntry, ok := s.metricLockEntries[rec.MetricID]
if !ok {
diploma.Abort(diploma.NoLockEntryBug,
fmt.Errorf("deleteMeasures: lockEntry not found for the metric %d",
rec.MetricID))
}
if !lockEntry.XLock {
diploma.Abort(diploma.NoXLockBug,
fmt.Errorf("deleteMeasures: xlock not set for the metric %d",
rec.MetricID))
}
metric.DeleteMeasures()
lockEntry.XLock = false
if len(rec.FreeDataPages) > 0 {
s.dataFreeList.AddPages(rec.FreeDataPages)
}
if len(rec.FreeIndexPages) > 0 {
s.indexFreeList.AddPages(rec.FreeIndexPages)
}
s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}
func (s *Database) doAfterReleaseXLock(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
if len(lockEntry.WaitQueue) == 0 {
delete(s.metricLockEntries, metricID)
} else {
s.processMetricQueue(metricID, metric, lockEntry)
}
}