From a525b91342dd421cb55f6863a5800667369ce91a Mon Sep 17 00:00:00 2001 From: yinbenrong <505484338@qq.com> Date: Thu, 16 Oct 2025 16:52:39 +0800 Subject: [PATCH 1/2] =?UTF-8?q?checkpoint=E5=88=B7=E6=96=B0kerbors?= =?UTF-8?q?=E4=B8=80=E4=B8=AA=E8=BF=87=E6=9C=9F=E5=91=A8=E6=9C=9F=E5=88=B7?= =?UTF-8?q?=E6=96=B02=E6=AC=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../checkpoint/storage/hdfs/HdfsStorage.java | 577 +++++++++++------- 1 file changed, 350 insertions(+), 227 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java index ce917728078..19b4b742f4a 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java @@ -20,14 +20,13 @@ package org.apache.seatunnel.engine.checkpoint.storage.hdfs; -import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils; - import org.apache.seatunnel.engine.checkpoint.storage.PipelineState; import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage; import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; import org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.AbstractConfiguration; import org.apache.seatunnel.engine.checkpoint.storage.hdfs.common.FileConfiguration; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -40,37 +39,91 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants.STORAGE_NAME_SPACE; @Slf4j public class HdfsStorage extends AbstractCheckpointStorage { + public FileSystem fs; private static final String STORAGE_TMP_SUFFIX = "tmp"; private static final String STORAGE_TYPE_KEY = "storage.type"; + private final Map initConfiguration; // 初始配置 + private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(); // 读写锁保护 fs + private Integer ticketLifetime = 900; //kerberos票据过期时间 + public HdfsStorage(Map configuration) throws CheckpointStorageException { - this.initStorage(configuration); + // 深拷贝初始配置 + this.initConfiguration = deepCopy(configuration); + this.ticketLifetime=Integer.valueOf(StringUtils.defaultString(configuration.get("seatunnel.hadoop.dfs.kerberos.ticket.lifetime"),"900")); + // 初次初始化 + initStorage(configuration); + // 启动周期刷新任务 + startPeriodicInit(); } - @Override + /** + * 初始化 FileSystem + */ public void initStorage(Map configuration) throws CheckpointStorageException { - if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) { - setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE)); - configuration.remove(STORAGE_NAME_SPACE); + Map configurationCopy = deepCopy(configuration); // 深拷贝配置 + + if (StringUtils.isNotBlank(configurationCopy.get(STORAGE_NAME_SPACE))) { + setStorageNameSpace(configurationCopy.get(STORAGE_NAME_SPACE)); + configurationCopy.remove(STORAGE_NAME_SPACE); } - Configuration hadoopConf = 
getConfiguration(configuration); + + Configuration hadoopConf = getConfiguration(configurationCopy); + + fsLock.writeLock().lock(); // 写锁:阻塞所有正在用 fs 的读线程 try { - fs = FileSystem.get(hadoopConf); + FileSystem newFs = FileSystem.get(hadoopConf); + if (fs != null) { + try { + fs.close(); + log.info("Old FileSystem closed."); + } catch (IOException e) { + log.warn("Old FileSystem close failed", e); + } + } + fs = newFs; + log.info("FileSystem refreshed."); } catch (IOException e) { throw new CheckpointStorageException("Failed to get file system", e); + } finally { + fsLock.writeLock().unlock(); } } + /** + * 深拷贝 Map + */ + private Map deepCopy(Map source) { + return new HashMap<>(source); + } + /** + * 启动周期性刷新 FileSystem + */ + private void startPeriodicInit() { + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + Integer delay = ticketLifetime / 2;//一个过期周期刷新2次 + scheduler.scheduleWithFixedDelay(() -> { + try { + initStorage(initConfiguration); + System.out.println("[HdfsStorage] Periodic fs refresh success."); + } catch (Exception e) { + System.err.println("[HdfsStorage] Periodic fs refresh failed: " + e.getMessage()); + e.printStackTrace(); + } + }, delay, delay, TimeUnit.SECONDS); + log.info("[HdfsStorage] Periodic fs refresh scheduled. 
refresh time delay ="+delay); + } private Configuration getConfiguration(Map config) throws CheckpointStorageException { @@ -84,271 +137,334 @@ private Configuration getConfiguration(Map config) @Override public String storeCheckPoint(PipelineState state) throws CheckpointStorageException { - byte[] datas; + fsLock.readLock().lock(); try { - datas = serializeCheckPointData(state); - } catch (IOException e) { - throw new CheckpointStorageException( - String.format("Failed to serialize checkpoint data, state: %s", state), e); - } - Path filePath = - new Path( - getStorageParentDirectory() - + state.getJobId() - + "/" - + getCheckPointName(state)); - - Path tmpFilePath = - new Path( - getStorageParentDirectory() - + state.getJobId() - + "/" - + getCheckPointName(state) - + STORAGE_TMP_SUFFIX); - try (FSDataOutputStream out = fs.create(tmpFilePath, false)) { - out.write(datas); - } catch (IOException e) { - throw new CheckpointStorageException( - String.format( - "Failed to write checkpoint data, file: %s, state: %s", - tmpFilePath, state), - e); - } - try { - boolean success = fs.rename(tmpFilePath, filePath); - if (!success) { - throw new CheckpointStorageException("Failed to rename tmp file to final file"); + byte[] datas; + try { + datas = serializeCheckPointData(state); + } catch (IOException e) { + throw new CheckpointStorageException( + String.format("Failed to serialize checkpoint data, state: %s", state), e); } + Path filePath = + new Path( + getStorageParentDirectory() + + state.getJobId() + + "/" + + getCheckPointName(state)); - } catch (IOException e) { - throw new CheckpointStorageException("Failed to rename tmp file to final file"); - } finally { + Path tmpFilePath = + new Path( + getStorageParentDirectory() + + state.getJobId() + + "/" + + getCheckPointName(state) + + STORAGE_TMP_SUFFIX); + try (FSDataOutputStream out = fs.create(tmpFilePath, false)) { + out.write(datas); + } catch (IOException e) { + throw new CheckpointStorageException( + 
String.format( + "Failed to write checkpoint data, file: %s, state: %s", + tmpFilePath, state), + e); + } try { - // clean up tmp file, if still lying around - if (fs.exists(tmpFilePath)) { - fs.delete(tmpFilePath, false); + boolean success = fs.rename(tmpFilePath, filePath); + if (!success) { + throw new CheckpointStorageException("Failed to rename tmp file to final file"); + } + + } catch (IOException e) { + throw new CheckpointStorageException("Failed to rename tmp file to final file"); + } finally { + try { + // clean up tmp file, if still lying around + if (fs.exists(tmpFilePath)) { + fs.delete(tmpFilePath, false); + } + } catch (IOException ioe) { + log.error("Failed to delete tmp file", ioe); } - } catch (IOException ioe) { - log.error("Failed to delete tmp file", ioe); } - } - return filePath.getName(); + return filePath.getName(); + } finally { + fsLock.readLock().unlock(); + } } @Override public List getAllCheckpoints(String jobId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - log.info("No checkpoint found for this job, the job id is: " + jobId); - return new ArrayList<>(); - } - List states = new ArrayList<>(); - fileNames.forEach( - file -> { - try { - states.add(readPipelineState(file, jobId)); - } catch (CheckpointStorageException e) { - log.error("Failed to read checkpoint data from file: " + file, e); - } - }); - if (states.isEmpty()) { - throw new CheckpointStorageException( - "No checkpoint found for job, job id is: " + jobId); + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + log.info("No checkpoint found for this job, the job id is: " + jobId); + return new ArrayList<>(); + } + List states = new ArrayList<>(); + fileNames.forEach( + file -> { + try { + states.add(readPipelineState(file, jobId)); + } catch 
(CheckpointStorageException e) { + log.error("Failed to read checkpoint data from file: " + file, e); + } + }); + if (states.isEmpty()) { + throw new CheckpointStorageException( + "No checkpoint found for job, job id is: " + jobId); + } + return states; + } finally { + fsLock.readLock().unlock(); } - return states; } @Override public List getLatestCheckpoint(String jobId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - log.info("No checkpoint found for this job, the job id is: " + jobId); - return new ArrayList<>(); - } - Set latestPipelineNames = getLatestPipelineNames(fileNames); - List latestPipelineStates = new ArrayList<>(); - latestPipelineNames.forEach( - fileName -> { - try { - latestPipelineStates.add(readPipelineState(fileName, jobId)); - } catch (CheckpointStorageException e) { - log.error("Failed to read pipeline state for file: {}", fileName, e); - } - }); + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + log.info("No checkpoint found for this job, the job id is: " + jobId); + return new ArrayList<>(); + } + Set latestPipelineNames = getLatestPipelineNames(fileNames); + List latestPipelineStates = new ArrayList<>(); + latestPipelineNames.forEach( + fileName -> { + try { + latestPipelineStates.add(readPipelineState(fileName, jobId)); + } catch (CheckpointStorageException e) { + log.error("Failed to read pipeline state for file: {}", fileName, e); + } + }); - if (latestPipelineStates.isEmpty()) { - log.info("No checkpoint found for this job, the job id:{} ", jobId); + if (latestPipelineStates.isEmpty()) { + log.info("No checkpoint found for this job, the job id:{} ", jobId); + } + return latestPipelineStates; + } finally { + fsLock.readLock().unlock(); } - return latestPipelineStates; + } @Override public PipelineState 
getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - log.info("No checkpoint found for job, job id is: " + jobId); - return null; - } + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + log.info("No checkpoint found for job, job id is: " + jobId); + return null; + } - String latestFileName = - getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId); - if (latestFileName == null) { - log.info( - "No checkpoint found for this job, the job id is: " - + jobId - + ", pipeline id is: " - + pipelineId); - return null; + String latestFileName = + getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId); + if (latestFileName == null) { + log.info( + "No checkpoint found for this job, the job id is: " + + jobId + + ", pipeline id is: " + + pipelineId); + return null; + } + return readPipelineState(latestFileName, jobId); + } finally { + fsLock.readLock().unlock(); } - return readPipelineState(latestFileName, jobId); + } @Override public List getCheckpointsByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - log.info("No checkpoint found for this job, the job id is: " + jobId); - return new ArrayList<>(); - } + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + log.info("No checkpoint found for this job, the job id is: " + jobId); + return new ArrayList<>(); + } - List pipelineStates = new ArrayList<>(); - fileNames.forEach( - file -> { - String filePipelineId = getPipelineIdByFileName(file); - if 
(pipelineId.equals(filePipelineId)) { - try { - pipelineStates.add(readPipelineState(file, jobId)); - } catch (Exception e) { - log.error("Failed to read checkpoint data from file " + file, e); + List pipelineStates = new ArrayList<>(); + fileNames.forEach( + file -> { + String filePipelineId = getPipelineIdByFileName(file); + if (pipelineId.equals(filePipelineId)) { + try { + pipelineStates.add(readPipelineState(file, jobId)); + } catch (Exception e) { + log.error("Failed to read checkpoint data from file " + file, e); + } } - } - }); - return pipelineStates; + }); + return pipelineStates; + } finally { + fsLock.readLock().unlock(); + } + + } @Override public void deleteCheckpoint(String jobId) { - String jobPath = getStorageParentDirectory() + jobId; + fsLock.readLock().lock(); try { - fs.delete(new Path(jobPath), true); - } catch (IOException e) { - log.warn("Failed to delete checkpoint for job {}", jobId, e); + String jobPath = getStorageParentDirectory() + jobId; + try { + fs.delete(new Path(jobPath), true); + } catch (IOException e) { + log.warn("Failed to delete checkpoint for job {}", jobId, e); + } + } finally { + fsLock.readLock().unlock(); } + } @Override public PipelineState getCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - log.info("No checkpoint found for this job, the job id is: " + jobId); - return null; - } - for (String fileName : fileNames) { - if (pipelineId.equals(getPipelineIdByFileName(fileName)) - && checkpointId.equals(getCheckpointIdByFileName(fileName))) { - try { - return readPipelineState(fileName, jobId); - } catch (Exception e) { - log.error( - "Failed to get checkpoint {} for job {}, pipeline {}", - checkpointId, - jobId, - pipelineId, - e); + fsLock.readLock().lock(); + try { + + String path = getStorageParentDirectory() + jobId; + List fileNames = 
getFileNames(path); + if (fileNames.isEmpty()) { + log.info("No checkpoint found for this job, the job id is: " + jobId); + return null; + } + for (String fileName : fileNames) { + if (pipelineId.equals(getPipelineIdByFileName(fileName)) + && checkpointId.equals(getCheckpointIdByFileName(fileName))) { + try { + return readPipelineState(fileName, jobId); + } catch (Exception e) { + log.error( + "Failed to get checkpoint {} for job {}, pipeline {}", + checkpointId, + jobId, + pipelineId, + e); + } } } + throw new CheckpointStorageException( + String.format( + "No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", + jobId, pipelineId, checkpointId)); + } finally { + fsLock.readLock().unlock(); } - throw new CheckpointStorageException( - String.format( - "No checkpoint found, job(%s), pipeline(%s), checkpoint(%s)", - jobId, pipelineId, checkpointId)); + } @Override public synchronized void deleteCheckpoint(String jobId, String pipelineId, String checkpointId) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - throw new CheckpointStorageException( - "No checkpoint found for job, job id is: " + jobId); - } - fileNames.forEach( - fileName -> { - if (pipelineId.equals(getPipelineIdByFileName(fileName)) - && checkpointId.equals(getCheckpointIdByFileName(fileName))) { - try { - fs.delete( - new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), - false); - } catch (Exception e) { - log.error( - "Failed to delete checkpoint {} for job {}, pipeline {}", - checkpointId, - jobId, - pipelineId, - e); + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + throw new CheckpointStorageException( + "No checkpoint found for job, job id is: " + jobId); + } + fileNames.forEach( + fileName -> { + if (pipelineId.equals(getPipelineIdByFileName(fileName)) + && 
checkpointId.equals(getCheckpointIdByFileName(fileName))) { + try { + fs.delete( + new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), + false); + } catch (Exception e) { + log.error( + "Failed to delete checkpoint {} for job {}, pipeline {}", + checkpointId, + jobId, + pipelineId, + e); + } } - } - }); + }); + } finally { + fsLock.readLock().unlock(); + } + + } @Override public void deleteCheckpoint(String jobId, String pipelineId, List checkpointIdList) throws CheckpointStorageException { - String path = getStorageParentDirectory() + jobId; - List fileNames = getFileNames(path); - if (fileNames.isEmpty()) { - throw new CheckpointStorageException( - "No checkpoint found for job, job id is: " + jobId); - } - fileNames.forEach( - fileName -> { - String checkpointIdByFileName = getCheckpointIdByFileName(fileName); - if (pipelineId.equals(getPipelineIdByFileName(fileName)) - && checkpointIdList.contains(checkpointIdByFileName)) { - try { - fs.delete( - new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), - false); - } catch (Exception e) { - log.error( - "Failed to delete checkpoint {} for job {}, pipeline {}", - checkpointIdByFileName, - jobId, - pipelineId, - e); + fsLock.readLock().lock(); + try { + String path = getStorageParentDirectory() + jobId; + List fileNames = getFileNames(path); + if (fileNames.isEmpty()) { + throw new CheckpointStorageException( + "No checkpoint found for job, job id is: " + jobId); + } + fileNames.forEach( + fileName -> { + String checkpointIdByFileName = getCheckpointIdByFileName(fileName); + if (pipelineId.equals(getPipelineIdByFileName(fileName)) + && checkpointIdList.contains(checkpointIdByFileName)) { + try { + fs.delete( + new Path(path + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName), + false); + } catch (Exception e) { + log.error( + "Failed to delete checkpoint {} for job {}, pipeline {}", + checkpointIdByFileName, + jobId, + pipelineId, + e); + } } - } - }); + }); + } finally { + 
fsLock.readLock().unlock(); + } + + } public List getFileNames(String path) throws CheckpointStorageException { + fsLock.readLock().lock(); try { - Path parentPath = new Path(path); - if (!fs.exists(parentPath)) { - log.info("Path " + path + " is not a directory"); - return new ArrayList<>(); - } - FileStatus[] fileStatus = - fs.listStatus(parentPath, path1 -> path1.getName().endsWith(FILE_FORMAT)); - List fileNames = new ArrayList<>(); - for (FileStatus status : fileStatus) { - fileNames.add(status.getPath().getName()); + try { + Path parentPath = new Path(path); + if (!fs.exists(parentPath)) { + log.info("Path " + path + " is not a directory"); + return new ArrayList<>(); + } + FileStatus[] fileStatus = + fs.listStatus(parentPath, path1 -> path1.getName().endsWith(FILE_FORMAT)); + List fileNames = new ArrayList<>(); + for (FileStatus status : fileStatus) { + fileNames.add(status.getPath().getName()); + } + return fileNames; + } catch (IOException e) { + throw new CheckpointStorageException("Failed to list files from names" + path, e); } - return fileNames; - } catch (IOException e) { - throw new CheckpointStorageException("Failed to list files from names" + path, e); + } finally { + fsLock.readLock().unlock(); } + + } /** @@ -359,19 +475,26 @@ public List getFileNames(String path) throws CheckpointStorageException */ private PipelineState readPipelineState(String fileName, String jobId) throws CheckpointStorageException { - fileName = - getStorageParentDirectory() + jobId + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName; - try (FSDataInputStream in = fs.open(new Path(fileName)); - ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - IOUtils.copyBytes(in, stream, 1024); - byte[] bytes = stream.toByteArray(); - return deserializeCheckPointData(bytes); - } catch (IOException e) { - throw new CheckpointStorageException( - String.format( - "Failed to read checkpoint data, file name is %s,job id is %s", - fileName, jobId), - e); + fsLock.readLock().lock(); + 
try { + fileName = + getStorageParentDirectory() + jobId + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName; + try (FSDataInputStream in = fs.open(new Path(fileName)); + ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + IOUtils.copyBytes(in, stream, 1024); + byte[] bytes = stream.toByteArray(); + return deserializeCheckPointData(bytes); + } catch (IOException e) { + throw new CheckpointStorageException( + String.format( + "Failed to read checkpoint data, file name is %s,job id is %s", + fileName, jobId), + e); + } + } finally { + fsLock.readLock().unlock(); } + + } } From f843835845a00b552082b1e123e52f0fa2aa0670 Mon Sep 17 00:00:00 2001 From: yinbenrong <505484338@qq.com> Date: Thu, 16 Oct 2025 18:04:59 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=96=87=E6=A1=A3=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/en/seatunnel-engine/checkpoint-storage.md | 3 +++ docs/zh/seatunnel-engine/checkpoint-storage.md | 3 +++ 2 files changed, 6 insertions(+) diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index 973777c0f7f..0c4c4332fec 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -199,6 +199,9 @@ seatunnel: // if you used kerberos, you can config like this: kerberosPrincipal: your-kerberos-principal kerberosKeytabFilePath: your-kerberos-keytab + // kerberos ticket expiration time in seconds (the FileSystem is refreshed twice per lifetime): + seatunnel.hadoop.dfs.kerberos.ticket.lifetime: 86400 + // if you need hdfs-site config, you can config like this: hdfs_site_path: /path/to/your/hdfs_site_path ``` diff --git a/docs/zh/seatunnel-engine/checkpoint-storage.md b/docs/zh/seatunnel-engine/checkpoint-storage.md index 030789f0069..e0accf33a1b 100644 --- a/docs/zh/seatunnel-engine/checkpoint-storage.md +++ b/docs/zh/seatunnel-engine/checkpoint-storage.md @@ -175,6 +175,9 @@ seatunnel: // 如果您使用kerberos,您可以这样配置:
kerberosPrincipal: your-kerberos-principal kerberosKeytabFilePath: your-kerberos-keytab + // kerberos认证过期时间(单位:秒,每个过期周期内刷新2次): + seatunnel.hadoop.dfs.kerberos.ticket.lifetime: 86400 + ``` 如果HDFS是HA模式,您可以这样配置: