package database
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"sync"
|
|
"time"
|
|
|
|
"gordenko.dev/dima/diploma"
|
|
"gordenko.dev/dima/diploma/atree"
|
|
"gordenko.dev/dima/diploma/atree/redo"
|
|
"gordenko.dev/dima/diploma/bin"
|
|
"gordenko.dev/dima/diploma/chunkenc"
|
|
"gordenko.dev/dima/diploma/conbuf"
|
|
"gordenko.dev/dima/diploma/freelist"
|
|
"gordenko.dev/dima/diploma/recovery"
|
|
"gordenko.dev/dima/diploma/txlog"
|
|
)
|
|
|
|
// JoinSnapshotFileName returns the path of the snapshot file that belongs
// to the given transaction-log number, i.e. "<dir>/<logNumber>.snapshot".
func JoinSnapshotFileName(dir string, logNumber int) string {
	baseName := fmt.Sprintf("%d.snapshot", logNumber)
	return filepath.Join(dir, baseName)
}
|
|
|
|
// metricLockEntry tracks the lock state of a single metric: either one
// exclusive holder or any number of shared readers, plus the requests
// waiting for the lock to become available.
type metricLockEntry struct {
	// XLock is true while the metric is held exclusively.
	XLock bool
	// RLocks counts the readers currently holding the lock.
	RLocks int
	// WaitQueue holds the queued requests for this metric, in arrival
	// order. Elements are untyped; presumably the same job values the
	// worker queue carries — confirm against the worker implementation.
	WaitQueue []any
}
|
|
|
|
// Database is the top-level time-series database instance: it owns the
// in-memory metric state, the per-metric lock table, the page free lists,
// the transaction log and the on-disk A-tree, and serves clients over TCP.
type Database struct {
	// mutex guards the mutable maps/slices below; presumably taken by the
	// worker and connection handlers — confirm against their code.
	mutex          sync.Mutex
	workerSignalCh chan struct{} // signals the worker that workerQueue has new jobs (buffer 1)
	workerQueue    []any         // pending jobs for the background worker
	// rLocksToRelease accumulates metric IDs whose read locks should be
	// released; drained by the worker (not visible in this chunk).
	rLocksToRelease []uint32
	// metrics maps metric ID to its in-memory state (compressors, bounds).
	metrics map[uint32]*_metric
	// metricLockEntries maps metric ID to its lock state and wait queue.
	metricLockEntries map[uint32]*metricLockEntry
	dataFreeList      *freelist.FreeList // reusable data pages
	indexFreeList     *freelist.FreeList // reusable index pages
	dir               string             // database directory (snapshots, tx logs)
	databaseName      string
	redoDir           string // directory scanned for "a<N>.redo" files on recovery
	txlog             *txlog.Writer
	atree             *atree.Atree
	tcpPort           int
	logfile           *os.File
	logger            *log.Logger
	exitCh            chan struct{}   // closed to request shutdown (shared with txlog writer)
	waitGroup         *sync.WaitGroup // tracks background goroutines for shutdown
}
|
|
|
|
// Options configures a new Database. All fields are required; New rejects
// a zero value for any of them.
type Options struct {
	TCPPort      int      // port the TCP listener binds to
	Dir          string   // database directory (snapshots, transaction logs)
	DatabaseName string   // name passed through to the A-tree
	RedoDir      string   // directory holding REDO files
	Logfile      *os.File // destination for the database logger
	ExitCh       chan struct{}   // closed by the owner to request shutdown
	WaitGroup    *sync.WaitGroup // used to wait for background goroutines
}
|
|
|
|
func New(opt Options) (_ *Database, err error) {
|
|
if opt.TCPPort <= 0 {
|
|
return nil, errors.New("TCPPort option is required")
|
|
}
|
|
if opt.Dir == "" {
|
|
return nil, errors.New("Dir option is required")
|
|
}
|
|
if opt.DatabaseName == "" {
|
|
return nil, errors.New("DatabaseName option is required")
|
|
}
|
|
if opt.RedoDir == "" {
|
|
return nil, errors.New("RedoDir option is required")
|
|
}
|
|
if opt.Logfile == nil {
|
|
return nil, errors.New("Logfile option is required")
|
|
}
|
|
if opt.ExitCh == nil {
|
|
return nil, errors.New("ExitCh option is required")
|
|
}
|
|
if opt.WaitGroup == nil {
|
|
return nil, errors.New("WaitGroup option is required")
|
|
}
|
|
|
|
s := &Database{
|
|
workerSignalCh: make(chan struct{}, 1),
|
|
dir: opt.Dir,
|
|
databaseName: opt.DatabaseName,
|
|
redoDir: opt.RedoDir,
|
|
metrics: make(map[uint32]*_metric),
|
|
metricLockEntries: make(map[uint32]*metricLockEntry),
|
|
dataFreeList: freelist.New(),
|
|
indexFreeList: freelist.New(),
|
|
tcpPort: opt.TCPPort,
|
|
logfile: opt.Logfile,
|
|
logger: log.New(opt.Logfile, "", log.LstdFlags),
|
|
exitCh: opt.ExitCh,
|
|
waitGroup: opt.WaitGroup,
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// ListenAndServe brings the database online and serves TCP clients until
// the process exits. Startup order matters: the listener is bound first
// (fail fast on a busy port), then the A-tree is opened and its goroutine
// started, then the worker is started, and only then is recovery run —
// recovery appends jobs to the worker and writes through the A-tree.
// The accept loop never returns on its own; accept errors are logged and
// retried after a one-second pause.
func (s *Database) ListenAndServe() (err error) {
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.tcpPort))
	if err != nil {
		return fmt.Errorf("net.Listen: %s; port=%d", err, s.tcpPort)
	}

	s.atree, err = atree.New(atree.Options{
		Dir:           s.dir,
		DatabaseName:  s.databaseName,
		RedoDir:       s.redoDir,
		DataFreeList:  s.dataFreeList,
		IndexFreeList: s.indexFreeList,
	})
	if err != nil {
		return fmt.Errorf("atree.New: %s", err)
	}
	s.atree.Run()

	go s.worker()

	// Recovery runs after the worker and A-tree are up; it aborts the
	// process on any failure rather than returning an error.
	s.recovery()

	s.logger.Println("database started")
	for {
		// Listen for an incoming connection.
		conn, err := listener.Accept()
		if err != nil {
			// Transient accept failure: log, back off briefly, retry.
			s.logger.Printf("listener.Accept: %s\n", err)
			time.Sleep(time.Second)
		} else {
			go s.handleTCPConn(conn)
		}
	}
}
|
|
|
|
// recovery restores the in-memory state after a restart: it loads the most
// recent usable snapshot, replays the transaction-log files that follow it,
// starts the transaction-log writer, re-applies any leftover REDO files,
// and finally dumps a fresh snapshot / deletes obsolete files as directed
// by the recovery recipe. Every failure is fatal — serving requests from a
// partially restored state would corrupt the database.
func (s *Database) recovery() {
	advisor, err := recovery.NewRecoveryAdvisor(recovery.RecoveryAdvisorOptions{
		Dir:            s.dir,
		VerifySnapshot: s.verifySnapshot,
	})
	if err != nil {
		// NOTE(review): this is the only failure path in recovery that
		// panics instead of calling diploma.Abort — confirm intentional.
		panic(err)
	}

	recipe, err := advisor.GetRecipe()
	if err != nil {
		diploma.Abort(diploma.GetRecoveryRecipeFailed, err)
	}

	// Log number the new transaction-log writer continues from; stays 0
	// when there is no recipe (fresh database).
	var logNumber int

	if recipe != nil {
		// Base state from the snapshot (when one is usable), then replay
		// the newer transaction-log files on top of it.
		if recipe.Snapshot != "" {
			err = s.loadSnapshot(recipe.Snapshot)
			if err != nil {
				diploma.Abort(diploma.LoadSnapshotFailed, err)
			}
		}
		for _, changesFileName := range recipe.Changes {
			err = s.replayChanges(changesFileName)
			if err != nil {
				diploma.Abort(diploma.ReplayChangesFailed, err)
			}
		}
		logNumber = recipe.LogNumber
	}

	// The transaction-log writer must be running before REDO replay below:
	// replayREDOFile writes records through s.txlog.
	s.txlog, err = txlog.NewWriter(txlog.WriterOptions{
		Dir:                 s.dir,
		LogNumber:           logNumber,
		AppendToWorkerQueue: s.appendJobToWorkerQueue,
		ExitCh:              s.exitCh,
		WaitGroup:           s.waitGroup,
	})
	if err != nil {
		diploma.Abort(diploma.CreateChangesWriterFailed, err)
	}
	go s.txlog.Run()

	fileNames, err := s.searchREDOFiles()
	if err != nil {
		diploma.Abort(diploma.SearchREDOFilesFailed, err)
	}

	if len(fileNames) > 0 {
		// Replay every REDO file first; delete them only after all of
		// them have been applied successfully.
		for _, fileName := range fileNames {
			err = s.replayREDOFile(fileName)
			if err != nil {
				diploma.Abort(diploma.ReplayREDOFileFailed, err)
			}
		}

		for _, fileName := range fileNames {
			err = os.Remove(fileName)
			if err != nil {
				diploma.Abort(diploma.RemoveREDOFileFailed, err)
			}
		}
	}

	if recipe != nil {
		// Dump a consolidated snapshot when the recipe asks for one, then
		// drop the files it superseded.
		if recipe.CompleteSnapshot {
			err = s.dumpSnapshot(logNumber)
			if err != nil {
				diploma.Abort(diploma.DumpSnapshotFailed, err)
			}
		}

		for _, fileName := range recipe.ToDelete {
			err = os.Remove(fileName)
			if err != nil {
				diploma.Abort(diploma.RemoveRecipeFileFailed, err)
			}
		}
	}
}
|
|
|
|
func (s *Database) searchREDOFiles() ([]string, error) {
|
|
var (
|
|
reREDO = regexp.MustCompile(`a\d+\.redo`)
|
|
fileNames []string
|
|
)
|
|
|
|
entries, err := os.ReadDir(s.redoDir)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if entry.Type().IsRegular() {
|
|
baseName := entry.Name()
|
|
if reREDO.MatchString(baseName) {
|
|
fileNames = append(fileNames, filepath.Join(s.redoDir, baseName))
|
|
}
|
|
}
|
|
}
|
|
return fileNames, nil
|
|
}
|
|
|
|
// replayREDOFile re-applies one REDO file: its pages are written back into
// the A-tree and a matching record is appended to the transaction log. The
// file is skipped when the metric's state already covers the measure
// (metric.Until >= redoFile.Timestamp). An error is returned only when the
// file cannot be read or references an unknown metric.
func (s *Database) replayREDOFile(fileName string) error {
	redoFile, err := redo.ReadREDOFile(redo.ReadREDOFileReq{
		FileName:      fileName,
		DataPageSize:  atree.DataPageSize,
		IndexPageSize: atree.IndexPageSize,
	})
	if err != nil {
		return fmt.Errorf("can't read REDO file %s: %s", fileName, err)
	}

	metric, ok := s.metrics[redoFile.MetricID]
	if !ok {
		return fmt.Errorf("has REDOFile, metric %d not found", redoFile.MetricID)
	}

	if metric.Until < redoFile.Timestamp {
		// NOTE(review): waitCh is created here but never handed to
		// ApplyREDO — the WriteTask below carries only the pages — so
		// nothing visible in this file ever closes or sends on it, and
		// the receive below would block forever. Confirm whether
		// atree.WriteTask is supposed to carry this completion channel.
		waitCh := make(chan struct{})
		s.atree.ApplyREDO(atree.WriteTask{
			DataPage:   redoFile.DataPage,
			IndexPages: redoFile.IndexPages,
		})
		<-waitCh

		// Log the replayed measure and wait until the transaction-log
		// writer signals completion via the returned channel.
		waitCh = s.txlog.WriteAppendedMeasureWithOverflow(
			txlog.AppendedMeasureWithOverflow{
				MetricID:         redoFile.MetricID,
				Timestamp:        redoFile.Timestamp,
				Value:            redoFile.Value,
				IsDataPageReused: redoFile.IsDataPageReused,
				DataPageNo:       redoFile.DataPage.PageNo,
				IsRootChanged:    redoFile.IsRootChanged,
				RootPageNo:       redoFile.RootPageNo,
				ReusedIndexPages: redoFile.ReusedIndexPages,
			},
			fileName,
			false,
		)
		<-waitCh
	}
	return nil
}
|
|
|
|
func (s *Database) verifySnapshot(fileName string) (_ bool, err error) {
|
|
file, err := os.Open(fileName)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
stat, err := file.Stat()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if stat.Size() <= 4 {
|
|
return false, nil
|
|
}
|
|
|
|
var (
|
|
payloadSize = stat.Size() - 4
|
|
hash = crc32.NewIEEE()
|
|
)
|
|
|
|
_, err = io.CopyN(hash, file, payloadSize)
|
|
if err != nil {
|
|
return
|
|
}
|
|
calculatedCRC := hash.Sum32()
|
|
|
|
storedCRC, err := bin.ReadUint32(file)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if storedCRC != calculatedCRC {
|
|
return false, fmt.Errorf("strored CRC %d not equal calculated CRC %d",
|
|
storedCRC, calculatedCRC)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (s *Database) replayChanges(fileName string) error {
|
|
walReader, err := txlog.NewReader(txlog.ReaderOptions{
|
|
FileName: fileName,
|
|
BufferSize: 1024 * 1024,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
lsn, records, done, err := walReader.ReadPacket()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_ = lsn
|
|
|
|
if done {
|
|
return nil
|
|
}
|
|
|
|
for _, record := range records {
|
|
if err = s.replayChangesRecord(record); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Database) replayChangesRecord(untyped any) error {
|
|
switch rec := untyped.(type) {
|
|
case txlog.AddedMetric:
|
|
var (
|
|
values diploma.ValueCompressor
|
|
timestampsBuf = conbuf.New(nil)
|
|
valuesBuf = conbuf.New(nil)
|
|
)
|
|
|
|
if rec.MetricType == diploma.Cumulative {
|
|
values = chunkenc.NewReverseCumulativeDeltaCompressor(
|
|
valuesBuf, 0, byte(rec.FracDigits))
|
|
} else {
|
|
values = chunkenc.NewReverseInstantDeltaCompressor(
|
|
valuesBuf, 0, byte(rec.FracDigits))
|
|
}
|
|
|
|
s.metrics[rec.MetricID] = &_metric{
|
|
MetricType: rec.MetricType,
|
|
FracDigits: byte(rec.FracDigits),
|
|
TimestampsBuf: timestampsBuf,
|
|
ValuesBuf: valuesBuf,
|
|
Timestamps: chunkenc.NewReverseTimeDeltaOfDeltaCompressor(timestampsBuf, 0),
|
|
Values: values,
|
|
}
|
|
|
|
case txlog.DeletedMetric:
|
|
delete(s.metrics, rec.MetricID)
|
|
if len(rec.FreeDataPages) > 0 {
|
|
s.dataFreeList.AddPages(rec.FreeDataPages)
|
|
}
|
|
if len(rec.FreeIndexPages) > 0 {
|
|
s.indexFreeList.AddPages(rec.FreeIndexPages)
|
|
}
|
|
|
|
case txlog.AppendedMeasure:
|
|
metric, ok := s.metrics[rec.MetricID]
|
|
if ok {
|
|
metric.Timestamps.Append(rec.Timestamp)
|
|
metric.Values.Append(rec.Value)
|
|
|
|
if metric.Since == 0 {
|
|
metric.Since = rec.Timestamp
|
|
metric.SinceValue = rec.Value
|
|
}
|
|
|
|
metric.Until = rec.Timestamp
|
|
metric.UntilValue = rec.Value
|
|
}
|
|
|
|
case txlog.AppendedMeasures:
|
|
metric, ok := s.metrics[rec.MetricID]
|
|
if ok {
|
|
for _, measure := range rec.Measures {
|
|
metric.Timestamps.Append(measure.Timestamp)
|
|
metric.Values.Append(measure.Value)
|
|
|
|
if metric.Since == 0 {
|
|
metric.Since = measure.Timestamp
|
|
metric.SinceValue = measure.Value
|
|
}
|
|
|
|
metric.Until = measure.Timestamp
|
|
metric.UntilValue = measure.Value
|
|
}
|
|
}
|
|
|
|
case txlog.AppendedMeasureWithOverflow:
|
|
metric, ok := s.metrics[rec.MetricID]
|
|
if ok {
|
|
metric.ReinitBy(rec.Timestamp, rec.Value)
|
|
if rec.IsRootChanged {
|
|
metric.RootPageNo = rec.RootPageNo
|
|
}
|
|
metric.LastPageNo = rec.DataPageNo
|
|
// delete free pages
|
|
if rec.IsDataPageReused {
|
|
s.dataFreeList.DeletePages([]uint32{
|
|
rec.DataPageNo,
|
|
})
|
|
}
|
|
if len(rec.ReusedIndexPages) > 0 {
|
|
s.indexFreeList.DeletePages(rec.ReusedIndexPages)
|
|
}
|
|
}
|
|
|
|
case txlog.DeletedMeasures:
|
|
metric, ok := s.metrics[rec.MetricID]
|
|
if ok {
|
|
metric.DeleteMeasures()
|
|
if len(rec.FreeDataPages) > 0 {
|
|
s.dataFreeList.AddPages(rec.FreeDataPages)
|
|
}
|
|
if len(rec.FreeDataPages) > 0 {
|
|
s.indexFreeList.AddPages(rec.FreeIndexPages)
|
|
}
|
|
}
|
|
|
|
default:
|
|
diploma.Abort(diploma.UnknownTxLogRecordTypeBug,
|
|
fmt.Errorf("bug: unknown record type %T in TransactionLog", rec))
|
|
}
|
|
return nil
|
|
}
|
|
|