full_container_scheme/kubernets_api/get-deployment-svc.py

94 lines
3.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

'''
Export Deployment + Service configuration.
'''
from kubernetes import client, config
import os
import yaml
# Disable urllib3's InsecureRequestWarning so unverified HTTPS requests
# do not flood the output with warnings.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Load the kube config and create the API client that is shared by
# sanitize() and by the list calls at the bottom of this script.
config.load_kube_config()
api_client = client.ApiClient()
def sanitize(obj):
    """Convert a Deployment or Service API object into a clean manifest dict.

    The object is serialized with the shared ``api_client`` and then stripped
    of the fields this export does not want: server-populated metadata,
    ``status``, the Deployment ``strategy``, the Service cluster IP fields and
    a few bookkeeping annotations.

    Args:
        obj: A Kubernetes client model object. ``client.V1Service`` instances
            are exported as Services; anything else is treated as a
            Deployment (``apps/v1``).

    Returns:
        dict: The serialized object with ``apiVersion`` and ``kind`` filled
        in, and with ``metadata.annotations`` / ``metadata.labels`` always
        present (empty dicts when the object had none).
    """
    # apiVersion/kind may be unset on the object, so fall back to defaults
    # chosen by the object's type.
    api_version = getattr(obj, 'api_version', None)
    kind = getattr(obj, 'kind', None)
    manifest = api_client.sanitize_for_serialization(obj)

    is_service = isinstance(obj, client.V1Service)
    if is_service:
        default_api_version, default_kind = 'v1', 'Service'
    else:
        default_api_version, default_kind = 'apps/v1', 'Deployment'
    manifest['apiVersion'] = api_version or default_api_version
    manifest['kind'] = kind or default_kind

    metadata = manifest.setdefault('metadata', {})
    # Serialization omits unset fields, so every removal below uses
    # pop(..., None) instead of ``del`` to avoid KeyError on absent keys.
    spec = manifest.get('spec') or {}

    if is_service:
        # Drop the cluster IPs so the manifest can be applied elsewhere.
        spec.pop('clusterIPs', None)
        # A headless Service is declared with clusterIP "None"; that value is
        # configuration, not an allocated address, so it must be kept or the
        # re-applied Service would silently become a regular ClusterIP one.
        if spec.get('clusterIP') != 'None':
            spec.pop('clusterIP', None)

    # Remove metadata fields that are not wanted in the exported manifest.
    unwanted_metadata = (
        'selfLink',
        'generation',  # only present on resources such as Deployments
        'creationTimestamp',
        'resourceVersion',
        'uid',
        'managedFields',
    )
    for field in unwanted_metadata:
        metadata.pop(field, None)
    manifest.pop('status', None)

    # Remove the Deployment rollout strategy.
    spec.pop('strategy', None)

    # Make sure annotations and labels exist (they are NOT cleared; existing
    # entries are kept apart from the annotations removed below).
    annotations = metadata.setdefault('annotations', {})
    metadata.setdefault('labels', {})

    unwanted_annotations = (
        'deployment.kubernetes.io/revision',
        'kubectl.kubernetes.io/restartedAt',
        'kubectl.kubernetes.io/last-applied-configuration',
    )
    for key in unwanted_annotations:
        annotations.pop(key, None)

    return manifest
def merge_and_export(deployment, service, namespace):
    """Write a Deployment and its Service into one multi-document YAML file.

    Both objects are passed through ``sanitize`` and dumped, Deployment
    first, to ``<cwd>/<namespace>/<deployment name>.yaml``. The namespace
    directory is created when missing. A confirmation line is printed.

    Args:
        deployment: Deployment object; its ``metadata.name`` names the file.
        service: Service object exported alongside the Deployment.
        namespace: Name of the sub-directory (under the current working
            directory) that receives the file.
    """
    documents = [sanitize(deployment), sanitize(service)]

    yaml_file = f"{deployment.metadata.name}.yaml"
    dir_path = os.path.join(os.getcwd(), namespace)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dir_path, exist_ok=True)

    # Explicit encoding keeps the output independent of the platform locale.
    with open(os.path.join(dir_path, yaml_file), 'w', encoding='utf-8') as f:
        yaml.dump_all(documents, f, default_flow_style=False)
    print(f"{yaml_file} created in {namespace} directory.")
# Fetch every Deployment in the cluster.
deployments = client.AppsV1Api(api_client).list_deployment_for_all_namespaces(watch=False)
# Fetch every Service in the cluster.
services = client.CoreV1Api(api_client).list_service_for_all_namespaces(watch=False)
# Pair each Deployment with the Service that has the same name.
# The match must also include the namespace: both lists span all namespaces,
# and a name-only match could attach a Service from a different namespace.
# Indexing by (namespace, name) also avoids rescanning the Service list for
# every Deployment.
services_by_key = {
    (s.metadata.namespace, s.metadata.name): s
    for s in services.items
}
for deployment in deployments.items:
    service = services_by_key.get(
        (deployment.metadata.namespace, deployment.metadata.name)
    )
    if service is not None:
        merge_and_export(deployment, service, deployment.metadata.namespace)