Skip to content

Commit 0838812

Browse files
HynoRssongliu
andauthored
feat: Migrate to MinIO SDK and enhance S3 client with TLS and context (#12207)
* chore: update dependencies and migrate S3 client to MinIO SDK - Removed AWS SDK dependency and replaced S3 client implementation with MinIO SDK. - Updated various dependencies in go.mod and go.sum, including version upgrades for `klauspost/compress`, `minio/minio-go`, and `rs/xid`. - Adjusted S3 client methods to utilize MinIO's API for bucket listing, object existence checks, uploads, downloads, and deletions. * refactor: enhance S3 client with context management and TLS support - Introduced context management with timeouts for S3 client operations to improve reliability. - Added TLS configuration to support secure connections based on the endpoint scheme. - Updated S3 client methods to utilize the new context and transport settings for enhanced performance and security. --------- Co-authored-by: ssongliu <sloooop1x@gmail.com>
1 parent 34ccc68 commit 0838812

3 files changed

Lines changed: 92 additions & 78 deletions

File tree

agent/go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ replace github.com/moby/go-archive => github.com/moby/go-archive v0.1.0
66

77
require (
88
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
9-
github.com/aws/aws-sdk-go v1.55.0
109
github.com/compose-spec/compose-go/v2 v2.9.1
1110
github.com/creack/pty v1.1.24
1211
github.com/docker/cli v29.2.1+incompatible
@@ -57,6 +56,7 @@ require (
5756
golang.org/x/oauth2 v0.35.0
5857
golang.org/x/sys v0.41.0
5958
golang.org/x/text v0.35.0
59+
golang.org/x/time v0.14.0
6060
google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38
6161
gopkg.in/ini.v1 v1.67.1
6262
gopkg.in/yaml.v3 v3.0.1
@@ -165,7 +165,6 @@ require (
165165
github.com/jackc/puddle/v2 v2.2.2 // indirect
166166
github.com/jinzhu/inflection v1.0.0 // indirect
167167
github.com/jinzhu/now v1.1.5 // indirect
168-
github.com/jmespath/go-jmespath v0.4.0 // indirect
169168
github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect
170169
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
171170
github.com/klauspost/pgzip v1.2.5 // indirect
@@ -267,7 +266,6 @@ require (
267266
golang.org/x/mod v0.33.0 // indirect
268267
golang.org/x/sync v0.20.0 // indirect
269268
golang.org/x/term v0.40.0 // indirect
270-
golang.org/x/time v0.14.0 // indirect
271269
golang.org/x/tools v0.42.0 // indirect
272270
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
273271
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect

agent/go.sum

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d
135135
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw=
136136
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
137137
github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
138-
github.com/aws/aws-sdk-go v1.55.0 h1:hVALKPjXz33kP1R9nTyJpUK7qF59dO2mleQxUW9mCVE=
139-
github.com/aws/aws-sdk-go v1.55.0/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
140138
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
141139
github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
142140
github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
@@ -650,9 +648,7 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr
650648
github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
651649
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
652650
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
653-
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
654651
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
655-
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
656652
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
657653
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
658654
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
Lines changed: 91 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,30 @@
11
package client
22

33
import (
4+
"context"
5+
"crypto/tls"
6+
"net/http"
47
"os"
8+
"strings"
9+
"time"
510

6-
"github.com/aws/aws-sdk-go/aws"
7-
"github.com/aws/aws-sdk-go/aws/awserr"
8-
"github.com/aws/aws-sdk-go/aws/credentials"
9-
"github.com/aws/aws-sdk-go/aws/session"
10-
"github.com/aws/aws-sdk-go/service/s3"
11-
"github.com/aws/aws-sdk-go/service/s3/s3manager"
11+
"github.com/minio/minio-go/v7"
12+
"github.com/minio/minio-go/v7/pkg/credentials"
13+
)
14+
15+
const (
16+
s3DefaultTimeout = 30 * time.Second
17+
s3TransferTimeout = 24 * time.Hour
1218
)
1319

1420
type s3Client struct {
1521
scType string
1622
bucket string
17-
Sess session.Session
23+
client *minio.Client
24+
}
25+
26+
func (s *s3Client) ctx(timeout time.Duration) (context.Context, context.CancelFunc) {
27+
return context.WithTimeout(context.Background(), timeout)
1828
}
1929

2030
func NewS3Client(vars map[string]interface{}) (*s3Client, error) {
@@ -31,69 +41,80 @@ func NewS3Client(vars map[string]interface{}) (*s3Client, error) {
3141
if len(mode) == 0 {
3242
mode = "virtual hosted"
3343
}
34-
sess, err := session.NewSession(&aws.Config{
35-
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
36-
Endpoint: aws.String(endpoint),
37-
Region: aws.String(region),
38-
DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(mode == "path"),
44+
45+
lookupStyle := minio.BucketLookupDNS
46+
if mode == "path" {
47+
lookupStyle = minio.BucketLookupPath
48+
}
49+
50+
ssl := strings.Split(endpoint, ":")[0]
51+
secure := false
52+
tlsConfig := &tls.Config{}
53+
if ssl == "https" {
54+
secure = true
55+
tlsConfig.InsecureSkipVerify = true
56+
}
57+
var transport http.RoundTripper = &http.Transport{
58+
TLSClientConfig: tlsConfig,
59+
}
60+
61+
endpoint = strings.TrimPrefix(endpoint, ssl+"://")
62+
63+
client, err := minio.New(endpoint, &minio.Options{
64+
Creds: credentials.NewStaticV4(accessKey, secretKey, ""),
65+
Secure: secure,
66+
Region: region,
67+
BucketLookup: lookupStyle,
68+
Transport: transport,
3969
})
4070
if err != nil {
4171
return nil, err
4272
}
43-
return &s3Client{scType: scType, bucket: bucket, Sess: *sess}, nil
73+
return &s3Client{scType: scType, bucket: bucket, client: client}, nil
4474
}
4575

4676
func (s s3Client) ListBuckets() ([]interface{}, error) {
47-
var result []interface{}
48-
svc := s3.New(&s.Sess)
49-
res, err := svc.ListBuckets(nil)
77+
ctx, cancel := s.ctx(s3DefaultTimeout)
78+
defer cancel()
79+
buckets, err := s.client.ListBuckets(ctx)
5080
if err != nil {
5181
return nil, err
5282
}
53-
for _, b := range res.Buckets {
83+
var result []interface{}
84+
for _, b := range buckets {
5485
result = append(result, b.Name)
5586
}
5687
return result, nil
5788
}
5889

5990
func (s s3Client) Exist(path string) (bool, error) {
60-
svc := s3.New(&s.Sess)
61-
if _, err := svc.HeadObject(&s3.HeadObjectInput{
62-
Bucket: &s.bucket,
63-
Key: &path,
64-
}); err != nil {
65-
if aerr, ok := err.(awserr.RequestFailure); ok {
66-
if aerr.StatusCode() == 404 {
67-
return false, nil
68-
}
69-
} else {
70-
return false, aerr
91+
ctx, cancel := s.ctx(s3DefaultTimeout)
92+
defer cancel()
93+
_, err := s.client.StatObject(ctx, s.bucket, path, minio.StatObjectOptions{})
94+
if err != nil {
95+
resp := minio.ToErrorResponse(err)
96+
if resp.StatusCode == 404 {
97+
return false, nil
7198
}
99+
return false, err
72100
}
73101
return true, nil
74102
}
75103

76104
func (s *s3Client) Size(path string) (int64, error) {
77-
svc := s3.New(&s.Sess)
78-
file, err := svc.GetObject(&s3.GetObjectInput{
79-
Bucket: &s.bucket,
80-
Key: &path,
81-
})
105+
ctx, cancel := s.ctx(s3DefaultTimeout)
106+
defer cancel()
107+
info, err := s.client.StatObject(ctx, s.bucket, path, minio.StatObjectOptions{})
82108
if err != nil {
83109
return 0, err
84110
}
85-
return *file.ContentLength, nil
111+
return info.Size, nil
86112
}
87113

88114
func (s s3Client) Delete(path string) (bool, error) {
89-
svc := s3.New(&s.Sess)
90-
if _, err := svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(s.bucket), Key: aws.String(path)}); err != nil {
91-
return false, err
92-
}
93-
if err := svc.WaitUntilObjectNotExists(&s3.HeadObjectInput{
94-
Bucket: aws.String(s.bucket),
95-
Key: aws.String(path),
96-
}); err != nil {
115+
ctx, cancel := s.ctx(s3DefaultTimeout)
116+
defer cancel()
117+
if err := s.client.RemoveObject(ctx, s.bucket, path, minio.RemoveObjectOptions{}); err != nil {
97118
return false, err
98119
}
99120
return true, nil
@@ -110,16 +131,21 @@ func (s s3Client) Upload(src, target string) (bool, error) {
110131
}
111132
defer file.Close()
112133

113-
uploader := s3manager.NewUploader(&s.Sess)
114-
if fileInfo.Size() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize {
115-
uploader.PartSize = fileInfo.Size() / (s3manager.MaxUploadParts - 1)
134+
opts := minio.PutObjectOptions{
135+
StorageClass: s.scType,
136+
}
137+
138+
const maxParts = 10000
139+
const defaultPartSize = 64 * 1024 * 1024 // 64 MiB
140+
partSize := uint64(defaultPartSize)
141+
if fileInfo.Size() > int64(maxParts)*int64(defaultPartSize) {
142+
partSize = uint64(fileInfo.Size()) / (maxParts - 1)
116143
}
117-
if _, err := uploader.Upload(&s3manager.UploadInput{
118-
Bucket: aws.String(s.bucket),
119-
Key: aws.String(target),
120-
Body: file,
121-
StorageClass: &s.scType,
122-
}); err != nil {
144+
opts.PartSize = partSize
145+
146+
ctx, cancel := s.ctx(s3TransferTimeout)
147+
defer cancel()
148+
if _, err := s.client.PutObject(ctx, s.bucket, target, file, fileInfo.Size(), opts); err != nil {
123149
return false, err
124150
}
125151
return true, nil
@@ -129,34 +155,28 @@ func (s s3Client) Download(src, target string) (bool, error) {
129155
if _, err := os.Stat(target); err == nil {
130156
_ = os.Remove(target)
131157
}
132-
file, err := os.Create(target)
133-
if err != nil {
134-
return false, err
135-
}
136-
defer file.Close()
137-
downloader := s3manager.NewDownloader(&s.Sess)
138-
if _, err = downloader.Download(file, &s3.GetObjectInput{
139-
Bucket: aws.String(s.bucket),
140-
Key: aws.String(src),
141-
}); err != nil {
158+
ctx, cancel := s.ctx(s3TransferTimeout)
159+
defer cancel()
160+
if err := s.client.FGetObject(ctx, s.bucket, src, target, minio.GetObjectOptions{}); err != nil {
142161
os.Remove(target)
143162
return false, err
144163
}
145164
return true, nil
146165
}
147166

148167
func (s *s3Client) ListObjects(prefix string) ([]string, error) {
149-
svc := s3.New(&s.Sess)
150-
var result []string
151-
outputs, err := svc.ListObjects(&s3.ListObjectsInput{
152-
Bucket: &s.bucket,
153-
Prefix: &prefix,
154-
})
155-
if err != nil {
156-
return result, err
168+
opts := minio.ListObjectsOptions{
169+
Recursive: true,
170+
Prefix: prefix,
157171
}
158-
for _, item := range outputs.Contents {
159-
result = append(result, *item.Key)
172+
var result []string
173+
ctx, cancel := s.ctx(s3DefaultTimeout)
174+
defer cancel()
175+
for object := range s.client.ListObjects(ctx, s.bucket, opts) {
176+
if object.Err != nil {
177+
return result, object.Err
178+
}
179+
result = append(result, object.Key)
160180
}
161181
return result, nil
162182
}

0 commit comments

Comments
 (0)