# full_container_scheme/kubernets_api/consul-v2.py

# 222 lines
# 7.2 KiB
# Python
# Raw Permalink Blame History

# This file contains ambiguous Unicode characters!

# This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
import time
import json
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib3
import requests
from kubernetes import client, config
from pathlib import Path
# Disable urllib3's InsecureRequestWarning (emitted for HTTPS requests made
# without certificate verification) for the whole process.
# NOTE(review): nothing in this file passes verify=False itself; presumably this
# is aimed at the kubernetes client when the kubeconfig skips TLS verification —
# confirm before removing.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class KubernetesMonitor:
    """Sync Kubernetes Services into Consul as Prometheus scrape targets.

    Every Service in the cluster is probed at
    ``http://<clusterIP>:<first port>/actuator/prometheus``. Those whose
    response contains ``system_cpu_usage`` are registered with the Consul agent
    at ``consul_url``. They use the service name ``application`` and an HTTP
    health check on that URL every 5s. Probe results and successful
    registrations are cached as JSON files under ``~/.k8s_monitor_cache``
    (5-minute TTL), so repeated runs only redo work once an entry expires.
    """

    def __init__(self, consul_url):
        """Connect to Kubernetes and prepare the on-disk cache and worker pool.

        Args:
            consul_url: base URL of the Consul HTTP API, e.g. ``http://host:8500``.
        """
        # Strip a trailing slash so request paths don't become "//v1/...".
        self.consul_url = consul_url.rstrip("/")
        self.cache_dir = Path.home() / ".k8s_monitor_cache"
        self.cache_dir.mkdir(exist_ok=True)
        # Load kubeconfig (exits the process on failure).
        self._init_kubernetes()
        # Shared pool for Prometheus probes, registrations and deregistrations.
        self.executor = ThreadPoolExecutor(max_workers=10)

    def _init_kubernetes(self):
        """Load kubeconfig and create ``self.core_api``; exit the process on failure.

        ``config.load_kube_config()`` honours the ``KUBECONFIG`` environment
        variable. The pre-flight existence check therefore only applies when
        that variable is unset and the default ``~/.kube/config`` is used.
        """
        if not os.environ.get("KUBECONFIG"):
            kubeconfig_path = os.path.expanduser("~/.kube/config")
            if not os.path.isfile(kubeconfig_path):
                sys.exit(f"错误未找到kubeconfig文件 {kubeconfig_path}")
        try:
            config.load_kube_config()
            self.core_api = client.CoreV1Api()
            print("Kubernetes API 连接成功")
        except Exception as e:
            sys.exit(f"Kubernetes连接失败: {str(e)}")

    def _get_service_hash(self, service):
        """Return a stable cache key for a Service: MD5 hex of ``namespace/name``."""
        return hashlib.md5(f"{service.metadata.namespace}/{service.metadata.name}".encode()).hexdigest()

    # ------------------------------------------------------------------ cache

    def _read_cache_entry(self, cache_key):
        """Return the parsed cache file for *cache_key*, or None if missing/unreadable."""
        cache_file = self.cache_dir / f"{cache_key}.json"
        try:
            with open(cache_file, encoding="utf-8") as f:
                entry = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSONDecodeError/UnicodeDecodeError
            return None
        return entry if isinstance(entry, dict) else None

    @staticmethod
    def _entry_is_fresh(entry):
        """True if *entry* is a cache dict whose ``expire`` timestamp is in the future."""
        expire = entry.get("expire") if isinstance(entry, dict) else None
        return isinstance(expire, (int, float)) and expire > time.time()

    def _load_cache(self, cache_key):
        """Return the raw entry ``{"expire": ..., "data": ...}`` for *cache_key*.

        Expiry is NOT checked here. Returns None when the file is missing or
        cannot be parsed (a corrupt file no longer raises).
        """
        return self._read_cache_entry(cache_key)

    def _is_cache_valid(self, cache_key):
        """Return True if *cache_key* has an unexpired cache entry."""
        return self._entry_is_fresh(self._read_cache_entry(cache_key))

    def _get_fresh_cache(self, cache_key):
        """Return ``(True, data)`` for an unexpired entry, else ``(False, None)``.

        Reads the file once. This avoids the race of calling _is_cache_valid()
        and then _load_cache() while the entry is being rewritten or removed.
        """
        entry = self._read_cache_entry(cache_key)
        if self._entry_is_fresh(entry) and "data" in entry:
            return True, entry["data"]
        return False, None

    def _save_cache(self, cache_key, data, ttl=300):
        """Persist *data* under *cache_key*, valid for *ttl* seconds (default 5 minutes).

        The JSON is written to a temporary sibling file and renamed into place.
        A concurrent reader therefore never sees a half-written document.
        """
        cache_file = self.cache_dir / f"{cache_key}.json"
        tmp_file = cache_file.with_name(f"{cache_file.name}.{os.getpid()}.tmp")
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump({"expire": time.time() + ttl, "data": data}, f)
            tmp_file.replace(cache_file)
        except BaseException:
            # Don't leave a stray temp file behind if serialisation or the rename fails.
            try:
                tmp_file.unlink()
            except OSError:
                pass
            raise

    # ----------------------------------------------------------- registration

    def _register_service(self, service_data):
        """PUT one service definition to the Consul agent; return True on HTTP 200."""
        service_id = service_data.get("id")
        try:
            response = requests.put(
                f"{self.consul_url}/v1/agent/service/register",
                json=service_data,
                timeout=10,
            )
        except requests.RequestException as e:
            print(f"! Consul注册异常 [{service_id}]: {str(e)}")
            return False
        if response.status_code != 200:
            print(f"× 注册失败 [{service_id}][{response.status_code}]: {response.text.strip()}")
            return False
        return True

    def _batch_register_services(self, services):
        """Register service payloads with the Consul agent.

        ``PUT /v1/agent/service/register`` accepts a single service definition
        per request. Each payload is therefore sent on its own, in parallel on
        the shared executor, rather than as one JSON array.

        Args:
            services: list of payload dicts as built by _process_service.

        Returns:
            list: the payloads Consul accepted (HTTP 200); empty if none.
        """
        if not services:
            return []
        futures = {self.executor.submit(self._register_service, svc): svc for svc in services}
        registered = [futures[f] for f in as_completed(futures) if f.result()]
        if len(registered) == len(services):
            print(f"✓ 批量注册成功 {len(services)} 个服务")
        else:
            print(f"× 批量注册部分失败: 成功 {len(registered)}/{len(services)} 个服务")
        return registered

    def _process_service(self, service):
        """Build the Consul registration payload for one Service (runs on a worker thread).

        Returns None when the Service should be skipped:
          - its registration is still cached;
          - it has no ports or no routable ClusterIP;
          - its Prometheus endpoint failed validation.

        The registration itself is NOT cached here. update_services caches it
        only after Consul has accepted it.
        """
        svc_hash = self._get_service_hash(service)
        # Registered recently: nothing to do until the cache entry expires.
        if self._is_cache_valid(svc_hash):
            return None

        spec = service.spec
        ports = spec.ports if spec is not None else None
        cluster_ip = spec.cluster_ip if spec is not None else None
        # Headless Services report the literal ClusterIP "None", and ExternalName
        # Services have no ClusterIP; neither can be probed.
        if not ports or not cluster_ip or cluster_ip == "None":
            return None

        svc_name = service.metadata.name
        svc_namespace = service.metadata.namespace
        svc_port = ports[0].port
        # IPv6 literals must be bracketed inside a URL.
        host = f"[{cluster_ip}]" if ":" in cluster_ip else cluster_ip
        prometheus_url = f"http://{host}:{svc_port}/actuator/prometheus"

        # Probe results (positive and negative) are cached for 5 minutes.
        cache_key = f"prom_check_{svc_hash}"
        hit, is_valid = self._get_fresh_cache(cache_key)
        if not hit:
            is_valid = self._check_prometheus_endpoint(prometheus_url)
            self._save_cache(cache_key, is_valid, ttl=300)
        if not is_valid:
            return None

        # NOTE(review): "app-{ns}-{name}" is not injective (ns "a-b"/name "c" and
        # ns "a"/name "b-c" collide); kept as-is to match existing registrations.
        return {
            "id": f"app-{svc_namespace}-{svc_name}",
            "name": "application",
            "address": cluster_ip,
            "port": svc_port,
            "checks": [{
                "http": prometheus_url,
                "interval": "5s",
            }],
        }

    def _check_prometheus_endpoint(self, url):
        """Return True if *url* answers 200 and exposes the ``system_cpu_usage`` metric.

        Best effort: network errors, timeouts (3s) and invalid URLs yield False.
        """
        try:
            response = requests.get(url, timeout=3)
            return response.status_code == 200 and "system_cpu_usage" in response.text
        except (requests.RequestException, ValueError):
            return False

    def _collect_registrations(self, services):
        """Run _process_service for every Service in parallel.

        Returns:
            list of ``(cache_key, payload)`` tuples for the Services to (re)register.
            An exception while processing one Service is reported and skipped.
        """
        futures = {self.executor.submit(self._process_service, svc): svc for svc in services}
        candidates = []
        for future in as_completed(futures):
            service = futures[future]
            try:
                payload = future.result()
            except Exception as e:
                # Keep going: one malformed Service must not abort the whole run.
                meta = service.metadata
                print(f"! 服务处理异常 {meta.namespace}/{meta.name}: {str(e)}")
                continue
            if payload:
                candidates.append((self._get_service_hash(service), payload))
        return candidates

    def update_services(self):
        """Incrementally sync all cluster Services into Consul.

        Probes every Service, registers the valid ones, and caches each
        registration (5 minutes) only if Consul accepted it. Failed
        registrations are therefore retried on the next run. Errors are
        printed, not raised.
        """
        try:
            current_services = self.core_api.list_service_for_all_namespaces().items
            candidates = self._collect_registrations(current_services)
            if candidates:
                registered = self._batch_register_services([payload for _, payload in candidates])
                registered_ids = {payload["id"] for payload in registered}
                for svc_hash, payload in candidates:
                    if payload["id"] in registered_ids:
                        self._save_cache(svc_hash, payload)
            print(f"本次更新处理 {len(candidates)} 个服务")
        except Exception as e:
            print(f"服务更新异常: {str(e)}")

    def clean_failed_instances(self):
        """Deregister services that have a check in the ``critical`` state.

        ``GET /v1/health/state/critical`` returns health *checks*, not services:
          - a service with several failing checks appears once per check;
          - node-level checks have an empty ``ServiceID``.
        Both cases are filtered out, so each service is deregistered at most
        once. Errors are printed, not raised.

        NOTE(review): the health endpoint covers the whole datacenter, but
        ``/v1/agent/service/deregister`` only removes services registered on
        the agent at ``self.consul_url``; others count as failures. It also
        targets every critical service, not just the ``application`` entries
        this class registers — confirm that is intended.
        """
        try:
            response = requests.get(
                f"{self.consul_url}/v1/health/state/critical",
                timeout=5,
            )
            if response.status_code != 200:
                return
            critical_checks = response.json()
            if not critical_checks:
                return

            # dict.fromkeys() de-duplicates while keeping the original order.
            service_ids = list(dict.fromkeys(
                check.get("ServiceID") for check in critical_checks if check.get("ServiceID")
            ))
            if not service_ids:
                return

            futures = {
                self.executor.submit(
                    requests.put,
                    f"{self.consul_url}/v1/agent/service/deregister/{service_id}",
                    timeout=3,
                ): service_id
                for service_id in service_ids
            }
            success_count = 0
            for future in as_completed(futures):
                try:
                    if future.result().status_code == 200:
                        success_count += 1
                except requests.RequestException as e:
                    # Best effort: report and continue with the remaining services.
                    print(f"! 注销失败 [{futures[future]}]: {str(e)}")
            print(f"清理完成,共移除 {success_count}/{len(service_ids)} 个失效实例")
        except Exception as e:
            print(f"清理流程异常: {str(e)}")
if __name__ == "__main__":
    # Consul HTTP API address; override with the CONSUL_URL environment variable.
    consul_endpoint = os.environ.get("CONSUL_URL", "http://172.16.5.37:8500")
    try:
        monitor = KubernetesMonitor(consul_endpoint)
        # Register new or expired Services with Consul.
        monitor.update_services()
        # Then deregister services with critical checks (runs synchronously,
        # after the update above has finished).
        monitor.clean_failed_instances()
    except KeyboardInterrupt:
        print("\n操作已中止")
        # 130 = 128 + SIGINT: report the interrupted run as a failure, not success.
        sys.exit(130)