
Support DDLs related to distributed schemas from any node - part 5 / proper locking and replicating reference tables#8506

Merged
onurctirtir merged 11 commits into feature/ddl-from-any-node-schema-based-sharding from
ddl-from-any-node-ref-table-replication
Mar 17, 2026

Conversation


@onurctirtir onurctirtir commented Mar 12, 2026

With this PR, we acquire the following locks on the coordinator when creating a
distributed-schema table from a worker:

  • advisory colocation id lock
  • advisory placement colocation lock
  • pg_dist_node table lock

This is because several operations that today can only be initiated from the
coordinator, such as shard moves, rely on these locks being held on the
coordinator with the proper lock modes whenever a concurrent operation that
could interfere with them has already been initiated.

Note that, for these locks, in code-paths where we are sure the operation
cannot be initiated from a worker, we don't check whether we're on a worker at
all; we simply acquire the lock locally. In other words, we only check whether
we need to acquire these locks remotely on the coordinator in the code-paths
that can initiate an operation from a worker, e.g., the operations around
distributed-schemas that we've added worker-initiation support for so far.

Note that this PR doesn't handle the "colocation default lock". This is
because, even though it appears in, e.g., CreateCitusTable(), that code-path is
by definition never taken when creating a distributed-schema table.

Besides these locking improvements, this PR also adds proper support for
replicating reference tables to all nodes when creating a distributed-schema
table from a worker while the reference tables are not yet replicated to all
nodes. To do that, in addition to acquiring the necessary locks on the
coordinator, we make sure to connect to the coordinator and initiate the
shard-copy operations from there.

Finally, this PR adds isolation tests to properly verify the locking behavior
for the operations that can be initiated from a worker, as well as more
regression tests for critical cases; see the last commit's description.

We never meant to support this UDF from workers because
EnsureReferenceTablesExistOnAllNodesExtended() always assumed
that it's run from the coordinator, which might change very
soon but still we wouldn't want to support
replicate_reference_tables() from workers for scoping purposes.

codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 82.58065% with 27 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.88%. Comparing base (417f8aa) to head (f725873).
⚠️ Report is 20 commits behind head on feature/ddl-from-any-node-schema-based-sharding.

Additional details and impacted files
@@                                 Coverage Diff                                 @@
##           feature/ddl-from-any-node-schema-based-sharding    #8506      +/-   ##
===================================================================================
- Coverage                                            88.89%   88.88%   -0.01%     
===================================================================================
  Files                                                  286      286              
  Lines                                                63503    63639     +136     
  Branches                                              7966     7988      +22     
===================================================================================
+ Hits                                                 56451    56566     +115     
- Misses                                                4759     4773      +14     
- Partials                                              2293     2300       +7     

@onurctirtir onurctirtir requested a review from codeforall March 12, 2026 08:37
}

PG_ENSURE_ARGNOTNULL(1, "lock_mode");
int lockMode = PG_GETARG_INT32(1);
Contributor

Consider using IntToLockMode to prevent callers from passing invalid lock mode values (e.g., 0 or 99), which would otherwise reach PostgreSQL's LockAcquire() with undefined behavior.

Contributor

@codeforall codeforall left a comment

Once the comment about IntToLockMode is addressed, the PR is good to be merged.

During the tests where we create distributed-schema tables from a
worker node, altering citus.next_shard_id / citus.next_placement_id
using run_command_on_coordinator() doesn't guarantee that we'll use the
values we set them to, e.g., if the citus internal connection that we
use to connect to the coordinator changes. For this reason, let's always
directly alter pg_dist_shardid_seq / pg_dist_placement_placementid_seq
at the system level by connecting to the coordinator.
…un from a worker

Given the recent support added to create distributed-schema tables from
any node; in the code-paths that can now acquire a colocation id lock
from a worker, make sure to acquire the lock on the coordinator, maybe
by using a remote connection that won't be closed until the end of the
transaction.

EnsureReferenceTablesExistOnAllNodesExtended() is also one of those
code-paths, but normally it needs to release the lock at certain points,
so it'll be handled in a separate commit differently. The code-paths
that are handled in this commit are the ones that don't need to release
the lock until the end of the transaction.
…n from a worker

Given the support for some operations from any node; in the
code-paths that can acquire a lock on pg_dist_node from a worker, make
sure to acquire the lock on the coordinator as well, maybe by using
a remote connection that won't be closed until the end of the
transaction.

EnsureReferenceTablesExistOnAllNodesExtended() is also one of those
code-paths but it'll be handled in a separate commit.
…on is run from a worker

Given the recent support added to create distributed-schema tables from
any node; in the code-paths that can now acquire a placement colocation
lock from a worker, make sure to acquire the lock on the coordinator,
maybe by using a remote connection that won't be closed until the end of
the transaction.
…ing a distributed-schema table from a worker

Acquire a transactional advisory lock and release it by committing the
remote transaction because, in Postgres, there is no other way to
release a transactional advisory lock in a separate command.
…mand via session-level conn

.. during isolation tests.

See the code comments.
…node

- Add tests for reference table replication as well as
  a failure test for #6592.
- Make sure to test create / drop schema / table for non-super
  user in schema_based_sharding_from_workers_a.sql.
- Add more non-super user tests.
- Add tests for the cases when the relation name is not qualified.
- Add tests for schema / table names that need proper quoting.
- Add more tests for foreign key constraints.
- Add isolation tests.
@onurctirtir onurctirtir force-pushed the ddl-from-any-node-ref-table-replication branch from f76d240 to f725873 Compare March 17, 2026 08:53
@onurctirtir onurctirtir merged commit 432e701 into feature/ddl-from-any-node-schema-based-sharding Mar 17, 2026
28 checks passed
@onurctirtir onurctirtir deleted the ddl-from-any-node-ref-table-replication branch March 17, 2026 08:54
onurctirtir added a commit that referenced this pull request Apr 14, 2026
Note that all the commits are reviewed and first merged into
https://github.com/citusdata/citus/tree/feature/ddl-from-any-node-schema-based-sharding,
from #8461, #8464, #8478, #8486 and #8506.

One can also check individual commits / commit messages to better
understand the changes.

DESCRIPTION: Support some of the DDLs for schema-based-sharding from any node.

---

## What's supported in general?

Add support for the following from any node in general, regardless of whether
the command targets a distributed-schema / table or not:
* create schema
* drop schema
* alter schema rename to / owner to
* create / drop / alter view

Note that we have already been supporting the following:
* create / drop / alter role
* truncate

## What's supported for distributed-schemas / tables?

Add support for the following from any node when the command targets a
distributed-schema / table:
* "create table" variants, including, e.g., sequence-based columns, initial
  foreign keys, partitioning relationship and many other attributes that can
  be defined at "create table" time.
* drop table
* alter table, including but not limited to:
   * "alter table set schema" to move a Postgres / Citus managed local table
     into a distributed-schema from any node
   * "alter table attach / detach partition" to manage partitioning
     relationships between distributed-schema tables within the same schema
   * "alter table add / drop / set column" ..
* create / drop / alter index
* create / drop / alter trigger

Also remove the limitations that were inherited from regular distributed
tables around the usage of triggers with distributed-schema tables, i.e., do
not require citus.enable_unsafe_triggers for distributed-schema tables
anymore, as is the case for Citus managed local tables.

## Important changes at the high-level

* Now we call EnsurePropagationToCoordinator() instead of EnsureCoordinator()
  in relevant code-paths to support a DDL from any node.
* Instead of SendCommandToWorkersWithMetadata(), now we use
  SendCommandToRemoteNodesWithMetadata() in the code-paths where we need to
  send a command to other nodes, e.g., to sync metadata changes when creating
  a distributed-schema table from a worker.
* When creating a new distributed-schema / distributed-schema table, we need
  to fetch the next colocation id, shard id and shard placement id. With this
  PR, whenever we need to consume one of those sequences, we fetch the next
  sequence value from the coordinator and perform the rest of the operations
  on the current node. This is because we've never synced those sequences to
  workers, so we consider their coordinator versions the source of truth.
* Preserve sequence range, nextval, and initial value behavior when creating /
  altering distributed-schema tables from workers.
* Make sure to acquire central locks on the coordinator even when an operation
  is initiated from a worker.
* We also properly support replicating reference tables from workers when
  creating a distributed-schema table from workers.

For the last three points, please also see the next sections for more details.

## Significant changes for sequences

In Postgres, a column can be associated with a sequence in three ways:
* Explicit default using nextval('sequence_name')
* Serial pseudo-types (smallserial, serial, bigserial)
* Identity columns (GENERATED ... AS IDENTITY)

Such columns can be used in Citus tables in three scenarios:
* The column existed before the table was distributed
* The column was added after distribution
* The column was altered to use a sequence after distribution

When Citus tables are created or altered on the coordinator, we enforce the
following:
* The coordinator is always allowed to use the full sequence range.
* If the sequence is bigint-based, the allowed sequence range is split across
  workers based on node group ids. Otherwise, workers are effectively
  prevented from using the sequence by setting the current value to the
  maximum value. See AlterSequenceMinMax().
* Also, for columns using explicit nextval defaults, worker-side defaults are
  rewritten to worker_nextval() to provide a clearer error when invoked on
  workers. For serial and identity columns, the default expression cannot be
  rewritten, but the same sequence range restrictions still apply.
* Furthermore, since the sequence was originally present on the coordinator,
  its last_value and is_called state are implicitly preserved on the
  coordinator after it's distributed.

And we want to preserve the same behavior when distributed-schema tables are
created or altered from workers, hence the code changes introduced in this PR:
* We make sure to always use nextval() on the coordinator when the column has
  a default expression that uses nextval(), regardless of the data type.
* We make sure to execute AlterSequenceMinMax() on the local worker node in
  addition to remote workers. That way, we can use the appropriate sequence
  range on the local worker as well if it's bigint-based, or we can prevent
  the local worker from using the sequence otherwise, by setting the current
  value to the maximum value.
* We also sync the sequence's last_value and is_called state to the
  coordinator's sequence. That way, we can continue using the full range
  starting from the worker's current value on the coordinator after
  distribution, as is the case when creating Citus tables from the
  coordinator.

Finally, the UDFs that we send to other nodes to implement some parts of these
changes now need to know whether they're called on the coordinator or a
worker, namely worker_apply_sequence_command() and
citus_internal.adjust_identity_column_seq_settings(). For these UDFs to
properly check whether they're called on the coordinator or a worker, the node
group id always needs to be already assigned. For the coordinator and
metadata-synced workers, this is always the case. Note that, by default,
citus_add_node() always syncs metadata to the workers when adding them into
the metadata. However, although not recommended, the user can later execute
stop_metadata_sync_to_node() for a worker to stop metadata syncing to that
worker.

In that case, before this PR, we were resetting the node group id to 0 for
workers with metadata sync stopped, where 0 is almost always assumed to be the
coordinator's node group id in the code-base. So with this PR, we defer
resetting the group id until the worker is actually removed from the metadata.

Ideally, we should not be sending commands to create / alter sequences to
workers without metadata, because we never use sequences in shard-level
queries and DMLs, only when querying shell tables. However, changing this
would require more work, and deferring the reset of the node group id until
the worker is removed from the metadata seems like the correct thing to do
anyway.

Also, to preserve compatibility in mixed-version clusters, we keep the older
variant of worker_apply_sequence_command(), i.e., the one that takes 2
parameters rather than 4, and worker_adjust_identity_column_seq_ranges(),
which is the ancestor of citus_internal.adjust_identity_column_seq_settings().
And when creating or altering a distributed table from the coordinator, we
keep sending these older variants to the workers. This is because the most
significant difference between the older and newer variants is that the newer
variants also send the last_value and is_called state of the sequence to the
node that they're sent to. However, we don't actually need to do that when
creating or altering a distributed table from the coordinator, and using the
newer variants from the coordinator could cause failures in mixed-version
clusters, e.g., when one of the workers is still on an older version of Citus.

## Significant changes around Citus locks

Now we acquire the following locks on the coordinator when creating a
distributed-schema table from a worker:

* advisory colocation id lock
* advisory placement colocation lock
* pg_dist_node table lock

This is because several operations that today can only be initiated from the
coordinator, such as shard moves, rely on these locks being held on the
coordinator whenever a concurrent operation that could interfere with them has
already been initiated.

Note that, for these locks, in code-paths where we are sure the operation
cannot be initiated from a worker, we don't check whether we're on a worker at
all; we simply acquire the lock locally. In other words, we only check whether
we need to acquire these locks remotely on the coordinator in the code-paths
that can initiate an operation from a worker, e.g., the operations around
distributed-schemas / tables that we've added worker-initiation support for.

Note that we don't handle the "colocation default lock". This is because, even
though it appears in, e.g., CreateCitusTable(), that code-path is by
definition never taken when creating a distributed-schema table.

## Support implicitly replicating reference tables from workers

We also support replicating reference tables to all nodes when creating a
distributed-schema table from a worker while the reference tables are not yet
replicated to all nodes. To do that, in addition to acquiring the necessary
locks on the coordinator as mentioned above, we make sure to connect to the
coordinator and initiate the shard-copy operations from there.

## Future improvements

* Implementation of the mechanisms to avoid "avoidable-distributed-deadlocks".
   * Also see #1083 on how we could avoid such deadlocks.
* Supporting other DDLs / UDFs around distributed-schemas / tables and related
  objects from any node:
   * "create / alter / drop etc." of the independent objects like types and
     functions that the users might use with distributed-schema tables are
     not supported.
   * "create / alter / drop etc." of statistics / policies / rules after
     table creation are not supported.
      * Although we allow creating tables having such initial objects /
        configs at create table time, and altering them later if / as much as
        alter table allows.
   * Limitations that we had for some "alter table" subcommands are also
     inherited.
   * "alter table set schema" to move a distributed-schema table into a
     regular or another distributed-schema is not supported.
      * This is because it would require supporting the underlying
        functionality of undistribute_table() from workers.
   * distributed-schema management UDFs like citus_schema_distribute /
     citus_schema_undistribute are not supported.
* Any other DDLs / UDFs not listed here are not yet supported either.
