代码拉取完成,页面将自动刷新
import json
from datetime import datetime
from random import sample
import tornado.websocket
import ctrl_base
from database.Xmongodb import mongo, mongodb
from util.utils import generate_video_list
class test_index(ctrl_base.BaseHandler):
    """Request handler that renders the ``test_index.html`` template."""

    async def get(self):
        """Handle GET by rendering ``test_index.html``.

        The six values produced by ``get_base_config()`` are forwarded to
        the template under the same names they are unpacked into here.
        """
        (
            current_type,
            current_username,
            current_user_id,
            find_0,
            find_0_0,
            find_0_1,
        ) = await self.get_base_config()

        # Keyword arguments handed to the template, gathered in one place.
        template_context = {
            "find_0": find_0,
            "current_type": current_type,
            "current_username": current_username,
            "current_user_id": current_user_id,
            "find_0_0": find_0_0,
            "find_0_1": find_0_1,
        }
        await self.render("test_index.html", **template_context)
class test(ctrl_base.BaseHandler):
    """Request handler that renders the ``test.html`` template."""

    async def get(self):
        """Handle GET by rendering ``test.html``.

        The six values produced by ``get_base_config()`` are forwarded to
        the template under the same names they are unpacked into here.
        """
        (
            current_type,
            current_username,
            current_user_id,
            find_0,
            find_0_0,
            find_0_1,
        ) = await self.get_base_config()

        # Keyword arguments handed to the template, gathered in one place.
        template_context = {
            "find_0": find_0,
            "current_type": current_type,
            "current_username": current_username,
            "current_user_id": current_user_id,
            "find_0_0": find_0_0,
            "find_0_1": find_0_1,
        }
        await self.render("test.html", **template_context)
class test_websocket(tornado.websocket.WebSocketHandler):
    """WebSocket handler that runs a video-choice test session.

    On connect, a per-connection video list is generated, two entries are
    sampled from it and offered to the client, and a record is inserted
    into ``mongo.record``.  The client then sends JSON messages whose
    ``type`` is either ``"choose"`` (pick one of the offered videos) or
    ``"submit"`` (send the ratings for the current round).

    All per-connection state lives in class-level containers keyed by
    ``str(self)``, so it is shared by every instance of this handler.
    """

    # Codes placed in the "type" field of outgoing messages.
    testIn = 0
    videoIn = 1
    testSubmit = 2

    # Shared across all connections (class attributes, mutated in place).
    users = []        # currently open handler instances
    users_video = {}  # str(handler) -> list of video numbers not yet chosen
    users_cache = {}  # str(handler) -> state of the round in progress

    def createMessage(self, type, data):
        """Serialize an outgoing message to a JSON string.

        Args:
            type: Message type code (one of ``testIn``, ``videoIn``,
                ``testSubmit``).  The name shadows the builtin but is kept
                so existing keyword callers continue to work.
            data: JSON-serializable payload stored under ``"msg"``.

        Returns:
            A JSON string with the keys ``type``, ``msg`` and
            ``create_time`` (local time, ``%Y-%m-%d %H:%M:%S``).
        """
        message = {
            "type": type,
            "msg": data,
            "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        return json.dumps(message)

    def init_video_choice(self):
        """Pick two remaining videos and start timing the choice.

        Overwrites this connection's entry in ``users_cache`` with the two
        offered video numbers and the time the offer was made.

        Returns:
            A dict with the image paths (``img_a``, ``img_b``) and video
            numbers (``num_a``, ``num_b``) of the two offered videos.

        Raises:
            ValueError: If fewer than two videos remain for this
                connection (raised by ``random.sample``).
        """
        key = str(self)
        choice_a, choice_b = sample(self.users_video.get(key), 2)
        msg = {
            "img_a": '/static/img/test_img/{}.png'.format(choice_a),
            "img_b": '/static/img/test_img/{}.png'.format(choice_b),
            "num_a": choice_a,
            "num_b": choice_b
        }
        self.users_cache[key] = {
            'video_choice_A': choice_a,
            'video_choice_B': choice_b,
            # Reference point for the 'choose_time' measured in on_message.
            'begin_time': datetime.now()
        }
        return msg

    async def open(self, *args, **kwargs):
        """Register the connection, persist its record and send the first pair."""
        key = str(self)
        self.users.append(self)
        user_video = generate_video_list()
        self.users_video[key] = user_video
        msg = self.init_video_choice()
        await mongo.record.insert_one(
            {
                'user': key,
                'all': user_video,
                'create_time': datetime.now()
            })
        await self.write_message(self.createMessage(self.testIn, msg))

    async def on_message(self, message):
        """Handle a JSON message from the client.

        ``{"type": "choose", "ob": <video number>}`` removes the chosen
        video from the remaining list, records how long the choice took and
        replies with the path of the video to play.

        ``{"type": "submit", "round": ..., <ratings>}`` appends the round
        to the record's ``history`` array and, while ``round`` is below 40,
        replies with the next pair of videos; otherwise replies with a
        ``None`` payload.

        Messages with any other ``type`` are ignored.
        """
        message = json.loads(message)
        key = str(self)
        message_type = message.get('type')

        if message_type == 'choose':
            video_label = int(message.get('ob'))
            self.users_video.get(key).remove(video_label)

            cache = self.users_cache.get(key)
            elapsed = datetime.now() - cache.get('begin_time')
            cache.update(
                {
                    # Elapsed time in milliseconds.  total_seconds() is
                    # required here: timedelta.microseconds is only the
                    # sub-second component, so any decision that took one
                    # second or longer would otherwise be recorded wrongly.
                    'choose_time': elapsed.total_seconds() * 1000,
                    'choose_video': video_label
                })
            # begin_time is only needed until the choice has been timed.
            cache.pop('begin_time')

            # Debug output retained from the original implementation.
            print(cache)
            print(len(self.users_video.get(key)))

            video = '/static/video/test_video/{}.mp4'.format(video_label)
            await self.write_message(self.createMessage(self.videoIn, video))

        elif message_type == 'submit':
            cache = self.users_cache.get(key)
            await mongo.record.update_one(
                {
                    'user': key
                },
                {
                    "$push":
                        {
                            "history":
                                {
                                    "round": message.get('round'),
                                    "disgusted": message.get('disgusted'),
                                    "amused": message.get('amused'),
                                    "calm": message.get('calm'),
                                    "anxious": message.get('anxious'),
                                    "sad": message.get('sad'),
                                    "choose_time": cache.get('choose_time'),
                                    "choose_video": cache.get('choose_video'),
                                    "video_choice_A": cache.get('video_choice_A'),
                                    "video_choice_B": cache.get('video_choice_B')
                                }
                        }
                })
            if int(message.get('round')) < 40:
                msg = self.init_video_choice()
                await self.write_message(self.createMessage(self.testSubmit, msg))
            else:
                # No further pair is offered once round 40 is reached.
                await self.write_message(self.createMessage(self.testSubmit, None))

    def on_close(self):
        """Drop the record if no round was submitted and release shared state.

        NOTE(review): this uses ``mongodb`` (called without ``await``)
        rather than ``mongo``; presumably it is a blocking client, chosen
        because this method is not a coroutine -- confirm.
        """
        key = str(self)
        record = mongodb.record.find_one(
            {
                'user': key,
            })
        # The record can be missing if the connection closed before open()
        # finished inserting it; only delete records that have no history.
        if record is not None and record.get('history') is None:
            mongodb.record.delete_one(
                {
                    'user': key,
                })
        # Tolerate partially initialised connections: open() may have
        # failed before these entries were created.
        self.users_video.pop(key, None)
        self.users_cache.pop(key, None)
        if self in self.users:
            self.users.remove(self)

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin.

        NOTE(review): returning True unconditionally disables the
        cross-origin check; restrict to trusted origins if this endpoint
        is reachable from untrusted sites.
        """
        return True
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。