''' 导出deployment + svc 配置信息 ''' from kubernetes import client, config import os import yaml # 禁用不安全请求警告 import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # 创建API客户端 config.load_kube_config() api_client = client.ApiClient() def sanitize(obj): # 获取原始对象的apiVersion和kind字段 api_version = getattr(obj, 'api_version', None) kind = getattr(obj, 'kind', None) obj_yaml = api_client.sanitize_for_serialization(obj) # 如果apiVersion或kind为空,则使用默认值 obj_yaml['apiVersion'] = api_version or 'apps/v1' # 对于Deployment obj_yaml['kind'] = kind or 'Deployment' # 对于Deployment # 如果是Service资源,则使用不同的默认值 if isinstance(obj, client.V1Service): obj_yaml['apiVersion'] = api_version or 'v1' obj_yaml['kind'] = kind or 'Service' # 移除clusterIP字段 if 'clusterIPs' in obj_yaml['spec']: del obj_yaml['spec']['clusterIPs'] del obj_yaml['spec']['clusterIP'] # 删除不需要的字段 if 'selfLink' in obj_yaml['metadata']: del obj_yaml['metadata']['selfLink'] if 'generation' in obj_yaml['metadata']: del obj_yaml['metadata']['generation'] # 只有Deployment等资源才有generation del obj_yaml['metadata']['creationTimestamp'] del obj_yaml['metadata']['resourceVersion'] del obj_yaml['metadata']['uid'] del obj_yaml['status'] if 'managedFields' in obj_yaml['metadata']: del obj_yaml['metadata']['managedFields'] # 移除strategy字段 if 'strategy' in obj_yaml['spec']: del obj_yaml['spec']['strategy'] # 清空 annotations 和 labels 字段 obj_yaml['metadata'].setdefault('annotations', {}) obj_yaml['metadata'].setdefault('labels', {}) # 移除不需要的注解 if 'deployment.kubernetes.io/revision' in obj_yaml['metadata']['annotations']: del obj_yaml['metadata']['annotations']['deployment.kubernetes.io/revision'] if 'kubectl.kubernetes.io/restartedAt' in obj_yaml['metadata']['annotations']: del obj_yaml['metadata']['annotations']['kubectl.kubernetes.io/restartedAt'] if 'kubectl.kubernetes.io/last-applied-configuration' in obj_yaml['metadata']['annotations']: del obj_yaml['metadata']['annotations']['kubectl.kubernetes.io/last-applied-configuration'] return obj_yaml def merge_and_export(deployment, service, namespace): deployment_yaml = sanitize(deployment) service_yaml = sanitize(service) # 合并两个对象 combined_yaml = [deployment_yaml, service_yaml] # 生成YAML文件 yaml_file = f"{deployment.metadata.name}.yaml" dir_path = os.path.join(os.getcwd(), namespace) if not os.path.exists(dir_path): os.makedirs(dir_path) # 使用makedirs以递归创建目录 with open(os.path.join(dir_path, yaml_file), 'w') as f: yaml.dump_all(combined_yaml, f, default_flow_style=False) print(f"{yaml_file} created in {namespace} directory.") # 获取所有Deployment deployments = client.AppsV1Api(api_client).list_deployment_for_all_namespaces(watch=False) # 获取所有Service services = client.CoreV1Api(api_client).list_service_for_all_namespaces(watch=False) # 将Deployment和Service配对 # 假设Service的名称与Deployment的名称相同 for deployment in deployments.items: service = next((s for s in services.items if s.metadata.name == deployment.metadata.name), None) if service is not None: merge_and_export(deployment, service, deployment.metadata.namespace)