-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrows.go
More file actions
119 lines (111 loc) · 3 KB
/
rows.go
File metadata and controls
119 lines (111 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package redshiftdatasqldriver
import (
"context"
"database/sql/driver"
"io"
"strings"
"time"
"github.com/aws/aws-sdk-go-v2/service/redshiftdata"
"github.com/aws/aws-sdk-go-v2/service/redshiftdata/types"
)
type redshiftDataRows struct {
id string
p *redshiftdata.GetStatementResultPaginator
resultSet *redshiftdata.GetStatementResultOutput
columns []types.ColumnMetadata
columnNames []string
index int
}
func newRows(id string, p *redshiftdata.GetStatementResultPaginator) *redshiftDataRows {
debugLogger.Printf("[%s] create rows", id)
return &redshiftDataRows{
id: id,
p: p,
}
}
func (rows *redshiftDataRows) Close() (err error) {
debugLogger.Printf("[%s] rows close called", rows.id)
return nil
}
func (rows *redshiftDataRows) Columns() []string {
debugLogger.Printf("[%s] rows columns called", rows.id)
if rows.columnNames != nil {
return rows.columnNames
}
if err := rows.getStatementResult(); err != nil {
return []string{}
}
return rows.columnNames
}
func (rows *redshiftDataRows) getStatementResult() error {
if rows.p == nil {
return io.EOF
}
var err error
rows.resultSet, err = rows.p.NextPage(context.Background())
if err != nil {
return err
}
rows.columns = rows.resultSet.ColumnMetadata
rows.columnNames = make([]string, 0, len(rows.columns))
for _, meta := range rows.columns {
rows.columnNames = append(rows.columnNames, *meta.Name)
}
rows.index = 0
return nil
}
func (rows *redshiftDataRows) Next(dest []driver.Value) error {
debugLogger.Printf("[%s] rows next called", rows.id)
if rows.resultSet == nil || rows.index >= len(rows.resultSet.Records) {
if !rows.p.HasMorePages() {
return io.EOF
}
if err := rows.getStatementResult(); err != nil {
return err
}
rows.index = 0
if len(rows.resultSet.Records) == 0 {
return io.EOF
}
}
record := rows.resultSet.Records[rows.index]
for i := range dest {
if i < len(record) {
switch field := record[i].(type) {
case *types.FieldMemberIsNull:
dest[i] = nil
case *types.FieldMemberStringValue:
switch {
case strings.EqualFold(*rows.resultSet.ColumnMetadata[i].TypeName, "timestamp"):
t, err := time.Parse("2006-01-02 15:04:05", field.Value)
if err != nil {
errLogger.Printf(`time.Parse("2006-01-02 15:04:05", "%s"): %v`, field.Value, err)
dest[i] = nil
} else {
dest[i] = t
}
case strings.EqualFold(*rows.resultSet.ColumnMetadata[i].TypeName, "timestamptz"):
t, err := time.Parse("2006-01-02 15:04:05-07", field.Value)
if err != nil {
errLogger.Printf(`time.Parse("2006-01-02 15:04:05-07", "%s"): %v`, field.Value, err)
dest[i] = nil
} else {
dest[i] = t
}
default:
dest[i] = field.Value
}
case *types.FieldMemberLongValue:
dest[i] = field.Value
case *types.FieldMemberBooleanValue:
dest[i] = field.Value
case *types.FieldMemberDoubleValue:
dest[i] = field.Value
case *types.FieldMemberBlobValue:
dest[i] = field.Value
}
}
}
rows.index++
return nil
}