Skip to content

Commit 48b622d

Browse files
committed
Screw it, let's just get LimitedInputStream
1 parent 3543b70 commit 48b622d

File tree

5 files changed

+111
-3
lines changed

5 files changed

+111
-3
lines changed

LICENSE

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -754,7 +754,7 @@ SUCH DAMAGE.
754754

755755

756756
========================================================================
757-
For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
757+
For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
758758
========================================================================
759759
Copyright (C) 2008 The Android Open Source Project
760760

@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
771771
limitations under the License.
772772

773773

774+
========================================================================
775+
For LimitedInputStream
776+
(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
777+
========================================================================
778+
Copyright (C) 2007 The Guava Authors
779+
780+
Licensed under the Apache License, Version 2.0 (the "License");
781+
you may not use this file except in compliance with the License.
782+
You may obtain a copy of the License at
783+
784+
http://www.apache.org/licenses/LICENSE-2.0
785+
786+
Unless required by applicable law or agreed to in writing, software
787+
distributed under the License is distributed on an "AS IS" BASIS,
788+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
789+
See the License for the specific language governing permissions and
790+
limitations under the License.
791+
792+
774793
========================================================================
775794
BSD-style licenses
776795
========================================================================

network/common/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<dependency>
5151
<groupId>com.google.guava</groupId>
5252
<artifactId>guava</artifactId>
53+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5354
<scope>provided</scope>
5455
</dependency>
5556

network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727

2828
import com.google.common.base.Objects;
2929
import com.google.common.io.ByteStreams;
30-
import com.google.common.io.LimitInputStream;
3130
import io.netty.channel.DefaultFileRegion;
3231

3332
import org.apache.spark.network.util.JavaUtils;
33+
import org.apache.spark.network.util.LimitedInputStream;
3434

3535
/**
3636
* A {@link ManagedBuffer} backed by a segment in a file.
@@ -102,7 +102,7 @@ public InputStream createInputStream() throws IOException {
102102
try {
103103
is = new FileInputStream(file);
104104
ByteStreams.skipFully(is, offset);
105-
return new LimitInputStream(is, length);
105+
return new LimitedInputStream(is, length);
106106
} catch (IOException e) {
107107
try {
108108
if (is != null) {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.util;
19+
20+
import java.io.FilterInputStream;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
24+
import com.google.common.base.Preconditions;
25+
26+
/**
27+
* Wraps a {@link InputStream}, limiting the number of bytes which can be read.
28+
*
29+
* This code is from Guava's 14.0 source code, because there is no compatible way to
30+
* use this functionality in both a Guava 11 environment and a Guava >14 environment.
31+
*/
32+
public final class LimitedInputStream extends FilterInputStream {
33+
private long left;
34+
private long mark = -1;
35+
36+
public LimitedInputStream(InputStream in, long limit) {
37+
super(in);
38+
Preconditions.checkNotNull(in);
39+
Preconditions.checkArgument(limit >= 0, "limit must be non-negative");
40+
left = limit;
41+
}
42+
@Override public int available() throws IOException {
43+
return (int) Math.min(in.available(), left);
44+
}
45+
// it's okay to mark even if mark isn't supported, as reset won't work
46+
@Override public synchronized void mark(int readLimit) {
47+
in.mark(readLimit);
48+
mark = left;
49+
}
50+
@Override public int read() throws IOException {
51+
if (left == 0) {
52+
return -1;
53+
}
54+
int result = in.read();
55+
if (result != -1) {
56+
--left;
57+
}
58+
return result;
59+
}
60+
@Override public int read(byte[] b, int off, int len) throws IOException {
61+
if (left == 0) {
62+
return -1;
63+
}
64+
len = (int) Math.min(len, left);
65+
int result = in.read(b, off, len);
66+
if (result != -1) {
67+
left -= result;
68+
}
69+
return result;
70+
}
71+
@Override public synchronized void reset() throws IOException {
72+
if (!in.markSupported()) {
73+
throw new IOException("Mark not supported");
74+
}
75+
if (mark == -1) {
76+
throw new IOException("Mark not set");
77+
}
78+
in.reset();
79+
left = mark;
80+
}
81+
@Override public long skip(long n) throws IOException {
82+
n = Math.min(n, left);
83+
long skipped = in.skip(n);
84+
left -= skipped;
85+
return skipped;
86+
}
87+
}

network/shuffle/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
<dependency>
5252
<groupId>com.google.guava</groupId>
5353
<artifactId>guava</artifactId>
54+
<version>11.0.2</version> <!-- yarn 2.4.0's version -->
5455
<scope>provided</scope>
5556
</dependency>
5657

0 commit comments

Comments
 (0)