спеціалізована СУБД для зберігання та обробки показань датчиків та лічильників
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

897 lines
22 KiB

package database
import (
"errors"
"fmt"
"io"
"net"
"gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/atree"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/bufreader"
"gordenko.dev/dima/diploma/chunkenc"
"gordenko.dev/dima/diploma/conbuf"
"gordenko.dev/dima/diploma/proto"
"gordenko.dev/dima/diploma/transform"
"gordenko.dev/dima/diploma/txlog"
)
var (
	// ErrNoValueBug signals an internal inconsistency: a timestamp was
	// found without a matching value.
	ErrNoValueBug = errors.New("has timestamp but no value")
	// ErrWrongResultCodeBug signals that a worker returned a result code
	// the dispatching switch does not know how to handle.
	ErrWrongResultCodeBug = errors.New("bug: wrong result code")
	// successMsg is the shared one-byte success frame, reused to avoid
	// allocating on every successful reply.
	successMsg = []byte{
		proto.RespSuccess,
	}
)
// reply writes a minimal result frame to conn: a single RespSuccess byte
// when errcode is 0, otherwise a RespError byte followed by the 2-byte
// errcode (encoding defined by bin.PutUint16).
//
// A write error cannot be reported to the client and the caller closes
// the connection on its own, so the error is deliberately discarded.
func reply(conn io.Writer, errcode uint16) {
	answer := successMsg
	if errcode != 0 {
		answer = []byte{
			proto.RespError,
			0, 0, // errcode placeholder
		}
		bin.PutUint16(answer[1:], errcode)
	}
	_, _ = conn.Write(answer)
}
// handleTCPConn serves one client connection: it executes requests in a
// loop until the peer disconnects or a protocol/transport error occurs.
func (s *Database) handleTCPConn(conn net.Conn) {
	defer conn.Close()
	reader := bufreader.New(conn, 128)
	for {
		if err := s.processRequest(conn, reader); err != nil {
			// A clean disconnect (io.EOF) is expected and not logged.
			if err != io.EOF {
				s.logger.Println(err)
			}
			return
		}
	}
}
// processRequest reads one request frame from r, dispatches it to the
// matching Database handler, and writes the response to conn. It returns
// io.EOF on a clean client disconnect, or a non-nil error that aborts
// the connection loop. Errors are wrapped with %w so callers can unwrap
// them with errors.Is/errors.As.
func (s *Database) processRequest(conn net.Conn, r *bufreader.BufferedReader) (err error) {
	messageType, err := r.ReadByte()
	if err != nil {
		if err == io.EOF {
			return err // clean disconnect; reported as-is
		}
		return fmt.Errorf("read messageType: %w", err)
	}
	switch messageType {
	case proto.TypeGetMetric:
		req, err := proto.ReadGetMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadGetMetricReq: %w", err)
		}
		if err = s.GetMetric(conn, req); err != nil {
			return fmt.Errorf("GetMetric: %w", err)
		}
	case proto.TypeAddMetric:
		req, err := proto.ReadAddMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAddMetricReq: %w", err)
		}
		reply(conn, s.AddMetric(req))
	case proto.TypeDeleteMetric:
		req, err := proto.ReadDeleteMetricReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadDeleteMetricReq: %w", err)
		}
		reply(conn, s.DeleteMetric(req))
	case proto.TypeAppendMeasure:
		req, err := proto.ReadAppendMeasureReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAppendMeasureReq: %w", err)
		}
		reply(conn, s.AppendMeasure(req))
	case proto.TypeAppendMeasures:
		req, err := proto.ReadAppendMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadAppendMeasuresReq: %w", err)
		}
		reply(conn, s.AppendMeasures(req))
	case proto.TypeListInstantMeasures:
		req, err := proto.ReadListInstantMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListInstantMeasuresReq: %w", err)
		}
		if err = s.ListInstantMeasures(conn, req); err != nil {
			return fmt.Errorf("ListInstantMeasures: %w", err)
		}
	case proto.TypeListCumulativeMeasures:
		req, err := proto.ReadListCumulativeMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListCumulativeMeasuresReq: %w", err)
		}
		if err = s.ListCumulativeMeasures(conn, req); err != nil {
			return fmt.Errorf("ListCumulativeMeasures: %w", err)
		}
	case proto.TypeListInstantPeriods:
		req, err := proto.ReadListInstantPeriodsReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListInstantPeriodsReq: %w", err)
		}
		if err = s.ListInstantPeriods(conn, req); err != nil {
			return fmt.Errorf("ListInstantPeriods: %w", err)
		}
	case proto.TypeListCumulativePeriods:
		req, err := proto.ReadListCumulativePeriodsReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListCumulativePeriodsReq: %w", err)
		}
		if err = s.ListCumulativePeriods(conn, req); err != nil {
			return fmt.Errorf("ListCumulativePeriods: %w", err)
		}
	case proto.TypeListCurrentValues:
		req, err := proto.ReadListCurrentValuesReq(r)
		if err != nil {
			// Message prefix fixed: was "proto.ListCurrentValuesReq".
			return fmt.Errorf("proto.ReadListCurrentValuesReq: %w", err)
		}
		if err = s.ListCurrentValues(conn, req); err != nil {
			return fmt.Errorf("ListCurrentValues: %w", err)
		}
	case proto.TypeDeleteMeasures:
		req, err := proto.ReadDeleteMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadDeleteMeasuresReq: %w", err)
		}
		reply(conn, s.DeleteMeasures(req))
	case proto.TypeListAllInstantMeasures:
		req, err := proto.ReadListAllInstantMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListAllInstantMeasuresReq: %w", err)
		}
		if err = s.ListAllInstantMeasures(conn, req); err != nil {
			return fmt.Errorf("ListAllInstantMeasures: %w", err)
		}
	case proto.TypeListAllCumulativeMeasures:
		req, err := proto.ReadListAllCumulativeMeasuresReq(r)
		if err != nil {
			return fmt.Errorf("proto.ReadListAllCumulativeMeasuresReq: %w", err)
		}
		if err = s.ListAllCumulativeMeasures(conn, req); err != nil {
			return fmt.Errorf("ListAllCumulativeMeasures: %w", err)
		}
	default:
		return fmt.Errorf("unknown messageType: %d", messageType)
	}
	return
}
// API
// AddMetric registers a new metric. It returns 0 on success or a
// proto.Err* code on validation failure or a duplicate metric ID.
func (s *Database) AddMetric(req proto.AddMetricReq) uint16 {
	// Validation.
	if req.MetricID == 0 {
		return proto.ErrEmptyMetricID
	}
	if byte(req.FracDigits) > diploma.MaxFracDigits {
		return proto.ErrWrongFracDigits
	}
	switch req.MetricType {
	case diploma.Cumulative, diploma.Instant:
		// ok
	default:
		return proto.ErrWrongMetricType
	}
	// Ask the worker to reserve the metric ID; the buffered channel lets
	// the worker answer without blocking.
	resultCh := make(chan byte, 1)
	s.appendJobToWorkerQueue(tryAddMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})
	resultCode := <-resultCh
	switch resultCode {
	case Succeed:
		// Durably record the new metric in the transaction log and wait
		// for the write to be acknowledged before replying.
		waitCh := s.txlog.WriteAddedMetric(txlog.AddedMetric{
			MetricID:   req.MetricID,
			MetricType: req.MetricType,
			FracDigits: req.FracDigits,
		})
		<-waitCh
	case MetricDuplicate:
		return proto.ErrDuplicate
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}
// Metric is the worker's answer to tryGetMetricReq: the metric's stored
// metadata plus a result code (Succeed or NoMetric).
type Metric struct {
	MetricType diploma.MetricType
	FracDigits byte
	ResultCode byte
}
// GetMetric writes the metric's metadata (type and fracDigits) to conn,
// or an error frame when the metric does not exist. A non-nil return
// value indicates a transport failure only.
func (s *Database) GetMetric(conn io.Writer, req proto.GetMetricReq) error {
	resultCh := make(chan Metric, 1)
	s.appendJobToWorkerQueue(tryGetMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case Succeed:
		// Response layout: 1 byte RespValue, 4 bytes metricID (echoed
		// from the request), 1 byte metric type, 1 byte fracDigits.
		answer := []byte{
			proto.RespValue,
			0, 0, 0, 0, // metricID
			byte(result.MetricType),
			result.FracDigits,
		}
		bin.PutUint32(answer[1:], req.MetricID)
		_, err := conn.Write(answer)
		if err != nil {
			return err
		}
	case NoMetric:
		reply(conn, proto.ErrNoMetric)
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return nil
}
// tryDeleteMetricResult is the worker's answer to tryDeleteMetricReq.
type tryDeleteMetricResult struct {
	ResultCode byte
	// RootPageNo is the root of the metric's page tree; 0 means the
	// metric had no pages persisted in the atree.
	RootPageNo uint32
}
// DeleteMetric removes a metric and schedules all of its atree pages for
// reuse. Returns 0 on success or a proto.Err* code.
func (s *Database) DeleteMetric(req proto.DeleteMetricReq) uint16 {
	resultCh := make(chan tryDeleteMetricResult, 1)
	s.appendJobToWorkerQueue(tryDeleteMetricReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case Succeed:
		var (
			freeDataPages  []uint32
			freeIndexPages []uint32
		)
		// A zero root means the metric never had pages persisted, so
		// there is nothing to collect from the atree.
		if result.RootPageNo > 0 {
			pageLists, err := s.atree.GetAllPages(result.RootPageNo)
			if err != nil {
				diploma.Abort(diploma.FailedAtreeRequest, err)
			}
			freeDataPages = pageLists.DataPages
			freeIndexPages = pageLists.IndexPages
		}
		// Log the deletion together with the freed page numbers and wait
		// for the write to be acknowledged.
		waitCh := s.txlog.WriteDeletedMetric(txlog.DeletedMetric{
			MetricID:       req.MetricID,
			FreeDataPages:  freeDataPages,
			FreeIndexPages: freeIndexPages,
		})
		<-waitCh
	case NoMetric:
		return proto.ErrNoMetric
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}
// FilledPage is a snapshot of an in-memory data page that ran out of
// space and must be persisted to the atree.
type FilledPage struct {
	Since            uint32   // timestamp of the first measure on the page
	RootPageNo       uint32   // current root of the metric's page tree
	PrevPageNo       uint32   // previously written data page (chain link)
	TimestampsChunks [][]byte // compressed timestamp stream, chunked
	TimestampsSize   uint16   // total size of the timestamp stream, bytes
	ValuesChunks     [][]byte // compressed value stream, chunked
	ValuesSize       uint16   // total size of the value stream, bytes
}
// tryAppendMeasureResult is the worker's answer to tryAppendMeasureReq.
type tryAppendMeasureResult struct {
	MetricID  uint32
	Timestamp uint32
	Value     float64
	// FilledPage is set when ResultCode is NewPage: the current page
	// overflowed and must be flushed to the atree by the caller.
	FilledPage *FilledPage
	ResultCode byte
}
// AppendMeasure appends a single measure to a metric. Returns 0 on
// success or a proto.Err* code (no metric, out-of-order timestamp, or a
// non-monotonic value on a cumulative metric).
func (s *Database) AppendMeasure(req proto.AppendMeasureReq) uint16 {
	resultCh := make(chan tryAppendMeasureResult, 1)
	s.appendJobToWorkerQueue(tryAppendMeasureReq{
		MetricID:  req.MetricID,
		Timestamp: req.Timestamp,
		Value:     req.Value,
		ResultCh:  resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case CanAppend:
		// The measure fits on the current page: log it and wait for the
		// transaction-log write to be acknowledged.
		waitCh := s.txlog.WriteAppendMeasure(txlog.AppendedMeasure{
			MetricID:  req.MetricID,
			Timestamp: req.Timestamp,
			Value:     req.Value,
		})
		<-waitCh
	case NewPage:
		// The current page overflowed: persist the filled page to the
		// atree first, then log the measure with the overflow details.
		filled := result.FilledPage
		report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
			MetricID:         req.MetricID,
			Timestamp:        req.Timestamp,
			Value:            req.Value,
			Since:            filled.Since,
			RootPageNo:       filled.RootPageNo,
			PrevPageNo:       filled.PrevPageNo,
			TimestampsChunks: filled.TimestampsChunks,
			TimestampsSize:   filled.TimestampsSize,
			ValuesChunks:     filled.ValuesChunks,
			ValuesSize:       filled.ValuesSize,
		})
		if err != nil {
			diploma.Abort(diploma.WriteToAtreeFailed, err)
		}
		waitCh := s.txlog.WriteAppendedMeasureWithOverflow(
			txlog.AppendedMeasureWithOverflow{
				MetricID:         req.MetricID,
				Timestamp:        req.Timestamp,
				Value:            req.Value,
				IsDataPageReused: report.IsDataPageReused,
				DataPageNo:       report.DataPageNo,
				IsRootChanged:    report.IsRootChanged,
				RootPageNo:       report.NewRootPageNo,
				ReusedIndexPages: report.ReusedIndexPages,
			},
			report.FileName,
			false,
		)
		<-waitCh
	case NoMetric:
		return proto.ErrNoMetric
	case ExpiredMeasure:
		return proto.ErrExpiredMeasure
	case NonMonotonicValue:
		return proto.ErrNonMonotonicValue
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}
// tryAppendMeasuresResult hands the caller the metric's metadata plus
// the live compression state needed to continue appending a batch.
type tryAppendMeasuresResult struct {
	ResultCode byte
	MetricType diploma.MetricType
	FracDigits byte
	Since      uint32  // first timestamp on the current page; 0 if page is empty
	Until      uint32  // last appended timestamp
	UntilValue float64 // last appended value
	RootPageNo uint32
	PrevPageNo uint32
	// Backing buffers and compressors for the current in-memory page.
	TimestampsBuf *conbuf.ContinuousBuffer
	ValuesBuf     *conbuf.ContinuousBuffer
	Timestamps    diploma.TimestampCompressor
	Values        diploma.ValueCompressor
}
// AppendMeasures appends a batch of measures to a metric. Measures are
// validated in order; on the first invalid measure the measures accepted
// so far are committed to the transaction log and the error code for the
// offending measure is returned. When the in-memory page overflows, the
// filled page is persisted to the atree and a fresh page is started.
// Returns 0 when the whole batch was appended.
func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
	resultCh := make(chan tryAppendMeasuresResult, 1)
	s.appendJobToWorkerQueue(tryAppendMeasuresReq{
		MetricID: req.MetricID,
		ResultCh: resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case CanAppend:
		var (
			rootPageNo    = result.RootPageNo
			prevPageNo    = result.PrevPageNo
			timestampsBuf = result.TimestampsBuf
			valuesBuf     = result.ValuesBuf
			timestamps    = result.Timestamps
			values        = result.Values
			since         = result.Since
			until         = result.Until
			untilValue    = result.UntilValue
			// measures accepted but not yet written to the transaction log
			toAppendMeasures []proto.Measure
		)
		// flushPending commits the accepted-but-unlogged measures and
		// waits for the write to be acknowledged. The flag is passed to
		// the transaction log unchanged; the original code passed true
		// only when an overflow write followed immediately.
		flushPending := func(moreFollows bool) {
			if len(toAppendMeasures) == 0 {
				return
			}
			waitCh := s.txlog.WriteAppendMeasures(
				txlog.AppendedMeasures{
					MetricID: req.MetricID,
					Measures: toAppendMeasures,
				},
				moreFollows,
			)
			<-waitCh
			toAppendMeasures = nil
		}
		for idx, measure := range req.Measures {
			if since == 0 {
				// First measure ever for this metric: no ordering checks.
				since = measure.Timestamp
			} else {
				// Timestamps must be strictly increasing.
				if measure.Timestamp <= until {
					flushPending(false)
					return proto.ErrExpiredMeasure
				}
				// Cumulative metrics must never decrease.
				if result.MetricType == diploma.Cumulative && measure.Value < untilValue {
					flushPending(false)
					return proto.ErrNonMonotonicValue
				}
			}
			extraSpace := timestamps.CalcRequiredSpace(measure.Timestamp) +
				values.CalcRequiredSpace(measure.Value)
			totalSpace := timestamps.Size() + values.Size() + extraSpace
			if totalSpace <= atree.DataPagePayloadSize {
				// Fits on the current page: compress and remember it.
				timestamps.Append(measure.Timestamp)
				values.Append(measure.Value)
				toAppendMeasures = append(toAppendMeasures, measure)
			} else {
				// Page is full: commit pending measures, persist the
				// filled page to the atree, then start a fresh page
				// seeded with the current measure.
				flushPending(true)
				report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
					MetricID:         req.MetricID,
					Timestamp:        measure.Timestamp,
					Value:            measure.Value,
					Since:            since,
					RootPageNo:       rootPageNo,
					PrevPageNo:       prevPageNo,
					TimestampsChunks: timestampsBuf.Chunks(),
					TimestampsSize:   uint16(timestamps.Size()),
					ValuesChunks:     valuesBuf.Chunks(),
					ValuesSize:       uint16(values.Size()),
				})
				if err != nil {
					diploma.Abort(diploma.WriteToAtreeFailed, err)
				}
				prevPageNo = report.DataPageNo
				if report.IsRootChanged {
					rootPageNo = report.NewRootPageNo
				}
				waitCh := s.txlog.WriteAppendedMeasureWithOverflow(
					txlog.AppendedMeasureWithOverflow{
						MetricID:         req.MetricID,
						Timestamp:        measure.Timestamp,
						Value:            measure.Value,
						IsDataPageReused: report.IsDataPageReused,
						DataPageNo:       report.DataPageNo,
						IsRootChanged:    report.IsRootChanged,
						RootPageNo:       report.NewRootPageNo,
						ReusedIndexPages: report.ReusedIndexPages,
					},
					report.FileName,
					(idx+1) < len(req.Measures), // more writes may follow
				)
				<-waitCh
				// Reset compression state for the new page.
				timestampsBuf = conbuf.New(nil)
				valuesBuf = conbuf.New(nil)
				timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
					timestampsBuf, 0)
				if result.MetricType == diploma.Cumulative {
					values = chunkenc.NewReverseCumulativeDeltaCompressor(
						valuesBuf, 0, result.FracDigits)
				} else {
					values = chunkenc.NewReverseInstantDeltaCompressor(
						valuesBuf, 0, result.FracDigits)
				}
				timestamps.Append(measure.Timestamp)
				values.Append(measure.Value)
				since = measure.Timestamp
			}
			until = measure.Timestamp
			untilValue = measure.Value
		}
		// Commit whatever is still pending at the end of the batch.
		flushPending(false)
	case NoMetric:
		return proto.ErrNoMetric
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}
// tryDeleteMeasuresResult is the worker's answer to tryDeleteMeasuresReq.
type tryDeleteMeasuresResult struct {
	ResultCode byte
	// RootPageNo is the root of the metric's page tree, used to collect
	// the pages to free when ResultCode is DeleteFromAtreeRequired.
	RootPageNo uint32
}
// DeleteMeasures deletes a metric's measures starting from req.Since.
// Returns 0 on success or a proto.Err* code.
func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
	resultCh := make(chan tryDeleteMeasuresResult, 1)
	s.appendJobToWorkerQueue(tryDeleteMeasuresReq{
		MetricID: req.MetricID,
		Since:    req.Since,
		ResultCh: resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case NoMeasuresToDelete:
		// ok — nothing matched the range; succeed without logging.
	case DeleteFromAtreeNotNeeded:
		// Record the deletion in the transaction log (no atree pages to
		// free) and wait for the write to be acknowledged.
		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
			MetricID: req.MetricID,
		})
		<-waitCh
	case DeleteFromAtreeRequired:
		// Collect the numbers of all data and index pages of the metric
		// (acts like a REDO log record).
		pageLists, err := s.atree.GetAllPages(result.RootPageNo)
		if err != nil {
			diploma.Abort(diploma.FailedAtreeRequest, err)
		}
		// Record the deletion in the transaction log together with the
		// page numbers to be freed.
		waitCh := s.txlog.WriteDeletedMeasures(txlog.DeletedMeasures{
			MetricID:       req.MetricID,
			FreeDataPages:  pageLists.DataPages,
			FreeIndexPages: pageLists.IndexPages,
		})
		<-waitCh
	case NoMetric:
		return proto.ErrNoMetric
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return 0
}
// SELECT
type fullScanResult struct {
ResultCode byte
FracDigits byte
LastPageNo uint32
}
// ListAllInstantMeasures streams every stored measure of an instant
// metric to the client.
func (s *Database) ListAllInstantMeasures(conn net.Conn, req proto.ListAllInstantMetricMeasuresReq) error {
	return s.fullScan(fullScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Instant,
		Conn:           conn,
		ResponseWriter: transform.NewInstantMeasureWriter(conn, 0),
	})
}
// ListAllCumulativeMeasures streams every stored measure of a cumulative
// metric to the client.
func (s *Database) ListAllCumulativeMeasures(conn io.Writer, req proto.ListAllCumulativeMeasuresReq) error {
	return s.fullScan(fullScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Cumulative,
		Conn:           conn,
		ResponseWriter: transform.NewCumulativeMeasureWriter(conn, 0),
	})
}
// ListInstantMeasures streams the measures of an instant metric that
// fall within [Since, Until].
func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasuresReq) error {
	if req.Since > req.Until {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}
	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Instant,
		Since:          req.Since,
		Until:          req.Until,
		Conn:           conn,
		ResponseWriter: transform.NewInstantMeasureWriter(conn, req.Since),
	})
}
// ListCumulativeMeasures streams the measures of a cumulative metric
// that fall within [Since, Until].
func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativeMeasuresReq) error {
	if req.Since > req.Until {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}
	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Cumulative,
		Since:          req.Since,
		Until:          req.Until,
		Conn:           conn,
		ResponseWriter: transform.NewCumulativeMeasureWriter(conn, req.Since),
	})
}
// rangeScanResult is the worker's answer to tryRangeScanReq.
type rangeScanResult struct {
	ResultCode byte
	FracDigits byte
	// RootPageNo seeds a fresh atree scan (UntilNotFound); LastPageNo
	// seeds a continuation scan (UntilFound).
	RootPageNo uint32
	LastPageNo uint32
}
// ListInstantPeriods streams aggregated periods of an instant metric,
// grouped per req.GroupBy with the requested aggregate functions.
func (s *Database) ListInstantPeriods(conn net.Conn, req proto.ListInstantPeriodsReq) error {
	since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay)
	if since.After(until) {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}
	sinceTS := uint32(since.Unix())
	untilTS := uint32(until.Unix())
	writer, err := transform.NewInstantPeriodsWriter(transform.InstantPeriodsWriterOptions{
		Dst:            conn,
		GroupBy:        req.GroupBy,
		Since:          sinceTS,
		AggregateFuncs: req.AggregateFuncs,
		FirstHourOfDay: req.FirstHourOfDay,
	})
	if err != nil {
		reply(conn, proto.ErrUnexpected)
		return nil
	}
	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Instant,
		Since:          sinceTS,
		Until:          untilTS,
		Conn:           conn,
		ResponseWriter: writer,
	})
}
// ListCumulativePeriods streams aggregated periods of a cumulative
// metric, grouped per req.GroupBy.
func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulativePeriodsReq) error {
	since, until := timeBoundsOfAggregation(req.Since, req.Until, req.GroupBy, req.FirstHourOfDay)
	if since.After(until) {
		reply(conn, proto.ErrInvalidRange)
		return nil
	}
	sinceTS := uint32(since.Unix())
	untilTS := uint32(until.Unix())
	writer, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{
		Dst:            conn,
		GroupBy:        req.GroupBy,
		Since:          sinceTS,
		FirstHourOfDay: req.FirstHourOfDay,
	})
	if err != nil {
		reply(conn, proto.ErrUnexpected)
		return nil
	}
	return s.rangeScan(rangeScanReq{
		MetricID:       req.MetricID,
		MetricType:     diploma.Cumulative,
		Since:          sinceTS,
		Until:          untilTS,
		Conn:           conn,
		ResponseWriter: writer,
	})
}
// rangeScanReq bundles the parameters shared by all range-scan handlers.
type rangeScanReq struct {
	MetricID   uint32
	MetricType diploma.MetricType
	Since      uint32
	Until      uint32
	// Conn receives error frames; ResponseWriter receives result rows.
	Conn           io.Writer
	ResponseWriter diploma.MeasureConsumer
}
// rangeScan executes a bounded scan. The worker first scans the metric's
// in-memory state; depending on the result the scan either finishes
// there (QueryDone) or continues into the atree, after which the metric
// is unlocked. NOTE(review): metricRUnlock is called only on the two
// atree-continuation paths — presumably the worker takes a read lock
// only in those cases and releases it itself otherwise; confirm in the
// worker implementation.
func (s *Database) rangeScan(req rangeScanReq) error {
	resultCh := make(chan rangeScanResult, 1)
	s.appendJobToWorkerQueue(tryRangeScanReq{
		MetricID:       req.MetricID,
		Since:          req.Since,
		Until:          req.Until,
		MetricType:     req.MetricType,
		ResponseWriter: req.ResponseWriter,
		ResultCh:       resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case QueryDone:
		// Fully answered from in-memory state.
		req.ResponseWriter.Close()
	case UntilFound:
		// Tail found in memory; continue backwards through the atree
		// starting at the last flushed page.
		err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{
			MetricType:     req.MetricType,
			FracDigits:     result.FracDigits,
			ResponseWriter: req.ResponseWriter,
			LastPageNo:     result.LastPageNo,
			Since:          req.Since,
		})
		s.metricRUnlock(req.MetricID)
		if err != nil {
			reply(req.Conn, proto.ErrUnexpected)
		} else {
			req.ResponseWriter.Close()
		}
	case UntilNotFound:
		// Whole range lives in the atree; scan from the root.
		err := s.atree.RangeScan(atree.RangeScanReq{
			MetricType:     req.MetricType,
			FracDigits:     result.FracDigits,
			ResponseWriter: req.ResponseWriter,
			RootPageNo:     result.RootPageNo,
			Since:          req.Since,
			Until:          req.Until,
		})
		s.metricRUnlock(req.MetricID)
		if err != nil {
			reply(req.Conn, proto.ErrUnexpected)
		} else {
			req.ResponseWriter.Close()
		}
	case NoMetric:
		reply(req.Conn, proto.ErrNoMetric)
	case WrongMetricType:
		reply(req.Conn, proto.ErrWrongMetricType)
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return nil
}
// fullScanReq bundles the parameters shared by the full-scan handlers.
type fullScanReq struct {
	MetricID   uint32
	MetricType diploma.MetricType
	// Conn receives error frames; ResponseWriter receives result rows.
	Conn           io.Writer
	ResponseWriter diploma.MeasureConsumer
}
// fullScan streams every measure of a metric. The worker scans in-memory
// state first; when older data exists in the atree (UntilFound) the scan
// continues there, after which the metric is unlocked. NOTE(review):
// as with rangeScan, metricRUnlock appears only on the continuation
// path — confirm the worker's locking contract.
func (s *Database) fullScan(req fullScanReq) error {
	resultCh := make(chan fullScanResult, 1)
	s.appendJobToWorkerQueue(tryFullScanReq{
		MetricID:       req.MetricID,
		MetricType:     req.MetricType,
		ResponseWriter: req.ResponseWriter,
		ResultCh:       resultCh,
	})
	result := <-resultCh
	switch result.ResultCode {
	case QueryDone:
		// Fully answered from in-memory state.
		req.ResponseWriter.Close()
	case UntilFound:
		// Continue through the atree from the last flushed page.
		err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{
			MetricType:     req.MetricType,
			FracDigits:     result.FracDigits,
			ResponseWriter: req.ResponseWriter,
			LastPageNo:     result.LastPageNo,
		})
		s.metricRUnlock(req.MetricID)
		if err != nil {
			reply(req.Conn, proto.ErrUnexpected)
		} else {
			req.ResponseWriter.Close()
		}
	case NoMetric:
		reply(req.Conn, proto.ErrNoMetric)
	case WrongMetricType:
		reply(req.Conn, proto.ErrWrongMetricType)
	default:
		diploma.Abort(diploma.WrongResultCodeBug, ErrWrongResultCodeBug)
	}
	return nil
}
// ListCurrentValues streams the current value of each requested metric
// to the client.
func (s *Database) ListCurrentValues(conn net.Conn, req proto.ListCurrentValuesReq) error {
	writer := transform.NewCurrentValueWriter(conn)
	defer writer.Close()
	// Unbuffered channel: used purely as a completion signal.
	done := make(chan struct{})
	s.appendJobToWorkerQueue(tryListCurrentValuesReq{
		MetricIDs:      req.MetricIDs,
		ResponseWriter: writer,
		ResultCh:       done,
	})
	<-done
	return nil
}