This repository was archived by the owner on Aug 23, 2023. It is now read-only.

WIP - Basic clustering support - issue #56 #74

Merged
merged 15 commits into master from coordinatePersist on Dec 16, 2015

Conversation

woodsaj
Member

@woodsaj woodsaj commented Dec 11, 2015

This is not ready for merging yet.

@woodsaj woodsaj mentioned this pull request Dec 11, 2015
@Dieterbe
Contributor

does it make sense for me to start reviewing or should i hold off until you're more happy with the code?

@woodsaj
Member Author

woodsaj commented Dec 11, 2015

@Dieterbe hold off for a bit.

Conflicts:
	metric_tank/aggmetric.go
@woodsaj
Member Author

woodsaj commented Dec 14, 2015

@Dieterbe this is ready for review now.

This handler is designed to be used by LB healthchecks. The handler
will return a 503 unavailable response if the node is not the
primary node and has not been running for at least warmUpPeriod
seconds.  This will ensure that metric Get requests will only be
sent to the node when it has sufficient data to serve.
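
For illustration, a minimal sketch of what such a handler could look like (the handler path, the warmUpPeriod value and the nodeIsPrimary/startTime variables are assumptions, not the actual code):

```go
package main

import (
	"net/http"
	"time"
)

var (
	startTime     = time.Now()
	nodeIsPrimary = false            // toggled via the cluster API
	warmUpPeriod  = 10 * time.Minute // illustrative default
)

// getNodeStatus backs an LB healthcheck: it returns 200 when the node may
// serve Get requests, and 503 when it is a non-primary node that has not yet
// been running for warmUpPeriod.
func getNodeStatus(w http.ResponseWriter, req *http.Request) {
	if nodeIsPrimary || time.Since(startTime) >= warmUpPeriod {
		w.Write([]byte("ok"))
		return
	}
	http.Error(w, "node is still warming up", http.StatusServiceUnavailable)
}

func main() {
	http.HandleFunc("/node", getNodeStatus)
	http.ListenAndServe(":6063", nil)
}
```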
}
}

func (a *AggMetric) getChunkByT0(ts uint32) *Chunk {
Contributor

Function assumes lock is held? put that in comment. also ByT0 suffix can be dropped. rename arg1 to t0.
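
For illustration only, building on the repo's existing AggMetric and Chunk types, the suggested shape might be something like this (the field names and the linear scan are assumptions):

```go
// getChunk returns the chunk whose T0 matches the given t0, or nil if we
// don't have it.
// note: assumes the caller holds the lock on the AggMetric.
func (a *AggMetric) getChunk(t0 uint32) *Chunk {
	for _, c := range a.Chunks {
		if c != nil && c.T0 == t0 {
			return c
		}
	}
	return nil
}
```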

@Dieterbe
Contributor

overall looks pretty good. similar to what i had in mind.

how far can we go in assuring that a secondary node is able to get all messages from a primary, so that it doesn't end up saving (overwriting?) partial chunks that were already saved properly but for some reason (nsqd disconnect/timeout) it lost the message? i made a comment in the line where the nsqd consumer is created about using a non-ephemeral disk-backed channel. I think we should aim to guarantee delivery except when a nsqd machine just fatally dies or something, or is not brought back up before we want to promote to primary

@@ -28,13 +28,15 @@ var (
showVersion = flag.Bool("version", false, "print version string")
dryRun = flag.Bool("dry", false, "dry run (disable actually storing into cassandra)")
Contributor

now that we have better terminology, i think it would be better to use a var named primaryMode bool default true, or something.
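
i.e. roughly something like this (a sketch only; per the later commit the flag ended up being called primaryNode, and the exact flag name and usage string here are assumptions):

```go
// replaces the old dryRun flag: when true, this node saves chunks to cassandra
primaryNode = flag.Bool("primary-node", true, "run as the primary node and save chunks to cassandra")
```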

- rename SaveChunkByT0 to SyncChunkSaveState
- fix incorrect mod instead of division
- use a non-ephemeral channel for the metric-persist messages. As
the metrics channel is non-ephemeral this ensures that metric-persist
messages get buffered just like metrics between restarts and outages.
- convert cluster API to just use basic form data for setting the
primary state. state can now be changed with
`curl -X POST http://localhost:6063/cluster?primary=true`
- rename "dryRun" cli argument to "primaryNode"
- ensure cassandra is initialized before we start consuming metrics
- update comments to ensure clarity
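
For reference, a sketch of the form-data handling behind the curl command above (the handler name, the port wiring and the nodeIsPrimary variable are assumptions, not the actual code):

```go
package main

import (
	"net/http"
	"strconv"
	"sync"
)

var (
	mu            sync.Mutex
	nodeIsPrimary bool
)

// setClusterStatus lets an operator flip the primary state, e.g. with
// `curl -X POST http://localhost:6063/cluster?primary=true`.
// req.FormValue covers both query-string and POST form parameters.
func setClusterStatus(w http.ResponseWriter, req *http.Request) {
	if req.Method != "POST" {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	newState, err := strconv.ParseBool(req.FormValue("primary"))
	if err != nil {
		http.Error(w, "primary must be true or false", http.StatusBadRequest)
		return
	}
	mu.Lock()
	nodeIsPrimary = newState
	mu.Unlock()
	w.Write([]byte("ok"))
}

func main() {
	http.HandleFunc("/cluster", setClusterStatus)
	http.ListenAndServe(":6063", nil)
}
```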
@woodsaj
Member Author

woodsaj commented Dec 14, 2015

Thanks for all the feedback Dieter.

how far can we go in assuring that a secondary node is able to get all messages from a primary, so that it doesn't end up saving (overwriting?)

It is generally ok if a secondary node saves a chunk to Cassandra that has already been previously saved. Cassandra will just tombstone the original write and store the new write, ie a normal Cassandra Update. The only time this is problematic is if the second save is only of a partial chunk.

The only time the metricPersist message would be lost is if the primary node crashes or is stopped between writing data to Cassandra and publishing the metricPersist message.

So delivery of the messages is reliable enough. What is more important is to ensure that the secondary nodes have processed all metricPersist messages before they are promoted to Primary. As this is still a manual process, that can easily be controlled.

I think there are still a few issues we need to resolve here, related to this problem.

  • how to handle the first chunk stored in memory as this will likely only be a partial chunk.

This is an issue when saving chunks (we really only want to save partial chunks when the node has been primary since startup), and also when handling Get methods (if the chunk is partial, then we should fetch the chunk from cassandra instead)

@Dieterbe
Contributor

The only time the metricPersist message would be lost is if the primary node crashes or is stopped between writing data to Cassandra and publishing the metricPersist message.

or if nsqd (the machine) crashes irrecoverably, though i'm not too worried about that. Messages can also be lost if nsqd crashes, the nsqd machine crashes or becomes unreachable, or even if the connection between nsqd and the consumer drops momentarily or for a while, because it's currently implemented as an ephemeral channel, i.e. nsqd will drop messages it receives on the topic if no consumer is connected. Because of this I think it's likely that we'll have the issue of accidentally overwriting full chunks with a partial one in cassandra, and we should try harder to avoid this (e.g. by using a disk-backed, non-ephemeral channel).

another problem in such scenario (even, and especially if the chunks are not partial) is that the write load to cassandra becomes a lot heavier, needlessly.

re your comment on stopping the program: when we stop the program, we just need to ensure it has sufficient time to send the persist messages after persisting the chunks.

ensure that the secondary nodes have processed all metricPersist message before they are promoted to Primary. As this is still a manual process, that can easily be controlled.

yeah i figure we could just look at nsqadmin and verify that the new instance has 0 backlog on the persist topic of all of the nsqd's.

how to handle the first chunk stored in memory as this will likely only be a partial chunk. ....This is an issue when saving chunks (we really only want to save partial chunks when the node has been primary since startup)

agreed on only do this when it was primary at startup.
the case where it was secondary at startup and then gets promoted doesn't sound like an issue to me, because when it's promoted, we assured it processed all persistmetric messages from the previous primary so it wouldn't overwrite any full chunks with partial ones.

also when handling Get methods (if the chunk is partial, then we should fetch the chunk from cassandra instead)

yes, except when we're in primary mode, I think. MT starting up in primary mode is a pretty good indication that what we have in RAM is the best we have and that we'll have a gap in data in cassandra (i.e. starting in primary mode is a worst case).

we could just track for each chunk what the first ts "contained within" is: if we created the chunk and there was a chunk before it, then we know the chunk has the data from its t0 onwards (irrespective of whether there's a point with exactly that ts in it), but if it was the first chunk created, then it only has data from the ts of the first point. this might still be enough to satisfy certain read requests that only need a section of the chunk. That should be enough to add the nuance to the lowest value returned by AggMetric.Get(), which already triggers the right logic to fetch from cassandra when needed.
edit: actually no, if AggMetric.Get() sees that our partial chunk is not enough to cover the requested from, it should not use the first partial chunk at all, and lowest should be the t0 of the first non-partial chunk. with my previous suggestion we'd have duplicate data (the full chunk from cassandra + the partial chunk from AggMetric).
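
A rough sketch of that bookkeeping (all names here are illustrative, not the actual code):

```go
// Chunk additionally remembers the timestamp of the first point written to it.
type Chunk struct {
	T0      uint32
	FirstTs uint32 // ts of the first point added to this chunk
	First   bool   // true if this was the first chunk created for the metric
}

// oldestServed returns the "lowest"/oldest value AggMetric.Get() should report
// for the oldest chunk it holds. nextT0 is the t0 of the first non-partial chunk.
func oldestServed(c *Chunk, from, nextT0 uint32) uint32 {
	switch {
	case !c.First:
		// a full chunk covers everything from its t0 onwards
		return c.T0
	case c.FirstTs <= from:
		// partial first chunk, but it reaches back far enough for this request
		return c.FirstTs
	default:
		// the partial chunk falls short: skip it entirely and let the caller
		// fetch from cassandra up to nextT0, so we don't return duplicate data
		return nextT0
	}
}
```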

Conflicts:
	metric_tank/aggmetric.go
The first chunk recorded is likely a partial chunk.  When not the
cluster primary, don't serve the first chunk out of memory. Instead
the chunk will be fetched from Cassandra.
When there is no primary node (due to crash or shutdown) chunks
will not be getting saved. When a secondary node gets promoted
to primary there may be older chunks that never got saved and this
commit ensures that we check for this condition.  If there are older
chunks that need saving, we add them to the writeQueue. Older chunks
are added to the writeQueue before newer chunks.
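
A sketch of that check (simplified: it ignores the circular-buffer wrap-around, and the field/method names, including the Saved flag, are assumptions):

```go
// unsavedBefore collects chunks older than the one at position pos that were
// never saved, returned oldest-first so they can be added to the writeQueue
// before newer chunks.
func (a *AggMetric) unsavedBefore(pos int) []*Chunk {
	var pending []*Chunk
	for i := pos - 1; i >= 0; i-- {
		c := a.Chunks[i]
		if c == nil || c.Saved {
			break
		}
		pending = append(pending, c)
	}
	// walking backwards produced newest-first order; reverse to oldest-first
	for i, j := 0, len(pending)-1; i < j; i, j = i+1, j-1 {
		pending[i], pending[j] = pending[j], pending[i]
	}
	return pending
}
```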
@woodsaj
Member Author

woodsaj commented Dec 15, 2015

I think these latest changes should get us to a good enough position.
The changes address:

  • Secondary nodes will not serve the first chunk they create for a metric. Instead the Get() request will fall back to fetching the data from cassandra.
  • Ensure that old chunks have been saved. As it is a manual process, there can be a large delay between a primary disappearing and a secondary node being promoted. During this window multiple chunks of data could be written, none of which would be saved. So when saving chunks we need to check and ensure that previous chunks have also been saved (or are at least in the writeQueue). If they are not we need to add them to the writeQueue.

thoughts @Dieterbe

@Dieterbe
Contributor

looks good. can be merged. 2 points:

  • the pending chunks are in reverse chronological order. this is not a problem right?
  • first partial chunk may be enough to cover requested data. I still think that we should probably look at what's the first point we have in the chunk and only ignore the chunk if its range falls short, which is more efficient than getting a full chunk from cassandra, but we can implement this as a later optimization. the overhead of tracking a firstTs uint32 per chunk should be worth it.

@woodsaj
Member Author

woodsaj commented Dec 16, 2015

the pending chunks are in reverse chronological order. this is not a problem right?

Good catch Dieter, I had already corrected this, but had not pushed the commit yet.

@Dieterbe
Contributor

looks good to me. you have some logic that would benefit a lot from unit tests though.

woodsaj pushed a commit that referenced this pull request Dec 16, 2015
WIP - Basic clustering support - issue #56
@woodsaj woodsaj merged commit 807bf38 into master Dec 16, 2015
@woodsaj woodsaj deleted the coordinatePersist branch December 16, 2015 11:33
@Dieterbe Dieterbe mentioned this pull request Aug 14, 2018
Dieterbe added a commit that referenced this pull request Aug 14, 2018
this is an old optimization (?) that has been with us
for a long time: #74
2029113

here's how it caused data loss at read time:
- when only 1 chunk of data had been filled:
  the "update" of the field is a no-op
  because len(chunks) == 1, so oldPos goes back to 0
  (not sure if intentional or a bug) so reading the
  first chunk worked.
- once you have more than 1 chunk: update of oldPos works.
  we start hitting cassandra.
  depending on how long the chunk takes to get saved
  to cassandra, we will miss data at read time.
  also, our chunk cache does not cache absence of data,
  hitting cassandra harder during this period.
- once the chunk is saved to cassandra the problem disappears
- once the circular buffer recycles the first time (effectively
  removing the first chunk) this optimization no longer applies,
  but at that point we still hit cassandra just as before.

This problem is now solved. However, removing that code
enables another avenue for data loss at read time:
- when a read node starts (without data backfill)
- or a read node starts with data backfill, but the backfill
  doesn't have old data for the particular metric, IOW
  when the data only covers 1 chunk's worth
- a read node starts with data backfill, but since backfilling starts
  at arbitrary positions, the first chunk will miss some data in the
  beginning.

In all these cases, the first chunk is a partial chunk, whereas
a full version of the chunk is most likely already in cassandra.

To make sure this is not a problem, if the first chunk we used was
partial, we set oldest to the first timestamp, so that the rest
can be retrieved from cassandra.
Typically, this will cause the "same" chunk (but a full version)
to be retrieved from cassandra, which is then cached and seamlessly
merged via Fix()

fix #78
fix #988
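
A simplified sketch of that adjustment in AggMetric.Get() (First and FirstTs are assumed chunk fields, not necessarily the real ones):

```go
// oldestCovered returns the oldest timestamp the in-memory data actually
// covers, given the oldest chunk about to be served.
func oldestCovered(c *Chunk) uint32 {
	if c.First {
		// partial first chunk: only claim coverage from its first point, so
		// the remainder (usually a full copy of this same chunk) is fetched
		// from cassandra and later merged seamlessly via Fix()
		return c.FirstTs
	}
	return c.T0
}
```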