Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustering and distributed execution #140

Open
liclac opened this issue Mar 7, 2017 · 32 comments
Open

Clustering and distributed execution #140

liclac opened this issue Mar 7, 2017 · 32 comments
Labels

Comments

@liclac
Copy link
Contributor

liclac commented Mar 7, 2017

This is something that isn't top priority at the moment, but it's going to take a lot of design work, so I'd like to get the ball rolling on the actual planning.

My current idea is to have a k6 hive command (name subject to change), which hooks up to etcd - a lovely piece of software that can handle most of the heavy lifting around clustering, and is also the backbone that makes among other things Kubernetes tick.

Each instance registers itself, along with how many VUs it can handle, exposes an API to talk to the cluster, and triggers a leader election. The information registered in etcd might have a structure like:

  • /loadimpact/k6/nodes/nodeA - node data (JSON)
  • /loadimpact/k6/nodes/nodeB - node data (JSON)

Running a test on the cluster is a matter of calling k6 run --remote=https://a.node.address/ script.js. This causes it to, instead of running the test locally, roll up all the data and files needed and push them to the cluster, where they're stored in etcd - available from each node.

  • /loadimpact/k6/test - test data (JSON)
  • /loadimpact/k6/test/src/... - root for the pushed filesystem

When test data is loaded, each VU instantiates an engine and its maximum number of VUs right away, and watches its own registry information for changes. The elected leader then takes care of patching other nodes to distribute VUs evenly across the cluster.

@neilstuartcraig
Copy link

WRT #202 / request for comments:

I don't have direct experience with etcd so i don't have an opinion there but the notion of using existing, proven software is clearly sound so i'd welcome that.

My initial thought is, how would someone know how many VU's a machine can handle? If there's an automated estimation based on system resources (though obv. e.g. a CPU core on AWS is definitely not === a CPU core on dedicated hardware) then that'd help to at least provide consistency which would be good. I'm aware though that it'd quite easy to overload a load generator with work and thus skew its output as it would lack the system resources to measure accurately. However it's implemented, i would imagine it's going to be necessary to allow users to set/amend the VU capability and a good user guide would help a lot - i.e. defining a way in which users can calculate/estimate - that might be the best way to start actually, keeping it simple and iterating/adding from there.

@liclac
Copy link
Contributor Author

liclac commented May 8, 2017

Honestly, your best shot is probably trial and error. The limiting factor is not typically CPU power, but rather local socket usage, and to some lesser extent, RAM usage, both of which vary slightly between scripts. A good start would be just split your desired number of VUs across as many hosts as you want and seeing if it flies or not.

@ghost
Copy link

ghost commented May 10, 2017

Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel tune TIME_WAIT etc
http://gatling.io/docs/current/general/operations/#id3.
Telegraf should have a minimal footprint also https://github.com/influxdata/telegraf

@arichiardi
Copy link

arichiardi commented Nov 30, 2017

Thanks for this, I am very interested!

I was wondering if there would be a way to avoid adding a new service to the pool.

The need of extra shared meta date is going to be there, because you would probably want to direct the load test output to a single InfluxDB instance, so maybe we can save this kind of metadata directly there?

I know that this would force folks to stick with InfluxDB, but if we'd use etcd people would in any case need to custom tailor something to collect and aggregate results.

@liclac
Copy link
Contributor Author

liclac commented Dec 20, 2017

@arichiardi I think it's more important that we look at how we can best implement this, using all available tools, rather than looking at how to minimise dependencies right out of the bat. I'm not saying we should introduce dependencies for the sake of it, but we want this done right.

The current requirements for this to be implemented is as follows:

Prerequisite: Leader assignment

Most of the below requirements have one prerequisite: we need a central point to make all decisions from. The leader doesn't need a lot of processing power, it just needs to keep an eye on things, so to speak.

  1. The initiating client acts as the brain.

    This is the absolute simplest solution, with the drawback that the test will have to be aborted if the client loses connection.

  2. Dedicated master, á la kubernetes.

    Second simplest. Needs a coordinating process that can communicate with all instances, either directly or through an indirection layer (eg. etcd, redis, etc). Because this can be expected to run in the cluster, and we have a persistence mechanism for the test, we could technically recover from a master failure/network hiccup by just skipping the chunk of the timeline that never passed.

  3. Leader election.

    This is fortunately not something we have to implement the algorithm for ourselves (etcd and similar provide it with a single function call), but it does add a hard dependency on something that can provide this. If one of the instances in a cluster can be dynamically elected as the acting master, it would theoretically simplify deployment, I just have a bad feeling it might be opening a can of worms of synchronisation bugs.

Spreading VUs across instances.

The algorithm for this could simply be to spread VUs evenly across all available instances, respecting their caps. We could possibly do some weighing, eg. between an instance with max 1000 VUs and one with max 2000 VUs, the latter could get 2x as many VUs allocated to it.

Possible implementations I can see:

  1. All instances hold persistent connections to the master.

    I don't like this. Connections break. It's real simple though.

  2. Key-Value Observing.

    All instances register themselves in a control plane of some kind (etcd, consul, redis), then watch their own keyspaces. The master node updates them. Reliability is offloaded to the control plane, we don't have to worry about it.

Central execution of thresholds, from a data source.

This would be fairly simple using something like InfluxDB; we can parse threshold snippets for the variables they refer to (there's some code for that already), then query them out of the database, using the starting timestamp of the test as delimiter.

We could do something with shipping samples back to the master, but that feels... a little silly.

Distributed rate limiting.

The --rps flag, and possible future rate limits, need to be distributed to work properly.

Distributed data storage.

We need to be able to store two kinds of things:

  • Archive data - script files, static files, options
  • Runtime data like the fixed seed used for the init context, setup data (setup and teardown methods #194), etc

This can be anything that can store keys and values of arbitrary size.

@AnotherDevBoy
Copy link

Have you considered implementing something similar to what was done for Locust?

They have a master/slave architecture where the synchronization happens via ZMQ (TCP), which is lightweight enough.

One advantage, in this case, is that there is no need for introducing a hard dependency. The synchronization master/slave can be implemented via ZMQ, HTTP or whatever network protocol you might consider.

IMHO, the only disadvantage that Locust implementation has is the fact that it is a stateful system, where the master must always be started first and can't recover if a slave disappears and then comes back.

I would rather see a stateless system that can handle connectivity issues gracefully.

@na--
Copy link
Member

na-- commented Oct 19, 2018

@coderlifter, thanks, we still haven't finalized the k6 distributed execution design yet, so we'll definitely consider this approach when we get to it. We'll post a final design/RFC here when we start implementing this, so it can be discussed by any interested people.

@GTB3NW
Copy link

GTB3NW commented Jan 14, 2019

Any progress on internal discussions? This is a killer feature which is pushing users more towards python at the moment.

@na--
Copy link
Member

na-- commented Jan 15, 2019

@GTB3NW, sorry, we still haven't started specifically implementing this functionality yet, the next major feature we're currently working on is the arrival-rate based execution support, i.e. being able to schedule the execution in terms of iterations (requests) per second. As a part of the refactoring we're doing for that, we'll also improve some k6 internals in a way that would facilitate the easier implementation of the distributed execution as well, so we're slowly getting to that point, but we're not there yet.

@efology
Copy link

efology commented Apr 25, 2019

Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel tune TIME_WAIT etc
http://gatling.io/docs/current/general/operations/#id3.
Telegraf should have a minimal footprint also https://github.com/influxdata/telegraf

You should be able to set up n(ip) * ~65K(ports) sockets, since a source of a packet is src ip + src port.
You need to create virtual IP addresses for that.

@na-- na-- mentioned this issue May 15, 2019
39 tasks
@qcastel
Copy link

qcastel commented Apr 25, 2020

Any updates on this issue? Very keen to be able to run K6 in a distributed manner

@na--
Copy link
Member

na-- commented Apr 27, 2020

We are currently finishing a big refactoring of the core of how script execution works in k6, which would be foundational for native distributed execution, among other thigs. You can see some details #1007 (specifically, the execution segments part of it, #997).

This would allow you to partition a load test among however many instances you require, without any synchronization between them. For example, if you want to split a test between 3 instances, you would be able to do something like this when #1007 is merged:

k6 run --execution-segment "0:1/3" --execution-segment-sequence "0,1/3,2/3,1" my-script.js
k6 run --execution-segment "1/3:2/3" --execution-segment-sequence "0,1/3,2/3,1" my-script.js
k6 run --execution-segment "2/3:1" --execution-segment-sequence "0,1/3,2/3,1" my-script.js

Each instance will execute only its own piece of the puzzle. To start the tests synchronously, you can use the --paused flag and the REST API to start the test run. Though for now you'd still need a centralized metrics store, if you want to explore the generated metrics, and things like thresholds won't work in a distributed manner. To have that working, we'd still need to implement #763 and some synchronization between the instances...

@luketn
Copy link

luketn commented Aug 8, 2020

Linux has a maximum of 64K ephemeral ports. That's your connection limit. I'd also kernel tune TIME_WAIT etc
http://gatling.io/docs/current/general/operations/#id3.
Telegraf should have a minimal footprint also https://github.com/influxdata/telegraf

@ghost @efology
It's actually possible to create far more connections than 64k per client, because the limitation for TCP connections is on the tuple of:

{local IP address, local port, remote IP address, remote  port}

https://tools.ietf.org/html/bcp156#section-2.2

The local IP address is likely singular, and the remote port is likely constant.
The local port is probably limited to ~64,511 (assuming you expand the defaults but still exclude 0-1024).
The remote IP address of the target may not be singular and is particularly likely to resolve to multiple IPs if it is a redundant system.

e.g. Just looking at the public hostname of one of my services hosted by AWS, the DNS resolves to 4 IP addresses. In theory one k6 client machine configured to allow 64,511 ephemeral ports per remote IP would be able to create 258,044 concurrent connections to it.

As you say @efology you could also configure a client with multiple IPs and multiply the potential connections that way.

Particularly if you want to load test a web socket back end, where you have large numbers of idle connections this could be useful.

You might start to hit other limitations like CPU and memory at those higher connection counts too, depending on the internals of how K6 is managing connections.

@liclac
Copy link
Contributor Author

liclac commented Aug 8, 2020

Side note to this: As a short-term stopgap solution, you can squeeze more sockets out of a machine by load balancing across multiple IPs, subject to your network topology allowing this. Go actually makes this fairly trivial to implement, just set net.Dialer.LocalAddr.

This is trivial in IPv6 setups that assign prefixes to machines, eg. my laptop currently has a /64 subset of this flat's /48. For IPv4 networks, it more commonly takes some fiddling with your interface configuration and adding discrete IPs - but you can query it all the same, with net.Interface.Addrs().

(* Neither this nor distributed execution helps if you're trying to load test through a NAT with only a single public IP between you and your destination host; that's your bottleneck in that case.)

@mstoykov
Copy link
Contributor

@liclac, This is what is used in all the multi NIC implementations, one of which will hopefully get in v0.28.0 ;). And it really does work and if you have multiple IPs/NICs

@ragnarlonn
Copy link

I just saw this mentioned on the road map (https://github.com/orgs/grafana/projects/443/views/1) and wanted to make an annoying comment :-) as this must surely be one of the oldest open tickets around?

For six years now it has been about "a year away". Isn't it better to just remove it from the road map until we know we're ready to start working on it?

I consider it to be the holy grail for k6. When it is done, in the way @liclac envisioned it, when it is as easy to run a distributed test in k6 as it is in Locust, then it is going to be hard for most people to justify using any other load testing tool.

Not to take away from the awesome work that's been done, and is being done - you guys rock! But this old killer feature is bugging me with its tendency to never get built but still appear on the road map all the time!

/a grumpy old user

@na--
Copy link
Member

na-- commented Jun 22, 2023

@ragnarlonn, very happy to hear from you! The good news is that we have been low-key working towards this goal for a while 😅 There is an open PR (#2816) with a simple proof-of-concept implementation of distributed execution and even a very rudimentary test suites (#1342) support in a separate commit 🎉 That PoC is not polished at all, it basically only works for the happy path, but it works and I can run a k6 test on multiple instances, with a (mostly) working end-of-test summary and thresholds.

Guided by that PoC, so far we have been clearing the obstacles and prerequisites before we can have robust distributed execution implementation. A couple of versions ago we released k6 v0.43.0 that included a series of refactoring efforts spanning 10+ PRs and culminating in #2815, the removal of the core.Engine component! Removing the Engine was a necessary prerequisite for distributed execution (and test suites) because the Engine was responsible for two different major "jobs": managing the test execution and crunching metrics (and a few smaller ones). These two major responsibilities were reasonably OK to be in the same component in local execution (though even that had a bunch of problems), but cannot coexist in distributed execution, so we had to split them apart and simplify things.

As you can see from the first commit of #2816, the only remaining obstacle before we can implement distributed execution is the lack of HDR/sparse histograms (#763), to efficiently move Trend metric values over the network. Though even on that front we have cleared out plenty of prerequisites and done a lot of refactoring already.

The architecture of that PoC is somewhat different than what was described at the start of the issue. Some of the original ideas were not feasible because k6 doesn't just care about endlessly looping VUs anymore, we now have arrival-rate executors and multiple scenarios in the same test. It also introduced 2 new sub-commands (k6 coordinator and k6 agent), though that is mostly necessary for ease of use. Because of execution segments, every k6 instance knows what it is supposed to do, so the synchronization between the agent and coordinator instances is very light. That means that if one is willing to give up automatic centralized thresholds and end-of-test summary, a somewhat different etcd-based architecture is also quite feasible! I may share a more detailed design doc publicly at some point, we'll see.

In any case, one big reason we haven't pushed distributed execution forward more swiftly has been the fact that it is competing with a lot of other features with high priorities, while we already have a reasonably OK distributed execution solution for a lot of use cases. And I am not talking about just k6 cloud, but mostly about k6-operator. It's not perfect and native k6 distributed execution support will plug some of the current gaps, but if you want to run distributed k6 tests in Kubernetes, it already works pretty well!

So yeah, including native distributed k6 execution in the new roadmap is a bit more realistic this time compared to before, but it's still competing with a lot of other priorities.

@ragnarlonn
Copy link

@na-- So you're saying we really are getting close to being able to release it? That would be super cool. Will we bump the version to 1.0 also then, or what's the criteria for that? I might try to test that PR!

@na--
Copy link
Member

na-- commented Jun 22, 2023

Not quite. We have a proof of concept and we are close to actually starting work on a full solution (HDR histograms are the only remaining prerequisite). But we haven't actually started on that final push, e.g. no work except some design docs are planned for it during the current release cycle. Follow the roadmap and milestones to track when that work actually starts.

And after work has actually started, releasing it publicly is another matter entirely... I am not sure how long a fully-featured version of my PoC would take to deliver, or if we won't need to make some major architectural deviations from that PoC... 😅 If everything goes well, it shouldn't be too long, given that we have a PoC to follow and that the whole feature is backwards compatible. So it can probably be released as an "experimental" feature initially, allowing us to iterate and make some breaking changes for a few versions to fine-tune things 🤔 But no promises or even guesstimates at this point, sorry 😅

Regarding k6 v1.0, there are other concerns. It hasn't been released not just due to the lack of this feature (or other major ones), but mostly due to the fact that xk6 extensions use parts of k6 as Go dependencies. And Go module import paths change based on the major version of the dependency repo... See #2640 (comment) for more details, but the TLDR version is that before we release k6 v1.0.0, we probably need to refactor the k6 codebase and split it in 2 different Go modules:

  1. one for the k6 binary itself, which we can increment to whatever version we want
  2. a second one with library code that is forever v0.x and is imported by the main k6 module above and by every xk6 extension out there

@ragnarlonn
Copy link

@na-- I get it. And I know you're making the right calls, I just can't help fretting over that distributed execution ;) When it does get released, there should be a big party thrown somewhere.

@luketn
Copy link

luketn commented Jun 27, 2023

push them to the cluster, where they're stored in etcd -

I think it would also be important to support streaming the aggregated results to Grafana for visualising the cluster job as it proceeds.

@liclac
Copy link
Contributor Author

liclac commented Jun 27, 2023

I'm up for a party and/or lending a hand if needed (especially if I can do so as a freelancer ;D), not getting to finish this before I left and didn't have time anymore is one of my big regrets.

@na--
Copy link
Member

na-- commented Jul 20, 2023

@ragnarlonn, somewhat prompted by your questions, I've worked on pushing this issue forward as much as I can before I go on a long vacation/sabbatical for the next few months 😮‍💨 I'll also likely change teams a few months after I come back, so, like @liclac, I might not be the person who finishes this... 😅 😞

As you might see from the many issues and PR linked above, I've enhanced, refactored and split up my original distributed execution PoC from #2438 and #2816 into a few hopefully merge-able PRs. They are still very much proofs of concepts - very rough and not ready for any serious usage! The rest of the team also hasn't reviewed them or even approved the overall architecture. Nor has the work of reviewing, approving, merging and finishing up any remaining related tasks been prioritized. So this is still far from ready for production use... 😞

However, the distributed execution changes have been refactored into a few small, atomic and self-sufficient commits/PRs that should be safe to merge even as they currently are, since they should now be completely backwards compatible! 🤞 🎉

They no longer disable any unit tests or linters, or rely on the HDR histograms changes to be merged first! Support for HDR/sparse histograms still needs to be added before this feature can be used for big tests, but this can now be done after distributed execution has been merged! 🎉 Moreover, this distributed execution implementation doesn't affect the k6 run or k6 cloud sub-commands in any way, instead it is done with new k6 coordinator and k6 agent sub-commands. They can be hidden and considered completely experimental and liable to change and break until the team says otherwise. That should hopefully give enough room to maneuver and handle the remaining tasks in an iterative approach, as separate smallish PRs after the ones I created are merged... 🤞

To wrap up, if anyone is interested in my proposal, I created a new issue to track all of its details and sub-tasks, #3218. I also wrote down my thoughts and ideas on the topic in a new design document, #3217. It is a long read, but I tried to provide the maximum amount of context so that everyone can grok the overall architecture and the reasons behind the PoC code. I've tried to make what exists as code and ideas in my head as easy as possible for someone else to adopt and built upon, so 🤞 😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

17 participants