package eventstream import ( "bytes" "encoding/base64" "encoding/json" "fmt" "strconv" ) type decodedMessage struct { rawMessage Headers decodedHeaders `json:"headers"` } type jsonMessage struct { Length json.Number `json:"total_length"` HeadersLen json.Number `json:"headers_length"` PreludeCRC json.Number `json:"prelude_crc"` Headers decodedHeaders `json:"headers"` Payload []byte `json:"payload"` CRC json.Number `json:"message_crc"` } func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) { var jsonMsg jsonMessage if err = json.Unmarshal(b, &jsonMsg); err != nil { return err } d.Length, err = numAsUint32(jsonMsg.Length) if err != nil { return err } d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen) if err != nil { return err } d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC) if err != nil { return err } d.Headers = jsonMsg.Headers d.Payload = jsonMsg.Payload d.CRC, err = numAsUint32(jsonMsg.CRC) if err != nil { return err } return nil } func (d *decodedMessage) MarshalJSON() ([]byte, error) { jsonMsg := jsonMessage{ Length: json.Number(strconv.Itoa(int(d.Length))), HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))), PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))), Headers: d.Headers, Payload: d.Payload, CRC: json.Number(strconv.Itoa(int(d.CRC))), } return json.Marshal(jsonMsg) } func numAsUint32(n json.Number) (uint32, error) { v, err := n.Int64() if err != nil { return 0, fmt.Errorf("failed to get int64 json number, %v", err) } return uint32(v), nil } func (d decodedMessage) Message() Message { return Message{ Headers: Headers(d.Headers), Payload: d.Payload, } } type decodedHeaders Headers func (hs *decodedHeaders) UnmarshalJSON(b []byte) error { var jsonHeaders []struct { Name string `json:"name"` Type valueType `json:"type"` Value interface{} `json:"value"` } decoder := json.NewDecoder(bytes.NewReader(b)) decoder.UseNumber() if err := decoder.Decode(&jsonHeaders); err != nil { return err } var headers Headers for _, h := range jsonHeaders { value, err := valueFromType(h.Type, h.Value) if err != nil { return err } headers.Set(h.Name, value) } (*hs) = decodedHeaders(headers) return nil } func valueFromType(typ valueType, val interface{}) (Value, error) { switch typ { case trueValueType: return BoolValue(true), nil case falseValueType: return BoolValue(false), nil case int8ValueType: v, err := val.(json.Number).Int64() return Int8Value(int8(v)), err case int16ValueType: v, err := val.(json.Number).Int64() return Int16Value(int16(v)), err case int32ValueType: v, err := val.(json.Number).Int64() return Int32Value(int32(v)), err case int64ValueType: v, err := val.(json.Number).Int64() return Int64Value(v), err case bytesValueType: v, err := base64.StdEncoding.DecodeString(val.(string)) return BytesValue(v), err case stringValueType: v, err := base64.StdEncoding.DecodeString(val.(string)) return StringValue(string(v)), err case timestampValueType: v, err := val.(json.Number).Int64() return TimestampValue(timeFromEpochMilli(v)), err case uuidValueType: v, err := base64.StdEncoding.DecodeString(val.(string)) var tv UUIDValue copy(tv[:], v) return tv, err default: panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val)) } }