diff --git a/cpp/src/arrow/array/builder_dict.h b/cpp/src/arrow/array/builder_dict.h
index c5db0d157c9..7f920bb8cd6 100644
--- a/cpp/src/arrow/array/builder_dict.h
+++ b/cpp/src/arrow/array/builder_dict.h
@@ -134,7 +134,7 @@ class DictionaryBuilderBase : public ArrayBuilder {
         value_type_(value_type) {}

   template <typename T1 = T>
-  DictionaryBuilderBase(
+  explicit DictionaryBuilderBase(
       enable_if_t<!is_fixed_size_binary_type<T1>::value,
                   const std::shared_ptr<DataType>&> value_type,
       MemoryPool* pool = default_memory_pool())
@@ -176,8 +176,8 @@ class DictionaryBuilderBase : public ArrayBuilder {
       : DictionaryBuilderBase(TypeTraits<T>::type_singleton(), pool) {}

   // This constructor doesn't check for errors. Use InsertMemoValues instead.
-  DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
+                                 MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(pool),
         memo_table_(new internal::DictionaryMemoTable(pool, dictionary)),
         delta_offset_(0),
@@ -404,8 +404,8 @@ class DictionaryBuilderBase : public ArrayBuilder {
                         MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(pool), indices_builder_(start_int_size, pool) {}

-  DictionaryBuilderBase(const std::shared_ptr<DataType>& value_type,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<DataType>& value_type,
+                                 MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(pool), indices_builder_(pool) {}

   template <typename B = BuilderType>
@@ -418,8 +418,8 @@ class DictionaryBuilderBase : public ArrayBuilder {
   explicit DictionaryBuilderBase(MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(pool), indices_builder_(pool) {}

-  DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
-                        MemoryPool* pool = default_memory_pool())
+  explicit DictionaryBuilderBase(const std::shared_ptr<Array>& dictionary,
+                                 MemoryPool* pool = default_memory_pool())
       : ArrayBuilder(pool), indices_builder_(pool) {}

   /// \brief Append a scalar null value
diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index 7a378cd9210..8121ae78fdc 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -506,13 +506,10 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
   // In the simplest case, consumption is simply conversion to a Table.
   ASSERT_OK_AND_ASSIGN(auto table, scanner->ToTable());

-  using row_type = std::tuple<double, std::string, util::optional<std::string>>;
-  std::vector<row_type> rows{
-      row_type{152.25, "3", "CA"},
-      row_type{273.5, "3", "US"},
-  };
-  std::shared_ptr<Table> expected;
-  ASSERT_OK(stl::TableFromTupleRange(default_memory_pool(), rows, columns, &expected));
+  auto expected = TableFromJSON(scanner->schema(), {R"([
+    {"sales": 152.25, "model": "3", "country": "CA"},
+    {"sales": 273.5, "model": "3", "country": "US"}
+  ])"});

   AssertTablesEqual(*expected, *table, false, true);
 }
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index ee8a3ce1869..1e47b15ee43 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -210,12 +210,6 @@ Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,
   base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));

-  for (const auto& f : partitioning->schema()->fields()) {
-    if (f->type()->id() == Type::DICTIONARY) {
-      return Status::NotImplemented("writing with dictionary partitions");
-    }
-  }
-
   int i = 0;
   for (auto maybe_fragment : fragment_it) {
     ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 33f74637ee7..7112260a580 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -776,14 +776,17 @@ Result<std::vector<std::shared_ptr<Schema>>> ParquetDatasetFactory::InspectSchemas(
   ARROW_ASSIGN_OR_RAISE(auto physical_schema, GetSchema(*metadata_, properties));
   schemas.push_back(std::move(physical_schema));

-  if (options_.partitioning.factory() != nullptr) {
+  if (auto factory = options_.partitioning.factory()) {
     // Gather paths found in RowGroups' ColumnChunks.
     ARROW_ASSIGN_OR_RAISE(auto paths, CollectPaths(*metadata_, properties));
-    ARROW_ASSIGN_OR_RAISE(auto partition_schema,
-                          options_.partitioning.GetOrInferSchema(StripPrefixAndFilename(
-                              paths, options_.partition_base_dir)));
+    ARROW_ASSIGN_OR_RAISE(
+        auto partition_schema,
+        factory->Inspect(StripPrefixAndFilename(paths, options_.partition_base_dir)));
+    schemas.push_back(std::move(partition_schema));
+  } else {
+    schemas.push_back(std::move(options_.partitioning.partitioning()->schema()));
   }

   return schemas;
diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc
index 968d55c4318..ea7056c7ab1 100644
--- a/cpp/src/arrow/dataset/filter.cc
+++ b/cpp/src/arrow/dataset/filter.cc
@@ -1617,15 +1617,17 @@ class StructDictionary {
   }

  private:
-  Status AddOne(const std::shared_ptr<Array>& column,
-                std::shared_ptr<Int32Array>* fused_indices) {
-    ARROW_ASSIGN_OR_RAISE(Datum encoded, compute::DictionaryEncode(column));
-    ArrayData* encoded_array = encoded.mutable_array();
+  Status AddOne(Datum column, std::shared_ptr<Int32Array>* fused_indices) {
+    ArrayData* encoded;
+    if (column.type()->id() != Type::DICTIONARY) {
+      ARROW_ASSIGN_OR_RAISE(column, compute::DictionaryEncode(column));
+    }
+    encoded = column.mutable_array();

-    auto indices = std::make_shared<Int32Array>(encoded_array->length,
-                                                std::move(encoded_array->buffers[1]));
+    auto indices =
+        std::make_shared<Int32Array>(encoded->length, std::move(encoded->buffers[1]));

-    dictionaries_.push_back(MakeArray(std::move(encoded_array->dictionary)));
+    dictionaries_.push_back(MakeArray(std::move(encoded->dictionary)));

     auto dictionary_size = static_cast<int32_t>(dictionaries_.back()->length());

     if (*fused_indices == nullptr) {
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index fbc33869d9c..a47ba849cfc 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -28,6 +28,7 @@
 #include "arrow/array/array_base.h"
 #include "arrow/array/array_nested.h"
 #include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_dict.h"
 #include "arrow/compute/api_scalar.h"
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/file_base.h"
@@ -39,6 +40,7 @@
 #include "arrow/scalar.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/make_unique.h"
 #include "arrow/util/range.h"
 #include "arrow/util/sort.h"
 #include "arrow/util/string_view.h"
@@ -306,75 +308,61 @@ Result<std::string> DirectoryPartitioning::FormatValues(
   return fs::internal::JoinAbstractPath(std::move(segments));
 }

-class KeyValuePartitioningInspectImpl {
- public:
-  explicit KeyValuePartitioningInspectImpl(const PartitioningFactoryOptions& options)
+class KeyValuePartitioningFactory : public PartitioningFactory {
+ protected:
+  explicit KeyValuePartitioningFactory(PartitioningFactoryOptions options)
       : options_(options) {}

-  static Result<std::shared_ptr<DataType>> InferType(const std::string& name,
-                                                     const std::set<std::string>& reprs,
-                                                     int max_partition_dictionary_size) {
-    if (reprs.empty()) {
-      return Status::Invalid("No segments were available for field '", name,
-                             "'; couldn't infer type");
-    }
-
-    bool all_integral = std::all_of(reprs.begin(), reprs.end(), [](string_view repr) {
-      // TODO(bkietz) use ParseUnsigned or so
-      return repr.find_first_not_of("0123456789") == string_view::npos;
-    });
-
-    if (all_integral) {
-      return int32();
-    }
+  int GetOrInsertField(const std::string& name) {
+    auto it_inserted =
+        name_to_index_.emplace(name, static_cast<int>(name_to_index_.size()));

-    if (reprs.size() > static_cast<size_t>(max_partition_dictionary_size)) {
-      return utf8();
+    if (it_inserted.second) {
+      repr_memos_.push_back(MakeMemo());
     }

-    return dictionary(int32(), utf8());
+    return it_inserted.first->second;
   }

-  int GetOrInsertField(const std::string& name) {
-    auto name_index =
-        name_to_index_.emplace(name, static_cast<int>(name_to_index_.size())).first;
-
-    if (static_cast<size_t>(name_index->second) >= values_.size()) {
-      values_.resize(name_index->second + 1);
-    }
-    return name_index->second;
+  Status InsertRepr(const std::string& name, util::string_view repr) {
+    return InsertRepr(GetOrInsertField(name), repr);
   }

-  void InsertRepr(const std::string& name, std::string repr) {
-    InsertRepr(GetOrInsertField(name), std::move(repr));
+  Status InsertRepr(int index, util::string_view repr) {
+    int dummy;
+    return repr_memos_[index]->GetOrInsert(repr, &dummy);
   }

-  void InsertRepr(int index, std::string repr) { values_[index].insert(std::move(repr)); }
-
-  Result<std::shared_ptr<Schema>> Finish(ArrayVector* dictionaries) {
-    dictionaries->clear();
-
-    if (options_.max_partition_dictionary_size != 0) {
-      dictionaries->resize(name_to_index_.size());
-    }
+  Result<std::shared_ptr<Schema>> DoInspect() {
+    dictionaries_.assign(name_to_index_.size(), nullptr);

     std::vector<std::shared_ptr<Field>> fields(name_to_index_.size());

     for (const auto& name_index : name_to_index_) {
       const auto& name = name_index.first;
       auto index = name_index.second;
-      ARROW_ASSIGN_OR_RAISE(auto type, InferType(name, values_[index],
-                                                 options_.max_partition_dictionary_size));
-      if (type->id() == Type::DICTIONARY) {
-        StringBuilder builder;
-        for (const auto& repr : values_[index]) {
-          RETURN_NOT_OK(builder.Append(repr));
-        }
-        RETURN_NOT_OK(builder.Finish(&dictionaries->at(index)));
+
+      std::shared_ptr<ArrayData> reprs;
+      RETURN_NOT_OK(repr_memos_[index]->GetArrayData(0, &reprs));
+
+      if (reprs->length == 0) {
+        return Status::Invalid("No segments were available for field '", name,
+                               "'; couldn't infer type");
+      }
+
+      // try casting to int32, otherwise bail and just use the string reprs
+      auto dict = compute::Cast(reprs, int32()).ValueOr(reprs).make_array();
+      auto type = dict->type();
+      if (options_.infer_dictionary) {
+        // wrap the inferred type in dictionary()
+        type = dictionary(int32(), std::move(type));
       }
+
+      fields[index] = field(name, std::move(type));
+      dictionaries_[index] = std::move(dict);
     }

+    Reset();
     return ::arrow::schema(std::move(fields));
   }
@@ -387,38 +375,44 @@ class KeyValuePartitioningInspectImpl {
     return names;
   }

- private:
+  virtual void Reset() {
+    name_to_index_.clear();
+    repr_memos_.clear();
+  }
+
+  std::unique_ptr<internal::DictionaryMemoTable> MakeMemo() {
+    return internal::make_unique<internal::DictionaryMemoTable>(default_memory_pool(),
+                                                                utf8());
+  }
+
+  PartitioningFactoryOptions options_;
+  ArrayVector dictionaries_;
   std::unordered_map<std::string, int> name_to_index_;
-  std::vector<std::set<std::string>> values_;
-  const PartitioningFactoryOptions& options_;
+  std::vector<std::unique_ptr<internal::DictionaryMemoTable>> repr_memos_;
 };

-class DirectoryPartitioningFactory : public PartitioningFactory {
+class DirectoryPartitioningFactory : public KeyValuePartitioningFactory {
  public:
   DirectoryPartitioningFactory(std::vector<std::string> field_names,
                                PartitioningFactoryOptions options)
-      : field_names_(std::move(field_names)), options_(options) {}
+      : KeyValuePartitioningFactory(options), field_names_(std::move(field_names)) {
+    Reset();
+  }

   std::string type_name() const override { return "schema"; }

   Result<std::shared_ptr<Schema>> Inspect(
       const std::vector<std::string>& paths) override {
-    KeyValuePartitioningInspectImpl impl(options_);
-
-    for (const auto& name : field_names_) {
-      impl.GetOrInsertField(name);
-    }
-
     for (auto path : paths) {
       size_t field_index = 0;
       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
         if (field_index == field_names_.size()) break;

-        impl.InsertRepr(static_cast<int>(field_index++), std::move(segment));
+        RETURN_NOT_OK(InsertRepr(static_cast<int>(field_index++), segment));
       }
     }

-    return impl.Finish(&dictionaries_);
+    return DoInspect();
   }

   Result<std::shared_ptr<Partitioning>> Finish(
@@ -435,9 +429,15 @@ class DirectoryPartitioningFactory : public PartitioningFactory {
   }

  private:
+  void Reset() override {
+    KeyValuePartitioningFactory::Reset();
+
+    for (const auto& name : field_names_) {
+      GetOrInsertField(name);
+    }
+  }
+
   std::vector<std::string> field_names_;
-  ArrayVector dictionaries_;
-  PartitioningFactoryOptions options_;
 };

 std::shared_ptr<PartitioningFactory> DirectoryPartitioning::MakeFactory(
@@ -490,27 +490,25 @@ Result<std::string> HivePartitioning::FormatValues(
   return fs::internal::JoinAbstractPath(std::move(segments));
 }

-class HivePartitioningFactory : public PartitioningFactory {
+class HivePartitioningFactory : public KeyValuePartitioningFactory {
  public:
   explicit HivePartitioningFactory(PartitioningFactoryOptions options)
-      : options_(options) {}
+      : KeyValuePartitioningFactory(options) {}

   std::string type_name() const override { return "hive"; }

   Result<std::shared_ptr<Schema>> Inspect(
       const std::vector<std::string>& paths) override {
-    KeyValuePartitioningInspectImpl impl(options_);
-
     for (auto path : paths) {
       for (auto&& segment : fs::internal::SplitAbstractPath(path)) {
         if (auto key = HivePartitioning::ParseKey(segment)) {
-          impl.InsertRepr(key->name, key->value);
+          RETURN_NOT_OK(InsertRepr(key->name, key->value));
         }
       }
     }

-    field_names_ = impl.FieldNames();
-    return impl.Finish(&dictionaries_);
+    field_names_ = FieldNames();
+    return DoInspect();
   }

   Result<std::shared_ptr<Partitioning>> Finish(
@@ -520,7 +518,7 @@ class HivePartitioningFactory : public PartitioningFactory {
     } else {
       for (FieldRef ref : field_names_) {
         // ensure all of field_names_ are present in schema
-        RETURN_NOT_OK(ref.FindOne(*schema).status());
+        RETURN_NOT_OK(ref.FindOne(*schema));
       }

       // drop fields which aren't in field_names_
@@ -532,8 +530,6 @@ class HivePartitioningFactory : public PartitioningFactory {

  private:
   std::vector<std::string> field_names_;
-  ArrayVector dictionaries_;
-  PartitioningFactoryOptions options_;
 };

 std::shared_ptr<PartitioningFactory> HivePartitioning::MakeFactory(
diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h
index 021f82245e2..6228999b41d 100644
--- a/cpp/src/arrow/dataset/partition.h
+++ b/cpp/src/arrow/dataset/partition.h
@@ -85,14 +85,11 @@ class ARROW_DS_EXPORT Partitioning {
 };

 struct PartitioningFactoryOptions {
-  /// When inferring a schema for partition fields, string fields may be inferred as
-  /// a dictionary type instead. This can be more efficient when materializing virtual
-  /// columns. If the number of discovered unique values of a string field exceeds
-  /// max_partition_dictionary_size, it will instead be inferred as a string.
-  ///
-  /// max_partition_dictionary_size = 0: No fields will be inferred as dictionary.
-  /// max_partition_dictionary_size = -1: All fields will be inferred as dictionary.
-  int max_partition_dictionary_size = 0;
+  /// When inferring a schema for partition fields, yield dictionary encoded types
+  /// instead of plain. This can be more efficient when materializing virtual
+  /// columns, and Expressions parsed by the finished Partitioning will include
+  /// dictionaries of all unique inspected values for each field.
+  bool infer_dictionary = false;
 };

 /// \brief PartitioningFactory provides creation of a partitioning when the
diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc
index 27ab00ab0c2..ef92a99a967 100644
--- a/cpp/src/arrow/dataset/partition_test.cc
+++ b/cpp/src/arrow/dataset/partition_test.cc
@@ -89,10 +89,14 @@ class TestPartitioning : public ::testing::Test {
     return field(std::move(name), utf8());
   }

-  static std::shared_ptr<Field> Dict(std::string name) {
+  static std::shared_ptr<Field> DictStr(std::string name) {
     return field(std::move(name), dictionary(int32(), utf8()));
   }

+  static std::shared_ptr<Field> DictInt(std::string name) {
+    return field(std::move(name), dictionary(int32(), int32()));
+  }
+
   std::shared_ptr<Partitioning> partitioning_;
   std::shared_ptr<PartitioningFactory> factory_;
 };
@@ -158,28 +162,25 @@ TEST_F(TestPartitioning, DiscoverSchema) {

 TEST_F(TestPartitioning, DictionaryInference) {
   PartitioningFactoryOptions options;
-  options.max_partition_dictionary_size = 2;
+  options.infer_dictionary = true;
   factory_ = DirectoryPartitioning::MakeFactory({"alpha", "beta"}, options);

   // type is still int32 if possible
-  AssertInspect({"/0/1"}, {Int("alpha"), Int("beta")});
+  AssertInspect({"/0/1"}, {DictInt("alpha"), DictInt("beta")});

   // successful dictionary inference
-  AssertInspect({"/a/0"}, {Dict("alpha"), Int("beta")});
-  AssertInspect({"/a/0", "/a/1"}, {Dict("alpha"), Int("beta")});
-  AssertInspect({"/a/0", "/b/0", "/a/1", "/b/1"}, {Dict("alpha"), Int("beta")});
-  AssertInspect({"/a/-", "/b/-", "/a/_", "/b/_"}, {Dict("alpha"), Dict("beta")});
-
-  // fall back to string if max dictionary size is exceeded
-  AssertInspect({"/a/0", "/b/0", "/c/1", "/d/1"}, {Str("alpha"), Int("beta")});
+  AssertInspect({"/a/0"}, {DictStr("alpha"), DictInt("beta")});
+  AssertInspect({"/a/0", "/a/1"}, {DictStr("alpha"), DictInt("beta")});
+  AssertInspect({"/a/0", "/b/0", "/a/1", "/b/1"}, {DictStr("alpha"), DictInt("beta")});
+  AssertInspect({"/a/-", "/b/-", "/a/_", "/b/_"}, {DictStr("alpha"), DictStr("beta")});
 }

 TEST_F(TestPartitioning, DictionaryHasUniqueValues) {
   PartitioningFactoryOptions options;
-  options.max_partition_dictionary_size = -1;
+  options.infer_dictionary = true;
   factory_ = DirectoryPartitioning::MakeFactory({"alpha"}, options);

-  auto alpha = Dict("alpha");
+  auto alpha = DictStr("alpha");
   AssertInspect({"/a", "/b", "/a", "/b", "/c", "/a"}, {alpha});

   ASSERT_OK_AND_ASSIGN(auto partitioning, factory_->Finish(schema({alpha})));
@@ -262,34 +263,29 @@ TEST_F(TestPartitioning, DiscoverHiveSchema) {

 TEST_F(TestPartitioning, HiveDictionaryInference) {
   PartitioningFactoryOptions options;
-  options.max_partition_dictionary_size = 2;
+  options.infer_dictionary = true;
   factory_ = HivePartitioning::MakeFactory(options);

   // type is still int32 if possible
-  AssertInspect({"/alpha=0/beta=1"}, {Int("alpha"), Int("beta")});
+  AssertInspect({"/alpha=0/beta=1"}, {DictInt("alpha"), DictInt("beta")});

   // successful dictionary inference
-  AssertInspect({"/alpha=a/beta=0"}, {Dict("alpha"), Int("beta")});
-  AssertInspect({"/alpha=a/beta=0", "/alpha=a/1"}, {Dict("alpha"), Int("beta")});
+  AssertInspect({"/alpha=a/beta=0"}, {DictStr("alpha"), DictInt("beta")});
+  AssertInspect({"/alpha=a/beta=0", "/alpha=a/1"}, {DictStr("alpha"), DictInt("beta")});
   AssertInspect(
       {"/alpha=a/beta=0", "/alpha=b/beta=0", "/alpha=a/beta=1", "/alpha=b/beta=1"},
-      {Dict("alpha"), Int("beta")});
+      {DictStr("alpha"), DictInt("beta")});
   AssertInspect(
       {"/alpha=a/beta=-", "/alpha=b/beta=-", "/alpha=a/beta=_", "/alpha=b/beta=_"},
-      {Dict("alpha"), Dict("beta")});
-
-  // fall back to string if max dictionary size is exceeded
-  AssertInspect(
-      {"/alpha=a/beta=0", "/alpha=b/beta=0", "/alpha=c/beta=1", "/alpha=d/beta=1"},
-      {Str("alpha"), Int("beta")});
+      {DictStr("alpha"), DictStr("beta")});
 }

 TEST_F(TestPartitioning, HiveDictionaryHasUniqueValues) {
   PartitioningFactoryOptions options;
-  options.max_partition_dictionary_size = -1;
+  options.infer_dictionary = true;
   factory_ = HivePartitioning::MakeFactory(options);

-  auto alpha = Dict("alpha");
+  auto alpha = DictStr("alpha");
   AssertInspect({"/alpha=a", "/alpha=b", "/alpha=a", "/alpha=b", "/alpha=c", "/alpha=a"},
                 {alpha});

   ASSERT_OK_AND_ASSIGN(auto partitioning, factory_->Finish(schema({alpha})));
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 62a73457d8c..fcecf46552a 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -555,7 +555,8 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin {
   FileSystemFactoryOptions options;
   options.selector_ignore_prefixes = {"."};
-  options.partitioning = HivePartitioning::MakeFactory();
+  options.partitioning = std::make_shared<HivePartitioning>(
+      SchemaFromColumnNames(source_schema_, {"year", "month"}));
   ASSERT_OK_AND_ASSIGN(auto factory,
                        FileSystemDatasetFactory::Make(fs_, s, source_format, options));
   ASSERT_OK_AND_ASSIGN(dataset_, factory->Finish());
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3b8621f9709..a8198b66d70 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1321,7 +1321,8 @@ cdef class DirectoryPartitioning(Partitioning):
         self.directory_partitioning = sp.get()

     @staticmethod
-    def discover(field_names, object max_partition_dictionary_size=0):
+    def discover(field_names, infer_dictionary=False,
+                 max_partition_dictionary_size=0):
         """
         Discover a DirectoryPartitioning.

@@ -1329,11 +1330,16 @@ cdef class DirectoryPartitioning(Partitioning):
         ----------
         field_names : list of str
             The names to associate with the values from the subdirectory names.
-        max_partition_dictionary_size : int or None, default 0
-            The maximum number of unique values to consider for dictionary
-            encoding. By default no field will be inferred as dictionary
-            encoded. If None is provided dictionary encoding will be used for
-            every string field.
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain types. This can be more efficient
+            when materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing
+            infer_dictionary=True.

         Returns
         -------
@@ -1341,18 +1347,21 @@ cdef class DirectoryPartitioning(Partitioning):
             To be used in the FileSystemFactoryOptions.
         """
         cdef:
-            CPartitioningFactoryOptions options
+            CPartitioningFactoryOptions c_options
             vector[c_string] c_field_names

-        if max_partition_dictionary_size is None:
-            max_partition_dictionary_size = -1
+        if max_partition_dictionary_size in {-1, None}:
+            infer_dictionary = True
+        elif max_partition_dictionary_size != 0:
+            raise NotImplementedError("max_partition_dictionary_size must be "
+                                      "0, -1, or None")

-        options.max_partition_dictionary_size = \
-            int(max_partition_dictionary_size)
+        if infer_dictionary:
+            c_options.infer_dictionary = True

         c_field_names = [tobytes(s) for s in field_names]
         return PartitioningFactory.wrap(
-            CDirectoryPartitioning.MakeFactory(c_field_names, options))
+            CDirectoryPartitioning.MakeFactory(c_field_names, c_options))


 cdef class HivePartitioning(Partitioning):
@@ -1403,17 +1412,22 @@ cdef class HivePartitioning(Partitioning):
         self.hive_partitioning = sp.get()

     @staticmethod
-    def discover(object max_partition_dictionary_size=0):
+    def discover(infer_dictionary=False, max_partition_dictionary_size=0):
         """
         Discover a HivePartitioning.

-        Params
-        ------
-        max_partition_dictionary_size : int or None, default 0
-            The maximum number of unique values to consider for dictionary
-            encoding. By default no field will be inferred as dictionary
-            encoded. If -1 is provided dictionary encoding will be used for
-            every string field.
+        Parameters
+        ----------
+        infer_dictionary : bool, default False
+            When inferring a schema for partition fields, yield dictionary
+            encoded types instead of plain. This can be more efficient when
+            materializing virtual columns, and Expressions parsed by the
+            finished Partitioning will include dictionaries of all unique
+            inspected values for each field.
+        max_partition_dictionary_size : int, default 0
+            Synonymous with infer_dictionary for backwards compatibility with
+            1.0: setting this to -1 or None is equivalent to passing
+            infer_dictionary=True.

         Returns
         -------
@@ -1421,16 +1435,19 @@ cdef class HivePartitioning(Partitioning):
             To be used in the FileSystemFactoryOptions.
         """
         cdef:
-            CPartitioningFactoryOptions options
+            CPartitioningFactoryOptions c_options

-        if max_partition_dictionary_size is None:
-            max_partition_dictionary_size = -1
+        if max_partition_dictionary_size in {-1, None}:
+            infer_dictionary = True
+        elif max_partition_dictionary_size != 0:
+            raise NotImplementedError("max_partition_dictionary_size must be "
+                                      "0, -1, or None")

-        options.max_partition_dictionary_size = \
-            int(max_partition_dictionary_size)
+        if infer_dictionary:
+            c_options.infer_dictionary = True

         return PartitioningFactory.wrap(
-            CHivePartitioning.MakeFactory(options))
+            CHivePartitioning.MakeFactory(c_options))


 cdef class DatasetFactory(_Weakrefable):
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 3e7ca1d5752..a1f7cd4aa4b 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -302,7 +302,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:

     cdef cppclass CPartitioningFactoryOptions \
             "arrow::dataset::PartitioningFactoryOptions":
-        int max_partition_dictionary_size
+        c_bool infer_dictionary

     cdef cppclass CPartitioningFactory "arrow::dataset::PartitioningFactory":
         pass
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index b6dc6a8bd3b..cc8345a8a4d 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1454,8 +1454,7 @@ def __init__(self, path_or_paths, filesystem=None, filters=None,
         # check partitioning to enable dictionary encoding
         if partitioning == "hive":
             partitioning = ds.HivePartitioning.discover(
-                max_partition_dictionary_size=-1
-            )
+                infer_dictionary=True)

         self._dataset = ds.dataset(path_or_paths, filesystem=filesystem,
                                    format=parquet_format,
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index cab6f700c34..80a55aa75d6 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1080,33 +1080,23 @@ def test_partitioning_factory(mockfs):
     assert isinstance(hive_partitioning_factory, ds.PartitioningFactory)


-def test_partitioning_factory_dictionary(mockfs):
+@pytest.mark.parametrize('infer_dictionary', [False, True])
+def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
     paths_or_selector = fs.FileSelector('subdir', recursive=True)
     format = ds.ParquetFileFormat()
     options = ds.FileSystemFactoryOptions('subdir')

-    max_size_to_inferred_type = {
-        0: pa.string(),
-        1: pa.string(),
-        2: pa.dictionary(pa.int32(), pa.string()),
-        64: pa.dictionary(pa.int32(), pa.string()),
-        None: pa.dictionary(pa.int32(), pa.string()),
-    }
-
-    for max_size, expected_type in max_size_to_inferred_type.items():
-        options.partitioning_factory = ds.DirectoryPartitioning.discover(
-            ['group', 'key'],
-            max_partition_dictionary_size=max_size)
+    options.partitioning_factory = ds.DirectoryPartitioning.discover(
+        ['group', 'key'], infer_dictionary=infer_dictionary)

-        factory = ds.FileSystemDatasetFactory(
-            mockfs, paths_or_selector, format, options)
+    factory = ds.FileSystemDatasetFactory(
+        mockfs, paths_or_selector, format, options)

-        inferred_schema = factory.inspect()
+    inferred_schema = factory.inspect()
+    if infer_dictionary:
+        expected_type = pa.dictionary(pa.int32(), pa.string())
         assert inferred_schema.field('key').type == expected_type

-        if expected_type == pa.string():
-            continue
-
         table = factory.finish().to_table().combine_chunks()
         actual = table.column('key').chunk(0)
         expected = pa.array(['xxx'] * 5 + ['yyy'] * 5).dictionary_encode()
@@ -1117,6 +1107,8 @@ def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
         actual = table.column('key').chunk(0)
         expected = expected.slice(0, 5)
         assert actual.equals(expected)
+    else:
+        assert inferred_schema.field('key').type == pa.string()


 def test_partitioning_function():
@@ -1492,33 +1484,34 @@ def test_open_dataset_partitioned_dictionary_type(tempdir, partitioning,
     import pyarrow.parquet as pq
     table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})

+    if partitioning == "directory":
+        partitioning = ds.DirectoryPartitioning.discover(
+            ["part1", "part2"], infer_dictionary=True)
+        fmt = "{0}/{1}"
+    else:
+        partitioning = ds.HivePartitioning.discover(infer_dictionary=True)
+        fmt = "part1={0}/part2={1}"
+
     basepath = tempdir / "dataset"
     basepath.mkdir()

     part_keys1, part_keys2 = partition_keys
     for part1 in part_keys1:
         for part2 in part_keys2:
-            if partitioning == 'directory':
-                fmt = "{0}/{1}"
-            else:
-                fmt = "part1={0}/part2={1}"
             path = basepath / fmt.format(part1, part2)
             path.mkdir(parents=True)
             pq.write_table(table, path / "test.parquet")

-    if partitioning == "directory":
-        part = ds.DirectoryPartitioning.discover(
-            ["part1", "part2"], max_partition_dictionary_size=None)
-    else:
-        part = ds.HivePartitioning.discover(max_partition_dictionary_size=None)
-
-    dataset = ds.dataset(str(basepath), partitioning=part)
+    dataset = ds.dataset(str(basepath), partitioning=partitioning)

-    dict_type = pa.dictionary(pa.int32(), pa.string())
-    part_type1 = dict_type if isinstance(part_keys1[0], str) else pa.int32()
-    part_type2 = dict_type if isinstance(part_keys2[0], str) else pa.int32()
+    def dict_type(key):
+        value_type = pa.string() if isinstance(key, str) else pa.int32()
+        return pa.dictionary(pa.int32(), value_type)

     expected_schema = table.schema.append(
-        pa.field("part1", part_type1)).append(pa.field("part2", part_type2))
+        pa.field("part1", dict_type(part_keys1[0]))
+    ).append(
+        pa.field("part2", dict_type(part_keys2[0]))
+    )
     assert dataset.schema.equals(expected_schema)
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index b2026f88599..a59682c8be5 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2230,9 +2230,7 @@ def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
                    .reset_index(drop=True)
                    .reindex(columns=result_df.columns))

-    if use_legacy_dataset:
-        # integer partition field not dictionary encoded with new API
-        expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
+    expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
     expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)

     assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
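
For reference, a minimal usage sketch of the `infer_dictionary` option introduced above. The dataset path and the `color` partition field are hypothetical; the API calls are the ones added in this patch:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# Ask discovery to infer partition fields as dictionary-encoded types
# (e.g. dictionary(int32(), string())) instead of plain string()/int32().
factory = ds.HivePartitioning.discover(infer_dictionary=True)

# Hypothetical layout: /data/color=red/part-0.parquet, /data/color=blue/...
dataset = ds.dataset("/data", partitioning=factory)

# The partition field is dictionary-encoded, and its dictionary holds every
# unique value seen during inspection.
assert dataset.schema.field("color").type == pa.dictionary(pa.int32(), pa.string())

# Backwards compatibility: -1 (or None) is still accepted as a synonym
# for infer_dictionary=True.
legacy = ds.HivePartitioning.discover(max_partition_dictionary_size=-1)
```

With `infer_dictionary=False` (the default) the same field would be inferred as plain `string()`.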