Big OpenLM/DCLM <-> AI2 PR # 1 #12

Open

wants to merge 67 commits into base: main

Commits: 67 (changes shown from 15 commits)
77e5160
Barebones python/S3 implementation
Feb 29, 2024
ab02649
Bash script written
Mar 5, 2024
8aaf3c9
Added i) directory support, ii) FP rate args, iii) No-Save option
Mar 7, 2024
10b192a
Made changes requested in PR
Mar 7, 2024
a6b63fd
Added --no-progress flag
Mar 8, 2024
a7b7ccf
Added options for no-progress (cleaner signature) | Updated README.md…
Mar 13, 2024
2cc9cac
Cleaned up readme duplicates
Mar 13, 2024
f035820
Merged main
Mar 13, 2024
5be38a8
Bash script fixed
Mar 13, 2024
091ca4b
first commit
Mar 14, 2024
8840180
Added fp rate to scripts
Mar 14, 2024
0516941
added no-save-bloom-filter option in bash script by default
Mar 14, 2024
dee44c9
Hacky AWS script seems to work okay
Mar 15, 2024
3832421
s3 stream stuff added
Mar 17, 2024
b886faa
Oops, forgot to add threading
Mar 17, 2024
9eb69b0
Subcommands for local + s3 stuff built
Mar 18, 2024
18853bf
Added sysreq command
Mar 18, 2024
8052fe3
Okay, overrode the sysreq blockers
Mar 18, 2024
1b84f9f
Added better i/o collecting for bff-remote
Mar 19, 2024
50eee20
Added streaming support for s3-in
Mar 19, 2024
241dc05
Added default region for AWS
Mar 19, 2024
4840e95
Added some more timing/ablations for some various checks
Mar 20, 2024
cd29a10
More testing stuff
Mar 20, 2024
ba3cd53
Removing debuggy things
Mar 20, 2024
1dabe05
Support subset
Mar 20, 2024
031098e
Added some better printouts for removal stats at the end
Mar 20, 2024
e6c1803
Merge pull request #1 from achalddave/s3-stream
revbucket Mar 20, 2024
8be64f1
Added retry support for bff-remote
Mar 21, 2024
bd7add8
Added better RT thread control?
Mar 21, 2024
ab8ded3
Merged, should have {retries, better RT threading}
Mar 21, 2024
029ed65
Retry within process_file_s3 with exp backoff
achalddave Mar 22, 2024
5ba59f6
Better prints
achalddave Mar 22, 2024
92c3957
semicolon
achalddave Mar 22, 2024
ce53e1c
Random delay
achalddave Mar 22, 2024
71bf7b2
fixes
achalddave Mar 22, 2024
e4fce7e
Add random delay before spawning threads
Mar 22, 2024
9b8b45c
Freezing status for dedup ablations v3
Mar 25, 2024
fc9cfca
Freezing status for dedup ablations v3
Mar 25, 2024
68853d2
Added i) parallel init for bits vector; ii) better output directory f…
Mar 27, 2024
105921e
Merge remote-tracking branch 'origin/dabv3' into s3-stream
achalddave Mar 28, 2024
5bf7f56
Shuffling s3 shards
Mar 28, 2024
77939ef
Merge remote-tracking branch 'origin/dabv3' into s3-stream
achalddave Mar 28, 2024
1a776a5
Add global retries
achalddave Mar 28, 2024
0c558b0
bugfix
achalddave Mar 28, 2024
ba8641b
Bugfix
achalddave Mar 28, 2024
de972ea
Support offset
achalddave Mar 29, 2024
45f6768
Merge pull request #2 from achalddave/s3-stream
revbucket Mar 29, 2024
0193457
Printed number of input files
Mar 29, 2024
35d2ebd
Merged achal's PR
Mar 29, 2024
18cedb5
Added sharding control for BFF, printed out filter sparsity at the end
Mar 29, 2024
cf0947c
Allowing text in annotate mode
Apr 8, 2024
358b6b7
idk, cargo needed updating
Apr 8, 2024
222b50e
Added scripts/ to gitignore
Apr 8, 2024
631b827
Merge branch 'dabv3'
Apr 8, 2024
084df4c
Update README.md
revbucket Apr 8, 2024
68ddf50
Cleaned things up a bit
Apr 8, 2024
eac5707
Merge branch 'main' of github.com:revbucket/bff
Apr 8, 2024
333e7d1
Made bloom-filter-file not mandatory
Apr 8, 2024
dcce23c
merged
Apr 8, 2024
b7f0228
Read all lines before doing dedup
achalddave Apr 14, 2024
703cd3b
Ignore creds
achalddave Apr 14, 2024
8260401
Print loop progress every so often
achalddave Apr 14, 2024
f5eeea8
Folded in no-stream for bff-remote
Apr 15, 2024
1fe70b9
Merge branch 'no-stream'
Apr 15, 2024
2d03792
Added s5cmd wrapper
Apr 15, 2024
7e686d4
oops
Apr 15, 2024
b4267cf
Ready for PR; @dirk, it's time to d-d-duel
Apr 17, 2024
src/main.rs (102 changes: 73 additions & 29 deletions)
@@ -37,6 +37,7 @@ use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use tokio::io::{AsyncBufReadExt};
use tokio::io::BufReader as tBufReader;
use tokio::time::{Duration, sleep};
use async_compression::tokio::bufread::GzipDecoder as asyncGZ;
use rayon::prelude::*;

@@ -174,11 +175,23 @@ enum Commands {
#[arg(long)]
subset: Option<usize>,

#[arg(long, default_value_t=0)]
offset: usize,

#[command(flatten)]
bff_args: BffArgs,

// Local number of retries; we try to load each file from s3 this many times.
#[arg(long, default_value_t=3)]
num_retries: usize,

// Global number of retries; we do a full loop through all remaining files this many times.
// i.e.,
// remaining = all_paths
// for i in num_retries:
// remaining = process_data(remaining)
#[arg(long, default_value_t=3)]
num_global_retries: usize,
},

Sysreq {
@@ -543,7 +556,35 @@ fn process_file(
}



async fn get_object_with_retry(client: &Client, bucket: &str, key: &str, num_retries: usize) -> Result<GetObjectOutput, aws_sdk_s3::Error> {
let mut attempts = 0;
let base_delay = Duration::from_millis(100);
let max_delay = Duration::from_millis(2000);

let mut rng = rand::thread_rng();

loop {
match client.get_object().bucket(bucket).key(key).send().await {
Ok(response) => return Ok(response),
Err(e) if attempts < num_retries => {
// Calculate delay for exponential backoff, add some randomness so multiple threads don't access at the
// same time.
println!("Error {}/{}: {}", e, attempts, num_retries);
let random_delay = rng.gen_range(Duration::from_millis(0)..Duration::from_millis(1000));
let mut exponential_delay = base_delay * 2u32.pow(attempts as u32);
if exponential_delay > max_delay {
exponential_delay = max_delay;
}
sleep(exponential_delay + random_delay).await;
attempts += 1;
}
Err(e) => {
println!("Too many errors reading: {}. Giving up.", key);
return Err(e.into());
}
}
}
}

async fn process_file_s3(
s3_bucket: &String,
@@ -552,6 +593,7 @@ async fn process_file_s3(
bloom_filter: &Arc<BloomFilter>,
bff_args: &BffArgs,
pbar_option: &Option<Arc<Mutex<ProgressBar>>>,
num_retries: usize,
) -> Result<(usize, usize), Error> {
// Phase 1a: Build s3 client
let region_provider = RegionProviderChain::default_provider();
@@ -561,17 +603,7 @@ async fn process_file_s3(
.await;
let client = Client::new(&config);


-// Phase 1b: read data into lines
-// Note: this reads in a streaming sense (but we don't upload in streaming)
-let object: GetObjectOutput = client
-.get_object()
-.bucket(s3_bucket)
-.key(s3_input)
-.send()
-.await?;
+let object = get_object_with_retry(&client, s3_bucket, s3_input, num_retries).await?;
let body_stream = object.body.into_async_read();
let gz = asyncGZ::new(body_stream);
let reader = tBufReader::with_capacity(1024 * 1024, gz);
@@ -798,7 +830,7 @@ fn extract_s3_basename(input_path: &str) -> &str {



-async fn gather_s3_io(bucket: &str, prefix: &str, output_dir: &str, subset: &Option<usize>) -> Result<Vec<(String, String)>, Error> {
+async fn gather_s3_io(bucket: &str, prefix: &str, output_dir: &str, subset: &Option<usize>, offset: usize) -> Result<Vec<(String, String)>, Error> {
let region_provider = RegionProviderChain::default_provider();
let config = aws_config::defaults(BehaviorVersion::latest())
.region(region_provider)
@@ -813,11 +845,17 @@ async fn gather_s3_io(bucket: &str, prefix: &str, output_dir: &str, subset: &Opt
.into_paginator()
.send();

let mut skipped = 0;
let mut io_pairs: Vec<(String, String)> = Vec::new();
'outer: while let Some(result) = response.next().await {
match result {
Ok(output) => {
for object in output.contents() {
if skipped < offset {
// Skip files until the offset is reached
skipped += 1;
continue;
}
if subset.is_some() && io_pairs.len() >= subset.unwrap() {
// Saw enough data for subset, skip
break 'outer;
@@ -857,8 +895,8 @@ async fn main() -> std::io::Result<()> {
bff(inputs, output_directory, &bff_args)?;
},

-Commands::BffRemote {bucket, input_dir, output_dir, subset, bff_args, num_retries} => {
-bff_remote(bucket, input_dir, output_dir, subset, &bff_args, num_retries).await?;
+Commands::BffRemote {bucket, input_dir, output_dir, subset, bff_args, num_retries, num_global_retries, offset} => {
+bff_remote(bucket, input_dir, output_dir, subset, &bff_args, num_retries, num_global_retries, offset).await?;
}
Commands::Sysreq {expected_ngram_count, fp_rate} => {
let bff_size = compute_bloom_size(*fp_rate, *expected_ngram_count, false);
@@ -971,7 +1009,7 @@ fn bff(inputs: &Vec<PathBuf>, output_directory: &PathBuf, bff_args: &BffArgs) ->



-async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, subset: &Option<usize>, bff_args: &BffArgs, num_retries: &usize) -> std::io::Result<()> {
+async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, subset: &Option<usize>, bff_args: &BffArgs, num_retries: &usize, num_global_retries: &usize, offset: &usize) -> std::io::Result<()> {
/*
General pseudocode:
Setup:
@@ -985,7 +1023,8 @@ async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, su
*/
let start_time = Instant::now();
let bloom_filter = Arc::new(BloomFilter::from_args(bff_args));
-let mut io_pairs = gather_s3_io(bucket, input_dir, output_dir, subset).await.unwrap();
+let mut io_pairs = gather_s3_io(bucket, input_dir, output_dir, subset, *offset).await.unwrap();
println!("Collected {} input files...", io_pairs.len());

let num_files = io_pairs.len();
@@ -1014,10 +1053,12 @@ async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, su
};
let threadpool = ThreadPool::new(threads);

-for retry_count in 0..*num_retries {
+for retry_count in 0..*num_global_retries {
let failed_io_pairs: Arc<Mutex<Vec<(String, String)>>> = Arc::new(Mutex::new(Vec::new()));
let mut rng = rand::thread_rng();
for io_pair in &io_pairs {
let num_retries = (*num_retries).clone();
let num_global_retries = (*num_global_retries).clone();
let retry_count = retry_count.clone();
let bucket = bucket.clone();
let bloom_filter = bloom_filter.clone();
@@ -1044,19 +1085,20 @@ async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, su
&output_path,
&bloom_filter,
&bff_args,
-&pbar_option)
+&pbar_option,
+num_retries)
);
match result {
Ok(outputs) => {
let (rem_doc_items, tot_doc_items) = outputs;
let mut total_guard = total_items.lock().unwrap();
*total_guard += tot_doc_items;
let mut removed_guard = removed_items.lock().unwrap();
*removed_guard += rem_doc_items;
}
Err(err) => {
eprintln!("Round {}/{}: Error processing {}; {:?}", retry_count+1, num_retries, input_path, err);
if retry_count < num_retries - 1 {
eprintln!("Round {}/{}: Error processing {}; {:?}", retry_count+1, num_global_retries, input_path, err);
if retry_count < num_global_retries - 1 {
// in all but last round, push the failed pair to failed_io_pairs
let mut fail_guard = failed_io_pairs.lock().unwrap();
fail_guard.push((input_path, output_path));
@@ -1068,9 +1110,11 @@ async fn bff_remote(bucket: &String, input_dir: &String, output_dir: &String, su

}
}
});
+// Wait a little before spawning the next processor.
+let random_delay = rng.gen_range(Duration::from_millis(0)..Duration::from_millis(100));
+sleep(random_delay).await;
}
threadpool.join();
io_pairs = failed_io_pairs.lock().unwrap().clone();
}
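
Editor's note: the new get_object_with_retry helper in this diff follows a standard exponential-backoff-with-jitter pattern: double the delay after every failure, cap it, and add a random offset so parallel workers do not retry in lockstep. Below is a minimal blocking sketch of that same pattern, not the PR's code; try_fetch is a hypothetical stand-in for the S3 get_object call, and std::thread::sleep replaces tokio::time::sleep.

use rand::Rng;
use std::time::Duration;

// Hypothetical fallible call standing in for client.get_object(...).send().
fn try_fetch(attempt: usize) -> Result<&'static str, &'static str> {
    // Pretend the first two attempts hit a transient error.
    if attempt < 2 { Err("transient error") } else { Ok("object body") }
}

fn fetch_with_backoff(num_retries: usize) -> Result<&'static str, &'static str> {
    let base_delay = Duration::from_millis(100);
    let max_delay = Duration::from_millis(2000);
    let mut rng = rand::thread_rng();
    let mut attempts = 0;
    loop {
        match try_fetch(attempts) {
            Ok(body) => return Ok(body),
            Err(e) if attempts < num_retries => {
                eprintln!("Attempt {}/{} failed: {}", attempts + 1, num_retries, e);
                // Double the delay on each failure (capped), then add random jitter
                // so parallel workers don't all hit S3 again at the same instant.
                let exp = std::cmp::min(base_delay * 2u32.pow(attempts as u32), max_delay);
                let jitter = rng.gen_range(Duration::from_millis(0)..Duration::from_millis(1000));
                std::thread::sleep(exp + jitter);
                attempts += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    match fetch_with_backoff(3) {
        Ok(body) => println!("fetched: {}", body),
        Err(e) => println!("giving up: {}", e),
    }
}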
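
Editor's note: the num_global_retries flag implements the loop sketched in its doc comment above: each round re-processes only the (input, output) pairs that failed in the previous round. A minimal synchronous sketch of that pattern follows; process_pair and run_with_global_retries are hypothetical stand-ins, and the real code additionally runs each job on a thread pool, shares a Bloom filter between jobs, and reports progress.

// Hypothetical stand-in for process_file_s3: download, dedup, upload the survivors.
fn process_pair(pair: &(String, String)) -> Result<(), String> {
    let _ = pair;
    Ok(())
}

fn run_with_global_retries(all_pairs: Vec<(String, String)>, num_global_retries: usize) {
    let mut remaining = all_pairs;
    for round in 0..num_global_retries {
        let mut failed: Vec<(String, String)> = Vec::new();
        for pair in &remaining {
            if let Err(err) = process_pair(pair) {
                eprintln!("Round {}/{}: {} failed: {}", round + 1, num_global_retries, pair.0, err);
                failed.push(pair.clone());
            }
        }
        // The next round only re-attempts the pairs that failed in this round.
        remaining = failed;
        if remaining.is_empty() {
            break;
        }
    }
}

fn main() {
    let io_pairs = vec![("input-key.jsonl.gz".to_string(), "output-key.jsonl.gz".to_string())];
    run_with_global_retries(io_pairs, 3);
}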