71 lines
2.5 KiB
Python
71 lines
2.5 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import re
|
|
from kubernetes import client, config, stream
|
|
import urllib3
|
|
import hashlib
|
|
import yaml
|
|
|
|
|
|
# 禁用 InsecureRequestWarning
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class KubernetsAPI:
|
|
def __init__(self, kubeconfig=None,token=None,apiServer=None):
|
|
if os.path.isfile(os.path.expanduser("~/.kube/config")): # 如果存在默认的 kubeconfig 文件,加载本地配置
|
|
print("本地调用")
|
|
config.load_kube_config()
|
|
elif kubeconfig:
|
|
kubeconfig_dict = self.parse_kubeconfig(kubeconfig) # 解析 kubeconfig 内容并创建配置对象
|
|
config.load_kube_config_from_dict(kubeconfig_dict) # 使用 config.load_kube_config_from_dict 创建 kubeconfig 配置对象
|
|
elif token:
|
|
kubeconfig = client.Configuration()
|
|
kubeconfig.host = apiServer # APISERVER 地址
|
|
kubeconfig.verify_ssl = False
|
|
kubeconfig.api_key = {"authorization": f"Bearer {token}"}
|
|
client.Configuration.set_default(kubeconfig)
|
|
else :
|
|
pass
|
|
|
|
try:
|
|
self.core_api = client.CoreV1Api()
|
|
self.apps_api = client.AppsV1Api()
|
|
print("api接口调用验证成功.")
|
|
except Exception as e:
|
|
print(f"api接口调用验证失败.: {str(e)}")
|
|
sys.exit("API接口调用验证失败.程序退出.")
|
|
|
|
|
|
def parse_kubeconfig(self,kubeconfig_content):
|
|
try:
|
|
kubeconfig_dict = yaml.safe_load(kubeconfig_content)
|
|
return kubeconfig_dict
|
|
except yaml.YAMLError as e:
|
|
raise Exception(f"Error parsing kubeconfig content: {str(e)}")
|
|
|
|
|
|
def get_node_exporter_pods(self):
|
|
# 定义要获取的命名空间
|
|
namespace = "monitor"
|
|
# 定义 label selector
|
|
label_selector = "app=node-exporter"
|
|
|
|
try:
|
|
# 调用 Kubernetes API 获取 Pod 列表
|
|
api_response = self.core_api.list_namespaced_pod(namespace, label_selector=label_selector)
|
|
for pod in api_response.items:
|
|
print("Pod 名称: %s \t IP: %s \t 节点: %s" % (pod.metadata.name, pod.status.pod_ip, pod.spec.node_name))
|
|
|
|
except Exception as e:
|
|
print("获取 Pod 列表时出错: %s" % e)
|
|
|
|
if __name__ == "__main__":
|
|
K8s = KubernetsAPI()
|
|
K8s.get_node_exporter_pods()
|
|
|
|
|
|
# pod_list = self.core_api.list_pod_for_all_namespaces(watch=False)
|
|
# print(pod_list)
|