Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions MODULE.bazel.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,13 @@ def data_dependency():
url = "https://github.com/redpanda-data/CRoaring/archive/c433d1c70c10fb2e40f049e019e2abbcafa6e69d.tar.gz",
)

# branch: v26.2.x-pre
# branch: v26.2.x (rebased) — https://github.com/redpanda-data/seastar/pull/277
http_archive(
name = "seastar",
build_file = "//bazel/thirdparty:seastar.BUILD",
sha256 = "de9da2d60057985c55199459a063d02944a95f1c41ce7f9e09f62a3b9c941465",
strip_prefix = "seastar-a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7",
url = "https://github.com/redpanda-data/seastar/archive/a0b4f2a6b0fe5d86d3e03f0a85f7d7f7b86bdbe7.tar.gz",
sha256 = "d043a4e3f979f05d7b69b22d73d8ec1d56465daf4a673c353080e27ee49b6791",
strip_prefix = "seastar-097536fff982981023c2073e3dc5dc885cfe74be",
url = "https://github.com/redpanda-data/seastar/archive/097536fff982981023c2073e3dc5dc885cfe74be.tar.gz",
)

http_archive(
Expand Down
13 changes: 11 additions & 2 deletions bazel/thirdparty/seastar.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ cc_proto_library(
cc_library(
name = "seastar",
srcs = [
"include/seastar/core/internal/md5.hh",
"include/seastar/net/native-stack.hh",
"include/seastar/net/tcp.hh",
"include/seastar/net/tcp-stack.hh",
Expand All @@ -213,6 +214,9 @@ cc_library(
"src/core/cgroup.hh",
"src/core/condition-variable.cc",
"src/core/cpu_profiler.cc",
"src/core/crypto.cc",
"src/core/crypto.hh",
"src/core/crypto_openssl.cc",
"src/core/disk_params.cc",
"src/core/dpdk_rte.cc",
"src/core/exception_hacks.cc",
Expand Down Expand Up @@ -284,7 +288,6 @@ cc_library(
"src/net/native-stack.cc",
"src/net/native-stack-impl.hh",
"src/net/net.cc",
"src/net/ossl.cc",
"src/net/packet.cc",
"src/net/posix-stack.cc",
"src/net/proxy.cc",
Expand All @@ -293,6 +296,8 @@ cc_library(
"src/net/tcp.cc",
"src/net/tls-impl.cc",
"src/net/tls-impl.hh",
"src/net/tls_openssl.cc",
"src/net/tls_openssl.hh",
"src/net/udp.cc",
"src/net/unix_address.cc",
"src/net/virtio.cc",
Expand All @@ -310,6 +315,7 @@ cc_library(
"src/util/read_first_line.cc",
"src/util/short_streams.cc",
"src/util/tmp_file.cc",
"src/websocket/client.cc",
"src/websocket/common.cc",
"src/websocket/parser.cc",
"src/websocket/server.cc",
Expand Down Expand Up @@ -523,6 +529,7 @@ cc_library(
"include/seastar/util/file.hh",
"include/seastar/util/function_input_iterator.hh",
"include/seastar/util/indirect.hh",
"include/seastar/util/integrated-length.hh",
"include/seastar/util/internal/array_map.hh",
"include/seastar/util/internal/iovec_utils.hh",
"include/seastar/util/internal/magic.hh",
Expand All @@ -534,6 +541,7 @@ cc_library(
"include/seastar/util/log-cli.hh",
"include/seastar/util/log-impl.hh",
"include/seastar/util/log-level.hh",
"include/seastar/util/memory-data-sink.hh",
"include/seastar/util/memory-data-source.hh",
"include/seastar/util/memory_diagnostics.hh",
"include/seastar/util/noncopyable_function.hh",
Expand All @@ -554,6 +562,7 @@ cc_library(
"include/seastar/util/tuple_utils.hh",
"include/seastar/util/used_size.hh",
"include/seastar/util/variant_utils.hh",
"include/seastar/websocket/client.hh",
"include/seastar/websocket/common.hh",
"include/seastar/websocket/parser.hh",
"include/seastar/websocket/server.hh",
Expand All @@ -572,7 +581,7 @@ cc_library(
"SEASTAR_API_LEVEL=$(API_LEVEL)",
"SEASTAR_HAS_MEMBARRIER",
"SEASTAR_SCHEDULING_GROUPS_COUNT=$(SCHEDULING_GROUPS)",
"SEASTAR_USE_OPENSSL",
"SEASTAR_HAVE_OPENSSL",
] + select({
":use_task_backtrace": ["SEASTAR_TASK_BACKTRACE"],
"//conditions:default": [],
Expand Down
3 changes: 2 additions & 1 deletion src/v/bytes/tests/iobuf_tests_mt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ TEST_CORO(iobuf_mt, test_cross_shard_shares) {
boost::irange(0ul, parallel_shares),
[b = buf.share(0, buf.size_bytes())](auto s) mutable {
return ss::smp::submit_to(
s % ss::smp::count, [b = b.share(0, b.size_bytes())]() mutable {
s % ss::this_smp_shard_count(),
[b = b.share(0, b.size_bytes())]() mutable {
for (size_t i = 0; i < iterations; i++) {
b.share(0, b.size_bytes());
}
Expand Down
2 changes: 1 addition & 1 deletion src/v/cloud_io/cache_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ ss::future<> cache::do_reserve_space(

auto short_term_hydrations_estimate
= config::shard_local_cfg().cloud_storage_max_connections()
* ss::smp::count;
* ss::this_smp_shard_count();

// Here we're trying to estimate how much space do we need to
// free to allow all TS resources to be used again to download
Expand Down
12 changes: 7 additions & 5 deletions src/v/cloud_io/io_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ throughput_limit get_hard_throughput_limit() {
auto hard_limit = config::shard_local_cfg()
.cloud_storage_max_throughput_per_shard()
.value_or(0)
* ss::smp::count;
* ss::this_smp_shard_count();

if (hard_limit == 0) {
// Run tiered-storage without throttling by setting
Expand All @@ -120,15 +120,16 @@ throughput_limit get_hard_throughput_limit() {

return {
.disk_node_throughput_limit = hard_limit,
.download_shard_throughput_limit = hard_limit / ss::smp::count,
.download_shard_throughput_limit = hard_limit
/ ss::this_smp_shard_count(),
};
}

throughput_limit get_throughput_limit(std::optional<size_t> device_throughput) {
auto hard_limit = config::shard_local_cfg()
.cloud_storage_max_throughput_per_shard()
.value_or(0)
* ss::smp::count;
* ss::this_smp_shard_count();
auto percent = config::shard_local_cfg()
.cloud_storage_throughput_limit_percent()
.value_or(0);
Expand All @@ -147,7 +148,8 @@ throughput_limit get_throughput_limit(std::optional<size_t> device_throughput) {
// is set we still need to limit network bandwidth even though
// the limit is overly high.
return throughput_limit{
.download_shard_throughput_limit = hard_limit / ss::smp::count,
.download_shard_throughput_limit = hard_limit
/ ss::this_smp_shard_count(),
};
}

Expand All @@ -156,7 +158,7 @@ throughput_limit get_throughput_limit(std::optional<size_t> device_throughput) {
auto tp = std::min(hard_limit, scaled_device_throughput);
return {
.disk_node_throughput_limit = tp,
.download_shard_throughput_limit = tp / ss::smp::count,
.download_shard_throughput_limit = tp / ss::this_smp_shard_count(),
};
}

Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_io/tests/cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ FIXTURE_TEST(test_clean_up_on_stream_exception, cache_test_fixture) {
*/
FIXTURE_TEST(test_put_enospc_on_non_zero_shard, cache_test_fixture) {
vassert(
ss::smp::count >= 2,
ss::this_smp_shard_count() >= 2,
"Test requires at least 2 shards to exercise non-shard-0 put path");

BOOST_CHECK_EXCEPTION(
Expand All @@ -539,7 +539,7 @@ FIXTURE_TEST(test_put_enospc_on_non_zero_shard, cache_test_fixture) {

// After ENOSPC, all shards must observe the block-puts flag, otherwise
// concurrent puts on other shards would keep racing into a full disk.
for (ss::shard_id s = 0; s < ss::smp::count; ++s) {
for (ss::shard_id s = 0; s < ss::this_smp_shard_count(); ++s) {
BOOST_CHECK(get_block_puts(s));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_io/tests/db_s3_imposter_fixture.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ struct db_s3_imposter_fixture::handler : ss::httpd::handler_base {
rep->set_status(ss::http::reply::status_type::bad_request);
}

rep->done("xml");
rep->set_content_type("xml");
co_return std::move(rep);
}

Expand Down Expand Up @@ -528,7 +528,7 @@ ss::future<> db_s3_imposter_fixture::start() {
_handlers.push_back(std::move(real));

std::vector<forwarding_handler*> fwd_ptrs;
for (unsigned i = 1; i < ss::smp::count; ++i) {
for (unsigned i = 1; i < ss::this_smp_shard_count(); ++i) {
auto fwd = std::make_unique<forwarding_handler>(*real_ptr);
fwd_ptrs.push_back(fwd.get());
_handlers.push_back(std::move(fwd));
Expand Down
4 changes: 2 additions & 2 deletions src/v/cloud_storage_clients/client_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ constexpr auto pool_ready_timeout = 15s;
[[nodiscard]] std::pair<ss::shard_id, ss::shard_id> pick_two_random_shards() {
using dist_t = std::uniform_int_distribution<ss::shard_id>;

const ss::shard_id n = ss::smp::count;
const ss::shard_id n = ss::this_smp_shard_count();
const ss::shard_id self = ss::this_shard_id();

vassert(n > 1, "At least two shards are required");
Expand Down Expand Up @@ -308,7 +308,7 @@ ss::future<client_pool::client_lease> client_pool::acquire(
if (likely(!_idle_clients.empty())) {
client = pop_most_recently_used();
} else if (
ss::smp::count == 1
ss::this_smp_shard_count() == 1
|| _policy == client_pool_overdraft_policy::wait_if_empty
|| _leased.size() >= _capacity * 2) {
// If borrowing is disabled or this shard borrowed '_capacity'
Expand Down
18 changes: 9 additions & 9 deletions src/v/cloud_storage_clients/detail/tests/registry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ TEST(UpstreamRegistry, DelayedThrowingStartWithMultipleWaiters) {
}

TEST(UpstreamRegistry, CrossShardConcurrentGet) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

ss::sharded<test_registry> registry;
registry.start(std::ref(test_log), test_registry::no_entry_limit).get();
Expand All @@ -174,13 +174,13 @@ TEST(UpstreamRegistry, CrossShardConcurrentGet) {
})
.get();

for (ss::shard_id s{0}; s < ss::smp::count; ++s) {
for (ss::shard_id s{0}; s < ss::this_smp_shard_count(); ++s) {
EXPECT_EQ(host_shards[s], s);
}
}

TEST(UpstreamRegistry, CrossShardThrowingStart) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

ss::sharded<throwing_registry> registry;
registry.start(std::ref(test_log), test_registry::no_entry_limit).get();
Expand Down Expand Up @@ -294,7 +294,7 @@ TEST(UpstreamRegistry, EvictorLoopEvictsIdleEntries) {
}

TEST(UpstreamRegistry, EntryLimitEnforced) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

constexpr size_t max_entries = 2;
ss::sharded<test_registry> registry;
Expand Down Expand Up @@ -346,7 +346,7 @@ TEST(UpstreamRegistry, EntryLimitEnforced) {
}

TEST(UpstreamRegistry, CrossShardEviction) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);
constexpr auto evict_threshold = ss::lowres_clock::time_point::max();

ss::sharded<test_registry> registry;
Expand Down Expand Up @@ -456,7 +456,7 @@ TEST(UpstreamRegistry, StressConcurrentGetAndEvict) {
}

TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

ss::sharded<test_registry> registry;
registry.start(std::ref(test_log), test_registry::no_entry_limit).get();
Expand All @@ -482,7 +482,7 @@ TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) {
// Verify entries created - coordinator has all keys.
auto counts
= registry.map([](test_registry& r) { return r.entry_count(); }).get();
EXPECT_EQ(counts[0], keys_per_shard * ss::smp::count);
EXPECT_EQ(counts[0], keys_per_shard * ss::this_smp_shard_count());

// Evict on peers first (releases coordinator's semaphore units).
registry
Expand Down Expand Up @@ -512,7 +512,7 @@ TEST(UpstreamRegistry, StressCrossShardConcurrentCreateEvict) {
}

TEST(UpstreamRegistry, StressConcurrentGetAndEvictWorkers) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

ss::sharded<test_registry> registry;
registry.start(std::ref(test_log), test_registry::no_entry_limit).get();
Expand Down Expand Up @@ -579,7 +579,7 @@ TEST(UpstreamRegistry, StressConcurrentGetAndEvictWorkers) {
}

TEST(UpstreamRegistry, PrepareStopCallsServicePrepareStop) {
ASSERT_GE(ss::smp::count, 2);
ASSERT_GE(ss::this_smp_shard_count(), 2);

ss::sharded<test_registry> registry;
registry.start(std::ref(test_log), test_registry::no_entry_limit).get();
Expand Down
12 changes: 7 additions & 5 deletions src/v/cloud_storage_clients/tests/client_pool_mt_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static cloud_storage_clients::s3_configuration client_configuration() {
static const client_pool_builder test_pool_builder{client_configuration()};

SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_another_shard) {
BOOST_REQUIRE(ss::smp::count == 2);
BOOST_REQUIRE(ss::this_smp_shard_count() == 2);

constexpr size_t num_connections_per_shard = 4;

Expand Down Expand Up @@ -137,7 +137,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_another_shard) {
}

SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_this_shard) {
BOOST_REQUIRE(ss::smp::count == 2);
BOOST_REQUIRE(ss::this_smp_shard_count() == 2);

constexpr size_t num_connections_per_shard = 4;

Expand Down Expand Up @@ -190,7 +190,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_blocked_on_this_shard) {
}

SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) {
BOOST_REQUIRE(ss::smp::count == 2);
BOOST_REQUIRE(ss::this_smp_shard_count() == 2);
constexpr size_t num_connections_per_shard = 4;

ss::sharded<cloud_storage_clients::client_pool> pool;
Expand Down Expand Up @@ -223,7 +223,9 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) {
auto leases_stop = ss::defer([&leases] { leases.stop().get(); });

// Lease all connections from all the shards.
for (size_t i = 0; i < ss::smp::count * num_connections_per_shard; i++) {
for (size_t i = 0;
i < ss::this_smp_shard_count() * num_connections_per_shard;
i++) {
leases.local().leases.push_back(
pool.local().acquire(test_bucket, leases.local().as).get());
}
Expand Down Expand Up @@ -291,7 +293,7 @@ SEASTAR_THREAD_TEST_CASE(test_client_pool_acquire_after_leasing_all) {
}

SEASTAR_THREAD_TEST_CASE(test_client_pool_concurrent_acquire_release) {
BOOST_REQUIRE(ss::smp::count == 2);
BOOST_REQUIRE(ss::this_smp_shard_count() == 2);

constexpr size_t num_connections_per_shard = 4;
constexpr size_t num_workers_per_shard = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ TEST_F(SchedulerTestFixture, TestSchedulerMultithread) {
scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func");
};
auto pause_random_worker_func = [this, &paused_workers]() {
auto random_shard = random_generators::get_int(ss::smp::count - 1);
auto random_shard = random_generators::get_int(
ss::this_smp_shard_count() - 1);
// Inserting _before_ the pause_worker() future resolves can help expose
// deadlocks.
paused_workers.insert(random_shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ TEST_F(SchedulerTestFixture, TestScheduler) {
scheduler->unmanage_partition(ntp_to_remove, "unmanage_partition_func");
};
auto pause_random_worker_func = [this, &paused_workers]() {
auto random_shard = random_generators::get_int(ss::smp::count - 1);
auto random_shard = random_generators::get_int(
ss::this_smp_shard_count() - 1);
// Inserting _before_ the pause_worker() future resolves can help expose
// deadlocks.
paused_workers.insert(random_shard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TEST_F(WorkerManagerTestFixture, PauseAndResumeWorkers) {
start_workers(manager).get();
auto stop_manager = ss::defer([&manager] { manager.stop().get(); });
using worker_state = l1::compaction_worker::worker_state;
for (ss::shard_id i = 0; i < ss::smp::count; ++i) {
for (ss::shard_id i = 0; i < ss::this_smp_shard_count(); ++i) {
// Workers start in active state
ASSERT_EQ(get_worker_state(manager, i).get(), worker_state::active);
ASSERT_TRUE(work_fut_has_value(manager, i).get());
Expand Down
Loading
Loading