спеціалізована СУБД для зберігання та обробки показань датчиків та лічильників
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
diploma/chunkenc/time_delta_delta.go

374 lines
7.5 KiB

package chunkenc
import (
"fmt"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/conbuf"
)
// REVERSE
const (
lastUnixtimeIdx = 0
baseDeltaIdx = 4
)
type ReverseTimeDeltaOfDeltaCompressor struct {
buf *conbuf.ContinuousBuffer
pos int
baseDelta uint32
lastUnixtime uint32
lastDeltaOfDelta int64
length uint16
numIdx int
}
func NewReverseTimeDeltaOfDeltaCompressor(buf *conbuf.ContinuousBuffer, size int) *ReverseTimeDeltaOfDeltaCompressor {
s := &ReverseTimeDeltaOfDeltaCompressor{
buf: buf,
pos: size,
}
if size > 0 {
s.restoreState()
}
return s
}
func (s *ReverseTimeDeltaOfDeltaCompressor) restoreState() {
s.lastUnixtime = s.buf.GetUint32(lastUnixtimeIdx)
if s.pos > 4 {
u64, _, err := s.buf.GetVarUint64(baseDeltaIdx)
if err != nil {
panic(fmt.Sprintf("bug: get base delta: %s", err))
}
s.baseDelta = uint32(u64)
pos := s.pos - 1
idxOf8 := uint(8 - s.buf.GetByte(pos))
pos--
s8 := s.buf.GetByte(pos)
pos--
var n int
s.lastDeltaOfDelta, n, err = s.buf.ReverseGetVarInt64(pos)
if err != nil {
panic(err)
}
pos -= n
s.numIdx = pos + 1
var flag byte = 1 << idxOf8
if (s8 & flag) == flag {
s.length, _ = s.buf.DecodeRunLength(pos)
}
}
}
func (s *ReverseTimeDeltaOfDeltaCompressor) Size() int {
return s.pos
}
func (s *ReverseTimeDeltaOfDeltaCompressor) CalcRequiredSpace(unixtime uint32) int {
if s.pos == 0 {
return 4
}
if s.baseDelta == 0 {
baseDelta := unixtime - s.lastUnixtime
n := bin.CalcVarUint64Length(uint64(baseDelta))
return n + 3
}
deltaOfDelta := int64(unixtime-s.lastUnixtime) - int64(s.baseDelta)
if deltaOfDelta == s.lastDeltaOfDelta {
if s.length == 0 {
return 1
} else {
newLength := s.length + 1
if newLength < 130 {
return 0
} else if newLength == 130 {
return 1
} else {
if newLength < 32769 {
return 0
} else {
n := bin.CalcVarInt64Length(deltaOfDelta)
n += 2
s8q := s.buf.GetByte(s.pos - 1)
if s8q == 8 {
n -= 1
} else {
n -= 2
}
return n
}
}
}
} else {
n := bin.CalcVarInt64Length(deltaOfDelta)
n += 2
s8q := s.buf.GetByte(s.pos - 1)
if s8q == 8 {
n -= 1
} else {
n -= 2
}
return n
}
}
func (s *ReverseTimeDeltaOfDeltaCompressor) Append(unixtime uint32) {
if s.pos == 0 {
s.lastUnixtime = unixtime
s.buf.PutUint32(lastUnixtimeIdx, unixtime)
s.pos += 4
return
}
if s.baseDelta == 0 {
s.baseDelta = unixtime - s.lastUnixtime
s.lastDeltaOfDelta = 0
s.lastUnixtime = unixtime
s.buf.PutUint32(lastUnixtimeIdx, unixtime)
n := s.buf.PutVarUint64(s.pos, uint64(s.baseDelta))
s.pos += n
s.encodeNewDeltaOfDelta(0, 0, 1)
return
}
deltaOfDelta := int64(unixtime-s.lastUnixtime) - int64(s.baseDelta)
s.lastUnixtime = unixtime
s.buf.PutUint32(lastUnixtimeIdx, unixtime)
if deltaOfDelta == s.lastDeltaOfDelta {
if s.length == 0 {
s.length = 2
s.shiftOnePosToRight()
s.buf.SetByte(s.numIdx-1, 0)
s8q := s.buf.GetByte(s.pos - 1)
s.buf.SetFlag(s.pos-2, 1<<(8-s8q))
} else {
s.length++
if s.length < 130 {
s.buf.SetByte(s.numIdx-1, byte(s.length-2))
} else if s.length == 130 {
s.shiftOnePosToRight()
s.encode2bLength()
} else {
if s.length < 32769 {
s.encode2bLength()
} else {
s.appendNewDeltaOfDelta(deltaOfDelta)
}
}
}
} else {
s.appendNewDeltaOfDelta(deltaOfDelta)
}
}
func (s *ReverseTimeDeltaOfDeltaCompressor) appendNewDeltaOfDelta(deltaOfDelta int64) {
s.length = 0
s8 := s.buf.GetByte(s.pos - 2)
s8q := s.buf.GetByte(s.pos - 1)
if s8q == 8 {
s.pos -= 1
s8 = 0
s8q = 1
} else {
s.pos -= 2
s8q++
}
s.encodeNewDeltaOfDelta(deltaOfDelta, s8, s8q)
}
func (s *ReverseTimeDeltaOfDeltaCompressor) encodeNewDeltaOfDelta(deltaOfDelta int64, s8 byte, s8q byte) {
s.lastDeltaOfDelta = deltaOfDelta
s.numIdx = s.pos
n := s.buf.ReversePutVarInt64(s.pos, deltaOfDelta)
s.pos += n
s.buf.SetByte(s.pos, s8)
s.pos++
s.buf.SetByte(s.pos, s8q)
s.pos++
}
func (s *ReverseTimeDeltaOfDeltaCompressor) shiftOnePosToRight() {
s.buf.ShiftOnePosToRight(s.numIdx, s.pos)
s.pos++
s.numIdx++
}
func (s *ReverseTimeDeltaOfDeltaCompressor) encode2bLength() {
num := s.length - 2
s.buf.SetByte(s.numIdx-1, byte(num&127)|128)
s.buf.SetByte(s.numIdx-2, byte(num>>7))
}
func (s *ReverseTimeDeltaOfDeltaCompressor) DeleteLast() {
var (
s8q = s.buf.GetByte(s.pos - 1)
s8 = s.buf.GetByte(s.pos - 2)
flag byte = 1 << uint(8-s8q)
)
if s.length > 0 {
if s.length == 2 {
s.length = 0
s.buf.UnsetFlag(s.pos-2, flag)
s.buf.ShiftOnePosToLeft(s.numIdx, s.pos)
s.numIdx--
s.pos--
} else if s.length < 130 {
s.length--
s.buf.SetByte(s.numIdx-1, byte(s.length)-2)
} else if s.length == 130 {
s.length--
s.buf.ShiftOnePosToLeft(s.numIdx, s.pos)
s.numIdx--
s.pos--
s.buf.SetByte(s.numIdx-1, byte(s.length)-2)
} else {
s.length--
s.encode2bLength()
}
} else {
if s8q > 1 {
s8q--
flag = 1 << uint(8-s8q)
s.pos = s.numIdx + 2
s.buf.SetByte(s.pos-2, s8)
s.buf.SetByte(s.pos-1, s8q)
} else {
s.pos = s.numIdx + 1
s.buf.SetByte(s.pos-1, 8)
s8 = s.buf.GetByte(s.pos - 2)
flag = 1
}
var (
pos = s.pos - 3
n int
err error
)
s.lastDeltaOfDelta, n, err = s.buf.ReverseGetVarInt64(pos)
if err != nil {
panic(err)
}
s.numIdx = pos - n
if (s8 & flag) == flag {
s.length, _ = s.buf.DecodeRunLength(s.numIdx - 1)
}
}
delta := int64(s.baseDelta) + s.lastDeltaOfDelta
s.lastUnixtime = uint32(int64(s.lastUnixtime) - delta)
s.buf.PutUint32(lastUnixtimeIdx, s.lastUnixtime)
}
type ReverseTimeDeltaOfDeltaDecompressor struct {
step byte
buf *conbuf.ContinuousBuffer
pos int
bound int
lastUnixtime uint32
baseDelta uint32
lastDeltaOfDelta int64
length uint16
idxOf8 uint
s8 byte
}
func NewReverseTimeDeltaOfDeltaDecompressor(buf *conbuf.ContinuousBuffer, size int) *ReverseTimeDeltaOfDeltaDecompressor {
return &ReverseTimeDeltaOfDeltaDecompressor{
buf: buf,
pos: size,
}
}
func (s *ReverseTimeDeltaOfDeltaDecompressor) NextValue() (value uint32, done bool) {
if s.step == 0 {
if s.pos == 0 {
return 0, true
}
s.lastUnixtime = s.buf.GetUint32(lastUnixtimeIdx)
s.step = 1
return s.lastUnixtime, false
}
if s.step == 1 {
if s.pos == baseDeltaIdx {
return 0, true
}
u64, n, err := s.buf.GetVarUint64(baseDeltaIdx)
if err != nil {
panic("EOF")
}
s.bound = baseDeltaIdx + n
s.baseDelta = uint32(u64)
s.pos--
s.idxOf8 = uint(8 - s.buf.GetByte(s.pos))
s.pos--
s.s8 = s.buf.GetByte(s.pos)
s.pos--
s.readVar()
if s.length > 0 {
s.length--
}
s.step = 2
return s.lastUnixtime, false
}
if s.length > 0 {
s.length--
delta := int64(s.baseDelta) + s.lastDeltaOfDelta
s.lastUnixtime = uint32(int64(s.lastUnixtime) - delta)
return s.lastUnixtime, false
}
if s.pos < s.bound {
return 0, true
}
if s.idxOf8 == 0 {
s.s8 = s.buf.GetByte(s.pos)
s.pos--
}
s.readVar()
if s.length > 0 {
s.length--
}
return s.lastUnixtime, false
}
func (s *ReverseTimeDeltaOfDeltaDecompressor) readVar() {
var (
n int
err error
)
s.lastDeltaOfDelta, n, err = s.buf.ReverseGetVarInt64(s.pos)
if err != nil {
panic(err)
}
s.pos -= n
delta := int64(s.baseDelta) + s.lastDeltaOfDelta
s.lastUnixtime = uint32(int64(s.lastUnixtime) - delta)
var flag byte = 1 << s.idxOf8
if (s.s8 & flag) == flag {
s.length, n = s.buf.DecodeRunLength(s.pos)
s.pos -= n
}
if s.idxOf8 == 7 {
s.idxOf8 = 0
} else {
s.idxOf8++
}
}