diff --git a/contrib/interconnect/ic_common.c b/contrib/interconnect/ic_common.c index 7e266b69efb..fc4cba2d931 100644 --- a/contrib/interconnect/ic_common.c +++ b/contrib/interconnect/ic_common.c @@ -36,6 +36,22 @@ static bool interconnect_resowner_callback_registered; * VISIBLE FUNCTIONS */ +/* Initialize interconnect shared memory; only the UDPIFC interconnect type has any to set up. */ +void +InterconnectShmemInit(void) +{ + if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) + InterconnectShmemInitUDPIFC(); +} +/* Shared memory size requested by the interconnect: the UDPIFC size, or 0 for any other type. */ +Size +InterconnectShmemSize(void) +{ + if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) + return InterconnectShmemSizeUDPIFC(); + return 0; +} + /* See ml_ipc.h */ bool SendTupleChunkToAMS(ChunkTransportState * transportStates, diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index d11e4577cd6..b8e5857ebf9 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -64,6 +64,11 @@ #include "cdb/cdbdispatchresult.h" #include "ic_faultinjection.h" + +#include "funcapi.h" +#include "storage/lock.h" +#include "storage/pg_shmem.h" + #ifdef WIN32 #define WIN32_LEAN_AND_MEAN #ifndef _WIN32_WINNT @@ -714,6 +719,30 @@ typedef struct ICStatistics /* Statistics for UDP interconnect. 
*/ static ICStatistics ic_statistics; +/* Cumulative interconnect counters kept in shared memory; updateGlobalInterconnectStats() adds the same-named ic_statistics fields into them. */ +typedef struct ICStatisticsShmem +{ + uint64 totalRecvQueueSize; + uint64 recvQueueSizeCountingTime; + uint64 totalCapacity; + uint64 capacityCountingTime; + uint64 totalBuffers; + uint64 bufferCountingTime; + uint64 retransmits; + uint64 startupCachedPktNum; + uint64 mismatchNum; + uint64 crcErrors; + uint64 sndPktNum; + uint64 recvPktNum; + uint64 disorderedPktNum; + uint64 duplicatedPktNum; + uint64 recvAckNum; + uint64 statusQueryMsgNum; +} ICStatisticsShmem; +/* Assigned by InterconnectShmemInitUDPIFC(); remains NULL until that has run. */ +static ICStatisticsShmem *pICStatisticsShmem = NULL; + + /* UDP listen fd */ int UDP_listenerFd; @@ -1814,6 +1843,43 @@ ic_reset_pthread_sigmasks(sigset_t *sigs) return; } +/* Bytes of shared memory needed for the cumulative statistics struct. */ +Size +InterconnectShmemSizeUDPIFC(void) { + return sizeof(ICStatisticsShmem); +} +/* Obtain the shared statistics struct via ShmemInitStruct() and zero every counter when it reports the struct was not found. */ +void +InterconnectShmemInitUDPIFC(void) +{ + bool found; + pICStatisticsShmem = ShmemInitStruct("global interconnect statistics", + sizeof(ICStatisticsShmem), &found); + if (pICStatisticsShmem == NULL) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("not enough shared memory for global interconnect statistics"))); + if (!found) + { + pICStatisticsShmem->totalRecvQueueSize = 0; + pICStatisticsShmem->recvQueueSizeCountingTime = 0; + pICStatisticsShmem->totalCapacity = 0; + pICStatisticsShmem->capacityCountingTime = 0; + pICStatisticsShmem->totalBuffers = 0; + pICStatisticsShmem->bufferCountingTime = 0; + pICStatisticsShmem->retransmits = 0; + pICStatisticsShmem->startupCachedPktNum = 0; + pICStatisticsShmem->mismatchNum = 0; + pICStatisticsShmem->crcErrors = 0; + pICStatisticsShmem->sndPktNum = 0; + pICStatisticsShmem->recvPktNum = 0; + pICStatisticsShmem->disorderedPktNum = 0; + pICStatisticsShmem->duplicatedPktNum = 0; + pICStatisticsShmem->recvAckNum = 0; + pICStatisticsShmem->statusQueryMsgNum = 0; + } +} + /* * InitMotionUDPIFC * Initialize UDP specific comms, and create rx-thread. 
@@ -3901,6 +3967,31 @@ chunkTransportStateEntryInitialized(ChunkTransportState *transportStates, return pEntry->valid; } +/* Add this backend's ic_statistics counters into the shared cumulative totals while holding ICStatisticsLock exclusively. */ +static void updateGlobalInterconnectStats(void) +{ + if (pICStatisticsShmem == NULL) + return; /* shared statistics were never set up in this process; nothing to accumulate into */ + LWLockAcquire(ICStatisticsLock, LW_EXCLUSIVE); + pICStatisticsShmem->totalRecvQueueSize += ic_statistics.totalRecvQueueSize; + pICStatisticsShmem->recvQueueSizeCountingTime += ic_statistics.recvQueueSizeCountingTime; + pICStatisticsShmem->totalCapacity += ic_statistics.totalCapacity; + pICStatisticsShmem->capacityCountingTime += ic_statistics.capacityCountingTime; + pICStatisticsShmem->totalBuffers += ic_statistics.totalBuffers; + pICStatisticsShmem->bufferCountingTime += ic_statistics.bufferCountingTime; + pICStatisticsShmem->retransmits += ic_statistics.retransmits; + pICStatisticsShmem->startupCachedPktNum += ic_statistics.startupCachedPktNum; + pICStatisticsShmem->mismatchNum += ic_statistics.mismatchNum; + pICStatisticsShmem->crcErrors += ic_statistics.crcErrors; + pICStatisticsShmem->sndPktNum += ic_statistics.sndPktNum; + pICStatisticsShmem->recvPktNum += ic_statistics.recvPktNum; + pICStatisticsShmem->disorderedPktNum += ic_statistics.disorderedPktNum; + pICStatisticsShmem->duplicatedPktNum += ic_statistics.duplicatedPktNum; + pICStatisticsShmem->recvAckNum += ic_statistics.recvAckNum; + pICStatisticsShmem->statusQueryMsgNum += ic_statistics.statusQueryMsgNum; + LWLockRelease(ICStatisticsLock); +} + /* * computeNetworkStatistics * Compute the max/min/avg network statistics. @@ -4207,6 +4298,9 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, (minRtt == ~((uint64) 0) ? 0 : minRtt), (minDev == ~((uint64) 0) ? 
0 : minDev), avgRtt, avgDev, maxRtt, maxDev, snd_control_info.cwnd, ic_statistics.statusQueryMsgNum); + + updateGlobalInterconnectStats(); + ic_control_info.isSender = false; memset(&ic_statistics, 0, sizeof(ICStatistics)); @@ -8224,3 +8316,82 @@ MlPutRxBufferIFC(ChunkTransportState *transportStates, int motNodeID, int route) if (param.msg.len != 0) sendAckWithParam(&param); } + +/* Build one result row holding this segment's index and the cumulative UDPIFC counters read from shared memory. */ +Datum +GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS) +{ + /* + * Build a tuple descriptor for our result type + * The number and type of attributes have to match the definition of the + * view gp_interconnect_stats_per_segment + */ +#define NUM_IC_STATS_ELEM 17 + TupleDesc tupdesc = CreateTemplateTupleDesc(NUM_IC_STATS_ELEM, false); + + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "segid", + INT2OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "total_recv_queue_size", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "recv_queue_conting_time", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "total_capacity", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "capacity_counting_time", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "total_buffers", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "buffer_counting_time", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "retransmits", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "startup_cached_pkts", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "mismatches", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "crs_errors", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "snd_pkt_num", + 
INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "recv_pkt_num", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "disordered_pkt_num", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 15, "duplicate_pkt_num", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 16, "recv_ack_num", + INT8OID, -1 /* typmod */, 0 /* attdim */); + TupleDescInitEntry(tupdesc, (AttrNumber) 17, "status_query_msg_num", + INT8OID, -1 /* typmod */, 0 /* attdim */); + + tupdesc = BlessTupleDesc(tupdesc); + + Datum values[NUM_IC_STATS_ELEM]; + bool nulls[NUM_IC_STATS_ELEM]; + MemSet(nulls, 0, sizeof(nulls)); + if (pICStatisticsShmem == NULL) PG_RETURN_NULL(); /* shared statistics were never set up in this process */ + LWLockAcquire(ICStatisticsLock, LW_SHARED); /* read-only snapshot; the updater takes the lock exclusively */ + values[0] = Int16GetDatum(GpIdentity.segindex); /* attribute 1 is declared INT2OID */ + values[1] = Int64GetDatum(pICStatisticsShmem->totalRecvQueueSize); + values[2] = Int64GetDatum(pICStatisticsShmem->recvQueueSizeCountingTime); + values[3] = Int64GetDatum(pICStatisticsShmem->totalCapacity); + values[4] = Int64GetDatum(pICStatisticsShmem->capacityCountingTime); + values[5] = Int64GetDatum(pICStatisticsShmem->totalBuffers); + values[6] = Int64GetDatum(pICStatisticsShmem->bufferCountingTime); + values[7] = Int64GetDatum(pICStatisticsShmem->retransmits); + values[8] = Int64GetDatum(pICStatisticsShmem->startupCachedPktNum); + values[9] = Int64GetDatum(pICStatisticsShmem->mismatchNum); + values[10] = Int64GetDatum(pICStatisticsShmem->crcErrors); + values[11] = Int64GetDatum(pICStatisticsShmem->sndPktNum); + values[12] = Int64GetDatum(pICStatisticsShmem->recvPktNum); + values[13] = Int64GetDatum(pICStatisticsShmem->disorderedPktNum); + values[14] = Int64GetDatum(pICStatisticsShmem->duplicatedPktNum); + values[15] = Int64GetDatum(pICStatisticsShmem->recvAckNum); + values[16] = Int64GetDatum(pICStatisticsShmem->statusQueryMsgNum); + LWLockRelease(ICStatisticsLock); + + HeapTuple tuple = heap_form_tuple(tupdesc, values, 
nulls); + Datum result = HeapTupleGetDatum(tuple); + + PG_RETURN_DATUM(result); +} diff --git a/gpcontrib/Makefile b/gpcontrib/Makefile index 2969194cfac..727176ed73a 100644 --- a/gpcontrib/Makefile +++ b/gpcontrib/Makefile @@ -22,7 +22,8 @@ ifeq "$(enable_debug_extensions)" "yes" gp_legacy_string_agg \ gp_replica_check \ gp_toolkit \ - pg_hint_plan + pg_hint_plan \ + gp_interconnect_stats else recurse_targets = gp_sparse_vector \ gp_distribution_policy \ @@ -30,7 +31,8 @@ else gp_legacy_string_agg \ gp_exttable_fdw \ gp_toolkit \ - pg_hint_plan + pg_hint_plan \ + gp_interconnect_stats endif ifeq "$(with_diskquota)" "yes" diff --git a/gpcontrib/gp_interconnect_stats/Makefile b/gpcontrib/gp_interconnect_stats/Makefile new file mode 100644 index 00000000000..dff071893a3 --- /dev/null +++ b/gpcontrib/gp_interconnect_stats/Makefile @@ -0,0 +1,23 @@ +EXTENSION = gp_interconnect_stats +# The library is built through MODULE_big/OBJS below; PGXS expects only one of MODULES, MODULE_big or PROGRAM. + +EXTENSION_VERSION = 1.0 + + +DATA = gp_interconnect_stats--1.0.sql + +OBJS = gp_interconnect_stats.o + +MODULE_big = gp_interconnect_stats + + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = gpcontrib/gp_interconnect_stats +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/gpcontrib/gp_interconnect_stats/gp_interconnect_stats--1.0.sql b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats--1.0.sql new file mode 100644 index 00000000000..a4815b10a1d --- /dev/null +++ b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats--1.0.sql @@ -0,0 +1,135 @@ +/* gpcontrib/gp_interconnect_stats/gp_interconnect_stats--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION gp_interconnect_stats" to load this file. 
\quit + +CREATE FUNCTION __gp_interconnect_get_stats_f_on_master( + OUT gp_segment_id smallint, + OUT total_recv_queue_size bigint, + OUT recv_queue_conting_time bigint, + OUT total_capacity bigint, + OUT capacity_counting_time bigint, + OUT total_buffers bigint, + OUT buffer_counting_time bigint, + OUT retransmits bigint, + OUT startup_cached_pkts bigint, + OUT mismatches bigint, + OUT crs_errors bigint, + OUT snd_pkt_num bigint, + OUT recv_pkt_num bigint, + OUT disordered_pkt_num bigint, + OUT duplicate_pkt_num bigint, + OUT recv_ack_num bigint, + OUT status_query_msg_num bigint +) +RETURNS SETOF record +LANGUAGE C VOLATILE EXECUTE ON MASTER +AS '$libdir/gp_interconnect_stats', 'gp_interconnect_get_stats'; + +CREATE FUNCTION __gp_interconnect_get_stats_f_on_segments( + OUT gp_segment_id smallint, + OUT total_recv_queue_size bigint, + OUT recv_queue_conting_time bigint, + OUT total_capacity bigint, + OUT capacity_counting_time bigint, + OUT total_buffers bigint, + OUT buffer_counting_time bigint, + OUT retransmits bigint, + OUT startup_cached_pkts bigint, + OUT mismatches bigint, + OUT crs_errors bigint, + OUT snd_pkt_num bigint, + OUT recv_pkt_num bigint, + OUT disordered_pkt_num bigint, + OUT duplicate_pkt_num bigint, + OUT recv_ack_num bigint, + OUT status_query_msg_num bigint +) +RETURNS SETOF record LANGUAGE C VOLATILE EXECUTE ON ALL SEGMENTS +AS '$libdir/gp_interconnect_stats', 'gp_interconnect_get_stats'; + + +-------------------------------------------------------------------------------- +-- @view: +-- gp_interconnect_stats_per_segment +-- +-- @doc: +-- Cumulative interconnect statistics per segment (master and primary segments), with the host name +-- +-------------------------------------------------------------------------------- +CREATE VIEW gp_interconnect_stats_per_segment AS + SELECT c.hostname, s.* FROM ( + SELECT * FROM __gp_interconnect_get_stats_f_on_master() + UNION ALL + SELECT * FROM __gp_interconnect_get_stats_f_on_segments() + ) s + INNER JOIN pg_catalog.gp_segment_configuration AS c + 
ON s.gp_segment_id = c.content + AND c.role = 'p' + ; + +GRANT SELECT ON gp_interconnect_stats_per_segment TO public; + +-------------------------------------------------------------------------------- +-- @view: +-- gp_interconnect_stats +-- +-- @doc: +-- Cumulative interconnect statistics, summed over all rows of gp_interconnect_stats_per_segment +-- +-------------------------------------------------------------------------------- +CREATE VIEW gp_interconnect_stats AS + SELECT + sum(total_recv_queue_size) as total_recv_queue_size + , sum(recv_queue_conting_time) as recv_queue_conting_time + , sum(total_capacity) as total_capacity + , sum(capacity_counting_time) as capacity_counting_time + , sum(total_buffers) as total_buffers + , sum(buffer_counting_time) as buffer_counting_time + , sum(retransmits) as retransmits + , sum(startup_cached_pkts) as startup_cached_pkts + , sum(mismatches) as mismatches + , sum(crs_errors) as crs_errors + , sum(snd_pkt_num) as snd_pkt_num + , sum(recv_pkt_num) as recv_pkt_num + , sum(disordered_pkt_num) as disordered_pkt_num + , sum(duplicate_pkt_num) as duplicate_pkt_num + , sum(recv_ack_num) as recv_ack_num + , sum(status_query_msg_num) as status_query_msg_num + FROM gp_interconnect_stats_per_segment + ; + +GRANT SELECT ON gp_interconnect_stats TO public; + +-------------------------------------------------------------------------------- +-- @view: +-- gp_interconnect_stats_per_host +-- +-- @doc: +-- Cumulative interconnect statistics grouped by host +-- +-------------------------------------------------------------------------------- + +CREATE VIEW gp_interconnect_stats_per_host AS + SELECT + hostname + , sum(total_recv_queue_size) as total_recv_queue_size + , sum(recv_queue_conting_time) as recv_queue_conting_time + , sum(total_capacity) as total_capacity + , sum(capacity_counting_time) as capacity_counting_time + , sum(total_buffers) as total_buffers + , sum(buffer_counting_time) as buffer_counting_time + , sum(retransmits) as retransmits + , sum(startup_cached_pkts) as 
startup_cached_pkts + , sum(mismatches) as mismatches + , sum(crs_errors) as crs_errors + , sum(snd_pkt_num) as snd_pkt_num + , sum(recv_pkt_num) as recv_pkt_num + , sum(disordered_pkt_num) as disordered_pkt_num + , sum(duplicate_pkt_num) as duplicate_pkt_num + , sum(recv_ack_num) as recv_ack_num + , sum(status_query_msg_num) as status_query_msg_num + FROM gp_interconnect_stats_per_segment + GROUP BY hostname; + +GRANT SELECT ON gp_interconnect_stats_per_host TO public; diff --git a/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.c b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.c new file mode 100644 index 00000000000..1c8fcca5179 --- /dev/null +++ b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.c @@ -0,0 +1,27 @@ +/*-------------------------------------------------------------------------- + * + * gp_interconnect_stats.c + * Routine for getting UDPIFC interconnect stats + * + **-------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "cdb/cdbvars.h" +#include "cdb/ic_udpifc.h" +#include "fmgr.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(gp_interconnect_get_stats); +/* SQL-callable entry point: delegate to the UDPIFC implementation, or warn and return NULL for any other interconnect type. */ +Datum +gp_interconnect_get_stats(PG_FUNCTION_ARGS) +{ + if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC) + return GpInterconnectGetStatsUDPIFC(fcinfo); + ereport(WARNING, + (errcode(ERRCODE_WARNING_GP_INTERCONNECTION), + errmsg("Interconnect statistics are collected only for UDPIFC protocol"))); + PG_RETURN_NULL(); +} diff --git a/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.control b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.control new file mode 100644 index 00000000000..cfa3c890677 --- /dev/null +++ b/gpcontrib/gp_interconnect_stats/gp_interconnect_stats.control @@ -0,0 +1,3 @@ +comment = 'Cumulative statistics from UDPIFC interconnect protocol' +default_version = '1.0' +relocatable = false diff --git a/src/backend/storage/ipc/ipci.c 
b/src/backend/storage/ipc/ipci.c index 6bdb32f9df9..d8a0ea5706b 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -77,6 +77,8 @@ #include "cdb/cdbendpoint.h" #include "replication/gp_replication.h" +#include "cdb/ml_ipc.h" + /* GUCs */ int shared_memory_type = DEFAULT_SHARED_MEMORY_TYPE; @@ -236,6 +238,9 @@ CreateSharedMemoryAndSemaphores(void) /* size of token and endpoint shared memory */ size = add_size(size, EndpointShmemSize()); + + /* size of interconnect statistics shared memory (0 unless the interconnect type is UDPIFC) */ + size = add_size(size, InterconnectShmemSize()); #ifndef USE_INTERNAL_FTS /* size of cdb etcd result cache */ if (Gp_role != GP_ROLE_EXECUTE) @@ -402,6 +407,7 @@ CreateSharedMemoryAndSemaphores(void) GpExpandVersionShmemInit(); KmgrShmemInit(); + InterconnectShmemInit(); #ifdef EXEC_BACKEND /* diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index c3583b146d7..486ea4fd830 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -76,3 +76,5 @@ LoginFailedSharedMemoryLock 66 GPIVMResLock 67 DirectoryTableLock 68 CommittedGxidArrayLock 69 + +ICStatisticsLock 70 diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h index b39eaac60a2..62ace21b2e1 100644 --- a/src/include/cdb/ml_ipc.h +++ b/src/include/cdb/ml_ipc.h @@ -311,4 +311,11 @@ extern bool CheckGpInterconnectTypeStr(char **type_name); */ extern void InitializeCurrentMotionIPCLayer(void); +/* Shared-memory sizing/initialization and the SQL accessor for cumulative interconnect statistics. */ +extern Size InterconnectShmemSize(void); +extern Size InterconnectShmemSizeUDPIFC(void); +extern void InterconnectShmemInit(void); +extern void InterconnectShmemInitUDPIFC(void); +extern Datum GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS); + #endif /* ML_IPC_H */