Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class DataStorageConfiguration implements Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Value used for making WAL archive size unlimited */
public static final long UNLIMITED_WAL_ARCHIVE = -1;

/** Default data region start size (256 MB). */
public static final long DFLT_DATA_REGION_INITIAL_SIZE = 256L * 1024 * 1024;

Expand Down Expand Up @@ -594,21 +597,26 @@ public boolean isWalHistorySizeParameterUsed() {
/**
* Gets a max allowed size(in bytes) of WAL archives.
*
* @return max size(in bytes) of WAL archive directory(always greater than 0).
* @return max size(in bytes) of WAL archive directory(greater than 0, or {@link #UNLIMITED_WAL_ARCHIVE} if
* WAL archive size is unlimited).
*/
public long getMaxWalArchiveSize() {
return maxWalArchiveSize <= 0 ? DFLT_WAL_ARCHIVE_MAX_SIZE : maxWalArchiveSize;
return maxWalArchiveSize;
}

/**
* Sets a max allowed size(in bytes) of WAL archives.
*
* If value is not positive, {@link #DFLT_WAL_ARCHIVE_MAX_SIZE} will be used.
* If value is not positive or {@link #UNLIMITED_WAL_ARCHIVE}, {@link #DFLT_WAL_ARCHIVE_MAX_SIZE} will be used.
*
* @param walArchiveMaxSize max size(in bytes) of WAL archive directory.
* @return {@code this} for chaining.
*/
public DataStorageConfiguration setMaxWalArchiveSize(long walArchiveMaxSize) {
if (walArchiveMaxSize != UNLIMITED_WAL_ARCHIVE)
A.ensure(walArchiveMaxSize > 0, "Max WAL archive size can be only greater than 0 " +
"or must be equal to " + UNLIMITED_WAL_ARCHIVE + " (to be unlimited)");

this.maxWalArchiveSize = walArchiveMaxSize;

return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,15 +627,18 @@ private void checkWalArchiveSizeConfiguration(DataStorageConfiguration memCfg) t
LT.warn(log, "DataRegionConfiguration.maxWalArchiveSize instead DataRegionConfiguration.walHistorySize " +
"would be used for removing old archive wal files");
else if (memCfg.getMaxWalArchiveSize() == DFLT_WAL_ARCHIVE_MAX_SIZE)
LT.warn(log, "walHistorySize was deprecated. maxWalArchiveSize should be used instead");
LT.warn(log, "walHistorySize was deprecated and does not have any effect anymore. " +
"maxWalArchiveSize should be used instead");
else
throw new IgniteCheckedException("Should be used only one of wal history size or max wal archive size." +
"(use DataRegionConfiguration.maxWalArchiveSize because DataRegionConfiguration.walHistorySize was deprecated)"
);

if (memCfg.getMaxWalArchiveSize() < memCfg.getWalSegmentSize())
if (memCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE
&& memCfg.getMaxWalArchiveSize() < memCfg.getWalSegmentSize())
throw new IgniteCheckedException(
"DataRegionConfiguration.maxWalArchiveSize should be greater than DataRegionConfiguration.walSegmentSize"
"DataRegionConfiguration.maxWalArchiveSize should be greater than DataRegionConfiguration.walSegmentSize " +
"or must be equal to " + DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE + "."
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public class CheckpointHistory {
/** The maximal number of checkpoints hold in memory. */
private final int maxCpHistMemSize;

/** If WalHistorySize was setted by user will use old way for removing checkpoints. */
private final boolean isWalHistorySizeParameterEnabled;
/** Should WAL be truncated */
private final boolean isWalTruncationEnabled;

/** Map stores the earliest checkpoint for each partition from particular group. */
private final Map<GroupPartitionId, CheckpointEntry> earliestCp = new ConcurrentHashMap<>();
Expand All @@ -80,9 +80,6 @@ public class CheckpointHistory {
/** Checking that checkpoint is applicable or not for given cache group. */
private final IgniteThrowableBiPredicate</*Checkpoint timestamp*/Long, /*Group id*/Integer> checkpointInapplicable;

/** It is available or not to truncate WAL on checkpoint finish. */
private final boolean truncateWalOnCpFinish;

/** It is available or not to reserve checkpoint(deletion protection). */
private final boolean reservationDisabled;

Expand All @@ -103,15 +100,9 @@ public class CheckpointHistory {
this.wal = wal;
this.checkpointInapplicable = inapplicable;

maxCpHistMemSize = Math.min(dsCfg.getWalHistorySize(),
IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE,
DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE));

isWalHistorySizeParameterEnabled = dsCfg.isWalHistorySizeParameterUsed();
isWalTruncationEnabled = dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;

truncateWalOnCpFinish = dsCfg.isWalHistorySizeParameterUsed()
? dsCfg.getWalHistorySize() != Integer.MAX_VALUE
: dsCfg.getMaxWalArchiveSize() != Long.MAX_VALUE;
maxCpHistMemSize = IgniteSystemProperties.getInteger(IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE, DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE);

reservationDisabled = dsCfg.getWalMode() == WALMode.NONE;
}
Expand Down Expand Up @@ -317,7 +308,7 @@ private void addPartitionToEarliestCheckpoints(GroupPartitionId grpPartKey, Chec
* @return {@code true} if there is space for next checkpoint.
*/
public boolean hasSpace() {
return histMap.size() + 1 <= maxCpHistMemSize;
return isWalTruncationEnabled || histMap.size() + 1 <= maxCpHistMemSize;
}

/**
Expand All @@ -334,30 +325,69 @@ public List<CheckpointEntry> onWalTruncated(WALPointer highBound) {
if (highBound.compareTo(cpPnt) <= 0)
break;

if (wal.reserved(cpEntry.checkpointMark())) {
U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + cpEntry +
", history map size is " + histMap.size());

if (!removeCheckpoint(cpEntry))
break;
}

synchronized (earliestCp) {
CheckpointEntry deletedCpEntry = histMap.remove(cpEntry.timestamp());
removed.add(cpEntry);
}

CheckpointEntry oldestCpInHistory = firstCheckpoint();
return removed;
}

for (Map.Entry<GroupPartitionId, CheckpointEntry> grpPartPerCp : earliestCp.entrySet()) {
if (grpPartPerCp.getValue() == deletedCpEntry)
grpPartPerCp.setValue(oldestCpInHistory);
}
}
/**
* Removes checkpoints from history.
*
* @return List of checkpoint entries removed from history.
*/
public List<CheckpointEntry> removeCheckpoints(int countToRemove) {
if (countToRemove == 0)
return Collections.emptyList();

removed.add(cpEntry);
List<CheckpointEntry> removed = new ArrayList<>();

for (Iterator<Map.Entry<Long, CheckpointEntry>> iterator = histMap.entrySet().iterator();
iterator.hasNext() && removed.size() < countToRemove; ) {
Map.Entry<Long, CheckpointEntry> entry = iterator.next();

CheckpointEntry checkpoint = entry.getValue();

if (!removeCheckpoint(checkpoint))
break;

removed.add(checkpoint);
}

return removed;
}

/**
* Remove checkpoint from history
*
* @param checkpoint Checkpoint to be removed
* @return Whether checkpoint was removed from history
*/
private boolean removeCheckpoint(CheckpointEntry checkpoint) {
if (wal.reserved(checkpoint.checkpointMark())) {
U.warn(log, "Could not clear historyMap due to WAL reservation on cp: " + checkpoint +
", history map size is " + histMap.size());

return false;
}

synchronized (earliestCp) {
CheckpointEntry deletedCpEntry = histMap.remove(checkpoint.timestamp());

CheckpointEntry oldestCpInHistory = firstCheckpoint();

for (Map.Entry<GroupPartitionId, CheckpointEntry> grpPartPerCp : earliestCp.entrySet()) {
if (grpPartPerCp.getValue() == deletedCpEntry)
grpPartPerCp.setValue(oldestCpInHistory);
}
}

return true;
}

/**
* Logs and clears checkpoint history after checkpoint finish.
*
Expand All @@ -366,21 +396,20 @@ public List<CheckpointEntry> onWalTruncated(WALPointer highBound) {
public List<CheckpointEntry> onCheckpointFinished(Checkpoint chp) {
chp.walSegsCoveredRange(calculateWalSegmentsCovered());

WALPointer checkpointMarkUntilDel = isWalHistorySizeParameterEnabled //check for compatibility mode.
? checkpointMarkUntilDeleteByMemorySize()
: newerPointer(checkpointMarkUntilDeleteByMemorySize(), checkpointMarkUntilDeleteByArchiveSize());
int removeCount = isWalTruncationEnabled
? checkpointCountUntilDeleteByArchiveSize()
: (histMap.size() - maxCpHistMemSize);

if (checkpointMarkUntilDel == null)
if (removeCount <= 0)
return Collections.emptyList();

List<CheckpointEntry> deletedCheckpoints = onWalTruncated(checkpointMarkUntilDel);
List<CheckpointEntry> deletedCheckpoints = removeCheckpoints(removeCount);

int deleted = 0;
if (isWalTruncationEnabled) {
int deleted = wal.truncate(firstCheckpointPointer());

if (truncateWalOnCpFinish)
deleted += wal.truncate(firstCheckpointPointer());

chp.walFilesDeleted(deleted);
chp.walFilesDeleted(deleted);
}

return deletedCheckpoints;
}
Expand Down Expand Up @@ -420,28 +449,32 @@ private WALPointer checkpointMarkUntilDeleteByMemorySize() {
}

/**
* Calculate mark until delete by maximum allowed archive size.
* Calculate count of checkpoints to delete by maximum allowed archive size.
*
* @return Checkpoint mark until which checkpoints can be deleted(not including this pointer).
* @return Checkpoint count to be deleted.
*/
@Nullable private WALPointer checkpointMarkUntilDeleteByArchiveSize() {
private int checkpointCountUntilDeleteByArchiveSize() {
long absFileIdxToDel = wal.maxArchivedSegmentToDelete();

if (absFileIdxToDel < 0)
return null;
return 0;

long fileUntilDel = absFileIdxToDel + 1;

long checkpointFileIdx = absFileIdx(lastCheckpoint());

int countToRemove = 0;

for (CheckpointEntry cpEntry : histMap.values()) {
long currFileIdx = absFileIdx(cpEntry);

if (checkpointFileIdx <= currFileIdx || fileUntilDel <= currFileIdx)
return cpEntry.checkpointMark();
return countToRemove;

countToRemove++;
}

return lastCheckpoint().checkpointMark();
return histMap.size() - 1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1649,8 +1649,8 @@ private FileDescriptor[] walArchiveFiles() {

/** {@inheritDoc} */
@Override public long maxArchivedSegmentToDelete() {
//When maxWalArchiveSize==MAX_VALUE deleting files is not permit.
if (dsCfg.getMaxWalArchiveSize() == Long.MAX_VALUE)
//When maxWalArchiveSize==-1 deleting files is not permitted.
if (dsCfg.getMaxWalArchiveSize() == DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE)
return -1;

FileDescriptor[] archivedFiles = walArchiveFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10797,7 +10797,8 @@ else if (regCfg.getMaxSize() < 8 * GB)
* @return User-set max WAL archive size of triple size of the maximum checkpoint buffer.
*/
public static long adjustedWalHistorySize(DataStorageConfiguration dsCfg, @Nullable IgniteLogger log) {
if (dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.DFLT_WAL_ARCHIVE_MAX_SIZE)
if (dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE &&
dsCfg.getMaxWalArchiveSize() != DataStorageConfiguration.DFLT_WAL_ARCHIVE_MAX_SIZE)
return dsCfg.getMaxWalArchiveSize();

// Find out the maximum checkpoint buffer size.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class IgnitePdsReserveWalSegmentsTest extends GridCommonAbstractTest {
new DataStorageConfiguration()
.setCheckpointFrequency(Long.MAX_VALUE)
.setWalMode(WALMode.LOG_ONLY)
.setMaxWalArchiveSize(Long.MAX_VALUE)
.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE)
.setWalSegmentSize(1024 * 1024)
.setWalSegments(10)
.setDefaultDataRegionConfiguration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class IgnitePdsStartWIthEmptyArchive extends GridCommonAbstractTest {
cfg.setDataStorageConfiguration(
new DataStorageConfiguration()
// Checkpoint should not remove any WAL archive files.
.setMaxWalArchiveSize(Long.MAX_VALUE)
.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE)
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.processors.cache.persistence.db.wal;

import java.io.File;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
Expand Down Expand Up @@ -195,42 +194,6 @@ public void testCheckpointStarted_WhenWalHasTooBigSizeWithoutCheckpoint() throws
assertEquals("too big size of WAL without checkpoint", checkpointReason);
}

/**
* Test for check deprecated removing checkpoint by deprecated walHistorySize parameter
*
* @deprecated Test old removing process depends on WalHistorySize.
*/
@Test
public void testCheckpointHistoryRemovingByWalHistorySize() throws Exception {
//given: configured grid with wal history size = 10
int walHistorySize = 10;

Ignite ignite = startGrid(dbCfg -> {
dbCfg.setWalHistorySize(walHistorySize);
});

GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);

IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheConfiguration());

//when: put to cache and do checkpoint
int testNumberOfCheckpoint = walHistorySize * 2;

for (int i = 0; i < testNumberOfCheckpoint; i++) {
cache.put(i, i);
//and: wait for checkpoint finished
forceCheckpoint();
}

//then: number of checkpoints less or equal than walHistorySize
CheckpointHistory hist = dbMgr.checkpointHistory();
assertTrue(hist.checkpoints().size() == walHistorySize);

File[] cpFiles = dbMgr.checkpointDirectory().listFiles();

assertTrue(cpFiles.length <= (walHistorySize * 2 + 1));// starts & ends + node_start
}

/**
* Correct delete checkpoint history from memory depends on IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE. WAL files
* doesn't delete because deleting was disabled.
Expand All @@ -240,7 +203,7 @@ public void testCheckpointHistoryRemovingByWalHistorySize() throws Exception {
public void testCorrectDeletedCheckpointHistoryButKeepWalFiles() throws Exception {
//given: configured grid with disabled WAL removing.
Ignite ignite = startGrid(dbCfg -> {
dbCfg.setMaxWalArchiveSize(Long.MAX_VALUE);
dbCfg.setMaxWalArchiveSize(DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE);
});

GridCacheDatabaseSharedManager dbMgr = gridDatabase(ignite);
Expand Down