diff --git a/packages/google_cloud_storage/_dev/deploy/docker/docker-compose.yml b/packages/google_cloud_storage/_dev/deploy/docker/docker-compose.yml index eb4f52c7cd9..87f214f7ce7 100644 --- a/packages/google_cloud_storage/_dev/deploy/docker/docker-compose.yml +++ b/packages/google_cloud_storage/_dev/deploy/docker/docker-compose.yml @@ -1,9 +1,16 @@ version: '2.3' services: - google-cloud-storage-emulator: - image: fsouza/fake-gcs-server:latest - command: -host=0.0.0.0 -public-host=elastic-package-service_google-cloud-storage-emulator_1 -port=4443 -scheme=http + gcs-mock-service: + image: golang:1.24.7-alpine + working_dir: /app volumes: - - ./sample_logs:/data + - ./gcs-mock-service:/app + - ./files/manifest.yml:/files/manifest.yml:ro + - ./sample_logs/:/data:ro ports: - - 4443/tcp + - "4443/tcp" + healthcheck: + test: "wget --no-verbose --tries=1 --spider http://localhost:4443/health || exit 1" + interval: 10s + timeout: 5s + command: go run main.go -manifest /files/manifest.yml diff --git a/packages/google_cloud_storage/_dev/deploy/docker/files/manifest.yml b/packages/google_cloud_storage/_dev/deploy/docker/files/manifest.yml new file mode 100644 index 00000000000..61e12fd5342 --- /dev/null +++ b/packages/google_cloud_storage/_dev/deploy/docker/files/manifest.yml @@ -0,0 +1,5 @@ +buckets: + testbucket: + files: + - path: /data/testbucket/testdata.log + content-type: application/x-ndjson diff --git a/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.mod b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.mod new file mode 100644 index 00000000000..df08767f7e8 --- /dev/null +++ b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.mod @@ -0,0 +1,5 @@ +module gcs-mock-service + +go 1.24.7 + +require gopkg.in/yaml.v3 v3.0.1 diff --git a/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.sum b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.sum new file mode 100644 index 00000000000..a62c313c5b0 --- /dev/null +++ b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/go.sum @@ -0,0 +1,4 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/main.go b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/main.go new file mode 100644 index 00000000000..391e75c7efe --- /dev/null +++ b/packages/google_cloud_storage/_dev/deploy/docker/gcs-mock-service/main.go @@ -0,0 +1,297 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "strconv" + "strings" + + "gopkg.in/yaml.v3" +) + +func main() { + host := flag.String("host", "0.0.0.0", "host to listen on") + port := flag.String("port", "4443", "port to listen on") + manifest := flag.String("manifest", "", "path to YAML manifest file for preloading buckets and objects") + flag.Parse() + + addr := fmt.Sprintf("%s:%s", *host, *port) + + fmt.Printf("Starting mock GCS server on http://%s\n", addr) + if *manifest != "" { + m, err := readManifest(*manifest) + if err != nil { + log.Fatalf("error reading manifest: %v", err) + } + if err := processManifest(m); err != nil { + log.Fatalf("error processing manifest: %v", err) + } + } else { + fmt.Println("Store is empty. Create buckets and objects via API calls.") + } + + // setup HTTP handlers + mux := http.NewServeMux() + // health check + mux.HandleFunc("/health", healthHandler) + // standard gcs api calls + mux.HandleFunc("GET /storage/v1/b/{bucket}/o", handleListObjects) + mux.HandleFunc("GET /storage/v1/b/{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("POST /storage/v1/b", handleCreateBucket) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o", handleUploadObject) + mux.HandleFunc("POST /upload/storage/v1/b/{bucket}/o/{object...}", handleUploadObject) + // direct path-style gcs sdk calls + mux.HandleFunc("GET /{bucket}/o/{object...}", handleGetObject) + mux.HandleFunc("GET /{bucket}/{object...}", handleGetObject) + // debug: log all requests + loggedMux := loggingMiddleware(mux) + + if err := http.ListenAndServe(addr, loggedMux); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} + +// loggingMiddleware logs incoming HTTP requests. +func loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Printf("%s %s\n", r.Method, r.URL.Path) + next.ServeHTTP(w, r) + }) +} + +// readManifest reads and parses the YAML manifest file. +func readManifest(path string) (*Manifest, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read manifest: %w", err) + } + + var manifest Manifest + if err := yaml.Unmarshal(data, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest: %w", err) + } + + return &manifest, nil +} + +// processManifest creates buckets and uploads objects as specified in the manifest. +func processManifest(manifest *Manifest) error { + for bucketName, bucket := range manifest.Buckets { + for _, file := range bucket.Files { + fmt.Printf("preloading data for bucket: %s | path: %s | content-type: %s...\n", + bucketName, file.Path, file.ContentType) + + if err := createBucket(bucketName); err != nil { + return fmt.Errorf("failed to create bucket '%s': %w", bucketName, err) + } + data, err := os.ReadFile(file.Path) + if err != nil { + return fmt.Errorf("failed to read bucket data file '%s': %w", file.Path, err) + } + pathParts := strings.Split(file.Path, "/") + if _, err := uploadObject(bucketName, pathParts[len(pathParts)-1], data, file.ContentType); err != nil { + return fmt.Errorf("failed to create object '%s' in bucket '%s': %w", file.Path, bucketName, err) + } + } + } + return nil +} + +// healthHandler responds with a simple "OK" message for health checks. +func healthHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "OK") +} + +// handleListObjects lists all objects in the specified bucket. +func handleListObjects(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + + if bucket, ok := inMemoryStore[bucketName]; ok { + response := GCSListResponse{ + Kind: "storage#objects", + Items: []GCSObject{}, + } + for name, object := range bucket { + item := GCSObject{ + Kind: "storage#object", + Name: name, + Bucket: bucketName, + Size: strconv.Itoa(len(object.Data)), + ContentType: object.ContentType, + } + response.Items = append(response.Items, item) + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) + return + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleGetObject retrieves a specific object from a bucket. +func handleGetObject(w http.ResponseWriter, r *http.Request) { + bucketName := r.PathValue("bucket") + objectName := r.PathValue("object") + + if bucketName == "" || objectName == "" { + http.Error(w, "not found: invalid URL format", http.StatusNotFound) + return + } + + if bucket, ok := inMemoryStore[bucketName]; ok { + if object, ok := bucket[objectName]; ok { + w.Header().Set("Content-Type", object.ContentType) + w.Write(object.Data) + return + } + } + http.Error(w, "not found", http.StatusNotFound) +} + +// handleCreateBucket creates a new bucket. +func handleCreateBucket(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + var bucketInfo struct { + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&bucketInfo); err != nil { + http.Error(w, "invalid JSON body", http.StatusBadRequest) + return + } + if bucketInfo.Name == "" { + http.Error(w, "bucket name is required", http.StatusBadRequest) + return + } + + if err := createBucket(bucketInfo.Name); err != nil { + http.Error(w, err.Error(), http.StatusConflict) + return + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(bucketInfo) +} + +// handleUploadObject uploads an object to a specified bucket. +func handleUploadObject(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + bucketName := r.PathValue("bucket") + objectName := r.URL.Query().Get("name") + if objectName == "" { + objectName = r.PathValue("object") + } + + if bucketName == "" || objectName == "" { + http.Error(w, "missing bucket or object name", http.StatusBadRequest) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "failed to read request body", http.StatusInternalServerError) + return + } + defer r.Body.Close() + + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/octet-stream" + } + + response, err := uploadObject(bucketName, objectName, data, contentType) + if err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func createBucket(bucketName string) error { + if _, exists := inMemoryStore[bucketName]; exists { + return fmt.Errorf("bucket already exists") + } + inMemoryStore[bucketName] = make(map[string]ObjectData) + log.Printf("created bucket: %s", bucketName) + return nil +} + +func uploadObject(bucketName, objectName string, data []byte, contentType string) (*GCSObject, error) { + if _, ok := inMemoryStore[bucketName]; !ok { + return nil, fmt.Errorf("bucket not found") + } + + inMemoryStore[bucketName][objectName] = ObjectData{ + Data: data, + ContentType: contentType, + } + log.Printf("created object '%s' in bucket '%s' with Content-Type '%s'", + objectName, bucketName, contentType) + + return &GCSObject{ + Kind: "storage#object", + Name: objectName, + Bucket: bucketName, + Size: strconv.Itoa(len(data)), + ContentType: contentType, + }, nil +} + +// The in-memory store to hold ObjectData structs. +var inMemoryStore = make(map[string]map[string]ObjectData) + +// ObjectData stores the raw data and its content type. +type ObjectData struct { + Data []byte + ContentType string +} + +// GCSListResponse mimics the structure of a real GCS object list response. +type GCSListResponse struct { + Kind string `json:"kind"` + Items []GCSObject `json:"items"` +} + +// GCSObject mimics the structure of a GCS object resource with ContentType. +type GCSObject struct { + Kind string `json:"kind"` + Name string `json:"name"` + Bucket string `json:"bucket"` + Size string `json:"size"` + ContentType string `json:"contentType"` +} + +// Manifest represents the top-level structure of the YAML file +type Manifest struct { + Buckets map[string]Bucket `yaml:"buckets"` +} + +// Bucket represents each bucket and its files +type Bucket struct { + Files []File `yaml:"files"` +} + +// File represents each file entry inside a bucket +type File struct { + Path string `yaml:"path"` + ContentType string `yaml:"content-type"` +} diff --git a/packages/google_cloud_storage/_dev/test/system/test-default-config.yml b/packages/google_cloud_storage/_dev/test/system/test-default-config.yml index 6b19c97c611..0411ba73079 100644 --- a/packages/google_cloud_storage/_dev/test/system/test-default-config.yml +++ b/packages/google_cloud_storage/_dev/test/system/test-default-config.yml @@ -1,10 +1,15 @@ -service: google-cloud-storage-emulator +deployer: docker +service: gcs-mock-service input: gcs vars: project_id: testproject alternative_host: "http://{{Hostname}}:{{Port}}" + number_of_workers: 1 + service_account_key: "{\"type\":\"service_account\",\"project_id\":\"testproject\"}" data_stream.dataset: google_cloud_storage.gcs buckets: | - name: testbucket poll: true poll_interval: 15s +assert: + hit_count: 1 diff --git a/packages/google_cloud_storage/sample_event.json b/packages/google_cloud_storage/sample_event.json index 37ef5bb15b6..aab1c49ba86 100644 --- a/packages/google_cloud_storage/sample_event.json +++ b/packages/google_cloud_storage/sample_event.json @@ -1,36 +1,57 @@ { - "@timestamp": "2024-04-18T11:56:08.098Z", + "@timestamp": "2025-12-25T12:18:28.789Z", "agent": { - "ephemeral_id": "839b31c3-59b9-4418-b825-8f0ba4219502", - "id": "80442ebe-168f-468b-82af-30451478d848", - "name": "docker-fleet-agent", + "ephemeral_id": "b26cd9f8-57d0-4ccc-a116-e599c04ff251", + "id": "8f3305a5-3650-4e75-8687-d419445742be", + "name": "elastic-agent-63184", "type": "filebeat", - "version": "8.12.1" + "version": "8.13.0" + }, + "cloud": { + "provider": "google cloud" }, "data_stream": { "dataset": "google_cloud_storage.gcs", - "namespace": "ep", + "namespace": "30478", "type": "logs" }, "ecs": { - "version": "8.11.0" + "version": "8.0.0" }, "elastic_agent": { - "id": "80442ebe-168f-468b-82af-30451478d848", + "id": "8f3305a5-3650-4e75-8687-d419445742be", "snapshot": false, - "version": "8.12.1" + "version": "8.13.0" }, "event": { "agent_id_status": "verified", "dataset": "google_cloud_storage.gcs", - "ingested": "2024-04-18T11:56:20Z" + "ingested": "2025-12-25T12:18:31Z" + }, + "gcs": { + "storage": { + "bucket": { + "name": "testbucket" + }, + "object": { + "content_type": "application/x-ndjson", + "json_data": [], + "name": "testdata.log" + } + } }, "input": { "type": "gcs" }, - "message": "job with jobId testbucket-testdata.log-worker-0 encountered an error: content-type text/plain; charset=utf-8 not supported", + "log": { + "file": { + "path": "gs://testbucket/testdata.log" + }, + "offset": 0 + }, + "message": "{ \"testmessage\": \"success\" }", "tags": [ "forwarded", "google_cloud_storage-generic" ] -} \ No newline at end of file +}