@@ -7786,22 +7786,18 @@ def reindex(
7786
7786
df = self
7787
7787
7788
7788
if index is not None :
7789
- df = df ._reindex_index (index )
7789
+ df = df ._reindex_index (index , fill_value )
7790
7790
7791
7791
if columns is not None :
7792
- df = df ._reindex_columns (columns )
7793
-
7794
- # Process missing values.
7795
- if fill_value is not None :
7796
- df = df .fillna (fill_value )
7792
+ df = df ._reindex_columns (columns , fill_value )
7797
7793
7798
7794
# Copy
7799
- if copy :
7795
+ if copy and df is self :
7800
7796
return df .copy ()
7801
7797
else :
7802
7798
return df
7803
7799
7804
- def _reindex_index (self , index ):
7800
+ def _reindex_index (self , index , fill_value ):
7805
7801
# When axis is index, we can mimic pandas' behavior with a right outer join.
7806
7802
assert (
7807
7803
len (self ._internal .index_spark_column_names ) <= 1
@@ -7811,15 +7807,38 @@ def _reindex_index(self, index):
7811
7807
7812
7808
kser = ks .Series (list (index ))
7813
7809
labels = kser ._internal .spark_frame .select (kser .spark .column .alias (index_column ))
7810
+ frame = self ._internal .resolved_copy .spark_frame .drop (NATURAL_ORDER_COLUMN_NAME )
7814
7811
7815
- joined_df = self ._internal .resolved_copy .spark_frame .drop (NATURAL_ORDER_COLUMN_NAME ).join (
7816
- labels , on = index_column , how = "right"
7817
- )
7818
- internal = self ._internal .with_new_sdf (joined_df )
7812
+ if fill_value is not None :
7813
+ frame_index_column = verify_temp_column_name (frame , "__frame_index_column__" )
7814
+ frame = frame .withColumnRenamed (index_column , frame_index_column )
7815
+
7816
+ temp_fill_value = verify_temp_column_name (frame , "__fill_value__" )
7817
+ labels = labels .withColumn (temp_fill_value , F .lit (fill_value ))
7818
+
7819
+ frame_index_scol = scol_for (frame , frame_index_column )
7820
+ labels_index_scol = scol_for (labels , index_column )
7819
7821
7822
+ joined_df = frame .join (labels , on = [frame_index_scol == labels_index_scol ], how = "right" )
7823
+ joined_df = joined_df .select (
7824
+ labels_index_scol ,
7825
+ * [
7826
+ F .when (
7827
+ frame_index_scol .isNull () & labels_index_scol .isNotNull (),
7828
+ scol_for (joined_df , temp_fill_value ),
7829
+ )
7830
+ .otherwise (scol_for (joined_df , col ))
7831
+ .alias (col )
7832
+ for col in self ._internal .data_spark_column_names
7833
+ ]
7834
+ )
7835
+ else :
7836
+ joined_df = frame .join (labels , on = index_column , how = "right" )
7837
+
7838
+ internal = self ._internal .with_new_sdf (joined_df )
7820
7839
return DataFrame (internal )
7821
7840
7822
- def _reindex_columns (self , columns ):
7841
+ def _reindex_columns (self , columns , fill_value ):
7823
7842
level = self ._internal .column_labels_level
7824
7843
if level > 1 :
7825
7844
label_columns = list (columns )
@@ -7833,12 +7852,13 @@ def _reindex_columns(self, columns):
7833
7852
raise ValueError (
7834
7853
"shape (1,{}) doesn't match the shape (1,{})" .format (len (col ), level )
7835
7854
)
7855
+ fill_value = np .nan if fill_value is None else fill_value
7836
7856
scols , labels = [], []
7837
7857
for label in label_columns :
7838
7858
if label in self ._internal .column_labels :
7839
7859
scols .append (self ._internal .spark_column_for (label ))
7840
7860
else :
7841
- scols .append (F .lit (np . nan ).alias (name_like_string (label )))
7861
+ scols .append (F .lit (fill_value ).alias (name_like_string (label )))
7842
7862
labels .append (label )
7843
7863
7844
7864
return DataFrame (self ._internal .with_new_columns (scols , column_labels = labels ))
0 commit comments