Source code for microraiden.proxy.resources.login

import hmac
import time
import uuid

from flask import g
from flask_httpauth import HTTPBasicAuth
from flask_restful import Resource

# HTTP Basic authentication handler; its decorators register the password
# callback and guard the login/logout resources defined in this module.
# NOTE(review): Basic credentials (and the tokens sent in their place) are only
# base64-encoded in the Authorization header, so this presumably relies on TLS
# in front of the proxy - confirm against the deployment.
auth = HTTPBasicAuth()


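class TokenAccess:
    """Record kept for one issued access token.

    Attributes:
        token: the token string the client presents on later requests.
        time_created: wall-clock time (``time.time()``) of issuance.
        time_accessed: wall-clock time of the last use; starts equal to
            ``time_created``.
        user: the identifier the token was issued for.
    """

    def __init__(self, user: str):
        # The token string is the only secret a client presents afterwards, so
        # it has to be unguessable. uuid1() is built from the host id and the
        # current time; uuid4() is random (CPython draws it from os.urandom)
        # and keeps the same 36-character UUID string format.
        self.token = str(uuid.uuid4())
        issued_at = time.time()
        self.time_created = issued_at
        self.time_accessed = issued_at
        self.user = user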
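class UsersDB:
    """In-memory store of user credentials and issued access tokens.

    ``users`` maps user name -> password and ``tokens`` maps token string ->
    token record. Nothing here is persisted.

    NOTE(review): passwords are stored as plaintext. Keeping a salted password
    hash instead (for example ``hashlib.scrypt``) would change what ``users``
    holds, so it is flagged here rather than changed.
    """

    def __init__(self):
        self.users = {}
        self.tokens = {}
        self.token_expiry_seconds = 600  # token is valid for 10 minutes

    def add_user(self, user: str, password: str):
        """Register ``user`` with ``password``, replacing any existing entry."""
        self.users[user] = password

    def del_user(self, user: str):
        """Remove ``user`` and revoke every token issued to that user.

        Raises:
            KeyError: if ``user`` is not registered.
        """
        self.users.pop(user)
        # Without this sweep, tokens issued before the deletion would keep
        # authorizing requests for as long as they stay in use.
        revoked = [tok for tok, rec in self.tokens.items() if rec.user == user]
        for tok in revoked:
            del self.tokens[tok]

    def authorize(self, user_or_token: str, password: str):
        """Authorize user using token or username/password combination.

        ``user_or_token`` is tried as a token first (``password`` is ignored
        on that path) and as a user name second.

        Returns:
            True if the token is live or the password matches, else False.
        """
        # g.user holds whatever credential was presented - a token string on
        # the token path - and the login/logout resources read it back.
        g.user = user_or_token
        token_record = self.verify_token(user_or_token)
        if token_record is not None:
            # Sliding expiry: every successful use restarts the idle timer.
            token_record.time_accessed = time.time()
            return True

        # Not a live token; if it was an expired one, forget it.
        self.tokens.pop(user_or_token, None)
        stored = self.users.get(user_or_token)
        if not isinstance(stored, str) or not isinstance(password, str):
            # Unknown user, or a non-string value on either side: fail closed.
            return False
        # Constant-time comparison so the response time does not depend on how
        # many leading characters of the password were right.
        return hmac.compare_digest(
            stored.encode("utf-8"), password.encode("utf-8")
        )

    def verify_token(self, token: str):
        """Verify if the token is valid and not expired.

        Expiry is measured from the last access, so a token that keeps being
        used does not age out; ``time_created`` is not consulted.

        Returns:
            The token record, or None if the token is unknown or expired.
        """
        token_record = self.tokens.get(token)
        if token_record is None:
            return None
        idle = time.time() - token_record.time_accessed
        # The wall clock can step backwards, which makes ``idle`` negative.
        # An ``assert`` is stripped under ``python -O`` and otherwise turns the
        # request into an unhandled error, so reject the token explicitly.
        if idle < 0 or idle > self.token_expiry_seconds:
            return None
        return token_record

    def remove_token(self, token: str):
        """Revoke ``token``; a value that is not a stored token is ignored."""
        # Logout hands over g.user, which is a user name when the caller
        # authenticated with user name and password; that must not raise.
        self.tokens.pop(token, None)

    def get_token(self, user: str):
        """Issue a new token for ``user`` and return the token string."""
        # Login hands over g.user, which is a token string when the caller
        # authenticated with a token. Resolve it to that token's owner so the
        # new record names the real user and del_user() can revoke it.
        presented = self.tokens.get(user)
        if presented is not None:
            user = presented.user
        token_record = TokenAccess(user)
        self.tokens[token_record.token] = token_record
        return token_record.token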
# Module-wide credential and token store shared by the auth callback and both
# resources below; its state lives only in this process's memory.
userDB = UsersDB()


# used by flask to process http auth requests
@auth.verify_password
def verify_password(username, password):
    """Password callback for ``auth``: delegate the decision to ``userDB``.

    ``username`` may carry either a user name or a previously issued token;
    ``userDB.authorize`` tells the two apart.
    """
    is_authorized = userDB.authorize(username, password)
    return is_authorized
# two resources for managing the login # exported as /login and /logout
class ChannelManagementLogin(Resource):
    """Login resource: hands an access token to an authenticated caller."""

    @auth.login_required
    def get(self):
        """Issue a token for the credential stored in ``g.user``.

        Returns:
            A ``({'token': <token string>}, 200)`` pair.
        """
        # g.user is whatever credential passed authentication - a user name,
        # or an existing token when the caller logged in with one.
        issued = userDB.get_token(g.user)
        payload = {'token': issued}
        return payload, 200
class ChannelManagementLogout(Resource):
    """Logout resource: revokes the token the caller authenticated with."""

    @auth.login_required
    def get(self):
        """Remove ``g.user`` from the token store and answer ``"OK"``.

        Returns:
            An ``("OK", 200)`` pair.
        """
        # g.user is the credential that passed authentication. When that was a
        # user name and password rather than a token, there is no stored token
        # under this key.
        # NOTE(review): confirm remove_token tolerates that case.
        # NOTE(review): this is a state-changing GET; consider whether the
        # route should also be reachable by a non-idempotent method.
        presented = g.user
        userDB.remove_token(presented)
        return "OK", 200