Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class CmdCid extends CmdBase implements UsageAware {
@Override
void run() {
if( !args ) {
usage(List.of())
return
}
// setup the plugins system and load the secrets provider
Expand Down
41 changes: 22 additions & 19 deletions modules/nextflow/src/test/groovy/nextflow/cli/CmdCidTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import nextflow.data.cid.CidHistoryRecord
import nextflow.data.cid.CidStoreFactory
import nextflow.data.cid.model.Checksum
import nextflow.data.cid.model.Parameter
import nextflow.data.cid.model.TaskOutput
import nextflow.data.cid.model.DataOutput
import nextflow.data.cid.model.TaskRun
import nextflow.data.cid.model.WorkflowOutput
import nextflow.plugin.Plugins
import org.junit.Rule
import spock.lang.Specification
Expand Down Expand Up @@ -77,7 +76,7 @@ class CmdCidTest extends Specification {
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def recordEntry = "${CidHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tcid://123456\tcid://456789".toString()
def recordEntry = "${CidHistoryRecord.TIMESTAMP_FMT.format(date)}\trun_name\t${uniqueId}\tcid://123456".toString()
historyFile.text = recordEntry
when:
def cidCmd = new CmdCid(launcher: launcher, args: ["log"])
Expand Down Expand Up @@ -136,10 +135,10 @@ class CmdCidTest extends Specification {
def launcher = Mock(Launcher){
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def time = Instant.ofEpochMilli(123456789).toString()
def time = Instant.ofEpochMilli(123456789)
def encoder = new CidEncoder().withPrettyPrint(true)
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, time, time, null)
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam","cid://123987/", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
cidFile.text = jsonSer
Expand Down Expand Up @@ -208,22 +207,26 @@ class CmdCidTest extends Specification {
Files.createDirectories(cidFile4.parent)
Files.createDirectories(cidFile5.parent)
def encoder = new CidEncoder()
def time = Instant.ofEpochMilli(123456789).toString()
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, time, time, null)
def time = Instant.ofEpochMilli(123456789)
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", "cid://45678", 1234, time, time, null)
cidFile.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987", 1234, time, time, null)
entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987", "cid://123987", 1234, time, time, null)
cidFile2.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "foo", new Checksum("abcde2345","nextflow","standard"),
entry = new TaskRun("u345-2346-1stw2", "foo",
new Checksum("abcde2345","nextflow","standard"),
new Checksum("abfsc2375","nextflow","standard"),
[new Parameter( "ValueInParam", "sample_id","ggal_gut"),
new Parameter("FileInParam","reads",["cid://45678/output.txt"])],
null, null, null, null, [:],[], null)
cidFile3.text = encoder.encode(entry)
entry = new TaskOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://45678", 1234, time, time, null)
entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://45678", "cid://45678", 1234, time, time, null)
cidFile4.text = encoder.encode(entry)
entry = new TaskRun("u345-2346-1stw2", "bar", new Checksum("abfs2556","nextflow","standard"),
entry = new TaskRun("u345-2346-1stw2", "bar",
new Checksum("abfs2556","nextflow","standard"),
new Checksum("abfsc2375","nextflow","standard"),
null,null, null, null, null, [:],[], null)
cidFile5.text = encoder.encode(entry)
final network = """flowchart BT
Expand Down Expand Up @@ -275,14 +278,14 @@ class CmdCidTest extends Specification {
getOptions() >> new CliOptions(config: [configFile.toString()])
}
def encoder = new CidEncoder().withPrettyPrint(true)
def time = Instant.ofEpochMilli(123456789).toString()
def entry = new WorkflowOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", 1234, time, time, null)
def time = Instant.ofEpochMilli(123456789)
def entry = new DataOutput("path/to/file",new Checksum("45372qe","nextflow","standard"),
"cid://123987/file.bam", "cid://123987/", 1234, time, time, null)
def jsonSer = encoder.encode(entry)
def expectedOutput = jsonSer
cidFile.text = jsonSer
when:
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=WorkflowOutput"])
def cidCmd = new CmdCid(launcher: launcher, args: ["show", "cid:///?type=DataOutput"])
cidCmd.run()
def stdout = capture
.toString()
Expand Down
69 changes: 18 additions & 51 deletions modules/nf-cid/src/main/nextflow/data/cid/CidHistoryFile.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
*/
package nextflow.data.cid

import groovy.util.logging.Slf4j

import java.nio.channels.FileChannel
import java.nio.channels.FileLock
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.extension.FilesEx
/**
* File to store a history of the workflow executions and their corresponding CIDs
*
* @author Jorge Ejarque <[email protected]>
*/
@Slf4j
@CompileStatic
class CidHistoryFile implements CidHistoryLog {

Path path
Expand All @@ -38,13 +40,13 @@ class CidHistoryFile implements CidHistoryLog {
this.path = file
}

void write(String name, UUID key, String runCid, String resultsCid, Date date = null) {
void write(String name, UUID key, String runCid, Date date = null) {
assert key

withFileLock {
def timestamp = date ?: new Date()
log.trace("Writting record for $key in CID history file $this")
path << new CidHistoryRecord(timestamp, name, key, runCid, resultsCid).toString() << '\n'
log.trace("Writting record for $key in CID history file ${FilesEx.toUriString(this.path)}")
path << new CidHistoryRecord(timestamp, name, key, runCid).toString() << '\n'
}
}

Expand All @@ -55,18 +57,7 @@ class CidHistoryFile implements CidHistoryLog {
withFileLock { updateRunCid0(sessionId, runCid) }
}
catch (Throwable e) {
log.warn "Can't update CID history file: $this", e.message
}
}

void updateResultsCid(UUID sessionId, String resultsCid) {
assert sessionId

try {
withFileLock { updateResultsCid0(sessionId, resultsCid) }
}
catch (Throwable e) {
log.warn "Can't update CID history file: $this", e.message
log.warn "Can't update CID history file: ${FilesEx.toUriString(this.path)}", e.message
}
}

Expand All @@ -76,7 +67,7 @@ class CidHistoryFile implements CidHistoryLog {
withFileLock { this.path.eachLine {list.add(CidHistoryRecord.parse(it)) } }
}
catch (Throwable e) {
log.warn "Can't read records from CID history file: $this", e.message
log.warn "Can't read records from CID history file: ${FilesEx.toUriString(this.path)}", e.message
}
return list
}
Expand All @@ -91,7 +82,7 @@ class CidHistoryFile implements CidHistoryLog {
return current
}
}
log.warn("Can't find session $id in CID history file $this")
log.warn("Can't find session $id in CID history file ${FilesEx.toUriString(this.path)}")
return null
}

Expand All @@ -100,43 +91,19 @@ class CidHistoryFile implements CidHistoryLog {
assert id
def newHistory = new StringBuilder()

this.path.readLines().each { line ->
try {
def current = line ? CidHistoryRecord.parse(line) : null
if (current.sessionId == id) {
log.trace("Updating record for $id in CID history file $this")
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, runCid, current.resultsCid)
newHistory << newRecord.toString() << '\n'
} else {
newHistory << line << '\n'
}
}
catch (IllegalArgumentException e) {
log.warn("Can't read CID history file: $this", e.message)
}
}

// rewrite the history content
this.path.setText(newHistory.toString())
}

private void updateResultsCid0(UUID id, String resultsCid) {
assert id
def newHistory = new StringBuilder()

this.path.readLines().each { line ->
for( String line : this.path.readLines()) {
try {
def current = line ? CidHistoryRecord.parse(line) : null
if (current.sessionId == id) {
log.trace("Updating record for $id in CID history file $this")
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, current.runCid, resultsCid)
log.trace("Updating record for $id in CID history file ${FilesEx.toUriString(this.path)}")
final newRecord = new CidHistoryRecord(current.timestamp, current.runName, current.sessionId, runCid)
newHistory << newRecord.toString() << '\n'
} else {
newHistory << line << '\n'
}
}
catch (IllegalArgumentException e) {
log.warn("Can't read CID history file: $this", e.message)
log.warn("Can't read CID history file: ${FilesEx.toUriString(this.path)}", e.message)
}
}

Expand All @@ -161,11 +128,11 @@ class CidHistoryFile implements CidHistoryLog {
try {
fos = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
} catch (UnsupportedOperationException e){
log.warn("File System Provider for ${this.path} do not support file locking. Continuing without lock...")
log.warn("File System Provider for ${this.path} do not support file locking - Attemting without locking", e)
return action.call()
}
if (!fos){
throw new IllegalStateException("Can't create a file channel for ${this.path.toAbsolutePath()}")
throw new IllegalStateException("Can't create a file channel for ${FilesEx.toUriString(this.path)}")
}
try {
Throwable error
Expand All @@ -178,7 +145,7 @@ class CidHistoryFile implements CidHistoryLog {
if (System.currentTimeMillis() - ts < 1_000)
sleep rnd.nextInt(75)
else {
error = new IllegalStateException("Can't lock file: ${this.path.toAbsolutePath()} -- Nextflow needs to run in a file system that supports file locks")
error = new IllegalStateException("Can't lock file: ${FilesEx.toUriString(this.path)} - Nextflow needs to run in a file system that supports file locks")
break
}
}
Expand All @@ -200,4 +167,4 @@ class CidHistoryFile implements CidHistoryLog {
file.delete()
}
}
}
}
10 changes: 1 addition & 9 deletions modules/nf-cid/src/main/nextflow/data/cid/CidHistoryLog.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ interface CidHistoryLog {
* @param runCid Workflow run CID.
* @param resultsCid Workflow results CID.
*/
void write(String name, UUID sessionId, String runCid, String resultsCid)
void write(String name, UUID sessionId, String runCid)

/**
* Updates the run CID for a given session ID.
Expand All @@ -40,14 +40,6 @@ interface CidHistoryLog {
*/
void updateRunCid(UUID sessionId, String runCid)

/**
* Updates the results CID for a given session ID.
*
* @param sessionId Workflow session ID.
* @param resultsCid Workflow results CID.
*/
void updateResultsCid(UUID sessionId, String resultsCid)

/**
* Get the store records in the CidHistoryLog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,12 @@ class CidHistoryRecord {
final String runName
final UUID sessionId
final String runCid
final String resultsCid

CidHistoryRecord(Date timestamp, String name, UUID sessionId, String runCid, String resultsCid = null) {
CidHistoryRecord(Date timestamp, String name, UUID sessionId, String runCid) {
this.timestamp = timestamp
this.runName = name
this.sessionId = sessionId
this.runCid = runCid
this.resultsCid = resultsCid
}

CidHistoryRecord(UUID sessionId, String name = null) {
Expand All @@ -58,7 +56,6 @@ class CidHistoryRecord {
line << (runName ?: '-')
line << (sessionId.toString())
line << (runCid ?: '-')
line << (resultsCid ?: '-')
}

@Override
Expand All @@ -71,8 +68,8 @@ class CidHistoryRecord {
if (cols.size() == 2)
return new CidHistoryRecord(UUID.fromString(cols[0]))

if (cols.size() == 5) {
return new CidHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3], cols[4])
if (cols.size() == 4) {
return new CidHistoryRecord(TIMESTAMP_FMT.parse(cols[0]), cols[1], UUID.fromString(cols[2]), cols[3])
}

throw new IllegalArgumentException("Not a valid history entry: `$line`")
Expand Down
Loading