Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait for read flush timeout instead of flushing last line immediately on file deletion #1418

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 69 additions & 55 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,6 @@ void ModifyHandler::Handle(const Event& event) {
"file device", readerArray[0]->GetDevInode().dev)(
"file inode", readerArray[0]->GetDevInode().inode)("file size",
readerArray[0]->GetFileSize()));
if (!readerArray[0]->ShouldForceReleaseDeletedFileFd() && readerArray[0]->HasDataInCache()) {
ForceReadLogAndPush(readerArray[0]);
}
// release fd as quick as possible
readerArray[0]->CloseFilePtr();
}
Expand Down Expand Up @@ -659,6 +656,10 @@ void ModifyHandler::Handle(const Event& event) {
// make sure file open success, or we just return
bool isFileOpen = reader->IsFileOpened();
while (!reader->UpdateFilePtr()) {
if (event.IsReaderFlushTimeout()) {
break;
}

if (errno == EMFILE) {
LOG_WARNING(sLogger,
("too many open files", "skip this read operation")("log path", reader->GetHostLogPath()));
Expand Down Expand Up @@ -688,55 +689,60 @@ void ModifyHandler::Handle(const Event& event) {
LOG_DEBUG(sLogger, ("read other file", reader->GetDevInode().inode));
}

bool recreateReaderFlag = false;
// if dev inode changed, delete this reader and create reader
if (!reader->CheckDevInode()) {
LOG_INFO(sLogger,
("file dev inode changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode), reader->GetLastFilePos())("DevInode map size",
mDevInodeReaderMap.size()));
recreateReaderFlag = true;
LogtailAlarm::GetInstance()->SendAlarm(INNER_PROFILE_ALARM,
string("file dev inode changed, create new reader. new path:")
+ reader->GetHostLogPath() + " ,project:" + reader->GetProject()
+ " ,logstore:" + reader->GetLogstore());
}
// if signature is different and logpath is different, delete this reader and create reader
else if (!reader->CheckFileSignatureAndOffset(isFileOpen) && logPath != reader->GetHostLogPath()) {
LOG_INFO(sLogger,
("file sig and name both changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode), reader->GetLastFilePos())("DevInode map size",
mDevInodeReaderMap.size()));
recreateReaderFlag = true;
}
if (recreateReaderFlag) {
LOG_INFO(sLogger,
("need to recreate reader", "remove the corresponding reader from the log reader queue")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())(
"log reader queue size", readerArrayPtr->size() - 1)("file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
readerArrayPtr->pop_front();
mDevInodeReaderMap.erase(reader->GetDevInode());
// delete this reader, do not insert into rotator reader map
// repush this event and wait for create reader
Event* ev = new Event(event);
LogInput::GetInstance()->PushEventQueue(ev);
return;
}
// the only situation where this condition is not met is when event is reader flush timeout
if (reader->IsFileOpened()) {
bool recreateReaderFlag = false;
// if dev inode changed, delete this reader and create reader
if (!reader->CheckDevInode()) {
LOG_INFO(sLogger,
("file dev inode changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode),
reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
recreateReaderFlag = true;
LogtailAlarm::GetInstance()->SendAlarm(
INNER_PROFILE_ALARM,
string("file dev inode changed, create new reader. new path:") + reader->GetHostLogPath()
+ " ,project:" + reader->GetProject() + " ,logstore:" + reader->GetLogstore());
}
// if signature is different and logpath is different, delete this reader and create reader
else if (!reader->CheckFileSignatureAndOffset(isFileOpen) && logPath != reader->GetHostLogPath()) {
LOG_INFO(sLogger,
("file sig and name both changed, create new reader. new path",
logPath)("old path", reader->GetHostLogPath())(ToString(readerArrayPtr->size()),
mRotatorReaderMap.size())(
ToString(reader->GetDevInode().inode),
reader->GetLastFilePos())("DevInode map size", mDevInodeReaderMap.size()));
recreateReaderFlag = true;
}
if (recreateReaderFlag) {
LOG_INFO(
sLogger,
("need to recreate reader", "remove the corresponding reader from the log reader queue")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())(
"log reader queue size", readerArrayPtr->size() - 1)("file device", reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
readerArrayPtr->pop_front();
mDevInodeReaderMap.erase(reader->GetDevInode());
// delete this reader, do not insert into rotator reader map
// repush this event and wait for create reader
Event* ev = new Event(event);
LogInput::GetInstance()->PushEventQueue(ev);
return;
}

if (reader->ShouldForceReleaseDeletedFileFd()) {
LOG_INFO(sLogger,
("force closing the file, project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("last file position", reader->GetLastFilePos()));
reader->CloseFilePtr();
if (reader->ShouldForceReleaseDeletedFileFd()) {
LOG_INFO(sLogger,
("force closing the file, project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize())("last file position", reader->GetLastFilePos()));
reader->CloseFilePtr();
}
}

bool hasMoreData;
do {
if (!LogProcess::GetInstance()->IsValidToReadLog(reader->GetLogstoreKey())) {
Expand All @@ -763,15 +769,23 @@ void ModifyHandler::Handle(const Event& event) {
hasMoreData = reader->ReadLog(*logBuffer, &event);
int32_t pushRetry = PushLogToProcessor(reader, logBuffer);
if (!hasMoreData) {
if (reader->IsFileDeleted() || reader->IsContainerStopped()) {
// release fd as quick as possible
if (reader->IsFileDeleted()) {
LOG_INFO(sLogger,
("close the file",
"current file has been read, and is marked deleted or the relative container has been "
"stopped")("project", reader->GetProject())("logstore", reader->GetLogstore())(
("close the file", "current file has been read, and is marked deleted")(
"project", reader->GetProject())("logstore", reader->GetLogstore())(
"config", mConfigName)("log reader queue name", reader->GetHostLogPath())(
"file device", reader->GetDevInode().dev)("file inode", reader->GetDevInode().inode)(
"file size", reader->GetFileSize()));
reader->CloseFilePtr();
} else if (reader->IsContainerStopped()) {
// release fd as quick as possible
LOG_INFO(
sLogger,
("close the file", "current file has been read, and the relative container has been stopped")(
"project", reader->GetProject())("logstore", reader->GetLogstore())("config", mConfigName)(
"log reader queue name", reader->GetHostLogPath())("file device",
reader->GetDevInode().dev)(
"file inode", reader->GetDevInode().inode)("file size", reader->GetFileSize()));
ForceReadLogAndPush(reader);
reader->CloseFilePtr();
}
Expand Down
Loading
Loading