diff --git a/mdstcpip/mdsipshr/ConnectToMds.c b/mdstcpip/mdsipshr/ConnectToMds.c index 5036a821c7..32bb0d9ac3 100644 --- a/mdstcpip/mdsipshr/ConnectToMds.c +++ b/mdstcpip/mdsipshr/ConnectToMds.c @@ -82,7 +82,7 @@ static int do_login(Connection *c) int status = SendMdsMsgC(c, msend, 0); int err; free(msend); - if (STATUS_NOT_OK) + if ((status == SsINTERNAL) || STATUS_NOT_OK) { perror("Error during login: send"); err = C_ERROR; diff --git a/mdstcpip/mdsipshr/Connections.c b/mdstcpip/mdsipshr/Connections.c index 184729e7e8..a27c2166f0 100644 --- a/mdstcpip/mdsipshr/Connections.c +++ b/mdstcpip/mdsipshr/Connections.c @@ -38,6 +38,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // #define DEBUG #include +// Because threads have their own MdsipThreadStatic data, the receiver thread +// cannot access the connection list that is stored in the main thread. If +// that capability is ever needed, must use a global pointer. However, sharing +// the connection list with both threads is inadvisable because it can result +// in deadlock. Connection *_FindConnection(int id, Connection **prev, MDSIPTHREADSTATIC_ARG) { if (id == INVALID_CONNECTION_ID) diff --git a/mdstcpip/mdsipshr/GetAnswerInfo.c b/mdstcpip/mdsipshr/GetAnswerInfo.c index e035dbc389..0f285f1138 100644 --- a/mdstcpip/mdsipshr/GetAnswerInfo.c +++ b/mdstcpip/mdsipshr/GetAnswerInfo.c @@ -74,6 +74,7 @@ int GetAnswerInfoTO(int id, char *dtype, short *length, char *ndims, int *dims, CloseConnection(id); status = MDSplusERROR; } + if (status == SsINTERNAL) status = MDSplusERROR; if (STATUS_NOT_OK) { free(m); diff --git a/mdstcpip/mdsipshr/GetMdsMsg.c b/mdstcpip/mdsipshr/GetMdsMsg.c index a8c4f1cfb1..3d93c8cc39 100644 --- a/mdstcpip/mdsipshr/GetMdsMsg.c +++ b/mdstcpip/mdsipshr/GetMdsMsg.c @@ -35,6 +35,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. //#define DEBUG #include +// Can return non-MDSplus error code, SsINTERNAL static int get_bytes_to(Connection *c, void *buffer, size_t bytes_to_recv, int to_msec) { char *bptr = (char *)buffer; @@ -83,6 +84,7 @@ static int get_bytes_to(Connection *c, void *buffer, size_t bytes_to_recv, int t // GetMdsMsg ///////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// +// Can set status to non-MDSplus error code, SsINTERNAL Message *GetMdsMsgTOC(Connection *c, int *status, int to_msec) { MsgHdr header; @@ -125,7 +127,7 @@ Message *GetMdsMsgTOC(Connection *c, int *status, int to_msec) m = malloc(msglen); m->h = header; *status = uncompress((unsigned char *)m->bytes, &dlen, - (unsigned char *)msg->bytes + 4, dlen - 4) == 0; + (unsigned char *)msg->bytes + 4, dlen - 4) == Z_OK; if (IS_OK(*status)) { m->h.msglen = msglen; diff --git a/mdstcpip/mdsipshr/SendArg.c b/mdstcpip/mdsipshr/SendArg.c index 825a8f33a4..0af7d3e3d7 100644 --- a/mdstcpip/mdsipshr/SendArg.c +++ b/mdstcpip/mdsipshr/SendArg.c @@ -103,6 +103,7 @@ int SendArg(int id, unsigned char idx, char dtype, unsigned char nargs, m->h.message_id = (idx == 0 || nargs == 0) ? ConnectionIncMessageId(c) : c->message_id; int status = m->h.message_id ? SendMdsMsgC(c, m, 0) : MDSplusERROR; + if (status == SsINTERNAL) status = MDSplusERROR; free(m); if (STATUS_NOT_OK) UnlockConnection(c); diff --git a/mdstcpip/mdsipshr/SendMdsMsg.c b/mdstcpip/mdsipshr/SendMdsMsg.c index fd6c60e451..a46c1f5b31 100644 --- a/mdstcpip/mdsipshr/SendMdsMsg.c +++ b/mdstcpip/mdsipshr/SendMdsMsg.c @@ -34,6 +34,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. //#define DEBUG #include +// Can return non-MDSplus error code, SsINTERNAL static int send_bytes(Connection *c, void *buffer, size_t bytes_to_send, int options) { if (!c || !c->io) @@ -74,6 +75,7 @@ static int send_bytes(Connection *c, void *buffer, size_t bytes_to_send, int opt return MDSplusSUCCESS; } +// Can return non-MDSplus error code of SsINTERNAL because of send_bytes() int SendMdsMsgC(Connection *c, Message *m, int msg_options) { unsigned long len = m->h.msglen - sizeof(m->h); @@ -104,7 +106,7 @@ int SendMdsMsgC(Connection *c, Message *m, int msg_options) m->h.client_type = ClientType(); if (clength && compress2((unsigned char *)cm->bytes + 4, &clength, - (unsigned char *)m->bytes, len, c->compression_level) == 0 && + (unsigned char *)m->bytes, len, c->compression_level) == Z_OK && clength < len) { cm->h = m->h; diff --git a/servershr/Client.h b/servershr/Client.h index 59604aff90..f213582749 100644 --- a/servershr/Client.h +++ b/servershr/Client.h @@ -188,6 +188,9 @@ static void Client_do_message(Client *c, fd_set *fdactive) } switch (replyType) { + // When an action service has processed an action, it sends back this reply. + // So update the status flags for the job. Don't remove the client because + // want the network connection to remain active. case SrvJobFINISHED: { Job *j = Job_get_by_jobid(jobid); diff --git a/servershr/ServerDispatchPhase.c b/servershr/ServerDispatchPhase.c index 6e4ece624c..300ac6001e 100644 --- a/servershr/ServerDispatchPhase.c +++ b/servershr/ServerDispatchPhase.c @@ -624,7 +624,7 @@ static void dispatch(int i) // ProgLoc = 7001; send_monitor(MonitorDispatched, i); // ProgLoc = 7002; - if (noact) + if (noact) // global, 1 = show actions but don't dispatch, 0 = dispatch { actions[i].dispatched = 1; actions[i].status = status = 1; @@ -687,6 +687,7 @@ static void action_done_action_locked(int idx) MdsFree1Dx(&xd, NULL); } +// Uses recursion to deal with cascade of actions static void action_done_action_unlocked(int idx) { if (is_abort_in_progress()) diff --git a/servershr/ServerQAction.c b/servershr/ServerQAction.c index 80b3387122..1f32e5ce58 100644 --- a/servershr/ServerQAction.c +++ b/servershr/ServerQAction.c @@ -969,7 +969,7 @@ static void cleanup_client(SrvJob *job) } } -/// returns the number of bytes sent +/// on success returns the number of bytes sent, on failure returns -1 static int send_all(SOCKET sock, char *msg, int len) { int sent; @@ -1006,6 +1006,7 @@ static int send_reply(SrvJob *job, int replyType, int status_in, int length, cha if (sock == INVALID_SOCKET) { MDSMSG(SVRJOB_PRI " break connection", SVRJOB_VAR(job)); + cleanup_client(job); break; } int bytes = send_all(sock, reply, 60); diff --git a/servershr/ServerSendMessage.c b/servershr/ServerSendMessage.c index c89d339a3a..7626c41acf 100644 --- a/servershr/ServerSendMessage.c +++ b/servershr/ServerSendMessage.c @@ -100,6 +100,11 @@ static SOCKET get_socket_by_conid(int conid) return INVALID_SOCKET; } +// Mdstcl has two threads: main and receiver. The main thread dispatches +// actions to action services (typically one service per computer). +// The receiver thread processes replies from all action services. +// Each thread uses a different port, thus a different network connection. +// Connections persist and thus handle all traffic between the endpoints. int ServerSendMessage(int *msgid, char *server, int op, int *retstatus, pthread_rwlock_t *lock, int *conid_out, void (*callback_done)(), void *callback_param, void (*callback_before)(), int numargs_in, @@ -200,9 +205,11 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus, Job_cleanup(status, jobid); return status; } + // The "action service" immediately sends back a handshake status confirming + // that it received the command sent above. status = GetAnswerInfoTS(conid, &dtype, &len, &ndims, dims, &numbytes, (void **)&dptr, &mem); - if (op == SrvStop) + if (op == SrvStop) // If stopped the server, a failed status is expected { if (STATUS_NOT_OK) { @@ -303,6 +310,7 @@ static SOCKET new_reply_socket(uint16_t *port_out) static Condition ReceiverRunning = CONDITION_INITIALIZER; +// Returns non-MDSplus status: -1, 0, or 1. OK is 0, others are error. static int start_receiver(uint16_t *port_out) { INIT_STATUS; @@ -323,7 +331,7 @@ static int start_receiver(uint16_t *port_out) if (!ReceiverRunning.value) { CREATE_DETACHED_THREAD(thread, *16, receiver_thread, &sock); - if (c_status) + if (c_status) // is from preceding macro { perror("Error creating pthread"); status = MDSplusERROR; @@ -342,7 +350,7 @@ static int start_receiver(uint16_t *port_out) static void receiver_atexit(void *arg) { (void)arg; - MDSDBG("ServerSendMessage thread exitted"); + MDSDBG("ServerSendMessage thread exited"); CONDITION_RESET(&ReceiverRunning); } @@ -376,6 +384,8 @@ static void reset_fdactive(int rep, SOCKET server, fd_set *fdactive) MDSWRN("reset fdactive in reset_fdactive"); } +// When any action service completes an action, a reply is sent back to mdstcl. +// The single receiver thread processes the replies from all action services. static void receiver_thread(void *sockptr) { atexit((void *)receiver_atexit); @@ -395,6 +405,7 @@ static void receiver_thread(void *sockptr) int rep; int num = 0; struct timeval readto, timeout = {10, 0}; + // Tries 10 times if select() always returns error (i.e., num < 0) for (rep = 0; rep < 10; rep++) { for (readfds = fdactive, readto = timeout;; @@ -523,6 +534,12 @@ EXPORT int ServerDisconnect(char *server_in) return status; } +// If a network connection already exists, reuse it. Only create a +// connection in two scenarios: new or defunct. +// In the simplest configuration, mdstcl has three connections: +// 1) to the mdsip service for tree access (to read the action nodes), +// 2) a connection to dispatch actions to the action service, and +// 3) a connection to receive replies from the action service. static inline int server_connect(char *server, uint32_t addr, uint16_t port) { int conid;