Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

imitate the real code of lcg-cp #8

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
183 changes: 166 additions & 17 deletions src/Gate.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import org.simgrid.msg.Msg;

import java.util.Vector;
import java.util.concurrent.ThreadLocalRandom;

import org.simgrid.msg.Host;
import org.simgrid.msg.Process;
import org.simgrid.msg.MsgException;
import org.simgrid.msg.Mutex;

public class Gate extends Job {
private long simulateForNsec(long nSec) throws HostFailureException {
Expand Down Expand Up @@ -42,7 +44,144 @@ private void disconnect() {
// Use of some simulation magic here, every worker knows the mailbox of the VIP server
GateMessage.sendTo("VIPServer", "GATE_DISCONNECT", 0);
}

private String cp_dynamic(String logicalFileName, String localFileName, LFC lfc, SE closeSE) throws HostFailureException {

int i=0;
String info = "";
String SE_file = "";
double min_bandwidth = 4e6; // minimum bandwidth acceptable is 4Mbps
double timeout;
int num_retry = 3; // maximum retry of lcg-rep

Timer duration = new Timer();
Timer cr_timer = new Timer();
double cr_duration = 0.0;

Vector<SE> replicaLocations;
Msg.info("Dynamic lcg-cp '" + logicalFileName + "' to '" + localFileName + "' using '" + lfc.getName() + "'");
// get Logical File from the LFC
LogicalFile file = lfc.getLogicalFile(logicalFileName);
Msg.info("LFC '" + lfc.getName() + "' replied: " + file.toString());

timeout = (file.getSize()*8)/ min_bandwidth + 1;
duration.start();
// get all replicas locations by lcg-lr
replicaLocations = LCG.lr(lfc,logicalFileName);

if(replicaLocations.contains(closeSE)) {
info = LCG.cp(logicalFileName, localFileName, closeSE);
}
else{
SE_file = closeSE.getName() +"_" + logicalFileName;
// if some job has already created lock for replicating file in SE
// check the status of file in SE
if(lfc.getTransferLock(SE_file) == 1){
int status = lfc.getReplicaInfo(SE_file);
switch (status) {
case 0: // file is replicating to closeSE in progress

Msg.info("case0 : replicate file into closeSE in progress");
// Wait timeout before normal lcg-cp
while(status != 1 && i< timeout){
Process.sleep(2000);
status = lfc.getReplicaInfo(SE_file);
i = i+2;
Msg.debug("Retry the file whether exists in closeSE");
}
Msg.debug("Timeout, no more retry");
if(status == 1){
Msg.debug("Succeed to replicate file in CloseSE");
info = LCG.cp(logicalFileName, localFileName,closeSE);

}
else{
Msg.debug("File still not available in CloseSE, do a normal lcg-cp");
info = LCG.cp1(logicalFileName, localFileName,lfc);
}
break;
case 1: // file exists in closeSE
Msg.debug("case1: copy from closeSE");
info = LCG.cp(logicalFileName, localFileName,closeSE);
break;
case 2: // fail to replicate file in closeSE

// need to simulate some transfer errors,
// otherwise this case will never be reached
info = LCG.cp1(logicalFileName, localFileName,lfc);
break;
default:
break;
}
}
else{
int flag;
flag = lfc.createTransferLock(SE_file);
// first job will try to replicate file into closeSE
if(flag == 0){
SE src = null;
boolean flag_lcg_cp_cr;
GfalFile gf = new GfalFile(file);
lfc.fillsurls(gf);

for(int j = 0 ; j < Math.min(gf.getNbreplicas(), num_retry); j++){
SE se = gf.getCurrentReplica();
Msg.info("Lcg-cp for :"+ se.getName());
flag_lcg_cp_cr = LCG.cp(logicalFileName, localFileName, se, lfc, timeout);

cr_timer.start();
LCG.cr(localFileName, file.getSize() ,logicalFileName, closeSE, lfc);
cr_timer.stop();
cr_duration = cr_timer.getValue();

if(flag_lcg_cp_cr){
src = se;
break;
}
gf.NextReplica();
}
if(src == null){
// If no SE response before timeout
// We consider that we failed to lcg_cp_cr file into closeSE
// update status to 2
lfc.modifyReplicaInfo(SE_file, 2);
// then all jobs will do a normal lcg-cp
// No more retry of lcg_cp_cr
info = LCG.cp1(logicalFileName, localFileName,lfc);
}
else{
Msg.info("lcg-rep complete, "+ "SE used is :"+ src.getName());
//if lcg_cp_cr succeeds, update status of SE_FILE to 1
lfc.modifyReplicaInfo(SE_file, 1);
info = closeSE + "," +file.getSize() + "," + 0;
}
}
else{
int status = lfc.getReplicaInfo(SE_file);
while(status != 1 && i< timeout){
Process.sleep(2000);
status = lfc.getReplicaInfo(SE_file);
i = i+2;
Msg.debug("Retry the file whether exists in closeSE");
}
if(status !=1){
Msg.debug("do a normal lcg-cp");
info = LCG.cp1(logicalFileName, localFileName,lfc);
}
else info = LCG.cp(logicalFileName, localFileName,closeSE);

}
}
}
duration.stop();
String[] log = info.split(",");
Msg.info("cp_dynamic complete!");
double dyn_duration = duration.getValue() - cr_duration;

return log[0] + "," + log[1] + "," + dyn_duration;

}

public Gate(Host host, String name, String[] args) {
super(host, name, args);
}
Expand All @@ -55,8 +194,8 @@ public void main(String[] args) throws MsgException {
long uploadFileSize = 0;
String transferInfo;
Vector<SE> actualSources = new Vector<SE>();

int jobId = (args.length > 0 ? Integer.valueOf(args[0]).intValue() : 1);

long executionTime = (args.length > 1 ? 1000 * Long.valueOf(args[1]).longValue() : VIPSimulator.sosTime);
if (VIPSimulator.version == 1) {
uploadFileSize = VIPSimulator.fixedFileSize;
Expand Down Expand Up @@ -103,37 +242,47 @@ public void main(String[] args) throws MsgException {
Msg.error("Some input files are missing. Exit!");
System.exit(1);
}
Vector<SE> replicaLocations;

for (String logicalFileName: VIPSimulator.gateInputFileNames){
Timer lrDuration = new Timer();
//Gate job first do lcg-lr to check whether input file exists in closeSE
lrDuration.start();
replicaLocations = LCG.lr(VIPServer.getDefaultLFC(),logicalFileName);
Vector<SE> replicaLocations = LCG.lr(VIPServer.getDefaultLFC(),logicalFileName);
lrDuration.stop();

double lr_time = lrDuration.getValue();

if (VIPSimulator.version == 2){
// if closeSE found, lcg-cp with closeSE, otherwise normal lcg-cp
// if(replicaLocations.contains(getCloseSE()))
// transferInfo = LCG.cp(logicalFileName,
// "/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
// getCloseSE());
// else
// transferInfo = LCG.cp(logicalFileName,
// "/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
// VIPServer.getDefaultLFC());


if(VIPSimulator.algorithm.equals("lcg_cp")){
// lcg-cp in production
transferInfo = LCG.cp1(logicalFileName,
"/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
VIPServer.getDefaultLFC());

}
else{
// dynamic replication
if(logicalFileName.contains("release") || logicalFileName.contains("opengate")){
transferInfo = cp_dynamic(logicalFileName,
"/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
VIPServer.getDefaultLFC(), getCloseSE());
// lr is already included in cp_dynamic
lr_time = 0.0;
}
else{
transferInfo = LCG.cp1(logicalFileName,
"/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
VIPServer.getDefaultLFC());

}
}

} else {
transferInfo = LCG.cp(logicalFileName,
"/scratch/"+logicalFileName.substring(logicalFileName.lastIndexOf("/")+1),
(SE) actualSources.remove(0));
}

logDownload(jobId, transferInfo, lrDuration.getValue(), "gate");
// Write download info to logs
logDownload(jobId, transferInfo, lr_time , "gate");
}
downloadTime.stop();
}
Expand Down
7 changes: 5 additions & 2 deletions src/GfalFile.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;

/*
Expand All @@ -10,7 +13,7 @@
*/

public class GfalFile {
public Vector<SE> replicas;
public List<SE> replicas;
private long nbreplicas;
private long nb_current_replica;
private LogicalFile logicalFile;
Expand All @@ -20,7 +23,7 @@ public GfalFile(LogicalFile lf) {
this.logicalFile = new LogicalFile(lf.getName(),lf.getSize(),lf.getLocations());
this.nbreplicas= lf.getLocations().size();
this.nb_current_replica = 0;
this.replicas = new Vector<SE>();
this.replicas = new ArrayList<SE>(Collections.nCopies((int)this.nbreplicas, null));
}

public void NextReplica(){
Expand Down
17 changes: 17 additions & 0 deletions src/GridService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package.
*/
import java.util.HashMap;
import java.util.Vector;

import org.simgrid.msg.Host;
import org.simgrid.msg.HostFailureException;
import org.simgrid.msg.Msg;
import org.simgrid.msg.Mutex;
import org.simgrid.msg.Process;
import org.simgrid.msg.Task;

Expand All @@ -23,6 +25,21 @@ public abstract class GridService extends Process {
// catalog: a vector of logical files
protected Vector<LogicalFile> catalog;

// A (SE_file, int) Hashmap to indicate whether the desired file exist in a SE
// For example: (SE1_file, 0) means file is replicating to SE1 in progress
// (SE1_file, 1) means file exists in SE1
// (SE1_file, 2) means failure to copy file in SE1 during execution
public HashMap<String, Integer> replicas_info = new HashMap<String,Integer>();

// Mutex to modify the status in HashMap replicas_info
public HashMap<String, Mutex> transfer_locks = new HashMap<String, Mutex>();


// Global mutex for GridService to prevent several jobs
// to modify transfer_locks or replicas_info at same time
public Mutex grid_mutex = new Mutex();


protected String findAvailableMailbox(long retryAfter) {
while (true) {
for (Process listener : this.mailboxes) {
Expand Down
24 changes: 21 additions & 3 deletions src/LCG.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,41 @@ public static void cr(String localFileName, long localFileSize, String logicalFi
se.upload(logicalFileName,localFileSize);
Msg.info("SE '" + se.getName() + "' replied with an ACK");

// Register file into LFC
LogicalFile file = new LogicalFile(logicalFileName, localFileSize, se);

// Register file into LFC
Msg.info("Ask '" + lfc.getName() + "' to register " + file.toString());

lfc.register(file);

Msg.info("lcg-cr of '" + logicalFileName + "' on '" + lfc.getName() + "' completed");
}

// cp with timeout
public static boolean cp(String logicalFileName, String localFileName, SE src, LFC lfc, double timeout){

long fileSize=0;
Msg.info("lcg-cp with timeout'" + logicalFileName + "' using '" + src.getName() + "'");

fileSize = src.download_timeout(logicalFileName,timeout);
if(fileSize != 0){
Msg.info("lcg-cp of '" + logicalFileName + "' on '" +src.getName()+ "' completed");
return true;
}
else{
Msg.info("lcg-cp of '" + logicalFileName + "' on '" + src.getName()
+ "' passed timeout!!");
return false;
}
}

public static String cp1(String logicalFileName, String localFileName, LFC lfc) {
Timer duration = new Timer();
duration.start();
Msg.info("lcg-cp '" + logicalFileName + "' to '" + localFileName + "' using '" + lfc.getName() + "'");

// get Logical File from the LFC
LogicalFile file = lfc.getLogicalFileByName(logicalFileName);
LogicalFile file = lfc.getLogicalFile(logicalFileName);
GfalFile gf = new GfalFile(file);
lfc.fillsurls(gf);
Msg.info("LFC '" + lfc.getName() + "' replied: " + file.toString());
Expand Down Expand Up @@ -104,7 +122,7 @@ public static String cp(String logicalFileName, String localFileName, SE closeSE
duration.stop();
return closeSE + "," + fileSize + "," + duration.getValue();
}

public static Vector<String> ls(LFC lfc, String directoryName) {
Vector<String> results = new Vector<String>();

Expand Down
Loading