import os import sys import json import time import re from kubernetes import client, config, stream import urllib3 import hashlib import yaml # 禁用 InsecureRequestWarning urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class KubernetsAPI: def __init__(self, kubeconfig=None,token=None,apiServer=None): if os.path.isfile(os.path.expanduser("~/.kube/config")): # 如果存在默认的 kubeconfig 文件,加载本地配置 print("本地调用") config.load_kube_config() elif kubeconfig: kubeconfig_dict = self.parse_kubeconfig(kubeconfig) # 解析 kubeconfig 内容并创建配置对象 config.load_kube_config_from_dict(kubeconfig_dict) # 使用 config.load_kube_config_from_dict 创建 kubeconfig 配置对象 elif token: kubeconfig = client.Configuration() kubeconfig.host = apiServer # APISERVER 地址 kubeconfig.verify_ssl = False kubeconfig.api_key = {"authorization": f"Bearer {token}"} client.Configuration.set_default(kubeconfig) else : pass try: self.core_api = client.CoreV1Api() self.apps_api = client.AppsV1Api() print("api接口调用验证成功.") except Exception as e: print(f"api接口调用验证失败.: {str(e)}") sys.exit("API接口调用验证失败.程序退出.") def parse_kubeconfig(self,kubeconfig_content): try: kubeconfig_dict = yaml.safe_load(kubeconfig_content) return kubeconfig_dict except yaml.YAMLError as e: raise Exception(f"Error parsing kubeconfig content: {str(e)}") def get_node_exporter_pods(self): # 定义要获取的命名空间 namespace = "monitor" # 定义 label selector label_selector = "app=node-exporter" try: # 调用 Kubernetes API 获取 Pod 列表 api_response = self.core_api.list_namespaced_pod(namespace, label_selector=label_selector) for pod in api_response.items: print("Pod 名称: %s \t IP: %s \t 节点: %s" % (pod.metadata.name, pod.status.pod_ip, pod.spec.node_name)) except Exception as e: print("获取 Pod 列表时出错: %s" % e) if __name__ == "__main__": K8s = KubernetsAPI() K8s.get_node_exporter_pods() # pod_list = self.core_api.list_pod_for_all_namespaces(watch=False) # print(pod_list)