rifalto/rifaserver/app.py

272 lines
10 KiB
Python

import functools
import itertools
import json
import operator
import os
import random
import secrets
from flask import Flask, render_template, request, url_for, redirect, flash, session, make_response
from flask.cli import with_appcontext
from flask_login import LoginManager, login_user, login_required, logout_user, current_user
from sqlalchemy.exc import IntegrityError
import click
from sqlalchemy.orm import joinedload
from werkzeug.security import generate_password_hash
from flask_migrate import Migrate, upgrade, stamp
from rifaserver.models import Seller, Purchase, Raffle, Ticket, db
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import shutil
def backup_db(instance_path):
db_path = os.path.join(instance_path, 'raffle.db')
backup_dir = os.path.join(instance_path, 'backup')
os.makedirs(backup_dir, exist_ok=True)
# Find the most recent backup file
try:
latest_backup = max(os.path.join(backup_dir, name) for name in os.listdir(backup_dir))
except ValueError:
latest_backup = None # No backup files exist yet
if latest_backup and os.path.getmtime(db_path) <= os.path.getmtime(latest_backup):
# print("No changes to database, skipping backup.")
pass
else:
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
shutil.copy(db_path, os.path.join(backup_dir, f'raffle_{timestamp}.db'))
def remove_cache(response):
response = make_response(response)
response.headers['Last-Modified'] = str(datetime.now())
response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, post-check=0, pre-check=0, max-age=0'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '-1'
return response
def create_app():
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///raffle.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # optional, but can improve performance
app.config.from_prefixed_env()
config_filename = os.path.join(app.instance_path, "config.json")
if os.path.isfile(config_filename):
app.config.from_file(config_filename, load=json.load)
with open(config_filename, 'r') as f:
_config = json.load(f)
else:
print(f"{config_filename} not found")
_config = {}
if app.config['SECRET_KEY'] is None:
app.config['SECRET_KEY'] = secrets.token_hex()
print(f"Using generated key {app.config['SECRET_KEY']}")
_config['SECRET_KEY'] = app.config['SECRET_KEY']
with open(config_filename, 'w') as f:
json.dump(_config, f)
db.init_app(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
login_manager.login_message = 'Favor entrar com senha para acessar esta página!'
login_manager.login_message_category = 'danger'
@login_manager.user_loader
def load_user(user_id):
return Seller.query.get(user_id)
migrate = Migrate(app, db)
if not os.path.isfile(os.path.join(app.instance_path, 'raffle.db')):
print('Criando novo banco de dados `raffle.db`')
with app.app_context():
db.create_all()
stamp()
import sys
sys.exit()
scheduler = BackgroundScheduler()
scheduler.add_job(func=backup_db, args=(app.instance_path,), trigger="interval", hours=1)
scheduler.start()
return app
app = create_app()
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
user = Seller.query.filter_by(username=request.form['username']).first()
if user and user.check_password(request.form['password']):
login_user(user)
return redirect(url_for('seller_screen'))
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
@app.route('/venda', methods=['GET', 'POST'])
@login_required
def seller_screen():
if request.method == 'POST':
if not request.form['amount'].isdigit() or int(request.form['amount']) <= 0:
flash("Quantidade inválida", 'warning')
return redirect(url_for('seller_screen'))
purchase = Purchase(seller_id=current_user.id, total_numbers=int(request.form['amount']),
numbers_left=int(request.form['amount']), remarks=request.form['remarks'])
purchase.generate_id()
db.session.add(purchase)
db.session.commit()
purchase_link = url_for('numbers_screen', purchase_id=purchase.id, _external=True)
return remove_cache(render_template('purchase_link.html', link=purchase_link))
return render_template('seller_screen.html')
def suggest_numbers(raffle_id, amount=3):
nums_needed = target_pool_size = max(amount * 2, 50)
possible_suggestions = []
candidate_number = 1
for ticket in Ticket.query.filter_by(raffle_id=raffle_id).order_by(Ticket.chosen_number.asc()):
possible_suggestions.extend(range(candidate_number, min(ticket.chosen_number, candidate_number + nums_needed)))
candidate_number = ticket.chosen_number + 1
if len(possible_suggestions) >= target_pool_size:
nums_needed = 0
break
nums_needed = target_pool_size - len(possible_suggestions)
possible_suggestions.extend(range(candidate_number, candidate_number + nums_needed))
return sorted(random.sample(possible_suggestions, amount))
@app.route('/numeros/<purchase_id>', methods=['GET', 'POST'])
def numbers_screen(purchase_id):
purchase = Purchase.query.get_or_404(purchase_id)
buyer_data = session.get('buyer_data', {'name': '', 'contact': ''})
if request.method == 'POST':
_validation_errors = False
if not request.form['raffle_id']:
flash("Rifa não escolhida, escolha uma rifa!", 'warning')
_validation_errors = True
if purchase.numbers_left <= 0:
flash("Acabaram os números para escolher, compre mais!", 'danger')
_validation_errors = True
if not request.form['name']:
flash("Preencher o nome é obrigatório", 'warning')
_validation_errors = True
if not request.form['number'].isdigit() or int(request.form['number']) <= 0:
flash("Número inválido!", 'danger')
_validation_errors = True
if not _validation_errors:
raffle = Raffle.query.get_or_404(request.form['raffle_id'])
ticket = Ticket(purchase_id=purchase.id, raffle_id=raffle.id, buyer_name=request.form['name'],
buyer_contact=request.form['contact'], chosen_number=request.form['number'])
db.session.add(ticket)
try:
purchase.decrease_numbers()
db.session.commit()
session['failed_attempts'] = 0
flash(f"Número reservado: {request.form['number']}", 'success')
except IntegrityError:
db.session.rollback()
session['failed_attempts'] = session.get('failed_attempts', 0) + 1
if session['failed_attempts'] < 3:
flash(f"O número {request.form['number']} já foi escolhido para {raffle.description}!", 'warning')
else:
suggested_numbers = suggest_numbers(raffle.id)
flash(f"{request.form['number']} também já foi! Sugestões para {raffle.description}: "
f"{', '.join(map(str, suggested_numbers))}", 'warning')
session['buyer_data'] = {'name': request.form['name'], 'contact': request.form['contact']}
return redirect(url_for('numbers_screen', purchase_id=purchase_id))
raffles = Raffle.query.all()
tickets = Ticket.query.filter_by(purchase_id=purchase_id).options(joinedload(Ticket.raffle)).order_by(Ticket.raffle_id).all()
grouped_tickets = {k: list(v) for k, v in itertools.groupby(tickets, key=operator.attrgetter('raffle.description'))}
# tickets = Ticket.query.filter_by(purchase_id=purchase_id).all()
return remove_cache(render_template('purchase.html', purchase=purchase, raffles=raffles,
grouped_tickets=grouped_tickets, buyer_data=buyer_data))
@app.route('/vendas', methods=['GET'])
@login_required
def seller_purchases():
seller_id = current_user.id
purchases = Purchase.query.filter_by(seller_id=seller_id).order_by(Purchase.timestamp.desc(), Purchase.id.desc())
return render_template('seller_purchases.html', purchases=purchases)
@app.cli.command('create-seller')
@click.argument('username')
@click.argument('password')
def create_seller(username, password):
"""Create a new seller"""
from rifaserver.models import Seller, db
# check if seller exists
existing_seller = Seller.query.filter_by(username=username).first()
if existing_seller:
click.echo(f"Username {username} already exists.")
return
seller = Seller(
username=username,
password_hash=generate_password_hash(password)
)
db.session.add(seller)
db.session.commit()
click.echo(f"Seller: {username} created.")
@app.cli.command('cria-rifa')
@click.argument('descricao')
@with_appcontext
def create_raffle(descricao):
"""Create a new raffle"""
from rifaserver.models import db, Raffle
# Create a new raffle
raffle = Raffle(description=descricao)
db.session.add(raffle)
db.session.commit()
click.echo(f"Rifa '{descricao}' criada.")
@app.cli.command('muda-senha')
@click.argument('usuario')
@click.argument('nova_senha')
@with_appcontext
def change_password(usuario, nova_senha):
"""Change password for a seller"""
from rifaserver.models import db, Seller
from werkzeug.security import generate_password_hash
# Find the seller
seller = Seller.query.filter_by(username=usuario).first()
if not seller:
click.echo(f"Vendedor não encontrado: {usuario}")
return
# Change the password and commit the changes
seller.password_hash = generate_password_hash(nova_senha)
db.session.commit()
click.echo(f"Senha para {usuario} foi atualizada.")