Skip to content

Commit

Permalink
processArchive: set process status to RESULTS when oracle transaction…
Browse files Browse the repository at this point in the history
… is received

Signed-off-by: p4u <pau@dabax.net>
  • Loading branch information
p4u committed Jul 28, 2021
1 parent 771070c commit 2bb4c7f
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 22 deletions.
4 changes: 4 additions & 0 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,7 @@ func (o *Oracle) OnComputeResults(results *indexertypes.Results, proc *indexerty
}
log.Infof("oracle transaction sent, hash:%x", res.Hash)
}

// OnOracleResults does nothing. Required for implementing the scrutinizer EventListener interface
func (o *Oracle) OnOracleResults(procResults *models.ProcessResult, pid []byte, height uint32) {
}
2 changes: 1 addition & 1 deletion vochain/censusdownloader/censusdownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,6 @@ func (c *CensusDownloader) OnProcessStatusChange(pid []byte,
}

func (c *CensusDownloader) OnProcessResults(pid []byte,
results []*models.QuestionResult, txindex int32) error {
results *models.ProcessResult, txindex int32) error {
return nil
}
2 changes: 1 addition & 1 deletion vochain/keykeeper/keykeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (k *KeyKeeper) OnRevealKeys(pid []byte, priv, rev string, txindex int32) {

// OnProcessResults does nothing
func (k *KeyKeeper) OnProcessResults(pid []byte,
results []*models.QuestionResult, txindex int32) error {
results *models.ProcessResult, txindex int32) error {
// do nothing
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion vochain/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (v *State) SetProcessResults(pid []byte, result *models.ProcessResult, comm
}
// Call event listeners
for _, l := range v.eventListeners {
if err := l.OnProcessResults(process.ProcessId, result.Votes, v.TxCounter()); err != nil {
if err := l.OnProcessResults(process.ProcessId, result, v.TxCounter()); err != nil {
log.Warnf("onProcessResults callback error: %v", err)
}
}
Expand Down
73 changes: 70 additions & 3 deletions vochain/processarchive/processarchive.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.vocdoni.io/dvote/types"
"go.vocdoni.io/dvote/vochain/scrutinizer"
"go.vocdoni.io/dvote/vochain/scrutinizer/indexertypes"
"go.vocdoni.io/proto/build/go/models"
)

type ProcessArchive struct {
Expand Down Expand Up @@ -58,6 +59,21 @@ func (js *jsonStorage) AddProcess(p *Process) error {
return os.WriteFile(filepath.Join(js.datadir, fmt.Sprintf("%x", p.ProcessInfo.ID)), data, 0o644)
}

// GetProcess retreives a process from the js storage
func (js *jsonStorage) GetProcess(pid []byte) (*Process, error) {
if len(pid) != types.ProcessIDsize {
return nil, fmt.Errorf("process not valid")
}
js.lock.Lock()
defer js.lock.Unlock()
data, err := os.ReadFile(filepath.Join(js.datadir, fmt.Sprintf("%x", pid)))
if err != nil {
return nil, err
}
p := &Process{}
return p, json.Unmarshal(data, p)
}

// ProcessExist returns true if a process already existin in the storage
func (js *jsonStorage) ProcessExist(pid []byte) (bool, error) {
js.lock.Lock()
Expand Down Expand Up @@ -157,14 +173,27 @@ func (pa *ProcessArchive) ProcessScan(fromBlock int) error {
return nil
}

// OnComputeResults implements the indexer event callback
// OnComputeResults implements the indexer event callback.
// On this event the results are set always and the process info only if it
// does not exist yet in the json storage.
func (pa *ProcessArchive) OnComputeResults(results *indexertypes.Results,
proc *indexertypes.Process, height uint32) {
if err := pa.storage.AddProcess(&Process{ProcessInfo: proc, Results: results}); err != nil {
// Get the process (if exist)
jsProc, err := pa.storage.GetProcess(results.ProcessID)
if err != nil {
if os.IsNotExist(err) { // if it does not exist yet, we create it
jsProc = &Process{ProcessInfo: proc, Results: results}
} else {
log.Errorf("cannot get json store process: %v", err)
return
}
}
jsProc.Results = results
if err := pa.storage.AddProcess(jsProc); err != nil {
log.Errorf("cannot add json process: %v", err)
return
}
log.Infof("stored json process %x", proc.ID)
log.Infof("stored json process %x for compute results event", proc.ID)

// send publish signal
log.Debugf("sending archive publish signal for height %d", height)
Expand All @@ -174,6 +203,44 @@ func (pa *ProcessArchive) OnComputeResults(results *indexertypes.Results,
}
}

// OnOracleResults implements the indexer event callback.
// On this event the process status is set to Results.
func (pa *ProcessArchive) OnOracleResults(oracleResults *models.ProcessResult, pid []byte, height uint32) {
jsProc, err := pa.storage.GetProcess(pid)
if err != nil {
if os.IsNotExist(err) { // if it does not exist yet, we create it
proc, err := pa.indexer.ProcessInfo(pid)
if err != nil {
log.Errorf("cannot get process info %x from indexer: %v", pid, err)
return
}
jsProc = &Process{ProcessInfo: proc, Results: nil}
} else {
log.Errorf("cannot get json store process: %v", err)
return
}
}
// Ensure the status is set to RESULTS since OnOracleResults event is called on setProcessResultsTx
jsProc.ProcessInfo.Status = int32(models.ProcessStatus_RESULTS)
jsProc.ProcessInfo.FinalResults = true
// TODO: add signatures from oracles
//jsProc.Results.Signatures = append(jsProc.results.Signatures, oracleResults.Signature)

// Store the process
if err := pa.storage.AddProcess(jsProc); err != nil {
log.Errorf("cannot add json process: %v", err)
return
}
log.Infof("stored json process %x for oracle results transaction event", pid)

// Send publish signal
log.Debugf("sending archive publish signal for height %d", height)
select {
case pa.publish <- true:
default: // do nothing
}
}

// Close closes the process archive
func (pa *ProcessArchive) Close() {
pa.close <- true
Expand Down
44 changes: 30 additions & 14 deletions vochain/scrutinizer/scrutinizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ const (
// events of the tally of a process.
type EventListener interface {
OnComputeResults(results *indexertypes.Results, process *indexertypes.Process, height uint32)
OnOracleResults(oracleResults *models.ProcessResult, pid []byte, height uint32)
}

// AddEventListener adds a new event listener, to receive method calls on block
// events as documented in EventListener.
func (s *Scrutinizer) AddEventListener(l EventListener) {
s.eventListeners = append(s.eventListeners, l)
s.eventOnResults = append(s.eventOnResults, l)
}

// Scrutinizer is the component which makes the accounting of the voting processes
Expand All @@ -60,8 +61,8 @@ type Scrutinizer struct {
newTxPool []*indexertypes.TxReference
// list of live processes (those on which the votes will be computed on arrival)
liveResultsProcs sync.Map
// eventListeners is the list of external callbacks that will be executed by the scrutinizer
eventListeners []EventListener
// eventOnResults is the list of external callbacks that will be executed by the scrutinizer
eventOnResults []EventListener
db *badgerhold.Store
// envelopeHeightCache and countTotalEnvelopes are in memory counters that helps reducing the
// access time when GenEnvelopeHeight() is called.
Expand Down Expand Up @@ -481,9 +482,14 @@ func (s *Scrutinizer) OnRevealKeys(pid []byte, priv, reveal string, txIndex int3
s.updateProcessPool = append(s.updateProcessPool, pid)
}

// OnProcessResults verifies the results for a process and appends it to the updateProcessPool
func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionResult,
// OnProcessResults verifies the results for a process and appends it to the updateProcessPool
func (s *Scrutinizer) OnProcessResults(pid []byte, results *models.ProcessResult,
txIndex int32) error {
// Execute callbacks
for _, l := range s.eventOnResults {
go l.OnOracleResults(results, pid, s.App.Height())
}

// We don't execute any action if the blockchain is being syncronized
if s.App.IsSynchronizing() {
return nil
Expand All @@ -498,6 +504,10 @@ func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionRes
// This code must be run async in order to not delay the consensus. The results retreival
// could require some time.
go func() {
if results == nil || results.Votes == nil {
log.Errorf("results are nil")
return
}
var myResults *indexertypes.Results
var err error
retries := 50
Expand All @@ -519,24 +529,30 @@ func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionRes
return
}

myVotes := BuildProcessResult(myResults, nil).GetVotes()
if len(myVotes) != len(results) {
log.Errorf("results validation failed: wrong result questions size")
return
}
for i, q := range results {
myVotes := BuildProcessResult(myResults, results.EntityId).GetVotes()
correct := len(myVotes) != len(results.Votes)
for i, q := range results.GetVotes() {
if !correct {
break
}
if len(q.Question) != len(myVotes[i].Question) {
log.Errorf("results validation failed: wrong question size")
return
correct = false
break
}
for j, v := range q.Question {
if !bytes.Equal(v, myVotes[i].Question[j]) {
log.Errorf("results validation failed: wrong question result")
return
correct = false
break
}
}
}
log.Infof("published results for process %x are correct!", pid)
if correct {
log.Infof("published results for process %x are correct", pid)
} else {
log.Warnf("published results for process %x are not correct", pid)
}
}()
s.updateProcessPool = append(s.updateProcessPool, pid)
return nil
Expand Down
2 changes: 1 addition & 1 deletion vochain/scrutinizer/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Scrutinizer) ComputeResult(processID []byte) error {
}

// Execute callbacks
for _, l := range s.eventListeners {
for _, l := range s.eventOnResults {
go l.OnComputeResults(results, p, height)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion vochain/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type EventListener interface {
OnCancel(pid []byte, txIndex int32)
OnProcessKeys(pid []byte, encryptionPub, commitment string, txIndex int32)
OnRevealKeys(pid []byte, encryptionPriv, reveal string, txIndex int32)
OnProcessResults(pid []byte, results []*models.QuestionResult, txIndex int32) error
OnProcessResults(pid []byte, results *models.ProcessResult, txIndex int32) error
Commit(height uint32) (err error)
Rollback()
}
Expand Down

0 comments on commit 2bb4c7f

Please sign in to comment.