222 lines
7.2 KiB
Python
222 lines
7.2 KiB
Python
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import hashlib
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
import urllib3
|
||
import requests
|
||
from kubernetes import client, config
|
||
from pathlib import Path
|
||
|
||
# Silence urllib3's InsecureRequestWarning for the whole process. Nothing in
# this file passes verify=False itself, so this presumably targets HTTPS calls
# made by the Kubernetes client (e.g. a kubeconfig that skips TLS
# verification) -- TODO confirm it is still needed.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
class KubernetesMonitor:
    """Sync Kubernetes Services into Consul and prune critical instances.

    A Service qualifies for registration when the HTTP endpoint
    ``http://<ClusterIP>:<first port>/actuator/prometheus`` answers 200 and
    its body contains ``system_cpu_usage`` (presumably a Spring Boot Actuator
    / Micrometer endpoint). Small JSON cache files under
    ``~/.k8s_monitor_cache`` let repeated runs skip Services that were probed
    or registered recently.
    """

    # Consul service name under which every instance is registered.
    SERVICE_NAME = "application"
    # Prefix of the Consul service IDs generated by this class.
    SERVICE_ID_PREFIX = "app-"
    # Lifetime, in seconds, of cache entries (probe results and registrations).
    CACHE_TTL = 300

    def __init__(self, consul_url):
        """Create the monitor.

        Args:
            consul_url: Base URL of the Consul HTTP API, e.g.
                ``http://host:8500``. A trailing slash is tolerated.

        Exits the process (``sys.exit``) if Kubernetes cannot be configured.
        """
        self.consul_url = consul_url.rstrip("/")
        self.cache_dir = Path.home() / ".k8s_monitor_cache"
        self.cache_dir.mkdir(exist_ok=True)

        # Load kubeconfig and build the CoreV1 API client.
        self._init_kubernetes()

        # Worker pool shared by endpoint probing, registration and cleanup.
        self.executor = ThreadPoolExecutor(max_workers=10)

    def _init_kubernetes(self):
        """Load kubeconfig and create ``self.core_api``; exit on failure.

        Honours the ``KUBECONFIG`` environment variable (possibly several
        files joined with ``os.pathsep``) and falls back to
        ``~/.kube/config`` when it is unset or empty.
        """
        kubeconfig = os.environ.get("KUBECONFIG") or "~/.kube/config"
        candidates = [
            os.path.expanduser(path)
            for path in kubeconfig.split(os.pathsep)
            if path
        ]
        if not any(os.path.isfile(path) for path in candidates):
            sys.exit(f"错误:未找到kubeconfig文件 {os.pathsep.join(candidates)}")

        try:
            # Pass the resolved location explicitly so the file we checked is
            # the file that gets loaded.
            config.load_kube_config(config_file=kubeconfig)
            self.core_api = client.CoreV1Api()
            print("Kubernetes API 连接成功")
        except Exception as e:
            sys.exit(f"Kubernetes连接失败: {str(e)}")

    def _get_service_hash(self, service):
        """Return a stable cache key (MD5 hex of ``namespace/name``) for *service*."""
        return hashlib.md5(
            f"{service.metadata.namespace}/{service.metadata.name}".encode()
        ).hexdigest()

    def _cache_path(self, cache_key):
        """Return the JSON file backing *cache_key*."""
        return self.cache_dir / f"{cache_key}.json"

    def _load_cache(self, cache_key):
        """Return the raw cache entry ``{"expire": ..., "data": ...}`` or None.

        None is returned when the file is missing, unreadable or not valid
        JSON, so a damaged file behaves like a cache miss instead of raising.
        Expiry is not checked here; see ``_is_cache_valid``.
        """
        try:
            with open(self._cache_path(cache_key), encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            return None

    @staticmethod
    def _entry_is_fresh(entry):
        """Return True if *entry* is a cache dict whose ``expire`` is in the future."""
        try:
            return entry["expire"] > time.time()
        except (TypeError, KeyError):
            # None, a non-dict payload, or a malformed expire value.
            return False

    def _save_cache(self, cache_key, data, ttl=CACHE_TTL):
        """Persist *data* under *cache_key*, valid for *ttl* seconds (default 300).

        The entry is written to a temporary file and then renamed over the
        target (atomic on POSIX), so readers never see half-written JSON.
        """
        cache_file = self._cache_path(cache_key)
        tmp_file = cache_file.with_name(f"{cache_file.name}.{os.getpid()}.tmp")
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump({"expire": time.time() + ttl, "data": data}, f)
        tmp_file.replace(cache_file)

    def _is_cache_valid(self, cache_key):
        """Return True if *cache_key* exists, is readable and has not expired."""
        return self._entry_is_fresh(self._load_cache(cache_key))

    def _register_service(self, service_data):
        """PUT one service definition to the Consul agent at ``consul_url``.

        Returns True on HTTP 200, False otherwise. Errors are printed, not raised.
        """
        try:
            response = requests.put(
                f"{self.consul_url}/v1/agent/service/register",
                json=service_data,
                timeout=10,
            )
        except requests.RequestException as e:
            print(f"! Consul注册异常 {service_data.get('id')}: {str(e)}")
            return False
        if response.status_code == 200:
            return True
        print(f"× 注册失败 {service_data.get('id')} [{response.status_code}]: {response.text.strip()}")
        return False

    def _batch_register_services(self, services):
        """Register every service definition in *services* with Consul.

        ``/v1/agent/service/register`` accepts a single service definition
        per request, so each entry is sent on its own, in parallel on the
        worker pool.

        Returns:
            list: the subset of *services* (the same dict objects) that Consul
            accepted; an empty list for empty or None input.
        """
        if not services:
            return []
        # Materialise first: the input is iterated twice below.
        services = list(services)
        outcomes = list(self.executor.map(self._register_service, services))
        registered = [svc for svc, ok in zip(services, outcomes) if ok]

        if len(registered) == len(services):
            print(f"✓ 批量注册成功 {len(services)} 个服务")
        else:
            print(f"× 批量注册部分失败: 成功 {len(registered)}/{len(services)} 个服务")
        return registered

    def _process_service(self, service):
        """Build the Consul registration payload for one Kubernetes Service.

        Runs on the worker pool. Returns None when the Service should not be
        registered this round:
          * it was registered within the last ``CACHE_TTL`` seconds;
          * it has no ports or no routable ClusterIP (headless/ExternalName);
          * its ``/actuator/prometheus`` probe fails (result cached for
            ``CACHE_TTL`` seconds).

        The "already registered" cache entry is not written here. The caller
        writes it only after Consul has accepted the registration, so a failed
        registration is retried on the next run.
        """
        svc_hash = self._get_service_hash(service)
        if self._is_cache_valid(svc_hash):
            return None

        svc_name = service.metadata.name
        svc_namespace = service.metadata.namespace
        ports = service.spec.ports
        svc_cluster_ip = service.spec.cluster_ip

        # Headless Services report the literal ClusterIP "None"; ExternalName
        # Services have no ClusterIP and may have no ports at all.
        if not ports or not svc_cluster_ip or svc_cluster_ip == "None":
            return None

        svc_port = ports[0].port
        prometheus_url = f"http://{svc_cluster_ip}:{svc_port}/actuator/prometheus"

        # Probe the Prometheus endpoint, reusing a fresh cached result.
        cache_key = f"prom_check_{svc_hash}"
        entry = self._load_cache(cache_key)
        if self._entry_is_fresh(entry):
            is_valid = entry["data"]
        else:
            is_valid = self._check_prometheus_endpoint(prometheus_url)
            self._save_cache(cache_key, is_valid, ttl=self.CACHE_TTL)

        if not is_valid:
            return None

        return {
            "id": f"{self.SERVICE_ID_PREFIX}{svc_namespace}-{svc_name}",
            "name": self.SERVICE_NAME,
            "address": svc_cluster_ip,
            "port": svc_port,
            "checks": [{
                "http": prometheus_url,
                "interval": "5s",
                # Consul starts a new check as "critical" until it first runs.
                # The endpoint was just verified, so start it as passing and
                # keep clean_failed_instances() from removing it immediately.
                "status": "passing",
            }],
        }

    def _check_prometheus_endpoint(self, url):
        """Return True if *url* answers 200 and its body contains ``system_cpu_usage``.

        Network errors and timeouts (3 s) count as "not valid".
        """
        try:
            response = requests.get(url, timeout=3)
        except requests.RequestException:
            return False
        return response.status_code == 200 and "system_cpu_usage" in response.text

    def update_services(self):
        """Incrementally register qualifying Kubernetes Services in Consul.

        Every Service in every namespace is evaluated on the worker pool by
        ``_process_service``. If evaluating one Service fails, the error is
        reported and that Service is skipped; the rest of the round continues.
        Services that Consul accepted are marked in the cache for
        ``CACHE_TTL`` seconds so they are not re-sent on every run.
        """
        try:
            current_services = self.core_api.list_service_for_all_namespaces().items

            future_to_service = {
                self.executor.submit(self._process_service, service): service
                for service in current_services
            }

            # (cache key, payload) for every Service that should be registered.
            pending = []
            for future in as_completed(future_to_service):
                service = future_to_service[future]
                try:
                    payload = future.result()
                except Exception as e:
                    print(f"服务处理异常 {service.metadata.namespace}/{service.metadata.name}: {str(e)}")
                    continue
                if payload:
                    pending.append((self._get_service_hash(service), payload))

            registered = self._batch_register_services([payload for _, payload in pending])

            # _batch_register_services returns the very dict objects it was
            # given, so identity maps them back to their cache keys. Service
            # IDs alone could collide (namespace "a-b"/name "c" vs "a"/"b-c").
            accepted = {id(payload) for payload in registered}
            for svc_hash, payload in pending:
                if id(payload) in accepted:
                    self._save_cache(svc_hash, payload)

            print(f"本次更新处理 {len(pending)} 个服务")

        except Exception as e:
            print(f"服务更新异常: {str(e)}")

    def clean_failed_instances(self, service_name=SERVICE_NAME):
        """Deregister service instances that Consul reports as critical.

        Args:
            service_name: Only instances whose Consul ``ServiceName`` equals
                this value are removed. The default is the name this class
                registers under. Pass None to remove every critical instance.

        Checks without a ``ServiceID`` are ignored. Each service ID is
        deregistered once, even if several of its checks are critical.
        Deregistration is sent through the agent endpoint at ``consul_url``.
        """
        try:
            response = requests.get(
                f"{self.consul_url}/v1/health/state/critical",
                timeout=5,
            )
            if response.status_code != 200:
                return

            critical_checks = response.json()
            if not critical_checks:
                return

            # Node-level checks carry an empty ServiceID; a service with
            # several critical checks appears once per check. dict.fromkeys
            # de-duplicates while preserving order.
            service_ids = list(dict.fromkeys(
                check.get("ServiceID")
                for check in critical_checks
                if check.get("ServiceID")
                and (service_name is None or check.get("ServiceName") == service_name)
            ))
            if not service_ids:
                return

            futures = [
                self.executor.submit(
                    requests.put,
                    f"{self.consul_url}/v1/agent/service/deregister/{service_id}",
                    timeout=3,
                )
                for service_id in service_ids
            ]

            success_count = 0
            for future in as_completed(futures):
                try:
                    if future.result().status_code == 200:
                        success_count += 1
                except requests.RequestException as e:
                    print(f"! Consul注销异常: {str(e)}")

            print(f"清理完成,共移除 {success_count}/{len(service_ids)} 个失效实例")

        except Exception as e:
            print(f"清理流程异常: {str(e)}")
if __name__ == "__main__":
    # Consul HTTP API base URL. Override it with the CONSUL_URL environment
    # variable instead of editing the source; the default is unchanged.
    consul_endpoint = os.environ.get("CONSUL_URL", "http://172.16.5.37:8500")

    monitor = KubernetesMonitor(consul_endpoint)
    try:
        # Register Services that were not registered within the cache TTL.
        monitor.update_services()

        # Then remove instances Consul reports as critical. This call is
        # synchronous: it waits for every deregistration request to finish.
        monitor.clean_failed_instances()

    except KeyboardInterrupt:
        print("\n操作已中止")
        sys.exit(0)