package database

import (
	"fmt"

	"gordenko.dev/dima/diploma"
	"gordenko.dev/dima/diploma/atree"
	"gordenko.dev/dima/diploma/chunkenc"
	"gordenko.dev/dima/diploma/conbuf"
	"gordenko.dev/dima/diploma/transform"
	"gordenko.dev/dima/diploma/txlog"
)

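// Result codes sent back to callers over the per-request result channels.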
const (
	QueryDone                = 1
	UntilFound               = 2
	UntilNotFound            = 3
	RangeFound               = 15
	NoMeasures               = 16
	NoMetric                 = 4
	MetricDuplicate          = 5
	Succeed                  = 6
	NewPage                  = 7
	ExpiredMeasure           = 8
	NonMonotonicValue        = 9
	CanAppend                = 10
	WrongMetricType          = 11
	NoMeasuresToDelete       = 12
	DeleteFromAtreeNotNeeded = 13
	DeleteFromAtreeRequired  = 14
)

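// worker runs in its own goroutine: each time it is signalled on
// workerSignalCh it drains the pending work via DoWork.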
func (s *Database) worker() {
	for {
		select {
		case <-s.workerSignalCh:
			s.DoWork()
		}
	}
}

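// DoWork drains the two queues filled by client goroutines: first it releases
// read locks for finished readers (replaying any requests parked on those
// metrics), then it dispatches every request accumulated in workerQueue.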
func (s *Database) DoWork() {
	s.mutex.Lock()
	rLocksToRelease := s.rLocksToRelease
	workerQueue := s.workerQueue
	s.rLocksToRelease = nil
	s.workerQueue = nil
	s.mutex.Unlock()

	for _, metricID := range rLocksToRelease {
		lockEntry, ok := s.metricLockEntries[metricID]
		if !ok {
			diploma.Abort(diploma.NoLockEntryBug,
				fmt.Errorf("DoWork: lockEntry not found for the metric %d",
					metricID))
		}

		if lockEntry.XLock {
			diploma.Abort(diploma.XLockBug,
				fmt.Errorf("DoWork: xlock is set for the metric %d",
					metricID))
		}

		if lockEntry.RLocks <= 0 {
			diploma.Abort(diploma.NoRLockBug,
				fmt.Errorf("DoWork: rlock not set for the metric %d",
					metricID))
		}

		lockEntry.RLocks--

		if len(lockEntry.WaitQueue) > 0 {
			metric, ok := s.metrics[metricID]
			if !ok {
				diploma.Abort(diploma.NoMetricBug,
					fmt.Errorf("DoWork: metric %d not found", metricID))
			}
			s.processMetricQueue(metricID, metric, lockEntry)
		} else {
			if lockEntry.RLocks == 0 {
				delete(s.metricLockEntries, metricID)
			}
		}
	}

	for _, untyped := range workerQueue {
		switch req := untyped.(type) {
		case tryAppendMeasureReq:
			s.tryAppendMeasure(req)

		case tryAppendMeasuresReq:
			s.tryAppendMeasures(req)

		case txlog.Changes:
			s.applyChanges(req) // all metrics only

		case tryListCurrentValuesReq:
			s.tryListCurrentValues(req) // all metrics only

		case tryRangeScanReq:
			s.tryRangeScan(req)

		case tryFullScanReq:
			s.tryFullScan(req)

		case tryAddMetricReq:
			s.tryAddMetric(req)

		case tryDeleteMetricReq:
			s.tryDeleteMetric(req)

		case tryDeleteMeasuresReq:
			s.tryDeleteMeasures(req)

		case tryGetMetricReq:
			s.tryGetMetric(req)

		default:
			diploma.Abort(diploma.UnknownWorkerQueueItemBug,
				fmt.Errorf("bug: unknown worker queue item type %T", req))
		}
	}
}

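// processMetricQueue replays the requests parked in a metric's wait queue once
// its lock state changes. Read-only requests (scans, get) are started right
// away; modification requests are deferred until no read locks remain, and any
// requests left over after a modification re-acquires a lock are queued again.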
func (s *Database) processMetricQueue(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
	if len(lockEntry.WaitQueue) == 0 {
		return
	}

	var modificationReqs []any

	for _, untyped := range lockEntry.WaitQueue {
		var rLockRequired bool
		switch req := untyped.(type) {
		case tryRangeScanReq:
			rLockRequired = s.startRangeScan(metric, req)

		case tryFullScanReq:
			rLockRequired = s.startFullScan(metric, req)

		case tryGetMetricReq:
			s.tryGetMetric(req)

		default:
			modificationReqs = append(modificationReqs, untyped)
		}

		if rLockRequired {
			lockEntry.RLocks++
		}
	}
	lockEntry.WaitQueue = nil
	if lockEntry.RLocks > 0 {
		lockEntry.WaitQueue = modificationReqs
	} else {
		for idx, untyped := range modificationReqs {
			switch req := untyped.(type) {
			case tryAppendMeasureReq:
				s.startAppendMeasure(metric, req, nil)

			case tryAppendMeasuresReq:
				s.startAppendMeasures(metric, req, nil)

			case tryDeleteMetricReq:
				s.startDeleteMetric(metric, req)

			case tryDeleteMeasuresReq:
				s.startDeleteMeasures(metric, req)

			default:
				diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
					fmt.Errorf("bug: unknown metric wait queue item type %T", req))
			}

			lockEntry, ok := s.metricLockEntries[metricID]
			if ok {
				start := idx + 1
				if start < len(modificationReqs) {
					lockEntry.WaitQueue = append(lockEntry.WaitQueue, modificationReqs[start:]...)
				}
				break
			}
		}
	}
}

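// Each try* request below carries a ResultCh; a client goroutine is expected
// to enqueue the request, signal the worker, and block on the channel for the
// outcome, roughly like this (sketch only; the enqueue helper shown here is
// hypothetical and lives outside this file):
//
//	resultCh := make(chan byte, 1)
//	db.enqueue(tryAddMetricReq{MetricID: id, ResultCh: resultCh})
//	code := <-resultCh // Succeed or MetricDuplicate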
type tryAddMetricReq struct {
	MetricID uint32
	ResultCh chan byte
}

func (s *Database) tryAddMetric(req tryAddMetricReq) {
	_, ok := s.metrics[req.MetricID]
	if ok {
		req.ResultCh <- MetricDuplicate
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
	} else {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{
			XLock: true,
		}
		req.ResultCh <- Succeed
	}
}

func (s *Database) processTryAddMetricReqsImmediatelyAfterDelete(reqs []tryAddMetricReq) {
	if len(reqs) == 0 {
		return
	}
	var (
		req       = reqs[0]
		waitQueue []any
	)
	if len(reqs) > 1 {
		for _, req := range reqs[1:] {
			waitQueue = append(waitQueue, req)
		}
	}
	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock:     true,
		WaitQueue: waitQueue,
	}
	req.ResultCh <- Succeed
}

type tryGetMetricReq struct {
	MetricID uint32
	ResultCh chan Metric
}

func (s *Database) tryGetMetric(req tryGetMetricReq) {
	metric, ok := s.metrics[req.MetricID]
	if ok {
		req.ResultCh <- Metric{
			ResultCode: Succeed,
			MetricType: metric.MetricType,
			FracDigits: metric.FracDigits,
		}
	} else {
		req.ResultCh <- Metric{
			ResultCode: NoMetric,
		}
	}
}

type tryDeleteMetricReq struct {
	MetricID uint32
	ResultCh chan tryDeleteMetricResult
}

func (s *Database) tryDeleteMetric(req tryDeleteMetricReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryDeleteMetricResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
	} else {
		s.startDeleteMetric(metric, req)
	}
}

func (s *Database) startDeleteMetric(metric *_metric, req tryDeleteMetricReq) {
	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock: true,
	}
	req.ResultCh <- tryDeleteMetricResult{
		ResultCode: Succeed,
		RootPageNo: metric.RootPageNo,
	}
}

type tryDeleteMeasuresReq struct {
	MetricID uint32
	Since    uint32
	ResultCh chan tryDeleteMeasuresResult
}

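// tryDeleteMeasures answers NoMeasuresToDelete when the metric has no data or
// when the requested Since lies past the newest stored timestamp; otherwise
// the deletion is started immediately or parked behind the current lock.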
func (s *Database) tryDeleteMeasures(req tryDeleteMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryDeleteMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}

	if metric.Since == 0 || (req.Since > 0 && metric.Until < req.Since) {
		req.ResultCh <- tryDeleteMeasuresResult{
			ResultCode: NoMeasuresToDelete,
		}
		// Without this return the request would also be queued or started
		// below, producing a second result on ResultCh.
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
	} else {
		s.startDeleteMeasures(metric, req)
	}
}

func (s *Database) startDeleteMeasures(metric *_metric, req tryDeleteMeasuresReq) {
	s.metricLockEntries[req.MetricID] = &metricLockEntry{
		XLock: true,
	}

	if metric.RootPageNo > 0 {
		req.ResultCh <- tryDeleteMeasuresResult{
			ResultCode: DeleteFromAtreeRequired,
			RootPageNo: metric.RootPageNo,
		}
	} else {
		req.ResultCh <- tryDeleteMeasuresResult{
			ResultCode: DeleteFromAtreeNotNeeded,
		}
	}
}

type tryAppendMeasureReq struct {
	MetricID  uint32
	Timestamp uint32
	Value     float64
	ResultCh  chan tryAppendMeasureResult
}

func (s *Database) tryAppendMeasure(req tryAppendMeasureReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryAppendMeasureResult{
			MetricID:   req.MetricID,
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		if lockEntry.XLock {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
	}
	s.startAppendMeasure(metric, req, lockEntry)
}

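// startAppendMeasure validates the measure against the metric's newest point,
// then checks whether the compressed timestamp and value streams still fit in
// one data page (atree.DataPagePayloadSize). If so, the caller gets CanAppend
// under a read lock; otherwise an exclusive lock is taken and the caller gets
// NewPage together with the current page contents (FilledPage).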
func (s *Database) startAppendMeasure(metric *_metric, req tryAppendMeasureReq, lockEntry *metricLockEntry) {
	if req.Timestamp <= metric.Until {
		req.ResultCh <- tryAppendMeasureResult{
			MetricID:   req.MetricID,
			ResultCode: ExpiredMeasure,
		}
		return
	}

	if metric.MetricType == diploma.Cumulative && req.Value < metric.UntilValue {
		req.ResultCh <- tryAppendMeasureResult{
			MetricID:   req.MetricID,
			ResultCode: NonMonotonicValue,
		}
		return
	}

	extraSpace := metric.Timestamps.CalcRequiredSpace(req.Timestamp) +
		metric.Values.CalcRequiredSpace(req.Value)

	totalSpace := metric.Timestamps.Size() + metric.Values.Size() + extraSpace

	if totalSpace <= atree.DataPagePayloadSize {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
		req.ResultCh <- tryAppendMeasureResult{
			MetricID:   req.MetricID,
			Timestamp:  req.Timestamp,
			Value:      req.Value,
			ResultCode: CanAppend,
		}
	} else {
		if lockEntry != nil {
			if lockEntry.RLocks > 0 {
				lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
				return
			}
			lockEntry.XLock = true
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				XLock: true,
			}
		}

		req.ResultCh <- tryAppendMeasureResult{
			MetricID:   req.MetricID,
			Timestamp:  req.Timestamp,
			Value:      req.Value,
			ResultCode: NewPage,
			FilledPage: &FilledPage{
				Since:            metric.Since,
				RootPageNo:       metric.RootPageNo,
				PrevPageNo:       metric.LastPageNo,
				TimestampsChunks: metric.TimestampsBuf.Chunks(),
				TimestampsSize:   uint16(metric.Timestamps.Size()),
				ValuesChunks:     metric.ValuesBuf.Chunks(),
				ValuesSize:       uint16(metric.Values.Size()),
			},
		}
	}
}

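// appendMeasure applies a committed txlog.AppendedMeasure record to the
// in-memory metric: it appends the point to the compressed buffers, advances
// Since/Until, and releases the read lock taken when the append was granted.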
func (s *Database) appendMeasure(rec txlog.AppendedMeasure) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasure: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasure: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if lockEntry.XLock {
		diploma.Abort(diploma.XLockBug,
			fmt.Errorf("appendMeasure: xlock is set for the metric %d",
				rec.MetricID))
	}

	if lockEntry.RLocks <= 0 {
		diploma.Abort(diploma.NoRLockBug,
			fmt.Errorf("appendMeasure: rlock not set for the metric %d",
				rec.MetricID))
	}

	if metric.Since == 0 {
		metric.Since = rec.Timestamp
		metric.SinceValue = rec.Value
	}

	metric.Timestamps.Append(rec.Timestamp)
	metric.Values.Append(rec.Value)
	metric.Until = rec.Timestamp
	metric.UntilValue = rec.Value

	lockEntry.RLocks--
	if len(lockEntry.WaitQueue) > 0 {
		s.processMetricQueue(rec.MetricID, metric, lockEntry)
	} else {
		if lockEntry.RLocks == 0 {
			delete(s.metricLockEntries, rec.MetricID)
		}
	}
}

func (s *Database) appendMeasures(extended txlog.AppendedMeasuresExtended) {
	rec := extended.Record
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasures: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}

	for _, measure := range rec.Measures {
		if metric.Since == 0 {
			metric.Since = measure.Timestamp
			metric.SinceValue = measure.Value
		}

		metric.Timestamps.Append(measure.Timestamp)
		metric.Values.Append(measure.Value)
		metric.Until = measure.Timestamp
		metric.UntilValue = measure.Value
	}

	if !extended.HoldLock {
		lockEntry.XLock = false
		s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
	}
}

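// appendMeasureAfterOverflow applies the txlog record written when an append
// overflowed the in-memory page: the measure buffers are reinitialized with
// the new point, the root/last page numbers are updated, and any pages reused
// from the free lists are removed from them.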
func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWithOverflowExtended) {
	rec := extended.Record
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("appendMeasureAfterOverflow: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("appendMeasureAfterOverflow: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("appendMeasureAfterOverflow: xlock not set for the metric %d",
				rec.MetricID))
	}

	//fmt.Printf("reinit by: %d, %v\n", rec.Timestamp, rec.Value)

	metric.ReinitBy(rec.Timestamp, rec.Value)
	if rec.IsRootChanged {
		metric.RootPageNo = rec.RootPageNo
	}
	metric.LastPageNo = rec.DataPageNo

	if rec.IsDataPageReused {
		s.dataFreeList.DeletePages([]uint32{
			rec.DataPageNo,
		})
	}

	if len(rec.ReusedIndexPages) > 0 {
		s.indexFreeList.DeletePages(rec.ReusedIndexPages)
	}

	if !extended.HoldLock {
		lockEntry.XLock = false
		s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
	}
}

type tryAppendMeasuresReq struct {
	MetricID uint32
	ResultCh chan tryAppendMeasuresResult
}

func (s *Database) tryAppendMeasures(req tryAppendMeasuresReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- tryAppendMeasuresResult{
			ResultCode: NoMetric,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		if lockEntry.XLock {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
	}
	s.startAppendMeasures(metric, req, lockEntry)
}

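// startAppendMeasures grants a bulk append: it takes an exclusive lock (or
// parks the request while readers are active) and hands the caller copies of
// the compressed buffers plus fresh compressors positioned at the current
// sizes, presumably so the batch can be encoded outside the worker goroutine.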
func (s *Database) startAppendMeasures(metric *_metric, req tryAppendMeasuresReq, lockEntry *metricLockEntry) {
	if lockEntry != nil {
		if lockEntry.RLocks > 0 {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
		lockEntry.XLock = true
	} else {
		s.metricLockEntries[req.MetricID] = &metricLockEntry{
			XLock: true,
		}
	}

	var (
		timestampsBuf *conbuf.ContinuousBuffer
		valuesBuf     *conbuf.ContinuousBuffer
		timestamps    diploma.TimestampCompressor
		values        diploma.ValueCompressor
	)

	if metric.Since > 0 {
		timestampsBuf = metric.TimestampsBuf.Copy()
		valuesBuf = metric.ValuesBuf.Copy()

		timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			timestampsBuf, metric.Timestamps.Size())

		if metric.MetricType == diploma.Cumulative {
			values = chunkenc.NewReverseCumulativeDeltaCompressor(
				valuesBuf, metric.Values.Size(), metric.FracDigits)
		} else {
			values = chunkenc.NewReverseInstantDeltaCompressor(
				valuesBuf, metric.Values.Size(), metric.FracDigits)
		}
	} else {
		timestampsBuf = conbuf.New(nil)
		valuesBuf = conbuf.New(nil)
		timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			timestampsBuf, 0)
		if metric.MetricType == diploma.Cumulative {
			values = chunkenc.NewReverseCumulativeDeltaCompressor(
				valuesBuf, 0, metric.FracDigits)
		} else {
			values = chunkenc.NewReverseInstantDeltaCompressor(
				valuesBuf, 0, metric.FracDigits)
		}
	}

	req.ResultCh <- tryAppendMeasuresResult{
		ResultCode:    CanAppend,
		MetricType:    metric.MetricType,
		FracDigits:    metric.FracDigits,
		Since:         metric.Since,
		Until:         metric.Until,
		UntilValue:    metric.UntilValue,
		RootPageNo:    metric.RootPageNo,
		PrevPageNo:    metric.LastPageNo,
		TimestampsBuf: timestampsBuf,
		ValuesBuf:     valuesBuf,
		Timestamps:    timestamps,
		Values:        values,
	}
}

type tryRangeScanReq struct {
	MetricID       uint32
	Since          uint32
	Until          uint32
	MetricType     diploma.MetricType
	ResponseWriter diploma.WorkerMeasureConsumer
	ResultCh       chan rangeScanResult
}

func (s *Database) tryRangeScan(req tryRangeScanReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- rangeScanResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != req.MetricType {
		req.ResultCh <- rangeScanResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		if lockEntry.XLock {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
	}

	if s.startRangeScan(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

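// startRangeScan serves as much of the requested [Since, Until] range as
// possible from the in-memory buffers, walking the reverse-compressed streams
// from newest to oldest. It returns true when the scan must continue on disk
// (UntilFound/UntilNotFound), in which case the caller keeps a read lock; it
// returns false when the query is already complete (QueryDone).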
func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- rangeScanResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Since > metric.Until {
		req.ResultCh <- rangeScanResult{
			ResultCode: QueryDone,
		}
		return false
	}

	if req.Until < metric.Since {
		if metric.RootPageNo > 0 {
			req.ResultCh <- rangeScanResult{
				ResultCode: UntilNotFound,
				RootPageNo: metric.RootPageNo,
				FracDigits: metric.FracDigits,
			}
			return true
		} else {
			req.ResultCh <- rangeScanResult{
				ResultCode: QueryDone,
			}
			return false
		}
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)

	var valueDecompressor diploma.ValueDecompressor
	if metric.MetricType == diploma.Cumulative {
		valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
			metric.ValuesBuf,
			metric.Values.Size(),
			metric.FracDigits,
		)
	} else {
		valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
			metric.ValuesBuf,
			metric.Values.Size(),
			metric.FracDigits,
		)
	}

	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}

		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}

		if timestamp <= req.Until {
			req.ResponseWriter.FeedNoSend(timestamp, value)
			if timestamp < req.Since {
				req.ResultCh <- rangeScanResult{
					ResultCode: QueryDone,
				}
				return false
			}
		}
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- rangeScanResult{
			ResultCode: UntilFound,
			LastPageNo: metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	} else {
		req.ResultCh <- rangeScanResult{
			ResultCode: QueryDone,
		}
		return false
	}
}

type tryFullScanReq struct {
	MetricID       uint32
	MetricType     diploma.MetricType
	ResponseWriter diploma.WorkerMeasureConsumer
	ResultCh       chan fullScanResult
}

func (s *Database) tryFullScan(req tryFullScanReq) {
	metric, ok := s.metrics[req.MetricID]
	if !ok {
		req.ResultCh <- fullScanResult{
			ResultCode: NoMetric,
		}
		return
	}
	if metric.MetricType != req.MetricType {
		req.ResultCh <- fullScanResult{
			ResultCode: WrongMetricType,
		}
		return
	}

	lockEntry, ok := s.metricLockEntries[req.MetricID]
	if ok {
		if lockEntry.XLock {
			lockEntry.WaitQueue = append(lockEntry.WaitQueue, req)
			return
		}
	}

	if s.startFullScan(metric, req) {
		if lockEntry != nil {
			lockEntry.RLocks++
		} else {
			s.metricLockEntries[req.MetricID] = &metricLockEntry{
				RLocks: 1,
			}
		}
	}
}

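// startFullScan streams every in-memory point to the response writer and,
// like startRangeScan, reports via its return value whether the caller must
// keep a read lock and continue the scan from the last on-disk page.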
func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool {
	if metric.Since == 0 {
		req.ResultCh <- fullScanResult{
			ResultCode: QueryDone,
		}
		return false
	}

	timestampDecompressor := chunkenc.NewReverseTimeDeltaOfDeltaDecompressor(
		metric.TimestampsBuf,
		metric.Timestamps.Size(),
	)

	var valueDecompressor diploma.ValueDecompressor
	if metric.MetricType == diploma.Cumulative {
		valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
			metric.ValuesBuf,
			metric.Values.Size(),
			metric.FracDigits,
		)
	} else {
		valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
			metric.ValuesBuf,
			metric.Values.Size(),
			metric.FracDigits,
		)
	}
	for {
		timestamp, done := timestampDecompressor.NextValue()
		if done {
			break
		}
		value, done := valueDecompressor.NextValue()
		if done {
			diploma.Abort(diploma.HasTimestampNoValueBug, ErrNoValueBug)
		}
		req.ResponseWriter.FeedNoSend(timestamp, value)
	}

	if metric.LastPageNo > 0 {
		req.ResultCh <- fullScanResult{
			ResultCode: UntilFound,
			LastPageNo: metric.LastPageNo,
			FracDigits: metric.FracDigits,
		}
		return true
	} else {
		req.ResultCh <- fullScanResult{
			ResultCode: QueryDone,
		}
		return false
	}
}

type tryListCurrentValuesReq struct {
	MetricIDs      []uint32
	ResponseWriter *transform.CurrentValueWriter
	ResultCh       chan struct{}
}

func (s *Database) tryListCurrentValues(req tryListCurrentValuesReq) {
	for _, metricID := range req.MetricIDs {
		metric, ok := s.metrics[metricID]
		if ok {
			req.ResponseWriter.BufferValue(transform.CurrentValue{
				MetricID:  metricID,
				Timestamp: metric.Until,
				Value:     metric.UntilValue,
			})
		}
	}
	req.ResultCh <- struct{}{}
}

///////////////////////////////////////////////////////

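// applyChanges applies a batch of committed transaction-log records to the
// in-memory state, dumps a snapshot when the batch requests one or when an
// ExitWaitGroup is present (which appears to mean shutdown), then closes
// WaitCh and signals the wait group.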
func (s *Database) applyChanges(req txlog.Changes) {
	for _, untyped := range req.Records {
		switch rec := untyped.(type) {
		case txlog.AddedMetric:
			s.addMetric(rec)

		case txlog.DeletedMetric:
			s.deleteMetric(rec)

		case txlog.AppendedMeasure:
			s.appendMeasure(rec)

		case txlog.AppendedMeasuresExtended:
			s.appendMeasures(rec)

		case txlog.AppendedMeasureWithOverflowExtended:
			s.appendMeasureAfterOverflow(rec)

		case txlog.DeletedMeasures:
			s.deleteMeasures(rec)
		}
	}

	if req.ForceSnapshot || req.ExitWaitGroup != nil {
		s.dumpSnapshot(req.LogNumber)
	}

	close(req.WaitCh)

	if req.ExitWaitGroup != nil {
		req.ExitWaitGroup.Done()
	}
}

func (s *Database) addMetric(rec txlog.AddedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if ok {
		diploma.Abort(diploma.MetricAddedBug,
			fmt.Errorf("addMetric: metric %d already added",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("addMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("addMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var (
		values        diploma.ValueCompressor
		timestampsBuf = conbuf.New(nil)
		valuesBuf     = conbuf.New(nil)
	)

	if rec.MetricType == diploma.Cumulative {
		values = chunkenc.NewReverseCumulativeDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	} else {
		values = chunkenc.NewReverseInstantDeltaCompressor(
			valuesBuf, 0, byte(rec.FracDigits))
	}

	s.metrics[rec.MetricID] = &_metric{
		MetricType:    rec.MetricType,
		FracDigits:    byte(rec.FracDigits),
		TimestampsBuf: timestampsBuf,
		ValuesBuf:     valuesBuf,
		Timestamps:    chunkenc.NewReverseTimeDeltaOfDeltaCompressor(timestampsBuf, 0),
		Values:        values,
	}

	lockEntry.XLock = false
	delete(s.metricLockEntries, rec.MetricID)
}

func (s *Database) deleteMetric(rec txlog.DeletedMetric) {
	_, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMetric: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMetric: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMetric: xlock not set for the metric %d",
				rec.MetricID))
	}

	var addMetricReqs []tryAddMetricReq

	if len(lockEntry.WaitQueue) > 0 {
		for _, untyped := range lockEntry.WaitQueue {
			switch req := untyped.(type) {
			case tryAppendMeasureReq:
				req.ResultCh <- tryAppendMeasureResult{
					MetricID:   req.MetricID,
					ResultCode: NoMetric,
				}

			case tryRangeScanReq:
				req.ResultCh <- rangeScanResult{
					ResultCode: NoMetric,
				}

			case tryFullScanReq:
				req.ResultCh <- fullScanResult{
					ResultCode: NoMetric,
				}

			case tryAddMetricReq:
				addMetricReqs = append(addMetricReqs, req)

			case tryDeleteMetricReq:
				req.ResultCh <- tryDeleteMetricResult{
					ResultCode: NoMetric,
				}

			case tryDeleteMeasuresReq:
				req.ResultCh <- tryDeleteMeasuresResult{
					ResultCode: NoMetric,
				}

			case tryGetMetricReq:
				req.ResultCh <- Metric{
					ResultCode: NoMetric,
				}

			default:
				diploma.Abort(diploma.UnknownMetricWaitQueueItemBug,
					fmt.Errorf("bug: unknown metric wait queue item type %T", req))
			}
		}
	}
	delete(s.metrics, rec.MetricID)
	delete(s.metricLockEntries, rec.MetricID)

	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
	}
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
	}

	if len(addMetricReqs) > 0 {
		s.processTryAddMetricReqsImmediatelyAfterDelete(addMetricReqs)
	}
}

func (s *Database) deleteMeasures(rec txlog.DeletedMeasures) {
	metric, ok := s.metrics[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoMetricBug,
			fmt.Errorf("deleteMeasures: metric %d not found",
				rec.MetricID))
	}

	lockEntry, ok := s.metricLockEntries[rec.MetricID]
	if !ok {
		diploma.Abort(diploma.NoLockEntryBug,
			fmt.Errorf("deleteMeasures: lockEntry not found for the metric %d",
				rec.MetricID))
	}

	if !lockEntry.XLock {
		diploma.Abort(diploma.NoXLockBug,
			fmt.Errorf("deleteMeasures: xlock not set for the metric %d",
				rec.MetricID))
	}
	metric.DeleteMeasures()
	lockEntry.XLock = false
	if len(rec.FreeDataPages) > 0 {
		s.dataFreeList.AddPages(rec.FreeDataPages)
	}
	// The index free list must be gated on FreeIndexPages, not FreeDataPages.
	if len(rec.FreeIndexPages) > 0 {
		s.indexFreeList.AddPages(rec.FreeIndexPages)
	}
	s.doAfterReleaseXLock(rec.MetricID, metric, lockEntry)
}

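// doAfterReleaseXLock is called once an exclusive lock has been cleared: the
// lock entry is dropped when nothing is waiting, otherwise the parked requests
// are replayed through processMetricQueue.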
func (s *Database) doAfterReleaseXLock(metricID uint32, metric *_metric, lockEntry *metricLockEntry) {
	if len(lockEntry.WaitQueue) == 0 {
		delete(s.metricLockEntries, metricID)
	} else {
		s.processMetricQueue(metricID, metric, lockEntry)
	}
}