Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions contrib/interconnect/ic_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ static bool interconnect_resowner_callback_registered;
* VISIBLE FUNCTIONS
*/


void
InterconnectShmemInit(void)
{
if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
InterconnectShmemInitUDPIFC();
}

Size
InterconnectShmemSize(void)
{
if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
return InterconnectShmemSizeUDPIFC();
return 0;
}

/* See ml_ipc.h */
bool
SendTupleChunkToAMS(ChunkTransportState * transportStates,
Expand Down
171 changes: 171 additions & 0 deletions contrib/interconnect/udp/ic_udpifc.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
#include "cdb/cdbdispatchresult.h"
#include "ic_faultinjection.h"


#include "funcapi.h"
#include "storage/lock.h"
#include "storage/pg_shmem.h"

#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#ifndef _WIN32_WINNT
Expand Down Expand Up @@ -714,6 +719,30 @@ typedef struct ICStatistics
/* Statistics for UDP interconnect. */
static ICStatistics ic_statistics;


typedef struct ICStatisticsShmem
{
uint64 totalRecvQueueSize;
uint64 recvQueueSizeCountingTime;
uint64 totalCapacity;
uint64 capacityCountingTime;
uint64 totalBuffers;
uint64 bufferCountingTime;
uint64 retransmits;
uint64 startupCachedPktNum;
uint64 mismatchNum;
uint64 crcErrors;
uint64 sndPktNum;
uint64 recvPktNum;
uint64 disorderedPktNum;
uint64 duplicatedPktNum;
uint64 recvAckNum;
uint64 statusQueryMsgNum;
} ICStatisticsShmem;

static ICStatisticsShmem *pICStatisticsShmem = NULL;


/* UDP listen fd */
int UDP_listenerFd;

Expand Down Expand Up @@ -1814,6 +1843,43 @@ ic_reset_pthread_sigmasks(sigset_t *sigs)
return;
}


Size
InterconnectShmemSizeUDPIFC(void) {
return sizeof(ICStatisticsShmem);
}

void
InterconnectShmemInitUDPIFC(void)
{
bool found;
pICStatisticsShmem = ShmemInitStruct("global interconnect statistics",
sizeof(ICStatisticsShmem), &found);
if (pICStatisticsShmem == NULL)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for global interconnect statistics")));
if (!found)
{
pICStatisticsShmem->totalRecvQueueSize = 0;
pICStatisticsShmem->recvQueueSizeCountingTime = 0;
pICStatisticsShmem->totalCapacity = 0;
pICStatisticsShmem->capacityCountingTime = 0;
pICStatisticsShmem->totalBuffers = 0;
pICStatisticsShmem->bufferCountingTime = 0;
pICStatisticsShmem->retransmits = 0;
pICStatisticsShmem->startupCachedPktNum = 0;
pICStatisticsShmem->mismatchNum = 0;
pICStatisticsShmem->crcErrors = 0;
pICStatisticsShmem->sndPktNum = 0;
pICStatisticsShmem->recvPktNum = 0;
pICStatisticsShmem->disorderedPktNum = 0;
pICStatisticsShmem->duplicatedPktNum = 0;
pICStatisticsShmem->recvAckNum = 0;
pICStatisticsShmem->statusQueryMsgNum = 0;
}
}

/*
* InitMotionUDPIFC
* Initialize UDP specific comms, and create rx-thread.
Expand Down Expand Up @@ -3901,6 +3967,29 @@ chunkTransportStateEntryInitialized(ChunkTransportState *transportStates,
return pEntry->valid;
}


static void updateGlobalInterconnectStats(void)
{
LWLockAcquire(ICStatisticsLock, LW_EXCLUSIVE);
pICStatisticsShmem->totalRecvQueueSize += ic_statistics.totalRecvQueueSize;
pICStatisticsShmem->recvQueueSizeCountingTime += ic_statistics.recvQueueSizeCountingTime;
pICStatisticsShmem->totalCapacity += ic_statistics.totalCapacity;
pICStatisticsShmem->capacityCountingTime += ic_statistics.capacityCountingTime;
pICStatisticsShmem->totalBuffers += ic_statistics.totalBuffers;
pICStatisticsShmem->bufferCountingTime += ic_statistics.bufferCountingTime;
pICStatisticsShmem->retransmits += ic_statistics.retransmits;
pICStatisticsShmem->startupCachedPktNum += ic_statistics.startupCachedPktNum;
pICStatisticsShmem->mismatchNum += ic_statistics.mismatchNum;
pICStatisticsShmem->crcErrors += ic_statistics.crcErrors;
pICStatisticsShmem->sndPktNum += ic_statistics.sndPktNum;
pICStatisticsShmem->recvPktNum += ic_statistics.recvPktNum;
pICStatisticsShmem->disorderedPktNum += ic_statistics.disorderedPktNum;
pICStatisticsShmem->duplicatedPktNum += ic_statistics.duplicatedPktNum;
pICStatisticsShmem->recvAckNum += ic_statistics.recvAckNum;
pICStatisticsShmem->statusQueryMsgNum += ic_statistics.statusQueryMsgNum;
LWLockRelease(ICStatisticsLock);
}

/*
* computeNetworkStatistics
* Compute the max/min/avg network statistics.
Expand Down Expand Up @@ -4207,6 +4296,9 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates,
(minRtt == ~((uint64) 0) ? 0 : minRtt), (minDev == ~((uint64) 0) ? 0 : minDev), avgRtt, avgDev, maxRtt, maxDev,
snd_control_info.cwnd, ic_statistics.statusQueryMsgNum);


updateGlobalInterconnectStats();

ic_control_info.isSender = false;
memset(&ic_statistics, 0, sizeof(ICStatistics));

Expand Down Expand Up @@ -8224,3 +8316,82 @@ MlPutRxBufferIFC(ChunkTransportState *transportStates, int motNodeID, int route)
if (param.msg.len != 0)
sendAckWithParam(&param);
}


Datum
GpInterconnectGetStatsUDPIFC(PG_FUNCTION_ARGS)
{
/*
* Build a tuple descriptor for our result type
* The number and type of attributes have to match the definition of the
* view gp_interconnect_stats_per_segment
*/
#define NUM_IC_STATS_ELEM 17
TupleDesc tupdesc = CreateTemplateTupleDesc(NUM_IC_STATS_ELEM, false);

TupleDescInitEntry(tupdesc, (AttrNumber) 1, "segid",
INT2OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "total_recv_queue_size",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "recv_queue_conting_time",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "total_capacity",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "capacity_counting_time",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "total_buffers",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "buffer_counting_time",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "retransmits",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "startup_cached_pkts",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "mismatches",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 11, "crs_errors",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 12, "snd_pkt_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 13, "recv_pkt_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 14, "disordered_pkt_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 15, "duplicate_pkt_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 16, "recv_ack_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);
TupleDescInitEntry(tupdesc, (AttrNumber) 17, "status_query_msg_num",
INT8OID, -1 /* typmod */, 0 /* attdim */);

tupdesc = BlessTupleDesc(tupdesc);

Datum values[NUM_IC_STATS_ELEM];
bool nulls[NUM_IC_STATS_ELEM];
MemSet(nulls, 0, sizeof(nulls));

LWLockAcquire(ICStatisticsLock, LW_EXCLUSIVE);
values[0] = Int32GetDatum(GpIdentity.segindex);
values[1] = Int64GetDatum(pICStatisticsShmem->totalRecvQueueSize);
values[2] = Int64GetDatum(pICStatisticsShmem->recvQueueSizeCountingTime);
values[3] = Int64GetDatum(pICStatisticsShmem->totalCapacity);
values[4] = Int64GetDatum(pICStatisticsShmem->capacityCountingTime);
values[5] = Int64GetDatum(pICStatisticsShmem->totalBuffers);
values[6] = Int64GetDatum(pICStatisticsShmem->bufferCountingTime);
values[7] = Int64GetDatum(pICStatisticsShmem->retransmits);
values[8] = Int64GetDatum(pICStatisticsShmem->startupCachedPktNum);
values[9] = Int64GetDatum(pICStatisticsShmem->mismatchNum);
values[10] = Int64GetDatum(pICStatisticsShmem->crcErrors);
values[11] = Int64GetDatum(pICStatisticsShmem->sndPktNum);
values[12] = Int64GetDatum(pICStatisticsShmem->recvPktNum);
values[13] = Int64GetDatum(pICStatisticsShmem->disorderedPktNum);
values[14] = Int64GetDatum(pICStatisticsShmem->duplicatedPktNum);
values[15] = Int64GetDatum(pICStatisticsShmem->recvAckNum);
values[16] = Int64GetDatum(pICStatisticsShmem->statusQueryMsgNum);
LWLockRelease(ICStatisticsLock);

HeapTuple tuple = heap_form_tuple(tupdesc, values, nulls);
Datum result = HeapTupleGetDatum(tuple);

PG_RETURN_DATUM(result);
}
6 changes: 4 additions & 2 deletions gpcontrib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@ ifeq "$(enable_debug_extensions)" "yes"
gp_legacy_string_agg \
gp_replica_check \
gp_toolkit \
pg_hint_plan
pg_hint_plan \
gp_interconnect_stats
else
recurse_targets = gp_sparse_vector \
gp_distribution_policy \
gp_internal_tools \
gp_legacy_string_agg \
gp_exttable_fdw \
gp_toolkit \
pg_hint_plan
pg_hint_plan \
gp_interconnect_stats
endif

ifeq "$(with_diskquota)" "yes"
Expand Down
23 changes: 23 additions & 0 deletions gpcontrib/gp_interconnect_stats/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
EXTENSION = gp_interconnect_stats
MODULES = gp_interconnect_stats

EXTENSION_VERSION = 1.0


DATA = gp_interconnect_stats--1.0.sql

OBJS = gp_interconnect_stats.o

MODULE_big = gp_interconnect_stats


ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = gpcontrib/gp_interconnect_stats
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
Loading