@@ -179,6 +179,9 @@ public class DataStorageConfiguration implements Serializable {
/** Default wal compaction level. */
public static final int DFLT_WAL_COMPACTION_LEVEL = Deflater.BEST_SPEED;

/** Default defragmentation thread pool size. */
public static final int DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE = 4;

/** Default compression algorithm for WAL page snapshot records. */
public static final DiskPageCompression DFLT_WAL_PAGE_COMPRESSION = DiskPageCompression.DISABLED;

@@ -320,6 +323,9 @@ public class DataStorageConfiguration implements Serializable {
/** Encryption configuration. */
private EncryptionConfiguration encCfg = new EncryptionConfiguration();

/** Maximum number of partitions which can be defragmented at the same time. */
private int defragmentationThreadPoolSize = DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE;

/**
* Creates valid durable memory configuration with all default values.
*/
@@ -1173,6 +1179,30 @@ public DataStorageConfiguration setDefaultWarmUpConfiguration(@Nullable WarmUpCo
return dfltWarmUpCfg;
}

/**
 * Sets the maximum number of partitions that can be defragmented at the same time.
 *
 * @param defragmentationThreadPoolSize Maximum number of partitions that can be defragmented at the same time.
 * Default is {@link DataStorageConfiguration#DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE}.
 * @return {@code this} for chaining.
 */
public DataStorageConfiguration setDefragmentationThreadPoolSize(int defragmentationThreadPoolSize) {
    A.ensure(defragmentationThreadPoolSize >= 1, "Defragmentation thread pool size must be greater than or equal to 1.");

    this.defragmentationThreadPoolSize = defragmentationThreadPoolSize;

    return this;
}

/**
 * Gets the maximum number of partitions that can be defragmented at the same time.
 *
 * @return Thread pool size for defragmentation.
 */
public int getDefragmentationThreadPoolSize() {
return defragmentationThreadPoolSize;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DataStorageConfiguration.class, this);
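For reference, this is how the new knob would be wired into a node configuration. A minimal sketch — the class name, pool size of 8, and the enclosing configuration are illustrative, not part of this change; persistence must be enabled for defragmentation to apply:

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class DefragmentationPoolSizeExample {
    public static void main(String[] args) {
        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
            // Defragmentation only applies to persistent data regions.
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration().setPersistenceEnabled(true))
            // Defragment up to 8 partitions concurrently (default is DFLT_DEFRAGMENTATION_THREAD_POOL_SIZE = 4).
            .setDefragmentationThreadPoolSize(8);

        try (Ignite ignite = Ignition.start(new IgniteConfiguration()
            .setDataStorageConfiguration(storageCfg))) {
            // Node is up; the configured pool size takes effect when defragmentation
            // is actually triggered (via node maintenance, per this patch).
        }
    }
}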
@@ -784,7 +784,8 @@ private void prepareCacheDefragmentation(List<String> cacheNames) throws IgniteC
(FilePageStoreManager)cctx.pageStore(),
checkpointManager,
lightCheckpointMgr,
persistenceCfg.getPageSize()
persistenceCfg.getPageSize(),
persistenceCfg.getDefragmentationThreadPoolSize()
);
}

@@ -27,6 +27,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -71,8 +72,10 @@
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.collection.IntHashMap;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.collection.IntRWHashMap;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -82,6 +85,8 @@
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.maintenance.MaintenanceRegistry;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;

import static java.util.Comparator.comparing;
import static java.util.stream.StreamSupport.stream;
Expand Down Expand Up @@ -158,6 +163,9 @@ public class CachePartitionDefragmentationManager {
/** */
private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>();

/** Defragmentation thread pool. If {@code null}, tasks are run in a single thread. */
@Nullable private volatile IgniteThreadPoolExecutor defragmentationThreadPool;

/**
* @param cacheNames Names of caches to be defragmented. Empty means "all".
* @param sharedCtx Cache shared context.
@@ -174,7 +182,8 @@ public CachePartitionDefragmentationManager(
FilePageStoreManager filePageStoreMgr,
CheckpointManager nodeCheckpoint,
LightweightCheckpointManager defragmentationCheckpoint,
int pageSize
int pageSize,
int defragmentationThreadPoolSize
) throws IgniteCheckedException {
cachesForDefragmentation = new HashSet<>(cacheNames);

@@ -190,6 +199,15 @@

partDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_PART_REGION_NAME);
mappingDataRegion = dbMgr.dataRegion(DEFRAGMENTATION_MAPPING_REGION_NAME);

defragmentationThreadPool = new IgniteThreadPoolExecutor(
"defragmentation-worker",
sharedCtx.igniteInstanceName(),
defragmentationThreadPoolSize,
defragmentationThreadPoolSize,
30_000,
new LinkedBlockingQueue<Runnable>()
);
}

/** */
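The pool created above is a fixed-size executor. Assuming IgniteThreadPoolExecutor forwards its size, keep-alive, and queue arguments with standard ThreadPoolExecutor semantics (the wrapper additionally applies instance-scoped thread naming), the construction is roughly equivalent to this plain-JDK sketch:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class DefragPoolSketch {
    /** Builds a fixed-size pool equivalent to the construction above, minus Ignite's thread naming. */
    public static ThreadPoolExecutor newDefragPool(int poolSize) {
        return new ThreadPoolExecutor(
            poolSize,                      // core pool size
            poolSize,                      // max pool size; core == max makes the pool fixed-size
            30_000, TimeUnit.MILLISECONDS, // keep-alive for idle workers, as in the patch
            new LinkedBlockingQueue<>()    // unbounded queue: excess partition tasks simply wait
        );
    }
}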
@@ -353,103 +371,24 @@ public void executeDefragmentation() throws IgniteCheckedException {
defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock();
}

-IntMap<LinkMap> linkMapByPart = new IntHashMap<>();
+IntMap<LinkMap> linkMapByPart = new IntRWHashMap<>();

-for (CacheDataStore oldCacheDataStore : oldCacheDataStores) {
-    checkCancellation();
-
-    int partId = oldCacheDataStore.partId();
-
-    PartitionContext partCtx = new PartitionContext(
-        workDir,
-        grpId,
-        partId,
-        partDataRegion,
-        mappingDataRegion,
-        oldGrpCtx,
-        newGrpCtx,
-        cacheDataStores.get(partId),
-        pageStoreFactory
-    );
-
-    if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
-        partCtx.createPageStore(
-            () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-            partCtx.mappingPagesAllocated,
-            partCtx.mappingPageMemory
-        );
-
-        linkMapByPart.put(partId, partCtx.createLinkMapTree(false));
-
-        continue;
-    }
-
-    partCtx.createPageStore(
-        () -> defragmentedPartMappingFile(workDir, partId).toPath(),
-        partCtx.mappingPagesAllocated,
-        partCtx.mappingPageMemory
-    );
-
-    linkMapByPart.put(partId, partCtx.createLinkMapTree(true));
-
-    checkCancellation();
-
-    partCtx.createPageStore(
-        () -> defragmentedPartTmpFile(workDir, partId).toPath(),
-        partCtx.partPagesAllocated,
-        partCtx.partPageMemory
-    );
-
-    partCtx.createNewCacheDataStore(offheap);
-
-    copyPartitionData(partCtx, treeIter);
-
-    DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();
-
-    PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);
-
-    status.onPartitionDefragmented(
-        oldGrpCtx,
-        oldPageStore.size(),
-        pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
-    );
-
-    //TODO Move inside of defragmentSinglePartition.
-    IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
-        if (fut.error() != null)
-            return;
-
-        if (log.isDebugEnabled()) {
-            log.debug(S.toString(
-                "Partition defragmented",
-                "grpId", grpId, false,
-                "partId", partId, false,
-                "oldPages", oldPageStore.pages(), false,
-                "newPages", partCtx.partPagesAllocated.get() + 1, false,
-                "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
-                "pageSize", pageSize, false,
-                "partFile", defragmentedPartFile(workDir, partId).getName(), false,
-                "workDir", workDir, false
-            ));
-        }
-
-        oldPageMem.invalidate(grpId, partId);
-
-        partCtx.partPageMemory.invalidate(grpId, partId);
-
-        pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.
-
-        renameTempPartitionFile(workDir, partId);
-    };
-
-    GridFutureAdapter<?> cpFut = defragmentationCheckpoint
-        .forceCheckpoint("partition defragmented", null)
-        .futureFor(CheckpointState.FINISHED);
-
-    cpFut.listen(cpLsnr);
-
-    cmpFut.add((IgniteInternalFuture<Object>)cpFut);
-}
+IgniteUtils.doInParallel(
+    defragmentationThreadPool,
+    oldCacheDataStores,
+    oldCacheDataStore -> defragmentOnePartition(
+        oldGrpCtx,
+        grpId,
+        workDir,
+        offheap,
+        pageStoreFactory,
+        cmpFut,
+        oldPageMem,
+        newGrpCtx,
+        linkMapByPart,
+        oldCacheDataStore
+    )
+);

// A bit too general for now, but I like it more than saving only the last checkpoint future.
cmpFut.markInitialized().get();
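The per-partition loop body now fans out through IgniteUtils.doInParallel. Judging from the call site, the utility submits one task per element to the given pool and blocks until all of them finish, propagating failures to the caller; here is a minimal stand-in under that assumption (ParallelHelper and ThrowingConsumer are hypothetical names, not Ignite's actual implementation):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class ParallelHelper {
    /** Runs {@code action} for every element on {@code pool} and waits for all tasks to finish. */
    static <T> void doInParallel(ExecutorService pool, Collection<T> items, ThrowingConsumer<T> action)
        throws Exception {
        List<Future<?>> futs = new ArrayList<>(items.size());

        for (T item : items)
            futs.add(pool.submit(() -> {
                action.accept(item);

                return null;
            }));

        for (Future<?> fut : futs)
            fut.get(); // Rethrows the first task failure as an ExecutionException.
    }

    /** Consumer that may throw, mirroring operations like defragmentOnePartition. */
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T item) throws Exception;
    }
}

Under this reading, a failure in any defragmentOnePartition call surfaces from doInParallel and aborts executeDefragmentation before cmpFut is awaited.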
@@ -551,6 +490,119 @@ public void executeDefragmentation() throws IgniteCheckedException {
}
}

/**
 * Defragments a single partition.
 *
 * @return {@code true} if the partition was defragmented, {@code false} if it was already defragmented and skipped.
 */
private boolean defragmentOnePartition(
CacheGroupContext oldGrpCtx,
int grpId,
File workDir,
GridCacheOffheapManager offheap,
FilePageStoreFactory pageStoreFactory,
GridCompoundFuture<Object, Object> cmpFut,
PageMemoryEx oldPageMem,
CacheGroupContext newGrpCtx,
IntMap<LinkMap> linkMapByPart,
CacheDataStore oldCacheDataStore
) throws IgniteCheckedException {
TreeIterator treeIter = new TreeIterator(pageSize);

checkCancellation();

int partId = oldCacheDataStore.partId();

PartitionContext partCtx = new PartitionContext(
workDir,
grpId,
partId,
partDataRegion,
mappingDataRegion,
oldGrpCtx,
newGrpCtx,
oldCacheDataStore,
pageStoreFactory
);

if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) {
partCtx.createPageStore(
() -> defragmentedPartMappingFile(workDir, partId).toPath(),
partCtx.mappingPagesAllocated,
partCtx.mappingPageMemory
);

linkMapByPart.put(partId, partCtx.createLinkMapTree(false));

return false;
}

partCtx.createPageStore(
() -> defragmentedPartMappingFile(workDir, partId).toPath(),
partCtx.mappingPagesAllocated,
partCtx.mappingPageMemory
);

linkMapByPart.put(partId, partCtx.createLinkMapTree(true));

checkCancellation();

partCtx.createPageStore(
() -> defragmentedPartTmpFile(workDir, partId).toPath(),
partCtx.partPagesAllocated,
partCtx.partPageMemory
);

partCtx.createNewCacheDataStore(offheap);

copyPartitionData(partCtx, treeIter);

DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager();

PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId);

status.onPartitionDefragmented(
oldGrpCtx,
oldPageStore.size(),
pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header.
);

//TODO Move inside of defragmentSinglePartition.
IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> {
if (fut.error() == null) {
if (log.isDebugEnabled()) {
log.debug(S.toString(
"Partition defragmented",
"grpId", grpId, false,
"partId", partId, false,
"oldPages", oldPageStore.pages(), false,
"newPages", partCtx.partPagesAllocated.get() + 1, false,
"mappingPages", partCtx.mappingPagesAllocated.get() + 1, false,
"pageSize", pageSize, false,
"partFile", defragmentedPartFile(workDir, partId).getName(), false,
"workDir", workDir, false
));
}

oldPageMem.invalidate(grpId, partId);

partCtx.partPageMemory.invalidate(grpId, partId);

pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second.

renameTempPartitionFile(workDir, partId);
}
};

GridFutureAdapter<?> cpFut = defragmentationCheckpoint
.forceCheckpoint("partition defragmented", null)
.futureFor(CheckpointState.FINISHED);

cpFut.listen(cpLsnr);

cmpFut.add((IgniteInternalFuture<Object>)cpFut);

return true;
}

/** */
public IgniteInternalFuture<?> completionFuture() {
return completionFut.chain(future -> null);
@@ -839,7 +891,8 @@ private void defragmentIndexPartition(
(PageMemoryEx)partDataRegion.pageMemory(),
mappingByPartition,
cpLock,
cancellationChecker
cancellationChecker,
defragmentationThreadPool
);
}

@@ -39,9 +39,11 @@ public void addPageStore(
) {
IntMap<PageStore> pageStoresMap = grpPageStoresMap.get(grpId);

-//This code cannot be used concurrently. If we decide to parallel defragmentation then we should correct current class.
-if (pageStoresMap == null)
-    grpPageStoresMap.put(grpId, pageStoresMap = new IntRWHashMap<>());
+if (pageStoresMap == null) {
+    grpPageStoresMap.putIfAbsent(grpId, new IntRWHashMap<>());
+
+    pageStoresMap = grpPageStoresMap.get(grpId);
+}

pageStoresMap.put(partId, pageStore);
}
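This hunk is the concurrency fix the parallel defragmentation above relies on: with the old check-then-put, two threads could both observe a missing inner map, install different ones, and silently drop one thread's page stores. A JDK-flavored sketch of both variants — the class and method names are illustrative, and ConcurrentHashMap stands in for Ignite's IntRWHashMap:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PageStoreMapSketch {
    private final ConcurrentMap<Integer, Map<Integer, Object>> grpPageStoresMap = new ConcurrentHashMap<>();

    /** Racy: two threads may both see null and overwrite each other's inner map. */
    void addPageStoreRacy(int grpId, int partId, Object pageStore) {
        Map<Integer, Object> pageStoresMap = grpPageStoresMap.get(grpId);

        if (pageStoresMap == null)
            grpPageStoresMap.put(grpId, pageStoresMap = new ConcurrentHashMap<>());

        pageStoresMap.put(partId, pageStore); // May write into a map no other thread can see.
    }

    /** Safe: putIfAbsent guarantees every thread ends up using the same inner map. */
    void addPageStoreSafe(int grpId, int partId, Object pageStore) {
        Map<Integer, Object> pageStoresMap = grpPageStoresMap.get(grpId);

        if (pageStoresMap == null) {
            grpPageStoresMap.putIfAbsent(grpId, new ConcurrentHashMap<>());

            pageStoresMap = grpPageStoresMap.get(grpId);
        }

        pageStoresMap.put(partId, pageStore);
    }
}

With plain JDK maps, computeIfAbsent would collapse the safe variant into a single call; the patch keeps the putIfAbsent-then-get form, which fits the IntMap API it uses.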