Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Working on making the conversion to HDT faster #578

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ data
wikidata
qendpoint-store/wdbench-indexes
wdbench-results
testing
indexing
wdbench-indexes
8 changes: 8 additions & 0 deletions qendpoint-backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@
<configuration>
<source>${java.source.version}</source>
<target>${java.target.version}</target>
<compilerArgs>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.util=ALL-UNNAMED</arg>
<arg>--add-modules</arg>
<arg>jdk.incubator.vector</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
Expand Down
10 changes: 9 additions & 1 deletion qendpoint-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
<configuration>
<source>17</source>
<target>17</target>
<compilerArgs>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.util=ALL-UNNAMED</arg>
<arg>--add-modules</arg>
<arg>jdk.incubator.vector</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -75,7 +83,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.21</version>
<version>1.27.1</version>
</dependency>
<dependency>
<groupId>org.apache.jena</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.hdt.HDTVocabulary;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.storage.TempBuffIn;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.util.io.CloseMappedByteBuffer;
import com.the_qa_company.qendpoint.core.util.io.Closer;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;
Expand Down Expand Up @@ -67,7 +69,7 @@ public static MultiRoaringBitmap load(InputStream input) throws IOException {
* @throws IOException io exception when loading
*/
public static MultiRoaringBitmap load(Path input) throws IOException {
try (InputStream stream = new BufferedInputStream(Files.newInputStream(input))) {
try (InputStream stream = new TempBuffIn(Files.newInputStream(input))) {
return load(stream);
}
}
Expand Down Expand Up @@ -339,7 +341,7 @@ private void closeStreamBitmap(int layer, int index) throws IOException {
int sizeInBytes = handle.serializedSizeInBytes();
outputMax += sizeInBytes + 8 + 8 + 1;

OutputStream os = new BufferedOutputStream(Channels.newOutputStream(output.position(loc)));
OutputStream os = new TempBuffOut(Channels.newOutputStream(output.position(loc)));
os.write(BLOCK_BITMAP);
IOUtil.writeLong(os, sizeInBytes);
IOUtil.writeLong(os, layer);
Expand All @@ -354,7 +356,7 @@ private void closeStreamBitmap(int layer, int index) throws IOException {
}

public void save(Path output) throws IOException {
try (OutputStream stream = new BufferedOutputStream(Files.newOutputStream(output))) {
try (OutputStream stream = new TempBuffOut(Files.newOutputStream(output))) {
save(stream);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.hdt.HDTVocabulary;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.storage.TempBuffIn;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
import com.the_qa_company.qendpoint.core.util.crc.CRC8;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class SequenceLog64Map implements Sequence, Closeable {

public SequenceLog64Map(File f) throws IOException {
// Read from the beginning of the file
this(new CountInputStream(new BufferedInputStream(new FileInputStream(f))), f, true);
this(new CountInputStream(new TempBuffIn(new FileInputStream(f))), f, true);
}

public SequenceLog64Map(CountInputStream in, File f) throws IOException {
Expand Down Expand Up @@ -162,7 +163,7 @@ private void mapFiles(File f, long base) throws IOException {
// FIXME: the previous implementation had a bug; find out what it was,
// because that approach should be more efficient

CountInputStream in = new CountInputStream(new BufferedInputStream(new FileInputStream(f)));
CountInputStream in = new CountInputStream(new TempBuffIn(new FileInputStream(f)));
IOUtil.skip(in, base + ((numwords - 1) * 8L));
// System.out.println("Last word starts at: "+in.getTotalBytes());
// Read only used bits from last entry (byte aligned, little endian)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import com.the_qa_company.qendpoint.core.exceptions.IllegalFormatException;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.storage.TempBuffIn;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.Mutable;
import com.the_qa_company.qendpoint.core.util.crc.CRC32;
Expand Down Expand Up @@ -116,7 +118,7 @@ public void load(Iterator<? extends CharSequence> it, long numentries, ProgressL
ByteString previousStr = null;

try {
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
try (OutputStream out = new TempBuffOut(new FileOutputStream(file))) {
while (it.hasNext()) {
ByteString str = ByteString.of(it.next());

Expand Down Expand Up @@ -161,7 +163,7 @@ public void load(Iterator<? extends CharSequence> it, long numentries, ProgressL
byteOut.writeTo(out);
}

try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
try (InputStream in = new TempBuffIn(new FileInputStream(file))) {
// Read block by block
// Read packed data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.the_qa_company.qendpoint.core.exceptions.IllegalFormatException;
import com.the_qa_company.qendpoint.core.exceptions.NotImplementedException;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.storage.TempBuffIn;
import com.the_qa_company.qendpoint.core.util.io.BigMappedByteBuffer;
import com.the_qa_company.qendpoint.core.compact.integer.VByte;
import com.the_qa_company.qendpoint.core.compact.sequence.Sequence;
Expand Down Expand Up @@ -375,7 +376,7 @@ public void load(Iterator<? extends CharSequence> it, long count, ProgressListen

@Override
public void save(OutputStream output, ProgressListener listener) throws IOException {
InputStream in = new BufferedInputStream(new FileInputStream(f));
InputStream in = new TempBuffIn(new FileInputStream(f));
IOUtil.skip(in, startOffset);
IOUtil.copyStream(in, output, endOffset - startOffset);
in.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.the_qa_company.qendpoint.core.rdf.RDFParserCallback;
import com.the_qa_company.qendpoint.core.rdf.RDFParserFactory;
import com.the_qa_company.qendpoint.core.rdf.TripleWriter;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.BitUtil;
import com.the_qa_company.qendpoint.core.util.Profiler;
Expand Down Expand Up @@ -254,8 +255,7 @@ public HDTResult doGenerateHDT(String rdfFileName, String baseURI, RDFNotation r
InputStream stream = readIs.is();

try (InputStream is = checksumPath != null ? new CRCInputStream(stream, new CRC32()) : stream;
OutputStream os = new BufferedOutputStream(
Files.newOutputStream(preDownload, openOptions))) {
OutputStream os = new TempBuffOut(Files.newOutputStream(preDownload, openOptions))) {
IOUtil.copy(is, os, listener, 10_000_000);
if (is instanceof CRCInputStream crcIs) {
checksum = crcIs.getCRC().getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.the_qa_company.qendpoint.core.hdt.impl.diskimport.TripleCompressionResult;
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcher;
import com.the_qa_company.qendpoint.core.iterator.utils.AsyncIteratorFetcherUnordered;
import com.the_qa_company.qendpoint.core.listener.MultiThreadListener;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
Expand Down Expand Up @@ -100,10 +101,14 @@ public HDTDiskImporter(HDTOptions hdtFormat, ProgressListener progressListener,
throw new IllegalArgumentException("Number of workers should be positive!");
}
// maximum size of a chunk
chunkSize = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_CHUNK_SIZE_KEY, () -> getMaxChunkSize(this.workers));
long chunkSize = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_CHUNK_SIZE_KEY,
() -> getMaxChunkSize(this.workers));
if (chunkSize < 0) {
throw new IllegalArgumentException("Negative chunk size!");
}
System.err.println("chunkSize: " + chunkSize);
this.chunkSize = ((((chunkSize / 1024 / 1024) / 32) * 32) * 1024 * 1024);
System.err.println("this.chunkSize: " + this.chunkSize);
long maxFileOpenedLong = hdtFormat.getInt(HDTOptionsKeys.LOADER_DISK_MAX_FILE_OPEN_KEY, 1024);
int maxFileOpened;
if (maxFileOpenedLong < 0 || maxFileOpenedLong > Integer.MAX_VALUE) {
Expand Down Expand Up @@ -178,11 +183,11 @@ public CompressTripleMapper compressDictionary(Iterator<TripleString> iterator)
throw new IllegalArgumentException("Dictionary already built! Use another importer instance!");
}
listener.notifyProgress(0,
"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, true) + "B with "
+ ways + "ways and " + workers + " worker(s)");

AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
"Sorting sections with chunk of size: " + StringUtil.humanReadableByteCount(chunkSize, false)
+ "iB with " + ways + "ways and " + workers + " worker(s)");

AsyncIteratorFetcherUnordered<TripleString> source = new AsyncIteratorFetcherUnordered<>(iterator);
// AsyncIteratorFetcher<TripleString> source = new AsyncIteratorFetcher<>(iterator);
profiler.pushSection("section compression");
CompressionResult compressionResult;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.options.HDTOptionsKeys;
import com.the_qa_company.qendpoint.core.options.HDTSpecification;
import com.the_qa_company.qendpoint.core.storage.TempBuffIn;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.triples.DictionaryEntriesDiff;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleID;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleString;
Expand Down Expand Up @@ -160,9 +162,9 @@ public void loadFromHDT(InputStream input, ProgressListener listener) throws IOE
public void loadFromHDT(String hdtFileName, ProgressListener listener) throws IOException {
InputStream in;
if (hdtFileName.endsWith(".gz")) {
in = new BufferedInputStream(new GZIPInputStream(new FileInputStream(hdtFileName)));
in = new TempBuffIn(new GZIPInputStream(new FileInputStream(hdtFileName)));
} else {
in = new CountInputStream(new BufferedInputStream(new FileInputStream(hdtFileName)));
in = new CountInputStream(new TempBuffIn(new FileInputStream(hdtFileName)));
}
loadFromHDT(in, listener);
in.close();
Expand Down Expand Up @@ -192,7 +194,7 @@ public void mapFromHDT(File f, long offset, ProgressListener listener) throws IO
}

boolean dumpBinInfo = spec.getBoolean(HDTOptionsKeys.DUMP_BINARY_OFFSETS, false);
try (CountInputStream input = new CountInputStream(new BufferedInputStream(new FileInputStream(hdtFileName)),
try (CountInputStream input = new CountInputStream(new TempBuffIn(new FileInputStream(hdtFileName)),
dumpBinInfo)) {

input.printIndex("HDT CI");
Expand Down Expand Up @@ -256,7 +258,7 @@ public void mapFromHDT(File f, long offset, ProgressListener listener) throws IO
*/
@Override
public void saveToHDT(String fileName, ProgressListener listener) throws IOException {
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
try (OutputStream out = new TempBuffOut(new FileOutputStream(fileName))) {
// OutputStream out = new GZIPOutputStream(new
// BufferedOutputStream(new FileOutputStream(fileName)));
saveToHDT(out, listener);
Expand Down Expand Up @@ -510,7 +512,7 @@ public void loadOrCreateIndex(ProgressListener listener, HDTOptions spec) throws
}
CountInputStream in = null;
try {
in = new CountInputStream(new BufferedInputStream(new FileInputStream(ff)));
in = new CountInputStream(new TempBuffIn(new FileInputStream(ff)));
ci.load(in);
if (isMapped) {
triples.mapIndex(in, new File(indexName), ci, listener);
Expand All @@ -528,9 +530,9 @@ public void loadOrCreateIndex(ProgressListener listener, HDTOptions spec) throws

// SAVE
if (this.hdtFileName != null) {
BufferedOutputStream out = null;
OutputStream out = null;
try {
out = new BufferedOutputStream(new FileOutputStream(versionName));
out = new TempBuffOut(new FileOutputStream(versionName));
ci.clear();
triples.saveIndex(out, ci, listener);
out.close();
Expand Down Expand Up @@ -611,7 +613,7 @@ public void cat(String location, HDT hdt1, HDT hdt2, ProgressListener listener,
// map the generated dictionary
FourSectionDictionaryBig dictionary;
try (CountInputStream fis = new CountInputStream(
new BufferedInputStream(new FileInputStream(location + "dictionary")))) {
new TempBuffIn(new FileInputStream(location + "dictionary")))) {
dictionary = new FourSectionDictionaryBig(new HDTSpecification());
fis.mark(1024);
ci2.load(fis);
Expand Down Expand Up @@ -655,8 +657,7 @@ public void cat(String location, HDT hdt1, HDT hdt2, ProgressListener listener,
Files.delete(Paths.get(location + "O2" + "Types"));

// map the triples
try (CountInputStream fis2 = new CountInputStream(
new BufferedInputStream(new FileInputStream(location + "triples")))) {
try (CountInputStream fis2 = new CountInputStream(new TempBuffIn(new FileInputStream(location + "triples")))) {
ControlInfo ci2 = new ControlInformation();
ci2.clear();
fis2.mark(1024);
Expand Down Expand Up @@ -695,7 +696,7 @@ public void catCustom(String location, HDT hdt1, HDT hdt2, ProgressListener list
// map the generated dictionary
ControlInfo ci2 = new ControlInformation();
try (CountInputStream fis = new CountInputStream(
new BufferedInputStream(new FileInputStream(location + "dictionary")))) {
new TempBuffIn(new FileInputStream(location + "dictionary")))) {
HDTSpecification spec = new HDTSpecification();
spec.set(HDTOptionsKeys.TEMP_DICTIONARY_IMPL_KEY, HDTOptionsKeys.TEMP_DICTIONARY_IMPL_VALUE_MULT_HASH);
spec.set(HDTOptionsKeys.DICTIONARY_TYPE_KEY, HDTOptionsKeys.DICTIONARY_TYPE_VALUE_MULTI_OBJECTS);
Expand Down Expand Up @@ -757,8 +758,7 @@ public void catCustom(String location, HDT hdt1, HDT hdt2, ProgressListener list
Files.delete(Paths.get(location + "O2"));
Files.delete(Paths.get(location + "O2" + "Types"));
// map the triples
try (CountInputStream fis2 = new CountInputStream(
new BufferedInputStream(new FileInputStream(location + "triples")))) {
try (CountInputStream fis2 = new CountInputStream(new TempBuffIn(new FileInputStream(location + "triples")))) {
ControlInformation ci2 = new ControlInformation();
ci2.clear();
fis2.mark(1024);
Expand Down Expand Up @@ -815,7 +815,7 @@ public void diffBit(String location, HDT hdt, Bitmap deleteBitmap, ProgressListe
ControlInfo ci2 = new ControlInformation();

try (CountInputStream fis = new CountInputStream(
new BufferedInputStream(new FileInputStream(location + "dictionary")))) {
new TempBuffIn(new FileInputStream(location + "dictionary")))) {
fis.mark(1024);
ci2.load(fis);
fis.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.the_qa_company.qendpoint.core.header.HeaderPrivate;
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.triples.IteratorTripleString;
import com.the_qa_company.qendpoint.core.triples.TriplesPrivate;
import com.the_qa_company.qendpoint.core.triples.impl.WriteBitmapTriples;
Expand Down Expand Up @@ -87,7 +88,7 @@ public void loadOrCreateIndex(ProgressListener listener, HDTOptions disk) {

@Override
public void saveToHDT(String fileName, ProgressListener listener) throws IOException {
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(Path.of(fileName)))) {
try (OutputStream out = new TempBuffOut(Files.newOutputStream(Path.of(fileName)))) {
saveToHDT(out, listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.the_qa_company.qendpoint.core.util.listener.IntermediateListener;
import com.the_qa_company.qendpoint.core.util.string.ByteString;
import com.the_qa_company.qendpoint.core.util.string.CompactString;
import org.apache.jena.base.Sys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,6 +52,7 @@ public class SectionCompressor implements KWayMerger.KWayMergerImpl<TripleString
private final int k;
private final boolean debugSleepKwayDict;
private final boolean quads;
private final long start = System.currentTimeMillis();

public SectionCompressor(CloseSuppressPath baseFileName, AsyncIteratorFetcher<TripleString> source,
MultiThreadListener listener, int bufferSize, long chunkSize, int k, boolean debugSleepKwayDict,
Expand Down Expand Up @@ -250,7 +252,10 @@ public void createChunk(SizeFetcher<TripleString> fetcher, CloseSuppressPath out
}

if (tripleID % 100_000 == 0) {
listener.notifyProgress(10, "reading triples " + tripleID);
// use start to measure how many triples are read per second
int triplesPerSecond = (int) (tripleID / ((System.currentTimeMillis() - start) / 1000.0));

listener.notifyProgress(10, "reading triples " + tripleID + " triples per second: " + triplesPerSecond);
}
// too much ram allowed?
if (subjects.size() == Integer.MAX_VALUE - 6) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.the_qa_company.qendpoint.core.listener.ProgressListener;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.rdf.TripleWriter;
import com.the_qa_company.qendpoint.core.storage.TempBuffOut;
import com.the_qa_company.qendpoint.core.triples.TempTriples;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.core.util.StopWatch;
Expand All @@ -39,10 +40,9 @@ public TripleWriterHDT(String baseUri, HDTOptions spec, String outFile, boolean
this.baseUri = baseUri;
this.spec = spec;
if (compress) {
this.out = new BufferedOutputStream(
new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(outFile))));
this.out = new TempBuffOut(new GZIPOutputStream(new TempBuffOut(new FileOutputStream(outFile))));
} else {
this.out = new BufferedOutputStream(new FileOutputStream(outFile));
this.out = new TempBuffOut(new FileOutputStream(outFile));
}
close = true;
init();
Expand All @@ -51,7 +51,7 @@ public TripleWriterHDT(String baseUri, HDTOptions spec, String outFile, boolean
public TripleWriterHDT(String baseUri, HDTOptions spec, OutputStream out) {
this.baseUri = baseUri;
this.spec = spec;
this.out = new BufferedOutputStream(out);
this.out = new TempBuffOut(out);
init();
}

Expand Down
Loading