package transform import ( "io" "gordenko.dev/dima/diploma/bin" ) // CURRENT VALUE WRITER type CurrentValue struct { MetricID uint32 Timestamp uint32 Value float64 } type CurrentValueWriter struct { arr []byte responder *ChunkedResponder } func NewCurrentValueWriter(dst io.Writer) *CurrentValueWriter { return &CurrentValueWriter{ arr: make([]byte, 16), responder: NewChunkedResponder(dst), } } func (s *CurrentValueWriter) BufferValue(m CurrentValue) { bin.PutUint32(s.arr[0:], m.MetricID) bin.PutUint32(s.arr[4:], m.Timestamp) bin.PutFloat64(s.arr[8:], m.Value) s.responder.BufferRecord(s.arr) } func (s *CurrentValueWriter) Close() error { return s.responder.Flush() } // INSTANT MEASURE WRITER type InstantMeasure struct { Timestamp uint32 Value float64 } type InstantMeasureWriter struct { arr []byte responder *ChunkedResponder since uint32 } func NewInstantMeasureWriter(dst io.Writer, since uint32) *InstantMeasureWriter { return &InstantMeasureWriter{ arr: make([]byte, 12), responder: NewChunkedResponder(dst), since: since, } } func (s *InstantMeasureWriter) Feed(timestamp uint32, value float64) { s.feed(timestamp, value, false) } func (s *InstantMeasureWriter) FeedNoSend(timestamp uint32, value float64) { s.feed(timestamp, value, true) } func (s *InstantMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) { if timestamp < s.since { return } bin.PutUint32(s.arr[0:], timestamp) bin.PutFloat64(s.arr[4:], value) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } } func (s *InstantMeasureWriter) Close() error { return s.responder.Flush() } // CUMULATIVE MEASURE WRITER type CumulativeMeasure struct { Timestamp uint32 Value float64 Total float64 } type CumulativeMeasureWriter struct { arr []byte responder *ChunkedResponder since uint32 endTimestamp uint32 endValue float64 } func NewCumulativeMeasureWriter(dst io.Writer, since uint32) *CumulativeMeasureWriter { return &CumulativeMeasureWriter{ arr: make([]byte, 20), responder: NewChunkedResponder(dst), since: since, } } func (s *CumulativeMeasureWriter) Feed(timestamp uint32, value float64) { s.feed(timestamp, value, false) } func (s *CumulativeMeasureWriter) FeedNoSend(timestamp uint32, value float64) { s.feed(timestamp, value, true) } func (s *CumulativeMeasureWriter) feed(timestamp uint32, value float64, isBuffer bool) { if s.endTimestamp > 0 { s.pack(s.endValue - value) if isBuffer { s.responder.BufferRecord(s.arr) } else { s.responder.AppendRecord(s.arr) } } s.endTimestamp = timestamp s.endValue = value } func (s *CumulativeMeasureWriter) pack(total float64) { bin.PutUint32(s.arr[0:], s.endTimestamp) bin.PutFloat64(s.arr[4:], s.endValue) bin.PutFloat64(s.arr[12:], total) } func (s *CumulativeMeasureWriter) Close() error { if s.endTimestamp >= s.since { s.pack(0) s.responder.BufferRecord(s.arr) } return s.responder.Flush() }