You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
89 lines
1.5 KiB
89 lines
1.5 KiB
package transform
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
|
|
"gordenko.dev/dima/diploma/bin"
|
|
"gordenko.dev/dima/diploma/proto"
|
|
)
|
|
|
|
// CHUNKED RESPONDER
|
|
|
|
var endMsg = []byte{
|
|
proto.RespEndOfValue, // end of stream
|
|
}
|
|
|
|
// ChunkedResponder collects records in an in-memory buffer and writes them to
// dst as chunk messages. The buffer always begins with a chunk header: one
// message type byte followed by four bytes reserved for the record count.
type ChunkedResponder struct {
	// recordsQty is the number of records added to buf since the current
	// chunk was started.
	recordsQty int

	// buf holds the chunk being assembled: the header first, then the raw
	// bytes of every record added so far.
	buf *bytes.Buffer

	// dst receives the completed chunks and the end-of-stream message.
	dst io.Writer
}
|
|
|
|
func NewChunkedResponder(dst io.Writer) *ChunkedResponder {
|
|
s := &ChunkedResponder{
|
|
recordsQty: 0,
|
|
buf: bytes.NewBuffer(nil),
|
|
dst: dst,
|
|
}
|
|
|
|
s.buf.Write([]byte{
|
|
proto.RespPartOfValue, // message type
|
|
0, 0, 0, 0, // records qty
|
|
})
|
|
return s
|
|
}
|
|
|
|
func (s *ChunkedResponder) BufferRecord(rec []byte) {
|
|
s.buf.Write(rec)
|
|
s.recordsQty++
|
|
}
|
|
|
|
func (s *ChunkedResponder) AppendRecord(rec []byte) error {
|
|
s.buf.Write(rec)
|
|
s.recordsQty++
|
|
|
|
if s.buf.Len() < 1500 {
|
|
return nil
|
|
}
|
|
|
|
if err := s.sendBuffered(); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.buf.Write([]byte{
|
|
proto.RespPartOfValue, // message type
|
|
0, 0, 0, 0, // records qty
|
|
})
|
|
s.recordsQty = 0
|
|
return nil
|
|
}
|
|
|
|
func (s *ChunkedResponder) Flush() error {
|
|
if s.recordsQty > 0 {
|
|
if err := s.sendBuffered(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if _, err := s.dst.Write(endMsg); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ChunkedResponder) sendBuffered() (err error) {
|
|
msg := s.buf.Bytes()
|
|
bin.PutUint32(msg[1:], uint32(s.recordsQty))
|
|
|
|
n, err := s.dst.Write(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if n != len(msg) {
|
|
return fmt.Errorf("incomplete write %d bytes instead of %d", n, len(msg))
|
|
}
|
|
s.buf.Reset()
|
|
return
|
|
}
|
|
|