"""Channel manager handles channel state changes on a low (blockchain) level."""
import time
import gevent
import gevent.event
import filelock
import logging
import os
from eth_utils import (
decode_hex,
is_same_address,
is_checksum_address
)
from ethereum.exceptions import InsufficientBalance
from web3 import Web3
from web3.contract import Contract
from microraiden.utils import (
verify_balance_proof,
privkey_to_addr,
sign_close,
create_signed_contract_transaction
)
from microraiden.exceptions import (
NetworkIdMismatch,
StateReceiverAddrMismatch,
StateContractAddrMismatch,
StateFileLocked,
NoOpenChannel,
InsufficientConfirmations,
InvalidBalanceProof,
InvalidBalanceAmount,
InvalidContractVersion,
NoBalanceProofReceived,
)
from microraiden.constants import CHANNEL_MANAGER_CONTRACT_VERSION
from .state import ChannelManagerState
from .blockchain import Blockchain
from .channel import Channel, ChannelState
log = logging.getLogger(__name__)
[docs]class ChannelManager(gevent.Greenlet):
"""Manages channels from the receiver's point of view."""
def __init__(
self,
web3: Web3,
channel_manager_contract: Contract,
token_contract: Contract,
private_key: str,
state_filename: str = None,
n_confirmations=1
) -> None:
gevent.Greenlet.__init__(self)
self.blockchain = Blockchain(
web3,
channel_manager_contract,
self,
n_confirmations=n_confirmations
)
self.receiver = privkey_to_addr(private_key)
self.private_key = private_key
self.channel_manager_contract = channel_manager_contract
self.token_contract = token_contract
self.n_confirmations = n_confirmations
self.log = logging.getLogger('channel_manager')
network_id = int(web3.version.network)
assert is_same_address(privkey_to_addr(self.private_key), self.receiver)
# check contract version
self.check_contract_version()
if state_filename not in (None, ':memory:') and os.path.isfile(state_filename):
self.state = ChannelManagerState.load(state_filename)
else:
self.state = ChannelManagerState(state_filename)
self.state.setup_db(
network_id,
channel_manager_contract.address,
self.receiver
)
assert self.state is not None
if state_filename not in (None, ':memory:'):
self.lock_state = filelock.FileLock(state_filename + '.lock')
try:
self.lock_state.acquire(timeout=0)
except filelock.Timeout:
raise StateFileLocked("state file %s is locked by another process" %
state_filename)
if network_id != self.state.network_id:
raise NetworkIdMismatch("Network id mismatch: state=%d, backend=%d" % (
self.state.network_id, network_id))
if not is_same_address(self.receiver, self.state.receiver):
raise StateReceiverAddrMismatch('%s != %s' %
(self.receiver, self.state.receiver))
if not is_same_address(self.state.contract_address, channel_manager_contract.address):
raise StateContractAddrMismatch('%s != %s' % (
channel_manager_contract.address, self.state.contract_address))
self.log.debug('setting up channel manager, receiver=%s channel_contract=%s' %
(self.receiver, channel_manager_contract.address))
def __del__(self):
self.stop()
def _run(self):
self.blockchain.start()
[docs] def stop(self):
if self.blockchain.running:
self.blockchain.stop()
self.blockchain.join()
[docs] def set_head(self,
unconfirmed_head_number: int,
unconfirmed_head_hash: int,
confirmed_head_number,
confirmed_head_hash):
"""Set the block number up to which all events have been registered."""
assert unconfirmed_head_number > 0
assert confirmed_head_number > 0
assert confirmed_head_number < unconfirmed_head_number
self.state.update_sync_state(unconfirmed_head_number=unconfirmed_head_number,
unconfirmed_head_hash=unconfirmed_head_hash,
confirmed_head_number=confirmed_head_number,
confirmed_head_hash=confirmed_head_hash)
# relevant events from the blockchain for receiver from contract
[docs] def event_channel_opened(self, sender: str, open_block_number: int, deposit: int):
"""Notify the channel manager of a new confirmed channel opening."""
assert is_checksum_address(sender)
if (sender, open_block_number) in self.channels:
return # ignore event if already provessed
c = Channel(self.state.receiver, sender, deposit, open_block_number)
c.confirmed = True
c.state = ChannelState.OPEN
self.log.info('new channel opened (sender %s, block number %s)', sender, open_block_number)
self.state.set_channel(c)
[docs] def unconfirmed_event_channel_opened(self, sender: str, open_block_number: int, deposit: int):
"""Notify the channel manager of a new channel opening that has not been confirmed yet."""
assert is_checksum_address(sender)
assert deposit >= 0
assert open_block_number > 0
event_already_processed = (sender, open_block_number) in self.unconfirmed_channels
channel_already_confirmed = (sender, open_block_number) in self.channels
if event_already_processed or channel_already_confirmed:
return
c = Channel(self.state.receiver, sender, deposit, open_block_number)
c.confirmed = False
c.state = ChannelState.OPEN
self.state.set_channel(c)
self.log.info('unconfirmed channel event received (sender %s, block_number %s)',
sender, open_block_number)
[docs] def event_channel_close_requested(
self,
sender: str,
open_block_number: int,
balance: int,
settle_timeout: int
):
"""Notify the channel manager that a the closing of a channel has been requested.
Params:
settle_timeout (int): settle timeout in blocks"""
assert is_checksum_address(sender)
assert settle_timeout >= 0
if (sender, open_block_number) not in self.channels:
self.log.warning(
'attempt to close a non existing channel (sender %ss, block_number %ss)',
sender,
open_block_number
)
return
c = self.channels[sender, open_block_number]
if c.balance > balance:
self.log.warning('sender tried to cheat, sending challenge '
'(sender %s, block number %s)',
sender, open_block_number)
self.close_channel(sender, open_block_number) # dispute by closing the channel
else:
self.log.info('valid channel close request received '
'(sender %s, block number %s, timeout %d)',
sender, open_block_number, settle_timeout)
c.settle_timeout = settle_timeout
c.is_closed = True
c.confirmed = True
c.mtime = time.time()
self.state.set_channel(c)
[docs] def event_channel_settled(self, sender, open_block_number):
"""Notify the channel manager that a channel has been settled."""
assert is_checksum_address(sender)
self.log.info('Forgetting settled channel (sender %s, block number %s)',
sender, open_block_number)
self.state.del_channel(sender, open_block_number)
[docs] def unconfirmed_event_channel_topup(
self, sender, open_block_number, txhash, added_deposit
):
"""Notify the channel manager of a topup with not enough confirmations yet."""
assert is_checksum_address(sender)
if (sender, open_block_number) not in self.channels:
assert (sender, open_block_number) in self.unconfirmed_channels
self.log.info('Ignoring unconfirmed topup of unconfirmed channel '
'(sender %s, block number %s, added %s)',
sender, open_block_number, added_deposit)
return
self.log.info('Registering unconfirmed deposit top up '
'(sender %s, block number %s, added %s)',
sender, open_block_number, added_deposit)
c = self.channels[sender, open_block_number]
c.unconfirmed_topups[txhash] = added_deposit
self.state.set_channel(c)
[docs] def event_channel_topup(self, sender, open_block_number, txhash, added_deposit):
"""Notify the channel manager that the deposit of a channel has been topped up."""
assert is_checksum_address(sender)
self.log.info(
'Registering deposit top up (sender %s, block number %s, added deposit %s)',
sender, open_block_number, added_deposit
)
assert (sender, open_block_number) in self.channels
c = self.channels[sender, open_block_number]
if c.is_closed is True:
self.log.warning(
"Topup of an already closed channel (sender=%s open_block=%d)" %
(sender, open_block_number)
)
return None
c.deposit += added_deposit
c.unconfirmed_topups.pop(txhash, None)
c.mtime = time.time()
self.state.set_channel(c)
# end events ####
[docs] def close_channel(self, sender: str, open_block_number: int):
"""Close and settle a channel.
Params:
sender (str): sender address
open_block_number (int): block the channel was open in
"""
assert is_checksum_address(sender)
if not (sender, open_block_number) in self.channels:
self.log.warning(
"attempt to close a non-registered channel (sender=%s open_block=%s" %
(sender, open_block_number)
)
return
c = self.channels[sender, open_block_number]
if c.last_signature is None:
raise NoBalanceProofReceived('Cannot close a channel without a balance proof.')
# send closing tx
closing_sig = sign_close(
self.private_key,
sender,
open_block_number,
c.balance,
self.channel_manager_contract.address
)
raw_tx = create_signed_contract_transaction(
self.private_key,
self.channel_manager_contract,
'cooperativeClose',
[
self.state.receiver,
open_block_number,
c.balance,
decode_hex(c.last_signature),
closing_sig
]
)
# update local state
c.is_closed = True
c.mtime = time.time()
self.state.set_channel(c)
try:
txid = self.blockchain.web3.eth.sendRawTransaction(raw_tx)
self.log.info('sent channel close(sender %s, block number %s, tx %s)',
sender, open_block_number, txid)
except InsufficientBalance:
c.state = ChannelState.CLOSE_PENDING
self.state.set_channel(c)
raise
[docs] def force_close_channel(self, sender: str, open_block_number: int):
"""Forcibly remove a channel from our channel state"""
assert is_checksum_address(sender)
try:
self.close_channel(sender, open_block_number)
return
except NoBalanceProofReceived:
c = self.channels[sender, open_block_number]
c.is_closed = True
self.state.set_channel(c)
[docs] def sign_close(self, sender: str, open_block_number: int, balance: int):
"""Sign an agreement for a channel closing.
Returns:
channel close signature (str): a signature that can be used client-side to close
the channel by directly calling contract's close method on-chain.
"""
assert is_checksum_address(sender)
if (sender, open_block_number) not in self.channels:
raise NoOpenChannel('Channel does not exist or has been closed'
'(sender=%s, open_block_number=%d)' % (sender, open_block_number))
c = self.channels[sender, open_block_number]
if c.is_closed:
raise NoOpenChannel('Channel closing has been requested already.')
assert balance is not None
if c.last_signature is None:
raise NoBalanceProofReceived('Payment has not been registered.')
if balance != c.balance:
raise InvalidBalanceProof('Requested closing balance does not match latest one.')
c.is_closed = True
c.mtime = time.time()
receiver_sig = sign_close(
self.private_key,
sender,
open_block_number,
c.balance,
self.channel_manager_contract.address
)
self.state.set_channel(c)
self.log.info('signed cooperative closing message (sender %s, block number %s)',
sender, open_block_number)
return receiver_sig
[docs] def get_locked_balance(self):
"""Get the balance in all channels combined."""
return sum([c.balance for c in self.channels.values()])
[docs] def get_liquid_balance(self):
"""Get the balance of the receiver in the token contract (not locked in channels)."""
balance = self.token_contract.call().balanceOf(self.receiver)
return balance
[docs] def get_eth_balance(self):
"""Get eth balance of the receiver"""
return self.channel_manager_contract.web3.eth.getBalance(self.receiver)
[docs] def verify_balance_proof(self, sender, open_block_number, balance, signature):
"""Verify that a balance proof is valid and return the sender.
This method just verifies if the balance proof is valid - no state update is performed.
:returns: Channel, if it exists
"""
assert is_checksum_address(sender)
if (sender, open_block_number) in self.unconfirmed_channels:
raise InsufficientConfirmations(
'Insufficient confirmations for the channel '
'(sender=%s, open_block_number=%d)' % (sender, open_block_number))
try:
c = self.channels[sender, open_block_number]
except KeyError:
raise NoOpenChannel('Channel does not exist or has been closed'
'(sender=%s, open_block_number=%s)' % (sender, open_block_number))
if c.is_closed:
raise NoOpenChannel('Channel closing has been requested already.')
if not is_same_address(
verify_balance_proof(
self.receiver,
open_block_number,
balance,
decode_hex(signature),
self.channel_manager_contract.address
),
sender
):
raise InvalidBalanceProof('Recovered signer does not match the sender')
return c
[docs] def register_payment(self, sender: str, open_block_number: int, balance: int, signature: str):
"""Register a payment.
Method will try to reconstruct (verify) balance update data
with a signature sent by the client.
If verification is succesfull, an internal payment state is updated.
Parameters:
sender (str): sender of the balance proof
open_block_number (int): block the channel was opened in
balance (int): updated balance
signature(str): balance proof to verify
"""
assert is_checksum_address(sender)
c = self.verify_balance_proof(sender, open_block_number, balance, signature)
if balance <= c.balance:
raise InvalidBalanceAmount('The balance must not decrease.')
if balance > c.deposit:
raise InvalidBalanceProof('Balance must not be greater than deposit')
received = balance - c.balance
c.balance = balance
c.last_signature = signature
c.mtime = time.time()
self.state.set_channel(c)
self.log.debug('registered payment (sender %s, block number %s, new balance %s)',
c.sender, open_block_number, balance)
return c.sender, received
[docs] def reset_unconfirmed(self):
"""Forget all unconfirmed channels and topups to allow for a clean resync."""
self.state.del_unconfirmed_channels()
for channel in self.channels.values():
channel.unconfirmed_topups.clear()
self.state.set_channel(channel)
self.state.unconfirmed_head_number = self.state.confirmed_head_number
self.state.unconfirmed_head_hash = self.state.confirmed_head_hash
@property
def channels(self):
return self.state.channels
@property
def unconfirmed_channels(self):
return self.state.unconfirmed_channels
@property
def pending_channels(self):
return self.state.pending_channels
[docs] def channels_to_dict(self):
"""Export all channels as a dictionary."""
d = {}
for sender, block_number in self.channels:
channel = self.channels[sender, block_number]
channel_dict = {
'deposit': channel.deposit,
'balance': channel.balance,
'mtime': channel.mtime,
'ctime': channel.ctime,
'settle_timeout': channel.settle_timeout,
'last_signature': channel.last_signature,
'is_closed': channel.is_closed
}
if sender not in d:
d[sender] = {}
d[sender][block_number] = channel_dict
return d
[docs] def unconfirmed_channels_to_dict(self):
"""Export all unconfirmed channels as a dictionary."""
d = {}
for sender, block_number in self.unconfirmed_channels:
channel = self.unconfirmed_channels[sender, block_number]
channel_dict = {
'deposit': channel.deposit,
'ctime': channel.ctime
}
if sender not in d:
d[sender] = {}
d[sender][block_number] = channel_dict
return d
[docs] def wait_sync(self):
self.blockchain.wait_sync()
[docs] def node_online(self):
return self.blockchain.is_connected.is_set()
[docs] def get_token_address(self):
return self.token_contract.address
[docs] def check_contract_version(self):
"""Compare version of the contract to the version of the library.
Only major and minor version is used in the comparison.
"""
deployed_contract_version = self.channel_manager_contract.call().version()
deployed_contract_version = deployed_contract_version.rsplit('.', 1)[0]
library_version = CHANNEL_MANAGER_CONTRACT_VERSION.rsplit('.', 1)[0]
if deployed_contract_version != library_version:
raise InvalidContractVersion("Incompatible contract version: expected=%s deployed=%s" %
(CHANNEL_MANAGER_CONTRACT_VERSION,
deployed_contract_version))
[docs] def close_pending_channels(self):
"""Close all channels that are in CLOSE_PENDING state.
This state happens if the receiver's eth balance is not enough to
close channel on-chain."""
for sender, open_block_number in self.pending_channels.keys():
self.close_channel(sender, open_block_number) # dispute by closing the channel