Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 63 additions & 44 deletions cpp/src/arrow/csv/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,31 +105,45 @@ Status PresizeBuilder(const BlockParser& parser, BuilderType* builder) {
}
}

/////////////////////////////////////////////////////////////////////////
// Shared Tries cache to avoid rebuilding them for each decoder instance

struct TrieCache {
Trie null_trie;
Trie true_trie;
Trie false_trie;

static Result<std::shared_ptr<TrieCache>> Make(const ConvertOptions& options) {
auto cache = std::make_shared<TrieCache>();
RETURN_NOT_OK(InitializeTrie(options.null_values, &cache->null_trie));
RETURN_NOT_OK(InitializeTrie(options.true_values, &cache->true_trie));
RETURN_NOT_OK(InitializeTrie(options.false_values, &cache->false_trie));
return cache;
}
};

/////////////////////////////////////////////////////////////////////////
// Per-type value decoders

struct ValueDecoder {
explicit ValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: type_(type), options_(options) {}
const ConvertOptions& options, const TrieCache* trie_cache)
: type_(type), options_(options), trie_cache_(trie_cache) {}

Status Initialize() {
// TODO no need to build a separate Trie for each instance
return InitializeTrie(options_.null_values, &null_trie_);
}
Status Initialize() { return Status::OK(); }

bool IsNull(const uint8_t* data, uint32_t size, bool quoted) {
if (quoted && !options_.quoted_strings_can_be_null) {
return false;
}
return null_trie_.Find(std::string_view(reinterpret_cast<const char*>(data), size)) >=
0;
return trie_cache_->null_trie.Find(
std::string_view(reinterpret_cast<const char*>(data), size)) >= 0;
}

protected:
Trie null_trie_;
const std::shared_ptr<DataType> type_;
const ConvertOptions& options_;
const TrieCache* trie_cache_;
};

//
Expand All @@ -140,8 +154,9 @@ struct FixedSizeBinaryValueDecoder : public ValueDecoder {
using value_type = const uint8_t*;

explicit FixedSizeBinaryValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
byte_width_(checked_cast<const FixedSizeBinaryType&>(*type).byte_width()) {}

Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
Expand Down Expand Up @@ -207,8 +222,8 @@ struct NumericValueDecoder : public ValueDecoder {
using value_type = typename T::c_type;

NumericValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options, const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
concrete_type_(checked_cast<const T&>(*type)),
string_converter_(MakeStringConverter<T>(options)) {}

Expand Down Expand Up @@ -236,31 +251,20 @@ struct BooleanValueDecoder : public ValueDecoder {

using ValueDecoder::ValueDecoder;

Status Initialize() {
// TODO no need to build separate Tries for each instance
RETURN_NOT_OK(InitializeTrie(options_.true_values, &true_trie_));
RETURN_NOT_OK(InitializeTrie(options_.false_values, &false_trie_));
return ValueDecoder::Initialize();
}

Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
// XXX should quoted values be allowed at all?
if (false_trie_.Find(std::string_view(reinterpret_cast<const char*>(data), size)) >=
0) {
if (trie_cache_->false_trie.Find(
std::string_view(reinterpret_cast<const char*>(data), size)) >= 0) {
*out = false;
return Status::OK();
}
if (ARROW_PREDICT_TRUE(true_trie_.Find(std::string_view(
if (ARROW_PREDICT_TRUE(trie_cache_->true_trie.Find(std::string_view(
reinterpret_cast<const char*>(data), size)) >= 0)) {
*out = true;
return Status::OK();
}
return GenericConversionError(type_, data, size);
}

protected:
Trie true_trie_;
Trie false_trie_;
};

//
Expand All @@ -271,8 +275,8 @@ struct DecimalValueDecoder : public ValueDecoder {
using value_type = Decimal128;

explicit DecimalValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options, const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
decimal_type_(internal::checked_cast<const DecimalType&>(*type_)),
type_precision_(decimal_type_.precision()),
type_scale_(decimal_type_.scale()) {}
Expand Down Expand Up @@ -310,8 +314,10 @@ struct CustomDecimalPointValueDecoder : public ValueDecoder {
using value_type = typename WrappedDecoder::value_type;

explicit CustomDecimalPointValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options), wrapped_decoder_(type, options) {}
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
wrapped_decoder_(type, options, trie_cache) {}

Status Initialize() {
RETURN_NOT_OK(wrapped_decoder_.Initialize());
Expand All @@ -321,7 +327,7 @@ struct CustomDecimalPointValueDecoder : public ValueDecoder {
mapping_[options_.decimal_point] = '.';
mapping_['.'] = options_.decimal_point; // error out on standard decimal point
temp_.resize(30);
return Status::OK();
return ValueDecoder::Initialize();
}

Status Decode(const uint8_t* data, uint32_t size, bool quoted, value_type* out) {
Expand Down Expand Up @@ -357,8 +363,9 @@ struct InlineISO8601ValueDecoder : public ValueDecoder {
using value_type = int64_t;

explicit InlineISO8601ValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
unit_(checked_cast<const TimestampType&>(*type_).unit()),
expect_timezone_(!checked_cast<const TimestampType&>(*type_).timezone().empty()) {
}
Expand Down Expand Up @@ -396,8 +403,9 @@ struct SingleParserTimestampValueDecoder : public ValueDecoder {
using value_type = int64_t;

explicit SingleParserTimestampValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
unit_(checked_cast<const TimestampType&>(*type_).unit()),
expect_timezone_(!checked_cast<const TimestampType&>(*type_).timezone().empty()),
parser_(*options_.timestamp_parsers[0]) {}
Expand Down Expand Up @@ -436,8 +444,9 @@ struct MultipleParsersTimestampValueDecoder : public ValueDecoder {
using value_type = int64_t;

explicit MultipleParsersTimestampValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
unit_(checked_cast<const TimestampType&>(*type_).unit()),
expect_timezone_(!checked_cast<const TimestampType&>(*type_).timezone().empty()),
parsers_(GetParsers(options_)) {}
Expand Down Expand Up @@ -477,8 +486,9 @@ struct DurationValueDecoder : public ValueDecoder {
using value_type = int64_t;

explicit DurationValueDecoder(const std::shared_ptr<DataType>& type,
const ConvertOptions& options)
: ValueDecoder(type, options),
const ConvertOptions& options,
const TrieCache* trie_cache)
: ValueDecoder(type, options, trie_cache),
concrete_type_(checked_cast<const DurationType&>(*type)),
string_converter_() {}

Expand Down Expand Up @@ -517,7 +527,8 @@ class NullConverter : public ConcreteConverter {
public:
NullConverter(const std::shared_ptr<DataType>& type, const ConvertOptions& options,
MemoryPool* pool)
: ConcreteConverter(type, options, pool), decoder_(type_, options_) {}
: ConcreteConverter(type, options, pool),
decoder_(type_, options_, static_cast<const TrieCache*>(trie_cache_.get())) {}

Result<std::shared_ptr<Array>> Convert(const BlockParser& parser,
int32_t col_index) override {
Expand Down Expand Up @@ -551,7 +562,8 @@ class PrimitiveConverter : public ConcreteConverter {
public:
PrimitiveConverter(const std::shared_ptr<DataType>& type, const ConvertOptions& options,
MemoryPool* pool)
: ConcreteConverter(type, options, pool), decoder_(type_, options_) {}
: ConcreteConverter(type, options, pool),
decoder_(type_, options_, static_cast<const TrieCache*>(trie_cache_.get())) {}

Result<std::shared_ptr<Array>> Convert(const BlockParser& parser,
int32_t col_index) override {
Expand Down Expand Up @@ -593,7 +605,8 @@ class TypedDictionaryConverter : public ConcreteDictionaryConverter {
TypedDictionaryConverter(const std::shared_ptr<DataType>& value_type,
const ConvertOptions& options, MemoryPool* pool)
: ConcreteDictionaryConverter(value_type, options, pool),
decoder_(value_type, options_) {}
decoder_(value_type, options_, static_cast<const TrieCache*>(trie_cache_.get())) {
}

Result<std::shared_ptr<Array>> Convert(const BlockParser& parser,
int32_t col_index) override {
Expand Down Expand Up @@ -684,7 +697,13 @@ std::shared_ptr<ConverterType> MakeRealConverter(const std::shared_ptr<DataType>

Converter::Converter(const std::shared_ptr<DataType>& type, const ConvertOptions& options,
MemoryPool* pool)
: options_(options), pool_(pool), type_(type) {}
: options_(options), pool_(pool), type_(type) {
// Build shared Trie cache (errors handled in Initialize())
auto maybe_cache = TrieCache::Make(options);
if (maybe_cache.ok()) {
trie_cache_ = std::static_pointer_cast<void>(*std::move(maybe_cache));
}
}

DictionaryConverter::DictionaryConverter(const std::shared_ptr<DataType>& value_type,
const ConvertOptions& options, MemoryPool* pool)
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/csv/converter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class ARROW_EXPORT Converter {
const ConvertOptions& options_;
MemoryPool* pool_;
std::shared_ptr<DataType> type_;
// Opaque TrieCache pointer. TrieCache destructor is called via control block.
// https://en.cppreference.com/w/cpp/memory/shared_ptr
std::shared_ptr<void> trie_cache_;
};

class ARROW_EXPORT DictionaryConverter : public Converter {
Expand Down
Loading