Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 4 additions & 18 deletions rttp_client/src/connection/async_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::net::{TcpStream, ToSocketAddrs};
use futures::io::{AllowStdIo, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use socket2::{Domain, Protocol, Socket, Type};
use socks::{Socks4Stream, Socks5Stream};
use std::io::{self, Read, Write};
use std::io::{self, Write};
use std::time;
use url::Url;

#[cfg(feature = "tls-rustls")]
use std::sync::Arc;

use crate::connection::connection::Connection;
use crate::connection::connection::{read_proxy_connect_response, Connection};
use crate::connection::connection_reader::{response_body_kind, ResponseBodyKind};
use crate::error;
use crate::request::RawRequest;
Expand Down Expand Up @@ -401,7 +401,7 @@ impl<'a> AsyncConnection<'a> {
let addr = format!("{}:{}", proxy.host(), proxy.port());
let stream = self.async_tcp_stream(&addr).await?;
let mut stream = AllowStdIo::new(stream);
let header = self.conn.proxy_http_header(url);
let header = self.conn.proxy_http_header(url, proxy);

self.async_write_request(&mut stream, &header).await?;
self.async_read_stream(url, &mut stream).await
Expand All @@ -417,21 +417,7 @@ impl<'a> AsyncConnection<'a> {
.write_all(connect_header.as_bytes())
.map_err(error::request)?;
stream.flush().map_err(error::request)?;

// HTTP/1.1 200 Connection Established
let mut res = vec![0u8; 1024];
let bytes = stream.read(&mut res).map_err(error::request)?;

let res_s = match String::from_utf8(res[..bytes].to_vec()) {
Ok(r) => r,
Err(_) => return Err(error::bad_proxy("parse proxy server response error.")),
};
if !res_s
.to_ascii_lowercase()
.contains("connection established")
{
return Err(error::bad_proxy("Proxy server response error."));
}
read_proxy_connect_response(&mut stream)?;

self.async_send_with_stream(url, stream).await
}
Expand Down
23 changes: 4 additions & 19 deletions rttp_client/src/connection/block_connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::{Read, Write};
use std::io::Write;

use socks::{Socks4Stream, Socks5Stream};
use url::Url;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<'a> BlockConnection<'a> {
fn call_with_proxy_http(&self, url: &Url, proxy: &Proxy) -> error::Result<Vec<u8>> {
let addr = format!("{}:{}", proxy.host(), proxy.port());
let mut stream = self.conn.block_tcp_stream(&addr)?;
let header = self.conn.proxy_http_header(url);
let header = self.conn.proxy_http_header(url, proxy);

stream
.write_all(header.as_bytes())
Expand All @@ -101,25 +101,10 @@ impl<'a> BlockConnection<'a> {
let mut stream = self.conn.block_tcp_stream(&addr)?;

stream
.write(connect_header.as_bytes())
.write_all(connect_header.as_bytes())
.map_err(error::request)?;
stream.flush().map_err(error::request)?;

//HTTP/1.1 200 Connection Established
let mut res = [0u8; 1024];
stream.read(&mut res).map_err(error::request)?;

let res_s = String::from_utf8(res.to_vec())
.map_err(|_| error::bad_proxy("parse proxy server response error."))?;
if !res_s
.to_ascii_lowercase()
.contains("connection established")
{
return Err(error::bad_proxy(format!(
"Proxy server response error: {}",
res_s
)));
}
crate::connection::connection::read_proxy_connect_response(&mut stream)?;

self.conn.block_send_with_stream(url, &mut stream)
}
Expand Down
193 changes: 170 additions & 23 deletions rttp_client/src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,30 +218,23 @@ impl<'a> Connection<'a> {
let mut proxy_header = String::new();
proxy_header.push_str(&format!("CONNECT {}:{} HTTP/1.1\r\n", host, port));
proxy_header.push_str(&format!("Host: {}:{}\r\n", host, port));

if let Some(username) = proxy.username() {
let auth = if let Some(password) = proxy.password() {
format!("{}:{}", username, password)
} else {
format!("{}:", username)
};
let auth = STANDARD.encode(auth.as_bytes());
proxy_header.push_str(&format!("Authorization: Basic {}\r\n", auth));
}
append_proxy_authorization_header(&mut proxy_header, proxy);

proxy_header.push_str("\r\n");
Ok(proxy_header)
}

pub fn proxy_http_header(&self, url: &Url) -> String {
pub fn proxy_http_header(&self, url: &Url, proxy: &Proxy) -> String {
let header = self.header();
let (_, rest) = header.split_once("\r\n").unwrap_or(("", ""));
format!(
let mut proxy_header = format!(
"{} {} HTTP/1.1\r\n{}",
self.request.origin().method().to_uppercase(),
absolute_url(url),
rest
)
);
append_proxy_authorization_header(&mut proxy_header, proxy);
proxy_header
}

pub fn redirect_url(&self, url: &Url, location: &str) -> error::Result<String> {
Expand All @@ -262,6 +255,88 @@ fn absolute_url(url: &Url) -> String {
absolute.to_string()
}

fn proxy_authorization_value(proxy: &Proxy) -> Option<String> {
proxy.username().as_ref().map(|username| {
let auth = if let Some(password) = proxy.password() {
format!("{}:{}", username, password)
} else {
format!("{}:", username)
};
STANDARD.encode(auth.as_bytes())
})
}

fn append_proxy_authorization_header(header: &mut String, proxy: &Proxy) {
if let Some(auth) = proxy_authorization_value(proxy) {
header.push_str(&format!("Proxy-Authorization: Basic {}\r\n", auth));
}
}

fn write_http_request<W>(
stream: &mut W,
header: &str,
body: Option<&RequestBody>,
) -> error::Result<()>
where
W: io::Write,
{
stream
.write_all(header.as_bytes())
.map_err(error::request)?;
if let Some(body) = body {
stream.write_all(body.bytes()).map_err(error::request)?;
}
stream.flush().map_err(error::request)?;
Ok(())
}

pub(crate) fn parse_proxy_connect_response(header: &[u8]) -> error::Result<()> {
let header = String::from_utf8(header.to_vec())
.map_err(|_| error::bad_proxy("parse proxy server response error."))?;
let status_line = header
.lines()
.next()
.ok_or_else(|| error::bad_proxy("Proxy server response error."))?;
let status_code = status_line
.split_whitespace()
.nth(1)
.ok_or_else(|| error::bad_proxy("Proxy server response error."))?
.parse::<u16>()
.map_err(|_| error::bad_proxy("parse proxy server response error."))?;

if status_code == 200 {
Ok(())
} else {
Err(error::bad_proxy(format!(
"Proxy server response error: {}",
status_line
)))
}
}

pub(crate) fn read_proxy_connect_response<R>(reader: &mut R) -> error::Result<()>
where
R: io::Read,
{
let mut header = Vec::new();
let mut byte = [0u8; 1];

loop {
let read = reader.read(&mut byte).map_err(error::request)?;
if read == 0 {
if header.is_empty() {
return Err(error::bad_proxy("Proxy server response error."));
}
return Err(error::bad_proxy("Incomplete proxy response headers"));
}

header.push(byte[0]);
if header.ends_with(b"\r\n\r\n") {
return parse_proxy_connect_response(&header);
}
}
}

impl<'a> Connection<'a> {
pub fn block_tcp_stream(&self, addr: &String) -> error::Result<std::net::TcpStream> {
let config = self.config();
Expand Down Expand Up @@ -307,16 +382,7 @@ impl<'a> Connection<'a> {
where
S: io::Write,
{
let header = self.header();
let body = self.body();

stream.write(header.as_bytes()).map_err(error::request)?;
if let Some(body) = body {
stream.write(body.bytes()).map_err(error::request)?;
}
stream.flush().map_err(error::request)?;

Ok(())
write_http_request(stream, self.header(), self.body().as_ref())
}

pub fn block_read_stream<S>(&self, url: &Url, stream: &mut S) -> error::Result<Vec<u8>>
Expand Down Expand Up @@ -446,3 +512,84 @@ impl<'a> Connection<'a> {
self.block_read_stream(url, &mut tls)
}
}

#[cfg(test)]
mod tests {
use std::io::{self, Cursor, Write};

use crate::request::RequestBody;
use crate::types::Proxy;

use super::{
parse_proxy_connect_response, proxy_authorization_value, read_proxy_connect_response,
write_http_request,
};

struct PartialWriter {
max_chunk: usize,
written: Vec<u8>,
}

impl PartialWriter {
fn new(max_chunk: usize) -> Self {
Self {
max_chunk,
written: Vec::new(),
}
}
}

impl Write for PartialWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let take = buf.len().min(self.max_chunk);
self.written.extend_from_slice(&buf[..take]);
Ok(take)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

#[test]
fn test_write_http_request_retries_until_full_payload_is_written() {
let header = "POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n";
let body = RequestBody::with_text("hello");
let mut writer = PartialWriter::new(3);

write_http_request(&mut writer, header, Some(&body)).unwrap();

assert_eq!(
format!("{}hello", header).as_bytes(),
writer.written.as_slice()
);
}

#[test]
fn test_proxy_authorization_value_encodes_credentials() {
let proxy = Proxy::http_with_authorization("127.0.0.1", 8080, "user", "secret");

assert_eq!(
Some("dXNlcjpzZWNyZXQ=".to_string()),
proxy_authorization_value(&proxy)
);
}

#[test]
fn test_parse_proxy_connect_response_requires_200_status() {
let header = b"HTTP/1.1 407 Proxy Authentication Required\r\n\r\n";
let err = parse_proxy_connect_response(header).unwrap_err();

assert!(err
.to_string()
.contains("407 Proxy Authentication Required"));
}

#[test]
fn test_read_proxy_connect_response_waits_for_complete_headers() {
let header = b"HTTP/1.1 200 Connection Established\r\nProxy-Agent: test\r\n\r\n";
let mut reader = Cursor::new(header);

read_proxy_connect_response(&mut reader).unwrap();
}
}
Loading