diff --git a/include/espdy.hrl b/include/espdy.hrl index 387e98d..bc0ac54 100644 --- a/include/espdy.hrl +++ b/include/espdy.hrl @@ -1,4 +1,4 @@ --define(LOG(S,A), io:format("~p\t" ++ S ++"\n",[self()|A])). +-define(LOG(S,A), io:format(case whereis(spdy_logging) of undefined -> standard_io; LoggingPid -> LoggingPid end, "~p\t" ++ S ++"\n",[self()|A])). %% DATA FRAMES: -record(spdy_data, { diff --git a/src/espdy_session.erl b/src/espdy_session.erl index dcf60ba..afe171e 100644 --- a/src/espdy_session.erl +++ b/src/espdy_session.erl @@ -57,11 +57,6 @@ init([Socket, Transport, CBMod, Opts]) -> ok = zlib:inflateInit(Zinf), Zdef = zlib:open(), ok = zlib:deflateInit(Zdef), - %%ok = zlib:deflateInit(Z, best_compression,deflated, 15, 9, default), - case SpdyVersion of - 2 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT); - 3 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT_V3) - end, State = #state{ socket=Socket, cbmod=CBMod, transport=Transport, @@ -70,9 +65,18 @@ init([Socket, Transport, CBMod, Opts]) -> spdy_version = SpdyVersion, spdy_opts=Opts }, - ?LOG("SPDY_VERSION init v~B ~p ~p",[SpdyVersion, self(), State]), + init_deflate(State), + ?LOG("SPDY_VERSION init v:~p ~p ~p",[SpdyVersion, self(), State]), {ok, State}. +init_deflate(State) -> + Zdef = State#state.z_context_def, + case State#state.spdy_version of + 2 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT); + 3 -> zlib:deflateSetDictionary(Zdef, ?HEADERS_ZLIB_DICT_V3); + negotiate -> deferred + end. + handle_call(none_implemented, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -201,6 +205,7 @@ handle_frame(#spdy_syn_stream{ version=_Version, self(), Headers, State#state.cbmod, + lookup_setting(?SETTINGS_INITIAL_WINDOW_SIZE, State), State#state.spdy_opts), %% TODO pass fin into startlink? hasflag(Flags,?DATA_FLAG_FIN) andalso espdy_stream:received_fin(Pid), @@ -222,9 +227,7 @@ handle_frame(#spdy_syn_stream{version=FrameVersion, streamid=StreamID}, State=#state{spdy_version=SessionVersion}) -> ?LOG("SYN_STREAM mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]), - % UNSUPPORTED_VERSION is reserved for stream recipients (i.e. the client), - % so send a generic PROTOCOL_ERROR. - stream_error(protocol_error, #stream{id=StreamID}, State), + stream_error(unsupported_version, #stream{id=StreamID}, State), State; handle_frame(#spdy_syn_reply{version=_Version, @@ -253,8 +256,7 @@ handle_frame(#spdy_syn_reply{version=_Version, handle_frame(#spdy_syn_reply{version=FrameVersion, streamid=StreamID}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("SYN_REPLY mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]), - stream_error(unsupported_version, #stream{id=StreamID}, State), + ?LOG("SYN_REPLY mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; handle_frame(#spdy_rst_stream{version=_Version, @@ -275,10 +277,15 @@ handle_frame(#spdy_rst_stream{version=_Version, handle_frame(#spdy_rst_stream{version=FrameVersion}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("RST_STREAM mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]), - session_error(protocol_error, State), + ?LOG("RST_STREAM mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; +handle_frame(Settings = #spdy_settings{version=Version}, State=#state{spdy_version=negotiate, spdy_opts=Opts}) -> + ?LOG("SETTINGS mismatched version, ~p, switching versions", [Version]), + NewOpts = [{spdy_version, Version} | proplists:delete(spdy_version, Opts)], + NewState = State#state{spdy_version=Version, spdy_opts=NewOpts}, + init_deflate(NewState), + handle_frame(Settings, NewState); handle_frame(#spdy_settings{version=_Version, flags=_Flags, settings=Settings @@ -288,11 +295,9 @@ handle_frame(#spdy_settings{version=_Version, handle_frame(#spdy_settings{version=FrameVersion}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("SETTINGS mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]), - session_error(protocol_error, State), + ?LOG("SETTINGS mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; - handle_frame(#spdy_noop{version=2}, State) -> State; @@ -302,8 +307,7 @@ handle_frame(F=#spdy_ping{version=_Version}, State=#state{spdy_version=_Version} handle_frame(#spdy_ping{version=FrameVersion}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("PING mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]), - session_error(protocol_error, State), + ?LOG("PING mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; handle_frame(#spdy_goaway{version=_Version, @@ -317,8 +321,7 @@ handle_frame(#spdy_goaway{version=_Version, handle_frame(#spdy_goaway{version=FrameVersion}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("GOAWAY mismatched version, ~p -> ~p, sending session_error", [FrameVersion, SessionVersion]), - session_error(protocol_error, State), + ?LOG("GOAWAY mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; handle_frame(#spdy_headers{version=_Version, @@ -342,10 +345,23 @@ handle_frame(#spdy_headers{version=_Version, handle_frame(#spdy_headers{version=FrameVersion, streamid=StreamID}, State=#state{spdy_version=SessionVersion}) -> - ?LOG("HEADERS mismatched version, ~p -> ~p, sending stream_error", [FrameVersion, SessionVersion]), - stream_error(unsupported_version, #stream{id=StreamID}, State), + ?LOG("HEADERS mismatched version, ~p -> ~p, ignoring frame", [FrameVersion, SessionVersion]), State; +handle_frame(#spdy_window_update{ streamid=StreamID, + delta_size=DeltaSize}, State) -> + case lookup_stream(StreamID, State) of + undefined -> + F = #spdy_rst_stream{version=State#state.spdy_version, + streamid=StreamID, + statuscode=?INVALID_STREAM}, + socket_write(F, State), + State; + S = #stream{} -> %% TODO check stream is known to be active still? + espdy_stream:window_updated(S#stream.pid, DeltaSize), + State + end; + %% DATA FRAME: handle_frame(#spdy_data{ streamid=StreamID, flags=Flags, @@ -395,6 +411,13 @@ apply_settings(Settings, State = #state{settings=OldSettings}) -> end, OldSettings, Settings), ?LOG("SETTINGS FOR THIS SESSION: ~p",[NewSettings]), State#state{settings=NewSettings}. + +lookup_setting(Id, #state{settings=Settings}) -> + lookup_setting(Id, Settings); +lookup_setting(_Id, []) -> undefined; +lookup_setting(Id, [#spdy_setting_pair{id=Id, value=Value} | _]) -> Value; +lookup_setting(Id, [_ | Settings]) -> lookup_setting(Id, Settings). + %% STATUS CODES used by rst-stream, goaway, etc diff --git a/src/espdy_stream.erl b/src/espdy_stream.erl index 05d0192..649af56 100644 --- a/src/espdy_stream.erl +++ b/src/espdy_stream.erl @@ -12,8 +12,9 @@ -compile(export_all). %% API --export([start_link/5, send_data_fin/1, send_data/2, closed/2, received_data/2, - send_response/3, received_fin/1, send_frame/2, headers_updated/3 +-export([start_link/6, send_data_fin/1, send_data/2, closed/2, received_data/2, + send_response/3, received_fin/1, send_frame/2, headers_updated/3, + window_updated/2 ]). %% gen_server callbacks @@ -27,12 +28,13 @@ mod, mod_state, headers, + window_size, spdy_version, spdy_opts}). %% API -start_link(StreamID, Pid, Headers, Mod, Opts) -> - gen_server:start(?MODULE, [StreamID, Pid, Headers, Mod, Opts], []). +start_link(StreamID, Pid, Headers, Mod, WindowSize, Opts) -> + gen_server:start(?MODULE, [StreamID, Pid, Headers, Mod, WindowSize, Opts], []). send_data(Pid, Data) when is_pid(Pid), is_binary(Data) -> gen_server:cast(Pid, {data, Data}). @@ -58,19 +60,19 @@ send_frame(Pid, F) -> headers_updated(Pid, Delta, NewMergedHeaders) -> gen_server:cast(Pid, {headers_updated, Delta, NewMergedHeaders}). +window_updated(Pid, Delta) -> + gen_server:cast(Pid, {window_updated, Delta}). + %% gen_server callbacks -init([StreamID, Pid, Headers, Mod, Opts]) -> - self() ! init_callback, -%% Z = zlib:open(), -%% ok = zlib:deflateInit(Z), - %%ok = zlib:deflateInit(Z, best_compression,deflated, 15, 9, default), -%% zlib:deflateSetDictionary(Z, ?HEADERS_ZLIB_DICT), +init([StreamID, Pid, Headers, Mod, WindowSize, Opts]) -> + gen_server:cast(self(), init_callback), SpdyVersion = proplists:get_value(spdy_version, Opts), {ok, #state{streamid=StreamID, pid=Pid, mod=Mod, headers=Headers, + window_size=WindowSize, spdy_version=SpdyVersion, spdy_opts=Opts}}. @@ -78,9 +80,72 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +%% Called when we got a syn_stream for this stream. +%% cb module is supposed to make and send the reply. +handle_cast(init_callback, State) -> + case (State#state.mod):init(self(), State#state.headers, State#state.spdy_opts) of + %% In this case, the callback module provides the full response + %% with no need for this process to persist for streaming the body + %% so we can terminate this process after replying. + {ok, Headers, Body} when is_list(Headers), is_binary(Body) -> + %% se re-send this as a message to ourselves, because the callback + %% module may have dispatched other frames (eg, settings) before + %% returning us this response: + send_response(self(), Headers, Body), + {noreply, State}; + %% The callback module will call msg us the send_http_response + %% (typically from within the guts of cowboy_http_req, so that + %% we can reuse the existing http API) + {ok, noreply} -> + %% TODO track state, set timeout on getting the response from CB + {noreply, State}; + {ok, noreply, ModState} -> + {noreply, State#state{mod_state=ModState}}; + %% CB module is going to stream us the body data, so we keep this process + %% alive until we get the fin packet as part of the stream. + %%%% {ok, Headers, stream, ModState} when is_list(Headers) -> + %%%% NVPairsData = encode_name_value_pairs(Headers, State#state.z_context), + %%%% StreamID = State#state.streamid, + %%%% F = #cframe{type=?SYN_REPLY, + %%%% flags=0, + %%%% length = 6 + byte_size(NVPairsData), + %%%% data= <<0:1, + %%%% StreamID:31/big-unsigned-integer, + %%%% 0:16/big-unsigned-integer, %% UNUSED + %%%% NVPairsData/binary + %%%% >>}, + %%%% espdy_session:snd(State#state.pid, StreamID, F), + %%%% {noreply, State#state{mod_state=ModState}}; + %% CB module can't respond, because request is invalid + {error, not_http} -> + StreamID = State#state.streamid, + F = #spdy_rst_stream{ streamid=StreamID, statuscode=?PROTOCOL_ERROR }, + espdy_session:snd(State#state.pid, StreamID, F), + {stop, normal, State}; + {error, ErrorStatusCode} when is_number(ErrorStatusCode) -> + StreamID = State#state.streamid, + F = #spdy_rst_stream{ streamid=StreamID, statuscode=ErrorStatusCode }, + espdy_session:snd(State#state.pid, StreamID, F), + {stop, normal, State} + end; + handle_cast({send_frame, F}, State) -> - espdy_session:snd(State#state.pid, State#state.streamid, F), - {noreply, State}; + StreamId = State#state.streamid, + FrameType = element(1, F), + F2 = case FrameType of + spdy_data -> + setelement(2, F, StreamId); + spdy_window_update -> + setelement(3, F, StreamId); + _ -> + setelement(4, F, StreamId) + end, + FullF = case FrameType of + spdy_data -> F; + _ -> setelement(2, F2, State#state.spdy_version) + end, + espdy_session:snd(State#state.pid, StreamId, FullF), + {noreply, decrement_window(F, State)}; handle_cast(received_fin, State = #state{clientclosed=true}) -> ?LOG("Got FIN but client has already closed?", []), @@ -103,24 +168,27 @@ handle_cast({headers_updated, Delta, NewMergedHeaders}, State) -> {ok, NewModState} = (State#state.mod):headers_updated(Delta, NewMergedHeaders, State#state.mod_state), {noreply, State#state{mod_state=NewModState}}; +handle_cast({window_updated, Delta}, State = #state{window_size=Size}) -> + {noreply, State#state{window_size=Size + Delta}}; + handle_cast({closed, Reason}, State) -> (State#state.mod):closed(Reason, State#state.mod_state), {stop, normal, State}; %% part of streamed body -handle_cast({data, Bin, false}, State) when is_binary(Bin) -> +handle_cast({data, Bin}, State) when is_binary(Bin) -> F = #spdy_data{ streamid = State#state.streamid, data=Bin}, espdy_session:snd(State#state.pid, State#state.streamid, F), - {noreply, State}; + {noreply, decrement_window(F, State)}; %% last of streamed body -handle_cast({data, Bin, true}, State) when is_binary(Bin) -> +handle_cast({data, fin}, State) -> F = #spdy_data{ streamid = State#state.streamid, flags=?DATA_FLAG_FIN, - data=Bin}, + data= <<"">>}, espdy_session:snd(State#state.pid, State#state.streamid, F), - NewState = State#state{serverclosed=true}, + NewState = decrement_window(F, State#state{serverclosed=true}), case both_closed(NewState) of true -> ?LOG("Both ends closed, stopping stream ~w",[State#state.streamid]), {stop, normal, NewState}; @@ -138,48 +206,9 @@ handle_cast({send_response, Headers, Body}, State) -> {noreply, NewState} end. - -%% Called when we got a syn_stream for this stream. -%% cb module is supposed to make and send the reply. -handle_info(init_callback, State) -> - case (State#state.mod):init(self(), State#state.headers, State#state.spdy_opts) of - %% In this case, the callback module provides the full response - %% with no need for this process to persist for streaming the body - %% so we can terminate this process after replying. - {ok, Headers, Body} when is_list(Headers), is_binary(Body) -> - %% se re-send this as a message to ourselves, because the callback - %% module may have dispatched other frames (eg, settings) before - %% returning us this response: - send_response(self(), Headers, Body), - {noreply, State}; - %% The callback module will call msg us the send_http_response - %% (typically from within the guts of cowboy_http_req, so that - %% we can reuse the existing http API) - {ok, noreply} -> - %% TODO track state, set timeout on getting the response from CB - {noreply, State}; - %% CB module is going to stream us the body data, so we keep this process - %% alive until we get the fin packet as part of the stream. - %%%% {ok, Headers, stream, ModState} when is_list(Headers) -> - %%%% NVPairsData = encode_name_value_pairs(Headers, State#state.z_context), - %%%% StreamID = State#state.streamid, - %%%% F = #cframe{type=?SYN_REPLY, - %%%% flags=0, - %%%% length = 6 + byte_size(NVPairsData), - %%%% data= <<0:1, - %%%% StreamID:31/big-unsigned-integer, - %%%% 0:16/big-unsigned-integer, %% UNUSED - %%%% NVPairsData/binary - %%%% >>}, - %%%% espdy_session:snd(State#state.pid, StreamID, F), - %%%% {noreply, State#state{mod_state=ModState}}; - %% CB module can't respond, because request is invalid - {error, not_http} -> - StreamID = State#state.streamid, - F = #spdy_rst_stream{ streamid=StreamID, statuscode=?PROTOCOL_ERROR }, - espdy_session:snd(State#state.pid, StreamID, F), - {stop, normal, State} - end. +handle_info(M, State) -> + {noreply, NewModState} = (State#state.mod):handle_info(M, State#state.mod_state), + {noreply, State#state{mod_state=NewModState}}. terminate(_Reason, _State) -> ok. @@ -189,6 +218,11 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions +decrement_window(#spdy_data{data=Data}, State = #state{window_size=Size}) -> + State#state{window_size=Size - size(Data)}; +decrement_window(_, State) -> + State. + send_http_response(Headers, Body, State = #state{}) when is_list(Headers), is_binary(Body) -> io:format("Respond with: ~p ~p\n",[Headers, Body]), StreamID = State#state.streamid,