1 Star 0 Fork 0

李春鹏/python_test

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
k8s-01.py 6.99 KB
一键复制 编辑 原始数据 按行查看 历史
李春鹏 提交于 2023-02-26 12:18 . first commit
import datetime
from kubernetes import client, config
import json
import pytz
config.kube_config.load_kube_config(config_file="/root/.kube/config")
core = client.CoreV1Api()
api = client.AppsV1Api()
def print_json(obj):
json_str = json.dumps(obj, indent=4, ensure_ascii=False)
print(json_str)
# 获取所有的namespace
def list_namespace():
result = []
for ns in core.list_namespace().items:
result.append(ns.metadata.name)
print_json(result)
# 按照namespace获取deployment
def list_deploy(ns):
result = []
# ret = api.list_deployment_for_all_namespaces(watch=False)
ret = api.list_namespaced_deployment(namespace=ns, watch=False)
for i in ret.items:
print(i)
break
images = [x.image for x in i.spec.template.spec.containers]
info = {'namespace': i.metadata.namespace, 'name': i.metadata.name, 'images': images}
result.append(info)
print_json(result)
# 根据deployment获取newreplicaSet
# def get_newReplicaSet(name):
# result = []
# ret = api.list_namespaced_replica_set(namespace="default",)
# 获取label获取对应的pod状态
def list_pod(labels=None):
result = []
# ret = api.list_deployment_for_all_namespaces(watch=False)
# ret = core.read_namespaced_pod(name="ssss-dd8446dd6-d4qsc", namespace="default", pertty=True)
# 拼接过滤条件,将过滤条件{"appid": "xxx", "appname": "yyy"}拼接成字符串,格式为appid=xxx,appname=yyy
labels_str = ''
if labels:
labels_list = []
for key, value in labels.items():
labels_list.append('{0}={1}'.format(key, value))
labels_str = ','.join(labels_list)
ret = core.list_namespaced_pod(namespace="default", label_selector=labels_str)
# ret = co(name="ssss-dd8446dd6-d4qsc", namespace="default")
for i in ret.items:
print(i.metadata.name, i.status.start_time, i.status.phase)
# print()
# break
# for i in ret.items:
# # images = [x.image for x in i.spec.template.spec.containers]
# # info = {'namespace': i.metadata.namespace, 'name': i.metadata.name, 'images': images}
# # result.append(info)
# print(i)
# break
# 获取容器日志
# 参数说明:
# config_path:k8s集群token文件路径
# namespace:k8s namespace
# pod_name:pod名称
# container:容器名称
# previous:是否获取重启前的容器日志
# 获取容器的日志信息
def read_pod_container_log(namespace, pod_name, container="",previous=False):
ret = core.read_namespaced_pod_log(namespace=namespace, name=pod_name, container=container,previous=previous, pretty=True)
print(ret)
# list_pod({"k8s-app":"ssss"})
# list_deploy("default")
#
# for ns in v1.list_namespace().items:
# print(ns)
# read_pod_container_log("default", "ssss-dd8446dd6-d4qsc")
# res = v1.list_namespace()
# for i in res.items:
# print(i.metadata.name, i.metadata.creation_timestamp)
# # break
# 创建一个deployment对象
def create_deployment_obj(name):
# 配置pod
container = client.V1Container(
name="python-test",
image="python:3.9",
)
# 创建和配置模板
template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels={"app": "test-python39"}),
spec=client.V1PodSpec(containers=[container])
)
# 创建deployment
spec = client.V1DeploymentSpec(
replicas=3,
template=template,
selector={
"matchLabels": {"app": "test-python39"},
}
)
# 配置元数据
metadata = client.V1ObjectMeta(
name=name,
labels={"app": "test-python39"}
)
# 实例化deployment对象
deployment = client.V1Deployment(
api_version="apps/v1",
kind="Deployment",
metadata=metadata,
spec=spec
)
return deployment
def create_deployment(deployment):
# 创建deployment
resp = api.create_namespaced_deployment(
body=deployment, namespace="default"
)
print("\n[INFO] deployment `nginx-deployment` created.\n")
print("%s\t%s\t\t\t%s\t%s" % ("NAMESPACE", "NAME", "REVISION", "IMAGE"))
print(
"%s\t\t%s\t%s\t\t%s\n"
% (
resp.metadata.namespace,
resp.metadata.name,
resp.metadata.generation,
resp.spec.template.spec.containers[0].image,
)
)
# 更新deployment
def update_deployment(name, deployment):
# 更新模板
deployment.spec.template.spec.containers[0].command = ["/bin/bash"]
# deployment.spec.template.spec.containers[0].args = [
# "-c",
# "curl https://gitee.com/lichunpeng12/python_test.git && sleep 3600"
# ]
deployment.spec.template.spec.containers[0].args = [
"-c",
"git clone https://gitee.com/lichunpeng12/python_test.git && cd python_test && python3 main.py"
]
deployment.spec.template.spec.containers[0].env = [
{
"name": "PYTHONUNBUFFERED",
"value": "1"
}
]
# deployment.spec.template.spec.containers[0].command = ["/usr/bin/git clone https://gitee.com/lichunpeng12/python_test.git", "cd python_test", "/usr/local/bin/python3 main.py"]
# deployment.spec.template.spec.containers[0].args = ["USER", "PWD"]
# 重启容器
resp = api.patch_namespaced_deployment(
name=name, namespace="default", body=deployment
)
print("\n[INFO] deployment's container command args updated.\n")
print("%s\t%s\t\t\t%s\t%s" % ("NAMESPACE", "NAME", "REVISION", "IMAGE"))
print(
"%s\t\t%s\t%s\t\t%s\n"
% (
resp.metadata.namespace,
resp.metadata.name,
resp.metadata.generation,
resp.spec.template.spec.containers[0].image,
)
)
# 重启deployment
def restart_deployment(name, deployment):
# 增加注解
deployment.spec.template.metadata.annotations = {
"kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().replace(tzinfo=pytz.UTC).isoformat()
}
# 进行修改
resp = api.patch_namespaced_deployment(
name=name,
body=deployment, namespace="default"
)
print(f"\n[INFO] deployment `{name}` restarted.\n")
print("%s\t\t\t%s\t%s" % ("NAME", "REVISION", "RESTARTED-AT"))
print(
"%s\t%s\t\t%s\n"
% (
resp.metadata.name,
resp.metadata.generation,
resp.spec.template.metadata.annotations,
)
)
# 删除deployment
def delete_deployment(name):
# Delete deployment
resp = api.delete_namespaced_deployment(
name=name,
namespace="default",
body=client.V1DeleteOptions(
propagation_policy="Foreground", grace_period_seconds=5
),
)
print(f"\n[INFO] deployment `{name}` deleted.")
name = "hello-python"
deployment = create_deployment_obj(name)
# create_deployment(deployment)
update_deployment(name, deployment)
# delete_deployment(name)
# read_pod_container_log(
# pod_name="hello-python-7549db8f89-brxp4", namespace="default", previous=True
# )
# list_deploy("default")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/cplinux98/python_test.git
git@gitee.com:cplinux98/python_test.git
cplinux98
python_test
python_test
master

搜索帮助