From eea7cf5daf4dc6395a4c97632add6592ace13283 Mon Sep 17 00:00:00 2001 From: Zhang Mingli Date: Fri, 28 Jul 2023 10:08:23 +0800 Subject: [PATCH] Refactor cdbpath_motion_for_parallel_join() by outer join inner style We have separated the join locus functions for non-parallel and parallel mode into cdbpath_motion_for_join(), which handles locus whose parallel_workers is 0, and cdbpath_motion_for_parallel_join(), which handles the others. A lot of the logic is meaningless in cdbpath_motion_for_parallel_join(): 1.Writeable operations for join. 2.Both sides' locus parallel_workers are no more than one. 3.Some locus types couldn't participate in parallel join yet. We have done the job for SegmentGeneralWorkers as the outer side joining with other locus as the inner side, but lacked it for other types. Refactor that to keep the code clear. Some wrong logic was found; CBDB_PARALLEL_FIXME comments were left to remind us to resolve it in upcoming fixes. Main changes: SingleQE join SegmentGeneralWorkers. SingleQE join Partitioned(workers>1), except HashedOJ. SegmentGeneral as outer, Assert(false). Partitioned(workers>1) join SegmentGeneral. Partitioned(workers>1) join SegmentGeneralWorkers. Partitioned(workers>1) join SingleQE. Partitioned join Partitioned is handled as in cdbpath_motion_for_join(). Remove all if conditions for inner side locus. 
Authored-by: Zhang Mingli avamingli@gmail.com --- src/backend/cdb/cdbpath.c | 679 +++++++--------------- src/test/regress/expected/gp_parallel.out | 1 + src/test/regress/sql/gp_parallel.sql | 1 + 3 files changed, 226 insertions(+), 455 deletions(-) diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index a9a2b8ee1dd..e13976ed0db 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -2270,6 +2270,7 @@ try_redistribute(PlannerInfo *root, CdbpathMfjRel *g, CdbpathMfjRel *o, CdbPathLocus_IsSegmentGeneralWorkers(g->locus)); Assert(CdbPathLocus_IsPartitioned(o->locus)); + /* CBDB_PARALLEL_FIXME: is it possible to redistribute both ?*/ if (CdbPathLocus_IsHashedWorkers(o->locus)) return false; @@ -2808,18 +2809,17 @@ can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, * cdbpath_motion_for_parallel_join * Sibling of cdbpath_motion_for_join in parallel mode. * Separate with non-parallel functions as the logic of parallel join is quite different: - * 1. Trating path locus by outer/inner. The position side in prallel join is sensitive. - * 2. Still try Redistribute Motion even if Broadcast one side. In parallel mode, the cost based on rel size - * might not be better than redistribute one or both. Let the planner decide which is better. - * 3. Never duplicated outer_path(parallel_workers=0). That will lead to wrong results, ex: parallel left join. + * 1. Treating path locus by outer/inner. The position side in prallel join is sensitive. + * 2. Still try Redistribute Motion even if Broadcast one side using parallel_hash_enable_motion_broadcast. + * 3. Never duplicate outer_path(parallel_workers=0). That will lead to wrong results, ex: parallel left join. * Follow upstream until we have a clear answer. * * The locus of path whose workers > 1 could be: - * HashedWorkers: parallel scan on a Hashed locus table. + * HashedWorkers: parallel scan on a Hashed locus table and etc. 
* ReplicatedWorkers: like Broadcast, replica data to segments but strewn on workers of the same segment. - * SegmentGeneralWorkers: parallel scan on a replica table. - * Strewn(parallel_workers > 1), parallel scan on a randomly distributed table. - * Hashed(parallel_workers > 1), a special one generated by HashedWorkers with a Redistribute Motion. + * SegmentGeneralWorkers: parallel scan on a replica table and etc. + * Strewn(parallel_workers > 1), parallel scan on a randomly distributed table and etc. + * Hashed(parallel_workers > 1), generated by HashedWorkers with a Redistribute Motion. * * When we add a new xxxWorkers locus? * ISTM: xxxWorkers means strewn on workers of the same segment, but together as a xxx locus on segments that @@ -2876,7 +2876,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, { CdbpathMfjRel outer; CdbpathMfjRel inner; - int numsegments; bool join_quals_contain_outer_references; ListCell *lc; @@ -3012,10 +3011,14 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, /* Get rel sizes. 
*/ outer.bytes = outer.path->rows * outer.path->pathtarget->width; inner.bytes = inner.path->rows * inner.path->pathtarget->width; + int outerParallel = outer.locus.parallel_workers; + int innerParallel = inner.locus.parallel_workers; if (join_quals_contain_outer_references || CdbPathLocus_IsOuterQuery(outer.locus) || CdbPathLocus_IsOuterQuery(inner.locus) || + CdbPathLocus_IsEntry(outer.locus) || + CdbPathLocus_IsEntry(inner.locus) || CdbPathLocus_IsGeneral(outer.locus) || CdbPathLocus_IsGeneral(inner.locus)) { @@ -3030,8 +3033,6 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, CdbpathMfjRel *segGeneral = &outer; CdbpathMfjRel *other = &inner; - int outerParallel = outer.locus.parallel_workers; - int innerParallel = inner.locus.parallel_workers; Assert(outerParallel > 1); if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus)) @@ -3079,25 +3080,10 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, if (CdbPathLocus_IsBottleneck(inner.locus)) { - /* - * Bottleneck locus can't participate in parallel with SegmentGeneralWorkers at present, may be enabled later. - * We don't support parallel on QD yet. If bottleneck is on QE, ex: - * A SingleQE join with SegmentGeneralWorkers(workers:2), we have two ways: - * 1.Motion(1:2) SingleQE to 2 process in local segments. - * There is no such motion now, and we have no clear answer that if SingleQE is parallel_safe(I think most - * are not, because SingleQE is always the last resort which means things must be done on a single process of - * a segments with all data). - * And what's join locus of that, SingleQE with workers = 2? It breaks the rule. - * - * 2.Motion(2:1) SegmentGeneralWorkers to a single process: - * In GPDB, we usually do not motion a SegmentGeneralxxx locus except for: 1) bring to singleQE; 2) Redistributed - * to Partitioned if XXXGeneral can't be general as it has volatile fuctions and so on. - * We follow it as GPDB. 
- */ + /* CBDB_PARALLEL_FIXME: TODO, gather to single segment */ goto fail; } - - if (CdbPathLocus_IsPartitioned(inner.locus)) + else if (CdbPathLocus_IsPartitioned(inner.locus)) { if (CdbPathLocus_NumSegments(outer.locus) != CdbPathLocus_NumSegments(inner.locus)) goto fail; @@ -3157,491 +3143,274 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root, } } } - else if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus)) + /* SegmentGeneral join others */ + else if (CdbPathLocus_IsSegmentGeneral(outer.locus)) { /* - * The whole branch handles the case that at least - * one of the two locus is SegmentGeneralWorkers. - * Put this before SegmentGeneral, we will handle - * SegmentGeneral with SegmentGeneralWorkers in this branch. + * In principle, we couldn't get here as: + * 1.If both's parallel_workers is 0, they should be handled in cdbpath_motion_for_join(). + * 2.If inner path's parallel_workers > 0, it must be from a partial_pathlist. + * SegmentGeneral neighter could be from base rel's partial_pathlist nor could be from + * partial_pathlist of a join locus. */ + Assert(false); + goto fail; + } + else if (CdbPathLocus_IsSingleQE(outer.locus)) + { + CdbpathMfjRel *single = &outer; + CdbpathMfjRel *other = &inner; + bool single_immovable = (outer.require_existing_order && + !outer_pathkeys) || outer.has_wts; + bool other_immovable = inner.require_existing_order && + !inner_pathkeys; - CdbpathMfjRel *segGeneral; - CdbpathMfjRel *other; - int outerParallel = outer.locus.parallel_workers; - int innerParallel = inner.locus.parallel_workers; - /* Didn't support insert with parallel yet */ - Assert(root->upd_del_replicated_table == 0); + /* single_immovable used with partitioned locus in parallel mode. 
*/ - /* We don't handle parallel when expanding segments */ - if (CdbPathLocus_NumSegments(outer.locus) != CdbPathLocus_NumSegments(inner.locus)) + Assert(innerParallel != 0); + if (innerParallel == 0) goto fail; - if (CdbPathLocus_IsSegmentGeneral(outer.locus) && - CdbPathLocus_IsSegmentGeneralWorkers(inner.locus)) + if (CdbPathLocus_IsSegmentGeneralWorkers(other->locus)) { - /* - * CBDB_PARALLEL_FIXME: - * We shouln't get here as Path(parallel_worker=1) won't be added to partial_pathlist. - * If outer locus is SegmentGeneral and its parallel_workers must be 0. - * We neighter want a Motion nor change the parallel_workers of a path(May be enabled - * later in some very restricted scenarios or use path(parallel_workers=1) as a partial_path). - */ + /* CBDB_PARALLEL_FIXME: TODO, gather to single segment */ goto fail; } - else + else if (CdbPathLocus_IsPartitioned(other->locus)) { - /* SegmentGeneralWorkers with Partitioned or Bottleneck */ - segGeneral = &inner; - segGeneral->isouter = false; - other = &outer; - other->isouter = true; - Assert(innerParallel > 1); - - Assert(CdbPathLocus_IsBottleneck(other->locus) || - CdbPathLocus_IsPartitioned(other->locus)); - - if (CdbPathLocus_IsBottleneck(other->locus)) + /* If the bottlenecked rel can't be moved, bring the other rel to it. */ + if (single_immovable) { - /* - * Bottleneck locus can't participate in parallel at present, may be enabled later if we have a clear answer. - * We don't support parallel on QD yet. If bottleneck is on QE, ex: - * A SingleQE join with SegmentGeneralWorkers(workers:2), we have two ways: - * 1.Motion(1:2) SingleQE to 2 process in local segments. - * There is no such motion now, and we have no clear answer that if SingleQE is parallel_safe(I think most - * are not, because SingleQE is always the last resort which means things must be done on a single process of - * a segments with all data). - * And what's join locus of that, SingleQE with workers = 2? It breaks the rule. 
- * - * 2.Motion(2:1) SegmentGeneralWorkers to a single process: - * In GPDB, we usually do not motion a SegmentGeneralxxx locus except for: 1) bring to singleQE; 2) Redistributed - * to Partitioned if XXXGeneral can't be general as it has volatile fuctions and so on. - * We follow it as GPDB. - * Another reason is: not sure if we could benefit from a Parallel Scan on replicated tables and Gather data to a single process plan: Parallel scan with Motion all rows cost vs scan without motion. And we have no test cases for that. - */ - goto fail; + if (other_immovable) + goto fail; + else + other->move_to = single->locus; } - else + /* Redistribute single rel if joining on other rel's partitioning key */ + else if (cdbpath_match_preds_to_distkey(root, + redistribution_clauses, + other->path, + other->locus, + single->locus, + parallel_aware, + &single->move_to)) /* OUT */ { - /* - * This branch handles for partitioned other locus - * hashed, hashoj, strewn and hashedworkers. - */ - Assert(CdbPathLocus_IsPartitioned(other->locus)); - - if (!segGeneral->ok_to_replicate) - { - if (!try_redistribute(root, segGeneral, - other, redistribution_clauses, parallel_aware)) - { - /* - * FIXME: do we need to test movable? 
- */ - if (parallel_aware) - goto fail; - - CdbPathLocus_MakeSingleQE(&segGeneral->move_to, - CdbPathLocus_NumSegments(segGeneral->locus)); - CdbPathLocus_MakeSingleQE(&other->move_to, - CdbPathLocus_NumSegments(other->locus)); - } - } - else - { - if (parallel_aware) - { - if (innerParallel != outerParallel) - goto fail; - - if (segGeneral->isouter) - return cdbpathlocus_parallel_join(jointype, segGeneral->locus, other->locus, true); - - /* HashedWorkers, Hashed, Strewn JOIN SegmentGeneralWorkers with shared hash table, return the other locus anyway */ - return other->locus; - } - - /* No shared hash table join */ - /* Couldn't join if other is at outer side without shared hash table */ - Assert(other->isouter); - goto fail; - } + AssertEquivalent(CdbPathLocus_NumSegments(other->locus), + CdbPathLocus_NumSegments(single->move_to)); } + /* Replicate single rel if cheaper than redistributing both rels. */ + /* CBDB_PARALLEL_FIXME: Should move to ReplicatedWorkers if parallel_aware */ + else if (single->ok_to_replicate && + (single->bytes * CdbPathLocus_NumSegments(other->locus) < + single->bytes + other->bytes)) + CdbPathLocus_MakeReplicated(&single->move_to, + CdbPathLocus_NumSegments(other->locus), + single->path->parallel_workers); + /* + * Redistribute both rels on equijoin cols. + * + * Redistribute both to the same segments, here we choose the + * same segments with other. + */ + else if (!other_immovable && + cdbpath_distkeys_from_preds(root, + redistribution_clauses, + single->path, + CdbPathLocus_NumSegments(other->locus), + Max(single->path->parallel_workers, other->path->parallel_workers), + parallel_aware, + &single->move_to, /* OUT */ + &other->move_to)) /* OUT */ + { + /* ok */ + } + /* Broadcast single rel for below cases. 
*/ + /* CBDB_PARALLEL_FIXME: Should move to ReplicatedWorkers if parallel_aware */ + else if (single->ok_to_replicate && + (other_immovable || + single->bytes < other->bytes || + other->has_wts)) + CdbPathLocus_MakeReplicated(&single->move_to, + CdbPathLocus_NumSegments(other->locus), + single->path->parallel_workers); + /* Last resort: If possible, move all partitions of other rel to single QE. */ + else if (!other_immovable) + other->move_to = single->locus; + else + goto fail; + } + else + { + /* Should not get here. */ + Assert(false); + goto fail; } } - else if (CdbPathLocus_IsSegmentGeneral(outer.locus) || - CdbPathLocus_IsSegmentGeneral(inner.locus)) + else if (CdbPathLocus_IsPartitioned(outer.locus) && (!CdbPathLocus_IsPartitioned(inner.locus))) { /* - * the whole branch handles the case that at least - * one of the two locus is SegmentGeneral. The logic - * is: - * - if both are SegmentGeneral: - * 1. if both locus are equal, no motion needed, simply return - * 2. For update cases. If resultrelation - * is SegmentGeneral, the update must execute - * on each segment of the resultrelation, if resultrelation's - * numsegments is larger, the only solution is to broadcast - * other - * 3. no motion is needed, change both numsegments to common - * - if only one of them is SegmentGeneral : - * 1. consider update case, if resultrelation is SegmentGeneral, - * the only solution is to broadcast the other - * 2. if other's locus is singleQE or entry, make SegmentGeneral - * to other's locus - * 3. the remaining possibility of other's locus is partitioned - * 3.1 if SegmentGeneral is not ok_to_replicate, try to - * add redistribute motion, if fails gather each to - * singleQE - * 3.2 if SegmentGeneral's numsegments is larger, just return - * other's locus - * 3.3 try to add redistribute motion, if fails, gather each - * to singleQE + * This branch handles outer Partitioned join with inner(non-Partitioned) locus. 
+ * Case both are partitioned should be handled below like cbdpath_motion_for_join(). */ - CdbpathMfjRel *segGeneral; - CdbpathMfjRel *other; - - if (CdbPathLocus_IsSegmentGeneral(outer.locus) && - CdbPathLocus_IsSegmentGeneral(inner.locus)) + if (CdbPathLocus_IsSegmentGeneral(inner.locus)) { - int outerParallel = outer.locus.parallel_workers; - int innerParallel = inner.locus.parallel_workers; - Assert(outerParallel == 0); - Assert(innerParallel == 0); - if (innerParallel > 0 || outerParallel > 0) - goto fail; - - /* - * use_common to indicate whether we should - * return a segmentgeneral locus with common - * numsegments. - */ - bool use_common = true; - - /* - * Handle the case two same locus - */ - if (CdbPathLocus_NumSegments(outer.locus) == CdbPathLocus_NumSegments(inner.locus)) - return inner.locus; - - /* - * Now, two locus' numsegments not equal - * We should consider update resultrelation - * if update, - * - resultrelation's numsegments larger, then - * we should broadcast the other - * - otherwise, results is common - * else: - * common - */ - if (root->upd_del_replicated_table > 0) + Assert(outerParallel > 1); + if (!inner.ok_to_replicate) { - if ((CdbPathLocus_NumSegments(outer.locus) > - CdbPathLocus_NumSegments(inner.locus)) && - bms_is_member(root->upd_del_replicated_table, - outer.path->parent->relids)) - { - /* - * the updated resultrelation is replicated table - * and its numsegments is larger, we should broadcast - * the other path - */ - if (!inner.ok_to_replicate) - goto fail; - - CdbPathLocus_MakeReplicated(&inner.move_to, - CdbPathLocus_NumSegments(outer.locus), - inner.path->parallel_workers); - use_common = false; - } - else if ((CdbPathLocus_NumSegments(outer.locus) < - CdbPathLocus_NumSegments(inner.locus)) && - bms_is_member(root->upd_del_replicated_table, - inner.path->parent->relids)) + if (!try_redistribute(root, &inner, + &outer, redistribution_clauses, parallel_aware)) { /* - * the updated resultrelation is replicated table - * 
and its numsegments is larger, we should broadcast - * the other path + * CBDB_PARALLEL_FIXME: + * Do we need to test movable? + * Could we allow parallel-aware here? */ - if (!outer.ok_to_replicate) + if (parallel_aware) goto fail; - CdbPathLocus_MakeReplicated(&outer.move_to, - CdbPathLocus_NumSegments(inner.locus), - outer.path->parallel_workers); - use_common = false; + CdbPathLocus_MakeSingleQE(&(&inner)->move_to, + CdbPathLocus_NumSegments(inner.locus)); + CdbPathLocus_MakeSingleQE(&(&outer)->move_to, + CdbPathLocus_NumSegments(outer.locus)); } } - - if (use_common) + else { - /* - * The statement is not update a replicated table. - * Just return the segmentgeneral with a smaller numsegments. - */ - numsegments = CdbPathLocus_CommonSegments(inner.locus, - outer.locus); - outer.locus.numsegments = numsegments; - inner.locus.numsegments = numsegments; - - return inner.locus; + /* CBDB_PARALLEL_FIXME: redistribute to partitioned? */ + if (parallel_aware) + goto fail; + if (CdbPathLocus_NumSegments(inner.locus) != CdbPathLocus_NumSegments(outer.locus)) + goto fail; + return outer.locus; /* Partitioned(workers>1) JOIN SegmentGeneral */ } } - else + else if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus)) { - if (CdbPathLocus_IsSegmentGeneral(outer.locus)) - { - Assert(!CdbPathLocus_HasMultipleWorkers(outer.locus)); - segGeneral = &outer; - segGeneral->isouter = true; - other = &inner; - other->isouter = false; - } - else - { - Assert(!CdbPathLocus_HasMultipleWorkers(inner.locus)); - segGeneral = &inner; - segGeneral->isouter = false; - other = &outer; - other->isouter = true; - } - - Assert(CdbPathLocus_IsBottleneck(other->locus) || - CdbPathLocus_IsPartitioned(other->locus)); - - /* - * For UPDATE/DELETE, replicated table can't guarantee a logic row has - * same ctid or item pointer on each copy. If we broadcast matched tuples - * to all segments, the segments may update the wrong tuples or can't - * find a valid tuple according to ctid or item pointer. 
- * - * So For UPDATE/DELETE on replicated table, we broadcast other path so - * all target tuples can be selected on all copys and then be updated - * locally. - */ - if (root->upd_del_replicated_table > 0 && - bms_is_member(root->upd_del_replicated_table, - segGeneral->path->parent->relids)) - { - /* - * For UPDATE on a replicated table, we have to do it - * everywhere so that for each segment, we have to collect - * all the information of other that is we should broadcast it - */ - /* doesn't support insert parallel yet */ - Assert(other->path->parallel_workers == 0); + /* We don't handle parallel when expanding segments */ + if (CdbPathLocus_NumSegments(outer.locus) != CdbPathLocus_NumSegments(inner.locus)) + goto fail; - CdbPathLocus_MakeReplicated(&other->move_to, - CdbPathLocus_NumSegments(segGeneral->locus), - 0); - } - else if (CdbPathLocus_IsBottleneck(other->locus)) + if (!inner.ok_to_replicate) { - if (parallel_aware) - goto fail; - Assert(other->locus.parallel_workers == 0); + if (!try_redistribute(root, &inner, + &outer, redistribution_clauses, parallel_aware)) + { + /* + * CBDB_PARALLEL_FIXME: + * Do we need to test movable? + * Could we allow parallel-aware here? + */ + if (parallel_aware) + goto fail; - /* - * if the locus type is equal and segment count is unequal, - * we will dispatch the one on more segments to the other - */ - numsegments = CdbPathLocus_CommonSegments(segGeneral->locus, - other->locus); - segGeneral->move_to = other->locus; - segGeneral->move_to.numsegments = numsegments; + CdbPathLocus_MakeSingleQE(&(&inner)->move_to, + CdbPathLocus_NumSegments(inner.locus)); + CdbPathLocus_MakeSingleQE(&(&outer)->move_to, + CdbPathLocus_NumSegments(outer.locus)); + } } else { - /* - * This branch handles for partitioned other locus - * hashed, hashoj, strewn and hashedworkers. 
- */ - Assert(CdbPathLocus_IsPartitioned(other->locus)); - - if (!segGeneral->ok_to_replicate) - { - if (!try_redistribute(root, segGeneral, - other, redistribution_clauses, parallel_aware)) - { - /* - * FIXME: do we need to test movable? - */ - if (parallel_aware) - goto fail; - - CdbPathLocus_MakeSingleQE(&segGeneral->move_to, - CdbPathLocus_NumSegments(segGeneral->locus)); - CdbPathLocus_MakeSingleQE(&other->move_to, - CdbPathLocus_NumSegments(other->locus)); - } - } - else + if (parallel_aware) { - /* SegmentGeneral join with Partitioned */ - - /* Couldn't join with SegmentGeneral with a shared hash table */ - if (parallel_aware) - goto fail; - - if (other->locus.parallel_workers > 1) - { - if (CdbPathLocus_NumSegments(segGeneral->locus) != CdbPathLocus_NumSegments(other->locus)) - goto fail; - if (!other->isouter) - goto fail; /* partial path must be outer side when parallel_aware is false */ - if (segGeneral->ok_to_replicate) - return other->locus; /* Partitioned JOIN SegmentGeneral */ + /* CBDB_PARALLEL_FIXME: Motion(N) from one segment to M Partitioned? */ + if (innerParallel != outerParallel) goto fail; - } /* - * If all other's segments have segGeneral stored, then no motion - * is needed. - * - * A sql to reach here: - * select * from d2 a join r1 b using (c1); - * where d2 is a replicated table on 2 segment, - * r1 is a random table on 1 segments. + * HashedWorkers, Hashed, Strewn JOIN SegmentGeneralWorkers with shared hash table, + * return the other locus anyway. + * see ex 11_P_5_11, ex 12_P_5_12 */ - if (CdbPathLocus_NumSegments(segGeneral->locus) >= - CdbPathLocus_NumSegments(other->locus)) - return other->locus; - else - { - if (!try_redistribute(root, segGeneral, - other, redistribution_clauses, parallel_aware)) - { - if (parallel_aware) - goto fail; - - numsegments = CdbPathLocus_CommonSegments(segGeneral->locus, - other->locus); - /* - * FIXME: do we need to test movable? 
- */ - CdbPathLocus_MakeSingleQE(&segGeneral->move_to, numsegments); - CdbPathLocus_MakeSingleQE(&other->move_to, numsegments); - } - } + return outer.locus; } - } - } - } - /* - * Is either source confined to a single process? NB: Motion to a single - * process (qDisp or qExec) is the only motion in which we may use Merge - * Receive to preserve an existing ordering. - */ - else if (CdbPathLocus_IsBottleneck(outer.locus) || - CdbPathLocus_IsBottleneck(inner.locus)) - { /* singleQE or entry db */ - CdbpathMfjRel *single = &outer; - CdbpathMfjRel *other = &inner; - bool single_immovable = (outer.require_existing_order && - !outer_pathkeys) || outer.has_wts; - bool other_immovable = inner.require_existing_order && - !inner_pathkeys; - /* - * If each of the sources has a single-process locus, then assign both - * sources and the join to run in the same process, without motion. - * The slice will be run on the entry db if either source requires it. - */ - if (CdbPathLocus_IsEntry(single->locus)) - { - if (CdbPathLocus_IsBottleneck(other->locus)) - return single->locus; - } - else if (CdbPathLocus_IsSingleQE(single->locus)) - { - if (CdbPathLocus_IsBottleneck(other->locus)) - { /* - * Can join directly on one of the common segments. + * No shared hash table join: + * couldn't join if other is at outer side without shared hash table. + * CBDB_PARALLEL_FIXME: + * Could we benefit from Motion(N) from one segment to M Partitioned or Gather all to single? */ - numsegments = CdbPathLocus_CommonSegments(outer.locus, - inner.locus); - - other->locus.numsegments = numsegments; - return other->locus; + goto fail; } - } - - /* Let 'single' be the source whose locus is singleQE or entry. */ - else - { - CdbSwap(CdbpathMfjRel *, single, other); - CdbSwap(bool, single_immovable, other_immovable); - } - Assert(CdbPathLocus_IsBottleneck(single->locus)); - Assert(CdbPathLocus_IsPartitioned(other->locus)); - - /* If the bottlenecked rel can't be moved, bring the other rel to it. 
*/ - if (single_immovable) - { - if (other_immovable) - goto fail; - else - other->move_to = single->locus; } - - /* Redistribute single rel if joining on other rel's partitioning key */ - else if (cdbpath_match_preds_to_distkey(root, - redistribution_clauses, - other->path, - other->locus, - single->locus, - parallel_aware, - &single->move_to)) /* OUT */ + else if (CdbPathLocus_IsSingleQE(inner.locus)) { - AssertEquivalent(CdbPathLocus_NumSegments(other->locus), - CdbPathLocus_NumSegments(single->move_to)); - } + CdbpathMfjRel *single = &inner; + CdbpathMfjRel *other = &outer; + bool single_immovable = inner.require_existing_order && !inner_pathkeys; + bool other_immovable = (outer.require_existing_order && !outer_pathkeys) || outer.has_wts; + Assert(outerParallel > 1); - /* Replicate single rel if cheaper than redistributing both rels. */ - else if (single->ok_to_replicate && - (single->bytes * CdbPathLocus_NumSegments(other->locus) < - single->bytes + other->bytes)) - CdbPathLocus_MakeReplicated(&single->move_to, + /* If the bottlenecked rel can't be moved, bring the other rel to it. */ + if (single_immovable) + { + if (other_immovable) + goto fail; + else + other->move_to = single->locus; + } + /* Redistribute single rel if joining on other rel's partitioning key */ + else if (cdbpath_match_preds_to_distkey(root, + redistribution_clauses, + other->path, + other->locus, + single->locus, + parallel_aware, + &single->move_to)) /* OUT */ + { + AssertEquivalent(CdbPathLocus_NumSegments(other->locus), + CdbPathLocus_NumSegments(single->move_to)); + } + /* Replicate single rel if cheaper than redistributing both rels. 
*/ + else if (single->ok_to_replicate && + (single->bytes * CdbPathLocus_NumSegments(other->locus) < + single->bytes + other->bytes)) + /* CBDB_PARALLEL_FIXME: make ReplicatedWorkers when parallel-aware */ + CdbPathLocus_MakeReplicated(&single->move_to, CdbPathLocus_NumSegments(other->locus), single->path->parallel_workers); - - /* - * Redistribute both rels on equijoin cols. - * - * Redistribute both to the same segments, here we choose the - * same segments with other. - */ - else if (!other_immovable && - cdbpath_distkeys_from_preds(root, - redistribution_clauses, - single->path, - CdbPathLocus_NumSegments(other->locus), - Max(single->path->parallel_workers, other->path->parallel_workers), - parallel_aware, - &single->move_to, /* OUT */ - &other->move_to)) /* OUT */ - { - /* ok */ + /* + * Redistribute both rels on equijoin cols. + * + * Redistribute both to the same segments, here we choose the + * same segments with other. + */ + else if (!other_immovable && + cdbpath_distkeys_from_preds(root, + redistribution_clauses, + single->path, + CdbPathLocus_NumSegments(other->locus), + Max(single->path->parallel_workers, other->path->parallel_workers), + parallel_aware, + &single->move_to, /* OUT */ + &other->move_to)) /* OUT */ + { + /* ok */ + } + /* Broadcast single rel for below cases. */ + else if (single->ok_to_replicate && + (other_immovable || + single->bytes < other->bytes || + other->has_wts)) + /* CBDB_PARALLEL_FIXME: make ReplicatedWorkers when parallel-aware */ + CdbPathLocus_MakeReplicated(&single->move_to, + CdbPathLocus_NumSegments(other->locus), + single->path->parallel_workers); + /* Last resort: If possible, move all partitions of other rel to single QE. */ + else if (!other_immovable) + other->move_to = single->locus; + else + goto fail; } - - /* Broadcast single rel for below cases. 
*/ - else if (single->ok_to_replicate && - (other_immovable || - single->bytes < other->bytes || - other->has_wts)) - CdbPathLocus_MakeReplicated(&single->move_to, - CdbPathLocus_NumSegments(other->locus), - single->path->parallel_workers); - - /* Last resort: If possible, move all partitions of other rel to single QE. */ - else if (!other_immovable) - other->move_to = single->locus; else + { + /* Should not get here. */ + Assert(false); goto fail; - } /* singleQE or entry */ - + } + } /* * No motion if partitioned alike and joining on the partitioning keys. */ diff --git a/src/test/regress/expected/gp_parallel.out b/src/test/regress/expected/gp_parallel.out index 0ead2658f4c..689beefb1fe 100644 --- a/src/test/regress/expected/gp_parallel.out +++ b/src/test/regress/expected/gp_parallel.out @@ -1042,6 +1042,7 @@ select * from t_replica_workers_2 join t_random_workers_0 using(a); abort; -- +-- ex 11_P_5_11 -- Strewn(worker=N) join SegmentGeneralWorkers(workers=N) with shared hash table. -- Join locus: Strewn(worker=N). -- diff --git a/src/test/regress/sql/gp_parallel.sql b/src/test/regress/sql/gp_parallel.sql index 16d2ef34afa..53429257cbb 100644 --- a/src/test/regress/sql/gp_parallel.sql +++ b/src/test/regress/sql/gp_parallel.sql @@ -304,6 +304,7 @@ select * from t_replica_workers_2 join t_random_workers_0 using(a); abort; -- +-- ex 11_P_5_11 -- Strewn(worker=N) join SegmentGeneralWorkers(workers=N) with shared hash table. -- Join locus: Strewn(worker=N). --