代码拉取完成,页面将自动刷新
#!/usr/bin/env python2
# -*- coding: UTF-8 -*-
#import roslib
#import pyrealsense2
import requests
import json
import re
import rospy
import cv2
import tf2_ros
# import numpy as np
import actionlib
from tf2_geometry_msgs import tf2_geometry_msgs
from actionlib_msgs.msg import *
from simple_navigation.srv import *
from geometry_msgs.msg import Pose, PoseWithCovarianceStamped, Point, Quaternion, Twist
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from std_msgs.msg import String
from random import sample
from math import pow, sqrt
from sensor_msgs.msg import Image
from cv_bridge import CvBridge, CvBridgeError
import smach # smach状态机
import smach_ros # smach状态机
#from seu_robot_msgs.msg import object_position
from smach import State, StateMachine
from simple_navigation.msg import *
n = 0
first_launch = 0
locations = dict()
locations['bedroom'] = Pose(Point(-1.5553, 1.0021, 0.00),Quaternion(0.000, 0.000, 0.73, 0.66))
locations['living_room'] = Pose(Point(-1.5781, -1.2908, 0.00),Quaternion(0.000, 0.000, 0.000, 1.000))
locations['three'] = Pose(Point(1.30, 4.7651, 0.00),Quaternion(0.000, 0.000, 0.000, 1.000))
locations['four'] = Pose(Point(-2.9921, 3.7542, 0.00),Quaternion(0.000, 0.000, 0.70, 0.71))
seuhome_speech_recognition_name = 'speech_recognition_service'
seuhome_llm_name = 'baidu_llm_service'
# 注意替换路径
plan_set_path = "/home/camellia/catkin_turtlebot3/src/seu_llm/scripts/plan_sets/plan_set.json"
'''
为了在程序中调用文心一言4.0大语言模型API,先在百度智能云控制台“应用接入”里创建应用,获得AppID,API Key以及Secret Key。随后在计费管理中开通相关服务,ERNIE-Bot-4的资费为0.12¥/千tokens。
之后便可在get_access_token()函数下将API更换为个人应用:使用 API Key,Secret Key 获取access_token,替换下列示例中的应用API Key、应用Secret Key
url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=[应用API Key]&client_secret=[应用Secret Key]"
'''
def get_access_token():
url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=H08iY72qdIUuG2O40xNF3n9q&client_secret=blBE9UY85m8Cp13yOKW44WGxemGBCdLn"
payload = json.dumps("")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
return response.json().get("access_token")
def srv_speech_recognition():
print(123321)
now_srv_name = seuhome_speech_recognition_name # 当前服务的名称
rospy.wait_for_service(now_srv_name) # 等待服务
rospy.loginfo(now_srv_name + ' available')
try:
now_srv = rospy.ServiceProxy(now_srv_name, seu_speech_recognition) # 当前操作的服务
now_request = True # 请求值,加前缀防止重名
now_response = now_srv(now_request) # 返回值
if now_response.state == 1:
rospy.loginfo('Service call Succeeded: ' + now_srv_name) # 调用成功
return now_response.word
else:
print ("Service call Failed: " + now_srv_name) # 调用失败
return -1 # 调用失败,返回-1
except rospy.ServiceException as e: # 调用失败,输出具体信息
print("Service call Failed: " + now_srv_name + ": %s" % e)
return -1
def ros_param_init():
rospy.set_param('pose_origin_X', 0) # 初始位置X值
rospy.set_param('pose_origin_Y', 0) # 初始位置Y值
rospy.set_param('pose_origin_yaw', 0) # 初始位置转角
rospy.set_param('pose_now_X', 0) # 当前X值
rospy.set_param('pose_now_Y', 0) # 当前Y值
rospy.set_param('pose_now_yaw', 0) # 当前转角值,注意在导航过程中实时更新
def analyze(req):
#rospy.loginfo("Start to analyze the command")
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions_pro?access_token=" + get_access_token()
# 定义基元动作
action_base = "move_to, look_for_obj, look_for_person, follow, grasp, pass_to, speak, answer"
prompt = f"from actions import {action_base}"
# 将action_base转换为列表形式
action_base_lst = action_base.split(", ")
# 添加场景obj、person
# items = ['wine glass', 'kitchen cabinet', 'lime', 'mug', 'salmon', 'fruit', 'clothes', 'apple']
# prompt += f"\n\nobjects = {items}"
# 加载任务模板
with open(plan_set_path, 'r') as f:
tmp = json.load(f)
prompt_egs = {}
for k, v in tmp.items():
prompt_egs[k] = v
prompt += f"\n\nexamples = {prompt_egs}"
# 加载任务提示
prompt += "\n\nAnswer the following question with reference to the examples in the form of a nested python list without any modifiers. Your response is expected to be in the following format: 'Answer: [['actions', 'parameters'], ..., ['actions', 'parameters']]'."
# 添加当前任务
#prompt_task = req.command
prompt_task = req
# rospy.loginfo("Task: %s",prompt_task)
# 创建当前prompt
curr_prompt = f"{prompt}\n\n{prompt_task}\n\t"
#print(curr_prompt)
payload = json.dumps({
"messages": [
{
"role": "user",
"content": curr_prompt
}
]
})
headers = {
'Content-Type': 'application/json'
}
pattern = r"\[\[.*?\]\]"
prompt_add1 = "Please note the format of the answer!"
prompt_add2 = "Please note that mandated subtasks need to be used to complete task planning"
for i in range(5):
response = requests.request("POST", url, headers=headers, data=payload)
# 将回答文本中的“result”存储在字符串中
match = re.search(pattern, response.text)
if match:
action_str = match.group()
#rospy.loginfo(action_str)
# 将匹配到的字符串转换为函数列表
action_lst = eval(match.group())
# 如果规划的子任务不在任务模板中,则提示用户重新回答
if not all([action in action_base_lst for action, _ in action_lst]):
#rospy.logerr("Error: Planned subtasks are not executable. Trying again...")
curr_prompt = f"{prompt}\n\n{prompt_task}\n\n{prompt_add2}"
payload = json.dumps({
"messages": [
{
"role": "user",
"content": curr_prompt
}
]
})
continue
else:
#rospy.loginfo("Task plan successfully!")
state = True
errormsg = 'success'
answer = action_str
break
else:
#rospy.logerr("Error: Failed to store result. Trying again...")
# 添加提示词:请注意回答的格式!
curr_prompt = f"{prompt}\n\n{prompt_task}\n\n{prompt_add1}"
payload = json.dumps({
"messages": [
{
"role": "user",
"content": curr_prompt
}
]
})
if i == 4:
# 请求超过5次后,返回失败
#rospy.logerr("Error: Failed to store result after 5 attempts. Task planning failed.")
action_str = ""
state = False
errormsg = 'failure'
answer = action_str
break
return state,errormsg,answer
class Moving():
def __init__(self):
self.goal_sent = False
rospy.on_shutdown(self.shutdown)
self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction)
rospy.loginfo("Wait for the action server to come up")
self.move_base.wait_for_server(rospy.Duration(5))
print('connectted')
# 从patrol_nav.launch文件中得到的参数
# self.rest_time = rospy.get_param("~rest_time", 5)
# self.keep_patrol = rospy.get_param("~keep_patrol", False)
# self.random_patrol = rospy.get_param("~random_patrol", False)
# self.patrol_type = rospy.get_param("~patrol_type", 0)
# self.patrol_loop = rospy.get_param("~patrol_loop", 2)
# self.patrol_time = rospy.get_param("~patrol_time", 5)
def goto(self, location):
# Send a goal
self.goal_sent = True
goal = MoveBaseGoal()
goal.target_pose.header.frame_id = 'map'
goal.target_pose.header.stamp = rospy.Time.now()
goal.target_pose.pose = location
# Start moving
self.move_base.send_goal(goal)
#Since the map is not big, set the duration time of the turtlebot as 300s
success = self.move_base.wait_for_result(rospy.Duration(300))
state = self.move_base.get_state()
result = False
if success and state == GoalStatus.SUCCEEDED:
result = True
else:
self.move_base.cancel_goal()
self.goal_sent = False
return result
def shutdown(self):
if self.goal_sent:
self.move_base.cancel_goal()
rospy.loginfo("Stop")
rospy.sleep(1)
class mainstate(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['GetIt'],output_keys=['analyze_output']) # 状态机初始化
#self.info = info
self.initialpose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=10)
def execute(self, userdata):
#word=srv_speech_recognition()
#print(word)
command='find cola in bedroom'
print(command)
state,errormsg,answer=analyze(command)
userdata.analyze_output=answer
print(answer)
# navigator = Moving()
# navigator.goto(locations[word])
rospy.loginfo('I get your command') # 输出信息,便于调试
#srv_WakeUp() # 调用唤醒词服务。由于服务的性质为执行完毕才有返回值,则在接收到唤醒词后直接结束服务
return 'GetIt' # 满足条件'TimeToWakeUp',转移到状态MainState
###定义基本动作
class MoveToState(smach.State):
def __init__(self, arg):
smach.State.__init__(self, outcomes=['success', 'failure'])
self.target = arg # 参数,即导航目的地
def execute(self,userdata):
# 执行操作
word = "Moving to "+ str(self.target)
print(word)
# 语音播报
#speech_to_text(word)
navigator = Moving()
navigator.goto(locations[self.target])
if word:
return 'success'
else:
return 'failure'
class LookForObjState(smach.State):
def __init__(self,arg):
smach.State.__init__(self, outcomes=['success', 'failure'])
self.target = arg # 参数,即所找物品名称
#State.__init__(self, outcomes=['success', 'timeout'])
#ros发布初始化
self.cmd_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
self.camera_state_pub = rospy.Publisher('/findobject_camera_state', String, queue_size=10)
#ros接收初始化
rospy.Subscriber('/find_object_result', object_position, self.object_callback)
#初始化参数
self.timeout = False
self.is_find_object = False
self.object_count = 0
#旋转条件改变后就可去掉
self.rotate_count = 0
def execute(self, userdata):
word = "Looking for object "+ str(self.target)
# 语音播报
#speech_to_text(word)
print(word)
#rospy.logerr("######### ExploreRoom #########")
# #设置旋转速度
# cmd_vel = Twist()
# cmd_vel.angular.z = 0.18
# #打开摄像头
# camera_state = String()
# camera_state.data = "camera_open"
#
# self.is_find_object = False
#
# #找物
# while not rospy.is_shutdown():
#
# #旋转计数
# self.rotate_count = self.rotate_count + 1
# #rospy.loginfo('rotate_count:%s',self.rotate_count)
#
# if self.is_find_object:
# break
#
# #之后旋转条件建议改成转到原始朝向就停止
# elif self.rotate_count > 400:
# rospy.logwarn('No %s',object)
# self.timeout = True
# break
#
# #ros消息发送
# rospy.sleep(0.1)
# self.cmd_vel_pub.publish(cmd_vel)
# self.camera_state_pub.publish(camera_state)
#
#
# #停止旋转
# cmd_vel = Twist()
# self.cmd_vel_pub.publish(cmd_vel)
# #关闭摄像头
# camera_state = String()
# camera_state.data = "camera_close"
# self.camera_state_pub.publish(camera_state)
#
# #参数重置
# self.object_count = 0
# self.rotate_count = 0
# self.is_find_object = False
self.timeout = True
if self.timeout:
return 'success'
else:
return 'failure'
def pose_callback(self, msg):
# Update robot current pose
self.current_pose = msg
def object_callback(self, msg):
global ps_out
if msg.label == object:
self.object_count = self.object_count + 1
#用于防止单帧误识别
if self.object_count > 10:
#同探索房间tf转换处理
ps = tf2_geometry_msgs.PointStamped()
ps.header.stamp = rospy.Time()
ps.header.frame_id = "camera1_color_optical_frame"
ps.point.x = msg.x
ps.point.y = msg.y
ps.point.z = msg.z
ps_out = buffer.transform(ps,"map")
#需要改成根据ps_out的x,y结果判断是否在场地中,判断是否真的找到物体
self.is_find_object = True
else:
'''
当同一张照片出现多个物体时,这个else会不会出问题建议多测
情况如下:
画面存在两个物体,物体A满足条件,连续5帧图像,识别先发送物体A信息,再发送物体B信息,
当物体A的回传信息触发is_find_object = True后,物体B的信息会将其重新变为False
考虑要将这个else删除
'''
self.is_find_object = False
def create_state_machine(actions):
# 将列表中的动作,参数和序号拼接,作为每个状态的名称
actions=actions.replace('[','')
actions=actions.replace(']', '')
actions = actions.replace('\'', '')
state_names=actions.split(',')
# state_names = [f'{action[0]} {action[1]} {index}' for index, action in enumerate(actions)]
# state_names.append('sub_success') # 在末端添加‘sub_success’,作为最后一个状态的状态转移
# print('state_names')
# print(state_names)
# print(type(state_names[0]))
# 创建状态机
# action=[]
# arg=[]
sm = smach.StateMachine(outcomes=['sub_success4', 'sub_failure'])
# print(state_names)
# print(len(state_names))
# for i in range(int(len(state_names) / 2)):
# action.append(state_names[i*2])
# arg.append(state_names[i*2+ 1])
# # 根据列表中的动作,添加不同的动作状态
# print(action)
# print(arg)
action=['move_to','look_for_obj','move_to','look_for_obj']
arg=['bedroom','water','living_room','cola']
action.append('sub_success')
# with sm:
# smach.StateMachine.add('move_to',
# MoveToState(arg[0]),
# transitions={'success': action[1], # 当前状态成功,跳转下一状态,否则重复当前状态。下同
# 'failure': 'move_to'})
# smach.StateMachine.add('look_for_obj',
# LookForObjState(arg[1]),
# transitions={'success': 'sub_success',
# 'failure': 'look_for_obj'})
with sm:
for i in range(len(arg)):
if action[i] == 'move_to':
smach.StateMachine.add('move_to'+str(i),
MoveToState(arg[i]),
transitions={'success': action[i+1]+str(i+1), # 当前状态成功,跳转下一状态,否则重复当前状态。下同
'failure': 'move_to'+str(i)})
elif action[i] == 'look_for_obj':
smach.StateMachine.add('look_for_obj'+str(i),
LookForObjState(arg[i]),
transitions={'success': action[i+1]+str(i+1),
'failure': 'look_for_obj'+str(i)})
return sm
def main():
# 主节点初始化
rospy.init_node('REC_main')
#wait_for_service()
ros_param_init()
sm_top = smach.StateMachine(outcomes=['end'])
with sm_top:
#smach.StateMachine.add('mainstate',mainstate(),transitions={'GetIt':'end'})
smach.StateMachine.add('mainstate', LookForObjState('cola'), transitions={'success': 'end','failure':'end'})
sis = smach_ros.IntrospectionServer('seurat_main',sm_top,'/SM_ROOT')
sis.start()
speech_state = 'input'
# 启动状态机
outcome = sm_top.execute()
# 获得解析后的任务列表,用于执行状态机的创建
#operations = sm_top.userdata.analyze_output
#analyze_result = sm_top.userdata.analyze_output
#rospy.loginfo('operations:%s', operations)
sis.stop()
# # 创建大模型执行状态机
# sm_sub = create_state_machine(operations)
# # 创建并启动内部监测服务器
# sis_sub = smach_ros.IntrospectionServer('gpsr_state_machine', sm_sub, '/SM_ROOT_SUB')
# print('sis_sub start')
# sis_sub.start()
#
# # 启动执行状态机
# outcome_sub = sm_sub.execute()
#
# sis_sub.stop()
if __name__ == '__main__':
print(123456789)
main()
# try:
# #bridge = CvBridge()
# PatrolNav()
# print('begin_main')
# #navigator = Moving()
# #navigator.goto(locations['bedroom'])
# rospy.spin()
# except rospy.ROSInterruptException:
# rospy.logwarn("patrol navigation exception finished.")
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。