Compare commits


2 Commits

Author SHA1 Message Date
Dima Gordenko 14f1531276 rc2 3 months ago
Dima Gordenko c6897c5b76 rc2 3 months ago
18 changed files (number in parentheses = lines changed; BIN = binary file):

  1. atree/atree.go (4)
  2. atree/select.go (18)
  3. atree/writers.go (15)
  4. client/client.go (109)
  5. database/api.go (19)
  6. database/helpers.go (15)
  7. database/proc.go (44)
  8. database_linux (BIN)
  9. diploma.go (14)
  10. examples/requests/requests.go (115)
  11. go.mod (1)
  12. go.sum (2)
  13. loadtest_linux (BIN)
  14. proto/proto.go (14)
  15. requests_linux (BIN)
  16. transform/aggregate.go (63)
  17. transform/raw.go (6)
  18. transform/responder.go (16)

@ -378,10 +378,6 @@ func (s *Atree) AppendDataPage(req AppendDataPageReq) (_ redo.Report, err error)
return
}
// The scheme at this point is as follows. All pages - data and index - are pinned
// in the cache, so the disk write completes as quickly as possible. After that the
// ReferenceCount of each page is decremented by 1. Since an XLock is held on the
// metric, the pages have ReferenceCount = 1 (there are no other readers).
waitCh := make(chan struct{})
task := WriteTask{

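The comment above describes a pin-based cache protocol. A rough illustration of that scheme, under assumed semantics and with hypothetical names (pin/unpin are not the project's actual API): a page stays cached while its ReferenceCount is non-zero, the writer holds one pin for the duration of the disk write, and the XLock on the metric guarantees no reader holds extra pins.

package main

import (
	"fmt"
	"sync/atomic"
)

type page struct {
	no       uint32
	refCount atomic.Int32 // 0 means the page may be evicted from the cache
}

func (p *page) pin()   { p.refCount.Add(1) }
func (p *page) unpin() { p.refCount.Add(-1) }

func main() {
	p := &page{no: 378}
	p.pin()                        // pinned before the WriteTask is queued
	fmt.Println(p.refCount.Load()) // 1: the XLock excludes other readers
	p.unpin()                      // write finished; the page becomes evictable
}
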
@ -7,8 +7,9 @@ import (
)
type ContinueFullScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
LastPageNo uint32
}
@ -23,7 +24,7 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -43,8 +44,9 @@ func (s *Atree) ContinueFullScan(req ContinueFullScanReq) error {
}
type ContinueRangeScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
LastPageNo uint32
Since uint32
}
@ -60,7 +62,7 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -83,8 +85,9 @@ func (s *Atree) ContinueRangeScan(req ContinueRangeScanReq) error {
}
type RangeScanReq struct {
MetricType diploma.MetricType
FracDigits byte
ResponseWriter AtreeMeasureConsumer
ResponseWriter diploma.AtreeMeasureConsumer
RootPageNo uint32
Since uint32
Until uint32
@ -101,7 +104,7 @@ func (s *Atree) RangeScan(req RangeScanReq) error {
PageData: buf,
Atree: s,
FracDigits: req.FracDigits,
MetricType: diploma.Instant,
MetricType: req.MetricType,
})
if err != nil {
return err
@ -117,11 +120,12 @@ func (s *Atree) RangeScan(req RangeScanReq) error {
return nil
}
//fmt.Printf("atree range scan: %s, %v\n", time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05"), value)
if timestamp <= req.Until {
req.ResponseWriter.Feed(timestamp, value)
if timestamp < req.Since {
// the entries satisfying the time bounds are exhausted.
return nil
}
}

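Since the scan uses reverse decompressors and walks from the newest measure backwards, the two checks above bound the window from both sides. A self-contained sketch of the assumed iteration order (not the project's actual decompressor API):

package main

import "fmt"

// rangeScan walks measures newest-first: timestamps above until are skipped,
// timestamps inside [since, until] are fed to the consumer, and the first
// timestamp below since terminates the scan (older entries cannot match).
func rangeScan(ts []uint32, vals []float64, since, until uint32, feed func(uint32, float64)) {
	for i := len(ts) - 1; i >= 0; i-- {
		switch {
		case ts[i] > until:
			continue // not yet inside the window
		case ts[i] < since:
			return // entries satisfying the time bounds are exhausted
		default:
			feed(ts[i], vals[i])
		}
	}
}

func main() {
	rangeScan(
		[]uint32{100, 200, 300, 400}, []float64{1, 2, 3, 4},
		200, 300,
		func(t uint32, v float64) { fmt.Println(t, v) }, // prints 300 3, then 200 2
	)
}
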
@ -1,15 +0,0 @@
package atree
type PeriodsWriter interface {
Feed(uint32, float64)
FeedNoSend(uint32, float64)
Close() error
}
type WorkerMeasureConsumer interface {
FeedNoSend(uint32, float64)
}
type AtreeMeasureConsumer interface {
Feed(uint32, float64)
}

@ -81,21 +81,6 @@ func (s *Connection) AddMetric(req proto.AddMetricReq) error {
return s.mustSuccess(s.src)
}
//
// func (s *Connection) UpdateMetric(req Metric) error {
// arr := []byte{
// proto.TypeUpdateMetric,
// 0, 0, 0, 0, //
// byte(req.FracDigits),
// }
// bin.PutUint32(arr[1:], req.MetricID)
// if _, err := s.conn.Write(arr); err != nil {
// return err
// }
// return s.mustSuccess(s.src)
// }
func (s *Connection) GetMetric(metricID uint32) (*proto.Metric, error) {
arr := []byte{
proto.TypeGetMetric,
@ -184,73 +169,9 @@ func (s *Connection) AppendMeasures(req proto.AppendMeasuresReq) (err error) {
if _, err := s.conn.Write(arr); err != nil {
return err
}
//fmt.Printf("encode measures: %d\n", len(req.Measures))
return s.mustSuccess(s.src)
}
// type AppendMeasurePerMetricReq struct {
// MetricID uint32
// Measures []Measure
// }
func (s *Connection) AppendMeasurePerMetric(list []proto.MetricMeasure) (_ []proto.AppendError, err error) {
if len(list) > 65535 {
return nil, fmt.Errorf("wrong measures qty: %d", len(list))
}
var (
// 3 bytes: 1b message type + 2b records qty
fixedSize = 3
recordSize = 16
arr = make([]byte, fixedSize+len(list)*recordSize)
)
arr[0] = proto.TypeAppendMeasures
bin.PutUint16(arr[1:], uint16(len(list)))
pos := fixedSize
for _, item := range list {
bin.PutUint32(arr[pos:], item.MetricID)
bin.PutUint32(arr[pos+4:], item.Timestamp)
bin.PutFloat64(arr[pos+8:], item.Value)
pos += recordSize
}
if _, err := s.conn.Write(arr); err != nil {
return nil, err
}
code, err := s.src.ReadByte()
if err != nil {
return nil, fmt.Errorf("read response code: %s", err)
}
switch code {
case proto.RespValue:
var (
qty uint16
appendErrors []proto.AppendError
)
qty, err = bin.ReadUint16(s.src)
if err != nil {
return
}
for range qty {
var ae proto.AppendError
ae.MetricID, err = bin.ReadUint32(s.src)
if err != nil {
return
}
ae.ErrorCode, err = bin.ReadUint16(s.src)
if err != nil {
return
}
appendErrors = append(appendErrors, ae)
}
return appendErrors, nil
default:
return nil, fmt.Errorf("unknown reponse code %d", code)
}
}
func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMeasure, error) {
arr := []byte{
proto.TypeListAllInstantMeasures,
@ -273,8 +194,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -282,8 +201,6 @@ func (s *Connection) ListAllInstantMeasures(metricID uint32) ([]proto.InstantMea
return nil, fmt.Errorf("read records qty: %s", err)
}
//fmt.Printf("q: %d\n", q)
for i := range int(q) {
err = bin.ReadNInto(s.src, tmp)
if err != nil {
@ -387,8 +304,6 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -402,19 +317,11 @@ func (s *Connection) ListAllCumulativeMeasures(metricID uint32) ([]proto.Cumulat
return nil, fmt.Errorf("read record #%d: %s", i, err)
}
//fmt.Printf("tmp: %d\n", tmp)
result = append(result, proto.CumulativeMeasure{
Timestamp: bin.GetUint32(tmp),
Value: bin.GetFloat64(tmp[4:]),
Total: bin.GetFloat64(tmp[12:]),
})
// pretty.PPrintln("measure", CumulativeMeasure{
// Timestamp: bin.GetUint32(tmp),
// Value: bin.GetFloat64(tmp[4:]),
// Total: bin.GetFloat64(tmp[12:]),
// })
}
case proto.RespEndOfValue:
@ -533,8 +440,6 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
return nil, fmt.Errorf("read response code: %s", err)
}
//fmt.Printf("code: %d\n", code)
switch code {
case proto.RespPartOfValue:
q, err := bin.ReadUint32(s.src)
@ -551,8 +456,8 @@ func (s *Connection) ListInstantPeriods(req proto.ListInstantPeriodsReq) ([]prot
var (
p = proto.InstantPeriod{
Period: bin.GetUint32(tmp[0:]),
Since: bin.GetUint32(tmp[4:]),
Until: bin.GetUint32(tmp[8:]),
Start: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]),
}
// 12 bytes - period, since, until
pos = 12
@ -629,11 +534,11 @@ func (s *Connection) ListCumulativePeriods(req proto.ListCumulativePeriodsReq) (
return nil, fmt.Errorf("read record #%d: %s", i, err)
}
result = append(result, proto.CumulativePeriod{
Period: bin.GetUint32(tmp[0:]),
Since: bin.GetUint32(tmp[4:]),
Until: bin.GetUint32(tmp[8:]),
EndValue: bin.GetFloat64(tmp[12:]),
Total: bin.GetFloat64(tmp[20:]),
Period: bin.GetUint32(tmp[0:]),
Start: bin.GetUint32(tmp[4:]),
End: bin.GetUint32(tmp[8:]),
StartValue: bin.GetFloat64(tmp[12:]),
EndValue: bin.GetFloat64(tmp[20:]),
})
}

@ -499,10 +499,11 @@ func (s *Database) AppendMeasures(req proto.AppendMeasuresReq) uint16 {
toAppendMeasures = nil
}
//fmt.Printf("APPEND DATA PAGE %d, %v\n", measure.Timestamp, measure.Value)
report, err := s.atree.AppendDataPage(atree.AppendDataPageReq{
MetricID: req.MetricID,
Timestamp: until,
Value: untilValue,
Timestamp: measure.Timestamp,
Value: measure.Value,
Since: since,
RootPageNo: rootPageNo,
PrevPageNo: prevPageNo,
@ -607,7 +608,7 @@ func (s *Database) DeleteMeasures(req proto.DeleteMeasuresReq) uint16 {
case DeleteFromAtreeRequired:
// collect the numbers of all data and index pages of the metric (akin to a REDO log record).
pageLists, err := s.atree.GetAllPages(req.MetricID)
pageLists, err := s.atree.GetAllPages(result.RootPageNo)
if err != nil {
diploma.Abort(diploma.FailedAtreeRequest, err)
}
@ -670,7 +671,7 @@ func (s *Database) ListInstantMeasures(conn net.Conn, req proto.ListInstantMeasu
MetricID: req.MetricID,
MetricType: diploma.Instant,
Since: req.Since,
Until: req.Until - 1,
Until: req.Until,
Conn: conn,
ResponseWriter: responseWriter,
})
@ -688,7 +689,7 @@ func (s *Database) ListCumulativeMeasures(conn net.Conn, req proto.ListCumulativ
MetricID: req.MetricID,
MetricType: diploma.Cumulative,
Since: req.Since,
Until: req.Until - 1,
Until: req.Until,
Conn: conn,
ResponseWriter: responseWriter,
})
@ -739,6 +740,7 @@ func (s *Database) ListCumulativePeriods(conn net.Conn, req proto.ListCumulative
responseWriter, err := transform.NewCumulativePeriodsWriter(transform.CumulativePeriodsWriterOptions{
Dst: conn,
GroupBy: req.GroupBy,
Since: uint32(since.Unix()),
FirstHourOfDay: req.FirstHourOfDay,
})
if err != nil {
@ -762,7 +764,7 @@ type rangeScanReq struct {
Since uint32
Until uint32
Conn io.Writer
ResponseWriter atree.PeriodsWriter
ResponseWriter diploma.MeasureConsumer
}
func (s *Database) rangeScan(req rangeScanReq) error {
@ -785,6 +787,7 @@ func (s *Database) rangeScan(req rangeScanReq) error {
case UntilFound:
err := s.atree.ContinueRangeScan(atree.ContinueRangeScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
LastPageNo: result.LastPageNo,
@ -800,6 +803,7 @@ func (s *Database) rangeScan(req rangeScanReq) error {
case UntilNotFound:
err := s.atree.RangeScan(atree.RangeScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
RootPageNo: result.RootPageNo,
@ -830,7 +834,7 @@ type fullScanReq struct {
MetricID uint32
MetricType diploma.MetricType
Conn io.Writer
ResponseWriter atree.PeriodsWriter
ResponseWriter diploma.MeasureConsumer
}
func (s *Database) fullScan(req fullScanReq) error {
@ -851,6 +855,7 @@ func (s *Database) fullScan(req fullScanReq) error {
case UntilFound:
err := s.atree.ContinueFullScan(atree.ContinueFullScanReq{
MetricType: req.MetricType,
FracDigits: result.FracDigits,
ResponseWriter: req.ResponseWriter,
LastPageNo: result.LastPageNo,

@ -14,11 +14,13 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group
switch groupBy {
case diploma.GroupByHour, diploma.GroupByDay:
s = time.Date(since.Year, since.Month, since.Day, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, until.Day, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, until.Day, 23, 59, 59, 0, time.Local)
case diploma.GroupByMonth:
s = time.Date(since.Year, since.Month, 1, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, 1, 0, 0, 0, 0, time.Local)
u = time.Date(until.Year, until.Month, 1, 23, 59, 59, 0, time.Local)
u = u.AddDate(0, 1, 0)
u = u.AddDate(0, 0, -1)
}
if firstHourOfDay > 0 {
@ -26,8 +28,6 @@ func timeBoundsOfAggregation(since, until proto.TimeBound, groupBy diploma.Group
s = s.Add(duration)
u = u.Add(duration)
}
u = u.Add(-1 * time.Second)
return
}
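The month branch above relies on Go's date normalization: starting from 23:59:59 on the 1st, AddDate(0, 1, 0) moves to the 1st of the next month and AddDate(0, 0, -1) steps back to its last day, so 28/29/30/31-day months are handled uniformly. A quick check:

package main

import (
	"fmt"
	"time"
)

func main() {
	u := time.Date(2024, time.February, 1, 23, 59, 59, 0, time.Local)
	u = u.AddDate(0, 1, 0)  // 2024-03-01 23:59:59
	u = u.AddDate(0, 0, -1) // 2024-02-29 23:59:59 (leap year handled for free)
	fmt.Println(u.Format("2006-01-02 15:04:05"))
}
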
@ -65,10 +65,3 @@ func (s *Database) metricRUnlock(metricID uint32) {
default:
}
}
func correctToFHD(since, until uint32, firstHourOfDay int) (uint32, uint32) {
duration := time.Duration(firstHourOfDay) * time.Hour
since = uint32(time.Unix(int64(since), 0).Add(duration).Unix())
until = uint32(time.Unix(int64(until), 0).Add(duration).Unix())
return since, until
}

@ -528,6 +528,8 @@ func (s *Database) appendMeasureAfterOverflow(extended txlog.AppendedMeasureWith
rec.MetricID))
}
//fmt.Printf("reinit by: %d, %v\n", rec.Timestamp, rec.Value)
metric.ReinitBy(rec.Timestamp, rec.Value)
if rec.IsRootChanged {
metric.RootPageNo = rec.RootPageNo
@ -643,7 +645,7 @@ type tryRangeScanReq struct {
Since uint32
Until uint32
MetricType diploma.MetricType
ResponseWriter atree.WorkerMeasureConsumer
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan rangeScanResult
}
@ -717,11 +719,20 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
metric.Timestamps.Size(),
)
valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
@ -763,7 +774,7 @@ func (*Database) startRangeScan(metric *_metric, req tryRangeScanReq) bool {
type tryFullScanReq struct {
MetricID uint32
MetricType diploma.MetricType
ResponseWriter atree.WorkerMeasureConsumer
ResponseWriter diploma.WorkerMeasureConsumer
ResultCh chan fullScanResult
}
@ -813,12 +824,21 @@ func (*Database) startFullScan(metric *_metric, req tryFullScanReq) bool {
metric.TimestampsBuf,
metric.Timestamps.Size(),
)
valueDecompressor := chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
var valueDecompressor diploma.ValueDecompressor
if metric.MetricType == diploma.Cumulative {
valueDecompressor = chunkenc.NewReverseCumulativeDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
} else {
valueDecompressor = chunkenc.NewReverseInstantDeltaDecompressor(
metric.ValuesBuf,
metric.Values.Size(),
metric.FracDigits,
)
}
for {
timestamp, done := timestampDecompressor.NextValue()
if done {

Binary file not shown.

@ -46,6 +46,20 @@ type ValueDecompressor interface {
NextValue() (float64, bool)
}
type MeasureConsumer interface {
Feed(uint32, float64)
FeedNoSend(uint32, float64)
Close() error
}
type WorkerMeasureConsumer interface {
FeedNoSend(uint32, float64)
}
type AtreeMeasureConsumer interface {
Feed(uint32, float64)
}
type AbortCode int
const (

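These consumer interfaces replace the ones deleted from the atree package above; hoisting them into the root diploma package presumably lets atree, database, and transform share them without an import cycle. A minimal sketch of a type satisfying the MeasureConsumer shape (hypothetical, for illustration only): Feed streams a measure immediately, FeedNoSend only buffers it, Close flushes whatever is buffered.

package main

import "fmt"

type measure struct {
	ts uint32
	v  float64
}

type printConsumer struct{ buf []measure }

func (c *printConsumer) Feed(ts uint32, v float64) { fmt.Printf("%d => %.2f\n", ts, v) }

func (c *printConsumer) FeedNoSend(ts uint32, v float64) {
	c.buf = append(c.buf, measure{ts, v})
}

func (c *printConsumer) Close() error {
	for _, m := range c.buf {
		fmt.Printf("%d => %.2f (buffered)\n", m.ts, m.v)
	}
	return nil
}

func main() {
	var c printConsumer
	c.FeedNoSend(100, 1.5) // from the in-memory (worker) half of a scan
	c.Feed(200, 2.5)       // streamed from the atree half of a scan
	_ = c.Close()
}
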
@ -16,6 +16,8 @@ func sendRequests(conn *client.Connection) {
cumulativeMetricID uint32 = 10001
fracDigits int = 2
err error
seriesInDays = 62
)
conn.DeleteMetric(instantMetricID)
@ -41,17 +43,17 @@ func sendRequests(conn *client.Connection) {
log.Fatalf("conn.GetMetric: %s\n", err)
} else {
fmt.Printf(`
GetMetric:
metricID: %d
metricType: %s
fracDigits: %d
`,
GetMetric:
metricID: %d
metricType: %s
fracDigits: %d
`,
iMetric.MetricID, metricTypeToName[iMetric.MetricType], fracDigits)
}
// APPEND MEASURES
instantMeasures := GenerateInstantMeasures(62, 220)
instantMeasures := GenerateInstantMeasures(seriesInDays, 220)
err = conn.AppendMeasures(proto.AppendMeasuresReq{
MetricID: instantMetricID,
@ -78,10 +80,10 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantMeasures: %s\n", err)
} else {
fmt.Printf("\nListInstantMeasures %s - %s:\n",
formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
fmt.Printf("\nListInstantMeasures %s %s:\n",
formatTime(since), formatTime(until))
for _, item := range instantList {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value)
}
}
@ -93,13 +95,13 @@ GetMetric:
} else {
fmt.Printf("\nListAllInstantMeasures (last 15 items):\n")
for _, item := range instantList[:15] {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f\n", formatTimestamp(item.Timestamp), item.Value)
}
}
// LIST INSTANT PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp+1), 0)
until = time.Unix(int64(lastTimestamp), 0)
since = until.Add(-24 * time.Hour)
instantPeriods, err := conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
@ -120,16 +122,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (1 day, group by hour):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatHourPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
// LIST INSTANT PERIODS (group by day)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -7)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -6)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID,
@ -149,16 +154,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (7 days, group by day):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatDayPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
// LIST INSTANT PERIODS (group by month)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -62)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -seriesInDays)
instantPeriods, err = conn.ListInstantPeriods(proto.ListInstantPeriodsReq{
MetricID: instantMetricID,
@ -178,9 +186,11 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListInstantPeriods: %s\n", err)
} else {
fmt.Printf("\nListInstantPeriods (62 days, group by month):\n")
fmt.Printf("\nListInstantPeriods (%s − %s, group by day):\n",
formatMonth(since), formatMonth(until))
for _, item := range instantPeriods {
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n", formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
fmt.Printf(" %s => min %.2f, max %.2f, avg %.2f\n",
formatMonthPeriod(item.Period), item.Min, item.Max, item.Avg)
}
}
@ -234,7 +244,9 @@ GetMetric:
// APPEND MEASURES
cumulativeMeasures := GenerateCumulativeMeasures(62)
cumulativeMeasures := GenerateCumulativeMeasures(seriesInDays)
//pretty.PPrintln("CUMULATIVE MEASURES", cumulativeMeasures)
err = conn.AppendMeasures(proto.AppendMeasuresReq{
MetricID: cumulativeMetricID,
@ -261,11 +273,12 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativeMeasures: %s\n", err)
} else {
fmt.Printf("\nListCumulativeMeasures %s - %s:\n",
formatTime(uint32(since.Unix())), formatTime(uint32(until.Unix())))
fmt.Printf("\nListCumulativeMeasures %s %s:\n",
formatTime(since), formatTime(until))
for _, item := range cumulativeList {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatTimestamp(item.Timestamp), item.Value, item.Total)
}
}
@ -277,13 +290,14 @@ GetMetric:
} else {
fmt.Printf("\nListAllCumulativeMeasures (last 15 items):\n")
for _, item := range cumulativeList[:15] {
fmt.Printf(" %s => %.2f\n", formatTime(item.Timestamp), item.Value)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatTimestamp(item.Timestamp), item.Value, item.Total)
}
}
// LIST CUMULATIVE PERIODS (group by hour)
//LIST CUMULATIVE PERIODS (group by hour)
until = time.Unix(int64(lastTimestamp+1), 0)
until = time.Unix(int64(lastTimestamp), 0)
since = until.Add(-24 * time.Hour)
cumulativePeriods, err := conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
@ -303,16 +317,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (1 day, group by hour):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by hour):\n",
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatHourPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatHourPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
// LIST CUMULATIVE PERIODS (group by day)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -7)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -6)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID,
@ -331,16 +348,19 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (7 days, group by day):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by day):\n",
formatDate(since), formatDate(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatDayPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatDayPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
// LIST CUMULATIVE PERIODS (group by month)
until = time.Unix(int64(lastTimestamp+1), 0)
since = until.AddDate(0, 0, -62)
until = time.Unix(int64(lastTimestamp), 0)
since = until.AddDate(0, 0, -seriesInDays)
cumulativePeriods, err = conn.ListCumulativePeriods(proto.ListCumulativePeriodsReq{
MetricID: cumulativeMetricID,
@ -359,9 +379,12 @@ GetMetric:
if err != nil {
log.Fatalf("conn.ListCumulativePeriods: %s\n", err)
} else {
fmt.Printf("\nListCumulativePeriods (62 days, group by month):\n")
fmt.Printf("\nListCumulativePeriods (%s − %s, group by month):\n",
formatMonth(since), formatMonth(until))
for _, item := range cumulativePeriods {
fmt.Printf(" %s => end value %.2f, total %.2f\n", formatMonthPeriod(item.Period), item.EndValue, item.Total)
fmt.Printf(" %s => %.2f (%.2f)\n",
formatMonthPeriod(item.Period), item.EndValue, item.EndValue-item.StartValue)
}
}
@ -386,11 +409,21 @@ GetMetric:
}
}
const datetimeLayout = "2006-01-02 15:04:05"
func formatMonth(tm time.Time) string {
return tm.Format("2006-01")
}
func formatDate(tm time.Time) string {
return tm.Format("2006-01-02")
}
func formatTime(tm time.Time) string {
return tm.Format("2006-01-02 15:04:05")
}
func formatTime(timestamp uint32) string {
func formatTimestamp(timestamp uint32) string {
tm := time.Unix(int64(timestamp), 0)
return tm.Format(datetimeLayout)
return tm.Format("2006-01-02 15:04:05")
}
func formatHourPeriod(period uint32) string {

@ -5,6 +5,7 @@ go 1.24.0
require (
github.com/RoaringBitmap/roaring/v2 v2.5.0
gopkg.in/ini.v1 v1.67.0
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69
)
require (

@ -18,3 +18,5 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69 h1:nyJ3mzTQ46yUeMZCdLyYcs7B5JCS54c67v84miyhq2E=
gordenko.dev/dima/pretty v0.0.0-20221225212746-0c27d8c0ac69/go.mod h1:AxgKDktpqBVyIOhIcP+nlCpK+EsJyjN5kPdqyd8euVU=

Binary file not shown.

@ -94,11 +94,11 @@ type CumulativeMeasure struct {
}
type CumulativePeriod struct {
Period uint32
Since uint32
Until uint32
EndValue float64
Total float64
Period uint32
Start uint32
End uint32
StartValue float64
EndValue float64
}
type InstantMeasure struct {
@ -108,8 +108,8 @@ type InstantMeasure struct {
type InstantPeriod struct {
Period uint32
Since uint32
Until uint32
Start uint32
End uint32
Min float64
Max float64
Avg float64

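With the renamed fields, a period now carries its boundary readings instead of a precomputed total; per-period consumption becomes EndValue - StartValue, which is how the updated requests example prints cumulative periods. For instance:

package main

import "fmt"

// CumulativePeriod mirrors the renamed struct from proto/proto.go above.
type CumulativePeriod struct {
	Period     uint32
	Start      uint32
	End        uint32
	StartValue float64
	EndValue   float64
}

func main() {
	p := CumulativePeriod{StartValue: 120.5, EndValue: 148.0}
	// consumption within the period is the difference of the boundary readings
	fmt.Printf("%.2f (%.2f)\n", p.EndValue, p.EndValue-p.StartValue) // 148.00 (27.50)
}
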
Binary file not shown.

@ -29,7 +29,7 @@ type InstantPeriodsWriter struct {
time2period func(uint32) time.Time
currentPeriod time.Time
lastTimestamp uint32
endTimestamp uint32 // time of the reading at the end of the period
endTimestamp uint32
min float64
max float64
total float64
@ -43,7 +43,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
return nil, fmt.Errorf("wrong FirstHourOfDay option: %d", opt.FirstHourOfDay)
}
// Compute q so the array for packing periods can be allocated in advance
var q int
if (opt.AggregateFuncs & diploma.AggregateMin) == diploma.AggregateMin {
q++
@ -59,8 +58,6 @@ func NewInstantPeriodsWriter(opt InstantPeriodsWriterOptions) (*InstantPeriodsWr
return nil, errors.New("AggregateFuncs option is required")
}
// 12 is period, since, until
// 8 is the size of a float64
s := &InstantPeriodsWriter{
aggregateFuncs: opt.AggregateFuncs,
arr: make([]byte, 12+q*8),
@ -121,20 +118,14 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
if s.entries > 0 {
period := s.time2period(timestamp)
if period != s.currentPeriod {
// closing the period
// the period is complete
s.packPeriod(timestamp)
if isBuffer {
s.responder.BufferRecord(s.arr)
} else {
s.responder.AppendRecord(s.arr)
}
// then
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
for period.Before(s.currentPeriod) {
// insert a blank period
s.packBlankPeriod()
if isBuffer {
s.responder.BufferRecord(s.arr)
@ -142,9 +133,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
s.responder.AppendRecord(s.arr)
}
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
//return
}
s.endTimestamp = timestamp
s.min = value
@ -157,7 +145,6 @@ func (s *InstantPeriodsWriter) feed(timestamp uint32, value float64, isBuffer bo
} else if value > s.max {
s.max = value
}
// for computing the AVG
s.total += value
s.entries++
}
@ -176,7 +163,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() {
switch s.groupBy {
case diploma.GroupByHour:
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
//fmt.Println("decrement")
case diploma.GroupByDay:
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
case diploma.GroupByMonth:
@ -185,11 +171,6 @@ func (s *InstantPeriodsWriter) decrementPeriod() {
}
func (s *InstantPeriodsWriter) packBlankPeriod() {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := "0"
//until := "0"
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0)
// until is always endTimestamp
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
for i := 4; i < len(s.arr); i++ {
s.arr[i] = 0
@ -223,17 +204,10 @@ func (s *InstantPeriodsWriter) packPeriod(timestamp uint32) {
}
}
/*
The idea behind the split into periods:
For each period I find a single last value.
The start of a period is the end of the previous one. If the previous period is not
strictly adjacent but there are gaps, I insert blank periods in place of the gaps.
The advantage of this approach: I always show the real value at the end of a period.
*/
type CumulativePeriodsWriter struct {
arr []byte
responder *ChunkedResponder
since uint32
firstHourOfDay int
currentPeriod time.Time
groupBy diploma.GroupBy
@ -247,6 +221,7 @@ type CumulativePeriodsWriter struct {
type CumulativePeriodsWriterOptions struct {
Dst io.Writer
GroupBy diploma.GroupBy
Since uint32
FirstHourOfDay int
}
@ -254,7 +229,6 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative
if opt.Dst == nil {
return nil, errors.New("Dst option is required")
}
// Compute q so the array for packing periods can be allocated in advance
if opt.FirstHourOfDay < 0 || opt.FirstHourOfDay > 23 {
return nil, fmt.Errorf("wrong firstHourOfDay option: %d", opt.FirstHourOfDay)
}
@ -262,6 +236,7 @@ func NewCumulativePeriodsWriter(opt CumulativePeriodsWriterOptions) (*Cumulative
s := &CumulativePeriodsWriter{
arr: make([]byte, 28),
responder: NewChunkedResponder(opt.Dst),
since: opt.Since,
firstHourOfDay: opt.FirstHourOfDay,
groupBy: opt.GroupBy,
}
@ -322,17 +297,13 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer
if s.endTimestamp > 0 {
period := s.time2period(timestamp)
if period != s.currentPeriod {
// closing the period
s.packPeriod(timestamp, value)
if isBuffer {
s.responder.BufferRecord(s.arr)
} else {
s.responder.AppendRecord(s.arr)
}
// then
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
for period.Before(s.currentPeriod) {
// insert a blank period
s.packBlankPeriod()
@ -342,9 +313,6 @@ func (s *CumulativePeriodsWriter) feed(timestamp uint32, value float64, isBuffer
s.responder.AppendRecord(s.arr)
}
s.decrementPeriod()
//fmt.Println(" period: ", period.Format("2006-01-02 15:04:05"))
//fmt.Println("current period: ", s.currentPeriod.Format("2006-01-02 15:04:05"))
//return
}
s.endTimestamp = timestamp
s.endValue = value
@ -362,7 +330,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() {
switch s.groupBy {
case diploma.GroupByHour:
s.currentPeriod = s.currentPeriod.Add(-1 * time.Hour)
//fmt.Println("decrement")
case diploma.GroupByDay:
s.currentPeriod = s.currentPeriod.AddDate(0, 0, -1)
case diploma.GroupByMonth:
@ -371,11 +338,6 @@ func (s *CumulativePeriodsWriter) decrementPeriod() {
}
func (s *CumulativePeriodsWriter) packBlankPeriod() {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := "0"
//until := "0"
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, 0.0, 0.0)
// until is always endTimestamp
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
for i := 4; i < len(s.arr); i++ {
s.arr[i] = 0
@ -383,11 +345,6 @@ func (s *CumulativePeriodsWriter) packBlankPeriod() {
}
func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
//period := s.currentPeriod.Format("2006-01-02 15:04:05")
//since := time.Unix(int64(start), 0).Format("2006-01-02 15:04:05")
//until := time.Unix(int64(s.endTimestamp), 0).Format("2006-01-02 15:04:05")
//fmt.Printf("%s: %s - %s, %.0f - %.0f\n", period, since, until, startValue, s.endValue)
// until is always endTimestamp
bin.PutUint32(s.arr[0:], uint32(s.currentPeriod.Unix()))
bin.PutUint32(s.arr[4:], start)
bin.PutUint32(s.arr[8:], s.endTimestamp)
@ -397,12 +354,14 @@ func (s *CumulativePeriodsWriter) packPeriod(start uint32, startValue float64) {
func (s *CumulativePeriodsWriter) Close() error {
if s.endTimestamp > 0 {
if s.lastTimestamp != s.endTimestamp {
s.packPeriod(s.lastTimestamp, s.lastValue)
} else {
s.packPeriod(s.endTimestamp, s.endValue)
if s.endTimestamp >= s.since {
if s.lastTimestamp != s.endTimestamp {
s.packPeriod(s.lastTimestamp, s.lastValue)
} else {
s.packPeriod(s.endTimestamp, s.endValue)
}
s.responder.AppendRecord(s.arr)
}
s.responder.AppendRecord(s.arr)
}
return s.responder.Flush()
}

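The new guard in Close mirrors the Since option added to CumulativePeriodsWriterOptions: the trailing, still-open period is emitted only when its final reading falls inside the requested window. A reduced sketch of the assumed intent:

package main

import "fmt"

// closeTrailing emits the last, unfinished period only if its end reading is
// at or after the window start; a period lying entirely before since is dropped.
func closeTrailing(endTimestamp, since uint32, emit func()) {
	if endTimestamp > 0 && endTimestamp >= since {
		emit()
	}
}

func main() {
	closeTrailing(1000, 900, func() { fmt.Println("emitted") }) // emitted
	closeTrailing(800, 900, func() { fmt.Println("emitted") })  // dropped
}
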
@ -51,7 +51,6 @@ type InstantMeasureWriter struct {
}
func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter {
// 12 is timestamp, value
return &InstantMeasureWriter{
arr: make([]byte, 12),
responder: NewChunkedResponder(dst),
@ -101,7 +100,6 @@ type CumulativeMeasureWriter struct {
}
func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter {
// 20 is timestamp, value, total
return &CumulativeMeasureWriter{
arr: make([]byte, 20),
responder: NewChunkedResponder(dst),
@ -138,10 +136,8 @@ func (s *CumulativeMeasureWriter) pack(total float64) {
func (s *CumulativeMeasureWriter) Close() error {
if s.endTimestamp >= s.since {
// endTimestamp is within the requested range. There are no further readings,
// so the timestamp is appended but the accumulated total is set to 0.
s.pack(0)
// If it is < since, there is nothing to do: the accumulated total has already been added
s.responder.BufferRecord(s.arr)
}
return s.responder.Flush()
}

@ -70,36 +70,20 @@ func (s *ChunkedResponder) Flush() error {
if _, err := s.dst.Write(endMsg); err != nil {
return err
}
//fmt.Printf("sent endMsg %d\n", endMsg)
return nil
}
func (s *ChunkedResponder) sendBuffered() (err error) {
msg := s.buf.Bytes()
bin.PutUint32(msg[1:], uint32(s.recordsQty))
//fmt.Printf("put uint16: %d\n", msg[:3])
//fmt.Printf("send %d records\n", s.recordsQty)
//fmt.Printf("send buffered: %d, qty: %d\n", msg, s.recordsQty)
n, err := s.dst.Write(msg)
if err != nil {
return
}
if n != len(msg) {
return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg))
}
s.buf.Reset()
return
}
// For Aggregation we write a function that determines the period and emit the actual periods
//
// By default net/http.Server uses 4KB buffers, which are flushed to client with chunked responses.
// These buffers may result in visible overhead for responses exceeding a few megabytes.
// So allocate 64KB buffers.
// bw: bufio.NewWriterSize(w, 64*1024),
