Skip to content

Commit

Permalink
[update][plugin][streamwriter] Refactor code to enhance readability a…
Browse files Browse the repository at this point in the history
…nd efficiency

 1. Replace `Thread.sleep` with `TimeUnit.SECONDS.sleep` to avoid busy-waiting.
2. Fix issue where `count` was not reset after sleeping.
3. Apply minor code improvements for better maintainability.
  • Loading branch information
wgzhao committed Oct 13, 2024
1 parent 2b674cb commit 7d45b9e
Showing 1 changed file with 6 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE;
Expand Down Expand Up @@ -132,12 +133,6 @@ private void validateParameter(String path, String fileName)
}
}

@Override
public void prepare()
{
//
}

@Override
public List<Configuration> split(int mandatoryNumber)
{
Expand All @@ -149,12 +144,6 @@ public List<Configuration> split(int mandatoryNumber)
return writerSplitConfigs;
}

@Override
public void post()
{
//
}

@Override
public void destroy()
{
Expand All @@ -166,7 +155,6 @@ public static class Task
extends Writer.Task
{
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private static final String NULL_FLAG = "NULL";
private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

private String fieldDelimiter;
Expand All @@ -192,21 +180,15 @@ public void init()
this.fileName = writerSliceConfig.getString(StreamKey.FILE_NAME, null);
this.recordNumBeforeSleep = writerSliceConfig.getLong(StreamKey.RECORD_NUM_BEFORE_SLEEP, 0);
this.sleepTime = writerSliceConfig.getLong(StreamKey.SLEEP_TIME, 0);
this.nullFormat = writerSliceConfig.getString(StreamKey.NULL_FORMAT, NULL_FLAG);
this.nullFormat = writerSliceConfig.getString(StreamKey.NULL_FORMAT, StreamKey.NULL_FLAG);
if (recordNumBeforeSleep < 0) {
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "recordNumber 不能为负值");
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "recordNumber must be greater than 0");
}
if (sleepTime < 0) {
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "sleep 不能为负值");
throw AddaxException.asAddaxException(ILLEGAL_VALUE, "sleep time must be greater than 0");
}
}

@Override
public void prepare()
{
//
}

@Override
public void startWrite(RecordReceiver recordReceiver)
{
Expand Down Expand Up @@ -238,22 +220,15 @@ private void writeToFile(RecordReceiver recordReceiver, String path, String file
String fileFullPath = buildFilePath(path, fileName);
LOG.info("write to file : [{}]", fileFullPath);
File newFile = new File(fileFullPath);
try {
if (!newFile.createNewFile()) {
LOG.error("failed to create file {}", fileFullPath);
}
}
catch (IOException ioe) {
LOG.error("failed to create file {}", fileFullPath);
}
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(newFile, true), StandardCharsets.UTF_8))) {
Record record;
int count = 0;
while ((record = recordReceiver.getFromReader()) != null) {
if (recordNumBeforeSleep > 0 && sleepTime > 0 && count == recordNumBeforeSleep) {
LOG.info("StreamWriter start to sleep ... recordNumBeforeSleep={},sleepTime={}", recordNumBeforeSleep, sleepTime);
Thread.sleep(sleepTime * 1000L);
TimeUnit.SECONDS.sleep(sleepTime);
count=0;
}
writer.write(recordToString(record));
count++;
Expand All @@ -265,12 +240,6 @@ private void writeToFile(RecordReceiver recordReceiver, String path, String file
}
}

@Override
public void post()
{
//
}

@Override
public void destroy()
{
Expand Down

0 comments on commit 7d45b9e

Please sign in to comment.