Skip to content

Commit

Permalink
Remove the compression command configuration, and only use zstandard (#824)
Browse files Browse the repository at this point in the history

* Remove compression options - Update all kind.yml

* Remove compression options - Update .sh files

* Remove compression options - Update .py files

* Rename test_read_lines to test_common_downloads

* Add unit tests for compress_file and decompress_file

* Add requirements files for python code that uses requests
  • Loading branch information
gregtatum authored Sep 4, 2024
1 parent 68aa0a7 commit 92c2b45
Show file tree
Hide file tree
Showing 60 changed files with 648 additions and 380 deletions.
13 changes: 6 additions & 7 deletions pipeline/alignments/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from pipeline.common.logging import get_logger

logger = get_logger("alignments")
COMPRESSION_CMD = "zstdmt"


class Tokenization(Enum):
Expand Down Expand Up @@ -97,15 +96,15 @@ def run(
remap(corpus_src, corpus_trg, tokenized_src, tokenized_trg, output_aln, remapped_aln)
if output_path.endswith(".zst"):
logger.info("Compressing final alignments")
subprocess.check_call([COMPRESSION_CMD, "--rm", remapped_aln])
subprocess.check_call(["zstdmt", "--rm", remapped_aln])
remapped_aln += ".zst"
shutil.move(remapped_aln, output_path)


def decompress(file_path: str):
if file_path.endswith(".zst"):
logger.info(f"Decompressing file {file_path}")
subprocess.check_call([COMPRESSION_CMD, "-d", "-f", "--rm", file_path])
subprocess.check_call(["zstdmt", "-d", "-f", "--rm", file_path])
return file_path[:-4]
return file_path

Expand Down Expand Up @@ -182,10 +181,10 @@ def symmetrize(bin: str, fwd_path: str, rev_path: str, output_path: str):
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Wrap the file with a compressor stream if it needs to be compressed
with ExitStack() as stack:
with zstandard.ZstdCompressor().stream_writer(
stack.enter_context(open(output_path, "wb"))
) if output_path.endswith(".zst") else stack.enter_context(
open(output_path, "w", encoding="utf-8")
with (
zstandard.ZstdCompressor().stream_writer(stack.enter_context(open(output_path, "wb")))
if output_path.endswith(".zst")
else stack.enter_context(open(output_path, "w", encoding="utf-8"))
) as stream:
with subprocess.Popen(
[
Expand Down
17 changes: 7 additions & 10 deletions pipeline/alignments/generate-shortlist.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ vocab_path=$2
output_dir=$3
threads=$4

COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}"
ARTIFACT_EXT="${ARTIFACT_EXT:-gz}"

if [ "$threads" = "auto" ]; then
threads=$(nproc)
fi
Expand All @@ -33,16 +30,16 @@ mkdir -p "${output_dir}"
dir="${output_dir}/tmp"
mkdir -p "${dir}"

corpus_src="${corpus_prefix}.${SRC}.${ARTIFACT_EXT}"
corpus_trg="${corpus_prefix}.${TRG}.${ARTIFACT_EXT}"
corpus_src="${corpus_prefix}.${SRC}.zst"
corpus_trg="${corpus_prefix}.${TRG}.zst"


echo "### Subword segmentation with SentencePiece"
${COMPRESSION_CMD} -dc "${corpus_src}" |
zstdmt -dc "${corpus_src}" |
parallel --no-notice --pipe -k -j "${threads}" --block 50M "${MARIAN}/spm_encode" --model "${vocab_path}" \
>"${dir}/corpus.spm.${SRC}"

${COMPRESSION_CMD} -dc "${corpus_trg}" |
zstdmt -dc "${corpus_trg}" |
parallel --no-notice --pipe -k -j "${threads}" --block 50M "${MARIAN}/spm_encode" --model "${vocab_path}" \
>"${dir}/corpus.spm.${TRG}"

Expand All @@ -60,7 +57,7 @@ echo "### Creating shortlist"
"${dir}/lex.t2s"

if [ -f "${dir}/lex.s2t" ]; then
${COMPRESSION_CMD} "${dir}/lex.s2t"
zstdmt "${dir}/lex.s2t"
fi

rm "${dir}/corpus.spm.${TRG}"
Expand All @@ -69,10 +66,10 @@ rm "${output_dir}/corpus.aln"

echo "### Shortlist pruning"
"${MARIAN}/spm_export_vocab" --model="${vocab_path}" --output="${dir}/vocab.txt"
${COMPRESSION_CMD} -dc "${dir}/lex.s2t.${ARTIFACT_EXT}" |
zstdmt -dc "${dir}/lex.s2t.zst" |
grep -v NULL |
python3 "prune_shortlist.py" 100 "${dir}/vocab.txt" |
${COMPRESSION_CMD} >"${output_dir}/lex.s2t.pruned.${ARTIFACT_EXT}"
zstdmt >"${output_dir}/lex.s2t.pruned.zst"

echo "### Deleting tmp dir"
rm -rf "${dir}"
Expand Down
1 change: 1 addition & 0 deletions pipeline/alignments/requirements/alignments.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
eflomal==1.0.0b1
opus-fast-mosestokenizer==0.0.8.5
tqdm
requests==2.31.0
232 changes: 225 additions & 7 deletions pipeline/alignments/requirements/alignments.txt

Large diffs are not rendered by default.

33 changes: 15 additions & 18 deletions pipeline/bicleaner/bicleaner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ bicleaner_threshold=$3
threads=$4
pack_dir=$5

COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}"
ARTIFACT_EXT="${ARTIFACT_EXT:-gz}"

if [ "$threads" = "auto" ]; then
threads=$(nproc)
fi
Expand All @@ -36,8 +33,8 @@ mkdir -p "${output_dir}"

if [ "${bicleaner_threshold}" == "0" ] || [ "${bicleaner_threshold}" == "0.0" ]; then
echo "Threshold is 0, skipping filtering"
cp "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}" "${output_prefix}.${SRC}.${ARTIFACT_EXT}"
cp "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}" "${output_prefix}.${TRG}.${ARTIFACT_EXT}"
cp "${corpus_prefix}.${SRC}.zst" "${output_prefix}.${SRC}.zst"
cp "${corpus_prefix}.${TRG}.zst" "${output_prefix}.${TRG}.zst"
else

export scol=1
Expand Down Expand Up @@ -83,32 +80,32 @@ else
}
export -f biclean
# {%} is a 1-indexed job slot number from GNU parallel. We use that as the 1-indexed offset in CUDA_VISIBLE_ARRAY
paste <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") |
paste <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") |
parallel -j ${#CUDA_VISIBLE_ARRAY[@]} --pipe -k --block 10M biclean "${pack_dir}"/*.yaml {%} |
${COMPRESSION_CMD} >"${output_prefix}.scored.${ARTIFACT_EXT}"
zstdmt >"${output_prefix}.scored.zst"
else
export BICLEANER_AI_THREADS=${threads}
paste <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") |
paste <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") |
bicleaner-ai-classify ${hardrules} --scol ${scol} --tcol ${tcol} "${threads}" - - "${pack_dir}"/*.yaml |
${COMPRESSION_CMD} >"${output_prefix}.scored.${ARTIFACT_EXT}"
zstdmt >"${output_prefix}.scored.zst"
fi

echo "### Filtering"
${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" |
zstdmt -dc "${output_prefix}.scored.zst" |
awk -v threshold=${bicleaner_threshold} -F"\t" '{if ($3>threshold) {print $0}}' |
${COMPRESSION_CMD} >"${output_prefix}.best.${ARTIFACT_EXT}"
zstdmt >"${output_prefix}.best.zst"

${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" |
zstdmt -dc "${output_prefix}.scored.zst" |
awk -v threshold=${bicleaner_threshold} -F"\t" '{if ($3<=threshold) {print $0}}' |
${COMPRESSION_CMD} >"${output_prefix}.filtered.${ARTIFACT_EXT}"
zstdmt >"${output_prefix}.filtered.zst"

echo "Lines before filtering: $(${COMPRESSION_CMD} -dc "${output_prefix}.scored.${ARTIFACT_EXT}" | wc -l)"
echo "Lines after filtering: $(${COMPRESSION_CMD} -dc "${output_prefix}.best.${ARTIFACT_EXT}" | wc -l)"
echo "Lines before filtering: $(zstdmt -dc "${output_prefix}.scored.zst" | wc -l)"
echo "Lines after filtering: $(zstdmt -dc "${output_prefix}.best.zst" | wc -l)"

echo "### Writing output corpus"
${COMPRESSION_CMD} -dc "${output_prefix}.best.${ARTIFACT_EXT}" |
tee >(cut -f1 | ${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}") |
cut -f2 | ${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}"
zstdmt -dc "${output_prefix}.best.zst" |
tee >(cut -f1 | zstdmt >"${output_prefix}.${SRC}.zst") |
cut -f2 | zstdmt >"${output_prefix}.${TRG}.zst"

# do not delete intermediate files to inspect them and tune the threshold
fi
Expand Down
28 changes: 6 additions & 22 deletions pipeline/bicleaner/download_pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
python download_pack.py \
--src=en \
--trg=ru \
--compression_cmd=zstd \
artifacts/bicleaner-model-en-ru.zst
"""

Expand All @@ -19,6 +18,7 @@
import tempfile
from typing import Optional

from pipeline.common.downloads import compress_file
from pipeline.common.logging import get_logger

logger = get_logger(__file__)
Expand All @@ -36,22 +36,14 @@ def _run_download(src: str, trg: str, dir: str) -> subprocess.CompletedProcess:
)


def _compress_dir(dir_path: str, compression_cmd: str) -> str:
def _compress_dir(dir_path: str) -> str:
logger.info(f"Compressing {dir_path}")
if compression_cmd not in ["gzip", "zstd", "zstdmt", "pigz"]:
raise ValueError(f"Unsupported compression tool {compression_cmd}.")

tarball_path = f"{dir_path}.tar"
with tarfile.open(tarball_path, "w") as tar:
tar.add(dir_path, arcname=os.path.basename(dir_path))

if compression_cmd in ("gzip", "pigz"):
comp_ext = ".gz"
else:
comp_ext = ".zst"

compressed_path = tarball_path + comp_ext
subprocess.run([compression_cmd, tarball_path], check=True)
compressed_path = str(compress_file(tarball_path))

return compressed_path

Expand All @@ -64,7 +56,7 @@ def check_result(result: subprocess.CompletedProcess):
result.check_returncode()


def download(src: str, trg: str, output_path: str, compression_cmd: str) -> None:
def download(src: str, trg: str, output_path: str) -> None:
tmp_dir = os.path.join(tempfile.gettempdir(), f"bicleaner-ai-{src}-{trg}")

if os.path.exists(tmp_dir):
Expand Down Expand Up @@ -105,9 +97,8 @@ def download(src: str, trg: str, output_path: str, compression_cmd: str) -> None
print(f"The model for {src}-{trg} is downloaded")

pack_path = tmp_dir
if compression_cmd:
logger.info("Compress the downloaded pack.")
pack_path = _compress_dir(pack_path, compression_cmd)
logger.info("Compress the downloaded pack.")
pack_path = _compress_dir(pack_path)

# Move to the expected path
logger.info(f"Moving {pack_path} to {output_path}")
Expand All @@ -123,12 +114,6 @@ def main(args: Optional[list[str]] = None) -> None:
)
parser.add_argument("--src", type=str, help="Source language code")
parser.add_argument("--trg", type=str, help="Target language code")
parser.add_argument(
"--compression_cmd",
type=str,
help="Compression command (eg. pigz, zstd). "
"Optional, if not provided the directory will not be compressed",
)
parser.add_argument(
"output_path",
type=str,
Expand All @@ -141,7 +126,6 @@ def main(args: Optional[list[str]] = None) -> None:
src=parsed_args.src,
trg=parsed_args.trg,
output_path=parsed_args.output_path,
compression_cmd=parsed_args.compression_cmd,
)


Expand Down
21 changes: 9 additions & 12 deletions pipeline/cefilter/ce-filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ corpus_prefix=$1
output_prefix=$2
scores=$3

COMPRESSION_CMD="${COMPRESSION_CMD:-pigz}"
ARTIFACT_EXT="${ARTIFACT_EXT:-gz}"

cd "$(dirname "${0}")"

# Part of the data to be removed (0.05 is 5%)
Expand All @@ -26,24 +23,24 @@ tmp="${output_dir}/tmp"
mkdir -p "${tmp}"

echo "### Sorting scores"
if [ ! -s "${tmp}/sorted.${ARTIFACT_EXT}" ]; then
if [ ! -s "${tmp}/sorted.zst" ]; then
buffer_size="$(echo "$(grep MemTotal /proc/meminfo | awk '{print $2}')"*0.9 | bc | cut -f1 -d.)"
paste "${scores}" <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}") <(${COMPRESSION_CMD} -dc "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}") |
paste "${scores}" <(zstdmt -dc "${corpus_prefix}.${SRC}.zst") <(zstdmt -dc "${corpus_prefix}.${TRG}.zst") |
LC_ALL=C sort -n -k1,1 -S "${buffer_size}K" -T "${tmp}" |
${COMPRESSION_CMD} >"${tmp}/sorted.${ARTIFACT_EXT}"
zstdmt >"${tmp}/sorted.zst"
fi

echo "### Cutting the best scored corpus"
if [ ! -s "${tmp}/best.${ARTIFACT_EXT}" ]; then
lines=$(${COMPRESSION_CMD} -dc "${tmp}/sorted.${ARTIFACT_EXT}" | wc -l)
if [ ! -s "${tmp}/best.zst" ]; then
lines=$(zstdmt -dc "${tmp}/sorted.zst" | wc -l)
startline=$(echo ${lines}*${remove} | bc | cut -f1 -d.)
${COMPRESSION_CMD} -dc "${tmp}/sorted.${ARTIFACT_EXT}" | tail -n +${startline} | cut -f2,3 | ${COMPRESSION_CMD} >"${tmp}/best.${ARTIFACT_EXT}"
zstdmt -dc "${tmp}/sorted.zst" | tail -n +${startline} | cut -f2,3 | zstdmt >"${tmp}/best.zst"
fi

echo "### Writing output corpus"
${COMPRESSION_CMD} -dc "${tmp}/best.${ARTIFACT_EXT}" |
tee >(cut -f1 | ${COMPRESSION_CMD} >"${output_prefix}.${SRC}.${ARTIFACT_EXT}") |
cut -f2 | ${COMPRESSION_CMD} >"${output_prefix}.${TRG}.${ARTIFACT_EXT}"
zstdmt -dc "${tmp}/best.zst" |
tee >(cut -f1 | zstdmt >"${output_prefix}.${SRC}.zst") |
cut -f2 | zstdmt >"${output_prefix}.${TRG}.zst"

echo "### Deleting tmp dir"
rm -rf "${tmp}"
Expand Down
13 changes: 3 additions & 10 deletions pipeline/cefilter/score.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,16 @@ vocab=$2
corpus_prefix=$3
output=$4

ARTIFACT_EXT="${ARTIFACT_EXT:-gz}"

if [ "${ARTIFACT_EXT}" = "zst" ]; then
zstdmt --rm -d "${corpus_prefix}.${SRC}.${ARTIFACT_EXT}"
zstdmt --rm -d "${corpus_prefix}.${TRG}.${ARTIFACT_EXT}"
ARTIFACT_EXT=""
else
ARTIFACT_EXT=".gz"
fi
zstdmt --rm -d "${corpus_prefix}.${SRC}.zst"
zstdmt --rm -d "${corpus_prefix}.${TRG}.zst"

dir=$(dirname "${output}")
mkdir -p "${dir}"

"${MARIAN}/marian-scorer" \
--model "${model}" \
--vocabs "${vocab}" "${vocab}" \
--train-sets "${corpus_prefix}.${TRG}${ARTIFACT_EXT}" "${corpus_prefix}.${SRC}${ARTIFACT_EXT}" \
--train-sets "${corpus_prefix}.${TRG}" "${corpus_prefix}.${SRC}" \
--mini-batch 32 \
--mini-batch-words 1500 \
--maxi-batch 1000 \
Expand Down
Loading

0 comments on commit 92c2b45

Please sign in to comment.