Skip to content

Conversation

@mn-mikke
Copy link
Contributor

@mn-mikke mn-mikke commented May 9, 2018

What changes were proposed in this pull request?

The PR adds the map_from_entries function that returns a map created from the given array of entries.

How was this patch tested?

New tests added into:

  • CollectionExpressionSuite
  • DataFrameFunctionSuite

CodeGen Examples

Primitive-type Keys and Values

val idf = Seq(
  Seq((1, 10), (2, 20), (3, 10)),
  Seq((1, 10), null, (2, 20))
).toDF("a")
idf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen

Result:

/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_2 = 0; !project_isNull_0 && project_idx_2 < inputadapter_value_0.numElements(); project_idx_2++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_2);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final long project_keySectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 052 */           final long project_valueSectionSize_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(project_numEntries_0, 4);
/* 053 */           final long project_byteArraySize_0 = 8 + project_keySectionSize_0 + project_valueSectionSize_0;
/* 054 */           if (project_byteArraySize_0 > 2147483632) {
/* 055 */             final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 056 */             final Object[] project_values_0 = new Object[project_numEntries_0];
/* 057 */
/* 058 */             for (int project_idx_1 = 0; project_idx_1 < project_numEntries_0; project_idx_1++) {
/* 059 */               InternalRow project_entry_1 = inputadapter_value_0.getStruct(project_idx_1, 2);
/* 060 */
/* 061 */               project_keys_0[project_idx_1] = project_entry_1.getInt(0);
/* 062 */               project_values_0[project_idx_1] = project_entry_1.getInt(1);
/* 063 */             }
/* 064 */
/* 065 */             project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */           } else {
/* 068 */             final byte[] project_byteArray_0 = new byte[(int)project_byteArraySize_0];
/* 069 */             UnsafeMapData project_unsafeMapData_0 = new UnsafeMapData();
/* 070 */             Platform.putLong(project_byteArray_0, 16, project_keySectionSize_0);
/* 071 */             Platform.putLong(project_byteArray_0, 24, project_numEntries_0);
/* 072 */             Platform.putLong(project_byteArray_0, 24 + project_keySectionSize_0, project_numEntries_0);
/* 073 */             project_unsafeMapData_0.pointTo(project_byteArray_0, 16, (int)project_byteArraySize_0);
/* 074 */             ArrayData project_keyArrayData_0 = project_unsafeMapData_0.keyArray();
/* 075 */             ArrayData project_valueArrayData_0 = project_unsafeMapData_0.valueArray();
/* 076 */
/* 077 */             for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 078 */               InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 079 */
/* 080 */               project_keyArrayData_0.setInt(project_idx_0, project_entry_0.getInt(0));
/* 081 */               project_valueArrayData_0.setInt(project_idx_0, project_entry_0.getInt(1));
/* 082 */             }
/* 083 */
/* 084 */             project_value_0 = project_unsafeMapData_0;
/* 085 */           }
/* 086 */
/* 087 */         }

Non-primitive-type Keys and Values

val sdf = Seq(
  Seq(("a", null), ("b", "bb"), ("c", "aa")),
  Seq(("a", "aa"), null, (null, "bb"))
).toDF("a")
sdf.filter('a.isNotNull).select(map_from_entries('a)).debugCodegen

Result:

/* 042 */         boolean project_isNull_0 = false;
/* 043 */         MapData project_value_0 = null;
/* 044 */
/* 045 */         for (int project_idx_1 = 0; !project_isNull_0 && project_idx_1 < inputadapter_value_0.numElements(); project_idx_1++) {
/* 046 */           project_isNull_0 |= inputadapter_value_0.isNullAt(project_idx_1);
/* 047 */         }
/* 048 */         if (!project_isNull_0) {
/* 049 */           final int project_numEntries_0 = inputadapter_value_0.numElements();
/* 050 */
/* 051 */           final Object[] project_keys_0 = new Object[project_numEntries_0];
/* 052 */           final Object[] project_values_0 = new Object[project_numEntries_0];
/* 053 */
/* 054 */           for (int project_idx_0 = 0; project_idx_0 < project_numEntries_0; project_idx_0++) {
/* 055 */             InternalRow project_entry_0 = inputadapter_value_0.getStruct(project_idx_0, 2);
/* 056 */
/* 057 */             if (project_entry_0.isNullAt(0)) {
/* 058 */               throw new RuntimeException("The first field from a struct (key) can't be null.");
/* 059 */             }
/* 060 */
/* 061 */             project_keys_0[project_idx_0] = project_entry_0.getUTF8String(0);
/* 062 */             project_values_0[project_idx_0] = project_entry_0.getUTF8String(1);
/* 063 */           }
/* 064 */
/* 065 */           project_value_0 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.apply(project_keys_0, project_values_0);
/* 066 */
/* 067 */         }

@mn-mikke
Copy link
Contributor Author

mn-mikke commented May 9, 2018

cc @ueshin @gatorsmile

@HyukjinKwon
Copy link
Member

ok to test

@HyukjinKwon
Copy link
Member

add to whitelist

if (key == null) {
throw new RuntimeException("The first field from a struct (key) can't be null.")
}
if (keySet.contains(key)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check necessary for now? This is because other operations (e.g. CreateMap) allows us to create a map with duplicated key. Is it better to be consistent in Spark?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we've already touched this topic in your PR for SPARK-23933. I think if some hashing is added into maps in future, these duplicity checks will have to be introduced anyway. So if we add it now, we can avoid breaking changes in future. But I understand your point of view.

Presto also doesn't support duplicates:

presto:default> SELECT map_from_entries(ARRAY[(1, 'x'), (1, 'y')]);
Query 20180510_090536_00005_468a9 failed: Duplicate keys (1) are not allowed

WDYT @ueshin @gatorsmile

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry for the super delay.
Let's just ignore the duplicated key like CreateMap for now. We will need to discuss map-related topics, such as duplicate keys, equality or ordering, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, no problem. I've removed duplicity checks.

@SparkQA
Copy link

SparkQA commented May 10, 2018

Test build #90434 has finished for PR 21282 at commit 8c6039c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MapFromEntries(child: Expression) extends UnaryExpression

since = "2.4.0")
case class MapFromEntries(child: Expression) extends UnaryExpression
{
private lazy val resolvedDataType: Option[MapType] = child.dataType match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@transient?

private lazy val resolvedDataType: Option[MapType] = child.dataType match {
case ArrayType(
StructType(Array(
StructField(_, keyType, false, _),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need key field to be nullable = false because we check the nullability when creating an array?

StructType(Array(
StructField(_, keyType, false, _),
StructField(_, valueType, valueNullable, _))),
false) => Some(MapType(keyType, valueType, valueNullable))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reject an array with containsNull = true here? The array might not contain nulls.

""",
since = "2.4.0")
case class MapFromEntries(child: Expression) extends UnaryExpression
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: style

@SparkQA
Copy link

SparkQA commented May 10, 2018

Test build #90459 has finished for PR 21282 at commit 25aa879.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MapFromEntries(child: Expression) extends UnaryExpression

@SparkQA
Copy link

SparkQA commented May 17, 2018

Test build #90721 has finished for PR 21282 at commit 8d12d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented May 17, 2018

Test build #90741 has finished for PR 21282 at commit 7fd824e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArraysOverlap(left: Expression, right: Expression)

@mn-mikke
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented May 17, 2018

Test build #90748 has finished for PR 21282 at commit 7fd824e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArraysOverlap(left: Expression, right: Expression)

@kiszk
Copy link
Member

kiszk commented May 18, 2018

retest this please

@SparkQA
Copy link

SparkQA commented May 18, 2018

Test build #90773 has finished for PR 21282 at commit 7fd824e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArraysOverlap(left: Expression, right: Expression)

@mn-mikke
Copy link
Contributor Author

retest this please

@SparkQA
Copy link

SparkQA commented May 18, 2018

Test build #90782 has finished for PR 21282 at commit 7fd824e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class ArraysOverlap(left: Expression, right: Expression)

@SparkQA
Copy link

SparkQA commented May 28, 2018

Test build #91229 has finished for PR 21282 at commit 45e4633.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 3, 2018

Test build #91421 has finished for PR 21282 at commit 10ace84.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

var i = 0
var j = 0
while (i < length) {
if (!arrayData.isNullAt(i)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should throw an exception if arrayData.isNullAt(i)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ueshin,
wouldn't it be better return null in this case? And follow null handling of other functions like flatten?

flatten(array(array(1,2), null, array(3,4))) => null

WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that sounds reasonable. Thanks.

@HyukjinKwon
Copy link
Member

ok to test

@SparkQA
Copy link

SparkQA commented Jun 13, 2018

Test build #91774 has finished for PR 21282 at commit 10ace84.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 21, 2018

Test build #92173 has finished for PR 21282 at commit 599656e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 21, 2018

Test build #92175 has finished for PR 21282 at commit 4eaedc5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Copy link
Member

ueshin commented Jun 22, 2018

LGTM.

@ueshin
Copy link
Member

ueshin commented Jun 22, 2018

Thanks! merging to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants