diff --git a/plugin/writer/streamwriter/src/main/java/com/wgzhao/addax/plugin/writer/streamwriter/StreamWriter.java b/plugin/writer/streamwriter/src/main/java/com/wgzhao/addax/plugin/writer/streamwriter/StreamWriter.java index 7aa73a41f..7f1093cbc 100644 --- a/plugin/writer/streamwriter/src/main/java/com/wgzhao/addax/plugin/writer/streamwriter/StreamWriter.java +++ b/plugin/writer/streamwriter/src/main/java/com/wgzhao/addax/plugin/writer/streamwriter/StreamWriter.java @@ -39,6 +39,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; import static com.wgzhao.addax.common.spi.ErrorCode.ILLEGAL_VALUE; @@ -132,12 +133,6 @@ private void validateParameter(String path, String fileName) } } - @Override - public void prepare() - { - // - } - @Override public List split(int mandatoryNumber) { @@ -149,12 +144,6 @@ public List split(int mandatoryNumber) return writerSplitConfigs; } - @Override - public void post() - { - // - } - @Override public void destroy() { @@ -166,7 +155,6 @@ public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - private static final String NULL_FLAG = "NULL"; private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n"); private String fieldDelimiter; @@ -192,21 +180,15 @@ public void init() this.fileName = writerSliceConfig.getString(StreamKey.FILE_NAME, null); this.recordNumBeforeSleep = writerSliceConfig.getLong(StreamKey.RECORD_NUM_BEFORE_SLEEP, 0); this.sleepTime = writerSliceConfig.getLong(StreamKey.SLEEP_TIME, 0); - this.nullFormat = writerSliceConfig.getString(StreamKey.NULL_FORMAT, NULL_FLAG); + this.nullFormat = writerSliceConfig.getString(StreamKey.NULL_FORMAT, StreamKey.NULL_FLAG); if (recordNumBeforeSleep < 0) { - throw AddaxException.asAddaxException(ILLEGAL_VALUE, "recordNumber 不能为负值"); + throw AddaxException.asAddaxException(ILLEGAL_VALUE, "recordNumber must be greater than 0"); } if (sleepTime < 0) { - throw AddaxException.asAddaxException(ILLEGAL_VALUE, "sleep 不能为负值"); + throw AddaxException.asAddaxException(ILLEGAL_VALUE, "sleep time must be greater than 0"); } } - @Override - public void prepare() - { - // - } - @Override public void startWrite(RecordReceiver recordReceiver) { @@ -238,14 +220,6 @@ private void writeToFile(RecordReceiver recordReceiver, String path, String file String fileFullPath = buildFilePath(path, fileName); LOG.info("write to file : [{}]", fileFullPath); File newFile = new File(fileFullPath); - try { - if (!newFile.createNewFile()) { - LOG.error("failed to create file {}", fileFullPath); - } - } - catch (IOException ioe) { - LOG.error("failed to create file {}", fileFullPath); - } try (BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(new FileOutputStream(newFile, true), StandardCharsets.UTF_8))) { Record record; @@ -253,7 +227,8 @@ private void writeToFile(RecordReceiver recordReceiver, String path, String file while ((record = recordReceiver.getFromReader()) != null) { if (recordNumBeforeSleep > 0 && sleepTime > 0 && count == recordNumBeforeSleep) { LOG.info("StreamWriter start to sleep ... recordNumBeforeSleep={},sleepTime={}", recordNumBeforeSleep, sleepTime); - Thread.sleep(sleepTime * 1000L); + TimeUnit.SECONDS.sleep(sleepTime); + count=0; } writer.write(recordToString(record)); count++; @@ -265,12 +240,6 @@ private void writeToFile(RecordReceiver recordReceiver, String path, String file } } - @Override - public void post() - { - // - } - @Override public void destroy() {