Fix flaky capture service start #20024
Changes from 3 commits
@@ -398,7 +398,7 @@ capture_service::do_capture()
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);
        RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");

        /*
         * When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
         * When capture service begins to read, the very first message that it will read is this
@@ -409,8 +409,30 @@ capture_service::do_capture()
         *
         * This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
         *
         * There are chances that there are events already published to XSUB endpoint before XSUB is able to connect to XPUB, so we can receive events
         before the subscription message
         */
        init_done = true;

        if(rc == 1) { // Expected case to receive subscription message as very first message
            SWSS_LOG_INFO("Received subscription message when XSUB connects to XPUB");
        } else if (rc > 1) { // If there are events already published to XSUB before XSUB connects to XPUB, we can receive events before subscription message
            string event_source((const char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
            SWSS_LOG_DEBUG("Receiving event from source: %s, will read second part of event", event_source.c_str());
            int more = 0;
            size_t more_size = sizeof(more);
            zmq_getsockopt(cap_sub_sock, ZMQ_RCVMORE, &more, &more_size);
            if(more) {
Review comment: Do we need a loop here to read all messages? Also, if there is an ADO, please update the PR description.
Reply: Changed logic to not account for capture service capturing control character. Will be done in zmq_read_part.
#Closed
                zmq_msg_t msg_part;
                zmq_msg_init(&msg_part);
                zmq_msg_recv(&msg_part, cap_sub_sock, 0);
                zmq_msg_close(&msg_part);
            }
        } else {
            SWSS_LOG_ERROR("Error reading from ZMQ socket, rc=%d", rc);
        }
        zmq_msg_close(&msg);
        init_done = true;
    }

    while (m_ctrl != START_CAPTURE) {
Review comment: If there are too many "Received subscription message" log entries, suggest changing this to SWSS_LOG_DEBUG.
Reply: This will happen only once during the process lifetime.
#Closed
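
The first review comment asks whether a loop is needed to read all message parts. The diff as shown reads at most one extra part after the first frame. As a rough illustration of the looped alternative, here is a minimal sketch that does not use libzmq at all: `Frame`, `FakeSocket`, `FirstRead`, and `consume_first_message` are hypothetical names invented for this example, and `FakeSocket` only models the "more parts follow" flag that `ZMQ_RCVMORE` reports. It is not the code that was merged in this PR.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for one message part. `more` plays the role of the
// ZMQ_RCVMORE flag: true when further parts of the same message follow.
struct Frame {
    std::string data;
    bool more;
};

// Hypothetical in-memory socket; NOT a libzmq type.
class FakeSocket {
public:
    explicit FakeSocket(std::deque<Frame> frames) : m_frames(std::move(frames)) {}

    // Returns the size of the received part, or -1 when nothing is queued
    // (this models a receive error).
    int recv(std::string &out) {
        if (m_frames.empty()) {
            m_more = false;
            return -1;
        }
        out = m_frames.front().data;
        m_more = m_frames.front().more;
        m_frames.pop_front();
        return static_cast<int>(out.size());
    }

    bool rcvmore() const { return m_more; }
    std::size_t pending() const { return m_frames.size(); }

private:
    std::deque<Frame> m_frames;
    bool m_more = false;
};

enum class FirstRead { Subscription, Event, Error };

// Classify the first read the same way the diff does (rc == 1, rc > 1, else
// error), then discard every remaining part of that message in a loop.
FirstRead consume_first_message(FakeSocket &sock) {
    std::string first;
    int rc = sock.recv(first);

    FirstRead kind;
    if (rc == 1) {
        kind = FirstRead::Subscription;   // single-byte subscription message
    } else if (rc > 1) {
        kind = FirstRead::Event;          // an event arrived before the subscription
    } else {
        return FirstRead::Error;
    }

    // Loop instead of a single extra read, so no trailing part is left behind
    // to be misread as the start of the next message.
    while (sock.rcvmore()) {
        std::string part;
        if (sock.recv(part) < 0) {
            return FirstRead::Error;
        }
    }
    return kind;
}
```

With a real socket, the same shape would presumably query `ZMQ_RCVMORE` through `zmq_getsockopt` after each `zmq_msg_recv`, as the diff already does once. The reply in the thread indicates the author instead moved this handling into `zmq_read_part`.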