diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index 3c1f64734c930..8c6ce2abf6a2f 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -352,7 +352,7 @@ "moduleExtensions": { "//bazel:extensions.bzl%non_module_dependencies": { "general": { - "bzlTransitiveDigest": "QjTj1c16NUTHtgapkCb3zvr9rfAZchyhmf6C3j78FBU=", + "bzlTransitiveDigest": "IOUVO55MSpHf1Jcnrk2EElDodXcIMjWJjU+qdqAvxHU=", "usagesDigest": "FEiDyZe9eAU6yEqnarZf0XMEUk+prUyYClvq1RU1J98=", "recordedInputs": [ "REPO_MAPPING:,bazel_tools bazel_tools" @@ -521,9 +521,9 @@ "repoRuleId": "@@bazel_tools//tools/build_defs/repo:http.bzl%http_archive", "attributes": { "build_file": "@@//bazel/thirdparty:seastar.BUILD", - "sha256": "de9da2d60057985c55199459a063d02944a95f1c41ce7f9e09f62a3b9c941465", - "strip_prefix": "seastar-a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7", - "url": "https://github.com/redpanda-data/seastar/archive/a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7.tar.gz" + "sha256": "d043a4e3f979f05d7b69b22d73d8ec1d56465daf4a673c353080e27ee49b6791", + "strip_prefix": "seastar-097536fff982981023c2073e3dc5dc885cfe74be", + "url": "https://github.com/redpanda-data/seastar/archive/097536fff982981023c2073e3dc5dc885cfe74be.tar.gz" } }, "unordered_dense": { diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 82bc18e115ea9..5b2a642bd7007 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -156,13 +156,13 @@ def data_dependency(): url = "https://github.com/redpanda-data/CRoaring/archive/c433d1c70c10fb2e40f049e019e2abbcafa6e69d.tar.gz", ) - # branch: v26.2.x-pre + # branch: v26.2.x (rebased) — https://github.com/redpanda-data/seastar/pull/277 http_archive( name = "seastar", build_file = "//bazel/thirdparty:seastar.BUILD", - sha256 = "de9da2d60057985c55199459a063d02944a95f1c41ce7f9e09f62a3b9c941465", - strip_prefix = "seastar-a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7", - url = "https://github.com/redpanda-data/seastar/archive/a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7.tar.gz", + sha256 = "d043a4e3f979f05d7b69b22d73d8ec1d56465daf4a673c353080e27ee49b6791", + strip_prefix = "seastar-097536fff982981023c2073e3dc5dc885cfe74be", + url = "https://github.com/redpanda-data/seastar/archive/097536fff982981023c2073e3dc5dc885cfe74be.tar.gz", ) http_archive( diff --git a/bazel/thirdparty/seastar.BUILD b/bazel/thirdparty/seastar.BUILD index d6ebc1c4912d5..79076a097a66d 100644 --- a/bazel/thirdparty/seastar.BUILD +++ b/bazel/thirdparty/seastar.BUILD @@ -202,6 +202,7 @@ cc_proto_library( cc_library( name = "seastar", srcs = [ + "include/seastar/core/internal/md5.hh", "include/seastar/net/native-stack.hh", "include/seastar/net/tcp.hh", "include/seastar/net/tcp-stack.hh", @@ -213,6 +214,9 @@ cc_library( "src/core/cgroup.hh", "src/core/condition-variable.cc", "src/core/cpu_profiler.cc", + "src/core/crypto.cc", + "src/core/crypto.hh", + "src/core/crypto_openssl.cc", "src/core/disk_params.cc", "src/core/dpdk_rte.cc", "src/core/exception_hacks.cc", @@ -284,7 +288,6 @@ cc_library( "src/net/native-stack.cc", "src/net/native-stack-impl.hh", "src/net/net.cc", - "src/net/ossl.cc", "src/net/packet.cc", "src/net/posix-stack.cc", "src/net/proxy.cc", @@ -293,6 +296,8 @@ cc_library( "src/net/tcp.cc", "src/net/tls-impl.cc", "src/net/tls-impl.hh", + "src/net/tls_openssl.cc", + "src/net/tls_openssl.hh", "src/net/udp.cc", "src/net/unix_address.cc", "src/net/virtio.cc", @@ -310,6 +315,7 @@ cc_library( "src/util/read_first_line.cc", "src/util/short_streams.cc", "src/util/tmp_file.cc", + "src/websocket/client.cc", "src/websocket/common.cc", "src/websocket/parser.cc", "src/websocket/server.cc", @@ -523,6 +529,7 @@ cc_library( "include/seastar/util/file.hh", "include/seastar/util/function_input_iterator.hh", "include/seastar/util/indirect.hh", + "include/seastar/util/integrated-length.hh", "include/seastar/util/internal/array_map.hh", "include/seastar/util/internal/iovec_utils.hh", "include/seastar/util/internal/magic.hh", @@ -534,6 +541,7 @@ cc_library( "include/seastar/util/log-cli.hh", "include/seastar/util/log-impl.hh", "include/seastar/util/log-level.hh", + "include/seastar/util/memory-data-sink.hh", "include/seastar/util/memory-data-source.hh", "include/seastar/util/memory_diagnostics.hh", "include/seastar/util/noncopyable_function.hh", @@ -554,6 +562,7 @@ cc_library( "include/seastar/util/tuple_utils.hh", "include/seastar/util/used_size.hh", "include/seastar/util/variant_utils.hh", + "include/seastar/websocket/client.hh", "include/seastar/websocket/common.hh", "include/seastar/websocket/parser.hh", "include/seastar/websocket/server.hh", @@ -572,7 +581,7 @@ cc_library( "SEASTAR_API_LEVEL=$(API_LEVEL)", "SEASTAR_HAS_MEMBARRIER", "SEASTAR_SCHEDULING_GROUPS_COUNT=$(SCHEDULING_GROUPS)", - "SEASTAR_USE_OPENSSL", + "SEASTAR_HAVE_OPENSSL", ] + select({ ":use_task_backtrace": ["SEASTAR_TASK_BACKTRACE"], "//conditions:default": [], diff --git a/src/v/bytes/tests/iobuf_tests_mt.cc b/src/v/bytes/tests/iobuf_tests_mt.cc index 2eb78dc9d87c8..9505d77e6851e 100644 --- a/src/v/bytes/tests/iobuf_tests_mt.cc +++ b/src/v/bytes/tests/iobuf_tests_mt.cc @@ -32,7 +32,8 @@ TEST_CORO(iobuf_mt, test_cross_shard_shares) { boost::irange(0ul, parallel_shares), [b = buf.share(0, buf.size_bytes())](auto s) mutable { return ss::smp::submit_to( - s % ss::smp::count, [b = b.share(0, b.size_bytes())]() mutable { + s % ss::this_smp_shard_count(), + [b = b.share(0, b.size_bytes())]() mutable { for (size_t i = 0; i < iterations; i++) { b.share(0, b.size_bytes()); } diff --git a/src/v/cloud_io/cache_service.cc b/src/v/cloud_io/cache_service.cc index 3c3ce48331553..ee4a7525204c2 100644 --- a/src/v/cloud_io/cache_service.cc +++ b/src/v/cloud_io/cache_service.cc @@ -1764,7 +1764,7 @@ ss::future<> cache::do_reserve_space( auto short_term_hydrations_estimate = config::shard_local_cfg().cloud_storage_max_connections() - * ss::smp::count; + * ss::this_smp_shard_count(); // Here we're trying to estimate how much space do we need to // free to allow all TS resources to be used again to download diff --git a/src/v/cloud_io/io_resources.cc b/src/v/cloud_io/io_resources.cc index 60318d04476be..2bafe4ad488f1 100644 --- a/src/v/cloud_io/io_resources.cc +++ b/src/v/cloud_io/io_resources.cc @@ -110,7 +110,7 @@ throughput_limit get_hard_throughput_limit() { auto hard_limit = config::shard_local_cfg() .cloud_storage_max_throughput_per_shard() .value_or(0) - * ss::smp::count; + * ss::this_smp_shard_count(); if (hard_limit == 0) { // Run tiered-storage without throttling by setting @@ -120,7 +120,8 @@ throughput_limit get_hard_throughput_limit() { return { .disk_node_throughput_limit = hard_limit, - .download_shard_throughput_limit = hard_limit / ss::smp::count, + .download_shard_throughput_limit = hard_limit + / ss::this_smp_shard_count(), }; } @@ -128,7 +129,7 @@ throughput_limit get_throughput_limit(std::optional device_throughput) { auto hard_limit = config::shard_local_cfg() .cloud_storage_max_throughput_per_shard() .value_or(0) - * ss::smp::count; + * ss::this_smp_shard_count(); auto percent = config::shard_local_cfg() .cloud_storage_throughput_limit_percent() .value_or(0); @@ -147,7 +148,8 @@ throughput_limit get_throughput_limit(std::optional device_throughput) { // is set we still need to limit network bandwidth even though // the limit is overly high. return throughput_limit{ - .download_shard_throughput_limit = hard_limit / ss::smp::count, + .download_shard_throughput_limit = hard_limit + / ss::this_smp_shard_count(), }; } @@ -156,7 +158,7 @@ throughput_limit get_throughput_limit(std::optional device_throughput) { auto tp = std::min(hard_limit, scaled_device_throughput); return { .disk_node_throughput_limit = tp, - .download_shard_throughput_limit = tp / ss::smp::count, + .download_shard_throughput_limit = tp / ss::this_smp_shard_count(), }; } diff --git a/src/v/cloud_io/tests/cache_test.cc b/src/v/cloud_io/tests/cache_test.cc index 729995ebb1a03..bb12f106708a4 100644 --- a/src/v/cloud_io/tests/cache_test.cc +++ b/src/v/cloud_io/tests/cache_test.cc @@ -514,7 +514,7 @@ FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) { */ FIXTURE_TEST(test_put_enospc_on_non_zero_shard, cache_test_fixture) { vassert( - ss::smp::count >= 2, + ss::this_smp_shard_count() >= 2, "Test requires at least 2 shards to exercise non-shard-0 put path"); BOOST_CHECK_EXCEPTION( @@ -539,7 +539,7 @@ FIXTURE_TEST(test_put_enospc_on_non_zero_shard, cache_test_fixture) { // After ENOSPC, all shards must observe the block-puts flag, otherwise // concurrent puts on other shards would keep racing into a full disk. - for (ss::shard_id s = 0; s < ss::smp::count; ++s) { + for (ss::shard_id s = 0; s < ss::this_smp_shard_count(); ++s) { BOOST_CHECK(get_block_puts(s)); } } diff --git a/src/v/cloud_io/tests/db_s3_imposter_fixture.cc b/src/v/cloud_io/tests/db_s3_imposter_fixture.cc index 345e0ed3f3e44..5327d352f90c9 100644 --- a/src/v/cloud_io/tests/db_s3_imposter_fixture.cc +++ b/src/v/cloud_io/tests/db_s3_imposter_fixture.cc @@ -125,7 +125,7 @@ struct db_s3_imposter_fixture::handler : ss::httpd::handler_base { rep->set_status(ss::http::reply::status_type::bad_request); } - rep->done("xml"); + rep->set_content_type("xml"); co_return std::move(rep); } @@ -528,7 +528,7 @@ ss::future<> db_s3_imposter_fixture::start() { _handlers.push_back(std::move(real)); std::vector fwd_ptrs; - for (unsigned i = 1; i < ss::smp::count; ++i) { + for (unsigned i = 1; i < ss::this_smp_shard_count(); ++i) { auto fwd = std::make_unique(*real_ptr); fwd_ptrs.push_back(fwd.get()); _handlers.push_back(std::move(fwd)); diff --git a/src/v/cloud_storage_clients/client_pool.cc b/src/v/cloud_storage_clients/client_pool.cc index 6b6c613790e84..f647bc75aeb48 100644 --- a/src/v/cloud_storage_clients/client_pool.cc +++ b/src/v/cloud_storage_clients/client_pool.cc @@ -42,7 +42,7 @@ constexpr auto pool_ready_timeout = 15s; [[nodiscard]] std::pair pick_two_random_shards() { using dist_t = std::uniform_int_distribution; - const ss::shard_id n = ss::smp::count; + const ss::shard_id n = ss::this_smp_shard_count(); const ss::shard_id self = ss::this_shard_id(); vassert(n > 1, "At least two shards are required"); @@ -308,7 +308,7 @@ ss::future client_pool::acquire( if (likely(!_idle_clients.empty())) { client = pop_most_recently_used(); } else if ( - ss::smp::count == 1 + ss::this_smp_shard_count() == 1 || _policy == client_pool_overdraft_policy::wait_if_empty || _leased.size() >= _capacity * 2) { // If borrowing is disabled or this shard borrowed '_capacity' diff --git a/src/v/cloud_storage_clients/detail/tests/registry_test.cc b/src/v/cloud_storage_clients/detail/tests/registry_test.cc index 1edd4e3ec7c38..71d969d1edce4 100644 --- a/src/v/cloud_storage_clients/detail/tests/registry_test.cc +++ b/src/v/cloud_storage_clients/detail/tests/registry_test.cc @@ -159,7 +159,7 @@ TEST(UpstreamRegistry, DelayedThrowingStartWithMultipleWaiters) { } TEST(UpstreamRegistry, CrossShardConcurrentGet) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); ss::sharded registry; registry.start(std::ref(test_log), test_registry::no_entry_limit).get(); @@ -174,13 +174,13 @@ TEST(UpstreamRegistry, CrossShardConcurrentGet) { }) .get(); - for (ss::shard_id s{0}; s < ss::smp::count; ++s) { + for (ss::shard_id s{0}; s < ss::this_smp_shard_count(); ++s) { EXPECT_EQ(host_shards[s], s); } } TEST(UpstreamRegistry, CrossShardThrowingStart) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); ss::sharded registry; registry.start(std::ref(test_log), test_registry::no_entry_limit).get(); @@ -294,7 +294,7 @@ TEST(UpstreamRegistry, EvictorLoopEvictsIdleEntries) { } TEST(UpstreamRegistry, EntryLimitEnforced) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); constexpr size_t max_entries = 2; ss::sharded registry; @@ -346,7 +346,7 @@ TEST(UpstreamRegistry, EntryLimitEnforced) { } TEST(UpstreamRegistry, CrossShardEviction) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); constexpr auto evict_threshold = ss::lowres_clock::time_point::max(); ss::sharded registry; @@ -456,7 +456,7 @@ TEST(UpstreamRegistry, StressConcurrentGetAndEvict) { } TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); ss::sharded registry; registry.start(std::ref(test_log), test_registry::no_entry_limit).get(); @@ -482,7 +482,7 @@ TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) { // Verify entries created - coordinator has all keys. auto counts = registry.map([](test_registry& r) { return r.entry_count(); }).get(); - EXPECT_EQ(counts[0], keys_per_shard * ss::smp::count); + EXPECT_EQ(counts[0], keys_per_shard * ss::this_smp_shard_count()); // Evict on peers first (releases coordinator's semaphore units). registry @@ -512,7 +512,7 @@ TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) { } TEST(UpstreamRegistry, StressConcurrentGetAndEvictWorkers) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); ss::sharded registry; registry.start(std::ref(test_log), test_registry::no_entry_limit).get(); @@ -579,7 +579,7 @@ TEST(UpstreamRegistry, StressConcurrentGetAndEvictWorkers) { } TEST(UpstreamRegistry, PrepareStopCallsServicePrepareStop) { - ASSERT_GE(ss::smp::count, 2); + ASSERT_GE(ss::this_smp_shard_count(), 2); ss::sharded registry; registry.start(std::ref(test_log), test_registry::no_entry_limit).get(); diff --git a/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc b/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc index 53d683096de05..9e5cfee9fd645 100644 --- a/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc +++ b/src/v/cloud_storage_clients/tests/client_pool_mt_test.cc @@ -57,7 +57,7 @@ static cloud_storage_clients::s3_configuration client_configuration() { static const client_pool_builder test_pool_builder{client_configuration()}; SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_another_shard) { - BOOST_REQUIRE(ss::smp::count == 2); + BOOST_REQUIRE(ss::this_smp_shard_count() == 2); constexpr size_t num_connections_per_shard = 4; @@ -137,7 +137,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_another_shard) { } SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_this_shard) { - BOOST_REQUIRE(ss::smp::count == 2); + BOOST_REQUIRE(ss::this_smp_shard_count() == 2); constexpr size_t num_connections_per_shard = 4; @@ -190,7 +190,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_this_shard) { } SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) { - BOOST_REQUIRE(ss::smp::count == 2); + BOOST_REQUIRE(ss::this_smp_shard_count() == 2); constexpr size_t num_connections_per_shard = 4; ss::sharded pool; @@ -223,7 +223,9 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) { auto leases_stop = ss::defer([&leases] { leases.stop().get(); }); // Lease all connections from all the shards. - for (size_t i = 0; i < ss::smp::count * num_connections_per_shard; i++) { + for (size_t i = 0; + i < ss::this_smp_shard_count() * num_connections_per_shard; + i++) { leases.local().leases.push_back( pool.local().acquire(test_bucket, leases.local().as).get()); } @@ -291,7 +293,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) { } SEASTAR_THREAD_TEST_CASE(test_client_pool_concurrent_acquire_release) { - BOOST_REQUIRE(ss::smp::count == 2); + BOOST_REQUIRE(ss::this_smp_shard_count() == 2); constexpr size_t num_connections_per_shard = 4; constexpr size_t num_workers_per_shard = 3; diff --git a/src/v/cloud_topics/level_one/maintenance/tests/scheduler_multithread_test.cc b/src/v/cloud_topics/level_one/maintenance/tests/scheduler_multithread_test.cc index 91e956462206f..52054f66f062a 100644 --- a/src/v/cloud_topics/level_one/maintenance/tests/scheduler_multithread_test.cc +++ b/src/v/cloud_topics/level_one/maintenance/tests/scheduler_multithread_test.cc @@ -64,7 +64,8 @@ TEST_F(SchedulerTestFixture, TestSchedulerMultithread) { scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func"); }; auto pause_random_worker_func = [this, &paused_workers]() { - auto random_shard = random_generators::get_int(ss::smp::count - 1); + auto random_shard = random_generators::get_int( + ss::this_smp_shard_count() - 1); // Inserting _before_ the pause_worker() future resolves can help expose // deadlocks. paused_workers.insert(random_shard); diff --git a/src/v/cloud_topics/level_one/maintenance/tests/scheduler_test.cc b/src/v/cloud_topics/level_one/maintenance/tests/scheduler_test.cc index 506f7d96ce20b..922578d1968f9 100644 --- a/src/v/cloud_topics/level_one/maintenance/tests/scheduler_test.cc +++ b/src/v/cloud_topics/level_one/maintenance/tests/scheduler_test.cc @@ -66,7 +66,8 @@ TEST_F(SchedulerTestFixture, TestScheduler) { scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func"); }; auto pause_random_worker_func = [this, &paused_workers]() { - auto random_shard = random_generators::get_int(ss::smp::count - 1); + auto random_shard = random_generators::get_int( + ss::this_smp_shard_count() - 1); // Inserting _before_ the pause_worker() future resolves can help expose // deadlocks. paused_workers.insert(random_shard); diff --git a/src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc b/src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc index 48f3b28312ca4..5852c2434b642 100644 --- a/src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc +++ b/src/v/cloud_topics/level_one/maintenance/tests/worker_manager_test.cc @@ -58,7 +58,7 @@ TEST_F(WorkerManagerTestFixture, PauseAndResumeWorkers) { start_workers(manager).get(); auto stop_manager = ss::defer([&manager] { manager.stop().get(); }); using worker_state = l1::compaction_worker::worker_state; - for (ss::shard_id i = 0; i < ss::smp::count; ++i) { + for (ss::shard_id i = 0; i < ss::this_smp_shard_count(); ++i) { // Workers start in active state ASSERT_EQ(get_worker_state(manager, i).get(), worker_state::active); ASSERT_TRUE(work_fut_has_value(manager, i).get()); diff --git a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc index 5a7ef095ecae4..a96128956ceca 100644 --- a/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc +++ b/src/v/cloud_topics/level_zero/gc/tests/level_zero_gc_mt_test.cc @@ -168,7 +168,7 @@ class mt_epoch_source : public l0::gc::epoch_source { class mt_node_info : public l0::gc::node_info { public: size_t shard_index() const override { return ss::this_shard_id(); } - size_t total_shards() const override { return ss::smp::count; } + size_t total_shards() const override { return ss::this_smp_shard_count(); } }; class safety_monitor_test_impl : public cloud_topics::l0::gc::safety_monitor { @@ -184,7 +184,7 @@ class safety_monitor_test_impl : public cloud_topics::l0::gc::safety_monitor { struct level_zero_gc_mt_test : public seastar_test { ss::future<> SetUpAsync() override { vassert(ss::this_shard_id() == ss::shard_id{0}, "Setup on not shard 0"); - vassert(ss::smp::count > 1, "Too few shards"); + vassert(ss::this_smp_shard_count() > 1, "Too few shards"); // Create shared state on shard 0 g_bucket_state = std::make_unique(); @@ -274,8 +274,8 @@ TEST_F_CORO(level_zero_gc_mt_test, objects_deleted_across_shards) { RPTEST_REQUIRE_EVENTUALLY_CORO( 5s, [this] { return get_total_deleted() == num_objects; }); - EXPECT_EQ(get_shards_that_deleted(), ss::smp::count); - EXPECT_EQ(get_shards_that_listed(), ss::smp::count); + EXPECT_EQ(get_shards_that_deleted(), ss::this_smp_shard_count()); + EXPECT_EQ(get_shards_that_listed(), ss::this_smp_shard_count()); } /* @@ -291,7 +291,7 @@ TEST_F_CORO(level_zero_gc_mt_test, no_objects_no_crash) { co_await ss::sleep(500ms); // Should complete without crashing or trying to delete anything - EXPECT_EQ(get_shards_that_listed(), ss::smp::count) + EXPECT_EQ(get_shards_that_listed(), ss::this_smp_shard_count()) << "No shards attempted to list"; EXPECT_EQ(get_shards_that_deleted(), 0); EXPECT_EQ(get_total_deleted(), 0); @@ -311,7 +311,7 @@ TEST_F_CORO(level_zero_gc_mt_test, no_eligible_epoch) { co_await ss::sleep(500ms); // Objects should not be deleted since there's no eligible epoch - EXPECT_EQ(get_shards_that_listed(), ss::smp::count) + EXPECT_EQ(get_shards_that_listed(), ss::this_smp_shard_count()) << "No shards attempted to list"; EXPECT_EQ(get_shards_that_deleted(), 0); EXPECT_EQ(get_total_deleted(), 0); diff --git a/src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.cc b/src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.cc index 31041f580ed39..d7c7fe9a765ae 100644 --- a/src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.cc +++ b/src/v/cloud_topics/level_zero/read_request_scheduler/read_request_scheduler.cc @@ -37,7 +37,7 @@ ss::shard_id shard_for(const read_request& req) { // The placeholder batch can't span multiple objects so it's safe // to check only the first extent. auto h = hasher(req.query.meta.front().id.name); - auto shard = h % ss::smp::count; + auto shard = h % ss::this_smp_shard_count(); return static_cast(shard); } diff --git a/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc b/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc index 8f74bd5254fb5..50ee24fb27cdb 100644 --- a/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc +++ b/src/v/cloud_topics/level_zero/tests/l0_object_size_distribution_test.cc @@ -193,7 +193,7 @@ TEST_F(L0ObjectSizeDistFixture, ThreeToOne) { * cores and then expects that these are eventually grouped together and * uploaded as a single object by the scheduler/batcher. */ - ASSERT_EQ(seastar::smp::count, 3); + ASSERT_EQ(seastar::this_smp_shard_count(), 3); start(false).get(); const auto timeout = 1s; @@ -202,7 +202,7 @@ TEST_F(L0ObjectSizeDistFixture, ThreeToOne) { // build batches to upload to each core size_t total_size{0}; std::vector> batches; - for (unsigned i = 0; i < seastar::smp::count; ++i) { + for (unsigned i = 0; i < seastar::this_smp_shard_count(); ++i) { batches.emplace_back(); auto buf = model::test::make_random_batches().get(); for (auto& b : buf) { diff --git a/src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc b/src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc index f4685fe842330..0a0f55f163fe6 100644 --- a/src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc +++ b/src/v/cloud_topics/level_zero/write_request_scheduler/tests/write_request_scheduler_test.cc @@ -240,7 +240,7 @@ static auto make_random_batches(size_t num_batches, size_t batch_size) { TEST_F_CORO(write_request_balancer_fixture, time_deadline_test) { // This test produces some batches and expects the time deadline // to trigger the upload. - ASSERT_TRUE_CORO(ss::smp::count > 1); + ASSERT_TRUE_CORO(ss::this_smp_shard_count() > 1); co_await start(false); @@ -265,7 +265,7 @@ TEST_F_CORO(write_request_balancer_fixture, test_core_affinity) { // the unified scheduling policy to trigger the upload. It expects // that write requests to land on one shard using round-robin scheduling. // With round-robin, shard 0 will handle the first upload (ix=0). - ASSERT_TRUE_CORO(ss::smp::count > 1); + ASSERT_TRUE_CORO(ss::this_smp_shard_count() > 1); co_await start(false); @@ -306,7 +306,7 @@ TEST_F_CORO(write_request_balancer_fixture, test_core_affinity) { TEST_F_CORO(write_request_balancer_fixture, data_threshold_test) { // Single shard produces enough data to trigger L0 upload - ASSERT_TRUE_CORO(ss::smp::count > 1); + ASSERT_TRUE_CORO(ss::this_smp_shard_count() > 1); auto size_threshold = config::shard_local_cfg() .cloud_topics_produce_batching_size_threshold.value(); @@ -333,7 +333,7 @@ TEST_F_CORO(write_request_balancer_fixture, test_data_threshold_with_failover) { // alternates between shards. This test verifies that all requests are // eventually acknowledged regardless of which shard handles the upload. - ASSERT_TRUE_CORO(ss::smp::count > 1); + ASSERT_TRUE_CORO(ss::this_smp_shard_count() > 1); auto size_threshold = config::shard_local_cfg() .cloud_topics_produce_batching_size_threshold.value(); @@ -373,7 +373,7 @@ TEST_F_CORO( // uploads. With round-robin scheduling, the target shard is selected based // on the ix counter, not on data volume. The test verifies that failures // are correctly propagated regardless of which shard handles the upload. - ASSERT_TRUE_CORO(ss::smp::count > 1); + ASSERT_TRUE_CORO(ss::this_smp_shard_count() > 1); static constexpr size_t batch_size = 0x1000; co_await start(false); diff --git a/src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc b/src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc index e5371fcdf757c..7837aaab54c59 100644 --- a/src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc +++ b/src/v/cloud_topics/level_zero/write_request_scheduler/write_request_scheduler.cc @@ -368,13 +368,13 @@ ss::future<> write_request_scheduler::start() { // Create scheduler context on shard 0. // The context is shared between all shards. // FixedArrays are sized at construction time. - _shard_zero_context.emplace(ss::smp::count); + _shard_zero_context.emplace(ss::this_smp_shard_count()); // Initialize shards with their backlog size references. // shard_to_group and groups are already default-initialized by // the FixedArray constructor (padded_atomic_group_id defaults to // group_id{0}, shard_group default-constructs with current time). - for (unsigned ix = 0; ix < ss::smp::count; ix++) { + for (unsigned ix = 0; ix < ss::this_smp_shard_count(); ix++) { for (const counter_shard& sc : shard_bytes) { if (sc.shard == ix) { _shard_zero_context->shards[ix] = shard_state( @@ -438,7 +438,7 @@ write_request_scheduler::run_once() { case schedule_action::upload: { // Collect shard info for shards in this group std::vector shard_bytes; - shard_bytes.resize(ss::smp::count); + shard_bytes.resize(ss::this_smp_shard_count()); _context->get_shard_bytes_vec(shard_bytes, result.gid); // Only shutdown exceptions are expected here diff --git a/src/v/cluster/archival/tests/archival_service_fixture.h b/src/v/cluster/archival/tests/archival_service_fixture.h index 9a03cfb6b05f8..1a836cfe3a159 100644 --- a/src/v/cluster/archival/tests/archival_service_fixture.h +++ b/src/v/cluster/archival/tests/archival_service_fixture.h @@ -500,7 +500,8 @@ class archiver_cluster_fixture Value map_reduce(Mapper map, Value initial, Reduce reduce) { Value acc = initial; for (auto& [node_id, value] : apps) { - for (size_t cpu_id = 0; cpu_id < ss::smp::count; cpu_id++) { + for (size_t cpu_id = 0; cpu_id < ss::this_smp_shard_count(); + cpu_id++) { auto res = ss::smp::submit_to(cpu_id, [node_id, &map] { return map(node_id); }).get(); diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 6f67e4643003a..7c44b06d0e579 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -668,12 +668,12 @@ ss::future<> controller::start( conf_invariants.core_count); } - if (conf_invariants.core_count > ss::smp::count) { + if (conf_invariants.core_count > ss::this_smp_shard_count()) { // Successfully starting shard_balancer with reduced core count means // that all partition info from extra kvstores has been copied and we // can finally update the configuration invariants. auto new_invariants = configuration_invariants( - *config::node().node_id(), ss::smp::count); + *config::node().node_id(), ss::this_smp_shard_count()); co_await _storage.local().kvs().put( storage::kvstore::key_space::controller, invariants_key(), @@ -1303,7 +1303,7 @@ controller::validate_configuration_invariants() { "Node id must be set before checking configuration invariants"); auto current = configuration_invariants( - *config::node().node_id(), ss::smp::count); + *config::node().node_id(), ss::this_smp_shard_count()); if (!invariants_buf) { // store configuration invariants diff --git a/src/v/cluster/controller_api.cc b/src/v/cluster/controller_api.cc index 44b5794de2fad..69e4cd525c5c7 100644 --- a/src/v/cluster/controller_api.cc +++ b/src/v/cluster/controller_api.cc @@ -205,7 +205,8 @@ controller_api::get_reconciliation_state(model::ntp ntp) { } // query controller backends for in progress operations ss::chunked_fifo ops; - const auto shards = boost::irange(0, ss::smp::count); + const auto shards = boost::irange( + 0, ss::this_smp_shard_count()); for (auto shard : shards) { auto shard_op = co_await get_current_op(ntp, shard); if (shard_op) { diff --git a/src/v/cluster/controller_backend.h b/src/v/cluster/controller_backend.h index 06bf5ebedc049..09953fe6a5ec0 100644 --- a/src/v/cluster/controller_backend.h +++ b/src/v/cluster/controller_backend.h @@ -242,8 +242,8 @@ class controller_backend void notify_reconciliation(const model::ntp&); /// Copy partition kvstore data from an extra shard (i.e. kvstore shard that - /// is >= ss::smp::count). This method is expected to be called *before* - /// start(). + /// is >= ss::this_smp_shard_count()). This method is expected to be called + /// *before* start(). ss::future<> transfer_partitions_from_extra_shard( storage::kvstore&, shard_placement_table&); diff --git a/src/v/cluster/metadata_dissemination_handler.cc b/src/v/cluster/metadata_dissemination_handler.cc index 22766e8837529..ac61ef84eb40c 100644 --- a/src/v/cluster/metadata_dissemination_handler.cc +++ b/src/v/cluster/metadata_dissemination_handler.cc @@ -50,7 +50,7 @@ metadata_dissemination_handler::do_update_leadership( std::move(leaders), [this](const chunked_vector& leaders) { return ss::parallel_for_each( - boost::irange(0, ss::smp::count), + boost::irange(0, ss::this_smp_shard_count()), [this, &leaders](ss::shard_id shard) { return ss::smp::submit_to(shard, [this, &leaders] { return ss::do_for_each( diff --git a/src/v/cluster/node_status_backend.h b/src/v/cluster/node_status_backend.h index d80aa8abfd98e..2ed125c428367 100644 --- a/src/v/cluster/node_status_backend.h +++ b/src/v/cluster/node_status_backend.h @@ -94,7 +94,7 @@ class node_status_backend { private: ss::shard_id connection_source_shard(model::node_id target) const { - return target % ss::smp::count; + return target % ss::this_smp_shard_count(); } backoff_policy create_backoff_policy() const { diff --git a/src/v/cluster/rpc_utils.cc b/src/v/cluster/rpc_utils.cc index 3075f1f77a198..a909c2f1cf99c 100644 --- a/src/v/cluster/rpc_utils.cc +++ b/src/v/cluster/rpc_utils.cc @@ -42,7 +42,7 @@ model::broker make_self_broker(const config::node_config& node_cfg) { // Calculate memory size const auto shard_mem = ss::memory::stats(); - uint64_t total_mem = shard_mem.total_memory() * ss::smp::count; + uint64_t total_mem = shard_mem.total_memory() * ss::this_smp_shard_count(); // If memory is <1GB, we'll return zero. That case is already // handled when reading this field (e.g. in // `partition_allocator::check_cluster_limits`) because earlier redpanda @@ -69,7 +69,7 @@ model::broker make_self_broker(const config::node_config& node_cfg) { node_cfg.rack, model::broker_properties{ // TODO: populate or remote etc_props, mount_paths - .cores = ss::smp::count, + .cores = ss::this_smp_shard_count(), .available_memory_gb = total_mem_gb, .available_disk_gb = disk_gb, .available_memory_bytes = total_mem, diff --git a/src/v/cluster/security_manager.cc b/src/v/cluster/security_manager.cc index 1f169ee23d403..ff2d92a79eb37 100644 --- a/src/v/cluster/security_manager.cc +++ b/src/v/cluster/security_manager.cc @@ -169,9 +169,9 @@ ss::future security_manager::dispatch_updates_to_cores( using ret_t = std::vector; return ss::do_with( ret_t{}, [cmd = std::move(cmd), &service](ret_t& ret) mutable { - ret.reserve(ss::smp::count); + ret.reserve(ss::this_smp_shard_count()); return ss::parallel_for_each( - boost::irange(0, (int)ss::smp::count), + boost::irange(0, (int)ss::this_smp_shard_count()), [&ret, &cmd, &service](int shard) { return do_apply(shard, copy_cmd(cmd), service) .then([&ret](std::error_code r) { ret.push_back(r); }); diff --git a/src/v/cluster/self_test/diskcheck.cc b/src/v/cluster/self_test/diskcheck.cc index f23ea579f3b0b..e8d21b41240e1 100644 --- a/src/v/cluster/self_test/diskcheck.cc +++ b/src/v/cluster/self_test/diskcheck.cc @@ -281,7 +281,7 @@ ss::future> diskcheck::run(diskcheck_opts opts) { ss::future> diskcheck::run_configured_benchmarks(ss::sstring basename) { auto active_shards = std::min( - _opts.parallelism, ss::smp::count); + _opts.parallelism, ss::this_smp_shard_count()); auto parallelism_per_shard = _opts.parallelism / active_shards; auto remainder = _opts.parallelism % active_shards; diff --git a/src/v/cluster/shard_balancer.cc b/src/v/cluster/shard_balancer.cc index d90712bf9a8f8..ac14d12eeb715 100644 --- a/src/v/cluster/shard_balancer.cc +++ b/src/v/cluster/shard_balancer.cc @@ -69,7 +69,7 @@ shard_balancer::shard_balancer( , _partitions_per_shard(std::move(partitions_per_shard)) , _partitions_reserve_shard0(std::move(partitions_reserve_shard0)) , _balance_timer([this] { balance_timer_callback(); }) - , _total_counts(ss::smp::count, 0) { + , _total_counts(ss::this_smp_shard_count(), 0) { _total_counts.at(0) += 1; // controller partition _debounce_timeout.watch([this] { @@ -123,7 +123,7 @@ ss::future<> shard_balancer::start(size_t kvstore_shard_count) { }); } - if (kvstore_shard_count > ss::smp::count) { + if (kvstore_shard_count > ss::this_smp_shard_count()) { // Check that we can decrease shard count ss::sstring reject_reason; @@ -135,7 +135,8 @@ ss::future<> shard_balancer::start(size_t kvstore_shard_count) { if (!_balancing_on_core_count_change()) { reject_reason = "balancing on core count change is disabled"; } - size_t max_capacity = ss::smp::count * _partitions_per_shard(); + size_t max_capacity = ss::this_smp_shard_count() + * _partitions_per_shard(); max_capacity -= std::min( max_capacity, static_cast(_partitions_reserve_shard0())); if (local_group2ntp.size() > max_capacity) { @@ -152,13 +153,14 @@ ss::future<> shard_balancer::start(size_t kvstore_shard_count) { "Detected decrease in number of cores dedicated to run Redpanda " "from {} to {}, but it is impossible because {}.", kvstore_shard_count, - ss::smp::count, + ss::this_smp_shard_count(), reject_reason)); } } std::vector> extra_kvstores; - for (ss::shard_id s = ss::smp::count; s < kvstore_shard_count; ++s) { + for (ss::shard_id s = ss::this_smp_shard_count(); s < kvstore_shard_count; + ++s) { extra_kvstores.push_back(co_await _storage.make_extra_kvstore(s)); } @@ -169,7 +171,7 @@ ss::future<> shard_balancer::start(size_t kvstore_shard_count) { extra_kvstores, [](auto& kvs) { return kvs->stop(); }); }); - if (kvstore_shard_count > ss::smp::count) { + if (kvstore_shard_count > ss::this_smp_shard_count()) { // Now that all partition info is copied from extra kvstores, we can // remove them. co_await _storage.log_mgr().remove_orphan_files( @@ -177,7 +179,8 @@ ss::future<> shard_balancer::start(size_t kvstore_shard_count) { {model::redpanda_ns}, [](model::ntp ntp, storage::partition_path::metadata) { return ntp.tp.topic == model::kvstore_topic - && ntp.tp.partition() >= static_cast(ss::smp::count); + && ntp.tp.partition() + >= static_cast(ss::this_smp_shard_count()); }); } @@ -278,7 +281,7 @@ ss::future<> shard_balancer::init_shard_placement( _balancing_on_core_count_change() && _features.is_active(features::feature::node_local_core_assignment)) { co_await balance_on_core_count_change( - lock, ss::smp::count + extra_kvstores.size()); + lock, ss::this_smp_shard_count() + extra_kvstores.size()); } // 4. Move partition info from extra kvstores @@ -328,7 +331,7 @@ shard_balancer::reassign_shard(model::ntp ntp, ss::shard_id shard) { auto lock = co_await _mtx.get_units(); - if (shard >= ss::smp::count) { + if (shard >= ss::this_smp_shard_count()) { co_return errc::invalid_request; } auto replicas_view = _topics.local().get_replicas_view(ntp); @@ -530,8 +533,8 @@ ss::future<> shard_balancer::balance_on_core_count_change( // the first time, and this is a good time to rebalance as well. if ( - last_rebalance_core_count == ss::smp::count - && kvstore_shard_count == ss::smp::count) { + last_rebalance_core_count == ss::this_smp_shard_count() + && kvstore_shard_count == ss::this_smp_shard_count()) { co_return; } @@ -601,7 +604,7 @@ ss::future<> shard_balancer::do_balance(ssx::mutex::units& lock) { state_kvstore_key(), serde::to_iobuf( persisted_state{ - .last_rebalance_core_count = ss::smp::count, + .last_rebalance_core_count = ss::this_smp_shard_count(), })); } @@ -673,7 +676,8 @@ ss::shard_id shard_balancer::choose_shard( return score; }; optimize_level( - std::views::iota(ss::shard_id(0), ss::shard_id(ss::smp::count)), + std::views::iota( + ss::shard_id(0), ss::shard_id(ss::this_smp_shard_count())), topic_count_score); auto total_count_score = [&](ss::shard_id shard) { @@ -702,18 +706,18 @@ void shard_balancer::update_counts( topic_data_t& topic_data, const std::optional& prev, const std::optional& next) { - // Shard values that are >= ss::smp::count are possible when initializing - // shard placement after a core count decrease. We ignore them because - // partition counts on extra shards are not needed for balancing. + // Shard values that are >= ss::this_smp_shard_count() are possible when + // initializing shard placement after a core count decrease. We ignore them + // because partition counts on extra shards are not needed for balancing. - if (prev && prev->shard < ss::smp::count) { + if (prev && prev->shard < ss::this_smp_shard_count()) { topic_data.shard2count.at(prev->shard) -= 1; topic_data.total_count -= 1; // TODO: check negative values _total_counts.at(prev->shard) -= 1; } - if (next && next->shard < ss::smp::count) { + if (next && next->shard < ss::this_smp_shard_count()) { topic_data.shard2count.at(next->shard) += 1; topic_data.total_count += 1; _total_counts.at(next->shard) += 1; diff --git a/src/v/cluster/shard_balancer.h b/src/v/cluster/shard_balancer.h index 9e28eba4a91f7..62dd43e7c8364 100644 --- a/src/v/cluster/shard_balancer.h +++ b/src/v/cluster/shard_balancer.h @@ -91,7 +91,7 @@ class shard_balancer { using shard2count_t = std::vector; struct topic_data_t { explicit topic_data_t() - : shard2count(ss::smp::count, 0) {} + : shard2count(ss::this_smp_shard_count(), 0) {} int32_t total_count = 0; shard2count_t shard2count; diff --git a/src/v/cluster/shard_placement_table.cc b/src/v/cluster/shard_placement_table.cc index 0ab9a3eba31fc..342272bb47399 100644 --- a/src/v/cluster/shard_placement_table.cc +++ b/src/v/cluster/shard_placement_table.cc @@ -436,7 +436,7 @@ shard_placement_table::initialize_from_kvstore( for (size_t i = 0; i < extra_kvstores.size(); ++i) { extra_spts.push_back( std::make_unique( - ss::smp::count + i, *extra_kvstores[i])); + ss::this_smp_shard_count() + i, *extra_kvstores[i])); } // 1. gather kvstore markers from all shards @@ -629,14 +629,14 @@ ss::future<> shard_placement_table::scatter_init_data( if (_shard == init_data.hosted.shard) { if ( init_data.receiving.shard - && init_data.receiving.shard < ss::smp::count) { + && init_data.receiving.shard < ss::this_smp_shard_count()) { state._next = placement_state::versioned_shard{ .shard = init_data.receiving.shard.value(), .revision = init_data.receiving.revision}; } } else if ( _shard != init_data.receiving.shard || !init_data.hosted.shard - || _shard >= ss::smp::count) { + || _shard >= ss::this_smp_shard_count()) { state.set_hosted_status(hosted_status::obsolete, *_probe); } } @@ -652,7 +652,7 @@ ss::future<> shard_placement_table::scatter_init_data( state._is_initial_for = init_data.log_revision; } - if (_shard >= ss::smp::count) { + if (_shard >= ss::this_smp_shard_count()) { // mark states on extra shards as ready to transfer state.set_assigned(std::nullopt, *_probe); } @@ -790,7 +790,10 @@ ss::future<> shard_placement_table::set_target( if (target) { vassert( - target->shard < ss::smp::count, "[{}] bad target: {}", ntp, target); + target->shard < ss::this_smp_shard_count(), + "[{}] bad target: {}", + ntp, + target); } // ensure that there is no concurrent enable_persistence() call @@ -859,7 +862,7 @@ ss::future<> shard_placement_table::set_target( assignment_kvstore_key(target->group), std::move(marker_buf)); }); - } else if (prev_target.value().shard < ss::smp::count) { + } else if (prev_target.value().shard < ss::this_smp_shard_count()) { co_await container().invoke_on( prev_target.value().shard, [group = prev_target->group, &ntp](shard_placement_table& other) { @@ -908,7 +911,7 @@ ss::future<> shard_placement_table::set_target( } if ( - prev_target && prev_target->shard < ss::smp::count + prev_target && prev_target->shard < ss::this_smp_shard_count() && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, @@ -939,7 +942,8 @@ ss::future<> shard_placement_table::set_target( // 3. Lastly, remove obsolete kvstore marker if ( - _persistence_enabled && prev_target && prev_target->shard < ss::smp::count + _persistence_enabled && prev_target + && prev_target->shard < ss::this_smp_shard_count() && (!target || target->shard != prev_target->shard)) { co_await container().invoke_on( prev_target->shard, diff --git a/src/v/cluster/tests/shard_placement_table_test.cc b/src/v/cluster/tests/shard_placement_table_test.cc index 2bd8ae4c66be1..739714cac617a 100644 --- a/src/v/cluster/tests/shard_placement_table_test.cc +++ b/src/v/cluster/tests/shard_placement_table_test.cc @@ -709,7 +709,7 @@ class reconciliation_backend // Limit concurrency to 4 so that there are more interesting repeats in randomly // generated shard ids. ss::shard_id get_max_shard_id() { - return std::min(ss::smp::count - 1, ss::shard_id(3)); + return std::min(ss::this_smp_shard_count() - 1, ss::shard_id(3)); } /// Simplified version of shard_balancer that just assigns ntps to random diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index ff8899cf4d5ae..1a66f98c6ea03 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -561,7 +561,7 @@ template ss::future topic_updates_dispatcher::dispatch_updates_to_cores(Cmd cmd, model::offset o) { auto results = co_await ssx::parallel_transform( - boost::irange(0, ss::smp::count), + boost::irange(0, ss::this_smp_shard_count()), [this, cmd = std::move(cmd), o](ss::shard_id shard) mutable { return do_apply(shard, cmd, _topic_table, o); }); diff --git a/src/v/debug_bundle/tests/debug_bundle_service_test.cc b/src/v/debug_bundle/tests/debug_bundle_service_test.cc index 1b47f31f540f9..9f979be2e4b19 100644 --- a/src/v/debug_bundle/tests/debug_bundle_service_test.cc +++ b/src/v/debug_bundle/tests/debug_bundle_service_test.cc @@ -421,7 +421,7 @@ TEST_F_CORO(debug_bundle_service_started_fixture, try_running_multiple) { ASSERT_TRUE_CORO(res.has_value()) << res.assume_error().message(); auto res2 = co_await _service.invoke_on( - (debug_bundle::service_shard + 1) % ss::smp::count, + (debug_bundle::service_shard + 1) % ss::this_smp_shard_count(), [](debug_bundle::service& s) { return s.initiate_rpk_debug_bundle_collection( debug_bundle::job_id_t(uuid_t::create()), {}); diff --git a/src/v/http/tests/utils.cc b/src/v/http/tests/utils.cc index 13a4352fe97ac..fc6b2a410333c 100644 --- a/src/v/http/tests/utils.cc +++ b/src/v/http/tests/utils.cc @@ -45,16 +45,15 @@ ss::future> flexible_function_handler::handle( .then([this](std::unique_ptr rep) { if ( _content_type_overrides.contains(rep->get_header("Content-Type"))) { - rep->done(); + // content type already set via override; nothing to do } else if (_content_type == "xml") { // Because `application/xml` is not implemented as a mapping // in `http/mime_types.cc`, in order to construct a reply with // the `Content-Type` header set to `application/xml`, we // need to hard code a path here. rep->set_content_type("application/xml"); - rep->done(); } else { - rep->done(_content_type); + rep->set_content_type(_content_type); } return ss::make_ready_future>( std::move(rep)); diff --git a/src/v/kafka/server/datalake_throttle_manager.cc b/src/v/kafka/server/datalake_throttle_manager.cc index 4b296d4319434..862ae5b634a51 100644 --- a/src/v/kafka/server/datalake_throttle_manager.cc +++ b/src/v/kafka/server/datalake_throttle_manager.cc @@ -157,7 +157,7 @@ ss::future<> datalake_throttle_manager::gc_and_update_global_producers_map() { * total backlog */ auto shard_local_maps = co_await ssx::parallel_transform( - boost::irange(ss::smp::count), [this](auto shard_id) { + boost::irange(ss::this_smp_shard_count()), [this](auto shard_id) { return container().invoke_on( shard_id, [status = _translation_status, disk_space_info = _disk_space_info]( diff --git a/src/v/kafka/server/fetch_session_cache.h b/src/v/kafka/server/fetch_session_cache.h index 1cc84b54a3eca..5841f059d2420 100644 --- a/src/v/kafka/server/fetch_session_cache.h +++ b/src/v/kafka/server/fetch_session_cache.h @@ -48,7 +48,8 @@ class fetch_session_cache { // held in a cache on single core is limitted by the memory usage. size_t max_sessions_per_core() { static const size_t v - = std::numeric_limits::max() / ss::smp::count; + = std::numeric_limits::max() + / ss::this_smp_shard_count(); return v; } diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 4b0b0e1557d3a..0c040b30ba6cb 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -893,7 +893,7 @@ class fetch_worker { class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { public: nonpolling_fetch_plan_executor() - : _last_result_size(ss::smp::count, 0) + : _last_result_size(ss::this_smp_shard_count(), 0) , _fetch_timeout{[this] { _has_progress.signal(); }} {} /** @@ -948,9 +948,9 @@ class nonpolling_fetch_plan_executor final : public fetch_plan_executor::impl { // start fetching from a random shard to make sure that we fetch data // from all the partitions even if we reach fetch message size limit const ss::shard_id start_shard_idx = random_generators::get_int( - ss::smp::count - 1); - for (size_t i = 0; i < ss::smp::count; ++i) { - auto shard = (start_shard_idx + i) % ss::smp::count; + ss::this_smp_shard_count() - 1); + for (size_t i = 0; i < ss::this_smp_shard_count(); ++i) { + auto shard = (start_shard_idx + i) % ss::this_smp_shard_count(); ssx::spawn_with_gate(_workers_gate, [&]() mutable { return handle_exceptions(start_shard_fetch_worker( @@ -1228,7 +1228,7 @@ void for_each_fetch_partition(const op_context& octx, const Func& f) { class simple_fetch_planner final : public fetch_planner::impl { fetch_plan create_plan(op_context& octx) final { - fetch_plan plan(ss::smp::count); + fetch_plan plan(ss::this_smp_shard_count()); auto resp_it = octx.response_begin(); auto bytes_left_in_plan = octx.bytes_left; diff --git a/src/v/kafka/server/snc_quota_manager.cc b/src/v/kafka/server/snc_quota_manager.cc index 7bdbd030253fb..be3236611c0cd 100644 --- a/src/v/kafka/server/snc_quota_manager.cc +++ b/src/v/kafka/server/snc_quota_manager.cc @@ -62,7 +62,7 @@ void snc_quotas_probe::setup_metrics() { "Currently effective quota, in bytes/s"); static constexpr auto calc_quota = [](const std::optional& q) { if (q.has_value()) { - return q.value() / ss::smp::count; + return q.value() / ss::this_smp_shard_count(); } constexpr int64_t max_without_conversion_error = std::numeric_limits::max() / 1024 * 1024; diff --git a/src/v/kafka/server/tests/datalake_throttle_manager_test.cc b/src/v/kafka/server/tests/datalake_throttle_manager_test.cc index c382f19f651a6..54ac602a9a07c 100644 --- a/src/v/kafka/server/tests/datalake_throttle_manager_test.cc +++ b/src/v/kafka/server/tests/datalake_throttle_manager_test.cc @@ -85,7 +85,7 @@ ss::future<> mark_producers_on_random_shards( ss::sharded& mgr, int producer_cnt) { for (auto i : std::ranges::views::iota(0, producer_cnt)) { auto shard = random_generators::get_int( - 0, ss::smp::count - 1); + 0, ss::this_smp_shard_count() - 1); co_await mgr.invoke_on( shard, [i](kafka::datalake_throttle_manager& local_mgr) { local_mgr.mark_datalake_producer(fmt::format("producer-{}", i)); @@ -155,7 +155,8 @@ TEST_F(IcebergThrottlingManagerTest, TestProducerEviction) { TEST_F(IcebergThrottlingManagerTest, TestHandlingAnonymousProducers) { // mark anonymous producer on random shard - auto shard = random_generators::get_int(0, ss::smp::count - 1); + auto shard = random_generators::get_int( + 0, ss::this_smp_shard_count() - 1); manager .invoke_on( shard, diff --git a/src/v/kafka/server/tests/fetch_bench.cc b/src/v/kafka/server/tests/fetch_bench.cc index 77f1b9da3c60a..b8ee163ea6fbf 100644 --- a/src/v/kafka/server/tests/fetch_bench.cc +++ b/src/v/kafka/server/tests/fetch_bench.cc @@ -306,7 +306,7 @@ struct fetch_bench_fixture : redpanda_thread_fixture { // Creates a topic with two partitions. One on shard 0 the other on // shard 1. ss::future initialize_multi_partition_topic() { - vassert(ss::smp::count >= 2, "requires at least 2 shards"); + vassert(ss::this_smp_shard_count() >= 2, "requires at least 2 shards"); auto t = co_await create_topic({ model::broker_shard{this_node(), 0}, diff --git a/src/v/kafka/server/tests/fetch_memory_units_test.cc b/src/v/kafka/server/tests/fetch_memory_units_test.cc index 4b7671dce291c..60c6ef36e1f72 100644 --- a/src/v/kafka/server/tests/fetch_memory_units_test.cc +++ b/src/v/kafka/server/tests/fetch_memory_units_test.cc @@ -105,7 +105,7 @@ class fetch_memory_units_test_fixture : public seastar_test { }; TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) { - EXPECT_GE(ss::smp::count, 2); + EXPECT_GE(ss::this_smp_shard_count(), 2); const auto max_release_size = kafka::fetch_memory_units_manager::max_release_size; @@ -115,7 +115,8 @@ TEST_F_CORO(fetch_memory_units_test_fixture, test_cross_shard_free) { co_await set_kafka_units(max_release_size); co_await set_fetch_units(max_release_size); - auto other_shard_id = (ss::this_shard_id() + 1) % ss::smp::count; + auto other_shard_id = (ss::this_shard_id() + 1) + % ss::this_smp_shard_count(); auto other_kafka_sem_avail = [&] { return sharded_kafka_sem().invoke_on(other_shard_id, [](auto& sem_sev) { diff --git a/src/v/kafka/server/tests/produce_consume_test.cc b/src/v/kafka/server/tests/produce_consume_test.cc index 17137eb5d8485..a3cb0790a9bd6 100644 --- a/src/v/kafka/server/tests/produce_consume_test.cc +++ b/src/v/kafka/server/tests/produce_consume_test.cc @@ -307,7 +307,7 @@ template auto map_reduce_thread_per_core( Mapper&& mapper, Initial&& initial, Reduce&& reduce) { return ss::map_reduce( - boost::irange(0u, ss::smp::count), + boost::irange(0u, ss::this_smp_shard_count()), [&mapper](auto shard) { return async_submit_to(shard, std::forward(mapper)); }, @@ -319,7 +319,7 @@ template auto transform_reduce_thread_per_core( Mapper&& mapper, Initial&& initial, Reduce&& reduce) { return transform_reduce( - boost::irange(0u, ss::smp::count), + boost::irange(0u, ss::this_smp_shard_count()), [&mapper](auto shard) { return async_submit_to(shard, std::forward(mapper)).get(); }, @@ -469,7 +469,7 @@ struct throughput_limits_fixure : prod_consume_fixture { constexpr auto min_rate = std::min(rate_limit_in, rate_limit_out); const double expected_max_throttle = (double(batch_size) / min_rate) - * (policy == execution::seq ? 1 : ss::smp::count) + * (policy == execution::seq ? 1 : ss::this_smp_shard_count()) * (honour_throttle ? 1 : 2); const size_t tolerance_percent = 8; config_set( @@ -482,7 +482,7 @@ struct throughput_limits_fixure : prod_consume_fixture { config_set("max_kafka_throttle_delay_ms", 30'000ms); wait_for_controller_leadership().get(); - start(ss::smp::count); + start(ss::this_smp_shard_count()); // PRODUCE smaller batches for 5s per client const auto batches_cnt = (5s).count() * rate_limit_in @@ -536,7 +536,8 @@ struct throughput_limits_fixure : prod_consume_fixture { policy, [&](auto i) { const model::partition_id p_id(i); - const auto data_cap = (kafka_in_data_len / ss::smp::count) + const auto data_cap = (kafka_in_data_len + / ss::this_smp_shard_count()) - batch_size * 2; return do_consume(i, data_cap, honour_throttle); }, diff --git a/src/v/kafka/server/tests/quota_manager_bench.cc b/src/v/kafka/server/tests/quota_manager_bench.cc index 9b6ce5b3790d1..c7f0e07407a34 100644 --- a/src/v/kafka/server/tests/quota_manager_bench.cc +++ b/src/v/kafka/server/tests/quota_manager_bench.cc @@ -88,7 +88,8 @@ struct throughput_test_case { }; future run_tc(throughput_test_case tc) { - co_await test_quota_manager(total_requests / ss::smp::count, tc.use_unique); + co_await test_quota_manager( + total_requests / ss::this_smp_shard_count(), tc.use_unique); co_return total_requests; } @@ -163,7 +164,8 @@ future run_latency_test(latency_test_case tc) { unsigned shard = tc.on_shard_0 ? 0 : 1; BOOST_ASSERT_MSG( - shard < ss::smp::count, "Not enough cores available for the benchmark"); + shard < ss::this_smp_shard_count(), + "Not enough cores available for the benchmark"); co_await ss::smp::submit_to( shard, [&sqm, "a_store, tc](this auto) -> ss::future<> { diff --git a/src/v/net/conn_quota.cc b/src/v/net/conn_quota.cc index d5d6b54750716..3e7a8039d4353 100644 --- a/src/v/net/conn_quota.cc +++ b/src/v/net/conn_quota.cc @@ -206,7 +206,7 @@ ss::shard_id conn_quota::addr_to_shard(ss::net::inet_address addr) const { return total_shard; } else { uint32_t hash = xxhash_32((char*)(addr.data()), addr.size()); - return hash % ss::smp::count; + return hash % ss::this_smp_shard_count(); } } @@ -419,7 +419,7 @@ bool conn_quota::try_get_units(home_allowance& allowance) { bool conn_quota::should_leave_reclaim(home_allowance& allowance) { return allowance.reclaim // Must be enough tokens for it to be worth borrowing any - && allowance.max > ss::smp::count + && allowance.max > ss::this_smp_shard_count() // Must have at least half its tokens free && allowance.available > allowance.max / 2 // Must not be in the middle of starting a reclaim diff --git a/src/v/net/tests/conn_quota_test.cc b/src/v/net/tests/conn_quota_test.cc index 5f90c84416f0b..60525d75f1167 100644 --- a/src/v/net/tests/conn_quota_test.cc +++ b/src/v/net/tests/conn_quota_test.cc @@ -79,7 +79,7 @@ struct conn_quota_fixture { } void drop_shard_units() { - for (ss::shard_id i = 0; i < ss::smp::count; ++i) { + for (ss::shard_id i = 0; i < ss::this_smp_shard_count(); ++i) { scq.invoke_on(i, [i, this](conn_quota&) { shard_units.erase(i); }) .get(); } @@ -131,7 +131,7 @@ struct conn_quota_fixture { * Helper for acquiring units on all the shards at once. */ void take_on_all(uint32_t take_units, ss::net::inet_address addr = addr1) { - for (ss::shard_id i = 0; i < ss::smp::count; ++i) { + for (ss::shard_id i = 0; i < ss::this_smp_shard_count(); ++i) { take_on_shard(i, addr, take_units); } } @@ -290,7 +290,7 @@ void conn_quota_fixture::test_borrows( } FIXTURE_TEST(test_total_borrows, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); // This test needs at least a few cores. If you run it with -c 1 it // won't work. We're testing sharded logic so we really do need multiple @@ -304,13 +304,13 @@ FIXTURE_TEST(test_total_borrows, conn_quota_fixture) { * Variant of test_total_borrows that stresses the per-IP limit instead */ FIXTURE_TEST(test_per_ip_borrows, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); BOOST_REQUIRE(core_count >= 4); test_borrows(core_count, 2, std::nullopt, core_count * 2); } FIXTURE_TEST(test_change_limits, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); uint32_t initial_limit = core_count * 3; start(initial_limit, std::nullopt); @@ -364,7 +364,7 @@ FIXTURE_TEST(test_change_limits, conn_quota_fixture) { } FIXTURE_TEST(test_decrease_limit, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); uint32_t initial_limit = core_count * 3; start(initial_limit, std::nullopt); @@ -397,7 +397,7 @@ FIXTURE_TEST(test_decrease_limit, conn_quota_fixture) { } FIXTURE_TEST(test_change_limits_per_ip, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); uint32_t initial_limit = core_count * 3; start(std::nullopt, initial_limit); @@ -437,7 +437,7 @@ FIXTURE_TEST(test_change_limits_per_ip, conn_quota_fixture) { * limit for a particular IP vs. the general per-IP limit */ FIXTURE_TEST(test_overrides, conn_quota_fixture) { - auto core_count = ss::smp::count; + auto core_count = ss::this_smp_shard_count(); uint32_t general_limit = core_count * 2; vlog(logger.info, "Constructing with overrides"); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 88af56f99b386..035cf26bc6c7f 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -45,19 +45,19 @@ ss::shard_id shard_for(const context_subject& sub) { auto hasher = incremental_xxhash64{}; hasher.update(sub.ctx()); hasher.update(sub.sub()); - return jump_consistent_hash(hasher.digest(), ss::smp::count); + return jump_consistent_hash(hasher.digest(), ss::this_smp_shard_count()); } ss::shard_id shard_for(const context_schema_id& id) { auto hasher = incremental_xxhash64{}; hasher.update(id.ctx()); hasher.update(id.id()); - return jump_consistent_hash(hasher.digest(), ss::smp::count); + return jump_consistent_hash(hasher.digest(), ss::this_smp_shard_count()); } ss::shard_id shard_for(const context& ctx) { auto hash = xxhash_64(ctx().data(), ctx().length()); - return jump_consistent_hash(hash, ss::smp::count); + return jump_consistent_hash(hash, ss::this_smp_shard_count()); } compatibility_result check_compatible( diff --git a/src/v/pandaproxy/schema_registry/test/mt_sharded_store.cc b/src/v/pandaproxy/schema_registry/test/mt_sharded_store.cc index 66307cd08a21d..c2bacdbd61fe1 100644 --- a/src/v/pandaproxy/schema_registry/test/mt_sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/test/mt_sharded_store.cc @@ -68,13 +68,14 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_cross_shard_def) { ss::parallel_for_each( boost::irange(0, num_parallel_requests), [&store, i](auto shrd) { - return ss::smp::submit_to(shrd % ss::smp::count, [&store, i]() { - return store - .get_schema_definition( - pps::context_schema_id{ - pps::default_context, pps::schema_id{i}}) - .discard_result(); - }); + return ss::smp::submit_to( + shrd % ss::this_smp_shard_count(), [&store, i]() { + return store + .get_schema_definition( + pps::context_schema_id{ + pps::default_context, pps::schema_id{i}}) + .discard_result(); + }); }) .get(); } diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h index c4342cdb8e74b..e3f93bf230d6b 100644 --- a/src/v/pandaproxy/server.h +++ b/src/v/pandaproxy/server.h @@ -46,7 +46,7 @@ class server_probe; inline ss::shard_id user_shard(const ss::sstring& name) { auto hash = xxhash_64(name.data(), name.length()); - return jump_consistent_hash(hash, ss::smp::count); + return jump_consistent_hash(hash, ss::this_smp_shard_count()); } namespace impl { @@ -59,7 +59,7 @@ concept KafkaRequestFactory = KafkaRequestType>; inline ss::shard_id consumer_shard(const kafka::group_id& g_id) { auto hash = xxhash_64(g_id().data(), g_id().length()); - return jump_consistent_hash(hash, ss::smp::count); + return jump_consistent_hash(hash, ss::this_smp_shard_count()); } } // namespace impl diff --git a/src/v/raft/coordinated_recovery_throttle.cc b/src/v/raft/coordinated_recovery_throttle.cc index 8ceef09036e3d..3352848dd29eb 100644 --- a/src/v/raft/coordinated_recovery_throttle.cc +++ b/src/v/raft/coordinated_recovery_throttle.cc @@ -86,7 +86,7 @@ coordinated_recovery_throttle::coordinated_recovery_throttle( config::binding rate_binding, config::binding use_static) : _rate_binding(std::move(rate_binding)) , _use_static_allocation(std::move(use_static)) - , _throttler(_rate_binding() / ss::smp::count) { + , _throttler(_rate_binding() / ss::this_smp_shard_count()) { if (ss::this_shard_id() == _coordinator_shard) { _coordinator.set_callback([this] { ssx::spawn_with_gate(_gate, [this] { @@ -223,7 +223,7 @@ ss::future<> coordinated_recovery_throttle::do_coordinate_tick() { ssize_t total_rate = _rate_binding() ? std::clamp( _rate_binding(), - size_t{ss::smp::count}, + size_t{ss::this_smp_shard_count()}, std::numeric_limits::max() / 4) : 0; @@ -250,14 +250,15 @@ ss::future<> coordinated_recovery_throttle::do_coordinate_tick() { if (unlikely(_use_static_allocation())) { // Don't take individual shard requests into account, distribute equally - auto fair_shard_rate = to_add / ss::smp::count; + auto fair_shard_rate = to_add / ss::this_smp_shard_count(); co_return co_await renew_capacity_all_shards(fair_shard_rate); } // Surplus after we satisfy hard requirements (that is, bring all shard // rates to 0 assuming they stay as they are while we're calculating). // Negative means net overuse. - ssize_t avg_surplus = (to_add + total_remaining_units) / ss::smp::count; + ssize_t avg_surplus = (to_add + total_remaining_units) + / ss::this_smp_shard_count(); vlog( raftlog.trace, "Coordinated recovery throttle tick: total_rate={}, " diff --git a/src/v/raft/tests/coordinated_recovery_throttle_test.cc b/src/v/raft/tests/coordinated_recovery_throttle_test.cc index fbd62d78b42c0..9a340a51b7d3e 100644 --- a/src/v/raft/tests/coordinated_recovery_throttle_test.cc +++ b/src/v/raft/tests/coordinated_recovery_throttle_test.cc @@ -31,14 +31,15 @@ struct test_fixture { static constexpr std::chrono::seconds timeout{5}; test_fixture() { - BOOST_REQUIRE_GT(ss::smp::count, 1); + BOOST_REQUIRE_GT(ss::this_smp_shard_count(), 1); BOOST_REQUIRE_EQUAL(ss::this_shard_id(), 0); - vlog(logger.info, "using smp count: {}", ss::smp::count); + vlog(logger.info, "using smp count: {}", ss::this_smp_shard_count()); _as.start().get(); // Here we don't trigger the coordinator timer intentionally so that // the test can step thru the ticks manually as needed using // coordinator_tick(). That gives more control over the test state. - _config_rate.start(ss::smp::count * initial_rate_per_shard).get(); + _config_rate.start(ss::this_smp_shard_count() * initial_rate_per_shard) + .get(); _config_use_static.start(false).get(); _throttler .start( @@ -186,7 +187,8 @@ FIXTURE_TEST(throttler_test_simple, test_fixture) { } FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { - for (auto i : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto i : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { vlog(logger.info, "Running iteration: {}", i); // Each iteration of this loop consumes the entirety of bandwidth @@ -194,7 +196,7 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { // bandwidth should be shared among [0, i) resulting in progress. // step 1: Consume all rate on [0, i) - auto total_rate = local().available() * ss::smp::count; + auto total_rate = local().available() * ss::this_smp_shard_count(); auto throttle_per_shard = total_rate / i; std::vector> throttled; for (auto j : boost::irange(i)) { @@ -235,14 +237,15 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { // bandwidth after rebalancing. ssize_t total_available = 0; for (auto current : all_available().get()) { - BOOST_REQUIRE(std::abs(current) < ss::smp::count); + BOOST_REQUIRE(std::abs(current) < ss::this_smp_shard_count()); total_available += current; } - BOOST_REQUIRE(std::abs(total_available) < ss::smp::count); + BOOST_REQUIRE(std::abs(total_available) < ss::this_smp_shard_count()); // Nothing waiting BOOST_REQUIRE_EQUAL( - std::ranges::count(all_waiting().get(), 0), ss::smp::count); + std::ranges::count(all_waiting().get(), 0), + ss::this_smp_shard_count()); // All admitted bytes accounted for. wait_until([this, throttle_per_shard, i] { return all_admitted().then([throttle_per_shard, @@ -251,7 +254,7 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { admitted | std::views::take(i), throttle_per_shard - 1) == i && std::ranges::count(admitted | std::views::drop(i), 0) - == ss::smp::count - i; + == ss::this_smp_shard_count() - i; }); }); @@ -261,13 +264,14 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { ss::shard_id shard_id = 0; for (auto current : all_available().get()) { if (shard_id < i) { - BOOST_REQUIRE(current > throttle_per_shard - ss::smp::count); + BOOST_REQUIRE( + current > throttle_per_shard - ss::this_smp_shard_count()); } else { - BOOST_REQUIRE(current < ss::smp::count); + BOOST_REQUIRE(current < ss::this_smp_shard_count()); } ++shard_id; } - BOOST_REQUIRE(std::abs(total_available) < ss::smp::count); + BOOST_REQUIRE(std::abs(total_available) < ss::this_smp_shard_count()); // Step 5: In a few ticks, all shards should be back to fair rate. wait_until([this] { @@ -275,7 +279,7 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { .then([this] { return all_available(); }) .then([](std::vector available) { return std::ranges::count(available, initial_rate_per_shard) - == ss::smp::count; + == ss::this_smp_shard_count(); }); }); } @@ -284,7 +288,7 @@ FIXTURE_TEST(throttler_test_rebalancing, test_fixture) { FIXTURE_TEST(throttler_rate_update, test_fixture) { auto current_shard_rate = local().available(); for (auto curr : {current_shard_rate / 2, current_shard_rate * 2}) { - auto new_rate = curr * ss::smp::count; + auto new_rate = curr * ss::this_smp_shard_count(); update_rate(new_rate); tests::cooperative_spin_wait_with_timeout(timeout, [this, curr] { return coordinator_tick().then( @@ -298,7 +302,8 @@ FIXTURE_TEST(overusing, test_fixture) { throttle_on_shard(0, 1).get(); check_available(0, initial_rate_per_shard - 1); - for (auto shard_id : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto shard_id : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { check_available(shard_id, initial_rate_per_shard); } @@ -309,7 +314,8 @@ FIXTURE_TEST(overusing, test_fixture) { // Ensure the throttling is registered but not satisfied. ss::yield().get(); check_waiting(0, oversized_req); - for (auto shard_id : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto shard_id : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { check_waiting(shard_id, 0); } BOOST_REQUIRE(!f.available()); @@ -318,12 +324,14 @@ FIXTURE_TEST(overusing, test_fixture) { _as.invoke_on(0, std::mem_fn(&ss::abort_source::request_abort)).get(); f.wait(); f.get_exception(); - BOOST_REQUIRE(std::ranges::count(all_waiting().get(), 0) == ss::smp::count); + BOOST_REQUIRE( + std::ranges::count(all_waiting().get(), 0) == ss::this_smp_shard_count()); // Trigger a tick to refill the buckets. coordinator_tick().get(); check_available(0, initial_rate_per_shard); - for (auto shard_id : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto shard_id : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { check_available(shard_id, initial_rate_per_shard); } @@ -332,13 +340,15 @@ FIXTURE_TEST(overusing, test_fixture) { // Check that the admitted bytes include the borrowed amount. check_admitted(0, oversized_req); - for (auto shard_id : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto shard_id : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { check_admitted(shard_id, 0); } // Check that available capacity is negative check_available(0, -initial_rate_per_shard); - for (auto shard_id : std::views::iota(ss::shard_id{1}, ss::smp::count)) { + for (auto shard_id : + std::views::iota(ss::shard_id{1}, ss::this_smp_shard_count())) { check_available(shard_id, initial_rate_per_shard); } @@ -353,9 +363,9 @@ FIXTURE_TEST(overusing, test_fixture) { })); auto sum = std::ranges::fold_left(available, ssize_t{0}, std::plus{}); auto deviation_from_full = std::abs( - sum - ss::smp::count * initial_rate_per_shard); + sum - ss::this_smp_shard_count() * initial_rate_per_shard); // allow for rounding errors - BOOST_REQUIRE(deviation_from_full <= ss::smp::count); + BOOST_REQUIRE(deviation_from_full <= ss::this_smp_shard_count()); } FIXTURE_TEST(overusing_a_lot, test_fixture) { @@ -369,14 +379,17 @@ FIXTURE_TEST(overusing_a_lot, test_fixture) { | std::views::transform([this, shard_id, size_multiplier](int) { return throttle_on_shard( shard_id, - initial_rate_per_shard * ss::smp::count * size_multiplier); + initial_rate_per_shard * ss::this_smp_shard_count() + * size_multiplier); })); }; auto all_throttle_f = ssx::when_all_succeed( - std::views::iota(ss::shard_id{0}, ss::smp::count) + std::views::iota(ss::shard_id{0}, ss::this_smp_shard_count()) | std::views::transform(request_on_shard)); - auto ticks = size_multiplier * requests_per_shard * ss::smp::count + 1; + auto ticks = size_multiplier * requests_per_shard + * ss::this_smp_shard_count() + + 1; for (size_t i = 0; i <= ticks; ++i) { coordinator_tick().get(); } @@ -404,15 +417,15 @@ FIXTURE_TEST(overusing_on_zero_rate, test_fixture) { check_waiting(0, 1); BOOST_ASSERT(!f.available()); - update_rate(ss::smp::count); + update_rate(ss::this_smp_shard_count()); coordinator_tick().get(); f.get(); } FIXTURE_TEST(allowance_staying_negative, test_fixture) { - ssize_t original_rate = 100 * ss::smp::count; - ssize_t reduced_rate = 10 * ss::smp::count; - ssize_t oversized_request = 10000 * ss::smp::count; + ssize_t original_rate = 100 * ss::this_smp_shard_count(); + ssize_t reduced_rate = 10 * ss::this_smp_shard_count(); + ssize_t oversized_request = 10000 * ss::this_smp_shard_count(); update_rate(original_rate); coordinator_tick().get(); @@ -431,5 +444,5 @@ FIXTURE_TEST(allowance_staying_negative, test_fixture) { auto deviation = std::abs(sum - expected_sum); // allow for rounding errors - BOOST_REQUIRE(deviation <= ss::smp::count); + BOOST_REQUIRE(deviation <= ss::this_smp_shard_count()); } diff --git a/src/v/raft/tests/foreign_entry_test.cc b/src/v/raft/tests/foreign_entry_test.cc index 9a6c652c65470..b9bff9d9808fb 100644 --- a/src/v/raft/tests/foreign_entry_test.cc +++ b/src/v/raft/tests/foreign_entry_test.cc @@ -162,11 +162,11 @@ FIXTURE_TEST(sharing_one_reader, foreign_entry_fixture) { std::vector copies = // clang-format off raft::details::foreign_share_n(gen_config_record_batch_reader(3), - ss::smp::count).get(); + ss::this_smp_shard_count()).get(); // clang-format on - BOOST_REQUIRE_EQUAL(copies.size(), ss::smp::count); - for (ss::shard_id shard = 0; shard < ss::smp::count; ++shard) { + BOOST_REQUIRE_EQUAL(copies.size(), ss::this_smp_shard_count()); + for (ss::shard_id shard = 0; shard < ss::this_smp_shard_count(); ++shard) { info("Submitting shared reader to shard:{}", shard); auto cfg = // MUST return the config; otherwise thread exception @@ -191,14 +191,14 @@ FIXTURE_TEST(sharing_correcteness_test, foreign_entry_fixture) { auto rdr = model::make_memory_record_batch_reader(std::move(batches)); auto refs = raft::details::share_n(std::move(rdr), 2).get(); auto shared = raft::details::foreign_share_n( - std::move(refs.back()), ss::smp::count) + std::move(refs.back()), ss::this_smp_shard_count()) .get(); refs.pop_back(); auto reference_batches = model::consume_reader_to_memory( std::move(refs.back()), model::no_timeout) .get(); - BOOST_REQUIRE_EQUAL(shared.size(), ss::smp::count); + BOOST_REQUIRE_EQUAL(shared.size(), ss::this_smp_shard_count()); for (auto& copy : shared) { auto shared = model::consume_reader_to_memory( std::move(copy), model::no_timeout) @@ -214,12 +214,12 @@ FIXTURE_TEST(copy_lots_of_readers, foreign_entry_fixture) { { auto rdr = gen_config_record_batch_reader(1); share_copies = raft::details::foreign_share_n( - std::move(rdr), ss::smp::count) + std::move(rdr), ss::this_smp_shard_count()) .get(); } - BOOST_REQUIRE_EQUAL(share_copies.size(), ss::smp::count); + BOOST_REQUIRE_EQUAL(share_copies.size(), ss::this_smp_shard_count()); - for (ss::shard_id shard = 0; shard < ss::smp::count; ++shard) { + for (ss::shard_id shard = 0; shard < ss::this_smp_shard_count(); ++shard) { info("Submitting shared raft::entry to shard:{}", shard); auto cfg = ss::smp::submit_to( shard, diff --git a/src/v/raft/tests/raft_fixture_base.cc b/src/v/raft/tests/raft_fixture_base.cc index 58dbd99ef437d..bc1eb3167f57f 100644 --- a/src/v/raft/tests/raft_fixture_base.cc +++ b/src/v/raft/tests/raft_fixture_base.cc @@ -616,7 +616,7 @@ seastar::future<> raft_fixture_base::stop() { } seastar::future<> raft_fixture_base::start() { - for (auto cpu : ss::smp::all_cpus()) { + for (auto cpu : ss::this_smp_all_shards()) { co_await ss::smp::submit_to(cpu, [] { config::shard_local_cfg().disable_metrics.set_value(true); config::shard_local_cfg().disable_public_metrics.set_value(true); diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index 36fb905a61220..e896ab9c82ef9 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -586,7 +586,7 @@ void admin_server::register_debug_routes() { using admin::apply_validator; void check_shard_id(seastar::shard_id id) { - auto max_shard_id = ss::smp::count - 1; + auto max_shard_id = ss::this_smp_shard_count() - 1; if (id > max_shard_id) { throw ss::httpd::bad_param_exception( fmt::format("Shard id too high, max shard id is {}", max_shard_id)); @@ -1172,7 +1172,7 @@ ss::future> admin_server::put_ctracker_va( fmt::format("Invalid shard id: {}", req->get_path_param("shard"))); } - if (shard >= ss::smp::count) { + if (shard >= ss::this_smp_shard_count()) { throw ss::httpd::bad_param_exception( fmt::format("Invalid shard id: {}", shard)); } diff --git a/src/v/redpanda/admin/kafka_connections_service.cc b/src/v/redpanda/admin/kafka_connections_service.cc index 09b04d049d6c4..3a54a9c1360bd 100644 --- a/src/v/redpanda/admin/kafka_connections_service.cc +++ b/src/v/redpanda/admin/kafka_connections_service.cc @@ -81,7 +81,7 @@ ss::future gather_all_shards( connection_collector& global_collector) { size_t total_matching_connections = 0; - for (ss::shard_id shard = 0; shard < ss::smp::count; ++shard) { + for (ss::shard_id shard = 0; shard < ss::this_smp_shard_count(); ++shard) { auto accumulated_count = global_collector.size(); auto shard_result = co_await kafka_server.invoke_on( shard, diff --git a/src/v/redpanda/admin/server.cc b/src/v/redpanda/admin/server.cc index 622c783580424..ffc249301856c 100644 --- a/src/v/redpanda/admin/server.cc +++ b/src/v/redpanda/admin/server.cc @@ -397,7 +397,6 @@ class rpc_handler : public ss::httpd::handler_base { std::current_exception()); rep = serde::pb::rpc::internal_exception().handle(std::move(rep)); } - rep->done(); co_return rep; } diff --git a/src/v/redpanda/admin/services/internal/level_zero.cc b/src/v/redpanda/admin/services/internal/level_zero.cc index c4636fdaf2e3d..a7a6486bada90 100644 --- a/src/v/redpanda/admin/services/internal/level_zero.cc +++ b/src/v/redpanda/admin/services/internal/level_zero.cc @@ -97,7 +97,7 @@ level_zero_service_impl::get_status( if (local == apply_local::yes) { auto gc_states = co_await _gc->map( [](const cloud_topics::level_zero_gc& gc) { return gc.get_state(); }); - if (gc_states.size() != ss::smp::count) { + if (gc_states.size() != ss::this_smp_shard_count()) { throw serde::pb::rpc::internal_exception( "Status collection failed"); } diff --git a/src/v/redpanda/application_config.cc b/src/v/redpanda/application_config.cc index c312ca20050f7..57f9941163da3 100644 --- a/src/v/redpanda/application_config.cc +++ b/src/v/redpanda/application_config.cc @@ -118,7 +118,7 @@ void set_auditing_kafka_client_defaults( void log_system_resources( ss::logger& log, const boost::program_options::variables_map& cfg) { const auto shard_mem = ss::memory::stats(); - auto total_mem = shard_mem.total_memory() * ss::smp::count; + auto total_mem = shard_mem.total_memory() * ss::this_smp_shard_count(); /** * IMPORTANT: copied out of seastar `resources.cc`, if logic in seastar will * change we have to change our logic in here. @@ -133,7 +133,7 @@ void log_system_resources( log.info, "System resources: {{ cpus: {}, available memory: {}, reserved memory: " "{}}}", - ss::smp::count, + ss::this_smp_shard_count(), human::bytes(total_mem), human::bytes(reserve)); @@ -218,7 +218,9 @@ compaction_controller_config(ss::scheduling_group sg, uint64_t fs_avail) { static const int64_t backlog_avail_percents = 10; return config::shard_local_cfg() .compaction_ctrl_backlog_size() - .value_or((fs_avail / 100) * backlog_avail_percents / ss::smp::count); + .value_or( + (fs_avail / 100) * backlog_avail_percents + / ss::this_smp_shard_count()); }; /** @@ -238,7 +240,7 @@ compaction_controller_config(ss::scheduling_group sg, uint64_t fs_avail) { * k_p = 1000 / 80 = 12.5 * */ - auto normalization = fs_avail / (1000 * ss::smp::count); + auto normalization = fs_avail / (1000 * ss::this_smp_shard_count()); return storage::backlog_controller_config( config::shard_local_cfg().compaction_ctrl_p_coeff.bind(), @@ -268,7 +270,7 @@ make_upload_controller_config(ss::scheduling_group sg, uint64_t fs_avail) { auto setpoint_function = []() { return 0; }; int64_t normalization = static_cast(fs_avail) - / (1000 * ss::smp::count); + / (1000 * ss::this_smp_shard_count()); return { config::shard_local_cfg().cloud_storage_upload_ctrl_p_coeff.bind(), config::mock_binding(0.0), diff --git a/src/v/resource_mgmt/smp_groups.cc b/src/v/resource_mgmt/smp_groups.cc index 00f921678f0a3..e6d18ece1256c 100644 --- a/src/v/resource_mgmt/smp_groups.cc +++ b/src/v/resource_mgmt/smp_groups.cc @@ -64,7 +64,7 @@ smp_groups::default_raft_non_local_requests(uint32_t max_partitions_per_core) { return max_partitions_per_core * (max_append_requests_per_follower + additional_requests_per_follower) - * (ss::smp::count - 1); + * (ss::this_smp_shard_count() - 1); } ss::future> diff --git a/src/v/resource_mgmt/tests/cpu_profiler_test.cc b/src/v/resource_mgmt/tests/cpu_profiler_test.cc index 5ede3a63347d1..cd912c810e465 100644 --- a/src/v/resource_mgmt/tests/cpu_profiler_test.cc +++ b/src/v/resource_mgmt/tests/cpu_profiler_test.cc @@ -138,9 +138,10 @@ SEASTAR_THREAD_TEST_CASE(test_cpu_profiler_enable_override) { }) .get(); - BOOST_REQUIRE_EQUAL(ss::smp::count, results.size()); + BOOST_REQUIRE_EQUAL(ss::this_smp_shard_count(), results.size()); - for (ss::shard_id shard_id = 0; shard_id < ss::smp::count; ++shard_id) { + for (ss::shard_id shard_id = 0; shard_id < ss::this_smp_shard_count(); + ++shard_id) { auto& shard_results = results[shard_id]; BOOST_TEST(shard_results.samples.size() >= 1); BOOST_REQUIRE_EQUAL(shard_id, shard_results.shard); diff --git a/src/v/rpc/connection_cache.cc b/src/v/rpc/connection_cache.cc index 52a89db2fb863..5fd48b46b1c4e 100644 --- a/src/v/rpc/connection_cache.cc +++ b/src/v/rpc/connection_cache.cc @@ -136,7 +136,8 @@ connection_cache::connection_cache( if (ss::this_shard_id() == _coordinator_shard) { _coordinator_state = std::make_unique( ssx::mutex{"connection_cache"}, - connection_allocation_strategy(connections_per_node, ss::smp::count)); + connection_allocation_strategy( + connections_per_node, ss::this_smp_shard_count())); } } diff --git a/src/v/rpc/connection_cache.h b/src/v/rpc/connection_cache.h index a6441cb8fb010..d3462414c64ff 100644 --- a/src/v/rpc/connection_cache.h +++ b/src/v/rpc/connection_cache.h @@ -196,7 +196,7 @@ class connection_cache final model::node_id self, ss::shard_id src, model::node_id node, - ss::shard_id max_shards = ss::smp::count) const; + ss::shard_id max_shards = ss::this_smp_shard_count()) const; private: std::optional _label; diff --git a/src/v/rpc/test/rpc_gen_cycling_test.cc b/src/v/rpc/test/rpc_gen_cycling_test.cc index dfffc364a38e4..3cee4bc34ee9c 100644 --- a/src/v/rpc/test/rpc_gen_cycling_test.cc +++ b/src/v/rpc/test/rpc_gen_cycling_test.cc @@ -284,7 +284,7 @@ FIXTURE_TEST(basic_cache_ops, rpc_integration_fixture) { .get(); absl::flat_hash_map shards_with_con; - for (auto shard : ss::smp::all_cpus()) { + for (auto shard : ss::this_smp_all_shards()) { auto con_shard = cc.invoke_on( shard, [](auto& c) { diff --git a/src/v/rpc/test/rpc_integration_fixture.h b/src/v/rpc/test/rpc_integration_fixture.h index e72fdf2b27cb1..8033b4e585298 100644 --- a/src/v/rpc/test/rpc_integration_fixture.h +++ b/src/v/rpc/test/rpc_integration_fixture.h @@ -166,7 +166,7 @@ class rpc_sharded_integration_fixture : public rpc_base_integration_fixture { void check_server() override { const bool all_initialized = ss::map_reduce( boost::irange( - 0, ss::smp::count), + 0, ss::this_smp_shard_count()), [this](unsigned /*c*/) { return ss::make_ready_future( _server.local_is_initialized()); diff --git a/src/v/ssx/sharded_ptr.h b/src/v/ssx/sharded_ptr.h index d01af58f9bd1d..d02f5bd7f0b66 100644 --- a/src/v/ssx/sharded_ptr.h +++ b/src/v/ssx/sharded_ptr.h @@ -108,7 +108,7 @@ class sharded_ptr { assert_shard(); auto mu{co_await _mutex.get_units()}; if (_state.empty()) { - _state.resize(ss::smp::count); + _state.resize(ss::this_smp_shard_count()); } auto copy_to_deallocate_on_owner_shard = local(); diff --git a/src/v/ssx/sharded_value.h b/src/v/ssx/sharded_value.h index 330d53207476f..ec2a66b3d7fdc 100644 --- a/src/v/ssx/sharded_value.h +++ b/src/v/ssx/sharded_value.h @@ -29,7 +29,7 @@ template class sharded_value { public: explicit sharded_value(T value) - : _state(ss::smp::count, ssx::aligned{value}) {} + : _state(ss::this_smp_shard_count(), ssx::aligned{value}) {} ~sharded_value() noexcept = default; sharded_value(sharded_value&& other) noexcept = default; diff --git a/src/v/ssx/tests/abort_source_test.cc b/src/v/ssx/tests/abort_source_test.cc index b4c766762482b..49a15bf3cb462 100644 --- a/src/v/ssx/tests/abort_source_test.cc +++ b/src/v/ssx/tests/abort_source_test.cc @@ -64,7 +64,7 @@ struct fixture { }; SEASTAR_THREAD_TEST_CASE(ssx_sharded_abort_source_test_abort_parent) { - BOOST_REQUIRE(ss::smp::count > 1); + BOOST_REQUIRE(ss::this_smp_shard_count() > 1); fixture f; f.start().get(); @@ -88,7 +88,7 @@ SEASTAR_THREAD_TEST_CASE(ssx_sharded_abort_source_test_abort_parent) { } SEASTAR_THREAD_TEST_CASE(ssx_sharded_abort_source_test_no_abort_parent) { - BOOST_REQUIRE(ss::smp::count > 1); + BOOST_REQUIRE(ss::this_smp_shard_count() > 1); fixture f; f.start().get(); diff --git a/src/v/ssx/tests/sharded_ptr_test.cc b/src/v/ssx/tests/sharded_ptr_test.cc index b3e3c63d3c159..b8a223dee24e2 100644 --- a/src/v/ssx/tests/sharded_ptr_test.cc +++ b/src/v/ssx/tests/sharded_ptr_test.cc @@ -24,7 +24,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_basic_ops) { BOOST_REQUIRE_EQUAL(p0.shard_id(), ss::this_shard_id()); // Test operator bool (before reset) - for (auto i : std::views::iota(0u, ss::smp::count)) { + for (auto i : std::views::iota(0u, ss::this_smp_shard_count())) { ss::smp::submit_to(i, [&]() { BOOST_REQUIRE(!p0); }).get(); } @@ -33,7 +33,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_basic_ops) { p0.reset(43).get(); // Test operator bool and deref (after reset) - for (auto i : std::views::iota(0u, ss::smp::count)) { + for (auto i : std::views::iota(0u, ss::this_smp_shard_count())) { ss::smp::submit_to(i, [&]() { BOOST_REQUIRE(p0 && p0.operator*() == 43); BOOST_REQUIRE(p0 && *p0.operator->() == 43); @@ -42,7 +42,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_ptr_basic_ops) { // Test operator bool (after stop) p0.stop().get(); - for (auto i : std::views::iota(0u, ss::smp::count)) { + for (auto i : std::views::iota(0u, ss::this_smp_shard_count())) { ss::smp::submit_to(i, [&]() { BOOST_REQUIRE(!p0); }).get(); } diff --git a/src/v/ssx/tests/single_sharded_test.cc b/src/v/ssx/tests/single_sharded_test.cc index 17f0689b6b8eb..9cc6d6e3fb1a5 100644 --- a/src/v/ssx/tests/single_sharded_test.cc +++ b/src/v/ssx/tests/single_sharded_test.cc @@ -131,7 +131,7 @@ struct caller { }; SEASTAR_THREAD_TEST_CASE(single_sharded) { - ss::shard_id the_shard = ss::smp::count - 1; + ss::shard_id the_shard = ss::this_smp_shard_count() - 1; ss::sharded counters; ssx::single_sharded single; @@ -163,7 +163,8 @@ SEASTAR_THREAD_TEST_CASE(single_sharded) { bool on_the_shard = the_shard == ss::this_shard_id(); BOOST_REQUIRE_EQUAL(cntr.started, on_the_shard ? 1 : 0); BOOST_REQUIRE_EQUAL( - cntr.called_foo, on_the_shard ? ss::smp::count * 3 + 1 : 0); + cntr.called_foo, + on_the_shard ? ss::this_smp_shard_count() * 3 + 1 : 0); BOOST_REQUIRE_EQUAL(cntr.stopped, on_the_shard ? 1 : 0); }) .get(); @@ -171,10 +172,10 @@ SEASTAR_THREAD_TEST_CASE(single_sharded) { } SEASTAR_THREAD_TEST_CASE(single_sharded_wrong_shard) { - BOOST_REQUIRE(ss::smp::count > 1); + BOOST_REQUIRE(ss::this_smp_shard_count() > 1); - ss::shard_id the_shard = ss::smp::count - 2; - ss::shard_id wrong_shard = ss::smp::count - 1; + ss::shard_id the_shard = ss::this_smp_shard_count() - 2; + ss::shard_id wrong_shard = ss::this_smp_shard_count() - 1; ss::sharded counters; ssx::single_sharded single; diff --git a/src/v/ssx/tests/thread_worker.cc b/src/v/ssx/tests/thread_worker.cc index 0362ad28d1776..f061067a96fb7 100644 --- a/src/v/ssx/tests/thread_worker.cc +++ b/src/v/ssx/tests/thread_worker.cc @@ -28,12 +28,13 @@ struct move_only { template auto thread_worker_test() { - BOOST_REQUIRE_GT(ss::smp::count, 1); + BOOST_REQUIRE_GT(ss::this_smp_shard_count(), 1); auto w = ssx::singleton_thread_worker{}; w.start({}).get(); - std::vector>> all_results(ss::smp::count); + std::vector>> all_results( + ss::this_smp_shard_count()); ss::smp::invoke_on_all([&w, &all_results]() { auto& results = all_results[ss::this_shard_id()]; diff --git a/src/v/storage/api.h b/src/v/storage/api.h index fed90cce59f09..4e2a507133b72 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -48,7 +48,7 @@ class api : public ss::peering_sharded_service { ss::future> make_extra_kvstore(ss::shard_id s) { vassert( - s >= ss::smp::count, + s >= ss::this_smp_shard_count(), "can't make extra kvstore for existing shard {}", s); auto kvs = std::make_unique( diff --git a/src/v/storage/opfuzz/opfuzz.cc b/src/v/storage/opfuzz/opfuzz.cc index ddb3a6a41c1a7..f7e635a5e7cc8 100644 --- a/src/v/storage/opfuzz/opfuzz.cc +++ b/src/v/storage/opfuzz/opfuzz.cc @@ -139,7 +139,8 @@ struct append_op_foreign final : opfuzz::op { ~append_op_foreign() noexcept override = default; const char* name() const final { return "append_op_foreign"; } ss::future<> invoke(opfuzz::op_context ctx) final { - auto source_core = random_generators::get_int(ss::smp::count - 1); + auto source_core = random_generators::get_int( + ss::this_smp_shard_count() - 1); return ss::smp::submit_to( source_core, [ctx] { diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc index 330d9b435c946..6957c24df2840 100644 --- a/src/v/storage/storage_resources.cc +++ b/src/v/storage/storage_resources.cc @@ -21,7 +21,7 @@ namespace { uint64_t per_shard_target_replay_bytes(uint64_t global_target_replay_bytes) { - return global_target_replay_bytes / ss::smp::count; + return global_target_replay_bytes / ss::this_smp_shard_count(); } } // namespace @@ -38,15 +38,17 @@ storage_resources::storage_resources( , _compaction_index_mem_limit(compaction_index_memory) , _append_chunk_size(internal::chunks().chunk_size()) , _offset_translator_dirty_bytes( - _global_target_replay_bytes() / ss::smp::count) + _global_target_replay_bytes() / ss::this_smp_shard_count()) , _configuration_manager_dirty_bytes( - _global_target_replay_bytes() / ss::smp::count) - , _stm_dirty_bytes(_global_target_replay_bytes() / ss::smp::count) + _global_target_replay_bytes() / ss::this_smp_shard_count()) + , _stm_dirty_bytes(_global_target_replay_bytes() / ss::this_smp_shard_count()) , _compaction_index_bytes(_compaction_index_mem_limit()) , _inflight_recovery( - std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1})) + std::max( + _max_concurrent_replay() / ss::this_smp_shard_count(), uint64_t{1})) , _inflight_close_flush( - std::max(_max_concurrent_replay() / ss::smp::count, uint64_t{1})) { + std::max( + _max_concurrent_replay() / ss::this_smp_shard_count(), uint64_t{1})) { // Register notifications on configuration changes _global_target_replay_bytes.watch([this]() { auto v = per_shard_target_replay_bytes(_global_target_replay_bytes()); @@ -58,7 +60,7 @@ storage_resources::storage_resources( }); _max_concurrent_replay.watch([this]() { - auto v = _max_concurrent_replay() / ss::smp::count; + auto v = _max_concurrent_replay() / ss::this_smp_shard_count(); // Guard against case where core count is higher than // total concurrent replay count. @@ -140,7 +142,8 @@ size_t storage_resources::calc_falloc_step() { // is uneven, this may lead to us underestimasting how much space // is available, which is safe. - uint64_t space_free_this_shard = _space_allowance_free / ss::smp::count; + uint64_t space_free_this_shard = _space_allowance_free + / ss::this_smp_shard_count(); // Only use up to half the available space for fallocs. uint64_t space_per_partition = (space_free_this_shard / 2) diff --git a/src/v/storage/tests/offset_translator_tests.cc b/src/v/storage/tests/offset_translator_tests.cc index 59355dbbe1572..63b52af627922 100644 --- a/src/v/storage/tests/offset_translator_tests.cc +++ b/src/v/storage/tests/offset_translator_tests.cc @@ -637,7 +637,7 @@ TEST_F(base_fixture, test_moving_persistent_state) { validate_translation(local_ot, model::offset(11), model::offset(5)); // use last available shard - auto target_shard = ss::smp::count - 1; + auto target_shard = ss::this_smp_shard_count() - 1; // move state to target shard storage::offset_translator::copy_persistent_state( raft::group_id(0), _api.local().kvs(), target_shard, _api) diff --git a/src/v/syschecks/syschecks.cc b/src/v/syschecks/syschecks.cc index 3cec7016c5ae5..2e3ff72b3a766 100644 --- a/src/v/syschecks/syschecks.cc +++ b/src/v/syschecks/syschecks.cc @@ -37,6 +37,8 @@ ss::sstring to_string(ss::fs_type fs) { return "hfs"; case ss::fs_type::tmpfs: return "tmpfs"; + case ss::fs_type::hugetlbfs: + return "hugetlbfs"; }; return "bad_enum"; } diff --git a/src/v/wasm/tests/wasm_cache_test.cc b/src/v/wasm/tests/wasm_cache_test.cc index 1b25418392b3e..3bd6c89015574 100644 --- a/src/v/wasm/tests/wasm_cache_test.cc +++ b/src/v/wasm/tests/wasm_cache_test.cc @@ -133,7 +133,8 @@ class fake_runtime : public runtime { class WasmCacheTest : public ::testing::Test { public: static void SetUpTestSuite() { - vassert(ss::smp::count > 1, "This test expects multiple shards"); + vassert( + ss::this_smp_shard_count() > 1, "This test expects multiple shards"); } void SetUp() override { @@ -221,13 +222,13 @@ TEST_F(WasmCacheTest, CachesEngines) { EXPECT_EQ(engine, engine_two.get()); live_engine = engine; }); - EXPECT_EQ(state()->engines, ss::smp::count); + EXPECT_EQ(state()->engines, ss::this_smp_shard_count()); // This engine doesn't actually create new instances under the hood. auto engine = factory->make_engine(std::make_unique()).get(); - EXPECT_EQ(state()->engines, ss::smp::count); + EXPECT_EQ(state()->engines, ss::this_smp_shard_count()); engine = nullptr; - EXPECT_EQ(state()->engines, ss::smp::count); + EXPECT_EQ(state()->engines, ss::this_smp_shard_count()); invoke_on_all([] { live_engine = nullptr; }); EXPECT_EQ(state()->engines, 0); @@ -300,7 +301,7 @@ TEST_F(WasmCacheTest, GC) { }); EXPECT_EQ(state()->engines, 0); // We should GC each engine for each core - EXPECT_EQ(gc(), ss::smp::count); + EXPECT_EQ(gc(), ss::this_smp_shard_count()); factory = nullptr; // Now we should GC the factory EXPECT_EQ(gc(), 1);