Skip to content

Commit

Permalink
Merge pull request #392 from the-qa-company/GH-391-dump-store
Browse files Browse the repository at this point in the history
GH-391 Dump dataset while running the endpoint
  • Loading branch information
ate47 authored Jul 26, 2023
2 parents 3188bce + 98fc29e commit 0c700fc
Show file tree
Hide file tree
Showing 12 changed files with 284 additions and 48 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ qendpoint-cli/*.iml
qendpoint-cli/.idea/
/qendpoint/
/tests/
/dump/

# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ public ResponseEntity<Sparql.MergeRequestResult> mergeStore() {
return ResponseEntity.status(HttpStatus.OK).body(sparql.askForAMerge());
}

/**
 * Trigger a dump of the endpoint store dataset.
 *
 * @return the result of the dump request
 */
@GetMapping("/dump")
public ResponseEntity<Sparql.MergeRequestResult> dumpStore() {
	return ResponseEntity.ok(sparql.askForADump());
}

@GetMapping("/reindex")
public ResponseEntity<Sparql.LuceneIndexRequestResult> reindex() throws Exception {
return ResponseEntity.status(HttpStatus.OK).body(sparql.reindexLucene());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -469,6 +471,22 @@ public MergeRequestResult askForAMerge() {
return new MergeRequestResult(true);
}

/**
 * ask for a dump of the endpoint store into a timestamped sub-directory of
 * the local {@code dump} directory
 *
 * @return see
 *         {@link com.the_qa_company.qendpoint.compiler.SparqlRepository#askDump(java.nio.file.Path)}
 *         return value
 * @throws ServerWebInputException if no endpoint store is configured
 */
public MergeRequestResult askForADump() {
	if (endpoint == null) {
		throw new ServerWebInputException("No endpoint store, bad config?");
	}
	// "yyyy" fixes the typo'd "yyy" year field; no ':' in the time part so
	// the directory name stays valid on Windows file systems
	Path outLocation = Path.of("dump")
			.resolve(DateTimeFormatter.ofPattern("yyyy-MM-dd HHmmss").format(LocalDateTime.now()));
	return new MergeRequestResult(this.sparqlRepository.askDump(outLocation));
}

/**
* @return if the store is merging
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@
import com.the_qa_company.qendpoint.core.util.crc.CRCOutputStream;
import com.the_qa_company.qendpoint.core.util.io.IOUtil;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Enumeration;

/**
Expand Down Expand Up @@ -91,20 +88,6 @@ public void save(OutputStream output) throws IOException {
out.writeCRC();
}

/**
 * Load this object from the file named {@code filename}.
 *
 * @param filename path of the file to read
 * @throws IOException read error
 */
@Override
public void load(String filename) throws IOException {
	try (InputStream stream = IOUtil.getFileInputStream(filename)) {
		load(stream);
	}
}

/**
 * Load this object from the file at {@code filename}.
 *
 * @param filename path of the file to read
 * @throws IOException read error
 */
@Override
public void load(Path filename) throws IOException {
	// buffer the raw stream; both resources are closed on exit
	try (InputStream raw = Files.newInputStream(filename);
			InputStream buffered = new BufferedInputStream(raw)) {
		load(buffered);
	}
}

@Override
public void load(InputStream input) throws IOException {
CRCInputStream in = new CRCInputStream(input, new CRC16());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package com.the_qa_company.qendpoint.compiler;

import com.the_qa_company.qendpoint.compiler.sail.LuceneSailCompiler;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import com.the_qa_company.qendpoint.store.EndpointFiles;
import com.the_qa_company.qendpoint.store.EndpointStore;
import com.the_qa_company.qendpoint.store.exception.EndpointStoreException;
Expand All @@ -13,19 +18,13 @@
import org.eclipse.rdf4j.sail.Sail;
import org.eclipse.rdf4j.sail.SailConnection;
import org.eclipse.rdf4j.sail.SailException;
import org.eclipse.rdf4j.sail.helpers.NotifyingSailConnectionWrapper;
import org.eclipse.rdf4j.sail.helpers.NotifyingSailWrapper;
import org.eclipse.rdf4j.sail.helpers.SailConnectionWrapper;
import org.eclipse.rdf4j.sail.helpers.SailWrapper;
import org.eclipse.rdf4j.sail.lmdb.LmdbStore;
import org.eclipse.rdf4j.sail.lucene.LuceneSail;
import org.eclipse.rdf4j.sail.memory.MemoryStore;
import org.eclipse.rdf4j.sail.nativerdf.NativeStore;
import com.the_qa_company.qendpoint.core.exceptions.ParserException;
import com.the_qa_company.qendpoint.core.hdt.HDT;
import com.the_qa_company.qendpoint.core.hdt.HDTManager;
import com.the_qa_company.qendpoint.core.options.HDTOptions;
import com.the_qa_company.qendpoint.core.triples.TripleString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -233,6 +232,13 @@ public boolean hasLuceneSail() {
return !luceneSails.isEmpty();
}

/**
 * @return an unmodifiable view of the lucene sails linked with this
 *         compiled sail
 */
public Set<LuceneSail> getLuceneSails() {
	// wrap the internal set so callers can't mutate this sail's state
	return java.util.Collections.unmodifiableSet(luceneSails);
}

@Override
public SailConnection getConnection() throws SailException {
bellow.beginConnectionBuilding();
Expand All @@ -245,6 +251,22 @@ public SailConnection getConnection() throws SailException {
}
}

/**
 * ask to dump the store, this method only works when the source sail is a
 * qEndpoint sail.
 *
 * @param output output
 * @return if the dump was started
 * @throws SailException Can't start the dump or the sail isn't a qEndpoint
 *                       sail
 */
public boolean dumpStore(Path output) throws SailException {
	if (source instanceof EndpointStore store) {
		return store.dump(new CompiledSailEndpointStoreDump(output, this));
	}
	throw new SailException("Can't dump sail of type " + source.getClass());
}

/**
* Compiled Sail connection, allows to get the source connection
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.the_qa_company.qendpoint.compiler;

import com.the_qa_company.qendpoint.store.EndpointStore;
import com.the_qa_company.qendpoint.store.EndpointStoreDump;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.eclipse.rdf4j.common.concurrent.locks.Lock;
import org.eclipse.rdf4j.sail.lucene.LuceneSail;
import org.eclipse.rdf4j.sail.lucene.impl.LuceneIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

/**
 * Store dump that, in addition to the dataset dump done by
 * {@link EndpointStoreDump.EndpointStoreDumpDataset}, clones the Lucene
 * indexes of a {@link CompiledSail} into the dump output location.
 */
public class CompiledSailEndpointStoreDump extends EndpointStoreDump.EndpointStoreDumpDataset {
	private static final Logger logger = LoggerFactory.getLogger(CompiledSailEndpointStoreDump.class);
	// compiled sail the lucene indexes are read from
	private final CompiledSail compiledSail;

	/**
	 * create the dump
	 *
	 * @param outputLocation root directory of the dump
	 * @param compiledSail   compiled sail owning the lucene indexes to clone
	 */
	public CompiledSailEndpointStoreDump(Path outputLocation, CompiledSail compiledSail) {
		super(outputLocation);
		this.compiledSail = compiledSail;
	}

	@Override
	public void beforeMerge(EndpointStore store) throws IOException {
		// dump the Lucene dataset
		// hold a notify lock for the whole copy so the store can coordinate
		// with a concurrent merge; released in the finally block
		Lock lock = store.getLocksNotify().createLock("merge-dumplucene");
		// accumulates one "location/output" record per dumped index, written
		// to lucene.info at the end
		StringBuilder infoWriter = new StringBuilder();
		Path out = outputLocation.resolve("lucene");
		try {
			// counter used to name indexes that have no configured directory
			int ukn = 0;
			// names already used inside the output "lucene" directory
			Set<String> names = new HashSet<>();
			for (LuceneSail ls : compiledSail.getLuceneSails()) {
				// only disk-backed LuceneIndex implementations can be cloned
				if (!(ls.getLuceneIndex() instanceof LuceneIndex li)) {
					logger.error("Can't dump index {}", ls.getLuceneIndex());
					continue;
				}
				// flush pending writes so the on-disk files are complete
				li.getIndexWriter().flush();

				// find a unique name for the lucene dir
				String configDir = ls.getParameter("lucenedir");
				String name;
				if (configDir == null || configDir.isEmpty()) {
					name = "ukn_" + ukn++;
				} else {
					name = Path.of(configDir).getFileName().toString();
				}

				// de-duplicate by appending ".1", ".2", ... until unused
				String tname = name;
				int dif = 1;
				while (!names.add(tname)) {
					tname = name + "." + dif++;
				}
				Path outputDataset = out.resolve(tname);

				// write dataset metadata
				infoWriter.append("location=%s\noutput=%s\n---\n".formatted(configDir, outputDataset));

				// clone the index
				Files.createDirectories(outputDataset);
				Directory lsdir = li.getDirectory();
				try (FSDirectory dir = FSDirectory.open(outputDataset)) {
					for (String file : lsdir.listAll()) {
						dir.copyFrom(lsdir, file, file, new IOContext());
					}
				}
			}
			Files.writeString(outputLocation.resolve("lucene.info"), infoWriter.toString());
		} finally {
			lock.release();
		}
		// run the dataset dump of the parent implementation
		super.beforeMerge(store);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ public boolean hasLuceneSail() {
return compiledSail.hasLuceneSail();
}

/**
 * Dump the store, this method will call
 * {@link CompiledSail#dumpStore(Path)}
 *
 * @param location location of the dump directory to create
 * @return if the dump was started
 * @throws org.eclipse.rdf4j.sail.SailException if the dump can't be started
 *                                              or the underlying sail isn't
 *                                              a qEndpoint sail
 */
public boolean askDump(Path location) {
	return compiledSail.dumpStore(location);
}

/**
* execute a sparql query
*
Expand Down
Loading

0 comments on commit 0c700fc

Please sign in to comment.