157 lines
7.1 KiB
Python
157 lines
7.1 KiB
Python
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import urllib3
|
|
import requests
|
|
from kubernetes import client, config
|
|
|
|
# Silence urllib3's InsecureRequestWarning for the whole process. Presumably
# this exists because KubernetesAPI.__init__ sets verify_ssl = False on the
# token-based client configuration, which would otherwise trigger the warning.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
class KubernetesAPI:
    """Register Kubernetes workloads in Consul so Prometheus can discover them.

    Two kinds of instances are registered through the Consul agent endpoint
    ``PUT /v1/agent/service/register``:

    * ``application``: each Pod behind a Service whose
      ``/actuator/prometheus`` endpoint answers 200 and mentions
      ``system_cpu_usage``;
    * ``node-exporter``: each Pod matched by the node-exporter selector.

    ``clean_failed_instances`` deregisters instances whose Consul health
    check is ``critical``.
    """

    # Consul service names used for registration; also the default set of
    # services clean_failed_instances() is allowed to deregister.
    APP_SERVICE_NAME = "application"
    NODE_EXPORTER_SERVICE_NAME = "node-exporter"

    # Interval of the HTTP health check attached to every registration.
    CHECK_INTERVAL_SECONDS = 5

    # Timeouts (seconds) for outbound calls. Without them a hung Consul agent
    # or API server would block the whole run indefinitely.
    PROBE_TIMEOUT = 3
    CONSUL_TIMEOUT = 10
    K8S_TIMEOUT = 30

    def __init__(self, kubeconfig=None, token=None, apiServer=None, consul=None):
        """Configure the Kubernetes client and check the API server responds.

        Credential sources are tried in this order (first match wins):

        1. ``~/.kube/config`` if that file exists -- it takes precedence even
           over explicit arguments (kept from the original behaviour);
        2. ``kubeconfig`` -- a kubeconfig file path, a dict, or JSON text;
        3. ``token`` + ``apiServer`` -- bearer-token auth, TLS verify off;
        4. otherwise the client library's default configuration.

        Args:
            kubeconfig: kubeconfig as a file path, dict, or JSON string.
            token: bearer token for the API server (requires ``apiServer``).
            apiServer: API server URL.
            consul: base URL of the Consul agent, e.g. ``http://host:8500``.

        Raises:
            ValueError: ``token`` was given without ``apiServer``, or the
                ``kubeconfig`` text could not be parsed.
            SystemExit: the API server could not be reached.
        """
        self.consul_url = consul

        if os.path.isfile(os.path.expanduser("~/.kube/config")):
            print("本地调用")
            config.load_kube_config()
        elif kubeconfig:
            if isinstance(kubeconfig, str) and os.path.isfile(kubeconfig):
                # A path: let the client parse it (handles YAML as well).
                config.load_kube_config(config_file=kubeconfig)
            else:
                config.load_kube_config_from_dict(self.parse_kubeconfig(kubeconfig))
        elif token:
            if not apiServer:
                raise ValueError("apiServer is required when token is given")
            configuration = client.Configuration()
            configuration.host = apiServer
            # TLS certificate verification is deliberately disabled here.
            configuration.verify_ssl = False
            configuration.api_key = {"authorization": f"Bearer {token}"}
            client.Configuration.set_default(configuration)
        # else: fall through to the client's default configuration.

        self.core_api = client.CoreV1Api()
        try:
            # Constructing CoreV1Api performs no network I/O, so issue one
            # cheap discovery request to actually verify connectivity/auth.
            self.core_api.get_api_resources(_request_timeout=self.K8S_TIMEOUT)
            print("API 接口验证成功.")
        except Exception as e:
            print(f"验证 API 接口失败: {str(e)}")
            sys.exit("API 接口验证失败。程序退出。")

    @staticmethod
    def parse_kubeconfig(kubeconfig):
        """Return kubeconfig content as a dict.

        Accepts a dict (returned unchanged) or JSON text (str/bytes). YAML
        text is not parsed here; pass a kubeconfig file path instead.

        Raises:
            ValueError: the text is not valid JSON or not a JSON object.
            TypeError: the argument is neither a dict nor text.
        """
        if isinstance(kubeconfig, dict):
            return kubeconfig
        if isinstance(kubeconfig, (str, bytes, bytearray)):
            try:
                parsed = json.loads(kubeconfig)
            except ValueError as err:
                raise ValueError(
                    "kubeconfig text must be JSON; pass a file path for YAML"
                ) from err
            if not isinstance(parsed, dict):
                raise ValueError("kubeconfig JSON must be an object")
            return parsed
        raise TypeError(f"unsupported kubeconfig type: {type(kubeconfig).__name__}")

    @staticmethod
    def _label_selector(selector, svc_name):
        """Build the label selector for the Pods backing a Service.

        Uses the Service's own ``spec.selector`` when present; otherwise falls
        back to the ``app=<service name>`` convention.
        """
        if selector:
            return ",".join(f"{key}={value}" for key, value in sorted(selector.items()))
        return f"app={svc_name}"

    @staticmethod
    def _resolve_target_port(service_port, pod):
        """Return the container port that *service_port* forwards to on *pod*.

        ``targetPort`` may be an int, a numeric string, a named container
        port, or unset (meaning "same as ``port``"). Falls back to the Service
        port when a named port cannot be found on the Pod.
        """
        target = service_port.target_port
        if isinstance(target, int):
            return target
        if isinstance(target, str):
            if target.isdigit():
                return int(target)
            containers = (pod.spec.containers if pod.spec else None) or []
            for container in containers:
                for container_port in container.ports or []:
                    if container_port.name == target:
                        return container_port.container_port
        return service_port.port

    @staticmethod
    def _critical_service_ids(checks, service_names=None):
        """Return unique ServiceIDs of critical service checks, in input order.

        Checks with an empty ServiceID (node-level checks) are skipped, and a
        service with several failing checks is listed only once. When
        *service_names* is given, only checks whose ServiceName is in it count.
        """
        wanted = None if service_names is None else set(service_names)
        return list(dict.fromkeys(
            check["ServiceID"]
            for check in checks
            if check.get("Status") == "critical"
            and check.get("ServiceID")
            and (wanted is None or check.get("ServiceName") in wanted)
        ))

    def _register(self, service_data):
        """PUT one service registration to the Consul agent; return the response."""
        return requests.put(
            f"{self.consul_url}/v1/agent/service/register",
            json=service_data,
            timeout=self.CONSUL_TIMEOUT,
        )

    def update_app_services(self):
        """Register every Pod behind a Prometheus-enabled Service in Consul.

        A Service is probed at ``http://<clusterIP>:<first port>/actuator/prometheus``;
        if that answers 200 and contains ``system_cpu_usage``, each Pod it
        selects is registered as ``application`` with the Pod IP, the resolved
        target port, and a health check against the Pod itself. A failure
        while handling one Service is reported and does not stop the others.
        """
        try:
            services = self.core_api.list_service_for_all_namespaces(
                _request_timeout=self.K8S_TIMEOUT
            ).items
        except Exception as e:
            print("处理注册服务到 Consul 时出错: %s" % e)
            return

        for service in services:
            try:
                self._register_service_pods(service)
            except Exception as err:
                print(f"处理服务 {service.metadata.namespace}/{service.metadata.name} 时出错: {err}")

    def _register_service_pods(self, service):
        """Probe one Service and register its Pods if it exposes metrics."""
        svc_name = service.metadata.name
        svc_namespace = service.metadata.namespace
        spec = service.spec

        # ExternalName / port-less Services have no ports, and headless
        # Services report clusterIP "None"; neither can be probed.
        if not spec.ports or not spec.cluster_ip or spec.cluster_ip == "None":
            return
        first_port = spec.ports[0]
        prometheus_url = f"http://{spec.cluster_ip}:{first_port.port}/actuator/prometheus"

        try:
            probe = requests.get(prometheus_url, timeout=self.PROBE_TIMEOUT)
        except requests.RequestException:
            # Most Services don't serve this endpoint; that's expected.
            return
        if probe.status_code != 200 or "system_cpu_usage" not in probe.text:
            return

        pods = self.core_api.list_namespaced_pod(
            namespace=svc_namespace,
            label_selector=self._label_selector(spec.selector, svc_name),
            _request_timeout=self.K8S_TIMEOUT,
        ).items

        for pod in pods:
            pod_ip = pod.status.pod_ip
            if not pod_ip:
                continue  # not scheduled / no IP yet
            pod_name = pod.metadata.name
            # The Service port may differ from the container port (targetPort);
            # Prometheus scrapes the Pod directly, so register the latter.
            pod_port = self._resolve_target_port(first_port, pod)
            service_data = {
                # NOTE(review): the ID is the bare Pod name, so same-named Pods
                # in different namespaces overwrite each other. Kept unchanged
                # so existing registrations keep their IDs.
                "id": f"{pod_name}",
                "name": self.APP_SERVICE_NAME,
                "address": pod_ip,
                "port": pod_port,
                "checks": [{
                    # Check the Pod itself (like node-exporter does) so a dead
                    # Pod turns critical even while its Service stays up.
                    "http": f"http://{pod_ip}:{pod_port}/actuator/prometheus",
                    "interval": f"{self.CHECK_INTERVAL_SECONDS}s",
                }],
            }
            try:
                response = self._register(service_data)
            except requests.RequestException as register_err:
                print(f"注册服务到 Consul 时出错: {register_err}")
                continue
            if response.status_code == 200:
                print(f"prometheus: {prometheus_url} Pod {pod_name},注册成功。")
            else:
                print(f"无法注册服务 {svc_name}。状态码: {response.status_code}")
                print(response.text)

    def update_node_exporter_pods(self, namespace="monitor",
                                  label_selector="app=node-exporter", port=9100):
        """Register every node-exporter Pod in Consul as ``node-exporter``.

        Args:
            namespace: namespace to list node-exporter Pods from.
            label_selector: label selector matching node-exporter Pods.
            port: port node-exporter serves ``/metrics`` on inside each Pod.
        """
        try:
            pods = self.core_api.list_namespaced_pod(
                namespace,
                label_selector=label_selector,
                _request_timeout=self.K8S_TIMEOUT,
            ).items
        except Exception as e:
            print("获取 Pod 列表时出错: %s" % e)
            return

        for pod in pods:
            pod_ip = pod.status.pod_ip
            # Pending Pods have no IP yet; registering "None" would create an
            # instance whose check can never pass.
            if not pod_ip:
                continue
            data = {
                "id": f"{pod.spec.node_name}-{pod.metadata.name}",
                "name": self.NODE_EXPORTER_SERVICE_NAME,
                "address": f"{pod_ip}",
                "port": port,
                "checks": [{
                    "http": f"http://{pod_ip}:{port}/metrics",
                    "interval": f"{self.CHECK_INTERVAL_SECONDS}s",
                }],
            }
            try:
                response = self._register(data)
            except requests.RequestException as register_err:
                print(f"注册服务到 Consul 时出错: {register_err}")
                continue
            if response.status_code == 200:
                print(f"Pod 名称:{pod.metadata.name} IP:{pod_ip} 节点:{pod.spec.node_name} 注册成功.")
            else:
                print(f"无法注册服务 {pod.spec.node_name}. 状态码: {response.status_code}")
                print(response.text)

    def clean_failed_instances(self, wait=None,
                               service_names=(APP_SERVICE_NAME, NODE_EXPORTER_SERVICE_NAME)):
        """Deregister service instances whose Consul health check is critical.

        Consul starts a newly registered check in the ``critical`` state and
        runs the first HTTP probe after a random stagger of up to one interval.
        Waiting less than an interval can therefore deregister instances that
        were registered moments ago and are actually healthy.

        Args:
            wait: seconds to sleep before querying Consul. Defaults to twice the
                check interval, so every new check has run at least once.
            service_names: only instances of these Consul services are
                deregistered (default: the ones this class registers). Pass
                ``None`` to consider every critical service check.
        """
        if wait is None:
            wait = 2 * self.CHECK_INTERVAL_SECONDS
        time.sleep(wait)

        try:
            response = requests.get(
                f"{self.consul_url}/v1/health/state/critical",
                timeout=self.CONSUL_TIMEOUT,
            )
        except requests.RequestException as err:
            print(f"无法从 Consul API 获取数据: {err}")
            return
        if response.status_code != 200:
            print(f"无法从 Consul API 获取数据。状态码:{response.status_code}")
            return

        for service_id in self._critical_service_ids(response.json(), service_names):
            try:
                result = requests.put(
                    f"{self.consul_url}/v1/agent/service/deregister/{service_id}",
                    timeout=self.CONSUL_TIMEOUT,
                )
            except requests.RequestException as err:
                print(f"注销实例 {service_id} 时出错: {err}")
                continue
            if result.status_code == 200:
                print(f"失效实例ID: {service_id}")
            else:
                print(f"注销实例 {service_id} 失败。状态码: {result.status_code}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Connection settings can be supplied through the environment so real
    # credentials need not be committed to source control. The literals below
    # are the previously hard-coded values, kept as fallbacks.
    consul_url = os.environ.get("CONSUL_URL", "http://172.16.5.37:8500")
    token = os.environ.get("K8S_TOKEN", "eyJhbGciOiJSUzI1Nixxxx")
    apiServer = os.environ.get(
        "K8S_API_SERVER",
        "https://46CA01C54B919FA35648DF454239A740.gr7.ap-northeast-1.eks.amazonaws.com",
    )

    k8s = KubernetesAPI(token=token, apiServer=apiServer, consul=consul_url)
    k8s.update_app_services()
    k8s.update_node_exporter_pods()
    k8s.clean_failed_instances()