Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
tags:
- "v*.*.*" # e.g., v1.2.3 triggers a release
pull_request:

env:
CARGO_TERM_COLOR: always
Expand Down Expand Up @@ -59,7 +60,6 @@ jobs:
- name: Build (cross)
run: |
cross build --release --target ${{ matrix.target }} --bin tcp-proxy
cross build --release --target ${{ matrix.target }} --bin echo-server
- name: Package artifacts
env:
VER: ${{ needs.version.outputs.release_version }}
Expand All @@ -70,7 +70,6 @@ jobs:
OUTDIR="dist/${TARGET}"
mkdir -p "${OUTDIR}"
cp "${BIN_DIR}/tcp-proxy" "${OUTDIR}/"
cp "${BIN_DIR}/echo-server" "${OUTDIR}/"
cp README.md LICENSE-MIT LICENSE-APACHE "${OUTDIR}/"
tar -C dist -czf "tcp-proxy-${VER}-${TARGET}.tar.gz" "${TARGET}"
sha256sum "tcp-proxy-${VER}-${TARGET}.tar.gz" > "tcp-proxy-${VER}-${TARGET}.tar.gz.sha256"
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/target
# rustrover
.idea
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ edition = "2024"
license = "MIT OR Apache-2.0"

[dependencies]
anyhow = "1"
tokio = { version = "1.39", features = ["net", "io-util", "rt-multi-thread", "macros", "signal", "time"] }
clap = { version = "4.5", features = ["derive"] }
humantime = "2.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"] }

[[bin]]
name = "tcp-proxy"
path = "src/main.rs"

[[bin]]
name = "echo-server"
path = "src/echo-server.rs"
[lints.rust]
unsafe_code = "forbid"
non_ascii_idents = "deny"
unreachable_pub = "deny"
trivial_casts = "deny"
unused_crate_dependencies = "deny"
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# tcp-proxy

A small, async TCP port‑forwarding proxy built with Rust and Tokio. It listens on a local address/port and forwards raw bytes bidirectionally to a target address/port. Includes graceful shutdown and structured logging via `tracing`.
A small, async TCP port‑forwarding proxy built with Rust and Tokio. It listens on a local address/port and forwards raw bytes bidirectionally to a target address/port. Includes structured logging via `tracing`.

## Quick Start

Expand All @@ -18,30 +18,32 @@ RUST_LOG=info ./target/release/tcp-proxy \

```text
tcp-proxy --listen <ADDR:PORT> --to <ADDR:PORT> \
[--connect-timeout <DURATION>] [--session-timeout <DURATION>]
[--connect-timeout <DURATION>]
```

- `--listen <ADDR:PORT>`: Local address:port to accept client connections (e.g., `0.0.0.0:5000`).
- `--to <ADDR:PORT>`: Remote target address:port to forward to (e.g., `10.1.1.10:6000`).
- `--connect-timeout <DURATION>`: Max time to establish the outbound connection (default: `5s`).
- `--session-timeout <DURATION>`: Max lifetime of a connection; `0s` disables (default: `0s`).

Durations use `humantime` format, e.g., `250ms`, `10s`, `2m`, `1h`.

### Examples

- Forward local port 5000 to 10.1.1.10:6000:

```bash
RUST_LOG=info tcp-proxy --listen 0.0.0.0:5000 --to 10.1.1.10:6000
```

- With a 2s connect timeout and 30s session timeout:
- With a 2s connect timeout:

```bash
tcp-proxy --listen 127.0.0.1:5000 --to 127.0.0.1:6000 \
--connect-timeout 2s --session-timeout 30s
--connect-timeout 2s
```

- Quick local test with netcat:

```bash
# Terminal A: echo server on 6000
nc -lk 127.0.0.1 6000
Expand All @@ -62,12 +64,10 @@ Durations use `humantime` format, e.g., `250ms`, `10s`, `2m`, `1h`.
- `RUST_LOG=tcp_proxy=debug tcp-proxy ...` (enable debug for this crate only)
- Connection context: logs emitted while handling a connection are prefixed with a span like `conn{id=..., client=..., remote=...}`.


## Notes

- On Ctrl+C, the proxy stops accepting new connections and waits for existing ones to complete.
- On connect timeout, the client socket is closed and the attempt is logged as an error.
- On session timeout, both sockets are closed and a warning is logged.
- On Ctrl+C, the proxy exits immediately; active connections are aborted.
- On connect timeout, the client socket is closed and the attempt is logged as a warning.
- No authentication, authorization, or TLS.

## License
Expand Down
124 changes: 46 additions & 78 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use anyhow::Context;
use clap::Parser;
use std::io as stdio;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::signal;
use tokio::task::JoinSet;
use tokio::time;
use tracing::{error, info, warn};
use tracing_subscriber::EnvFilter;
use tracing::Instrument;

static NEXT_CONN_ID: AtomicU64 = AtomicU64::new(1);
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;

#[derive(Parser, Debug)]
/// A simple TCP port-forwarding proxy
Expand Down Expand Up @@ -40,110 +35,83 @@ struct Cli {
/// Max time to establish the outbound connection (humantime, e.g., 2s, 500ms)
#[arg(long = "connect-timeout", default_value = "5s", value_parser = humantime::parse_duration, value_name = "DURATION")]
connect_timeout: Duration,
/// Max lifetime of a proxied connection; 0s by default (disables the timeout)
#[arg(long = "session-timeout", default_value = "0s", value_parser = humantime::parse_duration, value_name = "DURATION")]
session_timeout: Duration,
}

async fn handle_connection(
mut client_socket: TcpStream,
client_socket: TcpStream,
remote_addr: SocketAddr,
connect_timeout: Duration,
session_timeout: Duration,
) {
let connect = time::timeout(connect_timeout, TcpStream::connect(remote_addr)).await;
let mut remote_socket = match connect {
Ok(Ok(s)) => {
info!("Connected");
s
}
Ok(Err(e)) => {
error!(error = %e, "Failed to connect");
let _ = client_socket.shutdown().await;
return;
}
Err(_) => {
error!(timeout = ?connect_timeout, "Connect timeout");
let _ = client_socket.shutdown().await;
return;
}
};
async fn handle_connection_inner(
mut client_socket: TcpStream,
remote_addr: SocketAddr,
connect_timeout: Duration,
) -> anyhow::Result<(u64, u64)> {
let mut remote_socket = time::timeout(connect_timeout, TcpStream::connect(remote_addr))
.await
.context("connect timed out")?
.context("failed to connect to remote")?;

let res = if session_timeout.is_zero() {
// If zero, there is no timeout and the proxying runs until the connection closes
tokio::io::copy_bidirectional(&mut client_socket, &mut remote_socket).await.map(Some)
} else {
match time::timeout(session_timeout, tokio::io::copy_bidirectional(&mut client_socket, &mut remote_socket)).await {
Ok(inner) => inner.map(Some),
Err(_) => Ok(None),
}
};
let stats = tokio::io::copy_bidirectional(&mut client_socket, &mut remote_socket)
.await
.context("proxying data")?;

match res {
Ok(Some((c_to_r, r_to_c))) => {
info!(client_to_remote = c_to_r, remote_to_client = r_to_c, "Closed connection");
}
Ok(None) => {
warn!(timeout = ?session_timeout, "Session timeout");
Ok(stats)
}

match handle_connection_inner(client_socket, remote_addr, connect_timeout).await {
Ok((c_to_r, r_to_c)) => {
info!(
client_to_remote = c_to_r,
remote_to_client = r_to_c,
"closed connection"
);
}
Err(e) => {
error!(error = %e, "Piping error");
Err(err) => {
warn!("session error: {err}");
}
}

let _ = client_socket.shutdown().await;
let _ = remote_socket.shutdown().await;
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
async fn main() -> anyhow::Result<()> {
// Initialize logging from RUST_LOG or default to info
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.with_target(false)
.init();

let args = Cli::parse();
let listener = TcpListener::bind(args.listen).await?;
info!(listen = %args.listen, to = %args.to, "Listening (Ctrl+C to stop accepting)");
let listener = TcpListener::bind(args.listen)
.await
.context("unable to bind listener")?;

info!(listen = %args.listen, to = %args.to, "listening (Ctrl+C exits immediately)");

let mut tasks: JoinSet<()> = JoinSet::new();
let to = args.to;
let connect_timeout = args.connect_timeout;
let session_timeout = args.session_timeout;

let mut next_conn_id: u64 = 1;

loop {
tokio::select! {
_ = signal::ctrl_c() => {
info!("Ctrl+C received — stopping accept and waiting for active connections...");
break;
info!("Ctrl+C received — exiting immediately");
return Ok(());
}
res = listener.accept() => {
match res {
Ok((socket, client_addr)) => {
let id = NEXT_CONN_ID.fetch_add(1, Ordering::Relaxed);
info!(id = id, client = %client_addr, "Accepted connection");
let to = to;
let id = next_conn_id;
next_conn_id += 1;
info!(id = id, client = %client_addr, "accepted connection");
let span = tracing::info_span!("conn", id = id, client = %client_addr, remote = %to);
tasks.spawn(async move {
handle_connection(socket, to, connect_timeout, session_timeout).await;
}.instrument(span));
tokio::spawn(handle_connection(socket, to, connect_timeout).instrument(span));
}
Err(e) => warn!(error = %e, "Failed to accept connection"),
Err(e) => warn!(error = %e, "failed to accept connection"),
}
}
}
}

// Close listener so no further accepts happen
drop(listener);

while let Some(res) = tasks.join_next().await {
if let Err(e) = res {
error!(error = %e, "A connection task ended with an error");
}
}

info!("Shutdown complete.");
Ok(())
}
Loading