aboutsummaryrefslogtreecommitdiff
path: root/vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/api.go
blob: 97937c8e5980b7fb03b421ac39bd2170be2cd8b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package eventstreamapi

import (
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/private/protocol"
	"github.com/aws/aws-sdk-go/private/protocol/eventstream"
)

// Unmarshaler provides the interface for unmarshaling a EventStream
// message into a SDK type.
type Unmarshaler interface {
	UnmarshalEvent(protocol.PayloadUnmarshaler, eventstream.Message) error
}

// EventStream headers with specific meaning to async API functionality.
const (
	MessageTypeHeader    = `:message-type` // Identifies type of message.
	EventMessageType     = `event`
	ErrorMessageType     = `error`
	ExceptionMessageType = `exception`

	// Message Events
	EventTypeHeader = `:event-type` // Identifies message event type e.g. "Stats".

	// Message Error
	ErrorCodeHeader    = `:error-code`
	ErrorMessageHeader = `:error-message`

	// Message Exception
	ExceptionTypeHeader = `:exception-type`
)

// EventReader provides reading from the EventStream of an reader.
type EventReader struct {
	reader  io.ReadCloser
	decoder *eventstream.Decoder

	unmarshalerForEventType func(string) (Unmarshaler, error)
	payloadUnmarshaler      protocol.PayloadUnmarshaler

	payloadBuf []byte
}

// NewEventReader returns a EventReader built from the reader and unmarshaler
// provided.  Use ReadStream method to start reading from the EventStream.
func NewEventReader(
	reader io.ReadCloser,
	payloadUnmarshaler protocol.PayloadUnmarshaler,
	unmarshalerForEventType func(string) (Unmarshaler, error),
) *EventReader {
	return &EventReader{
		reader:                  reader,
		decoder:                 eventstream.NewDecoder(reader),
		payloadUnmarshaler:      payloadUnmarshaler,
		unmarshalerForEventType: unmarshalerForEventType,
		payloadBuf:              make([]byte, 10*1024),
	}
}

// UseLogger instructs the EventReader to use the logger and log level
// specified.
func (r *EventReader) UseLogger(logger aws.Logger, logLevel aws.LogLevelType) {
	if logger != nil && logLevel.Matches(aws.LogDebugWithEventStreamBody) {
		r.decoder.UseLogger(logger)
	}
}

// ReadEvent attempts to read a message from the EventStream and return the
// unmarshaled event value that the message is for.
//
// For EventStream API errors check if the returned error satisfies the
// awserr.Error interface to get the error's Code and Message components.
//
// EventUnmarshalers called with EventStream messages must take copies of the
// message's Payload. The payload will is reused between events read.
func (r *EventReader) ReadEvent() (event interface{}, err error) {
	msg, err := r.decoder.Decode(r.payloadBuf)
	if err != nil {
		return nil, err
	}
	defer func() {
		// Reclaim payload buffer for next message read.
		r.payloadBuf = msg.Payload[0:0]
	}()

	typ, err := GetHeaderString(msg, MessageTypeHeader)
	if err != nil {
		return nil, err
	}

	switch typ {
	case EventMessageType:
		return r.unmarshalEventMessage(msg)
	case ExceptionMessageType:
		err = r.unmarshalEventException(msg)
		return nil, err
	case ErrorMessageType:
		return nil, r.unmarshalErrorMessage(msg)
	default:
		return nil, fmt.Errorf("unknown eventstream message type, %v", typ)
	}
}

func (r *EventReader) unmarshalEventMessage(
	msg eventstream.Message,
) (event interface{}, err error) {
	eventType, err := GetHeaderString(msg, EventTypeHeader)
	if err != nil {
		return nil, err
	}

	ev, err := r.unmarshalerForEventType(eventType)
	if err != nil {
		return nil, err
	}

	err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg)
	if err != nil {
		return nil, err
	}

	return ev, nil
}

func (r *EventReader) unmarshalEventException(
	msg eventstream.Message,
) (err error) {
	eventType, err := GetHeaderString(msg, ExceptionTypeHeader)
	if err != nil {
		return err
	}

	ev, err := r.unmarshalerForEventType(eventType)
	if err != nil {
		return err
	}

	err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg)
	if err != nil {
		return err
	}

	var ok bool
	err, ok = ev.(error)
	if !ok {
		err = messageError{
			code: "SerializationError",
			msg: fmt.Sprintf(
				"event stream exception %s mapped to non-error %T, %v",
				eventType, ev, ev,
			),
		}
	}

	return err
}

func (r *EventReader) unmarshalErrorMessage(msg eventstream.Message) (err error) {
	var msgErr messageError

	msgErr.code, err = GetHeaderString(msg, ErrorCodeHeader)
	if err != nil {
		return err
	}

	msgErr.msg, err = GetHeaderString(msg, ErrorMessageHeader)
	if err != nil {
		return err
	}

	return msgErr
}

// Close closes the EventReader's EventStream reader.
func (r *EventReader) Close() error {
	return r.reader.Close()
}

// GetHeaderString returns the value of the header as a string. If the header
// is not set or the value is not a string an error will be returned.
func GetHeaderString(msg eventstream.Message, headerName string) (string, error) {
	headerVal := msg.Headers.Get(headerName)
	if headerVal == nil {
		return "", fmt.Errorf("error header %s not present", headerName)
	}

	v, ok := headerVal.Get().(string)
	if !ok {
		return "", fmt.Errorf("error header value is not a string, %T", headerVal)
	}

	return v, nil
}