diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 6c23fb90ce3..60d4304bbf2 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -3180,8 +3180,22 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, if (CdbPathLocus_IsBottleneck(inner.locus)) { - /* CBDB_PARALLEL_FIXME: TODO, gather to single segment */ - goto fail; + /* + * We may win if we are a parallel-aware join, SingleQE is on the inner side that + * means there is a chance to generate a parallel join under SingleQE. + * In this case, we have both side parallel and may benefit. + * See ex 5_P_2_2 in gp_parallel.sql + * If not parallel-aware, we are not sure for the benefit and a simgle test + * shows lower performance, ex: parallel scan on replicated table and join with + * SingleQE which is a non-parallel plan. + */ + if (parallel_aware) + { + segGeneral->move_to = inner.locus; + segGeneral->move_to.numsegments = inner.locus.numsegments; + } + else + goto fail; } else if (CdbPathLocus_IsPartitioned(inner.locus)) { @@ -3273,8 +3287,15 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, if (CdbPathLocus_IsSegmentGeneralWorkers(other->locus)) { - /* CBDB_PARALLEL_FIXME: TODO, gather to single segment */ - goto fail; + /* + * We may win here if gather to SingleQE no matter what parallel-aware is. + * SingleQE is outer side, there could be a parallel plan under it. + * So we may benefit even without a shared hash table. + * Let the planner decide. + * See ex 2_P_5_2 in gp_parallel.sql. + */ + other->move_to = outer.locus; + other->move_to.numsegments = outer.locus.numsegments; } else if (CdbPathLocus_IsPartitioned(other->locus)) { diff --git a/src/test/regress/expected/cbdb_parallel.out b/src/test/regress/expected/cbdb_parallel.out index 7e505cf3f62..2b0faa09e05 100644 --- a/src/test/regress/expected/cbdb_parallel.out +++ b/src/test/regress/expected/cbdb_parallel.out @@ -1198,6 +1198,153 @@ select * from t_replica_workers_2 join t_random_workers_2 using(a); 5 | 6 | 6 (5 rows) +abort; +-- +-- ex 2_P_5_2 +-- SingleQE join SegmentGeneralWorkers. +-- Join locus: SingleQE(may be elided to Entry). +-- +begin; +create table t1(a int, b int) with(parallel_workers=2); +create table rt1(a int, b int) with(parallel_workers=2) distributed replicated; +insert into t1 select i, i from generate_series(1, 100000) i; +insert into rt1 select i, i+1 from generate_series(1, 10000) i; +analyze t1; +analyze rt1; +set local enable_parallel = on; +explain(locus, costs off) select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; + QUERY PLAN +------------------------------------------------------ + Parallel Hash Left Join + Locus: Entry + Hash Cond: ((count(*)) = rt1.a) + -> Finalize Aggregate + Locus: Entry + -> Gather Motion 6:1 (slice1; segments: 6) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Seq Scan on t1 + Locus: HashedWorkers + Parallel Workers: 2 + -> Parallel Hash + Locus: Entry + -> Gather Motion 2:1 (slice2; segments: 2) + Locus: Entry + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 2 + Optimizer: Postgres query optimizer +(21 rows) + +select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; + a | a | b +--------+---+--- + 100000 | | +(1 row) + +set local enable_parallel = off; +select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; + a | a | b +--------+---+--- + 100000 | | +(1 row) + +abort; +-- +-- ex 5_P_2_2 +-- SingleQE join SegmentGeneralWorkers. +-- Join locus: SingleQE(may be elided to Entry). +-- +begin; +set local enable_parallel = on; +set local max_parallel_workers_per_gather = 4; +create table t1(a int, b int) with(parallel_workers=4); +create table t2(a int, b int) with(parallel_workers=4); +create table rt1(a int, b int) with(parallel_workers=4) distributed replicated; +insert into t1 select i, i from generate_series(1, 10000000) i; +insert into t2 select i, i from generate_series(1, 10000000) i; +insert into rt1 select i, i+1 from generate_series(1, 10000) i; +analyze t1; +analyze t2; +analyze rt1; +explain(costs off, locus) select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; + QUERY PLAN +------------------------------------------------------------------- + Parallel Hash Join + Locus: Entry + Hash Cond: (rt1.a = (count(*))) + -> Gather Motion 4:1 (slice1; segments: 4) + Locus: Entry + -> Parallel Seq Scan on rt1 + Locus: SegmentGeneralWorkers + Parallel Workers: 4 + -> Parallel Hash + Locus: Entry + -> Finalize Aggregate + Locus: Entry + -> Gather Motion 12:1 (slice2; segments: 12) + Locus: Entry + -> Partial Aggregate + Locus: HashedWorkers + Parallel Workers: 4 + -> Parallel Hash Join + Locus: HashedWorkers + Parallel Workers: 4 + Hash Cond: (t1.a = t2.a) + -> Parallel Seq Scan on t1 + Locus: HashedWorkers + Parallel Workers: 4 + -> Parallel Hash + Locus: Hashed + -> Parallel Seq Scan on t2 + Locus: HashedWorkers + Parallel Workers: 4 + Optimizer: Postgres query optimizer +(30 rows) + +select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; + a | b | c | a +---+---+---+--- +(0 rows) + +set local enable_parallel = off; +explain(costs off, locus) select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; + QUERY PLAN +------------------------------------------------------------ + Hash Join + Locus: Entry + Hash Cond: (rt1.a = (count(*))) + -> Gather Motion 1:1 (slice1; segments: 1) + Locus: Entry + -> Seq Scan on rt1 + Locus: SegmentGeneral + -> Hash + Locus: Entry + -> Finalize Aggregate + Locus: Entry + -> Gather Motion 3:1 (slice2; segments: 3) + Locus: Entry + -> Partial Aggregate + Locus: Hashed + -> Hash Join + Locus: Hashed + Hash Cond: (t1.a = t2.a) + -> Seq Scan on t1 + Locus: Hashed + -> Hash + Locus: Hashed + -> Seq Scan on t2 + Locus: Hashed + Optimizer: Postgres query optimizer +(25 rows) + +select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; + a | b | c | a +---+---+---+--- +(0 rows) + abort; -- -- Test final join path's parallel_workers should be same with join_locus whose diff --git a/src/test/regress/sql/cbdb_parallel.sql b/src/test/regress/sql/cbdb_parallel.sql index 3ead947b0ba..5cc83c5d7ea 100644 --- a/src/test/regress/sql/cbdb_parallel.sql +++ b/src/test/regress/sql/cbdb_parallel.sql @@ -396,6 +396,49 @@ set local enable_parallel=false; select * from t_replica_workers_2 join t_random_workers_2 using(a); abort; +-- +-- ex 2_P_5_2 +-- SingleQE join SegmentGeneralWorkers. +-- Join locus: SingleQE(may be elided to Entry). +-- +begin; +create table t1(a int, b int) with(parallel_workers=2); +create table rt1(a int, b int) with(parallel_workers=2) distributed replicated; +insert into t1 select i, i from generate_series(1, 100000) i; +insert into rt1 select i, i+1 from generate_series(1, 10000) i; +analyze t1; +analyze rt1; +set local enable_parallel = on; +explain(locus, costs off) select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; +select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; +set local enable_parallel = off; +select * from (select count(*) as a from t1) t1 left join rt1 on rt1.a = t1.a; +abort; + +-- +-- ex 5_P_2_2 +-- SingleQE join SegmentGeneralWorkers. +-- Join locus: SingleQE(may be elided to Entry). +-- +begin; +set local enable_parallel = on; +set local max_parallel_workers_per_gather = 4; +create table t1(a int, b int) with(parallel_workers=4); +create table t2(a int, b int) with(parallel_workers=4); +create table rt1(a int, b int) with(parallel_workers=4) distributed replicated; +insert into t1 select i, i from generate_series(1, 10000000) i; +insert into t2 select i, i from generate_series(1, 10000000) i; +insert into rt1 select i, i+1 from generate_series(1, 10000) i; +analyze t1; +analyze t2; +analyze rt1; +explain(costs off, locus) select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; +select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; +set local enable_parallel = off; +explain(costs off, locus) select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; +select * from rt1 join (select count(*) as c, sum(t1.a) as a from t1 join t2 using(a)) t3 on t3.c = rt1.a; +abort; + -- -- Test final join path's parallel_workers should be same with join_locus whose -- parallel_workers is different from origin outer path(without motion).