diff --git a/Cargo.lock b/Cargo.lock index 968a795f..f2ed0089 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1780,16 +1780,8 @@ name = "hl7v2-network" version = "1.2.0" dependencies = [ "bytes", - "futures", - "hl7v2-model", - "hl7v2-parser", - "hl7v2-test-utils", - "hl7v2-writer", - "proptest", - "rustls", + "hl7v2", "tokio", - "tokio-rustls", - "tokio-test", "tokio-util", ] @@ -4783,17 +4775,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-test" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545" -dependencies = [ - "futures-core", - "tokio", - "tokio-stream", -] - [[package]] name = "tokio-util" version = "0.7.18" diff --git a/crates/hl7v2-network/Cargo.toml b/crates/hl7v2-network/Cargo.toml index 9fd68fc8..4936db3c 100644 --- a/crates/hl7v2-network/Cargo.toml +++ b/crates/hl7v2-network/Cargo.toml @@ -13,28 +13,15 @@ keywords.workspace = true categories = ["network-programming", "asynchronous"] [dependencies] -# Core HL7v2 dependencies -hl7v2-model = { version = "1.2.0", path = "../hl7v2-model" } -hl7v2-parser = { version = "1.2.0", path = "../hl7v2-parser" } -hl7v2-writer = { version = "1.2.0", path = "../hl7v2-writer" } - -# Async runtime and networking -tokio = { workspace = true, features = ["net", "io-util", "time", "macros", "rt", "sync"] } -tokio-util = { version = "0.7.18", features = ["codec"] } -bytes = "1.11.1" -futures = "0.3.32" - -# Optional TLS support -rustls = { version = "0.23.37", optional = true, default-features = false, features = ["ring"] } -tokio-rustls = { version = "0.26.4", optional = true, default-features = false, features = ["ring"] } +hl7v2 = { version = "1.2.0", path = "../hl7v2", default-features = false, features = [ + "network", +] } [features] default = [] -tls = ["rustls", "tokio-rustls"] +tls = [] [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time", "net", "io-util"] } -hl7v2-test-utils = { path = "../hl7v2-test-utils" } -proptest = "1.6" -tokio-test = "0.4" - +tokio-util = { workspace = true, features = ["codec"] } +bytes = { workspace = true } diff --git a/crates/hl7v2-network/README.md b/crates/hl7v2-network/README.md index 100ea6df..9b8fc7e1 100644 --- a/crates/hl7v2-network/README.md +++ b/crates/hl7v2-network/README.md @@ -1,6 +1,7 @@ # hl7v2-network -TCP and MLLP based network communication for HL7 v2. +Deprecated compatibility crate for TCP and MLLP based network communication. +Use `hl7v2::transport::network` for new Rust code. ## Usage diff --git a/crates/hl7v2-network/src/client.rs b/crates/hl7v2-network/src/client.rs deleted file mode 100644 index 9477ee4b..00000000 --- a/crates/hl7v2-network/src/client.rs +++ /dev/null @@ -1,279 +0,0 @@ -//! MLLP TCP client for sending HL7 messages. -//! -//! This module provides an async TCP client that: -//! - Connects to MLLP servers -//! - Encodes and sends HL7 messages with MLLP framing -//! - Receives and decodes ACK responses - -use super::codec::MllpCodec; -use bytes::BytesMut; -use futures::prelude::*; -use hl7v2_model::Message; -use hl7v2_parser::parse; -use hl7v2_writer::write; -use std::net::SocketAddr; -use std::time::Duration; -use tokio::net::TcpStream; -use tokio::time::timeout; -use tokio_util::codec::Framed; - -/// Configuration for MLLP client -#[derive(Debug, Clone)] -pub struct MllpClientConfig { - /// Connection timeout - pub connect_timeout: Duration, - /// Read timeout for responses - pub read_timeout: Duration, - /// Write timeout for sending messages - pub write_timeout: Duration, - /// Maximum frame size - pub max_frame_size: usize, -} - -impl Default for MllpClientConfig { - fn default() -> Self { - Self { - connect_timeout: Duration::from_secs(10), - read_timeout: Duration::from_secs(30), - write_timeout: Duration::from_secs(30), - max_frame_size: 10 * 1024 * 1024, // 10MB - } - } -} - -/// MLLP TCP client -pub struct MllpClient { - config: MllpClientConfig, - framed: Option>, - peer_addr: Option, -} - -impl MllpClient { - /// Create a new MLLP client with the given configuration - pub fn new(config: MllpClientConfig) -> Self { - Self { - config, - framed: None, - peer_addr: None, - } - } - - /// Create a new MLLP client with default configuration - pub fn with_default_config() -> Self { - Self::new(MllpClientConfig::default()) - } - - /// Connect to a remote MLLP server - pub async fn connect(&mut self, addr: impl Into) -> Result<(), std::io::Error> { - let addr = addr.into(); - - let stream = timeout(self.config.connect_timeout, TcpStream::connect(addr)) - .await - .map_err(|_| { - std::io::Error::new(std::io::ErrorKind::TimedOut, "Connection timeout") - })??; - - let codec = MllpCodec::with_max_frame_size(self.config.max_frame_size); - self.framed = Some(Framed::new(stream, codec)); - self.peer_addr = Some(addr); - - Ok(()) - } - - /// Check if the client is connected - pub fn is_connected(&self) -> bool { - self.framed.is_some() - } - - /// Get the peer address if connected - pub fn peer_addr(&self) -> Option { - self.peer_addr - } - - /// Send a message and wait for an ACK response - pub async fn send_message(&mut self, message: &Message) -> Result { - let framed = self.framed.as_mut().ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected") - })?; - - // Serialize the message - let bytes = write(message); - - // Send the message with timeout - timeout( - self.config.write_timeout, - framed.send(BytesMut::from(&bytes[..])), - ) - .await - .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))??; - - // Wait for ACK response with timeout - let response = timeout(self.config.read_timeout, framed.next()) - .await - .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Read timeout"))? - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "Connection closed") - })??; - - // Parse the ACK - let ack = parse(&response) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?; - - Ok(ack) - } - - /// Send a message without waiting for a response (fire-and-forget) - pub async fn send_message_no_ack(&mut self, message: &Message) -> Result<(), std::io::Error> { - let framed = self.framed.as_mut().ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected") - })?; - - // Serialize the message - let bytes = write(message); - - // Send the message with timeout - timeout( - self.config.write_timeout, - framed.send(BytesMut::from(&bytes[..])), - ) - .await - .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))??; - - Ok(()) - } - - /// Receive a message from the server - pub async fn receive_message(&mut self) -> Result, std::io::Error> { - let framed = self.framed.as_mut().ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Not connected") - })?; - - match timeout(self.config.read_timeout, framed.next()).await { - Ok(Some(Ok(frame))) => { - let message = parse(&frame).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()) - })?; - Ok(Some(message)) - } - Ok(Some(Err(e))) => Err(e), - Ok(None) => Ok(None), // Connection closed - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "Read timeout", - )), - } - } - - /// Close the connection - pub async fn close(mut self) -> Result<(), std::io::Error> { - if let Some(framed) = self.framed.take() { - // Get the underlying stream and shut it down - let stream = framed.into_inner(); - // Just dropping the stream will close it - drop(stream); - } - Ok(()) - } - - /// Disconnect without consuming the client (allows reconnection) - pub async fn disconnect(&mut self) -> Result<(), std::io::Error> { - if let Some(framed) = self.framed.take() { - // Get the underlying stream and shut it down - let stream = framed.into_inner(); - // Just dropping the stream will close it - drop(stream); - } - self.peer_addr = None; - Ok(()) - } -} - -/// Builder for creating MLLP clients with custom configuration -pub struct MllpClientBuilder { - config: MllpClientConfig, -} - -impl MllpClientBuilder { - /// Create a new client builder with default configuration - pub fn new() -> Self { - Self { - config: MllpClientConfig::default(), - } - } - - /// Set the connection timeout - pub fn connect_timeout(mut self, timeout: Duration) -> Self { - self.config.connect_timeout = timeout; - self - } - - /// Set the read timeout - pub fn read_timeout(mut self, timeout: Duration) -> Self { - self.config.read_timeout = timeout; - self - } - - /// Set the write timeout - pub fn write_timeout(mut self, timeout: Duration) -> Self { - self.config.write_timeout = timeout; - self - } - - /// Set the maximum frame size - pub fn max_frame_size(mut self, size: usize) -> Self { - self.config.max_frame_size = size; - self - } - - /// Build the client - pub fn build(self) -> MllpClient { - MllpClient::new(self.config) - } -} - -impl Default for MllpClientBuilder { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_client_builder() { - let client = MllpClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .read_timeout(Duration::from_secs(15)) - .write_timeout(Duration::from_secs(15)) - .max_frame_size(1024 * 1024) - .build(); - - assert_eq!(client.config.connect_timeout, Duration::from_secs(5)); - assert_eq!(client.config.read_timeout, Duration::from_secs(15)); - assert_eq!(client.config.write_timeout, Duration::from_secs(15)); - assert_eq!(client.config.max_frame_size, 1024 * 1024); - } - - #[test] - fn test_client_not_connected() { - let client = MllpClient::with_default_config(); - assert!(!client.is_connected()); - assert!(client.peer_addr().is_none()); - } - - #[tokio::test] - async fn test_client_connect_timeout() { - use std::net::SocketAddr; - - let mut client = MllpClientBuilder::new() - .connect_timeout(Duration::from_millis(1)) - .build(); - - // Try to connect to a non-routable address (should timeout) - let addr: SocketAddr = "192.0.2.1:2575".parse().unwrap(); - let result = client.connect(addr).await; - assert!(result.is_err()); - } -} diff --git a/crates/hl7v2-network/src/codec.rs b/crates/hl7v2-network/src/codec.rs deleted file mode 100644 index 4b4cd075..00000000 --- a/crates/hl7v2-network/src/codec.rs +++ /dev/null @@ -1,295 +0,0 @@ -//! MLLP (Minimal Lower Layer Protocol) codec for encoding and decoding HL7 messages. -//! -//! MLLP wraps HL7 messages with framing characters: -//! - Start: 0x0B (vertical tab) -//! - End: 0x1C 0x0D (file separator + carriage return) - -use bytes::{Buf, BufMut, BytesMut}; -use tokio_util::codec::{Decoder, Encoder}; - -/// MLLP frame start byte (vertical tab) -const MLLP_START: u8 = 0x0B; - -/// MLLP frame end byte 1 (file separator) -const MLLP_END_1: u8 = 0x1C; - -/// MLLP frame end byte 2 (carriage return) -const MLLP_END_2: u8 = 0x0D; - -/// Maximum size for a single MLLP frame (10MB) -const MAX_FRAME_SIZE: usize = 10 * 1024 * 1024; - -/// MLLP codec for encoding and decoding HL7 messages with MLLP framing. -/// -/// # Examples -/// -/// ```no_run -/// use tokio_util::codec::Framed; -/// use tokio::net::TcpStream; -/// # use hl7v2_network::MllpCodec; -/// -/// # async fn example() -> Result<(), Box> { -/// let stream = TcpStream::connect("127.0.0.1:2575").await?; -/// let mut framed = Framed::new(stream, MllpCodec::new()); -/// # Ok(()) -/// # } -/// ``` -#[derive(Debug, Clone, Default)] -pub struct MllpCodec { - /// Maximum allowed frame size - max_frame_size: usize, -} - -impl MllpCodec { - /// Create a new MLLP codec with default settings. - pub fn new() -> Self { - Self { - max_frame_size: MAX_FRAME_SIZE, - } - } - - /// Create a new MLLP codec with a custom maximum frame size. - pub fn with_max_frame_size(max_frame_size: usize) -> Self { - Self { max_frame_size } - } -} - -impl Decoder for MllpCodec { - type Item = BytesMut; - type Error = std::io::Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // Need at least 3 bytes: start byte + at least one content byte + end bytes - if src.len() < 3 { - return Ok(None); - } - - // Find the start byte - let start_pos = match src.iter().position(|&b| b == MLLP_START) { - Some(pos) => pos, - None => { - // No start byte found, discard all data - src.clear(); - return Ok(None); - } - }; - - // If start byte is not at position 0, discard junk data before it - if start_pos > 0 { - src.advance(start_pos); - } - - // Now src[0] should be MLLP_START - // Look for the end sequence starting from position 1 - // position() returns the index in src[1..] where the end sequence starts - // This is also the length of the content - let end_pos = src[1..] - .windows(2) - .position(|window| window[0] == MLLP_END_1 && window[1] == MLLP_END_2); - - match end_pos { - Some(pos) => { - // pos is the index in src[1..] where we found the end sequence - // So the actual content length is just pos (not including the start byte) - let content_len = pos; - - // Check frame size before allocation - if content_len > self.max_frame_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Frame size {} exceeds maximum {}", - content_len, self.max_frame_size - ), - )); - } - - // We have a complete frame - // Extract the content (excluding framing bytes) - // Frame structure: [MLLP_START][content][MLLP_END_1][MLLP_END_2] - // pos is relative to src[1..], so in original src: - // - MLLP_START is at index 0 - // - content is at indices 1..=pos - // - MLLP_END_1 is at index pos+1 - // - MLLP_END_2 is at index pos+2 - - // Advance past the start byte - src.advance(1); - - // Now split off just the content (pos bytes) - let content = src.split_to(content_len); - - // Advance past the end sequence (MLLP_END_1 + MLLP_END_2) - src.advance(2); - - Ok(Some(content)) - } - None => { - // Check if buffer is getting too large while waiting for end sequence - if src.len() > self.max_frame_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Buffer size {} exceeds maximum {}", - src.len(), - self.max_frame_size - ), - )); - } - - // Not enough data yet, need to wait for more - Ok(None) - } - } - } -} - -impl Encoder for MllpCodec { - type Error = std::io::Error; - - fn encode(&mut self, item: BytesMut, dst: &mut BytesMut) -> Result<(), Self::Error> { - // Check that the message isn't too large - if item.len() > self.max_frame_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Message size {} exceeds maximum {}", - item.len(), - self.max_frame_size - ), - )); - } - - // Reserve space: start byte + content + 2 end bytes - dst.reserve(1 + item.len() + 2); - - // Write MLLP framing - dst.put_u8(MLLP_START); - dst.put(item); - dst.put_u8(MLLP_END_1); - dst.put_u8(MLLP_END_2); - - Ok(()) - } -} - -impl Encoder<&[u8]> for MllpCodec { - type Error = std::io::Error; - - fn encode(&mut self, item: &[u8], dst: &mut BytesMut) -> Result<(), Self::Error> { - // Check that the message isn't too large - if item.len() > self.max_frame_size { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!( - "Message size {} exceeds maximum {}", - item.len(), - self.max_frame_size - ), - )); - } - - // Reserve space: start byte + content + 2 end bytes - dst.reserve(1 + item.len() + 2); - - // Write MLLP framing - dst.put_u8(MLLP_START); - dst.put_slice(item); - dst.put_u8(MLLP_END_1); - dst.put_u8(MLLP_END_2); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_encode() { - let mut codec = MllpCodec::new(); - let mut dst = BytesMut::new(); - let msg = BytesMut::from("MSH|^~\\&|TEST\r"); - - codec.encode(msg, &mut dst).unwrap(); - - assert_eq!(dst[0], MLLP_START); - assert_eq!(dst[dst.len() - 2], MLLP_END_1); - assert_eq!(dst[dst.len() - 1], MLLP_END_2); - assert_eq!(&dst[1..dst.len() - 2], b"MSH|^~\\&|TEST\r"); - } - - #[test] - fn test_decode() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(&content[..], b"MSH|^~\\&|TEST\r"); - } - - #[test] - fn test_decode_incomplete() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSH|^~\\&|TEST\r"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - } - - #[test] - fn test_decode_with_junk_before() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"JUNK\x0BMSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(&content[..], b"MSH|^~\\&|TEST\r"); - } - - #[test] - fn test_decode_no_start_byte() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"MSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - assert_eq!(src.len(), 0); // Should discard all data - } - - #[test] - fn test_max_frame_size() { - let mut codec = MllpCodec::with_max_frame_size(10); - let mut dst = BytesMut::new(); - let large_msg = BytesMut::from(&b"12345678901"[..]); // 11 bytes, exceeds limit - - let result = codec.encode(large_msg, &mut dst); - assert!(result.is_err()); - } - - #[test] - fn test_decode_multiple_frames() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSG1\r\x1C\x0D\x0BMSG2\r\x1C\x0D"[..]); - - // Decode first frame - let result1 = codec.decode(&mut src).unwrap(); - assert!(result1.is_some()); - assert_eq!(&result1.unwrap()[..], b"MSG1\r"); - - // Decode second frame - let result2 = codec.decode(&mut src).unwrap(); - assert!(result2.is_some()); - assert_eq!(&result2.unwrap()[..], b"MSG2\r"); - - // No more frames - let result3 = codec.decode(&mut src).unwrap(); - assert!(result3.is_none()); - } -} diff --git a/crates/hl7v2-network/src/lib.rs b/crates/hl7v2-network/src/lib.rs index 82af00c6..c97d340d 100644 --- a/crates/hl7v2-network/src/lib.rs +++ b/crates/hl7v2-network/src/lib.rs @@ -1,80 +1,5 @@ -//! Network functionality for HL7 v2 MLLP connections. -//! -//! This crate provides: -//! - **MLLP Codec**: Encoding and decoding of MLLP frames using Tokio's codec framework -//! - **MLLP Client**: Async TCP client for sending HL7 messages and receiving ACKs -//! - **MLLP Server**: Async TCP server for receiving HL7 messages and sending ACKs -//! -//! # MLLP Protocol -//! -//! MLLP (Minimal Lower Layer Protocol) is a simple framing protocol used to transmit -//! HL7 messages over TCP. Each message is wrapped with: -//! - Start byte: `0x0B` (vertical tab) -//! - Message content (HL7 message) -//! - End bytes: `0x1C 0x0D` (file separator + carriage return) -//! -//! # Examples -//! -//! ## Client Example -//! -//! ```no_run -//! use hl7v2_network::{MllpClient, MllpClientBuilder}; -//! use hl7v2_model::{Message, Delims}; -//! use std::time::Duration; -//! -//! # async fn example() -> Result<(), Box> { -//! // Create a client -//! let mut client = MllpClientBuilder::new() -//! .connect_timeout(Duration::from_secs(5)) -//! .read_timeout(Duration::from_secs(30)) -//! .build(); -//! -//! // Connect to server -//! let addr: std::net::SocketAddr = "127.0.0.1:2575".parse()?; -//! client.connect(addr).await?; -//! -//! // Send a message (assumes you have a Message) -//! # let message = Message { delims: Delims::default(), segments: vec![], charsets: vec![] }; -//! let ack = client.send_message(&message).await?; -//! -//! // Close connection -//! client.close().await?; -//! # Ok(()) -//! # } -//! ``` -//! -//! ## Server Example -//! -//! ```no_run -//! use hl7v2_network::{MllpServer, MllpServerConfig, MessageHandler}; -//! use hl7v2_model::{Message, Error}; -//! -//! struct MyHandler; -//! -//! impl MessageHandler for MyHandler { -//! async fn handle_message(&self, message: Message) -> Result, Error> { -//! // Process the message and optionally return an ACK -//! println!("Received message with {} segments", message.segments.len()); -//! Ok(None) // Return Some(ack_message) to send an ACK -//! } -//! } -//! -//! # async fn example() -> Result<(), Box> { -//! let mut server = MllpServer::new(MllpServerConfig::default()); -//! let addr: std::net::SocketAddr = "127.0.0.1:2575".parse()?; -//! server.bind(addr).await?; -//! server.run(MyHandler).await?; -//! # Ok(()) -//! # } -//! ``` - -mod client; -mod codec; -mod server; - -pub use client::{MllpClient, MllpClientBuilder, MllpClientConfig}; -pub use codec::MllpCodec; -pub use server::{AckTimingPolicy, MessageHandler, MllpConnection, MllpServer, MllpServerConfig}; - -#[cfg(test)] -mod tests; +//! Deprecated compatibility crate. +//! +//! Use `hl7v2::transport::network` instead. + +pub use hl7v2::transport::network::*; diff --git a/crates/hl7v2-network/src/server.rs b/crates/hl7v2-network/src/server.rs deleted file mode 100644 index 097348a8..00000000 --- a/crates/hl7v2-network/src/server.rs +++ /dev/null @@ -1,341 +0,0 @@ -//! MLLP TCP server for receiving HL7 messages. -//! -//! This module provides an async TCP server that: -//! - Accepts MLLP connections -//! - Decodes incoming MLLP frames -//! - Parses HL7 messages -//! - Sends ACKs according to configurable timing policy - -use super::codec::MllpCodec; -use bytes::BytesMut; -use futures::prelude::*; -use hl7v2_model::{Error, Message}; -use hl7v2_parser::parse; -use hl7v2_writer::write; -use std::net::SocketAddr; -use std::time::Duration; -use tokio::net::{TcpListener, TcpStream}; -use tokio::time::timeout; -use tokio_util::codec::Framed; - -/// Configuration for MLLP server -#[derive(Debug, Clone)] -pub struct MllpServerConfig { - /// Read timeout for connections - pub read_timeout: Duration, - /// Write timeout for connections - pub write_timeout: Duration, - /// Maximum frame size - pub max_frame_size: usize, - /// Backlog for the TCP listener - pub backlog: u32, - /// Maximum number of concurrent connections - pub max_concurrent_connections: usize, - /// ACK timing policy - pub ack_timing: AckTimingPolicy, -} - -impl Default for MllpServerConfig { - fn default() -> Self { - Self { - read_timeout: Duration::from_secs(30), - write_timeout: Duration::from_secs(30), - max_frame_size: 10 * 1024 * 1024, // 10MB - backlog: 128, - max_concurrent_connections: 100, - ack_timing: AckTimingPolicy::Immediate, - } - } -} - -/// ACK timing policy for MLLP connections -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum AckTimingPolicy { - /// Send ACK immediately after receiving message - Immediate, - /// Send ACK after a delay - Delayed(Duration), - /// Send ACK only when explicitly requested - OnDemand, -} - -/// Handler trait for processing incoming HL7 messages -pub trait MessageHandler: Send + Sync { - /// Process a message and optionally return an ACK message - fn handle_message( - &self, - message: Message, - ) -> impl std::future::Future, Error>> + Send; -} - -/// MLLP TCP server -pub struct MllpServer { - config: MllpServerConfig, - listener: Option, -} - -impl MllpServer { - /// Create a new MLLP server with the given configuration - pub fn new(config: MllpServerConfig) -> Self { - Self { - config, - listener: None, - } - } - - /// Create a new MLLP server with default configuration - pub fn with_default_config() -> Self { - Self::new(MllpServerConfig::default()) - } - - /// Bind to the given address - pub async fn bind(&mut self, addr: impl Into) -> Result<(), std::io::Error> { - let addr = addr.into(); - let listener = TcpListener::bind(addr).await?; - self.listener = Some(listener); - Ok(()) - } - - /// Get the local address the server is bound to - pub fn local_addr(&self) -> Result { - self.listener - .as_ref() - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound") - })? - .local_addr() - } - - /// Run the server, processing messages with the given handler - /// - /// This will accept connections and spawn a task for each connection. - /// Uses a semaphore to limit the number of concurrent connections. - pub async fn run( - &mut self, - handler: H, - ) -> Result<(), std::io::Error> { - let listener = self.listener.as_ref().ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound") - })?; - - let handler = std::sync::Arc::new(handler); - let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new( - self.config.max_concurrent_connections, - )); - - loop { - // Wait for a permit before accepting a new connection - // This provides backpressure at the TCP level - let permit = semaphore - .clone() - .acquire_owned() - .await - .map_err(|e| std::io::Error::other(format!("Semaphore error: {}", e)))?; - - let (stream, peer_addr) = listener.accept().await?; - let handler = handler.clone(); - let config = self.config.clone(); - - // Spawn a task to handle this connection - tokio::spawn(async move { - // The permit is held by this task and dropped when it finishes - let _permit = permit; - - if let Err(e) = handle_connection(stream, peer_addr, handler, config).await { - eprintln!("Error handling connection from {}: {}", peer_addr, e); - } - }); - } - } - - /// Accept a single connection and return a connection handler - pub async fn accept(&mut self) -> Result { - let listener = self.listener.as_ref().ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::NotConnected, "Server not bound") - })?; - - let (stream, peer_addr) = listener.accept().await?; - Ok(MllpConnection::new(stream, peer_addr, self.config.clone())) - } -} - -/// Handle a single TCP connection -async fn handle_connection( - stream: TcpStream, - peer_addr: SocketAddr, - handler: std::sync::Arc, - config: MllpServerConfig, -) -> Result<(), std::io::Error> { - let codec = MllpCodec::with_max_frame_size(config.max_frame_size); - let mut framed = Framed::new(stream, codec); - - while let Some(result) = framed.next().await { - match result { - Ok(frame) => { - // Apply read timeout - let parse_result = timeout(config.read_timeout, async { - parse(&frame).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()) - }) - }) - .await; - - let message = match parse_result { - Ok(Ok(msg)) => msg, - Ok(Err(e)) => { - eprintln!("Failed to parse message from {}: {}", peer_addr, e); - continue; - } - Err(_) => { - eprintln!("Timeout parsing message from {}", peer_addr); - continue; - } - }; - - // Handle the message - let ack = match handler.handle_message(message).await { - Ok(Some(ack)) => ack, - Ok(None) => continue, // No ACK requested - Err(e) => { - eprintln!("Error handling message from {}: {}", peer_addr, e); - continue; - } - }; - - // Apply ACK timing policy - match config.ack_timing { - AckTimingPolicy::Immediate => { - // Send ACK immediately - let ack_bytes = write(&ack); - if let Err(e) = framed.send(BytesMut::from(&ack_bytes[..])).await { - eprintln!("Failed to send ACK to {}: {}", peer_addr, e); - break; - } - } - AckTimingPolicy::Delayed(delay) => { - // Wait before sending ACK - tokio::time::sleep(delay).await; - let ack_bytes = write(&ack); - if let Err(e) = framed.send(BytesMut::from(&ack_bytes[..])).await { - eprintln!("Failed to send ACK to {}: {}", peer_addr, e); - break; - } - } - AckTimingPolicy::OnDemand => { - // Don't send ACK automatically - // The handler would need to send it through some other mechanism - } - } - } - Err(e) => { - eprintln!("Error reading frame from {}: {}", peer_addr, e); - break; - } - } - } - - Ok(()) -} - -/// A single MLLP connection (server side) -pub struct MllpConnection { - framed: Framed, - peer_addr: SocketAddr, - config: MllpServerConfig, -} - -impl MllpConnection { - /// Create a new connection handler - pub fn new(stream: TcpStream, peer_addr: SocketAddr, config: MllpServerConfig) -> Self { - let codec = MllpCodec::with_max_frame_size(config.max_frame_size); - let framed = Framed::new(stream, codec); - Self { - framed, - peer_addr, - config, - } - } - - /// Get the peer address - pub fn peer_addr(&self) -> SocketAddr { - self.peer_addr - } - - /// Receive a message from the connection - pub async fn receive_message(&mut self) -> Result, std::io::Error> { - match timeout(self.config.read_timeout, self.framed.next()).await { - Ok(Some(Ok(frame))) => { - let message = parse(&frame).map_err(|e| { - std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()) - })?; - Ok(Some(message)) - } - Ok(Some(Err(e))) => Err(e), - Ok(None) => Ok(None), // Connection closed - Err(_) => Err(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "Read timeout", - )), - } - } - - /// Send a message through the connection - pub async fn send_message(&mut self, message: &Message) -> Result<(), std::io::Error> { - let bytes = write(message); - timeout( - self.config.write_timeout, - self.framed.send(BytesMut::from(&bytes[..])), - ) - .await - .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "Write timeout"))??; - Ok(()) - } - - /// Close the connection - pub async fn close(self) -> Result<(), std::io::Error> { - // Get the underlying stream and drop it to close - let stream = self.framed.into_inner(); - drop(stream); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[allow(dead_code)] - struct TestHandler; - - impl MessageHandler for TestHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - // Simple ACK - just echo the message back - Ok(None) - } - } - - #[tokio::test] - async fn test_server_bind() { - use std::net::SocketAddr; - - let mut server = MllpServer::with_default_config(); - let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let result = server.bind(bind_addr).await; - assert!(result.is_ok()); - - let addr = server.local_addr(); - assert!(addr.is_ok()); - } - - #[tokio::test] - async fn test_connection_timeout() { - let config = MllpServerConfig { - read_timeout: Duration::from_millis(100), - ..Default::default() - }; - - // This would require a more complex test setup with actual TCP connections - // For now, we just verify the config is set correctly - assert_eq!(config.read_timeout, Duration::from_millis(100)); - } -} diff --git a/crates/hl7v2-network/src/tests.rs b/crates/hl7v2-network/src/tests.rs deleted file mode 100644 index 90dd08f7..00000000 --- a/crates/hl7v2-network/src/tests.rs +++ /dev/null @@ -1,901 +0,0 @@ -//! Comprehensive unit tests for hl7v2-network crate. -//! -//! This module contains unit tests for: -//! - MLLP codec encoding/decoding -//! - Client connection handling -//! - Server lifecycle - -use bytes::BytesMut; -use std::time::Duration; -use tokio_util::codec::{Decoder, Encoder}; - -use super::client::{MllpClient, MllpClientBuilder, MllpClientConfig}; -use super::codec::MllpCodec; -use super::server::{AckTimingPolicy, MessageHandler, MllpServer, MllpServerConfig}; -use hl7v2_model::{Atom, Comp, Delims, Error, Field, Message, Rep, Segment}; - -/// MLLP frame start byte (vertical tab) -const MLLP_START: u8 = 0x0B; - -/// MLLP frame end byte 1 (file separator) -const MLLP_END_1: u8 = 0x1C; - -/// MLLP frame end byte 2 (carriage return) -const MLLP_END_2: u8 = 0x0D; - -// ============================================================================= -// Codec Unit Tests -// ============================================================================= - -mod codec_tests { - use super::*; - - /// Test basic encoding of a simple message - #[test] - fn test_encode_simple_message() { - let mut codec = MllpCodec::new(); - let mut dst = BytesMut::new(); - let msg = BytesMut::from("MSH|^~\\&|TEST\r"); - - codec.encode(msg, &mut dst).unwrap(); - - assert_eq!(dst[0], MLLP_START); - assert_eq!(dst[dst.len() - 2], MLLP_END_1); - assert_eq!(dst[dst.len() - 1], MLLP_END_2); - assert_eq!(&dst[1..dst.len() - 2], b"MSH|^~\\&|TEST\r"); - } - - /// Test encoding with slice input - #[test] - fn test_encode_slice() { - let mut codec = MllpCodec::new(); - let mut dst = BytesMut::new(); - let msg: &[u8] = b"MSH|^~\\&|TEST\r"; - - codec.encode(msg, &mut dst).unwrap(); - - assert_eq!(dst[0], MLLP_START); - assert_eq!(&dst[1..dst.len() - 2], b"MSH|^~\\&|TEST\r"); - } - - /// Test basic decoding of a complete frame - #[test] - fn test_decode_complete_frame() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(&content[..], b"MSH|^~\\&|TEST\r"); - } - - /// Test decoding incomplete frame returns None - #[test] - fn test_decode_incomplete_frame() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSH|^~\\&|TEST\r"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - } - - /// Test decoding with only start byte - #[test] - fn test_decode_only_start_byte() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0B"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - } - - /// Test decoding with junk data before start byte - #[test] - fn test_decode_junk_before_start() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"JUNK\x0BMSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(&content[..], b"MSH|^~\\&|TEST\r"); - } - - /// Test decoding when no start byte present - #[test] - fn test_decode_no_start_byte() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"MSH|^~\\&|TEST\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - assert_eq!(src.len(), 0); // Should discard all data - } - - /// Test encoding message exceeding max frame size - #[test] - fn test_encode_exceeds_max_frame_size() { - let mut codec = MllpCodec::with_max_frame_size(10); - let mut dst = BytesMut::new(); - let large_msg = BytesMut::from(&b"12345678901"[..]); // 11 bytes, exceeds limit - - let result = codec.encode(large_msg, &mut dst); - assert!(result.is_err()); - } - - /// Test decoding frame exceeding max frame size - #[test] - fn test_decode_exceeds_max_frame_size() { - let mut codec = MllpCodec::with_max_frame_size(10); - let mut src = BytesMut::from(&b"\x0B12345678901\r\x1C\x0D"[..]); // 11 content bytes - - let result = codec.decode(&mut src); - assert!(result.is_err()); - } - - /// Test decoding multiple frames in sequence - #[test] - fn test_decode_multiple_frames() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSG1\r\x1C\x0D\x0BMSG2\r\x1C\x0D"[..]); - - // Decode first frame - let result1 = codec.decode(&mut src).unwrap(); - assert!(result1.is_some()); - assert_eq!(&result1.unwrap()[..], b"MSG1\r"); - - // Decode second frame - let result2 = codec.decode(&mut src).unwrap(); - assert!(result2.is_some()); - assert_eq!(&result2.unwrap()[..], b"MSG2\r"); - - // No more frames - let result3 = codec.decode(&mut src).unwrap(); - assert!(result3.is_none()); - } - - /// Test decoding with partial end sequence - #[test] - fn test_decode_partial_end_sequence() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0BMSH|^~\\&|TEST\r\x1C"[..]); // Missing final 0x0D - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_none()); - } - - /// Test encoding empty message - #[test] - fn test_encode_empty_message() { - let mut codec = MllpCodec::new(); - let mut dst = BytesMut::new(); - let msg = BytesMut::new(); - - codec.encode(msg, &mut dst).unwrap(); - - assert_eq!(dst.len(), 3); // Start + 2 end bytes - assert_eq!(dst[0], MLLP_START); - assert_eq!(dst[1], MLLP_END_1); - assert_eq!(dst[2], MLLP_END_2); - } - - /// Test decoding empty content frame - #[test] - fn test_decode_empty_content() { - let mut codec = MllpCodec::new(); - let mut src = BytesMut::from(&b"\x0B\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(content.len(), 0); - } - - /// Test codec roundtrip encode then decode - #[test] - fn test_codec_roundtrip() { - let mut codec = MllpCodec::new(); - let original = BytesMut::from("MSH|^~\\&|TEST|FACILITY\r"); - - // Encode - let mut encoded = BytesMut::new(); - codec.encode(original.clone(), &mut encoded).unwrap(); - - // Decode - let decoded = codec.decode(&mut encoded).unwrap(); - assert!(decoded.is_some()); - assert_eq!(decoded.unwrap(), original); - } - - /// Test decoding with content containing end byte values - #[test] - fn test_decode_content_with_special_bytes() { - let mut codec = MllpCodec::new(); - // Content contains 0x1C but not followed by 0x0D - let mut src = BytesMut::from(&b"\x0BMSH|test\x1Cdata\r\x1C\x0D"[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - - let content = result.unwrap(); - assert_eq!(&content[..], b"MSH|test\x1Cdata\r"); - } - - /// Test decoding large message near max size - #[test] - fn test_decode_near_max_size() { - let max_size = 100; - let mut codec = MllpCodec::with_max_frame_size(max_size); - - // Create message just under max size - let content: Vec = vec![b'X'; max_size - 1]; - let mut frame = vec![MLLP_START]; - frame.extend(&content); - frame.extend(&[MLLP_END_1, MLLP_END_2]); - let mut src = BytesMut::from(&frame[..]); - - let result = codec.decode(&mut src).unwrap(); - assert!(result.is_some()); - assert_eq!(result.unwrap().len(), max_size - 1); - } - - /// Test decoding buffer overflow protection - #[test] - fn test_decode_buffer_overflow_protection() { - let max_size = 10; - let mut codec = MllpCodec::with_max_frame_size(max_size); - - // Create incomplete frame that would exceed max size - let content: Vec = vec![b'X'; max_size + 5]; - let mut frame = vec![MLLP_START]; - frame.extend(&content); - // No end sequence - buffer should grow until limit - let mut src = BytesMut::from(&frame[..]); - - let result = codec.decode(&mut src); - assert!(result.is_err()); - } -} - -// ============================================================================= -// Client Unit Tests -// ============================================================================= - -mod client_tests { - use super::*; - - /// Test client builder creates client with correct config - #[test] - fn test_client_builder_configuration() { - let config = MllpClientConfig { - connect_timeout: Duration::from_secs(5), - read_timeout: Duration::from_secs(15), - write_timeout: Duration::from_secs(20), - max_frame_size: 1024 * 1024, - }; - let client = MllpClient::new(config); - - assert!(!client.is_connected()); - } - - /// Test client default configuration - #[test] - fn test_client_default_config() { - let config = MllpClientConfig::default(); - - assert_eq!(config.connect_timeout, Duration::from_secs(10)); - assert_eq!(config.read_timeout, Duration::from_secs(30)); - assert_eq!(config.write_timeout, Duration::from_secs(30)); - assert_eq!(config.max_frame_size, 10 * 1024 * 1024); - } - - /// Test client is not connected initially - #[test] - fn test_client_not_connected_initially() { - let client = MllpClient::with_default_config(); - assert!(!client.is_connected()); - assert!(client.peer_addr().is_none()); - } - - /// Test client builder default implementation - #[test] - fn test_client_builder_default() { - let builder = MllpClientBuilder::default(); - let client = builder.build(); - - assert!(!client.is_connected()); - } - - /// Test client with custom config - #[test] - fn test_client_custom_config() { - let config = MllpClientConfig { - connect_timeout: Duration::from_secs(2), - read_timeout: Duration::from_secs(5), - write_timeout: Duration::from_secs(5), - max_frame_size: 5000, - }; - - let client = MllpClient::new(config); - assert!(!client.is_connected()); - } - - /// Test connect timeout to non-routable address - #[tokio::test] - async fn test_client_connect_timeout() { - use std::net::SocketAddr; - - let mut client = MllpClientBuilder::new() - .connect_timeout(Duration::from_millis(1)) - .build(); - - // Try to connect to a non-routable address (should timeout) - let addr: SocketAddr = "192.0.2.1:2575".parse().unwrap(); - let result = client.connect(addr).await; - assert!(result.is_err()); - - if let Err(e) = result { - assert_eq!(e.kind(), std::io::ErrorKind::TimedOut); - } - } - - /// Test send_message fails when not connected - #[tokio::test] - async fn test_send_message_not_connected() { - let mut client = MllpClient::with_default_config(); - let message = create_test_message(); - - let result = client.send_message(&message).await; - assert!(result.is_err()); - - if let Err(e) = result { - assert_eq!(e.kind(), std::io::ErrorKind::NotConnected); - } - } - - /// Test send_message_no_ack fails when not connected - #[tokio::test] - async fn test_send_message_no_ack_not_connected() { - let mut client = MllpClient::with_default_config(); - let message = create_test_message(); - - let result = client.send_message_no_ack(&message).await; - assert!(result.is_err()); - } - - /// Test receive_message fails when not connected - #[tokio::test] - async fn test_receive_message_not_connected() { - let mut client = MllpClient::with_default_config(); - - let result = client.receive_message().await; - assert!(result.is_err()); - } - - /// Test close on unconnected client succeeds - #[tokio::test] - async fn test_close_unconnected_client() { - let client = MllpClient::with_default_config(); - let result = client.close().await; - assert!(result.is_ok()); - } - - /// Test disconnect on unconnected client succeeds - #[tokio::test] - async fn test_disconnect_unconnected_client() { - let mut client = MllpClient::with_default_config(); - let result = client.disconnect().await; - assert!(result.is_ok()); - } -} - -// ============================================================================= -// Server Unit Tests -// ============================================================================= - -mod server_tests { - use super::*; - use std::net::SocketAddr; - - /// Test server default configuration - #[test] - fn test_server_default_config() { - let config = MllpServerConfig::default(); - - assert_eq!(config.read_timeout, Duration::from_secs(30)); - assert_eq!(config.write_timeout, Duration::from_secs(30)); - assert_eq!(config.max_frame_size, 10 * 1024 * 1024); - assert_eq!(config.backlog, 128); - assert_eq!(config.ack_timing, AckTimingPolicy::Immediate); - } - - /// Test server creation with default config - #[test] - fn test_server_creation() { - let server = MllpServer::with_default_config(); - // Server created successfully - let _ = server; - } - - /// Test server bind to available port - #[tokio::test] - async fn test_server_bind() { - let mut server = MllpServer::with_default_config(); - let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - - let result = server.bind(bind_addr).await; - assert!(result.is_ok()); - - let addr = server.local_addr(); - assert!(addr.is_ok()); - assert_ne!(addr.unwrap().port(), 0); - } - - /// Test server local_addr fails when not bound - #[test] - fn test_server_local_addr_not_bound() { - let server = MllpServer::with_default_config(); - let result = server.local_addr(); - - assert!(result.is_err()); - if let Err(e) = result { - assert_eq!(e.kind(), std::io::ErrorKind::NotConnected); - } - } - - /// Test ACK timing policy variants - #[test] - fn test_ack_timing_policy() { - assert_eq!(AckTimingPolicy::Immediate, AckTimingPolicy::Immediate); - assert_ne!(AckTimingPolicy::Immediate, AckTimingPolicy::OnDemand); - - let delayed = AckTimingPolicy::Delayed(Duration::from_millis(100)); - assert!(matches!(delayed, AckTimingPolicy::Delayed(_))); - } - - /// Test server config with custom values - #[test] - fn test_server_custom_config() { - let config = MllpServerConfig { - read_timeout: Duration::from_secs(5), - write_timeout: Duration::from_secs(5), - max_frame_size: 1024, - backlog: 64, - max_concurrent_connections: 50, - ack_timing: AckTimingPolicy::Delayed(Duration::from_millis(50)), - }; - - assert_eq!(config.read_timeout, Duration::from_secs(5)); - assert_eq!(config.backlog, 64); - assert_eq!(config.max_concurrent_connections, 50); - assert!(matches!(config.ack_timing, AckTimingPolicy::Delayed(_))); - } - - /// Test connection timeout configuration - #[tokio::test] - async fn test_connection_timeout_config() { - let config = MllpServerConfig { - read_timeout: Duration::from_millis(100), - ..Default::default() - }; - - assert_eq!(config.read_timeout, Duration::from_millis(100)); - } - - /// Test message handler trait implementation - struct EchoHandler; - - impl MessageHandler for EchoHandler { - async fn handle_message(&self, message: Message) -> Result, Error> { - // Echo the message back as ACK - Ok(Some(message)) - } - } - - #[tokio::test] - async fn test_message_handler_echo() { - let handler = EchoHandler; - let message = create_test_message(); - - let result = handler.handle_message(message).await; - assert!(result.is_ok()); - assert!(result.unwrap().is_some()); - } - - /// Test message handler that returns None - struct SilentHandler; - - impl MessageHandler for SilentHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - Ok(None) - } - } - - #[tokio::test] - async fn test_message_handler_silent() { - let handler = SilentHandler; - let message = create_test_message(); - - let result = handler.handle_message(message).await; - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); - } - - /// Test message handler that returns error - struct ErrorHandler; - - impl MessageHandler for ErrorHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - Err(Error::InvalidFieldFormat { - details: "Test error".to_string(), - }) - } - } - - #[tokio::test] - async fn test_message_handler_error() { - let handler = ErrorHandler; - let message = create_test_message(); - - let result = handler.handle_message(message).await; - assert!(result.is_err()); - } -} - -// ============================================================================= -// Property-Based Tests -// ============================================================================= - -#[cfg(test)] -mod property_tests { - use super::*; - use proptest::prelude::*; - - // Generate arbitrary valid HL7 message content (printable ASCII) - prop_compose! { - fn arb_message_content()(bytes in "[ -~]*") -> BytesMut { - BytesMut::from(bytes.as_bytes()) - } - } - - proptest! { - /// Test codec roundtrip with arbitrary content - #[test] - fn prop_codec_roundtrip(content in arb_message_content()) { - let mut codec = MllpCodec::new(); - let original = content; - - // Encode - let mut encoded = BytesMut::new(); - let encode_result = codec.encode(original.clone(), &mut encoded); - - // Only test if encoding succeeded - if encode_result.is_ok() { - // Decode - let decoded = codec.decode(&mut encoded); - - prop_assert!(decoded.is_ok()); - if let Ok(Some(decoded_content)) = decoded { - prop_assert_eq!(&decoded_content[..], &original[..]); - } - } - } - - /// Test encoding never panics with any byte sequence - #[test] - fn prop_encode_no_panic(bytes: Vec) { - let mut codec = MllpCodec::with_max_frame_size(10000); - let mut dst = BytesMut::new(); - let msg = BytesMut::from(&bytes[..]); - - // Should never panic, may return error for large messages - let _ = codec.encode(msg, &mut dst); - prop_assert!(true); - } - - /// Test decoding never panics with any byte sequence - #[test] - fn prop_decode_no_panic(bytes: Vec) { - let mut codec = MllpCodec::with_max_frame_size(10000); - let mut src = BytesMut::from(&bytes[..]); - - // Should never panic - let result = codec.decode(&mut src); - prop_assert!(result.is_ok() || result.is_err()); - } - - /// Test multiple messages can be encoded and decoded - #[test] - fn prop_multiple_messages_roundtrip(msgs: Vec) { - let mut codec = MllpCodec::new(); - let mut buffer = BytesMut::new(); - - // Encode all messages - for msg in &msgs { - let mut encoded = BytesMut::new(); - let _ = codec.encode(BytesMut::from(msg.as_bytes()), &mut encoded); - buffer.extend(encoded); - } - - // Decode all messages - let mut decoded_count = 0; - while let Ok(Some(_)) = codec.decode(&mut buffer) { - decoded_count += 1; - } - - prop_assert_eq!(decoded_count, msgs.len() as i32); - } - } -} - -// ============================================================================= -// Helper Functions -// ============================================================================= - -/// Create a simple test message for testing -fn create_test_message() -> Message { - Message { - delims: Delims::default(), - segments: vec![Segment { - id: *b"MSH", - fields: vec![ - Field { - reps: vec![Rep { - comps: vec![Comp { - subs: vec![Atom::Text("^~\\&".to_string())], - }], - }], - }, - Field { - reps: vec![Rep { - comps: vec![Comp { - subs: vec![Atom::Text("TEST".to_string())], - }], - }], - }, - ], - }], - charsets: vec![], - } -} - -// ============================================================================= -// Integration Tests (run with --test-threads=1 for network tests) -// ============================================================================= - -#[cfg(test)] -mod network_tests { - use super::*; - use std::net::SocketAddr; - use std::sync::Arc; - use tokio::sync::Notify; - - /// Test basic client-server communication - #[tokio::test] - async fn test_client_server_communication() { - struct TestHandler { - notify: Arc, - } - - impl MessageHandler for TestHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - self.notify.notify_one(); - Ok(Some(create_test_message())) - } - } - - // Start server - let mut server = MllpServer::with_default_config(); - let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - server.bind(bind_addr).await.unwrap(); - let server_addr = server.local_addr().unwrap(); - - let notify = Arc::new(Notify::new()); - let handler = TestHandler { - notify: notify.clone(), - }; - - // Spawn server task - tokio::spawn(async move { - let _ = server.run(handler).await; - }); - - // Give server time to start - tokio::time::sleep(Duration::from_millis(100)).await; - - // Create client and connect - let mut client = MllpClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .build(); - - client.connect(server_addr).await.unwrap(); - assert!(client.is_connected()); - - // Send a message - let message = create_test_message(); - let ack = client.send_message(&message).await.unwrap(); - - // Verify we got an ACK back - assert_eq!(ack.segments.len(), 1); - - // Close client - client.close().await.unwrap(); - } - - /// Test server handles multiple connections - #[tokio::test] - async fn test_server_multiple_connections() { - struct CountingHandler { - count: Arc, - } - - impl MessageHandler for CountingHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - Ok(Some(create_test_message())) - } - } - - // Start server - let mut server = MllpServer::with_default_config(); - let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - server.bind(bind_addr).await.unwrap(); - let server_addr = server.local_addr().unwrap(); - - let count = Arc::new(std::sync::atomic::AtomicU32::new(0)); - let handler = CountingHandler { - count: count.clone(), - }; - - // Spawn server task - tokio::spawn(async move { - let _ = server.run(handler).await; - }); - - // Give server time to start - tokio::time::sleep(Duration::from_millis(100)).await; - - // Create multiple clients - let mut handles = vec![]; - for _ in 0..3 { - let addr = server_addr; - let handle = tokio::spawn(async move { - let mut client = MllpClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .build(); - - client.connect(addr).await.unwrap(); - let message = create_test_message(); - let _ = client.send_message(&message).await; - let _ = client.close().await; - }); - handles.push(handle); - } - - // Wait for all clients - for handle in handles { - handle.await.unwrap(); - } - - // Give time for messages to be processed - tokio::time::sleep(Duration::from_millis(200)).await; - - // Verify all messages were handled - assert!(count.load(std::sync::atomic::Ordering::SeqCst) >= 3); - } - - /// Test server concurrency limit - #[tokio::test] - async fn test_server_concurrency_limit() { - struct SlowHandler { - active_count: Arc, - max_active: Arc, - } - - impl MessageHandler for SlowHandler { - async fn handle_message(&self, _message: Message) -> Result, Error> { - let current = self - .active_count - .fetch_add(1, std::sync::atomic::Ordering::SeqCst) - + 1; - - // Track max concurrent handlers active - let mut max = self.max_active.load(std::sync::atomic::Ordering::SeqCst); - while current > max { - match self.max_active.compare_exchange( - max, - current, - std::sync::atomic::Ordering::SeqCst, - std::sync::atomic::Ordering::SeqCst, - ) { - Ok(_) => break, - Err(actual) => max = actual, - } - } - - // Sleep to keep the connection/handler active - tokio::time::sleep(Duration::from_millis(200)).await; - - self.active_count - .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); - Ok(Some(create_test_message())) - } - } - - // Start server with limit of 2 concurrent connections - let config = MllpServerConfig { - max_concurrent_connections: 2, - ..Default::default() - }; - let mut server = MllpServer::new(config); - let bind_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - server.bind(bind_addr).await.unwrap(); - let server_addr = server.local_addr().unwrap(); - - let active_count = Arc::new(std::sync::atomic::AtomicU32::new(0)); - let max_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); - let handler = SlowHandler { - active_count: active_count.clone(), - max_active: max_active.clone(), - }; - - // Spawn server task - tokio::spawn(async move { - let _ = server.run(handler).await; - }); - - tokio::time::sleep(Duration::from_millis(100)).await; - - // Try to connect 5 clients - let mut handles = vec![]; - for _ in 0..5 { - let addr = server_addr; - let handle = tokio::spawn(async move { - let mut client = MllpClientBuilder::new() - .connect_timeout(Duration::from_secs(5)) - .build(); - - if client.connect(addr).await.is_ok() { - let message = create_test_message(); - let _ = client.send_message(&message).await; - let _ = client.close().await; - } - }); - handles.push(handle); - } - - // Wait for some time - tokio::time::sleep(Duration::from_millis(500)).await; - - // Verify that we never exceeded 2 concurrent handlers - assert!(max_active.load(std::sync::atomic::Ordering::SeqCst) <= 2); - } - - /// Test codec handles partial frames correctly - #[tokio::test] - async fn test_codec_partial_frames() { - let mut codec = MllpCodec::new(); - - // Simulate partial frame arrival - let part1 = BytesMut::from(&b"\x0BMSH"[..]); - let part2 = BytesMut::from(&b"|^~\\&\r\x1C\x0D"[..]); - - let mut buffer = part1; - - // First part should not decode - let result1 = codec.decode(&mut buffer).unwrap(); - assert!(result1.is_none()); - - // Add second part - buffer.extend(part2); - - // Now should decode - let result2 = codec.decode(&mut buffer).unwrap(); - assert!(result2.is_some()); - assert_eq!(&result2.unwrap()[..], b"MSH|^~\\&\r"); - } -} diff --git a/crates/hl7v2-network/tests/common/mod.rs b/crates/hl7v2-network/tests/common/mod.rs index 91ad82dc..7ea97639 100644 --- a/crates/hl7v2-network/tests/common/mod.rs +++ b/crates/hl7v2-network/tests/common/mod.rs @@ -5,7 +5,7 @@ #![allow(dead_code)] -use hl7v2_model::{Atom, Comp, Delims, Field, Message, Rep, Segment}; +use hl7v2::{Atom, Comp, Delims, Error, Field, Message, Rep, Segment}; use hl7v2_network::{MessageHandler, MllpClient, MllpClientBuilder, MllpServer, MllpServerConfig}; use std::net::SocketAddr; use std::sync::Arc; @@ -164,10 +164,7 @@ impl EchoHandler { } impl MessageHandler for EchoHandler { - async fn handle_message( - &self, - message: Message, - ) -> Result, hl7v2_model::Error> { + async fn handle_message(&self, message: Message) -> Result, Error> { self.notify.notify_one(); Ok(Some(message)) } @@ -185,10 +182,7 @@ impl AckHandler { } impl MessageHandler for AckHandler { - async fn handle_message( - &self, - _message: Message, - ) -> Result, hl7v2_model::Error> { + async fn handle_message(&self, _message: Message) -> Result, Error> { self.notify.notify_one(); // Create a simple ACK Ok(Some(create_test_message())) @@ -209,10 +203,7 @@ impl DelayedHandler { } impl MessageHandler for DelayedHandler { - async fn handle_message( - &self, - message: Message, - ) -> Result, hl7v2_model::Error> { + async fn handle_message(&self, message: Message) -> Result, Error> { tokio::time::sleep(self.delay).await; self.notify.notify_one(); Ok(Some(message)) @@ -231,10 +222,7 @@ impl CountingHandler { } impl MessageHandler for CountingHandler { - async fn handle_message( - &self, - _message: Message, - ) -> Result, hl7v2_model::Error> { + async fn handle_message(&self, _message: Message) -> Result, Error> { self.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); Ok(Some(create_test_message())) } @@ -244,11 +232,8 @@ impl MessageHandler for CountingHandler { pub struct ErrorHandler; impl MessageHandler for ErrorHandler { - async fn handle_message( - &self, - _message: Message, - ) -> Result, hl7v2_model::Error> { - Err(hl7v2_model::Error::InvalidFieldFormat { + async fn handle_message(&self, _message: Message) -> Result, Error> { + Err(Error::InvalidFieldFormat { details: "Test error".to_string(), }) } @@ -258,10 +243,7 @@ impl MessageHandler for ErrorHandler { pub struct SilentHandler; impl MessageHandler for SilentHandler { - async fn handle_message( - &self, - _message: Message, - ) -> Result, hl7v2_model::Error> { + async fn handle_message(&self, _message: Message) -> Result, Error> { Ok(None) } }