package eventstream import ( "bytes" "encoding/binary" "hash/crc32" ) const preludeLen = 8 const preludeCRCLen = 4 const msgCRCLen = 4 const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen const maxPayloadLen = 1024 * 1024 * 16 // 16MB const maxHeadersLen = 1024 * 128 // 128KB const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen var crc32IEEETable = crc32.MakeTable(crc32.IEEE) // A Message provides the eventstream message representation. type Message struct { Headers Headers Payload []byte } func (m *Message) rawMessage() (rawMessage, error) { var raw rawMessage if len(m.Headers) > 0 { var headers bytes.Buffer if err := encodeHeaders(&headers, m.Headers); err != nil { return rawMessage{}, err } raw.Headers = headers.Bytes() raw.HeadersLen = uint32(len(raw.Headers)) } raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen hash := crc32.New(crc32IEEETable) binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen) raw.PreludeCRC = hash.Sum32() binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC) if raw.HeadersLen > 0 { hash.Write(raw.Headers) } // Read payload bytes and update hash for it as well. if len(m.Payload) > 0 { raw.Payload = m.Payload hash.Write(raw.Payload) } raw.CRC = hash.Sum32() return raw, nil } type messagePrelude struct { Length uint32 HeadersLen uint32 PreludeCRC uint32 } func (p messagePrelude) PayloadLen() uint32 { return p.Length - p.HeadersLen - minMsgLen } func (p messagePrelude) ValidateLens() error { if p.Length == 0 || p.Length > maxMsgLen { return LengthError{ Part: "message prelude", Want: maxMsgLen, Have: int(p.Length), } } if p.HeadersLen > maxHeadersLen { return LengthError{ Part: "message headers", Want: maxHeadersLen, Have: int(p.HeadersLen), } } if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen { return LengthError{ Part: "message payload", Want: maxPayloadLen, Have: int(payloadLen), } } return nil } type rawMessage struct { messagePrelude Headers []byte Payload []byte CRC uint32 }