Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 48 additions & 25 deletions be/src/core/data_type_serde/data_type_struct_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "core/data_type_serde/data_type_struct_serde.h"

#include <algorithm>

#include "arrow/array/builder_nested.h"
#include "common/status.h"
#include "core/column/column.h"
Expand Down Expand Up @@ -516,21 +518,17 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,

const auto elem_size = elem_serdes_ptrs.size();

std::vector<StringRef> field_value;
// check syntax error
if (split_result.size() == elem_size) {
// no field name
for (int i = 0; i < split_result.size(); i++) {
if (i != split_result.size() - 1 &&
split_result[i].delimiter != options.collection_delim) {
return Status::InvalidArgument(
"Struct field value {} is not separated by collection_delim.", i);
}
field_value.push_back(split_result[i].element);
}
} else if (split_result.size() == 2 * elem_size) {
// field name : field value
int field_pos = 0;
std::vector<StringRef> field_value(elem_size);
std::vector<bool> got(elem_size, false);

// Named mode is detected by the delimiter structure (the first token is followed by a
// map_key_delim, e.g. {f1:1,f2:2}), so it also covers the case where only some fields are
// provided. In named mode the input field order may differ from the schema order and
// missing nullable fields are filled with NULL. Otherwise the tokens are matched positionally.
bool named_mode = !split_result.empty() && (split_result.size() % 2 == 0) &&
split_result[0].delimiter == options.map_key_delim;

if (named_mode) {
for (int i = 0; i < split_result.size(); i += 2) {
if (split_result[i].delimiter != options.map_key_delim) {
return Status::InvalidArgument(
Expand All @@ -540,18 +538,37 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,
return Status::InvalidArgument(
"Struct name-value pair does not have collection delimiter");
}
if (field_pos >= elem_size) {
return Status::InvalidArgument(
"Struct field number is more than schema field number");
}
auto field_name = split_result[i].element.trim_quote();

if (!field_name.eq(StringRef(elem_names[field_pos]))) {
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
split_result[i].element.to_string());
// struct field names are stored lower-cased, so lower-case the input key for a
// case-insensitive match (consistent with the simdjson JSON reader).
std::string lower_name(field_name.data, field_name.size);
std::transform(lower_name.begin(), lower_name.end(), lower_name.begin(), ::tolower);
auto name_it = std::find(elem_names.begin(), elem_names.end(), lower_name);
if (name_it == elem_names.end()) {
// the input key is not a field of this struct
if constexpr (is_strict_mode) {
// strict CAST treats an unmatched key as bad input and fails
return Status::InvalidArgument("Cannot find struct field name {} in schema.",
split_result[i].element.to_string());
}
// non-strict load tolerates it: drop the extra key, matching the simdjson
// JSON reader that feeds STRUCT columns on JSON stream load
continue;
}
field_value.push_back(split_result[i + 1].element);
field_pos++;
size_t target = name_it - elem_names.begin();
field_value[target] = split_result[i + 1].element;
got[target] = true;
}
} else if (split_result.size() == elem_size) {
// no field name, matched positionally
for (int i = 0; i < split_result.size(); i++) {
if (i != split_result.size() - 1 &&
split_result[i].delimiter != options.collection_delim) {
return Status::InvalidArgument(
"Struct field value {} is not separated by collection_delim.", i);
}
field_value[i] = split_result[i].element;
got[i] = true;
}
} else {
return Status::InvalidArgument(
Expand All @@ -560,6 +577,12 @@ Status DataTypeStructSerDe::_from_string(StringRef& str, IColumn& column,
}

for (int field_pos = 0; field_pos < elem_size; ++field_pos) {
if (!got[field_pos]) {
// a missing field is filled with NULL (struct sub-columns are always nullable,
// same as the empty-struct '{}' handling above)
struct_column.get_column(field_pos).insert_default();
continue;
}
// Previously, there was rollback logic here in case of errors, similar to the logic in deserialize_one_cell_from_json.
// But it's not necessary here.
// If it is non-strict mode, the internal type is Nullable, and Nullable will handle errors itself.
Expand Down
103 changes: 103 additions & 0 deletions be/test/core/data_type_serde/data_type_serde_struct_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_struct.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type/primitive_type.h"
#include "core/field.h"
#include "core/string_buffer.hpp"
#include "core/types.h"
#include "storage/olap_common.h"
#include "testutil/test_util.h"
Expand Down Expand Up @@ -157,4 +159,105 @@ TEST_F(DataTypeStructSerDeTest, ArrowMemNotAligned) {
EXPECT_TRUE(st.ok());
}

// Regression test for OPENSOURCE-374: from_string (used by CAST string->struct, which is the
// stream-load JSON path) must match struct sub-fields by name, tolerate missing fields, and
// keep working for positional input. The cases below exercise the named, positional and error
// paths of DataTypeStructSerDe::_from_string, in both non-strict and strict mode.
TEST_F(DataTypeStructSerDeTest, FromStringByFieldName) {
    // Schema under test: STRUCT<f1:INT, f2:FLOAT, f3:STRING>, every sub-field nullable.
    DataTypePtr f1 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
    DataTypePtr f2 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeFloat32>());
    DataTypePtr f3 = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
    DataTypePtr st =
            std::make_shared<DataTypeStruct>(DataTypes {f1, f2, f3}, Strings {"f1", "f2", "f3"});
    auto serde = st->get_serde(1);
    DataTypeSerDe::FormatOptions opt;

    // Parses `s` into a freshly created struct column (non-strict mode). The input is copied
    // into a local buffer because from_string takes a mutable StringRef.
    auto from_string = [&](const std::string& s, MutableColumnPtr& col) -> Status {
        col = st->create_column();
        std::string buf = s;
        StringRef ref(buf.data(), buf.size());
        return serde->from_string(ref, *col, opt);
    };
    // Same as from_string, but goes through the strict-mode entry point.
    auto from_string_strict = [&](const std::string& s, MutableColumnPtr& col) -> Status {
        col = st->create_column();
        std::string buf = s;
        StringRef ref(buf.data(), buf.size());
        return serde->from_string_strict_mode(ref, *col, opt);
    };
    // to_string is the inverse of from_string and always writes fields in schema order, so the
    // input field order cancels out and we can compare results structurally.
    auto to_string = [&](const MutableColumnPtr& col) -> std::string {
        auto out = ColumnString::create();
        VectorBufferWriter bw(*out);
        serde->to_string(*col, 0, bw, opt);
        bw.commit();
        auto s = out->get_data_at(0);
        return std::string(s.data, s.size);
    };
    // True when the rendered status contains `needle`. The parameter is deliberately not
    // called `st`, so it does not shadow the struct data type declared above.
    auto has = [](const Status& status, const std::string& needle) {
        return status.to_string().find(needle) != std::string::npos;
    };

    MutableColumnPtr ordered;
    MutableColumnPtr c;
    ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":"a"})", ordered).ok());

    // 1) out-of-order keys are matched by name (the core fix)
    ASSERT_TRUE(from_string(R"({"f2":2.5,"f1":1,"f3":"a"})", c).ok());
    EXPECT_EQ(to_string(ordered), to_string(c));

    // 2) field names are matched case-insensitively (input is lower-cased before lookup)
    ASSERT_TRUE(from_string(R"({"F2":2.5,"F1":1,"F3":"a"})", c).ok());
    EXPECT_EQ(to_string(ordered), to_string(c));

    // 3) a missing field is filled with NULL (named mode tolerates fewer fields)
    MutableColumnPtr with_null;
    ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":null})", with_null).ok());
    ASSERT_TRUE(from_string(R"({"f2":2.5,"f1":1})", c).ok());
    EXPECT_EQ(to_string(with_null), to_string(c));

    // 4) positional input (no field names) still works
    ASSERT_TRUE(from_string(R"({1,2.5,"a"})", c).ok());
    EXPECT_EQ(to_string(ordered), to_string(c));

    // 5) empty struct '{}' yields all-NULL fields
    MutableColumnPtr empty;
    ASSERT_TRUE(from_string("{}", empty).ok());
    ASSERT_TRUE(from_string(R"({"f1":null,"f2":null,"f3":null})", c).ok());
    EXPECT_EQ(to_string(c), to_string(empty));

    // 6) an unknown field is ignored, missing schema fields become NULL (consistent with the
    //    simdjson reader and PostgreSQL/Spark/Trino)
    MutableColumnPtr f2_null;
    ASSERT_TRUE(from_string(R"({"f1":1,"f2":null,"f3":"a"})", f2_null).ok());
    ASSERT_TRUE(from_string(R"({"f1":1,"fx":2.5,"f3":"a"})", c).ok());
    EXPECT_EQ(to_string(f2_null), to_string(c));

    // 7) extra named fields beyond the schema are ignored (4 fields into a 3-field struct)
    ASSERT_TRUE(from_string(R"({"f1":1,"f2":2.5,"f3":"a","f4":9})", c).ok());
    EXPECT_EQ(to_string(ordered), to_string(c));

    // --- error paths (each exercises a distinct branch / message) ---
    // 8) name-value pair missing the collection delimiter, e.g. {"f1":1:"f2":2}
    Status e = from_string(R"({"f1":1:"f2":2})", c);
    EXPECT_FALSE(e.ok());
    EXPECT_TRUE(has(e, "does not have collection delimiter"));

    // 9) positional input whose count does not match the schema is rejected (too few or too
    //    many) -- without field names the arity must be exact, same as PG/Spark/Trino
    e = from_string(R"({1,2.5})", c); // 2 values into a 3-field struct
    EXPECT_FALSE(e.ok());
    EXPECT_TRUE(has(e, "not equal to schema field number"));
    e = from_string(R"({1,2.5,a,9})", c); // 4 values into a 3-field struct
    EXPECT_FALSE(e.ok());
    EXPECT_TRUE(has(e, "not equal to schema field number"));

    // 10) positional value not separated by the collection delimiter, e.g. {1,2.5:x}
    e = from_string(R"({1,2.5:x})", c);
    EXPECT_FALSE(e.ok());
    EXPECT_TRUE(has(e, "not separated by collection_delim"));

    // 11) bad framing (missing braces)
    EXPECT_FALSE(from_string(R"("f1":1)", c).ok());

    // 12) strict mode propagates a sub-field parse error
    MutableColumnPtr sc;
    EXPECT_FALSE(from_string_strict(R"({"f1":"notanint","f2":2.5,"f3":"a"})", sc).ok());

    // 13) strict mode rejects a key that is not a field of the struct, whereas non-strict
    //     mode silently drops it (case 6)
    e = from_string_strict(R"({"f1":1,"fx":2.5,"f3":"a"})", sc);
    EXPECT_FALSE(e.ok());
    EXPECT_TRUE(has(e, "Cannot find struct field name"));
}

} // namespace doris
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{"id": 1, "c_struct": {"f1": 10, "f2": 3.14, "f3": "Emily"}},
{"id": 2, "c_struct": {"f1": 4, "f2": 1.5, "f3": null}},
{"id": 3, "c_struct": {"f1": 7, "f2": null, "f3": "Benjamin"}},
{"id": 4, "c_struct": {}},
{"id": 5, "c_struct": null}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
{"id": 20, "c_struct": {"f2": 1.5, "f1": 4, "f3": null}},
{"id": 21, "c_struct": {"f3": "Tom", "f2": 2.5, "f1": 9}},
{"id": 22, "c_struct": {"f2": 8.5, "f1": 1}},
{"id": 23, "c_struct": {"f1": 7, "f2": 3.5, "f3": "Z", "f4": 999}},
{"id": 24, "c_struct": {"F2": 6.5, "F1": 5, "F3": "U"}}
]
6 changes: 3 additions & 3 deletions regression-test/data/load_p0/stream_load/test_stream_load.out
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@
3 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
4 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
5 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
6 \N
7 \N
8 \N
6 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
7 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
8 {"f1":1, "f2":null, "f3":null, "f4":null, "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
9 {"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null}
10 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
11 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@
3 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
4 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
5 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
6 \N
7 \N
8 \N
6 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
7 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
8 {"f1":1, "f2":null, "f3":null, "f4":null, "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
9 {"f1":null, "f2":null, "f3":null, "f4":null, "f5":null, "f6":null, "f7":null, "f8":null, "f9":null, "f10":null}
10 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":"2023-02-26 17:58:00", "f8":1.01, "f9":3.1415926, "f10":1.100000}
11 {"f1":1, "f2":100, "f3":100000, "f4":"a", "f5":"doris", "f6":"2023-02-26", "f7":null, "f8":null, "f9":null, "f10":1.100000}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@
{"f1":1, "f2":"2022-10-10 00:00:00"}

-- !sql15 --
\N
{"a":1, "b":1}

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test for OPENSOURCE-374: when loading JSON into a STRUCT column, sub-fields must
// be matched by field name. Out-of-order JSON keys in Stream Load used to turn the whole struct
// column into NULL; a missing field is now filled with NULL and an unknown field is ignored
// (consistent with PostgreSQL / Spark / Trino). Struct-to-struct CAST stays positional, which
// also matches those engines.
suite("test_struct_field_align") {
    def tableName = "test_struct_field_align"

    // Start from a clean table so the expected row set below is exact.
    sql "DROP TABLE IF EXISTS ${tableName}"
    sql """
        CREATE TABLE ${tableName} (
            id INT NOT NULL,
            c_struct STRUCT<f1:INT,f2:FLOAT,f3:STRING> NULL
        )
        UNIQUE KEY(id)
        DISTRIBUTED BY HASH(id) BUCKETS 1
        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
    """

    // Stream-loads one JSON array file into the table and requires that the load succeeds,
    // loads at least one row and filters none: a struct value that fails to align must show
    // up as a test failure, not as a silently dropped row.
    def doStreamLoad = { fileName ->
        streamLoad {
            table tableName
            set 'format', 'json'
            set 'strip_outer_array', 'true'
            set 'columns', 'id, c_struct'
            file fileName
            time 10000
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                assertEquals("success", json.Status.toLowerCase())
                assertEquals(0, json.NumberFilteredRows)
                assertTrue(json.NumberLoadedRows > 0)
            }
        }
    }

    // 1) ordered keys (baseline)
    doStreamLoad "test_struct_field_align_ordered.json"
    // 2) keys whose order differs from the DDL (the core bug), a row that omits f3, a row
    //    with an unknown key (f4) and a row with upper-case keys
    doStreamLoad "test_struct_field_align_swapped.json"

    sql "sync"

    // Index the result by id; a NULL struct is rendered as the literal string "NULL" so it
    // can be compared against the expected map.
    def rows = sql "SELECT id, c_struct FROM ${tableName} ORDER BY id"
    def actual = [:]
    for (row in rows) {
        actual[row[0] as int] = (row[1] == null ? "NULL" : row[1].toString())
    }

    def expected = [
        1 : '{"f1":10, "f2":3.14, "f3":"Emily"}',
        2 : '{"f1":4, "f2":1.5, "f3":null}',
        3 : '{"f1":7, "f2":null, "f3":"Benjamin"}',
        4 : '{"f1":null, "f2":null, "f3":null}',
        5 : 'NULL',
        // swapped JSON keys must align by name instead of producing NULL
        20 : '{"f1":4, "f2":1.5, "f3":null}',
        21 : '{"f1":9, "f2":2.5, "f3":"Tom"}',
        // f3 omitted -> filled with NULL
        22 : '{"f1":1, "f2":8.5, "f3":null}',
        // an unknown field (f4) is ignored, the matched fields still align by name
        23 : '{"f1":7, "f2":3.5, "f3":"Z"}',
        // upper-case JSON keys are matched case-insensitively to the lower-cased schema names
        24 : '{"f1":5, "f2":6.5, "f3":"U"}',
    ]

    assertEquals(expected.size(), actual.size())
    for (e in expected) {
        assertEquals(e.value, actual[e.key], "row id=${e.key} mismatch".toString())
    }
}
Loading