-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
92 additions
and
2,811 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,169 +1,33 @@ | ||
package transform | ||
|
||
import ( | ||
"encoding/base64" | ||
"encoding/json" | ||
"fmt" | ||
|
||
"github.com/stellar/stellar-etl/internal/toid" | ||
"github.com/stellar/stellar-etl/internal/utils" | ||
"github.com/stellar/stellar-etl/internal/xdr2json" | ||
|
||
"github.com/stellar/go/ingest" | ||
"github.com/stellar/go/strkey" | ||
diagnosticevent "github.com/stellar/go/ingest/diagnostic_event" | ||
"github.com/stellar/go/ingest/ledger" | ||
"github.com/stellar/go/xdr" | ||
) | ||
|
||
// TransformContractEvent converts a transaction's contract events and diagnostic events into a form suitable for BigQuery. | ||
// It is known that contract events are a subset of the diagnostic events XDR definition. We are opting to call all of these events | ||
// contract events for better clarity to data analytics users. | ||
func TransformContractEvent(transaction ingest.LedgerTransaction, lhe xdr.LedgerHeaderHistoryEntry) ([]ContractEventOutput, error) { | ||
ledgerHeader := lhe.Header | ||
outputTransactionHash := utils.HashToHexString(transaction.Result.TransactionHash) | ||
outputLedgerSequence := uint32(ledgerHeader.LedgerSeq) | ||
|
||
transactionIndex := uint32(transaction.Index) | ||
|
||
outputTransactionID := toid.New(int32(outputLedgerSequence), int32(transactionIndex), 0).ToInt64() | ||
|
||
outputCloseTime, err := utils.TimePointToUTCTimeStamp(ledgerHeader.ScpValue.CloseTime) | ||
if err != nil { | ||
return []ContractEventOutput{}, fmt.Errorf("for ledger %d; transaction %d (transaction id=%d): %v", outputLedgerSequence, transactionIndex, outputTransactionID, err) | ||
} | ||
|
||
// GetDiagnosticEvents will return all contract events and diagnostic events emitted | ||
contractEvents, err := transaction.GetDiagnosticEvents() | ||
if err != nil { | ||
return []ContractEventOutput{}, err | ||
} | ||
|
||
var transformedContractEvents []ContractEventOutput | ||
|
||
for _, contractEvent := range contractEvents { | ||
var outputContractId string | ||
var outputTopicsJson []interface{} | ||
var outputTopicsDecodedJson []interface{} | ||
|
||
outputInSuccessfulContractCall := contractEvent.InSuccessfulContractCall | ||
event := contractEvent.Event | ||
outputType := event.Type | ||
outputTypeString := event.Type.String() | ||
|
||
eventTopics := getEventTopics(event.Body) | ||
outputTopics, outputTopicsDecoded, err := serializeScValArray(eventTopics) | ||
if err != nil { | ||
return []ContractEventOutput{}, err | ||
} | ||
outputTopicsJson = outputTopics | ||
outputTopicsDecodedJson = outputTopicsDecoded | ||
|
||
eventData := getEventData(event.Body) | ||
outputData, outputDataDecoded, err := serializeScVal(eventData) | ||
if err != nil { | ||
return []ContractEventOutput{}, err | ||
} | ||
|
||
// Convert the xdrContactId to string | ||
// TODO: https://stellarorg.atlassian.net/browse/HUBBLE-386 this should be a stellar/go/xdr function | ||
if event.ContractId != nil { | ||
contractId := *event.ContractId | ||
contractIdByte, _ := contractId.MarshalBinary() | ||
outputContractId, _ = strkey.Encode(strkey.VersionByteContract, contractIdByte) | ||
} | ||
|
||
outputContractEventXDR, err := xdr.MarshalBase64(contractEvent) | ||
if err != nil { | ||
return []ContractEventOutput{}, err | ||
} | ||
|
||
outputTransactionID := toid.New(int32(outputLedgerSequence), int32(transactionIndex), 0).ToInt64() | ||
outputSuccessful := transaction.Result.Successful() | ||
|
||
transformedDiagnosticEvent := ContractEventOutput{ | ||
TransactionHash: outputTransactionHash, | ||
TransactionID: outputTransactionID, | ||
Successful: outputSuccessful, | ||
LedgerSequence: outputLedgerSequence, | ||
ClosedAt: outputCloseTime, | ||
InSuccessfulContractCall: outputInSuccessfulContractCall, | ||
ContractId: outputContractId, | ||
Type: int32(outputType), | ||
TypeString: outputTypeString, | ||
Topics: outputTopicsJson, | ||
TopicsDecoded: outputTopicsDecodedJson, | ||
Data: outputData, | ||
DataDecoded: outputDataDecoded, | ||
ContractEventXDR: outputContractEventXDR, | ||
} | ||
|
||
transformedContractEvents = append(transformedContractEvents, transformedDiagnosticEvent) | ||
} | ||
|
||
return transformedContractEvents, nil | ||
} | ||
|
||
// TODO this should be a stellar/go/xdr function | ||
func getEventTopics(eventBody xdr.ContractEventBody) []xdr.ScVal { | ||
switch eventBody.V { | ||
case 0: | ||
contractEventV0 := eventBody.MustV0() | ||
return contractEventV0.Topics | ||
default: | ||
panic("unsupported event body version: " + string(eventBody.V)) | ||
} | ||
} | ||
|
||
// TODO this should be a stellar/go/xdr function | ||
func getEventData(eventBody xdr.ContractEventBody) xdr.ScVal { | ||
switch eventBody.V { | ||
case 0: | ||
contractEventV0 := eventBody.MustV0() | ||
return contractEventV0.Data | ||
default: | ||
panic("unsupported event body version: " + string(eventBody.V)) | ||
} | ||
} | ||
|
||
// TODO this should also be used in the operations processor | ||
func serializeScVal(scVal xdr.ScVal) (interface{}, interface{}, error) { | ||
var serializedData, serializedDataDecoded interface{} | ||
serializedData = "n/a" | ||
serializedDataDecoded = "n/a" | ||
|
||
if _, ok := scVal.ArmForSwitch(int32(scVal.Type)); ok { | ||
var err error | ||
var raw []byte | ||
var jsonMessage json.RawMessage | ||
raw, err = scVal.MarshalBinary() | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
serializedData = base64.StdEncoding.EncodeToString(raw) | ||
jsonMessage, err = xdr2json.ConvertBytes(xdr.ScVal{}, raw) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
serializedDataDecoded = jsonMessage | ||
} | ||
|
||
return serializedData, serializedDataDecoded, nil | ||
} | ||
|
||
// TODO this should also be used in the operations processor | ||
func serializeScValArray(scVals []xdr.ScVal) ([]interface{}, []interface{}, error) { | ||
data := make([]interface{}, 0, len(scVals)) | ||
dataDecoded := make([]interface{}, 0, len(scVals)) | ||
|
||
for _, scVal := range scVals { | ||
serializedData, serializedDataDecoded, err := serializeScVal(scVal) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
data = append(data, serializedData) | ||
dataDecoded = append(dataDecoded, serializedDataDecoded) | ||
func TransformContractEvent(event xdr.DiagnosticEvent, transaction ingest.LedgerTransaction) (ContractEventOutput, error) { | ||
outputTransactionHash, _ := xdr.MarshalBase64(transaction.Hash) | ||
outputContractID, _, _ := diagnosticevent.ContractID(event) | ||
outputTopics, _ := diagnosticevent.Topics(event) | ||
outputData, _ := diagnosticevent.Topics(event) | ||
|
||
transformedDiagnosticEvent := ContractEventOutput{ | ||
TransactionHash: outputTransactionHash, | ||
TransactionID: transaction.ID(), | ||
Successful: transaction.Successful(), | ||
LedgerSequence: ledger.Sequence(transaction.Ledger), | ||
ClosedAt: ledger.ClosedAt(transaction.Ledger), | ||
InSuccessfulContractCall: diagnosticevent.Successful(event), | ||
ContractId: outputContractID, | ||
Type: diagnosticevent.Type(event), | ||
TopicsDecoded: outputTopics, | ||
DataDecoded: outputData, | ||
} | ||
|
||
return data, dataDecoded, nil | ||
return transformedDiagnosticEvent, nil | ||
} |
Oops, something went wrong.