diff --git a/Cargo.toml b/Cargo.toml index 3248f10..4c65e1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ http-serde = "2" iceberg = "0.4" iceberg-catalog-rest = "0.4" miette = { version = "7", features = ["fancy"] } +num-format = { version = "0.4", features = ["with-system-locale"] } ratatui = "0.29" serde = "1" streemap = "0.1" @@ -31,4 +32,4 @@ terminal_size = "0.4" thiserror = "2" tokio = "1" tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } diff --git a/tanic-core/config.rs b/tanic-core/config.rs index 4050634..2d42cbf 100644 --- a/tanic-core/config.rs +++ b/tanic-core/config.rs @@ -37,6 +37,12 @@ impl ConnectionDetails { } } +impl PartialEq for ConnectionDetails { + fn eq(&self, other: &Self) -> bool { + self.uri == other.uri + } +} + /// persistable user config. /// /// Loaded in at application startup from $CONFIG/tanic/tanic.toml diff --git a/tanic-core/error.rs b/tanic-core/error.rs index 755088e..c6dfe11 100644 --- a/tanic-core/error.rs +++ b/tanic-core/error.rs @@ -27,3 +27,9 @@ pub enum TanicError { #[error("Unexpected")] UnexpectedError(String), } + +impl TanicError { + pub fn unexpected(msg: T) -> Self { + Self::UnexpectedError(msg.to_string()) + } +} diff --git a/tanic-core/message.rs b/tanic-core/message.rs index e96e7f3..621122b 100644 --- a/tanic-core/message.rs +++ b/tanic-core/message.rs @@ -11,3 +11,14 @@ pub struct TableDeets { pub name: String, pub row_count: usize, } + +impl NamespaceDeets { + pub fn from_parts(parts: Vec) -> Self { + let name = parts.clone().join("."); + Self { + parts, + name, + table_count: 0, + } + } +} diff --git a/tanic-svc/Cargo.toml b/tanic-svc/Cargo.toml index d020a2c..40464da 100644 --- a/tanic-svc/Cargo.toml +++ b/tanic-svc/Cargo.toml @@ -28,6 +28,10 @@ iceberg-catalog-rest = "0.4.0" serde = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -uuid = { version = "1.12.0", features = ["v4"] } -names = "0.14.0" -tokio-stream = { version = "0.1.17", features = ["sync"] } +uuid = { version = "1", features = ["v4"] } +names = "0.14" +num-format = { workspace = true } +tokio-stream = { version = "0.1", features = ["sync"] } +parquet = "54.0.0" +indexmap = "2" +futures = "0.3" diff --git a/tanic-svc/src/iceberg_context.rs b/tanic-svc/src/iceberg_context.rs index 6e0f064..bf9da1d 100644 --- a/tanic-svc/src/iceberg_context.rs +++ b/tanic-svc/src/iceberg_context.rs @@ -1,186 +1,403 @@ //! Iceberg Context -use iceberg::{Catalog, NamespaceIdent}; -use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use std::sync::Arc; +use std::sync::RwLock; + +use futures::stream::StreamExt; +use iceberg::{Catalog, NamespaceIdent, TableIdent}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::watch::Receiver; -use tokio_stream::{wrappers::WatchStream, StreamExt}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::WatchStream; use tanic_core::config::ConnectionDetails; use tanic_core::message::{NamespaceDeets, TableDeets}; use tanic_core::{Result, TanicError}; +use tokio::sync::mpsc::{channel, Receiver as MpscReceiver, Sender as MpscSender}; +use tokio_stream::wrappers::ReceiverStream; -use crate::state::{TanicAction, TanicAppState, ViewingNamespacesListState}; +use crate::state::{TanicAction, TanicAppState, TanicIcebergState}; -#[derive(Debug)] -enum Connection { - Disconnected, - Connected(IcebergContext), -} +type ActionTx = UnboundedSender; +type IceCtxRef = Arc>; -#[derive(Debug)] +const JOB_STREAM_CONCURRENCY: usize = 1; + +#[derive(Debug, Default)] struct IcebergContext { - connection_details: ConnectionDetails, + connection_details: Option, /// Iceberg Catalog catalog: Option>, namespaces: Vec, tables: Vec, + + #[allow(unused)] // TODO: cancellation + pub cancellable_action: Option>, } /// Iceberg Context #[derive(Debug)] pub struct IcebergContextManager { - action_tx: UnboundedSender, + action_tx: ActionTx, + iceberg_context: IceCtxRef, + state_ref: Arc>, +} + +#[derive(Debug)] +enum IcebergTask { + Namespaces, + TablesForNamespace(NamespaceDeets), + SummaryForTable(TableDeets), } impl IcebergContextManager { - pub fn new(action_tx: UnboundedSender) -> Self { - Self { action_tx } + pub fn new(action_tx: ActionTx, state_ref: Arc>) -> Self { + Self { + action_tx, + state_ref, + iceberg_context: Arc::new(RwLock::new(IcebergContext::default())), + } } - pub async fn event_loop(self, state_rx: Receiver) -> Result<()> { - let mut connection = Connection::Disconnected; - + pub async fn event_loop(&self, state_rx: Receiver<()>) -> Result<()> { let mut state_stream = WatchStream::new(state_rx); + let (job_queue_tx, job_queue_rx) = channel(10); - while let Some(state) = state_stream.next().await { - match state { - TanicAppState::ConnectingTo(ref new_conn_details) => { - match &mut connection { - // initial connection - Connection::Disconnected => { - let mut context = IcebergContext::connect_to(new_conn_details); - - context.populate_namespaces().await?; - - self.action_tx - .send(TanicAction::RetrievedNamespaceList( - context.namespaces.clone(), - )) - .map_err(|err| TanicError::UnexpectedError(err.to_string()))?; - - connection = Connection::Connected(context); - } - - // already existing connection? No Op - Connection::Connected(IcebergContext { - connection_details, .. - }) if connection_details.uri == new_conn_details.uri => {} + let mut current_conn_details: Option = None; - // switch connection - Connection::Connected(_) => { - let mut context = IcebergContext::connect_to(new_conn_details); - - context.populate_namespaces().await?; - - self.action_tx - .send(TanicAction::RetrievedNamespaceList( - context.namespaces.clone(), - )) - .map_err(|err| TanicError::UnexpectedError(err.to_string()))?; - - connection = Connection::Connected(context); - } + tokio::spawn({ + let action_tx = self.action_tx.clone(); + let job_queue_tx = job_queue_tx.clone(); + let iceberg_ctx = self.iceberg_context.clone(); + async move { + tracing::debug!("await job_handler()"); + Self::job_handler(job_queue_rx, job_queue_tx, action_tx, iceberg_ctx).await + } + }); + + tracing::debug!("await state_stream.next() 1"); + let mut next_item = state_stream.next().await; + tracing::debug!("await state_stream.next() 1 complete"); + while next_item.is_some() { + let new_conn_details = { + tracing::debug!("self.state_ref.read()"); + let state = self.state_ref.read().unwrap(); + + match &state.iceberg { + TanicIcebergState::ConnectingTo(ref new_conn_details) => { + Some(new_conn_details.clone()) } - } - TanicAppState::ViewingNamespacesList(_) => {} - TanicAppState::RetrievingTableList(ViewingNamespacesListState { - namespaces, - selected_idx, - }) => { - let Some(selected_idx) = selected_idx else { - continue; - }; - let namespace = &namespaces[selected_idx]; - if let Connection::Connected(ref mut iceberg_ctx) = &mut connection { - iceberg_ctx.populate_table_list(&namespace.parts).await?; - - self.action_tx - .send(TanicAction::RetrievedTableList( - namespace.clone(), - iceberg_ctx.tables.clone(), - )) - .map_err(|err| TanicError::UnexpectedError(err.to_string()))?; + TanicIcebergState::Exiting => { + break; } + _ => None, } - TanicAppState::Exiting => { - break; + }; + tracing::debug!("self.state_ref.read() done"); + + if let Some(new_conn_details) = new_conn_details { + if Some(new_conn_details.clone()) != current_conn_details { + current_conn_details = Some(new_conn_details.clone()); + + tracing::debug!("await self.connect_to()"); + self.connect_to(&new_conn_details, job_queue_tx.clone()) + .await?; + tracing::debug!("await self.connect_to() done"); + + // begin crawl + tracing::debug!("await job_queue_tx.send()"); + let _ = job_queue_tx.send(IcebergTask::Namespaces).await; + tracing::debug!("await job_queue_tx.send() done"); } - _ => {} } + + tracing::debug!("await state_stream.next() 2"); + next_item = state_stream.next().await; + tracing::debug!("await state_stream.next() 2 complete"); } Ok(()) } -} -impl IcebergContext { - /// Create a new Iceberg Context from a Uri - pub fn connect_to(connection_details: &ConnectionDetails) -> Self { - let connection_details = connection_details.clone(); - - let mut uri_str = connection_details.uri.to_string(); - uri_str.pop(); - - let config = RestCatalogConfig::builder().uri(uri_str).build(); - let rest_catalog = RestCatalog::new(config); + async fn connect_to( + &self, + new_conn_details: &ConnectionDetails, + _job_queue_tx: MpscSender, + ) -> Result<()> { + { + tracing::debug!("self.iceberg_context.read()"); + let ctx = self.iceberg_context.read().unwrap(); + if let Some(ref existing_conn_details) = ctx.connection_details { + if new_conn_details == existing_conn_details { + // do nothing, already connected to this catalog + return Ok(()); + } + } + } + tracing::debug!("self.iceberg_context.read() done"); - Self { - connection_details, - namespaces: vec![], - tables: vec![], - catalog: Some(Arc::new(rest_catalog)), + { + tracing::debug!("self.iceberg_context.write()"); + let mut ctx = self.iceberg_context.write().unwrap(); + ctx.connect_to(new_conn_details); } + tracing::debug!("self.iceberg_context.write() done"); + + Ok(()) } - pub async fn populate_namespaces(&mut self) -> Result<()> { - let Some(ref catalog) = self.catalog else { - panic!(); + async fn populate_namespaces( + ctx: IceCtxRef, + action_tx: ActionTx, + job_queue_tx: MpscSender, + ) -> Result<()> { + let root_namespaces = { + let catalog = { + tracing::debug!("ctx.read()"); + let r_ctx = ctx.read().unwrap(); + + let Some(ref catalog) = r_ctx.catalog else { + return Err(TanicError::unexpected( + "Attempted to populate namespaces when catalog not initialised", + )); + }; + + catalog.clone() + }; + tracing::debug!("ctx.read() done"); + + tracing::debug!("catalog.list_namespaces(None).await"); + let res = catalog.list_namespaces(None).await?; + tracing::debug!("catalog.list_namespaces(None).await done"); + + res }; - let root_namespaces = catalog.list_namespaces(None).await?; - let namespaces = root_namespaces .into_iter() - .map(|ns| { - let parts = ns.inner(); - let name = parts.clone().join("."); - NamespaceDeets { - parts, - name, - table_count: 0, - } - }) + .map(|ns| NamespaceDeets::from_parts(ns.inner())) .collect::>(); - self.namespaces = namespaces; + { + let namespaces = namespaces.clone(); + tracing::debug!("ctx.write()"); + ctx.write().unwrap().namespaces = namespaces; + } + tracing::debug!("ctx.write() done"); + + action_tx + .send(TanicAction::UpdateNamespacesList( + namespaces + .iter() + .map(|ns| ns.name.clone()) + .collect::>(), + )) + .map_err(|err| TanicError::UnexpectedError(err.to_string()))?; + + for namespace in namespaces { + tracing::debug!("job_queue_tx.send await"); + let _ = job_queue_tx + .send(IcebergTask::TablesForNamespace(namespace.clone())) + .await; + tracing::debug!("job_queue_tx.send await done"); + } Ok(()) } - pub async fn populate_table_list(&mut self, namespace_parts: &Vec) -> Result<()> { - let Some(ref catalog) = self.catalog else { - panic!(); + async fn populate_tables( + ctx: IceCtxRef, + action_tx: ActionTx, + namespace: NamespaceDeets, + job_queue_tx: MpscSender, + ) -> Result<()> { + let namespace_ident = NamespaceIdent::from_strs(namespace.parts.clone())?; + let tables = { + let catalog = { + tracing::debug!("ctx.read()"); + let r_ctx = ctx.read().unwrap(); + + let Some(ref catalog) = r_ctx.catalog else { + return Err(TanicError::unexpected( + "Attempted to populate namespaces when catalog not initialised", + )); + }; + + catalog.clone() + }; + tracing::debug!("ctx.read() done"); + + tracing::debug!("catalog.list_tables(&namespace_ident).await"); + let res = catalog.list_tables(&namespace_ident).await?; + tracing::debug!("catalog.list_tables(&namespace_ident).await done"); + + res }; - let tables = catalog - .list_tables(&NamespaceIdent::from_strs(namespace_parts)?) - .await?; - - let table_names = tables + let tables = tables .into_iter() .map(|ti| TableDeets { - namespace: namespace_parts.clone(), + namespace: namespace.parts.clone(), name: ti.name().to_string(), row_count: 1, }) .collect::>(); - self.tables = table_names; + { + let tables = tables.clone(); + tracing::debug!("ctx.write()"); + ctx.write().unwrap().tables = tables; + } + tracing::debug!("ctx.write() done"); + + action_tx + .send(TanicAction::UpdateNamespaceTableList( + namespace.name.clone(), + tables.iter().map(|t| &t.name).cloned().collect(), + )) + .map_err(TanicError::unexpected)?; + + for table in tables { + tracing::debug!("job_queue_tx.send await"); + tracing::info!(?table, "sending SummaryForTable"); + let _ = job_queue_tx + .send(IcebergTask::SummaryForTable(table.clone())) + .await; + tracing::debug!("job_queue_tx.send await done"); + } Ok(()) } + + async fn populate_table_summary( + ctx: IceCtxRef, + action_tx: ActionTx, + table: TableDeets, + _job_queue_tx: MpscSender, + ) -> Result<()> { + let namespace_ident = NamespaceIdent::from_strs(table.namespace.clone())?; + let table_ident = TableIdent::new(namespace_ident.clone(), table.name.clone()); + + let loaded_table = { + let catalog = { + tracing::debug!("ctx.read()"); + let r_ctx = ctx.read().unwrap(); + + let Some(ref catalog) = r_ctx.catalog else { + return Err(TanicError::unexpected( + "Attempted to populate table summary when catalog not initialised", + )); + }; + + catalog.clone() + }; + tracing::debug!("ctx.read() done"); + + tracing::debug!("catalog.load_table(&table_ident).await"); + let res = catalog.load_table(&table_ident).await?; + tracing::debug!("catalog.load_table(&table_ident).await done"); + + res + }; + + let summary = loaded_table + .metadata() + .current_snapshot() + .unwrap() + .summary(); + // tracing::info!(?summary); + + action_tx + .send(TanicAction::UpdateTableSummary { + namespace: namespace_ident.to_url_string(), + table_name: table_ident.name.clone(), + table_summary: summary.additional_properties.clone(), + }) + .map_err(TanicError::unexpected)?; + + Ok(()) + } +} + +impl IcebergContext { + /// Create a new Iceberg Context from a Uri + pub fn connect_to(&mut self, connection_details: &ConnectionDetails) { + self.connection_details = Some(connection_details.clone()); + + let mut uri_str = connection_details.uri.to_string(); + uri_str.pop(); + + let config = RestCatalogConfig::builder().uri(uri_str).build(); + self.catalog = Some(Arc::new(RestCatalog::new(config))); + + self.namespaces = vec![]; + self.tables = vec![]; + } +} + +impl IcebergContextManager { + async fn job_handler( + job_queue_rx: MpscReceiver, + job_queue_tx: MpscSender, + action_tx: ActionTx, + iceberg_ctx: IceCtxRef, + ) { + let job_stream = ReceiverStream::new(job_queue_rx); + + job_stream + .map(|task| { + ( + task, + iceberg_ctx.clone(), + action_tx.clone(), + job_queue_tx.clone(), + ) + }) + .for_each_concurrent( + JOB_STREAM_CONCURRENCY, + async move |(task, iceberg_ctx, action_tx, job_queue_tx)| { + match task { + IcebergTask::Namespaces => { + tracing::debug!("populate_namespaces.await"); + let _ = IcebergContextManager::populate_namespaces( + iceberg_ctx, + action_tx, + job_queue_tx, + ) + .await; + tracing::debug!("populate_namespaces.await done"); + } + + IcebergTask::TablesForNamespace(namespace) => { + tracing::debug!("populate_tables.await"); + let _ = IcebergContextManager::populate_tables( + iceberg_ctx, + action_tx, + namespace, + job_queue_tx, + ) + .await; + tracing::debug!("populate_tables.await done"); + } + + IcebergTask::SummaryForTable(table) => { + tracing::debug!("populate_table_summary.await"); + let _ = IcebergContextManager::populate_table_summary( + iceberg_ctx, + action_tx, + table, + job_queue_tx, + ) + .await; + tracing::debug!("populate_table_summary.await done"); + } // _ => {} + } + }, + ) + .await; + // }).await; + } } diff --git a/tanic-svc/src/lib.rs b/tanic-svc/src/lib.rs index 7f97aee..36ec63a 100644 --- a/tanic-svc/src/lib.rs +++ b/tanic-svc/src/lib.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; +use std::sync::RwLock; use tanic_core::TanicConfig; use tanic_core::{Result, TanicError}; use tokio::sync::mpsc::{UnboundedReceiver as MpscReceiver, UnboundedSender as MpscSender}; @@ -6,6 +8,7 @@ use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; pub mod iceberg_context; pub mod state; +use crate::state::TanicIcebergState; pub use state::{TanicAction, TanicAppState}; pub struct AppStateManager { @@ -13,19 +16,17 @@ pub struct AppStateManager { #[allow(unused)] action_tx: MpscSender, - state_tx: WatchSender, + state_tx: WatchSender<()>, - state: TanicAppState, + state: Arc>, } impl AppStateManager { - pub fn new( - _config: TanicConfig, - ) -> (Self, MpscSender, WatchReceiver) { - let state = TanicAppState::default(); + pub fn new(_config: TanicConfig) -> (Self, MpscSender, WatchReceiver<()>) { + let state = Arc::new(RwLock::new(TanicAppState::default())); let (action_tx, action_rx) = tokio::sync::mpsc::unbounded_channel(); - let (state_tx, state_rx) = tokio::sync::watch::channel(state.clone()); + let (state_tx, state_rx) = tokio::sync::watch::channel(()); ( Self { @@ -39,25 +40,35 @@ impl AppStateManager { ) } + pub fn get_state(&self) -> Arc> { + self.state.clone() + } + pub async fn event_loop(self) -> Result<()> { let Self { - mut state, + state, state_tx, mut action_rx, .. } = self; - while !matches!(state, TanicAppState::Exiting) { + while !matches!(state.read().unwrap().iceberg, TanicIcebergState::Exiting) { + tracing::debug!("await action_rx.recv()"); let Some(action) = action_rx.recv().await else { break; }; + tracing::debug!("await action_rx.recv() complete"); tracing::info!(?action, "AppState received an action"); - let next_state = state.reduce(action); + { + tracing::debug!("state.write()"); + let mut mut_state = state.write().unwrap(); + *mut_state = mut_state.clone().update(action); + } + tracing::debug!("state.write() done"); - state = next_state; state_tx - .send(state.clone()) + .send(()) .map_err(|err| TanicError::UnexpectedError(err.to_string()))?; } diff --git a/tanic-svc/src/state.rs b/tanic-svc/src/state.rs index 7b1f16d..ddd63b8 100644 --- a/tanic-svc/src/state.rs +++ b/tanic-svc/src/state.rs @@ -1,5 +1,12 @@ +use iceberg::spec::{DataFile, Manifest, ManifestList, Snapshot}; +use iceberg::table::Table; +use indexmap::IndexMap; +use num_format::SystemLocale; +use parquet::file::metadata::ParquetMetaData; +use std::collections::HashMap; use tanic_core::config::ConnectionDetails; -use tanic_core::message::{NamespaceDeets, TableDeets}; + +const TABLE_SUMMARY_KEY_ROW_COUNT: &str = "total-records"; #[derive(Debug)] pub enum TanicAction { @@ -7,191 +14,733 @@ pub enum TanicAction { ConnectTo(ConnectionDetails), - RetrievedNamespaceList(Vec), + // Iceberg metadata update actions + UpdateNamespacesList(Vec), + UpdateNamespaceProperties(String, HashMap), + UpdateNamespaceTableList(String, Vec), + UpdateTable { + namespace: String, + table_name: String, + table: Table, + }, + UpdateTableSummary { + namespace: String, + table_name: String, + table_summary: HashMap, + }, + UpdateTableCurrentSnapshot { + namespace: String, + table_name: String, + snapshot: Snapshot, + }, + UpdateTableCurrentManifestList { + namespace: String, + table_name: String, + manifest_list: Box, + }, + UpdateTableManifest { + namespace: String, + table_name: String, + manifest: Box, + file_path: String, + }, + UpdateTableDataFile { + namespace: String, + table_name: String, + data_file: Box, + }, + UpdateTableParquetMetaData { + namespace: String, + table_name: String, + file_path: String, + metadata: Box, + }, + + ///// UI Actions /////// FocusPrevNamespace, FocusNextNamespace, SelectNamespace, - RetrievedTableList(NamespaceDeets, Vec), - EnrichedTableDetails(), FocusPrevTable, FocusNextTable, SelectTable, - LeaveNamespace, + + Escape, + + FocusNextPartition, + FocusPrevPartition, + SelectPartition, + + FocusNextDataFile, + FocusPrevDataFile, + SelectDataFile, +} + +#[derive(Clone, Debug)] +pub struct TanicAppState { + pub iceberg: TanicIcebergState, + pub ui: TanicUiState, + pub locale: SystemLocale, } #[derive(Clone, Debug, Default)] -pub enum TanicAppState { +pub enum TanicIcebergState { #[default] Initializing, ConnectingTo(ConnectionDetails), + Connected(RetrievedIcebergMetadata), + Exiting, +} + +#[derive(Clone, Debug)] +pub struct RetrievedIcebergMetadata { + pub namespaces: IndexMap, +} + +#[derive(Clone, Debug)] +pub struct NamespaceDescriptor { + pub name: String, + properties: Option>, + pub tables: Option>, + + pub row_count: Option, +} + +#[derive(Clone, Debug)] +pub struct TableDescriptor { + pub name: String, + #[allow(unused)] + namespace: Vec, + current_snapshot_summary: Option>, + table: Option, + current_snapshot: Option, + current_manifest_list: Option, + manifests: IndexMap, + datafiles: HashMap, + parquet_metadata: HashMap, + + row_count: Option, +} + +impl TableDescriptor { + pub fn row_count(&self) -> Option { + self.row_count + } +} + +#[derive(Clone, Debug, Default)] +pub enum TanicUiState { + #[default] + SplashScreen, ViewingNamespacesList(ViewingNamespacesListState), - RetrievingTableList(ViewingNamespacesListState), ViewingTablesList(ViewingTablesListState), + ViewingTable(ViewingTableState), Exiting, } #[derive(Clone, Debug)] pub struct ViewingNamespacesListState { - pub namespaces: Vec, pub selected_idx: Option, } #[derive(Clone, Debug)] pub struct ViewingTablesListState { pub namespaces: ViewingNamespacesListState, - pub namespace: NamespaceDeets, - pub tables: Vec, pub selected_idx: Option, } +#[derive(Clone, Debug)] +pub struct ViewingTableState { + pub tables: ViewingTablesListState, +} + +impl Default for TanicAppState { + fn default() -> Self { + let locale = SystemLocale::default().unwrap(); + + Self { + iceberg: Default::default(), + ui: Default::default(), + locale, + } + } +} + impl TanicAppState { - pub(crate) fn reduce(self, action: TanicAction) -> Self { - match (action, &self) { - (TanicAction::Exit, _) => TanicAppState::Exiting, + pub(crate) fn update(mut self, action: TanicAction) -> Self { + match (action, &mut self) { + (TanicAction::Exit, _) => { + self.iceberg = TanicIcebergState::Exiting; + self.ui = TanicUiState::Exiting; + } - (TanicAction::ConnectTo(conn_details), _) => TanicAppState::ConnectingTo(conn_details), + (TanicAction::ConnectTo(conn_details), _) => { + self.iceberg = TanicIcebergState::ConnectingTo(conn_details); + self.ui = TanicUiState::SplashScreen; + } - (TanicAction::RetrievedNamespaceList(namespaces), _) => { + (TanicAction::UpdateNamespacesList(namespaces), _) => { let selected_idx = if namespaces.is_empty() { None } else { Some(0) }; - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - namespaces, + let namespaces = IndexMap::from_iter(namespaces.iter().map(|ns| { + ( + ns.clone(), + NamespaceDescriptor { + name: ns.clone(), + properties: None, + tables: None, + row_count: None, + }, + ) + })); + + self.iceberg = + TanicIcebergState::Connected(RetrievedIcebergMetadata { namespaces }); + self.ui = TanicUiState::ViewingNamespacesList(ViewingNamespacesListState { selected_idx, - }) + }); + } + + (TanicAction::UpdateNamespaceProperties(namespace, properties), prev_state) => { + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic! namespace not found"); + panic!(); + }; + + namespacce_desc.properties = Some(properties); + } + + (TanicAction::UpdateNamespaceTableList(namespace, table_names), prev_state) => { + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic! namepsace not found"); + panic!(); + }; + + namespacce_desc.tables = + Some(IndexMap::from_iter(table_names.into_iter().map(|name| { + ( + name.clone(), + TableDescriptor { + name, + namespace: namespace + .split(" ") + .map(|s| s.to_string()) + .collect::>(), + current_snapshot_summary: None, + table: None, + current_snapshot: None, + current_manifest_list: None, + manifests: IndexMap::default(), + datafiles: HashMap::default(), + parquet_metadata: HashMap::default(), + + row_count: None, + }, + ) + }))) } ( - TanicAction::FocusPrevNamespace, - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - namespaces, - selected_idx, - }), + TanicAction::UpdateTable { + namespace, + table_name, + table, + }, + prev_state, ) => { - let selected_idx = selected_idx.map(|selected_idx| { - if selected_idx == 0 { - namespaces.len() - 1 - } else { - selected_idx - 1 - } - }); + let TanicAppState { iceberg, .. } = prev_state; - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - namespaces: namespaces.clone(), - selected_idx, - }) + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic! ns not found"); + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + tracing::error!("panic! tables not found"); + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + tracing::error!("panic! table desc not found"); + panic!(); + }; + + table_desc.table = Some(table); } ( - TanicAction::FocusNextNamespace, - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - namespaces, - selected_idx, - }), + TanicAction::UpdateTableSummary { + namespace, + table_name, + table_summary, + }, + prev_state, ) => { - let selected_idx = selected_idx.map(|selected_idx| { - if selected_idx == namespaces.len() - 1 { - 0 - } else { - selected_idx + 1 + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespace_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic! ns not found"); + panic!(); + }; + + let Some(ref mut table_desc) = namespace_desc.tables else { + tracing::error!("panic! tables not found"); + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + tracing::error!("panic! table desc not found"); + panic!(); + }; + + if let Some(row_count_str) = table_summary.get(TABLE_SUMMARY_KEY_ROW_COUNT) { + if let Ok(row_count) = row_count_str.trim().parse::() { + table_desc.row_count = Some(row_count); + tracing::info!( + table_row_count = row_count, + orig_ns_rows = namespace_desc.row_count, + "Bumping NS row count" + ); + namespace_desc.row_count = namespace_desc + .row_count + .map(|rc| rc + row_count) + .or(Some(row_count)); } - }); + } - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - namespaces: namespaces.clone(), - selected_idx, - }) + table_desc.current_snapshot_summary = Some(table_summary); } ( - TanicAction::SelectNamespace, - TanicAppState::ViewingNamespacesList(ViewingNamespacesListState { - selected_idx, - namespaces, - }), - ) => TanicAppState::RetrievingTableList(ViewingNamespacesListState { - selected_idx: *selected_idx, - namespaces: namespaces.clone(), - }), + TanicAction::UpdateTableCurrentSnapshot { + namespace, + table_name, + snapshot, + }, + prev_state, + ) => { + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic! ns not found"); + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + tracing::error!("panic! tble desc not found"); + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + tracing::error!("panic! table not found"); + panic!(); + }; + + table_desc.current_snapshot = Some(snapshot); + } ( - TanicAction::RetrievedTableList(namespace, tables), - TanicAppState::RetrievingTableList(ViewingNamespacesListState { - selected_idx: namespace_selected_idx, - namespaces, - }), + TanicAction::UpdateTableCurrentManifestList { + namespace, + table_name, + manifest_list, + }, + prev_state, ) => { - let table_selected_idx = if tables.is_empty() { None } else { Some(0) }; + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + tracing::error!("panic!ns not found"); + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + tracing::error!("panic! tables not found"); + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + tracing::error!("panic! table not found"); + panic!(); + }; + + table_desc.current_manifest_list = Some(*manifest_list); + } - TanicAppState::ViewingTablesList(ViewingTablesListState { - namespaces: ViewingNamespacesListState { - selected_idx: *namespace_selected_idx, - namespaces: namespaces.clone(), - }, + ( + TanicAction::UpdateTableManifest { namespace, - tables, - selected_idx: table_selected_idx, - }) + table_name, + manifest, + file_path: uri, + }, + prev_state, + ) => { + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + tracing::error!("panic! not connected"); + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + panic!(); + }; + + table_desc.manifests.insert(uri, *manifest); } ( - TanicAction::FocusPrevTable, - TanicAppState::ViewingTablesList(ViewingTablesListState { - namespaces, + TanicAction::UpdateTableDataFile { namespace, - tables, - selected_idx, - }), + table_name, + data_file, + }, + prev_state, ) => { - let selected_idx = selected_idx.map(|selected_idx| { - if selected_idx == 0 { - tables.len() - 1 - } else { - selected_idx - 1 - } - }); + let TanicAppState { iceberg, .. } = prev_state; - TanicAppState::ViewingTablesList(ViewingTablesListState { - namespaces: namespaces.clone(), - namespace: namespace.clone(), - tables: tables.clone(), - selected_idx, - }) + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + panic!(); + }; + + table_desc + .datafiles + .insert(data_file.file_path().to_string(), *data_file); } ( - TanicAction::FocusNextTable, - TanicAppState::ViewingTablesList(ViewingTablesListState { - namespaces, + TanicAction::UpdateTableParquetMetaData { namespace, - tables, - selected_idx, - }), + table_name, + file_path, + metadata, + }, + prev_state, ) => { - let selected_idx = selected_idx.map(|selected_idx| { - if selected_idx == tables.len() - 1 { - 0 + let TanicAppState { iceberg, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + let Some(namespacce_desc) = + retrieved_iceberg_metadata.namespaces.get_mut(&namespace) + else { + panic!(); + }; + + let Some(ref mut table_desc) = namespacce_desc.tables else { + panic!(); + }; + + let Some(table_desc) = table_desc.get_mut(&table_name) else { + panic!(); + }; + + table_desc.parquet_metadata.insert(file_path, *metadata); + } + + (TanicAction::FocusPrevNamespace, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; + + let TanicUiState::ViewingNamespacesList(ref mut viewing_namespaces_list_state) = ui + else { + panic!(); + }; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + viewing_namespaces_list_state.selected_idx = viewing_namespaces_list_state + .selected_idx + .map(|selected_idx| { + if selected_idx == 0 { + retrieved_iceberg_metadata.namespaces.len() - 1 + } else { + selected_idx - 1 + } + }); + } + + (TanicAction::FocusNextNamespace, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; + + let TanicUiState::ViewingNamespacesList(ref mut viewing_namespaces_list_state) = ui + else { + panic!(); + }; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + viewing_namespaces_list_state.selected_idx = viewing_namespaces_list_state + .selected_idx + .map(|selected_idx| { + if selected_idx == retrieved_iceberg_metadata.namespaces.len() - 1 { + 0 + } else { + selected_idx + 1 + } + }); + } + + (TanicAction::SelectNamespace, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; + + let TanicUiState::ViewingNamespacesList(namespaces) = ui else { + panic!(); + }; + + let TanicIcebergState::Connected(ref iceberg_state) = iceberg else { + panic!(); + }; + + let has_some_tables = if let Some(selected_namespace_idx) = namespaces.selected_idx + { + if let Some((_, ns)) = + iceberg_state.namespaces.get_index(selected_namespace_idx) + { + if let Some(ref tables) = ns.tables { + !tables.is_empty() + } else { + false + } } else { - selected_idx + 1 + false } - }); + } else { + false + }; - TanicAppState::ViewingTablesList(ViewingTablesListState { + let selected_idx = if has_some_tables { Some(0) } else { None }; + + self.ui = TanicUiState::ViewingTablesList(ViewingTablesListState { namespaces: namespaces.clone(), - namespace: namespace.clone(), - tables: tables.clone(), selected_idx, - }) + }); } - (TanicAction::SelectTable, _) => self, + (TanicAction::FocusPrevTable, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; - ( - TanicAction::LeaveNamespace, - TanicAppState::ViewingTablesList(ViewingTablesListState { namespaces, .. }), - ) => TanicAppState::ViewingNamespacesList(namespaces.clone()), + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + let TanicUiState::ViewingTablesList(ref mut viewing_tables_list_state) = ui else { + panic!(); + }; + + let Some(namespace_selected_idx) = + viewing_tables_list_state.namespaces.selected_idx + else { + panic!(); + }; + + let Some(&namespace_selected_name) = retrieved_iceberg_metadata + .namespaces + .keys() + .collect::>() + .get(namespace_selected_idx) + else { + panic!(); + }; + + let Some(namespace) = retrieved_iceberg_metadata + .namespaces + .get(namespace_selected_name) + else { + panic!(); + }; + + if let Some(ref table_list) = namespace.tables { + let table_list_len = table_list.len(); + + viewing_tables_list_state.selected_idx = + viewing_tables_list_state.selected_idx.map(|selected_idx| { + if selected_idx == 0 { + table_list_len - 1 + } else { + selected_idx - 1 + } + }); + } + } - _ => self, + (TanicAction::FocusNextTable, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; + + let TanicIcebergState::Connected(ref mut retrieved_iceberg_metadata) = iceberg + else { + panic!(); + }; + + let TanicUiState::ViewingTablesList(ref mut viewing_tables_list_state) = ui else { + panic!(); + }; + + let Some(namespace_selected_idx) = + viewing_tables_list_state.namespaces.selected_idx + else { + panic!(); + }; + + let Some(&namespace_selected_name) = retrieved_iceberg_metadata + .namespaces + .keys() + .collect::>() + .get(namespace_selected_idx) + else { + panic!(); + }; + + let Some(namespace) = retrieved_iceberg_metadata + .namespaces + .get(namespace_selected_name) + else { + panic!(); + }; + + if let Some(ref table_list) = namespace.tables { + viewing_tables_list_state.selected_idx = + viewing_tables_list_state.selected_idx.map(|selected_idx| { + if selected_idx == table_list.len() - 1 { + 0 + } else { + selected_idx + 1 + } + }); + } + } + + (TanicAction::Escape, TanicAppState { ui, .. }) => { + #[allow(clippy::single_match)] // remove once more than one match below + match ui { + TanicUiState::ViewingTablesList(ViewingTablesListState { + namespaces, .. + }) => self.ui = TanicUiState::ViewingNamespacesList(namespaces.clone()), + + // TODO: Escape from Partition and DataFile + _ => {} + } + } + + (TanicAction::SelectTable, prev_state) => { + let TanicAppState { iceberg, ui, .. } = prev_state; + + let TanicIcebergState::Connected(ref iceberg_state) = iceberg else { + panic!(); + }; + + let TanicUiState::ViewingTablesList(ref mut viewing_tables_list_state) = ui else { + panic!(); + }; + + // self.ui = TanicUiState::ViewingTable(ViewingTableState { + // namespaces: namespaces.clone(), + // selected_idx, + // }); + } + + // TODO: + + // * SelectTable + // * FocusNextPartition + // * FocusPrevPartition, + // * SelectPartition, + // * FocusNextDataFile, + // * FocusPrevDataFile, + // * SelectDataFile + _ => { + unimplemented!() + } } + + self } } diff --git a/tanic-tui/Cargo.toml b/tanic-tui/Cargo.toml index 2897a25..b95dc76 100644 --- a/tanic-tui/Cargo.toml +++ b/tanic-tui/Cargo.toml @@ -33,3 +33,4 @@ futures = "0.3.31" tui-logger = "0.14.1" treemap = "0.3.2" tokio-stream = { version = "0.1.17", features = ["sync"] } +num-format = { workspace = true, features = ["with-system-locale"] } diff --git a/tanic-tui/component.rs b/tanic-tui/component.rs new file mode 100644 index 0000000..77864db --- /dev/null +++ b/tanic-tui/component.rs @@ -0,0 +1,18 @@ +use num_format::SystemLocale; +use ratatui::crossterm::event::{KeyEvent, MouseEvent}; +use ratatui::prelude::*; + +use tanic_svc::TanicAction; + +pub trait Component { + fn handle_key_event(&mut self, _key: KeyEvent) -> Option { + None + } + + #[allow(dead_code)] // not using any mouse events yet + fn handle_mouse_event(&mut self, _mouse: MouseEvent) -> Option { + None + } + + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale); +} diff --git a/tanic-tui/lib.rs b/tanic-tui/lib.rs index 94bf98e..803e3ac 100644 --- a/tanic-tui/lib.rs +++ b/tanic-tui/lib.rs @@ -1,13 +1,17 @@ use crossterm::event::{Event, EventStream}; use ratatui::Frame; +use std::sync::Arc; +use std::sync::RwLock; use tokio::sync::mpsc::UnboundedSender as MpscSender; use tokio::sync::watch::Receiver as WatchReceiver; use tokio_stream::{wrappers::WatchStream, StreamExt}; use crate::ui_components::app_container::AppContainer; use tanic_core::{Result, TanicError}; +use tanic_svc::state::TanicUiState; use tanic_svc::{TanicAction, TanicAppState}; +mod component; mod ui_components; pub struct TanicTui { @@ -19,16 +23,33 @@ impl TanicTui { Self { action_tx } } - pub async fn event_loop(self, state_rx: WatchReceiver) -> Result<()> { + pub async fn event_loop( + self, + state_rx: WatchReceiver<()>, + state: Arc>, + ) -> Result<()> { let mut terminal = ratatui::init(); let mut term_event_stream = EventStream::new(); let mut state_stream = WatchStream::new(state_rx); - let mut state = TanicAppState::Initializing; + let Some(_) = state_stream.next().await else { + tracing::debug!("state_stream.next().await done"); + return Ok(()); + }; - while !matches!(&state, TanicAppState::Exiting) { - let ui = AppContainer::new(&state); - terminal.draw(|frame| self.draw(frame, &ui))?; + let ui = AppContainer::new(state.clone()); + + loop { + { + tracing::debug!("state.read"); + let state = state.read().unwrap(); + tracing::debug!("state.read done"); + if matches!(state.ui, TanicUiState::Exiting) { + break; + } + + terminal.draw(|frame| self.draw(frame, &ui))?; + }; tokio::select! { // Catch and handle crossterm events @@ -46,9 +67,7 @@ impl TanicTui { }, // Handle state updates - Some(new_state) = state_stream.next() => { - state = new_state; - }, + _ = state_stream.next() => {} } } diff --git a/tanic-tui/ui_components/app_container.rs b/tanic-tui/ui_components/app_container.rs index 52d985a..6e11f92 100644 --- a/tanic-tui/ui_components/app_container.rs +++ b/tanic-tui/ui_components/app_container.rs @@ -1,3 +1,4 @@ +use crate::component::Component; use crate::ui_components::{ namespace_list_view::NamespaceListView, splash_screen::SplashScreen, table_list_view::TableListView, @@ -7,24 +8,26 @@ use ratatui::buffer::Buffer; use ratatui::layout::{Constraint, Layout, Rect}; use ratatui::prelude::{Color, Style, Widget}; use ratatui::widgets::Block; +use std::sync::{Arc, RwLock, TryLockError}; +use tanic_svc::state::TanicUiState; use tanic_svc::{TanicAction, TanicAppState}; use tui_logger::{LevelFilter, TuiLoggerLevelOutput, TuiLoggerWidget, TuiWidgetState}; -pub(crate) struct AppContainer<'a> { - state: &'a TanicAppState, - namespace_list_view: NamespaceListView<'a>, - table_list_view: TableListView<'a>, - splash_screen: SplashScreen<'a>, +pub(crate) struct AppContainer { + state: Arc>, + namespace_list_view: NamespaceListView, + table_list_view: TableListView, + splash_screen: SplashScreen, } -impl<'a> AppContainer<'a> { - pub(crate) fn new(state: &'a TanicAppState) -> Self { +impl AppContainer { + pub(crate) fn new(state: Arc>) -> Self { Self { - state, + state: state.clone(), - namespace_list_view: NamespaceListView::new(state), - table_list_view: TableListView::new(state), - splash_screen: SplashScreen::new(state), + namespace_list_view: NamespaceListView::new(state.clone()), + table_list_view: TableListView::new(state.clone()), + splash_screen: SplashScreen::new(state.clone()), } } @@ -37,20 +40,33 @@ impl<'a> AppContainer<'a> { // User pressed Q. Dispatch an exit action Some(TanicAction::Exit) } - key_event => match &self.state { - TanicAppState::ViewingNamespacesList(_) => { - self.namespace_list_view.handle_key_event(key_event) - } - TanicAppState::ViewingTablesList(_) => { - self.table_list_view.handle_key_event(key_event) + + key_event => { + tracing::debug!("key_event self.state.read"); + let state = match self.state.read() { + Ok(state) => state, + Err(err) => { + tracing::error!(?err, %err, "poison ☠"); + panic!(); + } + }; + tracing::debug!("key_event self.state.read done"); + + match state.ui { + TanicUiState::ViewingNamespacesList(_) => { + (&self.namespace_list_view).handle_key_event(key_event) + } + TanicUiState::ViewingTablesList(_) => { + (&self.table_list_view).handle_key_event(key_event) + } + _ => None, } - _ => None, - }, + } } } } -impl Widget for &AppContainer<'_> { +impl Widget for &AppContainer { fn render(self, area: Rect, buf: &mut Buffer) { let [top, bottom] = Layout::vertical([Constraint::Fill(1), Constraint::Max(6)]).areas(area); @@ -70,12 +86,33 @@ impl Widget for &AppContainer<'_> { .state(&filter_state) .render(bottom, buf); - match &self.state { - TanicAppState::Initializing => self.splash_screen.render(top, buf), - TanicAppState::ViewingNamespacesList(_) => self.namespace_list_view.render(top, buf), - TanicAppState::ViewingTablesList(_) => self.table_list_view.render(top, buf), - TanicAppState::Exiting => {} - _ => {} + { + tracing::debug!("render self.state.read"); + let state = match self.state.try_read() { + Ok(state) => state, + Err(TryLockError::Poisoned(err)) => { + tracing::error!(?err, %err, "poison ☠"); + panic!(); + } + Err(TryLockError::WouldBlock) => { + tracing::error!("WouldBlock"); + + // just skip this render if we can't get a read lock + return; + } + }; + + match state.ui { + TanicUiState::SplashScreen => self.splash_screen.render(top, buf), + TanicUiState::ViewingNamespacesList(_) => { + (&self.namespace_list_view).render(top, buf, &state.locale) + } + TanicUiState::ViewingTablesList(_) => { + (&self.table_list_view).render(top, buf, &state.locale) + } + TanicUiState::Exiting => {} // _ => {} + } } + tracing::debug!("render self.state.read done"); } } diff --git a/tanic-tui/ui_components/mod.rs b/tanic-tui/ui_components/mod.rs index c09dbe1..f577c19 100644 --- a/tanic-tui/ui_components/mod.rs +++ b/tanic-tui/ui_components/mod.rs @@ -1,4 +1,7 @@ pub(crate) mod app_container; +pub(crate) mod namespace_list_item; pub(crate) mod namespace_list_view; -mod splash_screen; +pub mod splash_screen; +pub(crate) mod table_list_item; pub(crate) mod table_list_view; +pub mod treemap_layout; diff --git a/tanic-tui/ui_components/namespace_list_item.rs b/tanic-tui/ui_components/namespace_list_item.rs new file mode 100644 index 0000000..9cdf382 --- /dev/null +++ b/tanic-tui/ui_components/namespace_list_item.rs @@ -0,0 +1,67 @@ +use crate::component::Component; +use num_format::{SystemLocale, ToFormattedString}; +use ratatui::prelude::*; +use ratatui::symbols::border; +use ratatui::widgets::{Block, Paragraph}; +use tanic_svc::state::NamespaceDescriptor; + +const NERD_FONT_ICON_TABLE_FOLDER: &str = "\u{f12e4}"; // 󱋤 + +pub(crate) struct NamespaceListItem<'a> { + pub(crate) ns: &'a NamespaceDescriptor, + pub(crate) is_selected: bool, +} + +impl<'a> NamespaceListItem<'a> { + pub(crate) fn new(ns: &'a NamespaceDescriptor, is_selected: bool) -> Self { + Self { ns, is_selected } + } +} + +impl Component for &NamespaceListItem<'_> { + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale) { + let mut block = Block::new().border_set(border::THICK); + let block_inner = block.inner(area); + + if self.is_selected { + block = block.style(Style::new().bg(Color::Cyan)); + } + + let name = self.ns.name.clone(); + let tables = &self.ns.tables; + let table_count = tables.as_ref().map(|t| t.len()).unwrap_or(0); + let plural_suffix = if table_count == 1 { "" } else { "s" }; + + let row_count = self.ns.row_count.unwrap_or(0); + let row_plural_suffix = if row_count == 1 { "" } else { "s" }; + + let name = format!( + "{} {} ({} table{}, {} row{})", + NERD_FONT_ICON_TABLE_FOLDER, + name, + table_count.to_formatted_string(locale), + plural_suffix, + row_count.to_formatted_string(locale), + row_plural_suffix + ); + + let para_rect = Rect::new( + block_inner.x, + block_inner.y + (block_inner.height / 2), + block_inner.width, + 1, + ); + + let mut para = Paragraph::new(name) + .alignment(Alignment::Center) + .white() + .bold(); + + if self.is_selected { + para = para.black(); + } + + block.render(area, buf); + para.render(para_rect, buf); + } +} diff --git a/tanic-tui/ui_components/namespace_list_view.rs b/tanic-tui/ui_components/namespace_list_view.rs index eadb574..5b744ab 100644 --- a/tanic-tui/ui_components/namespace_list_view.rs +++ b/tanic-tui/ui_components/namespace_list_view.rs @@ -1,24 +1,27 @@ +use crate::component::Component; +use crate::ui_components::namespace_list_item::NamespaceListItem; +use crate::ui_components::treemap_layout::TreeMapLayout; use crossterm::event::{KeyCode, KeyEvent}; +use num_format::SystemLocale; use ratatui::prelude::*; -use ratatui::widgets::canvas::{Canvas, Rectangle}; +use ratatui::symbols::border; use ratatui::widgets::Block; -use treemap::{MapItem, Mappable, Rect as TreeMapRect, TreemapLayout}; - +use std::sync::{Arc, RwLock, TryLockError}; +use tanic_svc::state::{TanicIcebergState, TanicUiState}; use tanic_svc::{TanicAction, TanicAppState}; -// find more at https://www.nerdfonts.com/cheat-sheet -const NERD_FONT_ICON_TABLE_FOLDER: &str = "\u{f12e4}"; // 󱋤 - -pub(crate) struct NamespaceListView<'a> { - state: &'a TanicAppState, +pub(crate) struct NamespaceListView { + state: Arc>, } -impl<'a> NamespaceListView<'a> { - pub(crate) fn new(state: &'a TanicAppState) -> Self { +impl NamespaceListView { + pub(crate) fn new(state: Arc>) -> Self { Self { state } } +} - pub(crate) fn handle_key_event(&self, key_event: KeyEvent) -> Option { +impl Component for &NamespaceListView { + fn handle_key_event(&mut self, key_event: KeyEvent) -> Option { match key_event.code { KeyCode::Left => Some(TanicAction::FocusPrevNamespace), KeyCode::Right => Some(TanicAction::FocusNextNamespace), @@ -26,79 +29,68 @@ impl<'a> NamespaceListView<'a> { _ => None, } } -} - -impl Widget for &NamespaceListView<'_> { - fn render(self, area: Rect, buf: &mut Buffer) { - let layout = TreemapLayout::new(); - let bounds = TreeMapRect::from_points( - area.x as f64, - area.y as f64, - area.width as f64, - area.height as f64, - ); - - let TanicAppState::ViewingNamespacesList(view_state) = self.state else { - panic!(); - }; - - let mut items: Vec> = view_state - .namespaces - .iter() - .map(|namespace| { - let res: Box = - Box::new(MapItem::with_size(namespace.table_count.max(1) as f64)); - res - }) - .collect::>(); - layout.layout_items(&mut items, bounds); + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale) { + let block = Block::bordered() + .title(" Tanic //// Root Namespaces") + .border_set(border::PLAIN); + let block_inner_area = block.inner(area); + + { + tracing::debug!("render self.state.read"); + let state = match self.state.try_read() { + Ok(state) => state, + Err(TryLockError::Poisoned(err)) => { + tracing::error!(?err, %err, "poison ☠"); + panic!(); + } + Err(TryLockError::WouldBlock) => { + tracing::error!("WouldBlock"); - let selected_idx = view_state.selected_idx; + // just skip this render if we can't get a read lock + return; + } + }; - let canvas = Canvas::default() - .block(Block::bordered().title(" Tanic //// Root Namespaces")) - .x_bounds([area.x as f64, (area.x + area.width) as f64]) - .y_bounds([area.y as f64, (area.y + area.height) as f64]) - .paint(|ctx| { - for (idx, item) in items.iter().enumerate() { - let item_bounds = item.bounds(); + let items = self.get_items(&state); - let rect = Rectangle { - x: item_bounds.x, - y: item_bounds.y, - width: item_bounds.w, - height: item_bounds.h, - color: Color::White, - }; + let children: Vec<(&NamespaceListItem, usize)> = items + .iter() + .map(|item| { + let tables = &item.ns.tables; + let table_count = tables.as_ref().map(|t| t.len()).unwrap_or(0); + (item, table_count) + }) + .collect::>(); - ctx.draw(&rect); + let layout = TreeMapLayout::new(children); - let style = if Some(idx) == selected_idx { - Style::new().black().bold().on_white() - } else { - Style::new().white() - }; + block.render(area, buf); + (&layout).render(block_inner_area, buf, locale); + } + tracing::debug!("render self.state.read done"); + } +} - let ns = &view_state.namespaces[idx]; - let name = ns.name.clone(); - let plural_suffix = if ns.table_count == 1 { "" } else { "s" }; - let name = format!( - "{} {} ({} table{})", - NERD_FONT_ICON_TABLE_FOLDER, name, ns.table_count, plural_suffix - ); +impl NamespaceListView { + fn get_items<'a>(&self, state: &'a TanicAppState) -> Vec> { + let TanicIcebergState::Connected(ref iceberg_state) = state.iceberg else { + return vec![]; + }; - let name_len = name.len(); - let text = Line::styled(name, style); + let TanicUiState::ViewingNamespacesList(ref view_state) = state.ui else { + return vec![]; + }; - ctx.print( - item_bounds.x + (item_bounds.w * 0.5) - (name_len as f64 * 0.5), - item_bounds.y + (item_bounds.h * 0.5), - text, - ); - } - }); + let items = iceberg_state + .namespaces + .iter() + .enumerate() + .map(|(idx, (_, ns))| { + NamespaceListItem::new(ns, view_state.selected_idx.unwrap_or(usize::MAX) == idx) + }) + .collect::>(); - canvas.render(area, buf); + items } } diff --git a/tanic-tui/ui_components/splash_screen.rs b/tanic-tui/ui_components/splash_screen.rs index 1958539..ac12312 100644 --- a/tanic-tui/ui_components/splash_screen.rs +++ b/tanic-tui/ui_components/splash_screen.rs @@ -1,19 +1,20 @@ use ratatui::prelude::*; use ratatui::symbols::border; use ratatui::widgets::{Block, Paragraph}; +use std::sync::{Arc, RwLock}; use tanic_svc::TanicAppState; -pub(crate) struct SplashScreen<'a> { - _state: &'a TanicAppState, +pub(crate) struct SplashScreen { + _state: Arc>, } -impl<'a> SplashScreen<'a> { - pub(crate) fn new(state: &'a TanicAppState) -> Self { +impl SplashScreen { + pub(crate) fn new(state: Arc>) -> Self { Self { _state: state } } } -impl Widget for &SplashScreen<'_> { +impl Widget for &SplashScreen { fn render(self, area: Rect, buf: &mut Buffer) { let style = Style::new().white().bold(); let title = Line::styled(" Tanic ".to_string(), style); diff --git a/tanic-tui/ui_components/table_list_item.rs b/tanic-tui/ui_components/table_list_item.rs new file mode 100644 index 0000000..57ade16 --- /dev/null +++ b/tanic-tui/ui_components/table_list_item.rs @@ -0,0 +1,59 @@ +use crate::component::Component; +use num_format::{SystemLocale, ToFormattedString}; +use ratatui::prelude::*; +use ratatui::symbols::border; +use ratatui::widgets::{Block, Paragraph}; +use tanic_svc::state::TableDescriptor; + +const NERD_FONT_ICON_TABLE: &str = "\u{ebb7}"; //  + +pub(crate) struct TableListItem<'a> { + pub(crate) table: &'a TableDescriptor, + pub(crate) is_selected: bool, +} + +impl<'a> TableListItem<'a> { + pub(crate) fn new(table: &'a TableDescriptor, is_selected: bool) -> Self { + Self { table, is_selected } + } +} + +impl Component for &TableListItem<'_> { + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale) { + let mut block = Block::new().border_set(border::THICK); + let block_inner = block.inner(area); + + if self.is_selected { + block = block.style(Style::new().bg(Color::Cyan)); + } + + let name = self.table.name.clone(); + + let row_count_str = match self.table.row_count() { + None => "".to_string(), + Some(1) => " (1 row)".to_string(), + Some(n) => format!(" ({} rows)", n.to_formatted_string(locale)), + }; + + let name = format!("{} {}{}", NERD_FONT_ICON_TABLE, name, row_count_str); + + let para_rect = Rect::new( + block_inner.x, + block_inner.y + (block_inner.height / 2), + block_inner.width, + 1, + ); + + let mut para = Paragraph::new(name) + .alignment(Alignment::Center) + .white() + .bold(); + + if self.is_selected { + para = para.black(); + } + + block.render(area, buf); + para.render(para_rect, buf); + } +} diff --git a/tanic-tui/ui_components/table_list_view.rs b/tanic-tui/ui_components/table_list_view.rs index 88fd36e..e9a9438 100644 --- a/tanic-tui/ui_components/table_list_view.rs +++ b/tanic-tui/ui_components/table_list_view.rs @@ -1,103 +1,102 @@ +use crate::component::Component; +use crate::ui_components::table_list_item::TableListItem; +use crate::ui_components::treemap_layout::TreeMapLayout; use crossterm::event::{KeyCode, KeyEvent}; +use num_format::SystemLocale; use ratatui::prelude::*; -use ratatui::widgets::canvas::{Canvas, Rectangle}; +use ratatui::symbols::border; use ratatui::widgets::Block; -use treemap::{MapItem, Mappable, Rect as TreeMapRect, TreemapLayout}; - +use std::sync::{Arc, RwLock}; +use tanic_svc::state::{ + RetrievedIcebergMetadata, TanicIcebergState, TanicUiState, ViewingTablesListState, +}; use tanic_svc::{TanicAction, TanicAppState}; -// find more at https://www.nerdfonts.com/cheat-sheet -const NERD_FONT_ICON_TABLE: &str = "\u{ebb7}"; //  - -pub(crate) struct TableListView<'a> { - state: &'a TanicAppState, +pub(crate) struct TableListView { + state: Arc>, } -impl<'a> TableListView<'a> { - pub(crate) fn new(state: &'a TanicAppState) -> Self { +impl TableListView { + pub(crate) fn new(state: Arc>) -> Self { Self { state } } +} - pub(crate) fn handle_key_event(&self, key_event: KeyEvent) -> Option { +impl Component for &TableListView { + fn handle_key_event(&mut self, key_event: KeyEvent) -> Option { match key_event.code { KeyCode::Left => Some(TanicAction::FocusPrevTable), KeyCode::Right => Some(TanicAction::FocusNextTable), KeyCode::Enter => Some(TanicAction::SelectTable), - KeyCode::Esc => Some(TanicAction::LeaveNamespace), + KeyCode::Esc => Some(TanicAction::Escape), _ => None, } } -} -impl Widget for &TableListView<'_> { - fn render(self, area: Rect, buf: &mut Buffer) { - let layout = TreemapLayout::new(); - let bounds = TreeMapRect::from_points( - area.x as f64, - area.y as f64, - area.width as f64, - area.height as f64, - ); - - let TanicAppState::ViewingTablesList(view_state) = self.state else { + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale) { + tracing::debug!("self.state.read"); + let state = self.state.read().unwrap(); + tracing::debug!("self.state.read done"); + + let TanicIcebergState::Connected(ref iceberg_state) = state.iceberg else { panic!(); }; - let mut items: Vec> = view_state - .tables + let TanicUiState::ViewingTablesList(ref view_state) = state.ui else { + panic!(); + }; + + let block = Block::bordered() + .title(format!( + " Tanic //// {} Namespace ", + view_state + .namespaces + .selected_idx + .and_then(|idx| iceberg_state.namespaces.get_index(idx)) + .map(|(k, _)| k.to_string()) + .unwrap_or("???".to_string()) + )) + .border_set(border::PLAIN); + let block_inner_area = block.inner(area); + + let items = TableListView::get_items(iceberg_state, view_state); + + let children: Vec<(&TableListItem, usize)> = items .iter() - .map(|table| { - let res: Box = - Box::new(MapItem::with_size(table.row_count.max(1) as f64)); - res - }) + .map(|item| (item, item.table.row_count().unwrap_or(1) as usize)) .collect::>(); - layout.layout_items(&mut items, bounds); + let layout = TreeMapLayout::new(children); - let selected_idx = view_state.selected_idx; + block.render(area, buf); + (&layout).render(block_inner_area, buf, locale); + } +} - let canvas = Canvas::default() - .block(Block::bordered().title(format!( - " Tanic //// {} Namespace ", - view_state.namespace.name - ))) - .x_bounds([area.x as f64, (area.x + area.width) as f64]) - .y_bounds([area.y as f64, (area.y + area.height) as f64]) - .paint(|ctx| { - for (idx, item) in items.iter().enumerate() { - let item_bounds = item.bounds(); - - let rect = Rectangle { - x: item_bounds.x, - y: item_bounds.y, - width: item_bounds.w, - height: item_bounds.h, - color: Color::White, - }; - - ctx.draw(&rect); - - let style = if Some(idx) == selected_idx { - Style::new().black().bold().on_white() - } else { - Style::new().white() - }; - - let name = view_state.tables[idx].name.clone(); - let name = format!("{} {}", NERD_FONT_ICON_TABLE, name); - - let name_len = name.len(); - let text = Line::styled(name, style); - - ctx.print( - item_bounds.x + (item_bounds.w * 0.5) - (name_len as f64 * 0.5), - item_bounds.y + (item_bounds.h * 0.5), - text, - ); - } - }); - - canvas.render(area, buf); +impl TableListView { + fn get_items<'a>( + iceberg_state: &'a RetrievedIcebergMetadata, + view_state: &'a ViewingTablesListState, + ) -> Vec> { + let Some(ref selected_namespace) = view_state.namespaces.selected_idx else { + return vec![]; + }; + + let Some((_, namespace_desc)) = iceberg_state.namespaces.get_index(*selected_namespace) + else { + return vec![]; + }; + + let Some(tables) = &namespace_desc.tables else { + return vec![]; + }; + + let items = tables + .iter() + .enumerate() + .map(|(idx, (_, ns))| TableListItem::new(ns, Some(idx) == view_state.selected_idx)) + .collect::>(); + + items } } diff --git a/tanic-tui/ui_components/treemap_layout.rs b/tanic-tui/ui_components/treemap_layout.rs new file mode 100644 index 0000000..34d6214 --- /dev/null +++ b/tanic-tui/ui_components/treemap_layout.rs @@ -0,0 +1,51 @@ +use num_format::SystemLocale; +use ratatui::prelude::*; +use treemap::{MapItem, Mappable, Rect as TreeMapRect, TreemapLayout}; + +use crate::component::Component; + +pub(crate) struct TreeMapLayout { + children: Vec<(T, usize)>, +} + +impl TreeMapLayout { + pub(crate) fn new(children: Vec<(T, usize)>) -> Self { + Self { children } + } +} + +impl Component for &TreeMapLayout { + fn render(&self, area: Rect, buf: &mut Buffer, locale: &SystemLocale) { + let layout = TreemapLayout::new(); + let bounds = TreeMapRect::from_points( + area.x as f64, + area.y as f64, + area.width as f64, + area.height as f64, + ); + + let mut regions: Vec> = self + .children + .iter() + .map(|&(_, size)| { + let res: Box = Box::new(MapItem::with_size(size.max(1) as f64)); + res + }) + .collect::>(); + + layout.layout_items(&mut regions, bounds); + + for ((child, _), region) in self.children.iter().zip(regions.iter()) { + let region_bounds = region.bounds(); + + let rect = Rect { + x: region_bounds.x as u16, + y: region_bounds.y as u16, + width: region_bounds.w as u16, + height: region_bounds.h as u16, + }; + + child.render(rect, buf, locale); + } + } +} diff --git a/tanic/Cargo.toml b/tanic/Cargo.toml index 4f92be0..9d8452a 100644 --- a/tanic/Cargo.toml +++ b/tanic/Cargo.toml @@ -33,5 +33,6 @@ terminal_size = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } -tracing-subscriber = { workspace = true } +tracing-subscriber = { workspace = true, features = ["json"] } tui-logger = { version = "0.14.1", features = ["tracing-support"] } +console-subscriber = "0.4.1" diff --git a/tanic/src/args.rs b/tanic/src/args.rs index 7fc5595..af3b344 100644 --- a/tanic/src/args.rs +++ b/tanic/src/args.rs @@ -6,4 +6,7 @@ use http::Uri; pub struct Args { /// URI of an Iceberg Catalog to connect to pub catalogue_uri: Option, + + #[clap(long, default_value_t = false)] + pub no_ui: bool, } diff --git a/tanic/src/logging.rs b/tanic/src/logging.rs index 56ed5ab..6b79287 100644 --- a/tanic/src/logging.rs +++ b/tanic/src/logging.rs @@ -1,3 +1,4 @@ +use std::fs::OpenOptions; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::prelude::*; @@ -18,9 +19,36 @@ pub(crate) fn init() { .expect("Unable to set global subscriber"); } -pub(crate) fn init_tui_logger() { - tracing_subscriber::registry() - .with(tui_logger::tracing_subscriber_layer()) - .init(); - tui_logger::init_logger(tui_logger::LevelFilter::Trace).expect("Could not initialize logger"); +pub(crate) fn init_tui_logger(no_ui: bool) { + if no_ui { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().pretty()) + .init(); + } else { + let log_file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open("tanic-log.txt") + .unwrap(); + + tracing_subscriber::registry() + // .with(console_subscriber::spawn()) + .with( + EnvFilter::builder() + .with_env_var("TANIC_LOG") + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .with( + tracing_subscriber::fmt::layer() + .json() + .with_writer(log_file), + ) + .with(tui_logger::tracing_subscriber_layer()) + .init(); + + tui_logger::init_logger(tui_logger::LevelFilter::Debug) + .expect("Could not initialize logger"); + } } diff --git a/tanic/src/main.rs b/tanic/src/main.rs index cd846a4..4a9e3c6 100644 --- a/tanic/src/main.rs +++ b/tanic/src/main.rs @@ -14,23 +14,24 @@ mod logging; #[tokio::main] async fn main() -> Result<()> { - logging::init_tui_logger(); - let args = Args::try_parse().into_diagnostic()?; + + logging::init_tui_logger(args.no_ui); + let config = TanicConfig::load().into_diagnostic()?; tracing::info!(?config, "loaded config"); - // let config = Arc::new(RwLock::new(config)); let (app_state, action_tx, state_rx) = AppStateManager::new(config); - let tanic_tui = TanicTui::new(action_tx.clone()); - let iceberg_ctx_mgr = IcebergContextManager::new(action_tx.clone()); + let iceberg_task = tokio::spawn({ + let state_rx = state_rx.clone(); + let app_state = app_state.get_state(); + let iceberg_ctx_mgr = IcebergContextManager::new(action_tx.clone(), app_state); + async move { iceberg_ctx_mgr.event_loop(state_rx).await } + }); + + let ui_app_state = app_state.get_state(); let svc_task = tokio::spawn(async move { app_state.event_loop().await }); - let ui_state_rx = state_rx.clone(); - let ui_task = tokio::spawn(async move { tanic_tui.event_loop(ui_state_rx).await }); - let iceberg_task_state_rx = state_rx.clone(); - let iceberg_task = - tokio::spawn(async move { iceberg_ctx_mgr.event_loop(iceberg_task_state_rx).await }); if let Some(ref uri) = args.catalogue_uri { let connection = ConnectionDetails::new_anon(uri.clone()); @@ -39,9 +40,23 @@ async fn main() -> Result<()> { action_tx.send(message).into_diagnostic()?; } - tokio::select! { - _ = ui_task => Ok(()), - _ = svc_task => Ok(()), - _ = iceberg_task => Ok(()), + if args.no_ui { + tokio::select! { + _ = svc_task => Ok(()), + _ = iceberg_task => Ok(()), + } + } else { + tokio::task::spawn_blocking(move || { + tokio::spawn(async move { + let tanic_tui = TanicTui::new(action_tx.clone()); + let state_rx = state_rx.clone(); + tanic_tui.event_loop(state_rx, ui_app_state).await + }) + }); + + tokio::select! { + _ = svc_task => Ok(()), + _ = iceberg_task => Ok(()), + } } }