2 Star 4 Fork 3

刘悦的技术博客/myflask

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
manage.py 2.66 KB
一键复制 编辑 原始数据 按行查看 历史
liuyue 提交于 2020-07-21 20:45 . 增加热启动
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import pymysql
from flask import request,jsonify
from flask_cors import CORS
from flask_socketio import SocketIO,send,emit,join_room, leave_room
import urllib.parse
import user_view
from celery import Celery
from datetime import timedelta
pymysql.install_as_MySQLdb()
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:root@localhost:3306/md"
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.config['CELERY_ACCEPT_CONTENT'] = ['json', 'pickle']
app.config['REDIS_URL'] = 'redis://localhost:6379'
app.config['JSON_AS_ASCII'] = False
CORS(app,cors_allowed_origins="*")
app.register_blueprint(user_view.user)
db = SQLAlchemy(app)
socketio = SocketIO(app,cors_allowed_origins='*',async_mode="threading",message_queue=app.config['CELERY_RESULT_BACKEND'])
celery = Celery(app.name)
celery.conf.update(app.config)
celery.conf.CELERYBEAT_SCHEDULE = {
"test":{
"task":"get_cron",
"schedule":timedelta(seconds=10)
}
}
@celery.task(name="get_cron")
def get_cron():
get_sendback.delay()
@celery.task()
def get_sendback():
socketio.emit('sendback','message',broadcast=True)
@app.route('/task')
def start_background_task():
get_sendback.delay()
return '开始'
@app.route('/',methods=['GET','POST',"PUT","DELETE"])
def hello_world():
#res = db.session.execute("insert into user (`username`) values ('123') ")
# res = db.session.execute(" select id,username from user ").fetchall()
# data = request.args.get("id")
# #data = request.form.get("id")
# print(data)
# print(res)
# #return 'Hello Flask'
# return jsonify({'result': [dict(row) for row in res]})
return jsonify({'message':'你好123,Docker'})
@socketio.on('join')
def on_join(data):
username = 'user1'
room = 'room1'
join_room(room)
send(username + ' has entered the room.', room=room)
@socketio.on('message')
def handle_message(message):
message = urllib.parse.unquote(message)
print(message)
send(message,broadcast=True)
@socketio.on('connect', namespace='/chat')
def test_connect():
emit('my response', {'data': 'Connected'})
@socketio.on('disconnect', namespace='/chat')
def test_disconnect():
print('Client disconnected')
@app.route("/sendback",methods=['GET'])
def sendback():
socketio.emit('sendback','message')
return 'ok'
if __name__ == '__main__':
socketio.run(app,debug=True,host="0.0.0.0",port=5000)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/QiHanXiBei/myflask.git
git@gitee.com:QiHanXiBei/myflask.git
QiHanXiBei
myflask
myflask
master

搜索帮助