rifalto/rifaserver/app.py

235 lines
8.3 KiB
Python
Raw Normal View History

2023-06-14 09:20:36 -03:00
import functools
import itertools
import operator
import os
from flask import Flask, render_template, request, url_for, redirect, flash, session, make_response
from flask.cli import with_appcontext
from flask_login import LoginManager, login_user, login_required, logout_user, current_user
from sqlalchemy.exc import IntegrityError
import click
from sqlalchemy.orm import joinedload
from werkzeug.security import generate_password_hash
from flask_migrate import Migrate, upgrade, stamp
from rifaserver.models import Seller, Purchase, Raffle, Ticket, db
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import shutil
def backup_db(instance_path):
db_path = os.path.join(instance_path, 'raffle.db')
backup_dir = os.path.join(instance_path, 'backup')
os.makedirs(backup_dir, exist_ok=True)
# Find the most recent backup file
try:
latest_backup = max(os.path.join(backup_dir, name) for name in os.listdir(backup_dir))
except ValueError:
latest_backup = None # No backup files exist yet
if latest_backup and os.path.getmtime(db_path) <= os.path.getmtime(latest_backup):
# print("No changes to database, skipping backup.")
pass
else:
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
shutil.copy(db_path, os.path.join(backup_dir, f'raffle_{timestamp}.db'))
def remove_cache(response):
response = make_response(response)
response.headers['Last-Modified'] = str(datetime.now())
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '-1'
return response
def create_app():
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///raffle.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # optional, but can improve performance
app.config['SECRET_KEY'] = 'idm&#@$8*RSXZpc6Wvb4'
db.init_app(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Favor entrar com senha para acessar esta página!'
login_manager.login_message_category = 'danger'
@login_manager.user_loader
def load_user(user_id):
return Seller.query.get(user_id)
migrate = Migrate(app, db)
with app.app_context():
if not os.path.isfile(os.path.join(app.instance_path, 'raffle.db')):
db.create_all()
stamp()
scheduler = BackgroundScheduler()
scheduler.add_job(func=backup_db, args=(app.instance_path,), trigger="interval", hours=1)
scheduler.start()
# print('App created!!')
return app
app = create_app()
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
user = Seller.query.filter_by(username=request.form['username']).first()
if user and user.check_password(request.form['password']):
login_user(user)
return redirect(url_for('seller_screen'))
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
@app.route('/venda', methods=['GET', 'POST'])
@login_required
def seller_screen():
if request.method == 'POST':
if not request.form['amount'].isdigit() or int(request.form['amount']) <= 0:
flash("Quantidade inválida", 'warning')
return redirect(url_for('seller_screen'))
purchase = Purchase(seller_id=current_user.id, total_numbers=int(request.form['amount']),
numbers_left=int(request.form['amount']), remarks=request.form['remarks'])
purchase.generate_id()
db.session.add(purchase)
db.session.commit()
purchase_link = url_for('numbers_screen', purchase_id=purchase.id, _external=True)
return remove_cache(render_template('purchase_link.html', link=purchase_link))
return render_template('seller_screen.html')
@app.route('/numeros/<purchase_id>', methods=['GET', 'POST'])
def numbers_screen(purchase_id):
purchase = Purchase.query.get_or_404(purchase_id)
buyer_data = session.get('buyer_data', {'name': '', 'contact': ''})
if request.method == 'POST':
_validation_errors = False
if not request.form['raffle_id']:
flash("Rifa não escolhida, escolha uma rifa!", 'warning')
_validation_errors = True
if purchase.numbers_left <= 0:
flash("Acabaram os números para escolher, compre mais!", 'danger')
_validation_errors = True
if not request.form['name']:
flash("Preencher o nome é obrigatório", 'warning')
_validation_errors = True
if not request.form['number'].isdigit() or int(request.form['number']) <= 0:
flash("Número inválido!", 'danger')
_validation_errors = True
if not _validation_errors:
raffle = Raffle.query.get_or_404(request.form['raffle_id'])
ticket = Ticket(purchase_id=purchase.id, raffle_id=raffle.id, buyer_name=request.form['name'],
buyer_contact=request.form['contact'], chosen_number=request.form['number'])
db.session.add(ticket)
try:
purchase.decrease_numbers()
db.session.commit()
flash(f"Número reservado: {request.form['number']}", 'success')
except IntegrityError:
db.session.rollback()
flash(f"O número {request.form['number']} já foi escolhido para {raffle.description}!", 'warning')
session['buyer_data'] = {'name': request.form['name'], 'contact': request.form['contact']}
return redirect(url_for('numbers_screen', purchase_id=purchase_id))
raffles = Raffle.query.all()
tickets = Ticket.query.filter_by(purchase_id=purchase_id).options(joinedload(Ticket.raffle)).order_by(Ticket.raffle_id).all()
grouped_tickets = {k: list(v) for k, v in itertools.groupby(tickets, key=operator.attrgetter('raffle.description'))}
# tickets = Ticket.query.filter_by(purchase_id=purchase_id).all()
return remove_cache(render_template('purchase.html', purchase=purchase, raffles=raffles,
grouped_tickets=grouped_tickets, buyer_data=buyer_data))
@app.route('/vendas', methods=['GET'])
@login_required
def seller_purchases():
seller_id = current_user.id
purchases = Purchase.query.filter_by(seller_id=seller_id).order_by(Purchase.id.desc()).all()
return render_template('seller_purchases.html', purchases=purchases)
@app.cli.command('create-seller')
@click.argument('username')
@click.argument('password')
def create_seller(username, password):
"""Create a new seller"""
from rifaserver.models import Seller, db
# check if seller exists
existing_seller = Seller.query.filter_by(username=username).first()
if existing_seller:
click.echo(f"Username {username} already exists.")
return
seller = Seller(
username=username,
password_hash=generate_password_hash(password)
)
db.session.add(seller)
db.session.commit()
click.echo(f"Seller: {username} created.")
@app.cli.command('cria-rifa')
@click.argument('descricao')
@with_appcontext
def create_raffle(descricao):
"""Create a new raffle"""
from rifaserver.models import db, Raffle
# Create a new raffle
raffle = Raffle(description=descricao)
db.session.add(raffle)
db.session.commit()
click.echo(f"Rifa '{descricao}' criada.")
@app.cli.command('muda-senha')
@click.argument('usuario')
@click.argument('nova_senha')
@with_appcontext
def change_password(usuario, nova_senha):
"""Change password for a seller"""
from rifaserver.models import db, Seller
from werkzeug.security import generate_password_hash
# Find the seller
seller = Seller.query.filter_by(username=usuario).first()
if not seller:
click.echo(f"Vendedor não encontrado: {usuario}")
return
# Change the password and commit the changes
seller.password_hash = generate_password_hash(nova_senha)
db.session.commit()
click.echo(f"Senha para {usuario} foi atualizada.")
@app.cli.command('apply_migrations')
@with_appcontext
def apply_migrations_command():
"""Apply database migrations."""
upgrade()