diff --git a/pipeline/alignments/align.py b/pipeline/alignments/align.py index 2851f7a58..02774d6ac 100755 --- a/pipeline/alignments/align.py +++ b/pipeline/alignments/align.py @@ -36,7 +36,6 @@ from pipeline.common.logging import get_logger logger = get_logger("alignments") -COMPRESSION_CMD = "zstdmt" class Tokenization(Enum): @@ -97,7 +96,7 @@ def run( remap(corpus_src, corpus_trg, tokenized_src, tokenized_trg, output_aln, remapped_aln) if output_path.endswith(".zst"): logger.info("Compressing final alignments") - subprocess.check_call([COMPRESSION_CMD, "--rm", remapped_aln]) + subprocess.check_call(["zstdmt", "--rm", remapped_aln]) remapped_aln += ".zst" shutil.move(remapped_aln, output_path) @@ -105,7 +104,7 @@ def run( def decompress(file_path: str): if file_path.endswith(".zst"): logger.info(f"Decompressing file {file_path}") - subprocess.check_call([COMPRESSION_CMD, "-d", "-f", "--rm", file_path]) + subprocess.check_call(["zstdmt", "-d", "-f", "--rm", file_path]) return file_path[:-4] return file_path @@ -182,10 +181,10 @@ def symmetrize(bin: str, fwd_path: str, rev_path: str, output_path: str): os.makedirs(os.path.dirname(output_path), exist_ok=True) # Wrap the file with a compressor stream if it needs to be compressed with ExitStack() as stack: - with zstandard.ZstdCompressor().stream_writer( - stack.enter_context(open(output_path, "wb")) - ) if output_path.endswith(".zst") else stack.enter_context( - open(output_path, "w", encoding="utf-8") + with ( + zstandard.ZstdCompressor().stream_writer(stack.enter_context(open(output_path, "wb"))) + if output_path.endswith(".zst") + else stack.enter_context(open(output_path, "w", encoding="utf-8")) ) as stream: with subprocess.Popen( [ diff --git a/pipeline/alignments/generate-shortlist.sh b/pipeline/alignments/generate-shortlist.sh index c55de6520..cf71baca9 100755 --- a/pipeline/alignments/generate-shortlist.sh +++ b/pipeline/alignments/generate-shortlist.sh @@ -20,9 +20,6 @@ vocab_path=$2 output_dir=$3 threads=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - if [ "$threads" = "auto" ]; then threads=$(nproc) fi @@ -33,16 +30,16 @@ mkdir -p "${output_dir}" dir="${output_dir}/tmp" mkdir -p "${dir}" -corpus_src="${corpus_prefix}.${SRC}.${ARTIFACT_EXT}" -corpus_trg="${corpus_prefix}.${TRG}.${ARTIFACT_EXT}" +corpus_src="${corpus_prefix}.${SRC}.zst" +corpus_trg="${corpus_prefix}.${TRG}.zst" echo "### Subword segmentation with SentencePiece" -${COMPRESSION_CMD} -dc "${corpus_src}" | +zstdmt -dc "${corpus_src}" | parallel --no-notice --pipe -k -j "${threads}" --block 50M "${MARIAN}/spm_encode" --model "${vocab_path}" \ >"${dir}/corpus.spm.${SRC}" -${COMPRESSION_CMD} -dc "${corpus_trg}" | +zstdmt -dc "${corpus_trg}" | parallel --no-notice --pipe -k -j "${threads}" --block 50M "${MARIAN}/spm_encode" --model "${vocab_path}" \ >"${dir}/corpus.spm.${TRG}" @@ -60,7 +57,7 @@ echo "### Creating shortlist" "${dir}/lex.t2s" if [ -f "${dir}/lex.s2t" ]; then - ${COMPRESSION_CMD} "${dir}/lex.s2t" + zstdmt "${dir}/lex.s2t" fi rm "${dir}/corpus.spm.${TRG}" @@ -69,10 +66,10 @@ rm "${output_dir}/corpus.aln" echo "### Shortlist pruning" "${MARIAN}/spm_export_vocab" --model="${vocab_path}" --output="${dir}/vocab.txt" -${COMPRESSION_CMD} -dc "${dir}/lex.s2t.${ARTIFACT_EXT}" | +zstdmt -dc "${dir}/lex.s2t.zst" | grep -v NULL | python3 "prune_shortlist.py" 100 "${dir}/vocab.txt" | - ${COMPRESSION_CMD} >"${output_dir}/lex.s2t.pruned.${ARTIFACT_EXT}" + zstdmt >"${output_dir}/lex.s2t.pruned.zst" echo "### Deleting tmp dir" rm -rf 
"${dir}" diff --git a/pipeline/alignments/requirements/alignments.in b/pipeline/alignments/requirements/alignments.in index e77201ca8..7cf4412bd 100644 --- a/pipeline/alignments/requirements/alignments.in +++ b/pipeline/alignments/requirements/alignments.in @@ -1,3 +1,4 @@ eflomal==1.0.0b1 opus-fast-mosestokenizer==0.0.8.5 tqdm +requests==2.31.0 diff --git a/pipeline/alignments/requirements/alignments.txt b/pipeline/alignments/requirements/alignments.txt index 6b67b885a..fd83f6857 100644 --- a/pipeline/alignments/requirements/alignments.txt +++ b/pipeline/alignments/requirements/alignments.txt @@ -1,16 +1,234 @@ # -# This file is autogenerated by pip-compile with Python 3.11 +# This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile pipeline/alignments/requirements/alignments.in +# pip-compile --allow-unsafe --generate-hashes pipeline/alignments/requirements/alignments.in # -cython==3.0.9 +certifi==2024.7.4 \ + --hash=sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b \ + --hash=sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90 + # via requests +charset-normalizer==3.3.2 \ + --hash=sha256:06435b539f889b1f6f4ac1758871aae42dc3a8c0e24ac9e60c2384973ad73027 \ + --hash=sha256:06a81e93cd441c56a9b65d8e1d043daeb97a3d0856d177d5c90ba85acb3db087 \ + --hash=sha256:0a55554a2fa0d408816b3b5cedf0045f4b8e1a6065aec45849de2d6f3f8e9786 \ + --hash=sha256:0b2b64d2bb6d3fb9112bafa732def486049e63de9618b5843bcdd081d8144cd8 \ + --hash=sha256:10955842570876604d404661fbccbc9c7e684caf432c09c715ec38fbae45ae09 \ + --hash=sha256:122c7fa62b130ed55f8f285bfd56d5f4b4a5b503609d181f9ad85e55c89f4185 \ + --hash=sha256:1ceae2f17a9c33cb48e3263960dc5fc8005351ee19db217e9b1bb15d28c02574 \ + --hash=sha256:1d3193f4a680c64b4b6a9115943538edb896edc190f0b222e73761716519268e \ + --hash=sha256:1f79682fbe303db92bc2b1136016a38a42e835d932bab5b3b1bfcfbf0640e519 \ + --hash=sha256:2127566c664442652f024c837091890cb1942c30937add288223dc895793f898 \ + --hash=sha256:22afcb9f253dac0696b5a4be4a1c0f8762f8239e21b99680099abd9b2b1b2269 \ + --hash=sha256:25baf083bf6f6b341f4121c2f3c548875ee6f5339300e08be3f2b2ba1721cdd3 \ + --hash=sha256:2e81c7b9c8979ce92ed306c249d46894776a909505d8f5a4ba55b14206e3222f \ + --hash=sha256:3287761bc4ee9e33561a7e058c72ac0938c4f57fe49a09eae428fd88aafe7bb6 \ + --hash=sha256:34d1c8da1e78d2e001f363791c98a272bb734000fcef47a491c1e3b0505657a8 \ + --hash=sha256:37e55c8e51c236f95b033f6fb391d7d7970ba5fe7ff453dad675e88cf303377a \ + --hash=sha256:3d47fa203a7bd9c5b6cee4736ee84ca03b8ef23193c0d1ca99b5089f72645c73 \ + --hash=sha256:3e4d1f6587322d2788836a99c69062fbb091331ec940e02d12d179c1d53e25fc \ + --hash=sha256:42cb296636fcc8b0644486d15c12376cb9fa75443e00fb25de0b8602e64c1714 \ + --hash=sha256:45485e01ff4d3630ec0d9617310448a8702f70e9c01906b0d0118bdf9d124cf2 \ + --hash=sha256:4a78b2b446bd7c934f5dcedc588903fb2f5eec172f3d29e52a9096a43722adfc \ + --hash=sha256:4ab2fe47fae9e0f9dee8c04187ce5d09f48eabe611be8259444906793ab7cbce \ + --hash=sha256:4d0d1650369165a14e14e1e47b372cfcb31d6ab44e6e33cb2d4e57265290044d \ + --hash=sha256:549a3a73da901d5bc3ce8d24e0600d1fa85524c10287f6004fbab87672bf3e1e \ + --hash=sha256:55086ee1064215781fff39a1af09518bc9255b50d6333f2e4c74ca09fac6a8f6 \ + --hash=sha256:572c3763a264ba47b3cf708a44ce965d98555f618ca42c926a9c1616d8f34269 \ + --hash=sha256:573f6eac48f4769d667c4442081b1794f52919e7edada77495aaed9236d13a96 \ + --hash=sha256:5b4c145409bef602a690e7cfad0a15a55c13320ff7a3ad7ca59c13bb8ba4d45d \ + 
--hash=sha256:6463effa3186ea09411d50efc7d85360b38d5f09b870c48e4600f63af490e56a \ + --hash=sha256:65f6f63034100ead094b8744b3b97965785388f308a64cf8d7c34f2f2e5be0c4 \ + --hash=sha256:663946639d296df6a2bb2aa51b60a2454ca1cb29835324c640dafb5ff2131a77 \ + --hash=sha256:6897af51655e3691ff853668779c7bad41579facacf5fd7253b0133308cf000d \ + --hash=sha256:68d1f8a9e9e37c1223b656399be5d6b448dea850bed7d0f87a8311f1ff3dabb0 \ + --hash=sha256:6ac7ffc7ad6d040517be39eb591cac5ff87416c2537df6ba3cba3bae290c0fed \ + --hash=sha256:6b3251890fff30ee142c44144871185dbe13b11bab478a88887a639655be1068 \ + --hash=sha256:6c4caeef8fa63d06bd437cd4bdcf3ffefe6738fb1b25951440d80dc7df8c03ac \ + --hash=sha256:6ef1d82a3af9d3eecdba2321dc1b3c238245d890843e040e41e470ffa64c3e25 \ + --hash=sha256:753f10e867343b4511128c6ed8c82f7bec3bd026875576dfd88483c5c73b2fd8 \ + --hash=sha256:7cd13a2e3ddeed6913a65e66e94b51d80a041145a026c27e6bb76c31a853c6ab \ + --hash=sha256:7ed9e526742851e8d5cc9e6cf41427dfc6068d4f5a3bb03659444b4cabf6bc26 \ + --hash=sha256:7f04c839ed0b6b98b1a7501a002144b76c18fb1c1850c8b98d458ac269e26ed2 \ + --hash=sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db \ + --hash=sha256:80402cd6ee291dcb72644d6eac93785fe2c8b9cb30893c1af5b8fdd753b9d40f \ + --hash=sha256:8465322196c8b4d7ab6d1e049e4c5cb460d0394da4a27d23cc242fbf0034b6b5 \ + --hash=sha256:86216b5cee4b06df986d214f664305142d9c76df9b6512be2738aa72a2048f99 \ + --hash=sha256:87d1351268731db79e0f8e745d92493ee2841c974128ef629dc518b937d9194c \ + --hash=sha256:8bdb58ff7ba23002a4c5808d608e4e6c687175724f54a5dade5fa8c67b604e4d \ + --hash=sha256:8c622a5fe39a48f78944a87d4fb8a53ee07344641b0562c540d840748571b811 \ + --hash=sha256:8d756e44e94489e49571086ef83b2bb8ce311e730092d2c34ca8f7d925cb20aa \ + --hash=sha256:8f4a014bc36d3c57402e2977dada34f9c12300af536839dc38c0beab8878f38a \ + --hash=sha256:9063e24fdb1e498ab71cb7419e24622516c4a04476b17a2dab57e8baa30d6e03 \ + --hash=sha256:90d558489962fd4918143277a773316e56c72da56ec7aa3dc3dbbe20fdfed15b \ + --hash=sha256:923c0c831b7cfcb071580d3f46c4baf50f174be571576556269530f4bbd79d04 \ + --hash=sha256:95f2a5796329323b8f0512e09dbb7a1860c46a39da62ecb2324f116fa8fdc85c \ + --hash=sha256:96b02a3dc4381e5494fad39be677abcb5e6634bf7b4fa83a6dd3112607547001 \ + --hash=sha256:9f96df6923e21816da7e0ad3fd47dd8f94b2a5ce594e00677c0013018b813458 \ + --hash=sha256:a10af20b82360ab00827f916a6058451b723b4e65030c5a18577c8b2de5b3389 \ + --hash=sha256:a50aebfa173e157099939b17f18600f72f84eed3049e743b68ad15bd69b6bf99 \ + --hash=sha256:a981a536974bbc7a512cf44ed14938cf01030a99e9b3a06dd59578882f06f985 \ + --hash=sha256:a9a8e9031d613fd2009c182b69c7b2c1ef8239a0efb1df3f7c8da66d5dd3d537 \ + --hash=sha256:ae5f4161f18c61806f411a13b0310bea87f987c7d2ecdbdaad0e94eb2e404238 \ + --hash=sha256:aed38f6e4fb3f5d6bf81bfa990a07806be9d83cf7bacef998ab1a9bd660a581f \ + --hash=sha256:b01b88d45a6fcb69667cd6d2f7a9aeb4bf53760d7fc536bf679ec94fe9f3ff3d \ + --hash=sha256:b261ccdec7821281dade748d088bb6e9b69e6d15b30652b74cbbac25e280b796 \ + --hash=sha256:b2b0a0c0517616b6869869f8c581d4eb2dd83a4d79e0ebcb7d373ef9956aeb0a \ + --hash=sha256:b4a23f61ce87adf89be746c8a8974fe1c823c891d8f86eb218bb957c924bb143 \ + --hash=sha256:bd8f7df7d12c2db9fab40bdd87a7c09b1530128315d047a086fa3ae3435cb3a8 \ + --hash=sha256:beb58fe5cdb101e3a055192ac291b7a21e3b7ef4f67fa1d74e331a7f2124341c \ + --hash=sha256:c002b4ffc0be611f0d9da932eb0f704fe2602a9a949d1f738e4c34c75b0863d5 \ + --hash=sha256:c083af607d2515612056a31f0a8d9e0fcb5876b7bfc0abad3ecd275bc4ebc2d5 \ + 
--hash=sha256:c180f51afb394e165eafe4ac2936a14bee3eb10debc9d9e4db8958fe36afe711 \ + --hash=sha256:c235ebd9baae02f1b77bcea61bce332cb4331dc3617d254df3323aa01ab47bd4 \ + --hash=sha256:cd70574b12bb8a4d2aaa0094515df2463cb429d8536cfb6c7ce983246983e5a6 \ + --hash=sha256:d0eccceffcb53201b5bfebb52600a5fb483a20b61da9dbc885f8b103cbe7598c \ + --hash=sha256:d965bba47ddeec8cd560687584e88cf699fd28f192ceb452d1d7ee807c5597b7 \ + --hash=sha256:db364eca23f876da6f9e16c9da0df51aa4f104a972735574842618b8c6d999d4 \ + --hash=sha256:ddbb2551d7e0102e7252db79ba445cdab71b26640817ab1e3e3648dad515003b \ + --hash=sha256:deb6be0ac38ece9ba87dea880e438f25ca3eddfac8b002a2ec3d9183a454e8ae \ + --hash=sha256:e06ed3eb3218bc64786f7db41917d4e686cc4856944f53d5bdf83a6884432e12 \ + --hash=sha256:e27ad930a842b4c5eb8ac0016b0a54f5aebbe679340c26101df33424142c143c \ + --hash=sha256:e537484df0d8f426ce2afb2d0f8e1c3d0b114b83f8850e5f2fbea0e797bd82ae \ + --hash=sha256:eb00ed941194665c332bf8e078baf037d6c35d7c4f3102ea2d4f16ca94a26dc8 \ + --hash=sha256:eb6904c354526e758fda7167b33005998fb68c46fbc10e013ca97f21ca5c8887 \ + --hash=sha256:eb8821e09e916165e160797a6c17edda0679379a4be5c716c260e836e122f54b \ + --hash=sha256:efcb3f6676480691518c177e3b465bcddf57cea040302f9f4e6e191af91174d4 \ + --hash=sha256:f27273b60488abe721a075bcca6d7f3964f9f6f067c8c4c605743023d7d3944f \ + --hash=sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5 \ + --hash=sha256:fb69256e180cb6c8a894fee62b3afebae785babc1ee98b81cdf68bbca1987f33 \ + --hash=sha256:fd1abc0d89e30cc4e02e4064dc67fcc51bd941eb395c502aac3ec19fab46b519 \ + --hash=sha256:ff8fa367d09b717b2a17a052544193ad76cd49979c805768879cb63d9ca50561 + # via requests +cython==3.0.9 \ + --hash=sha256:000af6deb7412eb7ac0c635ff5e637fb8725dd0a7b88cc58dfc2b3de14e701c4 \ + --hash=sha256:00b105b5d050645dd59e6767bc0f18b48a4aa11c85f42ec7dd8181606f4059e3 \ + --hash=sha256:02c6e809f060bed073dc7cba1648077fe3b68208863d517c8b39f3920eecf9dd \ + --hash=sha256:09c44501d476d16aaa4cbc29c87f8c0f54fc20e69b650d59cbfa4863426fc70c \ + --hash=sha256:1315aee506506e8d69cf6631d8769e6b10131fdcc0eb66df2698f2a3ddaeeff2 \ + --hash=sha256:1365d5f76bf4d19df3d19ce932584c9bb76e9fb096185168918ef9b36e06bfa4 \ + --hash=sha256:157973807c2796addbed5fbc4d9c882ab34bbc60dc297ca729504901479d5df7 \ + --hash=sha256:158c38360bbc5063341b1e78d3737f1251050f89f58a3df0d10fb171c44262be \ + --hash=sha256:15c7f5c2d35bed9aa5f2a51eaac0df23ae72f2dbacf62fc672dd6bfaa75d2d6f \ + --hash=sha256:1dc320a9905ab95414013f6de805efbff9e17bb5fb3b90bbac533f017bec8136 \ + --hash=sha256:22b8fae756c5c0d8968691bed520876de452f216c28ec896a00739a12dba3bd9 \ + --hash=sha256:296bd30d4445ac61b66c9d766567f6e81a6e262835d261e903c60c891a6729d3 \ + --hash=sha256:2cc92504b5d22ac66031ffb827bd3a967fc75a5f0f76ab48bce62df19be6fdfd \ + --hash=sha256:2fd198c1a7f8e9382904d622cc0efa3c184605881fd5262c64cbb7168c4c1ec5 \ + --hash=sha256:33b6ac376538a7fc8c567b85d3c71504308a9318702ec0485dd66c059f3165cb \ + --hash=sha256:357e2fad46a25030b0c0496487e01a9dc0fdd0c09df0897f554d8ba3c1bc4872 \ + --hash=sha256:35e6665a20d6b8a152d72b7fd87dbb2af6bb6b18a235b71add68122d594dbd41 \ + --hash=sha256:36f5a2dfc724bea1f710b649f02d802d80fc18320c8e6396684ba4a48412445a \ + --hash=sha256:38df37d0e732fbd9a2fef898788492e82b770c33d1e4ed12444bbc8a3b3f89c0 \ + --hash=sha256:44457279da56e0f829bb1fc5a5dc0836e5d498dbcf9b2324f32f7cc9d2ec6569 \ + --hash=sha256:4558814fa025b193058d42eeee498a53d6b04b2980d01339fc2444b23fd98e58 \ + --hash=sha256:4ae349960ebe0da0d33724eaa7f1eb866688fe5434cc67ce4dbc06d6a719fbfc \ + 
--hash=sha256:4cf0ed273bf60e97922fcbbdd380c39693922a597760160b4b4355e6078ca188 \ + --hash=sha256:4f836192140f033b2319a0128936367c295c2b32e23df05b03b672a6015757ea \ + --hash=sha256:5055988b007c92256b6e9896441c3055556038c3497fcbf8c921a6c1fce90719 \ + --hash=sha256:530c01c4aebba709c0ec9c7ecefe07177d0b9fd7ffee29450a118d92192ccbdf \ + --hash=sha256:539cd1d74fd61f6cfc310fa6bbbad5adc144627f2b7486a07075d4e002fd6aad \ + --hash=sha256:56f3b643dbe14449248bbeb9a63fe3878a24256664bc8c8ef6efd45d102596d8 \ + --hash=sha256:5eb9bd4ae12ebb2bc79a193d95aacf090fbd8d7013e11ed5412711650cb34934 \ + --hash=sha256:63d2537bf688247f76ded6dee28ebd26274f019309aef1eb4f2f9c5c482fde2d \ + --hash=sha256:757ca93bdd80702546df4d610d2494ef2e74249cac4d5ba9464589fb464bd8a3 \ + --hash=sha256:858c3766b9aa3ab8a413392c72bbab1c144a9766b7c7bfdef64e2e414363fa0c \ + --hash=sha256:8bf30b045f7deda0014b042c1b41c1d272facc762ab657529e3b05505888e878 \ + --hash=sha256:8edd59d22950b400b03ca78d27dc694d2836a92ef0cac4f64cb4b2ff902f7e25 \ + --hash=sha256:95ed792c966f969cea7489c32ff90150b415c1f3567db8d5a9d489c7c1602dac \ + --hash=sha256:976c8d2bedc91ff6493fc973d38b2dc01020324039e2af0e049704a8e1b22936 \ + --hash=sha256:9a001fd95c140c94d934078544ff60a3c46aca2dc86e75a76e4121d3cd1f4b33 \ + --hash=sha256:9cda0d92a09f3520f29bd91009f1194ba9600777c02c30c6d2d4ac65fb63e40d \ + --hash=sha256:a274fe9ca5c53fafbcf5c8f262f8ad6896206a466f0eeb40aaf36a7951e957c0 \ + --hash=sha256:a2d354f059d1f055d34cfaa62c5b68bc78ac2ceab6407148d47fb508cf3ba4f3 \ + --hash=sha256:a30d96938c633e3ec37000ac3796525da71254ef109e66bdfd78f29891af6454 \ + --hash=sha256:ac5536d09bef240cae0416d5a703d298b74c7bbc397da803ac9d344e732d4369 \ + --hash=sha256:ad7fd88ebaeaf2e76fd729a8919fae80dab3d6ac0005e28494261d52ff347a8f \ + --hash=sha256:bf96417714353c5454c2e3238fca9338599330cf51625cdc1ca698684465646f \ + --hash=sha256:c0eb1e6ef036028a52525fd9a012a556f6dd4788a0e8755fe864ba0e70cde2ff \ + --hash=sha256:c232e7f279388ac9625c3e5a5a9f0078a9334959c5d6458052c65bbbba895e1e \ + --hash=sha256:c3232926cd406ee02eabb732206f6e882c3aed9d58f0fea764013d9240405bcf \ + --hash=sha256:c4b419a1adc2af43f4660e2f6eaf1e4fac2dbac59490771eb8ac3d6063f22356 \ + --hash=sha256:c8191941073ea5896321de3c8c958fd66e5f304b0cd1f22c59edd0b86c4dd90d \ + --hash=sha256:cc9c3b9f20d8e298618e5ccd32083ca386e785b08f9893fbec4c50b6b85be772 \ + --hash=sha256:d6f3ff1cd6123973fe03e0fb8ee936622f976c0c41138969975824d08886572b \ + --hash=sha256:d9360606d964c2d0492a866464efcf9d0a92715644eede3f6a2aa696de54a137 \ + --hash=sha256:deaf4197d4b0bcd5714a497158ea96a2bd6d0f9636095437448f7e06453cc83d \ + --hash=sha256:e32b016030bc72a8a22a1f21f470a2f57573761a4f00fbfe8347263f4fbdb9f1 \ + --hash=sha256:ec612418490941ed16c50c8d3784c7bdc4c4b2a10c361259871790b02ec8c1db \ + --hash=sha256:f496b52845cb45568a69d6359a2c335135233003e708ea02155c10ce3548aa89 \ + --hash=sha256:f49aa4970cd3bec66ac22e701def16dca2a49c59cceba519898dd7526e0be2c0 \ + --hash=sha256:f92f4960c40ad027bd8c364c50db11104eadc59ffeb9e5b7f605ca2f05946e20 # via eflomal -eflomal==1.0.0b1 +eflomal==1.0.0b1 \ + --hash=sha256:49e34be37016084052a899e32ab57c3e485c248322d2a3d0dd16e5442ecc5558 # via -r pipeline/alignments/requirements/alignments.in -numpy==1.26.4 +idna==3.8 \ + --hash=sha256:050b4e5baadcd44d760cedbd2b8e639f2ff89bbc7a5730fcc662954303377aac \ + --hash=sha256:d838c2c0ed6fced7693d5e8ab8e734d5f8fda53a039c0164afb0b82e771e3603 + # via requests +numpy==1.26.4 \ + --hash=sha256:03a8c78d01d9781b28a6989f6fa1bb2c4f2d51201cf99d3dd875df6fbd96b23b \ + 
--hash=sha256:08beddf13648eb95f8d867350f6a018a4be2e5ad54c8d8caed89ebca558b2818 \ + --hash=sha256:1af303d6b2210eb850fcf03064d364652b7120803a0b872f5211f5234b399f20 \ + --hash=sha256:1dda2e7b4ec9dd512f84935c5f126c8bd8b9f2fc001e9f54af255e8c5f16b0e0 \ + --hash=sha256:2a02aba9ed12e4ac4eb3ea9421c420301a0c6460d9830d74a9df87efa4912010 \ + --hash=sha256:2e4ee3380d6de9c9ec04745830fd9e2eccb3e6cf790d39d7b98ffd19b0dd754a \ + --hash=sha256:3373d5d70a5fe74a2c1bb6d2cfd9609ecf686d47a2d7b1d37a8f3b6bf6003aea \ + --hash=sha256:47711010ad8555514b434df65f7d7b076bb8261df1ca9bb78f53d3b2db02e95c \ + --hash=sha256:4c66707fabe114439db9068ee468c26bbdf909cac0fb58686a42a24de1760c71 \ + --hash=sha256:50193e430acfc1346175fcbdaa28ffec49947a06918b7b92130744e81e640110 \ + --hash=sha256:52b8b60467cd7dd1e9ed082188b4e6bb35aa5cdd01777621a1658910745b90be \ + --hash=sha256:60dedbb91afcbfdc9bc0b1f3f402804070deed7392c23eb7a7f07fa857868e8a \ + --hash=sha256:62b8e4b1e28009ef2846b4c7852046736bab361f7aeadeb6a5b89ebec3c7055a \ + --hash=sha256:666dbfb6ec68962c033a450943ded891bed2d54e6755e35e5835d63f4f6931d5 \ + --hash=sha256:675d61ffbfa78604709862923189bad94014bef562cc35cf61d3a07bba02a7ed \ + --hash=sha256:679b0076f67ecc0138fd2ede3a8fd196dddc2ad3254069bcb9faf9a79b1cebcd \ + --hash=sha256:7349ab0fa0c429c82442a27a9673fc802ffdb7c7775fad780226cb234965e53c \ + --hash=sha256:7ab55401287bfec946ced39700c053796e7cc0e3acbef09993a9ad2adba6ca6e \ + --hash=sha256:7e50d0a0cc3189f9cb0aeb3a6a6af18c16f59f004b866cd2be1c14b36134a4a0 \ + --hash=sha256:95a7476c59002f2f6c590b9b7b998306fba6a5aa646b1e22ddfeaf8f78c3a29c \ + --hash=sha256:96ff0b2ad353d8f990b63294c8986f1ec3cb19d749234014f4e7eb0112ceba5a \ + --hash=sha256:9fad7dcb1aac3c7f0584a5a8133e3a43eeb2fe127f47e3632d43d677c66c102b \ + --hash=sha256:9ff0f4f29c51e2803569d7a51c2304de5554655a60c5d776e35b4a41413830d0 \ + --hash=sha256:a354325ee03388678242a4d7ebcd08b5c727033fcff3b2f536aea978e15ee9e6 \ + --hash=sha256:a4abb4f9001ad2858e7ac189089c42178fcce737e4169dc61321660f1a96c7d2 \ + --hash=sha256:ab47dbe5cc8210f55aa58e4805fe224dac469cde56b9f731a4c098b91917159a \ + --hash=sha256:afedb719a9dcfc7eaf2287b839d8198e06dcd4cb5d276a3df279231138e83d30 \ + --hash=sha256:b3ce300f3644fb06443ee2222c2201dd3a89ea6040541412b8fa189341847218 \ + --hash=sha256:b97fe8060236edf3662adfc2c633f56a08ae30560c56310562cb4f95500022d5 \ + --hash=sha256:bfe25acf8b437eb2a8b2d49d443800a5f18508cd811fea3181723922a8a82b07 \ + --hash=sha256:cd25bcecc4974d09257ffcd1f098ee778f7834c3ad767fe5db785be9a4aa9cb2 \ + --hash=sha256:d209d8969599b27ad20994c8e41936ee0964e6da07478d6c35016bc386b66ad4 \ + --hash=sha256:d5241e0a80d808d70546c697135da2c613f30e28251ff8307eb72ba696945764 \ + --hash=sha256:edd8b5fe47dab091176d21bb6de568acdd906d1887a4584a15a9a96a1dca06ef \ + --hash=sha256:f870204a840a60da0b12273ef34f7051e98c3b5961b61b0c2c1be6dfd64fbcd3 \ + --hash=sha256:ffa75af20b44f8dba823498024771d5ac50620e6915abac414251bd971b4529f # via eflomal -opus-fast-mosestokenizer==0.0.8.5 +opus-fast-mosestokenizer==0.0.8.5 \ + --hash=sha256:0b19bb8e01c31c62814d9a9aef2bea41ad6de8b23cfde7279bc0bc6bc6fab5c0 \ + --hash=sha256:2fc28ddfecf00f80022186f6f703ccb76c0c794c5e39248758d0b1bcffd30c84 \ + --hash=sha256:6e9e67dbc63a1c38af981f2cdb24b85e18cb1880b61e3ec57a084c8b5ab6b8d9 \ + --hash=sha256:7d3c7df00c09a362fdfacfb15cbff83e8d039b6aa6b129824610a75046430e86 \ + --hash=sha256:8eeac5d174abe6e32c6f3b3830369aafa22c27b9008aa10d5611cbe865386025 \ + --hash=sha256:a4f02e53a508a95d82ac868c03647c4472208074c2b6a308b08266caa2d085d3 \ + 
--hash=sha256:c563a02c5f753d2f7f44ebd6ddfc3a6c368b538568567629e5b8e876c2967a9d \ + --hash=sha256:c82fc1cabe4ebe6fa59dae115ec5f55fec9099f460eb302cb9c22e3a88ccdd01 \ + --hash=sha256:d1e20514411d70abab57e30102fccac1df4e1b26016c21d6d2adcf5a14f9d97d \ + --hash=sha256:d22e5a684a4f7b50ea3c5f88d80bf0a628a446df67d307be74d72f8c668fc37e \ + --hash=sha256:db1a7ba69c65f08a30e3e4c8e5bad59a7a56b6023c29a5600be0e056cfcd25a6 \ + --hash=sha256:e4b653fb22334af30649a7a3c76792fafad97c8f316ba9a6a53f6c2f0707e720 \ + --hash=sha256:fa693333e98cb6277eb702be36b1e93a74001e2a032985445a320c502773811d # via -r pipeline/alignments/requirements/alignments.in -tqdm==4.66.4 +requests==2.31.0 \ + --hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \ + --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1 # via -r pipeline/alignments/requirements/alignments.in +tqdm==4.66.4 \ + --hash=sha256:b75ca56b413b030bc3f00af51fd2c1a1a5eac6a0c1cca83cbb37a5c52abce644 \ + --hash=sha256:e4d936c9de8727928f3be6079590e97d9abfe8d39a590be678eb5919ffc186bb + # via -r pipeline/alignments/requirements/alignments.in +urllib3==2.2.2 \ + --hash=sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472 \ + --hash=sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168 + # via requests diff --git a/pipeline/bicleaner/bicleaner.sh b/pipeline/bicleaner/bicleaner.sh index 9acbb3002..de075533b 100755 --- a/pipeline/bicleaner/bicleaner.sh +++ b/pipeline/bicleaner/bicleaner.sh @@ -24,9 +24,6 @@ bicleaner_threshold=$3 threads=$4 pack_dir=$5 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - if [ "$threads" = "auto" ]; then threads=$(nproc) fi @@ -36,8 +33,8 @@ mkdir -p "${output_dir}" if [ "${bicleaner_threshold}" == "0" ] || [ "${bicleaner_threshold}" == "0.0" ]; then echo "Threshold is 0, skipping filtering" - cp "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}" "${output_prefix}.${SRC}.${ARTIFACT_EXT}" - cp "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}" "${output_prefix}.${TRG}.${ARTIFACT_EXT}" + cp "${corpus_prefix}.${SRC}.zst" "${output_prefix}.${SRC}.zst" + cp "${corpus_prefix}.${TRG}.zst" "${output_prefix}.${TRG}.zst" else export scol=1 @@ -83,32 +80,32 @@ else } export -f biclean # {%} is a 1-indexed job slot number from GNU parallel. 
We use that as the 1-indexed offset in CUDA_VISIBLE_ARRAY - paste <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") | + paste <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") | parallel -j ${#CUDA_VISIBLE_ARRAY[@]} --pipe -k --block 10M biclean "${pack_dir}"/*.yaml {%} | - ${COMPRESSION_CMD} >"${output_prefix}.scored.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.scored.zst" else export BICLEANER_AI_THREADS=${threads} - paste <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") | + paste <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") | bicleaner-ai-classify ${hardrules} --scol ${scol} --tcol ${tcol} "${threads}" - - "${pack_dir}"/*.yaml | - ${COMPRESSION_CMD} >"${output_prefix}.scored.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.scored.zst" fi echo "### Filtering" - ${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" | + zstdmt -dc "${output_prefix}.scored.zst" | awk -v threshold=${bicleaner_threshold} -F"\t" '{if ($3>threshold) {print $0}}' | - ${COMPRESSION_CMD} >"${output_prefix}.best.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.best.zst" - ${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" | + zstdmt -dc "${output_prefix}.scored.zst" | awk -v threshold=${bicleaner_threshold} -F"\t" '{if ($3<=threshold) {print $0}}' | - ${COMPRESSION_CMD} >"${output_prefix}.filtered.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.filtered.zst" - echo "Lines before filtering: $(${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" | wc -l)" - echo "Lines after filtering: $(${COMPRESSION_CMD} -dc "${output_prefix}.best.${ARTIFACT_EXT}" | wc -l)" + echo "Lines before filtering: $(zstdmt -dc "${output_prefix}.scored.zst" | wc -l)" + echo "Lines after filtering: $(zstdmt -dc "${output_prefix}.best.zst" | wc -l)" echo "### Writing output corpus" - ${COMPRESSION_CMD} -dc "${output_prefix}.best.${ARTIFACT_EXT}" | - tee >(cut -f1 | ${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}") | - cut -f2 | ${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}" + zstdmt -dc "${output_prefix}.best.zst" | + tee >(cut -f1 | zstdmt >"${output_prefix}.${SRC}.zst") | + cut -f2 | zstdmt >"${output_prefix}.${TRG}.zst" # do not delete intermediate files to inspect them and tune the threshold fi diff --git a/pipeline/bicleaner/download_pack.py b/pipeline/bicleaner/download_pack.py index 5ccbfdb63..dc0c27035 100644 --- a/pipeline/bicleaner/download_pack.py +++ b/pipeline/bicleaner/download_pack.py @@ -7,7 +7,6 @@ python download_pack.py \ --src=en \ --trg=ru \ - --compression_cmd=zstd \ artifacts/bicleaner-model-en-ru.zst """ @@ -19,6 +18,7 @@ import tempfile from typing import Optional +from pipeline.common.downloads import compress_file from pipeline.common.logging import get_logger logger = get_logger(__file__) @@ -36,22 +36,14 @@ def _run_download(src: str, trg: str, dir: str) -> subprocess.CompletedProcess: ) -def _compress_dir(dir_path: str, compression_cmd: str) -> str: +def _compress_dir(dir_path: str) -> str: logger.info(f"Compressing {dir_path}") - if compression_cmd not in ["gzip", "zstd", "zstdmt", "pigz"]: - raise ValueError(f"Unsupported compression tool {compression_cmd}.") tarball_path = f"{dir_path}.tar" with tarfile.open(tarball_path, "w") as tar: tar.add(dir_path, arcname=os.path.basename(dir_path)) - if compression_cmd 
in ("gzip", "pigz"): - comp_ext = ".gz" - else: - comp_ext = ".zst" - - compressed_path = tarball_path + comp_ext - subprocess.run([compression_cmd, tarball_path], check=True) + compressed_path = str(compress_file(tarball_path)) return compressed_path @@ -64,7 +56,7 @@ def check_result(result: subprocess.CompletedProcess): result.check_returncode() -def download(src: str, trg: str, output_path: str, compression_cmd: str) -> None: +def download(src: str, trg: str, output_path: str) -> None: tmp_dir = os.path.join(tempfile.gettempdir(), f"bicleaner-ai-{src}-{trg}") if os.path.exists(tmp_dir): @@ -105,9 +97,8 @@ def download(src: str, trg: str, output_path: str, compression_cmd: str) -> None print(f"The model for {src}-{trg} is downloaded") pack_path = tmp_dir - if compression_cmd: - logger.info("Compress the downloaded pack.") - pack_path = _compress_dir(pack_path, compression_cmd) + logger.info("Compress the downloaded pack.") + pack_path = _compress_dir(pack_path) # Move to the expected path logger.info(f"Moving {pack_path} to {output_path}") @@ -123,12 +114,6 @@ def main(args: Optional[list[str]] = None) -> None: ) parser.add_argument("--src", type=str, help="Source language code") parser.add_argument("--trg", type=str, help="Target language code") - parser.add_argument( - "--compression_cmd", - type=str, - help="Compression command (eg. pigz, zstd). " - "Optional, if not provided the directory will not be compressed", - ) parser.add_argument( "output_path", type=str, @@ -141,7 +126,6 @@ def main(args: Optional[list[str]] = None) -> None: src=parsed_args.src, trg=parsed_args.trg, output_path=parsed_args.output_path, - compression_cmd=parsed_args.compression_cmd, ) diff --git a/pipeline/cefilter/ce-filter.sh b/pipeline/cefilter/ce-filter.sh index 15a564876..ab083b0c3 100755 --- a/pipeline/cefilter/ce-filter.sh +++ b/pipeline/cefilter/ce-filter.sh @@ -14,9 +14,6 @@ corpus_prefix=$1 output_prefix=$2 scores=$3 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - cd "$(dirname "${0}")" # Part of the data to be removed (0.05 is 5%) @@ -26,24 +23,24 @@ tmp="${output_dir}/tmp" mkdir -p "${tmp}" echo "### Sorting scores" -if [ ! -s "${tmp}/sorted.${ARTIFACT_EXT}" ]; then +if [ ! -s "${tmp}/sorted.zst" ]; then buffer_size="$(echo "$(grep MemTotal /proc/meminfo | awk '{print $2}')"*0.9 | bc | cut -f1 -d.)" - paste "${scores}" <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") | + paste "${scores}" <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") | LC_ALL=C sort -n -k1,1 -S "${buffer_size}K" -T "${tmp}" | - ${COMPRESSION_CMD} >"${tmp}/sorted.${ARTIFACT_EXT}" + zstdmt >"${tmp}/sorted.zst" fi echo "### Cutting the best scored corpus" -if [ ! -s "${tmp}/best.${ARTIFACT_EXT}" ]; then - lines=$(${COMPRESSION_CMD} -dc "${tmp}/sorted.${ARTIFACT_EXT}" | wc -l) +if [ ! -s "${tmp}/best.zst" ]; then + lines=$(zstdmt -dc "${tmp}/sorted.zst" | wc -l) startline=$(echo ${lines}*${remove} | bc | cut -f1 -d.) 
- ${COMPRESSION_CMD} -dc "${tmp}/sorted.${ARTIFACT_EXT}" | tail -n +${startline} | cut -f2,3 | ${COMPRESSION_CMD} >"${tmp}/best.${ARTIFACT_EXT}" + zstdmt -dc "${tmp}/sorted.zst" | tail -n +${startline} | cut -f2,3 | zstdmt >"${tmp}/best.zst" fi echo "### Writing output corpus" -${COMPRESSION_CMD} -dc "${tmp}/best.${ARTIFACT_EXT}" | - tee >(cut -f1 | ${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}") | - cut -f2 | ${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}" +zstdmt -dc "${tmp}/best.zst" | + tee >(cut -f1 | zstdmt >"${output_prefix}.${SRC}.zst") | + cut -f2 | zstdmt >"${output_prefix}.${TRG}.zst" echo "### Deleting tmp dir" rm -rf "${tmp}" diff --git a/pipeline/cefilter/score.sh b/pipeline/cefilter/score.sh index b899f4a96..119f33ccf 100755 --- a/pipeline/cefilter/score.sh +++ b/pipeline/cefilter/score.sh @@ -19,15 +19,8 @@ vocab=$2 corpus_prefix=$3 output=$4 -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - -if [ "${ARTIFACT_EXT}" = "zst" ]; then - zstdmt --rm -d "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}" - zstdmt --rm -d "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}" - ARTIFACT_EXT="" -else - ARTIFACT_EXT=".gz" -fi +zstdmt --rm -d "${corpus_prefix}.${SRC}.zst" +zstdmt --rm -d "${corpus_prefix}.${TRG}.zst" dir=$(dirname "${output}") mkdir -p "${dir}" @@ -35,7 +28,7 @@ mkdir -p "${dir}" "${MARIAN}/marian-scorer" \ --model "${model}" \ --vocabs "${vocab}" "${vocab}" \ - --train-sets "${corpus_prefix}.${TRG}${ARTIFACT_EXT}" "${corpus_prefix}.${SRC}${ARTIFACT_EXT}" \ + --train-sets "${corpus_prefix}.${TRG}" "${corpus_prefix}.${SRC}" \ --mini-batch 32 \ --mini-batch-words 1500 \ --maxi-batch 1000 \ diff --git a/pipeline/clean/clean-corpus.sh b/pipeline/clean/clean-corpus.sh index dccf31cc9..ef3a22a95 100755 --- a/pipeline/clean/clean-corpus.sh +++ b/pipeline/clean/clean-corpus.sh @@ -17,9 +17,6 @@ output_prefix=$2 threads=$3 dataset=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - if [ "$threads" = "auto" ]; then threads=$(nproc) fi @@ -34,24 +31,24 @@ echo "### Cleaning ${input_prefix}" ###################################################################### echo "### Basic preprocessing" for lng in "${SRC}" "${TRG}"; do - test -s "${output_prefix}.${lng}.nrm.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -dc "${input_prefix}.${lng}.${ARTIFACT_EXT}" | + test -s "${output_prefix}.${lng}.nrm.zst" || + zstdmt -dc "${input_prefix}.${lng}.zst" | parallel --no-notice --pipe -k -j "${threads}" --block 50M \ "perl tools/deescape-special-chars.perl | perl tools/remove-non-printing-char.perl" | - ${COMPRESSION_CMD} >"${output_prefix}.${lng}.nrm.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.${lng}.nrm.zst" done ##################################################################### echo "### Apply monolingual fixes" for lng in $SRC $TRG; do if [[ ! 
-x fixes/${dataset}.${lng}.sh ]]; then - test -s "${output_prefix}.${lng}.monofix.${ARTIFACT_EXT}" || - cp "${output_prefix}.${lng}.nrm.${ARTIFACT_EXT}" "${output_prefix}.${lng}.monofix.${ARTIFACT_EXT}" + test -s "${output_prefix}.${lng}.monofix.zst" || + cp "${output_prefix}.${lng}.nrm.zst" "${output_prefix}.${lng}.monofix.zst" else - test -s "${output_prefix}.${lng}.monofix.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -dc "${output_prefix}.${lng}.nrm.${ARTIFACT_EXT}" \ + test -s "${output_prefix}.${lng}.monofix.zst" || + zstdmt -dc "${output_prefix}.${lng}.nrm.zst" \ | fixes/"${dataset}"."${lng}".sh \ - | ${COMPRESSION_CMD} >"${output_prefix}.${lng}.monofix.${ARTIFACT_EXT}" + | zstdmt >"${output_prefix}.${lng}.monofix.zst" fi done @@ -62,55 +59,55 @@ if [[ -x fixes/${dataset}.sh ]]; then else FIX="cat" fi -test -s "${output_prefix}.${SRC}${TRG}.fix.${ARTIFACT_EXT}" || - paste <(${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}.monofix.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${output_prefix}.${TRG}.monofix.${ARTIFACT_EXT}") \ +test -s "${output_prefix}.${SRC}${TRG}.fix.zst" || + paste <(zstdmt -dc "${output_prefix}.${SRC}.monofix.zst") <(zstdmt -dc "${output_prefix}.${TRG}.monofix.zst") \ | $FIX \ - | ${COMPRESSION_CMD} > "${output_prefix}.${SRC}${TRG}.fix.${ARTIFACT_EXT}" + | zstdmt > "${output_prefix}.${SRC}${TRG}.fix.zst" ###################################################################### echo "### Rule-based filtering" -test -s "${output_prefix}.${SRC}${TRG}.rule-based.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}${TRG}.fix.${ARTIFACT_EXT}" | +test -s "${output_prefix}.${SRC}${TRG}.rule-based.zst" || + zstdmt -dc "${output_prefix}.${SRC}${TRG}.fix.zst" | parallel --no-notice --pipe -k -j "${threads}" --block 50M \ "python3 tools/clean_parallel.py -l1 ${SRC} -l2 ${TRG} --debug" \ 2>"${output_prefix}.${SRC}${TRG}.clean.debug.txt" | - ${COMPRESSION_CMD} >"${output_prefix}.${SRC}${TRG}.rule-based.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.${SRC}${TRG}.rule-based.zst" ###################################################################### echo "### Language identification" -test -s "${output_prefix}.${SRC}${TRG}.langid.${ARTIFACT_EXT}" || +test -s "${output_prefix}.${SRC}${TRG}.langid.zst" || # langid_fasttext.py will download this file if it is not already present. When it runs in # parallel, this will typically cause the file to be corrupt. 
test -s tools/lid.176.bin || wget -O tools/lid.176.bin https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin - ${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}${TRG}.rule-based.${ARTIFACT_EXT}" | + zstdmt -dc "${output_prefix}.${SRC}${TRG}.rule-based.zst" | # memory intensive parallel --no-notice --pipe -k -j "$(echo "${threads}"/4 | bc)" --block 50M \ "python3 -Wi tools/langid_fasttext.py -f 1 | python3 -Wi tools/langid_fasttext.py -f 1" | grep -P "^${SRC}\t${TRG}\t" | cut -f3,4 | - ${COMPRESSION_CMD} >"${output_prefix}.${SRC}${TRG}.langid.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.${SRC}${TRG}.langid.zst" ###################################################################### echo "### Removing leading and repetitive white spaces" -${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}${TRG}.langid.${ARTIFACT_EXT}" | +zstdmt -dc "${output_prefix}.${SRC}${TRG}.langid.zst" | cut -f1 | sed -e 's/^[[:space:]]*//' | tr -s " " | -${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}" +zstdmt >"${output_prefix}.${SRC}.zst" -${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}${TRG}.langid.${ARTIFACT_EXT}" | +zstdmt -dc "${output_prefix}.${SRC}${TRG}.langid.zst" | cut -f2 | sed -e 's/^[[:space:]]*//' | tr -s " " | -${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}" +zstdmt >"${output_prefix}.${TRG}.zst" -test -s "${output_prefix}.${SRC}.${ARTIFACT_EXT}" || exit 1 -test -s "${output_prefix}.${TRG}.${ARTIFACT_EXT}" || exit 1 +test -s "${output_prefix}.${SRC}.zst" || exit 1 +test -s "${output_prefix}.${TRG}.zst" || exit 1 echo "### Remove input_prefix from intermediate steps" -rm -rf "${output_prefix}".*.nrm.${ARTIFACT_EXT} "${output_prefix}".*.langid.${ARTIFACT_EXT} \ - "${output_prefix}".*.rule-based.${ARTIFACT_EXT} "${output_prefix}".*.*fix.${ARTIFACT_EXT} +rm -rf "${output_prefix}".*.nrm.zst "${output_prefix}".*.langid.zst \ + "${output_prefix}".*.rule-based.zst "${output_prefix}".*.*fix.zst echo "### Clean ${input_prefix} is written to ${output_prefix}" diff --git a/pipeline/clean/clean-mono.sh b/pipeline/clean/clean-mono.sh index 864b17c38..9dfcbf6a9 100755 --- a/pipeline/clean/clean-mono.sh +++ b/pipeline/clean/clean-mono.sh @@ -24,9 +24,6 @@ dataset=$5 # news-crawl_news.2007 # Example output: /builds/worker/artifacts/news_2007.en.zst -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - echo "### Cleaning ${input_prefix}" if [ "$threads" = "auto" ]; then @@ -40,53 +37,53 @@ mkdir -p "${dir}" ###################################################################### echo "### Basic preprocessing from moses" -test -s "${output_prefix}.${lang}.nrm.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -dc "${input_prefix}.${lang}.${ARTIFACT_EXT}" | +test -s "${output_prefix}.${lang}.nrm.zst" || + zstdmt -dc "${input_prefix}.${lang}.zst" | parallel --no-notice --pipe -k -j "${threads}" --block 50M \ "perl tools/deescape-special-chars.perl | perl tools/remove-non-printing-char.perl" | - ${COMPRESSION_CMD} -c >"${output_prefix}.${lang}.nrm.${ARTIFACT_EXT}" + zstdmt -c >"${output_prefix}.${lang}.nrm.zst" ##################################################################### echo "### Apply dataset fixes from pipeline/clean/fixes" if [[ ! 
-x fixes/${dataset}.${lang}.sh ]]; then - test -s "${output_prefix}.${lang}.monofix.${ARTIFACT_EXT}" || - cp "${output_prefix}.${lang}.nrm.${ARTIFACT_EXT}" "${output_prefix}.${lang}.monofix.${ARTIFACT_EXT}" + test -s "${output_prefix}.${lang}.monofix.zst" || + cp "${output_prefix}.${lang}.nrm.zst" "${output_prefix}.${lang}.monofix.zst" else - test -s "${output_prefix}.${lang}.monofix.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -dc "${output_prefix}.${lang}.nrm.${ARTIFACT_EXT}" \ + test -s "${output_prefix}.${lang}.monofix.zst" || + zstdmt -dc "${output_prefix}.${lang}.nrm.zst" \ | fixes/"${dataset}"."${lang}".sh \ - | ${COMPRESSION_CMD} >"${output_prefix}.${lang}.monofix.${ARTIFACT_EXT}" + | zstdmt >"${output_prefix}.${lang}.monofix.zst" fi ###################################################################### echo "### Filter by language identification" -test -s "${output_prefix}.${lang}.langid.${ARTIFACT_EXT}" || +test -s "${output_prefix}.${lang}.langid.zst" || # langid_fasttext.py will download this file if it is not already present. When it runs in # parallel, this will typically cause the file to be corrupt. test -s tools/lid.176.bin || wget -O tools/lid.176.bin https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin - ${COMPRESSION_CMD} -dc "${output_prefix}.${lang}.monofix.${ARTIFACT_EXT}" | + zstdmt -dc "${output_prefix}.${lang}.monofix.zst" | # memory intensive parallel --no-notice --pipe -k -j "$(echo "${threads}"/4 | bc)" --block 50M "python3 tools/langid_fasttext.py" | grep -P "^${lang}\t" | cut -f2 | - ${COMPRESSION_CMD} >"${output_prefix}.${lang}.langid.${ARTIFACT_EXT}" + zstdmt >"${output_prefix}.${lang}.langid.zst" ###################################################################### echo "### Rule-based filtering" -${COMPRESSION_CMD} -dc "${output_prefix}.${lang}.langid.${ARTIFACT_EXT}" | +zstdmt -dc "${output_prefix}.${lang}.langid.zst" | parallel --no-notice --pipe -k -j "${threads}" --block 50M \ "python3 tools/clean_mono.py -l ${lang} --debug" \ 2>"${output_prefix}.${lang}.clean.debug.txt" | -${COMPRESSION_CMD} >"${output_prefix}.${lang}.${ARTIFACT_EXT}" +zstdmt >"${output_prefix}.${lang}.zst" -test -s "${output_prefix}.${lang}.${ARTIFACT_EXT}" || exit 1 +test -s "${output_prefix}.${lang}.zst" || exit 1 ###################################################################### echo "### Remove data from intermediate steps" -rm -rf "${output_prefix}".*.nrm.${ARTIFACT_EXT} "${output_prefix}".*.langid.${ARTIFACT_EXT} \ - "${output_prefix}".*.monofix.${ARTIFACT_EXT} +rm -rf "${output_prefix}".*.nrm.zst "${output_prefix}".*.langid.zst \ + "${output_prefix}".*.monofix.zst echo "### Rule-based cleaning log written to: ${output_prefix}.${lang}.clean.debug.txt" -echo "### Clean data is written to: ${output_prefix}.${lang}.${ARTIFACT_EXT}" +echo "### Clean data is written to: ${output_prefix}.${lang}.zst" echo "###### Done: Cleaning monolingual data" diff --git a/pipeline/clean/merge-corpus.sh b/pipeline/clean/merge-corpus.sh index f6394fc68..c6985b997 100755 --- a/pipeline/clean/merge-corpus.sh +++ b/pipeline/clean/merge-corpus.sh @@ -15,30 +15,27 @@ test -v BIN output_prefix=$1 inputs=( "${@:2}" ) -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - tmp="${output_prefix}/merge" mkdir -p "${tmp}" echo "### Merging" -if [[ "${inputs[0]}" == *.${ARTIFACT_EXT} ]]; then - cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${SRC}.${ARTIFACT_EXT}" | tr '\n' ' '` >"${tmp}/corpus.${SRC}.dup.${ARTIFACT_EXT}" - cat `echo ${inputs[@]} | tr ' ' '\n' | grep 
"${TRG}.${ARTIFACT_EXT}" | tr '\n' ' '` >"${tmp}/corpus.${TRG}.dup.${ARTIFACT_EXT}" +if [[ "${inputs[0]}" == *.zst ]]; then + cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${SRC}.zst" | tr '\n' ' '` >"${tmp}/corpus.${SRC}.dup.zst" + cat `echo ${inputs[@]} | tr ' ' '\n' | grep "${TRG}.zst" | tr '\n' ' '` >"${tmp}/corpus.${TRG}.dup.zst" else - cat "${inputs[@]/%/.${SRC}.${ARTIFACT_EXT}}" >"${tmp}/corpus.${SRC}.dup.${ARTIFACT_EXT}" - cat "${inputs[@]/%/.${TRG}.${ARTIFACT_EXT}}" >"${tmp}/corpus.${TRG}.dup.${ARTIFACT_EXT}" + cat "${inputs[@]/%/.${SRC}.zst}" >"${tmp}/corpus.${SRC}.dup.zst" + cat "${inputs[@]/%/.${TRG}.zst}" >"${tmp}/corpus.${TRG}.dup.zst" fi # See pipeline/translate/merge-corpus.sh for more information on the deduplication step. echo "### Deduplication" -paste <(${COMPRESSION_CMD} -dc "${tmp}/corpus.${SRC}.dup.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${tmp}/corpus.${TRG}.dup.${ARTIFACT_EXT}") | +paste <(zstdmt -dc "${tmp}/corpus.${SRC}.dup.zst") <(zstdmt -dc "${tmp}/corpus.${TRG}.dup.zst") | ${BIN}/dedupe | -${COMPRESSION_CMD} >"${tmp}.${SRC}${TRG}.${ARTIFACT_EXT}" +zstdmt >"${tmp}.${SRC}${TRG}.zst" -${COMPRESSION_CMD} -dc "${tmp}.${SRC}${TRG}.${ARTIFACT_EXT}" | cut -f1 | ${COMPRESSION_CMD} > "${output_prefix}.${SRC}.${ARTIFACT_EXT}" -${COMPRESSION_CMD} -dc "${tmp}.${SRC}${TRG}.${ARTIFACT_EXT}" | cut -f2 | ${COMPRESSION_CMD} > "${output_prefix}.${TRG}.${ARTIFACT_EXT}" +zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f1 | zstdmt > "${output_prefix}.${SRC}.zst" +zstdmt -dc "${tmp}.${SRC}${TRG}.zst" | cut -f2 | zstdmt > "${output_prefix}.${TRG}.zst" rm -rf "${tmp}" diff --git a/pipeline/clean/opuscleaner/clean-corpus.sh b/pipeline/clean/opuscleaner/clean-corpus.sh index 3d128de4a..72ebaeaaa 100755 --- a/pipeline/clean/opuscleaner/clean-corpus.sh +++ b/pipeline/clean/opuscleaner/clean-corpus.sh @@ -17,9 +17,6 @@ threads=$3 dataset=$4 mode=$5 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - if [ "$threads" = "auto" ]; then threads=$(nproc) fi @@ -40,22 +37,22 @@ python3 generate_filters.py "${input_prefix}" "${SRC}" "${TRG}" "${dataset}" "${ test -s "${filter_path}" || exit 1 echo "### Cleaning ${input_prefix} with filter ${filter_path}" -paste <(${COMPRESSION_CMD} -dc "${input_prefix}.${SRC}.${ARTIFACT_EXT}") \ - <(${COMPRESSION_CMD} -dc "${input_prefix}.${TRG}.${ARTIFACT_EXT}") | +paste <(zstdmt -dc "${input_prefix}.${SRC}.zst") \ + <(zstdmt -dc "${input_prefix}.${TRG}.zst") | opuscleaner-clean \ --parallel ${threads} \ --batch-size=50000 \ --input=- \ "${filter_path}" "${SRC}" "${TRG}" | - tee >(cut -f1 | ${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}") | - cut -f2 | ${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}" + tee >(cut -f1 | zstdmt >"${output_prefix}.${SRC}.zst") | + cut -f2 | zstdmt >"${output_prefix}.${TRG}.zst" echo "### Checking length of the files" -test -s "${output_prefix}.${SRC}.${ARTIFACT_EXT}" || exit 1 -test -s "${output_prefix}.${TRG}.${ARTIFACT_EXT}" || exit 1 -new_len_src="$(${COMPRESSION_CMD} -dc "${output_prefix}.${SRC}.${ARTIFACT_EXT}" | wc -l)" -new_len_trg="$(${COMPRESSION_CMD} -dc "${output_prefix}.${TRG}.${ARTIFACT_EXT}" | wc -l)" -orig_len_src="$(${COMPRESSION_CMD} -dc "${input_prefix}.${SRC}.${ARTIFACT_EXT}" | wc -l)" +test -s "${output_prefix}.${SRC}.zst" || exit 1 +test -s "${output_prefix}.${TRG}.zst" || exit 1 +new_len_src="$(zstdmt -dc "${output_prefix}.${SRC}.zst" | wc -l)" +new_len_trg="$(zstdmt -dc "${output_prefix}.${TRG}.zst" | wc -l)" +orig_len_src="$(zstdmt -dc 
"${input_prefix}.${SRC}.zst" | wc -l)" [[ ${new_len_src} -ge 1 ]] || exit 1 [[ ${new_len_trg} -ge 1 ]] || exit 1 [[ "${new_len_src}" = "${new_len_trg}" ]] || exit 1 diff --git a/pipeline/common/downloads.py b/pipeline/common/downloads.py index b998d3af3..9e7db6bcf 100644 --- a/pipeline/common/downloads.py +++ b/pipeline/common/downloads.py @@ -6,7 +6,7 @@ from contextlib import ExitStack, contextmanager from io import BufferedReader from pathlib import Path -from typing import Generator, Optional, Union +from typing import Generator, Literal, Optional, Union from zipfile import ZipFile import requests @@ -485,3 +485,81 @@ def get_human_readable_file_size(location: Union[Path, str]) -> tuple[int, str]: """Get the size of a file in a human-readable string, and the numeric bytes.""" bytes = get_file_size(location) return format_bytes(bytes), bytes + + +def compress_file( + path: Union[str, Path], keep_original: bool = True, compression: Literal["zst", "gz"] = "zst" +) -> Path: + """ + Compresses a file to .zst or .gz format. It returns the path of the compressed file. + "zst" is the preferred compression scheme. + """ + path = Path(path) + + if compression == "zst": + compressed_path = Path(str(path) + ".zst") + cctx = ZstdCompressor() + with open(path, "rb") as infile: + with open(compressed_path, "wb") as outfile: + outfile.write(cctx.compress(infile.read())) + + elif compression == "gz": + compressed_path = Path(str(path) + ".gz") + with open(path, "rb") as infile: + with gzip.open(compressed_path, "wb") as outfile: + outfile.write(infile.read()) + + else: + raise ValueError(f"Unsupported compression format: {compression}") + + if not keep_original: + # Delete the original file + path.unlink() + + return compressed_path + + +def decompress_file( + path: Union[str, Path], + keep_original: bool = True, + decompressed_path: Optional[Union[str, Path]] = None, +) -> Path: + """ + Decompresses a .gz or .zst file. It returns the path of the decompressed file. + """ + path = Path(path) + + if decompressed_path: + decompressed_path = Path(decompressed_path) + else: + # Remove the original suffix + decompressed_path = path.with_suffix("") + + with ExitStack() as stack: + decompressed_file = stack.enter_context(decompressed_path.open("wb")) + + if path.suffix == ".gz": + compressed_file = stack.enter_context(gzip.open(str(path), "rb")) + decompressed_file.write(compressed_file.read()) + while True: + # Write the data out in chunks so that all of the it doesn't need to be + # into memory. + chunk = compressed_file.read(10_240) + if not chunk: + break + decompressed_file.write(chunk) + + elif path.suffix == ".zst": + compressed_file = stack.enter_context(open(path, "rb")) + for chunk in ZstdDecompressor().read_to_iter(compressed_file): + # Write the data out in chunks so that all of the it doesn't need to be + # into memory. 
+ decompressed_file.write(chunk) + else: + raise ValueError(f"Unsupported file extension: {path.suffix}") + + if not keep_original: + # Delete the original file + path.unlink() + + return decompressed_path diff --git a/pipeline/data/dataset_importer.py b/pipeline/data/dataset_importer.py index 29d563c3f..bf776df16 100755 --- a/pipeline/data/dataset_importer.py +++ b/pipeline/data/dataset_importer.py @@ -23,11 +23,11 @@ from opustrainer.modifiers.typos import TypoModifier from opustrainer.types import Modifier +from pipeline.common.downloads import compress_file, decompress_file + # these envs are standard across the pipeline SRC = os.environ["SRC"] TRG = os.environ["TRG"] -COMP_CMD = os.getenv("COMPRESSION_CMD", "pigz") -COMP_EXT = os.getenv("ARTIFACT_EXT", "gz") random.seed(1111) @@ -135,8 +135,8 @@ def augment(output_prefix: str, aug_modifer: str): # file paths for compressed and uncompressed corpus uncompressed_src = f"{output_prefix}.{SRC}" uncompressed_trg = f"{output_prefix}.{TRG}" - compressed_src = f"{output_prefix}.{SRC}.{COMP_EXT}" - compressed_trg = f"{output_prefix}.{TRG}.{COMP_EXT}" + compressed_src = f"{output_prefix}.{SRC}.zst" + compressed_trg = f"{output_prefix}.{TRG}.zst" corpus = read_corpus_tsv(compressed_src, compressed_trg, uncompressed_src, uncompressed_trg) @@ -164,13 +164,9 @@ def read_corpus_tsv( if os.path.isfile(uncompressed_trg): os.remove(uncompressed_trg) - # decompress the original corpus - run_cmd([COMP_CMD, "-d", compressed_src]) - run_cmd([COMP_CMD, "-d", compressed_trg]) - if os.path.isfile(compressed_src): - os.remove(compressed_src) - if os.path.isfile(compressed_trg): - os.remove(compressed_trg) + # Decompress the original corpus. + decompress_file(compressed_src, keep_original=False) + decompress_file(compressed_trg, keep_original=False) # Since this is only used on small evaluation sets, it's fine to load the entire dataset # and augmentation into memory rather than streaming it. @@ -196,12 +192,8 @@ def write_modified(modified: List[str], uncompressed_src: str, uncompressed_trg: f.writelines(modified_trg) # compress corpus back - run_cmd([COMP_CMD, uncompressed_src]) - run_cmd([COMP_CMD, uncompressed_trg]) - if os.path.isfile(uncompressed_src): - os.remove(uncompressed_src) - if os.path.isfile(uncompressed_trg): - os.remove(uncompressed_trg) + compress_file(uncompressed_src, keep_original=False) + compress_file(uncompressed_trg, keep_original=False) def run_import(type: str, dataset: str, output_prefix: str): diff --git a/pipeline/data/importers/corpus/flores.sh b/pipeline/data/importers/corpus/flores.sh index 73ecdf2e1..cf9b4862c 100755 --- a/pipeline/data/importers/corpus/flores.sh +++ b/pipeline/data/importers/corpus/flores.sh @@ -14,8 +14,6 @@ trg=$2 output_prefix=$3 dataset=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" WGET="${WGET:-wget}" # This can be overridden by tests.
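A minimal usage sketch of the new compress_file / decompress_file helpers from pipeline/common/downloads.py; the file names below are hypothetical and not part of the patch:

from pathlib import Path

from pipeline.common.downloads import compress_file, decompress_file

# Hypothetical scratch file used only for this example.
sample = Path("example.txt")
sample.write_text("hello world\n")

# Compresses to example.txt.zst; "zst" is the default scheme.
compressed = compress_file(sample, keep_original=True)
assert compressed.name == "example.txt.zst"

# Round-trip back to a new, explicitly named location.
restored = decompress_file(compressed, keep_original=True, decompressed_path="example.copy.txt")
assert Path(restored).read_text() == "hello world\n"

Both helpers default to keep_original=True; call sites such as dataset_importer.py pass keep_original=False so the original file is removed after conversion.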
tmp="$(mktemp -d)/flores/${dataset}" @@ -41,8 +39,8 @@ flores_code() { src_flores=$(flores_code "${src}") trg_flores=$(flores_code "${trg}") -${COMPRESSION_CMD} -c "${tmp}/flores101_dataset/${dataset}/${src_flores}.${dataset}" > "${output_prefix}.${src}.${ARTIFACT_EXT}" -${COMPRESSION_CMD} -c "${tmp}/flores101_dataset/${dataset}/${trg_flores}.${dataset}" > "${output_prefix}.${trg}.${ARTIFACT_EXT}" +zstdmt -c "${tmp}/flores101_dataset/${dataset}/${src_flores}.${dataset}" > "${output_prefix}.${src}.zst" +zstdmt -c "${tmp}/flores101_dataset/${dataset}/${trg_flores}.${dataset}" > "${output_prefix}.${trg}.zst" rm -rf "${tmp}" diff --git a/pipeline/data/importers/corpus/mtdata.sh b/pipeline/data/importers/corpus/mtdata.sh index 69ce3ea01..1131623a8 100755 --- a/pipeline/data/importers/corpus/mtdata.sh +++ b/pipeline/data/importers/corpus/mtdata.sh @@ -13,9 +13,6 @@ trg=$2 output_prefix=$3 dataset=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - tmp="$(dirname "${output_prefix}")/mtdata/${dataset}" mkdir -p "${tmp}" @@ -26,8 +23,8 @@ mtdata get -l "${src}-${trg}" -tr "${dataset}" -o "${tmp}" find "${tmp}" -cat "${tmp}/train-parts/${dataset}.${src_iso}" | ${COMPRESSION_CMD} -c > "${output_prefix}.${src}.${ARTIFACT_EXT}" -cat "${tmp}/train-parts/${dataset}.${trg_iso}" | ${COMPRESSION_CMD} -c > "${output_prefix}.${trg}.${ARTIFACT_EXT}" +cat "${tmp}/train-parts/${dataset}.${src_iso}" | zstdmt -c > "${output_prefix}.${src}.zst" +cat "${tmp}/train-parts/${dataset}.${trg_iso}" | zstdmt -c > "${output_prefix}.${trg}.zst" rm -rf "${tmp}" diff --git a/pipeline/data/importers/corpus/opus.sh b/pipeline/data/importers/corpus/opus.sh index 920531123..84116c776 100755 --- a/pipeline/data/importers/corpus/opus.sh +++ b/pipeline/data/importers/corpus/opus.sh @@ -13,8 +13,6 @@ trg=$2 output_prefix=$3 dataset=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" WGET="${WGET:-wget}" # This can be overridden by tests. name=${dataset%%/*} @@ -30,8 +28,8 @@ ${WGET} -O "${archive_path}" "https://object.pouta.csc.fi/OPUS-${dataset}/moses/ unzip -o "${archive_path}" -d "${tmp}" for lang in ${src} ${trg}; do - ${COMPRESSION_CMD} -c "${tmp}/${name}.${src}-${trg}.${lang}" > "${output_prefix}.${lang}.${ARTIFACT_EXT}" || - ${COMPRESSION_CMD} -c "${tmp}/${name}.${trg}-${src}.${lang}" > "${output_prefix}.${lang}.${ARTIFACT_EXT}" + zstdmt -c "${tmp}/${name}.${src}-${trg}.${lang}" > "${output_prefix}.${lang}.zst" || + zstdmt -c "${tmp}/${name}.${trg}-${src}.${lang}" > "${output_prefix}.${lang}.zst" done rm -rf "${tmp}" diff --git a/pipeline/data/importers/corpus/sacrebleu.sh b/pipeline/data/importers/corpus/sacrebleu.sh index ced532aac..889a58fab 100755 --- a/pipeline/data/importers/corpus/sacrebleu.sh +++ b/pipeline/data/importers/corpus/sacrebleu.sh @@ -13,9 +13,6 @@ trg=$2 output_prefix=$3 dataset=$4 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - { set +e @@ -23,13 +20,13 @@ ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" --test-set "${dataset}" \ --language-pair "${src}-${trg}" \ --echo src \ - | ${COMPRESSION_CMD} -c > "${output_prefix}.${src}.${ARTIFACT_EXT}" + | zstdmt -c > "${output_prefix}.${src}.zst" sacrebleu \ --test-set "${dataset}" \ --language-pair "${src}-${trg}" \ --echo ref \ - | ${COMPRESSION_CMD} -c > "${output_prefix}.${trg}.${ARTIFACT_EXT}" + | zstdmt -c > "${output_prefix}.${trg}.zst" status=$? 
@@ -43,13 +40,13 @@ if [ $status -ne 0 ]; then --test-set "${dataset}" \ --language-pair "${trg}-${src}" \ --echo src \ - | ${COMPRESSION_CMD} -c > "${output_prefix}.${trg}.${ARTIFACT_EXT}" + | zstdmt -c > "${output_prefix}.${trg}.zst" sacrebleu \ --test-set "${dataset}" \ --language-pair "${trg}-${src}" \ --echo ref \ - | ${COMPRESSION_CMD} -c > "${output_prefix}.${src}.${ARTIFACT_EXT}" + | zstdmt -c > "${output_prefix}.${src}.zst" fi diff --git a/pipeline/eval/eval.py b/pipeline/eval/eval.py index 6c0f29498..1435060cd 100755 --- a/pipeline/eval/eval.py +++ b/pipeline/eval/eval.py @@ -51,6 +51,7 @@ from sacrebleu.metrics.bleu import BLEU, BLEUScore from sacrebleu.metrics.chrf import CHRF, CHRFScore +from pipeline.common.downloads import decompress_file from pipeline.common.logging import get_logger logger = get_logger("eval") @@ -88,11 +89,6 @@ def run_bash_oneliner(command: str): return subprocess.check_call(command, shell=True) -# De-compresses files, and pipes the result as necessary. -def decompress(path: str, compression_cmd: str, artifact_ext: str): - subprocess.check_call(f'{compression_cmd} -dc "{path}"') - - def main(args_list: Optional[list[str]] = None) -> None: parser = argparse.ArgumentParser( description=__doc__, @@ -110,14 +106,6 @@ def main(args_list: Optional[list[str]] = None) -> None: parser.add_argument("--trg", type=str, help='The target language, e.g "ca".') parser.add_argument("--marian", type=str, help="The path the to marian binaries.") parser.add_argument("--marian_config", type=str, help="The marian yaml config for the model.") - parser.add_argument( - "--compression_cmd", default="pigz", help="The name of the compression command to use." - ) - parser.add_argument( - "--artifact_ext", - default="gz", - help="The artifact extension for the compression", - ) parser.add_argument( "--quantized", action="store_true", @@ -163,9 +151,9 @@ def main(args_list: Optional[list[str]] = None) -> None: artifacts_prefix = args.artifacts_prefix artifacts_dir = os.path.dirname(artifacts_prefix) - source_file_compressed = f"{dataset_prefix}.{src}.{args.artifact_ext}" + source_file_compressed = f"{dataset_prefix}.{src}.zst" source_file = f"{artifacts_prefix}.{src}" - target_file_compressed = f"{dataset_prefix}.{trg}.{args.artifact_ext}" + target_file_compressed = f"{dataset_prefix}.{trg}.zst" target_file = f"{artifacts_prefix}.{trg}" target_ref_file = f"{artifacts_prefix}.{trg}.ref" marian_decoder = f'"{args.marian}"/marian-decoder' @@ -215,16 +203,12 @@ def main(args_list: Optional[list[str]] = None) -> None: logger.info("Save the original target sentences to the artifacts") - run_bash_oneliner( - f""" - {args.compression_cmd} -dc "{target_file_compressed}" > "{target_ref_file}" - """ - ) + decompress_file(target_file_compressed, keep_original=False, decompressed_path=target_ref_file) run_bash_oneliner( f""" - # Decompress the source file, e.g. $fetches/wmt09.en.gz - {args.compression_cmd} -dc "{source_file_compressed}" + # Decompress the source file, e.g. $fetches/wmt09.en.zst + zstdmt -dc "{source_file_compressed}" # Tee the source file into the artifacts directory, e.g. 
$artifacts/wmt09.en | tee "{source_file}" diff --git a/pipeline/train/spm-vocab.sh b/pipeline/train/spm-vocab.sh index dff13e2b0..1db595544 100755 --- a/pipeline/train/spm-vocab.sh +++ b/pipeline/train/spm-vocab.sh @@ -57,13 +57,11 @@ if [ "$num_threads" = "auto" ]; then num_threads=$(nproc) fi -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" - vocab_dir=$(dirname "${vocab_output}") mkdir -p "${vocab_dir}" -${COMPRESSION_CMD} -dc "${merged_corpus_src}" >"${vocab_dir}/data.src.txt" -${COMPRESSION_CMD} -dc "${merged_corpus_trg}" >"${vocab_dir}/data.trg.txt" +zstdmt -dc "${merged_corpus_src}" >"${vocab_dir}/data.src.txt" +zstdmt -dc "${merged_corpus_trg}" >"${vocab_dir}/data.trg.txt" # The input arguments are available here: # https://github.com/google/sentencepiece/blob/master/doc/options.md diff --git a/pipeline/train/train.sh b/pipeline/train/train.sh index 2f42d1dc2..2a2d34af4 100755 --- a/pipeline/train/train.sh +++ b/pipeline/train/train.sh @@ -28,9 +28,6 @@ seed=${11} teacher_mode=${12} extra_params=( "${@:13}" ) -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - test -v GPUS test -v MARIAN test -v WORKSPACE @@ -66,9 +63,9 @@ for index in "${!datasets[@]}"; do train_aln="${alns[index]}" echo "### Generating tsv dataset with alignments ${alignments}" - paste <(${COMPRESSION_CMD} -dc "${train_set_prefix}.${src}.${ARTIFACT_EXT}") \ - <(${COMPRESSION_CMD} -dc "${train_set_prefix}.${trg}.${ARTIFACT_EXT}") \ - <(${COMPRESSION_CMD} -dc "${train_aln}") \ + paste <(zstdmt -dc "${train_set_prefix}.${src}.zst") \ + <(zstdmt -dc "${train_set_prefix}.${trg}.zst") \ + <(zstdmt -dc "${train_aln}") \ >"${tsv_dataset}" rm "${train_aln}" # grep returns exit code 1 if there are no matches @@ -77,13 +74,13 @@ for index in "${!datasets[@]}"; do else echo "### Generating tsv dataset" # OpusTrainer supports only tsv and gzip - paste <(${COMPRESSION_CMD} -dc "${train_set_prefix}.${src}.${ARTIFACT_EXT}") \ - <(${COMPRESSION_CMD} -dc "${train_set_prefix}.${trg}.${ARTIFACT_EXT}") \ + paste <(zstdmt -dc "${train_set_prefix}.${src}.zst") \ + <(zstdmt -dc "${train_set_prefix}.${trg}.zst") \ >"${tsv_dataset}" fi # free disk space - rm "${train_set_prefix}.${src}.${ARTIFACT_EXT}" - rm "${train_set_prefix}.${trg}.${ARTIFACT_EXT}" + rm "${train_set_prefix}.${src}.zst" + rm "${train_set_prefix}.${trg}.zst" # replace the dataset path in the template in place sed -i -e "s##${tsv_dataset}#g" "${new_config}" done @@ -101,8 +98,8 @@ sed -i -e "s##${seed}#g" "${new_config}" # if the training set is a tsv, validation set also has to be a tsv echo "### Converting validation sets to tsv" valid_tsv_dataset="${valid_set_prefix}.${src}${trg}.tsv" -paste <(${COMPRESSION_CMD} -dc "${valid_set_prefix}.${src}.${ARTIFACT_EXT}") \ - <(${COMPRESSION_CMD} -dc "${valid_set_prefix}.${trg}.${ARTIFACT_EXT}") \ +paste <(zstdmt -dc "${valid_set_prefix}.${src}.zst") \ + <(zstdmt -dc "${valid_set_prefix}.${trg}.zst") \ >"${valid_tsv_dataset}" diff --git a/pipeline/translate/collect.sh b/pipeline/translate/collect.sh index 449fa4cc7..ffae6302d 100755 --- a/pipeline/translate/collect.sh +++ b/pipeline/translate/collect.sh @@ -34,19 +34,17 @@ output_path=$2 mono_path=$3 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" - echo "### Collecting translations" find "${chunks_dir}" -name '*.out' | # For example, finds "fetches/file.1.out", "fetches/file.2.out", etc. sort -t '.' -k2,2n | # Sort by the number in "file.1.out", e.g. 1 here. xargs cat | # Combine all of these files together. 
- ${COMPRESSION_CMD} >"${output_path}" + zstdmt >"${output_path}" echo "### Comparing number of sentences in source and artificial target files" -src_len=$(${COMPRESSION_CMD} -dc "${mono_path}" | wc -l) -trg_len=$(${COMPRESSION_CMD} -dc "${output_path}" | wc -l) +src_len=$(zstdmt -dc "${mono_path}" | wc -l) +trg_len=$(zstdmt -dc "${output_path}" | wc -l) if [ "${src_len}" != "${trg_len}" ]; then echo "### Error: length of ${mono_path} ${src_len} is different from ${output_path} ${trg_len}" diff --git a/pipeline/translate/merge-corpus.sh b/pipeline/translate/merge-corpus.sh index 7552ad9b5..b76159859 100755 --- a/pipeline/translate/merge-corpus.sh +++ b/pipeline/translate/merge-corpus.sh @@ -29,14 +29,11 @@ trg2=$4 res_src=$5 res_trg=$6 -COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}" -ARTIFACT_EXT="${ARTIFACT_EXT:-gz}" - tmp_dir="$(dirname "${res_src}")/tmp" mkdir -p "${tmp_dir}" -cat <(${COMPRESSION_CMD} -dc "${src1}") <(${COMPRESSION_CMD} -dc "${src2}") | ${COMPRESSION_CMD} >"${tmp_dir}/original.src.${ARTIFACT_EXT}" -cat <(${COMPRESSION_CMD} -dc "${trg1}") <(${COMPRESSION_CMD} -dc "${trg2}") | ${COMPRESSION_CMD} >"${tmp_dir}/original.trg.${ARTIFACT_EXT}" +cat <(zstdmt -dc "${src1}") <(zstdmt -dc "${src2}") | zstdmt >"${tmp_dir}/original.src.zst" +cat <(zstdmt -dc "${trg1}") <(zstdmt -dc "${trg2}") | zstdmt >"${tmp_dir}/original.trg.zst" # De-duplicating uses dedupe from: https://github.com/kpu/preprocess # @@ -53,16 +50,16 @@ cat <(${COMPRESSION_CMD} -dc "${trg1}") <(${COMPRESSION_CMD} -dc "${trg2}") | ${ # Deduplicate parallel data, removing if either side is non-unique ./bin/dedupe -p in_en in_fr out_en out_fr echo "#### Deduplicating" -paste <(${COMPRESSION_CMD} -dc "${tmp_dir}/original.src.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${tmp_dir}/original.trg.${ARTIFACT_EXT}") | +paste <(zstdmt -dc "${tmp_dir}/original.src.zst") <(zstdmt -dc "${tmp_dir}/original.trg.zst") | shuf --random-source=<(get_seeded_random 42) | ${BIN}/dedupe | - ${COMPRESSION_CMD} > "${tmp_dir}/all.${ARTIFACT_EXT}" + zstdmt > "${tmp_dir}/all.zst" -${COMPRESSION_CMD} -dc "${tmp_dir}/all.${ARTIFACT_EXT}" | cut -f1 | ${COMPRESSION_CMD} > "${res_src}" -${COMPRESSION_CMD} -dc "${tmp_dir}/all.${ARTIFACT_EXT}" | cut -f2 | ${COMPRESSION_CMD} > "${res_trg}" +zstdmt -dc "${tmp_dir}/all.zst" | cut -f1 | zstdmt > "${res_src}" +zstdmt -dc "${tmp_dir}/all.zst" | cut -f2 | zstdmt > "${res_trg}" -src_len=$(${COMPRESSION_CMD} -dc "${res_src}" | wc -l) -trg_len=$(${COMPRESSION_CMD} -dc "${res_trg}" | wc -l) +src_len=$(zstdmt -dc "${res_src}" | wc -l) +trg_len=$(zstdmt -dc "${res_trg}" | wc -l) if [ "${src_len}" != "${trg_len}" ]; then echo "Error: length of ${res_src} ${src_len} is different from ${res_trg} ${trg_len}" exit 1 diff --git a/pipeline/translate/requirements/splitter.in b/pipeline/translate/requirements/splitter.in new file mode 100644 index 000000000..2c24336eb --- /dev/null +++ b/pipeline/translate/requirements/splitter.in @@ -0,0 +1 @@ +requests==2.31.0 diff --git a/pipeline/translate/requirements/splitter.txt b/pipeline/translate/requirements/splitter.txt new file mode 100644 index 000000000..1f9608082 --- /dev/null +++ b/pipeline/translate/requirements/splitter.txt @@ -0,0 +1,114 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --allow-unsafe --generate-hashes pipeline/translate/requirements/splitter.in +# +certifi==2024.7.4 \ + --hash=sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b \ + 
--hash=sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90 + # via requests +charset-normalizer==3.3.2 \ + --hash=sha256:06435b539f889b1f6f4ac1758871aae42dc3a8c0e24ac9e60c2384973ad73027 \ + --hash=sha256:06a81e93cd441c56a9b65d8e1d043daeb97a3d0856d177d5c90ba85acb3db087 \ + --hash=sha256:0a55554a2fa0d408816b3b5cedf0045f4b8e1a6065aec45849de2d6f3f8e9786 \ + --hash=sha256:0b2b64d2bb6d3fb9112bafa732def486049e63de9618b5843bcdd081d8144cd8 \ + --hash=sha256:10955842570876604d404661fbccbc9c7e684caf432c09c715ec38fbae45ae09 \ + --hash=sha256:122c7fa62b130ed55f8f285bfd56d5f4b4a5b503609d181f9ad85e55c89f4185 \ + --hash=sha256:1ceae2f17a9c33cb48e3263960dc5fc8005351ee19db217e9b1bb15d28c02574 \ + --hash=sha256:1d3193f4a680c64b4b6a9115943538edb896edc190f0b222e73761716519268e \ + --hash=sha256:1f79682fbe303db92bc2b1136016a38a42e835d932bab5b3b1bfcfbf0640e519 \ + --hash=sha256:2127566c664442652f024c837091890cb1942c30937add288223dc895793f898 \ + --hash=sha256:22afcb9f253dac0696b5a4be4a1c0f8762f8239e21b99680099abd9b2b1b2269 \ + --hash=sha256:25baf083bf6f6b341f4121c2f3c548875ee6f5339300e08be3f2b2ba1721cdd3 \ + --hash=sha256:2e81c7b9c8979ce92ed306c249d46894776a909505d8f5a4ba55b14206e3222f \ + --hash=sha256:3287761bc4ee9e33561a7e058c72ac0938c4f57fe49a09eae428fd88aafe7bb6 \ + --hash=sha256:34d1c8da1e78d2e001f363791c98a272bb734000fcef47a491c1e3b0505657a8 \ + --hash=sha256:37e55c8e51c236f95b033f6fb391d7d7970ba5fe7ff453dad675e88cf303377a \ + --hash=sha256:3d47fa203a7bd9c5b6cee4736ee84ca03b8ef23193c0d1ca99b5089f72645c73 \ + --hash=sha256:3e4d1f6587322d2788836a99c69062fbb091331ec940e02d12d179c1d53e25fc \ + --hash=sha256:42cb296636fcc8b0644486d15c12376cb9fa75443e00fb25de0b8602e64c1714 \ + --hash=sha256:45485e01ff4d3630ec0d9617310448a8702f70e9c01906b0d0118bdf9d124cf2 \ + --hash=sha256:4a78b2b446bd7c934f5dcedc588903fb2f5eec172f3d29e52a9096a43722adfc \ + --hash=sha256:4ab2fe47fae9e0f9dee8c04187ce5d09f48eabe611be8259444906793ab7cbce \ + --hash=sha256:4d0d1650369165a14e14e1e47b372cfcb31d6ab44e6e33cb2d4e57265290044d \ + --hash=sha256:549a3a73da901d5bc3ce8d24e0600d1fa85524c10287f6004fbab87672bf3e1e \ + --hash=sha256:55086ee1064215781fff39a1af09518bc9255b50d6333f2e4c74ca09fac6a8f6 \ + --hash=sha256:572c3763a264ba47b3cf708a44ce965d98555f618ca42c926a9c1616d8f34269 \ + --hash=sha256:573f6eac48f4769d667c4442081b1794f52919e7edada77495aaed9236d13a96 \ + --hash=sha256:5b4c145409bef602a690e7cfad0a15a55c13320ff7a3ad7ca59c13bb8ba4d45d \ + --hash=sha256:6463effa3186ea09411d50efc7d85360b38d5f09b870c48e4600f63af490e56a \ + --hash=sha256:65f6f63034100ead094b8744b3b97965785388f308a64cf8d7c34f2f2e5be0c4 \ + --hash=sha256:663946639d296df6a2bb2aa51b60a2454ca1cb29835324c640dafb5ff2131a77 \ + --hash=sha256:6897af51655e3691ff853668779c7bad41579facacf5fd7253b0133308cf000d \ + --hash=sha256:68d1f8a9e9e37c1223b656399be5d6b448dea850bed7d0f87a8311f1ff3dabb0 \ + --hash=sha256:6ac7ffc7ad6d040517be39eb591cac5ff87416c2537df6ba3cba3bae290c0fed \ + --hash=sha256:6b3251890fff30ee142c44144871185dbe13b11bab478a88887a639655be1068 \ + --hash=sha256:6c4caeef8fa63d06bd437cd4bdcf3ffefe6738fb1b25951440d80dc7df8c03ac \ + --hash=sha256:6ef1d82a3af9d3eecdba2321dc1b3c238245d890843e040e41e470ffa64c3e25 \ + --hash=sha256:753f10e867343b4511128c6ed8c82f7bec3bd026875576dfd88483c5c73b2fd8 \ + --hash=sha256:7cd13a2e3ddeed6913a65e66e94b51d80a041145a026c27e6bb76c31a853c6ab \ + --hash=sha256:7ed9e526742851e8d5cc9e6cf41427dfc6068d4f5a3bb03659444b4cabf6bc26 \ + --hash=sha256:7f04c839ed0b6b98b1a7501a002144b76c18fb1c1850c8b98d458ac269e26ed2 \ + 
--hash=sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db \ + --hash=sha256:80402cd6ee291dcb72644d6eac93785fe2c8b9cb30893c1af5b8fdd753b9d40f \ + --hash=sha256:8465322196c8b4d7ab6d1e049e4c5cb460d0394da4a27d23cc242fbf0034b6b5 \ + --hash=sha256:86216b5cee4b06df986d214f664305142d9c76df9b6512be2738aa72a2048f99 \ + --hash=sha256:87d1351268731db79e0f8e745d92493ee2841c974128ef629dc518b937d9194c \ + --hash=sha256:8bdb58ff7ba23002a4c5808d608e4e6c687175724f54a5dade5fa8c67b604e4d \ + --hash=sha256:8c622a5fe39a48f78944a87d4fb8a53ee07344641b0562c540d840748571b811 \ + --hash=sha256:8d756e44e94489e49571086ef83b2bb8ce311e730092d2c34ca8f7d925cb20aa \ + --hash=sha256:8f4a014bc36d3c57402e2977dada34f9c12300af536839dc38c0beab8878f38a \ + --hash=sha256:9063e24fdb1e498ab71cb7419e24622516c4a04476b17a2dab57e8baa30d6e03 \ + --hash=sha256:90d558489962fd4918143277a773316e56c72da56ec7aa3dc3dbbe20fdfed15b \ + --hash=sha256:923c0c831b7cfcb071580d3f46c4baf50f174be571576556269530f4bbd79d04 \ + --hash=sha256:95f2a5796329323b8f0512e09dbb7a1860c46a39da62ecb2324f116fa8fdc85c \ + --hash=sha256:96b02a3dc4381e5494fad39be677abcb5e6634bf7b4fa83a6dd3112607547001 \ + --hash=sha256:9f96df6923e21816da7e0ad3fd47dd8f94b2a5ce594e00677c0013018b813458 \ + --hash=sha256:a10af20b82360ab00827f916a6058451b723b4e65030c5a18577c8b2de5b3389 \ + --hash=sha256:a50aebfa173e157099939b17f18600f72f84eed3049e743b68ad15bd69b6bf99 \ + --hash=sha256:a981a536974bbc7a512cf44ed14938cf01030a99e9b3a06dd59578882f06f985 \ + --hash=sha256:a9a8e9031d613fd2009c182b69c7b2c1ef8239a0efb1df3f7c8da66d5dd3d537 \ + --hash=sha256:ae5f4161f18c61806f411a13b0310bea87f987c7d2ecdbdaad0e94eb2e404238 \ + --hash=sha256:aed38f6e4fb3f5d6bf81bfa990a07806be9d83cf7bacef998ab1a9bd660a581f \ + --hash=sha256:b01b88d45a6fcb69667cd6d2f7a9aeb4bf53760d7fc536bf679ec94fe9f3ff3d \ + --hash=sha256:b261ccdec7821281dade748d088bb6e9b69e6d15b30652b74cbbac25e280b796 \ + --hash=sha256:b2b0a0c0517616b6869869f8c581d4eb2dd83a4d79e0ebcb7d373ef9956aeb0a \ + --hash=sha256:b4a23f61ce87adf89be746c8a8974fe1c823c891d8f86eb218bb957c924bb143 \ + --hash=sha256:bd8f7df7d12c2db9fab40bdd87a7c09b1530128315d047a086fa3ae3435cb3a8 \ + --hash=sha256:beb58fe5cdb101e3a055192ac291b7a21e3b7ef4f67fa1d74e331a7f2124341c \ + --hash=sha256:c002b4ffc0be611f0d9da932eb0f704fe2602a9a949d1f738e4c34c75b0863d5 \ + --hash=sha256:c083af607d2515612056a31f0a8d9e0fcb5876b7bfc0abad3ecd275bc4ebc2d5 \ + --hash=sha256:c180f51afb394e165eafe4ac2936a14bee3eb10debc9d9e4db8958fe36afe711 \ + --hash=sha256:c235ebd9baae02f1b77bcea61bce332cb4331dc3617d254df3323aa01ab47bd4 \ + --hash=sha256:cd70574b12bb8a4d2aaa0094515df2463cb429d8536cfb6c7ce983246983e5a6 \ + --hash=sha256:d0eccceffcb53201b5bfebb52600a5fb483a20b61da9dbc885f8b103cbe7598c \ + --hash=sha256:d965bba47ddeec8cd560687584e88cf699fd28f192ceb452d1d7ee807c5597b7 \ + --hash=sha256:db364eca23f876da6f9e16c9da0df51aa4f104a972735574842618b8c6d999d4 \ + --hash=sha256:ddbb2551d7e0102e7252db79ba445cdab71b26640817ab1e3e3648dad515003b \ + --hash=sha256:deb6be0ac38ece9ba87dea880e438f25ca3eddfac8b002a2ec3d9183a454e8ae \ + --hash=sha256:e06ed3eb3218bc64786f7db41917d4e686cc4856944f53d5bdf83a6884432e12 \ + --hash=sha256:e27ad930a842b4c5eb8ac0016b0a54f5aebbe679340c26101df33424142c143c \ + --hash=sha256:e537484df0d8f426ce2afb2d0f8e1c3d0b114b83f8850e5f2fbea0e797bd82ae \ + --hash=sha256:eb00ed941194665c332bf8e078baf037d6c35d7c4f3102ea2d4f16ca94a26dc8 \ + --hash=sha256:eb6904c354526e758fda7167b33005998fb68c46fbc10e013ca97f21ca5c8887 \ + 
--hash=sha256:eb8821e09e916165e160797a6c17edda0679379a4be5c716c260e836e122f54b \ + --hash=sha256:efcb3f6676480691518c177e3b465bcddf57cea040302f9f4e6e191af91174d4 \ + --hash=sha256:f27273b60488abe721a075bcca6d7f3964f9f6f067c8c4c605743023d7d3944f \ + --hash=sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5 \ + --hash=sha256:fb69256e180cb6c8a894fee62b3afebae785babc1ee98b81cdf68bbca1987f33 \ + --hash=sha256:fd1abc0d89e30cc4e02e4064dc67fcc51bd941eb395c502aac3ec19fab46b519 \ + --hash=sha256:ff8fa367d09b717b2a17a052544193ad76cd49979c805768879cb63d9ca50561 + # via requests +idna==3.7 \ + --hash=sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc \ + --hash=sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0 + # via requests +requests==2.31.0 \ + --hash=sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f \ + --hash=sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1 + # via -r pipeline/translate/requirements/splitter.in +urllib3==2.2.2 \ + --hash=sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472 \ + --hash=sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168 + # via requests diff --git a/pipeline/translate/splitter.py b/pipeline/translate/splitter.py index 9702a8e25..f8eff26df 100644 --- a/pipeline/translate/splitter.py +++ b/pipeline/translate/splitter.py @@ -5,7 +5,6 @@ Example: python splitter.py \ --output_dir=test_data \ - --compression_cmd=zstd \ --num_parts=10 \ --output_suffix=.ref \ test_data/corpus.en.zst @@ -17,23 +16,14 @@ from contextlib import ExitStack from typing import Optional +from pipeline.common.downloads import compress_file -def compress(compression_cmd: str, file_path: str): - print(f"Compressing {file_path} with {compression_cmd}") - subprocess.run([compression_cmd, file_path], check=True) - # gzip and pigz remove the file by default, but zstd does not. - if os.path.isfile(file_path): - os.remove(file_path) - - -def split_file( - mono_path: str, output_dir: str, num_parts: int, compression_cmd: str, output_suffix: str = "" -): +def split_file(mono_path: str, output_dir: str, num_parts: int, output_suffix: str = ""): os.makedirs(output_dir, exist_ok=True) # Initialize the decompression command - decompress_cmd = f"{compression_cmd} -dc {mono_path}" + decompress_cmd = f"zstdmt -dc {mono_path}" # Use ExitStack to manage the cleanup of file handlers with ExitStack() as stack: @@ -59,7 +49,7 @@ def split_file( if current_line_count == 0 or current_line_count >= lines_per_part: if current_file is not None: current_file.close() - compress(compression_cmd, current_name) + compress_file(current_name, keep_original=False) current_name = f"{output_dir}/file.{file_index}{output_suffix}" current_file = stack.enter_context(open(current_name, "w")) @@ -70,8 +60,8 @@ def split_file( current_file.write(line.decode()) current_line_count += 1 - # decompress the last file after closing - compress(compression_cmd, current_name) + # Compress the last file after closing. 
+ compress_file(current_name, keep_original=False) print("Done") @@ -84,9 +74,6 @@ def main(args: Optional[list[str]] = None) -> None: parser.add_argument("mono_path", type=str, help="Path to the compressed monolingual dataset") parser.add_argument("--output_dir", type=str, help="Output directory to store split files") parser.add_argument("--num_parts", type=int, help="Number of parts to split the file into") - parser.add_argument( - "--compression_cmd", type=str, help="Compression command (e.g., pigz, gzip, zstd)" - ) parser.add_argument( "--output_suffix", type=str, help="A suffix for output files, for example .ref", default="" ) @@ -97,7 +84,6 @@ def main(args: Optional[list[str]] = None) -> None: mono_path=parsed_args.mono_path, output_dir=parsed_args.output_dir, num_parts=parsed_args.num_parts, - compression_cmd=parsed_args.compression_cmd, output_suffix=parsed_args.output_suffix, ) diff --git a/taskcluster/kinds/bicleaner-model/kind.yml b/taskcluster/kinds/bicleaner-model/kind.yml index 57864349b..deeee7c87 100644 --- a/taskcluster/kinds/bicleaner-model/kind.yml +++ b/taskcluster/kinds/bicleaner-model/kind.yml @@ -68,7 +68,6 @@ tasks: python3 $VCS_PATH/pipeline/bicleaner/download_pack.py --src={src_locale} --trg={trg_locale} - --compression_cmd=zstdmt artifacts/bicleaner-ai-{src_locale}-{trg_locale}.tar.zst fetches: diff --git a/taskcluster/kinds/bicleaner/kind.yml b/taskcluster/kinds/bicleaner/kind.yml index de75d506e..fcb2d7c36 100644 --- a/taskcluster/kinds/bicleaner/kind.yml +++ b/taskcluster/kinds/bicleaner/kind.yml @@ -77,8 +77,6 @@ tasks: # get interpreted. CUDA_DIR: fetches/cuda-toolkit CUDNN_DIR: fetches/cuda-toolkit - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails # 9001 is the code for when tensorflow fails to find GPUs on the system, # and biclenaer reverts to CPU time. 
Rather than waste time, we should diff --git a/taskcluster/kinds/cefilter/kind.yml b/taskcluster/kinds/cefilter/kind.yml index 1174f723d..e2f448915 100644 --- a/taskcluster/kinds/cefilter/kind.yml +++ b/taskcluster/kinds/cefilter/kind.yml @@ -50,8 +50,6 @@ tasks: path: /builds/worker/artifacts type: directory env: - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst SRC: "{src_locale}" TRG: "{trg_locale}" # 128 happens when cloning this repository fails diff --git a/taskcluster/kinds/clean-corpus/kind.yml b/taskcluster/kinds/clean-corpus/kind.yml index c553d19b7..b212cd668 100644 --- a/taskcluster/kinds/clean-corpus/kind.yml +++ b/taskcluster/kinds/clean-corpus/kind.yml @@ -74,8 +74,6 @@ tasks: TRG: "{trg_locale}" USE_OPUSCLEANER: "{use_opuscleaner}" OPUSCLEANER_MODE: "{opuscleaner_mode}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # there might be intermittent issues with OpusCleaner, specifically FastText model downloading can fail retry-exit-status: - 1 diff --git a/taskcluster/kinds/clean-mono/kind.yml b/taskcluster/kinds/clean-mono/kind.yml index 2a23dd2c7..b7982102e 100644 --- a/taskcluster/kinds/clean-mono/kind.yml +++ b/taskcluster/kinds/clean-mono/kind.yml @@ -63,9 +63,7 @@ task-defaults: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/collect-corpus/kind.yml b/taskcluster/kinds/collect-corpus/kind.yml index 4b5945513..78dd9cda3 100644 --- a/taskcluster/kinds/collect-corpus/kind.yml +++ b/taskcluster/kinds/collect-corpus/kind.yml @@ -62,8 +62,7 @@ tasks: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/collect-mono-src/kind.yml b/taskcluster/kinds/collect-mono-src/kind.yml index 904d919e4..13e43579f 100644 --- a/taskcluster/kinds/collect-mono-src/kind.yml +++ b/taskcluster/kinds/collect-mono-src/kind.yml @@ -43,8 +43,7 @@ task-defaults: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/collect-mono-trg/kind.yml b/taskcluster/kinds/collect-mono-trg/kind.yml index c7cc242f6..541645df1 100644 --- a/taskcluster/kinds/collect-mono-trg/kind.yml +++ b/taskcluster/kinds/collect-mono-trg/kind.yml @@ -43,8 +43,7 @@ task-defaults: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/dataset/kind.yml b/taskcluster/kinds/dataset/kind.yml index 3708efbd0..92a8eeba2 100644 --- a/taskcluster/kinds/dataset/kind.yml +++ b/taskcluster/kinds/dataset/kind.yml @@ -33,8 +33,6 @@ task-defaults: docker-image: {in-tree: toolchain-build} max-run-time: 86400 env: - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst SRC: "{src_locale}" TRG: "{trg_locale}" artifacts: diff --git a/taskcluster/kinds/evaluate-quantized/kind.yml b/taskcluster/kinds/evaluate-quantized/kind.yml index bc5cc8d79..bc9fdccce 100644 --- a/taskcluster/kinds/evaluate-quantized/kind.yml +++ b/taskcluster/kinds/evaluate-quantized/kind.yml @@ -97,8 +97,6 @@ tasks: $VCS_PATH/pipeline/eval/eval.py --src {src_locale} --trg {trg_locale} - --compression_cmd zstdmt - --artifact_ext zst 
--models "$MOZ_FETCHES_DIR/model.intgemm.alphas.bin" --dataset_prefix "$MOZ_FETCHES_DIR/{dataset_sanitized}" --vocab "$MOZ_FETCHES_DIR/vocab.spm" diff --git a/taskcluster/kinds/evaluate-teacher-ensemble/kind.yml b/taskcluster/kinds/evaluate-teacher-ensemble/kind.yml index 38e9539e9..2d47e0c23 100644 --- a/taskcluster/kinds/evaluate-teacher-ensemble/kind.yml +++ b/taskcluster/kinds/evaluate-teacher-ensemble/kind.yml @@ -103,8 +103,6 @@ tasks: $VCS_PATH/pipeline/eval/eval.py --src {src_locale} --trg {trg_locale} - --compression_cmd zstdmt - --artifact_ext zst --dataset_prefix "$MOZ_FETCHES_DIR/{dataset_sanitized}" --marian_config "$MOZ_FETCHES_DIR/final.model.npz.best-{best_model}.npz.decoder.yml" --models "$MOZ_FETCHES_DIR/model*/*.npz" diff --git a/taskcluster/kinds/evaluate/kind.yml b/taskcluster/kinds/evaluate/kind.yml index 485f00312..4e0b2d030 100644 --- a/taskcluster/kinds/evaluate/kind.yml +++ b/taskcluster/kinds/evaluate/kind.yml @@ -100,8 +100,6 @@ task-defaults: sed -i -e "s,- .*artifacts,- $MOZ_FETCHES_DIR," $TASK_WORKDIR/fetches/*.yml && $VCS_PATH/pipeline/eval/eval.py {language_pair} - --compression_cmd zstdmt - --artifact_ext zst --marian_config "$MOZ_FETCHES_DIR/final.model.npz.best-{best_model}.npz.decoder.yml" --models "$MOZ_FETCHES_DIR/final.model.npz.best-{best_model}.npz" --dataset_prefix "$MOZ_FETCHES_DIR/{dataset_sanitized}" diff --git a/taskcluster/kinds/extract-best/kind.yml b/taskcluster/kinds/extract-best/kind.yml index 1572441eb..9178a7cee 100644 --- a/taskcluster/kinds/extract-best/kind.yml +++ b/taskcluster/kinds/extract-best/kind.yml @@ -66,8 +66,7 @@ tasks: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/finetune-student/kind.yml b/taskcluster/kinds/finetune-student/kind.yml index b6329717f..7c437807a 100644 --- a/taskcluster/kinds/finetune-student/kind.yml +++ b/taskcluster/kinds/finetune-student/kind.yml @@ -66,9 +66,6 @@ tasks: # 128 happens when cloning this repository fails retry-exit-status: [17, 128] env: - ARTIFACT_EXT: zst - COMPRESSION_CMD: zstdmt - # Weight & Biases trigger WANDB_PUBLICATION: "{wandb_publication}" WANDB_AUTHOR: "{owner}" diff --git a/taskcluster/kinds/merge-corpus/kind.yml b/taskcluster/kinds/merge-corpus/kind.yml index 12c603858..d5353b1bf 100644 --- a/taskcluster/kinds/merge-corpus/kind.yml +++ b/taskcluster/kinds/merge-corpus/kind.yml @@ -52,8 +52,6 @@ task-defaults: env: SRC: "{src_locale}" TRG: "{trg_locale}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/merge-devset/kind.yml b/taskcluster/kinds/merge-devset/kind.yml index f27fea348..22abf10e8 100644 --- a/taskcluster/kinds/merge-devset/kind.yml +++ b/taskcluster/kinds/merge-devset/kind.yml @@ -52,8 +52,6 @@ task-defaults: env: SRC: "{src_locale}" TRG: "{trg_locale}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/taskcluster/kinds/merge-translated/kind.yml b/taskcluster/kinds/merge-translated/kind.yml index 154e9ac6a..bb61fa389 100644 --- a/taskcluster/kinds/merge-translated/kind.yml +++ b/taskcluster/kinds/merge-translated/kind.yml @@ -50,8 +50,6 @@ task-defaults: env: SRC: "{src_locale}" TRG: "{trg_locale}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails retry-exit-status: 
[128] diff --git a/taskcluster/kinds/score/kind.yml b/taskcluster/kinds/score/kind.yml index d7c379c0f..bf7576092 100644 --- a/taskcluster/kinds/score/kind.yml +++ b/taskcluster/kinds/score/kind.yml @@ -55,7 +55,6 @@ tasks: # TODO: this needs to be updated, ideally to have the script detect # GPUs. it should _always_ be aligned with the # of GPUs on the intsance GPUS: "0 1 2 3" - ARTIFACT_EXT: zst SRC: "{src_locale}" TRG: "{trg_locale}" artifacts: diff --git a/taskcluster/kinds/shortlist/kind.yml b/taskcluster/kinds/shortlist/kind.yml index 88dd1b6e7..cdd2542ed 100644 --- a/taskcluster/kinds/shortlist/kind.yml +++ b/taskcluster/kinds/shortlist/kind.yml @@ -55,8 +55,6 @@ tasks: path: /builds/worker/artifacts type: directory env: - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst SRC: "{src_locale}" TRG: "{trg_locale}" # 128 happens when cloning this repository fails diff --git a/taskcluster/kinds/split-corpus/kind.yml b/taskcluster/kinds/split-corpus/kind.yml index 5d60c5589..7be08f981 100644 --- a/taskcluster/kinds/split-corpus/kind.yml +++ b/taskcluster/kinds/split-corpus/kind.yml @@ -27,6 +27,7 @@ tasks: type: split-corpus resources: - pipeline/translate/splitter.py + - pipeline/translate/requirements/splitter.txt from-parameters: split_chunks: training_config.taskcluster.split-chunks task-context: @@ -49,9 +50,7 @@ tasks: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] @@ -64,17 +63,16 @@ tasks: - bash - -c - >- + pip3 install -r $VCS_PATH/pipeline/translate/requirements/splitter.txt && export PYTHONPATH=$PYTHONPATH:$VCS_PATH && python3 $VCS_PATH/pipeline/translate/splitter.py --output_dir=$TASK_WORKDIR/artifacts --num_parts={split_chunks} - --compression_cmd=zstdmt fetches/corpus.{src_locale}.zst && python3 $VCS_PATH/pipeline/translate/splitter.py --output_dir=$TASK_WORKDIR/artifacts --num_parts={split_chunks} --output_suffix=.ref - --compression_cmd=zstdmt fetches/corpus.{trg_locale}.zst dependencies: diff --git a/taskcluster/kinds/split-mono-src/kind.yml b/taskcluster/kinds/split-mono-src/kind.yml index 462325312..3356a411f 100644 --- a/taskcluster/kinds/split-mono-src/kind.yml +++ b/taskcluster/kinds/split-mono-src/kind.yml @@ -23,6 +23,7 @@ task-defaults: type: split-mono resources: - pipeline/translate/splitter.py + - pipeline/translate/requirements/splitter.txt from-parameters: split_chunks: training_config.taskcluster.split-chunks @@ -49,8 +50,6 @@ task-defaults: type: directory env: LOCALE: "{locale}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails retry-exit-status: [128] @@ -63,11 +62,11 @@ task-defaults: - bash - -c - >- + pip3 install -r $VCS_PATH/pipeline/translate/requirements/splitter.txt && export PYTHONPATH=$PYTHONPATH:$VCS_PATH && python3 $VCS_PATH/pipeline/translate/splitter.py --output_dir=$TASK_WORKDIR/artifacts --num_parts={split_chunks} - --compression_cmd=zstdmt $MOZ_FETCHES_DIR/mono.$LOCALE.zst tasks: diff --git a/taskcluster/kinds/split-mono-trg/kind.yml b/taskcluster/kinds/split-mono-trg/kind.yml index 7d168519d..37ad51514 100644 --- a/taskcluster/kinds/split-mono-trg/kind.yml +++ b/taskcluster/kinds/split-mono-trg/kind.yml @@ -23,6 +23,7 @@ task-defaults: type: split-mono resources: - pipeline/translate/splitter.py + - pipeline/translate/requirements/splitter.txt from-parameters: split_chunks: training_config.taskcluster.split-chunks @@ -49,8 +50,6 @@ 
task-defaults: type: directory env: LOCALE: "{locale}" - COMPRESSION_CMD: zstdmt - ARTIFACT_EXT: zst # 128 happens when cloning this repository fails retry-exit-status: [128] @@ -63,11 +62,11 @@ task-defaults: - bash - -c - >- + pip3 install -r $VCS_PATH/pipeline/translate/requirements/splitter.txt && export PYTHONPATH=$PYTHONPATH:$VCS_PATH && python3 $VCS_PATH/pipeline/translate/splitter.py --output_dir=$TASK_WORKDIR/artifacts --num_parts={split_chunks} - --compression_cmd=zstdmt $MOZ_FETCHES_DIR/mono.$LOCALE.zst diff --git a/taskcluster/kinds/train-backwards/kind.yml b/taskcluster/kinds/train-backwards/kind.yml index 4272ebb56..4631190ac 100644 --- a/taskcluster/kinds/train-backwards/kind.yml +++ b/taskcluster/kinds/train-backwards/kind.yml @@ -65,9 +65,6 @@ tasks: # train_taskcluster.py exits with 17 if a request to Taskcluster fails retry-exit-status: [17] env: - ARTIFACT_EXT: zst - COMPRESSION_CMD: zstdmt - # Weight & Biases trigger WANDB_PUBLICATION: "{wandb_publication}" WANDB_AUTHOR: "{owner}" diff --git a/taskcluster/kinds/train-student/kind.yml b/taskcluster/kinds/train-student/kind.yml index e5c915f22..43aeeb673 100644 --- a/taskcluster/kinds/train-student/kind.yml +++ b/taskcluster/kinds/train-student/kind.yml @@ -64,9 +64,6 @@ tasks: # 128 happens when cloning this repository fails retry-exit-status: [17, 128] env: - ARTIFACT_EXT: zst - COMPRESSION_CMD: zstdmt - # Weight & Biases trigger WANDB_PUBLICATION: "{wandb_publication}" WANDB_AUTHOR: "{owner}" diff --git a/taskcluster/kinds/train-teacher/kind.yml b/taskcluster/kinds/train-teacher/kind.yml index 50e2d9cb8..f300caae3 100644 --- a/taskcluster/kinds/train-teacher/kind.yml +++ b/taskcluster/kinds/train-teacher/kind.yml @@ -89,9 +89,6 @@ tasks: # 128 happens when cloning this repository fails retry-exit-status: [17, 128] env: - ARTIFACT_EXT: zst - COMPRESSION_CMD: zstdmt - # Weight & Biases trigger WANDB_PUBLICATION: "{wandb_publication}" WANDB_AUTHOR: "{owner}" diff --git a/taskcluster/kinds/train-vocab/kind.yml b/taskcluster/kinds/train-vocab/kind.yml index 26d216173..56e9504f4 100644 --- a/taskcluster/kinds/train-vocab/kind.yml +++ b/taskcluster/kinds/train-vocab/kind.yml @@ -52,8 +52,7 @@ tasks: - name: public/build path: /builds/worker/artifacts type: directory - env: - COMPRESSION_CMD: zstdmt + env: {} # 128 happens when cloning this repository fails retry-exit-status: [128] diff --git a/tests/test_alignments.py b/tests/test_alignments.py index e1c746e20..faf874687 100644 --- a/tests/test_alignments.py +++ b/tests/test_alignments.py @@ -71,8 +71,6 @@ def test_teacher_original_alignments(): env = { "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", "ALN_CHUNK_LINES": "3", @@ -92,8 +90,6 @@ def test_teacher_backtranslated_alignments(): env = { "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", "ALN_CHUNK_LINES": "3", @@ -117,8 +113,6 @@ def test_student_alignments(): env = { "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", "ALN_CHUNK_LINES": "3", @@ -140,14 +134,12 @@ def test_student_alignments(): def test_shortlist(): data_dir = DataDir("test_shortlist") - data_dir.create_zst("corpus.en.zst", en_sample), - data_dir.create_zst("corpus.ru.zst", ru_sample), + data_dir.create_zst("corpus.en.zst", en_sample) + data_dir.create_zst("corpus.ru.zst", ru_sample) env = { "TEST_ARTIFACTS": 
data_dir.path, "BIN": bin_dir, "MARIAN": marian_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", "ALN_CHUNK_LINES": "3", diff --git a/tests/test_bicleaner.py b/tests/test_bicleaner.py index 9bb9c0d37..ef92929fa 100644 --- a/tests/test_bicleaner.py +++ b/tests/test_bicleaner.py @@ -55,7 +55,7 @@ def test_model_download(src, trg, model_src, model_trg, init, data_dir): decompressed_path = data_dir.join(f"bicleaner-ai-{src}-{trg}") meta_path = os.path.join(decompressed_path, "metadata.yaml") - download_model([f"--src={src}", f"--trg={trg}", "--compression_cmd=zstd", target_path]) + download_model([f"--src={src}", f"--trg={trg}", target_path]) assert os.path.isfile(target_path) decompress(target_path) diff --git a/tests/test_read_lines.py b/tests/test_common_downloads.py similarity index 77% rename from tests/test_read_lines.py rename to tests/test_common_downloads.py index 96af14d5f..9e5f40966 100644 --- a/tests/test_read_lines.py +++ b/tests/test_common_downloads.py @@ -1,13 +1,14 @@ import gzip import io from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path from threading import Thread import pytest import zstandard from fixtures import DataDir -from pipeline.common.downloads import read_lines, write_lines +from pipeline.common.downloads import compress_file, decompress_file, read_lines, write_lines # Content to serve line_fixtures = [ @@ -169,3 +170,43 @@ def test_read_lines_local_multiple(): with read_lines(file_paths) as lines: assert list(lines) == [*line_fixtures, *line_fixtures, *line_fixtures] + + +def assert_matches_test_content(file_path: str): + with read_lines(file_path) as lines: + assert list(lines) == line_fixtures, f"{file_path} matches the fixtures" + + +@pytest.mark.parametrize( + "compression, keep_original", + [("gz", False), ("zst", True)], +) +def test_compress_file(compression: str, keep_original: bool): + data_dir = DataDir("test_compress_file") + + text_file = data_dir.join("text_file.txt") + compressed_file = data_dir.join(f"text_file.txt.{compression}") + + write_test_content(text_file) + + compress_file(text_file, keep_original, compression) + data_dir.print_tree() + assert Path(text_file).exists() == keep_original + assert_matches_test_content(compressed_file) + + +@pytest.mark.parametrize( + "compression, keep_original", + [("gz", False), ("zst", True)], +) +def test_decompress_file(compression: str, keep_original: bool): + data_dir = DataDir("test_compress_file") + + compressed_file = data_dir.join(f"text_file.txt.{compression}") + text_file = data_dir.join("text_file.txt") + write_test_content(compressed_file) + + decompress_file(compressed_file, keep_original, text_file) + data_dir.print_tree() + assert Path(compressed_file).exists() == keep_original + assert_matches_test_content(text_file) diff --git a/tests/test_data_importer.py b/tests/test_data_importer.py index a5c8153c3..945d5d3c8 100644 --- a/tests/test_data_importer.py +++ b/tests/test_data_importer.py @@ -6,12 +6,8 @@ SRC = "ru" TRG = "en" -ARTIFACT_EXT = "zst" -COMPRESSION_CMD = "zstd" CURRENT_FOLDER = os.path.dirname(os.path.abspath(__file__)) -os.environ["ARTIFACT_EXT"] = ARTIFACT_EXT -os.environ["COMPRESSION_CMD"] = COMPRESSION_CMD os.environ["SRC"] = SRC os.environ["TRG"] = TRG @@ -102,16 +98,14 @@ def test_basic_corpus_import(importer, dataset, data_dir): data_dir.run_task( f"dataset-{importer}-{dataset}-en-ru", env={ - "COMPRESSION_CMD": COMPRESSION_CMD, - "ARTIFACT_EXT": ARTIFACT_EXT, "WGET": os.path.join(CURRENT_FOLDER, 
"fixtures/wget"), "MOCKED_DOWNLOADS": get_mocked_downloads(), }, ) prefix = data_dir.join(f"artifacts/{dataset}") - output_src = f"{prefix}.ru.{ARTIFACT_EXT}" - output_trg = f"{prefix}.en.{ARTIFACT_EXT}" + output_src = f"{prefix}.ru.zst" + output_trg = f"{prefix}.en.zst" assert os.path.exists(output_src) assert os.path.exists(output_trg) @@ -136,15 +130,13 @@ def test_mono_source_import(importer, language, dataset, sort_order, data_dir): data_dir.run_task( f"dataset-{importer}-{dataset}-{language}", env={ - "COMPRESSION_CMD": COMPRESSION_CMD, - "ARTIFACT_EXT": ARTIFACT_EXT, "WGET": os.path.join(CURRENT_FOLDER, "fixtures/wget"), "MOCKED_DOWNLOADS": get_mocked_downloads(), }, ) prefix = data_dir.join(f"artifacts/{dataset}") - mono_data = f"{prefix}.{language}.{ARTIFACT_EXT}" + mono_data = f"{prefix}.{language}.zst" data_dir.print_tree() @@ -188,10 +180,10 @@ def test_specific_augmentation(params, data_dir): original_dataset = "sacrebleu_wmt19" prefix_aug = data_dir.join(dataset) prefix_original = data_dir.join(original_dataset) - output_src = f"{prefix_aug}.{SRC}.{ARTIFACT_EXT}" - output_trg = f"{prefix_aug}.{TRG}.{ARTIFACT_EXT}" - original_src = f"{prefix_original}.{SRC}.{ARTIFACT_EXT}" - original_trg = f"{prefix_original}.{TRG}.{ARTIFACT_EXT}" + output_src = f"{prefix_aug}.{SRC}.zst" + output_trg = f"{prefix_aug}.{TRG}.zst" + original_src = f"{prefix_original}.{SRC}.zst" + original_trg = f"{prefix_original}.{TRG}.zst" run_import("corpus", original_dataset, prefix_original) run_import("corpus", dataset, prefix_aug) @@ -223,10 +215,10 @@ def test_augmentation_mix(data_dir): original_dataset = "sacrebleu_wmt19" prefix = data_dir.join(dataset) prefix_original = data_dir.join(original_dataset) - output_src = f"{prefix}.{SRC}.{ARTIFACT_EXT}" - output_trg = f"{prefix}.{TRG}.{ARTIFACT_EXT}" - original_src = f"{prefix_original}.{SRC}.{ARTIFACT_EXT}" - original_trg = f"{prefix_original}.{TRG}.{ARTIFACT_EXT}" + output_src = f"{prefix}.{SRC}.zst" + output_trg = f"{prefix}.{TRG}.zst" + original_src = f"{prefix_original}.{SRC}.zst" + original_trg = f"{prefix_original}.{TRG}.zst" run_import("corpus", original_dataset, prefix_original) run_import("corpus", dataset, prefix) diff --git a/tests/test_eval.py b/tests/test_eval.py index 89ac85de4..8b53d3f02 100644 --- a/tests/test_eval.py +++ b/tests/test_eval.py @@ -90,8 +90,6 @@ def run_eval_test(params) -> None: "TEST_ARTIFACTS": data_dir.path, # Replace marian with the one in the fixtures path. "MARIAN": fixtures_path, - # This is included via the poetry install - "COMPRESSION_CMD": "zstd", "COMET_MODEL_DIR": model_path, "COMET_CPU": "1", } @@ -102,8 +100,6 @@ def run_eval_test(params) -> None: "TEST_ARTIFACTS": data_dir.path, # Replace marian with the one in the fixtures path. 
"BMT_MARIAN": fixtures_path, - # This is included via the poetry install - "COMPRESSION_CMD": "zstd", "COMET_MODEL_DIR": model_path, "COMET_CPU": "1", } diff --git a/tests/test_split_collect.py b/tests/test_split_collect.py index 4b18c75b9..b4e48debc 100644 --- a/tests/test_split_collect.py +++ b/tests/test_split_collect.py @@ -1,5 +1,4 @@ import glob -import os import random import shutil import string @@ -11,8 +10,6 @@ from pipeline.translate.splitter import main as split_file -COMPRESSION_CMD = "zstdmt" - @pytest.fixture(scope="function") def data_dir(): @@ -52,7 +49,6 @@ def read_file(path): def test_split_collect_mono(data_dir): - os.environ["COMPRESSION_CMD"] = COMPRESSION_CMD length = 1234 path = data_dir.join("mono.in") output = data_dir.join("mono.output") @@ -63,7 +59,6 @@ def test_split_collect_mono(data_dir): [ f"--output_dir={data_dir.path}", "--num_parts=10", - f"--compression_cmd={COMPRESSION_CMD}", f"{path}.zst", ] ) @@ -83,7 +78,6 @@ def test_split_collect_mono(data_dir): def test_split_collect_corpus(data_dir): - os.environ["COMPRESSION_CMD"] = COMPRESSION_CMD length = 1234 path_src = data_dir.join("corpus.src.in") path_trg = data_dir.join("corpus.trg.in") @@ -96,7 +90,6 @@ def test_split_collect_corpus(data_dir): [ f"--output_dir={data_dir.path}", "--num_parts=10", - f"--compression_cmd={COMPRESSION_CMD}", f"{path_src}.zst", ] ) @@ -104,7 +97,6 @@ def test_split_collect_corpus(data_dir): [ f"--output_dir={data_dir.path}", "--num_parts=10", - f"--compression_cmd={COMPRESSION_CMD}", "--output_suffix=.ref", f"{path_trg}.zst", ] diff --git a/tests/test_spm_vocab.py b/tests/test_spm_vocab.py index 49af2cfbd..f9e1a6ef9 100644 --- a/tests/test_spm_vocab.py +++ b/tests/test_spm_vocab.py @@ -21,7 +21,6 @@ def run_spm_test(arguments: list[str]) -> list[str]: env = { **os.environ, "MARIAN": fixtures_path, - "COMPRESSION_CMD": "zstd", # This allows the spm_train fixture to know where to output the vocab. "SPM_VOCAB_DATA_DIRECTORY": test_data_dir.path, } diff --git a/tests/test_training.py b/tests/test_training.py index f0d413f39..e234218ad 100644 --- a/tests/test_training.py +++ b/tests/test_training.py @@ -64,8 +64,6 @@ def alignments(data_dir, vocab, corpus): "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, "MARIAN": marian_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", } @@ -87,8 +85,6 @@ def test_train_student_mocked(alignments, data_dir): "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, "MARIAN": fixtures_path, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", } @@ -113,8 +109,6 @@ def test_train_student(alignments, data_dir): "TEST_ARTIFACTS": data_dir.path, "BIN": bin_dir, "MARIAN": marian_dir, - "COMPRESSION_CMD": "zstd", - "ARTIFACT_EXT": "zst", "SRC": "en", "TRG": "ru", "USE_CPU": "true",