diff --git a/Framework/Core/include/Framework/DataModelViews.h b/Framework/Core/include/Framework/DataModelViews.h index f42ef85ec78e1..8ceae37204cde 100644 --- a/Framework/Core/include/Framework/DataModelViews.h +++ b/Framework/Core/include/Framework/DataModelViews.h @@ -70,7 +70,7 @@ struct count_parts { count += 1; mi += header->splitPayloadParts + 1; } else { - count += header->splitPayloadParts; + count += header->splitPayloadParts ? header->splitPayloadParts : 1; mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2; } } @@ -104,11 +104,11 @@ struct get_pair { } mi += header->splitPayloadParts + 1; } else { - count += header->splitPayloadParts ? header->splitPayloadParts : 1; - if (self.pairId < count) { - return {mi, mi + 2 * diff + 1}; + if (self.pairId == count) { + return {mi, mi + 1}; } - mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2; + count += 1; + mi += 2; } } throw std::runtime_error("Payload not found"); @@ -138,10 +138,10 @@ struct get_dataref_indices { mi += header->splitPayloadParts + 1; } else { if (self.part == count) { - return {mi, mi + 2 * self.subPart + 1}; + return {mi, mi + self.subPart + 1}; } count += 1; - mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2; + mi += 2; } } throw std::runtime_error("Payload not found"); @@ -172,32 +172,41 @@ struct get_payload { }; struct get_num_payloads { - size_t id; - // ends the pipeline, returns the number of parts + size_t n; + // ends the pipeline, returns the number of payloads which are associated + // to the multipart n-th sequence of messages found in the range template requires std::ranges::random_access_range && std::ranges::sized_range friend size_t operator|(R&& r, get_num_payloads self) { size_t count = 0; size_t mi = 0; + // Un while (mi < r.size()) { auto* header = o2::header::get(r[mi]->GetData()); if (!header) { throw std::runtime_error("Not a DataHeader"); } - if (self.id == count) { - if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) { + if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) { + // This is the case for the new multi payload messages where the number of parts + // is as many as the splitPayloadParts number. + if (self.n == count) { return header->splitPayloadParts; - } else { - return 1; } - } - if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) { + // For multipayload we skip all the parts and their associated header count += 1; mi += header->splitPayloadParts + 1; } else { - count += 1; - mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2; + // This is the case of a multipart (header, payload), (header, payload), ... + // sequence where we know how many pairs are there. + // When splitPayloadParts == 0, it means it is a non-multipart (header, payload) + // pair. Each pair has exactly 1 payload. + auto pairs = header->splitPayloadParts ? header->splitPayloadParts : 1; + if (self.n < count + pairs) { + return 1; + } + count += pairs; + mi += 2 * pairs; } } return 0; diff --git a/Framework/Core/test/test_DataRelayer.cxx b/Framework/Core/test/test_DataRelayer.cxx index e5ca7c5d235e5..9ef927d2e4eb6 100644 --- a/Framework/Core/test/test_DataRelayer.cxx +++ b/Framework/Core/test/test_DataRelayer.cxx @@ -26,6 +26,10 @@ #include "Framework/WorkflowSpec.h" #include #include +#include +#include "Framework/FairMQDeviceProxy.h" +#include "Framework/ExpirationHandler.h" +#include "Framework/LifetimeHelpers.h" #include #include #include @@ -808,4 +812,159 @@ TEST_CASE("DataRelayer") } } } + + SECTION("ProcessDanglingInputs") + { + InputSpec spec{"condition", "TST", "COND"}; + std::vector inputs = { + InputRoute{spec, 0, "from_source_to_self", 0}}; + + std::vector infos{1}; + TimesliceIndex index{1, infos}; + ref.registerService(ServiceRegistryHelpers::handleForService(&index)); + + // Bind a fake input channel so FairMQDeviceProxy::getInputChannelIndex works + FairMQDeviceProxy proxy; + std::vector channels{fair::mq::Channel("from_source_to_self")}; + auto findChannel = [&channels](std::string const& name) -> fair::mq::Channel& { + for (auto& ch : channels) { + if (ch.GetName() == name) { + return ch; + } + } + throw std::runtime_error("Channel not found: " + name); + }; + proxy.bind({}, inputs, {}, findChannel, [] { return false; }); + ref.registerService(ServiceRegistryHelpers::handleForService(&proxy)); + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, index, {registry}, -1); + relayer.setPipelineLength(4); + + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + DataHeader dh{"COND", "TST", 0}; + dh.splitPayloadParts = 1; + dh.splitPayloadIndex = 0; + DataProcessingHeader dph{0, 1}; + + ExpirationHandler handler; + handler.name = "test-condition"; + handler.routeIndex = RouteIndex{0}; + handler.lifetime = Lifetime::Condition; + + // Creator: claim an empty slot and assign timeslice 0 to it + handler.creator = [](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot { + auto& index = services.get(); + for (size_t si = 0; si < index.size(); si++) { + TimesliceSlot slot{si}; + if (!index.isValid(slot)) { + index.associate(TimesliceId{0}, slot); + (void)index.setOldestPossibleInput({1}, channelIndex); + return slot; + } + } + return TimesliceSlot{TimesliceSlot::INVALID}; + }; + + // Checker: always trigger expiration + handler.checker = LifetimeHelpers::expireAlways(); + + // Handler: materialise a dummy header+payload into the PartRef + handler.handler = [&transport, &channelAlloc, &dh, &dph](ServiceRegistryRef, PartRef& ref, data_matcher::VariableContext&) { + ref.header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + ref.payload = transport->CreateMessage(4); + }; + + std::vector handlers{handler}; + auto activity = relayer.processDanglingInputs(handlers, {registry}, true); + + REQUIRE(activity.newSlots == 1); + REQUIRE(activity.expiredSlots == 1); + + // The materialised data should now be ready to consume + std::vector ready; + relayer.getReadyToProcess(ready); + REQUIRE(ready.size() == 1); + REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume); + + auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); + REQUIRE(result.size() == 1); + REQUIRE((result.at(0).messages | count_parts{}) == 1); + } + + SECTION("ProcessDanglingInputsSkipsWhenDataPresent") + { + // processDanglingInputs must not overwrite a slot that already has data. + // This is guarded by the (part.messages | get_header{0}) != nullptr check. + InputSpec spec{"condition", "TST", "COND"}; + std::vector inputs = { + InputRoute{spec, 0, "from_source_to_self", 0}}; + + std::vector infos{1}; + TimesliceIndex index{1, infos}; + ref.registerService(ServiceRegistryHelpers::handleForService(&index)); + + FairMQDeviceProxy proxy; + std::vector channels{fair::mq::Channel("from_source_to_self")}; + auto findChannel = [&channels](std::string const& name) -> fair::mq::Channel& { + for (auto& ch : channels) { + if (ch.GetName() == name) { + return ch; + } + } + throw std::runtime_error("Channel not found: " + name); + }; + proxy.bind({}, inputs, {}, findChannel, [] { return false; }); + ref.registerService(ServiceRegistryHelpers::handleForService(&proxy)); + + auto policy = CompletionPolicyHelpers::consumeWhenAny(); + DataRelayer relayer(policy, inputs, index, {registry}, -1); + relayer.setPipelineLength(4); + + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + DataHeader dh{"COND", "TST", 0}; + dh.splitPayloadParts = 1; + dh.splitPayloadIndex = 0; + DataProcessingHeader dph{0, 1}; + + // Build an expiration handler that always tries to expire + ExpirationHandler handler; + handler.name = "test-condition"; + handler.routeIndex = RouteIndex{0}; + handler.lifetime = Lifetime::Condition; + handler.creator = [](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot { + auto& index = services.get(); + for (size_t si = 0; si < index.size(); si++) { + TimesliceSlot slot{si}; + if (!index.isValid(slot)) { + index.associate(TimesliceId{0}, slot); + (void)index.setOldestPossibleInput({1}, channelIndex); + return slot; + } + } + return TimesliceSlot{TimesliceSlot::INVALID}; + }; + handler.checker = LifetimeHelpers::expireAlways(); + int handlerCallCount = 0; + handler.handler = [&transport, &channelAlloc, &dh, &dph, &handlerCallCount](ServiceRegistryRef, PartRef& ref, data_matcher::VariableContext&) { + ref.header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}); + ref.payload = transport->CreateMessage(4); + handlerCallCount++; + }; + std::vector handlers{handler}; + + // First call: slot is empty, so the handler fires and materialises data + auto activity1 = relayer.processDanglingInputs(handlers, {registry}, true); + REQUIRE(activity1.expiredSlots == 1); + REQUIRE(handlerCallCount == 1); + + // Second call: slot already has data — the handler must NOT fire again + auto activity2 = relayer.processDanglingInputs(handlers, {registry}, false); + REQUIRE(activity2.expiredSlots == 0); + REQUIRE(handlerCallCount == 1); // handler was not called a second time + } } diff --git a/Framework/Core/test/test_MessageSet.cxx b/Framework/Core/test/test_MessageSet.cxx index 37f823197ef18..290e55220d6cb 100644 --- a/Framework/Core/test/test_MessageSet.cxx +++ b/Framework/Core/test/test_MessageSet.cxx @@ -12,8 +12,10 @@ #include #include #include "Framework/MessageSet.h" +#include "Framework/DataModelViews.h" #include "Framework/DataProcessingHeader.h" #include "Headers/Stack.h" +#include "Headers/DataHeader.h" #include "MemoryResources/MemoryResources.h" #include @@ -43,6 +45,12 @@ TEST_CASE("MessageSet") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); + // Validate pipe operators match old API + REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); + REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); + REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("MessageSetWithFunction") @@ -68,6 +76,11 @@ TEST_CASE("MessageSetWithFunction") REQUIRE((msgSet.messages | get_pair{0}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{0}).payloadIdx == 1); CHECK_THROWS((msgSet.messages | get_pair{1})); + REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); + REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0)); + REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("MessageSetWithMultipart") @@ -99,6 +112,13 @@ TEST_CASE("MessageSetWithMultipart") REQUIRE((msgSet.messages | get_pair{1}).headerIdx == 0); REQUIRE((msgSet.messages | get_pair{1}).payloadIdx == 2); CHECK_THROWS((msgSet.messages | get_pair{2})); + // Validate pipe operators match old API for multi-payload + REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); + REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); + REQUIRE(&(msgSet.messages | get_payload{0, 1}) == &msgSet.payload(0, 1)); + REQUIRE((msgSet.messages | get_num_payloads{0}) == msgSet.messageMap[0].size); + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); } TEST_CASE("MessageSetAddPartRef") @@ -170,4 +190,234 @@ TEST_CASE("MessageSetAddMultiple") REQUIRE((msgSet.messages | get_pair{2}).payloadIdx == 5); REQUIRE((msgSet.messages | get_pair{3}).headerIdx == 4); REQUIRE((msgSet.messages | get_pair{3}).payloadIdx == 6); + // Validate pipe operators match old API for mixed modes + for (size_t i = 0; i < 3; ++i) { + REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); + REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i, 0)); + } + // Part 2 has a second payload (multi-payload with splitPayloadParts=2, splitPayloadIndex=2) + REQUIRE(&(msgSet.messages | get_payload{2, 1}) == &msgSet.payload(2, 1)); + for (size_t i = 0; i < 3; ++i) { + REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + } + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); +} + +TEST_CASE("GetHeaderPayloadOperators") +{ + // Validates that get_header{part} / get_payload{part, 0} pipe operators on .messages + // correctly replace the removed header(part) / payload(part) methods, + // including access to parts at index > 0. + o2::framework::DataProcessingHeader dph{0, 1}; + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + o2::framework::MessageSet msgSet; + + // Add two separate header-payload pairs + for (size_t part = 0; part < 2; ++part) { + o2::header::DataHeader dh{}; + dh.dataDescription = "CLUSTERS"; + dh.dataOrigin = "TPC"; + dh.subSpecification = part; // use part index as subSpecification to distinguish + dh.splitPayloadParts = 1; + dh.splitPayloadIndex = 0; + std::vector ptrs; + ptrs.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph})); + ptrs.emplace_back(transport->CreateMessage(100 + part * 100)); // 100 and 200 bytes + msgSet.add([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 2); + } + + REQUIRE(msgSet.messages.size() == 4); + + // Validate part 0 + auto& hdr0 = msgSet.messages | get_header{0}; + REQUIRE(hdr0.get() != nullptr); + auto* dh0 = o2::header::get(hdr0->GetData()); + REQUIRE(dh0 != nullptr); + REQUIRE(dh0->subSpecification == 0); + auto& pl0 = msgSet.messages | get_payload{0, 0}; + REQUIRE(pl0.get() != nullptr); + REQUIRE(pl0->GetSize() == 100); + + // Validate part 1 + auto& hdr1 = msgSet.messages | get_header{1}; + REQUIRE(hdr1.get() != nullptr); + auto* dh1 = o2::header::get(hdr1->GetData()); + REQUIRE(dh1 != nullptr); + REQUIRE(dh1->subSpecification == 1); + auto& pl1 = msgSet.messages | get_payload{1, 0}; + REQUIRE(pl1.get() != nullptr); + REQUIRE(pl1->GetSize() == 200); +} + +TEST_CASE("GetHeaderPayloadMultiPayload") +{ + // Validates get_header{part} / get_payload{part, subpart} where both + // part and subpart can be non-zero. + // Layout: + // part 0: standard (1 header + 1 payload) → splitPayloadParts=1 + // part 1: multi-payload (1 header + 3 payloads) → splitPayloadParts=3, splitPayloadIndex=3 + o2::framework::DataProcessingHeader dph{0, 1}; + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + o2::framework::MessageSet msgSet; + + // Part 0: standard header-payload pair + { + o2::header::DataHeader dh{}; + dh.dataDescription = "CLUSTERS"; + dh.dataOrigin = "TPC"; + dh.subSpecification = 0; + dh.splitPayloadParts = 1; + dh.splitPayloadIndex = 0; + std::vector ptrs; + ptrs.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph})); + ptrs.emplace_back(transport->CreateMessage(100)); + msgSet.add([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 2); + } + + // Part 1: one header with 3 payloads (splitPayloadIndex == splitPayloadParts) + { + o2::header::DataHeader dh{}; + dh.dataDescription = "TRACKS"; + dh.dataOrigin = "TPC"; + dh.subSpecification = 1; + dh.splitPayloadParts = 3; + dh.splitPayloadIndex = 3; // signals multi-payload layout + std::vector ptrs; + ptrs.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph})); + ptrs.emplace_back(transport->CreateMessage(200)); + ptrs.emplace_back(transport->CreateMessage(300)); + ptrs.emplace_back(transport->CreateMessage(400)); + msgSet.add([&ptrs](size_t i) -> fair::mq::MessagePtr& { return ptrs[i]; }, 4); + } + + // messages: [hdr0, pl0, hdr1, pl1_0, pl1_1, pl1_2] + REQUIRE(msgSet.messages.size() == 6); + + // Part 0: standard + auto& hdr0 = msgSet.messages | get_header{0}; + REQUIRE(hdr0.get() != nullptr); + auto* dh0 = o2::header::get(hdr0->GetData()); + REQUIRE(dh0->subSpecification == 0); + auto& pl0 = msgSet.messages | get_payload{0, 0}; + REQUIRE(pl0.get() != nullptr); + REQUIRE(pl0->GetSize() == 100); + + // Part 1: multi-payload header + auto& hdr1 = msgSet.messages | get_header{1}; + REQUIRE(hdr1.get() != nullptr); + auto* dh1 = o2::header::get(hdr1->GetData()); + REQUIRE(dh1->subSpecification == 1); + + // get_payload{1, 0} — first payload of part 1 + auto& pl1_0 = msgSet.messages | get_payload{1, 0}; + REQUIRE(pl1_0.get() != nullptr); + REQUIRE(pl1_0->GetSize() == 200); + + // get_payload{1, 1} — second payload of part 1 (nonzero, nonzero) + auto& pl1_1 = msgSet.messages | get_payload{1, 1}; + REQUIRE(pl1_1.get() != nullptr); + REQUIRE(pl1_1->GetSize() == 300); + + // get_payload{1, 2} — third payload of part 1 (nonzero, nonzero) + auto& pl1_2 = msgSet.messages | get_payload{1, 2}; + REQUIRE(pl1_2.get() != nullptr); + REQUIRE(pl1_2->GetSize() == 400); + + // count_payloads should report 4 total (1 from part 0 + 3 from part 1) + REQUIRE((msgSet.messages | count_payloads{}) == 4); + // count_parts should report 2 (one per header) + REQUIRE((msgSet.messages | count_parts{}) == 2); + // get_num_payloads for part 1 should be 3 + REQUIRE((msgSet.messages | get_num_payloads{1}) == 3); + + // Validate pipe operators match old API for multi-payload (header, pl, pl, pl) + REQUIRE(&(msgSet.messages | get_header{0}) == &msgSet.header(0)); + REQUIRE(&(msgSet.messages | get_header{1}) == &msgSet.header(1)); + REQUIRE(&(msgSet.messages | get_payload{0, 0}) == &msgSet.payload(0, 0)); + REQUIRE(&(msgSet.messages | get_payload{1, 0}) == &msgSet.payload(1, 0)); + REQUIRE(&(msgSet.messages | get_payload{1, 1}) == &msgSet.payload(1, 1)); + REQUIRE(&(msgSet.messages | get_payload{1, 2}) == &msgSet.payload(1, 2)); + for (size_t i = 0; i < 2; ++i) { + REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + } + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); +} + +TEST_CASE("TraditionalSplitParts") +{ + // Validates operators with traditional split parts layout: + // 3 (header, payload) pairs where splitPayloadParts=3, splitPayloadIndex=0,1,2 + // This is ONE logical part with 3 subparts. + // Memory layout: [hdr0, pl0, hdr1, pl1, hdr2, pl2] + o2::framework::DataProcessingHeader dph{0, 1}; + auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq"); + auto channelAlloc = o2::pmr::getTransportAllocator(transport.get()); + + o2::framework::MessageSet msgSet; + + for (size_t i = 0; i < 3; ++i) { + o2::header::DataHeader dh{}; + dh.dataDescription = "CLUSTERS"; + dh.dataOrigin = "TPC"; + dh.subSpecification = 0; + dh.splitPayloadParts = 3; + dh.splitPayloadIndex = i; + std::vector ptrs; + ptrs.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph})); + ptrs.emplace_back(transport->CreateMessage(100 * (i + 1))); + msgSet.add([&ptrs](size_t idx) -> fair::mq::MessagePtr& { return ptrs[idx]; }, 2); + } + + REQUIRE(msgSet.messages.size() == 6); + + // count_payloads: 3 traditional split parts = 3 payloads + REQUIRE((msgSet.messages | count_payloads{}) == 3); + // count_parts: one logical entity split into 3 pairs = 3 parts + REQUIRE((msgSet.messages | count_parts{}) == 3); + + // Each traditional split pair is a separate part, matching MessageSet::header(part) semantics + for (size_t i = 0; i < 3; ++i) { + auto& hdr = msgSet.messages | get_header{i}; + REQUIRE(hdr.get() != nullptr); + auto* dh = o2::header::get(hdr->GetData()); + REQUIRE(dh != nullptr); + REQUIRE(dh->splitPayloadIndex == i); + + auto& pl = msgSet.messages | get_payload{i, 0}; + REQUIRE(pl.get() != nullptr); + REQUIRE(pl->GetSize() == 100 * (i + 1)); + } + + // get_dataref_indices: each part maps to its own (header, payload) pair + for (size_t i = 0; i < 3; ++i) { + auto indices = msgSet.messages | get_dataref_indices{i, 0}; + REQUIRE(indices.headerIdx == 2 * i); + REQUIRE(indices.payloadIdx == 2 * i + 1); + } + + // get_pair: same as get_dataref_indices for traditional split + for (size_t i = 0; i < 3; ++i) { + auto indices = msgSet.messages | get_pair{i}; + REQUIRE(indices.headerIdx == 2 * i); + REQUIRE(indices.payloadIdx == 2 * i + 1); + } + + // get_num_payloads: each traditional split pair has 1 payload + for (size_t i = 0; i < 3; ++i) { + REQUIRE((msgSet.messages | get_num_payloads{i}) == msgSet.messageMap[i].size); + } + + // Validate pipe operators match old MessageSet::header()/payload() API + for (size_t i = 0; i < 3; ++i) { + REQUIRE(&(msgSet.messages | get_header{i}) == &msgSet.header(i)); + REQUIRE(&(msgSet.messages | get_payload{i, 0}) == &msgSet.payload(i)); + } + REQUIRE((msgSet.messages | count_parts{}) == msgSet.messageMap.size()); + REQUIRE((msgSet.messages | count_payloads{}) == msgSet.pairMap.size()); }