-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathInputStream.cs
141 lines (128 loc) · 3.75 KB
/
InputStream.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using SpinHttpWorld;
using SpinHttpWorld.wit.imports.wasi.io.v0_2_1;
namespace Spin.Http;
/// <summary>
/// Read-only, non-seekable <see cref="Stream"/> adapter over a WASI
/// <c>IStreams.InputStream</c> resource.
/// </summary>
/// <remarks>
/// Only the asynchronous read overloads are functional. Data is pulled from the
/// underlying resource in chunks of up to <see cref="ReadChunkSize"/> bytes; a chunk
/// that does not fit in the caller's buffer is kept and served by subsequent reads.
/// Seeking, writing, length queries and synchronous reads throw
/// <see cref="NotSupportedException"/>, as the <see cref="Stream"/> contract requires
/// for unsupported operations.
/// </remarks>
public class InputStream : Stream
{
    /// <summary>Maximum number of bytes requested from the underlying stream per read.</summary>
    private const int ReadChunkSize = 16 * 1024;

    // The wrapped WASI input-stream resource; disposed together with this object.
    private readonly IStreams.InputStream stream;

    // Index into `buffer` of the first byte that has not yet been handed to a caller.
    private int offset;

    // Most recently read chunk that still has unconsumed bytes, or null when there is none.
    private byte[]? buffer;

    // Set once the underlying stream reports StreamError.CLOSED; later reads return 0.
    private bool closed;

    // Set by Dispose(bool) so the underlying resource is released only once.
    private bool disposed;

    /// <summary>Wraps the given WASI input stream.</summary>
    /// <param name="stream">The resource to read from. It is disposed when this object is disposed.</param>
    public InputStream(IStreams.InputStream stream)
    {
        this.stream = stream;
    }

    /// <summary>Gets <c>true</c> until the stream has been disposed.</summary>
    public override bool CanRead => !disposed;

    /// <summary>Always <c>false</c>: this stream is read-only.</summary>
    public override bool CanWrite => false;

    /// <summary>Always <c>false</c>: this stream cannot seek.</summary>
    public override bool CanSeek => false;

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Length => throw new NotSupportedException();

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>Releases the underlying WASI stream resource.</summary>
    public new void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the underlying WASI stream resource. Calling this more than once is safe;
    /// only the first call disposes the resource.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <c>Dispose()</c>; the managed resource is only touched in that case.
    /// </param>
    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            disposed = true;
            if (disposing)
            {
                stream.Dispose();
                buffer = null;
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Does nothing; there is no write-side state to flush.</summary>
    public override void Flush()
    {
        // ignore
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void SetLength(long length)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; use one of the <c>ReadAsync</c> overloads instead.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override int Read(byte[] buffer, int offset, int length)
    {
        throw new NotSupportedException("Synchronous reads are not supported; use ReadAsync.");
    }

    /// <summary>Not supported.</summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public override void Write(byte[] buffer, int offset, int length)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Reads up to <paramref name="length"/> bytes into <paramref name="bytes"/> starting at
    /// <paramref name="offset"/>.
    /// </summary>
    /// <param name="bytes">Destination array.</param>
    /// <param name="offset">Index in <paramref name="bytes"/> at which to start writing.</param>
    /// <param name="length">Maximum number of bytes to read.</param>
    /// <param name="cancellationToken">Passed to the event loop while waiting for data.</param>
    /// <returns>The number of bytes copied, or 0 once the underlying stream is closed.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override async Task<int> ReadAsync(
        byte[] bytes,
        int offset,
        int length,
        CancellationToken cancellationToken
    )
    {
        // Reject bad arguments before any data is pulled from the host.
        ValidateBufferArguments(bytes, offset, length);
        return await ReadCoreAsync(new Memory<byte>(bytes, offset, length), cancellationToken)
            .ConfigureAwait(false);
    }

    /// <summary>Reads up to <c>buffer.Length</c> bytes directly into <paramref name="buffer"/>.</summary>
    /// <param name="buffer">Destination memory.</param>
    /// <param name="cancellationToken">Passed to the event loop while waiting for data.</param>
    /// <returns>The number of bytes copied, or 0 once the underlying stream is closed.</returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    public override ValueTask<int> ReadAsync(
        Memory<byte> buffer,
        CancellationToken cancellationToken = default
    )
    {
        // The core reader copies straight into the caller's memory, so no temporary array is needed.
        return new ValueTask<int>(ReadCoreAsync(buffer, cancellationToken));
    }

    /// <summary>
    /// Shared implementation of both <c>ReadAsync</c> overloads: serves bytes from the pending
    /// chunk if there is one, otherwise pulls a new chunk from the underlying stream, waiting
    /// on the event loop whenever the stream returns no data.
    /// </summary>
    private async Task<int> ReadCoreAsync(Memory<byte> destination, CancellationToken cancellationToken)
    {
        if (disposed)
        {
            throw new ObjectDisposedException(nameof(InputStream));
        }

        while (true)
        {
            if (closed)
            {
                return 0;
            }

            if (this.buffer is { } pending)
            {
                var available = pending.Length - this.offset;
                var count = Math.Min(available, destination.Length);
                new ReadOnlyMemory<byte>(pending, this.offset, count).CopyTo(destination);

                if (count < available)
                {
                    // Keep the remainder of the chunk for the next read.
                    this.offset += count;
                }
                else
                {
                    this.buffer = null;
                }

                return count;
            }

            try
            {
                // TODO: should we add a special case to the bindings generator
                // to allow passing a buffer to IStreams.InputStream.Read and
                // avoid the extra copy?
                var chunk = stream.Read(ReadChunkSize);
                if (chunk.Length == 0)
                {
                    // No data returned: wait on the stream's pollable (presumably until it
                    // becomes ready), then try again.
                    await WasiEventLoop.Register(stream.Subscribe(), cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    this.buffer = chunk;
                    this.offset = 0;
                }
            }
            catch (WitException e)
                when (e.Value is IStreams.StreamError error && error.Tag == IStreams.StreamError.CLOSED)
            {
                // End of stream. Any other WitException propagates unchanged.
                closed = true;
                return 0;
            }
        }
    }
}