epicpost/funding/orm.py
2022-08-18 23:05:04 +08:00

203 lines
6.4 KiB
Python

from datetime import datetime
import string
import random
import requests
from sqlalchemy.orm import relationship, backref
import sqlalchemy as sa
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.types import Float
from sqlalchemy_json import MutableJson
from sqlalchemy import and_, or_, not_
import hashlib
import json
import settings
from funding.factory import db, cache
base = declarative_base(name="Model")
def val_address(address):
if len(address) != 52 or address[0] != 'e' or address[1] != 's':
raise Exception("invalid epic address")
class Address(db.Model):
__tablename__ = "addresses"
id = db.Column('user_id', db.Integer, primary_key=True)
address = db.Column(db.String(52), unique=True)
@classmethod
def find_address(cls, address):
from funding.factory import db
q = cls.query
q = q.filter(Address.address == address)
result = q.first()
if not result:
return
return result
def __repr__(self):
return "<Address(address='%s')>" % (
self.address)
@classmethod
def add(cls, address):
from funding.factory import db
try:
previous = cls.query.filter(Address.address == address).first()
if previous is not None:
print("was inputted before")
return
# validate incoming username/email
val_address(address)
new_address = Address(address=address)
db.session.add(new_address)
db.session.commit()
db.session.flush()
return new_address
except Exception as ex:
db.session.rollback()
raise
class Slate(db.Model):
__tablename__ = "slates"
id = db.Column(db.Integer, primary_key=True)
slate = db.Column(db.String())
# TODO: clear slatse that have been in the database past a certain time
posted_time = db.Column(db.DateTime)
receivingAddress = db.Column(db.String(52), db.ForeignKey('addresses.address'), nullable=False)
def __init__(self, slate, receivingAddress):
from funding.factory import bcrypt
self.slate = slate
self.receivingAddress = receivingAddress
self.posted_time = datetime.utcnow()
def __repr__(self):
return "<Slate(slate='%s', receivingAddress='%s', posted_time='%s)>" % (
self.slate, self.receivingAddress, self.posted_time)
@classmethod
def find_slates(cls, address):
q = cls.query
q = q.filter(Slate.receivingAddress == address)
result = q.all()
print(f'{result}')
list_slates = []
for slate_db in result:
list_slates.append(slate_db.slate)
return list_slates
@classmethod
def delete_slate(cls, address, slate):
q = cls.query
q = q.filter(and_(Slate.receivingAddress == address, Slate.slate == slate) )
result = q.first()
if result is None:
print("none guy")
return True
try:
db.session.delete(result)
db.session.commit()
db.session.flush()
return True
except Exception as ex:
db.session.rollback()
return False
@classmethod
def add(cls, slate, receivingAddress):
from funding.factory import db
try:
previous = cls.query.filter(Slate.slate == slate).first()
if previous is not None:
print("was inputted before")
return
# put in new one
new_slate = Slate(slate=slate, receivingAddress=receivingAddress)
db.session.add(new_slate)
db.session.commit()
db.session.flush()
return new_slate
except Exception as ex:
db.session.rollback()
raise
class Cancel(db.Model):
__tablename__ = "cancels"
id = db.Column(db.Integer, primary_key=True)
slate_id = db.Column(db.String())
# TODO: clear slatse that have been in the database past a certain time
posted_time = db.Column(db.DateTime)
receivingAddress = db.Column(db.String(52), nullable=False)
sendersAddress = db.Column(db.String(52), nullable=False)
def __init__(self, slate_id, receivingAddress, sendersAddress):
from funding.factory import bcrypt
self.slate_id = slate_id
self.receivingAddress = receivingAddress
self.sendersAddress = sendersAddress
self.posted_time = datetime.utcnow()
def __repr__(self):
return "<Cancel(slate_id='%s', receivingAddress='%s', sendersAddress='%s', posted_time='%s)>" % (
self.slate_id, self.receivingAddress, self.sendersAddress, self.posted_time)
@classmethod
def find_slate_ids(cls, address):
q = cls.query
q = q.filter(Cancel.receivingAddress == address)
result = q.all()
print(f'{result}')
list_slates = []
for slate_db in result:
list_slates.append({slate_db.slate_id: slate_db.sendersAddress})
return list_slates
@classmethod
def delete_slate_id(cls, address, slate_id):
q = cls.query
q = q.filter(and_(Cancel.receivingAddress == address, Cancel.slate_id == slate_id))
result = q.first()
if result is None:
print("none guy")
return True
try:
db.session.delete(result)
db.session.commit()
db.session.flush()
return True
except Exception as ex:
db.session.rollback()
return False
@classmethod
def add(cls, slate_id, receivingAddress, sendersAddress):
from funding.factory import db
try:
previous = cls.query.filter(Cancel.slate_id == slate_id).first()
if previous is not None:
print(previous)
print("was inputted before")
return
# put in new one
new_cancel_request = Cancel(slate_id=slate_id, receivingAddress=receivingAddress, sendersAddress=sendersAddress)
db.session.add(new_cancel_request)
db.session.commit()
db.session.flush()
return new_cancel_request
except Exception as ex:
db.session.rollback()
raise