Skip to content

Commit

Permalink
Fix dbengine consistency when a writer modifies a page concurrently w…
Browse files Browse the repository at this point in the history
…ith a reader querying its metrics (netdata#6979)
  • Loading branch information
mfundul authored and Saruspete committed Oct 9, 2019
1 parent a9f473d commit 34162a0
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
38 changes: 38 additions & 0 deletions database/engine/pagecache.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,42 @@ extern void free_page_cache(struct rrdengine_instance *ctx);
extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);

/*
 * Reads a page descriptor's end_time and page_length as a matching pair and
 * returns them through the two out-parameters.
 *
 * descr        - page descriptor to read; must not be NULL (it is dereferenced).
 * end_timep    - out: receives descr->end_time.
 * page_lengthp - out: receives descr->page_length.
 *
 * When descr->extent is NULL the two fields may be in the middle of an update
 * by pg_cache_atomic_set_pg_info(), which sets the low bit of end_time while
 * page_length is being changed. In that case the values are sampled in a retry
 * loop instead of being read directly. No lock is taken; the loop spins until
 * it observes a stable snapshot.
 */
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)
{
usec_t end_time, old_end_time;
uint32_t page_length;

if (NULL == descr->extent) {
/* this page is currently being modified, get consistent info locklessly */
do {
/* first sample of end_time, taken before page_length is read */
end_time = descr->end_time;
/* full memory barrier: keeps the loads below from being moved above this point */
__sync_synchronize();
old_end_time = end_time;
page_length = descr->page_length;
__sync_synchronize();
/* second sample of end_time, taken after page_length was read */
end_time = descr->end_time;
__sync_synchronize();
/*
 * Retry when end_time changed between the two samples (an update happened
 * around the page_length read) or when its low bit is set (the writer has
 * marked an update as still in progress).
 */
} while ((end_time != old_end_time || (end_time & 1) != 0));

*end_timep = end_time;
*page_lengthp = page_length;
} else {
/*
 * extent is set: the fields are read directly, without the retry loop.
 * NOTE(review): presumably a page that has an extent is no longer being
 * appended to - confirm against the writer path.
 */
*end_timep = descr->end_time;
*page_lengthp = descr->page_length;
}
}

/*
 * Publishes a new (end_time, page_length) pair on a page descriptor so that a
 * concurrent pg_cache_atomic_get_pg_info() never returns a mismatched pair.
 *
 * The caller must hold a reference to the page and must have already set the new data.
 *
 * descr       - page descriptor to update; must not be NULL (it is dereferenced).
 * end_time    - new end time; must be even, because the low bit is used as the
 *               "update in progress" marker that readers test. This is checked
 *               with assert() only, so it is not enforced when NDEBUG is defined.
 * page_length - new page length.
 *
 * NOTE(review): the read-modify-write on descr->end_time is not atomic, so this
 * appears to rely on there being a single writer per page - confirm.
 */
static inline void pg_cache_atomic_set_pg_info(struct rrdeng_page_descr *descr, usec_t end_time, uint32_t page_length)
{
/* an odd end_time would look like a permanently unfinished update to readers */
assert(!(end_time & 1));
/* full memory barrier before the marker store below, so the caller's earlier writes are not reordered after it */
__sync_synchronize();
descr->end_time |= 1; /* mark start of uncertainty period by adding 1 microsecond */
/* the marker must be stored before page_length changes */
__sync_synchronize();
descr->page_length = page_length;
/* page_length must be stored before the marker is cleared */
__sync_synchronize();
descr->end_time = end_time; /* mark end of uncertainty period */
}

#endif /* NETDATA_PAGECACHE_H */
28 changes: 16 additions & 12 deletions database/engine/rrdengineapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
}
page = descr->pg_cache_descr->page;
page[descr->page_length / sizeof(number)] = number;
descr->end_time = point_in_time;
descr->page_length += sizeof(number);
pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + sizeof(number));

if (perfect_page_alignment)
rd->rrdset->rrddim_page_alignment = descr->page_length;
if (unlikely(INVALID_TIME == descr->start_time)) {
Expand Down Expand Up @@ -470,7 +470,8 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
struct rrdeng_page_descr *descr;
storage_number *page, ret;
unsigned position, entries;
usec_t next_page_time, current_position_time;
usec_t next_page_time, current_position_time, page_end_time;
uint32_t page_length;

handle = &rrdimm_handle->rrdeng;
if (unlikely(INVALID_TIME == handle->next_page_time)) {
Expand All @@ -480,15 +481,17 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
if (unlikely(NULL == (descr = handle->descr))) {
/* it's the first call */
next_page_time = handle->next_page_time * USEC_PER_SEC;
} else {
pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
}
position = handle->position + 1;

if (unlikely(NULL == descr ||
position >= (descr->page_length / sizeof(storage_number)))) {
position >= (page_length / sizeof(storage_number)))) {
/* We need to get a new page */
if (descr) {
/* Drop old page's reference */
handle->next_page_time = (descr->end_time / USEC_PER_SEC) + 1;
handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
goto no_more_metrics;
}
Expand All @@ -508,26 +511,27 @@ storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
handle->descr = descr;
pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
if (unlikely(INVALID_TIME == descr->start_time ||
INVALID_TIME == descr->end_time)) {
INVALID_TIME == page_end_time)) {
goto no_more_metrics;
}
if (unlikely(descr->start_time != descr->end_time && next_page_time > descr->start_time)) {
if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
/* we're in the middle of the page somewhere */
entries = descr->page_length / sizeof(storage_number);
position = ((uint64_t)(next_page_time - descr->start_time)) * entries /
(descr->end_time - descr->start_time + 1);
entries = page_length / sizeof(storage_number);
position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
(page_end_time - descr->start_time);
} else {
position = 0;
}
}
page = descr->pg_cache_descr->page;
ret = page[position];
entries = descr->page_length / sizeof(storage_number);
entries = page_length / sizeof(storage_number);
if (entries > 1) {
usec_t dt;

dt = (descr->end_time - descr->start_time) / (entries - 1);
dt = (page_end_time - descr->start_time) / (entries - 1);
current_position_time = descr->start_time + position * dt;
} else {
current_position_time = descr->start_time;
Expand Down

0 comments on commit 34162a0

Please sign in to comment.