diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java index bf514d81c5a2..e7957ba8ba54 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/PrivateCellUtil.java @@ -2261,6 +2261,18 @@ public static int writeFlatKey(ExtendedCell cell, OutputStream out) throws IOExc + Bytes.SIZEOF_BYTE; } + /** + * Deep clones the given cell if the cell supports deep cloning + * @param cell the cell to be cloned + * @return the cloned cell + */ + public static Cell deepClone(Cell cell) throws CloneNotSupportedException { + if (cell instanceof ExtendedCell) { + return ((ExtendedCell) cell).deepClone(); + } + throw new CloneNotSupportedException(); + } + /** * Sets the given seqId to the cell. Marked as audience Private as of 1.2.0. Setting a Cell * sequenceid is an internal implementation detail not for general public use. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 8610a6d43bd7..16f115f592f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.AsyncClusterConnection; @@ -281,10 +282,15 @@ public void replicateEntries(List entries, final CellScanner cells, } addToHashMultiMap(rowMap, table, clusterIds, mutation); } + // Clone the cell. It may be used to construct a mutation for applying the edit on + // the local cluster. Some operations may still be in flight even as we fail to apply + // some other in-flight mutations and trigger failure handling including a release + // of the buffer underlying the cellScanner that is sourcing the cells. + Cell clonedCell = PrivateCellUtil.deepClone(cell); if (CellUtil.isDelete(cell)) { - ((Delete) mutation).add(cell); + ((Delete) mutation).add(clonedCell); } else { - ((Put) mutation).add(cell); + ((Put) mutation).add(clonedCell); } previousCell = cell; } @@ -334,6 +340,8 @@ public void replicateEntries(List entries, final CellScanner cells, LOG.error("Unable to accept edit because:", ex); this.metrics.incrementFailedBatches(); throw ex; + } catch (CloneNotSupportedException e) { + throw new IOException(e); } }