Skip to content

Commit e77355c

Browse files
author
Lazy Nina
committed
Add StakeEntry, LockedStakeEntry, and ValidatorEntry. Update Dockerfile for pos
1 parent 15bf118 commit e77355c

7 files changed

Lines changed: 504 additions & 0 deletions

entries/locked_stake.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package entries
2+
3+
import (
4+
"context"
5+
"github.com/deso-protocol/core/lib"
6+
"github.com/deso-protocol/state-consumer/consumer"
7+
"github.com/pkg/errors"
8+
"github.com/uptrace/bun"
9+
"github.com/uptrace/bun/extra/bunbig"
10+
)
11+
12+
// TODO: when to use nullzero vs use_zero?
13+
type LockedStakeEntry struct {
14+
StakerPKID string `bun:",nullzero"`
15+
ValidatorPKID string `bun:",nullzero"`
16+
LockedAmountNanos *bunbig.Int `pg:",use_zero"`
17+
LockedAtEpochNumber uint64
18+
19+
ExtraData map[string]string `bun:"type:jsonb"`
20+
BadgerKey []byte `pg:",pk,use_zero"`
21+
}
22+
23+
type PGLockedStakeEntry struct {
24+
bun.BaseModel `bun:"table:locked_stake_entry"`
25+
LockedStakeEntry
26+
}
27+
28+
// TODO: Do I need this?
29+
type PGLockedStakeEntryUtxoOps struct {
30+
bun.BaseModel `bun:"table:locked_stake_entry_utxo_ops"`
31+
LockedStakeEntry
32+
UtxoOperation
33+
}
34+
35+
// Convert the LockedStakeEntry DeSo encoder to the PGLockedStakeEntry struct used by bun.
36+
func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyBytes []byte, params *lib.DeSoParams) LockedStakeEntry {
37+
pgLockedStakeEntry := LockedStakeEntry{
38+
ExtraData: consumer.ExtraDataBytesToString(lockedStakeEntry.ExtraData),
39+
BadgerKey: keyBytes,
40+
}
41+
42+
if lockedStakeEntry.StakerPKID != nil {
43+
pgLockedStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.StakerPKID)[:], params)
44+
}
45+
46+
if lockedStakeEntry.ValidatorPKID != nil {
47+
pgLockedStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.ValidatorPKID)[:], params)
48+
}
49+
50+
pgLockedStakeEntry.LockedAtEpochNumber = lockedStakeEntry.LockedAtEpochNumber
51+
pgLockedStakeEntry.LockedAmountNanos = bunbig.FromMathBig(lockedStakeEntry.LockedAmountNanos.ToBig())
52+
53+
return pgLockedStakeEntry
54+
}
55+
56+
// LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries.
57+
// It determines the appropriate handler based on the operation type and executes it.
58+
func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
59+
// We check before we call this function that there is at least one operation type.
60+
// We also ensure before this that all entries have the same operation type.
61+
operationType := entries[0].OperationType
62+
var err error
63+
if operationType == lib.DbOperationTypeDelete {
64+
err = bulkDeleteLockedStakeEntry(entries, db, operationType)
65+
} else {
66+
err = bulkInsertLockedStakeEntry(entries, db, operationType, params)
67+
}
68+
if err != nil {
69+
return errors.Wrapf(err, "entries.LockedStakeBatchOperation: Problem with operation type %v", operationType)
70+
}
71+
return nil
72+
}
73+
74+
// bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database.
75+
func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
76+
// Track the unique entries we've inserted so we don't insert the same entry twice.
77+
uniqueEntries := consumer.UniqueEntries(entries)
78+
// Create a new array to hold the bun struct.
79+
pgEntrySlice := make([]*PGLockedStakeEntry, len(uniqueEntries))
80+
81+
// Loop through the entries and convert them to PGEntry.
82+
for ii, entry := range uniqueEntries {
83+
pgEntrySlice[ii] = &PGLockedStakeEntry{LockedStakeEntry: LockedStakeEncoderToPGStruct(entry.Encoder.(*lib.LockedStakeEntry), entry.KeyBytes, params)}
84+
}
85+
86+
// Execute the insert query.
87+
query := db.NewInsert().Model(&pgEntrySlice)
88+
89+
if operationType == lib.DbOperationTypeUpsert {
90+
query = query.On("CONFLICT (badger_key) DO UPDATE")
91+
}
92+
93+
if _, err := query.Returning("").Exec(context.Background()); err != nil {
94+
return errors.Wrapf(err, "entries.bulkInsertLockedStakeEntry: Error inserting entries")
95+
}
96+
return nil
97+
}
98+
99+
// bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database.
100+
func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
101+
// Track the unique entries we've inserted so we don't insert the same entry twice.
102+
uniqueEntries := consumer.UniqueEntries(entries)
103+
104+
// Transform the entries into a list of keys to delete.
105+
keysToDelete := consumer.KeysToDelete(uniqueEntries)
106+
107+
// Execute the delete query.
108+
if _, err := db.NewDelete().
109+
Model(&PGLockedStakeEntry{}).
110+
Where("badger_key IN (?)", bun.In(keysToDelete)).
111+
Returning("").
112+
Exec(context.Background()); err != nil {
113+
return errors.Wrapf(err, "entries.bulkDeleteLockedStakeEntry: Error deleting entries")
114+
}
115+
116+
return nil
117+
}

entries/stake.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package entries
2+
3+
import (
4+
"context"
5+
"github.com/deso-protocol/core/lib"
6+
"github.com/deso-protocol/state-consumer/consumer"
7+
"github.com/pkg/errors"
8+
"github.com/uptrace/bun"
9+
"github.com/uptrace/bun/extra/bunbig"
10+
)
11+
12+
// TODO: when to use nullzero vs use_zero?
13+
type StakeEntry struct {
14+
StakerPKID string `bun:",nullzero"`
15+
ValidatorPKID string `bun:",nullzero"`
16+
RewardMethod lib.StakingRewardMethod // TODO: we probably want this to be human readable?
17+
StakeAmountNanos *bunbig.Int `pg:",use_zero"`
18+
19+
ExtraData map[string]string `bun:"type:jsonb"`
20+
BadgerKey []byte `pg:",pk,use_zero"`
21+
}
22+
23+
type PGStakeEntry struct {
24+
bun.BaseModel `bun:"table:stake_entry"`
25+
StakeEntry
26+
}
27+
28+
// TODO: Do I need this?
29+
type PGStakeEntryUtxoOps struct {
30+
bun.BaseModel `bun:"table:stake_entry_utxo_ops"`
31+
StakeEntry
32+
UtxoOperation
33+
}
34+
35+
// Convert the StakeEntry DeSo encoder to the PGStakeEntry struct used by bun.
36+
func StakeEncoderToPGStruct(stakeEntry *lib.StakeEntry, keyBytes []byte, params *lib.DeSoParams) StakeEntry {
37+
pgStakeEntry := StakeEntry{
38+
ExtraData: consumer.ExtraDataBytesToString(stakeEntry.ExtraData),
39+
BadgerKey: keyBytes,
40+
}
41+
42+
if stakeEntry.StakerPKID != nil {
43+
pgStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.StakerPKID)[:], params)
44+
}
45+
46+
if stakeEntry.ValidatorPKID != nil {
47+
pgStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.ValidatorPKID)[:], params)
48+
}
49+
50+
pgStakeEntry.RewardMethod = stakeEntry.RewardMethod
51+
pgStakeEntry.StakeAmountNanos = bunbig.FromMathBig(stakeEntry.StakeAmountNanos.ToBig())
52+
53+
return pgStakeEntry
54+
}
55+
56+
// StakeBatchOperation is the entry point for processing a batch of Stake entries.
57+
// It determines the appropriate handler based on the operation type and executes it.
58+
func StakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
59+
// We check before we call this function that there is at least one operation type.
60+
// We also ensure before this that all entries have the same operation type.
61+
operationType := entries[0].OperationType
62+
var err error
63+
if operationType == lib.DbOperationTypeDelete {
64+
err = bulkDeleteStakeEntry(entries, db, operationType)
65+
} else {
66+
err = bulkInsertStakeEntry(entries, db, operationType, params)
67+
}
68+
if err != nil {
69+
return errors.Wrapf(err, "entries.StakeBatchOperation: Problem with operation type %v", operationType)
70+
}
71+
return nil
72+
}
73+
74+
// bulkInsertStakeEntry inserts a batch of stake entries into the database.
75+
func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
76+
// Track the unique entries we've inserted so we don't insert the same entry twice.
77+
uniqueEntries := consumer.UniqueEntries(entries)
78+
// Create a new array to hold the bun struct.
79+
pgEntrySlice := make([]*PGStakeEntry, len(uniqueEntries))
80+
81+
// Loop through the entries and convert them to PGEntry.
82+
for ii, entry := range uniqueEntries {
83+
pgEntrySlice[ii] = &PGStakeEntry{StakeEntry: StakeEncoderToPGStruct(entry.Encoder.(*lib.StakeEntry), entry.KeyBytes, params)}
84+
}
85+
86+
// Execute the insert query.
87+
query := db.NewInsert().Model(&pgEntrySlice)
88+
89+
if operationType == lib.DbOperationTypeUpsert {
90+
query = query.On("CONFLICT (badger_key) DO UPDATE")
91+
}
92+
93+
if _, err := query.Returning("").Exec(context.Background()); err != nil {
94+
return errors.Wrapf(err, "entries.bulkInsertStakeEntry: Error inserting entries")
95+
}
96+
return nil
97+
}
98+
99+
// bulkDeleteStakeEntry deletes a batch of stake entries from the database.
100+
func bulkDeleteStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
101+
// Track the unique entries we've inserted so we don't insert the same entry twice.
102+
uniqueEntries := consumer.UniqueEntries(entries)
103+
104+
// Transform the entries into a list of keys to delete.
105+
keysToDelete := consumer.KeysToDelete(uniqueEntries)
106+
107+
// Execute the delete query.
108+
if _, err := db.NewDelete().
109+
Model(&PGStakeEntry{}).
110+
Where("badger_key IN (?)", bun.In(keysToDelete)).
111+
Returning("").
112+
Exec(context.Background()); err != nil {
113+
return errors.Wrapf(err, "entries.bulkDeleteStakeEntry: Error deleting entries")
114+
}
115+
116+
return nil
117+
}

entries/validator.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package entries
2+
3+
import (
4+
"context"
5+
"github.com/deso-protocol/core/lib"
6+
"github.com/deso-protocol/state-consumer/consumer"
7+
"github.com/pkg/errors"
8+
"github.com/uptrace/bun"
9+
"github.com/uptrace/bun/extra/bunbig"
10+
)
11+
12+
// TODO: when to use nullzero vs use_zero?
13+
type ValidatorEntry struct {
14+
ValidatorPKID string `bun:",nullzero"`
15+
Domains []string `bun:",nullzero"`
16+
DisableDelegatedStake bool
17+
DelegatedStakeCommissionBasisPoints uint64
18+
VotingPublicKey string `bun:",nullzero"`
19+
VotingAuthorization string `bun:",nullzero"`
20+
// Use bunbig.Int to store the balance as a numeric in the pg database.
21+
TotalStakeAmountNanos *bunbig.Int `pg:",use_zero"`
22+
LastActiveAtEpochNumber uint64
23+
JailedAtEpochNumber uint64
24+
25+
ExtraData map[string]string `bun:"type:jsonb"`
26+
BadgerKey []byte `pg:",pk,use_zero"`
27+
}
28+
29+
type PGValidatorEntry struct {
30+
bun.BaseModel `bun:"table:validator_entry"`
31+
ValidatorEntry
32+
}
33+
34+
// TODO: Do I need this?
35+
type PGValidatorEntryUtxoOps struct {
36+
bun.BaseModel `bun:"table:validator_entry_utxo_ops"`
37+
ValidatorEntry
38+
UtxoOperation
39+
}
40+
41+
// Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun.
42+
func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry {
43+
pgValidatorEntry := ValidatorEntry{
44+
ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData),
45+
BadgerKey: keyBytes,
46+
}
47+
48+
if validatorEntry.ValidatorPKID != nil {
49+
pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params)
50+
}
51+
52+
if validatorEntry.Domains != nil {
53+
pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains))
54+
for ii, domain := range validatorEntry.Domains {
55+
pgValidatorEntry.Domains[ii] = string(domain)
56+
}
57+
}
58+
59+
pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake
60+
pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints
61+
62+
if validatorEntry.VotingPublicKey != nil {
63+
pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString()
64+
}
65+
66+
if validatorEntry.VotingAuthorization != nil {
67+
pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString()
68+
}
69+
70+
pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig())
71+
pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber
72+
pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber
73+
74+
return pgValidatorEntry
75+
}
76+
77+
// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
78+
// It determines the appropriate handler based on the operation type and executes it.
79+
func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
80+
// We check before we call this function that there is at least one operation type.
81+
// We also ensure before this that all entries have the same operation type.
82+
operationType := entries[0].OperationType
83+
var err error
84+
if operationType == lib.DbOperationTypeDelete {
85+
err = bulkDeleteValidatorEntry(entries, db, operationType)
86+
} else {
87+
err = bulkInsertValidatorEntry(entries, db, operationType, params)
88+
}
89+
if err != nil {
90+
return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)
91+
}
92+
return nil
93+
}
94+
95+
// bulkInsertValidatorEntry inserts a batch of validator entries into the database.
96+
func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
97+
// Track the unique entries we've inserted so we don't insert the same entry twice.
98+
uniqueEntries := consumer.UniqueEntries(entries)
99+
// Create a new array to hold the bun struct.
100+
pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries))
101+
102+
// Loop through the entries and convert them to PGEntry.
103+
for ii, entry := range uniqueEntries {
104+
pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
105+
}
106+
107+
// Execute the insert query.
108+
query := db.NewInsert().Model(&pgEntrySlice)
109+
110+
if operationType == lib.DbOperationTypeUpsert {
111+
query = query.On("CONFLICT (badger_key) DO UPDATE")
112+
}
113+
114+
if _, err := query.Returning("").Exec(context.Background()); err != nil {
115+
return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries")
116+
}
117+
return nil
118+
}
119+
120+
// bulkDeleteValidatorEntry deletes a batch of validator entries from the database.
121+
func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
122+
// Track the unique entries we've inserted so we don't insert the same entry twice.
123+
uniqueEntries := consumer.UniqueEntries(entries)
124+
125+
// Transform the entries into a list of keys to delete.
126+
keysToDelete := consumer.KeysToDelete(uniqueEntries)
127+
128+
// Execute the delete query.
129+
if _, err := db.NewDelete().
130+
Model(&PGValidatorEntry{}).
131+
Where("badger_key IN (?)", bun.In(keysToDelete)).
132+
Returning("").
133+
Exec(context.Background()); err != nil {
134+
return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
135+
}
136+
137+
return nil
138+
}

handler/data_handler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
7474
err = entries.BlockBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
7575
case lib.EncoderTypeTxn:
7676
err = entries.TransactionBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
77+
case lib.EncoderTypeStakeEntry:
78+
err = entries.StakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
79+
case lib.EncoderTypeValidatorEntry:
80+
err = entries.ValidatorBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
81+
case lib.EncoderTypeLockedStakeEntry:
82+
err = entries.LockedStakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
7783
}
7884

7985
if err != nil {

0 commit comments

Comments
 (0)