代码拉取完成,页面将自动刷新
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import pymysql
from flask import request,jsonify
from flask_cors import CORS
from flask_socketio import SocketIO,send,emit,join_room, leave_room
import urllib.parse
import user_view
from celery import Celery
from datetime import timedelta
pymysql.install_as_MySQLdb()
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:root@localhost:3306/md"
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config['BROKER_URL'] = 'redis://localhost:6379'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379'
app.config['CELERY_ACCEPT_CONTENT'] = ['json', 'pickle']
app.config['REDIS_URL'] = 'redis://localhost:6379'
app.config['JSON_AS_ASCII'] = False
CORS(app,cors_allowed_origins="*")
app.register_blueprint(user_view.user)
db = SQLAlchemy(app)
socketio = SocketIO(app,cors_allowed_origins='*',async_mode="threading",message_queue=app.config['CELERY_RESULT_BACKEND'])
celery = Celery(app.name)
celery.conf.update(app.config)
celery.conf.CELERYBEAT_SCHEDULE = {
"test":{
"task":"get_cron",
"schedule":timedelta(seconds=10)
}
}
@celery.task(name="get_cron")
def get_cron():
get_sendback.delay()
@celery.task()
def get_sendback():
socketio.emit('sendback','message',broadcast=True)
@app.route('/task')
def start_background_task():
get_sendback.delay()
return '开始'
@app.route('/',methods=['GET','POST',"PUT","DELETE"])
def hello_world():
#res = db.session.execute("insert into user (`username`) values ('123') ")
# res = db.session.execute(" select id,username from user ").fetchall()
# data = request.args.get("id")
# #data = request.form.get("id")
# print(data)
# print(res)
# #return 'Hello Flask'
# return jsonify({'result': [dict(row) for row in res]})
return jsonify({'message':'你好123,Docker'})
@socketio.on('join')
def on_join(data):
username = 'user1'
room = 'room1'
join_room(room)
send(username + ' has entered the room.', room=room)
@socketio.on('message')
def handle_message(message):
message = urllib.parse.unquote(message)
print(message)
send(message,broadcast=True)
@socketio.on('connect', namespace='/chat')
def test_connect():
emit('my response', {'data': 'Connected'})
@socketio.on('disconnect', namespace='/chat')
def test_disconnect():
print('Client disconnected')
@app.route("/sendback",methods=['GET'])
def sendback():
socketio.emit('sendback','message')
return 'ok'
if __name__ == '__main__':
socketio.run(app,debug=True,host="0.0.0.0",port=5000)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。