Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-cloudberry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ jobs:
"contrib/formatter_fixedwidth:installcheck",
"contrib/hstore:installcheck",
"contrib/indexscan:installcheck",
"contrib/interconnect:installcheck",
"contrib/pg_trgm:installcheck",
"contrib/indexscan:installcheck",
"contrib/pgcrypto:installcheck",
Expand Down
21 changes: 16 additions & 5 deletions contrib/btree_gist/btree_utils_var.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,47 @@ gbt_var_leaf2node(GBT_VARKEY *leaf, const gbtree_vinfo *tinfo, FmgrInfo *flinfo)

/*
* returns the common prefix length of a node key
*
* If the underlying type is character data, the prefix length may point in
* the middle of a multibyte character.
*/
static int32
gbt_var_node_cp_len(const GBT_VARKEY *node, const gbtree_vinfo *tinfo)
{
GBT_VARKEY_R r = gbt_var_key_readable(node);
int32 i = 0;
int32 l = 0;
int32 l_left_to_match = 0;
int32 l_total = 0;
int32 t1len = VARSIZE(r.lower) - VARHDRSZ;
int32 t2len = VARSIZE(r.upper) - VARHDRSZ;
int32 ml = Min(t1len, t2len);
char *p1 = VARDATA(r.lower);
char *p2 = VARDATA(r.upper);
const char *end1 = p1 + t1len;
const char *end2 = p2 + t2len;

if (ml == 0)
return 0;

while (i < ml)
{
if (tinfo->eml > 1 && l == 0)
if (tinfo->eml > 1 && l_left_to_match == 0)
{
if ((l = pg_mblen(p1)) != pg_mblen(p2))
l_total = pg_mblen_range(p1, end1);
if (l_total != pg_mblen_range(p2, end2))
{
return i;
}
l_left_to_match = l_total;
}
if (*p1 != *p2)
{
if (tinfo->eml > 1)
{
return (i - l + 1);
int32 l_matched_subset = l_total - l_left_to_match;

/* end common prefix at final byte of last matching char */
return i - l_matched_subset;
}
else
{
Expand All @@ -155,7 +166,7 @@ gbt_var_node_cp_len(const GBT_VARKEY *node, const gbtree_vinfo *tinfo)

p1++;
p2++;
l--;
l_left_to_match--;
i++;
}
return ml; /* lower == upper */
Expand Down
8 changes: 4 additions & 4 deletions contrib/dict_xsyn/dict_xsyn.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ find_word(char *in, char **end)
char *start;

*end = NULL;
while (*in && t_isspace(in))
in += pg_mblen(in);
while (*in && t_isspace_cstr(in))
in += pg_mblen_cstr(in);

if (!*in || *in == '#')
return NULL;
start = in;

while (*in && !t_isspace(in))
in += pg_mblen(in);
while (*in && !t_isspace_cstr(in))
in += pg_mblen_cstr(in);

*end = in;

Expand Down
2 changes: 1 addition & 1 deletion contrib/hstore/hstore_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ prssyntaxerror(HSParser *state)
errsave(state->escontext,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("syntax error in hstore, near \"%.*s\" at position %d",
pg_mblen(state->ptr), state->ptr,
pg_mblen_cstr(state->ptr), state->ptr,
(int) (state->ptr - state->begin))));
/* In soft error situation, return false as convenience for caller */
return false;
Expand Down
14 changes: 13 additions & 1 deletion contrib/intarray/_int_selfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
Expand Down Expand Up @@ -171,7 +172,18 @@ _int_matchsel(PG_FUNCTION_ARGS)
PG_RETURN_FLOAT8(0.0);
}

/* The caller made sure the const is a query, so get it now */
/*
* Verify that the Const is a query_int, else return a default estimate.
* (This could only fail if someone attached this estimator to the wrong
* operator.)
*/
if (((Const *) other)->consttype !=
get_function_sibling_type(fcinfo->flinfo->fn_oid, "query_int"))
{
ReleaseVariableStats(vardata);
PG_RETURN_FLOAT8(DEFAULT_EQ_SEL);
}

query = DatumGetQueryTypeP(((Const *) other)->constvalue);

/* Empty query matches nothing */
Expand Down
6 changes: 6 additions & 0 deletions contrib/interconnect/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ include $(top_builddir)/contrib/interconnect/Makefile.interconnect
MODULE_big = interconnect
PGFILEDESC = "interconnect - inter connection module"

EXTENSION = interconnect
EXTENSION_VERSION = 1.0
DATA = interconnect--$(EXTENSION_VERSION).sql

OBJS = \
$(WIN32RES) \
ic_common.o \
Expand All @@ -33,6 +37,8 @@ OBJS += proxy/ic_proxy_iobuf.o
SHLIB_LINK += $(filter -luv, $(LIBS))
endif # enable_ic_proxy

REGRESS = interconnect

ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
32 changes: 32 additions & 0 deletions contrib/interconnect/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,35 @@ udpifc result:

Notice that: Lower TPS does not mean the protocol is slower, might means that the cpu time taken by the protocol is low. For the udpifc, it satisfies the highest tps required by `cbdb`. at the same time it occupies a lower cpu than other types of interconnect.

# interconnect statistics

This extension provides cumulative interconnect statistics for Cloudberry Database, including queue sizes, buffer usage, retransmits, packet errors, and other UDPIFC‑related metrics.

It exposes three views with statistics at different aggregation levels:
gp_interconnect_stats — total cluster‑wide stats;
gp_interconnect_stats_per_segment — stats per segment;
gp_interconnect_stats_per_host — stats grouped by host.

## How to create the extension

Add interconnect to shared_preload_libraries and restart the cluster.

```
gpconfig -c shared_preload_libraries -v \
"$(psql -At -c \
"SELECT array_to_string( \
array_append( \
string_to_array( \
current_setting('shared_preload_libraries'), \
','), \
'interconnect'), \
',')" \
postgres)"
gpstop -ra
```

Create the extension in your database.

```
CREATE EXTENSION interconnect;
```
82 changes: 82 additions & 0 deletions contrib/interconnect/expected/interconnect.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
-- Capture current interconnect stats as baseline for future comparisons
SELECT * FROM gp_interconnect_stats \gset prev_
-- Verify that all baseline interconnect statistics are >= 0 (no negative values)
SELECT
:prev_total_recv_queue_size >= 0,
:prev_recv_queue_conting_time >= 0,
:prev_total_capacity >= 0,
:prev_capacity_counting_time >= 0,
:prev_total_buffers >= 0,
:prev_buffer_counting_time >= 0,
:prev_retransmits >= 0,
:prev_startup_cached_pkts >= 0,
:prev_mismatches >= 0,
:prev_crs_errors >= 0,
:prev_snd_pkt_num >= 0,
:prev_recv_pkt_num >= 0,
:prev_disordered_pkt_num >= 0,
:prev_duplicate_pkt_num >= 0,
:prev_recv_ack_num >= 0,
:prev_status_query_msg_num >= 0;
?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column?
----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------
t | t | t | t | t | t | t | t | t | t | t | t | t | t | t | t
(1 row)

-- Create test table to generate interconnect traffic
CREATE TABLE test_ic_data
AS SELECT generate_series(1, 1000) AS id
DISTRIBUTED RANDOMLY;
-- Re-capture current state: overwrite prev with latest values
SELECT * FROM gp_interconnect_stats \gset prev2_
-- Check if current statistics are >= baseline values after first data insertion
SELECT
:prev2_total_recv_queue_size >= :prev_total_recv_queue_size,
:prev2_recv_queue_conting_time >= :prev_recv_queue_conting_time,
:prev2_total_capacity >= :prev_total_capacity,
:prev2_capacity_counting_time >= :prev_capacity_counting_time,
:prev2_total_buffers >= :prev_total_buffers,
:prev2_buffer_counting_time >= :prev_buffer_counting_time,
:prev2_retransmits >= :prev_retransmits,
:prev2_startup_cached_pkts >= :prev_startup_cached_pkts,
:prev2_mismatches >= :prev_mismatches,
:prev2_crs_errors >= :prev_crs_errors,
:prev2_snd_pkt_num >= :prev_snd_pkt_num,
:prev2_recv_pkt_num >= :prev_recv_pkt_num,
:prev2_disordered_pkt_num >= :prev_disordered_pkt_num,
:prev2_duplicate_pkt_num >= :prev_duplicate_pkt_num,
:prev2_recv_ack_num >= :prev_recv_ack_num,
:prev2_status_query_msg_num >= :prev_status_query_msg_num;
?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column?
----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------
t | t | t | t | t | t | t | t | t | t | t | t | t | t | t | t
(1 row)

-- Insert additional data to further test interconnect statistics changes under load
INSERT INTO test_ic_data SELECT generate_series(1001, 2000);
-- Re‑check if current statistics remain >= baseline after second data insertion
SELECT
total_recv_queue_size >= :prev2_total_recv_queue_size,
recv_queue_conting_time >= :prev2_recv_queue_conting_time,
total_capacity >= :prev2_total_capacity,
capacity_counting_time >= :prev2_capacity_counting_time,
total_buffers >= :prev2_total_buffers,
buffer_counting_time >= :prev2_buffer_counting_time,
retransmits >= :prev2_retransmits,
startup_cached_pkts >= :prev2_startup_cached_pkts,
mismatches >= :prev2_mismatches,
crs_errors >= :prev2_crs_errors,
snd_pkt_num >= :prev2_snd_pkt_num,
recv_pkt_num >= :prev2_recv_pkt_num,
disordered_pkt_num >= :prev2_disordered_pkt_num,
duplicate_pkt_num >= :prev2_duplicate_pkt_num,
recv_ack_num >= :prev2_recv_ack_num,
status_query_msg_num >= :prev2_status_query_msg_num
FROM gp_interconnect_stats;
?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column? | ?column?
----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------
t | t | t | t | t | t | t | t | t | t | t | t | t | t | t | t
(1 row)

DROP TABLE test_ic_data;
DROP EXTENSION interconnect;
25 changes: 25 additions & 0 deletions contrib/interconnect/ic_modules.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
#include "ic_common.h"
#include "tcp/ic_tcp.h"
#include "udp/ic_udpifc.h"
#include "storage/ipc.h"

#ifdef ENABLE_IC_PROXY
#include "proxy/ic_proxy_server.h"
#endif

PG_MODULE_MAGIC;

shmem_startup_hook_type prev_shmem_startup_hook = NULL;

MotionIPCLayer tcp_ipc_layer = {
.ic_type = INTERCONNECT_TYPE_TCP,
.type_name = "tcp",
Expand Down Expand Up @@ -141,6 +144,16 @@ MotionIPCLayer udpifc_ipc_layer = {
.GetMotionSentRecordTypmod = GetMotionSentRecordTypmod,
};

static void
InterconnectShmemInit(void)
{
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();

if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
InterconnectShmemInitUDPIFC();
}

void
_PG_init(void)
{
Expand All @@ -153,4 +166,16 @@ _PG_init(void)
RegisterIPCLayerImpl(&tcp_ipc_layer);
RegisterIPCLayerImpl(&udpifc_ipc_layer);
RegisterIPCLayerImpl(&proxy_ipc_layer);

if (Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC)
RequestAddinShmemSpace(sizeof(ICStatisticsShmem));

prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = InterconnectShmemInit;
}

void
_PG_fini(void)
{
shmem_startup_hook = prev_shmem_startup_hook;
}
1 change: 1 addition & 0 deletions contrib/interconnect/ic_modules.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ extern MotionIPCLayer proxy_ipc_layer;
extern MotionIPCLayer udpifc_ipc_layer;

extern void _PG_init(void);
extern void _PG_fini(void);

#endif // INTER_CONNECT_H
Loading
Loading