import requests
import sys
import gevent
import logging
from ethereum.exceptions import InsufficientBalance
from web3 import Web3
from web3.contract import Contract
from web3.exceptions import BadFunctionCallOutput
from eth_utils import is_same_address, to_checksum_address
from microraiden.config import NETWORK_CFG
from microraiden.constants import PROXY_BALANCE_LIMIT
from microraiden.utils import get_logs
class Blockchain(gevent.Greenlet):
    """Class that watches the blockchain and relays events to the channel manager.

    The greenlet polls the node every ``poll_interval`` seconds, fetches the
    channel manager contract's events addressed to the receiver and forwards
    them to the channel manager (``channel_manager``), separately for the
    unconfirmed and the confirmed block range.

    Args:
        web3: Web3 instance used to query the node.
        channel_manager_contract: contract whose event logs are fetched.
        channel_manager: object the events are relayed to; it also holds the
            sync state (``state``) and the known channels (``channels``).
        n_confirmations: distance (in blocks) kept between the unconfirmed
            head and the confirmed head.
        sync_chunk_size: maximum number of blocks the unconfirmed head
            advances in a single update.
    """

    # seconds between two polls (also used as the retry delay on connection errors)
    poll_interval = 2

    def __init__(
            self,
            web3: Web3,
            channel_manager_contract: Contract,
            channel_manager,
            n_confirmations,
            sync_chunk_size=100 * 1000
    ):
        gevent.Greenlet.__init__(self)
        self.web3 = web3
        self.channel_manager_contract = channel_manager_contract
        self.cm = channel_manager
        self.n_confirmations = n_confirmations
        self.log = logging.getLogger('blockchain')
        # set once the unconfirmed head has caught up with the current block
        self.wait_sync_event = gevent.event.Event()
        # set after a successful poll, cleared when the node refuses the connection
        self.is_connected = gevent.event.Event()
        self.sync_chunk_size = sync_chunk_size
        self.running = False
        # insufficient_balance
        #  - set to true if for some reason tx can't be sent
        #  - set to false when all pending closes are settled
        #  - used to dispute/close channels that are in CHALLENGED state after manager
        #    has ether to spend again
        self.insufficient_balance = False
        self.sync_start_block = NETWORK_CFG.start_sync_block

    def _run(self):
        """Greenlet main loop: poll the blockchain until :meth:`stop` is called."""
        self.running = True
        self.log.info('starting blockchain polling (interval %ss)', self.poll_interval)
        while self.running:
            try:
                # The recovery queries the node as well, so it has to live inside
                # the try block: a refused connection must trigger a retry rather
                # than terminate the polling greenlet.
                if self.insufficient_balance:
                    self.insufficient_balance_recover()
                self._update()
                self.is_connected.set()
                # while still catching up, continue with the next chunk immediately
                if self.wait_sync_event.is_set():
                    gevent.sleep(self.poll_interval)
            except requests.exceptions.ConnectionError:
                # Flag the disconnect before sleeping so that observers don't see
                # a stale "connected" state during the retry delay.
                self.is_connected.clear()
                endpoint = self.web3.currentProvider.endpoint_uri
                self.log.warning(
                    'Ethereum node (%s) refused connection. Retrying in %d seconds.',
                    endpoint,
                    self.poll_interval
                )
                gevent.sleep(self.poll_interval)
        self.log.info('stopped blockchain polling')

    def stop(self):
        """Ask the polling loop to terminate after its current iteration."""
        self.running = False

    def wait_sync(self):
        """Block until event polling is up-to-date with a most recent block of the blockchain"""
        self.wait_sync_event.wait()

    def _update(self):
        """Process one chunk of blocks and advance the confirmed/unconfirmed heads."""
        current_block = self.web3.eth.blockNumber

        # reset unconfirmed channels in case of reorg
        if self.wait_sync_event.is_set():  # but not on first sync
            self._check_reorg(current_block)

        if self.cm.state.confirmed_head_number is None:
            self.cm.state.update_sync_state(confirmed_head_number=self.sync_start_block)
        if self.cm.state.unconfirmed_head_number is None:
            self.cm.state.update_sync_state(unconfirmed_head_number=self.sync_start_block)

        # advance by at most one chunk, never past the current block
        new_unconfirmed_head_number = min(
            self.cm.state.unconfirmed_head_number + self.sync_chunk_size,
            current_block
        )
        new_confirmed_head_number = max(new_unconfirmed_head_number - self.n_confirmations, 0)

        # return if blocks have already been processed
        if (self.cm.state.confirmed_head_number >= new_confirmed_head_number and
                self.cm.state.unconfirmed_head_number >= new_unconfirmed_head_number):
            return

        # filter for events after block_number
        filters_confirmed = {
            'from_block': self.cm.state.confirmed_head_number + 1,
            'to_block': new_confirmed_head_number,
            'argument_filters': {
                '_receiver_address': self.cm.state.receiver
            }
        }
        filters_unconfirmed = {
            'from_block': self.cm.state.unconfirmed_head_number + 1,
            'to_block': new_unconfirmed_head_number,
            'argument_filters': {
                '_receiver_address': self.cm.state.receiver
            }
        }
        self.log.debug(
            'filtering for events u:%s-%s c:%s-%s @%d',
            filters_unconfirmed['from_block'],
            filters_unconfirmed['to_block'],
            filters_confirmed['from_block'],
            filters_confirmed['to_block'],
            current_block
        )

        self._relay_events(filters_confirmed, filters_unconfirmed)
        self._advance_heads(new_unconfirmed_head_number, new_confirmed_head_number)

        if not self.wait_sync_event.is_set() and new_unconfirmed_head_number == current_block:
            self.wait_sync_event.set()

    def _check_reorg(self, current_block):
        """Reset unconfirmed state if the chain was reorganized.

        Raises:
            AssertionError: if the confirmed head block no longer exists, i.e.
                the reorganization was longer than the confirmation count.
        """
        if current_block < self.cm.state.unconfirmed_head_number:
            self.log.info(
                'chain reorganization detected. '
                'Resyncing unconfirmed events (unconfirmed_head=%d) [@%d]',
                self.cm.state.unconfirmed_head_number,
                current_block
            )
            self.cm.reset_unconfirmed()
        try:
            # raises if hash doesn't exist (i.e. block has been replaced)
            self.web3.eth.getBlock(self.cm.state.unconfirmed_head_hash)
        except ValueError:
            self.log.info(
                'chain reorganization detected. '
                'Resyncing unconfirmed events (unconfirmed_head=%d) [@%d]. '
                '(getBlock() raised ValueError)',
                self.cm.state.unconfirmed_head_number,
                current_block
            )
            self.cm.reset_unconfirmed()

        # in case of reorg longer than confirmation number fail
        try:
            self.web3.eth.getBlock(self.cm.state.confirmed_head_hash)
        except ValueError:
            message = 'events considered confirmed have been reorganized'
            self.log.critical(message)
            # Unreachable as long as the confirmation level is set high enough.
            # Raised explicitly (instead of `assert False`) so that the check
            # is not stripped when Python runs with -O.
            raise AssertionError(message)

    def _receiver_logs(self, event_name, filters):
        """Yield ``(log, sender)`` for every `event_name` log matching `filters`.

        Each log is checked to be addressed to the receiver before it is
        yielded; ``sender`` is the log's sender address in checksum format.
        """
        for log in get_logs(self.channel_manager_contract, event_name, **filters):
            assert is_same_address(log['args']['_receiver_address'], self.cm.state.receiver)
            yield log, to_checksum_address(log['args']['_sender_address'])

    def _relay_events(self, filters_confirmed, filters_unconfirmed):
        """Fetch the contract events of both block ranges and relay them to the manager."""
        # unconfirmed channel created
        for log, sender in self._receiver_logs('ChannelCreated', filters_unconfirmed):
            deposit = log['args']['_deposit']
            open_block_number = log['blockNumber']
            self.log.debug(
                'received unconfirmed ChannelCreated event (sender %s, block number %s)',
                sender,
                open_block_number
            )
            self.cm.unconfirmed_event_channel_opened(sender, open_block_number, deposit)

        # channel created
        for log, sender in self._receiver_logs('ChannelCreated', filters_confirmed):
            deposit = log['args']['_deposit']
            open_block_number = log['blockNumber']
            self.log.debug('received ChannelOpened event (sender %s, block number %s)',
                           sender, open_block_number)
            self.cm.event_channel_opened(sender, open_block_number, deposit)

        # unconfirmed channel top ups
        for log, sender in self._receiver_logs('ChannelToppedUp', filters_unconfirmed):
            txhash = log['transactionHash']
            open_block_number = log['args']['_open_block_number']
            added_deposit = log['args']['_added_deposit']
            self.log.debug(
                'received top up event (sender %s, block number %s, deposit %s)',
                sender,
                open_block_number,
                added_deposit
            )
            self.cm.unconfirmed_event_channel_topup(
                sender,
                open_block_number,
                txhash,
                added_deposit
            )

        # confirmed channel top ups
        for log, sender in self._receiver_logs('ChannelToppedUp', filters_confirmed):
            txhash = log['transactionHash']
            open_block_number = log['args']['_open_block_number']
            added_deposit = log['args']['_added_deposit']
            self.log.debug(
                'received top up event (sender %s, block number %s, added deposit %s)',
                sender,
                open_block_number,
                added_deposit
            )
            self.cm.event_channel_topup(sender, open_block_number, txhash, added_deposit)

        # channel settled event
        for log, sender in self._receiver_logs('ChannelSettled', filters_confirmed):
            open_block_number = log['args']['_open_block_number']
            self.log.debug('received ChannelSettled event (sender %s, block number %s)',
                           sender, open_block_number)
            self.cm.event_channel_settled(sender, open_block_number)

        # channel close requested
        for log, sender in self._receiver_logs('ChannelCloseRequested', filters_confirmed):
            self._handle_close_requested(log, sender)

    def _handle_close_requested(self, log, sender):
        """Relay a single confirmed ChannelCloseRequested event to the manager."""
        open_block_number = log['args']['_open_block_number']
        # ignore close requests for channels the manager doesn't track
        if (sender, open_block_number) not in self.cm.channels:
            return
        balance = log['args']['_balance']
        try:
            timeout = self.channel_manager_contract.call().getChannelInfo(
                sender,
                self.cm.state.receiver,
                open_block_number
            )[2]
        except BadFunctionCallOutput:
            self.log.warning(
                'received ChannelCloseRequested event for a channel that doesn\'t '
                'exist or has been closed already (sender=%s open_block_number=%d)',
                sender,
                open_block_number
            )
            self.cm.force_close_channel(sender, open_block_number)
            return
        self.log.debug('received ChannelCloseRequested event (sender %s, block number %s)',
                       sender, open_block_number)
        try:
            self.cm.event_channel_close_requested(sender, open_block_number, balance, timeout)
        except InsufficientBalance:
            self.log.critical('Insufficient ETH balance of the receiver. '
                              "Can't close the channel. "
                              'Will retry once the balance is sufficient')
            # the polling loop calls insufficient_balance_recover() while this is set
            self.insufficient_balance = True

    def _advance_heads(self, new_unconfirmed_head_number, new_confirmed_head_number):
        """Look up the hashes of the new head blocks and store both heads.

        Exits the process if the node returns no usable block for either number.
        """
        # update head hash and number
        try:
            new_unconfirmed_head_hash = self.web3.eth.getBlock(new_unconfirmed_head_number).hash
            new_confirmed_head_hash = self.web3.eth.getBlock(new_confirmed_head_number).hash
        except AttributeError:
            self.log.critical("RPC endpoint didn't return proper info for an existing block "
                              "(%d,%d)", new_unconfirmed_head_number, new_confirmed_head_number)
            self.log.critical("It is possible that the blockchain isn't fully synced. "
                              "This often happens when Parity is run with --fast or --warp sync.")
            self.log.critical("Can't continue - check status of the ethereum node.")
            sys.exit(1)
        self.cm.set_head(
            new_unconfirmed_head_number,
            new_unconfirmed_head_hash,
            new_confirmed_head_number,
            new_confirmed_head_hash
        )

    def insufficient_balance_recover(self):
        """Recover from an insufficient balance state by closing
        all pending channels if possible.

        Does nothing while the receiver's balance is below
        ``PROXY_BALANCE_LIMIT``. The ``insufficient_balance`` flag is cleared
        only if closing the pending channels succeeds.
        """
        balance = self.web3.eth.getBalance(self.cm.receiver)
        if balance < PROXY_BALANCE_LIMIT:
            return
        try:
            self.cm.close_pending_channels()
        except InsufficientBalance:
            self.log.critical('Insufficient balance when trying to '
                              'close pending channels. (balance=%d)',
                              balance)
        else:
            self.insufficient_balance = False