Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions gpMgmt/sbin/gpsegrecovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,26 @@ def run(self):
target_gp_dbid=self.recovery_info.target_segment_dbid,
progress_file=self.recovery_info.progress_file)
self.logger.info("Running pg_basebackup with progress output temporarily in %s" % self.recovery_info.progress_file)
# try:
cmd.run(validateAfter=True)
# except Exception as e: #TODO should this be ExecutionError?
# self.logger.info("Running pg_basebackup failed: {}".format(str(e)))
#
# # If the cluster never has mirrors, cmd will fail
# # quickly because the internal slot doesn't exist.
# # Re-run with `create_slot`.
# # GPDB_12_MERGE_FIXME could we check it before? or let
# # pg_basebackup create slot if not exists.
# cmd = PgBaseBackup(self.recovery_info.target_datadir,
# self.recovery_info.source_hostname,
# str(self.recovery_info.source_port),
# create_slot=True,
# replication_slot_name=self.replicationSlotName,
# forceoverwrite=True,
# target_gp_dbid=self.recovery_info.target_segment_dbid,
# progress_file=self.recovery_info.progress_file)
# self.logger.info("Re-running pg_basebackup, creating the slot this time")
# cmd.run(validateAfter=True)
try:
cmd.run(validateAfter=True)
except Exception as e: #TODO should this be ExecutionError?
self.logger.info("Running pg_basebackup failed: {}".format(str(e)))

# If the cluster never has mirrors, cmd will fail
# quickly because the internal slot doesn't exist.
# Re-run with `create_slot`.
# GPDB_12_MERGE_FIXME could we check it before? or let
# pg_basebackup create slot if not exists.
cmd = PgBaseBackup(self.recovery_info.target_datadir,
self.recovery_info.source_hostname,
str(self.recovery_info.source_port),
create_slot=True,
replication_slot_name=self.replicationSlotName,
forceoverwrite=True,
target_gp_dbid=self.recovery_info.target_segment_dbid,
progress_file=self.recovery_info.progress_file)
self.logger.info("Re-running pg_basebackup, creating the slot this time")
cmd.run(validateAfter=True)

self.error_type = RecoveryErrorType.DEFAULT_ERROR
self.logger.info("Successfully ran pg_basebackup for dbid: {}".format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
\! echo $?

-- start_ignore
\! gpstop -rai
\! gpstop -raf
-- end_ignore

\! echo $?
21 changes: 21 additions & 0 deletions src/backend/utils/activity/backend_status.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,27 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
PGSTAT_END_WRITE_ACTIVITY(beentry);
}

/* ----------
 * pgstat_report_resgroup() -
 *
 *	Store the given resource group id into the st_rsgid field of
 *	MyBEEntry.  Returns without doing anything when MyBEEntry is not set.
 * ----------
 */
void
pgstat_report_resgroup(Oid groupId)
{
	volatile PgBackendStatus *entry = MyBEEntry;

	/* Nothing to update when there is no entry for this process. */
	if (entry == NULL)
		return;

	/* The store is bracketed by the begin/end write-activity macros. */
	PGSTAT_BEGIN_WRITE_ACTIVITY(entry);
	entry->st_rsgid = groupId;
	PGSTAT_END_WRITE_ACTIVITY(entry);
}

/* ----------
* pgstat_read_current_status() -
*
Expand Down
54 changes: 21 additions & 33 deletions src/backend/utils/resgroup/resgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/cgroup.h"
#include "utils/backend_status.h"
#include "utils/resgroup.h"
#include "utils/resource_manager.h"
#include "utils/session_state.h"
Expand Down Expand Up @@ -1290,8 +1291,7 @@ groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery)
/* got one, lucky */
group->totalExecuted++;
LWLockRelease(ResGroupLock);
/* MERGE16_FIXME report data to pastat */
//pgstat_report_resgroup(group->groupId);
pgstat_report_resgroup(group->groupId);
return slot;
}
}
Expand Down Expand Up @@ -1328,7 +1328,7 @@ groupAcquireSlot(ResGroupInfo *pGroupInfo, bool isMoveQuery)
group->totalExecuted++;
LWLockRelease(ResGroupLock);

// pgstat_report_resgroup(group->groupId);
pgstat_report_resgroup(group->groupId);
return slot;
}

Expand Down Expand Up @@ -1567,7 +1567,7 @@ AssignResGroupOnMaster(void)

/* Update pg_stat_activity statistics */
bypassedGroup->totalExecuted++;
// pgstat_report_resgroup(bypassedGroup->groupId);
pgstat_report_resgroup(bypassedGroup->groupId);

/* Initialize the fake slot */
bypassedSlot.group = groupInfo.group;
Expand Down Expand Up @@ -1633,7 +1633,7 @@ UnassignResGroup(void)
bypassedGroup = NULL;

/* Update pg_stat_activity statistics */
// pgstat_report_resgroup(InvalidOid);
pgstat_report_resgroup(InvalidOid);
return;
}

Expand Down Expand Up @@ -1668,7 +1668,7 @@ UnassignResGroup(void)
if (Gp_role == GP_ROLE_DISPATCH)
SIMPLE_FAULT_INJECTOR("unassign_resgroup_end_qd");

// pgstat_report_resgroup(InvalidOid);
pgstat_report_resgroup(InvalidOid);
}

/*
Expand Down Expand Up @@ -1812,7 +1812,7 @@ waitOnGroup(ResGroupData *group, bool isMoveQuery)
* not enough to store a full Oid, so we set groupId out-of-band,
* via the backend entry.
*/
// pgstat_report_resgroup(group->groupId);
pgstat_report_resgroup(group->groupId);

/*
* Mark that we are waiting on resource group
Expand Down Expand Up @@ -2448,7 +2448,6 @@ static void
groupWaitQueuePush(ResGroupData *group, PGPROC *proc)
{
dclist_head *waitQueue;
PGPROC *headProc;

Assert(LWLockHeldByMeInMode(ResGroupLock, LW_EXCLUSIVE));
Assert(!procIsWaiting(proc));
Expand All @@ -2457,13 +2456,10 @@ groupWaitQueuePush(ResGroupData *group, PGPROC *proc)
groupWaitQueueValidate(group);

waitQueue = &group->waitProcs;
headProc = (PGPROC *) &waitQueue->dlist.head;

dclist_insert_before(waitQueue, &headProc->links, &proc->links);
dclist_push_tail(waitQueue, &proc->links);
groupWaitProcValidate(proc, waitQueue);

waitQueue->count++;

Assert(groupWaitQueueFind(group, proc));
}

Expand All @@ -2483,14 +2479,12 @@ groupWaitQueuePop(ResGroupData *group)

waitQueue = &group->waitProcs;

proc = (PGPROC *) waitQueue->dlist.head.next;
proc = dclist_head_element(PGPROC, links, waitQueue);
groupWaitProcValidate(proc, waitQueue);
Assert(groupWaitQueueFind(group, proc));
Assert(proc->resSlot == NULL);

dclist_delete_from(waitQueue, &proc->links);

waitQueue->count--;
dclist_delete_from_thoroughly(waitQueue, &proc->links);

return proc;
}
Expand All @@ -2513,9 +2507,7 @@ groupWaitQueueErase(ResGroupData *group, PGPROC *proc)
waitQueue = &group->waitProcs;

groupWaitProcValidate(proc, waitQueue);
dclist_delete_from(waitQueue, &proc->links);

waitQueue->count--;
dclist_delete_from_thoroughly(waitQueue, &proc->links);
}

/*
Expand Down Expand Up @@ -2772,31 +2764,27 @@ resgroupDumpGroup(StringInfo str, ResGroupData *group)
static void
resgroupDumpWaitQueue(StringInfo str, dclist_head *queue)
{
PGPROC *proc;
dlist_iter iter;
bool first = true;

appendStringInfo(str, "\"wait_queue\":{");
appendStringInfo(str, "\"wait_queue_size\":%d,", queue->count);
appendStringInfo(str, "\"wait_queue_content\":[");

proc = (PGPROC *)dclist_next_node(queue, &queue->dlist.head);

if (!ShmemAddrIsValid(&proc->links))
dclist_foreach(iter, queue)
{
appendStringInfo(str, "]},");
return;
}
PGPROC *proc = dlist_container(PGPROC, links, iter.cur);

if (!first)
appendStringInfo(str, ",");
first = false;

while (proc)
{
appendStringInfo(str, "{");
appendStringInfo(str, "\"pid\":%d,", proc->pid);
appendStringInfo(str, "\"resWaiting\":%s,",
procIsWaiting(proc) ? "true" : "false");
appendStringInfo(str, "\"resSlot\":%d", slotGetId(proc->resSlot));
appendStringInfo(str, "}");
proc = (PGPROC *)dclist_next_node(queue, &queue->dlist.head);
if (proc)
appendStringInfo(str, ",");
}
appendStringInfo(str, "]},");
}
Expand Down Expand Up @@ -3329,7 +3317,7 @@ HandleMoveResourceGroup(void)
cgroupOpsRoutine->attachcgroup(self->groupId, MyProcPid,
self->caps.cpuMaxPercent == CPU_MAX_PERCENT_DISABLED);

// pgstat_report_resgroup(self->groupId);
pgstat_report_resgroup(self->groupId);
}

/*
Expand Down Expand Up @@ -3698,7 +3686,7 @@ check_and_unassign_from_resgroup(PlannedStmt* stmt)
} while (!groupIncBypassedRef(&groupInfo));

bypassedGroup = groupInfo.group;
bypassedGroup->totalExecuted++;
pgstat_report_resgroup(bypassedGroup->groupId);
bypassedSlot.group = groupInfo.group;
bypassedSlot.groupId = groupInfo.groupId;

Expand Down
1 change: 1 addition & 0 deletions src/include/utils/backend_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ extern void pgstat_report_query_id(uint64 query_id, bool force);
extern void pgstat_report_tempfile(size_t filesize);
extern void pgstat_report_appname(const char *appname);
extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
extern void pgstat_report_resgroup(Oid groupId);
extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer,
int buflen);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ CREATE
0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$ import subprocess
sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid result = plpy.execute(sql) session_id = result[0]['sess_id']
sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname='%s'" % groupname result = plpy.execute(sql) groupid = result[0]['groupid']
sql = "select hostname from gp_segment_configuration group by hostname" result = plpy.execute(sql) hosts = [_['hostname'] for _ in result]
def get_result(host): stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], check=True, stdout=subprocess.PIPE).stdout session_pids = stdout.splitlines()
path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], check=True, stdout=subprocess.PIPE).stdout cgroups_pids = stdout.splitlines()
return set(session_pids).issubset(set(cgroups_pids))
for host in hosts: if not get_result(host): return False return True
sql = """select sc.hostname, array_agg(pa.pid::text) as pids from gp_stat_activity pa join gp_segment_configuration sc on pa.gp_segment_id = sc.content and sc.role = 'p' where pa.sess_id = %d group by sc.hostname""" % session_id result = plpy.execute(sql)
for row in result: host = row['hostname'] session_pids = set(row['pids'])
if not session_pids: continue
path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = set(stdout.decode().splitlines())
if not session_pids.issubset(cgroups_pids): return False return True
$$ LANGUAGE plpython3u;
CREATE
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ CREATE
0: CREATE OR REPLACE FUNCTION is_session_in_group(pid integer, groupname text) RETURNS BOOL AS $$ import subprocess
sql = "select sess_id from pg_stat_activity where pid = '%d'" % pid result = plpy.execute(sql) session_id = result[0]['sess_id']
sql = "select groupid from gp_toolkit.gp_resgroup_config where groupname='%s'" % groupname result = plpy.execute(sql) groupid = result[0]['groupid']
sql = "select hostname from gp_segment_configuration group by hostname" result = plpy.execute(sql) hosts = [_['hostname'] for _ in result]
def get_result(host): stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)], stdout=subprocess.PIPE, check=True).stdout session_pids = stdout.splitlines()
path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = stdout.splitlines()
return set(session_pids).issubset(set(cgroups_pids))
for host in hosts: if not get_result(host): return False return True
sql = """select sc.hostname, array_agg(pa.pid::text) as pids from gp_stat_activity pa join gp_segment_configuration sc on pa.gp_segment_id = sc.content and sc.role = 'p' where pa.sess_id = %d group by sc.hostname""" % session_id result = plpy.execute(sql)
for row in result: host = row['hostname'] session_pids = set(row['pids'])
if not session_pids: continue
path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid) stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout cgroups_pids = set(stdout.decode().splitlines())
if not session_pids.issubset(cgroups_pids): return False return True
$$ LANGUAGE plpython3u;
CREATE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ LINE 1: CREATE RESOURCE GROUP 'must_fail' WITH (cpu_max_percent=10);

-- does not support leading numbers
CREATE RESOURCE GROUP 0_must_fail WITH (cpu_max_percent=10);
ERROR: syntax error at or near "0"
ERROR: trailing junk after numeric literal at or near "0_must_fail"
LINE 1: CREATE RESOURCE GROUP 0_must_fail WITH (cpu_max_percent=10);
^

Expand Down
3 changes: 3 additions & 0 deletions src/test/isolation2/expected/setup.out
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,6 @@ CREATE
-- sequences start/end based on the concurrency level (see AOSegmentGet_startHeapBlock())
CREATE OR REPLACE FUNCTION populate_pages(relname text, value int, upto tid) RETURNS VOID AS $$ /* in func */ DECLARE curtid tid; /* in func */ BEGIN /* in func */ LOOP /* in func */ EXECUTE format('INSERT INTO %I VALUES($1) RETURNING ctid', relname) INTO curtid USING value; /* in func */ EXIT WHEN curtid > upto; /* in func */ END LOOP; /* in func */ END; $$ /* in func */ LANGUAGE PLPGSQL;
CREATE

GRANT ALL ON SCHEMA public TO public;
GRANT
27 changes: 15 additions & 12 deletions src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,26 @@ $$ LANGUAGE plpython3u;
result = plpy.execute(sql)
groupid = result[0]['groupid']

sql = "select hostname from gp_segment_configuration group by hostname"
sql = """select sc.hostname, array_agg(pa.pid::text) as pids
from gp_stat_activity pa
join gp_segment_configuration sc
on pa.gp_segment_id = sc.content and sc.role = 'p'
where pa.sess_id = %d
group by sc.hostname""" % session_id
result = plpy.execute(sql)
hosts = [_['hostname'] for _ in result]

def get_result(host):
stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)],
check=True, stdout=subprocess.PIPE).stdout
session_pids = stdout.splitlines()
for row in result:
host = row['hostname']
session_pids = set(row['pids'])

path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid)
stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], check=True, stdout=subprocess.PIPE).stdout
cgroups_pids = stdout.splitlines()
if not session_pids:
continue

return set(session_pids).issubset(set(cgroups_pids))
path = "/sys/fs/cgroup/cpu/gpdb/{}/cgroup.procs".format(groupid)
stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout
cgroups_pids = set(stdout.decode().splitlines())

for host in hosts:
if not get_result(host):
if not session_pids.issubset(cgroups_pids):
return False
return True

Expand Down
25 changes: 14 additions & 11 deletions src/test/isolation2/sql/resgroup/resgroup_auxiliary_tools_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -261,23 +261,26 @@ $$ LANGUAGE plpython3u;
result = plpy.execute(sql)
groupid = result[0]['groupid']

sql = "select hostname from gp_segment_configuration group by hostname"
sql = """select sc.hostname, array_agg(pa.pid::text) as pids
from gp_stat_activity pa
join gp_segment_configuration sc
on pa.gp_segment_id = sc.content and sc.role = 'p'
where pa.sess_id = %d
group by sc.hostname""" % session_id
result = plpy.execute(sql)
hosts = [_['hostname'] for _ in result]

def get_result(host):
stdout = subprocess.run(["ssh", "{}".format(host), "ps -ef | grep postgres | grep con{} | grep -v grep | awk '{{print $2}}'".format(session_id)],
stdout=subprocess.PIPE, check=True).stdout
session_pids = stdout.splitlines()
for row in result:
host = row['hostname']
session_pids = set(row['pids'])

if not session_pids:
continue

path = "/sys/fs/cgroup/gpdb/{}/queries/cgroup.procs".format(groupid)
stdout = subprocess.run(["ssh", "{}".format(host), "cat {}".format(path)], stdout=subprocess.PIPE, check=True).stdout
cgroups_pids = stdout.splitlines()

return set(session_pids).issubset(set(cgroups_pids))
cgroups_pids = set(stdout.decode().splitlines())

for host in hosts:
if not get_result(host):
if not session_pids.issubset(cgroups_pids):
return False
return True

Expand Down
2 changes: 2 additions & 0 deletions src/test/isolation2/sql/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,5 @@ EXIT WHEN curtid > upto; /* in func */
END LOOP; /* in func */
END; $$ /* in func */
LANGUAGE PLPGSQL;

GRANT ALL ON SCHEMA public TO public;
Loading