Skip to content

Commit c6acbeb

Browse files
committed
panic removal and variable renaming
Signed-off-by: Gergely Szabo <gergely.szabo@origoss.com>
1 parent a83cd3b commit c6acbeb

File tree

1 file changed

+37
-33
lines changed

1 file changed

+37
-33
lines changed

fluent/fluent.go

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -202,25 +202,25 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
202202
}
203203

204204
func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}) error {
205-
var data *msgToSend
205+
var msg *msgToSend
206206
var err error
207-
if data, err = f.EncodeData(tag, tm, message); err != nil {
207+
if msg, err = f.EncodeData(tag, tm, message); err != nil {
208208
return fmt.Errorf("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v", message, err)
209209
}
210-
return f.postRawData(data)
210+
return f.postRawData(msg)
211211
}
212212

213213
// Deprecated: Use EncodeAndPostData instead
214-
func (f *Fluent) PostRawData(data *msgToSend) {
215-
f.postRawData(data)
214+
func (f *Fluent) PostRawData(msg *msgToSend) {
215+
f.postRawData(msg)
216216
}
217217

218-
func (f *Fluent) postRawData(data *msgToSend) error {
218+
func (f *Fluent) postRawData(msg *msgToSend) error {
219219
if f.Config.Async {
220-
return f.appendBuffer(data)
220+
return f.appendBuffer(msg)
221221
}
222222
// Synchronous write
223-
return f.write(data)
223+
return f.write(msg)
224224
}
225225

226226
// For sending forward protocol adopted JSON
@@ -247,37 +247,41 @@ func (chunk *MessageChunk) MarshalJSON() ([]byte, error) {
247247
// getUniqueID returns a base64 encoded unique ID that can be used for chunk/ack
248248
// mechanism, see
249249
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option
250-
func getUniqueID(timeUnix int64) string {
250+
func getUniqueID(timeUnix int64) (string, error) {
251251
buf := bytes.NewBuffer(nil)
252252
enc := base64.NewEncoder(base64.StdEncoding, buf)
253253
if err := binary.Write(enc, binary.LittleEndian, timeUnix); err != nil {
254-
panic(err)
254+
return "", err
255255
}
256256
if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
257-
panic(err)
257+
return "", err
258258
}
259259
enc.Close()
260-
return buf.String()
260+
return buf.String(), nil
261261
}
262262

263-
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data *msgToSend, err error) {
263+
func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg *msgToSend, err error) {
264264
option := make(map[string]string)
265-
data = &msgToSend{}
265+
msg = &msgToSend{}
266266
timeUnix := tm.Unix()
267267
if f.Config.RequestAck {
268-
data.ack = getUniqueID(timeUnix)
269-
option["chunk"] = data.ack
268+
var err error
269+
msg.ack, err = getUniqueID(timeUnix)
270+
if err != nil {
271+
return nil, err
272+
}
273+
option["chunk"] = msg.ack
270274
}
271275
if f.Config.MarshalAsJSON {
272-
msg := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
273-
chunk := &MessageChunk{message: msg}
274-
data.data, err = json.Marshal(chunk)
276+
m := Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
277+
chunk := &MessageChunk{message: m}
278+
msg.data, err = json.Marshal(chunk)
275279
} else if f.Config.SubSecondPrecision {
276-
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
277-
data.data, err = msg.MarshalMsg(nil)
280+
m := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message, Option: option}
281+
msg.data, err = m.MarshalMsg(nil)
278282
} else {
279-
msg := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
280-
data.data, err = msg.MarshalMsg(nil)
283+
m := &Message{Tag: tag, Time: timeUnix, Record: message, Option: option}
284+
msg.data, err = m.MarshalMsg(nil)
281285
}
282286
return
283287
}
@@ -293,9 +297,9 @@ func (f *Fluent) Close() (err error) {
293297
}
294298

295299
// appendBuffer appends data to buffer with lock.
296-
func (f *Fluent) appendBuffer(data *msgToSend) error {
300+
func (f *Fluent) appendBuffer(msg *msgToSend) error {
297301
select {
298-
case f.pending <- data:
302+
case f.pending <- msg:
299303
default:
300304
return fmt.Errorf("fluent#appendBuffer: Buffer full, limit %v", f.Config.BufferLimit)
301305
}
@@ -346,7 +350,7 @@ func e(x, y float64) int {
346350
return int(math.Pow(x, y))
347351
}
348352

349-
func (f *Fluent) write(data *msgToSend) error {
353+
func (f *Fluent) write(msg *msgToSend) error {
350354

351355
for i := 0; i < f.Config.MaxRetry; i++ {
352356

@@ -366,28 +370,28 @@ func (f *Fluent) write(data *msgToSend) error {
366370
}
367371
f.muconn.Unlock()
368372

369-
// We're connected, write data
373+
// We're connected, write msg
370374
t := f.Config.WriteTimeout
371375
if time.Duration(0) < t {
372376
f.conn.SetWriteDeadline(time.Now().Add(t))
373377
} else {
374378
f.conn.SetWriteDeadline(time.Time{})
375379
}
376-
_, err := f.conn.Write(data.data)
380+
_, err := f.conn.Write(msg.data)
377381
if err != nil {
378382
f.close()
379383
} else {
380384
// Acknowledgment check
381-
if data.ack != "" {
382-
ack := &AckResp{}
385+
if msg.ack != "" {
386+
resp := &AckResp{}
383387
if f.Config.MarshalAsJSON {
384388
dec := json.NewDecoder(f.conn)
385-
err = dec.Decode(ack)
389+
err = dec.Decode(resp)
386390
} else {
387391
r := msgp.NewReader(f.conn)
388-
err = ack.DecodeMsg(r)
392+
err = resp.DecodeMsg(r)
389393
}
390-
if err != nil || ack.Ack != data.ack {
394+
if err != nil || resp.Ack != msg.ack {
391395
f.close()
392396
continue
393397
}

0 commit comments

Comments
 (0)