Skip to content

Commit

Permalink
Task stabilize notify (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolphmax authored Feb 4, 2024
1 parent 0b39992 commit cfff2b2
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 50 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,27 @@ name: Run Tests
on: [push]

jobs:
test_p2:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Get cmake
uses: threeal/[email protected]
- name: build application
run: |
cmake -B build
make -C build
- name: Set up Python 3.11
uses: actions/setup-python@v3
with:
python-version: 3.11
- name: Run tests
uses: dariocurr/pytest-summary@main
with:
paths: test/test_praxis2.py
options: --quiet
show: all

test_p3:
runs-on: ubuntu-latest
steps:
Expand Down
5 changes: 5 additions & 0 deletions src/lib/dht.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,18 @@ dht_node* dht_node_init(char *dht_node_id, char *dht_anchor_ip, char *dht_anchor
void dht_node_free(dht_node *node) {
if (node->pred != NULL) free(node->pred);
if (node->succ != NULL)free(node->succ);
free(node->lookup_cache);
free(node);
}

unsigned short dht_node_is_responsible(dht_node *node, uint16_t hash) {
if (node->pred == NULL) return 1;

if (node->pred->ID > node->ID) {
if (node->ID >= hash || node->pred->ID < hash) return 1;
} else if (node->ID >= hash && node->pred->ID < hash) return 1;

if (node->succ == NULL) return 1;

if (node->ID > node->succ->ID) {
if (hash <= node->succ->ID || hash > node->ID) return 2;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/dht.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <stdint.h>

#define LOOKUP_CACHE_SIZE 10
#define STABILIZE_INTERVAL 10
#define STABILIZE_INTERVAL 1000

typedef enum dht_node_status {
JOINING,
Expand Down
2 changes: 1 addition & 1 deletion src/lib/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ int http_handle(int *in_fd, webserver *ws, file_system *fs) {
} else perror("Error processing request");

http_response_free(res);
http_request_free(req);
if (req != NULL) http_request_free(req);

free(buf);
return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int socket_open(webserver *ws, int socktype) {
}

ws->open_sockets[ws->num_open_sockets].fd = sockfd;
// ws->open_sockets[ws->num_open_sockets].events = POLLIN | POLLOUT;
ws->open_sockets[ws->num_open_sockets].events = POLLIN | POLLOUT;
ws->open_sockets_config[ws->num_open_sockets].is_server_socket = 1;
ws->num_open_sockets++;
freeaddrinfo(res);
Expand Down
66 changes: 46 additions & 20 deletions src/lib/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ char* udp_packet_serialize(udp_packet *pkt) {
pkt->bytesize = UDP_DATA_SIZE;
char *msg = calloc(pkt->bytesize, sizeof(char));

memcpy(msg, &(pkt->type), 1);
// memcpy(msg, &(pkt->type), 1);
msg[0] = pkt->type;
memcpy(msg + 1, &h, 2);
memcpy(msg + 3, &id, 2);
memcpy(msg + 5, &ip, 4);
Expand Down Expand Up @@ -112,12 +113,12 @@ int udp_parse_packet(char *pkt_string, udp_packet *pkt) {
int udp_process_packet(webserver *ws, udp_packet *pkt_out, udp_packet *pkt_in) {
if (pkt_in == NULL) return -1;

unsigned short responsibility;
if (ws->node == NULL) responsibility = 1;
else if (pkt_in->type == JOIN) responsibility = dht_node_is_responsible(ws->node, pkt_in->node_id);
else responsibility = dht_node_is_responsible(ws->node, pkt_in->hash);

if (pkt_in->type == LOOKUP || pkt_in->type == JOIN) {
unsigned short responsibility;
if (ws->node == NULL) responsibility = 1;
else if (pkt_in->type == JOIN) responsibility = dht_node_is_responsible(ws->node, pkt_in->node_id);
else responsibility = dht_node_is_responsible(ws->node, pkt_in->hash);

if (responsibility == 0 || (pkt_in->type == JOIN && responsibility == 2)) { // -> forward message to successor
strcpy(pkt_out->node_ip, pkt_in->node_ip);
pkt_out->node_port = pkt_in->node_port;
Expand Down Expand Up @@ -146,6 +147,10 @@ int udp_process_packet(webserver *ws, udp_packet *pkt_out, udp_packet *pkt_in)
if (pkt_in->type == JOIN) {
free(ws->node->pred);
ws->node->pred = dht_neighbor_from_packet(pkt_in);

if (ws->node->succ == NULL) {
ws->node->succ = dht_neighbor_from_packet(pkt_in);
}
}

} else if (responsibility == 2) {
Expand All @@ -157,10 +162,23 @@ int udp_process_packet(webserver *ws, udp_packet *pkt_out, udp_packet *pkt_in)

return 0;

} else if (pkt_in->type == STABILIZE) {
if (ws->node->pred == NULL) {
ws->node->pred = dht_neighbor_from_packet(pkt_in);
}

pkt_out->type = NOTIFY;
pkt_out->hash = 0;
pkt_out->node_id = ws->node->pred->ID;
strcpy(pkt_out->node_ip, ws->node->pred->IP);
pkt_out->node_port = strtol(ws->node->pred->PORT, NULL, 10);
return 0;

} else if (pkt_in->type == NOTIFY) {
free(ws->node->succ);
ws->node->succ = dht_neighbor_from_packet(pkt_in);
ws->node->status = OK;
if (pkt_in->node_id != ws->node->ID || pkt_in->node_port != strtol(ws->PORT, NULL, 10) || strcmp(pkt_in->node_ip, ws->HOST) != 0) {
free(ws->node->succ);
ws->node->succ = dht_neighbor_from_packet(pkt_in);
}

} else if (pkt_in->type == REPLY) {
pkt_out->node_id = pkt_in->node_id;
Expand All @@ -177,7 +195,7 @@ int udp_process_packet(webserver *ws, udp_packet *pkt_out, udp_packet *pkt_in)
return 1; // don't answer received replies / notfies
}

int udp_handle(int *in_fd, webserver *ws) {
int udp_handle(short events, int *in_fd, webserver *ws) {
char *buf = calloc(UDP_DATA_SIZE+1, sizeof(char));

udp_packet *pkt_in = udp_packet_create(0, 0, 0, NULL, NULL);
Expand All @@ -196,19 +214,23 @@ int udp_handle(int *in_fd, webserver *ws) {
strcpy(pkt_in->node_ip, ws->node->succ->IP);
pkt_in->node_port = strtol(ws->node->succ->PORT, NULL, 10);

} else if (ws->node->status == STABILIZING) {
pkt_out->type = STABILIZE;
pkt_out->hash = ws->node->ID;
pkt_out->node_id = ws->node->ID;
strcpy(pkt_out->node_ip, ws->HOST);
pkt_out->node_port = strtol(ws->PORT, NULL, 10);

strcpy(pkt_in->node_ip, ws->node->succ->IP);
pkt_in->node_port = strtol(ws->node->succ->PORT, NULL, 10);
ws->node->status = OK;

} else if (ws->node->status == STABILIZING) { // This node has to stabilize
if (ws->node->succ != NULL) {
pkt_out->type = STABILIZE;
pkt_out->hash = ws->node->ID;
pkt_out->node_id = ws->node->ID;
strcpy(pkt_out->node_ip, ws->HOST);
pkt_out->node_port = strtol(ws->PORT, NULL, 10);

strcpy(pkt_in->node_ip, ws->node->succ->IP);
pkt_in->node_port = strtol(ws->node->succ->PORT, NULL, 10);
}

ws->node->status = OK;

} else {
} else if (events & POLLIN) {
// TODO: refactor this into combined function in socket (ideally)
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
Expand Down Expand Up @@ -244,14 +266,18 @@ int udp_handle(int *in_fd, webserver *ws) {
udp_packet_free(pkt_in);
udp_packet_free(pkt_out);
free(buf);
return -1;
}
}

if (!(events & POLLOUT)) return 0;

char *res_msg = udp_packet_serialize(pkt_out);

char *port_str = calloc(7, sizeof(char));
snprintf(port_str, 6, "%d", pkt_in->node_port);
socket_send(ws, in_fd, res_msg, pkt_out->bytesize, pkt_in->node_ip, port_str);
free(port_str);
free(res_msg);

udp_packet_free(pkt_in);
Expand Down
3 changes: 2 additions & 1 deletion src/lib/udp.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ int udp_send_to_node(webserver *ws, int *sockfd, udp_packet *packet, dht_neighbo
* Handles an incoming UDP connection.
* @param in_fd Socket File Descriptor of the accepted connection.
* @param ws Webserver object.
* @param evemt The event(s) returned by poll.
* @return 0 on success, -1 on error.
*/
int udp_handle(int *in_fd, webserver *ws);
int udp_handle(short events, int *in_fd, webserver *ws);

#endif //RN_PRAXIS_UDP_H
55 changes: 29 additions & 26 deletions src/webserver.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@ webserver* webserver_init(char* hostname, char* port_str) {
return ws;
}

void handle_connection(int *in_fd, enum connection_protocol protocol, webserver *ws, file_system *fs) {
/**
* TODO: Doc this
* @return 0 when the connection is still alive, -1 when the socket's events have to be reevaluated
*/
int handle_connection(short events, int *in_fd, enum connection_protocol protocol, webserver *ws, file_system *fs) {
if (protocol == TCP) {
http_handle(in_fd, ws, fs);
if (http_handle(in_fd, ws, fs) < 0) return -1;
} else if (protocol == UDP) {
udp_handle(in_fd, ws);
udp_handle(events, in_fd, ws);
return -1;
}

return 0;
}

int webserver_tick(webserver *ws, file_system *fs) {
Expand Down Expand Up @@ -90,28 +97,20 @@ int webserver_tick(webserver *ws, file_system *fs) {
}

// Handle UDP server socket & all client sockets
handle_connection(&(sock->fd), sock_config->protocol, ws, fs);
sock->events = 0;
if (sock_config->is_server_socket == 1) return 0;

// Finding corresponding server-socket and re-enabling it
/*
for (int j = 0; j < ws->num_open_sockets; j++) {
if (ws->open_sockets_config[j].is_server_socket != 1) continue;
if (ws->open_sockets_config[j].protocol == sock_config->protocol) {
ws->open_sockets[j].events = POLLIN;
break;
if (handle_connection(sock->revents, &(sock->fd), sock_config->protocol, ws, fs) < 0) {
// Disableing all events
sock->events = 0;

if (sock_config->protocol == TCP) {
// Disabling the TCP client socket
sock->fd = -1;
sock_config->is_server_socket = 0;
sock_config->protocol = 0;

// re-enableing the TCP server socket
// TODO: Check if this is necessary
}
}
*/

// Disabling the client socket
// TODO: when to remove client sockets?
/*sock->events = 0;
sock->fd = -1;
sock_config->is_server_socket = 0;
sock_config->protocol = 0;*/
}

return 0;
Expand All @@ -121,6 +120,7 @@ void webserver_free(webserver *ws) {
free(ws->HOST);
free(ws->PORT);
free(ws->open_sockets);
free(ws->open_sockets_config);

if (ws->node != NULL) dht_node_free(ws->node);

Expand Down Expand Up @@ -167,6 +167,9 @@ int main(int argc, char **argv) {
exit(EXIT_FAILURE);
}

int should_stabilize = 0;
if (getenv("NO_STABILIZE") == NULL) should_stabilize = 1;

int t = 0;
int quit = 0;
while(!quit) {
Expand All @@ -178,17 +181,17 @@ int main(int argc, char **argv) {

if (sock_config->is_server_socket == 1) {
if ((ws->node->status == JOINING | ws->node->status == STABILIZING) && sock_config->protocol == UDP) {
sock->events = sock->events | POLLOUT;
sock->events = POLLOUT;
break;
} else if (ws->node->status == OK) {
sock->events = sock->events | POLLIN;
sock->events = POLLIN | POLLOUT;
}
}
}

if (webserver_tick(ws, fs) != 0) quit = 1;

if (t % STABILIZE_INTERVAL == 0 && ws->node != NULL) {
if (should_stabilize == 1 && t % STABILIZE_INTERVAL == 0 && ws->node != NULL) {
ws->node->status = STABILIZING;
t = 0;
}
Expand Down

0 comments on commit cfff2b2

Please sign in to comment.