Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,46 +169,31 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
public static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");

/** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
}
};
public static final FileFilter WAL_SEGMENT_FILE_FILTER = file -> !file.isDirectory() &&
WAL_NAME_PATTERN.matcher(file.getName()).matches();

/** WAL segment temporary file filter, see {@link #WAL_TEMP_NAME_PATTERN} */
private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
}
};
private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = file -> !file.isDirectory() &&
WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();

/** */
public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");

/** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
}
};
public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = file -> !file.isDirectory() &&
(WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());

/** */
private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");

/** */
private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
}
};
private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = file -> !file.isDirectory() &&
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();

/** */
private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
}
};
private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = file -> !file.isDirectory() &&
WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();

/** Buffer size. */
private static final int BUF_SIZE = 1024 * 1024;
Expand Down Expand Up @@ -497,7 +482,7 @@ public void setFileIOFactory(FileIOFactory ioFactory) {
});
}

segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled());
segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled(), log);

// We have to initialize compressor before archiver in order to setup already compressed segments.
// Otherwise, FileArchiver initialization will trigger redundant work for FileCompressor.
Expand Down Expand Up @@ -1062,26 +1047,25 @@ private boolean hasIndex(long absIdx) {

if (desc.idx >= lastCheckpointPtr.index() // We cannot delete segments needed for binary recovery.
|| desc.idx >= lastArchived // We cannot delete last segment, it is needed at start of node and avoid gaps.
|| desc.idx >= high.index() // We cannot delete segments larger than the border.
|| !segmentAware.minReserveIndex(desc.idx)) // We cannot delete reserved segment.
return deleted;

if (desc.idx < high.index()) {
if (!desc.file.delete()) {
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath());
}
else {
deleted++;
if (!desc.file.delete()) {
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath());
}
else {
deleted++;

segmentSize.remove(desc.idx());
}
segmentSize.remove(desc.idx());
}

// Bump up the oldest archive segment index.
if (segmentAware.lastTruncatedArchiveIdx() < desc.idx)
segmentAware.lastTruncatedArchiveIdx(desc.idx);
// Bump up the oldest archive segment index.
if (segmentAware.lastTruncatedArchiveIdx() < desc.idx)
segmentAware.lastTruncatedArchiveIdx(desc.idx);

cctx.kernalContext().encryption().onWalSegmentRemoved(desc.idx);
}
cctx.kernalContext().encryption().onWalSegmentRemoved(desc.idx);
}

return deleted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.cache.persistence.wal.aware;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;

/**
Expand All @@ -43,12 +44,13 @@ public class SegmentAware {
*
* @param walSegmentsCnt Total WAL segments count.
* @param compactionEnabled Is wal compaction enabled.
* @param log Logger.
*/
public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) {
public SegmentAware(int walSegmentsCnt, boolean compactionEnabled, IgniteLogger log) {
segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage);

segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt);
segmentCompressStorage = new SegmentCompressStorage(compactionEnabled);
segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled);

segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived);
segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;

/**
* Storage of actual information about current index of compressed segments.
*/
public class SegmentCompressStorage {
/** Logger. */
private final IgniteLogger log;

/** Flag of interrupt waiting on this object. */
private volatile boolean interrupted;

Expand All @@ -51,9 +55,11 @@ public class SegmentCompressStorage {
/**
* Constructor.
*
* @param log Logger.
* @param compactionEnabled If WAL compaction enabled.
*/
SegmentCompressStorage(boolean compactionEnabled) {
SegmentCompressStorage(IgniteLogger log, boolean compactionEnabled) {
this.log = log;
this.compactionEnabled = compactionEnabled;
}

Expand All @@ -63,6 +69,9 @@ public class SegmentCompressStorage {
* @param compressedIdx Index of compressed segment.
*/
synchronized void onSegmentCompressed(long compressedIdx) {
if (log.isInfoEnabled())
log.info("Segment compressed notification [idx=" + compressedIdx + ']');

if (compressedIdx > lastMaxCompressedIdx)
lastMaxCompressedIdx = compressedIdx;

Expand Down Expand Up @@ -129,8 +138,12 @@ private void checkInterrupted() throws IgniteInterruptedCheckedException {
* Callback for waking up compressor when new segment is archived.
*/
synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled)
while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && compactionEnabled) {
if (log.isInfoEnabled())
log.info("Enqueuing segment for compression [idx=" + (lastEnqueuedToCompressIdx + 1) + ']');

segmentsToCompress.add(++lastEnqueuedToCompressIdx);
}

notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValueHierarchy;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/**
Expand Down Expand Up @@ -180,6 +181,29 @@ public void testNotTruncateSegmentsForBinaryRecovery() throws Exception {
assertEquals(0, truncated);
}

/**
* Check that the minimum reserved index will not be greater than the actual deleted segment.
*
* @throws Exception If failed.
*/
@Test
public void testMinReserveIdx() throws Exception {
IgniteEx n = prepareGrid(1);

forceCheckpoint();

FileWriteAheadLogManager wal = (FileWriteAheadLogManager)n.context().cache().context().wal();
assertNotNull(wal);

if (compactionEnabled(n))
assertTrue(waitForCondition(() -> wal.lastCompactedSegment() >= 1, 10_000));

assertEquals(1, wal.truncate(new WALPointer(1, 0, 0)));

Long minReserveIdx = getFieldValueHierarchy(wal, "segmentAware", "reservationStorage", "minReserveIdx");
assertEquals(0L, minReserveIdx.longValue());
}

/**
* Starts grid and populates test data.
*
Expand Down Expand Up @@ -211,7 +235,7 @@ private IgniteEx prepareGrid(int cnt) throws Exception {
* @param dbMgr Database shared manager.
*/
private long getReservedWalSegmentIndex(IgniteWriteAheadLogManager dbMgr) {
return ((WALPointer)GridTestUtils.getFieldValueHierarchy(dbMgr, "lastCheckpointPtr")).index();
return ((WALPointer)getFieldValueHierarchy(dbMgr, "lastCheckpointPtr")).index();
}

/**
Expand Down
Loading