diff --git a/gpMgmt/sbin/gpsegrecovery.py b/gpMgmt/sbin/gpsegrecovery.py index ef1bc88738f..811a73ccbb5 100644 --- a/gpMgmt/sbin/gpsegrecovery.py +++ b/gpMgmt/sbin/gpsegrecovery.py @@ -39,26 +39,26 @@ def run(self): target_gp_dbid=self.recovery_info.target_segment_dbid, progress_file=self.recovery_info.progress_file) self.logger.info("Running pg_basebackup with progress output temporarily in %s" % self.recovery_info.progress_file) - # try: - cmd.run(validateAfter=True) - # except Exception as e: #TODO should this be ExecutionError? - # self.logger.info("Running pg_basebackup failed: {}".format(str(e))) - # - # # If the cluster never has mirrors, cmd will fail - # # quickly because the internal slot doesn't exist. - # # Re-run with `create_slot`. - # # GPDB_12_MERGE_FIXME could we check it before? or let - # # pg_basebackup create slot if not exists. - # cmd = PgBaseBackup(self.recovery_info.target_datadir, - # self.recovery_info.source_hostname, - # str(self.recovery_info.source_port), - # create_slot=True, - # replication_slot_name=self.replicationSlotName, - # forceoverwrite=True, - # target_gp_dbid=self.recovery_info.target_segment_dbid, - # progress_file=self.recovery_info.progress_file) - # self.logger.info("Re-running pg_basebackup, creating the slot this time") - # cmd.run(validateAfter=True) + try: + cmd.run(validateAfter=True) + except Exception as e: #TODO should this be ExecutionError? + self.logger.info("Running pg_basebackup failed: {}".format(str(e))) + + # If the cluster never has mirrors, cmd will fail + # quickly because the internal slot doesn't exist. + # Re-run with `create_slot`. + # GPDB_12_MERGE_FIXME could we check it before? or let + # pg_basebackup create slot if not exists. + cmd = PgBaseBackup(self.recovery_info.target_datadir, + self.recovery_info.source_hostname, + str(self.recovery_info.source_port), + create_slot=True, + replication_slot_name=self.replicationSlotName, + forceoverwrite=True, + target_gp_dbid=self.recovery_info.target_segment_dbid, + progress_file=self.recovery_info.progress_file) + self.logger.info("Re-running pg_basebackup, creating the slot this time") + cmd.run(validateAfter=True) self.error_type = RecoveryErrorType.DEFAULT_ERROR self.logger.info("Successfully ran pg_basebackup for dbid: {}".format( diff --git a/gpcontrib/gp_toolkit/sql/resource_manager_restore_to_none.sql b/gpcontrib/gp_toolkit/sql/resource_manager_restore_to_none.sql index e22c251c92a..a694a483f1f 100644 --- a/gpcontrib/gp_toolkit/sql/resource_manager_restore_to_none.sql +++ b/gpcontrib/gp_toolkit/sql/resource_manager_restore_to_none.sql @@ -5,7 +5,7 @@ \! echo $? -- start_ignore -\! gpstop -rai +\! gpstop -raf -- end_ignore \! echo $? diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c index 36ae361b250..647c4482f09 100644 --- a/src/backend/utils/activity/backend_status.c +++ b/src/backend/utils/activity/backend_status.c @@ -729,6 +729,27 @@ pgstat_report_xact_timestamp(TimestampTz tstamp) PGSTAT_END_WRITE_ACTIVITY(beentry); } +/* ---------- + * pgstat_report_resgroup() - + * + * Called to update the resource group id in MyBEEntry. + * ---------- + */ +void +pgstat_report_resgroup(Oid groupId) +{ + volatile PgBackendStatus *beentry = MyBEEntry; + + if (!beentry) + return; + + PGSTAT_BEGIN_WRITE_ACTIVITY(beentry); + + beentry->st_rsgid = groupId; + + PGSTAT_END_WRITE_ACTIVITY(beentry); +} + /* ---------- * pgstat_read_current_status() - * diff --git a/src/backend/utils/resgroup/resgroup.c b/src/backend/utils/resgroup/resgroup.c index 987bae3c7ad..98284d27a09 100644 --- a/src/backend/utils/resgroup/resgroup.c +++ b/src/backend/utils/resgroup/resgroup.c @@ -53,6 +53,7 @@ #include "utils/memutils.h" #include "utils/ps_status.h" #include "utils/cgroup.h" +#include "utils/backend_status.h" #include "utils/resgroup.h" #include "utils/resource_manager.h" #include "utils/session_state.h" @@ -1290,8 +1291,7 @@ groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery) /* got one, lucky */ group->totalExecuted++; LWLockRelease(ResGroupLock); - /* MERGE16_FIXME report data to pastat */ - //pgstat_report_resgroup(group->groupId); + pgstat_report_resgroup(group->groupId); return slot; } } @@ -1328,7 +1328,7 @@ groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery) group->totalExecuted++; LWLockRelease(ResGroupLock); -// pgstat_report_resgroup(group->groupId); + pgstat_report_resgroup(group->groupId); return slot; } @@ -1567,7 +1567,7 @@ AssignResGroupOnMaster(void) /* Update pg_stat_activity statistics */ bypassedGroup->totalExecuted++; -// pgstat_report_resgroup(bypassedGroup->groupId); + pgstat_report_resgroup(bypassedGroup->groupId); /* Initialize the fake slot */ bypassedSlot.group = groupInfo.group; @@ -1633,7 +1633,7 @@ UnassignResGroup(void) bypassedGroup = NULL; /* Update pg_stat_activity statistics */ -// pgstat_report_resgroup(InvalidOid); + pgstat_report_resgroup(InvalidOid); return; } @@ -1668,7 +1668,7 @@ UnassignResGroup(void) if (Gp_role == GP_ROLE_DISPATCH) SIMPLE_FAULT_INJECTOR("unassign_resgroup_end_qd"); -// pgstat_report_resgroup(InvalidOid); + pgstat_report_resgroup(InvalidOid); } /* @@ -1812,7 +1812,7 @@ waitOnGroup(ResGroupData *group, bool isMoveQuery) * not enough to store a full Oid, so we set groupId out-of-band, * via the backend entry. */ -// pgstat_report_resgroup(group->groupId); + pgstat_report_resgroup(group->groupId); /* * Mark that we are waiting on resource group @@ -2448,7 +2448,6 @@ static void groupWaitQueuePush(ResGroupData *group, PGPROC *proc) { dclist_head *waitQueue; - PGPROC *headProc; Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE)); Assert(!procIsWaiting(proc)); @@ -2457,13 +2456,10 @@ groupWaitQueuePush(ResGroupData *group, PGPROC *proc) groupWaitQueueValidate(group); waitQueue = &group->waitProcs; - headProc = (PGPROC *) &waitQueue->dlist.head; - dclist_insert_before(waitQueue, &headProc->links, &proc->links); + dclist_push_tail(waitQueue, &proc->links); groupWaitProcValidate(proc, waitQueue); - waitQueue->count++; - Assert(groupWaitQueueFind(group, proc)); } @@ -2483,14 +2479,12 @@ groupWaitQueuePop(ResGroupData *group) waitQueue = &group->waitProcs; - proc = (PGPROC *) waitQueue->dlist.head.next; + proc = dclist_head_element(PGPROC, links, waitQueue); groupWaitProcValidate(proc, waitQueue); Assert(groupWaitQueueFind(group, proc)); Assert(proc->resSlot == NULL); - dclist_delete_from(waitQueue, &proc->links); - - waitQueue->count--; + dclist_delete_from_thoroughly(waitQueue, &proc->links); return proc; } @@ -2513,9 +2507,7 @@ groupWaitQueueErase(ResGroupData *group, PGPROC *proc) waitQueue = &group->waitProcs; groupWaitProcValidate(proc, waitQueue); - dclist_delete_from(waitQueue, &proc->links); - - waitQueue->count--; + dclist_delete_from_thoroughly(waitQueue, &proc->links); } /* @@ -2772,31 +2764,27 @@ resgroupDumpGroup(StringInfo str, ResGroupData *group) static void resgroupDumpWaitQueue(StringInfo str, dclist_head *queue) { - PGPROC *proc; + dlist_iter iter; + bool first = true; appendStringInfo(str, "\"wait_queue\":{"); appendStringInfo(str, "\"wait_queue_size\":%d,", queue->count); appendStringInfo(str, "\"wait_queue_content\":["); - proc = (PGPROC *)dclist_next_node(queue, &queue->dlist.head); - - if (!ShmemAddrIsValid(&proc->links)) + dclist_foreach(iter, queue) { - appendStringInfo(str, "]},"); - return; - } + PGPROC *proc = dlist_container(PGPROC, links, iter.cur); + + if (!first) + appendStringInfo(str, ","); + first = false; - while (proc) - { appendStringInfo(str, "{"); appendStringInfo(str, "\"pid\":%d,", proc->pid); appendStringInfo(str, "\"resWaiting\":%s,", procIsWaiting(proc) ? "true" : "false"); appendStringInfo(str, "\"resSlot\":%d", slotGetId(proc->resSlot)); appendStringInfo(str, "}"); - proc = (PGPROC *)dclist_next_node(queue, &queue->dlist.head); - if (proc) - appendStringInfo(str, ","); } appendStringInfo(str, "]},"); } @@ -3329,7 +3317,7 @@ HandleMoveResourceGroup(void) cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid, self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED); -// pgstat_report_resgroup(self->groupId); + pgstat_report_resgroup(self->groupId); } /* @@ -3698,7 +3686,7 @@ check_and_unassign_from_resgroup(PlannedStmt* stmt) } while (!groupIncBypassedRef(&groupInfo)); bypassedGroup = groupInfo.group; - bypassedGroup->totalExecuted++; + pgstat_report_resgroup(bypassedGroup->groupId); bypassedSlot.group = groupInfo.group; bypassedSlot.groupId = groupInfo.groupId; diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h index f01acf0b01d..d8fa3854ca1 100644 --- a/src/include/utils/backend_status.h +++ b/src/include/utils/backend_status.h @@ -325,6 +325,7 @@ extern void pgstat_report_query_id(uint64 query_id, bool force); extern void pgstat_report_tempfile(size_t filesize); extern void pgstat_report_appname(const char *appname); extern void pgstat_report_xact_timestamp(TimestampTz tstamp); +extern void pgstat_report_resgroup(Oid groupId); extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser); extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer, int buflen); diff --git a/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v1.out b/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v1.out index 17bfbb23fee..6968a020ec3 100644 --- a/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v1.out +++ b/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v1.out @@ -117,10 +117,10 @@ CREATE 0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$ import subprocess sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid result = plpy.execute(sql) session_id = result[0]['sess_id'] sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname='%s'" % groupname result = plpy.execute(sql) groupid = result[0]['groupid'] -sql = "select hostname from gp_segment_configuration group by hostname" result = plpy.execute(sql) hosts = [_['hostname'] for _ in result] -def get_result(host): stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], check=True, stdout=subprocess.PIPE).stdout session_pids = stdout.splitlines() -path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], check=True, stdout=subprocess.PIPE).stdout cgroups_pids = stdout.splitlines() -return set(session_pids).issubset(set(cgroups_pids)) -for host in hosts: if not get_result(host): return False return True +sql = """select sc.hostname, array_agg(pa.pid::text) as pids from gp_stat_activity pa join gp_segment_configuration sc on pa.gp_segment_id = sc.content and sc.role = 'p' where pa.sess_id = %d group by sc.hostname""" % session_id result = plpy.execute(sql) +for row in result: host = row['hostname'] session_pids = set(row['pids']) +if not session_pids: continue +path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = set(stdout.decode().splitlines()) +if not session_pids.issubset(cgroups_pids): return False return True $$ LANGUAGE plpython3u; CREATE diff --git a/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v2.out b/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v2.out index 05a8ea81b73..92c2e1fa33f 100644 --- a/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v2.out +++ b/src/test/isolation2/expected/resgroup/resgroup_auxiliary_tools_v2.out @@ -133,11 +133,11 @@ CREATE 0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$ import subprocess sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid result = plpy.execute(sql) session_id = result[0]['sess_id'] sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname='%s'" % groupname result = plpy.execute(sql) groupid = result[0]['groupid'] -sql = "select hostname from gp_segment_configuration group by hostname" result = plpy.execute(sql) hosts = [_['hostname'] for _ in result] -def get_result(host): stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], stdout=subprocess.PIPE, check=True).stdout session_pids = stdout.splitlines() -path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = stdout.splitlines() -return set(session_pids).issubset(set(cgroups_pids)) -for host in hosts: if not get_result(host): return False return True +sql = """select sc.hostname, array_agg(pa.pid::text) as pids from gp_stat_activity pa join gp_segment_configuration sc on pa.gp_segment_id = sc.content and sc.role = 'p' where pa.sess_id = %d group by sc.hostname""" % session_id result = plpy.execute(sql) +for row in result: host = row['hostname'] session_pids = set(row['pids']) +if not session_pids: continue +path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = set(stdout.decode().splitlines()) +if not session_pids.issubset(cgroups_pids): return False return True $$ LANGUAGE plpython3u; CREATE diff --git a/src/test/isolation2/expected/resgroup/resgroup_name_convention.out b/src/test/isolation2/expected/resgroup/resgroup_name_convention.out index 9f200bb2beb..b6bb2fbd084 100644 --- a/src/test/isolation2/expected/resgroup/resgroup_name_convention.out +++ b/src/test/isolation2/expected/resgroup/resgroup_name_convention.out @@ -388,7 +388,7 @@ LINE 1: CREATE RESOURCE GROUP 'must_fail' WITH (cpu_max_percent=10); -- does not support leading numbers CREATE RESOURCE GROUP 0_must_fail WITH (cpu_max_percent=10); -ERROR: syntax error at or near "0" +ERROR: trailing junk after numeric literal at or near "0_must_fail" LINE 1: CREATE RESOURCE GROUP 0_must_fail WITH (cpu_max_percent=10); ^ diff --git a/src/test/isolation2/expected/setup.out b/src/test/isolation2/expected/setup.out index 0c9c28cea9e..a59d0e0ceed 100644 --- a/src/test/isolation2/expected/setup.out +++ b/src/test/isolation2/expected/setup.out @@ -166,3 +166,6 @@ CREATE -- sequences start/end based on the concurrency level (see AOSegmentGet_startHeapBlock()) CREATE OR REPLACE FUNCTION populate_pages(relname text, value int, upto tid) RETURNS VOID AS $$ /* in func */ DECLARE curtid tid; /* in func */ BEGIN /* in func */ LOOP /* in func */ EXECUTE format('INSERT INTO %I VALUES($1) RETURNING ctid', relname) INTO curtid USING value; /* in func */ EXIT WHEN curtid > upto; /* in func */ END LOOP; /* in func */ END; $$ /* in func */ LANGUAGE PLPGSQL; CREATE + +GRANT ALL ON SCHEMA public TO public; +GRANT diff --git a/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v1.sql b/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v1.sql index 1e09cd70809..cd71081bae6 100644 --- a/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v1.sql +++ b/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v1.sql @@ -255,23 +255,26 @@ $$ LANGUAGE plpython3u; result = plpy.execute(sql) groupid = result[0]['groupid'] - sql = "select hostname from gp_segment_configuration group by hostname" + sql = """select sc.hostname, array_agg(pa.pid::text) as pids + from gp_stat_activity pa + join gp_segment_configuration sc + on pa.gp_segment_id = sc.content and sc.role = 'p' + where pa.sess_id = %d + group by sc.hostname""" % session_id result = plpy.execute(sql) - hosts = [_['hostname'] for _ in result] - def get_result(host): - stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], - check=True, stdout=subprocess.PIPE).stdout - session_pids = stdout.splitlines() + for row in result: + host = row['hostname'] + session_pids = set(row['pids']) - path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) - stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], check=True, stdout=subprocess.PIPE).stdout - cgroups_pids = stdout.splitlines() + if not session_pids: + continue - return set(session_pids).issubset(set(cgroups_pids)) + path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) + stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout + cgroups_pids = set(stdout.decode().splitlines()) - for host in hosts: - if not get_result(host): + if not session_pids.issubset(cgroups_pids): return False return True diff --git a/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v2.sql b/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v2.sql index 38e0a24d3a3..9f123b0e5c1 100644 --- a/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v2.sql +++ b/src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v2.sql @@ -261,23 +261,26 @@ $$ LANGUAGE plpython3u; result = plpy.execute(sql) groupid = result[0]['groupid'] - sql = "select hostname from gp_segment_configuration group by hostname" + sql = """select sc.hostname, array_agg(pa.pid::text) as pids + from gp_stat_activity pa + join gp_segment_configuration sc + on pa.gp_segment_id = sc.content and sc.role = 'p' + where pa.sess_id = %d + group by sc.hostname""" % session_id result = plpy.execute(sql) - hosts = [_['hostname'] for _ in result] - def get_result(host): - stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], - stdout=subprocess.PIPE, check=True).stdout - session_pids = stdout.splitlines() + for row in result: + host = row['hostname'] + session_pids = set(row['pids']) + + if not session_pids: + continue path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout - cgroups_pids = stdout.splitlines() - - return set(session_pids).issubset(set(cgroups_pids)) + cgroups_pids = set(stdout.decode().splitlines()) - for host in hosts: - if not get_result(host): + if not session_pids.issubset(cgroups_pids): return False return True diff --git a/src/test/isolation2/sql/setup.sql b/src/test/isolation2/sql/setup.sql index 949f22f0002..367a0d94a7b 100644 --- a/src/test/isolation2/sql/setup.sql +++ b/src/test/isolation2/sql/setup.sql @@ -489,3 +489,5 @@ EXIT WHEN curtid > upto; /* in func */ END LOOP; /* in func */ END; $$ /* in func */ LANGUAGE PLPGSQL; + +GRANT ALL ON SCHEMA public TO public; \ No newline at end of file diff --git a/src/test/regress/expected/memoize.out b/src/test/regress/expected/memoize.out index dd5328211ef..96e6737be27 100644 --- a/src/test/regress/expected/memoize.out +++ b/src/test/regress/expected/memoize.out @@ -257,6 +257,7 @@ INSERT INTO prt VALUES (10), (10), (10), (10); CREATE INDEX iprt_p1_a ON prt_p1 (a); CREATE INDEX iprt_p2_a ON prt_p2 (a); ANALYZE prt; +SET enable_parallel = false; SELECT explain_memoize(' SELECT * FROM prt t1 INNER JOIN prt t2 ON t1.a = t2.a;', false); explain_memoize @@ -278,6 +279,7 @@ SELECT * FROM prt t1 INNER JOIN prt t2 ON t1.a = t2.a;', false); Optimizer: Postgres query optimizer (15 rows) +RESET enable_parallel; -- Ensure memoize works with parameterized union-all Append path SET enable_partitionwise_join TO off; SELECT explain_memoize(' diff --git a/src/test/regress/expected/memoize_1.out b/src/test/regress/expected/memoize_1.out deleted file mode 100644 index 685615e4013..00000000000 --- a/src/test/regress/expected/memoize_1.out +++ /dev/null @@ -1,402 +0,0 @@ --- Perform tests on the Memoize node. --- GPDB_14_MERGE_FIXME: --- 1.test memoize in CBDB as enable_nestloop is false by default --- 2.enable memoize in orca --- The cache hits/misses/evictions from the Memoize node can vary between --- machines. Let's just replace the number with an 'N'. In order to allow us --- to perform validation when the measure was zero, we replace a zero value --- with "Zero". All other numbers are replaced with 'N'. -create function explain_memoize(query text, hide_hitmiss bool) returns setof text -language plpgsql as -$$ -declare - ln text; -begin - for ln in - execute format('explain (analyze, costs off, summary off, timing off) %s', - query) - loop - if hide_hitmiss = true then - ln := regexp_replace(ln, 'Hits: 0', 'Hits: Zero'); - ln := regexp_replace(ln, 'Hits: \d+', 'Hits: N'); - ln := regexp_replace(ln, 'Misses: 0', 'Misses: Zero'); - ln := regexp_replace(ln, 'Misses: \d+', 'Misses: N'); - end if; - ln := regexp_replace(ln, 'Evictions: 0', 'Evictions: Zero'); - ln := regexp_replace(ln, 'Evictions: \d+', 'Evictions: N'); - ln := regexp_replace(ln, 'Memory Usage: \d+', 'Memory Usage: N'); - ln := regexp_replace(ln, 'Memory: \d+', 'Memory: N'); - ln := regexp_replace(ln, 'Heap Fetches: \d+', 'Heap Fetches: N'); - ln := regexp_replace(ln, 'loops=\d+', 'loops=N'); - return next ln; - end loop; -end; -$$; --- Ensure we get a memoize node on the inner side of the nested loop -SET optimizer_enable_hashjoin TO off; -SET optimizer_enable_bitmapscan TO off; -SET enable_hashjoin TO off; -SET enable_bitmapscan TO off; -SELECT explain_memoize(' -SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 -INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty -WHERE t2.unique1 < 1000;', false); - explain_memoize -------------------------------------------------------------------------------------------------------- - Finalize Aggregate (actual rows=1 loops=N) - -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=N) - -> Partial Aggregate (actual rows=1 loops=N) - -> Nested Loop (actual rows=400 loops=N) - -> Redistribute Motion 3:3 (slice2; segments: 3) (actual rows=400 loops=N) - Hash Key: t2.twenty - -> Seq Scan on tenk1 t2 (actual rows=340 loops=N) - Filter: (unique1 < 1000) - Rows Removed by Filter: 2906 - -> Memoize (actual rows=1 loops=N) - Cache Key: t2.twenty - Cache Mode: logical - -> Index Only Scan using tenk1_unique1 on tenk1 t1 (actual rows=1 loops=N) - Index Cond: (unique1 = t2.twenty) - Heap Fetches: N - Optimizer: Postgres query optimizer -(16 rows) - --- And check we get the expected results. -SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 -INNER JOIN tenk1 t2 ON t1.unique1 = t2.twenty -WHERE t2.unique1 < 1000; - count | avg --------+-------------------- - 1000 | 9.5000000000000000 -(1 row) - --- Try with LATERAL joins -SELECT explain_memoize(' -SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, -LATERAL (SELECT t2.unique1 FROM tenk1 t2 - WHERE t1.twenty = t2.unique1 OFFSET 0) t2 -WHERE t1.unique1 < 1000;', false); - explain_memoize ----------------------------------------------------------------------------------------------------------- - Aggregate (actual rows=1 loops=N) - -> Nested Loop (actual rows=1000 loops=N) - -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=1000 loops=N) - -> Seq Scan on tenk1 t1 (actual rows=340 loops=N) - Filter: (unique1 < 1000) - Rows Removed by Filter: 2906 - -> Materialize (actual rows=1 loops=N) - -> Memoize (actual rows=1 loops=N) - Cache Key: t1.twenty - Cache Mode: binary - Hits: 980 Misses: 20 Evictions: Zero Overflows: 0 Memory Usage: NkB - -> Result (actual rows=1 loops=N) - Filter: (t1.twenty = t2.unique1) - -> Materialize (actual rows=10000 loops=N) - -> Gather Motion 3:1 (slice2; segments: 3) (actual rows=10000 loops=N) - -> Seq Scan on tenk1 t2 (actual rows=3386 loops=N) - Optimizer: Postgres query optimizer -(17 rows) - --- And check we get the expected results. -SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, -LATERAL (SELECT t2.unique1 FROM tenk1 t2 - WHERE t1.twenty = t2.unique1 OFFSET 0) t2 -WHERE t1.unique1 < 1000; - count | avg --------+-------------------- - 1000 | 9.5000000000000000 -(1 row) - -SET enable_mergejoin TO off; --- Test for varlena datatype with expr evaluation -CREATE TABLE expr_key (x numeric, t text); -INSERT INTO expr_key (x, t) -SELECT d1::numeric, d1::text FROM ( - SELECT round((d / pi())::numeric, 7) AS d1 FROM generate_series(1, 20) AS d -) t; --- duplicate rows so we get some cache hits -INSERT INTO expr_key SELECT * FROM expr_key; -CREATE INDEX expr_key_idx_x_t ON expr_key (x, t); -VACUUM ANALYZE expr_key; --- Ensure we get we get a cache miss and hit for each of the 20 distinct values -SELECT explain_memoize(' -SELECT * FROM expr_key t1 INNER JOIN expr_key t2 -ON t1.x = t2.t::numeric AND t1.t::numeric = t2.x;', false); - explain_memoize -------------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=80 loops=N) - -> Merge Join (actual rows=28 loops=N) - Merge Cond: ((t1.x = ((t2.t)::numeric)) AND (((t1.t)::numeric) = t2.x)) - -> Sort (actual rows=14 loops=N) - Sort Key: t1.x, ((t1.t)::numeric) - Sort Method: quicksort Memory: NkB - -> Seq Scan on expr_key t1 (actual rows=14 loops=N) - -> Sort (actual rows=27 loops=N) - Sort Key: ((t2.t)::numeric), t2.x - Sort Method: quicksort Memory: NkB - -> Result (actual rows=14 loops=N) - -> Redistribute Motion 3:3 (slice2; segments: 3) (actual rows=14 loops=N) - Hash Key: (t2.t)::numeric - -> Seq Scan on expr_key t2 (actual rows=14 loops=N) - Optimizer: Postgres query optimizer -(15 rows) - -DROP TABLE expr_key; --- Reduce work_mem and hash_mem_multiplier so that we see some cache evictions -SET work_mem TO '64kB'; -SET hash_mem_multiplier TO 1.0; --- Ensure we get some evictions. We're unable to validate the hits and misses --- here as the number of entries that fit in the cache at once will vary --- between different machines. -SELECT explain_memoize(' -SELECT COUNT(*),AVG(t1.unique1) FROM tenk1 t1 -INNER JOIN tenk1 t2 ON t1.unique1 = t2.thousand -WHERE t2.unique1 < 1200;', true); - explain_memoize --------------------------------------------------------------------------------------------------------- - Finalize Aggregate (actual rows=1 loops=N) - -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=N) - -> Partial Aggregate (actual rows=1 loops=N) - -> Merge Join (actual rows=407 loops=N) - Merge Cond: (t1.unique1 = t2.thousand) - -> Index Only Scan using tenk1_unique1 on tenk1 t1 (actual rows=341 loops=N) - Heap Fetches: N - -> Sort (actual rows=407 loops=N) - Sort Key: t2.thousand - Sort Method: quicksort Memory: NkB - -> Redistribute Motion 3:3 (slice2; segments: 3) (actual rows=407 loops=N) - Hash Key: t2.thousand - -> Seq Scan on tenk1 t2 (actual rows=407 loops=N) - Filter: (unique1 < 1200) - Rows Removed by Filter: 2961 - Optimizer: Postgres query optimizer -(16 rows) - -CREATE TABLE flt (f float); -CREATE INDEX flt_f_idx ON flt (f); -INSERT INTO flt VALUES('-0.0'::float),('+0.0'::float); -ANALYZE flt; -SET enable_seqscan TO off; --- Ensure memoize operates in logical mode -SELECT explain_memoize(' -SELECT * FROM flt f1 INNER JOIN flt f2 ON f1.f = f2.f;', false); - explain_memoize -------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=4 loops=N) - -> Nested Loop (actual rows=4 loops=N) - -> Index Only Scan using flt_f_idx on flt f1 (actual rows=2 loops=N) - Heap Fetches: N - -> Index Only Scan using flt_f_idx on flt f2 (actual rows=2 loops=N) - Index Cond: (f = f1.f) - Heap Fetches: N - Optimizer: Postgres query optimizer -(8 rows) - --- Ensure memoize operates in binary mode -SELECT explain_memoize(' -SELECT * FROM flt f1 INNER JOIN flt f2 ON f1.f >= f2.f;', false); - explain_memoize -------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=4 loops=N) - -> Nested Loop (actual rows=4 loops=N) - -> Broadcast Motion 3:3 (slice2; segments: 3) (actual rows=2 loops=N) - -> Index Only Scan using flt_f_idx on flt f1 (actual rows=2 loops=N) - Heap Fetches: N - -> Index Only Scan using flt_f_idx on flt f2 (actual rows=2 loops=N) - Index Cond: (f <= f1.f) - Heap Fetches: N - Optimizer: Postgres query optimizer -(9 rows) - -DROP TABLE flt; --- Exercise Memoize in binary mode with a large fixed width type and a --- varlena type. -CREATE TABLE strtest (n name, t text); -CREATE INDEX strtest_n_idx ON strtest (n); -CREATE INDEX strtest_t_idx ON strtest (t); -INSERT INTO strtest VALUES('one','one'),('two','two'),('three',repeat(fipshash('three'),100)); --- duplicate rows so we get some cache hits -INSERT INTO strtest SELECT * FROM strtest; -ANALYZE strtest; --- Ensure we get 3 hits and 3 misses -SELECT explain_memoize(' -SELECT * FROM strtest s1 INNER JOIN strtest s2 ON s1.n >= s2.n;', false); - explain_memoize ----------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=24 loops=N) - -> Nested Loop (actual rows=12 loops=N) - -> Broadcast Motion 3:3 (slice2; segments: 3) (actual rows=6 loops=N) - -> Seq Scan on strtest s1 (actual rows=4 loops=N) - -> Index Scan using strtest_n_idx on strtest s2 (actual rows=2 loops=N) - Index Cond: (n <= s1.n) - Optimizer: Postgres query optimizer -(7 rows) - --- Ensure we get 3 hits and 3 misses -SELECT explain_memoize(' -SELECT * FROM strtest s1 INNER JOIN strtest s2 ON s1.t >= s2.t;', false); - explain_memoize ----------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=24 loops=N) - -> Nested Loop (actual rows=16 loops=N) - -> Broadcast Motion 3:3 (slice2; segments: 3) (actual rows=6 loops=N) - -> Seq Scan on strtest s1 (actual rows=4 loops=N) - -> Index Scan using strtest_t_idx on strtest s2 (actual rows=3 loops=N) - Index Cond: (t <= s1.t) - Optimizer: Postgres query optimizer -(7 rows) - -DROP TABLE strtest; --- Ensure memoize works with partitionwise join -SET enable_partitionwise_join TO on; -CREATE TABLE prt (a int) PARTITION BY RANGE(a); -CREATE TABLE prt_p1 PARTITION OF prt FOR VALUES FROM (0) TO (10); -CREATE TABLE prt_p2 PARTITION OF prt FOR VALUES FROM (10) TO (20); -INSERT INTO prt VALUES (0), (0), (0), (0); -INSERT INTO prt VALUES (10), (10), (10), (10); -CREATE INDEX iprt_p1_a ON prt_p1 (a); -CREATE INDEX iprt_p2_a ON prt_p2 (a); -ANALYZE prt; -SELECT explain_memoize(' -SELECT * FROM prt t1 INNER JOIN prt t2 ON t1.a = t2.a;', false); - explain_memoize ------------------------------------------------------------------------------------------- - Gather Motion 6:1 (slice1; segments: 6) (actual rows=32 loops=N) - -> Parallel Append (actual rows=16 loops=N) - -> Nested Loop (actual rows=16 loops=N) - -> Index Only Scan using iprt_p1_a on prt_p1 t1_1 (actual rows=4 loops=N) - Heap Fetches: N - -> Index Only Scan using iprt_p1_a on prt_p1 t2_1 (actual rows=4 loops=N) - Index Cond: (a = t1_1.a) - Heap Fetches: N - -> Nested Loop (actual rows=16 loops=N) - -> Index Only Scan using iprt_p2_a on prt_p2 t1_2 (actual rows=4 loops=N) - Heap Fetches: N - -> Index Only Scan using iprt_p2_a on prt_p2 t2_2 (actual rows=4 loops=N) - Index Cond: (a = t1_2.a) - Heap Fetches: N - Optimizer: Postgres query optimizer -(15 rows) - --- Ensure memoize works with parameterized union-all Append path -SET enable_partitionwise_join TO off; -SELECT explain_memoize(' -SELECT * FROM prt_p1 t1 INNER JOIN -(SELECT * FROM prt_p1 UNION ALL SELECT * FROM prt_p2) t2 -ON t1.a = t2.a;', false); - explain_memoize -------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) (actual rows=16 loops=N) - -> Nested Loop (actual rows=16 loops=N) - -> Index Only Scan using iprt_p1_a on prt_p1 t1 (actual rows=4 loops=N) - Heap Fetches: N - -> Append (actual rows=4 loops=N) - -> Index Only Scan using iprt_p1_a on prt_p1 (actual rows=4 loops=N) - Index Cond: (a = t1.a) - Heap Fetches: N - -> Index Only Scan using iprt_p2_a on prt_p2 (actual rows=0 loops=N) - Index Cond: (a = t1.a) - Heap Fetches: N - Optimizer: Postgres query optimizer -(12 rows) - -DROP TABLE prt; -RESET enable_partitionwise_join; --- Exercise Memoize code that flushes the cache when a parameter changes which --- is not part of the cache key. --- Ensure we get a Memoize plan -EXPLAIN (COSTS OFF) -SELECT unique1 FROM tenk1 t0 -WHERE unique1 < 3 - AND EXISTS ( - SELECT 1 FROM tenk1 t1 - INNER JOIN tenk1 t2 ON t1.unique1 = t2.hundred - WHERE t0.ten = t1.twenty AND t0.two <> t2.four OFFSET 0); - QUERY PLAN ----------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) - -> Index Scan using tenk1_unique1 on tenk1 t0 - Index Cond: (unique1 < 3) - Filter: (SubPlan 1) - SubPlan 1 - -> Merge Join - Merge Cond: (t2.hundred = t1.unique1) - -> Sort - Sort Key: t2.hundred - -> Result - Filter: (t0.two <> t2.four) - -> Materialize - -> Broadcast Motion 3:3 (slice2; segments: 3) - -> Index Scan using tenk1_hundred on tenk1 t2 - -> Materialize - -> Sort - Sort Key: t1.unique1 - -> Result - Filter: (t0.ten = t1.twenty) - -> Materialize - -> Broadcast Motion 3:3 (slice3; segments: 3) - -> Index Scan using tenk1_unique1 on tenk1 t1 - Optimizer: Postgres query optimizer -(23 rows) - --- Ensure the above query returns the correct result -SELECT unique1 FROM tenk1 t0 -WHERE unique1 < 3 - AND EXISTS ( - SELECT 1 FROM tenk1 t1 - INNER JOIN tenk1 t2 ON t1.unique1 = t2.hundred - WHERE t0.ten = t1.twenty AND t0.two <> t2.four OFFSET 0); - unique1 ---------- - 2 -(1 row) - -RESET enable_seqscan; -RESET enable_mergejoin; -RESET work_mem; -RESET enable_bitmapscan; -RESET enable_hashjoin; -RESET optimizer_enable_hashjoin; -RESET optimizer_enable_bitmapscan; -RESET hash_mem_multiplier; --- Test parallel plans with Memoize -SET min_parallel_table_scan_size TO 0; -SET parallel_setup_cost TO 0; -SET parallel_tuple_cost TO 0; -SET max_parallel_workers_per_gather TO 2; --- Ensure we get a parallel plan. -EXPLAIN (COSTS OFF) -SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, -LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 -WHERE t1.unique1 < 1000; - QUERY PLAN ------------------------------------------------------------------------------- - Finalize Aggregate - -> Gather Motion 3:1 (slice1; segments: 3) - -> Partial Aggregate - -> Hash Join - Hash Cond: (t2.unique1 = t1.twenty) - -> Seq Scan on tenk1 t2 - -> Hash - -> Redistribute Motion 3:3 (slice2; segments: 3) - Hash Key: t1.twenty - -> Bitmap Heap Scan on tenk1 t1 - Recheck Cond: (unique1 < 1000) - -> Bitmap Index Scan on tenk1_unique1 - Index Cond: (unique1 < 1000) - Optimizer: Postgres query optimizer -(14 rows) - --- And ensure the parallel plan gives us the correct results. -SELECT COUNT(*),AVG(t2.unique1) FROM tenk1 t1, -LATERAL (SELECT t2.unique1 FROM tenk1 t2 WHERE t1.twenty = t2.unique1) t2 -WHERE t1.unique1 < 1000; - count | avg --------+-------------------- - 1000 | 9.5000000000000000 -(1 row) - -RESET max_parallel_workers_per_gather; -RESET parallel_tuple_cost; -RESET parallel_setup_cost; -RESET min_parallel_table_scan_size; diff --git a/src/test/regress/sql/memoize.sql b/src/test/regress/sql/memoize.sql index 985cc7fd095..683889d8814 100644 --- a/src/test/regress/sql/memoize.sql +++ b/src/test/regress/sql/memoize.sql @@ -146,8 +146,10 @@ CREATE INDEX iprt_p1_a ON prt_p1 (a); CREATE INDEX iprt_p2_a ON prt_p2 (a); ANALYZE prt; +SET enable_parallel = false; SELECT explain_memoize(' SELECT * FROM prt t1 INNER JOIN prt t2 ON t1.a = t2.a;', false); +RESET enable_parallel; -- Ensure memoize works with parameterized union-all Append path SET enable_partitionwise_join TO off;