Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port_create race test and fix #215

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions app/tests/port_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,244 @@ static int ping_pong_thread(void *arg)
return __LINE__;
}

const char *kStatusPortNames[] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sure all of these are static so the compiler/linker will strip them if not building with the console.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reposting with that change now -- thanks!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Some of the functions in there can be made static too and I can make/test that for this pull or just do a second. Whichever works!)

"status0",
"status1",
};
port_packet_t kRepeat = {{'R', 'E', 'P', 'E', 'A', 'T', 0, 0}};
port_packet_t kQuit = {{'Q', 'U', 'I', 'T', 0, 0, 0, 0}};
const char *kRacePortName = "racer_port";
event_t race_evt;

static int race_thread(void *arg)
{
port_t r_port;
int tid = (int)arg;

printf("thread %d: connecting to control port\n", tid);
status_t st = port_open("race_ctl", NULL, &r_port);
if (st < 0) {
printf("thread %d: could not open control port, status = %d\n", tid, st);
return __LINE__;
}

printf("thread %d: creating status port\n", tid);
port_t w_port;
st = port_create(kStatusPortNames[tid], PORT_MODE_UNICAST, &w_port);
if (st < 0) {
printf("thread %d: could not create status port, status = %d\n", tid, st);
port_close(r_port);
return __LINE__;
}

// Loop is meant to coordinate a port_create() race.
// The event triggers the race.
// The thread sleeps briefly then cleans up
// The thread reports its claim to its status port.
// It then waits for a repeat or quit message.
int ret = -1;
while (ret < 0) {
LTRACEF_LEVEL(1, "thread %d: waiting at the starting line\n", tid);
if (event_wait_timeout(&race_evt, INFINITE_TIME) != NO_ERROR) {
ret = __LINE__;
break;
}

port_t race_port;
while(true) {
st = port_create("racer_port", PORT_MODE_UNICAST, &race_port);
if (st != ERR_BUSY)
break;
thread_sleep(25);
} // EINTR all over again . . .
LTRACEF_LEVEL(1, "thread %d: sampling chronochip (%x)\n", tid, race_port);
if (st == ERR_ALREADY_EXISTS) {
// lost the race to create the port.
} else if (st < 0) {
LTRACEF_LEVEL(1, "thread %d: could not open port, status = %d\n", tid, st);
ret = __LINE__;
break;
} else { // Dispose of it now.
thread_sleep(25);
port_close(race_port);
port_destroy(race_port);
}

// Now send the stale pointer address as a status.
port_packet_t claimed_port = {{0}};
int len = sizeof(claimed_port.value);
if (sizeof(race_port) < (size_t)len)
len = sizeof(race_port);
for (int i = 0; i < len; ++i) {
claimed_port.value[i] = 0xff & ((int)race_port) >> (i * 8);
}
LTRACEF_LEVEL(1, "thread %d: reporting status\n", tid);
st = port_write(w_port, &claimed_port, 1);
if (st < 0) {
printf("thread %d: could not write port, status = %d\n", tid, st);
ret = __LINE__;
break;
}

LTRACEF_LEVEL(1, "thread %d: awaiting instructions\n", tid);
port_result_t pr;
st = port_read(r_port, INFINITE_TIME, &pr);
if (st == ERR_CANCELLED) {
printf("thread %d: could not read port, status = %d (CANCELLED)\n", tid, st);
ret = __LINE__;
break;
} else if (st < 0) {
printf("thread %d: could not read port, status = %d\n", tid, st);
ret = __LINE__;
break;
}
if (memcmp(pr.packet.value, kQuit.value, sizeof(pr.packet.value)) == 0) {
ret = 0;
break;
}
if (memcmp(pr.packet.value, kRepeat.value, sizeof(pr.packet.value)) == 0) {
continue;
}
printf("thread %d: got a weird message from the control port\n", tid);
ret = __LINE__;
}
thread_sleep((1+tid) * 5); // Make console output orderly.
printf("thread %d: shutting down (ret=%d)\n", tid, ret);

port_close(r_port);
port_close(w_port);
port_destroy(w_port);
return ret;
}


int two_threads_race(void)
{
printf("two_threads_race test . . .\n");
// Used to tell the threads what to do.
port_t w_port;
status_t st = port_create("race_ctl", PORT_MODE_BROADCAST, &w_port);
if (st < 0) {
printf("could not create port, status = %d\n", st);
return __LINE__;
}

event_init(&race_evt, false, 0);

thread_t *t1 = thread_create(
"rt0", &race_thread, (void *)0, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
thread_t *t2 = thread_create(
"rt1", &race_thread, (void *)1, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
thread_set_real_time(t1);
thread_set_real_time(t2);
thread_resume(t1);
thread_resume(t2);

// wait for each status port to be created so we can
// track behavior.
port_t r_port0, r_port1;
printf("control: connecting to thread 0 . . .\n");
while (true) {
status_t st = port_open(kStatusPortNames[0], NULL, &r_port0);
if (st == NO_ERROR) {
break;
} else if (st == ERR_NOT_FOUND) {
thread_sleep(100);
} else {
printf("could not open port, status = %d\n", st);
// XXX: clean up...
break;
}
}
printf("control: connecting to thread 1 . . .\n");
while (true) {
status_t st = port_open(kStatusPortNames[1], NULL, &r_port1);
if (st == NO_ERROR) {
break;
} else if (st == ERR_NOT_FOUND) {
thread_sleep(100);
} else {
printf("could not open port, status = %d\n", st);
return __LINE__;
}
}

// control port says: 0 "REPEAT, or 1 "QUIT"
int ret = 0;
int count = 0;
while (ret == 0) {
LTRACEF_LEVEL(1, "Go!\n");
printf(".");
event_signal(&race_evt, false);
port_result_t pr0, pr1;
LTRACEF_LEVEL(1, "Collecting status from thread 0 . . .\n");
st = port_read(r_port0, INFINITE_TIME, &pr0);
if (st < 0) {
printf("could not read port, status = %d\n", st);
ret = __LINE__;
}
LTRACEF_LEVEL(1, "Collecting status from thread 1 . . .\n");
st = port_read(r_port1, INFINITE_TIME, &pr1);
if (st < 0) {
printf("could not read port, status = %d\n", st);
ret = __LINE__;
}
LTRACEF_LEVEL(1, "Checking responses . . .\n");
if (memcmp(pr0.packet.value, pr1.packet.value, sizeof(pr0.packet.value)) != 0) {
printf("Race detected on iteration %d!\n", count);
ret = __LINE__;
}
event_unsignal(&race_evt);
int repeat = (ret == 0 && count++ < 99 ? 1 : 0);
LTRACEF_LEVEL(1, "Telling threads to %s\n", (repeat ? "repeat" : "quit"));
st = port_write(w_port, (repeat ? &kRepeat : &kQuit), 1);
if (st < 0) {
printf("could not write port, status = %d\n", st);
ret = __LINE__;
}
if (!repeat) {
break;
}
}
printf("\n%d passes completed with result %d\n", count, ret);

st = port_close(r_port0);
if (st < 0) {
printf("could not close port, status = %d\n", st);
ret = __LINE__;
}

st = port_close(r_port1);
if (st < 0) {
printf("could not close port, status = %d\n", st);
ret = __LINE__;
}

st = port_close(w_port);
if (st < 0) {
printf("could not close port, status = %d\n", st);
ret = __LINE__;
}

int retcode = -1;
thread_join(t1, &retcode, INFINITE_TIME);
if (retcode)
ret = retcode;

thread_join(t2, &retcode, INFINITE_TIME);
if (retcode)
ret = retcode;

st = port_destroy(w_port);
if (st < 0) {
printf("could not destroy port, status = %d\n", st);
ret = __LINE__;
}

printf("two_thread_race: %d\n", ret);
return ret;
}


int two_threads_basic(void)
{
Expand Down Expand Up @@ -738,6 +976,7 @@ int port_tests(void)
while (count--) {
RUN_TEST(single_thread_basic);
RUN_TEST(two_threads_basic);
RUN_TEST(two_threads_race);
RUN_TEST(group_basic);
RUN_TEST(group_dynamic);
}
Expand Down
34 changes: 27 additions & 7 deletions kernel/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#define READPORT_MAGIC (0x70727472) // 'prtr'
#define PORTGROUP_MAGIC (0x70727467) // 'prtg'
#define PORTHOLD_MAGIC (0x70727467) // 'prth'

#define PORT_BUFF_SIZE 8
#define PORT_BUFF_SIZE_BIG 64
Expand Down Expand Up @@ -148,16 +149,24 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port)
return ERR_INVALID_ARGS;
}

if (strlen(name) >= PORT_NAME_LEN)
if (strnlen(name, PORT_NAME_LEN) >= PORT_NAME_LEN)
return ERR_INVALID_ARGS;


// Add a stack-allocated port to the list until we can
// replace it with a heap-allocated port.
write_port_t stack_wp = { .magic = PORTHOLD_MAGIC };
// We waste a few cycles here with a throwaway copy.
strlcpy(stack_wp.name, name, sizeof(stack_wp.name));

// lookup for existing port, return that if found.
write_port_t *wp = NULL;
THREAD_LOCK(state1);
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
if (strcmp(wp->name, name) == 0) {
// can't return closed ports.
if (wp->magic == WRITEPORT_MAGIC_X)
// can't return closed or partial ports.
if (wp->magic == WRITEPORT_MAGIC_X ||
wp->magic == PORTHOLD_MAGIC)
wp = NULL;
THREAD_UNLOCK(state1);
if (wp) {
Expand All @@ -168,12 +177,17 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port)
}
}
}
list_add_tail(&write_port_list, &stack_wp.node);
THREAD_UNLOCK(state1);

// not found, create the write port and the circular buffer.
wp = calloc(1, sizeof(write_port_t));
if (!wp)
if (!wp) {
THREAD_LOCK(state2);
list_delete(&stack_wp.node);
THREAD_UNLOCK(state2);
return ERR_NO_MEMORY;
}

wp->magic = WRITEPORT_MAGIC_W;
wp->mode = mode;
Expand All @@ -184,13 +198,18 @@ status_t port_create(const char *name, port_mode_t mode, port_t *port)
wp->buf = make_buf(size);
if (!wp->buf) {
free(wp);
THREAD_LOCK(state2);
list_delete(&stack_wp.node);
THREAD_UNLOCK(state2);
return ERR_NO_MEMORY;
}

// todo: race condtion! a port with the same name could have been created
// by another thread at is point.
// Avoid a name collision by swapping the temporary placeholder out of the
// list for the actual port.
THREAD_LOCK(state2);
// Let's reserve a stack allocated entry then swap it for the allocated one.
list_add_tail(&write_port_list, &wp->node);
list_delete(&stack_wp.node);
THREAD_UNLOCK(state2);

*port = (void *)wp;
Expand Down Expand Up @@ -226,7 +245,8 @@ status_t port_open(const char *name, void *ctx, port_t *port)
THREAD_LOCK(state);
write_port_t *wp = NULL;
list_for_every_entry(&write_port_list, wp, write_port_t, node) {
if (strcmp(wp->name, name) == 0) {
if (strcmp(wp->name, name) == 0 &&
wp->magic != PORTHOLD_MAGIC) {
// found; add read port to write port list.
rp->wport = wp;
if (wp->buf) {
Expand Down