Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 87fbf06

Browse files
authored
Fix several DistributedTransaction related issues (#13810)
* Avoid enlarging inProgressXidArray when CreateDistributedSnapshot(). * latestCompletedXid should be uint64. * DistributedLog_AdvanceOldestXmin() returns more accurate oldestXmin. * Remove unnecessary check in DistributedSnapshotWithLocalMapping_CommittedTest(). * Remove unnecessary check in HeapTupleSatisfiesMVCC(). There are several issues/enhancements related to DistributedTransactionId or DistributedSnapshot. Since they are closely related, we put them all together here. 1, latestCompletedGxid is uint32 gxid is unit64 now, latestCompletedGxid should also be unit64. 2, Avoid enlarging inProgressXidArray during CreateDistributedSnapshot() DistributedSnapshot's xmax is set to latestCompletedGxid + 1 after ProcArrayLock is held. if (gxid > xmax) { xmax = gxid; } There is no need to update xmax because latestCompletedGxid can't be increased during CreateDistributedSnapshot() and any gxid bigger than latestCompletedGxid should be seen as a future gxid. And we should avoid enlarging inProgressXidArray for efficiency. 3, DistributedLog_AdvanceOldestXmin() should return more accurate oldestXmin while (1) { if (pg_atomic_compare_exchange_u32((pg_atomic_uint32 *)&DistributedLogShared->oldestXmin, &expected, (uint32)oldestXmin)) break; if (TransactionIdPrecedesOrEquals(oldestXmin, expected)) break; } When failed to update oldestXmin in a CAS operation and the expected value is bigger than the value we want to set. Some other processes must have updated oldestXmi to a bigger value. And we should return that more accurate value. 4, Enhancements in DistributedSnapshotWithLocalMapping_CommittedTest() redundant codes: /* * We found it in the distributed log. */ Assert(distribXid != InvalidDistributedTransactionId); /* * We have a distributed committed xid that corresponds to the * local xid. */ Assert(distribXid != InvalidDistributedTransactionId); unnecessary check: if (LocalDistribXactCache_CommittedFind(localXid,&distribXid)) { /* * We cache local-only committed transactions for better performance, * too. */ if (distribXid == InvalidDistributedTransactionId) return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE; /* * Fall below and evaluate the committed distributed transaction * against the distributed snapshot. */ } It can't happen that distribXid == InvalidDistributedTransactionId. If we found it in LocalDistribXactCache_CommittedFind(), the distribXid must be valid. We add it to cache in LocalDistribXactCache_AddCommitted() and there is nowhere to update distribXid to InvalidDistributedTransactionId. distribXid in cache must be valid or we don't find it in cache(never added or removed by cache LRU). 5, Redundant checks in HeapTupleSatisfiesMVCC() if (snapshotCheckResult == XID_NOT_IN_SNAPSHOT) { if (snapshotCheckResult == XID_SURELY_COMMITTED || TransactionIdDidCommit(xvac)) { SetHintBits(tuple, buffer, relation, HEAP_XMIN_INVALID, InvalidTransactionId); return false; } SetHintBits(tuple, buffer, relation, HEAP_XMIN_COMMITTED, InvalidTransactionId); } snapshotCheckResult == XID_SURELY_COMMITTED check is unnecessary.
1 parent 183b69f commit 87fbf06

6 files changed

Lines changed: 36 additions & 36 deletions

File tree

src/backend/access/heap/heapam_visibility.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1069,7 +1069,7 @@ HeapTupleSatisfiesMVCC(Relation relation, HeapTuple htup, Snapshot snapshot,
10691069
&setDistributedSnapshotIgnore);
10701070
if (snapshotCheckResult == XID_NOT_IN_SNAPSHOT)
10711071
{
1072-
if (snapshotCheckResult == XID_SURELY_COMMITTED || TransactionIdDidCommit(xvac))
1072+
if (TransactionIdDidCommit(xvac))
10731073
{
10741074
SetHintBits(tuple, buffer, relation, HEAP_XMIN_INVALID,
10751075
InvalidTransactionId);

src/backend/access/transam/distributedlog.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,14 @@ DistributedLog_AdvanceOldestXmin(TransactionId oldestLocalXmin,
273273
break;
274274

275275
if (TransactionIdPrecedesOrEquals(oldestXmin, expected))
276+
{
277+
/*
278+
* If we got here, other processes must have updated the oldestXmin.
279+
* Return the more accurate value.
280+
*/
281+
oldestXmin = expected;
276282
break;
283+
}
277284
}
278285
}
279286

src/backend/cdb/cdbdistributedsnapshot.c

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -86,23 +86,15 @@ DistributedSnapshotWithLocalMapping_CommittedTest(
8686

8787
/*
8888
* Is this local xid in a process-local cache we maintain?
89+
*
90+
* If we found xact in cache, the distribXid must be a valid gxid.
91+
* We added it to cache in LocalDistribXactCache_AddCommitted()
92+
* and there is nowhere to update distribXid to InvalidDistributedTransactionId.
93+
* So that distribXid in cache must be valid or we don't find it
94+
* in cache (never added or removed by cache LRU).
8995
*/
90-
if (LocalDistribXactCache_CommittedFind(localXid,
96+
if (!LocalDistribXactCache_CommittedFind(localXid,
9197
&distribXid))
92-
{
93-
/*
94-
* We cache local-only committed transactions for better performance,
95-
* too.
96-
*/
97-
if (distribXid == InvalidDistributedTransactionId)
98-
return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
99-
100-
/*
101-
* Fall below and evaluate the committed distributed transaction
102-
* against the distributed snapshot.
103-
*/
104-
}
105-
else
10698
{
10799
/*
108100
* Ok, now we must consult the distributed log.
@@ -114,12 +106,6 @@ DistributedSnapshotWithLocalMapping_CommittedTest(
114106
*/
115107
Assert(distribXid != InvalidDistributedTransactionId);
116108

117-
/*
118-
* We have a distributed committed xid that corresponds to the
119-
* local xid.
120-
*/
121-
Assert(distribXid != InvalidDistributedTransactionId);
122-
123109
/*
124110
* Since we did not find it in our process local cache, add it.
125111
*/
@@ -137,6 +123,7 @@ DistributedSnapshotWithLocalMapping_CommittedTest(
137123
}
138124
}
139125

126+
Assert(distribXid != InvalidDistributedTransactionId);
140127
Assert(ds->xminAllDistributedSnapshots != InvalidDistributedTransactionId);
141128

142129
/*

src/backend/storage/ipc/procarray.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,9 +2000,20 @@ CreateDistributedSnapshot(DistributedSnapshot *ds)
20002000
if (dxid != InvalidDistributedTransactionId && dxid < globalXminDistributedSnapshots)
20012001
globalXminDistributedSnapshots = dxid;
20022002

2003-
/* just fetch once */
2003+
/*
2004+
* Just fetch once
2005+
*
2006+
* Partial reading is possible on a 32 bit system, however we decided
2007+
* not to take it serious currently.
2008+
*/
20042009
gxid = gxact_candidate->gxid;
2005-
if (gxid == InvalidDistributedTransactionId)
2010+
2011+
/*
2012+
* Skip further gxid to avoid enlarging inProgressXidArray
2013+
* as we already have held ProcArrayLock and latestCompletedGxid
2014+
* can not be changed.
2015+
*/
2016+
if (gxid == InvalidDistributedTransactionId || gxid >= xmax)
20062017
continue;
20072018

20082019
/*
@@ -2013,10 +2024,6 @@ CreateDistributedSnapshot(DistributedSnapshot *ds)
20132024
{
20142025
xmin = gxid;
20152026
}
2016-
if (gxid > xmax)
2017-
{
2018-
xmax = gxid;
2019-
}
20202027

20212028
if (gxact_candidate == MyTmGxact)
20222029
continue;

src/backend/storage/ipc/test/procarray_test.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,10 @@ test__CreateDistributedSnapshot(void **state)
9494
/* perform all the validations */
9595
assert_true(ds.xminAllDistributedSnapshots == 5);
9696
assert_true(ds.xmin == 10);
97-
assert_true(ds.xmax == 30);
98-
assert_true(ds.count == 2);
97+
/* xmax should be latestCompletedGxid+1 */
98+
assert_true(ds.xmax == 25);
99+
assert_true(ds.count == 1);
99100
assert_true(ds.inProgressXidArray[0] == 10);
100-
assert_true(ds.inProgressXidArray[1] == 30);
101101
assert_true(MyTmGxact->xminDistributedSnapshot == 10);
102102

103103
/*************************************************************************
@@ -123,13 +123,12 @@ test__CreateDistributedSnapshot(void **state)
123123
/* perform all the validations */
124124
assert_true(ds.xminAllDistributedSnapshots == 5);
125125
assert_true(ds.xmin == 7);
126-
assert_true(ds.xmax == 30);
127-
assert_true(ds.count == 4);
126+
assert_true(ds.xmax == 25);
127+
assert_true(ds.count == 3);
128128
assert_true(MyTmGxact->xminDistributedSnapshot == 7);
129129
assert_true(ds.inProgressXidArray[0] == 7);
130130
assert_true(ds.inProgressXidArray[1] == 10);
131131
assert_true(ds.inProgressXidArray[2] == 15);
132-
assert_true(ds.inProgressXidArray[3] == 30);
133132

134133
free(ds.inProgressXidArray);
135134
free(allTmGxact);

src/include/access/transam.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ typedef struct VariableCacheData
152152
*/
153153
TransactionId latestCompletedXid; /* newest XID that has committed or
154154
* aborted */
155-
TransactionId latestCompletedGxid; /* newest distributed XID that has
156-
committed or aborted */
155+
DistributedTransactionId latestCompletedGxid; /* newest distributed XID that has
156+
committed or aborted */
157157

158158
/*
159159
* The two variables are protected by shmGxidGenLock. Note nextGxid won't

0 commit comments

Comments
 (0)