14 changes: 7 additions & 7 deletions cpp/src/arrow/array/builder_dict.h
@@ -134,7 +134,7 @@ class DictionaryBuilderBase : public ArrayBuilder {
value_type_(value_type) {}

template <typename T1 = T>
-  DictionaryBuilderBase(
+  explicit DictionaryBuilderBase(
enable_if_t<!is_fixed_size_binary_type<T1>::value, const std::shared_ptr<DataType>&>
value_type,
MemoryPool* pool = default_memory_pool())
@@ -176,8 +176,8 @@ class DictionaryBuilderBase : public ArrayBuilder {
: DictionaryBuilderBase<BuilderType, T1>(TypeTraits<T1>::type_singleton(), pool) {}

// This constructor doesn't check for errors. Use InsertMemoValues instead.
-  DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
+                                 MemoryPool* pool = default_memory_pool())
: ArrayBuilder(pool),
memo_table_(new internal::DictionaryMemoTable(pool, dictionary)),
delta_offset_(0),
@@ -404,8 +404,8 @@ class DictionaryBuilderBase<BuilderType, NullType> : public ArrayBuilder {
MemoryPool* pool = default_memory_pool())
: ArrayBuilder(pool), indices_builder_(start_int_size, pool) {}

-  DictionaryBuilderBase(const std::shared_ptr<DataType>& value_type,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<DataType>& value_type,
+                                 MemoryPool* pool = default_memory_pool())
: ArrayBuilder(pool), indices_builder_(pool) {}

template <typename B = BuilderType>
@@ -418,8 +418,8 @@ class DictionaryBuilderBase<BuilderType, NullType> : public ArrayBuilder {
explicit DictionaryBuilderBase(MemoryPool* pool = default_memory_pool())
: ArrayBuilder(pool), indices_builder_(pool) {}

-  DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
+                                 MemoryPool* pool = default_memory_pool())
: ArrayBuilder(pool), indices_builder_(pool) {}

/// \brief Append a scalar null value
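Note: making these constructors `explicit` closes off single-argument implicit conversions, e.g. from a dictionary array straight to a builder. A minimal sketch of the pattern this now rejects (`ConsumeBuilder` is a hypothetical caller, not part of this patch):

    #include <memory>
    #include "arrow/array/builder_dict.h"

    // Hypothetical caller, present only to illustrate overload resolution.
    void ConsumeBuilder(const arrow::DictionaryBuilder<arrow::StringType>&) {}

    void Demo(const std::shared_ptr<arrow::Array>& dictionary) {
      // Before this patch the next line compiled, silently constructing a
      // builder from the array:
      // ConsumeBuilder(dictionary);
      // With `explicit`, the conversion must be spelled out:
      ConsumeBuilder(arrow::DictionaryBuilder<arrow::StringType>(dictionary));
    }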
11 changes: 4 additions & 7 deletions cpp/src/arrow/dataset/dataset_test.cc
@@ -506,13 +506,10 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
// In the simplest case, consumption is simply conversion to a Table.
ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());

-  using row_type = std::tuple<double, std::string, util::optional<std::string>>;
-  std::vector<row_type> rows{
-      row_type{152.25, "3", "CA"},
-      row_type{273.5, "3", "US"},
-  };
-  std::shared_ptr<Table> expected;
-  ASSERT_OK(stl::TableFromTupleRange(default_memory_pool(), rows, columns, &expected));
+  auto expected = TableFromJSON(scanner->schema(), {R"([
+    {"sales": 152.25, "model": "3", "country": "CA"},
+    {"sales": 273.5, "model": "3", "country": "US"}
+  ])"});
AssertTablesEqual(*expected, *table, false, true);
}

6 changes: 0 additions & 6 deletions cpp/src/arrow/dataset/file_base.cc
@@ -210,12 +210,6 @@ Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,

base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));

-  for (const auto& f : partitioning->schema()->fields()) {
-    if (f->type()->id() == Type::DICTIONARY) {
-      return Status::NotImplemented("writing with dictionary partitions");
-    }
-  }

int i = 0;
for (auto maybe_fragment : fragment_it) {
ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
11 changes: 7 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -776,14 +776,17 @@ Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchem
ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(*metadata_, properties));
schemas.push_back(std::move(physical_schema));

-  if (options_.partitioning.factory() != nullptr) {
+  if (auto factory = options_.partitioning.factory()) {
// Gather paths found in RowGroups' ColumnChunks.
ARROW_ASSIGN_OR_RAISE(auto paths, CollectPaths(*metadata_, properties));

-    ARROW_ASSIGN_OR_RAISE(auto partition_schema,
-                          options_.partitioning.GetOrInferSchema(StripPrefixAndFilename(
-                              paths, options_.partition_base_dir)));
+    ARROW_ASSIGN_OR_RAISE(
+        auto partition_schema,
+        factory->Inspect(StripPrefixAndFilename(paths, options_.partition_base_dir)));

schemas.push_back(std::move(partition_schema));
+  } else {
+    schemas.push_back(std::move(options_.partitioning.partitioning()->schema()));
}

return schemas;
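For context, `options_.partitioning` is a `PartitioningOrFactory`: it holds either a finished `Partitioning` (schema fully specified) or a `PartitioningFactory` (schema still to be inferred), which is exactly what this branch now distinguishes. A hedged sketch of the two configurations; `ParquetFactoryOptions` and its members are assumed from this diff's usage, not quoted from the header:

    #include <memory>
    #include "arrow/api.h"
    #include "arrow/dataset/file_parquet.h"
    #include "arrow/dataset/partition.h"

    void ConfigureFactoryOptions() {
      // Factory branch: partition fields are inferred from ColumnChunk paths.
      arrow::dataset::ParquetFactoryOptions inferred;
      inferred.partitioning = arrow::dataset::HivePartitioning::MakeFactory();

      // Fully-specified branch: the partitioning's schema is used as-is.
      arrow::dataset::ParquetFactoryOptions fixed;
      fixed.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
          arrow::schema({arrow::field("country", arrow::utf8())}));
    }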
16 changes: 9 additions & 7 deletions cpp/src/arrow/dataset/filter.cc
@@ -1617,15 +1617,17 @@ class StructDictionary {
}

private:
-  Status AddOne(const std::shared_ptr<Array>& column,
-                std::shared_ptr<Int32Array>* fused_indices) {
-    ARROW_ASSIGN_OR_RAISE(Datum encoded, compute::DictionaryEncode(column));
-    ArrayData* encoded_array = encoded.mutable_array();
+  Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
+    ArrayData* encoded;
+    if (column.type()->id() != Type::DICTIONARY) {
+      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column));
+    }
+    encoded = column.mutable_array();

-    auto indices = std::make_shared<Int32Array>(encoded_array->length,
-                                                std::move(encoded_array->buffers[1]));
+    auto indices =
+        std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1]));

-    dictionaries_.push_back(MakeArray(std::move(encoded_array->dictionary)));
+    dictionaries_.push_back(MakeArray(std::move(encoded->dictionary)));
auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length());

if (*fused_indices == nullptr) {
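The reworked `AddOne` boils down to an encode-once guard: a column that is already dictionary encoded (e.g. a partition column after this change) is consumed as-is, and everything else goes through `DictionaryEncode` first. A standalone restatement of that guard against the public compute API, for illustration only:

    #include "arrow/compute/api.h"
    #include "arrow/datum.h"
    #include "arrow/result.h"
    #include "arrow/type.h"

    arrow::Result<arrow::Datum> EnsureDictionaryEncoded(arrow::Datum column) {
      if (column.type()->id() == arrow::Type::DICTIONARY) {
        return column;  // already encoded: reuse its indices and dictionary
      }
      // Otherwise compute dictionary indices over the column's unique values.
      return arrow::compute::DictionaryEncode(column);
    }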
142 changes: 69 additions & 73 deletions cpp/src/arrow/dataset/partition.cc
@@ -28,6 +28,7 @@
#include "arrow/array/array_base.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_dict.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
@@ -39,6 +40,7 @@
#include "arrow/scalar.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/range.h"
#include "arrow/util/sort.h"
#include "arrow/util/string_view.h"
@@ -306,75 +308,61 @@ Result<std::string> DirectoryPartitioning::FormatValues(
return fs::internal::JoinAbstractPath(std::move(segments));
}

- class KeyValuePartitioningInspectImpl {
-  public:
-   explicit KeyValuePartitioningInspectImpl(const PartitioningFactoryOptions& options)
+ class KeyValuePartitioningFactory : public PartitioningFactory {
+  protected:
+   explicit KeyValuePartitioningFactory(PartitioningFactoryOptions options)
: options_(options) {}

-   static Result<std::shared_ptr<DataType>> InferType(const std::string& name,
-                                                      const std::set<std::string>& reprs,
-                                                      int max_partition_dictionary_size) {
-     if (reprs.empty()) {
-       return Status::Invalid("No segments were available for field '", name,
-                              "'; couldn't infer type");
-     }
-
-     bool all_integral = std::all_of(reprs.begin(), reprs.end(), [](string_view repr) {
-       // TODO(bkietz) use ParseUnsigned or so
-       return repr.find_first_not_of("0123456789") == string_view::npos;
-     });
-
-     if (all_integral) {
-       return int32();
-     }
+   int GetOrInsertField(const std::string& name) {
+     auto it_inserted =
+         name_to_index_.emplace(name, static_cast<int>(name_to_index_.size()));

-     if (reprs.size() > static_cast<size_t>(max_partition_dictionary_size)) {
-       return utf8();
+     if (it_inserted.second) {
+       repr_memos_.push_back(MakeMemo());
}

-     return dictionary(int32(), utf8());
+     return it_inserted.first->second;
}

-   int GetOrInsertField(const std::string& name) {
-     auto name_index =
-         name_to_index_.emplace(name, static_cast<int>(name_to_index_.size())).first;
-
-     if (static_cast<size_t>(name_index->second) >= values_.size()) {
-       values_.resize(name_index->second + 1);
-     }
-     return name_index->second;
+   Status InsertRepr(const std::string& name, util::string_view repr) {
+     return InsertRepr(GetOrInsertField(name), repr);
}

-   void InsertRepr(const std::string& name, std::string repr) {
-     InsertRepr(GetOrInsertField(name), std::move(repr));
+   Status InsertRepr(int index, util::string_view repr) {
+     int dummy;
+     return repr_memos_[index]->GetOrInsert<StringType>(repr, &dummy);
}

-   void InsertRepr(int index, std::string repr) { values_[index].insert(std::move(repr)); }
-
-   Result<std::shared_ptr<Schema>> Finish(ArrayVector* dictionaries) {
-     dictionaries->clear();
-
-     if (options_.max_partition_dictionary_size != 0) {
-       dictionaries->resize(name_to_index_.size());
-     }
+   Result<std::shared_ptr<Schema>> DoInspect() {
+     dictionaries_.assign(name_to_index_.size(), nullptr);

std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());

for (const auto& name_index : name_to_index_) {
const auto& name = name_index.first;
auto index = name_index.second;
-       ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index],
-                                                  options_.max_partition_dictionary_size));
-       if (type->id() == Type::DICTIONARY) {
-         StringBuilder builder;
-         for (const auto& repr : values_[index]) {
-           RETURN_NOT_OK(builder.Append(repr));
-         }
-         RETURN_NOT_OK(builder.Finish(&dictionaries->at(index)));
-       }
+       std::shared_ptr<ArrayData> reprs;
+       RETURN_NOT_OK(repr_memos_[index]->GetArrayData(0, &reprs));

+       if (reprs->length == 0) {
+         return Status::Invalid("No segments were available for field '", name,
+                                "'; couldn't infer type");
+       }

+       // try casting to int32, otherwise bail and just use the string reprs
+       auto dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
+       auto type = dict->type();
+       if (options_.infer_dictionary) {
+         // wrap the inferred type in dictionary()
+         type = dictionary(int32(), std::move(type));
+       }

fields[index] = field(name, std::move(type));
+       dictionaries_[index] = std::move(dict);
}

+     Reset();
return ::arrow::schema(std::move(fields));
}

@@ -387,38 +375,44 @@ class KeyValuePartitioningInspectImpl {
return names;
}

-  private:
+   virtual void Reset() {
+     name_to_index_.clear();
+     repr_memos_.clear();
+   }

+   std::unique_ptr<internal::DictionaryMemoTable> MakeMemo() {
+     return internal::make_unique<internal::DictionaryMemoTable>(default_memory_pool(),
+                                                                 utf8());
+   }

+   PartitioningFactoryOptions options_;
+   ArrayVector dictionaries_;
    std::unordered_map<std::string, int> name_to_index_;
-   std::vector<std::set<std::string>> values_;
-   const PartitioningFactoryOptions& options_;
+   std::vector<std::unique_ptr<internal::DictionaryMemoTable>> repr_memos_;
};

- class DirectoryPartitioningFactory : public PartitioningFactory {
+ class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
public:
DirectoryPartitioningFactory(std::vector<std::string> field_names,
PartitioningFactoryOptions options)
-       : field_names_(std::move(field_names)), options_(options) {}
+       : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
+     Reset();
+   }

std::string type_name() const override { return "schema"; }

Result<std::shared_ptr<Schema>> Inspect(
const std::vector<std::string>& paths) override {
-     KeyValuePartitioningInspectImpl impl(options_);
-
-     for (const auto& name : field_names_) {
-       impl.GetOrInsertField(name);
-     }

for (auto path : paths) {
size_t field_index = 0;
for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
if (field_index == field_names_.size()) break;

-         impl.InsertRepr(static_cast<int>(field_index++), std::move(segment));
+         RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
}
}

-     return impl.Finish(&dictionaries_);
+     return DoInspect();
}

Result<std::shared_ptr<Partitioning>> Finish(
@@ -435,9 +429,15 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
}

private:
+   void Reset() override {
+     KeyValuePartitioningFactory::Reset();
+
+     for (const auto& name : field_names_) {
+       GetOrInsertField(name);
+     }
+   }

std::vector<std::string> field_names_;
-   ArrayVector dictionaries_;
-   PartitioningFactoryOptions options_;
};

std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
@@ -490,27 +490,25 @@ Result<std::string> HivePartitioning::FormatValues(
return fs::internal::JoinAbstractPath(std::move(segments));
}

- class HivePartitioningFactory : public PartitioningFactory {
+ class HivePartitioningFactory : public KeyValuePartitioningFactory {
public:
explicit HivePartitioningFactory(PartitioningFactoryOptions options)
-       : options_(options) {}
+       : KeyValuePartitioningFactory(options) {}

std::string type_name() const override { return "hive"; }

Result<std::shared_ptr<Schema>> Inspect(
const std::vector<std::string>& paths) override {
-     KeyValuePartitioningInspectImpl impl(options_);

for (auto path : paths) {
for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
if (auto key = HivePartitioning::ParseKey(segment)) {
-           impl.InsertRepr(key->name, key->value);
+           RETURN_NOT_OK(InsertRepr(key->name, key->value));
}
}
}

-     field_names_ = impl.FieldNames();
-     return impl.Finish(&dictionaries_);
+     field_names_ = FieldNames();
+     return DoInspect();
}

Result<std::shared_ptr<Partitioning>> Finish(
@@ -520,7 +518,7 @@ class HivePartitioningFactory : public PartitioningFactory {
} else {
for (FieldRef ref : field_names_) {
// ensure all of field_names_ are present in schema
-       RETURN_NOT_OK(ref.FindOne(*schema).status());
+       RETURN_NOT_OK(ref.FindOne(*schema));
}

// drop fields which aren't in field_names_
@@ -532,8 +530,6 @@

private:
std::vector<std::string> field_names_;
-   ArrayVector dictionaries_;
-   PartitioningFactoryOptions options_;
};

std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
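Net effect: the factory now memoizes each field's unique segment strings in a `DictionaryMemoTable` and defers typing to `DoInspect`, which casts the reprs to int32 when every value parses, falls back to utf8 otherwise, and wraps either in `dictionary(...)` when `infer_dictionary` is set. A hedged sketch of the observable behavior; `MakeFactory`'s exact signature is assumed from the constructor shown above:

    #include <memory>
    #include "arrow/api.h"
    #include "arrow/dataset/partition.h"

    arrow::Result<std::shared_ptr<arrow::Schema>> InferExample() {
      arrow::dataset::PartitioningFactoryOptions options;
      options.infer_dictionary = true;
      auto factory = arrow::dataset::DirectoryPartitioning::MakeFactory(
          {"year", "month"}, options);
      // Every segment is integral, so the reprs cast to int32; with
      // infer_dictionary each field is then wrapped as dictionary(int32(), int32()).
      return factory->Inspect({"2020/06/data.parquet", "2021/01/data.parquet"});
    }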
13 changes: 5 additions & 8 deletions cpp/src/arrow/dataset/partition.h
@@ -85,14 +85,11 @@ class ARROW_DS_EXPORT Partitioning {
};

struct PartitioningFactoryOptions {
-   /// When inferring a schema for partition fields, string fields may be inferred as
-   /// a dictionary type instead. This can be more efficient when materializing virtual
-   /// columns. If the number of discovered unique values of a string field exceeds
-   /// max_partition_dictionary_size, it will instead be inferred as a string.
-   ///
-   /// max_partition_dictionary_size = 0: No fields will be inferred as dictionary.
-   /// max_partition_dictionary_size = -1: All fields will be inferred as dictionary.
-   int max_partition_dictionary_size = 0;
+   /// When inferring a schema for partition fields, yield dictionary encoded types
+   /// instead of plain. This can be more efficient when materializing virtual
+   /// columns, and Expressions parsed by the finished Partitioning will include
+   /// dictionaries of all unique inspected values for each field.
+   bool infer_dictionary = false;
};

/// \brief PartitioningFactory provides creation of a partitioning when the
Expand Down
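The boolean replaces the old `max_partition_dictionary_size` threshold, so dictionary inference is now all-or-nothing. A minimal usage sketch under the same assumptions as above:

    #include <memory>
    #include "arrow/dataset/partition.h"

    std::shared_ptr<arrow::dataset::PartitioningFactory> MakeDictInferringFactory() {
      arrow::dataset::PartitioningFactoryOptions options;
      options.infer_dictionary = true;  // all partition fields become dictionary-typed
      // After Inspect()/Finish(), expressions parsed from paths such as
      // "/country=CA/part-0.parquet" carry dictionary scalars whose dictionaries
      // hold every unique value seen during inspection.
      return arrow::dataset::HivePartitioning::MakeFactory(options);
    }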