Commit cfa6bdf (2 parents: a3b124d + c8d551d)

Merge branch 'master' into utf8string-java

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

35 files changed: +402, -180 lines

dev/create-release/known_translations

Lines changed: 38 additions & 0 deletions
@@ -91,3 +91,41 @@ zapletal-martin - Martin Zapletal
 zuxqoj - Shekhar Bansal
 mingyukim - Mingyu Kim
 sigmoidanalytics - Mayur Rustagi
+AiHe - Ai He
+BenFradet - Ben Fradet
+FavioVazquez - Favio Vazquez
+JaysonSunshine - Jayson Sunshine
+Liuchang0812 - Liu Chang
+Sephiroth-Lin - Sephiroth Lin
+dobashim - Masaru Dobashi
+ehnalis - Zoltan Zvara
+emres - Emre Sevinc
+gchen - Guancheng Chen
+haiyangsea - Haiyang Sea
+hlin09 - Hao Lin
+hqzizania - Qian Huang
+jeanlyn - Jean Lyn
+jerluc - Jeremy A. Lucas
+jrabary - Jaonary Rabarisoa
+judynash - Judy Nash
+kaka1992 - Chen Song
+ksonj - Kalle Jepsen
+kuromatsu-nobuyuki - Nobuyuki Kuromatsu
+lazyman500 - Dong Xu
+leahmcguire - Leah McGuire
+mbittmann - Mark Bittmann
+mbonaci - Marko Bonaci
+meawoppl - Matthew Goodman
+nyaapa - Arsenii Krasikov
+phatak-dev - Madhukara Phatak
+prabeesh - Prabeesh K
+rakeshchalasani - Rakesh Chalasani
+rekhajoshm - Rekha Joshi
+sisihj - June He
+szheng79 - Shuai Zheng
+texasmichelle - Michelle Casbon
+vinodkc - Vinod KC
+yongtang - Yong Tang
+ypcat - Pei-Lun Lee
+zhichao-li - Zhichao Li
+zzcclp - Zhichao Zhang

ec2/spark_ec2.py

Lines changed: 2 additions & 2 deletions
@@ -51,7 +51,7 @@
 raw_input = input
 xrange = range

-SPARK_EC2_VERSION = "1.3.1"
+SPARK_EC2_VERSION = "1.4.0"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))

 VALID_SPARK_VERSIONS = set([
@@ -89,7 +89,7 @@

 # Default location to get the spark-ec2 scripts (and ami-list) from
 DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/mesos/spark-ec2"
-DEFAULT_SPARK_EC2_BRANCH = "branch-1.3"
+DEFAULT_SPARK_EC2_BRANCH = "branch-1.4"


 def setup_external_libs(libs):

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -1244,7 +1244,7 @@
           <include>**/*Suite.java</include>
         </includes>
         <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-        <argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+        <argLine>-Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
         <environmentVariables>
           <!--
             Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion
@@ -516,7 +516,7 @@ object TestSettings {
     javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
       .map { case (k,v) => s"-D$k=$v" }.toSeq,
     javaOptions in Test += "-ea",
-    javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
+    javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
       .split(" ").toSeq,
     javaOptions += "-Xmx3g",
     // Show full stack trace and duration in test cases.

python/pyspark/sql/tests.py

Lines changed: 32 additions & 0 deletions
@@ -26,6 +26,7 @@
 import tempfile
 import pickle
 import functools
+import time
 import datetime

 import py4j
@@ -47,6 +48,20 @@
 from pyspark.sql.window import Window


+class UTC(datetime.tzinfo):
+    """UTC"""
+    ZERO = datetime.timedelta(0)
+
+    def utcoffset(self, dt):
+        return self.ZERO
+
+    def tzname(self, dt):
+        return "UTC"
+
+    def dst(self, dt):
+        return self.ZERO
+
+
 class ExamplePointUDT(UserDefinedType):
     """
     User-defined type (UDT) for ExamplePoint.
@@ -588,6 +603,23 @@ def test_filter_with_datetime(self):
         self.assertEqual(0, df.filter(df.date > date).count())
         self.assertEqual(0, df.filter(df.time > time).count())

+    def test_time_with_timezone(self):
+        day = datetime.date.today()
+        now = datetime.datetime.now()
+        ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
+        # class in __main__ is not serializable
+        from pyspark.sql.tests import UTC
+        utc = UTC()
+        utcnow = datetime.datetime.fromtimestamp(ts, utc)
+        df = self.sqlCtx.createDataFrame([(day, now, utcnow)])
+        day1, now1, utcnow1 = df.first()
+        # Pyrolite serialize java.sql.Date as datetime, will be fixed in new version
+        self.assertEqual(day1.date(), day)
+        # Pyrolite does not support microsecond, the error should be
+        # less than 1 millisecond
+        self.assertTrue(now - now1 < datetime.timedelta(0.001))
+        self.assertTrue(now - utcnow1 < datetime.timedelta(0.001))
+
     def test_dropna(self):
         schema = StructType([
             StructField("name", StringType(), True),

python/pyspark/sql/types.py

Lines changed: 18 additions & 9 deletions
@@ -655,12 +655,15 @@ def _need_python_to_sql_conversion(dataType):
            _need_python_to_sql_conversion(dataType.valueType)
     elif isinstance(dataType, UserDefinedType):
         return True
-    elif isinstance(dataType, TimestampType):
+    elif isinstance(dataType, (DateType, TimestampType)):
         return True
     else:
         return False


+EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
+
+
 def _python_to_sql_converter(dataType):
     """
     Returns a converter that converts a Python object into a SQL datum for the given type.
@@ -698,26 +701,32 @@ def converter(obj):
                     return tuple(c(d.get(n)) for n, c in zip(names, converters))
                 else:
                     return tuple(c(v) for c, v in zip(converters, obj))
-            else:
+            elif obj is not None:
                 raise ValueError("Unexpected tuple %r with type %r" % (obj, dataType))
         return converter
     elif isinstance(dataType, ArrayType):
         element_converter = _python_to_sql_converter(dataType.elementType)
-        return lambda a: [element_converter(v) for v in a]
+        return lambda a: a and [element_converter(v) for v in a]
     elif isinstance(dataType, MapType):
         key_converter = _python_to_sql_converter(dataType.keyType)
         value_converter = _python_to_sql_converter(dataType.valueType)
-        return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+        return lambda m: m and dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
+
     elif isinstance(dataType, UserDefinedType):
-        return lambda obj: dataType.serialize(obj)
+        return lambda obj: obj and dataType.serialize(obj)
+
+    elif isinstance(dataType, DateType):
+        return lambda d: d and d.toordinal() - EPOCH_ORDINAL
+
     elif isinstance(dataType, TimestampType):

         def to_posix_timstamp(dt):
-            if dt.tzinfo is None:
-                return int(time.mktime(dt.timetuple()) * 1e7 + dt.microsecond * 10)
-            else:
-                return int(calendar.timegm(dt.utctimetuple()) * 1e7 + dt.microsecond * 10)
+            if dt:
+                seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
+                           else time.mktime(dt.timetuple()))
+                return int(seconds * 1e7 + dt.microsecond * 10)
         return to_posix_timstamp
+
     else:
         raise ValueError("Unexpected type %r" % dataType)
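
Note: with these changes the Python-to-SQL converters encode a datetime.date as days since the Unix epoch (via EPOCH_ORDINAL) and a datetime.datetime as integer units of 10^-7 seconds (100 ns), using calendar.timegm for timezone-aware values and time.mktime for naive ones; the `x and f(x)` pattern lets None pass through unchanged. A standalone sketch of the same conversions (illustrative only, not the pyspark module itself):

import calendar
import datetime
import time

EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()


def date_to_sql(d):
    # None-safe: `d and ...` returns None when d is None, otherwise days since 1970-01-01
    return d and d.toordinal() - EPOCH_ORDINAL


def timestamp_to_sql(dt):
    # Returns 100 ns units since the epoch; naive datetimes are treated as local time.
    if dt:
        seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                   else time.mktime(dt.timetuple()))
        return int(seconds * 1e7 + dt.microsecond * 10)


print(date_to_sql(datetime.date(1970, 1, 2)))  # 1
print(timestamp_to_sql(None))                  # None (falls through)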

sql/catalyst/src/main/scala/org/apache/spark/sql/BaseRow.java

Lines changed: 21 additions & 0 deletions
@@ -154,6 +154,27 @@ public int fieldIndex(String name) {
     throw new UnsupportedOperationException();
   }

+  /**
+   * A generic version of Row.equals(Row), which is used for tests.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof Row) {
+      Row row = (Row) other;
+      int n = size();
+      if (n != row.size()) {
+        return false;
+      }
+      for (int i = 0; i < n; i ++) {
+        if (isNullAt(i) != row.isNullAt(i) || (!isNullAt(i) && !get(i).equals(row.get(i)))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public Row copy() {
     final int n = size();
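
Note: the added equals compares rows field by field, requiring the null flags to match and, for non-null fields, the values to compare equal. The same logic, sketched in Python purely for illustration:

def rows_equal(a, b):
    """Null-aware, element-wise row comparison (sketch of the Java equals above)."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        # fields must agree on null-ness, and non-null values must be equal
        if (x is None) != (y is None) or (x is not None and x != y):
            return False
    return True


print(rows_equal((1, None, "a"), (1, None, "a")))  # True
print(rows_equal((1, None), (1, 0)))               # False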

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 3 deletions
@@ -172,8 +172,8 @@ class Analyzer(
    * expressions which equal GroupBy expressions with Literal(null), if those expressions
    * are not set for this grouping set (according to the bit mask).
    */
-  private[this] def expand(g: GroupingSets): Seq[GroupExpression] = {
-    val result = new scala.collection.mutable.ArrayBuffer[GroupExpression]
+  private[this] def expand(g: GroupingSets): Seq[Seq[Expression]] = {
+    val result = new scala.collection.mutable.ArrayBuffer[Seq[Expression]]

     g.bitmasks.foreach { bitmask =>
       // get the non selected grouping attributes according to the bit mask
@@ -194,7 +194,7 @@
         Literal.create(bitmask, IntegerType)
       })

-      result += GroupExpression(substitution)
+      result += substitution
     }

     result.toSeq

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 9 additions & 5 deletions
@@ -89,14 +89,10 @@ object FunctionRegistry {
     expression[CreateArray]("array"),
     expression[Coalesce]("coalesce"),
     expression[Explode]("explode"),
-    expression[Lower]("lower"),
-    expression[Substring]("substr"),
-    expression[Substring]("substring"),
     expression[Rand]("rand"),
     expression[Randn]("randn"),
     expression[CreateStruct]("struct"),
     expression[Sqrt]("sqrt"),
-    expression[Upper]("upper"),

     // Math functions
     expression[Acos]("acos"),
@@ -115,6 +111,7 @@
     expression[Log10]("log10"),
     expression[Log1p]("log1p"),
     expression[Pi]("pi"),
+    expression[Log2]("log2"),
     expression[Pow]("pow"),
     expression[Rint]("rint"),
     expression[Signum]("signum"),
@@ -132,7 +129,14 @@
     expression[Last]("last"),
     expression[Max]("max"),
     expression[Min]("min"),
-    expression[Sum]("sum")
+    expression[Sum]("sum"),
+
+    // string functions
+    expression[Lower]("lower"),
+    expression[StringLength]("length"),
+    expression[Substring]("substr"),
+    expression[Substring]("substring"),
+    expression[Upper]("upper")
   )

   val builtin: FunctionRegistry = {
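
Note: grouping Lower, Upper and Substring under a dedicated string-function section and registering StringLength as "length" and Log2 as "log2" makes those names callable from SQL. A hedged PySpark usage sketch (assumes a SQLContext bound to sqlCtx, as in the pyspark tests above; exact behavior at this commit may differ):

# Illustrative only; assumes a running SparkContext and a SQLContext named sqlCtx.
df = sqlCtx.createDataFrame([("Spark", 8.0)], ["s", "x"])
df.registerTempTable("t")
row = sqlCtx.sql(
    "SELECT lower(s), upper(s), length(s), substr(s, 1, 2), log2(x) FROM t").first()
print(row)  # expected values: 'spark', 'SPARK', 5, 'Sp', 3.0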

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 2 additions & 2 deletions
@@ -142,7 +142,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case ByteType =>
       buildCast[Byte](_, _ != 0)
     case DecimalType() =>
-      buildCast[Decimal](_, _ != 0)
+      buildCast[Decimal](_, _ != Decimal(0))
     case DoubleType =>
       buildCast[Double](_, _ != 0)
     case FloatType =>
@@ -455,7 +455,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case (BooleanType, dt: NumericType) =>
       defineCodeGen(ctx, ev, c => s"(${ctx.javaType(dt)})($c ? 1 : 0)")
     case (dt: DecimalType, BooleanType) =>
-      defineCodeGen(ctx, ev, c => s"$c.isZero()")
+      defineCodeGen(ctx, ev, c => s"!$c.isZero()")
     case (dt: NumericType, BooleanType) =>
       defineCodeGen(ctx, ev, c => s"$c != 0")
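
Note: both hunks fix the decimal-to-boolean cast so that non-zero decimals become true and zero becomes false; the interpreted path now compares against Decimal(0) and the generated code negates isZero(), which previously returned the inverted result. A hedged PySpark sketch of the intended semantics (assumes a SQLContext named sqlCtx):

from decimal import Decimal
from pyspark.sql.types import BooleanType

# Illustrative only; assumes a SQLContext named sqlCtx is available.
df = sqlCtx.createDataFrame([(Decimal("0"),), (Decimal("2.5"),)], ["d"])
print(df.select(df.d.cast(BooleanType()).alias("b")).collect())
# expected: [Row(b=False), Row(b=True)]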
