Dima Gordenko 2 months ago
parent 84ed171fdf
commit c6897c5b76
  1. 4
      atree/atree.go
  2. 18
      atree/select.go
  3. 15
      atree/writers.go
  4. 109
      client/client.go
  5. 19
      database/api.go
  6. 15
      database/helpers.go
  7. 44
      database/proc.go
  8. BIN
      database_linux
  9. 14
      diploma.go
  10. 115
      examples/requests/requests.go
  11. 1
      go.mod
  12. 2
      go.sum
  13. BIN
      loadtest_linux
  14. 14
      proto/proto.go
  15. BIN
      requests_linux
  16. BIN
      testdir/12.changes
  17. BIN
      testdir/12.snapshot
  18. BIN
      testdir/test.data
  19. BIN
      testdir/test.index
  20. 63
      transform/aggregate.go
  21. 6
      transform/raw.go
  22. 16
      transform/responder.go

@ -378,10 +378,6 @@ func (s *Atree) AppendDataPage(req AppendDataPageReq) (_ redo.Report, err error)
return
}
// На данний момен схема - наступна. Всі сторінки - data та index - зафіксовані в кеші.
// Отже запис на диск пройде максимально швидко. Після цього ReferenceCount кожної
// сторінки зменшиться на 1. Оскільки на метрику утримується XLock, сторінки мають
// ReferenceCount = 1 (немає інших читачів).
waitCh := make(chan struct{})
task := WriteTask{

@ -7,8 +7,9 @@ import (
)
type ContinueFullScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
LastPageNo uint32
}
@ -23,7 +24,7 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -43,8 +44,9 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
}
type ContinueRangeScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
LastPageNo uint32
Since uint32
}
@ -60,7 +62,7 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -83,8 +85,9 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
}
type RangeScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
RootPageNo uint32
Since uint32
Until uint32
@ -101,7 +104,7 @@ func (s *Atree) RangeScan(req RangeScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -117,11 +120,12 @@ func (s *Atree) RangeScan(req RangeScanReq) error {
return nil
}
//fmt.Printf("atree range scan: %s, %v\n", time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05"), value)
if timestamp <= req.Until {
req.ResponseWriter.Feed(timestamp, value)
if timestamp < req.Since {
// - записи, удовлетворяющие временным рамкам, закончились.
return nil
}
}

@ -1,15 +0,0 @@
package atree
type PeriodsWriter interface {
Feed(uint32, float64)
FeedNoSend(uint32, float64)
Close() error
}
type WorkerMeasureConsumer interface {
FeedNoSend(uint32, float64)
}
type AtreeMeasureConsumer interface {
Feed(uint32, float64)
}

@ -81,21 +81,6 @@ func (s *Connection) AddMetric(req proto.AddMetricReq) error {
return s.mustSuccess(s.src)
}
//
// func (s *Connection) UpdateMetric(req Metric) error {
// arr := []byte{
// proto.TypeUpdateMetric,
// 0, 0, 0, 0, //
// byte(req.FracDigits),
// }
// bin.PutUint32(arr[1:], req.MetricID)
// if _, err := s.conn.Write(arr); err != nil {
// return err
// }
// return s.mustSuccess(s.src)
// }
func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) {
arr := []byte{
proto.TypeGetMetric,
@ -184,73 +169,9 @@ func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) {
if _, err := s.conn.Write(arr); err != nil {
return err
}
//fmt.Printf("encode measures: %d\n", len(req.Measures))
return s.mustSuccess(s.src)
}
// type AppendMeasurePerMetricReq struct {
// MetricID uint32
// Measures []Measure
// }
func (s *Connection) AppendMeasurePerMetric(list []proto.MetricMeasure) (_ []proto.AppendError, err error) {
if len(list) > 65535 {
return nil, fmt.Errorf("wrong measures qty: %d", len(list))
}
var (
// 3 bytes: 1b message type + 2b records qty
fixedSize = 3
recordSize = 16
arr = make([]byte, fixedSize+len(list)*recordSize)
)
arr[0] = proto.TypeAppendMeasures
bin.PutUint16(arr[1:], uint16(len(list)))
pos := fixedSize
for _, item := range list {
bin.PutUint32(arr[pos:], item.MetricID)
bin.PutUint32(arr[pos+4:], item.Timestamp)
bin.PutFloat64(arr[pos+8:], item.Value)
pos += recordSize
}
if _, err := s.conn.Write(arr); err != nil {
return nil, err
}
code, err := s.src.ReadByte()
if err != nil {
return nil, fmt.Errorf("read response code: %s", err)
}
switch code {
case proto.RespValue:
var (
qty uint16
appendErrors []proto.AppendError
)
qty, err = bin.ReadUint16(s.src)
if err != nil {
return
}
for range qty {
var ae proto.AppendError
ae.MetricID, err = bin.ReadUint32(s.src)
if err != nil {
return
}
ae.ErrorCode, err = bin.ReadUint16(s.src)
if err != nil {
return
}
appendErrors = append(appendErrors, ae)
}
return appendErrors, nil
default:
return nil, fmt.Errorf("unknown reponse code %d", code)
}
}
func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMeasure, error) {
arr := []byte{
proto.TypeListAllInstantMeasures,
@ -273,8 +194,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -282,8 +201,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
return nil, fmt.Errorf("read records qty: %s", err)
}
//fmt.Printf("q: %d\n", q)
for i := range int(q) {
err = bin.ReadNInto(s.src, tmp)
if err != nil {
@ -387,8 +304,6 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -402,19 +317,11 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
return nil, fmt.Errorf("read record #%d: %s", i, err)
}
//fmt.Printf("tmp: %d\n", tmp)
result = append(result, proto.CumulativeMeasure{
Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]),
Total: bin.GetFloat64(tmp[12:]),
})
// pretty.PPrintln("measure", CumulativeMeasure{
// Timestamp: bin.GetUint32(tmp),
// Value: bin.GetFloat64(tmp[4:]),
// Total: bin.GetFloat64(tmp[12:]),
// })
}
case proto.RespEndOfValue:
@ -533,8 +440,6 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -551,8 +456,8 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
var (
p = proto.InstantPeriod{
Period: bin.GetUint32(tmp[0:]),
Since: bin.GetUint32(tmp[4:]),
Until: bin.GetUint32(tmp[8:]),
Start: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]),
}
// 12 bytes - period, since, until
pos = 12
@ -629,11 +534,11 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) (
return nil, fmt.Errorf("read record #%d: %s", i, err)
}
result = append(result, proto.CumulativePeriod{
Period: bin.GetUint32(tmp[0:]),
Since: bin.GetUint32(tmp[4:]),
Until: bin.GetUint32(tmp[8:]),
EndValue: bin.GetFloat64(tmp[12:]),
Total: bin.GetFloat64(tmp[20:]),
Period: bin.GetUint32(tmp[0:]),
Start: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]),
StartValue: bin.GetFloat64(tmp[12:]),
EndValue: bin.GetFloat64(tmp[20:]),
})
}

@ -499,10 +499,11 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
toAppendMeasures = nil
}
//fmt.Printf("APPEND DATA PAGE %d, %v\n", measure.Timestamp, measure.Value)
report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
MetricID: req.MetricID,
Timestamp: until,
Value: untilValue,
Timestamp: measure.Timestamp,
Value: measure.Value,
Since: since,
RootPageNo: rootPageNo,
PrevPageNo: prevPageNo,
@ -607,7 +608,7 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
case DeleteFromAtreeRequired:
// собираю номера всех data и index страниц метрики (типа запись REDO лога).
pageLists, err := s.atree.GetAllPages(req.MetricID)
pageLists, err := s.atree.GetAllPages(result.RootPageNo)
if err != nil {
diploma.Abort(diploma.FailedAtreeRequest, err)
}
@ -670,7 +671,7 @@ func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasu
MetricID: req.MetricID,
MetricType: diploma.Instant,
Since: req.Since,
Until: req.Until - 1,
Until: req.Until,
Conn: conn,
ResponseWriter: responseWriter,
})
@ -688,7 +689,7 @@ func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativ
MetricID: req.MetricID,
MetricType: diploma.Cumulative,
Since: req.Since,
Until: req.Until - 1,
Until: req.Until,
Conn: conn,
ResponseWriter: responseWriter,
})
@ -739,6 +740,7 @@ func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulative
responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{
Dst: conn,
GroupBy: req.GroupBy,
Since: uint32(since.Unix()),
FirstHourOfDay: req.FirstHourOfDay,
})
if err != nil {
@ -762,7 +764,7 @@ type rangeScanReq struct {
Since uint32
Until uint32
Conn io.Writer
ResponseWriter atree.PeriodsWriter
ResponseWriter diploma.MeasureConsumer
}
func (s *Database) rangeScan(req rangeScanReq) error {
@ -785,6 +787,7 @@ func (s *Database) rangeScan(req rangeScanReq) error {
case UntilFound:
err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
LastPageNo: result.LastPageNo,
@ -800,6 +803,7 @@ func (s *Database) rangeScan(req rangeScanReq) error {
case UntilNotFound:
err := s.atree.RangeScan(atree.RangeScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
RootPageNo: result.RootPageNo,
@ -830,7 +834,7 @@ type fullScanReq struct {
MetricID uint32
MetricType diploma.MetricType
Conn io.Writer
ResponseWriter atree.PeriodsWriter
ResponseWriter diploma.MeasureConsumer
}
func (s *Database) fullScan(req fullScanReq) error {
@ -851,6 +855,7 @@ func (s *Database) fullScan(req fullScanReq) error {
case UntilFound:
err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
LastPageNo: result.LastPageNo,

@ -14,11 +14,13 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group
switch groupBy {
case diploma.GroupByHour, diploma.GroupByDay:
s = time.Date(since.Year, since.Month, since.Day, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, until.Day, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, until.Day, 23, 59, 59, 0, time.Local)
case diploma.GroupByMonth:
s = time.Date(since.Year, since.Month, 1, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, 1, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, 1, 23, 59, 59, 0, time.Local)
u = u.AddDate(0, 1, 0)
u = u.AddDate(0, 0, -1)
}
if firstHourOfDay > 0 {
@ -26,8 +28,6 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group
s = s.Add(duration)
u = u.Add(duration)
}
u = u.Add(-1 * time.Second)
return
}
@ -65,10 +65,3 @@ func (s *Database) metricRUnlock(metricID uint32) {
default:
}
}
func correctToFHD(since, until uint32, firstHourOfDay int) (uint32, uint32) {
duration := time.Duration(firstHourOfDay) * time.Hour
since = uint32(time.Unix(int64(since), 0).Add(duration).Unix())
until = uint32(time.Unix(int64(until), 0).Add(duration).Unix())
return since, until
}

@ -528,6 +528,8 @@ func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWith
rec.MetricID))
}
//fmt.Printf("reinit by: %d, %v\n", rec.Timestamp, rec.Value)
metric.ReinitBy(rec.Timestamp, rec.Value)
if rec.IsRootChanged {
metric.RootPageNo = rec.RootPageNo
@ -643,7 +645,7 @@ type tryRangeScanReq struct {
Since uint32
Until uint32
MetricType diploma.MetricType
ResponseWriter atree.WorkerMeasureConsumer
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan rangeScanResult
}
@ -717,11 +719,20 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
metric.Timestamps.Size(),
)
valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
@ -763,7 +774,7 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
type tryFullScanReq struct {
MetricID uint32
MetricType diploma.MetricType
ResponseWriter atree.WorkerMeasureConsumer
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan fullScanResult
}
@ -813,12 +824,21 @@ func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool {
metric.TimestampsBuf,
metric.Timestamps.Size(),
)
valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
if done {

Binary file not shown.

@ -46,6 +46,20 @@ type ValueDecompressor interface {
NextValue() (float64, bool)
}
type MeasureConsumer interface {
Feed(uint32, float64)
FeedNoSend(uint32, float64)
Close() error
}
type WorkerMeasureConsumer interface {
FeedNoSend(uint32, float64)
}
type AtreeMeasureConsumer interface {
Feed(uint32, float64)
}
type AbortCode int
const (

@ -16,6 +16,8 @@ func sendRequests(conn *client.Connection) {
cumulativeMetricID uint32 = 10001
fracDigits int = 2
err error
seriesInDays = 62
)
conn.DeleteMetric(instantMetricID)
@ -41,17 +43,17 @@ func sendRequests(conn *client.Connection) {
log.Fatalf("conn.GetMetric: %s\n", err)
} else {
fmt.Printf(`
GetMetric:
metricID: %d
metricType: %s
fracDigits: %d
`,
GetMetric:
metricID: %d
metricType: %s
fracDigits: %d
`,
iMetric.MetricID, metricTypeToName[iMetric.MetricType], fracDigits)
}
// APPEND MEASURES
instantMeasures := GenerateInstantMeasures(62, 220)
instantMeasures := GenerateInstantMeasures(seriesInDays, 220)
err = conn.AppendMeasures(proto.AppendMeasuresReq{
MetricID: instantMetricID,
@ -78,10 +80,10 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantMeasures: %s\n", err)
} else {
fmt.Printf("\nListInstantMeasures %s - %s:\n",
formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
fmt.Printf("\nListInstantMeasures %s %s:\n",
formatTime(since), formatTime(until))
for _, item := range instantList {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value)
}
}
@ -93,13 +95,13 @@ GetMetric:
} else {
fmt.Printf("\nListAllInstantMeasures (last 15 items):\n")
for _, item := range instantList[:15] {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value)
}
}
// LIST INSTANT PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp+1), 0)
until = time.Unix(int64(lastTimestamp), 0)
since = until.Add(-24 * time.Hour)
instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
@ -120,16 +122,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (1 day, group by hour):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
// LIST INSTANT PERIODS (group by day)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -7)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -6)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID,
@ -149,16 +154,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (7 days, group by day):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
// LIST INSTANT PERIODS (group by month)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -62)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -seriesInDays)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID,
@ -178,9 +186,11 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (62 days, group by month):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatMonth(since), formatMonth(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
@ -234,7 +244,9 @@ GetMetric:
// APPEND MEASURES
cumulativeMeasures := GenerateCumulativeMeasures(62)
cumulativeMeasures := GenerateCumulativeMeasures(seriesInDays)
//pretty.PPrintln("CUMULATIVE MEASURES", cumulativeMeasures)
err = conn.AppendMeasures(proto.AppendMeasuresReq{
MetricID: cumulativeMetricID,
@ -261,11 +273,12 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativeMeasures: %s\n", err)
} else {
fmt.Printf("\nListCumulativeMeasures %s - %s:\n",
formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
fmt.Printf("\nListCumulativeMeasures %s %s:\n",
formatTime(since), formatTime(until))
for _, item := range cumulativeList {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatTimestamp(item.Timestamp), item.Value, item.Total)
}
}
@ -277,13 +290,14 @@ GetMetric:
} else {
fmt.Printf("\nListAllCumulativeMeasures (last 15 items):\n")
for _, item := range cumulativeList[:15] {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatTimestamp(item.Timestamp), item.Value, item.Total)
}
}
// LIST CUMULATIVE PERIODS (group by hour)
//LIST CUMULATIVE PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp+1), 0)
until = time.Unix(int64(lastTimestamp), 0)
since = until.Add(-24 * time.Hour)
cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
@ -303,16 +317,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (1 day, group by hour):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by hour):\n",
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatHourPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatHourPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
// LIST CUMULATIVE PERIODS (group by day)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -7)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -6)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID,
@ -331,16 +348,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (7 days, group by day):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatDayPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatDayPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
// LIST CUMULATIVE PERIODS (group by day)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -62)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -seriesInDays)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID,
@ -359,9 +379,12 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (62 days, group by month):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by month):\n",
formatMonth(since), formatMonth(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatMonthPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatMonthPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
@ -386,11 +409,21 @@ GetMetric:
}
}
const datetimeLayout = "2006-01-02 15:04:05"
func formatMonth(tm time.Time) string {
return tm.Format("2006-01")
}
func formatDate(tm time.Time) string {
return tm.Format("2006-01-02")
}
func formatTime(tm time.Time) string {
return tm.Format("2006-01-02 15:04:05")
}
func formatTime(timestamp uint32) string {
func formatTimestamp(timestamp uint32) string {
tm := time.Unix(int64(timestamp), 0)
return tm.Format(datetimeLayout)
return tm.Format("2006-01-02 15:04:05")
}
func formatHourPeriod(period uint32) string {

@ -5,6 +5,7 @@ go 1.24.0
require (
github.com/RoaringBitmap/roaring/v2 v2.5.0
gopkg.in/ini.v1 v1.67.0
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69
)
require (

@ -18,3 +18,5 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69 h1:nyJ3mzTQ46yUeMZCdLyYcs7B5JCS54c67v84miyhq2E=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69/go.mod h1:AxgKDktpqBVyIOhIcP+nlCpK+EsJyjN5kPdqyd8euVU=

Binary file not shown.

@ -94,11 +94,11 @@ type CumulativeMeasure struct {
}
type CumulativePeriod struct {
Period uint32
Since uint32
Until uint32
EndValue float64
Total float64
Period uint32
Start uint32
End uint32
StartValue float64
EndValue float64
}
type InstantMeasure struct {
@ -108,8 +108,8 @@ type InstantMeasure struct {
type InstantPeriod struct {
Period uint32
Since uint32
Until uint32
Start uint32
End uint32
Min float64
Max float64
Avg float64

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -29,7 +29,7 @@ type InstantPeriodsWriter struct {
time2period func(uint32) time.Time
currentPeriod time.Time
lastTimestamp uint32
endTimestamp uint32 // время показания на конец периода
endTimestamp uint32
min float64
max float64
total float64
@ -43,7 +43,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay)
}
// Считаю q, чтобы заранее выделить массив для упаковки периодов
var q int
if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin {
q++
@ -59,8 +58,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr
return nil, errors.New("AggregateFuncs option is required")
}
// 12 - это period, since, until
// 8 - это размер float64
s := &InstantPeriodsWriter{
aggregateFuncs: opt.AggregateFuncs,
arr: make([]byte, 12+q*8),
@ -121,20 +118,14 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
if s.entries > 0 {
period := s.time2period(timestamp)
if period != s.currentPeriod {
// закрываю период
// готовый период
s.packPeriod(timestamp)
if isBuffer {
s.responder.BufferRecord(s.arr)
} else {
s.responder.AppendRecord(s.arr)
}
// затем
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
for period.Before(s.currentPeriod) {
// вставляю пустышку
s.packBlankPeriod()
if isBuffer {
s.responder.BufferRecord(s.arr)
@ -142,9 +133,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
s.responder.AppendRecord(s.arr)
}
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
//return
}
s.endTimestamp = timestamp
s.min = value
@ -157,7 +145,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
} else if value > s.max {
s.max = value
}
// для подсчета AVG
s.total += value
s.entries++
}
@ -176,7 +163,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() {
switch s.groupBy {
case diploma.GroupByHour:
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
//fmt.Println("decrement")
case diploma.GroupByDay:
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
case diploma.GroupByMonth:
@ -185,11 +171,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() {
}
func (s *InstantPeriodsWriter) packBlankPeriod() {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := "0"
//until := "0"
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0)
// until - это endTimestamp всегда
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
for i := 4; i < len(s.arr); i++ {
s.arr[i] = 0
@ -223,17 +204,10 @@ func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) {
}
}
/*
Идея с разбивкой на периоды:
Для каждого периода нахожу одно последнее значение.
Начало периода - это конец предыдущего. Если предыдущий не строго предыдущий,
а с пропусками - на место пропусков вставляю пустышки.
Плюс такого решения - я всегда показываю реальное значение на конец периода.
*/
type CumulativePeriodsWriter struct {
arr []byte
responder *ChunkedResponder
since uint32
firstHourOfDay int
currentPeriod time.Time
groupBy diploma.GroupBy
@ -247,6 +221,7 @@ type CumulativePeriodsWriter struct {
type CumulativePeriodsWriterOptions struct {
Dst io.Writer
GroupBy diploma.GroupBy
Since uint32
FirstHourOfDay int
}
@ -254,7 +229,6 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative
if opt.Dst == nil {
return nil, errors.New("Dst option is required")
}
// Считаю q, чтобы заранее выделить массив для упаковки периодов
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay)
}
@ -262,6 +236,7 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative
s := &CumulativePeriodsWriter{
arr: make([]byte, 28),
responder: NewChunkedResponder(opt.Dst),
since: opt.Since,
firstHourOfDay: opt.FirstHourOfDay,
groupBy: opt.GroupBy,
}
@ -322,17 +297,13 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer
if s.endTimestamp > 0 {
period := s.time2period(timestamp)
if period != s.currentPeriod {
// закрываю период
s.packPeriod(timestamp, value)
if isBuffer {
s.responder.BufferRecord(s.arr)
} else {
s.responder.AppendRecord(s.arr)
}
// затем
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
for period.Before(s.currentPeriod) {
// вставляю пустышку
s.packBlankPeriod()
@ -342,9 +313,6 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer
s.responder.AppendRecord(s.arr)
}
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
//return
}
s.endTimestamp = timestamp
s.endValue = value
@ -362,7 +330,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() {
switch s.groupBy {
case diploma.GroupByHour:
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
//fmt.Println("decrement")
case diploma.GroupByDay:
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
case diploma.GroupByMonth:
@ -371,11 +338,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() {
}
func (s *CumulativePeriodsWriter) packBlankPeriod() {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := "0"
//until := "0"
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0)
// until - это endTimestamp всегда
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
for i := 4; i < len(s.arr); i++ {
s.arr[i] = 0
@ -383,11 +345,6 @@ func (s *CumulativePeriodsWriter) packBlankPeriod() {
}
func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := time.Unix(int64(start), 0).Format("2006-01-02 15:04:05")
//until := time.Unix(int64(s.endTimestamp), 0).Format("2006-01-02 15:04:05")
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, startValue, s.endValue)
// until - это endTimestamp всегда
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
bin.PutUint32(s.arr[4:], start)
bin.PutUint32(s.arr[8:], s.endTimestamp)
@ -397,12 +354,14 @@ func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
func (s *CumulativePeriodsWriter) Close() error {
if s.endTimestamp > 0 {
if s.lastTimestamp != s.endTimestamp {
s.packPeriod(s.lastTimestamp, s.lastValue)
} else {
s.packPeriod(s.endTimestamp, s.endValue)
if s.endTimestamp >= s.since {
if s.lastTimestamp != s.endTimestamp {
s.packPeriod(s.lastTimestamp, s.lastValue)
} else {
s.packPeriod(s.endTimestamp, s.endValue)
}
s.responder.AppendRecord(s.arr)
}
s.responder.AppendRecord(s.arr)
}
return s.responder.Flush()
}

@ -51,7 +51,6 @@ type InstantMeasureWriter struct {
}
func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter {
// 12 - это timestamp, value
return &InstantMeasureWriter{
arr: make([]byte, 12),
responder: NewChunkedResponder(dst),
@ -101,7 +100,6 @@ type CumulativeMeasureWriter struct {
}
func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter {
// 20 - это timestamp, value, total
return &CumulativeMeasureWriter{
arr: make([]byte, 20),
responder: NewChunkedResponder(dst),
@ -138,10 +136,8 @@ func (s *CumulativeMeasureWriter) pack(total float64) {
func (s *CumulativeMeasureWriter) Close() error {
if s.endTimestamp >= s.since {
// endTimestamp внутри заданного периода. Других показаний нет,
// поэтому время добавляю, но накопленную сумму ставлю 0.
s.pack(0)
// Если < since - ничего делать не нужно, ибо накопленная сумма уже добавлена
s.responder.BufferRecord(s.arr)
}
return s.responder.Flush()
}

@ -70,36 +70,20 @@ func (s *ChunkedResponder) Flush() error {
if _, err := s.dst.Write(endMsg); err != nil {
return err
}
//fmt.Printf("sent endMsg %d\n", endMsg)
return nil
}
func (s *ChunkedResponder) sendBuffered() (err error) {
msg := s.buf.Bytes()
bin.PutUint32(msg[1:], uint32(s.recordsQty))
//fmt.Printf("put uint16: %d\n", msg[:3])
//fmt.Printf("send %d records\n", s.recordsQty)
//fmt.Printf("send buffered: %d, qty: %d\n", msg, s.recordsQty)
n, err := s.dst.Write(msg)
if err != nil {
return
}
if n != len(msg) {
return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg))
}
s.buf.Reset()
return
}
// Для Aggregation пишем функцию определения периода и пуляем фактические периоды
//
// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
// These buffers may result in visible overhead for responses exceeding a few megabytes.
// So allocate 64Kb buffers.
// bw: bufio.NewWriterSize(w, 64*1024),

Loading…
Cancel
Save