1
+ /*
2
+ * Licensed to Elastic Search and Shay Banon under one
3
+ * or more contributor license agreements. See the NOTICE file
4
+ * distributed with this work for additional information
5
+ * regarding copyright ownership. Elastic Search licenses this
6
+ * file to you under the Apache License, Version 2.0 (the
7
+ * "License"); you may not use this file except in compliance
8
+ * with the License. You may obtain a copy of the License at
9
+ *
10
+ * http://www.apache.org/licenses/LICENSE-2.0
11
+ *
12
+ * Unless required by applicable law or agreed to in writing,
13
+ * software distributed under the License is distributed on an
14
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
+ * KIND, either express or implied. See the License for the
16
+ * specific language governing permissions and limitations
17
+ * under the License.
18
+ */
19
+
20
+ package org .elasticsearch .index .merge .policy ;
21
+
22
+ import org .apache .lucene .index .CorruptIndexException ;
23
+ import org .apache .lucene .index .MergePolicy ;
24
+ import org .apache .lucene .index .SegmentInfo ;
25
+ import org .apache .lucene .index .SegmentInfos ;
26
+ import org .apache .lucene .index .TieredMergePolicy ;
27
+ import org .elasticsearch .ElasticSearchException ;
28
+ import org .elasticsearch .cluster .metadata .IndexMetaData ;
29
+ import org .elasticsearch .common .inject .Inject ;
30
+ import org .elasticsearch .common .settings .Settings ;
31
+ import org .elasticsearch .common .unit .ByteSizeUnit ;
32
+ import org .elasticsearch .common .unit .ByteSizeValue ;
33
+ import org .elasticsearch .index .settings .IndexSettingsService ;
34
+ import org .elasticsearch .index .shard .AbstractIndexShardComponent ;
35
+ import org .elasticsearch .index .store .Store ;
36
+
37
+ import java .io .IOException ;
38
+ import java .util .Set ;
39
+ import java .util .concurrent .CopyOnWriteArraySet ;
40
+
41
+ public class TieredMergePolicyProvider extends AbstractIndexShardComponent implements MergePolicyProvider <TieredMergePolicy > {
42
+
43
+ private final IndexSettingsService indexSettingsService ;
44
+
45
+ private final Set <CustomTieredMergePolicyProvider > policies = new CopyOnWriteArraySet <CustomTieredMergePolicyProvider >();
46
+
47
+ private volatile boolean compoundFormat ;
48
+ private volatile double expungeDeletesPctAllowed ;
49
+ private volatile ByteSizeValue floorSegment ;
50
+ private volatile int maxMergeAtOnce ;
51
+ private volatile int maxMergeAtOnceExplicit ;
52
+ private volatile ByteSizeValue maxMergedSegment ;
53
+ private volatile double segmentsPerTier ;
54
+ private boolean asyncMerge ;
55
+
56
+ private final ApplySettings applySettings = new ApplySettings ();
57
+
58
+ @ Inject public TieredMergePolicyProvider (Store store , IndexSettingsService indexSettingsService ) {
59
+ super (store .shardId (), store .indexSettings ());
60
+ this .indexSettingsService = indexSettingsService ;
61
+
62
+ this .compoundFormat = indexSettings .getAsBoolean ("index.compound_format" , store .suggestUseCompoundFile ());
63
+ this .asyncMerge = indexSettings .getAsBoolean ("index.merge.async" , true );
64
+ this .expungeDeletesPctAllowed = componentSettings .getAsDouble ("expunge_deletes_allowed" , 10d ); // percentage
65
+ this .floorSegment = componentSettings .getAsBytesSize ("floor_segment" , new ByteSizeValue (2 , ByteSizeUnit .MB ));
66
+ this .maxMergeAtOnce = componentSettings .getAsInt ("max_merge_at_once" , 10 );
67
+ this .maxMergeAtOnceExplicit = componentSettings .getAsInt ("max_merge_at_once_explicit" , 30 );
68
+ // TODO is this really a good default number for max_merge_segment, what happens for large indices, won't they end up with many segments?
69
+ this .maxMergedSegment = componentSettings .getAsBytesSize ("max_merge_segment" , new ByteSizeValue (5 , ByteSizeUnit .GB ));
70
+ this .segmentsPerTier = componentSettings .getAsDouble ("segments_per_tier" , 10d );
71
+
72
+ logger .debug ("using [tiered] merge policy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merge_segment[{}], segments_per_tier[{}], async_merge[{}]" ,
73
+ expungeDeletesPctAllowed , floorSegment , maxMergeAtOnce , maxMergeAtOnceExplicit , maxMergedSegment , segmentsPerTier , asyncMerge );
74
+
75
+ indexSettingsService .addListener (applySettings );
76
+ }
77
+
78
+
79
+ @ Override public TieredMergePolicy newMergePolicy () {
80
+ CustomTieredMergePolicyProvider mergePolicy ;
81
+ if (asyncMerge ) {
82
+ mergePolicy = new EnableMergeTieredMergePolicyProvider (this );
83
+ } else {
84
+ mergePolicy = new CustomTieredMergePolicyProvider (this );
85
+ }
86
+ mergePolicy .setUseCompoundFile (compoundFormat );
87
+ mergePolicy .setExpungeDeletesPctAllowed (expungeDeletesPctAllowed );
88
+ mergePolicy .setFloorSegmentMB (floorSegment .mbFrac ());
89
+ mergePolicy .setMaxMergeAtOnce (maxMergeAtOnce );
90
+ mergePolicy .setMaxMergeAtOnceExplicit (maxMergeAtOnceExplicit );
91
+ mergePolicy .setMaxMergedSegmentMB (maxMergedSegment .mbFrac ());
92
+ mergePolicy .setSegmentsPerTier (segmentsPerTier );
93
+ return mergePolicy ;
94
+ }
95
+
96
+ @ Override public void close (boolean delete ) throws ElasticSearchException {
97
+ indexSettingsService .removeListener (applySettings );
98
+ }
99
+
100
+ static {
101
+ IndexMetaData .addDynamicSettings (
102
+ "index.merge.policy.expunge_deletes_allowed" ,
103
+ "index.merge.policy.floor_segment" ,
104
+ "index.merge.policy.max_merge_at_once" ,
105
+ "index.merge.policy.max_merge_at_once_explicit" ,
106
+ "index.merge.policy.max_merged_segment" ,
107
+ "index.merge.policy.segments_per_tier" ,
108
+ "index.compound_format"
109
+ );
110
+ }
111
+
112
+ class ApplySettings implements IndexSettingsService .Listener {
113
+ @ Override public void onRefreshSettings (Settings settings ) {
114
+ double expungeDeletesPctAllowed = settings .getAsDouble ("index.merge.policy.expunge_deletes_allowed" , TieredMergePolicyProvider .this .expungeDeletesPctAllowed );
115
+ if (expungeDeletesPctAllowed != TieredMergePolicyProvider .this .expungeDeletesPctAllowed ) {
116
+ logger .info ("updating [expunge_deletes_allowed] from [{}] to [{}]" , TieredMergePolicyProvider .this .expungeDeletesPctAllowed , expungeDeletesPctAllowed );
117
+ TieredMergePolicyProvider .this .expungeDeletesPctAllowed = expungeDeletesPctAllowed ;
118
+ for (CustomTieredMergePolicyProvider policy : policies ) {
119
+ policy .setExpungeDeletesPctAllowed (expungeDeletesPctAllowed );
120
+ }
121
+ }
122
+
123
+ ByteSizeValue floorSegment = settings .getAsBytesSize ("index.merge.policy.floor_segment" , TieredMergePolicyProvider .this .floorSegment );
124
+ if (!floorSegment .equals (TieredMergePolicyProvider .this .floorSegment )) {
125
+ logger .info ("updating [floor_segment] from [{}] to [{}]" , TieredMergePolicyProvider .this .floorSegment , floorSegment );
126
+ TieredMergePolicyProvider .this .floorSegment = floorSegment ;
127
+ for (CustomTieredMergePolicyProvider policy : policies ) {
128
+ policy .setFloorSegmentMB (floorSegment .mbFrac ());
129
+ }
130
+ }
131
+
132
+ int maxMergeAtOnce = settings .getAsInt ("index.merge.policy.max_merge_at_once" , TieredMergePolicyProvider .this .maxMergeAtOnce );
133
+ if (maxMergeAtOnce != TieredMergePolicyProvider .this .maxMergeAtOnce ) {
134
+ logger .info ("updating [max_merge_at_once] from [{}] to [{}]" , TieredMergePolicyProvider .this .maxMergeAtOnce , maxMergeAtOnce );
135
+ TieredMergePolicyProvider .this .maxMergeAtOnce = maxMergeAtOnce ;
136
+ for (CustomTieredMergePolicyProvider policy : policies ) {
137
+ policy .setMaxMergeAtOnce (maxMergeAtOnce );
138
+ }
139
+ }
140
+
141
+ int maxMergeAtOnceExplicit = settings .getAsInt ("index.merge.policy.max_merge_at_once_explicit" , TieredMergePolicyProvider .this .maxMergeAtOnceExplicit );
142
+ if (maxMergeAtOnce != TieredMergePolicyProvider .this .maxMergeAtOnceExplicit ) {
143
+ logger .info ("updating [max_merge_at_once_explicit] from [{}] to [{}]" , TieredMergePolicyProvider .this .maxMergeAtOnceExplicit , maxMergeAtOnceExplicit );
144
+ TieredMergePolicyProvider .this .maxMergeAtOnceExplicit = maxMergeAtOnceExplicit ;
145
+ for (CustomTieredMergePolicyProvider policy : policies ) {
146
+ policy .setMaxMergeAtOnceExplicit (maxMergeAtOnceExplicit );
147
+ }
148
+ }
149
+
150
+ ByteSizeValue maxMergedSegment = settings .getAsBytesSize ("index.merge.policy.max_merged_segment" , TieredMergePolicyProvider .this .maxMergedSegment );
151
+ if (!maxMergedSegment .equals (TieredMergePolicyProvider .this .maxMergedSegment )) {
152
+ logger .info ("updating [max_merged_segment] from [{}] to [{}]" , TieredMergePolicyProvider .this .maxMergedSegment , maxMergedSegment );
153
+ TieredMergePolicyProvider .this .maxMergedSegment = maxMergedSegment ;
154
+ for (CustomTieredMergePolicyProvider policy : policies ) {
155
+ policy .setFloorSegmentMB (maxMergedSegment .mbFrac ());
156
+ }
157
+ }
158
+
159
+ double segmentsPerTier = settings .getAsDouble ("index.merge.policy.segments_per_tier" , TieredMergePolicyProvider .this .segmentsPerTier );
160
+ if (segmentsPerTier != TieredMergePolicyProvider .this .segmentsPerTier ) {
161
+ logger .info ("updating [segments_per_tier] from [{}] to [{}]" , TieredMergePolicyProvider .this .segmentsPerTier , segmentsPerTier );
162
+ TieredMergePolicyProvider .this .segmentsPerTier = segmentsPerTier ;
163
+ for (CustomTieredMergePolicyProvider policy : policies ) {
164
+ policy .setSegmentsPerTier (segmentsPerTier );
165
+ }
166
+ }
167
+
168
+ boolean compoundFormat = settings .getAsBoolean ("index.compound_format" , TieredMergePolicyProvider .this .compoundFormat );
169
+ if (compoundFormat != TieredMergePolicyProvider .this .compoundFormat ) {
170
+ logger .info ("updating index.compound_format from [{}] to [{}]" , TieredMergePolicyProvider .this .compoundFormat , compoundFormat );
171
+ TieredMergePolicyProvider .this .compoundFormat = compoundFormat ;
172
+ for (CustomTieredMergePolicyProvider policy : policies ) {
173
+ policy .setUseCompoundFile (compoundFormat );
174
+ }
175
+ }
176
+ }
177
+ }
178
+
179
+ public static class CustomTieredMergePolicyProvider extends TieredMergePolicy {
180
+
181
+ private final TieredMergePolicyProvider provider ;
182
+
183
+ public CustomTieredMergePolicyProvider (TieredMergePolicyProvider provider ) {
184
+ super ();
185
+ this .provider = provider ;
186
+ }
187
+
188
+ @ Override public void close () {
189
+ super .close ();
190
+ provider .policies .remove (this );
191
+ }
192
+ }
193
+
194
+ public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider implements EnableMergePolicy {
195
+
196
+ private final ThreadLocal <Boolean > enableMerge = new ThreadLocal <Boolean >() {
197
+ @ Override protected Boolean initialValue () {
198
+ return Boolean .FALSE ;
199
+ }
200
+ };
201
+
202
+ public EnableMergeTieredMergePolicyProvider (TieredMergePolicyProvider provider ) {
203
+ super (provider );
204
+ }
205
+
206
+ @ Override public void enableMerge () {
207
+ enableMerge .set (Boolean .TRUE );
208
+ }
209
+
210
+ @ Override public void disableMerge () {
211
+ enableMerge .set (Boolean .FALSE );
212
+ }
213
+
214
+ @ Override public boolean isMergeEnabled () {
215
+ return enableMerge .get () == Boolean .TRUE ;
216
+ }
217
+
218
+ @ Override public void close () {
219
+ enableMerge .remove ();
220
+ super .close ();
221
+ }
222
+
223
+ @ Override public MergePolicy .MergeSpecification findMerges (SegmentInfos infos ) throws IOException {
224
+ if (enableMerge .get () == Boolean .FALSE ) {
225
+ return null ;
226
+ }
227
+ return super .findMerges (infos );
228
+ }
229
+
230
+ @ Override public MergePolicy .MergeSpecification findMergesToExpungeDeletes (SegmentInfos segmentInfos ) throws CorruptIndexException , IOException {
231
+ if (enableMerge .get () == Boolean .FALSE ) {
232
+ return null ;
233
+ }
234
+ return super .findMergesToExpungeDeletes (segmentInfos );
235
+ }
236
+
237
+ @ Override public MergePolicy .MergeSpecification findMergesForOptimize (SegmentInfos infos , int maxNumSegments , Set <SegmentInfo > segmentsToOptimize ) throws IOException {
238
+ if (enableMerge .get () == Boolean .FALSE ) {
239
+ return null ;
240
+ }
241
+ return super .findMergesForOptimize (infos , maxNumSegments , segmentsToOptimize );
242
+ }
243
+ }
244
+ }
0 commit comments