1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
| from flask import Flask, request, render_template, url_for, flash, redirect from gevent import pywsgi from flask_sqlalchemy import SQLAlchemy import os import sys import click from werkzeug.security import generate_password_hash, check_password_hash from flask_login import LoginManager, UserMixin,login_user, login_required, logout_user,current_user
WIN = sys.platform.startswith('win') if WIN: prefix = 'sqlite:///' else: prefix = 'sqlite:////'
app = Flask(__name__) app.config['SECRET_KEY'] = 'dev' app.config['SQLALCHEMY_DATABASE_URI'] = prefix + os.path.join(app.root_path, 'data.db') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = 'login'
class User(db.Model,UserMixin): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(20)) username = db.Column(db.String(20)) password_hash = db.Column(db.String(128))
def set_password(self, password): self.password_hash = generate_password_hash(password)
def validate_password(self, password): return check_password_hash(self.password_hash, password)
class Movie(db.Model): id = db.Column(db.Integer, primary_key=True) content = db.Column(db.String(60)) name = db.Column(db.String(6))
class Read(db.Model): id = db.Column(db.Integer, primary_key=True) uesrname = db.Column(db.String(20)) bookname = db.Column(db.String(20)) zjid = db.Column(db.Integer)
@app.cli.command() @click.option('--drop', is_flag=True, help='删除后创建.') def initdb(drop): """初始化数据库.""" if drop: db.drop_all() db.create_all() click.echo('已初始化的数据库.')
@app.cli.command() @click.option('--username', prompt=True, help='The username used to login.') @click.option('--password', prompt=True, hide_input=True, confirmation_prompt=True, help='The password used to login.') def admin(username, password): """Create user.""" db.create_all()
user = User.query.first() if user is not None: click.echo('Updating user...') user.username = username user.set_password(password) else: click.echo('Creating user...') user = User(username=username, name='Admin') user.set_password(password) db.session.add(user)
db.session.commit() click.echo('Done.')
@login_manager.user_loader def load_user(user_id): user = User.query.get(int(user_id)) return user
@app.route('/', methods=['GET', 'POST']) def index(): if request.method == 'POST': content = request.form.get('content') name = request.form.get('name') if not content or not name or len(name) > 6 or len(content) > 60: flash('输入无效.') return redirect(url_for('index')) movie = Movie(content=content, name=name) db.session.add(movie) db.session.commit() flash('您的留言已提交成功.') return redirect(url_for('index')) ly_lists = Movie.query.all() book_lists = os.listdir('static/book') return render_template('index.html', book_lists=book_lists, ly_lists=ly_lists)
@app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password']
if not username or not password: flash('输入无效.') return redirect(url_for('login'))
user = User.query.filter_by(username=username).first() if user: if username == user.username and user.validate_password(password): login_user(user) flash(current_user.username + ',欢迎回家!') return redirect(url_for('index'))
flash('无效的用户名或密码.') return redirect(url_for('login'))
return render_template('login.html')
@app.route('/signup', methods=['GET', 'POST']) def signup(): if request.method == 'POST': username = request.form['username'] password = request.form['password']
if not username or not password or len(username) > 20 or len(password) > 20: flash('输入无效.') return redirect(url_for('signup')) if username.lower() == 'admin': flash('大佬,管理员是小秋秋哦!') return redirect(url_for('signup')) users = User.query.filter_by(username=username).first() if users: flash('注册账号重复,请重新输入!') return redirect(url_for('signup')) else: user = User(username=username, name=username) user.set_password(password) db.session.add(user) db.session.commit() flash('注册成功!') return redirect(url_for('login'))
return render_template('signup.html')
@app.route('/admin') @login_required def admin(): if current_user.id == 1: admins = User.query.all() return render_template('admin.html', admins=admins)
@app.route('/admin/delete/<int:admin_id>', methods=['POST']) @login_required def admindelete(admin_id): if admin_id != 1: admim = User.query.get_or_404(admin_id) name = admim.username readjls = Read.query.filter_by(uesrname=name).all() if readjls: for readjl in readjls: db.session.delete(readjl) db.session.delete(admim) db.session.commit() flash('会员已删除!')
return redirect(url_for('admin'))
@app.route('/logout') @login_required def logout(): logout_user() flash('欢迎下次光临!') return redirect(url_for('index'))
@app.route('/movie/delete/<int:movie_id>', methods=['POST']) @login_required def delete(movie_id): movie = Movie.query.get_or_404(movie_id) db.session.delete(movie) db.session.commit() flash('留言已删除.') return redirect(url_for('index'))
@app.route('/book/<book_id>') def book_page(book_id='许仙志.txt'): book_path = 'static/book/' + book_id with open(book_path, 'r', encoding='utf-8') as f: txts = f.readlines() nums = int(len(txts) / 100) + 1 page_nums = range(1, int(nums)) return render_template('page.html', book_id=book_id, page_nums=page_nums)
@app.route('/record') @login_required def record(): zj_ids = Read.query.filter_by(uesrname=current_user.username).all() return render_template('record.html', zj_ids=zj_ids)
@app.route('/<book_id>/<int:movie_id>', methods=['GET', 'POST']) def home(book_id='许仙志.txt', movie_id=1): book_path = 'static/book/' + book_id with open(book_path, 'r', encoding='utf-8') as f: txts = f.readlines() if current_user.is_authenticated: username = current_user.username if movie_id == 1: xj_ids = Read.query.filter_by(uesrname=username, bookname=book_id).first() if xj_ids: movie_id = xj_ids.zjid else: r1 = Read(uesrname=username, bookname=book_id, zjid=movie_id) db.session.add(r1) db.session.commit() else: xj_ids = Read.query.filter_by(uesrname=username, bookname=book_id).first() if xj_ids: xj_ids.zjid = movie_id db.session.commit() else: r1 = Read(uesrname=username, bookname=book_id, zjid=movie_id) db.session.add(r1) db.session.commit()
xj_ids = Read.query.filter_by(uesrname=username, bookname=book_id).first() xj_ids.zjid = movie_id db.session.commit()
txt_mun = movie_id * 100 return render_template('home.html', book_id=book_id, movie_id=movie_id, contents=txts[txt_mun-100:txt_mun])
@app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404
if __name__ == '__main__': server = pywsgi.WSGIServer(('0.0.0.0', 5000), app) server.serve_forever()
|