Source code for microraiden.test.fixtures.web3

import logging
import pytest
import gevent
import types

import rlp
from _pytest.monkeypatch import MonkeyPatch
from eth_utils import decode_hex
from ethereum.transactions import Transaction
import ethereum.tester
from populus.wait import Wait

from web3 import Web3, EthereumTesterProvider
from web3.contract import Contract
from web3.providers.rpc import HTTPProvider

from microraiden.config import NETWORK_CFG
from microraiden.constants import WEB3_PROVIDER_DEFAULT
from microraiden.utils import (
    addr_from_sig,
    keccak256,
)
from microraiden.test.config import (
    FAUCET_ALLOWANCE,
    INITIAL_TOKEN_SUPPLY
)
from microraiden.constants import (
    get_network_id
)
from microraiden.utils.contract import DEFAULT_TIMEOUT, DEFAULT_RETRY_INTERVAL
import microraiden.utils.contract


[docs]@pytest.fixture(scope='session') def mine_sync_event(): return gevent.event.Event()
[docs]@pytest.fixture(scope='session', autouse=True) def disable_requests_loggin(): logging.getLogger('requests').setLevel(logging.WARNING) logging.getLogger('urllib3').setLevel(logging.WARNING)
[docs]def deploy_token_contract(web3, deployer_address, faucet_address, token_abi, token_bytecode): Token = web3.eth.contract(abi=token_abi, bytecode=token_bytecode) txhash = Token.deploy( {'from': deployer_address}, args=[INITIAL_TOKEN_SUPPLY, "Raiden Network Token", "RDN", 18] ) receipt = web3.eth.getTransactionReceipt(txhash) contract_address = receipt.contractAddress token = Token(contract_address) token.transact({'from': deployer_address}).transfer(faucet_address, FAUCET_ALLOWANCE) assert web3.eth.getCode(contract_address) != '0x' return token
[docs]@pytest.fixture(scope='session') def token_address( use_tester, web3, deployer_address, faucet_address, token_abi, token_bytecode, channel_manager_abi ): if use_tester: contract = deploy_token_contract( web3, deployer_address, faucet_address, token_abi, token_bytecode ) return contract.address else: channel_manager = web3.eth.contract( abi=channel_manager_abi, address=NETWORK_CFG.CHANNEL_MANAGER_ADDRESS ) return channel_manager.call().token()
[docs]def deploy_channel_manager_contract( web3, deployer_address, channel_manager_abi, channel_manager_bytecode, token_address ): ChannelManager = web3.eth.contract(abi=channel_manager_abi, bytecode=channel_manager_bytecode) txhash = ChannelManager.deploy({'from': deployer_address}, args=[token_address, 500, []]) contract_address = web3.eth.getTransactionReceipt(txhash).contractAddress web3.testing.mine(1) return ChannelManager(contract_address)
[docs]@pytest.fixture(scope='session') def channel_manager_address( use_tester, web3, deployer_address, channel_manager_abi, channel_manager_bytecode, token_address ): if use_tester: contract = deploy_channel_manager_contract( web3, deployer_address, channel_manager_abi, channel_manager_bytecode, token_address ) return contract.address else: return NETWORK_CFG.CHANNEL_MANAGER_ADDRESS
[docs]@pytest.fixture(scope='session') def web3(use_tester: bool, faucet_private_key: str, faucet_address: str, mine_sync_event): if use_tester: provider = EthereumTesterProvider() web3 = Web3(provider) NETWORK_CFG.set_defaults(get_network_id('ethereum-tester')) x = web3.testing.mine def mine_patched(self, count): x(count) mine_sync_event.set() gevent.sleep(0) # switch context mine_sync_event.clear() web3.testing.mine = types.MethodType( mine_patched, web3.testing.mine ) # Tester chain uses Transaction to send and validate transactions but does not support # EIP-155 yet. This patches the sender address recovery to handle EIP-155. sender_property_original = Transaction.sender.fget def sender_property_patched(self: Transaction): if self._sender: return self._sender if self.v and self.v >= 35: v = bytes([self.v]) r = self.r.to_bytes(32, byteorder='big') s = self.s.to_bytes(32, byteorder='big') raw_tx = Transaction( self.nonce, self.gasprice, self.startgas, self.to, self.value, self.data, (self.v - 35) // 2, 0, 0 ) msg = keccak256(rlp.encode(raw_tx)) self._sender = decode_hex(addr_from_sig(r + s + v, msg)) return self._sender else: return sender_property_original(self) Transaction.sender = property( sender_property_patched, Transaction.sender.fset, Transaction.sender.fdel ) # add faucet account to tester ethereum.tester.accounts.append(decode_hex(faucet_address)) ethereum.tester.keys.append(decode_hex(faucet_private_key)) # make faucet rich web3.eth.sendTransaction({'to': faucet_address, 'value': FAUCET_ALLOWANCE}) else: rpc = HTTPProvider(WEB3_PROVIDER_DEFAULT) web3 = Web3(rpc) NETWORK_CFG.set_defaults(int(web3.version.network)) yield web3 if use_tester: ethereum.tester.accounts.remove(decode_hex(faucet_address)) ethereum.tester.keys.remove(decode_hex(faucet_private_key))
[docs]@pytest.fixture def patched_contract(use_tester: bool, monkeypatch: MonkeyPatch, web3: Web3): if use_tester: def patched_get_logs_raw(contract: Contract, filter_params): filter_ = contract.web3.eth.filter(filter_params) response = contract.web3.eth.getFilterLogs(filter_.filter_id) contract.web3.eth.uninstallFilter(filter_.filter_id) return response def patched_wait_for_event(wait: float): web3.testing.mine(1) def patched_wait_for_transaction( web3: Web3, tx_hash: str, timeout: int = DEFAULT_TIMEOUT, polling_interval: int = DEFAULT_RETRY_INTERVAL ): web3.testing.mine(1) tx_receipt = web3.eth.getTransactionReceipt(tx_hash) if tx_receipt is None: raise TimeoutError('Transaction {} was not mined.'.format(tx_hash)) return tx_receipt monkeypatch.setattr(microraiden.utils.contract, '_get_logs_raw', patched_get_logs_raw) monkeypatch.setattr(microraiden.utils.contract, '_wait', patched_wait_for_event) monkeypatch.setattr( microraiden.utils.contract, 'wait_for_transaction', patched_wait_for_transaction )
[docs]@pytest.fixture(scope='session') def wait(web3, kovan_block_time): poll_interval = kovan_block_time / 2 return Wait(web3, poll_interval=poll_interval)
[docs]@pytest.fixture(scope='session') def wait_for_blocks(web3, kovan_block_time, use_tester): def wait_for_blocks(n): if use_tester: web3.testing.mine(n) gevent.sleep(0) else: target_block = web3.eth.blockNumber + n while web3.eth.blockNumber < target_block: gevent.sleep(kovan_block_time / 2) return wait_for_blocks
[docs]@pytest.fixture(scope='session') def wait_for_transaction(wait): def wait_for_transaction(tx_hash): wait.for_receipt(tx_hash) gevent.sleep(0) return wait_for_transaction
[docs]@pytest.fixture def revert_chain(web3: Web3, use_tester: bool, sender_privkey: str, receiver_privkey: str): if use_tester: snapshot_id = web3.testing.snapshot() yield web3.testing.revert(snapshot_id) else: yield
[docs]@pytest.fixture(scope='session') def token_contract(web3: Web3, token_address: str, token_abi): return web3.eth.contract(abi=token_abi, address=token_address)
[docs]@pytest.fixture(scope='session') def channel_manager_contract(web3: Web3, channel_manager_address: str, channel_manager_abi): return web3.eth.contract(abi=channel_manager_abi, address=channel_manager_address)