Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 87 additions & 10 deletions entries/bls_pkid_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ type PGBLSPkidPairEntry struct {
BLSPublicKeyPKIDPairEntry
}

// BLSPublicKeyPKIDPairSnapshotEntry is the bun row model for a snapshotted
// BLS public key <-> PKID pairing. SnapshotAtEpochNumber is the epoch the
// snapshot was taken at; it is decoded out of the badger key by
// BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct.
type BLSPublicKeyPKIDPairSnapshotEntry struct {
	PKID                  string `bun:",nullzero"` // base58check-encoded PKID; stored as NULL when empty
	BLSPublicKey          string `bun:",nullzero"` // string form of the BLS public key; stored as NULL when empty
	SnapshotAtEpochNumber uint64 `bun:",use_zero"` // epoch number of the snapshot; 0 is a legitimate value

	// Primary key: the raw badger key this entry was stored under.
	// NOTE(review): this tag uses the `pg:` key while the fields above use
	// `bun:` tags — confirm whether bun actually reads it as the primary key,
	// or whether the code relies solely on the explicit ON CONFLICT SQL and
	// the migration-defined PRIMARY KEY.
	BadgerKey []byte `pg:",pk,use_zero"`
}

// PGBLSPublicKeyPKIDPairSnapshotEntry binds BLSPublicKeyPKIDPairSnapshotEntry
// to its bun table, bls_public_key_pkid_pair_snapshot_entry.
type PGBLSPublicKeyPKIDPairSnapshotEntry struct {
	bun.BaseModel `bun:"table:bls_public_key_pkid_pair_snapshot_entry"`
	BLSPublicKeyPKIDPairSnapshotEntry
}

// Convert the BLSPublicKeyPKIDPairEntry DeSo encoder to the PGBLSPkidPairEntry struct used by bun.
func BLSPublicKeyPKIDPairEncoderToPGStruct(blsPublicKeyPKIDPairEntry *lib.BLSPublicKeyPKIDPairEntry, keyBytes []byte, params *lib.DeSoParams) BLSPublicKeyPKIDPairEntry {
pgBLSPkidPairEntry := BLSPublicKeyPKIDPairEntry{
Expand All @@ -38,6 +51,30 @@ func BLSPublicKeyPKIDPairEncoderToPGStruct(blsPublicKeyPKIDPairEntry *lib.BLSPub
return pgBLSPkidPairEntry
}

// BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct converts the BLSPublicKeyPKIDPairSnapshotEntry DeSo encoder to the
// PGBLSPublicKeyPKIDPairSnapshotEntry struct used by bun.
// BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct converts the BLSPublicKeyPKIDPairEntry DeSo encoder plus its badger
// key into the BLSPublicKeyPKIDPairSnapshotEntry struct used by bun. The snapshot epoch number is recovered from the
// key itself rather than from the encoder.
func BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct(
	blsPublicKeyPKIDPairEntry *lib.BLSPublicKeyPKIDPairEntry, keyBytes []byte, params *lib.DeSoParams,
) BLSPublicKeyPKIDPairSnapshotEntry {
	// Key layout: a one-byte prefix followed by the 8-byte encoded epoch
	// number, so the epoch lives in keyBytes[1:9].
	snapshotEntry := BLSPublicKeyPKIDPairSnapshotEntry{
		SnapshotAtEpochNumber: lib.DecodeUint64(keyBytes[1:9]),
		BadgerKey:             keyBytes,
	}

	// Only populate the string columns when the encoder carries real values;
	// otherwise leave them as zero values (stored as NULL via `nullzero`).
	if !blsPublicKeyPKIDPairEntry.BLSPublicKey.IsEmpty() {
		snapshotEntry.BLSPublicKey = blsPublicKeyPKIDPairEntry.BLSPublicKey.ToString()
	}
	if blsPublicKeyPKIDPairEntry.PKID != nil {
		snapshotEntry.PKID = consumer.PublicKeyBytesToBase58Check((*blsPublicKeyPKIDPairEntry.PKID)[:], params)
	}

	return snapshotEntry
}

// BLSPublicKeyPKIDPairBatchOperation is the entry point for processing a batch of BLSPublicKeyPKIDPair entries.
// It determines the appropriate handler based on the operation type and executes it.
func BLSPublicKeyPKIDPairBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
Expand All @@ -62,25 +99,52 @@ func bulkInsertBLSPkidPairEntry(
) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
uniqueBLSPkidPairEntries := consumer.FilterEntriesByPrefix(
uniqueEntries, lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry)
uniqueBLSPkidPairSnapshotEntries := consumer.FilterEntriesByPrefix(
uniqueEntries, lib.Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGBLSPkidPairEntry, len(uniqueEntries))
pgBLSPkidPairEntrySlice := make([]*PGBLSPkidPairEntry, len(uniqueBLSPkidPairEntries))
pgBLSPkidPairSnapshotEntrySlice := make([]*PGBLSPublicKeyPKIDPairSnapshotEntry, len(uniqueBLSPkidPairSnapshotEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGBLSPkidPairEntry{BLSPublicKeyPKIDPairEntry: BLSPublicKeyPKIDPairEncoderToPGStruct(
for ii, entry := range uniqueBLSPkidPairEntries {
pgBLSPkidPairEntrySlice[ii] = &PGBLSPkidPairEntry{BLSPublicKeyPKIDPairEntry: BLSPublicKeyPKIDPairEncoderToPGStruct(
entry.Encoder.(*lib.BLSPublicKeyPKIDPairEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)
for ii, entry := range uniqueBLSPkidPairSnapshotEntries {
pgBLSPkidPairSnapshotEntrySlice[ii] = &PGBLSPublicKeyPKIDPairSnapshotEntry{
BLSPublicKeyPKIDPairSnapshotEntry: BLSPublicKeyPKIDPairSnapshotEncoderToPGStruct(
entry.Encoder.(*lib.BLSPublicKeyPKIDPairEntry), entry.KeyBytes, params)}
}

if len(pgBLSPkidPairEntrySlice) > 0 {
// Execute the insert query.
query := db.NewInsert().Model(&pgBLSPkidPairEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting entries")
}
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting entries")
if len(pgBLSPkidPairSnapshotEntrySlice) > 0 {
// Execute query for snapshot entries.
query := db.NewInsert().Model(&pgBLSPkidPairSnapshotEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertBLSPkidPairEntry: Error inserting snapshot entries")
}
}

return nil
}

Expand All @@ -91,15 +155,28 @@ func bulkDeleteBLSPkidPairEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)
blsPKIDPairEntryKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete,
lib.Prefixes.PrefixValidatorBLSPublicKeyPKIDPairEntry)
blsPKIDPairSnapshotEntryKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete,
lib.Prefixes.PrefixSnapshotValidatorBLSPublicKeyPKIDPairEntry)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGBLSPkidPairEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Where("badger_key IN (?)", bun.In(blsPKIDPairEntryKeysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBLSPkidPairEntry: Error deleting entries")
}

// Execute the delete query for snapshot entries.
if _, err := db.NewDelete().
Model(&PGBLSPublicKeyPKIDPairSnapshotEntry{}).
Where("badger_key IN (?)", bun.In(blsPKIDPairSnapshotEntryKeysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteBLSPkidPairEntry: Error deleting snapshot entries")
}

return nil
}
31 changes: 18 additions & 13 deletions entries/pkid.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,18 @@ func bulkInsertPkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l
pgEntrySlice[ii] = &PGLeaderScheduleEntry{LeaderScheduleEntry: *leaderScheduleEntry}
}

query := db.NewInsert().Model(&pgEntrySlice)
if len(pgEntrySlice) > 0 {
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}
if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries")
if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertPkid: Error inserting entries")
}
}

return nil
}

Expand All @@ -187,13 +190,15 @@ func bulkDeletePkid(entries []*lib.StateChangeEntry, db *bun.DB, operationType l
keysToDelete := consumer.KeysToDelete(uniqueEntries)
leaderSchedKeysToDelete := consumer.FilterKeysByPrefix(keysToDelete, lib.Prefixes.PrefixSnapshotLeaderSchedule)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLeaderScheduleEntry{}).
Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")
if len(leaderSchedKeysToDelete) > 0 {
// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLeaderScheduleEntry{}).
Where("badger_key IN (?)", bun.In(leaderSchedKeysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeletePkid: Error deleting entries")
}
}

return nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package initial_migrations

import (
"context"
"strings"

"github.com/uptrace/bun"
)

// TODO: Not nullable fields
// createBLSPublicKeyPKIDPairSnapshotEntryTable creates the table holding
// snapshotted BLS public key <-> PKID pairs. All data columns are NOT NULL,
// badger_key is the primary key, and secondary indexes cover lookups by pkid,
// bls_public_key, and snapshot_at_epoch_number. The table name is injected by
// substituting every occurrence of the {tableName} placeholder.
func createBLSPublicKeyPKIDPairSnapshotEntryTable(db *bun.DB, tableName string) error {
	// strings.ReplaceAll is the idiomatic equivalent of strings.Replace(..., -1).
	_, err := db.Exec(strings.ReplaceAll(`
CREATE TABLE {tableName} (
pkid VARCHAR NOT NULL,
bls_public_key VARCHAR NOT NULL,
snapshot_at_epoch_number BIGINT NOT NULL,

badger_key BYTEA PRIMARY KEY
);
CREATE INDEX {tableName}_pkid_idx ON {tableName} (pkid);
CREATE INDEX {tableName}_bls_public_key_idx ON {tableName} (bls_public_key);
CREATE INDEX {tableName}_snapshot_at_epoch_number_idx ON {tableName} (snapshot_at_epoch_number);
`, "{tableName}", tableName))
	return err
}

// init registers the up/down migration pair for the
// bls_public_key_pkid_pair_snapshot_entry table.
func init() {
	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
		// Up: create the snapshot table and its indexes.
		return createBLSPublicKeyPKIDPairSnapshotEntryTable(db, "bls_public_key_pkid_pair_snapshot_entry")
	}, func(ctx context.Context, db *bun.DB) error {
		// Down: drop the table if it exists. Returning err directly replaces
		// the redundant `if err != nil { return err }; return nil` pattern.
		_, err := db.Exec(`
DROP TABLE IF EXISTS bls_public_key_pkid_pair_snapshot_entry;
`)
		return err
	})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package post_sync_migrations

import (
"context"

"github.com/uptrace/bun"
)

// init registers a post-sync migration pair:
//   - up: creates the epoch_details_for_block view (mapping each block to its
//     epoch and the snapshotted proposer PKID) and, only when
//     calculateExplorerStatistics is set, the my_stake_summary,
//     staking_summary, and validator_stats materialized views.
//   - down: removes the column/table comments and drops those views again.
func init() {
	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
		// A block belongs to the epoch whose
		// [initial_block_height, final_block_height] range contains its height;
		// the proposer's PKID is then resolved through the snapshotted
		// BLS-public-key -> PKID table for that epoch's snapshot number.
		// The `comment on` statements attach @unique/@foreignKey annotations
		// for the GraphQL layer (PostGraphile-style smart comments — confirm
		// the consumer), and @omit hides badger_key from the API.
		_, err := db.Exec(`

CREATE OR REPLACE VIEW epoch_details_for_block as
select block_hash, epoch_number, bls.pkid as proposer_pkid
from block
left join epoch_entry
on epoch_entry.initial_block_height <= block.height and
epoch_entry.final_block_height >= block.height
left join bls_public_key_pkid_pair_snapshot_entry bls
on bls.snapshot_at_epoch_number = epoch_entry.snapshot_at_epoch_number and
block.proposer_voting_public_key = bls.bls_public_key;

comment on view epoch_details_for_block is E'@unique block_hash\n@unique epoch_number\n@foreignKey (block_hash) references block (block_hash)|@foreignFieldName epochDetailForBlock|@fieldName block\n@foreignKey (epoch_number) references epoch_entry (epoch_number)|@foreignFieldName blockHashesInEpoch|@fieldName epochEntry\n@foreignKey (proposer_pkid) references account (pkid)|@foreignFieldName proposedBlockHashes|@fieldName proposer';
comment on table bls_public_key_pkid_pair_snapshot_entry is E'@foreignKey (pkid) references account (pkid)|@foreignFieldName blsPublicKeyPkidPairSnapshotEntries|@fieldName account\n@foreignKey (snapshot_at_epoch_number) references epoch_entry (snapshot_at_epoch_number)|@foreignFieldName blsPublicKeyPkidPairSnapshotEntries|@fieldName epochEntry';
comment on column bls_public_key_pkid_pair_snapshot_entry.badger_key is E'@omit';
`)
		if err != nil {
			return err
		}
		// The explorer-statistics views below are optional; skip them unless
		// the flag is enabled.
		if !calculateExplorerStatistics {
			return nil
		}
		// my_stake_summary: per-staker totals of rewards and staked amount.
		// staking_summary: one-row global stake/validator/epoch summary.
		// validator_stats: per-validator rank, stake share, jail time, leader
		// schedule participation, and total rewards.
		// Each materialized view gets a unique index, which PostgreSQL
		// requires for REFRESH MATERIALIZED VIEW CONCURRENTLY.
		_, err = db.Exec(`
CREATE MATERIALIZED VIEW my_stake_summary as
select coalesce(total_stake_rewards.staker_pkid, total_stake_amount.staker_pkid) as staker_pkid,
total_stake_rewards.total_rewards as total_stake_rewards,
total_stake_amount.total_stake as total_stake
from (select staker_pkid, sum(reward_nanos) total_rewards
from stake_reward
group by staker_pkid) total_stake_rewards
full outer join
(select staker_pkid, sum(stake_amount_nanos) total_stake
from stake_entry
group by staker_pkid) total_stake_amount
on total_stake_amount.staker_pkid = total_stake_rewards.staker_pkid;

CREATE UNIQUE INDEX my_stake_summary_unique_index ON my_stake_summary (staker_pkid);

CREATE MATERIALIZED VIEW staking_summary as
select *
from (select sum(total_stake_amount_nanos) as global_stake_amount_nanos,
count(distinct validator_pkid) as num_validators
from validator_entry) validator_summary,
(select max(epoch_number) current_epoch_number from epoch_entry) current_epoch,
(select count(distinct snapshot_at_epoch_number) num_epochs_in_leader_schedule
from leader_schedule_entry) num_epochs_in_leader_schedule;

CREATE UNIQUE INDEX staking_summary_unique_index ON staking_summary (global_stake_amount_nanos, num_validators, current_epoch_number, num_epochs_in_leader_schedule);

CREATE MATERIALIZED VIEW validator_stats as
select validator_entry.validator_pkid,
rank() OVER ( order by validator_entry.total_stake_amount_nanos) as validator_rank,
validator_entry.total_stake_amount_nanos::float /
staking_summary.global_stake_amount_nanos::float as percent_total_stake,
coalesce(time_in_jail, 0) +
(case
when jailed_at_epoch_number = 0 then 0
else (staking_summary.current_epoch_number - jailed_at_epoch_number) END) epochs_in_jail,
coalesce(leader_schedule_summary.num_epochs_in_leader_schedule, 0),
coalesce(leader_schedule_summary.num_epochs_in_leader_schedule, 0)::float /
staking_summary.num_epochs_in_leader_schedule::float as percent_epochs_in_leader_schedule,
coalesce(total_rewards, 0) as total_stake_reward_nanos
from staking_summary,
validator_entry
left join (select validator_pkid, sum(jhe.unjailed_at_epoch_number - jhe.jailed_at_epoch_number) time_in_jail
from jailed_history_event jhe
group by validator_pkid) jhe
on jhe.validator_pkid = validator_entry.validator_pkid
left join (select validator_pkid, count(*) as num_epochs_in_leader_schedule
from leader_schedule_entry
group by validator_pkid) leader_schedule_summary
on leader_schedule_summary.validator_pkid = validator_entry.validator_pkid
left join (select validator_pkid, sum(reward_nanos) as total_rewards
from stake_reward
group by validator_pkid) as total_stake_rewards
on total_stake_rewards.validator_pkid = validator_entry.validator_pkid;

CREATE UNIQUE INDEX validator_stats_unique_index ON validator_stats (validator_pkid);

comment on materialized view validator_stats is E'@unique validator_pkid\n@foreignKey (validator_pkid) references validator_entry (validator_pkid)|@foreignFieldName validatorStats|@fieldName validatorEntry';
comment on materialized view my_stake_summary is E'@foreignKey (staker_pkid) references account (pkid)|@foreignFieldName myStakeSummary|@fieldName staker';

`)
		if err != nil {
			return err
		}

		return nil
	}, func(ctx context.Context, db *bun.DB) error {
		// Down: clear the smart comments and drop the epoch view.
		_, err := db.Exec(`
comment on column bls_public_key_pkid_pair_snapshot_entry.badger_key is NULL;
comment on table bls_public_key_pkid_pair_snapshot_entry is NULL;
DROP VIEW IF EXISTS epoch_details_for_block CASCADE;
`)
		if err != nil {
			return err
		}
		// Mirror the up path: the statistics views only exist when the flag
		// was enabled.
		if !calculateExplorerStatistics {
			return nil
		}
		_, err = db.Exec(`
DROP MATERIALIZED VIEW IF EXISTS validator_stats CASCADE;
DROP MATERIALIZED VIEW IF EXISTS staking_summary CASCADE;
DROP MATERIALIZED VIEW IF EXISTS my_stake_summary CASCADE;
`)
		if err != nil {
			return err
		}

		return nil
	})
}
3 changes: 3 additions & 0 deletions migrations/post_sync_migrations/migration_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ var (
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_deso_token_buy_orders", Ticker: time.NewTicker(30 * time.Minute)},
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_deso_token_sell_orders", Ticker: time.NewTicker(30 * time.Minute)},
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY statistic_profile_earnings_breakdown_counts", Ticker: time.NewTicker(30 * time.Minute)},
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY staking_summary", Ticker: time.NewTicker(1 * time.Second)},
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY my_stake_summary", Ticker: time.NewTicker(1 * time.Second)},
{Query: "REFRESH MATERIALIZED VIEW CONCURRENTLY validator_stats", Ticker: time.NewTicker(1 * time.Second)},
}
)

Expand Down