diff --git a/servershr/Client.h b/servershr/Client.h index bc7aca0490..59604aff90 100644 --- a/servershr/Client.h +++ b/servershr/Client.h @@ -86,8 +86,8 @@ static inline int Client_free(Client *client, fd_set *fdactive) MDSDBG(CLIENT_PRI, CLIENT_VAR(client)); if (client->reply_sock != INVALID_SOCKET) { - shutdown(client->reply_sock, 2); - close(client->reply_sock); + shutdown(client->reply_sock, SHUT_RDWR); + closesocket(client->reply_sock); if (fdactive) FD_CLR(client->reply_sock, fdactive); } @@ -161,7 +161,7 @@ static void Client_do_message(Client *c, fd_set *fdactive) if (nbytes != 0) MDSWRN(CLIENT_PRI " Invalid read %d/60", CLIENT_VAR(c), nbytes); else - MDSDBG(CLIENT_PRI " Clint disconnected", CLIENT_VAR(c)); + MDSDBG(CLIENT_PRI " Client disconnected", CLIENT_VAR(c)); Client_remove(c, fdactive); return; } diff --git a/servershr/ServerDispatchPhase.c b/servershr/ServerDispatchPhase.c index 486d87e6e4..6e4ece624c 100644 --- a/servershr/ServerDispatchPhase.c +++ b/servershr/ServerDispatchPhase.c @@ -634,20 +634,16 @@ static void dispatch(int i) } else { + actions[i].dispatched = 1; UNLOCK_ACTION(i, d_w); status = ServerDispatchAction( 0, Server(server, actions[i].server), table->tree, table->shot, actions[i].nid, action_done, (void *)(intptr_t)i, &actions[i].status, &actions[i].lock, &actions[i].netid, before); - if (STATUS_OK) - { - WRLOCK_ACTION(i, d_w); - actions[i].dispatched = 1; - UNLOCK_ACTION(i, d); - } - else + if (STATUS_NOT_OK) { WRLOCK_ACTION(i, d_w); + actions[i].dispatched = 0; actions[i].status = status; action_done_action_locked(i); UNLOCK_ACTION(i, d); diff --git a/servershr/ServerQAction.c b/servershr/ServerQAction.c index 3351fc26e1..80b3387122 100644 --- a/servershr/ServerQAction.c +++ b/servershr/ServerQAction.c @@ -75,9 +75,9 @@ static int DoSrvClose(SrvJob *job_in); static int DoSrvCreatePulse(SrvJob *job_in); static void DoSrvMonitor(SrvJob *job_in); -static void RemoveClient(SrvJob *job); extern uint32_t MdsGetClientAddr(); extern char *GetPortname(); +static pthread_mutex_t ClientsMutex = PTHREAD_MUTEX_INITIALIZER; static ClientList *Clients = NULL; static MonitorList *Monitors = NULL; @@ -838,65 +838,135 @@ static void KillWorker() } // both -static SOCKET AttachPort(uint32_t addr, uint16_t port) +static void close_socket(SOCKET sock) +{ + shutdown(sock, SHUT_RDWR); + closesocket(sock); +} +static SOCKET open_socket(uint32_t addr, uint16_t port) { - SOCKET sock; struct sockaddr_in sin; - ClientList *l, *new; - for (l = Clients; l; l = l->next) - if (l->addr == addr && l->port == port) - return l->sock; sin.sin_port = htons(port); sin.sin_family = AF_INET; *(uint32_t *)(&sin.sin_addr) = addr; - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock == INVALID_SOCKET) + SOCKET sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock != INVALID_SOCKET) { - MDSERR("Cannot get socket for " IPADDRPRI ":%u", IPADDRVAR(&addr), port); + if (connect(sock, (struct sockaddr *)&sin, sizeof(sin)) == 0) + { + MDSDBG("Connected to " IPADDRPRI ":%u", IPADDRVAR(&addr), port); + return sock; + } + MDSERR("Cannot connect to " IPADDRPRI ":%u", IPADDRVAR(&addr), port); + close_socket(sock); } else { - if (connect(sock, (struct sockaddr *)&sin, sizeof(sin)) == -1) + MDSERR("Cannot get socket for " IPADDRPRI ":%u", IPADDRVAR(&addr), port); + } + return INVALID_SOCKET; +} +static void add_client(uint32_t addr, uint16_t port, SOCKET sock) +{ + ClientList *new = (ClientList *)malloc(sizeof(ClientList)); + new->addr = addr; + new->port = port; + new->sock = sock; + MDSDBG("add socket %" PRI_SOCKET " for " IPADDRPRI ":%u", new->sock, IPADDRVAR(&new->addr), new->port); + pthread_mutex_lock(&ClientsMutex); + new->next = Clients; + Clients = new; + pthread_mutex_unlock(&ClientsMutex); +} +static int check_socket(SOCKET socket) +{ + if (socket != INVALID_SOCKET) + { + fd_set fdactive; + FD_ZERO(&fdactive); + FD_SET(socket, &fdactive); + struct timeval timeout = {0, 0}; // non-blocking + return select(socket + 1, &fdactive, 0, 0, &timeout) == 0; + } + return B_FALSE; +} +static SOCKET find_client(uint32_t addr, uint16_t port) +{ + ClientList **p; + SOCKET sock = INVALID_SOCKET; + pthread_mutex_lock(&ClientsMutex); + for (p = &Clients; *p != NULL; p = &(*p)->next) + { + if ((*p)->addr == addr && (*p)->port == port) { - MDSERR("Cannot connect to " IPADDRPRI ":%u", IPADDRVAR(&addr), port); - shutdown(sock, 2); - close(sock); - return INVALID_SOCKET; + ClientList *l = *p; + if (check_socket(l->sock)) + { + sock = l->sock; + MDSDBG("reuse socket %" PRI_SOCKET " for " IPADDRPRI ":%u", sock, IPADDRVAR(&addr), port); + } + else + { + *p = l->next; + close_socket(l->sock); + MDSDBG("cannot reuse socket %" PRI_SOCKET " for " IPADDRPRI ":%u?", l->sock, IPADDRVAR(&addr), port); + free(l); + } + break; } - MDSDBG("Connected to " IPADDRPRI ":%u", IPADDRVAR(&addr), port); - new = (ClientList *)malloc(sizeof(ClientList)); - l = Clients; - Clients = new; - new->addr = addr; - new->port = port; - new->sock = sock; - new->next = l; } + pthread_mutex_unlock(&ClientsMutex); return sock; } -// both -static void RemoveClient(SrvJob *job) +static SOCKET remove_client(uint32_t addr, uint16_t port) { - ClientList *l, *prev; - for (prev = 0, l = Clients; l;) + ClientList **p; + SOCKET sock = INVALID_SOCKET; + pthread_mutex_lock(&ClientsMutex); + for (p = &Clients; *p != NULL; p = &(*p)->next) { - if (job->h.addr == l->addr && job->h.port == l->port) + if ((*p)->addr == addr && (*p)->port == port) { - shutdown(l->sock, 2); - close(l->sock); - if (prev) - prev->next = l->next; - else - Clients = l->next; + ClientList *l = *p; + sock = l->sock; + *p = l->next; free(l); break; } - else + } + pthread_mutex_unlock(&ClientsMutex); + return sock; +} +static SOCKET setup_client(SrvJob *job) +{ + const uint32_t addr = job->h.addr; + const uint16_t port = job->h.port; + SOCKET sock = find_client(addr, port); + if (sock == INVALID_SOCKET) + { + sock = open_socket(addr, port); + if (sock != INVALID_SOCKET) { - prev = l; - l = l->next; + add_client(addr, port, sock); + MDSMSG("setup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job)); } } + return sock; +} +// both +static void cleanup_client(SrvJob *job) +{ + SOCKET sock = remove_client(job->h.addr, job->h.port); + if (sock != INVALID_SOCKET) + close_socket(sock); + if (STATIC_Debug) + { + MDSMSG("cleanup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job)); + } + else + { + MDSDBG("cleanup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job)); + } } /// returns the number of bytes sent @@ -910,7 +980,8 @@ static int send_all(SOCKET sock, char *msg, int len) const int bytes = send(sock, msg + sent, len - sent, MSG_NOSIGNAL); if (bytes <= 0) { - sent = bytes; + if (bytes != 0) + sent = bytes; break; } sent += bytes; @@ -922,9 +993,7 @@ static int send_all(SOCKET sock, char *msg, int len) static int send_reply(SrvJob *job, int replyType, int status_in, int length, char *msg) { MDSDBG(SVRJOB_PRI " %d", SVRJOB_VAR(job), replyType); - int status; - status = MDSplusERROR; - SOCKET sock; + int status = MDSplusERROR; long msg_len = msg ? (long)strlen(msg) : 0; int try_again = FALSE; char reply[60]; @@ -933,32 +1002,30 @@ static int send_reply(SrvJob *job, int replyType, int status_in, int length, cha do { errno = 0; - sock = AttachPort(job->h.addr, (uint16_t)job->h.port); + SOCKET sock = setup_client(job); if (sock == INVALID_SOCKET) + { + MDSMSG(SVRJOB_PRI " break connection", SVRJOB_VAR(job)); break; + } int bytes = send_all(sock, reply, 60); + MDSDBG("send 60: %d %d", bytes, replyType); if (bytes == 60) { - bytes = send_all(sock, msg, length); - if (bytes == length) + if (check_socket(sock)) { - status = MDSplusSUCCESS; - break; + bytes = send_all(sock, msg, length); + MDSDBG("send msg: %d/%d", bytes, length); + if (bytes == length) + { + status = MDSplusSUCCESS; + break; + } } } - if (STATUS_NOT_OK) - { + else try_again = errno == EPIPE; - int debug; - pthread_mutex_lock(&STATIC_lock); - debug = STATIC_Debug; - pthread_mutex_unlock(&STATIC_lock); - if (debug) - { - MDSMSG(SVRJOB_PRI " drop connection", SVRJOB_VAR(job)); - } - RemoveClient(job); - } + cleanup_client(job); } while (try_again--); return status; } diff --git a/servershr/ServerSendMessage.c b/servershr/ServerSendMessage.c index 3b3714a848..c89d339a3a 100644 --- a/servershr/ServerSendMessage.c +++ b/servershr/ServerSendMessage.c @@ -62,6 +62,7 @@ int ServerSendMessage(); #include #endif +// #define DEBUG #include #include #include @@ -73,7 +74,6 @@ int ServerSendMessage(); #define _NO_SERVER_SEND_MESSAGE_PROTO #include "servershrp.h" -//#define DEBUG #include "Client.h" extern short ArgLen(); @@ -148,7 +148,7 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus, addr = *(uint32_t *)&addr_struct.sin_addr; if (!addr) { - MDSWRN("could not resolve address the socket is bound to"); + MDSWRN("could not resolve address socket %" PRI_SOCKET " is bound to", sock); if (callback_done) callback_done(callback_param); return ServerSOCKET_ADDR_ERROR; @@ -196,7 +196,7 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus, status = SendArg(conid, 0, DTYPE_CSTRING, 1, (short)(ccmd - cmd), 0, 0, cmd); if (STATUS_NOT_OK) { - MDSWRN("could not sending message to server"); + MDSWRN("could not send message to server"); Job_cleanup(status, jobid); return status; } @@ -246,7 +246,7 @@ static SOCKET new_reply_socket(uint16_t *port_out) { char *dash; for (dash = range; *dash && *dash != '-'; dash++) - ; + continue; if (dash) *(dash++) = 0; start_port = (uint16_t)(strtol(range, NULL, 0) & 0xffff); @@ -527,16 +527,32 @@ static inline int server_connect(char *server, uint32_t addr, uint16_t port) { int conid; LOCK_CLIENTS; - conid = ConnectToMds(server); - if (conid != INVALID_CONNECTION_ID) + Client *c = Client_get_by_addr_and_port_locked(addr, port); + if (c && get_socket_by_conid(c->conid) != INVALID_SOCKET) { - Client *c = newClient(addr, port, conid); - MDSDBG(CLIENT_PRI " connected to %s", CLIENT_VAR(c), server); - Client_push_locked(c); + conid = c->conid; } else { - MDSWRN("Could not connect to %s (" IPADDRPRI ":%d)", server, IPADDRVAR(&addr), port); + conid = ConnectToMds(server); + if (conid != INVALID_CONNECTION_ID) + { + if (c) + { + MDSDBG(CLIENT_PRI " re-connected to %s as %d", CLIENT_VAR(c), server, conid); + c->conid = conid; + } + else + { + c = newClient(addr, port, conid); + MDSDBG(CLIENT_PRI " connected to %s", CLIENT_VAR(c), server); + Client_push_locked(c); + } + } + else + { + MDSWRN("Could not connect to %s (" IPADDRPRI ":%d)", server, IPADDRVAR(&addr), port); + } } UNLOCK_CLIENTS; return conid; @@ -585,7 +601,7 @@ static void accept_client(SOCKET reply_sock, struct sockaddr_in *sin, fd_set *fd else { MDSWRN("Dropped connection from " IPADDRPRI ":%d", IPADDRVAR(&addr), port); - shutdown(reply_sock, 2); - close(reply_sock); + shutdown(reply_sock, SHUT_RDWR); + closesocket(reply_sock); } } diff --git a/servershr/servershrp.h b/servershr/servershrp.h index c1ff748c7b..8d6b84bbe1 100644 --- a/servershr/servershrp.h +++ b/servershr/servershrp.h @@ -4,11 +4,13 @@ #include #ifdef _WIN32 #include +#define PRI_SOCKET PRIdPTR #else #ifndef HAVE_PTHREAD_LOCK_GLOBAL_NP extern void pthread_lock_global_np(); extern void pthread_unlock_global_np(); #endif +#define PRI_SOCKET "d" #endif #ifdef MDSOBJECTSCPPSHRVS_EXPORTS