спеціалізована СУБД для зберігання та обробки показань датчиків та лічильників
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
diploma/database/snapshot.go

287 lines
6.1 KiB

package database
import (
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
octopus "gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/atree"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/chunkenc"
"gordenko.dev/dima/diploma/conbuf"
"gordenko.dev/dima/diploma/freelist"
)
/*
Snapshot file format:
//lsn - varuint (last LSN that modified data in RAM)
metricsQty - varuint
[metric]*
where metric is:
metricID - 4b
metricType - 1b
fracDigits - 1b
rootPageNo - 4b
lastPageNo - 4b
since - 4b
sinceValue - 8b
until - 4b
untilValue - 8b
timestamps size - 2b
values size - 2b
timestamps payload - Nb
values payload - Nb
dataFreeList size - varuint
dataFreeList - Nb
indexFreeList size - varuint
indexFreeList - Nb
CRC32 - 4b
*/

// metricHeaderSize is the byte length of the fixed per-metric header
// described above: 4+1+1+4+4+4+8+4+8+2+2 = 42.
const metricHeaderSize = 42
// dumpSnapshot writes a point-in-time snapshot of the whole in-RAM state
// to "<logNumber>.snapshot": per-metric fixed headers plus compressed
// timestamp/value chunk payloads, then both free lists, then a trailing
// CRC32 of everything before it. After the file is fsynced and closed,
// the previous generation's .changes and .snapshot files are removed.
func (s *Database) dumpSnapshot(logNumber int) (err error) {
	var (
		fileName = filepath.Join(s.dir, fmt.Sprintf("%d.snapshot", logNumber))
		hasher   = crc32.NewIEEE()
		prefix   = make([]byte, metricHeaderSize)
	)
	// O_TRUNC: if a snapshot with this number was left behind by a crashed
	// run, its stale trailing bytes must not survive past the new content.
	file, err := os.OpenFile(fileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0770)
	if err != nil {
		return
	}
	// Don't leak the handle on early error returns; the success path closes
	// explicitly below and disarms this cleanup by setting file to nil.
	defer func() {
		if file != nil {
			file.Close()
		}
	}()
	// Everything written through dst is also folded into the CRC32.
	dst := io.MultiWriter(file, hasher)
	_, err = bin.WriteVarUint64(dst, uint64(len(s.metrics)))
	if err != nil {
		return
	}
	for metricID, metric := range s.metrics {
		tSize := metric.Timestamps.Size()
		vSize := metric.Values.Size()
		// Fixed-size per-metric header (see the format comment above).
		bin.PutUint32(prefix[0:], metricID)
		prefix[4] = byte(metric.MetricType)
		prefix[5] = metric.FracDigits
		bin.PutUint32(prefix[6:], metric.RootPageNo)
		bin.PutUint32(prefix[10:], metric.LastPageNo)
		bin.PutUint32(prefix[14:], metric.Since)
		bin.PutFloat64(prefix[18:], metric.SinceValue)
		bin.PutUint32(prefix[26:], metric.Until)
		bin.PutFloat64(prefix[30:], metric.UntilValue)
		bin.PutUint16(prefix[38:], uint16(tSize))
		bin.PutUint16(prefix[40:], uint16(vSize))
		_, err = dst.Write(prefix)
		if err != nil {
			return
		}
		// Copy exactly tSize bytes of the timestamps chunk chain; the last
		// chunk may be only partially filled, hence the truncation.
		remaining := tSize
		for _, buf := range metric.TimestampsBuf.Chunks() {
			if remaining < len(buf) {
				buf = buf[:remaining]
			}
			_, err = dst.Write(buf)
			if err != nil {
				return
			}
			remaining -= len(buf)
			if remaining == 0 {
				break
			}
		}
		// Same for the values chunk chain (exactly vSize bytes).
		remaining = vSize
		for _, buf := range metric.ValuesBuf.Chunks() {
			if remaining < len(buf) {
				buf = buf[:remaining]
			}
			_, err = dst.Write(buf)
			if err != nil {
				return
			}
			remaining -= len(buf)
			if remaining == 0 {
				break
			}
		}
	}
	// free data pages
	err = freeListWriteTo(s.dataFreeList, dst)
	if err != nil {
		return
	}
	// free index pages
	err = freeListWriteTo(s.indexFreeList, dst)
	if err != nil {
		return
	}
	// The checksum goes to the file only (not dst): it must not hash itself.
	// The write error was previously ignored, which could silently produce
	// a snapshot with a truncated/missing CRC.
	_, err = bin.WriteUint32(file, hasher.Sum32())
	if err != nil {
		return
	}
	err = file.Sync()
	if err != nil {
		return
	}
	err = file.Close()
	file = nil // closed; disarm the deferred cleanup
	if err != nil {
		return
	}
	// The new snapshot is durable, so the previous generation's files are
	// no longer needed for recovery and can be deleted.
	prevLogNumber := logNumber - 1
	prevChanges := filepath.Join(s.dir, fmt.Sprintf("%d.changes", prevLogNumber))
	prevSnapshot := filepath.Join(s.dir, fmt.Sprintf("%d.snapshot", prevLogNumber))
	isExist, err := isFileExist(prevChanges)
	if err != nil {
		return
	}
	if isExist {
		err = os.Remove(prevChanges)
		if err != nil {
			// Leaving an obsolete changes file behind would break the
			// recovery invariant; abort rather than continue.
			octopus.Abort(octopus.DeletePrevChangesFileFailed, err)
		}
	}
	isExist, err = isFileExist(prevSnapshot)
	if err != nil {
		return
	}
	if isExist {
		err = os.Remove(prevSnapshot)
		if err != nil {
			octopus.Abort(octopus.DeletePrevSnapshotFileFailed, err)
		}
	}
	return
}
// loadSnapshot restores the in-RAM state from a snapshot file produced by
// dumpSnapshot: it reads the per-metric headers and chunk payloads, rebuilds
// the compressors, restores both free lists, and verifies the trailing CRC32
// against everything read before it.
func (s *Database) loadSnapshot(fileName string) (err error) {
	var (
		hasher     = crc32.NewIEEE()
		metricsQty int
		header     = make([]byte, metricHeaderSize)
		body       = make([]byte, atree.DataPageSize)
	)
	file, err := os.Open(fileName)
	if err != nil {
		return
	}
	// The handle was previously never closed on any path. The file is
	// opened read-only, so the Close error carries no useful information.
	defer file.Close()
	// Everything read through src is folded into the CRC32.
	src := io.TeeReader(file, hasher)
	u64, _, err := bin.ReadVarUint64(src)
	if err != nil {
		return
	}
	metricsQty = int(u64)
	for range metricsQty {
		var metric _metric
		err = bin.ReadNInto(src, header)
		if err != nil {
			return
		}
		// Field offsets mirror the writer in dumpSnapshot.
		metricID := bin.GetUint32(header[0:])
		metric.MetricType = octopus.MetricType(header[4])
		metric.FracDigits = header[5]
		metric.RootPageNo = bin.GetUint32(header[6:])
		metric.LastPageNo = bin.GetUint32(header[10:])
		metric.Since = bin.GetUint32(header[14:])
		metric.SinceValue = bin.GetFloat64(header[18:])
		metric.Until = bin.GetUint32(header[26:])
		metric.UntilValue = bin.GetFloat64(header[30:])
		tSize := bin.GetUint16(header[38:])
		vSize := bin.GetUint16(header[40:])
		// body is a scratch buffer reused for every metric and for both
		// payloads. NOTE(review): this is only safe if conbuf.NewFromBuffer
		// copies its input — verify, otherwise all metrics would alias the
		// same backing array.
		buf := body[:tSize]
		err = bin.ReadNInto(src, buf)
		if err != nil {
			return
		}
		metric.TimestampsBuf = conbuf.NewFromBuffer(buf)
		buf = body[:vSize]
		err = bin.ReadNInto(src, buf)
		if err != nil {
			return
		}
		metric.ValuesBuf = conbuf.NewFromBuffer(buf)
		// Reattach the compressors at the current end of each stream.
		metric.Timestamps = chunkenc.NewReverseTimeDeltaOfDeltaCompressor(
			metric.TimestampsBuf, int(tSize))
		if metric.MetricType == octopus.Cumulative {
			metric.Values = chunkenc.NewReverseCumulativeDeltaCompressor(
				metric.ValuesBuf, int(vSize), metric.FracDigits)
		} else {
			metric.Values = chunkenc.NewReverseInstantDeltaCompressor(
				metric.ValuesBuf, int(vSize), metric.FracDigits)
		}
		s.metrics[metricID] = &metric
	}
	err = restoreFreeList(s.dataFreeList, src)
	if err != nil {
		return fmt.Errorf("restore dataFreeList: %w", err)
	}
	err = restoreFreeList(s.indexFreeList, src)
	if err != nil {
		return fmt.Errorf("restore indexFreeList: %w", err)
	}
	calculatedChecksum := hasher.Sum32()
	// Read the stored checksum from file directly (not src) so it is not
	// hashed into the value it is compared against.
	writtenChecksum, err := bin.ReadUint32(file)
	if err != nil {
		return
	}
	if calculatedChecksum != writtenChecksum {
		return fmt.Errorf("calculated checksum %d not equal written checksum %d", calculatedChecksum, writtenChecksum)
	}
	return
}
// HELPERS
// freeListWriteTo serializes freeList and writes it to dst as a varuint
// length prefix followed by the serialized payload. A serialization failure
// is treated as a fatal invariant violation and aborts the process.
func freeListWriteTo(freeList *freelist.FreeList, dst io.Writer) error {
	payload, err := freeList.Serialize()
	if err != nil {
		octopus.Abort(octopus.FailedFreeListSerialize, err)
	}
	if _, err = bin.WriteVarUint64(dst, uint64(len(payload))); err != nil {
		return err
	}
	_, err = dst.Write(payload)
	return err
}
// restoreFreeList reads a varuint length prefix and that many payload bytes
// from src, then rebuilds freeList from the payload.
func restoreFreeList(freeList *freelist.FreeList, src io.Reader) error {
	length, _, err := bin.ReadVarUint64(src)
	if err != nil {
		return err
	}
	payload, err := bin.ReadN(src, int(length))
	if err != nil {
		return err
	}
	freeList.Restore(payload)
	return nil
}