diff --git a/entries/bls_pkid_pair.go b/entries/bls_pkid_pair.go index 9b9ebac..93b081c 100644 --- a/entries/bls_pkid_pair.go +++ b/entries/bls_pkid_pair.go @@ -21,6 +21,19 @@ type PGBLSPkidPairEntry struct { BLSPublicKeyPKIDPairEntry } +type BLSPublicKeyPKIDPairSnapshotEntry struct { + PKID string `bun:",nullzero"` + BLSPublicKey string `bun:",nullzero"` + SnapshotAtEpochNumber uint64 `bun:",use_zero"` + + BadgerKey []byte `pg:",pk,use_zero"` +} + +type PGBLSPublicKeyPKIDPairSnapshotEntry struct { + bun.BaseModel `bun:"table:bls_public_key_pkid_pair_snapshot_entry"` + BLSPublicKeyPKIDPairSnapshotEntry +} + // Convert the BLSPublicKeyPKIDPairEntry DeSo encoder to the PGBLSPkidPairEntry struct used by bun. func BLSPublicKeyPKIDPairEncoderToPGStruct(blsPublicKeyPKIDPairEntry *lib.BLSPublicKeyPKIDPairEntry, keyBytes []byte, params *lib.DeSoParams) BLSPublicKeyPKIDPairEntry { pgBLSPkidPairEntry := BLSPublicKeyPKIDPairEntry{ @@ -38,6 +51,30 @@ func BLSPublicKeyPKIDPairEncoderToPGStruct(blsPublicKeyPKIDPairEntry *lib.BLSPub return pgBLSPkidPairEntry } +// BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct converts the BLSPublicKeyPKIDPairSnapshotEntry DeSo encoder to the +// PGBLSPublicKeyPKIDPairSnapshotEntry struct used by bun. +func BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct( + blsPublicKeyPKIDPairEntry *lib.BLSPublicKeyPKIDPairEntry, keyBytes []byte, params *lib.DeSoParams, +) BLSPublicKeyPKIDPairSnapshotEntry { + prefixRemovedKeyBytes := keyBytes[1:] + epochNumber := lib.DecodeUint64(prefixRemovedKeyBytes[:8]) + + pgBLSPkidPairSnapshotEntry := BLSPublicKeyPKIDPairSnapshotEntry{ + SnapshotAtEpochNumber: epochNumber, + BadgerKey: keyBytes, + } + + if blsPublicKeyPKIDPairEntry.PKID != nil { + pgBLSPkidPairSnapshotEntry.PKID = consumer.PublicKeyBytesToBase58Check((*blsPublicKeyPKIDPairEntry.PKID)[:], params) + } + + if !blsPublicKeyPKIDPairEntry.BLSPublicKey.IsEmpty() { + pgBLSPkidPairSnapshotEntry.BLSPublicKey = blsPublicKeyPKIDPairEntry.BLSPublicKey.ToString() + } + + return pgBLSPkidPairSnapshotEntry +} + // BLSPublicKeyPKIDPairBatchOperation is the entry point for processing a batch of BLSPublicKeyPKIDPair entries. // It determines the appropriate handler based on the operation type and executes it. func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error { @@ -62,25 +99,52 @@ func bulkInsertBLSPkidPairEntry( ) error { // Track the unique entries we've inserted so we don't insert the same entry twice. uniqueEntries := consumer.UniqueEntries(entries) + uniqueBLSPkidPairEntries := consumer.FilterEntriesByPrefix( + uniqueEntries, lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry) + uniqueBLSPkidPairSnapshotEntries := consumer.FilterEntriesByPrefix( + uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry) // Create a new array to hold the bun struct. - pgEntrySlice := make([]*PGBLSPkidPairEntry, len(uniqueEntries)) + pgBLSPkidPairEntrySlice := make([]*PGBLSPkidPairEntry, len(uniqueBLSPkidPairEntries)) + pgBLSPkidPairSnapshotEntrySlice := make([]*PGBLSPublicKeyPKIDPairSnapshotEntry, len(uniqueBLSPkidPairSnapshotEntries)) // Loop through the entries and convert them to PGEntry. - for ii, entry := range uniqueEntries { - pgEntrySlice[ii] = &PGBLSPkidPairEntry{BLSPublicKeyPKIDPairEntry: BLSPublicKeyPKIDPairEncoderToPGStruct( + for ii, entry := range uniqueBLSPkidPairEntries { + pgBLSPkidPairEntrySlice[ii] = &PGBLSPkidPairEntry{BLSPublicKeyPKIDPairEntry: BLSPublicKeyPKIDPairEncoderToPGStruct( entry.Encoder.(*lib.BLSPublicKeyPKIDPairEntry), entry.KeyBytes, params)} } - // Execute the insert query. - query := db.NewInsert().Model(&pgEntrySlice) + for ii, entry := range uniqueBLSPkidPairSnapshotEntries { + pgBLSPkidPairSnapshotEntrySlice[ii] = &PGBLSPublicKeyPKIDPairSnapshotEntry{ + BLSPublicKeyPKIDPairSnapshotEntry: BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct( + entry.Encoder.(*lib.BLSPublicKeyPKIDPairEntry), entry.KeyBytes, params)} + } + + if len(pgBLSPkidPairEntrySlice) > 0 { + // Execute the insert query. + query := db.NewInsert().Model(&pgBLSPkidPairEntrySlice) - if operationType == lib.DbOperationTypeUpsert { - query = query.On("CONFLICT (badger_key) DO UPDATE") + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting entries") + } } - if _, err := query.Returning("").Exec(context.Background()); err != nil { - return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting entries") + if len(pgBLSPkidPairSnapshotEntrySlice) > 0 { + // Execute query for snapshot entries. + query := db.NewInsert().Model(&pgBLSPkidPairSnapshotEntrySlice) + + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } + + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting snapshot entries") + } } + return nil } @@ -91,15 +155,28 @@ func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope // Transform the entries into a list of keys to delete. keysToDelete := consumer.KeysToDelete(uniqueEntries) + blsPKIDPairEntryKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, + lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry) + blsPKIDPairSnapshotEntryKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, + lib.Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry) // Execute the delete query. if _, err := db.NewDelete(). Model(&PGBLSPkidPairEntry{}). - Where("badger_key IN (?)", bun.In(keysToDelete)). + Where("badger_key IN (?)", bun.In(blsPKIDPairEntryKeysToDelete)). Returning(""). Exec(context.Background()); err != nil { return errors.Wrapf(err, "entries.bulkDeleteBLSPkidPairEntry: Error deleting entries") } + // Execute the delete query for snapshot entries. + if _, err := db.NewDelete(). + Model(&PGBLSPublicKeyPKIDPairSnapshotEntry{}). + Where("badger_key IN (?)", bun.In(blsPKIDPairSnapshotEntryKeysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeleteBLSPkidPairEntry: Error deleting snapshot entries") + } + return nil } diff --git a/entries/pkid.go b/entries/pkid.go index 960e966..7a72e21 100644 --- a/entries/pkid.go +++ b/entries/pkid.go @@ -166,15 +166,18 @@ func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l pgEntrySlice[ii] = &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry} } - query := db.NewInsert().Model(&pgEntrySlice) + if len(pgEntrySlice) > 0 { + query := db.NewInsert().Model(&pgEntrySlice) - if operationType == lib.DbOperationTypeUpsert { - query = query.On("CONFLICT (badger_key) DO UPDATE") - } + if operationType == lib.DbOperationTypeUpsert { + query = query.On("CONFLICT (badger_key) DO UPDATE") + } - if _, err := query.Returning("").Exec(context.Background()); err != nil { - return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries") + if _, err := query.Returning("").Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries") + } } + return nil } @@ -187,13 +190,15 @@ func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l keysToDelete := consumer.KeysToDelete(uniqueEntries) leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule) - // Execute the delete query. - if _, err := db.NewDelete(). - Model(&PGLeaderScheduleEntry{}). - Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)). - Returning(""). - Exec(context.Background()); err != nil { - return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries") + if len(leaderSchedKeysToDelete) > 0 { + // Execute the delete query. + if _, err := db.NewDelete(). + Model(&PGLeaderScheduleEntry{}). + Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)). + Returning(""). + Exec(context.Background()); err != nil { + return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries") + } } return nil diff --git a/migrations/initial_migrations/20240260000001_create_bls_public_key_pkid_pair_snapshot_entry_table.go b/migrations/initial_migrations/20240260000001_create_bls_public_key_pkid_pair_snapshot_entry_table.go new file mode 100644 index 0000000..a027de2 --- /dev/null +++ b/migrations/initial_migrations/20240260000001_create_bls_public_key_pkid_pair_snapshot_entry_table.go @@ -0,0 +1,39 @@ +package initial_migrations + +import ( + "context" + "strings" + + "github.com/uptrace/bun" +) + +// TODO: Not nullable fields +func createBLSPublicKeyPKIDPairSnapshotEntryTable(db *bun.DB, tableName string) error { + _, err := db.Exec(strings.Replace(` + CREATE TABLE {tableName} ( + pkid VARCHAR NOT NULL, + bls_public_key VARCHAR NOT NULL, + snapshot_at_epoch_number BIGINT NOT NULL, + + badger_key BYTEA PRIMARY KEY + ); + CREATE INDEX {tableName}_pkid_idx ON {tableName} (pkid); + CREATE INDEX {tableName}_bls_public_key_idx ON {tableName} (bls_public_key); + CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number); + `, "{tableName}", tableName, -1)) + return err +} + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + return createBLSPublicKeyPKIDPairSnapshotEntryTable(db, "bls_public_key_pkid_pair_snapshot_entry") + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + DROP TABLE IF EXISTS bls_public_key_pkid_pair_snapshot_entry; + `) + if err != nil { + return err + } + return nil + }) +} diff --git a/migrations/post_sync_migrations/20240260000002_create_pos_stat_views_and_fk_comments_for_snapshot_bls.go b/migrations/post_sync_migrations/20240260000002_create_pos_stat_views_and_fk_comments_for_snapshot_bls.go new file mode 100644 index 0000000..8fa7920 --- /dev/null +++ b/migrations/post_sync_migrations/20240260000002_create_pos_stat_views_and_fk_comments_for_snapshot_bls.go @@ -0,0 +1,122 @@ +package post_sync_migrations + +import ( + "context" + + "github.com/uptrace/bun" +) + +func init() { + Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + +CREATE OR REPLACE VIEW epoch_details_for_block as +select block_hash, epoch_number, bls.pkid as proposer_pkid +from block + left join epoch_entry + on epoch_entry.initial_block_height <= block.height and + epoch_entry.final_block_height >= block.height + left join bls_public_key_pkid_pair_snapshot_entry bls + on bls.snapshot_at_epoch_number = epoch_entry.snapshot_at_epoch_number and + block.proposer_voting_public_key = bls.bls_public_key; + + comment on view epoch_details_for_block is E'@unique block_hash\n@unique epoch_number\n@foreignKey (block_hash) references block (block_hash)|@foreignFieldName epochDetailForBlock|@fieldName block\n@foreignKey (epoch_number) references epoch_entry (epoch_number)|@foreignFieldName blockHashesInEpoch|@fieldName epochEntry\n@foreignKey (proposer_pkid) references account (pkid)|@foreignFieldName proposedBlockHashes|@fieldName proposer'; + comment on table bls_public_key_pkid_pair_snapshot_entry is E'@foreignKey (pkid) references account (pkid)|@foreignFieldName blsPublicKeyPkidPairSnapshotEntries|@fieldName account\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName blsPublicKeyPkidPairSnapshotEntries|@fieldName epochEntry'; + comment on column bls_public_key_pkid_pair_snapshot_entry.badger_key is E'@omit'; +`) + if err != nil { + return err + } + if !calculateExplorerStatistics { + return nil + } + _, err = db.Exec(` +CREATE MATERIALIZED VIEW my_stake_summary as +select coalesce(total_stake_rewards.staker_pkid, total_stake_amount.staker_pkid) as staker_pkid, + total_stake_rewards.total_rewards as total_stake_rewards, + total_stake_amount.total_stake as total_stake +from (select staker_pkid, sum(reward_nanos) total_rewards + from stake_reward + group by staker_pkid) total_stake_rewards + full outer join + (select staker_pkid, sum(stake_amount_nanos) total_stake + from stake_entry + group by staker_pkid) total_stake_amount + on total_stake_amount.staker_pkid = total_stake_rewards.staker_pkid; + +CREATE UNIQUE INDEX my_stake_summary_unique_index ON my_stake_summary (staker_pkid); + +CREATE MATERIALIZED VIEW staking_summary as +select * +from (select sum(total_stake_amount_nanos) as global_stake_amount_nanos, + count(distinct validator_pkid) as num_validators + from validator_entry) validator_summary, + (select max(epoch_number) current_epoch_number from epoch_entry) current_epoch, + (select count(distinct snapshot_at_epoch_number) num_epochs_in_leader_schedule + from leader_schedule_entry) num_epochs_in_leader_schedule; + +CREATE UNIQUE INDEX staking_summary_unique_index ON staking_summary (global_stake_amount_nanos, num_validators, current_epoch_number, num_epochs_in_leader_schedule); + +CREATE MATERIALIZED VIEW validator_stats as +select validator_entry.validator_pkid, + rank() OVER ( order by validator_entry.total_stake_amount_nanos) as validator_rank, + validator_entry.total_stake_amount_nanos::float / + staking_summary.global_stake_amount_nanos::float as percent_total_stake, + coalesce(time_in_jail, 0) + + (case + when jailed_at_epoch_number = 0 then 0 + else (staking_summary.current_epoch_number - jailed_at_epoch_number) END) epochs_in_jail, + coalesce(leader_schedule_summary.num_epochs_in_leader_schedule, 0), + coalesce(leader_schedule_summary.num_epochs_in_leader_schedule, 0)::float / + staking_summary.num_epochs_in_leader_schedule::float as percent_epochs_in_leader_schedule, + coalesce(total_rewards, 0) as total_stake_reward_nanos +from staking_summary, + validator_entry + left join (select validator_pkid, sum(jhe.unjailed_at_epoch_number - jhe.jailed_at_epoch_number) time_in_jail + from jailed_history_event jhe + group by validator_pkid) jhe + on jhe.validator_pkid = validator_entry.validator_pkid + left join (select validator_pkid, count(*) as num_epochs_in_leader_schedule + from leader_schedule_entry + group by validator_pkid) leader_schedule_summary + on leader_schedule_summary.validator_pkid = validator_entry.validator_pkid + left join (select validator_pkid, sum(reward_nanos) as total_rewards + from stake_reward + group by validator_pkid) as total_stake_rewards + on total_stake_rewards.validator_pkid = validator_entry.validator_pkid; + +CREATE UNIQUE INDEX validator_stats_unique_index ON validator_stats (validator_pkid); + + comment on materialized view validator_stats is E'@unique validator_pkid\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName validatorStats|@fieldName validatorEntry'; + comment on materialized view my_stake_summary is E'@foreignKey (staker_pkid) references account (pkid)|@foreignFieldName myStakeSummary|@fieldName staker'; + +`) + if err != nil { + return err + } + + return nil + }, func(ctx context.Context, db *bun.DB) error { + _, err := db.Exec(` + comment on column bls_public_key_pkid_pair_snapshot_entry.badger_key is NULL; + comment on table bls_public_key_pkid_pair_snapshot_entry is NULL; + DROP VIEW IF EXISTS epoch_details_for_block CASCADE; +`) + if err != nil { + return err + } + if !calculateExplorerStatistics { + return nil + } + _, err = db.Exec(` + DROP MATERIALIZED VIEW IF EXISTS validator_stats CASCADE; + DROP MATERIALIZED VIEW IF EXISTS staking_summary CASCADE; + DROP MATERIALIZED VIEW IF EXISTS my_stake_summary CASCADE; + `) + if err != nil { + return err + } + + return nil + }) +} diff --git a/migrations/post_sync_migrations/migration_helpers.go b/migrations/post_sync_migrations/migration_helpers.go index 5c1fe4e..35f7d54 100644 --- a/migrations/post_sync_migrations/migration_helpers.go +++ b/migrations/post_sync_migrations/migration_helpers.go @@ -73,6 +73,9 @@ var ( {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_deso_token_buy_orders", Ticker: time.NewTicker(30 * time.Minute)}, {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_deso_token_sell_orders", Ticker: time.NewTicker(30 * time.Minute)}, {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_earnings_breakdown_counts", Ticker: time.NewTicker(30 * time.Minute)}, + {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY staking_summary", Ticker: time.NewTicker(1 * time.Second)}, + {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY my_stake_summary", Ticker: time.NewTicker(1 * time.Second)}, + {Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY validator_stats", Ticker: time.NewTicker(1 * time.Second)}, } )