You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
346 lines
6.8 KiB
346 lines
6.8 KiB
package txlog
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
"os"
|
|
|
|
"gordenko.dev/dima/diploma"
|
|
"gordenko.dev/dima/diploma/bin"
|
|
"gordenko.dev/dima/diploma/proto"
|
|
)
|
|
|
|
type Reader struct {
|
|
file *os.File
|
|
reader *bufio.Reader
|
|
}
|
|
|
|
type ReaderOptions struct {
|
|
FileName string
|
|
BufferSize int
|
|
}
|
|
|
|
func NewReader(opt ReaderOptions) (*Reader, error) {
|
|
if opt.FileName == "" {
|
|
return nil, errors.New("FileName option is required")
|
|
}
|
|
if opt.BufferSize <= 0 {
|
|
return nil, errors.New("BufferSize option is required")
|
|
}
|
|
|
|
file, err := os.Open(opt.FileName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Reader{
|
|
file: file,
|
|
reader: bufio.NewReaderSize(file, 1024*1024),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Reader) Close() {
|
|
s.file.Close()
|
|
}
|
|
|
|
func (s *Reader) ReadPacket() (uint32, []any, bool, error) {
|
|
prefix := make([]byte, packetPrefixSize)
|
|
n, err := s.reader.Read(prefix)
|
|
if err != nil {
|
|
if err == io.EOF && n == 0 {
|
|
return 0, nil, true, nil
|
|
} else {
|
|
return 0, nil, false, fmt.Errorf("read packet prefix: %s", err)
|
|
}
|
|
}
|
|
|
|
length := bin.GetUint32(prefix[lengthIdx:])
|
|
storedCRC := bin.GetUint32(prefix[checksumIdx:])
|
|
lsn := bin.GetUint32(prefix[lsnIdx:])
|
|
|
|
body, err := bin.ReadN(s.reader, int(length))
|
|
if err != nil {
|
|
return 0, nil, false, fmt.Errorf("read packet body: %s", err)
|
|
}
|
|
|
|
hasher := crc32.NewIEEE()
|
|
hasher.Write(prefix[lsnIdx:])
|
|
hasher.Write(body)
|
|
|
|
calculatedCRC := hasher.Sum32()
|
|
|
|
if calculatedCRC != storedCRC {
|
|
return 0, nil, false, fmt.Errorf("stored CRC %d != calculated CRC %d",
|
|
storedCRC, calculatedCRC)
|
|
}
|
|
|
|
records, err := s.parseRecords(body)
|
|
if err != nil {
|
|
return 0, nil, false, err
|
|
}
|
|
return lsn, records, false, nil
|
|
}
|
|
|
|
func (s *Reader) parseRecords(body []byte) ([]any, error) {
|
|
var (
|
|
src = bytes.NewBuffer(body)
|
|
records []any
|
|
)
|
|
|
|
for {
|
|
recordType, err := src.ReadByte()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return records, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
switch recordType {
|
|
case CodeAddedMetric:
|
|
var rec AddedMetric
|
|
rec, err = s.readAddedMetric(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
case CodeDeletedMetric:
|
|
var rec DeletedMetric
|
|
rec, err = s.readDeletedMetric(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
case CodeAppendedMeasure:
|
|
var rec AppendedMeasure
|
|
rec, err = s.readAppendedMeasure(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
case CodeAppendedMeasures:
|
|
var rec AppendedMeasures
|
|
rec, err = s.readAppendedMeasures(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
case CodeAppendedMeasureWithOverflow:
|
|
var rec AppendedMeasureWithOverflow
|
|
rec, err = s.readAppendedMeasureWithOverflow(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
case CodeDeletedMeasures:
|
|
var rec DeletedMeasures
|
|
rec, err = s.readDeletedMeasures(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
records = append(records, rec)
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unknown record type code: %d", recordType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Reader) readAddedMetric(src *bytes.Buffer) (_ AddedMetric, err error) {
|
|
arr, err := bin.ReadN(src, 6)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return AddedMetric{
|
|
MetricID: bin.GetUint32(arr),
|
|
MetricType: diploma.MetricType(arr[4]),
|
|
FracDigits: int(arr[5]),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Reader) readDeletedMetric(src *bytes.Buffer) (_ DeletedMetric, err error) {
|
|
var rec DeletedMetric
|
|
rec.MetricID, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
// free data pages
|
|
dataQty, _, err := bin.ReadVarUint64(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for range dataQty {
|
|
var pageNo uint32
|
|
pageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.FreeDataPages = append(rec.FreeDataPages, pageNo)
|
|
}
|
|
// free index pages
|
|
indexQty, _, err := bin.ReadVarUint64(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for range indexQty {
|
|
var pageNo uint32
|
|
pageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.FreeIndexPages = append(rec.FreeIndexPages, pageNo)
|
|
}
|
|
return rec, nil
|
|
}
|
|
|
|
func (s *Reader) readAppendedMeasure(src *bytes.Buffer) (_ AppendedMeasure, err error) {
|
|
arr, err := bin.ReadN(src, 16)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return AppendedMeasure{
|
|
MetricID: bin.GetUint32(arr[0:]),
|
|
Timestamp: bin.GetUint32(arr[4:]),
|
|
Value: bin.GetFloat64(arr[8:]),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Reader) readAppendedMeasures(src *bytes.Buffer) (_ AppendedMeasures, err error) {
|
|
var rec AppendedMeasures
|
|
rec.MetricID, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
qty, err := bin.ReadUint16(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for range qty {
|
|
var measure proto.Measure
|
|
measure.Timestamp, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
measure.Value, err = bin.ReadFloat64(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.Measures = append(rec.Measures, measure)
|
|
}
|
|
return rec, nil
|
|
}
|
|
|
|
func (s *Reader) readAppendedMeasureWithOverflow(src *bytes.Buffer) (_ AppendedMeasureWithOverflow, err error) {
|
|
var (
|
|
b byte
|
|
rec AppendedMeasureWithOverflow
|
|
)
|
|
rec.MetricID, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.Timestamp, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.Value, err = bin.ReadFloat64(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
b, err = src.ReadByte()
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.IsDataPageReused = b == 1
|
|
|
|
rec.DataPageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
b, err = src.ReadByte()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if b == 1 {
|
|
rec.IsRootChanged = true
|
|
rec.RootPageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
// index pages
|
|
indexQty, err := src.ReadByte()
|
|
if err != nil {
|
|
return
|
|
}
|
|
for range indexQty {
|
|
var pageNo uint32
|
|
pageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
rec.ReusedIndexPages = append(rec.ReusedIndexPages, pageNo)
|
|
}
|
|
return rec, nil
|
|
}
|
|
|
|
func (s *Reader) readDeletedMeasures(src *bytes.Buffer) (_ DeletedMeasures, err error) {
|
|
var (
|
|
rec DeletedMeasures
|
|
)
|
|
rec.MetricID, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
// free data pages
|
|
rec.FreeDataPages, err = s.readFreePageNumbers(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
// free index pages
|
|
rec.FreeIndexPages, err = s.readFreePageNumbers(src)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return rec, nil
|
|
}
|
|
|
|
// HELPERS
|
|
|
|
func (s *Reader) readFreePageNumbers(src *bytes.Buffer) ([]uint32, error) {
|
|
var freePages []uint32
|
|
qty, _, err := bin.ReadVarUint64(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for range qty {
|
|
var pageNo uint32
|
|
pageNo, err = bin.ReadUint32(src)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
freePages = append(freePages, pageNo)
|
|
}
|
|
return freePages, nil
|
|
}
|
|
|
|
func (s *Reader) Seek(offset int64) error {
|
|
ret, err := s.file.Seek(offset, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if ret != offset {
|
|
return fmt.Errorf("ret %d != offset %d", ret, offset)
|
|
}
|
|
return nil
|
|
}
|
|
|