代码拉取完成,页面将自动刷新
同步操作将从 科技ing/pymavlink 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
#!/usr/bin/env python
'''
mavlink python utility functions
Copyright Andrew Tridgell 2011-2019
Released under GNU LGPL version 3 or later
'''
from __future__ import print_function
from builtins import object
import socket, math, struct, time, os, fnmatch, array, sys, errno
import select
from pymavlink import mavexpression
# adding these extra imports allows pymavlink to be used directly with pyinstaller
# without having complex spec files. To allow for installs that don't have ardupilotmega
# at all we avoid throwing an exception if it isn't installed
try:
import json
from pymavlink.dialects.v10 import ardupilotmega
except Exception:
pass
# maximum packet length for a single receive call - use the UDP limit
UDP_MAX_PACKET_LEN = 65535
# Store the MAVLink library for the currently-selected dialect
# (set by set_dialect())
mavlink = None
# Store the mavlink file currently being operated on
# (set by mavlink_connection())
mavfile_global = None
# If the caller hasn't specified a particular native/legacy version, use this
default_native = False
# link_id used for signing
global_link_id = 0
# Use a globally-set MAVLink dialect if one has been specified as an environment variable.
if not 'MAVLINK_DIALECT' in os.environ:
os.environ['MAVLINK_DIALECT'] = 'ardupilotmega'
def mavlink10():
'''return True if using MAVLink 1.0 or later'''
return not 'MAVLINK09' in os.environ
def mavlink20():
'''return True if using MAVLink 2.0'''
return 'MAVLINK20' in os.environ
def evaluate_expression(expression, vars):
'''evaluation an expression'''
return mavexpression.evaluate_expression(expression, vars)
def evaluate_condition(condition, vars):
'''evaluation a conditional (boolean) statement'''
if condition is None:
return True
v = evaluate_expression(condition, vars)
if v is None:
return False
return v
def u_ord(c):
return ord(c) if sys.version_info.major < 3 else c
class location(object):
'''represent a GPS coordinate'''
def __init__(self, lat, lng, alt=0, heading=0):
self.lat = lat
self.lng = lng
self.alt = alt
self.heading = heading
def __str__(self):
return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt)
def set_dialect(dialect):
'''set the MAVLink dialect to work with.
For example, set_dialect("ardupilotmega")
'''
global mavlink, current_dialect
from .generator import mavparse
if 'MAVLINK20' in os.environ:
wire_protocol = mavparse.PROTOCOL_2_0
modname = "pymavlink.dialects.v20." + dialect
elif mavlink is None or mavlink.WIRE_PROTOCOL_VERSION == "1.0" or not 'MAVLINK09' in os.environ:
wire_protocol = mavparse.PROTOCOL_1_0
modname = "pymavlink.dialects.v10." + dialect
else:
wire_protocol = mavparse.PROTOCOL_0_9
modname = "pymavlink.dialects.v09." + dialect
try:
mod = __import__(modname)
except Exception:
# auto-generate the dialect module
from .generator.mavgen import mavgen_python_dialect
mavgen_python_dialect(dialect, wire_protocol)
mod = __import__(modname)
components = modname.split('.')
for comp in components[1:]:
mod = getattr(mod, comp)
current_dialect = dialect
mavlink = mod
# Set the default dialect. This is done here as it needs to be after the function declaration
set_dialect(os.environ['MAVLINK_DIALECT'])
class mavfile_state(object):
'''state for a particular system id'''
def __init__(self):
self.messages = { 'MAV' : self }
self.flightmode = "UNKNOWN"
self.vehicle_type = "UNKNOWN"
self.mav_type = mavlink.MAV_TYPE_FIXED_WING
self.base_mode = 0
self.armed = False # canonical arm state for the vehicle as a whole
if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1:
self.messages['HOME'] = mavlink.MAVLink_gps_raw_int_message(0,0,0,0,0,0,0,0,0,0)
mavlink.MAVLink_waypoint_message = mavlink.MAVLink_mission_item_message
else:
self.messages['HOME'] = mavlink.MAVLink_gps_raw_message(0,0,0,0,0,0,0,0,0)
class param_state(object):
'''state for a particular system id/component id pair'''
def __init__(self):
self.params = {}
class mavfile(object):
'''a generic mavlink port'''
def __init__(self, fd, address, source_system=255, source_component=0, notimestamps=False, input=True, use_native=default_native):
global mavfile_global
if input:
mavfile_global = self
self.fd = fd
self.sysid = 0
self.param_sysid = (0,0)
self.address = address
self.timestamp = 0
self.last_seq = {}
self.mav_loss = 0
self.mav_count = 0
self.param_fetch_start = 0
# state for each sysid
self.sysid_state = {}
self.sysid_state[self.sysid] = mavfile_state()
# param state for each sysid/compid tuple
self.param_state = {}
self.param_state[self.param_sysid] = param_state()
# status of param fetch, indexed by sysid,compid tuple
self.source_system = source_system
self.source_component = source_component
self.first_byte = True
self.robust_parsing = True
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component, use_native=use_native)
self.mav.robust_parsing = self.robust_parsing
self.logfile = None
self.logfile_raw = None
self.start_time = time.time()
self.message_hooks = []
self.idle_hooks = []
self.uptime = 0.0
self.notimestamps = notimestamps
self._timestamp = None
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
self.stop_on_EOF = False
self.portdead = False
@property
def target_system(self):
return self.sysid
@property
def target_component(self):
return self.param_sysid[1]
@target_system.setter
def target_system(self, value):
self.sysid = value
if not self.sysid in self.sysid_state:
self.sysid_state[self.sysid] = mavfile_state()
if self.sysid != self.param_sysid[0]:
self.param_sysid = (self.sysid, self.param_sysid[1])
if not self.param_sysid in self.param_state:
self.param_state[self.param_sysid] = param_state()
@target_component.setter
def target_component(self, value):
if value != self.param_sysid[1]:
self.param_sysid = (self.param_sysid[0], value)
if not self.param_sysid in self.param_state:
self.param_state[self.param_sysid] = param_state()
@property
def params(self):
if self.param_sysid[1] == 0:
eff_tuple = (self.sysid, 1)
if eff_tuple in self.param_state:
return getattr(self.param_state[eff_tuple],'params')
return getattr(self.param_state[self.param_sysid],'params')
@property
def messages(self):
return getattr(self.sysid_state[self.sysid],'messages')
@property
def flightmode(self):
return getattr(self.sysid_state[self.sysid],'flightmode')
@flightmode.setter
def flightmode(self, value):
setattr(self.sysid_state[self.sysid],'flightmode',value)
@property
def vehicle_type(self):
return getattr(self.sysid_state[self.sysid],'vehicle_type')
@vehicle_type.setter
def vehicle_type(self, value):
setattr(self.sysid_state[self.sysid],'vehicle_type',value)
@property
def mav_type(self):
return getattr(self.sysid_state[self.sysid],'mav_type')
@mav_type.setter
def mav_type(self, value):
setattr(self.sysid_state[self.sysid],'mav_type',value)
@property
def base_mode(self):
return getattr(self.sysid_state[self.sysid],'base_mode')
@base_mode.setter
def base_mode(self, value):
setattr(self.sysid_state[self.sysid],'base_mode',value)
def auto_mavlink_version(self, buf):
'''auto-switch mavlink protocol version'''
global mavlink
if len(buf) == 0:
return
try:
magic = ord(buf[0])
except:
magic = buf[0]
if not magic in [ 85, 254, 253 ]:
return
self.first_byte = False
if self.WIRE_PROTOCOL_VERSION == "0.9" and magic == 254:
self.WIRE_PROTOCOL_VERSION = "1.0"
set_dialect(current_dialect)
elif self.WIRE_PROTOCOL_VERSION == "1.0" and magic == 85:
self.WIRE_PROTOCOL_VERSION = "0.9"
os.environ['MAVLINK09'] = '1'
set_dialect(current_dialect)
elif self.WIRE_PROTOCOL_VERSION != "2.0" and magic == 253:
self.WIRE_PROTOCOL_VERSION = "2.0"
os.environ['MAVLINK20'] = '1'
set_dialect(current_dialect)
else:
return
# switch protocol
(callback, callback_args, callback_kwargs) = (self.mav.callback,
self.mav.callback_args,
self.mav.callback_kwargs)
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component)
self.mav.robust_parsing = self.robust_parsing
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
(self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) = (callback,
callback_args,
callback_kwargs)
def recv(self, n=None):
'''default recv method'''
raise RuntimeError('no recv() method supplied')
def close(self, n=None):
'''default close method'''
raise RuntimeError('no close() method supplied')
def write(self, buf):
'''default write method'''
raise RuntimeError('no write() method supplied')
def select(self, timeout):
'''wait for up to timeout seconds for more data'''
if self.fd is None:
time.sleep(min(timeout,0.5))
return True
try:
(rin, win, xin) = select.select([self.fd], [], [], timeout)
except select.error:
return False
return len(rin) == 1
def pre_message(self):
'''default pre message call'''
return
def set_rtscts(self, enable):
'''enable/disable RTS/CTS if applicable'''
return
def probably_vehicle_heartbeat(self, msg):
if msg.get_srcComponent() == mavlink.MAV_COMP_ID_GIMBAL:
return False
if msg.type in (mavlink.MAV_TYPE_GCS,
mavlink.MAV_TYPE_GIMBAL,
mavlink.MAV_TYPE_ADSB,
mavlink.MAV_TYPE_ONBOARD_CONTROLLER):
return False
return True
def post_message(self, msg):
'''default post message call'''
if '_posted' in msg.__dict__:
return
msg._posted = True
msg._timestamp = time.time()
type = msg.get_type()
if 'usec' in msg.__dict__:
self.uptime = msg.usec * 1.0e-6
if 'time_boot_ms' in msg.__dict__:
self.uptime = msg.time_boot_ms * 1.0e-3
if self._timestamp is not None:
if self.notimestamps:
msg._timestamp = self.uptime
else:
msg._timestamp = self._timestamp
src_system = msg.get_srcSystem()
src_component = msg.get_srcComponent()
src_tuple = (src_system, src_component)
radio_tuple = (ord('3'), ord('D'))
if not src_system in self.sysid_state:
# we've seen a new system
self.sysid_state[src_system] = mavfile_state()
self.sysid_state[src_system].messages[type] = msg
if src_tuple == radio_tuple:
# as a special case radio msgs are added for all sysids
for s in self.sysid_state.keys():
self.sysid_state[s].messages[type] = msg
if not (src_tuple == radio_tuple or msg.get_type() == 'BAD_DATA'):
if not src_tuple in self.last_seq:
last_seq = -1
else:
last_seq = self.last_seq[src_tuple]
seq = (last_seq+1) % 256
seq2 = msg.get_seq()
if seq != seq2 and last_seq != -1:
diff = (seq2 - seq) % 256
self.mav_loss += diff
#print("lost %u seq=%u seq2=%u last_seq=%u src_tupe=%s %s" % (diff, seq, seq2, last_seq, str(src_tuple), msg.get_type()))
self.last_seq[src_tuple] = seq2
self.mav_count += 1
self.timestamp = msg._timestamp
if type == 'HEARTBEAT' and self.probably_vehicle_heartbeat(msg):
if self.sysid == 0:
# lock onto id tuple of first vehicle heartbeat
self.sysid = src_system
if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1:
self.flightmode = mode_string_v10(msg)
self.mav_type = msg.type
self.base_mode = msg.base_mode
self.sysid_state[self.sysid].armed = (msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED)
elif type == 'PARAM_VALUE':
if not src_tuple in self.param_state:
self.param_state[src_tuple] = param_state()
self.param_state[src_tuple].params[msg.param_id] = msg.param_value
elif type == 'SYS_STATUS' and mavlink.WIRE_PROTOCOL_VERSION == '0.9':
self.flightmode = mode_string_v09(msg)
elif type == 'GPS_RAW':
if self.sysid_state[src_system].messages['HOME'].fix_type < 2:
self.sysid_state[src_system].messages['HOME'] = msg
elif type == 'GPS_RAW_INT':
if self.sysid_state[src_system].messages['HOME'].fix_type < 3:
self.sysid_state[src_system].messages['HOME'] = msg
for hook in self.message_hooks:
hook(self, msg)
if (msg.get_signed() and
self.mav.signing.link_id == 0 and
msg.get_link_id() != 0 and
self.target_system == msg.get_srcSystem() and
self.target_component == msg.get_srcComponent()):
# change to link_id from incoming packet
self.mav.signing.link_id = msg.get_link_id()
def packet_loss(self):
'''packet loss as a percentage'''
if self.mav_count == 0:
return 0
return (100.0*self.mav_loss)/(self.mav_count+self.mav_loss)
def recv_msg(self):
'''message receive routine'''
self.pre_message()
while True:
n = self.mav.bytes_needed()
s = self.recv(n)
numnew = len(s)
if numnew != 0:
if self.logfile_raw:
self.logfile_raw.write(str(s))
if self.first_byte:
self.auto_mavlink_version(s)
# We always call parse_char even if the new string is empty, because the existing message buf might already have some valid packet
# we can extract
msg = self.mav.parse_char(s)
if msg:
if self.logfile and msg.get_type() != 'BAD_DATA' :
usec = int(time.time() * 1.0e6) & ~3
self.logfile.write(str(struct.pack('>Q', usec) + msg.get_msgbuf()))
self.post_message(msg)
return msg
else:
# if we failed to parse any messages _and_ no new bytes arrived, return immediately so the client has the option to
# timeout
if numnew == 0:
return None
def recv_match(self, condition=None, type=None, blocking=False, timeout=None):
'''recv the next MAVLink message that matches the given condition
type can be a string or a list of strings'''
if type is not None and not isinstance(type, list) and not isinstance(type, set):
type = [type]
start_time = time.time()
while True:
if timeout is not None:
now = time.time()
if now < start_time:
start_time = now # If an external process rolls back system time, we should not spin forever.
if start_time + timeout < time.time():
return None
m = self.recv_msg()
if m is None:
if blocking:
for hook in self.idle_hooks:
hook(self)
if timeout is None:
self.select(0.05)
else:
self.select(timeout/2)
continue
return None
if type is not None and not m.get_type() in type:
continue
if not evaluate_condition(condition, self.messages):
continue
return m
def check_condition(self, condition):
'''check if a condition is true'''
return evaluate_condition(condition, self.messages)
def mavlink10(self):
'''return True if using MAVLink 1.0 or later'''
return float(self.WIRE_PROTOCOL_VERSION) >= 1
def mavlink20(self):
'''return True if using MAVLink 2.0 or later'''
return float(self.WIRE_PROTOCOL_VERSION) >= 2
def setup_logfile(self, logfile, mode='w'):
'''start logging to the given logfile, with timestamps'''
self.logfile = open(logfile, mode=mode)
def setup_logfile_raw(self, logfile, mode='w'):
'''start logging raw bytes to the given logfile, without timestamps'''
self.logfile_raw = open(logfile, mode=mode)
def wait_heartbeat(self, blocking=True, timeout=None):
'''wait for a heartbeat so we know the target system IDs'''
return self.recv_match(type='HEARTBEAT', blocking=blocking, timeout=timeout)
def param_fetch_all(self):
'''initiate fetch of all parameters'''
if time.time() - self.param_fetch_start < 2.0:
# don't fetch too often
return
self.param_fetch_start = time.time()
self.mav.param_request_list_send(self.target_system, self.target_component)
def param_fetch_one(self, name):
'''initiate fetch of one parameter'''
try:
idx = int(name)
self.mav.param_request_read_send(self.target_system, self.target_component, b"", idx)
except Exception:
if sys.version_info.major >= 3 and not isinstance(name, bytes):
name = bytes(name,'ascii')
self.mav.param_request_read_send(self.target_system, self.target_component, name, -1)
def time_since(self, mtype):
'''return the time since the last message of type mtype was received'''
if not mtype in self.messages:
return time.time() - self.start_time
return time.time() - self.messages[mtype]._timestamp
def param_set_send(self, parm_name, parm_value, parm_type=None):
'''wrapper for parameter set'''
if self.mavlink10():
if parm_type is None:
parm_type = mavlink.MAVLINK_TYPE_FLOAT
self.mav.param_set_send(self.target_system, self.target_component,
parm_name.encode('utf8'), parm_value, parm_type)
else:
self.mav.param_set_send(self.target_system, self.target_component,
parm_name.encode('utf8'), parm_value)
def waypoint_request_list_send(self):
'''wrapper for waypoint_request_list_send'''
if self.mavlink10():
self.mav.mission_request_list_send(self.target_system, self.target_component)
else:
self.mav.waypoint_request_list_send(self.target_system, self.target_component)
def waypoint_clear_all_send(self):
'''wrapper for waypoint_clear_all_send'''
if self.mavlink10():
self.mav.mission_clear_all_send(self.target_system, self.target_component)
else:
self.mav.waypoint_clear_all_send(self.target_system, self.target_component)
def waypoint_request_send(self, seq):
'''wrapper for waypoint_request_send'''
if self.mavlink10():
self.mav.mission_request_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_request_send(self.target_system, self.target_component, seq)
def waypoint_set_current_send(self, seq):
'''wrapper for waypoint_set_current_send'''
if self.mavlink10():
self.mav.mission_set_current_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_set_current_send(self.target_system, self.target_component, seq)
def waypoint_current(self):
'''return current waypoint'''
if self.mavlink10():
m = self.recv_match(type='MISSION_CURRENT', blocking=True)
else:
m = self.recv_match(type='WAYPOINT_CURRENT', blocking=True)
return m.seq
def waypoint_count_send(self, seq):
'''wrapper for waypoint_count_send'''
if self.mavlink10():
self.mav.mission_count_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_count_send(self.target_system, self.target_component, seq)
def set_mode_flag(self, flag, enable):
'''
Enables/ disables MAV_MODE_FLAG
@param flag The mode flag,
see MAV_MODE_FLAG enum
@param enable Enable the flag, (True/False)
'''
if self.mavlink10():
mode = self.base_mode
if enable:
mode = mode | flag
elif not enable:
mode = mode & ~flag
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mode,
0, 0, 0, 0, 0, 0)
else:
print("Set mode flag not supported")
def set_mode_auto(self):
'''enter auto mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_MISSION_START, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_SET_AUTO = 13
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_AUTO)
def mode_mapping(self):
'''return dictionary mapping mode names to numbers, or None if unknown'''
mav_type = self.field('HEARTBEAT', 'type', self.mav_type)
mav_autopilot = self.field('HEARTBEAT', 'autopilot', None)
if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4:
return px4_map
if mav_type is None:
return None
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_DODECAROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
inv_map = dict((a, b) for (b, a) in map.items())
return inv_map
def set_mode_apm(self, mode, custom_mode = 0, custom_sub_mode = 0):
'''enter arbitrary mode'''
if isinstance(mode, str):
mode_map = self.mode_mapping()
if mode_map is None or mode not in mode_map:
print("Unknown mode '%s'" % mode)
return
mode = mode_map[mode]
# set mode by integer mode number for ArduPilot
self.mav.command_long_send(self.target_system,
self.target_component,
mavlink.MAV_CMD_DO_SET_MODE,
0,
mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode,
0,
0,
0,
0,
0)
def set_mode_px4(self, mode, custom_mode, custom_sub_mode):
'''enter arbitrary mode'''
if isinstance(mode, str):
mode_map = self.mode_mapping()
if mode_map is None or mode not in mode_map:
print("Unknown mode '%s'" % mode)
return
# PX4 uses two fields to define modes
mode, custom_mode, custom_sub_mode = px4_map[mode]
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0, mode, custom_mode, custom_sub_mode, 0, 0, 0, 0)
def set_mode(self, mode, custom_mode = 0, custom_sub_mode = 0):
'''set arbitrary flight mode'''
mav_autopilot = self.field('HEARTBEAT', 'autopilot', None)
if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4:
self.set_mode_px4(mode, custom_mode, custom_sub_mode)
else:
self.set_mode_apm(mode)
def set_mode_rtl(self):
'''enter RTL mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_RETURN = 3
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_RETURN)
def set_mode_manual(self):
'''enter MANUAL mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mavlink.MAV_MODE_MANUAL_ARMED,
0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_SET_MANUAL = 12
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_MANUAL)
def set_mode_fbwa(self):
'''enter FBWA mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mavlink.MAV_MODE_STABILIZE_ARMED,
0, 0, 0, 0, 0, 0)
else:
print("Forcing FBWA not supported")
def set_mode_loiter(self):
'''enter LOITER mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_LOITER_UNLIM, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_LOITER = 27
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_LOITER)
def set_servo(self, channel, pwm):
'''set a servo value'''
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_SERVO, 0,
channel, pwm,
0, 0, 0, 0, 0)
def set_relay(self, relay_pin=0, state=True):
'''Set relay_pin to value of state'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component, # target_component
mavlink.MAV_CMD_DO_SET_RELAY, # command
0, # Confirmation
relay_pin, # Relay Number
int(state), # state (1 to indicate arm)
0, # param3 (all other params meaningless)
0, # param4
0, # param5
0, # param6
0) # param7
else:
print("Setting relays not supported.")
def calibrate_level(self):
'''calibrate accels (1D version)'''
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
1, 1, 0, 0, 0, 0, 0)
def calibrate_pressure(self):
'''calibrate pressure'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
0, 0, 1, 0, 0, 0, 0)
else:
MAV_ACTION_CALIBRATE_PRESSURE = 20
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_PRESSURE)
def reboot_autopilot(self, hold_in_bootloader=False):
'''reboot the autopilot'''
if self.mavlink10():
if hold_in_bootloader:
param1 = 3
else:
param1 = 1
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, 0,
param1, 0, 0, 0, 0, 0, 0)
def wait_gps_fix(self):
self.recv_match(type='VFR_HUD', blocking=True)
if self.mavlink10():
self.recv_match(type='GPS_RAW_INT', blocking=True,
condition='GPS_RAW_INT.fix_type>=3 and GPS_RAW_INT.lat != 0')
else:
self.recv_match(type='GPS_RAW', blocking=True,
condition='GPS_RAW.fix_type>=2 and GPS_RAW.lat != 0')
def location(self, relative_alt=False):
'''return current location'''
self.wait_gps_fix()
# wait for another VFR_HUD, to ensure we have correct altitude
self.recv_match(type='VFR_HUD', blocking=True)
self.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if relative_alt:
alt = self.messages['GLOBAL_POSITION_INT'].relative_alt*0.001
else:
alt = self.messages['VFR_HUD'].alt
return location(self.messages['GPS_RAW_INT'].lat*1.0e-7,
self.messages['GPS_RAW_INT'].lon*1.0e-7,
alt,
self.messages['VFR_HUD'].heading)
def arducopter_arm(self):
'''arm motors (arducopter only)'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component,
mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command
0, # confirmation
1, # param1 (1 to indicate arm)
0, # param2 (all other params meaningless)
0, # param3
0, # param4
0, # param5
0, # param6
0) # param7
def arducopter_disarm(self):
'''calibrate pressure'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component,
mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command
0, # confirmation
0, # param1 (0 to indicate disarm)
0, # param2 (all other params meaningless)
0, # param3
0, # param4
0, # param5
0, # param6
0) # param7
def motors_armed(self):
'''return true if motors armed'''
return self.sysid_state[self.sysid].armed
def motors_armed_wait(self):
'''wait for motors to be armed'''
while True:
m = self.wait_heartbeat()
if self.motors_armed():
return
def motors_disarmed_wait(self):
'''wait for motors to be disarmed'''
while True:
m = self.wait_heartbeat()
if not self.motors_armed():
return
def field(self, type, field, default=None):
'''convenient function for returning an arbitrary MAVLink
field with a default'''
if not type in self.messages:
return default
return getattr(self.messages[type], field, default)
def param(self, name, default=None):
'''convenient function for returning an arbitrary MAVLink
parameter with a default'''
if not name in self.params:
return default
return self.params[name]
def setup_signing(self, secret_key, sign_outgoing=True, allow_unsigned_callback=None, initial_timestamp=None, link_id=None):
'''setup for MAVLink2 signing'''
self.mav.signing.secret_key = secret_key
self.mav.signing.sign_outgoing = sign_outgoing
self.mav.signing.allow_unsigned_callback = allow_unsigned_callback
if link_id is None:
# auto-increment the link_id for each link
global global_link_id
link_id = global_link_id
global_link_id = min(global_link_id + 1, 255)
self.mav.signing.link_id = link_id
if initial_timestamp is None:
# timestamp is time since 1/1/2015
epoch_offset = 1420070400
now = max(time.time(), epoch_offset)
initial_timestamp = now - epoch_offset
initial_timestamp = int(initial_timestamp * 100 * 1000)
# initial_timestamp is in 10usec units
self.mav.signing.timestamp = initial_timestamp
def disable_signing(self):
'''disable MAVLink2 signing'''
self.mav.signing.secret_key = None
self.mav.signing.sign_outgoing = False
self.mav.signing.allow_unsigned_callback = None
self.mav.signing.link_id = 0
self.mav.signing.timestamp = 0
def set_close_on_exec(fd):
'''set the clone on exec flag on a file descriptor. Ignore exceptions'''
try:
import fcntl
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
except Exception:
pass
class FakeSerial():
def __init__(self):
pass
def read(self, len):
return ""
def write(self, buf):
raise Exception("write always fails")
def inWaiting(self):
return 0
def close(self):
pass
class mavserial(mavfile):
'''a serial mavlink port'''
def __init__(self, device, baud=115200, autoreconnect=False, source_system=255, source_component=0, use_native=default_native, force_connected=False):
import serial
if ',' in device and not os.path.exists(device):
device, baud = device.split(',')
self.baud = baud
self.device = device
self.autoreconnect = autoreconnect
self.force_connected = force_connected
# we rather strangely set the baudrate initially to 1200, then change to the desired
# baudrate. This works around a kernel bug on some Linux kernels where the baudrate
# is not set correctly
try:
self.port = serial.Serial(self.device, 1200, timeout=0,
dsrdtr=False, rtscts=False, xonxoff=False)
except serial.SerialException as e:
if not force_connected:
raise e
self.port = FakeSerial()
try:
fd = self.port.fileno()
set_close_on_exec(fd)
except Exception:
fd = None
self.set_baudrate(self.baud)
mavfile.__init__(self, fd, device, source_system=source_system, source_component=source_component, use_native=use_native)
self.rtscts = False
def set_rtscts(self, enable):
'''enable/disable RTS/CTS if applicable'''
try:
self.port.setRtsCts(enable)
except Exception:
self.port.rtscts = enable
self.rtscts = enable
def set_baudrate(self, baudrate):
'''set baudrate'''
try:
self.port.setBaudrate(baudrate)
except Exception:
# for pySerial 3.0, which doesn't have setBaudrate()
self.port.baudrate = baudrate
def close(self):
self.port.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
if self.fd is None:
waiting = self.port.inWaiting()
if waiting < n:
n = waiting
ret = self.port.read(n)
return ret
def write(self, buf):
try:
return self.port.write(bytes(buf))
except Exception:
if not self.portdead:
print("Device %s is dead" % self.device)
self.portdead = True
if self.autoreconnect:
self.reset()
return -1
def reset(self):
import serial
try:
try:
newport = serial.Serial(self.device, self.baud, timeout=0,
dsrdtr=False, rtscts=False, xonxoff=False)
except serial.SerialException as e:
if not self.force_connected:
raise e
newport = FakeSerial()
return False
self.port.close()
self.port = newport
print("Device %s reopened OK" % self.device)
self.portdead = False
try:
self.fd = self.port.fileno()
except Exception:
self.fd = None
self.set_baudrate(self.baud)
if self.rtscts:
self.set_rtscts(self.rtscts)
return True
except Exception:
return False
class mavudp(mavfile):
'''a UDP mavlink socket'''
def __init__(self, device, input=True, broadcast=False, source_system=255, source_component=0, use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("UDP ports must be specified as host:port")
sys.exit(1)
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_server = input
self.broadcast = False
if input:
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port.bind((a[0], int(a[1])))
else:
self.destination_addr = (a[0], int(a[1]))
if broadcast:
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.broadcast = True
set_close_on_exec(self.port.fileno())
self.port.setblocking(0)
self.last_address = None
self.resolved_destination_addr = None
mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, source_component=source_component, input=input, use_native=use_native)
def close(self):
self.port.close()
def recv(self,n=None):
try:
data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]:
return ""
raise
if self.udp_server or self.broadcast:
self.last_address = new_addr
return data
def write(self, buf):
try:
if self.udp_server:
if self.last_address:
self.port.sendto(buf, self.last_address)
else:
if self.last_address and self.broadcast:
self.destination_addr = self.last_address
self.broadcast = False
self.port.connect(self.destination_addr)
# turn a (possible) hostname into an IP address to
# avoid resolving the hostname for every packet sent:
if self.destination_addr[0] != self.resolved_destination_addr:
self.resolved_destination_addr = self.destination_addr[0]
self.destination_addr = (socket.gethostbyname(self.destination_addr[0]), self.destination_addr[1])
self.port.sendto(buf, self.destination_addr)
except socket.error:
pass
def recv_msg(self):
'''message receive routine for UDP link'''
self.pre_message()
s = self.recv()
if len(s) > 0:
if self.first_byte:
self.auto_mavlink_version(s)
m = self.mav.parse_char(s)
if m is not None:
self.post_message(m)
return m
class mavmcast(mavfile):
'''a UDP multicast mavlink socket'''
def __init__(self, device, broadcast=False, source_system=255, source_component=0, use_native=default_native):
a = device.split(':')
mcast_ip = "239.255.145.50"
mcast_port = 14550
if len(a) == 1 and len(a[0]) > 0:
mcast_port = int(a[0])
elif len(a) > 1:
mcast_ip = a[0]
mcast_port = int(a[1])
# first the receiving socket. We use separate sending and receiving
# sockets so we can use the port number of the sending socket to detect
# packets from ourselves
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port.bind((mcast_ip, mcast_port))
mreq = struct.pack("4sl", socket.inet_aton(mcast_ip), socket.INADDR_ANY)
self.port.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
# now the sending socket
self.port_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.port_out.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port_out.setblocking(0)
self.port_out.connect((mcast_ip, mcast_port))
set_close_on_exec(self.port_out.fileno())
self.myport = None
mavfile.__init__(self, self.port.fileno(), device,
source_system=source_system, source_component=source_component,
input=False, use_native=use_native)
def close(self):
self.port.close()
self.port_out.close()
def recv(self,n=None):
try:
data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN)
if self.myport is None:
try:
(myaddr,self.myport) = self.port_out.getsockname()
except Exception:
pass
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]:
return ""
raise
if self.myport == new_addr[1]:
# data from ourselves, discard
return ''
return data
def write(self, buf):
try:
self.port_out.send(buf)
except socket.error as e:
pass
def recv_msg(self):
'''message receive routine for UDP link'''
self.pre_message()
s = self.recv()
if len(s) > 0:
if self.first_byte:
self.auto_mavlink_version(s)
m = self.mav.parse_char(s)
if m is not None:
self.post_message(m)
return m
class mavtcp(mavfile):
'''a TCP mavlink socket'''
def __init__(self,
device,
autoreconnect=False,
source_system=255,
source_component=0,
retries=6,
use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("TCP ports must be specified as host:port")
sys.exit(1)
self.destination_addr = (a[0], int(a[1]))
self.autoreconnect = autoreconnect
self.retries = retries
self.do_connect()
mavfile.__init__(self, self.port.fileno(), "tcp:" + device, source_system=source_system, source_component=source_component, use_native=use_native)
def do_connect(self):
self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
retries = self.retries
if retries <= 0:
# try to connect at least once:
retries = 1
while retries >= 0:
retries -= 1
try:
self.port.connect(self.destination_addr)
break
except Exception as e:
if retries == 0:
if self.port is not None:
self.port.close()
self.port = None
raise e
print(e, "sleeping")
time.sleep(1)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
def close(self):
self.port.close()
def handle_disconnect(self):
print("Connection reset or closed by peer on TCP socket")
self.reconnect()
def handle_eof(self):
# EOF
print("EOF on TCP socket")
self.reconnect()
def recv(self,n=None):
if self.port is None:
self.reconnect()
if n is None:
n = self.mav.bytes_needed()
try:
data = self.port.recv(n)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return ""
if e.errno in [ errno.ECONNRESET, errno.EPIPE ]:
self.handle_disconnect()
raise
if len(data) == 0:
self.handle_eof()
return data
def write(self, buf):
if self.port is None:
try:
self.reconnect()
except socket.error as e:
pass
if self.port is None:
return
try:
self.port.send(buf)
except socket.error as e:
if e.errno in [ errno.ECONNRESET, errno.EPIPE ]:
self.handle_disconnect()
pass
def reconnect(self):
if self.autoreconnect:
print("Attempting reconnect")
if self.port is not None:
self.port.close()
self.port = None
self.do_connect()
class mavtcpin(mavfile):
'''a TCP input mavlink socket'''
def __init__(self, device, source_system=255, source_component=0, retries=3, use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("TCP ports must be specified as host:port")
sys.exit(1)
self.listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listen_addr = (a[0], int(a[1]))
self.listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listen.bind(self.listen_addr)
self.listen.listen(1)
self.listen.setblocking(0)
set_close_on_exec(self.listen.fileno())
self.listen.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
mavfile.__init__(self, self.listen.fileno(), "tcpin:" + device, source_system=source_system, source_component=source_component, use_native=use_native)
self.port = None
def close(self):
self.listen.close()
def recv(self,n=None):
if not self.port:
try:
(self.port, addr) = self.listen.accept()
except Exception:
return ''
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
self.fd = self.port.fileno()
if n is None:
n = self.mav.bytes_needed()
try:
data = self.port.recv(n)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return ""
self.port.close()
self.port = None
self.fd = self.listen.fileno()
return ''
return data
def write(self, buf):
if self.port is None:
return
try:
self.port.send(buf)
except socket.error as e:
if e.errno in [ errno.EPIPE ]:
self.port.close()
self.port = None
self.fd = self.listen.fileno()
pass
class mavlogfile(mavfile):
'''a MAVLink logfile reader/writer'''
def __init__(self, filename, planner_format=None,
write=False, append=False,
robust_parsing=True, notimestamps=False, source_system=255, source_component=0, use_native=default_native):
self.filename = filename
self.writeable = write
self.robust_parsing = robust_parsing
self.planner_format = planner_format
self._two64 = math.pow(2.0, 63)
mode = 'rb'
if self.writeable:
if append:
mode = 'ab'
else:
mode = 'wb'
self.f = open(filename, mode)
self.filesize = os.path.getsize(filename)
self.percent = 0
mavfile.__init__(self, None, filename, source_system=source_system, source_component=source_component, notimestamps=notimestamps, use_native=use_native)
if self.notimestamps:
self._timestamp = 0
else:
self._timestamp = time.time()
self.stop_on_EOF = True
self._last_message = None
self._last_timestamp = None
self._link = 0
def close(self):
self.f.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
return self.f.read(n)
def write(self, buf):
self.f.write(buf)
def scan_timestamp(self, tbuf):
'''scan forward looking in a tlog for a timestamp in a reasonable range'''
while True:
(tusec,) = struct.unpack('>Q', tbuf)
t = tusec * 1.0e-6
if abs(t - self._last_timestamp) <= 3*24*60*60:
break
c = self.f.read(1)
if len(c) != 1:
break
tbuf = tbuf[1:] + c
return t
def pre_message(self):
'''read timestamp if needed'''
# read the timestamp
if self.filesize != 0:
self.percent = (100.0 * self.f.tell()) / self.filesize
if self.notimestamps:
return
if self.planner_format:
tbuf = self.f.read(21)
if len(tbuf) != 21 or tbuf[0] != '-' or tbuf[20] != ':':
raise RuntimeError('bad planner timestamp %s' % tbuf)
hnsec = self._two64 + float(tbuf[0:20])
t = hnsec * 1.0e-7 # convert to seconds
t -= 719163 * 24 * 60 * 60 # convert to 1970 base
self._link = 0
else:
tbuf = self.f.read(8)
if len(tbuf) != 8:
return
(tusec,) = struct.unpack('>Q', tbuf)
t = tusec * 1.0e-6
if (self._last_timestamp is not None and
self._last_message.get_type() == "BAD_DATA" and
abs(t - self._last_timestamp) > 3*24*60*60):
t = self.scan_timestamp(tbuf)
self._link = tusec & 0x3
self._timestamp = t
def post_message(self, msg):
'''add timestamp to message'''
# read the timestamp
super(mavlogfile, self).post_message(msg)
if self.planner_format:
self.f.read(1) # trailing newline
self.timestamp = msg._timestamp
self._last_message = msg
if msg.get_type() != "BAD_DATA":
self._last_timestamp = msg._timestamp
msg._link = self._link
class mavmmaplog(mavlogfile):
'''a MAVLink log file accessed via mmap. Used for fast read-only
access with low memory overhead where particular message types are wanted'''
def __init__(self, filename, progress_callback=None):
import platform, mmap
mavlogfile.__init__(self, filename)
self.f.seek(0, 2)
self.data_len = self.f.tell()
self.f.seek(0)
if platform.system() == "Windows":
self.data_map = mmap.mmap(self.f.fileno(), self.data_len, None, mmap.ACCESS_READ)
else:
self.data_map = mmap.mmap(self.f.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ)
self._rewind()
self.init_arrays(progress_callback)
self._flightmodes = None
def _rewind(self):
'''rewind to start of log'''
self.flightmode = "UNKNOWN"
self.offset = 0
self.type_nums = None
self.f.seek(0)
def rewind(self):
'''rewind to start of log'''
self._rewind()
def init_arrays(self, progress_callback=None):
'''initialise arrays for fast recv_match()'''
# dictionary indexed by msgid, mapping to arrays of file offsets where
# each instance of a msg type is found
self.offsets = {}
# number of msgs of each msg type
self.counts = {}
self._count = 0
# mapping from msg name to msg id
self.name_to_id = {}
# mapping from msg id to name
self.id_to_name = {}
self.type_nums = None
ofs = 0
pct = 0
MARKER_V1 = 0xFE
MARKER_V2 = 0xFD
while ofs+8+6 < self.data_len:
marker = u_ord(self.data_map[ofs+8])
mlen = u_ord(self.data_map[ofs+9]) + 8
if marker == MARKER_V1:
mtype = u_ord(self.data_map[ofs+13])
mlen += 8
elif marker == MARKER_V2:
if ofs+8+10 > self.data_len:
break
mtype = u_ord(self.data_map[ofs+15]) | (u_ord(self.data_map[ofs+16])<<8) | (u_ord(self.data_map[ofs+17])<<16)
mlen += 12
incompat_flags = u_ord(self.data_map[ofs+10])
if incompat_flags & mavlink.MAVLINK_IFLAG_SIGNED:
mlen += mavlink.MAVLINK_SIGNATURE_BLOCK_LEN
else:
# unrecognised marker; probably a malformed log
ofs += 1
continue
if not mtype in self.offsets:
if not mtype in mavlink.mavlink_map:
ofs += mlen
continue
self.offsets[mtype] = []
self.counts[mtype] = 0
msg = mavlink.mavlink_map[mtype]
self.name_to_id[msg.name] = mtype
self.id_to_name[mtype] = msg.name
self.f.seek(ofs)
m = self.recv_msg()
self.messages[msg.name] = m
self.offsets[mtype].append(ofs)
self.counts[mtype] += 1
ofs += mlen
new_pct = (100 * ofs) // self.data_len
if progress_callback is not None and new_pct != pct:
progress_callback(new_pct)
pct = new_pct
for mtype in self.counts:
self._count += self.counts[mtype]
self.offset = 0
self._rewind()
def skip_to_type(self, type):
'''skip fwd to next msg matching given type set'''
if self.type_nums is None:
# always add some key msg types so we can track flightmode, params etc
type = type.copy()
type.update(set(['HEARTBEAT','PARAM_VALUE']))
self.indexes = []
self.type_nums = []
for t in type:
if not t in self.name_to_id:
continue
self.type_nums.append(self.name_to_id[t])
self.indexes.append(0)
smallest_index = -1
smallest_offset = self.data_len
for i in range(len(self.type_nums)):
mtype = self.type_nums[i]
if self.indexes[i] >= self.counts[mtype]:
continue
ofs = self.offsets[mtype][self.indexes[i]]
if ofs < smallest_offset:
smallest_offset = ofs
smallest_index = i
if smallest_index >= 0:
self.indexes[smallest_index] += 1
self.offset = smallest_offset
self.f.seek(smallest_offset)
def recv_match(self, condition=None, type=None, blocking=False, timeout=None):
'''recv the next message that matches the given condition
type can be a string or a list of strings'''
if type is not None:
if isinstance(type, str):
type = set([type])
elif isinstance(type, list):
type = set(type)
while True:
if type is not None:
self.skip_to_type(type)
m = self.recv_msg()
if m is None:
if blocking:
for hook in self.idle_hooks:
hook(self)
if timeout is None:
self.select(0.05)
else:
self.select(timeout/2)
continue
return None
if type is not None and not m.get_type() in type:
continue
if not evaluate_condition(condition, self.messages):
continue
return m
def flightmode_list(self):
'''return an array of tuples for all flightmodes in log. Tuple is (modestring, t0, t1)'''
tstamp = None
fmode = None
if self._flightmodes is None:
self._rewind()
self._flightmodes = []
types = set(['HEARTBEAT'])
while True:
m = self.recv_match(type=types)
if m is None:
break
tstamp = m._timestamp
if self.flightmode == fmode:
continue
if len(self._flightmodes) > 0:
(mode, t0, t1) = self._flightmodes[-1]
self._flightmodes[-1] = (mode, t0, tstamp)
self._flightmodes.append((self.flightmode, tstamp, None))
fmode = self.flightmode
if tstamp is not None:
(mode, t0, t1) = self._flightmodes[-1]
self._flightmodes[-1] = (mode, t0, tstamp)
self._rewind()
return self._flightmodes
class mavchildexec(mavfile):
'''a MAVLink child processes reader/writer'''
def __init__(self, filename, source_system=255, source_component=0, use_native=default_native):
from subprocess import Popen, PIPE
import fcntl
self.filename = filename
self.child = Popen(filename, shell=False, stdout=PIPE, stdin=PIPE, bufsize=0)
self.fd = self.child.stdout.fileno()
fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
fl = fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
mavfile.__init__(self, self.fd, filename, source_system=source_system, source_component=source_component, use_native=use_native)
def close(self):
self.child.close()
def recv(self,n=None):
try:
x = self.child.stdout.read(1)
except Exception:
return ''
return x
def write(self, buf):
self.child.stdin.write(buf)
def mavlink_connection(device, baud=115200, source_system=255, source_component=0,
planner_format=None, write=False, append=False,
robust_parsing=True, notimestamps=False, input=True,
dialect=None, autoreconnect=False, zero_time_base=False,
retries=3, use_native=default_native,
force_connected=False, progress_callback=None):
'''open a serial, UDP, TCP or file mavlink connection'''
global mavfile_global
if force_connected:
# force_connected implies autoreconnect
autoreconnect = True
if dialect is not None:
set_dialect(dialect)
if device.startswith('tcp:'):
return mavtcp(device[4:],
autoreconnect=autoreconnect,
source_system=source_system,
source_component=source_component,
retries=retries,
use_native=use_native)
if device.startswith('tcpin:'):
return mavtcpin(device[6:], source_system=source_system, source_component=source_component, retries=retries, use_native=use_native)
if device.startswith('udpin:'):
return mavudp(device[6:], input=True, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('udpout:'):
return mavudp(device[7:], input=False, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('udpbcast:'):
return mavudp(device[9:], input=False, source_system=source_system, source_component=source_component, use_native=use_native, broadcast=True)
# For legacy purposes we accept the following syntax and let the caller to specify direction
if device.startswith('udp:'):
return mavudp(device[4:], input=input, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('mcast:'):
return mavmcast(device[6:], source_system=source_system, source_component=source_component, use_native=use_native)
if device.lower().endswith('.bin') or device.lower().endswith('.px4log'):
# support dataflash logs
from pymavlink import DFReader
m = DFReader.DFReader_binary(device, zero_time_base=zero_time_base, progress_callback=progress_callback)
mavfile_global = m
return m
if device.endswith('.log'):
# support dataflash text logs
from pymavlink import DFReader
if DFReader.DFReader_is_text_log(device):
m = DFReader.DFReader_text(device, zero_time_base=zero_time_base, progress_callback=progress_callback)
mavfile_global = m
return m
# list of suffixes to prevent setting DOS paths as UDP sockets
logsuffixes = ['mavlink', 'log', 'raw', 'tlog' ]
suffix = device.split('.')[-1].lower()
if device.find(':') != -1 and not suffix in logsuffixes:
return mavudp(device, source_system=source_system, source_component=source_component, input=input, use_native=use_native)
if os.path.isfile(device):
if device.endswith(".elf") or device.find("/bin/") != -1:
print("executing '%s'" % device)
return mavchildexec(device, source_system=source_system, source_component=source_component, use_native=use_native)
elif not write and not append and not notimestamps:
return mavmmaplog(device, progress_callback=progress_callback)
else:
return mavlogfile(device, planner_format=planner_format, write=write,
append=append, robust_parsing=robust_parsing, notimestamps=notimestamps,
source_system=source_system, source_component=source_component, use_native=use_native)
return mavserial(device,
baud=baud,
source_system=source_system,
source_component=source_component,
autoreconnect=autoreconnect,
use_native=use_native,
force_connected=force_connected)
class periodic_event(object):
'''a class for fixed frequency events'''
def __init__(self, frequency):
self.frequency = float(frequency)
self.last_time = time.time()
def force(self):
'''force immediate triggering'''
self.last_time = 0
def trigger(self):
'''return True if we should trigger now'''
tnow = time.time()
if tnow < self.last_time:
print("Warning, time moved backwards. Restarting timer.")
self.last_time = tnow
if self.last_time + (1.0/self.frequency) <= tnow:
self.last_time = tnow
return True
return False
try:
from curses import ascii
have_ascii = True
except:
have_ascii = False
def is_printable(c):
'''see if a character is printable'''
global have_ascii
if have_ascii:
return ascii.isprint(c)
if isinstance(c, int):
ic = c
else:
ic = ord(c)
return ic >= 32 and ic <= 126
def all_printable(buf):
'''see if a string is all printable'''
for c in buf:
if not is_printable(c) and not c in ['\r', '\n', '\t'] and not c in [ord('\r'), ord('\n'), ord('\t')]:
return False
return True
class SerialPort(object):
'''auto-detected serial port'''
def __init__(self, device, description=None, hwid=None):
self.device = device
self.description = description
self.hwid = hwid
def __str__(self):
ret = self.device
if self.description is not None:
ret += " : " + self.description
if self.hwid is not None:
ret += " : " + self.hwid
return ret
def auto_detect_serial_win32(preferred_list=['*']):
'''try to auto-detect serial ports on win32'''
try:
from serial.tools.list_ports_windows import comports
list = sorted(comports())
except:
return []
ret = []
others = []
for port, description, hwid in list:
matches = False
p = SerialPort(port, description=description, hwid=hwid)
for preferred in preferred_list:
if fnmatch.fnmatch(description, preferred) or fnmatch.fnmatch(hwid, preferred):
matches = True
if matches:
ret.append(p)
else:
others.append(p)
if len(ret) > 0:
return ret
# now the rest
ret.extend(others)
return ret
def auto_detect_serial_unix(preferred_list=['*']):
'''try to auto-detect serial ports on unix'''
import glob
glist = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob('/dev/serial/by-id/*')
ret = []
others = []
# try preferred ones first
for d in glist:
matches = False
for preferred in preferred_list:
if fnmatch.fnmatch(d, preferred):
matches = True
if matches:
ret.append(SerialPort(d))
else:
others.append(SerialPort(d))
if len(ret) > 0:
return ret
ret.extend(others)
return ret
def auto_detect_serial(preferred_list=['*']):
'''try to auto-detect serial port'''
# see if
if os.name == 'nt':
return auto_detect_serial_win32(preferred_list=preferred_list)
return auto_detect_serial_unix(preferred_list=preferred_list)
def mode_string_v09(msg):
'''mode string for 0.9 protocol'''
mode = msg.mode
nav_mode = msg.nav_mode
MAV_MODE_UNINIT = 0
MAV_MODE_MANUAL = 2
MAV_MODE_GUIDED = 3
MAV_MODE_AUTO = 4
MAV_MODE_TEST1 = 5
MAV_MODE_TEST2 = 6
MAV_MODE_TEST3 = 7
MAV_NAV_GROUNDED = 0
MAV_NAV_LIFTOFF = 1
MAV_NAV_HOLD = 2
MAV_NAV_WAYPOINT = 3
MAV_NAV_VECTOR = 4
MAV_NAV_RETURNING = 5
MAV_NAV_LANDING = 6
MAV_NAV_LOST = 7
MAV_NAV_LOITER = 8
cmode = (mode, nav_mode)
mapping = {
(MAV_MODE_UNINIT, MAV_NAV_GROUNDED) : "INITIALISING",
(MAV_MODE_MANUAL, MAV_NAV_VECTOR) : "MANUAL",
(MAV_MODE_TEST3, MAV_NAV_VECTOR) : "CIRCLE",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_TEST1, MAV_NAV_VECTOR) : "STABILIZE",
(MAV_MODE_TEST2, MAV_NAV_LIFTOFF) : "FBWA",
(MAV_MODE_AUTO, MAV_NAV_WAYPOINT) : "AUTO",
(MAV_MODE_AUTO, MAV_NAV_RETURNING) : "RTL",
(MAV_MODE_AUTO, MAV_NAV_LOITER) : "LOITER",
(MAV_MODE_AUTO, MAV_NAV_LIFTOFF) : "TAKEOFF",
(MAV_MODE_AUTO, MAV_NAV_LANDING) : "LANDING",
(MAV_MODE_AUTO, MAV_NAV_HOLD) : "LOITER",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_GUIDED, MAV_NAV_WAYPOINT) : "GUIDED",
(100, MAV_NAV_VECTOR) : "STABILIZE",
(101, MAV_NAV_VECTOR) : "ACRO",
(102, MAV_NAV_VECTOR) : "ALT_HOLD",
(107, MAV_NAV_VECTOR) : "CIRCLE",
(109, MAV_NAV_VECTOR) : "LAND",
}
if cmode in mapping:
return mapping[cmode]
return "Mode(%s,%s)" % cmode
mode_mapping_apm = {
0 : 'MANUAL',
1 : 'CIRCLE',
2 : 'STABILIZE',
3 : 'TRAINING',
4 : 'ACRO',
5 : 'FBWA',
6 : 'FBWB',
7 : 'CRUISE',
8 : 'AUTOTUNE',
10 : 'AUTO',
11 : 'RTL',
12 : 'LOITER',
13 : 'TAKEOFF',
14 : 'AVOID_ADSB',
15 : 'GUIDED',
16 : 'INITIALISING',
17 : 'QSTABILIZE',
18 : 'QHOVER',
19 : 'QLOITER',
20 : 'QLAND',
21 : 'QRTL',
22 : 'QAUTOTUNE',
23 : 'QACRO',
}
mode_mapping_acm = {
0 : 'STABILIZE',
1 : 'ACRO',
2 : 'ALT_HOLD',
3 : 'AUTO',
4 : 'GUIDED',
5 : 'LOITER',
6 : 'RTL',
7 : 'CIRCLE',
8 : 'POSITION',
9 : 'LAND',
10 : 'OF_LOITER',
11 : 'DRIFT',
13 : 'SPORT',
14 : 'FLIP',
15 : 'AUTOTUNE',
16 : 'POSHOLD',
17 : 'BRAKE',
18 : 'THROW',
19 : 'AVOID_ADSB',
20 : 'GUIDED_NOGPS',
21 : 'SMART_RTL',
22 : 'FLOWHOLD',
23 : 'FOLLOW',
24 : 'ZIGZAG',
}
mode_mapping_rover = {
0 : 'MANUAL',
1 : 'ACRO',
2 : 'LEARNING',
3 : 'STEERING',
4 : 'HOLD',
5 : 'LOITER',
6 : 'FOLLOW',
7 : 'SIMPLE',
10 : 'AUTO',
11 : 'RTL',
12 : 'SMART_RTL',
15 : 'GUIDED',
16 : 'INITIALISING'
}
mode_mapping_tracker = {
0 : 'MANUAL',
1 : 'STOP',
2 : 'SCAN',
4 : 'GUIDED',
10 : 'AUTO',
16 : 'INITIALISING'
}
mode_mapping_sub = {
0: 'STABILIZE',
1: 'ACRO',
2: 'ALT_HOLD',
3: 'AUTO',
4: 'GUIDED',
7: 'CIRCLE',
9: 'SURFACE',
16: 'POSHOLD',
19: 'MANUAL',
}
# map from a PX4 "main_state" to a string; see msg/commander_state.msg
# This allows us to map sdlog STAT.MainState to a simple "mode"
# string, used in DFReader and possibly other places. These are
# related but distict from what is found in mavlink messages; see
# "Custom mode definitions", below.
mainstate_mapping_px4 = {
0 : 'MANUAL',
1 : 'ALTCTL',
2 : 'POSCTL',
3 : 'AUTO_MISSION',
4 : 'AUTO_LOITER',
5 : 'AUTO_RTL',
6 : 'ACRO',
7 : 'OFFBOARD',
8 : 'STAB',
9 : 'RATTITUDE',
10 : 'AUTO_TAKEOFF',
11 : 'AUTO_LAND',
12 : 'AUTO_FOLLOW_TARGET',
13 : 'MAX',
}
def mode_string_px4(MainState):
return mainstate_mapping_px4.get(MainState, "Unknown")
# Custom mode definitions from PX4
PX4_CUSTOM_MAIN_MODE_MANUAL = 1
PX4_CUSTOM_MAIN_MODE_ALTCTL = 2
PX4_CUSTOM_MAIN_MODE_POSCTL = 3
PX4_CUSTOM_MAIN_MODE_AUTO = 4
PX4_CUSTOM_MAIN_MODE_ACRO = 5
PX4_CUSTOM_MAIN_MODE_OFFBOARD = 6
PX4_CUSTOM_MAIN_MODE_STABILIZED = 7
PX4_CUSTOM_MAIN_MODE_RATTITUDE = 8
PX4_CUSTOM_SUB_MODE_OFFBOARD = 0
PX4_CUSTOM_SUB_MODE_AUTO_READY = 1
PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF = 2
PX4_CUSTOM_SUB_MODE_AUTO_LOITER = 3
PX4_CUSTOM_SUB_MODE_AUTO_MISSION = 4
PX4_CUSTOM_SUB_MODE_AUTO_RTL = 5
PX4_CUSTOM_SUB_MODE_AUTO_LAND = 6
PX4_CUSTOM_SUB_MODE_AUTO_RTGS = 7
PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET = 8
auto_mode_flags = mavlink.MAV_MODE_FLAG_AUTO_ENABLED \
| mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED \
| mavlink.MAV_MODE_FLAG_GUIDED_ENABLED
px4_map = { "MANUAL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_MANUAL, 0 ),
"STABILIZED": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_STABILIZED, 0 ),
"ACRO": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ACRO, 0 ),
"RATTITUDE": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_RATTITUDE, 0 ),
"ALTCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ALTCTL, 0 ),
"POSCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_POSCTL, 0 ),
"LOITER": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LOITER ),
"MISSION": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_MISSION ),
"RTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTL ),
"LAND": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LAND ),
"RTGS": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTGS ),
"FOLLOWME": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET ),
"OFFBOARD": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_OFFBOARD, 0 ),
"TAKEOFF": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF )}
def interpret_px4_mode(base_mode, custom_mode):
custom_main_mode = (custom_mode & 0xFF0000) >> 16
custom_sub_mode = (custom_mode & 0xFF000000) >> 24
if base_mode & mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED != 0: #manual modes
if custom_main_mode == PX4_CUSTOM_MAIN_MODE_MANUAL:
return "MANUAL"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ACRO:
return "ACRO"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_RATTITUDE:
return "RATTITUDE"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_STABILIZED:
return "STABILIZED"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ALTCTL:
return "ALTCTL"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_POSCTL:
return "POSCTL"
elif (base_mode & auto_mode_flags) == auto_mode_flags: #auto modes
if custom_main_mode & PX4_CUSTOM_MAIN_MODE_AUTO != 0:
if custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_MISSION:
return "MISSION"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF:
return "TAKEOFF"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LOITER:
return "LOITER"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET:
return "FOLLOWME"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTL:
return "RTL"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LAND:
return "LAND"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTGS:
return "RTGS"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_OFFBOARD:
return "OFFBOARD"
return "UNKNOWN"
def mode_mapping_byname(mav_type):
'''return dictionary mapping mode names to numbers, or None if unknown'''
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
inv_map = dict((a, b) for (b, a) in map.items())
return inv_map
def mode_mapping_bynumber(mav_type):
'''return dictionary mapping mode numbers to name, or None if unknown'''
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_DODECAROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
return map
def mode_string_v10(msg):
'''mode string for 1.0 protocol, from heartbeat'''
if msg.autopilot == mavlink.MAV_AUTOPILOT_PX4:
return interpret_px4_mode(msg.base_mode, msg.custom_mode)
if not msg.base_mode & mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED:
return "Mode(0x%08x)" % msg.base_mode
if msg.type in [ mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_TRICOPTER,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_HELICOPTER ]:
if msg.custom_mode in mode_mapping_acm:
return mode_mapping_acm[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_FIXED_WING:
if msg.custom_mode in mode_mapping_apm:
return mode_mapping_apm[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_GROUND_ROVER:
if msg.custom_mode in mode_mapping_rover:
return mode_mapping_rover[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_SURFACE_BOAT:
if msg.custom_mode in mode_mapping_rover:
return mode_mapping_rover[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
if msg.custom_mode in mode_mapping_tracker:
return mode_mapping_tracker[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_SUBMARINE:
if msg.custom_mode in mode_mapping_sub:
return mode_mapping_sub[msg.custom_mode]
return "Mode(%u)" % msg.custom_mode
def mode_string_apm(mode_number):
'''return mode string for APM:Plane'''
if mode_number in mode_mapping_apm:
return mode_mapping_apm[mode_number]
return "Mode(%u)" % mode_number
def mode_string_acm(mode_number):
'''return mode string for APM:Copter'''
if mode_number in mode_mapping_acm:
return mode_mapping_acm[mode_number]
return "Mode(%u)" % mode_number
class x25crc(object):
'''x25 CRC - based on checksum.h from mavlink library'''
def __init__(self, buf=''):
self.crc = 0xffff
self.accumulate(buf)
def accumulate(self, buf):
'''add in some more bytes'''
byte_buf = array.array('B')
if isinstance(buf, array.array):
byte_buf.extend(buf)
else:
byte_buf.fromstring(buf)
accum = self.crc
for b in byte_buf:
tmp = b ^ (accum & 0xff)
tmp = (tmp ^ (tmp<<4)) & 0xFF
accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4)
accum = accum & 0xFFFF
self.crc = accum
class MavlinkSerialPort(object):
'''an object that looks like a serial port, but
transmits using mavlink SERIAL_CONTROL packets'''
def __init__(self, portname, baudrate, devnum=0, devbaud=0, timeout=3, debug=0):
from . import mavutil
self.baudrate = 0
self.timeout = timeout
self._debug = debug
self.buf = ''
self.port = devnum
self.debug("Connecting with MAVLink to %s ..." % portname)
self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate)
self.mav.wait_heartbeat()
self.debug("HEARTBEAT OK\n")
if devbaud != 0:
self.setBaudrate(devbaud)
self.debug("Locked serial device\n")
def debug(self, s, level=1):
'''write some debug text'''
if self._debug >= level:
print(s)
def write(self, b):
'''write some bytes'''
from . import mavutil
self.debug("sending '%s' (0x%02x) of len %u\n" % (b, ord(b[0]), len(b)), 2)
while len(b) > 0:
n = len(b)
if n > 70:
n = 70
buf = [ord(x) for x in b[:n]]
buf.extend([0]*(70-len(buf)))
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE |
mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND,
0,
0,
n,
buf)
b = b[n:]
def _recv(self):
'''read some bytes into self.buf'''
from . import mavutil
start_time = time.time()
while time.time() < start_time + self.timeout:
m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0',
type='SERIAL_CONTROL', blocking=False, timeout=0)
if m is not None and m.count != 0:
break
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE |
mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND,
0,
0,
0, [0]*70)
m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0',
type='SERIAL_CONTROL', blocking=True, timeout=0.01)
if m is not None and m.count != 0:
break
if m is not None:
if self._debug > 2:
print(m)
data = m.data[:m.count]
self.buf += ''.join(str(chr(x)) for x in data)
def read(self, n):
'''read some bytes'''
if len(self.buf) == 0:
self._recv()
if len(self.buf) > 0:
if n > len(self.buf):
n = len(self.buf)
ret = self.buf[:n]
self.buf = self.buf[n:]
if self._debug >= 2:
for b in ret:
self.debug("read 0x%x" % ord(b), 2)
return ret
return ''
def flushInput(self):
'''flush any pending input'''
self.buf = ''
saved_timeout = self.timeout
self.timeout = 0.5
self._recv()
self.timeout = saved_timeout
self.buf = ''
self.debug("flushInput")
def setBaudrate(self, baudrate):
'''set baudrate'''
from . import mavutil
if self.baudrate == baudrate:
return
self.baudrate = baudrate
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE,
0,
self.baudrate,
0, [0]*70)
self.flushInput()
self.debug("Changed baudrate %u" % self.baudrate)
def decode_bitmask(messagetype, field, value):
try:
_class = eval("mavlink.MAVLink_%s_message" % messagetype.lower())
except AttributeError as e:
raise AttributeError("No such message type")
try:
display = _class.fielddisplays_by_name[field]
except KeyError as e:
raise AttributeError("Not a bitmask field (none specified)")
if display != "bitmask":
raise ValueError("Not a bitmask field")
try:
enum_name = _class.fieldenums_by_name[field]
except KeyError as e:
raise AttributeError("No enum found for bitmask field")
try:
enum = mavlink.enums[enum_name]
except KeyError as e:
raise AttributeError("Did not find specified enumeration (%s)" % enum_name)
class EnumBitInfo(object):
def __init__(self, offset, value, name):
self.offset = offset
self.value = value
self.name = name
ret = []
for i in range(0, 64):
bit_value = (1 << i)
try:
enum_entry = enum[bit_value]
enum_entry_name = enum_entry.name
except KeyError as e:
enum_entry_name = None
if value == 0:
continue
if value & bit_value:
ret.append( EnumBitInfo(i, True, enum_entry_name) )
value = value & ~bit_value
else:
ret.append( EnumBitInfo(i, False, enum_entry_name) )
return ret
def dump_message_verbose(f, m):
'''write an excruciatingly detailed dump of message m to file descriptor f'''
try:
# __getattr__ may be overridden on m, thus this try/except
timestamp = m._timestamp
except AttributeError as e:
timestamp = ""
if timestamp != "":
timestamp = "%s.%02u: " % (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)),
int(timestamp*100.0)%100)
f.write("%s%s (link=%s) (signed=%s) (seq=%u) (src=%u/%u)\n" % (timestamp, m.get_type(), str(m.get_link_id()), str(m.get_signed()), m.get_seq(), m.get_srcSystem(), m.get_srcComponent()))
for fieldname in m.get_fieldnames():
# format in those most boring way possible:
value = m.format_attr(fieldname)
# try to add units:
try:
units = m.fieldunits_by_name[fieldname]
# perform simple unit conversions:
divisor = None
if units == "d%":
divisor = 10.0
units = "%"
if units == "c%":
divisor = 100.0
units = "%"
if units == "cA":
divisor = 100.0
units = "A"
elif units == "cdegC":
divisor = 100.0
units = "degC"
elif units == "cdeg":
divisor = 100.0
units = "deg"
elif units == "degE7":
divisor = 10000000.0
units = "deg"
elif units == "mG":
divisor = 1000.0
units = "G"
elif units == "mrad/s":
divisor = 1000.0
units = "rad/s"
elif units == "mV":
divisor = 1000.0
units = "V"
if divisor is not None:
if type(value) == list:
value = [x/divisor for x in value]
else:
value = value / divisor
# and give radians in degrees too:
if units == "rad":
value = "%s%s (%sdeg)" % (value, units, math.degrees(value))
elif units == "rad/s":
value = "%s%s (%sdeg/s)" % (value, units, math.degrees(value))
elif units == "rad/s/s":
value = "%s%s (%sdeg/s/s)" % (value, units, math.degrees(value))
else:
value = "%s%s" % (value, units)
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
# format any bitmask enumerations:
try:
enum_name = m.fieldenums_by_name[fieldname]
display = m.fielddisplays_by_name[fieldname]
if enum_name is not None and display == "bitmask":
bits = decode_bitmask(m.get_type(), fieldname, value)
f.write(" %s: %s\n" % (fieldname, value))
for bit in bits:
value = bit.value
name = bit.name
svalue = " "
if not value:
svalue = "!"
if name is None:
name = "[UNKNOWN]"
f.write(" %s %s\n" % (svalue, name))
continue
# except NameError as e:
# pass
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
# add any enumeration name:
try:
enum_name = m.fieldenums_by_name[fieldname]
try:
enum_value = mavlink.enums[enum_name][value].name
value = "%s (%s)" % (value, enum_value)
except KeyError as e:
value = "%s (%s)" % (value, "[UNKNOWN]")
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
f.write(" %s: %s\n" % (fieldname, value))
if __name__ == '__main__':
serial_list = auto_detect_serial(preferred_list=['*FTDI*',"*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*'])
for port in serial_list:
print("%s" % port)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。