1 Star 0 Fork 0

LMcallme/LearnRosTutorials

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
learn.py 8.02 KB
一键复制 编辑 原始数据 按行查看 历史
LM 提交于 2016-09-19 21:57 . * 在 liunx 上运行成功
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Created by lm.91@qq.com on 2016/9/8
import os
import sys
import json
import logging
import platform
import shutil
from docker import Client
__author__ = 'lm.91@qq.com'
cli = Client()
# 将日志同时输出到文件和屏幕
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d]%(levelname)s %(message)s',
datefmt='%a, %d %b %Y %H:%M:%S',
filename=__file__ + '.log',
filemode='a')
##########################################################################
# 定义一个StreamHandler,将INFO级别或更高的日志信息打印到标准错误,并将其添加到当前的日志处理对象#
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
##########################################################################
def check_network(bridge):
if not exists_network(bridge):
create_network(bridge)
def exists_network(bridge):
for network in cli.networks():
if network['Name'] == bridge:
return True
return False
def create_network(bridge):
# cli.create_network(bridge, driver="bridge")
cli.create_network(bridge)
def exists_image(name):
images = cli.images(all=True)
for image in images:
if name+":latest" in image["RepoTags"]:
logging.info("exists image %s" % name)
return True
logging.info("not exists image %s" % name)
return False
def exists_container(name):
containers = cli.containers(all=True)
for container in containers:
if '/' + name in container["Names"]:
logging.info("exists container %s" % name)
return True
logging.info("not exists container %s" % name)
return False
def remove_dangling_image():
logging.info("removing dangling images")
for image in cli.images(filters={"dangling": True}):
try:
cli.remove_image(image['Id'])
finally:
pass
logging.info("removed dangling images")
def build_image(name, debug=True):
logging.info("building docker image %s" % name)
path = os.path.abspath(os.curdir)
tag = name
rm = True # 去除中间容器
generator = cli.build(path=path, rm=rm, tag=tag)
for line in generator:
line = line.decode()
try:
stream_line = json.loads(line)
line = stream_line["stream"]
logging.info(line)
except:
logging.info(line)
if not debug:
remove_dangling_image()
def create_container(name, image, hostname=None, bridge=None, master=None):
logging.info("creating container %s use image %s" % (name, image))
path = os.path.abspath(os.curdir)
if master is None:
master = 'master'
if hostname is not None:
path = os.path.join(path, 'ros', hostname)
if not os.path.exists(path):
masterDir = os.path.join(
os.path.abspath(os.curdir), 'ros','master'
)
shutil.copytree(masterDir, path) # 假定 ./ros/master 文件夹存在
if bridge is not None:
networking_config = cli.create_networking_config({
bridge: cli.create_endpoint_config()})
else:
networking_config = None
binds = {
path: {
'bind': '/home/ros',
'mode': 'rw'
}
}
environment = {
"ROS_HOSTNAME": hostname,
"ROS_MASTER_URI": "http://%s:11311" % master
}
if platform.system() == "Windows":
environment.update({"DISPLAY": "10.9.2.35:0"})
elif platform.system() == "Linux":
environment.update({
"DISPLAY": ":0.0",
"QT_X11_NO_MITSHM": "1"
})
binds.update({"/tmp/.X11-unix": {
'bind': '/tmp/.X11-unix',
'mode': 'rw'
}})
host_config = cli.create_host_config(binds=binds)
container_id = cli.create_container(
name=name,
image=image,
detach=False, # 后台运行
stdin_open=True,
tty=True,
working_dir='/home/ros',
hostname=hostname,
networking_config=networking_config,
environment=environment,
ports=[(22, 'tcp')], # 容器开放的端口
volumes=['/home/ros', '/tmp/.X11-unix'], # 容器的卷路径
host_config=host_config
)
logging.info("created container %s use image %s" % (name, image))
return container_id # 返回容器的ID
def container_is_running(name):
containers = cli.containers()
for container in containers:
if '/' + name in container["Names"]:
logging.info("container %s is running" % name)
return True
logging.info("container %s is not running" % name)
return False
def start_container(name):
logging.info("starting container %s" % name)
cli.start(name)
logging.info("started container %s" % name)
def stop_container(name):
logging.info("stopping container %s" % name)
cli.stop(name)
logging.info("stopped container %s" % name)
def remove_container(name):
logging.info("removing container %s" % name)
cli.remove_container(name)
logging.info("removed container %s" % name)
def force_removed_container(containerName, imageName=None):
if containerName is not None:
if container_is_running(containerName):
stop_container(containerName)
remove_container(containerName)
else:
if imageName is not None:
for containerName in containers_used_image(imageName):
force_removed_container(containerName)
def containers_used_image(imageName):
result = []
for containers in cli.containers(all=True):
if imageName == containers['Image']:
for containerName in containers['Names']:
result.append(containerName.replace('/', ''))
return result
def exec_container(containerName):
from subprocess import call
if not container_is_running(containerName):
start_container(containerName)
os.system("docker exec -u root %s chown -R ros /home/ros" % containerName)
# bash = cli.exec_create(containerName,
# '/bin/bash',
# stdout=True,
# stderr=True,
# stdin=True,
# tty=True,
# user='ros')
# cli.exec_start(bash,tty=True)
# call("docker exec -u ros -it %s /bin/bash" % containerName)
os.system("docker exec -u ros -it %s /bin/bash" % containerName)
if __name__ == '__main__':
myName = "lmcallme"
imageName = myName + "/ros-indigo-tutorials"
containerName = myName + "%s"
bridge = myName + "_ros"
master = containerName % "master"
if not exists_image(imageName):
build_image(imageName)
check_network(bridge)
if not exists_container(master):
create_container(
master,
imageName,
hostname=master,
bridge=bridge,
master=master)
if not container_is_running(master):
start_container(master)
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('name', type=str, nargs="?",
help='ros node container %s' % imageName)
args = parser.parse_args()
if args.name:
if args.name == 'remove':
force_removed_container(containerName=None, imageName=imageName)
sys.exit(0)
hostname = args.name
containerName = containerName % hostname
if not exists_container(containerName):
create_container(
containerName,
imageName,
hostname=containerName,
bridge=bridge,
master=master)
if not container_is_running(containerName):
start_container(containerName)
exec_container(containerName)
else:
exec_container(master)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/LMcallme/LearnRosTutorials.git
git@gitee.com:LMcallme/LearnRosTutorials.git
LMcallme
LearnRosTutorials
LearnRosTutorials
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385