Skip to content

Commit fb7111e

Browse files
committed
fix codec initializing
1 parent 544e283 commit fb7111e

File tree

1 file changed

+40
-25
lines changed

1 file changed

+40
-25
lines changed

src/WebSockets.jl

+40-25
Original file line numberDiff line numberDiff line change
@@ -90,27 +90,18 @@ function mask!(bytes::Vector{UInt8}, mask)
9090
end
9191
return
9292
end
93-
94-
function compress(data::T) where T <: AbstractVector{UInt8}
95-
compressed = transcode(DeflateCompressor, data)
96-
push!(compressed, 0x00)
97-
return compressed
98-
end
99-
100-
function compress(data::String)
101-
compressed = transcode(DeflateCompressor, data)
102-
push!(compressed, 0x00)
103-
return String(compressed)
93+
function final_deflate_codecs(t::Tuple)
94+
CodecZlib.TranscodingStreams.finalize(t[1])
95+
CodecZlib.TranscodingStreams.finalize(t[2])
10496
end
10597

106-
function decompress(data::T) where T <: AbstractVector{UInt8}
107-
decompressed = transcode(DeflateDecompressor, append!(data, [0x00, 0x00, 0xff, 0xff, 0x03, 0x00]))
108-
return decompressed
109-
end
98+
function init_deflate_codecs()
99+
codecco = DeflateCompressor()
100+
CodecZlib.TranscodingStreams.initialize(codecco)
101+
codecde = DeflateDecompressor()
102+
CodecZlib.TranscodingStreams.initialize(codecde)
110103

111-
function decompress(data::String)
112-
decompressed = transcode(DeflateDecompressor, append!(Vector{UInt8}(data), [0x00, 0x00, 0xff, 0xff, 0x03, 0x00]))
113-
return String(decompressed)
104+
return (codecco, codecde)
114105
end
115106

116107

@@ -316,20 +307,21 @@ mutable struct WebSocket
316307
writebuffer::Vector{UInt8}
317308
readclosed::Bool
318309
writeclosed::Bool
319-
isdeflate::Bool
310+
deflate::Union{Nothing, Tuple{CodecZlib.CompressorCodec, CodecZlib.DecompressorCodec}}
320311
end
321312

322313
const DEFAULT_MAX_FRAG = 1024
323314

324315
WebSocket(io::Connection, req=Request(), resp=Response(); client::Bool=true, maxframesize::Integer=typemax(Int), maxfragmentation::Integer=DEFAULT_MAX_FRAG, isdeflate::Bool=false) =
325-
WebSocket(uuid4(), io, req, resp, maxframesize, maxfragmentation, client, UInt8[], UInt8[], false, false, isdeflate)
316+
WebSocket(uuid4(), io, req, resp, maxframesize, maxfragmentation, client, UInt8[], UInt8[], false, false, isdeflate ? init_deflate_codecs() : nothing)
326317

327318
"""
328319
WebSockets.isclosed(ws) -> Bool
329320
330321
Check whether a `WebSocket` has sent and received CLOSE frames.
331322
"""
332323
isclosed(ws::WebSocket) = ws.readclosed && ws.writeclosed
324+
isdeflate(ws::WebSocket) = !isnothing(ws.deflate)
333325

334326
# Handshake
335327
"Check whether a HTTP.Request or HTTP.Response is a websocket upgrade request/response"
@@ -534,7 +526,7 @@ function Sockets.send(ws::WebSocket, x)
534526
# so we can appropriately set the FIN bit for the last fragmented frame
535527
nextstate = iterate(x, st)
536528
while true
537-
n += writeframe(ws.io, Frame(nextstate === nothing, first ? opcode(item) : CONTINUATION, ws.client, payload(ws, item); rsv1 = first ? ws.isdeflate : false))
529+
n += writeframe(ws.io, Frame(nextstate === nothing, first ? opcode(item) : CONTINUATION, ws.client, payload(ws, item); rsv1 = first ? isdeflate(ws) : false))
538530
first = false
539531
nextstate === nothing && break
540532
item, st = nextstate
@@ -543,8 +535,8 @@ function Sockets.send(ws::WebSocket, x)
543535
else
544536
# single binary or text frame for message
545537
@label write_single_frame
546-
pl = ws.isdeflate ? compress(payload(ws, x)) : payload(ws, x)
547-
return writeframe(ws.io, Frame(true, opcode(x), ws.client, pl; rsv1=ws.isdeflate))
538+
pl = isdeflate(ws) ? compress(ws, payload(ws, x)) : payload(ws, x)
539+
return writeframe(ws.io, Frame(true, opcode(x), ws.client, pl; rsv1=isdeflate(ws)))
548540
end
549541
end
550542

@@ -620,11 +612,34 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "")
620612
@assert ws.readclosed
621613
# if we're the server, it's our job to close the underlying socket
622614
!ws.client && isopen(ws.io) && close(ws.io)
615+
final_deflate_codecs(ws.deflate)
623616
return
624617
end
625618

626619
# Receiving messages
627620

621+
function compress(ws::WebSocket, data::T) where T <: AbstractVector{UInt8}
622+
compressed = transcode(ws.deflate[1], data)
623+
push!(compressed, 0x00)
624+
return compressed
625+
end
626+
627+
function compress(ws::WebSocket, data::String)
628+
compressed = transcode(ws.deflate[1], data)
629+
push!(compressed, 0x00)
630+
return String(compressed)
631+
end
632+
633+
function decompress(ws::WebSocket, data::T) where T <: AbstractVector{UInt8}
634+
decompressed = transcode(ws.deflate[2], append!(data, [0x00, 0x00, 0xff, 0xff, 0x03, 0x00]))
635+
return decompressed
636+
end
637+
638+
function decompress(ws::WebSocket, data::String)
639+
decompressed = transcode(ws.deflate[2], append!(Vector{UInt8}(data), [0x00, 0x00, 0xff, 0xff, 0x03, 0x00]))
640+
return String(decompressed)
641+
end
642+
628643
# returns whether additional frames should be read
629644
# true if fragmented message or a ping/pong frame was handled
630645
@noinline control_len_check(len) = len > 125 && throw(WebSocketError(CloseFrameBody(1002, "Invalid length for control frame")))
@@ -686,7 +701,7 @@ function receive(ws::WebSocket)
686701
done = checkreadframe!(ws, frame)
687702
# common case of reading single non-control frame
688703
if done
689-
payload = ws.isdeflate ? decompress(frame.payload) : frame.payload
704+
payload = isdeflate(ws) ? decompress(ws, frame.payload) : frame.payload
690705
payload isa String && utf8check(payload)
691706
return payload
692707
end
@@ -704,7 +719,7 @@ function receive(ws::WebSocket)
704719
end
705720
done && break
706721
end
707-
payload = ws.isdeflate ? decompress(payload) : payload
722+
payload = isdeflate(ws) ? decompress(ws, payload) : payload
708723
payload isa String && utf8check(payload)
709724
@debugv 2 "Read message: $(payload[1:min(1024, sizeof(payload))])"
710725
return payload

0 commit comments

Comments
 (0)