1616import org .elasticsearch .common .bytes .BytesReference ;
1717import org .elasticsearch .common .component .AbstractLifecycleComponent ;
1818import org .elasticsearch .common .lease .Releasable ;
19+ import org .elasticsearch .common .metrics .CounterMetric ;
1920import org .elasticsearch .common .settings .Settings ;
2021import org .elasticsearch .common .unit .TimeValue ;
22+ import org .elasticsearch .common .util .CombinedRateLimiter ;
2123import org .elasticsearch .common .util .concurrent .AbstractRefCounted ;
2224import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2325import org .elasticsearch .common .util .concurrent .KeyedLock ;
4244import java .util .concurrent .ConcurrentHashMap ;
4345import java .util .concurrent .CopyOnWriteArrayList ;
4446import java .util .function .Consumer ;
47+ import java .util .function .LongConsumer ;
4548
4649public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
4750
@@ -52,6 +55,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
5255 private final CopyOnWriteArrayList <Consumer <String >> closeSessionListeners = new CopyOnWriteArrayList <>();
5356 private final ThreadPool threadPool ;
5457 private final CcrSettings ccrSettings ;
58+ private final CounterMetric throttleTime = new CounterMetric ();
5559
5660 public CcrRestoreSourceService (ThreadPool threadPool , CcrSettings ccrSettings ) {
5761 this .threadPool = threadPool ;
@@ -136,7 +140,7 @@ public synchronized SessionReader getSessionReader(String sessionUUID) {
136140 throw new IllegalArgumentException ("session [" + sessionUUID + "] not found" );
137141 }
138142 restore .idle = false ;
139- return new SessionReader (restore );
143+ return new SessionReader (restore , ccrSettings , throttleTime :: inc );
140144 }
141145
142146 private void internalCloseSession (String sessionUUID , boolean throwIfSessionMissing ) {
@@ -182,6 +186,10 @@ private void maybeTimeout(String sessionUUID) {
182186 }
183187 }
184188
189+ public long getThrottleTime () {
190+ return this .throttleTime .count ();
191+ }
192+
185193 private static class RestoreSession extends AbstractRefCounted {
186194
187195 private final String sessionUUID ;
@@ -254,9 +262,13 @@ protected void closeInternal() {
254262 public static class SessionReader implements Closeable {
255263
256264 private final RestoreSession restoreSession ;
265+ private final CcrSettings ccrSettings ;
266+ private final LongConsumer throttleListener ;
257267
258- private SessionReader (RestoreSession restoreSession ) {
268+ private SessionReader (RestoreSession restoreSession , CcrSettings ccrSettings , LongConsumer throttleListener ) {
259269 this .restoreSession = restoreSession ;
270+ this .ccrSettings = ccrSettings ;
271+ this .throttleListener = throttleListener ;
260272 restoreSession .incRef ();
261273 }
262274
@@ -270,6 +282,9 @@ private SessionReader(RestoreSession restoreSession) {
270282 * @throws IOException if the read fails
271283 */
272284 public long readFileBytes (String fileName , BytesReference reference ) throws IOException {
285+ CombinedRateLimiter rateLimiter = ccrSettings .getRateLimiter ();
286+ long throttleTime = rateLimiter .maybePause (reference .length ());
287+ throttleListener .accept (throttleTime );
273288 return restoreSession .readFileBytes (fileName , reference );
274289 }
275290
0 commit comments