This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Cannot dump heavily skewed data in parallel #75

Closed
kennytm opened this issue May 15, 2020 · 4 comments
Labels
priority/P1 High priority issue

Comments

@kennytm
Collaborator

kennytm commented May 15, 2020

Example:

create table x(id bigint primary key);
insert into x (id) values (1);
insert into x (id) values (2);
...
insert into x (id) values (9999);
insert into x (id) values (10000);

insert into x (id) values (9223372036854775807);

Currently Dumpling splits the dump range using only the key's min and max, ignoring the actual data distribution, so in this example one thread dumps all of the small IDs while each of the remaining threads dumps zero or one row.
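To make the skew concrete, here is a minimal Go sketch of a min/max-only split of `[1, 2^63-1]` among 4 threads (illustrative only, not Dumpling's actual code; `naiveChunks` and `chunkOf` are hypothetical helpers):

```go
package main

import "fmt"

// naiveChunks splits [min, max] into n chunks of equal value width,
// the way a min/max-only strategy would (ignoring data distribution).
func naiveChunks(min, max int64, n int) []int64 {
	step := (max - min) / int64(n) // value width per chunk, not row count
	bounds := make([]int64, 0, n+1)
	for i := 0; i < n; i++ {
		bounds = append(bounds, min+step*int64(i))
	}
	return append(bounds, max)
}

// chunkOf returns the index of the chunk containing id.
func chunkOf(bounds []int64, id int64) int {
	for i := 1; i < len(bounds); i++ {
		if id <= bounds[i] {
			return i - 1
		}
	}
	return len(bounds) - 2
}

func main() {
	// ids 1..10000 plus one extreme outlier, as in the example above
	bounds := naiveChunks(1, 9223372036854775807, 4)
	counts := make([]int, 4)
	for id := int64(1); id <= 10000; id++ {
		counts[chunkOf(bounds, id)]++
	}
	counts[chunkOf(bounds, 9223372036854775807)]++
	fmt.Println(counts) // every small id lands in chunk 0; the outlier is alone in the last chunk
}
```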

We could either adopt mydumper's bisection algorithm, or use a work-stealing algorithm in which completed threads "steal" unprocessed ranges from the threads still working.

@kennytm
Collaborator Author

kennytm commented May 15, 2020

The original reproduction case:

  1. Create a table with shard_row_id_bits = 6 (64 shards).
  2. Ingest 37,000,000 rows of data via Lightning. Lightning currently ignores shard_row_id_bits, so all of these rows occupy shard 0.
  3. Insert 100 rows. These rows are distributed evenly across the 64 shards, so _tidb_rowid spans a very wide range while almost all of the data is concentrated at a single point.

@kennytm kennytm added the priority/P1 High priority issue label May 28, 2020
@kennytm
Collaborator Author

kennytm commented Jun 25, 2020

Proposed implementation:

  1. A parallelizable table has at least one unique, non-null, sorted ("B-Tree") index, and the indexed column must have a well-ordered, interpolatable type such as an integer or a binary string.

  2. An interval of values can be closed ([x, y]) or half-open ((x, y]). An interval can be split into N disjoint sub-intervals with roughly equal lengths (e.g. [0, 99] split 3 = [0, 33], (33, 66], (66, 99]).

  3. A worker is in either a "running" or an "idle" state. A running worker is associated with an interval.

  4. A scheduler distributes dumping work to N workers. The scheduler tracks two things:

    • W, the list of idle workers
    • R, the list of intervals not yet associated with any running workers
  5. The scheduler has 3 main methods:

    • Enqueue(worker)
      1. append the worker to W.
    • Return(interval)
      1. append the interval to R.
    • Step() bool
      1. block until W is not empty.
      2. if R is empty, and length of W is N (all workers are idle), return "completed"
      3. if R is empty, and length of W is less than N (some workers are still running), block until R is not empty.
      4. pop an interval out of R, then split the interval by the length of W.
      5. pop workers out of W for the number of sub-intervals.
      6. associate every popped worker with a sub-interval, and run them in background in parallel.
  6. Running a worker i with an interval (x, y] means that:

    1. execute the SQL `select * from tbl where key > x and key <= y order by key limit 1000` and dump the result (the row limit 1000 can be configured)
    2. if 1000 rows are returned, obtain the value z of the last key, and then do Return( (z, y] ), provided that interval is not empty.
    3. do Enqueue(i).
  7. The whole dumping procedure is like this:

    1. perform `select min(key), max(key) from tbl` to get the interval [min, max] covering the whole table.
    2. for i in 0..N: Enqueue(i).
    3. Return( [min, max] ).
    4. loop Step() until "completed".
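The steps above can be sketched as a sequential, single-process simulation in Go. This is illustrative only, not Dumpling's implementation: `Interval`, `split`, `scan`, and `simulate` are made-up names, the LIMITed SELECT is replaced by a scan over an in-memory sorted key slice, and workers "run" one at a time instead of in parallel:

```go
package main

import "fmt"

// Interval of key values: (Lo, Hi], or [Lo, Hi] when Closed is true.
type Interval struct {
	Lo, Hi int64
	Closed bool
}

// split divides iv into at most n sub-intervals of roughly equal width.
func split(iv Interval, n int) []Interval {
	if n < 2 || (iv.Hi-iv.Lo)/int64(n) == 0 {
		return []Interval{iv} // too narrow to split further
	}
	step := (iv.Hi - iv.Lo) / int64(n)
	out := make([]Interval, 0, n)
	lo := iv.Lo
	for i := 1; i <= n; i++ {
		hi := iv.Hi
		if i < n {
			hi = iv.Lo + step*int64(i)
		}
		// only the first sub-interval inherits the closed lower bound
		out = append(out, Interval{Lo: lo, Hi: hi, Closed: iv.Closed && i == 1})
		lo = hi
	}
	return out
}

// scan plays the role of the LIMITed SELECT: it returns up to limit keys
// inside iv, plus the leftover interval (z, iv.Hi] when the limit was hit.
func scan(keys []int64, iv Interval, limit int) (rows []int64, rest Interval, more bool) {
	for _, k := range keys { // keys are sorted ascending
		if k > iv.Hi {
			break
		}
		if k < iv.Lo || (k == iv.Lo && !iv.Closed) {
			continue
		}
		rows = append(rows, k)
		if len(rows) == limit {
			break
		}
	}
	if len(rows) == limit && rows[limit-1] < iv.Hi {
		return rows, Interval{Lo: rows[limit-1], Hi: iv.Hi}, true
	}
	return rows, Interval{}, false
}

// simulate runs the scheduler loop sequentially: R holds unassigned
// intervals, idle counts idle workers, running holds intervals being dumped.
func simulate(keys []int64, nWorkers, limit int) []int64 {
	var dumped []int64
	R := []Interval{{Lo: keys[0], Hi: keys[len(keys)-1], Closed: true}}
	var running []Interval
	idle := nWorkers
	for len(R) > 0 || len(running) > 0 {
		if len(R) > 0 && idle > 0 { // Step(): split one interval among idle workers
			iv := R[0]
			R = R[1:]
			subs := split(iv, idle)
			running = append(running, subs...)
			idle -= len(subs)
		}
		if len(running) > 0 { // one worker finishes a batch
			iv := running[0]
			running = running[1:]
			rows, rest, more := scan(keys, iv, limit)
			dumped = append(dumped, rows...)
			if more {
				R = append(R, rest) // Return(): leftover range goes back to the pool
			}
			idle++ // Enqueue(): the worker is idle again
		}
	}
	return dumped
}

func main() {
	keys := make([]int64, 0, 10001)
	for k := int64(1); k <= 10000; k++ {
		keys = append(keys, k)
	}
	keys = append(keys, 9223372036854775807) // the lone outlier
	fmt.Println(len(simulate(keys, 10, 1000)))
}
```

Run against the skewed example from this issue, the simulation dumps every key exactly once, because split produces disjoint sub-intervals and the returned leftover `(z, y]` excludes the last dumped key.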

Scenario 1: evenly distributed data in the range `[0, 3333]` with 3 workers, where every integer in the range is present.
| time | R | W | worker 0 | worker 1 | worker 2 | covered |
|---|---|---|---|---|---|---|
| 0 | [0, 3333] | [0, 1, 2] | - | - | - | - |
| 0 | - | - | [0, 1111] | (1111, 2222] | (2222, 3333] | - |
| 1000 | (999, 1111] | [0] | - | (1111, 2222] | (2222, 3333] | [0, 999] |
| 1000 | (2111, 2222] | [1] | (999, 1111] | - | (2222, 3333] | [0, 999], (1111, 2111] |
| 1000 | (3222, 3333] | [2] | (999, 1111] | (2111, 2222] | - | [0, 999], (1111, 2111], (2222, 3222] |
| 1111 | - | [0] | - | (2111, 2222] | (3222, 3333] | [0, 999], (1111, 2111], (2222, 3222], (999, 1111] |
| 1111 | - | [0, 1] | - | - | (3222, 3333] | [0, 999], (1111, 2111], (2222, 3222], (999, 1111], (2111, 2222] |
| 1111 | - | [0, 1, 2] | - | - | - | [0, 999], (1111, 2111], (2222, 3222], (999, 1111], (2111, 2222], (3222, 3333] |
| completed | - | - | - | - | - | - |
Scenario 2: heavily skewed data `[1, 10000]` ∪ {12345678}, with 10 workers. (A lower bound written `_` continues from the previous worker's upper bound.)
| time | R | W | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | [1, 12345678] | 0…9 | - | - | - | - | - | - | - | - | - | - |
| 0 | - | - | [1, 1234568] | (_, 2469136] | (_, 3703704] | (_, 4938271] | (_, 6172839] | (_, 7407407] | (_, 8641974] | (_, 9876542] | (_, 11111110] | (_, 12345678] |
| 1 | - | 1…9 | [1, 1234568] | - | - | - | - | - | - | - | - | - |
| 1000 | (1000, 1234568] | 0…9 | - | - | - | - | - | - | - | - | - | - |
| 1000 | - | - | (1000, 124356] | (_, 247713] | (_, 371070] | (_, 494427] | (_, 617784] | (_, 744140] | (_, 864497] | (_, 987854] | (_, 1111211] | (_, 1234568] |
| 1000 | - | 1…9 | (1000, 124356] | - | - | - | - | - | - | - | - | - |
| 2000 | (2000, 124356] | 0…9 | - | - | - | - | - | - | - | - | - | - |
| 2000 | - | - | (2000, 14235] | (_, 26471] | (_, 38706] | (_, 50942] | (_, 63178] | (_, 75413] | (_, 87649] | (_, 99884] | (_, 112120] | (_, 124356] |
| 2000 | - | 1…9 | (2000, 14235] | - | - | - | - | - | - | - | - | - |
| 3000 | (3000, 14235] | 0…9 | - | - | - | - | - | - | - | - | - | - |
| 3000 | - | - | (3000, 4123] | (4123, 5247] | (5247, 6370] | (6370, 7494] | (7494, 8617] | (8617, 9741] | (9741, 10864] | (10864, 11988] | (11988, 13111] | (13111, 14235] |
| 4000 | (4000, 4123] | 0 | - | (4123, 5247] | (5247, 6370] | (6370, 7494] | (7494, 8617] | (8617, 9741] | (9741, 10864] | (10864, 11988] | (11988, 13111] | (13111, 14235] |
| 4000 | (5123, 5247] | 1 | (4000, 4123] | - | (5247, 6370] | (6370, 7494] | (7494, 8617] | (8617, 9741] | (9741, 10864] | (10864, 11988] | (11988, 13111] | (13111, 14235] |
| ⋮ | | | | | | | | | | | | |
| 4000 | (14111, 14235] | 9 | (4000, 4123] | (5123, 5247] | (6247, 6370] | (7370, 7494] | (8494, 8617] | (9617, 9741] | (10741, 10864] | (11864, 11988] | (12988, 13111] | - |
| 4123 | - | 0 | - | (5123, 5247] | (6247, 6370] | (7370, 7494] | (8494, 8617] | (9617, 9741] | (10741, 10864] | (11864, 11988] | (12988, 13111] | (14111, 14235] |
| 4123 | - | 0…1 | - | - | (6247, 6370] | (7370, 7494] | (8494, 8617] | (9617, 9741] | (10741, 10864] | (11864, 11988] | (12988, 13111] | (14111, 14235] |
| ⋮ | | | | | | | | | | | | |
| 4123 | - | 0…8 | - | - | - | - | - | - | - | - | - | (14111, 14235] |
| 4123 | - | 0…9 | - | - | - | - | - | - | - | - | - | - |
| completed | - | - | - | - | - | - | - | - | - | - | - | - |

Notice how in scenario 2 the algorithm exponentially narrows the range down to the dense part of the data and then starts to utilize all 10 threads.
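The roughly 10× shrink per pass can be checked with a few lines of Go reproducing the first sub-interval's upper bound from the table above (assuming integer division, 1000 rows dumped from the dense region per pass, and 10 idle workers at each split):

```go
package main

import "fmt"

func main() {
	lo, hi := int64(1), int64(12345678)
	for pass := 1; pass <= 3; pass++ {
		hi = lo + (hi-lo)/10 // upper bound of the first of 10 sub-intervals
		fmt.Println(hi)      // prints 1234568, 124356, 14235 as in the table
		lo = int64(pass) * 1000
	}
}
```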

@IANTHEREAL
Collaborator

Can we close it now?

@lichunzhu
Contributor

Fixed on TiDB 3.0+ by tablesample and tableregion.
