Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

epoll: Don't iterate all the fds when using epoll #5

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ typedef struct _st_eventsys_ops {
int (*fd_new)(int); /* New descriptor allocated */
int (*fd_close)(int); /* Descriptor closed */
int (*fd_getlimit)(void); /* Descriptor hard limit */
int (*pollq_add)(_st_pollq_t *pq);
void (*pollq_del)(_st_pollq_t *pq);
} _st_eventsys_t;


Expand Down
190 changes: 148 additions & 42 deletions event.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ typedef struct _epoll_fd_data {
int wr_ref_cnt;
int ex_ref_cnt;
int revents;
/* The following members aren't touched after forking. */
union {
_st_pollq_t *pq;
_st_pollq_t **pqs;
};
int pq_cnt;
Copy link
Member

@winlinvip winlinvip Apr 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_st_epolldata.evtlist.epoll_data.ptr是可以放自定义指针的。
可能更适合放_st_pollq_t的信息，可以存放协程的链表_st_clist_t。

typedef union epoll_data {
    void    *ptr;
    int      fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;    /* Epoll events */
    epoll_data_t data;      /* User data variable */
};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[[[[[**_```

> ~~~~~~~~``~~committed ~~``~~~~~~~~

} _epoll_fd_data_t;

static struct _st_epolldata {
Expand Down Expand Up @@ -1224,16 +1230,97 @@ ST_HIDDEN int _st_epoll_pollset_add(struct pollfd *pds, int npds)
return 0;
}

/*
 * Detach a poll queue entry from the per-descriptor waiter tables.
 *
 * For every pollfd in pq->pds, the matching fd_data record is inspected:
 *   - pq_cnt == 1: the single waiter lives inline in `pq`; it is reset to
 *     NULL only when it is this entry.
 *   - pq_cnt  > 1: waiters live in the `pqs` array; the first slot holding
 *     this entry is reset to NULL.
 * Slots are only cleared, never compacted, and pq_cnt is left unchanged,
 * so a cleared slot can be reused by a later _st_epoll_pollq_add().
 */
ST_HIDDEN void _st_epoll_pollq_del(_st_pollq_t *pq)
{
    _epoll_fd_data_t *fdinfo;
    int n, slot;

    for (n = 0; n < pq->npds; ++n) {
        fdinfo = &_st_epoll_data->fd_data[pq->pds[n].fd];

        if (fdinfo->pq_cnt == 1) {
            /* Inline (non-array) storage. */
            if (fdinfo->pq == pq)
                fdinfo->pq = NULL;
            continue;
        }

        if (fdinfo->pq_cnt <= 0)
            continue;           /* nothing was ever registered on this fd */

        /* Array storage: clear the first slot that refers to pq. */
        slot = 0;
        while (slot < fdinfo->pq_cnt && fdinfo->pqs[slot] != pq)
            ++slot;
        if (slot < fdinfo->pq_cnt)
            fdinfo->pqs[slot] = NULL;
    }
}

/*
 * Register a poll queue entry as a waiter on every descriptor it polls.
 *
 * Each fd_data record stores its waiters in one of two forms, selected by
 * pq_cnt:
 *   - pq_cnt == 0: no storage yet; the entry is stored inline in `pq`.
 *   - pq_cnt == 1: inline storage in `pq` (NULL means the slot is free).
 *   - pq_cnt  > 1: heap array `pqs` with pq_cnt slots (NULL slots are free).
 * pq_cnt is the number of slots, not the number of live waiters; it only
 * grows.
 *
 * A pollfd array may name the same descriptor more than once.  In that case
 * the entry is registered on the descriptor only once: a second registration
 * would be redundant, and the previous assert()-based handling aborted the
 * process on such input.
 *
 * Returns 0 on success.  On allocation failure every registration of `pq`
 * is undone via _st_epoll_pollq_del(), errno is set to ENOMEM and -1 is
 * returned.
 */
ST_HIDDEN int _st_epoll_pollq_add(_st_pollq_t *pq)
{
    struct pollfd *pd = pq->pds;
    struct pollfd *pd_end = pd + pq->npds;
    _epoll_fd_data_t *efd;
    _st_pollq_t **pqs;
    int i, free_slot;

    for (; pd < pd_end; ++pd) {
        efd = &_st_epoll_data->fd_data[pd->fd];

        if (efd->pq_cnt == 0) {
            /* First waiter ever on this descriptor: store it inline. */
            efd->pq = pq;
            efd->pq_cnt = 1;
            continue;
        }

        if (efd->pq_cnt == 1) {
            if (efd->pq == pq)
                continue;       /* same fd listed twice in pq->pds */
            if (efd->pq == NULL) {
                efd->pq = pq;   /* reuse the vacated inline slot */
                continue;
            }
            /* Second concurrent waiter: switch from inline to array form. */
            pqs = malloc(sizeof(*pqs) * 2);
            if (!pqs) {
                _st_epoll_pollq_del(pq);
                errno = ENOMEM;
                return -1;
            }
            pqs[0] = efd->pq;
            pqs[1] = pq;
            efd->pqs = pqs;
            efd->pq_cnt = 2;
            continue;
        }

        /*
         * Array form.  Scan the whole array so that an existing registration
         * is detected even when a free slot precedes it, remembering the
         * first free slot on the way.
         */
        free_slot = -1;
        for (i = 0; i < efd->pq_cnt; ++i) {
            if (efd->pqs[i] == pq)
                break;
            if (efd->pqs[i] == NULL && free_slot < 0)
                free_slot = i;
        }
        if (i < efd->pq_cnt)
            continue;           /* already registered on this descriptor */

        if (free_slot >= 0) {
            efd->pqs[free_slot] = pq;
            continue;
        }

        /* No free slot: grow the array by one.  Keep the old pointer until
         * realloc() has succeeded so it is not lost on failure. */
        pqs = realloc(efd->pqs, sizeof(*pqs) * (efd->pq_cnt + 1));
        if (!pqs) {
            _st_epoll_pollq_del(pq);
            errno = ENOMEM;
            return -1;
        }
        efd->pqs = pqs;
        efd->pqs[efd->pq_cnt++] = pq;
    }

    return 0;
}

ST_HIDDEN void _st_epoll_dispatch(void)
{
st_utime_t min_timeout;
_st_clist_t *q;
_st_pollq_t *pq;
struct pollfd *pds, *epds;
struct epoll_event ev;
int timeout, nfd, i, osfd, notify;
int timeout, nfd, i, j, osfd, notify;
int events, op;
short revents;
_epoll_fd_data_t *efd;
_st_pollq_t **pqs;

if (_ST_SLEEPQ == NULL) {
timeout = -1;
Expand All @@ -1255,8 +1342,10 @@ ST_HIDDEN void _st_epoll_dispatch(void)
_st_epoll_data->pid = getpid();

/* Put all descriptors on ioq into new epoll set */
memset(_st_epoll_data->fd_data, 0,
_st_epoll_data->fd_data_size * sizeof(_epoll_fd_data_t));
for (i = 0; i < _st_epoll_data->fd_data_size; ++i) {
memset(&_st_epoll_data->fd_data[i], 0,
offsetof(_epoll_fd_data_t, pq));
}
_st_epoll_data->evtlist_cnt = 0;
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
Expand All @@ -1278,48 +1367,63 @@ ST_HIDDEN void _st_epoll_dispatch(void)
}
}

for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
notify = 0;
epds = pq->pds + pq->npds;

for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
for (i = 0; i < nfd; ++i) {
osfd = _st_epoll_data->evtlist[i].data.fd;
efd = &_st_epoll_data->fd_data[osfd];
assert(efd->pq_cnt > 0);
if (efd->pq_cnt == 1)
pqs = &efd->pq;
else
pqs = efd->pqs;
for (j = 0; j < efd->pq_cnt; ++j) {
pq = pqs[j];
if (!pq)
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;
notify = 0;
epds = pq->pds + pq->npds;

pds->revents = revents;
if (revents) {
notify = 1;
for (pds = pq->pds; pds < epds; pds++) {
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
continue;
}
osfd = pds->fd;
events = pds->events;
revents = 0;
if ((events & POLLIN) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
if ((events & POLLOUT) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLOUT))
revents |= POLLOUT;
if ((events & POLLPRI) &&
(_ST_EPOLL_REVENTS(osfd) & EPOLLPRI))
revents |= POLLPRI;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR)
revents |= POLLERR;
if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP)
revents |= POLLHUP;

pds->revents = revents;
if (revents) {
notify = 1;
}
}
}
if (notify) {
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);
if (notify) {
_st_epoll_pollq_del(pq);
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
/*
* Here we will only delete/modify descriptors that
* didn't fire (see comments in _st_epoll_pollset_del()).
*/
_st_epoll_pollset_del(pq->pds, pq->npds);

if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
_ST_DEL_SLEEPQ(pq->thread);
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
}
}

Expand Down Expand Up @@ -1389,7 +1493,9 @@ static _st_eventsys_t _st_epoll_eventsys = {
_st_epoll_pollset_del,
_st_epoll_fd_new,
_st_epoll_fd_close,
_st_epoll_fd_getlimit
_st_epoll_fd_getlimit,
_st_epoll_pollq_add,
_st_epoll_pollq_del
};
#endif /* MD_HAVE_EPOLL */

Expand Down
4 changes: 4 additions & 0 deletions sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
if (*_st_eventsys->pollq_add && (*_st_eventsys->pollq_add)(&pq))
return -1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
Expand All @@ -87,6 +89,8 @@ int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)

n = 0;
if (pq.on_ioq) {
if (*_st_eventsys->pollq_del)
(*_st_eventsys->pollq_del)(&pq);
/* If we timed out, the pollq might still be on the ioq. Remove it */
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
Expand Down