From 6a57f276ae4f5486e561b73c33d11a324eabc4b1 Mon Sep 17 00:00:00 2001 From: Francois Massot Date: Sat, 19 Jul 2025 07:30:25 +0200 Subject: [PATCH] Fix GitHub issue #152 --- pkg/quickwit/data_query.go | 49 ++++- pkg/quickwit/timestamp_infos.go | 6 +- pkg/quickwit/timestamp_infos_test.go | 313 +++++++++++++++++++++++++++ 3 files changed, 361 insertions(+), 7 deletions(-) diff --git a/pkg/quickwit/data_query.go b/pkg/quickwit/data_query.go index 752329f..6dadfbf 100644 --- a/pkg/quickwit/data_query.go +++ b/pkg/quickwit/data_query.go @@ -99,7 +99,10 @@ func (e *elasticsearchDataQuery) processQuery(q *Query, ms *es.MultiSearchReques processDocumentQuery(q, b, from, to, defaultTimeField) } else { // Otherwise, it is a time series query and we process it - processTimeSeriesQuery(q, b, from, to, defaultTimeField) + err := processTimeSeriesQuery(q, b, from, to, defaultTimeField) + if err != nil { + return err + } } return nil @@ -164,12 +167,18 @@ func (bucketAgg BucketAgg) generateSettingsForDSL() map[string]interface{} { return bucketAgg.Settings.MustMap() } -func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) es.AggBuilder { +func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64, timeField string) (es.AggBuilder, error) { // If no field is specified, use the time field field := bucketAgg.Field if field == "" { field = timeField } + + // Validate that we have a valid field name to prevent downstream errors + if field == "" { + return aggBuilder, fmt.Errorf("date_histogram aggregation '%s' has no field specified and datasource timeField is empty", bucketAgg.ID) + } + aggBuilder.DateHistogram(bucketAgg.ID, field, func(a *es.DateHistogramAgg, b es.AggBuilder) { a.FixedInterval = bucketAgg.Settings.Get("interval").MustString("auto") a.MinDocCount = bucketAgg.Settings.Get("min_doc_count").MustInt(0) @@ -204,7 +213,7 @@ func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFro aggBuilder = b }) - return aggBuilder + return aggBuilder, nil } func addHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg) es.AggBuilder { @@ -331,6 +340,30 @@ func isQueryWithError(query *Query) error { if len(query.Metrics) == 0 || !(isLogsQuery(query) || isDocumentQuery(query)) { return fmt.Errorf("invalid query, missing metrics and aggregations") } + } else { + // Validate bucket aggregations have valid fields where required + for _, bucketAgg := range query.BucketAggs { + // Check which aggregation types require fields + switch bucketAgg.Type { + case dateHistType: + // For date_histogram, field can be empty (will use timeField as fallback) + // Validation will happen at query processing time + continue + case histogramType, termsType, geohashGridType, nestedType: + // These aggregation types require a field + if bucketAgg.Field == "" { + return fmt.Errorf("invalid query, bucket aggregation '%s' (type: %s) is missing required field", bucketAgg.ID, bucketAgg.Type) + } + case filtersType: + // Filters aggregations don't need a field + continue + default: + // For unknown aggregation types, be conservative and require field + if bucketAgg.Field == "" { + return fmt.Errorf("invalid query, bucket aggregation '%s' (type: %s) is missing required field", bucketAgg.ID, bucketAgg.Type) + } + } + } } return nil } @@ -380,7 +413,7 @@ func processDocumentQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, b.Size(stringToIntWithDefaultValue(metric.Settings.Get("size").MustString(), defaultSize)) } -func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) { +func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64, defaultTimeField string) error { aggBuilder := b.Agg() // Process buckets // iterate backwards to create aggregations bottom-down @@ -390,7 +423,11 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64 ) switch bucketAgg.Type { case dateHistType: - aggBuilder = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField) + var err error + aggBuilder, err = addDateHistogramAgg(aggBuilder, bucketAgg, from, to, defaultTimeField) + if err != nil { + return err + } case histogramType: aggBuilder = addHistogramAgg(aggBuilder, bucketAgg) case filtersType: @@ -471,6 +508,8 @@ func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, from, to int64 }) } } + + return nil } func stringToIntWithDefaultValue(valueStr string, defaultValue int) int { diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index f30d0c9..7d6f15f 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -112,13 +112,15 @@ func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMap if nil != parentName { fieldName = fmt.Sprintf("%s.%s", *parentName, fieldName) } - if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { return *field.OutputFormat, true } else if field.Type == "object" && nil != field.FieldMappings { - return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings) + if result, found := FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings); found { + return result, true + } } } + qwlog.Debug(fmt.Sprintf("FindTimestampFormat: no match found for %s", timestampFieldName)) return "", false } diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go index b2db076..850e7b0 100644 --- a/pkg/quickwit/timestamp_infos_test.go +++ b/pkg/quickwit/timestamp_infos_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -168,3 +169,315 @@ func TestNewErrorCreationPayload(t *testing.T) { require.ErrorContains(t, err, "\"status\":400") }) } + +func TestDecodeTimestampFieldInfosWithIssue152DocMapping(t *testing.T) { + // This is the exact doc mapping from GitHub issue #152 + docMappingJSON := `{ + "doc_mapping_uid": "01JYK2J58VAC6HJX2H90K9F7R6", + "mode": "dynamic", + "dynamic_mapping": { + "indexed": true, + "tokenizer": "raw", + "record": "basic", + "stored": true, + "expand_dots": true, + "fast": { + "normalizer": "raw" + } + }, + "field_mappings": [ + { + "name": "actor", + "type": "object", + "field_mappings": [ + { + "description": "Actor Type (Employee, User, System)", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "type", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + }, + { + "description": "Actor Metadata", + "expand_dots": true, + "fast": false, + "indexed": true, + "name": "metadata", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "json" + } + ] + }, + { + "name": "event", + "type": "object", + "field_mappings": [ + { + "description": "Event Type (Normal, Authorization, Privacy, Location)", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "type", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + }, + { + "description": "Event Operation", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "operation", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "text" + }, + { + "description": "Event Reason", + "fast": false, + "fieldnorms": false, + "indexed": true, + "name": "reason", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "text" + }, + { + "field_mappings": [ + { + "description": "Event Resource Type", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "type", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + }, + { + "description": "Event Resource Value", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "value", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "text" + } + ], + "name": "resource", + "type": "object" + }, + { + "description": "Event Metadata", + "expand_dots": true, + "fast": false, + "indexed": true, + "name": "metadata", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "json" + } + ] + }, + { + "name": "source", + "type": "object", + "field_mappings": [ + { + "description": "Source Type (Admin, Service)", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "type", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + }, + { + "field_mappings": [ + { + "description": "Source Name", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "name", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "text" + }, + { + "description": "Source Country Code", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "country_code", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + }, + { + "description": "Source URL", + "fast": false, + "fieldnorms": false, + "indexed": true, + "name": "url", + "record": "basic", + "stored": true, + "tokenizer": "default", + "type": "text" + }, + { + "description": "Source Request ID", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "name": "request_id", + "record": "basic", + "stored": true, + "tokenizer": "raw", + "type": "text" + } + ], + "name": "metadata", + "type": "object" + } + ] + }, + { + "name": "timestamp", + "type": "datetime", + "description": "Log occurrence timestamp", + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "iso8601", + "unix_timestamp" + ], + "output_format": "unix_timestamp_nanos", + "stored": true + }, + { + "name": "env", + "type": "text", + "description": "Environment (alpha, prod)", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" + }, + { + "name": "region", + "type": "text", + "description": "Region (kr, ca, jp, gb)", + "fast": { + "normalizer": "raw" + }, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "raw" + } + ], + "timestamp_field": "timestamp", + "tag_fields": [], + "max_num_partitions": 200, + "index_field_presence": false, + "store_document_size": false, + "store_source": false, + "tokenizers": [] + }` + + // Create the index metadata structure as it would be parsed from the API + indexMetadata := QuickwitIndexMetadata{ + IndexConfig: struct { + IndexID string `json:"index_id"` + DocMapping struct { + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` + } `json:"doc_mapping"` + }{ + IndexID: "test-index", + }, + } + + // Parse just the doc_mapping part + var docMapping struct { + TimestampField string `json:"timestamp_field"` + FieldMappings []FieldMappings `json:"field_mappings"` + } + + err := json.Unmarshal([]byte(docMappingJSON), &docMapping) + require.NoError(t, err) + + indexMetadata.IndexConfig.DocMapping = docMapping + + // Debug: Print the parsed field mappings + t.Logf("Timestamp field from doc mapping: %s", docMapping.TimestampField) + t.Logf("Number of field mappings: %d", len(docMapping.FieldMappings)) + + for i, field := range docMapping.FieldMappings { + t.Logf("Field %d: name=%s, type=%s, output_format=%v", i, field.Name, field.Type, field.OutputFormat) + if field.Name == "timestamp" { + t.Logf("Found timestamp field: name=%s, type=%s, output_format=%v", field.Name, field.Type, field.OutputFormat) + } + } + + // Test the timestamp field detection + timestampField, outputFormat := FindTimestampFieldInfos(indexMetadata) + + t.Logf("FindTimestampFieldInfos returned: field=%s, format=%s", timestampField, outputFormat) + + // Verify that it correctly identifies the timestamp field and format + assert.Equal(t, "timestamp", timestampField, "Should correctly identify the timestamp field") + assert.Equal(t, "unix_timestamp_nanos", outputFormat, "Should correctly identify the output format") + + // Test the higher-level function too + timestampFieldName, timestampOutputFormat, err := GetTimestampFieldInfos([]QuickwitIndexMetadata{indexMetadata}) + if err != nil { + t.Logf("GetTimestampFieldInfos error: %v", err) + } else { + require.NoError(t, err) + assert.Equal(t, "timestamp", timestampFieldName) + assert.Equal(t, "unix_timestamp_nanos", timestampOutputFormat) + } +}