Support DDLs related to distributed schemas from any node - part 4 / sequences#8486
Conversation
.. not when we stop syncing metadata to it.
Resetting the local group id means setting it to 0, which is the same as setting it
to COORDINATOR_GROUP_ID. But when we stop syncing metadata to a worker, we
don't want it to assume it's the coordinator until we actually remove it,
as it's still part of the cluster and can be re-added later. This mostly
doesn't cause an issue because we distinguish between the commands to be
sent to worker nodes with metadata vs without metadata, so we mostly don't
send a command to a worker without metadata if the command has to check,
e.g., whether the node is the coordinator or not.
However, it seems that in some cases we don't make such a distinction.
For example, when creating prerequisite objects using
PropagatePrerequisiteObjectsForDistributedTable() before creating shards,
we create the sequences that a table depends on (due to having a
serial column or a column that defaults to nextval('my_sequence'))
on all remote nodes, including the ones that we marked as
"hasmetadata = false".
This actually doesn't make much sense because a non-MX worker doesn't need
sequences: we always resolve nextval() calls on the coordinator
and MX workers before preparing shard-level queries, so we never
need sequences when executing shard-level queries on non-MX workers. And
with the next commit, worker_apply_sequence_command() will need to check whether
the node it's sent to is the coordinator during execution, and doing
so would be problematic if we reset the local group id when stopping
metadata sync to a worker, as such a worker would then assume it's a
coordinator and would try to execute coordinator-specific logic in
worker_apply_sequence_command() too.
Codecov Report

@@                        Coverage Diff                         @@
##  feature/ddl-from-any-node-schema-based-sharding    #8486   +/-  ##
=================================================================================
  Coverage    88.88%    88.89%
=================================================================================
  Files          286       286
  Lines        63261     63504    +243
  Branches      7937      7965     +28
=================================================================================
+ Hits         56230     56451    +221
- Misses        4745      4757     +12
- Partials      2286      2296     +10
/*
 * WORKER_APPLY_SEQUENCE_COMMAND_LEGACY differs from
 * WORKER_APPLY_SEQUENCE_COMMAND in that it does not
 * accept last_value and is_called params, and does
 * not set the initial sequence value when called on
 * the coordinator.
 *
 * The initial value must be set only when creating
 * sequence dependencies on the coordinator for
 * operations initiated from a worker. In that case, on
 * the coordinator, we need to continue after the last
 * value used on the worker so the coordinator can safely
 * assume the full sequence range.
 *
 * For operations initiated from the coordinator, this is
 * unnecessary since all remote nodes are workers. While
 * it would be safe to always use
 * WORKER_APPLY_SEQUENCE_COMMAND (the underlying UDF skips
 * setting the value when the target node is a worker), we
 * use the legacy variant to preserve compatibility with
 * mixed-version clusters.
 *
 * Therefore, for now we use
 * WORKER_APPLY_SEQUENCE_COMMAND_LEGACY when the operation
 * is initiated from the coordinator. In Citus 15.0, we
 * will remove WORKER_APPLY_SEQUENCE_COMMAND_LEGACY and will
 * delete the legacy code path, the first branch of the if
 * statement below.
 */
if (IsCoordinator())
{
	appendStringInfo(wrappedSequenceDef,
					 WORKER_APPLY_SEQUENCE_COMMAND_LEGACY,
					 escapedSequenceDef,
					 quote_literal_cstr(typeName));
}
else
{
	/* prevent concurrent updates to the sequence until the end of the transaction */
	LockRelationOid(sequenceOid, RowExclusiveLock);

	uint64 lastValue = 0;
	bool isCalled = false;
	FetchSequenceState(sequenceOid, &lastValue, &isCalled);

	appendStringInfo(wrappedSequenceDef,
					 WORKER_APPLY_SEQUENCE_COMMAND,
					 escapedSequenceDef,
					 quote_literal_cstr(typeName),
					 lastValue, isCalled ? "true" : "false");
}
this is for mixed-version clusters
List *
IdentitySequenceDependencyCommandList(Oid targetRelationId)
{
	if (IsCoordinator())
	{
		return IdentitySequenceDependencyCommandListLegacy(targetRelationId);
	}

	List *commandList = NIL;

	Relation relation = relation_open(targetRelationId, AccessShareLock);
	TupleDesc tupleDescriptor = RelationGetDescr(relation);
this is for mixed-version clusters
/* reset local group id for the node to be removed */
char *updateLocalGroupIdCommand = LocalGroupIdUpdateCommand(0);
SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
	nodeName, nodePort, CurrentUserName(), list_make1(updateLocalGroupIdCommand));
See the changes at src/backend/distributed/metadata/metadata_sync.c.
This is moved from stop_metadata_sync_to_node() to here, see the PR description for why.
#include "udfs/citus_internal_adjust_identity_column_seq_settings/15.0-1.sql"
#include "udfs/worker_apply_sequence_command/15.0-1.sql"
As described in the PR description, we don't remove the ancestors / older versions of these UDFs, for the sake of mixed-version clusters.
-- Failure to set groupid in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
We no longer send LocalGroupIdUpdateCommand() during stop_metadata_sync_to_node(), so these failure tests are removed.
-- Considering the various ways a table can use sequences, test if we
-- properly adjust sequence min / max values on all worker nodes,
-- including the current one, as well as testing if we sync last_value
-- to the coordinator.
All the tests below were also run with the following appended right after the marked comment lines, in order to make sure sequence ranges / nextval() calls / column default expressions would not change on any node even if all the sequences below were created / altered from the coordinator; all test output was identical.
\c - - - :master_port
for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
	 attributeIndex++)
{
	Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor,
We are using the tupleDescriptor after closing the relation, and later in the loop we execute ALTER TABLE on the same relation (from ExecuteAndLogUtilityCommand()). So the ALTER TABLE could trigger a relcache invalidation, the relcache entry may get rebuilt, and the TupleDesc pointer becomes a dangling reference.
Maybe we could consider retaining the AccessShareLock on the relation until the end of the transaction (relation_close(tableRelation, NoLock)).
Fixed, now we're copying the tuple desc and using it later in the loop.
}

int64 lastValueInt64 = DatumGetInt64(lastValueDatum);
if (lastValueInt64 < 0)
Do we disallow descending sequences? In PostgreSQL, a user can create sequences with negative values, like:
CREATE SEQUENCE s AS bigint INCREMENT -1 MINVALUE -100 MAXVALUE -1 START -1;
Is this intentional?
Seems I wasn't even aware that Postgres allows consuming negative values from sequences; let me fix that, thank you!
/*
 * SendCommandToCoordinator sends a command to remote workers in parallel.
Typo in the function name in the comment: it should be SendCommandToRemoteWorkersWithMetadata instead of SendCommandToCoordinator.
								  includeIdentityDefaults,
								  creatingShellTableOnRemoteNode);

SendCommandToCoordinator(DISABLE_DDL_PROPAGATION);
Do we want to re-enable DDL propagation afterwards? Or is this intended?
Normally, I'd also expect us to enable it back later in such code-paths. However, even in the earlier version, where we were directly sending commands to all remote nodes, we were not sending commands to enable it back. That's why I intentionally decided not to send enable commands at the end for the coordinator either, in order to preserve the behavior.
Previously we were generating the same set of commands and sending them to all nodes, but now we're generating commands for the remote workers and the coordinator, and sending them separately.
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
...
...
TableDDLCommand *tableDDLCommand = NULL;
foreach_declared_ptr(tableDDLCommand, tableDDLCommands)
{
	Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
	commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
const char *command = NULL;
foreach_declared_ptr(command, commandList)
{
	SendCommandToRemoteNodesWithMetadata(command);
}

That said, it probably wouldn't make a difference if we sent commands to enable it back, because we send those commands via the Citus internal metadata connections, and such connections never care about whether DDL propagation is enabled or not.
-- bump version to 15.0-1

#include "udfs/citus_internal_get_next_colocation_id/15.0-1.sql"
#include "udfs/citus_internal_adjust_identity_column_seq_settings/15.0-1.sql"
To guard against the scenario where the extension is upgraded on the coordinator while workers are still on the old version, would it make sense to wrap these new function creations in DISABLE_DDL_PROPAGATION / ENABLE_DDL_PROPAGATION?
no, we don't really need that
						 colname,
						 command->missing_ok, forceUseNextVal);

sequenceOidToAdjustRangesForLocalWorker = seqOid;
If an ALTER TABLE statement contains multiple subcommands that each add or alter a column with a nextval() default (e.g., ALTER TABLE t ADD COLUMN a bigint DEFAULT nextval('s1'), ADD COLUMN b bigint DEFAULT nextval('s2')), sequenceOidToAdjustRangesForLocalWorker will only contain the last subcommand's sequence. That means earlier columns' sequences won't have their ranges adjusted.
As we do for most ALTER TABLE subcommands, we already disallow issuing these together with other subcommands:
citus/src/backend/distributed/commands/table.c
Lines 3348 to 3361 in 366fd64
citus/src/backend/distributed/commands/table.c
Lines 3427 to 3441 in 366fd64
citus/src/backend/distributed/commands/table.c
Lines 3481 to 3495 in 366fd64
But let me still add some asserts and some comments to make it even clearer.
…buted-schema tables from workers

…buted-schema tables from workers

A further refactor should ideally make the rest of the code-base use SetLocalEnableDDLPropagation() as well. Another refactor should ideally get rid of the extra args of AlterSequenceMinMax(). Also, ideally, the sequence-processing related parts in PostprocessAlterTableStmt should be moved into a separate function, etc.
417f8aa into feature/ddl-from-any-node-schema-based-sharding
Note that all the commits are reviewed and first merged into https://github.com/citusdata/citus/tree/feature/ddl-from-any-node-schema-based-sharding, from #8461, #8464, #8478, #8486 and #8506. One can also check individual commits / commit messages to better understand the changes.

DESCRIPTION: Support some of the DDLs for schema-based-sharding from any node.

---

## What's supported in general?

Add support for the following from any node in general, regardless of whether the command targets a distributed-schema / table or not:

* create schema
* drop schema
* alter schema rename to / owner to
* create / drop / alter view

Note that we have already been supporting the following:

* create / drop / alter role
* truncate

## What's supported for distributed-schemas / tables?

Add support for the following from any node when the command targets a distributed-schema / table:

* "create table" variants, including, e.g., sequence-based columns, initial foreign keys, partitioning relationships and many other attributes that can be defined at "create table" time
* drop table
* alter table, including but not limited to:
  * "alter table set schema" to move a Postgres / Citus managed local table into a distributed-schema from any node
  * "alter table attach / detach partition" to manage partitioning relationships between distributed-schema tables within the same schema
  * "alter table add / drop / set column" ..
* create / drop / alter index
* create / drop / alter trigger

Also remove the limitations that were inherited from regular distributed tables around the usage of triggers with distributed-schema tables, i.e., do not require citus.enable_unsafe_triggers for distributed-schema tables anymore, as is the case for Citus managed local tables.

## Important changes at the high-level

* Now we call EnsurePropagationToCoordinator() instead of EnsureCoordinator() in relevant code-paths to support a DDL from any node.
* Instead of SendCommandToWorkersWithMetadata(), now we use SendCommandToRemoteNodesWithMetadata() in the code-paths where we need to send a command to other nodes, e.g., to sync metadata changes when creating a distributed-schema table from a worker.
* When creating a new distributed-schema / distributed-schema table, we need to fetch the next colocation id, shard id and shard placement id. With this PR, whenever we need to consume one of those sequences, we fetch the next sequence value from the coordinator and perform the rest of the operations on the current node. This is because we've never synced those sequences to workers, so we consider their coordinator versions the source of truth.
* Preserve sequence range, nextval() and initial-value behavior when creating / altering distributed-schema tables from workers.
* Make sure to acquire central locks on the coordinator even when an operation is initiated from a worker.
* We also properly support replicating reference tables from workers when creating a distributed-schema table from a worker.

For the last three points, please also see the next sections for more details.

## Significant changes for sequences

In Postgres, a column can be associated with a sequence in three ways:

* Explicit default using nextval('sequence_name')
* Serial pseudo-types (smallserial, serial, bigserial)
* Identity columns (GENERATED ... AS IDENTITY)

Such columns can be used in Citus tables in three scenarios:

* The column existed before the table was distributed
* The column was added after distribution
* The column was altered to use a sequence after distribution

When Citus tables are created or altered on the coordinator, we enforce the following:

* The coordinator is always allowed to use the full sequence range.
* If the sequence is bigint-based, the allowed sequence range is split across workers based on node group ids. Otherwise, workers are effectively prevented from using the sequence by setting the current value to the maximum value. See AlterSequenceMinMax().
* Also, for columns using explicit nextval() defaults, worker-side defaults are rewritten to worker_nextval() to provide a clearer error when invoked on workers. For serial and identity columns, the default expression cannot be rewritten, but the same sequence range restrictions still apply.
* Furthermore, since the sequence is originally present on the coordinator, its last_value and is_called state are implicitly preserved on the coordinator after it's distributed.

We want to preserve the same behavior when distributed-schema tables are created or altered from workers, hence the code changes introduced in this PR:

* We make sure to always use nextval() on the coordinator when the column has a default expression that uses nextval(), regardless of the data type.
* We make sure to execute AlterSequenceMinMax() on the local worker node in addition to remote workers. That way, we can use the appropriate sequence range on the local worker as well if it's bigint-based, or we can prevent the local worker from using the sequence otherwise, by setting the current value to the maximum value.
* We also sync the sequence's last_value and is_called state to the coordinator's sequence. That way, we can continue using the full range starting from the worker's current value on the coordinator after distribution, as is the case when creating Citus tables from the coordinator.

Finally, the UDFs that we send to other nodes to implement some parts of these changes now need to know whether they're called on the coordinator or a worker, namely worker_apply_sequence_command() and citus_internal.adjust_identity_column_seq_settings(). For these UDFs to properly check whether they're called on the coordinator or a worker, we always need the node group id to be already assigned. For the coordinator and metadata-synced workers, this is always the case. Note that, by default, citus_add_node() always syncs metadata to workers when adding them into the metadata.

However, although not recommended, a user can later execute stop_metadata_sync_to_node() for a worker to stop metadata syncing to that worker. In that case, before this PR, we were resetting the node group id to 0 for workers with metadata sync stopped, where 0 is assumed to be the coordinator's node group id almost everywhere in the code-base. So with this PR, we defer resetting the group id until the worker is actually removed from the metadata.

Ideally, we should not be sending commands to create / alter sequences to workers without metadata, because we never use sequences in shard-level queries and DMLs, but only when querying shell tables. However, changing this would require more work, and deferring the group id reset until the worker is removed from the metadata seems like the correct thing to do anyway.

Also, to preserve compatibility in mixed-version clusters, we keep the older variant of worker_apply_sequence_command(), i.e., the one that takes 2 parameters rather than 4, and worker_adjust_identity_column_seq_ranges(), which is the ancestor of citus_internal.adjust_identity_column_seq_settings(). And when creating or altering a distributed table from the coordinator, we keep sending these older variants to the workers. This is because the most significant difference between the older and the newer variants is that the newer variants also send the sequence's last_value and is_called state to the node they're sent to. However, we don't actually need to do that when creating or altering a distributed table from the coordinator, and using the newer variants from the coordinator could cause failures in mixed-version clusters, e.g., when one of the workers is still on an older version of Citus. Hence we keep using the older variants when creating or altering a distributed table from the coordinator.
## Significant changes around Citus locks

Now we acquire the following locks on the coordinator when creating a distributed-schema table from a worker:

* advisory colocation id lock
* advisory placement colocation lock
* pg_dist_node table lock

This is because, on several operations that we can only initiate from the coordinator today, such as shard-moves, we rely on those locks being acquired on the coordinator if a concurrent operation that could interfere with us was already initiated. Note that within the scope of these locks, in code-paths where we're sure the operation cannot be initiated from a worker, we don't even check whether we're on a worker etc., so we always just acquire the lock locally. In other words, we check "if we need to remotely acquire those locks on the coordinator" only in the code-paths that can initiate an operation from a worker, e.g., operations around distributed-schemas / tables that we've added support for initiating from a worker.

Note that we don't care about the "colocation default lock". This is because, even though it appears, e.g., in CreateCitusTable(), it's never taken when creating a distributed-schema table, by definition.

## Support implicitly replicating reference tables from workers

We also support replicating reference tables to all nodes when creating a distributed-schema table from a worker, when the reference tables are not yet replicated to all nodes. To do that, as well as acquiring the necessary locks on the coordinator as mentioned above, we make sure to connect to the coordinator and initiate the shard-copy operations from there.

## Future improvements

* Implementation of the mechanisms to avoid "avoidable distributed deadlocks".
  * Also see #1083 on how we could avoid such deadlocks.
* Supporting other DDLs / UDFs around distributed-schemas / tables and related objects from any node:
  * "create / alter / drop etc." of independent objects like types and functions that users might use with distributed-schema tables is not supported.
  * "create / alter / drop etc." of statistics / policies / rules after table creation is not supported.
    * Although we allow creating tables having such initial objects / configs at "create table" time, and altering them later if / as much as alter table allows.
  * Limitations that we had for some "alter table" subcommands are also inherited.
  * "alter table set schema" to move a distributed-schema table into a regular or another distributed-schema is not supported.
    * This is because it would require supporting the underlying functionality of undistribute_table() from workers.
  * Distributed-schema management UDFs like citus_schema_distribute / citus_schema_undistribute are not supported.
  * Any other DDLs / UDFs not listed here are not yet supported either.