Skip to content

Commit

Permalink
Add tests for Solana spl swap
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeutin-ledger committed Jan 23, 2025
1 parent 50e2c39 commit dfe98c1
Show file tree
Hide file tree
Showing 98 changed files with 493 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/reusable_swap_functional_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ on:

branch_for_solana:
required: false
default: 'develop'
default: 'fbe/revamp_spl_send_and_swap'
type: string
branch_for_solana_nanos:
required: false
Expand Down
4 changes: 4 additions & 0 deletions test/python/apps/cal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from .bitcoin import BTC_PACKED_DERIVATION_PATH, BTC_CONF
from .stellar import XLM_PACKED_DERIVATION_PATH, XLM_CONF
from .solana_utils import SOL_PACKED_DERIVATION_PATH, SOL_CONF
from .solana_utils import JUP_PACKED_DERIVATION_PATH, JUP_CONF
from .solana_utils import SOL_USDC_PACKED_DERIVATION_PATH, SOL_USDC_CONF
from .xrp import XRP_PACKED_DERIVATION_PATH, XRP_CONF
from .tezos import XTZ_PACKED_DERIVATION_PATH, XTZ_CONF
from .polkadot import DOT_PACKED_DERIVATION_PATH, DOT_CONF
Expand Down Expand Up @@ -42,6 +44,8 @@ def get_conf_for_ticker(self, overload_signer: Optional[SigningAuthority]=None)
LTC_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="LTC", conf=LTC_CONF, packed_derivation_path=LTC_PACKED_DERIVATION_PATH)
XLM_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="XLM", conf=XLM_CONF, packed_derivation_path=XLM_PACKED_DERIVATION_PATH)
SOL_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="SOL", conf=SOL_CONF, packed_derivation_path=SOL_PACKED_DERIVATION_PATH)
JUP_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="JUP", conf=JUP_CONF, packed_derivation_path=JUP_PACKED_DERIVATION_PATH)
SOL_USDC_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="USDC", conf=SOL_USDC_CONF, packed_derivation_path=SOL_USDC_PACKED_DERIVATION_PATH)
XRP_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="XRP", conf=XRP_CONF, packed_derivation_path=XRP_PACKED_DERIVATION_PATH)
XTZ_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="XTZ", conf=XTZ_CONF, packed_derivation_path=XTZ_PACKED_DERIVATION_PATH)
BNB_CURRENCY_CONFIGURATION = CurrencyConfiguration(ticker="BNB", conf=BSC_CONF, packed_derivation_path=BSC_PACKED_DERIVATION_PATH)
Expand Down
8 changes: 8 additions & 0 deletions test/python/apps/keychain/trusted_name.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-----BEGIN EC PARAMETERS-----
BgUrgQQACg==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MHQCAQEEIHfwyko1dEHTTQ7es7EUy2ajZo1IRRcEC8/9b+MDOzUaoAcGBSuBBAAK
oUQDQgAEuR++wXPjukpxTgFOvIJ7b4man6f0rHac3ihDF6APT2UPCfCapP9aMXYC
Vf5d/IETKbO1C+mRlPyhFhnmXy7f6g==
-----END EC PRIVATE KEY-----
150 changes: 138 additions & 12 deletions test/python/apps/solana.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import List, Generator
from typing import List, Generator, Optional
from enum import IntEnum
from contextlib import contextmanager

from ragger.backend.interface import BackendInterface, RAPDU
from ragger.firmware import Firmware
from ragger.error import ExceptionRAPDU

from .solana_tlv import FieldTag, format_tlv
from .solana_keychain import Key, sign_data

class INS(IntEnum):
# DEPRECATED - Use non "16" suffixed variants below
Expand All @@ -15,6 +19,8 @@ class INS(IntEnum):
INS_GET_PUBKEY = 0x05
INS_SIGN_MESSAGE = 0x06
INS_SIGN_OFFCHAIN_MESSAGE = 0x07
INS_GET_CHALLENGE = 0x20
INS_TRUSTED_INFO = 0x21


CLA = 0xE0
Expand Down Expand Up @@ -69,12 +75,108 @@ def _extend_and_serialize_multiple_derivations_paths(derivations_paths: List[byt
serialized += derivations_path
return serialized

class StatusWord(IntEnum):
OK = 0x9000
ERROR_NO_INFO = 0x6a00
INVALID_DATA = 0x6a80
INSUFFICIENT_MEMORY = 0x6a84
INVALID_INS = 0x6d00
INVALID_P1_P2 = 0x6b00
CONDITION_NOT_SATISFIED = 0x6985
REF_DATA_NOT_FOUND = 0x6a88
EXCEPTION_OVERFLOW = 0x6807
NOT_IMPLEMENTED = 0x911c

class PKIClient:
_CLA: int = 0xB0
_INS: int = 0x06

def __init__(self, client: BackendInterface) -> None:
self._client = client

def send_certificate(self, payload: bytes) -> RAPDU:
response = self.send_raw(payload)

def send_raw(self, payload: bytes) -> RAPDU:
header = bytearray()
header.append(self._CLA)
header.append(self._INS)
header.append(0x04) # PubKeyUsage = 0x04
header.append(0x00)
header.append(len(payload))
return self._client.exchange_raw(header + payload)

class SolanaClient:
client: BackendInterface

def __init__(self, client: BackendInterface):
self._client = client
self._pki_client: Optional[PKIClient] = None
if self._client.firmware != Firmware.NANOS:
# LedgerPKI not supported on Nanos
self._pki_client = PKIClient(self._client)

def _exchange_split(self, cla: int, ins: int, p1: int, payload: bytes) -> RAPDU:
payload_split = [payload[x:x + MAX_CHUNK_SIZE] for x in range(0, len(payload), MAX_CHUNK_SIZE)]
for i, p in enumerate(payload_split):
p2 = P2_NONE
# Send all chunks with P2_MORE except for the last chunk
if i != len(payload_split) - 1:
p2 |= P2_MORE
# Send all chunks with P2_EXTEND except for the first chunk
if i != 0:
p2 |= P2_EXTEND
rapdu = self._client.exchange(CLA, ins=ins, p1=p1, p2=p2, data=p)

return rapdu

def provide_trusted_name(self,
source_contract: bytes,
trusted_name: bytes,
address: bytes,
chain_id: int,
challenge: Optional[int] = None):

payload = format_tlv(FieldTag.TAG_STRUCTURE_TYPE, 3)
payload += format_tlv(FieldTag.TAG_VERSION, 2)
payload += format_tlv(FieldTag.TAG_TRUSTED_NAME_TYPE, 0x06)
payload += format_tlv(FieldTag.TAG_TRUSTED_NAME_SOURCE, 0x06)
payload += format_tlv(FieldTag.TAG_TRUSTED_NAME, trusted_name)
payload += format_tlv(FieldTag.TAG_CHAIN_ID, chain_id)
payload += format_tlv(FieldTag.TAG_ADDRESS, address)
payload += format_tlv(FieldTag.TAG_TRUSTED_NAME_SOURCE_CONTRACT, source_contract)
if challenge is not None:
payload += format_tlv(FieldTag.TAG_CHALLENGE, challenge)
payload += format_tlv(FieldTag.TAG_SIGNER_KEY_ID, 0) # test key
payload += format_tlv(FieldTag.TAG_SIGNER_ALGO, 1) # secp256k1
payload += format_tlv(FieldTag.TAG_DER_SIGNATURE,
sign_data(Key.TRUSTED_NAME, payload))

# send PKI certificate
if self._pki_client is None:
print(f"Ledger-PKI Not supported on '{self._client.firmware.name}'")
else:
# pylint: disable=line-too-long
if self._client.firmware == Firmware.NANOSP:
cert_apdu = "01010102010211040000000212010013020002140101160400000000200C547275737465645F4E616D6530020004310104320121332102B91FBEC173E3BA4A714E014EBC827B6F899A9FA7F4AC769CDE284317A00F4F6534010135010315473045022100D494B106E217B46BB90BF20A4E9285529C4C8382D9B80FF462F74942579785F802202D68D0F85CD7CA36BDF351FD41332F310E93163BD175F6A92446C14A3329CC8B" # noqa: E501
elif self._client.firmware == Firmware.NANOX:
cert_apdu = "01010102010211040000000212010013020002140101160400000000200C547275737465645F4E616D6530020004310104320121332102B91FBEC173E3BA4A714E014EBC827B6F899A9FA7F4AC769CDE284317A00F4F653401013501021546304402207FCD665B94B43A6E838E8CD68BE52403D38A7E6A98E2CE291AB1C5D24A41101D02207AB1863E5CB127D9E8A680AC63FF2F2CBEA79CE76652A72832EF154BF1AD6477" # noqa: E501
elif self._client.firmware == Firmware.STAX:
cert_apdu = "01010102010211040000000212010013020002140101160400000000200C547275737465645F4E616D6530020004310104320121332102B91FBEC173E3BA4A714E014EBC827B6F899A9FA7F4AC769CDE284317A00F4F65340101350104154730450221008F8FB0117C8D51F0D13A77680C18CA98B4B317C3D6C67F23BF9198410BEDF1A1022023B1052CA43E86E2411831990C64B1E027D85E142AD39F480948E3EF9517E55E" # noqa: E501
elif self._client.firmware == Firmware.FLEX:
cert_apdu = "01010102010211040000000212010013020002140101160400000000200C547275737465645F4E616D6530020004310104320121332102B91FBEC173E3BA4A714E014EBC827B6F899A9FA7F4AC769CDE284317A00F4F6534010135010515473045022100CEF28780DCAFA3A485D83406D519F9AC12FD9B9C3AA7AE798896013F07DD178D022020F01B1AB1D2AAEDA70357F615EAC55E17FE94EC36DF9DE850CEFACBC98D16C8" # noqa: E501
# pylint: enable=line-too-long

self._pki_client.send_certificate(bytes.fromhex(cert_apdu))

# send TLV trusted info
# res: RAPDU = self._client.exchange(CLA, INS.INS_TRUSTED_INFO, P1_NON_CONFIRM, P2_NONE, payload)
self._exchange_split(CLA, INS.INS_TRUSTED_INFO, P1_NON_CONFIRM, payload)


def get_challenge(self) -> bytes:
challenge: RAPDU = self._client.exchange(CLA, INS.INS_GET_CHALLENGE,P1_NON_CONFIRM, P2_NONE)
return challenge.data


def get_public_key(self, derivation_path: bytes) -> bytes:
Expand All @@ -85,24 +187,32 @@ def get_public_key(self, derivation_path: bytes) -> bytes:
return public_key.data


def split_and_prefix_message(self, derivation_path : bytes, message: bytes) -> List[bytes]:
@contextmanager
def send_public_key_with_confirm(self, derivation_path: bytes) -> bytes:
with self._client.exchange_async(CLA, INS.INS_GET_PUBKEY,
P1_CONFIRM, P2_NONE,
derivation_path):
yield


def split_and_prefix_message(self, derivation_path: bytes, message: bytes) -> List[bytes]:
assert len(message) <= 65535, "Message to send is too long"
header: bytes = _extend_and_serialize_multiple_derivations_paths([derivation_path])
# Check to see if this data needs to be split up and sent in chunks.
max_size = MAX_CHUNK_SIZE - len(header)
max_size = MAX_CHUNK_SIZE
message_splited = [message[x:x + max_size] for x in range(0, len(message), max_size)]
# Add the header to every chunk
return [header + s for s in message_splited]
# The first chunk is the header, then all chunks with max size
return [header] + message_splited


def send_first_message_batch(self, messages: List[bytes], p1: int) -> RAPDU:
self._client.exchange(CLA, INS.INS_SIGN_MESSAGE, p1, P2_MORE, messages[0])
def send_first_message_batch(self, ins: INS, messages: List[bytes], p1: int) -> RAPDU:
self._client.exchange(CLA, ins, p1, P2_MORE, messages[0])
for m in messages[1:]:
self._client.exchange(CLA, INS.INS_SIGN_MESSAGE, p1, P2_MORE | P2_EXTEND, m)
self._client.exchange(CLA, ins, p1, P2_MORE | P2_EXTEND, m)


@contextmanager
def send_async_sign_message(self,
def send_async_sign_request(self,
ins: INS,
derivation_path : bytes,
message: bytes) -> Generator[None, None, None]:
message_splited_prefixed = self.split_and_prefix_message(derivation_path, message)
Expand All @@ -111,17 +221,33 @@ def send_async_sign_message(self,
# Send all chunks with P2_EXTEND except for the first chunk
if len(message_splited_prefixed) > 1:
final_p2 = P2_EXTEND
self.send_first_message_batch(message_splited_prefixed[:-1], P1_CONFIRM)
self.send_first_message_batch(ins, message_splited_prefixed[:-1], P1_CONFIRM)
else:
final_p2 = 0

with self._client.exchange_async(CLA,
INS.INS_SIGN_MESSAGE,
ins,
P1_CONFIRM,
final_p2,
message_splited_prefixed[-1]):
yield


@contextmanager
def send_async_sign_message(self,
derivation_path : bytes,
message: bytes) -> Generator[None, None, None]:
with self.send_async_sign_request(INS.INS_SIGN_MESSAGE, derivation_path, message):
yield


@contextmanager
def send_async_sign_offchain_message(self,
derivation_path : bytes,
message: bytes) -> Generator[None, None, None]:
with self.send_async_sign_request(INS.INS_SIGN_OFFCHAIN_MESSAGE, derivation_path, message):
yield


def get_async_response(self) -> RAPDU:
return self._client.last_async_response
89 changes: 89 additions & 0 deletions test/python/apps/solana_cmd_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,92 @@ def serialize(self) -> bytes:
for instruction in self.compiled_instructions:
serialized += instruction.serialize()
return serialized

def is_printable_ascii(string: str) -> bool:
try:
string.decode('ascii')
return True
except UnicodeDecodeError:
return False

def is_utf8(string: str) -> bool:
return True


PACKET_DATA_SIZE: int = 1280 - 40 - 8
U16_MAX = 2^16-1


SIGNING_DOMAIN: bytes = b"\xffsolana offchain"
# // Header Length = Signing Domain (16) + Header Version (1)
BASE_HEADER_LEN: int = len(SIGNING_DOMAIN) + 1;

# Header Length = Message Format (1) + Message Length (2)
MESSAGE_HEADER_LEN: int = 3;
# Max length of the OffchainMessage
MAX_LEN: int = U16_MAX - BASE_HEADER_LEN - MESSAGE_HEADER_LEN;
# Max Length of the OffchainMessage supported by the Ledger
MAX_LEN_LEDGER: int = PACKET_DATA_SIZE - BASE_HEADER_LEN - MESSAGE_HEADER_LEN;

class MessageFormat(IntEnum):
RestrictedAscii = 0x00
LimitedUtf8 = 0x01
ExtendedUtf8 = 0x02

class v0_OffchainMessage:
format: MessageFormat
message: bytes

def __init__(self, message: bytes):
# /// Construct a new OffchainMessage object from the given message
if len(message) <= MAX_LEN_LEDGER:
if is_printable_ascii(message):
self.format = MessageFormat.RestrictedAscii
elif is_utf8(message):
self.format = MessageFormat.LimitedUtf8
else:
raise ValueError()
elif len(message) <= MAX_LEN:
if is_utf8(message):
self.format = MessageFormat.ExtendedUtf8
else:
raise ValueError()
else:
raise ValueError()
self.message = message

# Serialize the message to bytes, including the full header
def serialize(self) -> bytes:
# data.reserve(Self::HEADER_LEN.saturating_add(self.message.len()));
data: bytes = b""
# format
data += self.format.to_bytes(1, byteorder='little')
# message length
data += len(self.message).to_bytes(2, byteorder='little')
# message
data += self.message
return data


class OffchainMessage:
version: int
message: v0_OffchainMessage

# Construct a new OffchainMessage object from the given version and message
def __init__(self, version: int, message: bytes):
self.version = version
if version == 0:
self.message = v0_OffchainMessage(message)
else:
raise ValueError()

# Serialize the off-chain message to bytes including full header
def serialize(self) -> bytes:
data: bytes = b""
# serialize signing domain
data += SIGNING_DOMAIN

# serialize version and call version specific serializer
data += self.version.to_bytes(1, byteorder='little')
data += self.message.serialize()
return data
30 changes: 30 additions & 0 deletions test/python/apps/solana_keychain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
import hashlib
from ecdsa import SigningKey
from ecdsa.util import sigencode_der
from enum import Enum, auto


# Private key PEM files have to be named the same (lowercase) as their corresponding enum entries
# Example: for an entry in the Enum named DEV, its PEM file must be at keychain/dev.pem
class Key(Enum):
TRUSTED_NAME = auto()


_keys: dict[Key, SigningKey] = dict()


# Open the corresponding PEM file and load its key in the global dict
def _init_key(key: Key):
global _keys
with open("%s/keychain/%s.pem" % (os.path.dirname(__file__), key.name.lower())) as pem_file:
_keys[key] = SigningKey.from_pem(pem_file.read(), hashlib.sha256)
assert (key in _keys) and (_keys[key] is not None)


# Generate a SECP256K1 signature of the given data with the given key
def sign_data(key: Key, data: bytes) -> bytes:
global _keys
if key not in _keys:
_init_key(key)
return _keys[key].sign_deterministic(data, sigencode=sigencode_der)
Loading

0 comments on commit dfe98c1

Please sign in to comment.