import json
import os
import sys
import time

import requests
import urllib3
from kubernetes import client, config

# Silence InsecureRequestWarning: the token-based configuration below disables
# TLS verification when talking to the API server.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class KubernetesAPI:
    """Discover pods through the Kubernetes CoreV1 API and register them in Consul.

    The instance keeps two pieces of state: ``consul_url`` (base URL of the
    Consul HTTP API) and ``core_api`` (a ``kubernetes.client.CoreV1Api``).
    All methods report problems with ``print`` rather than raising.
    """

    # Seconds to wait for a service's /actuator/prometheus endpoint to answer.
    PROBE_TIMEOUT = 3
    # Seconds to wait for any Consul HTTP call; without a timeout ``requests``
    # blocks indefinitely on an unresponsive agent.
    CONSUL_TIMEOUT = 5

    def __init__(self, kubeconfig=None, token=None, apiServer=None, consul=None):
        """Configure the Kubernetes client and remember the Consul base URL.

        Configuration sources are tried in this order; the first match wins:

        1. ``~/.kube/config`` on the local machine (explicit arguments are
           ignored when this file exists).
        2. ``kubeconfig``: a kubeconfig given as a dict or JSON document.
        3. ``token`` + ``apiServer``: bearer-token authentication.

        If none applies, the client library's current default configuration
        is used as-is.

        Args:
            kubeconfig: Kubeconfig content (dict, or JSON ``str``/``bytes``).
            token: Bearer token for the API server.
            apiServer: API server URL, used together with ``token``.
            consul: Base URL of the Consul HTTP API, e.g. ``http://host:8500``.

        Raises:
            ValueError: If ``kubeconfig`` is used and cannot be parsed.
            SystemExit: If the CoreV1 client cannot be constructed.
        """
        self.consul_url = consul

        if os.path.isfile(os.path.expanduser("~/.kube/config")):
            # A default kubeconfig exists locally: use it.
            print("本地调用")
            config.load_kube_config()
        elif kubeconfig:
            kubeconfig_dict = self.parse_kubeconfig(kubeconfig)
            config.load_kube_config_from_dict(kubeconfig_dict)
        elif token:
            configuration = client.Configuration()
            configuration.host = apiServer
            # NOTE(security): certificate verification is disabled here, so
            # the API server's identity is not checked.
            configuration.verify_ssl = False
            configuration.api_key = {"authorization": f"Bearer {token}"}
            client.Configuration.set_default(configuration)

        try:
            # NOTE(review): this only constructs the client object; it does
            # not appear to contact the API server, so the success message
            # does not prove connectivity -- confirm.
            self.core_api = client.CoreV1Api()
            print("API 接口验证成功.")
        except Exception as e:
            print(f"验证 API 接口失败: {str(e)}")
            sys.exit("API 接口验证失败。程序退出。")

    @staticmethod
    def parse_kubeconfig(kubeconfig):
        """Return ``kubeconfig`` as a dict for ``load_kube_config_from_dict``.

        Args:
            kubeconfig: Either an already-parsed dict, or a JSON document as
                ``str``/``bytes``.

        Returns:
            The kubeconfig as a ``dict``.

        Raises:
            ValueError: If the value is not a dict and is not a JSON object.
        """
        if isinstance(kubeconfig, dict):
            return kubeconfig
        if isinstance(kubeconfig, (bytes, bytearray)):
            kubeconfig = kubeconfig.decode("utf-8")
        try:
            parsed = json.loads(kubeconfig)
        except (TypeError, ValueError) as err:
            raise ValueError(
                "kubeconfig must be a dict or a JSON document"
            ) from err
        if not isinstance(parsed, dict):
            raise ValueError("kubeconfig JSON must describe an object")
        return parsed

    def _register_service(self, service_data):
        """PUT one service definition to the Consul agent and return the response."""
        return requests.put(
            f"{self.consul_url}/v1/agent/service/register",
            json=service_data,
            timeout=self.CONSUL_TIMEOUT,
        )

    def update_app_services(self):
        """Register the pods of every service that exposes Prometheus metrics.

        Each service in the cluster is probed at
        ``http://<cluster_ip>:<first port>/actuator/prometheus``. When the
        probe answers 200 and the body contains ``system_cpu_usage``, every
        pod labelled ``app=<service name>`` in the service's namespace that
        has an IP is registered in Consul under the name ``application``.

        A failure while handling one service is printed and does not stop the
        remaining services from being processed.
        """
        try:
            services = self.core_api.list_service_for_all_namespaces().items
        except Exception as e:
            print("处理注册服务到 Consul 时出错: %s" % e)
            return

        for service in services:
            try:
                self._register_app_service(service)
            except Exception as e:
                print("处理注册服务到 Consul 时出错: %s" % e)

    def _register_app_service(self, service):
        """Probe one service and, if it serves metrics, register its pods."""
        svc_name = service.metadata.name
        svc_namespace = service.metadata.namespace
        ports = service.spec.ports
        svc_cluster_ip = service.spec.cluster_ip

        # Services without ports or without a real cluster IP (the API reports
        # the literal string "None" for headless services) cannot be probed.
        if not ports or not svc_cluster_ip or svc_cluster_ip == "None":
            return

        svc_port = ports[0].port
        prometheus_url = f"http://{svc_cluster_ip}:{svc_port}/actuator/prometheus"

        try:
            probe = requests.get(prometheus_url, timeout=self.PROBE_TIMEOUT)
        except requests.RequestException:
            # Most services do not expose this endpoint; an unreachable probe
            # is the expected case and is deliberately not reported.
            return
        if probe.status_code != 200 or "system_cpu_usage" not in probe.text:
            return

        # Select the pods assumed to back this service by the app=<name> label.
        pod_list = self.core_api.list_namespaced_pod(
            namespace=svc_namespace,
            label_selector=f"app={svc_name}",
        ).items

        for pod in pod_list:
            pod_name = pod.metadata.name
            pod_ip = pod.status.pod_ip
            if not pod_ip:
                continue

            # NOTE(review): the health check targets the Service URL rather
            # than this pod's own IP, and "port" is the Service port rather
            # than the container port -- confirm this is intended.
            service_data = {
                "id": f"{pod_name}",
                "name": "application",
                "address": pod_ip,
                "port": svc_port,
                "checks": [{
                    "http": prometheus_url,
                    "interval": "5s",
                }],
            }
            try:
                response = self._register_service(service_data)
            except requests.RequestException as register_err:
                print(f"注册服务到 Consul 时出错: {register_err}")
                continue

            if response.status_code == 200:
                print(f"prometheus: {prometheus_url} Pod {pod_name},注册成功。")
            else:
                print(f"无法注册服务 {svc_name}。状态码: {response.status_code}")
                print(response.text)

    def update_node_exporter_pods(self):
        """Register every ``app=node-exporter`` pod of the ``monitor`` namespace.

        Each pod that has an IP is registered in Consul as ``node-exporter``
        on port 9100 with an HTTP check against its ``/metrics`` endpoint.
        Pods without an IP are skipped, and a failed registration does not
        stop the remaining pods from being processed.
        """
        namespace = "monitor"
        label_selector = "app=node-exporter"

        try:
            api_response = self.core_api.list_namespaced_pod(
                namespace, label_selector=label_selector
            )
        except Exception as e:
            print("获取 Pod 列表时出错: %s" % e)
            return

        for pod in api_response.items:
            pod_ip = pod.status.pod_ip
            if not pod_ip:
                # No IP assigned: there is no address to register or check.
                continue

            data = {
                "id": f"{pod.spec.node_name}-{pod.metadata.name}",
                "name": "node-exporter",
                "address": f"{pod_ip}",
                "port": 9100,
                "checks": [{
                    "http": f"http://{pod_ip}:9100/metrics",
                    "interval": "5s",
                }],
            }
            try:
                response = self._register_service(data)
            except requests.RequestException as register_err:
                print(f"注册服务到 Consul 时出错: {register_err}")
                continue

            if response.status_code == 200:
                print(f"Pod 名称:{pod.metadata.name} IP:{pod_ip} 节点:{pod.spec.node_name} 注册成功.")
            else:
                print(f"无法注册服务 {pod.spec.node_name}. 状态码: {response.status_code}")
                print(response.text)

    def clean_failed_instances(self):
        """Deregister every Consul service that has a check in critical state.

        Waits three seconds, queries ``/v1/health/state/critical`` and calls
        the agent's deregister endpoint once per distinct service ID found.
        Entries without a service ID are skipped.
        """
        # Presumably gives Consul time to run the checks of the services that
        # were just registered -- TODO confirm the intended delay.
        time.sleep(3)

        try:
            response = requests.get(
                f"{self.consul_url}/v1/health/state/critical",
                timeout=self.CONSUL_TIMEOUT,
            )
        except requests.RequestException as err:
            print(f"无法从 Consul API 获取数据。错误:{err}")
            return

        if response.status_code != 200:
            print(f"无法从 Consul API 获取数据。状态码:{response.status_code}")
            return

        handled = set()
        for instance in response.json():
            if instance.get('Status') != 'critical':
                continue
            service_id = instance.get('ServiceID')
            # An empty ServiceID leaves nothing to deregister, and a service
            # with several critical checks appears once per check.
            if not service_id or service_id in handled:
                continue
            handled.add(service_id)

            try:
                requests.put(
                    f"{self.consul_url}/v1/agent/service/deregister/{service_id}",
                    timeout=self.CONSUL_TIMEOUT,
                )
            except requests.RequestException as err:
                print(f"注销实例 {service_id} 时出错: {err}")
                continue
            print(f"失效实例ID: {service_id}")


if __name__ == "__main__":
    # NOTE(security): the endpoint addresses and the bearer token are
    # hard-coded; credentials should not live in source control.
    consul_url = "http://172.16.5.37:8500"
    token = "eyJhbGciOiJSUzI1Nixxxx"
    apiServer = "https://46CA01C54B919FA35648DF454239A740.gr7.ap-northeast-1.eks.amazonaws.com"

    k8s = KubernetesAPI(token=token, apiServer=apiServer, consul=consul_url)
    k8s.update_app_services()
    k8s.update_node_exporter_pods()
    k8s.clean_failed_instances()