1 Star 0 Fork 2

shaopengyuan/pymavlink

forked from 科技ing/pymavlink 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mavutil.py 90.97 KB
一键复制 编辑 原始数据 按行查看 历史
Peter Barker 提交于 2020-06-15 20:30 . mavutil: stop double-rebooting
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442
#!/usr/bin/env python
'''
mavlink python utility functions
Copyright Andrew Tridgell 2011-2019
Released under GNU LGPL version 3 or later
'''
from __future__ import print_function
from builtins import object
import socket, math, struct, time, os, fnmatch, array, sys, errno
import select
from pymavlink import mavexpression
# adding these extra imports allows pymavlink to be used directly with pyinstaller
# without having complex spec files. To allow for installs that don't have ardupilotmega
# at all we avoid throwing an exception if it isn't installed
try:
import json
from pymavlink.dialects.v10 import ardupilotmega
except Exception:
pass
# maximum packet length for a single receive call - use the UDP limit
UDP_MAX_PACKET_LEN = 65535
# Store the MAVLink library for the currently-selected dialect
# (set by set_dialect())
mavlink = None
# Store the mavlink file currently being operated on
# (set by mavlink_connection())
mavfile_global = None
# If the caller hasn't specified a particular native/legacy version, use this
default_native = False
# link_id used for signing
global_link_id = 0
# Use a globally-set MAVLink dialect if one has been specified as an environment variable.
if not 'MAVLINK_DIALECT' in os.environ:
os.environ['MAVLINK_DIALECT'] = 'ardupilotmega'
def mavlink10():
'''return True if using MAVLink 1.0 or later'''
return not 'MAVLINK09' in os.environ
def mavlink20():
'''return True if using MAVLink 2.0'''
return 'MAVLINK20' in os.environ
def evaluate_expression(expression, vars):
'''evaluation an expression'''
return mavexpression.evaluate_expression(expression, vars)
def evaluate_condition(condition, vars):
'''evaluation a conditional (boolean) statement'''
if condition is None:
return True
v = evaluate_expression(condition, vars)
if v is None:
return False
return v
def u_ord(c):
return ord(c) if sys.version_info.major < 3 else c
class location(object):
'''represent a GPS coordinate'''
def __init__(self, lat, lng, alt=0, heading=0):
self.lat = lat
self.lng = lng
self.alt = alt
self.heading = heading
def __str__(self):
return "lat=%.6f,lon=%.6f,alt=%.1f" % (self.lat, self.lng, self.alt)
def set_dialect(dialect):
'''set the MAVLink dialect to work with.
For example, set_dialect("ardupilotmega")
'''
global mavlink, current_dialect
from .generator import mavparse
if 'MAVLINK20' in os.environ:
wire_protocol = mavparse.PROTOCOL_2_0
modname = "pymavlink.dialects.v20." + dialect
elif mavlink is None or mavlink.WIRE_PROTOCOL_VERSION == "1.0" or not 'MAVLINK09' in os.environ:
wire_protocol = mavparse.PROTOCOL_1_0
modname = "pymavlink.dialects.v10." + dialect
else:
wire_protocol = mavparse.PROTOCOL_0_9
modname = "pymavlink.dialects.v09." + dialect
try:
mod = __import__(modname)
except Exception:
# auto-generate the dialect module
from .generator.mavgen import mavgen_python_dialect
mavgen_python_dialect(dialect, wire_protocol)
mod = __import__(modname)
components = modname.split('.')
for comp in components[1:]:
mod = getattr(mod, comp)
current_dialect = dialect
mavlink = mod
# Set the default dialect. This is done here as it needs to be after the function declaration
set_dialect(os.environ['MAVLINK_DIALECT'])
class mavfile_state(object):
'''state for a particular system id'''
def __init__(self):
self.messages = { 'MAV' : self }
self.flightmode = "UNKNOWN"
self.vehicle_type = "UNKNOWN"
self.mav_type = mavlink.MAV_TYPE_FIXED_WING
self.base_mode = 0
self.armed = False # canonical arm state for the vehicle as a whole
if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1:
self.messages['HOME'] = mavlink.MAVLink_gps_raw_int_message(0,0,0,0,0,0,0,0,0,0)
mavlink.MAVLink_waypoint_message = mavlink.MAVLink_mission_item_message
else:
self.messages['HOME'] = mavlink.MAVLink_gps_raw_message(0,0,0,0,0,0,0,0,0)
class param_state(object):
'''state for a particular system id/component id pair'''
def __init__(self):
self.params = {}
class mavfile(object):
'''a generic mavlink port'''
def __init__(self, fd, address, source_system=255, source_component=0, notimestamps=False, input=True, use_native=default_native):
global mavfile_global
if input:
mavfile_global = self
self.fd = fd
self.sysid = 0
self.param_sysid = (0,0)
self.address = address
self.timestamp = 0
self.last_seq = {}
self.mav_loss = 0
self.mav_count = 0
self.param_fetch_start = 0
# state for each sysid
self.sysid_state = {}
self.sysid_state[self.sysid] = mavfile_state()
# param state for each sysid/compid tuple
self.param_state = {}
self.param_state[self.param_sysid] = param_state()
# status of param fetch, indexed by sysid,compid tuple
self.source_system = source_system
self.source_component = source_component
self.first_byte = True
self.robust_parsing = True
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component, use_native=use_native)
self.mav.robust_parsing = self.robust_parsing
self.logfile = None
self.logfile_raw = None
self.start_time = time.time()
self.message_hooks = []
self.idle_hooks = []
self.uptime = 0.0
self.notimestamps = notimestamps
self._timestamp = None
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
self.stop_on_EOF = False
self.portdead = False
@property
def target_system(self):
return self.sysid
@property
def target_component(self):
return self.param_sysid[1]
@target_system.setter
def target_system(self, value):
self.sysid = value
if not self.sysid in self.sysid_state:
self.sysid_state[self.sysid] = mavfile_state()
if self.sysid != self.param_sysid[0]:
self.param_sysid = (self.sysid, self.param_sysid[1])
if not self.param_sysid in self.param_state:
self.param_state[self.param_sysid] = param_state()
@target_component.setter
def target_component(self, value):
if value != self.param_sysid[1]:
self.param_sysid = (self.param_sysid[0], value)
if not self.param_sysid in self.param_state:
self.param_state[self.param_sysid] = param_state()
@property
def params(self):
if self.param_sysid[1] == 0:
eff_tuple = (self.sysid, 1)
if eff_tuple in self.param_state:
return getattr(self.param_state[eff_tuple],'params')
return getattr(self.param_state[self.param_sysid],'params')
@property
def messages(self):
return getattr(self.sysid_state[self.sysid],'messages')
@property
def flightmode(self):
return getattr(self.sysid_state[self.sysid],'flightmode')
@flightmode.setter
def flightmode(self, value):
setattr(self.sysid_state[self.sysid],'flightmode',value)
@property
def vehicle_type(self):
return getattr(self.sysid_state[self.sysid],'vehicle_type')
@vehicle_type.setter
def vehicle_type(self, value):
setattr(self.sysid_state[self.sysid],'vehicle_type',value)
@property
def mav_type(self):
return getattr(self.sysid_state[self.sysid],'mav_type')
@mav_type.setter
def mav_type(self, value):
setattr(self.sysid_state[self.sysid],'mav_type',value)
@property
def base_mode(self):
return getattr(self.sysid_state[self.sysid],'base_mode')
@base_mode.setter
def base_mode(self, value):
setattr(self.sysid_state[self.sysid],'base_mode',value)
def auto_mavlink_version(self, buf):
'''auto-switch mavlink protocol version'''
global mavlink
if len(buf) == 0:
return
try:
magic = ord(buf[0])
except:
magic = buf[0]
if not magic in [ 85, 254, 253 ]:
return
self.first_byte = False
if self.WIRE_PROTOCOL_VERSION == "0.9" and magic == 254:
self.WIRE_PROTOCOL_VERSION = "1.0"
set_dialect(current_dialect)
elif self.WIRE_PROTOCOL_VERSION == "1.0" and magic == 85:
self.WIRE_PROTOCOL_VERSION = "0.9"
os.environ['MAVLINK09'] = '1'
set_dialect(current_dialect)
elif self.WIRE_PROTOCOL_VERSION != "2.0" and magic == 253:
self.WIRE_PROTOCOL_VERSION = "2.0"
os.environ['MAVLINK20'] = '1'
set_dialect(current_dialect)
else:
return
# switch protocol
(callback, callback_args, callback_kwargs) = (self.mav.callback,
self.mav.callback_args,
self.mav.callback_kwargs)
self.mav = mavlink.MAVLink(self, srcSystem=self.source_system, srcComponent=self.source_component)
self.mav.robust_parsing = self.robust_parsing
self.WIRE_PROTOCOL_VERSION = mavlink.WIRE_PROTOCOL_VERSION
(self.mav.callback, self.mav.callback_args, self.mav.callback_kwargs) = (callback,
callback_args,
callback_kwargs)
def recv(self, n=None):
'''default recv method'''
raise RuntimeError('no recv() method supplied')
def close(self, n=None):
'''default close method'''
raise RuntimeError('no close() method supplied')
def write(self, buf):
'''default write method'''
raise RuntimeError('no write() method supplied')
def select(self, timeout):
'''wait for up to timeout seconds for more data'''
if self.fd is None:
time.sleep(min(timeout,0.5))
return True
try:
(rin, win, xin) = select.select([self.fd], [], [], timeout)
except select.error:
return False
return len(rin) == 1
def pre_message(self):
'''default pre message call'''
return
def set_rtscts(self, enable):
'''enable/disable RTS/CTS if applicable'''
return
def probably_vehicle_heartbeat(self, msg):
if msg.get_srcComponent() == mavlink.MAV_COMP_ID_GIMBAL:
return False
if msg.type in (mavlink.MAV_TYPE_GCS,
mavlink.MAV_TYPE_GIMBAL,
mavlink.MAV_TYPE_ADSB,
mavlink.MAV_TYPE_ONBOARD_CONTROLLER):
return False
return True
def post_message(self, msg):
'''default post message call'''
if '_posted' in msg.__dict__:
return
msg._posted = True
msg._timestamp = time.time()
type = msg.get_type()
if 'usec' in msg.__dict__:
self.uptime = msg.usec * 1.0e-6
if 'time_boot_ms' in msg.__dict__:
self.uptime = msg.time_boot_ms * 1.0e-3
if self._timestamp is not None:
if self.notimestamps:
msg._timestamp = self.uptime
else:
msg._timestamp = self._timestamp
src_system = msg.get_srcSystem()
src_component = msg.get_srcComponent()
src_tuple = (src_system, src_component)
radio_tuple = (ord('3'), ord('D'))
if not src_system in self.sysid_state:
# we've seen a new system
self.sysid_state[src_system] = mavfile_state()
self.sysid_state[src_system].messages[type] = msg
if src_tuple == radio_tuple:
# as a special case radio msgs are added for all sysids
for s in self.sysid_state.keys():
self.sysid_state[s].messages[type] = msg
if not (src_tuple == radio_tuple or msg.get_type() == 'BAD_DATA'):
if not src_tuple in self.last_seq:
last_seq = -1
else:
last_seq = self.last_seq[src_tuple]
seq = (last_seq+1) % 256
seq2 = msg.get_seq()
if seq != seq2 and last_seq != -1:
diff = (seq2 - seq) % 256
self.mav_loss += diff
#print("lost %u seq=%u seq2=%u last_seq=%u src_tupe=%s %s" % (diff, seq, seq2, last_seq, str(src_tuple), msg.get_type()))
self.last_seq[src_tuple] = seq2
self.mav_count += 1
self.timestamp = msg._timestamp
if type == 'HEARTBEAT' and self.probably_vehicle_heartbeat(msg):
if self.sysid == 0:
# lock onto id tuple of first vehicle heartbeat
self.sysid = src_system
if float(mavlink.WIRE_PROTOCOL_VERSION) >= 1:
self.flightmode = mode_string_v10(msg)
self.mav_type = msg.type
self.base_mode = msg.base_mode
self.sysid_state[self.sysid].armed = (msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED)
elif type == 'PARAM_VALUE':
if not src_tuple in self.param_state:
self.param_state[src_tuple] = param_state()
self.param_state[src_tuple].params[msg.param_id] = msg.param_value
elif type == 'SYS_STATUS' and mavlink.WIRE_PROTOCOL_VERSION == '0.9':
self.flightmode = mode_string_v09(msg)
elif type == 'GPS_RAW':
if self.sysid_state[src_system].messages['HOME'].fix_type < 2:
self.sysid_state[src_system].messages['HOME'] = msg
elif type == 'GPS_RAW_INT':
if self.sysid_state[src_system].messages['HOME'].fix_type < 3:
self.sysid_state[src_system].messages['HOME'] = msg
for hook in self.message_hooks:
hook(self, msg)
if (msg.get_signed() and
self.mav.signing.link_id == 0 and
msg.get_link_id() != 0 and
self.target_system == msg.get_srcSystem() and
self.target_component == msg.get_srcComponent()):
# change to link_id from incoming packet
self.mav.signing.link_id = msg.get_link_id()
def packet_loss(self):
'''packet loss as a percentage'''
if self.mav_count == 0:
return 0
return (100.0*self.mav_loss)/(self.mav_count+self.mav_loss)
def recv_msg(self):
'''message receive routine'''
self.pre_message()
while True:
n = self.mav.bytes_needed()
s = self.recv(n)
numnew = len(s)
if numnew != 0:
if self.logfile_raw:
self.logfile_raw.write(str(s))
if self.first_byte:
self.auto_mavlink_version(s)
# We always call parse_char even if the new string is empty, because the existing message buf might already have some valid packet
# we can extract
msg = self.mav.parse_char(s)
if msg:
if self.logfile and msg.get_type() != 'BAD_DATA' :
usec = int(time.time() * 1.0e6) & ~3
self.logfile.write(str(struct.pack('>Q', usec) + msg.get_msgbuf()))
self.post_message(msg)
return msg
else:
# if we failed to parse any messages _and_ no new bytes arrived, return immediately so the client has the option to
# timeout
if numnew == 0:
return None
def recv_match(self, condition=None, type=None, blocking=False, timeout=None):
'''recv the next MAVLink message that matches the given condition
type can be a string or a list of strings'''
if type is not None and not isinstance(type, list) and not isinstance(type, set):
type = [type]
start_time = time.time()
while True:
if timeout is not None:
now = time.time()
if now < start_time:
start_time = now # If an external process rolls back system time, we should not spin forever.
if start_time + timeout < time.time():
return None
m = self.recv_msg()
if m is None:
if blocking:
for hook in self.idle_hooks:
hook(self)
if timeout is None:
self.select(0.05)
else:
self.select(timeout/2)
continue
return None
if type is not None and not m.get_type() in type:
continue
if not evaluate_condition(condition, self.messages):
continue
return m
def check_condition(self, condition):
'''check if a condition is true'''
return evaluate_condition(condition, self.messages)
def mavlink10(self):
'''return True if using MAVLink 1.0 or later'''
return float(self.WIRE_PROTOCOL_VERSION) >= 1
def mavlink20(self):
'''return True if using MAVLink 2.0 or later'''
return float(self.WIRE_PROTOCOL_VERSION) >= 2
def setup_logfile(self, logfile, mode='w'):
'''start logging to the given logfile, with timestamps'''
self.logfile = open(logfile, mode=mode)
def setup_logfile_raw(self, logfile, mode='w'):
'''start logging raw bytes to the given logfile, without timestamps'''
self.logfile_raw = open(logfile, mode=mode)
def wait_heartbeat(self, blocking=True, timeout=None):
'''wait for a heartbeat so we know the target system IDs'''
return self.recv_match(type='HEARTBEAT', blocking=blocking, timeout=timeout)
def param_fetch_all(self):
'''initiate fetch of all parameters'''
if time.time() - self.param_fetch_start < 2.0:
# don't fetch too often
return
self.param_fetch_start = time.time()
self.mav.param_request_list_send(self.target_system, self.target_component)
def param_fetch_one(self, name):
'''initiate fetch of one parameter'''
try:
idx = int(name)
self.mav.param_request_read_send(self.target_system, self.target_component, b"", idx)
except Exception:
if sys.version_info.major >= 3 and not isinstance(name, bytes):
name = bytes(name,'ascii')
self.mav.param_request_read_send(self.target_system, self.target_component, name, -1)
def time_since(self, mtype):
'''return the time since the last message of type mtype was received'''
if not mtype in self.messages:
return time.time() - self.start_time
return time.time() - self.messages[mtype]._timestamp
def param_set_send(self, parm_name, parm_value, parm_type=None):
'''wrapper for parameter set'''
if self.mavlink10():
if parm_type is None:
parm_type = mavlink.MAVLINK_TYPE_FLOAT
self.mav.param_set_send(self.target_system, self.target_component,
parm_name.encode('utf8'), parm_value, parm_type)
else:
self.mav.param_set_send(self.target_system, self.target_component,
parm_name.encode('utf8'), parm_value)
def waypoint_request_list_send(self):
'''wrapper for waypoint_request_list_send'''
if self.mavlink10():
self.mav.mission_request_list_send(self.target_system, self.target_component)
else:
self.mav.waypoint_request_list_send(self.target_system, self.target_component)
def waypoint_clear_all_send(self):
'''wrapper for waypoint_clear_all_send'''
if self.mavlink10():
self.mav.mission_clear_all_send(self.target_system, self.target_component)
else:
self.mav.waypoint_clear_all_send(self.target_system, self.target_component)
def waypoint_request_send(self, seq):
'''wrapper for waypoint_request_send'''
if self.mavlink10():
self.mav.mission_request_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_request_send(self.target_system, self.target_component, seq)
def waypoint_set_current_send(self, seq):
'''wrapper for waypoint_set_current_send'''
if self.mavlink10():
self.mav.mission_set_current_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_set_current_send(self.target_system, self.target_component, seq)
def waypoint_current(self):
'''return current waypoint'''
if self.mavlink10():
m = self.recv_match(type='MISSION_CURRENT', blocking=True)
else:
m = self.recv_match(type='WAYPOINT_CURRENT', blocking=True)
return m.seq
def waypoint_count_send(self, seq):
'''wrapper for waypoint_count_send'''
if self.mavlink10():
self.mav.mission_count_send(self.target_system, self.target_component, seq)
else:
self.mav.waypoint_count_send(self.target_system, self.target_component, seq)
def set_mode_flag(self, flag, enable):
'''
Enables/ disables MAV_MODE_FLAG
@param flag The mode flag,
see MAV_MODE_FLAG enum
@param enable Enable the flag, (True/False)
'''
if self.mavlink10():
mode = self.base_mode
if enable:
mode = mode | flag
elif not enable:
mode = mode & ~flag
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mode,
0, 0, 0, 0, 0, 0)
else:
print("Set mode flag not supported")
def set_mode_auto(self):
'''enter auto mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_MISSION_START, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_SET_AUTO = 13
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_AUTO)
def mode_mapping(self):
'''return dictionary mapping mode names to numbers, or None if unknown'''
mav_type = self.field('HEARTBEAT', 'type', self.mav_type)
mav_autopilot = self.field('HEARTBEAT', 'autopilot', None)
if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4:
return px4_map
if mav_type is None:
return None
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_DODECAROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
inv_map = dict((a, b) for (b, a) in map.items())
return inv_map
def set_mode_apm(self, mode, custom_mode = 0, custom_sub_mode = 0):
'''enter arbitrary mode'''
if isinstance(mode, str):
mode_map = self.mode_mapping()
if mode_map is None or mode not in mode_map:
print("Unknown mode '%s'" % mode)
return
mode = mode_map[mode]
# set mode by integer mode number for ArduPilot
self.mav.command_long_send(self.target_system,
self.target_component,
mavlink.MAV_CMD_DO_SET_MODE,
0,
mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode,
0,
0,
0,
0,
0)
def set_mode_px4(self, mode, custom_mode, custom_sub_mode):
'''enter arbitrary mode'''
if isinstance(mode, str):
mode_map = self.mode_mapping()
if mode_map is None or mode not in mode_map:
print("Unknown mode '%s'" % mode)
return
# PX4 uses two fields to define modes
mode, custom_mode, custom_sub_mode = px4_map[mode]
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0, mode, custom_mode, custom_sub_mode, 0, 0, 0, 0)
def set_mode(self, mode, custom_mode = 0, custom_sub_mode = 0):
'''set arbitrary flight mode'''
mav_autopilot = self.field('HEARTBEAT', 'autopilot', None)
if mav_autopilot == mavlink.MAV_AUTOPILOT_PX4:
self.set_mode_px4(mode, custom_mode, custom_sub_mode)
else:
self.set_mode_apm(mode)
def set_mode_rtl(self):
'''enter RTL mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_RETURN = 3
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_RETURN)
def set_mode_manual(self):
'''enter MANUAL mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mavlink.MAV_MODE_MANUAL_ARMED,
0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_SET_MANUAL = 12
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_SET_MANUAL)
def set_mode_fbwa(self):
'''enter FBWA mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_MODE, 0,
mavlink.MAV_MODE_STABILIZE_ARMED,
0, 0, 0, 0, 0, 0)
else:
print("Forcing FBWA not supported")
def set_mode_loiter(self):
'''enter LOITER mode'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_NAV_LOITER_UNLIM, 0, 0, 0, 0, 0, 0, 0, 0)
else:
MAV_ACTION_LOITER = 27
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_LOITER)
def set_servo(self, channel, pwm):
'''set a servo value'''
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_DO_SET_SERVO, 0,
channel, pwm,
0, 0, 0, 0, 0)
def set_relay(self, relay_pin=0, state=True):
'''Set relay_pin to value of state'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component, # target_component
mavlink.MAV_CMD_DO_SET_RELAY, # command
0, # Confirmation
relay_pin, # Relay Number
int(state), # state (1 to indicate arm)
0, # param3 (all other params meaningless)
0, # param4
0, # param5
0, # param6
0) # param7
else:
print("Setting relays not supported.")
def calibrate_level(self):
'''calibrate accels (1D version)'''
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
1, 1, 0, 0, 0, 0, 0)
def calibrate_pressure(self):
'''calibrate pressure'''
if self.mavlink10():
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_CALIBRATION, 0,
0, 0, 1, 0, 0, 0, 0)
else:
MAV_ACTION_CALIBRATE_PRESSURE = 20
self.mav.action_send(self.target_system, self.target_component, MAV_ACTION_CALIBRATE_PRESSURE)
def reboot_autopilot(self, hold_in_bootloader=False):
'''reboot the autopilot'''
if self.mavlink10():
if hold_in_bootloader:
param1 = 3
else:
param1 = 1
self.mav.command_long_send(self.target_system, self.target_component,
mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, 0,
param1, 0, 0, 0, 0, 0, 0)
def wait_gps_fix(self):
self.recv_match(type='VFR_HUD', blocking=True)
if self.mavlink10():
self.recv_match(type='GPS_RAW_INT', blocking=True,
condition='GPS_RAW_INT.fix_type>=3 and GPS_RAW_INT.lat != 0')
else:
self.recv_match(type='GPS_RAW', blocking=True,
condition='GPS_RAW.fix_type>=2 and GPS_RAW.lat != 0')
def location(self, relative_alt=False):
'''return current location'''
self.wait_gps_fix()
# wait for another VFR_HUD, to ensure we have correct altitude
self.recv_match(type='VFR_HUD', blocking=True)
self.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if relative_alt:
alt = self.messages['GLOBAL_POSITION_INT'].relative_alt*0.001
else:
alt = self.messages['VFR_HUD'].alt
return location(self.messages['GPS_RAW_INT'].lat*1.0e-7,
self.messages['GPS_RAW_INT'].lon*1.0e-7,
alt,
self.messages['VFR_HUD'].heading)
def arducopter_arm(self):
'''arm motors (arducopter only)'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component,
mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command
0, # confirmation
1, # param1 (1 to indicate arm)
0, # param2 (all other params meaningless)
0, # param3
0, # param4
0, # param5
0, # param6
0) # param7
def arducopter_disarm(self):
'''calibrate pressure'''
if self.mavlink10():
self.mav.command_long_send(
self.target_system, # target_system
self.target_component,
mavlink.MAV_CMD_COMPONENT_ARM_DISARM, # command
0, # confirmation
0, # param1 (0 to indicate disarm)
0, # param2 (all other params meaningless)
0, # param3
0, # param4
0, # param5
0, # param6
0) # param7
def motors_armed(self):
'''return true if motors armed'''
return self.sysid_state[self.sysid].armed
def motors_armed_wait(self):
'''wait for motors to be armed'''
while True:
m = self.wait_heartbeat()
if self.motors_armed():
return
def motors_disarmed_wait(self):
'''wait for motors to be disarmed'''
while True:
m = self.wait_heartbeat()
if not self.motors_armed():
return
def field(self, type, field, default=None):
'''convenient function for returning an arbitrary MAVLink
field with a default'''
if not type in self.messages:
return default
return getattr(self.messages[type], field, default)
def param(self, name, default=None):
'''convenient function for returning an arbitrary MAVLink
parameter with a default'''
if not name in self.params:
return default
return self.params[name]
def setup_signing(self, secret_key, sign_outgoing=True, allow_unsigned_callback=None, initial_timestamp=None, link_id=None):
'''setup for MAVLink2 signing'''
self.mav.signing.secret_key = secret_key
self.mav.signing.sign_outgoing = sign_outgoing
self.mav.signing.allow_unsigned_callback = allow_unsigned_callback
if link_id is None:
# auto-increment the link_id for each link
global global_link_id
link_id = global_link_id
global_link_id = min(global_link_id + 1, 255)
self.mav.signing.link_id = link_id
if initial_timestamp is None:
# timestamp is time since 1/1/2015
epoch_offset = 1420070400
now = max(time.time(), epoch_offset)
initial_timestamp = now - epoch_offset
initial_timestamp = int(initial_timestamp * 100 * 1000)
# initial_timestamp is in 10usec units
self.mav.signing.timestamp = initial_timestamp
def disable_signing(self):
'''disable MAVLink2 signing'''
self.mav.signing.secret_key = None
self.mav.signing.sign_outgoing = False
self.mav.signing.allow_unsigned_callback = None
self.mav.signing.link_id = 0
self.mav.signing.timestamp = 0
def set_close_on_exec(fd):
'''set the clone on exec flag on a file descriptor. Ignore exceptions'''
try:
import fcntl
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
flags |= fcntl.FD_CLOEXEC
fcntl.fcntl(fd, fcntl.F_SETFD, flags)
except Exception:
pass
class FakeSerial():
def __init__(self):
pass
def read(self, len):
return ""
def write(self, buf):
raise Exception("write always fails")
def inWaiting(self):
return 0
def close(self):
pass
class mavserial(mavfile):
'''a serial mavlink port'''
def __init__(self, device, baud=115200, autoreconnect=False, source_system=255, source_component=0, use_native=default_native, force_connected=False):
import serial
if ',' in device and not os.path.exists(device):
device, baud = device.split(',')
self.baud = baud
self.device = device
self.autoreconnect = autoreconnect
self.force_connected = force_connected
# we rather strangely set the baudrate initially to 1200, then change to the desired
# baudrate. This works around a kernel bug on some Linux kernels where the baudrate
# is not set correctly
try:
self.port = serial.Serial(self.device, 1200, timeout=0,
dsrdtr=False, rtscts=False, xonxoff=False)
except serial.SerialException as e:
if not force_connected:
raise e
self.port = FakeSerial()
try:
fd = self.port.fileno()
set_close_on_exec(fd)
except Exception:
fd = None
self.set_baudrate(self.baud)
mavfile.__init__(self, fd, device, source_system=source_system, source_component=source_component, use_native=use_native)
self.rtscts = False
def set_rtscts(self, enable):
'''enable/disable RTS/CTS if applicable'''
try:
self.port.setRtsCts(enable)
except Exception:
self.port.rtscts = enable
self.rtscts = enable
def set_baudrate(self, baudrate):
'''set baudrate'''
try:
self.port.setBaudrate(baudrate)
except Exception:
# for pySerial 3.0, which doesn't have setBaudrate()
self.port.baudrate = baudrate
def close(self):
self.port.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
if self.fd is None:
waiting = self.port.inWaiting()
if waiting < n:
n = waiting
ret = self.port.read(n)
return ret
def write(self, buf):
try:
return self.port.write(bytes(buf))
except Exception:
if not self.portdead:
print("Device %s is dead" % self.device)
self.portdead = True
if self.autoreconnect:
self.reset()
return -1
def reset(self):
import serial
try:
try:
newport = serial.Serial(self.device, self.baud, timeout=0,
dsrdtr=False, rtscts=False, xonxoff=False)
except serial.SerialException as e:
if not self.force_connected:
raise e
newport = FakeSerial()
return False
self.port.close()
self.port = newport
print("Device %s reopened OK" % self.device)
self.portdead = False
try:
self.fd = self.port.fileno()
except Exception:
self.fd = None
self.set_baudrate(self.baud)
if self.rtscts:
self.set_rtscts(self.rtscts)
return True
except Exception:
return False
class mavudp(mavfile):
'''a UDP mavlink socket'''
def __init__(self, device, input=True, broadcast=False, source_system=255, source_component=0, use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("UDP ports must be specified as host:port")
sys.exit(1)
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_server = input
self.broadcast = False
if input:
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port.bind((a[0], int(a[1])))
else:
self.destination_addr = (a[0], int(a[1]))
if broadcast:
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self.broadcast = True
set_close_on_exec(self.port.fileno())
self.port.setblocking(0)
self.last_address = None
self.resolved_destination_addr = None
mavfile.__init__(self, self.port.fileno(), device, source_system=source_system, source_component=source_component, input=input, use_native=use_native)
def close(self):
self.port.close()
def recv(self,n=None):
try:
data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]:
return ""
raise
if self.udp_server or self.broadcast:
self.last_address = new_addr
return data
def write(self, buf):
try:
if self.udp_server:
if self.last_address:
self.port.sendto(buf, self.last_address)
else:
if self.last_address and self.broadcast:
self.destination_addr = self.last_address
self.broadcast = False
self.port.connect(self.destination_addr)
# turn a (possible) hostname into an IP address to
# avoid resolving the hostname for every packet sent:
if self.destination_addr[0] != self.resolved_destination_addr:
self.resolved_destination_addr = self.destination_addr[0]
self.destination_addr = (socket.gethostbyname(self.destination_addr[0]), self.destination_addr[1])
self.port.sendto(buf, self.destination_addr)
except socket.error:
pass
def recv_msg(self):
'''message receive routine for UDP link'''
self.pre_message()
s = self.recv()
if len(s) > 0:
if self.first_byte:
self.auto_mavlink_version(s)
m = self.mav.parse_char(s)
if m is not None:
self.post_message(m)
return m
class mavmcast(mavfile):
'''a UDP multicast mavlink socket'''
def __init__(self, device, broadcast=False, source_system=255, source_component=0, use_native=default_native):
a = device.split(':')
mcast_ip = "239.255.145.50"
mcast_port = 14550
if len(a) == 1 and len(a[0]) > 0:
mcast_port = int(a[0])
elif len(a) > 1:
mcast_ip = a[0]
mcast_port = int(a[1])
# first the receiving socket. We use separate sending and receiving
# sockets so we can use the port number of the sending socket to detect
# packets from ourselves
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.port.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port.bind((mcast_ip, mcast_port))
mreq = struct.pack("4sl", socket.inet_aton(mcast_ip), socket.INADDR_ANY)
self.port.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
# now the sending socket
self.port_out = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.port_out.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.port_out.setblocking(0)
self.port_out.connect((mcast_ip, mcast_port))
set_close_on_exec(self.port_out.fileno())
self.myport = None
mavfile.__init__(self, self.port.fileno(), device,
source_system=source_system, source_component=source_component,
input=False, use_native=use_native)
def close(self):
self.port.close()
self.port_out.close()
def recv(self,n=None):
try:
data, new_addr = self.port.recvfrom(UDP_MAX_PACKET_LEN)
if self.myport is None:
try:
(myaddr,self.myport) = self.port_out.getsockname()
except Exception:
pass
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.ECONNREFUSED ]:
return ""
raise
if self.myport == new_addr[1]:
# data from ourselves, discard
return ''
return data
def write(self, buf):
try:
self.port_out.send(buf)
except socket.error as e:
pass
def recv_msg(self):
'''message receive routine for UDP link'''
self.pre_message()
s = self.recv()
if len(s) > 0:
if self.first_byte:
self.auto_mavlink_version(s)
m = self.mav.parse_char(s)
if m is not None:
self.post_message(m)
return m
class mavtcp(mavfile):
'''a TCP mavlink socket'''
def __init__(self,
device,
autoreconnect=False,
source_system=255,
source_component=0,
retries=6,
use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("TCP ports must be specified as host:port")
sys.exit(1)
self.destination_addr = (a[0], int(a[1]))
self.autoreconnect = autoreconnect
self.retries = retries
self.do_connect()
mavfile.__init__(self, self.port.fileno(), "tcp:" + device, source_system=source_system, source_component=source_component, use_native=use_native)
def do_connect(self):
self.port = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
retries = self.retries
if retries <= 0:
# try to connect at least once:
retries = 1
while retries >= 0:
retries -= 1
try:
self.port.connect(self.destination_addr)
break
except Exception as e:
if retries == 0:
if self.port is not None:
self.port.close()
self.port = None
raise e
print(e, "sleeping")
time.sleep(1)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
def close(self):
self.port.close()
def handle_disconnect(self):
print("Connection reset or closed by peer on TCP socket")
self.reconnect()
def handle_eof(self):
# EOF
print("EOF on TCP socket")
self.reconnect()
def recv(self,n=None):
if self.port is None:
self.reconnect()
if n is None:
n = self.mav.bytes_needed()
try:
data = self.port.recv(n)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return ""
if e.errno in [ errno.ECONNRESET, errno.EPIPE ]:
self.handle_disconnect()
raise
if len(data) == 0:
self.handle_eof()
return data
def write(self, buf):
if self.port is None:
try:
self.reconnect()
except socket.error as e:
pass
if self.port is None:
return
try:
self.port.send(buf)
except socket.error as e:
if e.errno in [ errno.ECONNRESET, errno.EPIPE ]:
self.handle_disconnect()
pass
def reconnect(self):
if self.autoreconnect:
print("Attempting reconnect")
if self.port is not None:
self.port.close()
self.port = None
self.do_connect()
class mavtcpin(mavfile):
'''a TCP input mavlink socket'''
def __init__(self, device, source_system=255, source_component=0, retries=3, use_native=default_native):
a = device.split(':')
if len(a) != 2:
print("TCP ports must be specified as host:port")
sys.exit(1)
self.listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.listen_addr = (a[0], int(a[1]))
self.listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listen.bind(self.listen_addr)
self.listen.listen(1)
self.listen.setblocking(0)
set_close_on_exec(self.listen.fileno())
self.listen.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
mavfile.__init__(self, self.listen.fileno(), "tcpin:" + device, source_system=source_system, source_component=source_component, use_native=use_native)
self.port = None
def close(self):
self.listen.close()
def recv(self,n=None):
if not self.port:
try:
(self.port, addr) = self.listen.accept()
except Exception:
return ''
self.port.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
self.port.setblocking(0)
set_close_on_exec(self.port.fileno())
self.fd = self.port.fileno()
if n is None:
n = self.mav.bytes_needed()
try:
data = self.port.recv(n)
except socket.error as e:
if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK ]:
return ""
self.port.close()
self.port = None
self.fd = self.listen.fileno()
return ''
return data
def write(self, buf):
if self.port is None:
return
try:
self.port.send(buf)
except socket.error as e:
if e.errno in [ errno.EPIPE ]:
self.port.close()
self.port = None
self.fd = self.listen.fileno()
pass
class mavlogfile(mavfile):
'''a MAVLink logfile reader/writer'''
def __init__(self, filename, planner_format=None,
write=False, append=False,
robust_parsing=True, notimestamps=False, source_system=255, source_component=0, use_native=default_native):
self.filename = filename
self.writeable = write
self.robust_parsing = robust_parsing
self.planner_format = planner_format
self._two64 = math.pow(2.0, 63)
mode = 'rb'
if self.writeable:
if append:
mode = 'ab'
else:
mode = 'wb'
self.f = open(filename, mode)
self.filesize = os.path.getsize(filename)
self.percent = 0
mavfile.__init__(self, None, filename, source_system=source_system, source_component=source_component, notimestamps=notimestamps, use_native=use_native)
if self.notimestamps:
self._timestamp = 0
else:
self._timestamp = time.time()
self.stop_on_EOF = True
self._last_message = None
self._last_timestamp = None
self._link = 0
def close(self):
self.f.close()
def recv(self,n=None):
if n is None:
n = self.mav.bytes_needed()
return self.f.read(n)
def write(self, buf):
self.f.write(buf)
def scan_timestamp(self, tbuf):
'''scan forward looking in a tlog for a timestamp in a reasonable range'''
while True:
(tusec,) = struct.unpack('>Q', tbuf)
t = tusec * 1.0e-6
if abs(t - self._last_timestamp) <= 3*24*60*60:
break
c = self.f.read(1)
if len(c) != 1:
break
tbuf = tbuf[1:] + c
return t
def pre_message(self):
'''read timestamp if needed'''
# read the timestamp
if self.filesize != 0:
self.percent = (100.0 * self.f.tell()) / self.filesize
if self.notimestamps:
return
if self.planner_format:
tbuf = self.f.read(21)
if len(tbuf) != 21 or tbuf[0] != '-' or tbuf[20] != ':':
raise RuntimeError('bad planner timestamp %s' % tbuf)
hnsec = self._two64 + float(tbuf[0:20])
t = hnsec * 1.0e-7 # convert to seconds
t -= 719163 * 24 * 60 * 60 # convert to 1970 base
self._link = 0
else:
tbuf = self.f.read(8)
if len(tbuf) != 8:
return
(tusec,) = struct.unpack('>Q', tbuf)
t = tusec * 1.0e-6
if (self._last_timestamp is not None and
self._last_message.get_type() == "BAD_DATA" and
abs(t - self._last_timestamp) > 3*24*60*60):
t = self.scan_timestamp(tbuf)
self._link = tusec & 0x3
self._timestamp = t
def post_message(self, msg):
'''add timestamp to message'''
# read the timestamp
super(mavlogfile, self).post_message(msg)
if self.planner_format:
self.f.read(1) # trailing newline
self.timestamp = msg._timestamp
self._last_message = msg
if msg.get_type() != "BAD_DATA":
self._last_timestamp = msg._timestamp
msg._link = self._link
class mavmmaplog(mavlogfile):
'''a MAVLink log file accessed via mmap. Used for fast read-only
access with low memory overhead where particular message types are wanted'''
def __init__(self, filename, progress_callback=None):
import platform, mmap
mavlogfile.__init__(self, filename)
self.f.seek(0, 2)
self.data_len = self.f.tell()
self.f.seek(0)
if platform.system() == "Windows":
self.data_map = mmap.mmap(self.f.fileno(), self.data_len, None, mmap.ACCESS_READ)
else:
self.data_map = mmap.mmap(self.f.fileno(), self.data_len, mmap.MAP_PRIVATE, mmap.PROT_READ)
self._rewind()
self.init_arrays(progress_callback)
self._flightmodes = None
def _rewind(self):
'''rewind to start of log'''
self.flightmode = "UNKNOWN"
self.offset = 0
self.type_nums = None
self.f.seek(0)
def rewind(self):
'''rewind to start of log'''
self._rewind()
def init_arrays(self, progress_callback=None):
'''initialise arrays for fast recv_match()'''
# dictionary indexed by msgid, mapping to arrays of file offsets where
# each instance of a msg type is found
self.offsets = {}
# number of msgs of each msg type
self.counts = {}
self._count = 0
# mapping from msg name to msg id
self.name_to_id = {}
# mapping from msg id to name
self.id_to_name = {}
self.type_nums = None
ofs = 0
pct = 0
MARKER_V1 = 0xFE
MARKER_V2 = 0xFD
while ofs+8+6 < self.data_len:
marker = u_ord(self.data_map[ofs+8])
mlen = u_ord(self.data_map[ofs+9]) + 8
if marker == MARKER_V1:
mtype = u_ord(self.data_map[ofs+13])
mlen += 8
elif marker == MARKER_V2:
if ofs+8+10 > self.data_len:
break
mtype = u_ord(self.data_map[ofs+15]) | (u_ord(self.data_map[ofs+16])<<8) | (u_ord(self.data_map[ofs+17])<<16)
mlen += 12
incompat_flags = u_ord(self.data_map[ofs+10])
if incompat_flags & mavlink.MAVLINK_IFLAG_SIGNED:
mlen += mavlink.MAVLINK_SIGNATURE_BLOCK_LEN
else:
# unrecognised marker; probably a malformed log
ofs += 1
continue
if not mtype in self.offsets:
if not mtype in mavlink.mavlink_map:
ofs += mlen
continue
self.offsets[mtype] = []
self.counts[mtype] = 0
msg = mavlink.mavlink_map[mtype]
self.name_to_id[msg.name] = mtype
self.id_to_name[mtype] = msg.name
self.f.seek(ofs)
m = self.recv_msg()
self.messages[msg.name] = m
self.offsets[mtype].append(ofs)
self.counts[mtype] += 1
ofs += mlen
new_pct = (100 * ofs) // self.data_len
if progress_callback is not None and new_pct != pct:
progress_callback(new_pct)
pct = new_pct
for mtype in self.counts:
self._count += self.counts[mtype]
self.offset = 0
self._rewind()
def skip_to_type(self, type):
'''skip fwd to next msg matching given type set'''
if self.type_nums is None:
# always add some key msg types so we can track flightmode, params etc
type = type.copy()
type.update(set(['HEARTBEAT','PARAM_VALUE']))
self.indexes = []
self.type_nums = []
for t in type:
if not t in self.name_to_id:
continue
self.type_nums.append(self.name_to_id[t])
self.indexes.append(0)
smallest_index = -1
smallest_offset = self.data_len
for i in range(len(self.type_nums)):
mtype = self.type_nums[i]
if self.indexes[i] >= self.counts[mtype]:
continue
ofs = self.offsets[mtype][self.indexes[i]]
if ofs < smallest_offset:
smallest_offset = ofs
smallest_index = i
if smallest_index >= 0:
self.indexes[smallest_index] += 1
self.offset = smallest_offset
self.f.seek(smallest_offset)
def recv_match(self, condition=None, type=None, blocking=False, timeout=None):
'''recv the next message that matches the given condition
type can be a string or a list of strings'''
if type is not None:
if isinstance(type, str):
type = set([type])
elif isinstance(type, list):
type = set(type)
while True:
if type is not None:
self.skip_to_type(type)
m = self.recv_msg()
if m is None:
if blocking:
for hook in self.idle_hooks:
hook(self)
if timeout is None:
self.select(0.05)
else:
self.select(timeout/2)
continue
return None
if type is not None and not m.get_type() in type:
continue
if not evaluate_condition(condition, self.messages):
continue
return m
def flightmode_list(self):
'''return an array of tuples for all flightmodes in log. Tuple is (modestring, t0, t1)'''
tstamp = None
fmode = None
if self._flightmodes is None:
self._rewind()
self._flightmodes = []
types = set(['HEARTBEAT'])
while True:
m = self.recv_match(type=types)
if m is None:
break
tstamp = m._timestamp
if self.flightmode == fmode:
continue
if len(self._flightmodes) > 0:
(mode, t0, t1) = self._flightmodes[-1]
self._flightmodes[-1] = (mode, t0, tstamp)
self._flightmodes.append((self.flightmode, tstamp, None))
fmode = self.flightmode
if tstamp is not None:
(mode, t0, t1) = self._flightmodes[-1]
self._flightmodes[-1] = (mode, t0, tstamp)
self._rewind()
return self._flightmodes
class mavchildexec(mavfile):
'''a MAVLink child processes reader/writer'''
def __init__(self, filename, source_system=255, source_component=0, use_native=default_native):
from subprocess import Popen, PIPE
import fcntl
self.filename = filename
self.child = Popen(filename, shell=False, stdout=PIPE, stdin=PIPE, bufsize=0)
self.fd = self.child.stdout.fileno()
fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
fl = fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.child.stdout.fileno(), fcntl.F_SETFL, fl | os.O_NONBLOCK)
mavfile.__init__(self, self.fd, filename, source_system=source_system, source_component=source_component, use_native=use_native)
def close(self):
self.child.close()
def recv(self,n=None):
try:
x = self.child.stdout.read(1)
except Exception:
return ''
return x
def write(self, buf):
self.child.stdin.write(buf)
def mavlink_connection(device, baud=115200, source_system=255, source_component=0,
planner_format=None, write=False, append=False,
robust_parsing=True, notimestamps=False, input=True,
dialect=None, autoreconnect=False, zero_time_base=False,
retries=3, use_native=default_native,
force_connected=False, progress_callback=None):
'''open a serial, UDP, TCP or file mavlink connection'''
global mavfile_global
if force_connected:
# force_connected implies autoreconnect
autoreconnect = True
if dialect is not None:
set_dialect(dialect)
if device.startswith('tcp:'):
return mavtcp(device[4:],
autoreconnect=autoreconnect,
source_system=source_system,
source_component=source_component,
retries=retries,
use_native=use_native)
if device.startswith('tcpin:'):
return mavtcpin(device[6:], source_system=source_system, source_component=source_component, retries=retries, use_native=use_native)
if device.startswith('udpin:'):
return mavudp(device[6:], input=True, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('udpout:'):
return mavudp(device[7:], input=False, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('udpbcast:'):
return mavudp(device[9:], input=False, source_system=source_system, source_component=source_component, use_native=use_native, broadcast=True)
# For legacy purposes we accept the following syntax and let the caller to specify direction
if device.startswith('udp:'):
return mavudp(device[4:], input=input, source_system=source_system, source_component=source_component, use_native=use_native)
if device.startswith('mcast:'):
return mavmcast(device[6:], source_system=source_system, source_component=source_component, use_native=use_native)
if device.lower().endswith('.bin') or device.lower().endswith('.px4log'):
# support dataflash logs
from pymavlink import DFReader
m = DFReader.DFReader_binary(device, zero_time_base=zero_time_base, progress_callback=progress_callback)
mavfile_global = m
return m
if device.endswith('.log'):
# support dataflash text logs
from pymavlink import DFReader
if DFReader.DFReader_is_text_log(device):
m = DFReader.DFReader_text(device, zero_time_base=zero_time_base, progress_callback=progress_callback)
mavfile_global = m
return m
# list of suffixes to prevent setting DOS paths as UDP sockets
logsuffixes = ['mavlink', 'log', 'raw', 'tlog' ]
suffix = device.split('.')[-1].lower()
if device.find(':') != -1 and not suffix in logsuffixes:
return mavudp(device, source_system=source_system, source_component=source_component, input=input, use_native=use_native)
if os.path.isfile(device):
if device.endswith(".elf") or device.find("/bin/") != -1:
print("executing '%s'" % device)
return mavchildexec(device, source_system=source_system, source_component=source_component, use_native=use_native)
elif not write and not append and not notimestamps:
return mavmmaplog(device, progress_callback=progress_callback)
else:
return mavlogfile(device, planner_format=planner_format, write=write,
append=append, robust_parsing=robust_parsing, notimestamps=notimestamps,
source_system=source_system, source_component=source_component, use_native=use_native)
return mavserial(device,
baud=baud,
source_system=source_system,
source_component=source_component,
autoreconnect=autoreconnect,
use_native=use_native,
force_connected=force_connected)
class periodic_event(object):
'''a class for fixed frequency events'''
def __init__(self, frequency):
self.frequency = float(frequency)
self.last_time = time.time()
def force(self):
'''force immediate triggering'''
self.last_time = 0
def trigger(self):
'''return True if we should trigger now'''
tnow = time.time()
if tnow < self.last_time:
print("Warning, time moved backwards. Restarting timer.")
self.last_time = tnow
if self.last_time + (1.0/self.frequency) <= tnow:
self.last_time = tnow
return True
return False
try:
from curses import ascii
have_ascii = True
except:
have_ascii = False
def is_printable(c):
'''see if a character is printable'''
global have_ascii
if have_ascii:
return ascii.isprint(c)
if isinstance(c, int):
ic = c
else:
ic = ord(c)
return ic >= 32 and ic <= 126
def all_printable(buf):
'''see if a string is all printable'''
for c in buf:
if not is_printable(c) and not c in ['\r', '\n', '\t'] and not c in [ord('\r'), ord('\n'), ord('\t')]:
return False
return True
class SerialPort(object):
'''auto-detected serial port'''
def __init__(self, device, description=None, hwid=None):
self.device = device
self.description = description
self.hwid = hwid
def __str__(self):
ret = self.device
if self.description is not None:
ret += " : " + self.description
if self.hwid is not None:
ret += " : " + self.hwid
return ret
def auto_detect_serial_win32(preferred_list=['*']):
'''try to auto-detect serial ports on win32'''
try:
from serial.tools.list_ports_windows import comports
list = sorted(comports())
except:
return []
ret = []
others = []
for port, description, hwid in list:
matches = False
p = SerialPort(port, description=description, hwid=hwid)
for preferred in preferred_list:
if fnmatch.fnmatch(description, preferred) or fnmatch.fnmatch(hwid, preferred):
matches = True
if matches:
ret.append(p)
else:
others.append(p)
if len(ret) > 0:
return ret
# now the rest
ret.extend(others)
return ret
def auto_detect_serial_unix(preferred_list=['*']):
'''try to auto-detect serial ports on unix'''
import glob
glist = glob.glob('/dev/ttyS*') + glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*') + glob.glob('/dev/serial/by-id/*')
ret = []
others = []
# try preferred ones first
for d in glist:
matches = False
for preferred in preferred_list:
if fnmatch.fnmatch(d, preferred):
matches = True
if matches:
ret.append(SerialPort(d))
else:
others.append(SerialPort(d))
if len(ret) > 0:
return ret
ret.extend(others)
return ret
def auto_detect_serial(preferred_list=['*']):
'''try to auto-detect serial port'''
# see if
if os.name == 'nt':
return auto_detect_serial_win32(preferred_list=preferred_list)
return auto_detect_serial_unix(preferred_list=preferred_list)
def mode_string_v09(msg):
'''mode string for 0.9 protocol'''
mode = msg.mode
nav_mode = msg.nav_mode
MAV_MODE_UNINIT = 0
MAV_MODE_MANUAL = 2
MAV_MODE_GUIDED = 3
MAV_MODE_AUTO = 4
MAV_MODE_TEST1 = 5
MAV_MODE_TEST2 = 6
MAV_MODE_TEST3 = 7
MAV_NAV_GROUNDED = 0
MAV_NAV_LIFTOFF = 1
MAV_NAV_HOLD = 2
MAV_NAV_WAYPOINT = 3
MAV_NAV_VECTOR = 4
MAV_NAV_RETURNING = 5
MAV_NAV_LANDING = 6
MAV_NAV_LOST = 7
MAV_NAV_LOITER = 8
cmode = (mode, nav_mode)
mapping = {
(MAV_MODE_UNINIT, MAV_NAV_GROUNDED) : "INITIALISING",
(MAV_MODE_MANUAL, MAV_NAV_VECTOR) : "MANUAL",
(MAV_MODE_TEST3, MAV_NAV_VECTOR) : "CIRCLE",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_TEST1, MAV_NAV_VECTOR) : "STABILIZE",
(MAV_MODE_TEST2, MAV_NAV_LIFTOFF) : "FBWA",
(MAV_MODE_AUTO, MAV_NAV_WAYPOINT) : "AUTO",
(MAV_MODE_AUTO, MAV_NAV_RETURNING) : "RTL",
(MAV_MODE_AUTO, MAV_NAV_LOITER) : "LOITER",
(MAV_MODE_AUTO, MAV_NAV_LIFTOFF) : "TAKEOFF",
(MAV_MODE_AUTO, MAV_NAV_LANDING) : "LANDING",
(MAV_MODE_AUTO, MAV_NAV_HOLD) : "LOITER",
(MAV_MODE_GUIDED, MAV_NAV_VECTOR) : "GUIDED",
(MAV_MODE_GUIDED, MAV_NAV_WAYPOINT) : "GUIDED",
(100, MAV_NAV_VECTOR) : "STABILIZE",
(101, MAV_NAV_VECTOR) : "ACRO",
(102, MAV_NAV_VECTOR) : "ALT_HOLD",
(107, MAV_NAV_VECTOR) : "CIRCLE",
(109, MAV_NAV_VECTOR) : "LAND",
}
if cmode in mapping:
return mapping[cmode]
return "Mode(%s,%s)" % cmode
mode_mapping_apm = {
0 : 'MANUAL',
1 : 'CIRCLE',
2 : 'STABILIZE',
3 : 'TRAINING',
4 : 'ACRO',
5 : 'FBWA',
6 : 'FBWB',
7 : 'CRUISE',
8 : 'AUTOTUNE',
10 : 'AUTO',
11 : 'RTL',
12 : 'LOITER',
13 : 'TAKEOFF',
14 : 'AVOID_ADSB',
15 : 'GUIDED',
16 : 'INITIALISING',
17 : 'QSTABILIZE',
18 : 'QHOVER',
19 : 'QLOITER',
20 : 'QLAND',
21 : 'QRTL',
22 : 'QAUTOTUNE',
23 : 'QACRO',
}
mode_mapping_acm = {
0 : 'STABILIZE',
1 : 'ACRO',
2 : 'ALT_HOLD',
3 : 'AUTO',
4 : 'GUIDED',
5 : 'LOITER',
6 : 'RTL',
7 : 'CIRCLE',
8 : 'POSITION',
9 : 'LAND',
10 : 'OF_LOITER',
11 : 'DRIFT',
13 : 'SPORT',
14 : 'FLIP',
15 : 'AUTOTUNE',
16 : 'POSHOLD',
17 : 'BRAKE',
18 : 'THROW',
19 : 'AVOID_ADSB',
20 : 'GUIDED_NOGPS',
21 : 'SMART_RTL',
22 : 'FLOWHOLD',
23 : 'FOLLOW',
24 : 'ZIGZAG',
}
mode_mapping_rover = {
0 : 'MANUAL',
1 : 'ACRO',
2 : 'LEARNING',
3 : 'STEERING',
4 : 'HOLD',
5 : 'LOITER',
6 : 'FOLLOW',
7 : 'SIMPLE',
10 : 'AUTO',
11 : 'RTL',
12 : 'SMART_RTL',
15 : 'GUIDED',
16 : 'INITIALISING'
}
mode_mapping_tracker = {
0 : 'MANUAL',
1 : 'STOP',
2 : 'SCAN',
4 : 'GUIDED',
10 : 'AUTO',
16 : 'INITIALISING'
}
mode_mapping_sub = {
0: 'STABILIZE',
1: 'ACRO',
2: 'ALT_HOLD',
3: 'AUTO',
4: 'GUIDED',
7: 'CIRCLE',
9: 'SURFACE',
16: 'POSHOLD',
19: 'MANUAL',
}
# map from a PX4 "main_state" to a string; see msg/commander_state.msg
# This allows us to map sdlog STAT.MainState to a simple "mode"
# string, used in DFReader and possibly other places. These are
# related but distict from what is found in mavlink messages; see
# "Custom mode definitions", below.
mainstate_mapping_px4 = {
0 : 'MANUAL',
1 : 'ALTCTL',
2 : 'POSCTL',
3 : 'AUTO_MISSION',
4 : 'AUTO_LOITER',
5 : 'AUTO_RTL',
6 : 'ACRO',
7 : 'OFFBOARD',
8 : 'STAB',
9 : 'RATTITUDE',
10 : 'AUTO_TAKEOFF',
11 : 'AUTO_LAND',
12 : 'AUTO_FOLLOW_TARGET',
13 : 'MAX',
}
def mode_string_px4(MainState):
return mainstate_mapping_px4.get(MainState, "Unknown")
# Custom mode definitions from PX4
PX4_CUSTOM_MAIN_MODE_MANUAL = 1
PX4_CUSTOM_MAIN_MODE_ALTCTL = 2
PX4_CUSTOM_MAIN_MODE_POSCTL = 3
PX4_CUSTOM_MAIN_MODE_AUTO = 4
PX4_CUSTOM_MAIN_MODE_ACRO = 5
PX4_CUSTOM_MAIN_MODE_OFFBOARD = 6
PX4_CUSTOM_MAIN_MODE_STABILIZED = 7
PX4_CUSTOM_MAIN_MODE_RATTITUDE = 8
PX4_CUSTOM_SUB_MODE_OFFBOARD = 0
PX4_CUSTOM_SUB_MODE_AUTO_READY = 1
PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF = 2
PX4_CUSTOM_SUB_MODE_AUTO_LOITER = 3
PX4_CUSTOM_SUB_MODE_AUTO_MISSION = 4
PX4_CUSTOM_SUB_MODE_AUTO_RTL = 5
PX4_CUSTOM_SUB_MODE_AUTO_LAND = 6
PX4_CUSTOM_SUB_MODE_AUTO_RTGS = 7
PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET = 8
auto_mode_flags = mavlink.MAV_MODE_FLAG_AUTO_ENABLED \
| mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED \
| mavlink.MAV_MODE_FLAG_GUIDED_ENABLED
px4_map = { "MANUAL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_MANUAL, 0 ),
"STABILIZED": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_STABILIZED, 0 ),
"ACRO": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ACRO, 0 ),
"RATTITUDE": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_RATTITUDE, 0 ),
"ALTCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_ALTCTL, 0 ),
"POSCTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | mavlink.MAV_MODE_FLAG_STABILIZE_ENABLED | mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED, PX4_CUSTOM_MAIN_MODE_POSCTL, 0 ),
"LOITER": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LOITER ),
"MISSION": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_MISSION ),
"RTL": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTL ),
"LAND": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_LAND ),
"RTGS": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_RTGS ),
"FOLLOWME": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET ),
"OFFBOARD": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_OFFBOARD, 0 ),
"TAKEOFF": (mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED | auto_mode_flags, PX4_CUSTOM_MAIN_MODE_AUTO, PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF )}
def interpret_px4_mode(base_mode, custom_mode):
custom_main_mode = (custom_mode & 0xFF0000) >> 16
custom_sub_mode = (custom_mode & 0xFF000000) >> 24
if base_mode & mavlink.MAV_MODE_FLAG_MANUAL_INPUT_ENABLED != 0: #manual modes
if custom_main_mode == PX4_CUSTOM_MAIN_MODE_MANUAL:
return "MANUAL"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ACRO:
return "ACRO"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_RATTITUDE:
return "RATTITUDE"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_STABILIZED:
return "STABILIZED"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_ALTCTL:
return "ALTCTL"
elif custom_main_mode == PX4_CUSTOM_MAIN_MODE_POSCTL:
return "POSCTL"
elif (base_mode & auto_mode_flags) == auto_mode_flags: #auto modes
if custom_main_mode & PX4_CUSTOM_MAIN_MODE_AUTO != 0:
if custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_MISSION:
return "MISSION"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_TAKEOFF:
return "TAKEOFF"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LOITER:
return "LOITER"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_FOLLOW_TARGET:
return "FOLLOWME"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTL:
return "RTL"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_LAND:
return "LAND"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_AUTO_RTGS:
return "RTGS"
elif custom_sub_mode == PX4_CUSTOM_SUB_MODE_OFFBOARD:
return "OFFBOARD"
return "UNKNOWN"
def mode_mapping_byname(mav_type):
'''return dictionary mapping mode names to numbers, or None if unknown'''
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
inv_map = dict((a, b) for (b, a) in map.items())
return inv_map
def mode_mapping_bynumber(mav_type):
'''return dictionary mapping mode numbers to name, or None if unknown'''
map = None
if mav_type in [mavlink.MAV_TYPE_QUADROTOR,
mavlink.MAV_TYPE_HELICOPTER,
mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR,
mavlink.MAV_TYPE_DODECAROTOR,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_TRICOPTER]:
map = mode_mapping_acm
if mav_type == mavlink.MAV_TYPE_FIXED_WING:
map = mode_mapping_apm
if mav_type == mavlink.MAV_TYPE_GROUND_ROVER:
map = mode_mapping_rover
if mav_type == mavlink.MAV_TYPE_SURFACE_BOAT:
map = mode_mapping_rover # for the time being
if mav_type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
map = mode_mapping_tracker
if mav_type == mavlink.MAV_TYPE_SUBMARINE:
map = mode_mapping_sub
if map is None:
return None
return map
def mode_string_v10(msg):
'''mode string for 1.0 protocol, from heartbeat'''
if msg.autopilot == mavlink.MAV_AUTOPILOT_PX4:
return interpret_px4_mode(msg.base_mode, msg.custom_mode)
if not msg.base_mode & mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED:
return "Mode(0x%08x)" % msg.base_mode
if msg.type in [ mavlink.MAV_TYPE_QUADROTOR, mavlink.MAV_TYPE_HEXAROTOR,
mavlink.MAV_TYPE_OCTOROTOR, mavlink.MAV_TYPE_TRICOPTER,
mavlink.MAV_TYPE_COAXIAL,
mavlink.MAV_TYPE_HELICOPTER ]:
if msg.custom_mode in mode_mapping_acm:
return mode_mapping_acm[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_FIXED_WING:
if msg.custom_mode in mode_mapping_apm:
return mode_mapping_apm[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_GROUND_ROVER:
if msg.custom_mode in mode_mapping_rover:
return mode_mapping_rover[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_SURFACE_BOAT:
if msg.custom_mode in mode_mapping_rover:
return mode_mapping_rover[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_ANTENNA_TRACKER:
if msg.custom_mode in mode_mapping_tracker:
return mode_mapping_tracker[msg.custom_mode]
if msg.type == mavlink.MAV_TYPE_SUBMARINE:
if msg.custom_mode in mode_mapping_sub:
return mode_mapping_sub[msg.custom_mode]
return "Mode(%u)" % msg.custom_mode
def mode_string_apm(mode_number):
'''return mode string for APM:Plane'''
if mode_number in mode_mapping_apm:
return mode_mapping_apm[mode_number]
return "Mode(%u)" % mode_number
def mode_string_acm(mode_number):
'''return mode string for APM:Copter'''
if mode_number in mode_mapping_acm:
return mode_mapping_acm[mode_number]
return "Mode(%u)" % mode_number
class x25crc(object):
'''x25 CRC - based on checksum.h from mavlink library'''
def __init__(self, buf=''):
self.crc = 0xffff
self.accumulate(buf)
def accumulate(self, buf):
'''add in some more bytes'''
byte_buf = array.array('B')
if isinstance(buf, array.array):
byte_buf.extend(buf)
else:
byte_buf.fromstring(buf)
accum = self.crc
for b in byte_buf:
tmp = b ^ (accum & 0xff)
tmp = (tmp ^ (tmp<<4)) & 0xFF
accum = (accum>>8) ^ (tmp<<8) ^ (tmp<<3) ^ (tmp>>4)
accum = accum & 0xFFFF
self.crc = accum
class MavlinkSerialPort(object):
'''an object that looks like a serial port, but
transmits using mavlink SERIAL_CONTROL packets'''
def __init__(self, portname, baudrate, devnum=0, devbaud=0, timeout=3, debug=0):
from . import mavutil
self.baudrate = 0
self.timeout = timeout
self._debug = debug
self.buf = ''
self.port = devnum
self.debug("Connecting with MAVLink to %s ..." % portname)
self.mav = mavutil.mavlink_connection(portname, autoreconnect=True, baud=baudrate)
self.mav.wait_heartbeat()
self.debug("HEARTBEAT OK\n")
if devbaud != 0:
self.setBaudrate(devbaud)
self.debug("Locked serial device\n")
def debug(self, s, level=1):
'''write some debug text'''
if self._debug >= level:
print(s)
def write(self, b):
'''write some bytes'''
from . import mavutil
self.debug("sending '%s' (0x%02x) of len %u\n" % (b, ord(b[0]), len(b)), 2)
while len(b) > 0:
n = len(b)
if n > 70:
n = 70
buf = [ord(x) for x in b[:n]]
buf.extend([0]*(70-len(buf)))
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE |
mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND,
0,
0,
n,
buf)
b = b[n:]
def _recv(self):
'''read some bytes into self.buf'''
from . import mavutil
start_time = time.time()
while time.time() < start_time + self.timeout:
m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0',
type='SERIAL_CONTROL', blocking=False, timeout=0)
if m is not None and m.count != 0:
break
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE |
mavutil.mavlink.SERIAL_CONTROL_FLAG_RESPOND,
0,
0,
0, [0]*70)
m = self.mav.recv_match(condition='SERIAL_CONTROL.count!=0',
type='SERIAL_CONTROL', blocking=True, timeout=0.01)
if m is not None and m.count != 0:
break
if m is not None:
if self._debug > 2:
print(m)
data = m.data[:m.count]
self.buf += ''.join(str(chr(x)) for x in data)
def read(self, n):
'''read some bytes'''
if len(self.buf) == 0:
self._recv()
if len(self.buf) > 0:
if n > len(self.buf):
n = len(self.buf)
ret = self.buf[:n]
self.buf = self.buf[n:]
if self._debug >= 2:
for b in ret:
self.debug("read 0x%x" % ord(b), 2)
return ret
return ''
def flushInput(self):
'''flush any pending input'''
self.buf = ''
saved_timeout = self.timeout
self.timeout = 0.5
self._recv()
self.timeout = saved_timeout
self.buf = ''
self.debug("flushInput")
def setBaudrate(self, baudrate):
'''set baudrate'''
from . import mavutil
if self.baudrate == baudrate:
return
self.baudrate = baudrate
self.mav.mav.serial_control_send(self.port,
mavutil.mavlink.SERIAL_CONTROL_FLAG_EXCLUSIVE,
0,
self.baudrate,
0, [0]*70)
self.flushInput()
self.debug("Changed baudrate %u" % self.baudrate)
def decode_bitmask(messagetype, field, value):
try:
_class = eval("mavlink.MAVLink_%s_message" % messagetype.lower())
except AttributeError as e:
raise AttributeError("No such message type")
try:
display = _class.fielddisplays_by_name[field]
except KeyError as e:
raise AttributeError("Not a bitmask field (none specified)")
if display != "bitmask":
raise ValueError("Not a bitmask field")
try:
enum_name = _class.fieldenums_by_name[field]
except KeyError as e:
raise AttributeError("No enum found for bitmask field")
try:
enum = mavlink.enums[enum_name]
except KeyError as e:
raise AttributeError("Did not find specified enumeration (%s)" % enum_name)
class EnumBitInfo(object):
def __init__(self, offset, value, name):
self.offset = offset
self.value = value
self.name = name
ret = []
for i in range(0, 64):
bit_value = (1 << i)
try:
enum_entry = enum[bit_value]
enum_entry_name = enum_entry.name
except KeyError as e:
enum_entry_name = None
if value == 0:
continue
if value & bit_value:
ret.append( EnumBitInfo(i, True, enum_entry_name) )
value = value & ~bit_value
else:
ret.append( EnumBitInfo(i, False, enum_entry_name) )
return ret
def dump_message_verbose(f, m):
'''write an excruciatingly detailed dump of message m to file descriptor f'''
try:
# __getattr__ may be overridden on m, thus this try/except
timestamp = m._timestamp
except AttributeError as e:
timestamp = ""
if timestamp != "":
timestamp = "%s.%02u: " % (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)),
int(timestamp*100.0)%100)
f.write("%s%s (link=%s) (signed=%s) (seq=%u) (src=%u/%u)\n" % (timestamp, m.get_type(), str(m.get_link_id()), str(m.get_signed()), m.get_seq(), m.get_srcSystem(), m.get_srcComponent()))
for fieldname in m.get_fieldnames():
# format in those most boring way possible:
value = m.format_attr(fieldname)
# try to add units:
try:
units = m.fieldunits_by_name[fieldname]
# perform simple unit conversions:
divisor = None
if units == "d%":
divisor = 10.0
units = "%"
if units == "c%":
divisor = 100.0
units = "%"
if units == "cA":
divisor = 100.0
units = "A"
elif units == "cdegC":
divisor = 100.0
units = "degC"
elif units == "cdeg":
divisor = 100.0
units = "deg"
elif units == "degE7":
divisor = 10000000.0
units = "deg"
elif units == "mG":
divisor = 1000.0
units = "G"
elif units == "mrad/s":
divisor = 1000.0
units = "rad/s"
elif units == "mV":
divisor = 1000.0
units = "V"
if divisor is not None:
if type(value) == list:
value = [x/divisor for x in value]
else:
value = value / divisor
# and give radians in degrees too:
if units == "rad":
value = "%s%s (%sdeg)" % (value, units, math.degrees(value))
elif units == "rad/s":
value = "%s%s (%sdeg/s)" % (value, units, math.degrees(value))
elif units == "rad/s/s":
value = "%s%s (%sdeg/s/s)" % (value, units, math.degrees(value))
else:
value = "%s%s" % (value, units)
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
# format any bitmask enumerations:
try:
enum_name = m.fieldenums_by_name[fieldname]
display = m.fielddisplays_by_name[fieldname]
if enum_name is not None and display == "bitmask":
bits = decode_bitmask(m.get_type(), fieldname, value)
f.write(" %s: %s\n" % (fieldname, value))
for bit in bits:
value = bit.value
name = bit.name
svalue = " "
if not value:
svalue = "!"
if name is None:
name = "[UNKNOWN]"
f.write(" %s %s\n" % (svalue, name))
continue
# except NameError as e:
# pass
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
# add any enumeration name:
try:
enum_name = m.fieldenums_by_name[fieldname]
try:
enum_value = mavlink.enums[enum_name][value].name
value = "%s (%s)" % (value, enum_value)
except KeyError as e:
value = "%s (%s)" % (value, "[UNKNOWN]")
except AttributeError as e:
# e.g. BAD_DATA
pass
except KeyError as e:
pass
f.write(" %s: %s\n" % (fieldname, value))
if __name__ == '__main__':
serial_list = auto_detect_serial(preferred_list=['*FTDI*',"*Arduino_Mega_2560*", "*3D_Robotics*", "*USB_to_UART*", '*PX4*', '*FMU*'])
for port in serial_list:
print("%s" % port)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/shaopengyuan/pymavlink.git
git@gitee.com:shaopengyuan/pymavlink.git
shaopengyuan
pymavlink
pymavlink
master

搜索帮助