Source code for microraiden.channel_manager.state

"""Off-chain state is saved in a sqlite database."""
# import json
# import shutil
import sqlite3
import os
import logging
from eth_utils import is_address

from microraiden.utils import check_permission_safety

from microraiden.exceptions import (
    InsecureStateFile
)
from .channel import Channel, ChannelState

log = logging.getLogger(__name__)


[docs]def dict_factory(cursor, row): """make sqlite result a dict with keys being column names""" d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d
DB_CREATION_SQL = """ CREATE TABLE `metadata` ( `network_id` INTEGER, `contract_address` CHAR(42), `receiver` CHAR(42) ); CREATE TABLE `syncstate` ( `confirmed_head_number` INTEGER, `confirmed_head_hash` CHAR(66), `unconfirmed_head_number` INTEGER, `unconfirmed_head_hash` CHAR(66) ); -- deposit and balance have length of 78 to fit uint256 CREATE TABLE `channels` ( `sender` CHAR(42) NOT NULL, `open_block_number` INTEGER NOT NULL, `deposit` DECIMAL(78,0) NOT NULL, `balance` DECIMAL(78,0) NOT NULL, `last_signature` CHAR(132), `settle_timeout` INTEGER NOT NULL, `mtime` INTEGER NOT NULL, `ctime` INTEGER NOT NULL, `state` INTEGER NOT NULL, `confirmed` BOOL NOT NULL, PRIMARY KEY (`sender`, `open_block_number`) ); CREATE TABLE `topups` ( `channel_rowid` INTEGER, `txhash` CHAR(66) NOT NULL, `deposit` DECIMAL(78,0) NOT NULL, PRIMARY KEY (`channel_rowid`, `txhash`), FOREIGN KEY (`channel_rowid`) REFERENCES channels (rowid) ON DELETE CASCADE ); INSERT INTO `metadata` VALUES ( NULL, NULL, NULL ); INSERT INTO `syncstate` VALUES ( NULL, NULL, NULL, NULL ); """ UPDATE_METADATA_SQL = """ UPDATE `metadata` SET `network_id` = ?, `contract_address` = ?, `receiver` = ?; """ UPDATE_SYNCSTATE_SQL = { 'confirmed_head_number': 'UPDATE `syncstate` SET `confirmed_head_number` = ?;', 'confirmed_head_hash': 'UPDATE `syncstate` SET `confirmed_head_hash` = ?;', 'unconfirmed_head_number': 'UPDATE `syncstate` SET `unconfirmed_head_number` = ?;', 'unconfirmed_head_hash': 'UPDATE `syncstate` SET `unconfirmed_head_hash` = ?;' } ADD_CHANNEL_SQL = """ INSERT OR REPLACE INTO `channels` VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ) """ UPDATE_CHANNEL_SQL = """ UPDATE `channels` SET `deposit` = ?, `balance` = ?, `last_signature` = ?, `settle_timeout` = ?, `mtime` = ?, `state` = ? WHERE `sender` = ? AND `open_block_number` = ?; """ DEL_CHANNEL_SQL = """ DELETE FROM `channels` WHERE `sender` = ? AND `open_block_number` = ?"""
[docs]class ChannelManagerState(object): """The part of the channel manager state that needs to persist.""" def __init__(self, filename): self.filename = filename self.conn = sqlite3.connect(self.filename, isolation_level="EXCLUSIVE") self.conn.row_factory = dict_factory if filename not in (None, ':memory:'): os.chmod(filename, 0o600)
[docs] def setup_db(self, network_id: int, contract_address: str, receiver: str): """Initialize an empty database.""" assert is_address(receiver) self.conn.executescript(DB_CREATION_SQL) self.conn.execute(UPDATE_METADATA_SQL, [network_id, contract_address, receiver]) self.conn.commit()
@property def contract_address(self): """The address of the channel manager contract.""" c = self.conn.cursor() c.execute('SELECT `contract_address` FROM `metadata`;') contract_address = c.fetchone()['contract_address'] assert c.fetchone() is None return contract_address @property def receiver(self): """The receiver address.""" c = self.conn.cursor() c.execute('SELECT `receiver` FROM `metadata`;') receiver = c.fetchone()['receiver'] assert c.fetchone() is None return receiver @property def network_id(self): """Network the state uses.""" c = self.conn.cursor() c.execute('SELECT `network_id` FROM `metadata`;') network_id = c.fetchone()['network_id'] assert c.fetchone() is None return network_id @property def _sync_state(self): c = self.conn.cursor() c.execute('SELECT * FROM `syncstate`;') state = c.fetchone() assert c.fetchone() is None assert len(state) == 4 return state @property def confirmed_head_number(self): """The number of the highest processed block considered to be final.""" return self._sync_state['confirmed_head_number'] @confirmed_head_number.setter def confirmed_head_number(self, value): self.update_sync_state(confirmed_head_number=value) @property def confirmed_head_hash(self): """The hash of the highest processed block considered to be final.""" return self._sync_state['confirmed_head_hash'] @confirmed_head_hash.setter def confirmed_head_hash(self, value): self.update_sync_state(confirmed_head_hash=value) @property def unconfirmed_head_number(self): """The number of the highest processed block considered to be not yet final.""" return self._sync_state['unconfirmed_head_number'] @unconfirmed_head_number.setter def unconfirmed_head_number(self, value: int): self.update_sync_state(unconfirmed_head_number=value) @property def unconfirmed_head_hash(self): """The hash of the highest processed block considered to be not yet final.""" return self._sync_state['unconfirmed_head_hash'] @unconfirmed_head_hash.setter def unconfirmed_head_hash(self, value: int): self.update_sync_state(unconfirmed_head_hash=value)
[docs] def update_sync_state( self, confirmed_head_number=None, confirmed_head_hash=None, unconfirmed_head_number=None, unconfirmed_head_hash=None ): """Update block numbers and hashes of confirmed and unconfirmed head.""" if confirmed_head_number is not None: sql = UPDATE_SYNCSTATE_SQL['confirmed_head_number'] self.conn.execute(sql, [confirmed_head_number]) if confirmed_head_hash is not None: sql = UPDATE_SYNCSTATE_SQL['confirmed_head_hash'] self.conn.execute(sql, [confirmed_head_hash]) if unconfirmed_head_number is not None: sql = UPDATE_SYNCSTATE_SQL['unconfirmed_head_number'] self.conn.execute(sql, [unconfirmed_head_number]) if unconfirmed_head_hash is not None: sql = UPDATE_SYNCSTATE_SQL['unconfirmed_head_hash'] self.conn.execute(sql, [unconfirmed_head_hash]) self.conn.commit()
@property def n_channels(self): """Returns: int: count of all channels, regardless of their state """ c = self.conn.cursor() c.execute('SELECT COUNT(*) as count FROM `channels`') return c.fetchone()['count'] @property def n_open_channels(self): """ Returns: int: count of open channels """ c = self.conn.cursor() c.execute('SELECT COUNT(*) as count FROM `channels` WHERE `state` = ?', [ChannelState.OPEN.value]) return c.fetchone()['count']
[docs] def get_channels(self, confirmed=True): """ Args: confirmed (bool, optional): return confirmed channels only. Default is True. Returns: dict: map of channels, (sender, open_block_number) => Channel """ ret = dict() c = self.conn.cursor() c.execute('SELECT rowid, * FROM `channels` WHERE `confirmed` = ?', [confirmed]) for result in c.fetchall(): channel = self.result_to_channel(result) ret[result['sender'], result['open_block_number']] = channel return ret
@property def channels(self): return self.get_channels(confirmed=True) @property def unconfirmed_channels(self): return self.get_channels(confirmed=False) @property def pending_channels(self): """Get list of channels in a CLOSE_PENDING state""" ret = dict() c = self.conn.cursor() c.execute('SELECT rowid, * FROM `channels` WHERE `state` = ?', [ChannelState.CLOSE_PENDING.value]) for result in c.fetchall(): channel = self.result_to_channel(result) ret[result['sender'], result['open_block_number']] = channel return ret
[docs] def result_to_channel(self, result: dict): """Helper function to serialize one row of `channels` table into a channel object """ channel = Channel(self.receiver, result['sender'], int(result['deposit']), result['open_block_number']) channel.balance = int(result['balance']) channel.state = ChannelState(result['state']) channel.last_signature = result['last_signature'] channel.settle_timeout = result['settle_timeout'] channel.mtime = result['mtime'] channel.ctime = result['ctime'] channel.unconfirmed_topups = self.get_unconfirmed_topups(result['rowid']) channel.confirmed = result['confirmed'] return channel
[docs] def get_channel_rowid(self, sender: str, open_block_number: int): sender = sender c = self.conn.cursor() result = c.execute( 'SELECT rowid from `channels` WHERE sender = ? AND open_block_number = ?', [sender, open_block_number] ) return result.fetchone()['rowid']
[docs] def get_unconfirmed_topups(self, channel_rowid: int): c = self.conn.cursor() c.execute('SELECT * FROM topups WHERE channel_rowid = ?', [channel_rowid]) return {result['txhash']: result['deposit'] for result in c.fetchall()}
[docs] def set_channel(self, channel: Channel): """Update channel state""" self.add_channel(channel)
[docs] def channel_exists(self, sender: str, open_block_number: int): """Return true if channel(sender, open_block_number) exists""" c = self.conn.cursor() sql = 'SELECT 1 FROM `channels` WHERE `sender` = ? AND `open_block_number` == ?' c.execute(sql, [sender, open_block_number]) result = c.fetchone() if result is None: return False elif result['1']: return True assert False
[docs] def set_unconfirmed_topups(self, channel_rowid: int, topups: dict): assert channel_rowid is not None and isinstance(channel_rowid, int) self.conn.execute('DELETE FROM topups WHERE channel_rowid = ?', [channel_rowid]) for txhash, deposit in topups.items(): self.conn.execute('INSERT OR REPLACE INTO topups VALUES (?, ?, ?)', [channel_rowid, txhash, str(deposit)])
[docs] def add_channel(self, channel: Channel): """Add or update channel state""" assert channel.open_block_number > 0 assert channel.state is not ChannelState.UNDEFINED assert is_address(channel.sender) params = [ channel.sender, channel.open_block_number, str(channel.deposit), str(channel.balance), channel.last_signature, channel.settle_timeout, channel.mtime, channel.ctime, channel.state.value, channel.confirmed ] self.conn.execute(ADD_CHANNEL_SQL, params) rowid = self.get_channel_rowid(channel.sender, channel.open_block_number) self.set_unconfirmed_topups(rowid, channel.unconfirmed_topups) self.conn.commit()
[docs] def get_channel(self, sender: str, open_block_number: int): assert is_address(sender) assert open_block_number > 0 # TODO unconfirmed topups c = self.conn.cursor() sql = 'SELECT rowid,* FROM `channels` WHERE `sender` = ? AND `open_block_number` = ?' c.execute(sql, [sender, open_block_number]) result = c.fetchone() assert c.fetchone() is None return self.result_to_channel(result)
[docs] def del_channel(self, sender: str, open_block_number: int): assert is_address(sender) assert open_block_number > 0 assert self.channel_exists(sender, open_block_number) self.conn.execute(DEL_CHANNEL_SQL, [sender, open_block_number]) self.conn.commit()
[docs] @classmethod def load(cls, filename: str, check_permissions=True): """Load a previously stored state.""" assert filename and isinstance(filename, str) if filename != ':memory:': if os.path.isfile(filename) is False: log.error("State file %s doesn't exist" % filename) return None if check_permissions and not check_permission_safety(filename): raise InsecureStateFile(filename) ret = cls(filename) log.debug("loaded saved state. head_number=%s receiver=%s" % (ret.confirmed_head_number, ret.receiver)) # for sender, block in ret.channels.keys(): # log.debug("loaded channel info from the saved state sender=%s open_block=%s" % # (sender, block)) return ret
[docs] def del_unconfirmed_channels(self): self.conn.execute('DELETE FROM `channels` WHERE `confirmed` = 0') self.conn.commit()
[docs] def set_channel_state(self, sender: str, open_block_number: int, state: ChannelState): assert is_address(sender) sender = sender self.conn.execute('UPDATE `channels` SET `state` = ?' 'WHERE `sender` = ? AND `open_block_number` = ?', [state, sender, open_block_number])