feat(transport): event-driven transport adapters (#172)#178
feat(transport): event-driven transport adapters (#172)#178
Conversation
Made-with: Cursor
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdded event-driven TCP and UDP transports and socket-adapter interfaces, plus multicast transport interface, implementing callback-driven I/O, TCP stream reassembly/resynchronization, UDP multicast management, lifecycle control, and unit tests. Changes
Sequence Diagram(s)sequenceDiagram
actor App as Application
participant Trans as EventDrivenTcpTransport
participant Adapt as ITcpSocketAdapter
participant Listener as ITransportListener
App->>Trans: initialize(local_endpoint)
Trans->>Adapt: open(local_endpoint)
Adapt-->>Trans: Result::SUCCESS
App->>Trans: start()
Trans->>Adapt: set_receive_callback(...)
Trans->>Adapt: set_connected_callback(...)
Trans->>Adapt: set_disconnected_callback(...)
Adapt->>Trans: on_adapter_connected(remote)
Trans->>Listener: on_connection_established(remote)
Adapt->>Trans: on_adapter_receive(bytes)
Trans->>Trans: append bytes to receive_buffer_
Trans->>Trans: parse_message_from_buffer()
Trans-->>Trans: enqueue MessagePtr
Trans->>Listener: on_message_received(message, remote)
Adapt->>Trans: on_adapter_disconnected()
Trans->>Listener: on_connection_lost()
sequenceDiagram
actor App as Application
participant Trans as EventDrivenUdpTransport
participant Adapt as IUdpSocketAdapter
participant Listener as ITransportListener
App->>Trans: start()
Trans->>Adapt: set_receive_callback(...)
Trans->>Adapt: open(local_endpoint)
Adapt-->>Trans: Result::SUCCESS
Adapt->>Trans: on_adapter_receive(datagram, sender)
Trans->>Trans: allocate Message & deserialize
alt valid
Trans->>Listener: on_message_received(message, sender)
Trans-->>App: message available via receive_message()
else invalid
Trans->>Listener: on_error(INVALID_MESSAGE)
end
App->>Trans: connect(multicast_endpoint)
Trans->>Trans: is_multicast_ipv4()
Trans->>Adapt: join_multicast(address, interface)
Adapt-->>Trans: Result::SUCCESS
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related issues
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 10
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/transport/event_driven_tcp_transport.h`:
- Line 82: The field server_mode_ is a plain bool accessed concurrently (read in
connect() and try_accept_connection(), written in enable_server_mode() and
stop()) and should be made thread-safe like running_ and initialized_; change
server_mode_ to std::atomic<bool> (or protect all accesses with the same mutex
used for running_/initialized_), update any direct reads/writes in connect(),
try_accept_connection(), enable_server_mode(), and stop() to use atomic
load/store (or guarded access), and ensure header includes <atomic> so there are
no data races.
- Around line 47-55: The three non-polymorphic setup methods on
EventDrivenTcpTransport (initialize, enable_server_mode, try_accept_connection)
prevent correct usage via ITransport*; change EventDrivenTcpTransport to accept
the required local_endpoint in its constructor (mirror EventDrivenUdpTransport's
pattern), remove/privatize the standalone initialize() to avoid misuse, and keep
enable_server_mode/try_accept_connection as concrete-type-only APIs but add a
clear comment on EventDrivenTcpTransport explaining they are not part of
ITransport and must be invoked only when using the concrete class; update all
callers to construct with the Endpoint and to call server-mode methods on the
concrete type.
In `@src/transport/event_driven_tcp_transport.cpp`:
- Around line 131-133: set_listener writes to listener_ without synchronization
while callbacks like on_adapter_receive and other adapter callbacks read it,
causing a data race; fix by making listener_ an std::atomic<ITransportListener*>
or guarding all accesses (in set_listener, on_adapter_receive, etc.) with the
same mutex (e.g., listener_mutex) and update the variable access sites
(listener_, set_listener(), on_adapter_receive(), and other methods that read
listener_) to use atomic loads/stores or lock/unlock the mutex consistently.
- Around line 211-218: The disconnect handler clears receive_buffer_ without
synchronization, risking concurrent access with on_adapter_receive(); modify
EventDrivenTcpTransport::on_adapter_disconnected() to acquire the same
queue_mutex_ (e.g., via std::lock_guard or std::unique_lock on queue_mutex_)
before calling receive_buffer_.clear() and any other accesses to shared state
(like initialized_ or connection_remote_ if they are also accessed elsewhere) so
the buffer mutation follows the same locking discipline as on_adapter_receive();
keep locking minimal and consistent to avoid deadlocks and ensure
listener_->on_connection_lost(lost) is invoked after releasing the lock if the
listener may call back into this object.
- Around line 151-172: In EventDrivenTcpTransport::stop(), set running_ = false
immediately at the start (or at least before you clear callbacks and call
adapter_.close()) so is_running() no longer reports true while the transport is
being torn down; move the running_ = false assignment before the
adapter_.set_receive_callback(), set_connected_callback(),
set_disconnected_callback() and adapter_.close() calls (keep the rest of the
teardown: listener_ = nullptr,
initialized_/server_mode_/receive_buffer_/message_queue_ cleanup intact) to
match the behavior of EventDrivenUdpTransport::stop().
In `@src/transport/event_driven_udp_transport.cpp`:
- Around line 96-98: The set_listener write to listener_ is unsynchronized while
on_adapter_receive reads it, causing a potential data race/use-after-free (e.g.,
set_listener(nullptr) racing with on_adapter_receive); fix by either making
listener_ an std::atomic<ITransportListener*> and use atomic load/store in
EventDrivenUdpTransport::set_listener and on_adapter_receive, or protect all
accesses to listener_ (both EventDrivenUdpTransport::set_listener and the read
sites in on_adapter_receive) with the existing mutex (lock the same mutex around
assignment and around the read/use) to ensure safe publication and no torn
reads.
- Around line 27-31: The constructor currently ignores invalid local_endpoint_
when exceptions are disabled, leaving EventDrivenUdpTransport in an unusable
state; modify creation to perform an explicit validity check and surface failure
instead of silently continuing — e.g., add a static factory method like
EventDrivenUdpTransport::Create(...) that checks local_endpoint_.is_valid() and
returns std::optional<EventDrivenUdpTransport> or a Result/error code, or change
the function that constructs EventDrivenUdpTransport to return a bool/error on
invalid endpoint; ensure all call sites use the new factory/return value and
remove the reliance on throwing within the constructor.
- Around line 163-169: platform::allocate_message() can return nullptr on
low-memory embedded platforms, so add a null check right after MessagePtr
message = platform::allocate_message();: if message is nullptr then (if
listener_) call listener_->on_error(Result::NO_MEMORY) (or the appropriate
out-of-memory Result enum used in the project) and return; only call
message->deserialize(data) after confirming message is non-null. Update the
block around MessagePtr/message, deserialize, listener_->on_error and Result
enum usage accordingly.
In `@tests/CMakeLists.txt`:
- Line 125: Add a parallel test target for the TCP transport by updating
tests/CMakeLists.txt to build and register the TCP test executable just like the
UDP one: ensure you add a test source target for
test_event_driven_tcp_transport.cpp (matching how
test_event_driven_udp_transport is added) and add a corresponding add_test(NAME
EventDrivenTcpTransportTest COMMAND test_event_driven_tcp_transport) entry so
the test runner will execute it; locate the existing add_test for
EventDrivenUdpTransportTest and mirror its add_executable/add_library and
add_test pattern for the TCP test.
In `@tests/test_event_driven_udp_transport.cpp`:
- Around line 57-60: received_messages_ is modified under mutex_ but tests read
it directly, risking data races if callbacks become async; add a mutex-guarded
accessor that returns a copy (e.g., GetReceivedMessagesSnapshot() or
snapshotReceivedMessages()) and use that in tests instead of direct access, keep
all writes using the existing mutex_ (the same mutex_ used where
received_messages_ is modified), and similarly replace any direct reads of
shared state referenced around last_error_ and error_count_ with mutex-protected
getters or snapshot accessors to ensure safe access from tests.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 813e1013-64d0-4cc9-91cf-da9e49c323e6
📒 Files selected for processing (9)
include/transport/event_driven_tcp_transport.hinclude/transport/event_driven_udp_transport.hinclude/transport/tcp_socket_adapter.hinclude/transport/udp_socket_adapter.hsrc/CMakeLists.txtsrc/transport/event_driven_tcp_transport.cppsrc/transport/event_driven_udp_transport.cpptests/CMakeLists.txttests/test_event_driven_udp_transport.cpp
| [[nodiscard]] Result initialize(const Endpoint& local_endpoint); | ||
|
|
||
| [[nodiscard]] Result enable_server_mode(int backlog = 5); | ||
|
|
||
| /** | ||
| * @brief Non-blocking accept attempt (server mode). Adapter should invoke | ||
| * connected/disconnected callbacks when the connection is ready or lost. | ||
| */ | ||
| [[nodiscard]] Result try_accept_connection(Endpoint& remote_out); |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Non-polymorphic initialization methods may cause usage errors.
initialize(), enable_server_mode(), and try_accept_connection() are not part of the ITransport interface. Code using ITransport* polymorphically cannot call these required setup methods, potentially leading to NOT_INITIALIZED errors at runtime.
Consider either:
- Accepting
local_endpointin the constructor (matchingEventDrivenUdpTransport) - Documenting that this transport requires concrete type usage for setup
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@include/transport/event_driven_tcp_transport.h` around lines 47 - 55, The
three non-polymorphic setup methods on EventDrivenTcpTransport (initialize,
enable_server_mode, try_accept_connection) prevent correct usage via
ITransport*; change EventDrivenTcpTransport to accept the required
local_endpoint in its constructor (mirror EventDrivenUdpTransport's pattern),
remove/privatize the standalone initialize() to avoid misuse, and keep
enable_server_mode/try_accept_connection as concrete-type-only APIs but add a
clear comment on EventDrivenTcpTransport explaining they are not part of
ITransport and must be invoked only when using the concrete class; update all
callers to construct with the Endpoint and to call server-mode methods on the
concrete type.
|
|
||
| std::atomic<bool> running_{false}; | ||
| std::atomic<bool> initialized_{false}; | ||
| bool server_mode_{false}; |
There was a problem hiding this comment.
server_mode_ should be atomic or protected.
server_mode_ is a plain bool that can be read in connect() and try_accept_connection() while potentially being written in enable_server_mode() or stop() from different threads. Consider using std::atomic<bool> for consistency with running_ and initialized_.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@include/transport/event_driven_tcp_transport.h` at line 82, The field
server_mode_ is a plain bool accessed concurrently (read in connect() and
try_accept_connection(), written in enable_server_mode() and stop()) and should
be made thread-safe like running_ and initialized_; change server_mode_ to
std::atomic<bool> (or protect all accesses with the same mutex used for
running_/initialized_), update any direct reads/writes in connect(),
try_accept_connection(), enable_server_mode(), and stop() to use atomic
load/store (or guarded access), and ensure header includes <atomic> so there are
no data races.
| if (!local_endpoint_.is_valid()) { | ||
| #if defined(__cpp_exceptions) || defined(__EXCEPTIONS) | ||
| throw std::invalid_argument("Invalid local endpoint"); | ||
| #endif | ||
| } |
There was a problem hiding this comment.
Silent failure when exceptions are disabled.
When __cpp_exceptions is not defined, an invalid local_endpoint is silently accepted, leaving the transport in an unusable state. Consider returning an error from a factory method or adding an is_valid() check.
💡 Alternative: Add validity check
+ bool is_valid() const { return local_endpoint_.is_valid(); }Or use a static factory method that returns std::optional<EventDrivenUdpTransport> or Result.
🧰 Tools
🪛 Cppcheck (2.20.0)
[information] 28-28: Include file
(missingInclude)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/transport/event_driven_udp_transport.cpp` around lines 27 - 31, The
constructor currently ignores invalid local_endpoint_ when exceptions are
disabled, leaving EventDrivenUdpTransport in an unusable state; modify creation
to perform an explicit validity check and surface failure instead of silently
continuing — e.g., add a static factory method like
EventDrivenUdpTransport::Create(...) that checks local_endpoint_.is_valid() and
returns std::optional<EventDrivenUdpTransport> or a Result/error code, or change
the function that constructs EventDrivenUdpTransport to return a bool/error on
invalid endpoint; ensure all call sites use the new factory/return value and
remove the reliance on throwing within the constructor.
- Make listener_ atomic in both UDP and TCP event-driven transports to prevent data races between set_listener() and callback invocations - Make server_mode_ atomic in TCP transport for thread-safe access - Move running_ = false to start of stop() in TCP transport to avoid inconsistent state window where is_running() returns true during teardown - Guard receive_buffer_.clear() in TCP disconnect handler with queue_mutex_ to prevent concurrent access with on_adapter_receive() - Add null check for allocate_message() in UDP transport to handle memory-constrained platforms gracefully (reports OUT_OF_MEMORY) - Add documentation comments for TCP transport's non-polymorphic setup methods (initialize, enable_server_mode, try_accept_connection) Made-with: Cursor
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/transport/event_driven_udp_transport.h`:
- Around line 60-61: The SD code expects multicast control via
transport::UdpTransport but join_multicast_group/leave_multicast_group are only
on EventDrivenUdpTransport; add a UDP-specific interface (e.g., an abstract
IUdpTransport with virtual Result join_multicast_group(const std::string&) and
leave_multicast_group(const std::string&)) and make both transport::UdpTransport
and EventDrivenUdpTransport implement it, then update callers that currently
downcast ITransport to transport::UdpTransport to instead query/cast to
IUdpTransport and call
IUdpTransport::join_multicast_group/leave_multicast_group; alternatively (if you
prefer caller changes) update the SD callers to detect EventDrivenUdpTransport
as well and invoke its join/leave methods — ensure symbols to change include
join_multicast_group, leave_multicast_group, EventDrivenUdpTransport,
transport::UdpTransport and the SD caller that currently downcasts ITransport.
In `@src/transport/event_driven_tcp_transport.cpp`:
- Around line 213-223: In on_adapter_disconnected(), don't reset initialized_
(which indicates local socket readiness) — remove the line that sets
initialized_ = false; instead model remote peer state separately (e.g., add or
use a member like peer_connected_ / connection_active_ or clear
connection_remote_) and update that flag so connect() / ITransport flows still
see the transport as initialized; ensure receive_buffer_ is cleared and
listener_->on_connection_lost(connection_remote_) is called as before, and
adjust connect(), initialize(), and any logic that checks initialized_ to use
the new peer state flag for peer connectivity semantics.
- Around line 151-157: stop() currently clears the stored listener
(listener_.store(nullptr, ...)) which causes callbacks to be permanently lost
after stop/start; remove the line that nulls listener_ (or stop clearing it) so
the configured listener set via set_listener() remains intact across
stop()/start() cycles, and ensure EventDrivenTcpTransport::start() continues to
rely on the existing listener_ rather than expecting set_listener() to be called
again.
- Around line 110-116: disconnect() races with on_adapter_receive() because
receive_buffer_ is cleared without holding queue_mutex_, risking concurrent
mutation; fix by acquiring queue_mutex_ in EventDrivenTcpTransport::disconnect()
before touching receive_buffer_ (and while unregistering/closing the adapter if
callbacks remain). Specifically, call adapter_.close() or clear the receive
callback, then lock queue_mutex_ and perform receive_buffer_.clear(), and set
initialized_ (initialized_.store(false)) while still holding the lock to ensure
no on_adapter_receive() runs concurrently and mutates the buffer.
- Around line 240-255: The resync currently begins at SOMEIP_HEADER_SIZE and
accepts any non-zero 4-byte word, which can skip a valid header; change the
recovery loop in event_driven_tcp_transport.cpp to scan byte-by-byte starting at
offset 1 (search_start = 1) and for each candidate position validate the full
SOME/IP header (e.g., read the candidate message id and the candidate length
field and ensure the length is between 8 and MAX_MESSAGE_SIZE and the buffer
contains at least that many bytes) before erasing the prefix from buffer; update
the code paths that use length_from_client_id and SOMEIP_HEADER_SIZE so the
resync only commits when both a plausible msg id and a valid length are
confirmed.
In `@src/transport/event_driven_udp_transport.cpp`:
- Around line 121-128: The stop() method is clearing listener_ which causes
on_message_received()/on_error() to be lost after restart because start() never
restores it; update EventDrivenUdpTransport::stop() to not reset or store
nullptr into listener_ (remove the listener_.store(nullptr, ...) call) and rely
on running_ = false and adapter_.set_receive_callback(nullptr) to suppress
post-stop delivery so that a subsequent start() will reuse the previously set
listener (set_listener()) without requiring the caller to re-register it.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: df4b2c3d-9896-4431-a96e-291c2c11e4da
📒 Files selected for processing (4)
include/transport/event_driven_tcp_transport.hinclude/transport/event_driven_udp_transport.hsrc/transport/event_driven_tcp_transport.cppsrc/transport/event_driven_udp_transport.cpp
| if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { | ||
| size_t search_start = SOMEIP_HEADER_SIZE; | ||
| bool found_valid_header = false; | ||
|
|
||
| while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { | ||
| uint32_t potential_msg_id = (static_cast<uint32_t>(buffer[search_start]) << 24) | | ||
| (static_cast<uint32_t>(buffer[search_start + 1]) << 16) | | ||
| (static_cast<uint32_t>(buffer[search_start + 2]) << 8) | | ||
| static_cast<uint32_t>(buffer[search_start + 3]); | ||
| if (potential_msg_id != 0) { | ||
| buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(search_start)); | ||
| found_valid_header = true; | ||
| break; | ||
| } | ||
| search_start++; | ||
| } |
There was a problem hiding this comment.
The resync heuristic skips valid headers after short misalignment.
When the length field is invalid, the recovery loop starts at SOMEIP_HEADER_SIZE and accepts any non-zero 4-byte word as a header. If the stream is off by 1-15 bytes, a real SOME/IP header earlier in the buffer is skipped, and a single malformed byte can make the parser discard later valid frames. Start scanning at the next byte and validate the candidate length field before resyncing.
🛠️ Safer resync sketch
- size_t search_start = SOMEIP_HEADER_SIZE;
+ size_t search_start = 1;
bool found_valid_header = false;
while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) {
- uint32_t potential_msg_id = (static_cast<uint32_t>(buffer[search_start]) << 24) |
- (static_cast<uint32_t>(buffer[search_start + 1]) << 16) |
- (static_cast<uint32_t>(buffer[search_start + 2]) << 8) |
- static_cast<uint32_t>(buffer[search_start + 3]);
- if (potential_msg_id != 0) {
+ uint32_t candidate_length =
+ (static_cast<uint32_t>(buffer[search_start + 4]) << 24) |
+ (static_cast<uint32_t>(buffer[search_start + 5]) << 16) |
+ (static_cast<uint32_t>(buffer[search_start + 6]) << 8) |
+ static_cast<uint32_t>(buffer[search_start + 7]);
+ if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) {
buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(search_start));
found_valid_header = true;
break;
}
- search_start++;
+ ++search_start;
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/transport/event_driven_tcp_transport.cpp` around lines 240 - 255, The
resync currently begins at SOMEIP_HEADER_SIZE and accepts any non-zero 4-byte
word, which can skip a valid header; change the recovery loop in
event_driven_tcp_transport.cpp to scan byte-by-byte starting at offset 1
(search_start = 1) and for each candidate position validate the full SOME/IP
header (e.g., read the candidate message id and the candidate length field and
ensure the length is between 8 and MAX_MESSAGE_SIZE and the buffer contains at
least that many bytes) before erasing the prefix from buffer; update the code
paths that use length_from_client_id and SOMEIP_HEADER_SIZE so the resync only
commits when both a plausible msg id and a valid length are confirmed.
- Preserve listener_ across stop()/start() cycles in both UDP and TCP transports — running_ = false already suppresses post-stop delivery - Guard receive_buffer_.clear() in disconnect() with queue_mutex_ to prevent concurrent access with on_adapter_receive() - Don't reset initialized_ in on_adapter_disconnected() — initialized_ reflects local socket state, not peer connection; allows reconnect without requiring re-initialize() - Fix TCP stream resync heuristic: start scanning at offset 1 (not SOMEIP_HEADER_SIZE) and validate the candidate length field instead of accepting any non-zero service ID Made-with: Cursor
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/transport/event_driven_tcp_transport.cpp`:
- Around line 30-33: The destructor and stop() currently unregister callbacks
and close the adapter while adapter callbacks capture raw this (see
EventDrivenTcpTransport::~EventDrivenTcpTransport and the lambda captures at
lines ~146-149, ~154-173), which can let late events call member handlers after
destruction; fix by routing all adapter callbacks through a ref-counted shared
state or shutdown token instead of capturing this directly: create a
shared_ptr<State> (or shared shutdown token) owned by EventDrivenTcpTransport,
have set_*_callback lambdas capture a weak_ptr<State>, inside each on_adapter_*
lambda lock the weak_ptr and early-return if expired or shutdown, and in
stop()/~EventDrivenTcpTransport set the shutdown flag, reset adapter callbacks,
call adapter->close(), then wait for the state refcount/condition variable to
indicate no in-flight callbacks before returning/destruction (do not rely on
adapter to drain callbacks per tcp_socket_adapter.h contract).
- Around line 241-279: The helper currently returns false after modifying buffer
(when resyncing past invalid headers or after dropping malformed frames), which
stops on_adapter_receive()'s loop and can leave a valid SOME/IP frame behind;
modify the helper to loop internally: wrap the parsing logic so that when you
erase bytes during the resync branch (the search using SOMEIP_HEADER_SIZE,
candidate_length and buffer.erase) you continue the loop to attempt parsing the
next header, and likewise when you detect a complete but malformed frame (after
constructing message_data, calling platform::allocate_message() and
message->deserialize(...) returning false) erase that frame from buffer and
continue parsing instead of returning false; only return false when buffer is
genuinely incomplete (e.g. buffer.size() < total_message_size) or no progress
can be made, and return true immediately when a valid message is allocated and
deserialized.
In `@src/transport/event_driven_udp_transport.cpp`:
- Around line 139-141: The is_running() method currently returns the atomic
running_ via implicit conversion; change it to perform an explicit atomic read
by returning running_.load() so it matches other atomic reads (e.g., places that
use running_.load()) and makes the intent clear; update the
EventDrivenUdpTransport::is_running() function to use running_.load() for
consistency.
- Around line 51-53: The empty conditional checking config_.max_message_size
against data.size() silently allows oversized messages; instead enforce the
limit by returning an error (or failing the send) and logging when data.size() >
config_.max_message_size in the same place where that check occurs (the
send/sendPacket method in event_driven_udp_transport.cpp that references
config_.max_message_size and data.size()); remove the no-op block and replace it
with an early return/failure path that logs a clear message including both
config_.max_message_size and data.size() (or, if you prefer a softer approach,
delete the block and add a TODO comment noting TP segmentation is pending and
that no enforcement is currently performed).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ad2a7186-46a8-4a1f-892d-33df451ffe9e
📒 Files selected for processing (2)
src/transport/event_driven_tcp_transport.cppsrc/transport/event_driven_udp_transport.cpp
| if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) { | ||
| size_t search_start = 1; | ||
| bool found_valid_header = false; | ||
|
|
||
| while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) { | ||
| uint32_t candidate_length = | ||
| (static_cast<uint32_t>(buffer[search_start + 4]) << 24) | | ||
| (static_cast<uint32_t>(buffer[search_start + 5]) << 16) | | ||
| (static_cast<uint32_t>(buffer[search_start + 6]) << 8) | | ||
| static_cast<uint32_t>(buffer[search_start + 7]); | ||
| if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) { | ||
| buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(search_start)); | ||
| found_valid_header = true; | ||
| break; | ||
| } | ||
| ++search_start; | ||
| } | ||
|
|
||
| if (!found_valid_header) { | ||
| buffer.clear(); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| size_t total_message_size = 8 + length_from_client_id; | ||
|
|
||
| if (buffer.size() < total_message_size) { | ||
| return false; | ||
| } | ||
|
|
||
| std::vector<uint8_t> message_data(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size)); | ||
| buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size)); | ||
|
|
||
| message = platform::allocate_message(); | ||
| if (message && message->deserialize(message_data)) { | ||
| return true; | ||
| } | ||
|
|
||
| return false; |
There was a problem hiding this comment.
Recovery paths should keep scanning the current buffer.
After Lines 252-254 and again after Lines 271-279, this helper has already removed bytes from buffer but still returns false. Since on_adapter_receive() stops its while loop on false, a complete valid SOME/IP frame already buffered behind the discarded bytes is deferred until another read and can be lost permanently if no more data arrives. Convert this helper to loop internally and continue after successful resync or after dropping a malformed complete frame.
🛠️ Possible fix
bool EventDrivenTcpTransport::parse_message_from_buffer(std::vector<uint8_t>& buffer, MessagePtr& message) {
- if (buffer.size() > config_.max_receive_buffer) {
- buffer.clear();
- return false;
- }
-
- if (buffer.size() < SOMEIP_HEADER_SIZE) {
- return false;
- }
-
- uint32_t length_from_client_id =
- (static_cast<uint32_t>(buffer[4]) << 24) | (static_cast<uint32_t>(buffer[5]) << 16) |
- (static_cast<uint32_t>(buffer[6]) << 8) | static_cast<uint32_t>(buffer[7]);
-
- if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) {
- size_t search_start = 1;
- bool found_valid_header = false;
-
- while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) {
- uint32_t candidate_length =
- (static_cast<uint32_t>(buffer[search_start + 4]) << 24) |
- (static_cast<uint32_t>(buffer[search_start + 5]) << 16) |
- (static_cast<uint32_t>(buffer[search_start + 6]) << 8) |
- static_cast<uint32_t>(buffer[search_start + 7]);
- if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) {
- buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(search_start));
- found_valid_header = true;
- break;
- }
- ++search_start;
- }
-
- if (!found_valid_header) {
- buffer.clear();
- }
- return false;
- }
-
- size_t total_message_size = 8 + length_from_client_id;
-
- if (buffer.size() < total_message_size) {
- return false;
- }
-
- std::vector<uint8_t> message_data(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size));
- buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size));
-
- message = platform::allocate_message();
- if (message && message->deserialize(message_data)) {
- return true;
- }
-
- return false;
+ for (;;) {
+ if (buffer.size() > config_.max_receive_buffer) {
+ buffer.clear();
+ return false;
+ }
+
+ if (buffer.size() < SOMEIP_HEADER_SIZE) {
+ return false;
+ }
+
+ uint32_t length_from_client_id =
+ (static_cast<uint32_t>(buffer[4]) << 24) | (static_cast<uint32_t>(buffer[5]) << 16) |
+ (static_cast<uint32_t>(buffer[6]) << 8) | static_cast<uint32_t>(buffer[7]);
+
+ if (length_from_client_id < 8 || length_from_client_id > MAX_MESSAGE_SIZE) {
+ size_t search_start = 1;
+ bool found_valid_header = false;
+
+ while (search_start + SOMEIP_HEADER_SIZE <= buffer.size()) {
+ uint32_t candidate_length =
+ (static_cast<uint32_t>(buffer[search_start + 4]) << 24) |
+ (static_cast<uint32_t>(buffer[search_start + 5]) << 16) |
+ (static_cast<uint32_t>(buffer[search_start + 6]) << 8) |
+ static_cast<uint32_t>(buffer[search_start + 7]);
+ if (candidate_length >= 8 && candidate_length <= MAX_MESSAGE_SIZE) {
+ buffer.erase(buffer.begin(),
+ buffer.begin() + static_cast<std::ptrdiff_t>(search_start));
+ found_valid_header = true;
+ break;
+ }
+ ++search_start;
+ }
+
+ if (!found_valid_header) {
+ buffer.clear();
+ return false;
+ }
+
+ continue;
+ }
+
+ size_t total_message_size = 8 + length_from_client_id;
+ if (buffer.size() < total_message_size) {
+ return false;
+ }
+
+ std::vector<uint8_t> message_data(
+ buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size));
+ buffer.erase(buffer.begin(), buffer.begin() + static_cast<std::ptrdiff_t>(total_message_size));
+
+ message = platform::allocate_message();
+ if (message && message->deserialize(message_data)) {
+ return true;
+ }
+
+ message.reset();
+ }
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/transport/event_driven_tcp_transport.cpp` around lines 241 - 279, The
helper currently returns false after modifying buffer (when resyncing past
invalid headers or after dropping malformed frames), which stops
on_adapter_receive()'s loop and can leave a valid SOME/IP frame behind; modify
the helper to loop internally: wrap the parsing logic so that when you erase
bytes during the resync branch (the search using SOMEIP_HEADER_SIZE,
candidate_length and buffer.erase) you continue the loop to attempt parsing the
next header, and likewise when you detect a complete but malformed frame (after
constructing message_data, calling platform::allocate_message() and
message->deserialize(...) returning false) erase that frame from buffer and
continue parsing instead of returning false; only return false when buffer is
genuinely incomplete (e.g. buffer.size() < total_message_size) or no progress
can be made, and return true immediately when a valid message is allocated and
deserialized.
- Document callback quiescence guarantee on adapter interfaces: after set_*_callback(nullptr) returns, no in-flight callbacks may execute - Add running_ guard to on_adapter_disconnected() in TCP transport - Make parse_message_from_buffer() loop internally after resync so valid frames buffered behind discarded bytes are not deferred - Enforce max_message_size limit in UDP send_message() instead of leaving an empty conditional block - Use explicit .load() on atomic reads in is_running() for consistency Made-with: Cursor
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/transport/event_driven_udp_transport.cpp (1)
27-31:⚠️ Potential issue | 🟠 MajorInvalid local endpoint is still silently accepted when exceptions are disabled.
In no-exception builds, construction continues with an invalid
local_endpoint_, and failure is deferred/adapter-dependent. Add an explicit runtime guard before startup so invalid configuration always fails deterministically.🔧 Proposed minimal fix
Result EventDrivenUdpTransport::start() { + if (!local_endpoint_.is_valid()) { + return Result::INVALID_ENDPOINT; + } if (is_running()) { return Result::SUCCESS; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/transport/event_driven_udp_transport.cpp` around lines 27 - 31, The constructor currently throws only when exceptions are enabled, leaving invalid local_endpoint_ accepted in no-exceptions builds; update the code path that runs before startup to deterministically fail when local_endpoint_.is_valid() is false by adding an explicit runtime guard that returns an error (or sets a failed-init state) and logs the problem when exceptions are disabled. Locate the check around local_endpoint_.is_valid() in the EventDrivenUdpTransport constructor/initializer and add a branch for the non-exceptions case that either returns a failure status from the initialization routine (or sets a boolean like initialized_failed_ / init_ok_ and prevents Start()/Run() from proceeding) and emits a clear error message so startup cannot continue with an invalid endpoint. Ensure Start()/Run() (or the transport startup method) respects this failure flag and refuses to start.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/transport/event_driven_tcp_transport.cpp`:
- Around line 241-243: The variable name length_from_client_id is misleading
because it holds the SOME/IP Length field (header bytes 4-7); rename it to
message_length (or length_field) wherever declared and used in
event_driven_tcp_transport.cpp (the variable in the parsing block that shifts
buffer[4..7]) and update all subsequent references so semantics remain identical
and compilation succeeds.
---
Duplicate comments:
In `@src/transport/event_driven_udp_transport.cpp`:
- Around line 27-31: The constructor currently throws only when exceptions are
enabled, leaving invalid local_endpoint_ accepted in no-exceptions builds;
update the code path that runs before startup to deterministically fail when
local_endpoint_.is_valid() is false by adding an explicit runtime guard that
returns an error (or sets a failed-init state) and logs the problem when
exceptions are disabled. Locate the check around local_endpoint_.is_valid() in
the EventDrivenUdpTransport constructor/initializer and add a branch for the
non-exceptions case that either returns a failure status from the initialization
routine (or sets a boolean like initialized_failed_ / init_ok_ and prevents
Start()/Run() from proceeding) and emits a clear error message so startup cannot
continue with an invalid endpoint. Ensure Start()/Run() (or the transport
startup method) respects this failure flag and refuses to start.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: bcebc8e8-9bb6-4c66-9f7c-d25e2a271655
📒 Files selected for processing (4)
include/transport/tcp_socket_adapter.hinclude/transport/udp_socket_adapter.hsrc/transport/event_driven_tcp_transport.cppsrc/transport/event_driven_udp_transport.cpp
…ge_length The variable reads the SOME/IP Length field at header offset 4-7, which is unrelated to Client ID. Rename for clarity. Made-with: Cursor
- Add IMulticastTransport interface so SD code can work with both UdpTransport and EventDrivenUdpTransport via a common abstraction instead of downcasting to concrete UdpTransport - Add is_valid() to EventDrivenUdpTransport for no-exception builds where constructor cannot signal invalid endpoint - Add mutex-guarded accessors to test listener to prevent future flaky tests from direct unlocked access to shared state - Add comprehensive EventDrivenTcpTransport test suite with 15 tests covering lifecycle, send/receive, fragmented messages, server mode, connection callbacks, and listener persistence across stop/start Made-with: Cursor
The leave result is intentionally ignored during shutdown — the transport is about to be stopped regardless. Made-with: Cursor
Implements GitHub #172: callback-based
IUdpSocketAdapter/ITcpSocketAdapterplusEventDrivenUdpTransportandEventDrivenTcpTransportimplementingITransportwithout pulling in BSD sockets. Adds unit tests with a mock UDP adapter.platform/net.h)Endpoint,Message/MessagePtr, andITransportListenertests/test_event_driven_udp_transport.cppcovers lifecycle, send, receive, multicast, and error pathsMade with Cursor
Summary by CodeRabbit
New Features
Tests
Chores