Skip to content

Commit

Permalink
[CHIA-1102] catchup: long_lived/vault from main @ 4955930 (#18448)
Browse files Browse the repository at this point in the history
  • Loading branch information
Quexington authored Aug 14, 2024
2 parents 3bf382d + a6df3ea commit 12f4e77
Show file tree
Hide file tree
Showing 15 changed files with 231 additions and 187 deletions.
18 changes: 18 additions & 0 deletions chia/_tests/clvm/test_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,28 @@ def test_run() -> None:
ret = div.run([10, -5])
assert ret.atom == bytes([0xFE])

# run()
with pytest.raises(ValueError, match="div operator with negative operands is deprecated"):
cost, ret = div.run_with_cost(100000, [10, -5], 0)

cost, ret = div.run_with_cost(100000, [10, -5], ENABLE_FIXED_DIV)
assert cost == 1107
print(ret)
assert ret.atom == bytes([0xFE])

# run_with_flags()
with pytest.raises(ValueError, match="div operator with negative operands is deprecated"):
cost, ret = div.run_with_flags(100000, 0, [10, -5])

cost, ret = div.run_with_flags(100000, ENABLE_FIXED_DIV, [10, -5])
assert cost == 1107
print(ret)
assert ret.atom == bytes([0xFE])

# run_with_cost()
with pytest.raises(ValueError, match="div operator with negative operands is deprecated"):
ret = div.run([10, -5], 100000, 0)

ret = div.run([10, -5], 100000, ENABLE_FIXED_DIV)
print(ret)
assert ret.atom == bytes([0xFE])
2 changes: 1 addition & 1 deletion chia/_tests/clvm/test_puzzle_drivers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_puzzle_info() -> None:
assert puzzle_info == PuzzleInfo(capitalize_bytes)

obj: Union[PuzzleInfo, Solver]
for obj in (puzzle_info, solver): # type: ignore
for obj in (puzzle_info, solver):
assert obj["string"] == "hello"
assert obj["bytes"] == bytes.fromhex("cafef00d")
assert obj["int"] == 123
Expand Down
102 changes: 101 additions & 1 deletion chia/_tests/core/data_layer/test_data_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import copy
import enum
import json
import logging
import os
import random
import sqlite3
Expand All @@ -14,7 +15,7 @@
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, cast
from typing import Any, AsyncIterator, Dict, List, Optional, Set, Tuple, cast

import anyio
import pytest
Expand Down Expand Up @@ -2268,6 +2269,16 @@ async def test_maximum_full_file_count(
assert not full_file_path.exists()


@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
@pytest.mark.anyio
async def test_unsubscribe_unknown(
bare_data_layer_api: DataLayerRpcApi,
seeded_random: random.Random,
) -> None:
with pytest.raises(RuntimeError, match="No subscription found for the given store_id."):
await bare_data_layer_api.unsubscribe(request={"id": bytes32.random(seeded_random).hex(), "retain": False})


@pytest.mark.parametrize("retain", [True, False])
@boolean_datacases(name="group_files_by_store", false="group by singleton", true="don't group by singleton")
@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
Expand Down Expand Up @@ -2298,6 +2309,8 @@ async def test_unsubscribe_removes_files(
store_id = bytes32.from_hexstr(res["id"])
await farm_block_check_singleton(data_layer, full_node_api, ph, store_id, wallet=wallet_rpc_api.service)

# subscribe to ourselves
await data_rpc_api.subscribe(request={"id": store_id.hex()})
update_count = 10
for batch_count in range(update_count):
key = batch_count.to_bytes(2, "big")
Expand Down Expand Up @@ -3763,3 +3776,90 @@ class ModifiedStatus(IntEnum):
await farm_block_with_spend(full_node_api, ph, update_tx_rec1, wallet_rpc_api)
keys = await data_rpc_api.get_keys({"id": store_id.hex()})
assert keys == {"keys": ["0x30303031", "0x30303030"]}


@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
@boolean_datacases(name="auto_subscribe_to_local_stores", false="do not auto subscribe", true="auto subscribe")
@pytest.mark.anyio
async def test_auto_subscribe_to_local_stores(
self_hostname: str,
one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices,
tmp_path: Path,
monkeypatch: Any,
auto_subscribe_to_local_stores: bool,
) -> None:
wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node(
self_hostname, one_wallet_and_one_simulator_services
)
manage_data_interval = 5
fake_store = bytes32([1] * 32)

async def mock_get_store_ids(self: Any) -> Set[bytes32]:
return {fake_store}

async def mock_dl_track_new(self: Any, request: Dict[str, Any]) -> Dict[str, Any]:
# ignore and just return empty response
return {}

with monkeypatch.context() as m:
m.setattr("chia.data_layer.data_store.DataStore.get_store_ids", mock_get_store_ids)
m.setattr("chia.rpc.wallet_rpc_client.WalletRpcClient.dl_track_new", mock_dl_track_new)

config = bt.config
config["data_layer"]["auto_subscribe_to_local_stores"] = auto_subscribe_to_local_stores
bt.change_config(new_config=config)

async with init_data_layer(
wallet_rpc_port=wallet_rpc_port,
bt=bt,
db_path=tmp_path,
manage_data_interval=manage_data_interval,
maximum_full_file_count=100,
) as data_layer:
data_rpc_api = DataLayerRpcApi(data_layer)

await asyncio.sleep(manage_data_interval)

response = await data_rpc_api.subscriptions(request={})

if auto_subscribe_to_local_stores:
assert fake_store.hex() in response["store_ids"]
else:
assert fake_store.hex() not in response["store_ids"]


@pytest.mark.limit_consensus_modes(reason="does not depend on consensus rules")
@pytest.mark.anyio
async def test_local_store_exception(
self_hostname: str,
one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices,
tmp_path: Path,
monkeypatch: Any,
caplog: pytest.LogCaptureFixture,
) -> None:
wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node(
self_hostname, one_wallet_and_one_simulator_services
)
manage_data_interval = 5
fake_store = bytes32([1] * 32)

async def mock_get_store_ids(self: Any) -> Set[bytes32]:
return {fake_store}

with monkeypatch.context() as m, caplog.at_level(logging.INFO):
m.setattr("chia.data_layer.data_store.DataStore.get_store_ids", mock_get_store_ids)

config = bt.config
config["data_layer"]["auto_subscribe_to_local_stores"] = True
bt.change_config(new_config=config)

async with init_data_layer(
wallet_rpc_port=wallet_rpc_port,
bt=bt,
db_path=tmp_path,
manage_data_interval=manage_data_interval,
maximum_full_file_count=100,
):
await asyncio.sleep(manage_data_interval)

assert f"Can't subscribe to local store {fake_store.hex()}:" in caplog.text
2 changes: 1 addition & 1 deletion chia/_tests/core/data_layer/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async def general_insert(
store_id: bytes32,
key: bytes,
value: bytes,
reference_node_hash: bytes32,
reference_node_hash: Optional[bytes32],
side: Optional[Side],
) -> bytes32:
insert_result = await data_store.insert(
Expand Down
2 changes: 1 addition & 1 deletion chia/_tests/wallet/cat_wallet/test_cat_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ async def test_cat_melt_balance(wallet_environments: WalletTestFramework) -> Non
assert isinstance(cat_wallet, CATWallet)

# Let's test that continuing to melt this CAT results in the correct balance changes
async with wallet.wallet_state_manager.new_action_scope(push=False) as action_scope:
async with wallet.wallet_state_manager.new_action_scope(wallet_environments.tx_config, push=False) as action_scope:
for _ in range(0, 5):
tx_amount -= 1
new_coin = (await cat_wallet.get_cat_spendable_coins())[0].coin
Expand Down
2 changes: 1 addition & 1 deletion chia/cmds/plotnft_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ async def join_pool(
func = functools.partial(
wallet_client.pw_join_pool,
wallet_id,
hexstr_to_bytes(json_dict["target_puzzle_hash"]),
bytes32(hexstr_to_bytes(json_dict["target_puzzle_hash"])),
pool_url,
json_dict["relative_lock_height"],
fee,
Expand Down
42 changes: 35 additions & 7 deletions chia/data_layer/data_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,10 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> None

async def unsubscribe(self, store_id: bytes32, retain_data: bool) -> None:
async with self.subscription_lock:
subscriptions = await self.data_store.get_subscriptions()
if store_id not in (subscription.store_id for subscription in subscriptions):
raise RuntimeError("No subscription found for the given store_id.")

# Unsubscribe is processed later, after all fetching of data is done, to avoid races.
self.unsubscribe_data_queue.append(UnsubscribeData(store_id, retain_data))

Expand Down Expand Up @@ -896,22 +900,46 @@ async def periodically_manage_data(self) -> None:
await asyncio.sleep(0.1)

while not self._shut_down:
# Add existing subscriptions
async with self.subscription_lock:
subscriptions = await self.data_store.get_subscriptions()

# Subscribe to all local store_ids that we can find on chain.
local_store_ids = await self.data_store.get_store_ids()
# pseudo-subscribe to all unsubscribed owned stores
# Need this to make sure we process updates and generate DAT files
try:
owned_stores = await self.get_owned_stores()
except ValueError:
# Sometimes the DL wallet isn't available, so we can't get the owned stores.
# We'll try again next time.
owned_stores = []
subscription_store_ids = {subscription.store_id for subscription in subscriptions}
for local_id in local_store_ids:
if local_id not in subscription_store_ids:
for record in owned_stores:
store_id = record.launcher_id
if store_id not in subscription_store_ids:
try:
subscription = await self.subscribe(local_id, [])
subscriptions.insert(0, subscription)
# don't actually subscribe, just add to the list
subscriptions.insert(0, Subscription(store_id=store_id, servers_info=[]))
except Exception as e:
self.log.info(
f"Can't subscribe to locally stored {local_id}: {type(e)} {e} {traceback.format_exc()}"
f"Can't subscribe to owned store {store_id}: {type(e)} {e} {traceback.format_exc()}"
)

# Optionally
# Subscribe to all local non-owned store_ids that we can find on chain.
# This is the prior behavior where all local stores, both owned and not owned, are subscribed to.
if self.config.get("auto_subscribe_to_local_stores", False):
local_store_ids = await self.data_store.get_store_ids()
subscription_store_ids = {subscription.store_id for subscription in subscriptions}
for local_id in local_store_ids:
if local_id not in subscription_store_ids:
try:
subscription = await self.subscribe(local_id, [])
subscriptions.insert(0, subscription)
except Exception as e:
self.log.info(
f"Can't subscribe to local store {local_id}: {type(e)} {e} {traceback.format_exc()}"
)

work_queue: asyncio.Queue[Job[Subscription]] = asyncio.Queue()
async with QueuedAsyncPool.managed(
name="DataLayer subscription update pool",
Expand Down
96 changes: 2 additions & 94 deletions chia/data_layer/data_layer_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dataclasses
import logging
import time
from operator import attrgetter
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Optional, Set, Tuple, cast

from chia_rs import G1Element, G2Element
Expand All @@ -21,7 +20,7 @@
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_spend import CoinSpend, compute_additions
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.spend_bundle import SpendBundle, estimate_fees
from chia.types.spend_bundle import SpendBundle
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.streamable import Streamable, streamable
from chia.wallet.conditions import (
Expand Down Expand Up @@ -58,7 +57,7 @@
from chia.wallet.util.compute_memos import compute_memos
from chia.wallet.util.merkle_utils import _simplify_merkle_proof
from chia.wallet.util.transaction_type import TransactionType
from chia.wallet.util.tx_config import CoinSelectionConfig, TXConfigLoader
from chia.wallet.util.tx_config import CoinSelectionConfig
from chia.wallet.util.wallet_sync_utils import fetch_coin_spend, fetch_coin_spend_for_coin_state
from chia.wallet.util.wallet_types import WalletType
from chia.wallet.wallet_action_scope import WalletActionScope
Expand Down Expand Up @@ -900,100 +899,9 @@ async def singleton_removed(self, parent_spend: CoinSpend, height: uint32) -> No
await self.wallet_state_manager.add_interested_coin_ids(
[new_singleton.name()],
)
await self.potentially_handle_resubmit(singleton_record.launcher_id)
elif parent_spend.coin.puzzle_hash == create_mirror_puzzle().get_tree_hash():
await self.wallet_state_manager.dl_store.delete_mirror(parent_name)

# This function, though in use, is currently untested because it never runs due to other design choices
async def potentially_handle_resubmit(self, launcher_id: bytes32) -> None: # pragma: no cover
"""
This method is meant to detect a fork in our expected pending singletons and the singletons that have actually
been confirmed on chain. If there is a fork and the root on chain never changed, we will attempt to rebase our
singletons on to the new latest singleton. If there is a fork and the root changed, we assume that everything
has failed and delete any pending state.
"""
unconfirmed_singletons = await self.wallet_state_manager.dl_store.get_unconfirmed_singletons(launcher_id)
if len(unconfirmed_singletons) == 0:
return
unconfirmed_singletons = sorted(unconfirmed_singletons, key=attrgetter("generation"))
full_branch: List[SingletonRecord] = await self.wallet_state_manager.dl_store.get_all_singletons_for_launcher(
launcher_id,
min_generation=unconfirmed_singletons[0].generation,
)
if len(unconfirmed_singletons) == len(full_branch) and set(unconfirmed_singletons) == set(full_branch):
return

# Now we have detected a fork so we should check whether the root changed at all
self.log.info("Attempting automatic rebase")
parent_name = unconfirmed_singletons[0].lineage_proof.parent_name
assert parent_name is not None
parent_singleton = await self.wallet_state_manager.dl_store.get_singleton_record(parent_name)
if parent_singleton is None or any(parent_singleton.root != s.root for s in full_branch if s.confirmed):
root_changed: bool = True
else:
root_changed = False

# Regardless of whether the root changed or not, our old state is bad so let's eliminate it
# First let's find all of our txs matching our unconfirmed singletons
relevant_dl_txs: List[TransactionRecord] = []
for singleton in unconfirmed_singletons:
parent_name = singleton.lineage_proof.parent_name
if parent_name is None:
continue

tx = await self.wallet_state_manager.tx_store.get_transaction_record(parent_name)
if tx is not None:
relevant_dl_txs.append(tx)
# Let's check our standard wallet for fee transactions related to these dl txs
all_spends: List[SpendBundle] = [tx.spend_bundle for tx in relevant_dl_txs if tx.spend_bundle is not None]
all_removal_ids: Set[bytes32] = {removal.name() for sb in all_spends for removal in sb.removals()}
unconfirmed_std_txs: List[TransactionRecord] = (
await self.wallet_state_manager.tx_store.get_unconfirmed_for_wallet(self.standard_wallet.id())
)
relevant_std_txs: List[TransactionRecord] = [
tx for tx in unconfirmed_std_txs if any(c.name() in all_removal_ids for c in tx.removals)
]
# Delete all of the relevant transactions
for tx in [*relevant_dl_txs, *relevant_std_txs]:
await self.wallet_state_manager.tx_store.delete_transaction_record(tx.name)
# Delete all of the unconfirmed singleton records
for singleton in unconfirmed_singletons:
await self.wallet_state_manager.dl_store.delete_singleton_record(singleton.coin_id)

if not root_changed:
# The root never changed so let's attempt a rebase
try:
assert self.wallet_state_manager.wallet_node.logged_in_fingerprint is not None
async with self.wallet_state_manager.new_action_scope(
TXConfigLoader().autofill(
constants=self.wallet_state_manager.constants,
config=self.wallet_state_manager.config,
logged_in_fingerprint=(self.wallet_state_manager.wallet_node.logged_in_fingerprint),
),
push=True,
) as action_scope:
for singleton in unconfirmed_singletons:
for tx in relevant_dl_txs:
if any(c.name() == singleton.coin_id for c in tx.additions):
if tx.spend_bundle is not None:
# This executes the puzzles
fee = uint64(estimate_fees(tx.spend_bundle))
else:
fee = uint64(0)

assert self.wallet_state_manager.wallet_node.logged_in_fingerprint is not None
await self.create_update_state_spend(
launcher_id,
singleton.root,
action_scope=action_scope,
fee=fee,
)
except Exception as e:
self.log.warning(f"Something went wrong during attempted DL resubmit: {str(e)}")
# Something went wrong so let's delete anything pending that was created
for singleton in unconfirmed_singletons:
await self.wallet_state_manager.dl_store.delete_singleton_record(singleton.coin_id)

async def stop_tracking_singleton(self, launcher_id: bytes32) -> None:
await self.wallet_state_manager.dl_store.delete_singleton_records_by_launcher_id(launcher_id)
await self.wallet_state_manager.dl_store.delete_launcher(launcher_id)
Expand Down
Loading

0 comments on commit 12f4e77

Please sign in to comment.