Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/backend/gporca/libgpopt/include/gpopt/base/CUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,9 @@ class CUtils

static BOOL FScalarConstOrBinaryCoercible(CExpression *pexpr);

// Returns true if the physical expression tree rooted at pexpr contains
// a CTE Consumer whose slice id differs from the slice id of a CTE
// Producer with the same CTE id, where that Producer's child derives a
// replicated-like distribution (strict replicated, tainted replicated or
// universal). Returns false when pexpr is NULL. mp is used for the
// temporary bookkeeping arrays, which are released before returning.
static BOOL FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp,
CExpression *pexpr);

static CExpression *ReplaceColrefWithProjectExpr(CMemoryPool *mp,
CExpression *pexpr,
CColRef *pcolref,
Expand Down
122 changes: 122 additions & 0 deletions src/backend/gporca/libgpopt/src/base/CUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,128 @@ CUtils::FHasCTEAnchor(CExpression *pexpr)
return false;
}

// Classifies a distribution type as "replicated-like".
// Yields true for strict replicated, tainted replicated and universal
// distributions; every other distribution type yields false.
static BOOL
FReplicatedLikeDistribution(CDistributionSpec::EDistributionType edt)
{
	switch (edt)
	{
		case CDistributionSpec::EdtStrictReplicated:
		case CDistributionSpec::EdtTaintedReplicated:
		case CDistributionSpec::EdtUniversal:
			return true;

		default:
			return false;
	}
}

struct SCTEInfo
{
ULONG cteId;
ULONG sliceId;

SCTEInfo(ULONG cte_id, ULONG slice_id) : cteId(cte_id), sliceId(slice_id)
{
}
};

typedef CDynamicPtrArray<SCTEInfo, CleanupDelete<SCTEInfo> > CTEInfoArray;

// Recursively visit the physical tree rooted at pexpr and record where
// CTE operators live:
//   - prodInfos receives (cteId, curSlice) for every CTE Producer whose
//     only child derives a replicated-like distribution;
//   - consInfos receives (cteId, curSlice) for every CTE Consumer.
//
// curSlice is the slice id of pexpr itself. *pNextSlice holds the most
// recently assigned slice id; it is incremented every time a non-scalar
// child of a Motion is entered, so each such child gets a fresh id --
// same motId-stack idea as in apply_shareinput_xslice.
static void
CollectCTESlices(CMemoryPool *mp, CExpression *pexpr, ULONG curSlice,
				 ULONG *pNextSlice, CTEInfoArray *prodInfos,
				 CTEInfoArray *consInfos)
{
	COperator *pop = pexpr->Pop();

	switch (pop->Eopid())
	{
		case COperator::EopPhysicalCTEProducer:
		{
			// The Producer's distribution is the one derived by its
			// single child. Only replicated-like Producers are of
			// interest; the rest cannot trigger the cross-slice issue.
			GPOS_ASSERT(1 == pexpr->Arity());
			CExpression *pexprProducerInput = (*pexpr)[0];
			CDrvdPropPlan *pdpplanInput =
				CDrvdPropPlan::Pdpplan(pexprProducerInput->PdpDerive());

			if (FReplicatedLikeDistribution(pdpplanInput->Pds()->Edt()))
			{
				prodInfos->Append(GPOS_NEW(mp) SCTEInfo(
					CPhysicalCTEProducer::PopConvert(pop)->UlCTEId(),
					curSlice));
			}
			break;
		}

		case COperator::EopPhysicalCTEConsumer:
		{
			// Just remember (cteId, curSlice); the matching against
			// Producers happens after the whole tree has been walked.
			consInfos->Append(GPOS_NEW(mp) SCTEInfo(
				CPhysicalCTEConsumer::PopConvert(pop)->UlCTEId(), curSlice));
			break;
		}

		default:
			break;
	}

	// Children of a Motion start new slices; children of anything else
	// stay in the slice of their parent.
	const BOOL fUnderMotion = CUtils::FPhysicalMotion(pop);
	const ULONG arity = pexpr->Arity();

	for (ULONG ulChild = 0; ulChild < arity; ulChild++)
	{
		CExpression *pexprKid = (*pexpr)[ulChild];

		// Descend only into non-scalar children: scalar subtrees
		// (predicates, project elements) never run as separate
		// executor groups, so they cannot host a slice.
		if (!pexprKid->Pop()->FScalar())
		{
			const ULONG kidSlice = fUnderMotion ? ++(*pNextSlice) : curSlice;

			CollectCTESlices(mp, pexprKid, kidSlice, pNextSlice, prodInfos,
							 consInfos);
		}
	}
}

//---------------------------------------------------------------------------
//	@function:
//		CUtils::FHasCrossSliceReplicatedCTEConsumer
//
//	@doc:
//		Check whether the physical tree contains a CTE Consumer that sits
//		in a different slice than a replicated-like CTE Producer with the
//		same CTE id. A NULL expression yields false.
//
//---------------------------------------------------------------------------
BOOL
CUtils::FHasCrossSliceReplicatedCTEConsumer(CMemoryPool *mp, CExpression *pexpr)
{
	if (NULL == pexpr)
	{
		return false;
	}

	CTEInfoArray *prodInfos = GPOS_NEW(mp) CTEInfoArray(mp);
	CTEInfoArray *consInfos = GPOS_NEW(mp) CTEInfoArray(mp);

	// The root lives in slice 0; the counter tracks the last id given out.
	ULONG lastSliceId = 0;
	CollectCTESlices(mp, pexpr, 0 /*curSlice*/, &lastSliceId, prodInfos,
					 consInfos);

	const ULONG numConsumers = consInfos->Size();
	const ULONG numProducers = prodInfos->Size();

	// Compare every Consumer against every recorded Producer; both loops
	// stop as soon as the first cross-slice pair has been found.
	BOOL fCrossSlice = false;
	for (ULONG ulCons = 0; !fCrossSlice && ulCons < numConsumers; ulCons++)
	{
		const SCTEInfo *consumer = (*consInfos)[ulCons];

		for (ULONG ulProd = 0; !fCrossSlice && ulProd < numProducers; ulProd++)
		{
			const SCTEInfo *producer = (*prodInfos)[ulProd];

			fCrossSlice = (producer->cteId == consumer->cteId &&
						   producer->sliceId != consumer->sliceId);
		}
	}

	prodInfos->Release();
	consInfos->Release();

	return fCrossSlice;
}

//---------------------------------------------------------------------------
// @class:
// CUtils::FHasSubqueryOrApply
Expand Down
14 changes: 14 additions & 0 deletions src/backend/gporca/libgpopt/src/translate/CTranslatorExprToDXL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,20 @@ CTranslatorExprToDXL::PdxlnTranslate(CExpression *pexpr,

GPOS_ASSERT(NULL == m_pdpplan);

// Walk the physical tree and detect a CTE Consumer placed on a
// different slice than its Producer when the Producer's output is
// replicated-like (StrictReplicated/TaintedReplicated/Universal).
// Fall back to the Postgres optimizer if it is detected because
// it breaks Producer-Consumer locality and can hang the
// query at execution.
if (CUtils::FHasCrossSliceReplicatedCTEConsumer(m_mp, pexpr))
{
GPOS_RAISE(
gpdxl::ExmaDXL, gpdxl::ExmiExpr2DXLUnsupportedFeature,
GPOS_WSZ_LIT(
"CTE Consumer placed on a different slice than its replicated Producer"));
}

m_pdpplan = CDrvdPropPlan::Pdpplan(pexpr->PdpDerive());
m_pdpplan->AddRef();

Expand Down
46 changes: 46 additions & 0 deletions src/test/regress/expected/qp_orca_fallback.out
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,49 @@ SELECT * FROM jsonb_array_elements('["b", "a"]'::jsonb) WITH ORDINALITY;
"a" | 2
(2 rows)

-- ORCA should fallback in case there are any CTE Consumers beneath a duplicate-hazard motion
-- start_ignore
DROP TABLE IF EXISTS tbl1, tbl2;
-- end_ignore
CREATE TABLE tbl2 (
id numeric NULL,
refrcode varchar(255) NULL,
referenceid numeric NULL
)
DISTRIBUTED REPLICATED;
CREATE TABLE tbl1 (
id bigserial NOT NULL,
iscalctrg varchar(15) NOT NULL,
iscalcdetail varchar(15) NULL
)
DISTRIBUTED REPLICATED;
EXPLAIN WITH
t1 AS (SELECT * FROM tbl1),
t2 AS (SELECT id, refrcode FROM tbl2 WHERE REFERENCEID = 101991)
SELECT p.*FROM t1 p
JOIN t2 r
ON p.isCalcTRG = r.RefrCode
JOIN t2 r1
ON p.isCalcDetail = r1.RefrCode
LIMIT 1;
QUERY PLAN
---------------------------------------------------------------------------------------------
Limit (cost=0.34..83.24 rows=1 width=104)
-> Gather Motion 1:1 (slice1; segments: 1) (cost=0.34..581.83 rows=8 width=104)
-> Hash Join (cost=0.34..581.69 rows=8 width=104)
Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text)
-> Hash Join (cost=0.17..580.97 rows=130 width=104)
Hash Cond: ((tbl1.iscalctrg)::text = (r.refrcode)::text)
-> Seq Scan on tbl1 (cost=0.00..344.00 rows=24400 width=104)
-> Hash (cost=0.11..0.11 rows=2 width=516)
-> Subquery Scan on r (cost=0.00..0.11 rows=6 width=516)
-> Seq Scan on tbl2 (cost=0.00..166.25 rows=6 width=548)
Filter: (referenceid = '101991'::numeric)
-> Hash (cost=0.11..0.11 rows=2 width=516)
-> Subquery Scan on r1 (cost=0.00..0.11 rows=6 width=516)
-> Seq Scan on tbl2 tbl2_1 (cost=0.00..166.25 rows=6 width=548)
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(16 rows)

DROP TABLE tbl1, tbl2;
48 changes: 48 additions & 0 deletions src/test/regress/expected/qp_orca_fallback_optimizer.out
Original file line number Diff line number Diff line change
Expand Up @@ -384,3 +384,51 @@ DETAIL: Feature not supported: WITH ORDINALITY
"a" | 2
(2 rows)

-- ORCA should fallback in case there are any CTE Consumers beneath a duplicate-hazard motion
-- start_ignore
DROP TABLE IF EXISTS tbl1, tbl2;
-- end_ignore
CREATE TABLE tbl2 (
id numeric NULL,
refrcode varchar(255) NULL,
referenceid numeric NULL
)
DISTRIBUTED REPLICATED;
CREATE TABLE tbl1 (
id bigserial NOT NULL,
iscalctrg varchar(15) NOT NULL,
iscalcdetail varchar(15) NULL
)
DISTRIBUTED REPLICATED;
EXPLAIN WITH
t1 AS (SELECT * FROM tbl1),
t2 AS (SELECT id, refrcode FROM tbl2 WHERE REFERENCEID = 101991)
SELECT p.*FROM t1 p
JOIN t2 r
ON p.isCalcTRG = r.RefrCode
JOIN t2 r1
ON p.isCalcDetail = r1.RefrCode
LIMIT 1;
INFO: GPORCA failed to produce a plan, falling back to planner
DETAIL: Feature not supported: CTE Consumer placed on a different slice than its replicated Producer
QUERY PLAN
---------------------------------------------------------------------------------------------
Limit (cost=0.34..83.24 rows=1 width=104)
-> Gather Motion 1:1 (slice1; segments: 1) (cost=0.34..581.83 rows=8 width=104)
-> Hash Join (cost=0.34..581.69 rows=8 width=104)
Hash Cond: ((tbl1.iscalcdetail)::text = (r1.refrcode)::text)
-> Hash Join (cost=0.17..580.97 rows=130 width=104)
Hash Cond: ((tbl1.iscalctrg)::text = (r.refrcode)::text)
-> Seq Scan on tbl1 (cost=0.00..344.00 rows=24400 width=104)
-> Hash (cost=0.11..0.11 rows=2 width=516)
-> Subquery Scan on r (cost=0.00..0.11 rows=6 width=516)
-> Seq Scan on tbl2 (cost=0.00..166.25 rows=6 width=548)
Filter: (referenceid = '101991'::numeric)
-> Hash (cost=0.11..0.11 rows=2 width=516)
-> Subquery Scan on r1 (cost=0.00..0.11 rows=6 width=516)
-> Seq Scan on tbl2 tbl2_1 (cost=0.00..166.25 rows=6 width=548)
Filter: (referenceid = '101991'::numeric)
Optimizer: Postgres query optimizer
(16 rows)

DROP TABLE tbl1, tbl2;
28 changes: 28 additions & 0 deletions src/test/regress/expected/shared_scan.out
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,34 @@ where
Optimizer: Postgres query optimizer
(52 rows)

-- ORCA should fallback when a CTE over a replicated table is referenced
-- from multiple scalar subqueries.
-- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan;
-- with fewer rows the bug does not manifest and the test would silently
-- pass even without the fix.
CREATE TABLE ss_t1 AS
SELECT generate_series(1, 40000) id
DISTRIBUTED BY (id);
CREATE TABLE ss_t2 AS
SELECT * FROM (VALUES (1, 10), (2, 20)) AS v(id, v)
DISTRIBUTED REPLICATED;
ANALYZE ss_t1;
ANALYZE ss_t2;
SET statement_timeout = '15s';
WITH
cte1 AS (SELECT v FROM ss_t2 WHERE id = 1),
cte2 AS (SELECT v FROM ss_t2 WHERE id = 2)
SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) +
(SELECT v FROM cte1) + (SELECT v FROM cte2) AS result
FROM ss_t1
LIMIT 1;
result
--------
60
(1 row)

RESET statement_timeout;
DROP TABLE ss_t1, ss_t2;
-- Test the scenario which already opened many fds
-- start_ignore
RESET search_path;
Expand Down
28 changes: 28 additions & 0 deletions src/test/regress/expected/shared_scan_optimizer.out
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,34 @@ where
Optimizer: Postgres query optimizer
(52 rows)

-- ORCA should fallback when a CTE over a replicated table is referenced
-- from multiple scalar subqueries.
-- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan;
-- with fewer rows the bug does not manifest and the test would silently
-- pass even without the fix.
CREATE TABLE ss_t1 AS
SELECT generate_series(1, 40000) id
DISTRIBUTED BY (id);
CREATE TABLE ss_t2 AS
SELECT * FROM (VALUES (1, 10), (2, 20)) AS v(id, v)
DISTRIBUTED REPLICATED;
ANALYZE ss_t1;
ANALYZE ss_t2;
SET statement_timeout = '15s';
WITH
cte1 AS (SELECT v FROM ss_t2 WHERE id = 1),
cte2 AS (SELECT v FROM ss_t2 WHERE id = 2)
SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) +
(SELECT v FROM cte1) + (SELECT v FROM cte2) AS result
FROM ss_t1
LIMIT 1;
result
--------
60
(1 row)

RESET statement_timeout;
DROP TABLE ss_t1, ss_t2;
-- Test the scenario which already opened many fds
-- start_ignore
RESET search_path;
Expand Down
29 changes: 29 additions & 0 deletions src/test/regress/sql/qp_orca_fallback.sql
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,32 @@ INSERT INTO partition_key_dropped VALUES(3);

-- Orca should fallback if a function in 'from' clause uses 'WITH ORDINALITY'
SELECT * FROM jsonb_array_elements('["b", "a"]'::jsonb) WITH ORDINALITY;

-- ORCA should fallback in case there are any CTE Consumers beneath a duplicate-hazard motion
-- start_ignore
DROP TABLE IF EXISTS tbl1, tbl2;
-- end_ignore
CREATE TABLE tbl2 (
id numeric NULL,
refrcode varchar(255) NULL,
referenceid numeric NULL
)
DISTRIBUTED REPLICATED;
CREATE TABLE tbl1 (
id bigserial NOT NULL,
iscalctrg varchar(15) NOT NULL,
iscalcdetail varchar(15) NULL
)
DISTRIBUTED REPLICATED;

EXPLAIN WITH
t1 AS (SELECT * FROM tbl1),
t2 AS (SELECT id, refrcode FROM tbl2 WHERE REFERENCEID = 101991)
SELECT p.*FROM t1 p
JOIN t2 r
ON p.isCalcTRG = r.RefrCode
JOIN t2 r1
ON p.isCalcDetail = r1.RefrCode
LIMIT 1;

DROP TABLE tbl1, tbl2;
25 changes: 25 additions & 0 deletions src/test/regress/sql/shared_scan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,31 @@ where
and (stat.schema_name || '.' ||stat.table_name not in (select table_nm_onl_act from tbls_w_onl_actl_data))
or (stat.schema_name || '.' ||stat.table_name in (select table_nm_onl_act from tbls_w_onl_actl_data));

-- ORCA should fallback when a CTE over a replicated table is referenced
-- from multiple scalar subqueries.
-- ss_t1 needs enough rows (40000) to push ORCA to the cross-slice plan;
-- with fewer rows the bug does not manifest and the test would silently
-- pass even without the fix.
CREATE TABLE ss_t1 AS
SELECT generate_series(1, 40000) id
DISTRIBUTED BY (id);
CREATE TABLE ss_t2 AS
SELECT * FROM (VALUES (1, 10), (2, 20)) AS v(id, v)
DISTRIBUTED REPLICATED;
ANALYZE ss_t1;
ANALYZE ss_t2;

SET statement_timeout = '15s';
WITH
cte1 AS (SELECT v FROM ss_t2 WHERE id = 1),
cte2 AS (SELECT v FROM ss_t2 WHERE id = 2)
SELECT (SELECT v FROM cte1) + (SELECT v FROM cte2) +
(SELECT v FROM cte1) + (SELECT v FROM cte2) AS result
FROM ss_t1
LIMIT 1;
RESET statement_timeout;
DROP TABLE ss_t1, ss_t2;

-- Test the scenario which already opened many fds
-- start_ignore
RESET search_path;
Expand Down
Loading