Skip to content

Commit

Permalink
support pythtest-conformance and pythtest-crosschain (#50)
Browse files Browse the repository at this point in the history
* support pythtest-conformance and pythtest-crosschain

* fix test

* address comments

* fix test
  • Loading branch information
cctdaniel authored Jan 4, 2024
1 parent a7fb15e commit fdc3432
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 106 deletions.
8 changes: 8 additions & 0 deletions pythclient/solana.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
TESTNET_ENDPOINT = "api.testnet.solana.com"
MAINNET_ENDPOINT = "api.mainnet-beta.solana.com"
PYTHNET_ENDPOINT = "pythnet.rpcpool.com"
PYTHTEST_CROSSCHAIN_ENDPOINT = "api.pythtest.pyth.network"
PYTHTEST_CONFORMANCE_ENDPOINT = "api.pythtest.pyth.network"

SOLANA_DEVNET_WS_ENDPOINT = WS_PREFIX + "://" + DEVNET_ENDPOINT
SOLANA_DEVNET_HTTP_ENDPOINT = HTTP_PREFIX + "://" + DEVNET_ENDPOINT
Expand All @@ -33,6 +35,12 @@
PYTHNET_WS_ENDPOINT = WS_PREFIX + "://" + PYTHNET_ENDPOINT
PYTHNET_HTTP_ENDPOINT = HTTP_PREFIX + "://" + PYTHNET_ENDPOINT

PYTHTEST_CROSSCHAIN_WS_ENDPOINT = WS_PREFIX + "://" + PYTHTEST_CROSSCHAIN_ENDPOINT
PYTHTEST_CROSSCHAIN_HTTP_ENDPOINT = HTTP_PREFIX + "://" + PYTHTEST_CROSSCHAIN_ENDPOINT

PYTHTEST_CONFORMANCE_WS_ENDPOINT = WS_PREFIX + "://" + PYTHTEST_CONFORMANCE_ENDPOINT
PYTHTEST_CONFORMANCE_HTTP_ENDPOINT = HTTP_PREFIX + "://" + PYTHTEST_CONFORMANCE_ENDPOINT

class SolanaPublicKey:
"""
Represents a Solana public key. This class is meant to be immutable.
Expand Down
64 changes: 34 additions & 30 deletions pythclient/utils.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
import ast
import dns.resolver
from loguru import logger
from typing import Optional

DEFAULT_VERSION = "v2"


# Retrieving keys via DNS TXT records should not be considered secure and is provided as a convenience only.
# Accounts should be stored locally and verified before being used for production.
def get_key(network: str, type: str, version: str = DEFAULT_VERSION) -> Optional[str]:
def get_key(network: str, type: str) -> Optional[str]:
"""
Get the program or mapping keys from dns TXT records.
Example dns records:
devnet-program-v2.pyth.network
mainnet-program-v2.pyth.network
testnet-mapping-v2.pyth.network
pythnet-mapping-v2.pyth.network
Get the program or mapping key.
:param network: The network to get the key for. Either "mainnet", "devnet", "testnet", "pythnet", "pythtest-conformance", or "pythtest-crosschain".
:param type: The type of key to get. Either "program" or "mapping".
"""
url = f"{network}-{type}-{version}.pyth.network"
keys = {
"pythnet": {
"program": "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH",
"mapping": "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J",
},
"mainnet": {
"program": "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH",
"mapping": "AHtgzX45WTKfkPG53L6WYhGEXwQkN1BVknET3sVsLL8J",
},
"devnet": {
"program": "gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s",
"mapping": "BmA9Z6FjioHJPpjT39QazZyhDRUdZy2ezwx4GiDdE2u2",
},
"testnet": {
"program": "8tfDNiaEyrV6Q1U4DEXrEigs9DoDtkugzFbybENEbCDz",
"mapping": "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z",
},
"pythtest-conformance": {
"program": "8tfDNiaEyrV6Q1U4DEXrEigs9DoDtkugzFbybENEbCDz",
"mapping": "AFmdnt9ng1uVxqCmqwQJDAYC5cKTkw8gJKSM5PnzuF6z",
},
"pythtest-crosschain": {
"program": "gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s",
"mapping": "BmA9Z6FjioHJPpjT39QazZyhDRUdZy2ezwx4GiDdE2u2",
},
}

try:
answer = dns.resolver.resolve(url, "TXT")
except dns.resolver.NXDOMAIN:
logger.error("TXT record for {} not found", url)
return ""
if len(answer) != 1:
logger.error("Invalid number of records returned for {}!", url)
return ""
# Example of the raw_key:
# "program=FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"
raw_key = ast.literal_eval(list(answer)[0].to_text())
# program=FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH"
_, key = raw_key.split("=", 1)
return key
return keys[network][type]
except KeyError:
raise Exception(f"Unknown network or type: {network}, {type}")
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from setuptools import setup

requirements = ['aiodns', 'aiohttp>=3.7.4', 'backoff', 'base58', 'dnspython', 'flake8', 'loguru', 'typing-extensions', 'pytz', 'pycryptodome']
requirements = ['aiodns', 'aiohttp>=3.7.4', 'backoff', 'base58', 'flake8', 'loguru', 'typing-extensions', 'pytz', 'pycryptodome']

with open('README.md', 'r', encoding='utf-8') as fh:
long_description = fh.read()

setup(
name='pythclient',
version='0.1.19',
version='0.1.20',
packages=['pythclient'],
author='Pyth Developers',
author_email='contact@pyth.network',
Expand Down
84 changes: 10 additions & 74 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,89 +1,25 @@
from _pytest.logging import LogCaptureFixture
import pytest

from pytest_mock import MockerFixture

from mock import Mock

import dns.resolver
import dns.rdatatype
import dns.rdataclass
import dns.message
import dns.rrset
import dns.flags

from pythclient.utils import get_key


@pytest.fixture()
def answer_program() -> dns.resolver.Answer:
qname = dns.name.Name(labels=(b'devnet-program-v2', b'pyth', b'network', b''))
rdtype = dns.rdatatype.TXT
rdclass = dns.rdataclass.IN
response = dns.message.QueryMessage(id=0)
response.flags = dns.flags.QR
rrset_qn = dns.rrset.from_text(qname, 100, rdclass, rdtype)
rrset_ans = dns.rrset.from_text(qname, 100, rdclass, rdtype, '"program=gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s"')
response.question = [rrset_qn]
response.answer = [rrset_ans]
answer = dns.resolver.Answer(
qname=qname, rdtype=rdtype, rdclass=rdclass, response=response)
answer.rrset = rrset_ans
return answer


@pytest.fixture()
def answer_mapping() -> dns.resolver.Answer:
qname = dns.name.Name(labels=(b'devnet-mapping-v2', b'pyth', b'network', b''))
rdtype = dns.rdatatype.TXT
rdclass = dns.rdataclass.IN
response = dns.message.QueryMessage(id=0)
response.flags = dns.flags.QR
rrset_qn = dns.rrset.from_text(qname, 100, rdclass, rdtype)
rrset_ans = dns.rrset.from_text(qname, 100, rdclass, rdtype, '"mapping=BmA9Z6FjioHJPpjT39QazZyhDRUdZy2ezwx4GiDdE2u2"')
response.question = [rrset_qn]
response.answer = [rrset_ans]
answer = dns.resolver.Answer(
qname=qname, rdtype=rdtype, rdclass=rdclass, response=response)
answer.rrset = rrset_ans
return answer


@pytest.fixture()
def mock_dns_resolver_resolve(mocker: MockerFixture) -> Mock:
mock = Mock()
mocker.patch('dns.resolver.resolve', side_effect=mock)
return mock


def test_utils_get_program_key(mock_dns_resolver_resolve: Mock, answer_program: dns.resolver.Answer) -> None:
mock_dns_resolver_resolve.return_value = answer_program
def test_utils_get_program_key() -> None:
program_key = get_key("devnet", "program")
assert program_key == "gSbePebfvPy7tRqimPoVecS2UsBvYv46ynrzWocc92s"


def test_utils_get_mapping_key(mock_dns_resolver_resolve: Mock, answer_mapping: dns.resolver.Answer) -> None:
mock_dns_resolver_resolve.return_value = answer_mapping
def test_utils_get_mapping_key() -> None:
mapping_key = get_key("devnet", "mapping")
assert mapping_key == "BmA9Z6FjioHJPpjT39QazZyhDRUdZy2ezwx4GiDdE2u2"


def test_utils_get_mapping_key_not_found(mock_dns_resolver_resolve: Mock,
answer_mapping: dns.resolver.Answer,
caplog: LogCaptureFixture) -> None:
mock_dns_resolver_resolve.side_effect = dns.resolver.NXDOMAIN
exc_message = f'TXT record for {str(answer_mapping.response.canonical_name())[:-1]} not found'
key = get_key("devnet", "mapping")
assert exc_message in caplog.text
assert key == ""
def test_utils_invalid_network() -> None:
with pytest.raises(Exception) as e:
get_key("testdevnet", "mapping")
assert str(e.value) == "Unknown network or type: testdevnet, mapping"


def test_utils_get_mapping_key_invalid_number(mock_dns_resolver_resolve: Mock,
answer_mapping: dns.resolver.Answer,
caplog: LogCaptureFixture) -> None:
answer_mapping.rrset = None
mock_dns_resolver_resolve.return_value = answer_mapping
exc_message = f'Invalid number of records returned for {str(answer_mapping.response.canonical_name())[:-1]}!'
key = get_key("devnet", "mapping")
assert exc_message in caplog.text
assert key == ""
def test_utils_get_invalid_type() -> None:
with pytest.raises(Exception) as e:
get_key("devnet", "mappingprogram")
assert str(e.value) == "Unknown network or type: devnet, mappingprogram"

0 comments on commit fdc3432

Please sign in to comment.