-
Notifications
You must be signed in to change notification settings - Fork 526
Propagate cancellation within leaf search #6002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
05093f0
472229f
600e6de
db35e83
2fd61d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ use tantivy::directory::FileSlice; | |
| use tantivy::fastfield::FastFieldReaders; | ||
| use tantivy::schema::Field; | ||
| use tantivy::{DateTime, Index, ReloadPolicy, Searcher, TantivyError, Term}; | ||
| use tokio::task::JoinError; | ||
| use tokio::task::{JoinError, JoinSet}; | ||
| use tracing::*; | ||
|
|
||
| use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector}; | ||
|
|
@@ -1202,8 +1202,7 @@ pub async fn multi_index_leaf_search( | |
| // | ||
| // It is a little bit tricky how to handle which is now the incremental_merge_collector, one | ||
| // per index, e.g. when to merge results and how to avoid lock contention. | ||
| let mut leaf_request_tasks = Vec::new(); | ||
|
|
||
| let mut leaf_request_futures = JoinSet::new(); | ||
| for leaf_search_request_ref in leaf_search_request.leaf_requests.into_iter() { | ||
| let index_uri = quickwit_common::uri::Uri::from_str( | ||
| leaf_search_request | ||
|
|
@@ -1226,7 +1225,7 @@ pub async fn multi_index_leaf_search( | |
| })? | ||
| .clone(); | ||
|
|
||
| let leaf_request_future = tokio::spawn({ | ||
| leaf_request_futures.spawn({ | ||
| let storage_resolver = storage_resolver.clone(); | ||
| let searcher_context = searcher_context.clone(); | ||
| let search_request = search_request.clone(); | ||
|
|
@@ -1241,24 +1240,20 @@ pub async fn multi_index_leaf_search( | |
| doc_mapper, | ||
| aggregation_limits, | ||
| ) | ||
| .in_current_span() | ||
| .await | ||
| } | ||
| .in_current_span() | ||
| }); | ||
| leaf_request_tasks.push(leaf_request_future); | ||
| } | ||
|
|
||
| let leaf_responses: Vec<crate::Result<LeafSearchResponse>> = tokio::time::timeout( | ||
| searcher_context.searcher_config.request_timeout(), | ||
| try_join_all(leaf_request_tasks), | ||
| ) | ||
| .await??; | ||
| let merge_collector = make_merge_collector(&search_request, aggregation_limits)?; | ||
| let mut incremental_merge_collector = IncrementalCollector::new(merge_collector); | ||
| for result in leaf_responses { | ||
| match result { | ||
| Ok(result) => { | ||
| incremental_merge_collector.add_result(result)?; | ||
| while let Some(leaf_response_join_result) = leaf_request_futures.join_next().await { | ||
| // abort the search on join errors | ||
| let leaf_response_result = leaf_response_join_result?; | ||
| match leaf_response_result { | ||
| Ok(leaf_response) => { | ||
| incremental_merge_collector.add_result(leaf_response)?; | ||
| } | ||
| Err(err) => { | ||
| incremental_merge_collector.add_failed_split(SplitSearchError { | ||
|
|
@@ -1349,9 +1344,6 @@ pub async fn single_doc_mapping_leaf_search( | |
|
|
||
| let split_filter = Arc::new(RwLock::new(split_filter)); | ||
|
|
||
| let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> = | ||
| Vec::with_capacity(split_with_req.len()); | ||
|
|
||
| let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?; | ||
| let incremental_merge_collector = IncrementalCollector::new(merge_collector); | ||
| let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector)); | ||
|
|
@@ -1379,6 +1371,8 @@ pub async fn single_doc_mapping_leaf_search( | |
| split_filter: split_filter.clone(), | ||
| }); | ||
|
|
||
| let mut split_search_futures = JoinSet::new(); | ||
| let mut split_with_task_id = Vec::with_capacity(split_with_req.len()); | ||
| for ((split, search_request), permit_fut) in | ||
| split_with_req.into_iter().zip(permit_futures.into_iter()) | ||
| { | ||
|
|
@@ -1394,35 +1388,37 @@ pub async fn single_doc_mapping_leaf_search( | |
| leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup); | ||
| continue; | ||
| }; | ||
|
|
||
| leaf_search_single_split_join_handles.push(( | ||
| split.split_id.clone(), | ||
| tokio::spawn( | ||
| leaf_search_single_split_wrapper( | ||
| simplified_search_request, | ||
| leaf_search_context.clone(), | ||
| index_storage.clone(), | ||
| split, | ||
| leaf_split_search_permit, | ||
| aggregations_limits.clone(), | ||
| ) | ||
| .in_current_span(), | ||
| ), | ||
| )); | ||
| let split_id = split.split_id.clone(); | ||
| let handle = split_search_futures.spawn( | ||
| leaf_search_single_split_wrapper( | ||
| simplified_search_request, | ||
| leaf_search_context.clone(), | ||
| index_storage.clone(), | ||
| split, | ||
| leaf_split_search_permit, | ||
| aggregations_limits.clone(), | ||
| ) | ||
| .in_current_span(), | ||
| ); | ||
| split_with_task_id.push((split_id, handle.id())); | ||
| } | ||
|
|
||
| // TODO we could cancel running splits when !run_all_splits and the running split can no | ||
| // longer give better results after some other split answered. | ||
| let mut split_search_join_errors: Vec<(String, JoinError)> = Vec::new(); | ||
|
|
||
| // There is no need to use `join_all`, as these are spawned tasks. | ||
| for (split, leaf_search_join_handle) in leaf_search_single_split_join_handles { | ||
| while let Some(leaf_search_join_result) = split_search_futures.join_next().await { | ||
| // splits that did not panic were already added to the collector | ||
| if let Err(join_error) = leaf_search_join_handle.await { | ||
| if let Err(join_error) = leaf_search_join_result { | ||
| if join_error.is_cancelled() { | ||
| // An explicit task cancellation is not an error. | ||
| continue; | ||
| } | ||
| let position = split_with_task_id | ||
guilload marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .iter() | ||
| .position(|(_, task_id)| *task_id == join_error.id()) | ||
| .unwrap(); | ||
| let (split, _) = split_with_task_id.remove(position); | ||
| if join_error.is_panic() { | ||
| error!(split=%split, "leaf search task panicked"); | ||
| } else { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this timeout seems to have disappeared. Before, it would shorten a query (but not actually release resources 😭). Can we bring it back? — Never mind, it was added on the caller side.