Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions servershr/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ static inline int Client_free(Client *client, fd_set *fdactive)
MDSDBG(CLIENT_PRI, CLIENT_VAR(client));
if (client->reply_sock != INVALID_SOCKET)
{
shutdown(client->reply_sock, 2);
close(client->reply_sock);
shutdown(client->reply_sock, SHUT_RDWR);
closesocket(client->reply_sock);
if (fdactive)
FD_CLR(client->reply_sock, fdactive);
}
Expand Down Expand Up @@ -161,7 +161,7 @@ static void Client_do_message(Client *c, fd_set *fdactive)
if (nbytes != 0)
MDSWRN(CLIENT_PRI " Invalid read %d/60", CLIENT_VAR(c), nbytes);
else
MDSDBG(CLIENT_PRI " Clint disconnected", CLIENT_VAR(c));
MDSDBG(CLIENT_PRI " Client disconnected", CLIENT_VAR(c));
Client_remove(c, fdactive);
return;
}
Expand Down
10 changes: 3 additions & 7 deletions servershr/ServerDispatchPhase.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,20 +634,16 @@ static void dispatch(int i)
}
else
{
actions[i].dispatched = 1;
UNLOCK_ACTION(i, d_w);
status = ServerDispatchAction(
0, Server(server, actions[i].server), table->tree, table->shot,
actions[i].nid, action_done, (void *)(intptr_t)i, &actions[i].status,
&actions[i].lock, &actions[i].netid, before);
if (STATUS_OK)
{
WRLOCK_ACTION(i, d_w);
actions[i].dispatched = 1;
UNLOCK_ACTION(i, d);
}
else
if (STATUS_NOT_OK)
{
WRLOCK_ACTION(i, d_w);
actions[i].dispatched = 0;
actions[i].status = status;
action_done_action_locked(i);
UNLOCK_ACTION(i, d);
Expand Down
183 changes: 125 additions & 58 deletions servershr/ServerQAction.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ static int DoSrvClose(SrvJob *job_in);
static int DoSrvCreatePulse(SrvJob *job_in);
static void DoSrvMonitor(SrvJob *job_in);

static void RemoveClient(SrvJob *job);
extern uint32_t MdsGetClientAddr();
extern char *GetPortname();
static pthread_mutex_t ClientsMutex = PTHREAD_MUTEX_INITIALIZER;
static ClientList *Clients = NULL;

static MonitorList *Monitors = NULL;
Expand Down Expand Up @@ -838,65 +838,135 @@ static void KillWorker()
}

// both
static SOCKET AttachPort(uint32_t addr, uint16_t port)
static void close_socket(SOCKET sock)
{
shutdown(sock, SHUT_RDWR);
closesocket(sock);
}
static SOCKET open_socket(uint32_t addr, uint16_t port)
{
SOCKET sock;
struct sockaddr_in sin;
ClientList *l, *new;
for (l = Clients; l; l = l->next)
if (l->addr == addr && l->port == port)
return l->sock;
sin.sin_port = htons(port);
sin.sin_family = AF_INET;
*(uint32_t *)(&sin.sin_addr) = addr;
sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET)
SOCKET sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock != INVALID_SOCKET)
{
MDSERR("Cannot get socket for " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
if (connect(sock, (struct sockaddr *)&sin, sizeof(sin)) == 0)
{
MDSDBG("Connected to " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
return sock;
}
MDSERR("Cannot connect to " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
close_socket(sock);
}
else
{
if (connect(sock, (struct sockaddr *)&sin, sizeof(sin)) == -1)
MDSERR("Cannot get socket for " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
}
return INVALID_SOCKET;
}
static void add_client(uint32_t addr, uint16_t port, SOCKET sock)
{
ClientList *new = (ClientList *)malloc(sizeof(ClientList));
new->addr = addr;
new->port = port;
new->sock = sock;
MDSDBG("add socket %" PRI_SOCKET " for " IPADDRPRI ":%u", new->sock, IPADDRVAR(&new->addr), new->port);
pthread_mutex_lock(&ClientsMutex);
new->next = Clients;
Clients = new;
pthread_mutex_unlock(&ClientsMutex);
}
static int check_socket(SOCKET socket)
{
if (socket != INVALID_SOCKET)
{
fd_set fdactive;
FD_ZERO(&fdactive);
FD_SET(socket, &fdactive);
struct timeval timeout = {0, 0}; // non-blocking
return select(socket + 1, &fdactive, 0, 0, &timeout) == 0;
}
return B_FALSE;
}
static SOCKET find_client(uint32_t addr, uint16_t port)
{
ClientList **p;
SOCKET sock = INVALID_SOCKET;
pthread_mutex_lock(&ClientsMutex);
for (p = &Clients; *p != NULL; p = &(*p)->next)
{
if ((*p)->addr == addr && (*p)->port == port)
{
MDSERR("Cannot connect to " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
shutdown(sock, 2);
close(sock);
return INVALID_SOCKET;
ClientList *l = *p;
if (check_socket(l->sock))
{
sock = l->sock;
MDSDBG("reuse socket %" PRI_SOCKET " for " IPADDRPRI ":%u", sock, IPADDRVAR(&addr), port);
}
else
{
*p = l->next;
close_socket(l->sock);
MDSDBG("cannot reuse socket %" PRI_SOCKET " for " IPADDRPRI ":%u?", l->sock, IPADDRVAR(&addr), port);
free(l);
}
break;
}
MDSDBG("Connected to " IPADDRPRI ":%u", IPADDRVAR(&addr), port);
new = (ClientList *)malloc(sizeof(ClientList));
l = Clients;
Clients = new;
new->addr = addr;
new->port = port;
new->sock = sock;
new->next = l;
}
pthread_mutex_unlock(&ClientsMutex);
return sock;
}
// both
static void RemoveClient(SrvJob *job)
static SOCKET remove_client(uint32_t addr, uint16_t port)
{
ClientList *l, *prev;
for (prev = 0, l = Clients; l;)
ClientList **p;
SOCKET sock = INVALID_SOCKET;
pthread_mutex_lock(&ClientsMutex);
for (p = &Clients; *p != NULL; p = &(*p)->next)
{
if (job->h.addr == l->addr && job->h.port == l->port)
if ((*p)->addr == addr && (*p)->port == port)
{
shutdown(l->sock, 2);
close(l->sock);
if (prev)
prev->next = l->next;
else
Clients = l->next;
ClientList *l = *p;
sock = l->sock;
*p = l->next;
free(l);
break;
}
else
}
pthread_mutex_unlock(&ClientsMutex);
return sock;
}
static SOCKET setup_client(SrvJob *job)
{
const uint32_t addr = job->h.addr;
const uint16_t port = job->h.port;
SOCKET sock = find_client(addr, port);
if (sock == INVALID_SOCKET)
{
sock = open_socket(addr, port);
if (sock != INVALID_SOCKET)
{
prev = l;
l = l->next;
add_client(addr, port, sock);
MDSMSG("setup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job));
}
}
return sock;
}
// both
static void cleanup_client(SrvJob *job)
{
SOCKET sock = remove_client(job->h.addr, job->h.port);
if (sock != INVALID_SOCKET)
close_socket(sock);
if (STATIC_Debug)
{
MDSMSG("cleanup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job));
}
else
{
MDSDBG("cleanup connection %" PRI_SOCKET " " SVRJOB_PRI, sock, SVRJOB_VAR(job));
}
}

/// returns the number of bytes sent
Expand All @@ -910,7 +980,8 @@ static int send_all(SOCKET sock, char *msg, int len)
const int bytes = send(sock, msg + sent, len - sent, MSG_NOSIGNAL);
if (bytes <= 0)
{
sent = bytes;
if (bytes != 0)
sent = bytes;
break;
}
sent += bytes;
Expand All @@ -922,9 +993,7 @@ static int send_all(SOCKET sock, char *msg, int len)
static int send_reply(SrvJob *job, int replyType, int status_in, int length, char *msg)
{
MDSDBG(SVRJOB_PRI " %d", SVRJOB_VAR(job), replyType);
int status;
status = MDSplusERROR;
SOCKET sock;
int status = MDSplusERROR;
long msg_len = msg ? (long)strlen(msg) : 0;
int try_again = FALSE;
char reply[60];
Expand All @@ -933,32 +1002,30 @@ static int send_reply(SrvJob *job, int replyType, int status_in, int length, cha
do
{
errno = 0;
sock = AttachPort(job->h.addr, (uint16_t)job->h.port);
SOCKET sock = setup_client(job);
if (sock == INVALID_SOCKET)
{
MDSMSG(SVRJOB_PRI " break connection", SVRJOB_VAR(job));
break;
}
int bytes = send_all(sock, reply, 60);
MDSDBG("send 60: %d %d", bytes, replyType);
if (bytes == 60)
{
bytes = send_all(sock, msg, length);
if (bytes == length)
if (check_socket(sock))
{
status = MDSplusSUCCESS;
break;
bytes = send_all(sock, msg, length);
MDSDBG("send msg: %d/%d", bytes, length);
if (bytes == length)
{
status = MDSplusSUCCESS;
break;
}
}
}
if (STATUS_NOT_OK)
{
else
try_again = errno == EPIPE;
int debug;
pthread_mutex_lock(&STATIC_lock);
debug = STATIC_Debug;
pthread_mutex_unlock(&STATIC_lock);
if (debug)
{
MDSMSG(SVRJOB_PRI " drop connection", SVRJOB_VAR(job));
}
RemoveClient(job);
}
cleanup_client(job);
} while (try_again--);
return status;
}
40 changes: 28 additions & 12 deletions servershr/ServerSendMessage.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ int ServerSendMessage();
#include <unistd.h>
#endif

// #define DEBUG
#include <socket_port.h>
#include <condition.h>
#include <ipdesc.h>
Expand All @@ -73,7 +74,6 @@ int ServerSendMessage();
#define _NO_SERVER_SEND_MESSAGE_PROTO
#include "servershrp.h"

//#define DEBUG
#include "Client.h"

extern short ArgLen();
Expand Down Expand Up @@ -148,7 +148,7 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus,
addr = *(uint32_t *)&addr_struct.sin_addr;
if (!addr)
{
MDSWRN("could not resolve address the socket is bound to");
MDSWRN("could not resolve address socket %" PRI_SOCKET " is bound to", sock);
if (callback_done)
callback_done(callback_param);
return ServerSOCKET_ADDR_ERROR;
Expand Down Expand Up @@ -196,7 +196,7 @@ int ServerSendMessage(int *msgid, char *server, int op, int *retstatus,
status = SendArg(conid, 0, DTYPE_CSTRING, 1, (short)(ccmd - cmd), 0, 0, cmd);
if (STATUS_NOT_OK)
{
MDSWRN("could not sending message to server");
MDSWRN("could not send message to server");
Job_cleanup(status, jobid);
return status;
}
Expand Down Expand Up @@ -246,7 +246,7 @@ static SOCKET new_reply_socket(uint16_t *port_out)
{
char *dash;
for (dash = range; *dash && *dash != '-'; dash++)
;
continue;
if (dash)
*(dash++) = 0;
start_port = (uint16_t)(strtol(range, NULL, 0) & 0xffff);
Expand Down Expand Up @@ -527,16 +527,32 @@ static inline int server_connect(char *server, uint32_t addr, uint16_t port)
{
int conid;
LOCK_CLIENTS;
conid = ConnectToMds(server);
if (conid != INVALID_CONNECTION_ID)
Client *c = Client_get_by_addr_and_port_locked(addr, port);
if (c && get_socket_by_conid(c->conid) != INVALID_SOCKET)
{
Client *c = newClient(addr, port, conid);
MDSDBG(CLIENT_PRI " connected to %s", CLIENT_VAR(c), server);
Client_push_locked(c);
conid = c->conid;
}
else
{
MDSWRN("Could not connect to %s (" IPADDRPRI ":%d)", server, IPADDRVAR(&addr), port);
conid = ConnectToMds(server);
if (conid != INVALID_CONNECTION_ID)
{
if (c)
{
MDSDBG(CLIENT_PRI " re-connected to %s as %d", CLIENT_VAR(c), server, conid);
c->conid = conid;
}
else
{
c = newClient(addr, port, conid);
MDSDBG(CLIENT_PRI " connected to %s", CLIENT_VAR(c), server);
Client_push_locked(c);
}
}
else
{
MDSWRN("Could not connect to %s (" IPADDRPRI ":%d)", server, IPADDRVAR(&addr), port);
}
}
UNLOCK_CLIENTS;
return conid;
Expand Down Expand Up @@ -585,7 +601,7 @@ static void accept_client(SOCKET reply_sock, struct sockaddr_in *sin, fd_set *fd
else
{
MDSWRN("Dropped connection from " IPADDRPRI ":%d", IPADDRVAR(&addr), port);
shutdown(reply_sock, 2);
close(reply_sock);
shutdown(reply_sock, SHUT_RDWR);
closesocket(reply_sock);
}
}
Loading