спеціалізована СУБД для зберігання та обробки показань датчиків та лічильників
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
diploma/txlog/writer.go

507 lines
10 KiB

package txlog
import (
"bytes"
"errors"
"fmt"
"hash/crc32"
"os"
"path/filepath"
"sync"
octopus "gordenko.dev/dima/diploma"
"gordenko.dev/dima/diploma/bin"
"gordenko.dev/dima/diploma/proto"
)
// Layout of the fixed-size prefix that precedes every packet in a changes
// file, plus file-level settings used by the writer.
const (
	// Byte offsets of the prefix fields inside a packet.
	lengthIdx   = 0 // payload length (packet size minus the prefix)
	checksumIdx = 4 // CRC-32 (IEEE) of the packet from lsnIdx to its end
	lsnIdx      = 8 // log sequence number of the packet

	// lsnSize is the width in bytes of the LSN field.
	lsnSize = 4

	// packetPrefixSize is the total prefix width:
	// 4 packet length + 4 crc32 + 4 lsn = 12 bytes.
	packetPrefixSize = lsnIdx + lsnSize

	// filePerm is the permission mode passed to os.OpenFile for changes files.
	filePerm = 0o770

	// dumpSnapshotAfterNBytes is the amount of data written to one changes
	// file after which a snapshot is forced and the file is rotated (1 GiB).
	dumpSnapshotAfterNBytes = 1 << 30
)
// Record type codes. Each record appended to the packet buffer starts with
// one of these bytes, which identifies the layout of the bytes that follow.
// NOTE(review): value 3 is not used in this file — presumably reserved or
// retired; confirm before assigning it to a new record type.
const (
// CodeAddedMetric tags a record written by WriteAddedMetric.
CodeAddedMetric byte = 1
// CodeDeletedMetric tags a record written by WriteDeletedMetric.
CodeDeletedMetric byte = 2
// CodeAppendedMeasure tags a record written by WriteAppendMeasure.
CodeAppendedMeasure byte = 4
// CodeAppendedMeasures tags a record written by WriteAppendMeasures.
CodeAppendedMeasures byte = 5
// CodeAppendedMeasureWithOverflow tags a record written by
// WriteAppendedMeasureWithOverflow.
CodeAppendedMeasureWithOverflow byte = 6
// CodeDeletedMeasures tags a record written by WriteDeletedMeasures.
CodeDeletedMeasures byte = 7
)
// JoinChangesFileName returns the path of the changes file with the given
// log number inside dir. The file is named "<logNumber>.changes".
func JoinChangesFileName(dir string, logNumber int) string {
	name := fmt.Sprintf("%d.changes", logNumber)
	return filepath.Join(dir, name)
}
// Changes is the value the writer passes to its AppendToWorkerQueue callback
// at the end of every flush.
type Changes struct {
// Records holds the requests buffered since the previous flush, in the order
// they were written. It is empty when the flush found nothing to write.
Records []any
// LogNumber is the writer's current changes-file number at the time of the
// hand-off (after rotation, when ForceSnapshot is set).
LogNumber int
// ForceSnapshot is true when the writer closed the current changes file and
// opened the next one during this flush.
ForceSnapshot bool
// ExitWaitGroup is the writer's WaitGroup on the final flush performed during
// exit and nil otherwise.
// NOTE(review): presumably the consumer calls Done on it — confirm.
ExitWaitGroup *sync.WaitGroup
// WaitCh is the channel that was returned to the callers whose records are
// in this batch.
// NOTE(review): presumably the consumer closes it once the batch is applied — confirm.
WaitCh chan struct{}
}
// Writer buffers change records, writes them to the current changes file as
// checksummed packets and forwards each written batch to a worker queue.
type Writer struct {
// mutex guards the buffered state shared between the Write* methods and
// flush (buf, workerReqs, redoFilesToDelete, waitCh, lsn, isExited).
mutex sync.Mutex
// logNumber is the number of the changes file currently being written;
// flush increments it when it rotates the file.
logNumber int
// dir is the directory that holds the changes files.
dir string
// file is the currently open changes file.
file *os.File
// buf is the packet under construction. After reset it holds only the
// zeroed packetPrefixSize-byte prefix; records are appended after it.
buf *bytes.Buffer
// redoFilesToDelete lists file names that flush removes after the packet
// has been written and synced.
redoFilesToDelete []string
// workerReqs holds the requests matching the records in buf; flush forwards
// them as Changes.Records.
workerReqs []any
// waitCh is the channel returned by the Write* methods for the batch being
// built; reset and flush replace it with a fresh channel.
waitCh chan struct{}
// appendToWorkerQueue receives a Changes value at the end of every flush.
appendToWorkerQueue func(any)
// lsn is the sequence number of the last packet; flush increments it for
// every non-empty packet.
lsn uint32
// written counts bytes accounted to the current changes file; flush resets
// it to zero on rotation.
written int64
// isExited is set by exit before the final flush.
isExited bool
// exitCh makes Run perform the final flush and return when a receive on it
// succeeds.
exitCh chan struct{}
// waitGroup is handed over as Changes.ExitWaitGroup on the final flush.
waitGroup *sync.WaitGroup
// signalCh is a buffered (capacity 1) wake-up channel: Write* methods send
// to it without blocking and Run flushes on every receive.
signalCh chan struct{}
}
// WriterOptions configures NewWriter. Dir, AppendToWorkerQueue, ExitCh and
// WaitGroup are required; NewWriter returns an error when any of them is unset.
type WriterOptions struct {
// Dir is the directory that holds the changes files.
Dir string
// LogNumber is the log (changes file) number. A positive value opens that
// existing file for appending; otherwise the writer starts with log number 1.
LogNumber int // log number
// AppendToWorkerQueue is called with a Changes value at the end of every flush.
AppendToWorkerQueue func(any)
// ExitCh tells Run to perform the final flush and return.
ExitCh chan struct{}
// WaitGroup is passed to the worker queue as Changes.ExitWaitGroup on the
// final flush.
WaitGroup *sync.WaitGroup
}
// NewWriter validates opt, opens the changes file the writer will write to
// and returns a Writer whose packet buffer holds only the zeroed prefix.
//
// When opt.LogNumber is positive, the changes file with that number is opened
// for appending and must already exist. Otherwise log number 1 is used and
// the file is created if it is missing.
//
// An error is returned when a required option is unset or the file cannot be
// opened.
func NewWriter(opt WriterOptions) (*Writer, error) {
	switch {
	case opt.Dir == "":
		return nil, errors.New("Dir option is required")
	case opt.AppendToWorkerQueue == nil:
		return nil, errors.New("AppendToWorkerQueue option is required")
	case opt.ExitCh == nil:
		return nil, errors.New("ExitCh option is required")
	case opt.WaitGroup == nil:
		return nil, errors.New("WaitGroup option is required")
	}

	w := &Writer{
		dir:                 opt.Dir,
		buf:                 bytes.NewBuffer(nil),
		appendToWorkerQueue: opt.AppendToWorkerQueue,
		logNumber:           opt.LogNumber,
		exitCh:              opt.ExitCh,
		waitGroup:           opt.WaitGroup,
		// Capacity 1 lets writers leave a single pending wake-up without blocking.
		signalCh: make(chan struct{}, 1),
	}

	// Continue an existing log when a number is given, otherwise start log 1.
	openFlags := os.O_CREATE | os.O_WRONLY
	if opt.LogNumber > 0 {
		openFlags = os.O_APPEND | os.O_WRONLY
	} else {
		w.logNumber = 1
	}

	file, err := os.OpenFile(JoinChangesFileName(opt.Dir, w.logNumber), openFlags, filePerm)
	if err != nil {
		return nil, err
	}
	w.file = file

	w.reset()
	return w, nil
}
// Run is the writer's event loop. It flushes the buffer every time a wake-up
// arrives on signalCh and aborts the process via octopus.Abort if a flush
// fails. When a receive on exitCh succeeds it performs the final flush
// through exit and returns.
func (s *Writer) Run() {
	for {
		select {
		case <-s.exitCh:
			s.exit()
			return
		case <-s.signalCh:
			err := s.flush()
			if err != nil {
				octopus.Abort(octopus.FailedWriteToTxLog, err)
			}
		}
	}
}
// reset starts a new, empty batch: the buffer is truncated and refilled with
// a zeroed packet prefix (length, crc32, lsn — filled in by flush), the
// pending requests and REDO file names are dropped, and a fresh wait channel
// is created for the next batch.
func (s *Writer) reset() {
	var prefix [packetPrefixSize]byte // placeholder for length + crc32 + lsn

	s.buf.Reset()
	s.buf.Write(prefix[:])

	s.workerReqs, s.redoFilesToDelete = nil, nil
	s.waitCh = make(chan struct{})
}
// flush writes the buffered records, if any, to the changes file as a single
// packet and hands the batch over to the worker queue.
//
// Steps:
//  1. Under the mutex, take over the buffered requests, the wait channel and
//     the pending REDO file names, then start a fresh batch. The mutex is
//     released before any disk I/O.
//  2. Fill in the packet prefix (payload length, LSN, CRC-32 over LSN plus
//     payload), write and sync the packet, then remove the REDO files that
//     were registered with the batch.
//  3. Rotate to the next changes file when more than dumpSnapshotAfterNBytes
//     were written to the current one, or when exiting after anything was
//     written.
//  4. Pass a Changes value to appendToWorkerQueue. This also happens when the
//     buffer was empty, in which case Records is empty.
//
// It returns an error when writing, syncing, closing or creating a changes
// file fails. A failure to remove a REDO file calls octopus.Abort instead.
//
// file, logNumber and written are accessed here without the mutex; in this
// file only flush uses them after construction.
func (s *Writer) flush() error {
	s.mutex.Lock()
	workerReqs := s.workerReqs
	waitCh := s.waitCh
	isExited := s.isExited

	var exitWaitGroup *sync.WaitGroup
	if isExited {
		exitWaitGroup = s.waitGroup
	}

	if s.buf.Len() > packetPrefixSize {
		redoFilesToDelete := s.redoFilesToDelete
		s.lsn++
		lsn := s.lsn

		// Copy the packet out so the shared buffer can be reused right away.
		packet := make([]byte, s.buf.Len())
		copy(packet, s.buf.Bytes())
		s.reset()

		// len(packet) already includes the prefix, so it is exactly the number
		// of bytes appended to the file.
		s.written += int64(len(packet))
		s.mutex.Unlock()

		bin.PutUint32(packet[lengthIdx:], uint32(len(packet)-packetPrefixSize))
		bin.PutUint32(packet[lsnIdx:], lsn)
		// The checksum covers the LSN and the payload, so it is computed last.
		bin.PutUint32(packet[checksumIdx:], crc32.ChecksumIEEE(packet[lsnIdx:]))

		n, err := s.file.Write(packet)
		if err != nil {
			return fmt.Errorf("TxLog write: %w", err)
		}
		if n != len(packet) {
			return fmt.Errorf("TxLog written %d != packet size %d", n, len(packet))
		}
		if err := s.file.Sync(); err != nil {
			return fmt.Errorf("TxLog sync: %w", err)
		}

		// The packet is on disk; the REDO files registered with it can go.
		for _, fileName := range redoFilesToDelete {
			if err := os.Remove(fileName); err != nil {
				octopus.Abort(octopus.RemoveREDOFileFailed, err)
			}
		}
	} else {
		// Nothing to write: still hand the current wait channel to the worker
		// queue and give later writers a fresh one.
		s.waitCh = make(chan struct{})
		s.mutex.Unlock()
	}

	forceSnapshot := s.written > dumpSnapshotAfterNBytes || (isExited && s.written > 0)
	if forceSnapshot {
		if err := s.file.Close(); err != nil {
			return fmt.Errorf("close changes file: %w", err)
		}
		s.logNumber++

		file, err := os.OpenFile(
			JoinChangesFileName(s.dir, s.logNumber),
			os.O_CREATE|os.O_WRONLY,
			filePerm,
		)
		// Assigned before the error check, as before, so s.file never keeps
		// pointing at the file that was just closed.
		s.file = file
		if err != nil {
			return fmt.Errorf("create new changes file: %w", err)
		}
		s.written = 0
	}

	s.appendToWorkerQueue(Changes{
		Records:       workerReqs,
		ForceSnapshot: forceSnapshot,
		LogNumber:     s.logNumber,
		WaitCh:        waitCh,
		ExitWaitGroup: exitWaitGroup,
	})
	return nil
}
// exit marks the writer as exited and performs the final flush. Because
// isExited is set first, that flush passes the writer's WaitGroup to the
// worker queue and forces a snapshot if anything was written. A flush failure
// aborts the process via octopus.Abort.
func (s *Writer) exit() {
	s.mutex.Lock()
	s.isExited = true
	s.mutex.Unlock()

	finalErr := s.flush()
	if finalErr != nil {
		octopus.Abort(octopus.FailedWriteToTxLog, finalErr)
	}
}
// API

// AddedMetric is the request recorded by WriteAddedMetric.
type AddedMetric struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// MetricType is written to the log as a single byte.
MetricType octopus.MetricType
// FracDigits is written to the log as a single byte.
// NOTE(review): presumably the number of fractional digits of the metric's
// values — confirm; values outside 0..255 would be truncated.
FracDigits int
}
// WriteAddedMetric appends an "added metric" record to the current batch,
// wakes the flushing goroutine and returns the wait channel of the batch the
// record was added to.
//
// Record format:
//
//	1b code (CodeAddedMetric)
//	4b metricID
//	1b metricType
//	1b fracDigits
func (s *Writer) WriteAddedMetric(req AddedMetric) chan struct{} {
	rec := []byte{
		CodeAddedMetric,
		0, 0, 0, 0, // metricID
		byte(req.MetricType),
		byte(req.FracDigits),
	}
	bin.PutUint32(rec[1:], req.MetricID)

	s.mutex.Lock()
	s.buf.Write(rec)
	s.workerReqs = append(s.workerReqs, req)
	// Read the channel while the mutex is held: flush replaces s.waitCh, so
	// reading it after Unlock would be a data race and could return the
	// channel of a later batch.
	waitCh := s.waitCh
	s.mutex.Unlock()

	s.sendSignal()
	return waitCh
}
// DeletedMetric is the request recorded by WriteDeletedMetric.
type DeletedMetric struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// FreeDataPages is written to the log as a length-prefixed list of 4-byte
// page numbers.
// NOTE(review): presumably the data pages released by the deletion — confirm.
FreeDataPages []uint32
// FreeIndexPages is written to the log as a length-prefixed list of 4-byte
// page numbers.
// NOTE(review): presumably the index pages released by the deletion — confirm.
FreeIndexPages []uint32
}
// WriteDeletedMetric appends a "deleted metric" record to the current batch,
// wakes the flushing goroutine and returns the batch's wait channel. The
// mutex is held for the whole operation, including the wake-up.
//
// Record format:
//
//	1b code (CodeDeletedMetric)
//	4b metricID
//	free data pages  (length + N * 4b, see packFreeDataAndIndexPages)
//	free index pages (length + N * 4b, see packFreeDataAndIndexPages)
func (s *Writer) WriteDeletedMetric(req DeletedMetric) chan struct{} {
	var head [5]byte
	head[0] = CodeDeletedMetric
	bin.PutUint32(head[1:], req.MetricID)

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Fixed-size part first, then the two variable-length page lists.
	s.buf.Write(head[:])
	s.packFreeDataAndIndexPages(req.FreeDataPages, req.FreeIndexPages)
	s.workerReqs = append(s.workerReqs, req)

	s.sendSignal()
	return s.waitCh
}
// AppendedMeasure is the request recorded by WriteAppendMeasure: a single
// measure appended to a metric.
type AppendedMeasure struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// Timestamp is written to the log as a 4-byte value.
// NOTE(review): unit and epoch are not visible in this file — confirm.
Timestamp uint32
// Value is written to the log as an 8-byte float.
Value float64
}
// WriteAppendMeasure appends an "appended measure" record to the current
// batch, wakes the flushing goroutine and returns the wait channel of the
// batch the record was added to.
//
// Record format:
//
//	1b code (CodeAppendedMeasure)
//	4b metricID
//	4b timestamp
//	8b value
func (s *Writer) WriteAppendMeasure(req AppendedMeasure) chan struct{} {
	rec := []byte{
		CodeAppendedMeasure,
		0, 0, 0, 0, // metricID
		0, 0, 0, 0, // timestamp
		0, 0, 0, 0, 0, 0, 0, 0, // value
	}
	bin.PutUint32(rec[1:], req.MetricID)
	bin.PutUint32(rec[5:], req.Timestamp)
	bin.PutFloat64(rec[9:], req.Value)

	s.mutex.Lock()
	s.buf.Write(rec)
	s.workerReqs = append(s.workerReqs, req)
	// Read the channel while the mutex is held: flush replaces s.waitCh, so
	// reading it after Unlock would be a data race and could return the
	// channel of a later batch.
	waitCh := s.waitCh
	s.mutex.Unlock()

	s.sendSignal()
	return waitCh
}
// AppendedMeasures is the request recorded by WriteAppendMeasures: several
// measures appended to one metric.
type AppendedMeasures struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// Measures are written to the log as (4-byte Timestamp, 8-byte Value) pairs,
// preceded by a 2-byte count.
Measures []proto.Measure
}
// AppendedMeasuresExtended is what WriteAppendMeasures forwards to the worker
// queue: the request plus the caller's holdLock flag.
type AppendedMeasuresExtended struct {
// Record is the original request.
Record AppendedMeasures
// HoldLock is the holdLock argument passed to WriteAppendMeasures. It is not
// written to the log.
// NOTE(review): its meaning is defined by the consumer of the worker queue — confirm there.
HoldLock bool
}
// WriteAppendMeasures appends an "appended measures" record to the current
// batch, wakes the flushing goroutine and returns the wait channel of the
// batch the record was added to. holdLock is not written to the log; it is
// only forwarded to the worker queue inside AppendedMeasuresExtended.
//
// Record format:
//
//	1b code (CodeAppendedMeasures)
//	4b metricID
//	2b qty
//	[qty * (4b timestamp + 8b value)] measures
//
// NOTE(review): qty is stored as uint16, so more than 65535 measures in one
// request would produce a count that does not match the data — confirm that
// callers never exceed it.
func (s *Writer) WriteAppendMeasures(req AppendedMeasures, holdLock bool) chan struct{} {
	head := []byte{
		CodeAppendedMeasures,
		0, 0, 0, 0, // metricID
		0, 0, // qty
	}
	bin.PutUint32(head[1:], req.MetricID)
	bin.PutUint16(head[5:], uint16(len(req.Measures)))

	s.mutex.Lock()
	s.buf.Write(head)
	for _, measure := range req.Measures {
		bin.WriteUint32(s.buf, measure.Timestamp)
		bin.WriteFloat64(s.buf, measure.Value)
	}
	s.workerReqs = append(s.workerReqs, AppendedMeasuresExtended{
		Record:   req,
		HoldLock: holdLock,
	})
	// Read the channel while the mutex is held: flush replaces s.waitCh, so
	// reading it after Unlock would be a data race and could return the
	// channel of a later batch.
	waitCh := s.waitCh
	s.mutex.Unlock()

	s.sendSignal()
	return waitCh
}
// AppendedMeasureWithOverflow is the request recorded by
// WriteAppendedMeasureWithOverflow: a single appended measure together with
// page bookkeeping.
// NOTE(review): presumably used when the append did not fit into the current
// data page — confirm against the caller.
type AppendedMeasureWithOverflow struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// Timestamp is written to the log as a 4-byte value.
Timestamp uint32
// Value is written to the log as an 8-byte float.
Value float64
// IsDataPageReused is written to the log as a 1-byte flag.
IsDataPageReused bool
// DataPageNo is written to the log as a 4-byte value.
DataPageNo uint32
// IsRootChanged is written to the log as a 1-byte flag; RootPageNo is
// written only when it is true.
IsRootChanged bool
// RootPageNo is the new root page number, logged only if IsRootChanged.
RootPageNo uint32
// ReusedIndexPages is written to the log as a 1-byte count followed by
// 4-byte page numbers.
ReusedIndexPages []uint32
}
// AppendedMeasureWithOverflowExtended is what WriteAppendedMeasureWithOverflow
// forwards to the worker queue: the request plus the caller's holdLock flag.
type AppendedMeasureWithOverflowExtended struct {
// Record is the original request.
Record AppendedMeasureWithOverflow
// HoldLock is the holdLock argument passed to
// WriteAppendedMeasureWithOverflow. It is not written to the log.
// NOTE(review): its meaning is defined by the consumer of the worker queue — confirm there.
HoldLock bool
}
/*
WriteAppendedMeasureWithOverflow appends an "appended measure with overflow"
record to the current batch, registers redoFileName for removal once the
batch has been written and synced, wakes the flushing goroutine and returns
the wait channel of the batch the record was added to. holdLock is not
written to the log; it is only forwarded to the worker queue inside
AppendedMeasureWithOverflowExtended.

Format:

	1b code
	4b metricID
	4b timestamp
	8b value
	1b isReusedDataPage
	4b dataPageNo
	1b isRootChanged
	[4b] newRootPageNo (present only when isRootChanged is 1)
	1b reusedIndexPages length
	[N * 4b] reusedIndexPages

NOTE(review): the reusedIndexPages length is stored in one byte, so more than
255 entries would produce a count that does not match the data — confirm that
callers never exceed it.
*/
func (s *Writer) WriteAppendedMeasureWithOverflow(req AppendedMeasureWithOverflow, redoFileName string, holdLock bool) chan struct{} {
	// 24 = fixed part: code, metricID, timestamp, value, isReusedDataPage,
	// dataPageNo, isRootChanged and the reusedIndexPages length byte.
	size := 24 + len(req.ReusedIndexPages)*4
	if req.IsRootChanged {
		size += 4
	}

	rec := make([]byte, size)
	rec[0] = CodeAppendedMeasureWithOverflow
	bin.PutUint32(rec[1:], req.MetricID)
	bin.PutUint32(rec[5:], req.Timestamp)
	bin.PutFloat64(rec[9:], req.Value)
	if req.IsDataPageReused {
		rec[17] = 1
	}
	bin.PutUint32(rec[18:], req.DataPageNo)

	pos := 22
	if req.IsRootChanged {
		rec[pos] = 1
		bin.PutUint32(rec[pos+1:], req.RootPageNo)
		pos += 5
	} else {
		rec[pos] = 0
		pos++
	}

	rec[pos] = byte(len(req.ReusedIndexPages))
	pos++
	for _, indexPageNo := range req.ReusedIndexPages {
		bin.PutUint32(rec[pos:], indexPageNo)
		pos += 4
	}

	s.mutex.Lock()
	s.buf.Write(rec)
	s.workerReqs = append(s.workerReqs, AppendedMeasureWithOverflowExtended{
		Record:   req,
		HoldLock: holdLock,
	})
	s.redoFilesToDelete = append(s.redoFilesToDelete, redoFileName)
	// Read the channel while the mutex is held: flush replaces s.waitCh, so
	// reading it after Unlock would be a data race and could return the
	// channel of a later batch.
	waitCh := s.waitCh
	s.mutex.Unlock()

	s.sendSignal()
	return waitCh
}
// DeletedMeasures is the request recorded by WriteDeletedMeasures.
type DeletedMeasures struct {
// MetricID is written to the log as a 4-byte value.
MetricID uint32
// FreeDataPages is written to the log as a length-prefixed list of 4-byte
// page numbers.
// NOTE(review): presumably the data pages released by the deletion — confirm.
FreeDataPages []uint32
// FreeIndexPages is written to the log as a length-prefixed list of 4-byte
// page numbers.
// NOTE(review): presumably the index pages released by the deletion — confirm.
FreeIndexPages []uint32
}
/*
WriteDeletedMeasures appends a "deleted measures" record to the current
batch, wakes the flushing goroutine and returns the wait channel of the batch
the record was added to.

Format:

	1b code
	4b metricID
	varuint freeDataPages length (written with bin.WriteVarUint64)
	[N * 4b] freeDataPages
	varuint freeIndexPages length (written with bin.WriteVarUint64)
	[N * 4b] freeIndexPages
*/
func (s *Writer) WriteDeletedMeasures(op DeletedMeasures) chan struct{} {
	head := []byte{
		CodeDeletedMeasures,
		0, 0, 0, 0, // metricID
	}
	bin.PutUint32(head[1:], op.MetricID)

	s.mutex.Lock()
	// Fixed-size part first, then the two variable-length page lists.
	s.buf.Write(head)
	s.packFreeDataAndIndexPages(op.FreeDataPages, op.FreeIndexPages)
	s.workerReqs = append(s.workerReqs, op)
	// Read the channel while the mutex is held: flush replaces s.waitCh, so
	// reading it after Unlock would be a data race and could return the
	// channel of a later batch.
	waitCh := s.waitCh
	s.mutex.Unlock()

	s.sendSignal()
	return waitCh
}
// DeletedMeasuresSince describes a "delete measures since" change.
// NOTE(review): no method in the visible part of this file writes or reads
// this type; the field notes below are inferred from the names only — confirm
// against its users.
type DeletedMeasuresSince struct {
// MetricID presumably identifies the affected metric.
MetricID uint32
// LastPageNo is presumably the last data page remaining after the deletion.
LastPageNo uint32
// IsRootChanged presumably tells whether RootPageNo carries a new root.
IsRootChanged bool
// RootPageNo is presumably the new index root page number.
RootPageNo uint32
// FreeDataPages presumably lists the released data pages.
FreeDataPages []uint32
// FreeIndexPages presumably lists the released index pages.
FreeIndexPages []uint32
// TimestampsBuf presumably holds encoded timestamps — encoding not visible here.
TimestampsBuf []byte
// ValuesBuf presumably holds encoded values — encoding not visible here.
ValuesBuf []byte
}
// sendSignal asks the Run loop to flush. The send never blocks: if signalCh
// cannot accept the value right now, the signal is dropped.
func (s *Writer) sendSignal() {
	wake := struct{}{}
	select {
	case s.signalCh <- wake:
		// Wake-up delivered or queued.
	default:
		// Channel not ready to receive; skip instead of blocking the caller.
	}
}
// helper

// packFreeDataAndIndexPages appends two page lists to the packet buffer, data
// pages first and index pages second. Each list is written as its length
// (bin.WriteVarUint64) followed by the page numbers (bin.WriteUint32 each).
// It does not lock the mutex itself; both callers in this file hold it.
func (s *Writer) packFreeDataAndIndexPages(freeDataPages, freeIndexPages []uint32) {
	lists := [2][]uint32{freeDataPages, freeIndexPages}
	for _, pages := range lists {
		bin.WriteVarUint64(s.buf, uint64(len(pages)))
		for _, pageNo := range pages {
			bin.WriteUint32(s.buf, pageNo)
		}
	}
}