Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions cf_remote/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import hashlib
import os
import shutil
import glob
import sys
import re
import json
import getpass
import tempfile
import fcntl
from collections import OrderedDict
from cf_remote import log
from datetime import datetime
Expand Down Expand Up @@ -226,15 +229,56 @@ def print_progress_dot(*args):
sys.stdout.flush() # STDOUT is line-buffered


# atomic copy, see lock-free whack-a-mole algorithm
# https://www.cl.cam.ac.uk/techreports/UCAM-CL-TR-697.pdf#page=66
def copy_file(input_path, output_path):
    """Atomically copy ``input_path`` to ``output_path``.

    The content is first copied to a uniquely named temporary file next to
    the destination, which is then renamed to ``<tmp>.mole``. Competing
    moles for the same destination are removed ("whacked") so that the one
    with the lexicographically greatest name survives, and the surviving
    mole is finally renamed onto the destination.

    Args:
        input_path: Path of the file to copy. Must not end with "/".
        output_path: Destination path. If it ends with "/", the file is
            placed in that directory under the input file's name.

    Raises:
        AssertionError: If ``input_path`` ends with "/".
        OSError: If the temporary file cannot be created or the input
            cannot be copied. Errors while whacking moles or performing the
            final rename are ignored (best effort).
    """
    assert not input_path.endswith("/")

    output_filename = os.path.basename(output_path)
    if not output_filename:
        # Destination was given as a directory: reuse the source file name.
        output_filename = os.path.basename(input_path)

    # mkstemp() reports an absolute path, so the directory is made absolute
    # as well; otherwise the glob results below would never compare equal
    # to our own mole when a relative output path is given.
    output_dirname = os.path.abspath(os.path.dirname(output_path))
    final_path = os.path.join(output_dirname, output_filename)

    tmp_fd, tmp_path = tempfile.mkstemp(
        ".tmp", "{}-".format(output_filename), output_dirname
    )
    # Only the unique name is needed; shutil.copy() reopens the file by path.
    os.close(tmp_fd)

    # copy input content to tmp

    try:
        with open(input_path, "r") as input_file:
            input_fd = input_file.fileno()

            fcntl.flock(input_fd, fcntl.LOCK_SH)
            try:
                shutil.copy(input_path, tmp_path)
            finally:
                fcntl.flock(input_fd, fcntl.LOCK_UN)
    except BaseException:
        # Do not leave a stray temporary file behind when the copy fails.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    # rename tmp to tmp.mole

    my_mole = "{}.mole".format(tmp_path)
    os.rename(tmp_path, my_mole)

    # Escape the literal part so glob metacharacters in the destination
    # name are not interpreted as wildcards.
    glob_pattern = "{}-*.tmp.mole".format(glob.escape(final_path))
    for mole in glob.glob(glob_pattern):
        if mole == my_mole:
            continue

        # Keep the greater name as "ours"; the smaller one gets whacked.
        mole_to_whack, my_mole = sorted((mole, my_mole))
        try:
            os.remove(mole_to_whack)
        except OSError:
            # Already removed, presumably by a concurrent copier.
            pass

    try:
        with open(final_path, "a") as output_file:
            output_fd = output_file.fileno()

            fcntl.flock(output_fd, fcntl.LOCK_EX)
            try:
                os.rename(my_mole, final_path)
            finally:
                fcntl.flock(output_fd, fcntl.LOCK_UN)
    except OSError:
        # Best effort: the surviving mole may already have been renamed or
        # whacked by a concurrent copier.
        pass


def is_different_checksum(checksum, content):
Expand Down
46 changes: 45 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from cf_remote.utils import has_unescaped_character, parse_envfile
import os
import shutil
from multiprocessing import Pool
from cf_remote.utils import has_unescaped_character, parse_envfile, copy_file


def test_parse_envfile():
Expand Down Expand Up @@ -45,3 +48,44 @@ def test_has_unescaped_character():
assert not has_unescaped_character(r"\"test\"", '"')
assert has_unescaped_character(r'hello"world', '"')
assert has_unescaped_character(r'hello\""world', '"')


def copy_file_with_args(args):
    """Call ``copy_file`` with a ``(source, destination)`` pair.

    Single-argument adapter so the copy can be dispatched through
    ``Pool.map``, which hands each worker one item.
    """
    source, destination = args
    copy_file(source, destination)


def test_copy_file():
    """Concurrent copies to one destination leave exactly one intact file.

    Several processes copy the same source to the same destination at once.
    Afterwards the destination must hold the full source content, and the
    destination directory must contain nothing else (no leftover temporary
    or mole files).
    """
    # Imported locally so this test brings its own dependency with it.
    import tempfile

    expected = "This is a test file for atomic copy."
    src_file = "myfile.txt"
    dest_file = "copy.txt"
    num_processes = 10

    # A fresh, unique directory per run: stale files from earlier runs
    # cannot break the listdir() assertion, and cleanup happens even when
    # an assertion fails.
    with tempfile.TemporaryDirectory(prefix="cf-remote-test-") as base_dir:
        src_dir = os.path.join(base_dir, "src")
        dest_dir = os.path.join(base_dir, "dest")
        os.makedirs(src_dir)
        os.makedirs(dest_dir)

        src = os.path.join(src_dir, src_file)
        dest = os.path.join(dest_dir, dest_file)

        with open(src, "w") as f:
            f.write(expected)

        with Pool(num_processes) as copy_pool:
            copy_pool.map(
                copy_file_with_args, [(src, dest) for _ in range(num_processes)]
            )

        # Let a failing open() surface its real error instead of masking it.
        with open(dest, "r") as f:
            content = f.read()

        assert content == expected
        assert os.listdir(dest_dir) == [dest_file]