Source code for microraiden.test.fixtures.channel_manager
import pytest
import types
from microraiden.channel_manager import ChannelManager, Blockchain
[docs]def start_channel_manager(channel_manager, use_tester, mine_sync_event):
if use_tester:
mine_sync_event.clear()
# monkeypatch Blockchain::_update() to wait for an sync event
def update_patched(self: Blockchain):
Blockchain._update(self)
mine_sync_event.wait()
channel_manager.blockchain._update = types.MethodType(
update_patched, channel_manager.blockchain)
# it is pointless to do busy loop as the Blockchain blocks on sync
channel_manager.blockchain.poll_interval = 0
def stop_patched(self: ChannelManager):
mine_sync_event.set()
ChannelManager.stop(self)
self.stop = types.MethodType(ChannelManager.stop, self)
channel_manager.stop = types.MethodType(
stop_patched, channel_manager)
def fail(greenlet):
raise greenlet.exception
channel_manager.link_exception(fail)
channel_manager.start()
return channel_manager
[docs]@pytest.fixture
def channel_manager(
web3,
receiver_privkey,
channel_manager_contract,
token_contract,
use_tester,
mine_sync_event,
state_db_path,
patched_contract,
revert_chain
):
manager = ChannelManager(
web3,
channel_manager_contract,
token_contract,
receiver_privkey,
n_confirmations=5,
state_filename=state_db_path
)
start_channel_manager(manager, use_tester, mine_sync_event)
yield manager
manager.stop()