Skip to content

Commit c27e6f4

Browse files
committed
fix: parallel access to IFileStream
When using parallel write to streams, in each stream only update the corresponding written bytes.
1 parent 54775d4 commit c27e6f4

File tree

3 files changed

+153
-3
lines changed

3 files changed

+153
-3
lines changed

Source/Testably.Abstractions.Testing/FileSystem/FileStreamMock.cs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ public override int WriteTimeout
177177
private bool _isContentChanged;
178178
private bool _isDisposed;
179179
private readonly IStorageLocation _location;
180+
private long _maxWrite;
181+
private long _minWrite = long.MaxValue;
180182
private readonly FileMode _mode;
181183
private readonly FileOptions _options;
182184
private readonly MemoryStream _stream;
@@ -248,7 +250,8 @@ private FileStreamMock(MemoryStream stream,
248250

249251
throw ExceptionFactory.FileAlreadyExists(
250252
_fileSystem.Execute.Path.GetFullPath(base.Name), 17);
251-
} else if (_mode.Equals(FileMode.CreateNew))
253+
}
254+
else if (_mode.Equals(FileMode.CreateNew))
252255
{
253256
throw ExceptionFactory.FileAlreadyExists(
254257
_fileSystem.Execute.Path.GetFullPath(Name),
@@ -575,6 +578,8 @@ public override void Write(byte[] buffer, int offset, int count)
575578
}
576579

577580
_isContentChanged = true;
581+
_minWrite = Position;
582+
_maxWrite = Position + count;
578583
base.Write(buffer, offset, count);
579584
}
580585

@@ -592,6 +597,8 @@ public override void Write(ReadOnlySpan<byte> buffer)
592597
}
593598

594599
_isContentChanged = true;
600+
_minWrite = Position;
601+
_maxWrite = Position + buffer.Length;
595602
base.Write(buffer);
596603
}
597604
#endif
@@ -610,6 +617,8 @@ public override async Task WriteAsync(byte[] buffer, int offset, int count,
610617
}
611618

612619
_isContentChanged = true;
620+
_minWrite = Position;
621+
_maxWrite = Position + count;
613622
await base.WriteAsync(buffer, offset, count, cancellationToken);
614623
}
615624

@@ -628,6 +637,8 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer,
628637
}
629638

630639
_isContentChanged = true;
640+
_minWrite = Position;
641+
_maxWrite = Position + buffer.Length;
631642
await base.WriteAsync(buffer, cancellationToken);
632643
}
633644
#endif
@@ -645,6 +656,8 @@ public override void WriteByte(byte value)
645656
}
646657

647658
_isContentChanged = true;
659+
_minWrite = Position;
660+
_maxWrite = Position + 1L;
648661
base.WriteByte(value);
649662
}
650663

@@ -679,12 +692,14 @@ private void InitializeStream()
679692
else
680693
{
681694
_isContentChanged = true;
695+
_minWrite = Position;
696+
_maxWrite = Position;
682697
}
683698
}
684699

685700
private void InternalFlush()
686701
{
687-
if (!_isContentChanged)
702+
if (!_isContentChanged || Length == 0)
688703
{
689704
return;
690705
}
@@ -696,12 +711,20 @@ private void InternalFlush()
696711
_ = _stream.Read(data, 0, (int)Length);
697712
_stream.Seek(position, SeekOrigin.Begin);
698713
_container.WriteBytes(data);
714+
_minWrite = int.MaxValue;
715+
_maxWrite = 0;
699716
}
700717

701718
private void OnBytesChanged(object? sender, EventArgs e)
702719
{
703720
byte[] existingContents = _container.GetBytes();
704721
long position = _stream.Position;
722+
if (_minWrite < _maxWrite)
723+
{
724+
_stream.Position = _minWrite;
725+
_ = _stream.Read(existingContents, (int)_minWrite, (int)(_maxWrite - _minWrite));
726+
}
727+
705728
_stream.Position = 0;
706729
_stream.Write(existingContents, 0, existingContents.Length);
707730
_stream.Position = position;

Tests/Testably.Abstractions.Testing.Tests/TimeSystem/TimerMockTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ void Act()
6969
await That(Act).Throws<ObjectDisposedException>()
7070
.Whose(x => x.Message,
7171
it => it.Satisfies(m
72-
=> m!.Contains("Cannot access a disposed object.", StringComparison.Ordinal) &&
72+
=> m != null &&
73+
m.Contains("Cannot access a disposed object.", StringComparison.Ordinal) &&
7374
m.Contains(nameof(ITimer.Change), StringComparison.Ordinal)));
7475
#endif
7576
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
using System.IO;
2+
using System.Text;
3+
4+
namespace Testably.Abstractions.Tests.FileSystem.FileStream;
5+
6+
[FileSystemTests]
7+
public partial class ParallelTests
8+
{
9+
[Theory]
10+
[AutoData]
11+
public async Task MultipleFlush_ShouldKeepLatestChanges(string path)
12+
{
13+
using (FileSystemStream stream1 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
14+
FileAccess.Write, FileShare.ReadWrite))
15+
{
16+
using FileSystemStream stream2 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
17+
FileAccess.Write, FileShare.ReadWrite);
18+
19+
stream2.Write(Encoding.UTF8.GetBytes("foo"), 0, 3);
20+
stream1.Write(Encoding.UTF8.GetBytes("bar"), 0, 3);
21+
22+
stream1.Flush();
23+
stream2.Flush();
24+
}
25+
26+
await That(FileSystem.File.ReadAllText(path)).IsEqualTo("foo");
27+
}
28+
29+
[Theory]
30+
[AutoData]
31+
public async Task MultipleFlush_DifferentLength_ShouldKeepAdditionalBytes(string path)
32+
{
33+
using (FileSystemStream stream1 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
34+
FileAccess.Write, FileShare.ReadWrite))
35+
{
36+
using FileSystemStream stream2 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
37+
FileAccess.Write, FileShare.ReadWrite);
38+
39+
stream2.Write(Encoding.UTF8.GetBytes("foo"), 0, 3);
40+
stream1.Write(Encoding.UTF8.GetBytes("barfoo"), 0, 6);
41+
42+
await That(stream1).HasLength().EqualTo(6);
43+
await That(stream2).HasLength().EqualTo(3);
44+
45+
stream1.Flush();
46+
47+
await That(stream1).HasLength().EqualTo(6);
48+
await That(stream2).HasLength().EqualTo(6);
49+
50+
stream2.Flush();
51+
52+
await That(stream1).HasLength().EqualTo(6);
53+
await That(stream2).HasLength().EqualTo(6);
54+
}
55+
56+
await That(FileSystem.File.ReadAllText(path)).IsEqualTo("foofoo");
57+
}
58+
59+
[Theory]
60+
[AutoData]
61+
public async Task MultipleFlush_DifferentPosition_ShouldKeepAdditionalBytes(string path)
62+
{
63+
FileSystem.File.WriteAllText(path, "AAAAAAAAAAAA");
64+
using (FileSystemStream stream1 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
65+
FileAccess.Write, FileShare.ReadWrite))
66+
{
67+
using FileSystemStream stream2 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
68+
FileAccess.Write, FileShare.ReadWrite);
69+
stream2.Position = 3;
70+
stream1.Position = 2;
71+
72+
stream2.Write(Encoding.UTF8.GetBytes("CCC"), 0, 3);
73+
stream1.Write(Encoding.UTF8.GetBytes("bbbbbb"), 0, 6);
74+
75+
stream1.Flush();
76+
stream2.Flush();
77+
}
78+
79+
await That(FileSystem.File.ReadAllText(path)).IsEqualTo("AAbCCCbbAAAA");
80+
}
81+
82+
[Theory]
83+
[AutoData]
84+
public async Task MultipleFlush_DifferentPositionWithGaps_ShouldKeepAdditionalBytes(string path)
85+
{
86+
FileSystem.File.WriteAllText(path, "AAAAAAAAAAAA");
87+
using (FileSystemStream stream1 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
88+
FileAccess.Write, FileShare.ReadWrite))
89+
{
90+
using FileSystemStream stream2 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
91+
FileAccess.Write, FileShare.ReadWrite);
92+
stream1.Position = 2;
93+
94+
stream2.Position = 3;
95+
stream2.Write(Encoding.UTF8.GetBytes("C"), 0, 1);
96+
stream2.Position = 5;
97+
stream2.Write(Encoding.UTF8.GetBytes("C"), 0, 1);
98+
stream1.Write(Encoding.UTF8.GetBytes("bbbbbb"), 0, 6);
99+
100+
stream1.Flush();
101+
stream2.Flush();
102+
}
103+
104+
await That(FileSystem.File.ReadAllText(path)).IsEqualTo("AAbbbCbbAAAA");
105+
}
106+
107+
[Theory]
108+
[AutoData]
109+
public async Task WriteEmpty_ShouldNotOverwrite(string path)
110+
{
111+
using (FileSystemStream stream1 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
112+
FileAccess.Write, FileShare.ReadWrite))
113+
{
114+
using FileSystemStream stream2 = FileSystem.File.Open(path, FileMode.OpenOrCreate,
115+
FileAccess.Write, FileShare.ReadWrite);
116+
117+
stream2.Write(Encoding.UTF8.GetBytes(""), 0, 0);
118+
stream1.Write(Encoding.UTF8.GetBytes("barfoo"), 0, 6);
119+
120+
stream1.Flush();
121+
stream2.Flush();
122+
}
123+
124+
await That(FileSystem.File.ReadAllText(path)).IsEqualTo("barfoo");
125+
}
126+
}

0 commit comments

Comments
 (0)