Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2034,8 +2034,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&

std::set<TTabletId> error_tablet_ids;
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
// partition_id, tablet_id, publish_version, commit_tso
std::vector<DiscontinuousVersionTablet> discontinuous_version_tablets;
std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
Expand Down Expand Up @@ -2092,8 +2092,8 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
}

for (auto& item : discontinuous_version_tablets) {
_engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
publish_version_req.transaction_id, false);
_engine.add_async_publish_task(item.partition_id, item.tablet_id, item.publish_version,
publish_version_req.transaction_id, false, item.commit_tso);
}
TFinishTaskRequest finish_task_request;
if (!status.ok()) [[unlikely]] {
Expand Down
11 changes: 11 additions & 0 deletions be/src/information_schema/schema_rowsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = {
{"CREATION_TIME", TYPE_DATETIME, sizeof(int64_t), true},
{"NEWEST_WRITE_TIMESTAMP", TYPE_DATETIME, sizeof(int64_t), true},
{"SCHEMA_VERSION", TYPE_INT, sizeof(int32_t), true},
{"COMMIT_TSO", TYPE_BIGINT, sizeof(int64_t), true},

};

Expand Down Expand Up @@ -268,6 +269,16 @@ Status SchemaRowsetsScanner::_fill_block_impl(Block* block) {
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 12, datas));
}
// COMMIT_TSO
{
std::vector<int64_t> srcs(fill_rowsets_num);
for (size_t i = fill_idx_begin; i < fill_idx_end; ++i) {
RowsetSharedPtr rowset = rowsets_[i];
srcs[i - fill_idx_begin] = rowset->commit_tso();
datas[i - fill_idx_begin] = srcs.data() + i - fill_idx_begin;
}
RETURN_IF_ERROR(fill_dest_column_for_range(block, 13, datas));
}

_rowsets_idx += fill_rowsets_num;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/http/action/pad_rowset_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Status PadRowsetAction::_pad_rowset(Tablet* tablet, const Version& version) {
auto writer = DORIS_TRY(tablet->create_rowset_writer(ctx, false));
RowsetSharedPtr rowset;
RETURN_IF_ERROR(writer->build(rowset));
rowset->make_visible(version);
rowset->make_visible(version, -1);

std::vector<RowsetSharedPtr> to_add {rowset};
std::vector<RowsetSharedPtr> to_delete;
Expand Down
29 changes: 15 additions & 14 deletions be/src/storage/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,20 +483,21 @@ Status DataDir::load() {
}
}

auto load_pending_publish_info_func =
[&engine = _engine](int64_t tablet_id, int64_t publish_version, std::string_view info) {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed = pending_publish_info_pb.ParseFromArray(info.data(),
cast_set<int>(info.size()));
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
publish_version,
pending_publish_info_pb.transaction_id(), true);
return true;
};
// Callback invoked for each persisted pending-publish record: decodes the
// serialized PendingPublishInfoPB in `info` and re-registers the publish task
// with the storage engine, forwarding partition_id, transaction_id and
// commit_tso from the record.
// Always returns true.
auto load_pending_publish_info_func = [&engine = _engine](int64_t tablet_id,
int64_t publish_version,
std::string_view info) {
PendingPublishInfoPB pending_publish_info_pb;
bool parsed =
pending_publish_info_pb.ParseFromArray(info.data(), cast_set<int>(info.size()));
if (!parsed) {
LOG(WARNING) << "parse pending publish info failed, tablet_id: " << tablet_id
<< " publish_version: " << publish_version;
}
// NOTE(review): a parse failure is only logged; the task is still enqueued
// with whatever field values the message holds at that point. Confirm this
// is intended rather than skipping the record.
// The literal `true` is passed as the is_recovery argument.
engine.add_async_publish_task(pending_publish_info_pb.partition_id(), tablet_id,
publish_version, pending_publish_info_pb.transaction_id(),
true, pending_publish_info_pb.commit_tso());
return true;
};
MonotonicStopWatch pending_publish_timer;
pending_publish_timer.start();
RETURN_IF_ERROR(
Expand Down
12 changes: 7 additions & 5 deletions be/src/storage/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,7 @@ void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) {

void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
int64_t publish_version, int64_t transaction_id,
bool is_recovery) {
bool is_recovery, int64_t commit_tso) {
if (!is_recovery) {
bool exists = false;
{
Expand All @@ -1685,6 +1685,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
PendingPublishInfoPB pending_publish_info_pb;
pending_publish_info_pb.set_partition_id(partition_id);
pending_publish_info_pb.set_transaction_id(transaction_id);
pending_publish_info_pb.set_commit_tso(commit_tso);
static_cast<void>(TabletMetaManager::save_pending_publish_info(
tablet->data_dir(), tablet->tablet_id(), publish_version,
pending_publish_info_pb.SerializeAsString()));
Expand All @@ -1693,7 +1694,7 @@ void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_
<< " version: " << publish_version << " txn_id:" << transaction_id
<< " is_recovery: " << is_recovery;
std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
_async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso};
}

int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
Expand Down Expand Up @@ -1730,8 +1731,9 @@ void StorageEngine::_process_async_publish() {

auto task_iter = tablet_iter->second.begin();
int64_t version = task_iter->first;
int64_t transaction_id = task_iter->second.first;
int64_t partition_id = task_iter->second.second;
int64_t transaction_id = std::get<0>(task_iter->second);
int64_t partition_id = std::get<1>(task_iter->second);
int64_t commit_tso = std::get<2>(task_iter->second);
int64_t max_version = tablet->max_version().second;

if (version <= max_version) {
Expand All @@ -1753,7 +1755,7 @@ void StorageEngine::_process_async_publish() {
}

auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
*this, tablet, partition_id, transaction_id, version);
*this, tablet, partition_id, transaction_id, version, commit_tso);
static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
[=]() { async_publish_task->handle(); }));
tablet_iter->second.erase(task_iter);
Expand Down
3 changes: 2 additions & 1 deletion be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Status Rowset::load(bool use_cache) {
return Status::OK();
}

void Rowset::make_visible(Version version) {
void Rowset::make_visible(Version version, int64_t commit_tso) {
_is_pending = false;
_rowset_meta->set_version(version);
_rowset_meta->set_rowset_state(VISIBLE);
Expand All @@ -95,6 +95,7 @@ void Rowset::make_visible(Version version) {
if (_rowset_meta->has_delete_predicate()) {
_rowset_meta->mutable_delete_predicate()->set_version(cast_set<int32_t>(version.first));
}
_rowset_meta->set_commit_tso(commit_tso);
}

void Rowset::set_version(Version version) {
Expand Down
5 changes: 4 additions & 1 deletion be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
const std::string& tablet_path() const { return _tablet_path; }

// publish rowset to make it visible to read
void make_visible(Version version);
void make_visible(Version version, int64_t commit_tso);
void set_version(Version version);
const TabletSchemaSPtr& tablet_schema() const;

Expand All @@ -166,6 +166,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
// The writing time of the newest data in rowset, to measure the freshness of a rowset.
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
// The commit tso recorded in this rowset's meta; forwards to
// RowsetMeta::commit_tso(). The value is written by make_visible().
int64_t commit_tso() const { return rowset_meta()->commit_tso(); }

bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,10 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
[algorithm]() -> Result<EncryptionAlgorithmPB> { return algorithm; });
}

// Returns the commit_tso field stored in the underlying rowset meta protobuf.
int64_t commit_tso() const { return _rowset_meta_pb.commit_tso(); }

// Stores `commit_tso` into the underlying rowset meta protobuf.
void set_commit_tso(int64_t commit_tso) { _rowset_meta_pb.set_commit_tso(commit_tso); }

private:
bool _deserialize_from_pb(std::string_view value);

Expand Down
7 changes: 4 additions & 3 deletions be/src/storage/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class StorageEngine final : public BaseStorageEngine {
void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);

void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
int64_t transaction_id, bool is_recover);
int64_t transaction_id, bool is_recover, int64_t commit_tso);
int64_t get_pending_publish_min_version(int64_t tablet_id);

bool add_broken_path(std::string path);
Expand Down Expand Up @@ -583,8 +583,9 @@ class StorageEngine final : public BaseStorageEngine {

std::mutex _cumu_compaction_delay_mtx;

// tablet_id, publish_version, transaction_id, partition_id
std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
// tablet_id -> publish_version -> (transaction_id, partition_id, commit_tso)
std::map<int64_t, std::map<int64_t, std::tuple<int64_t, int64_t, int64_t>>>
_async_publish_tasks;
// async publish for discontinuous versions of merge_on_write table
std::shared_ptr<Thread> _async_publish_thread;
std::shared_mutex _async_publish_lock;
Expand Down
Loading
Loading