# Source path: full_container_scheme/kubernets_api/consul-clean.py
#
# Repository viewer metadata: 77 lines, 3.0 KiB, Python

'''
Author: Logan.Li
Gitee: https://gitee.com/attacker
email: admin@attacker.club
Date: 2025-07-07 22:13:21
LastEditTime: 2025-07-07 22:13:25
Description: Deregister service instances that the Consul health API reports as critical.
'''
import logging
import time
from threading import Lock
from urllib.parse import quote

import requests
class ConsulCleaner:
    """Deregister service instances that Consul reports as critical.

    The cleaner queries ``/v1/health/state/critical`` on the configured
    Consul endpoint and issues ``/v1/agent/service/deregister/<id>`` for
    every distinct service ID found in the response.

    NOTE(review): the deregister call is sent to the same ``consul_url`` as
    the health query; whether that agent owns every listed service depends
    on the deployment -- confirm against the Consul topology in use.
    """

    def __init__(self, consul_url="http://consul.xx.me"):
        """Store the Consul base URL and set up logging.

        Args:
            consul_url: Base URL of the Consul HTTP API. Trailing slashes
                are removed so paths can be appended with a single ``/``.
        """
        self.consul_url = consul_url.rstrip('/')
        # Serialises cleanup passes that share this instance.
        self.lock = Lock()
        # basicConfig only takes effect when the root logger has no handlers
        # yet, so repeated instantiation does not stack handlers.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def clean_failed_instances(self, max_retries=3, initial_delay=3):
        """Clean up failed (critical) instances from Consul.

        Args:
            max_retries: Maximum number of attempts. Only request-level
                errors (connection problems, timeouts, undecodable bodies)
                trigger another attempt; a non-200 health response aborts
                immediately.
            initial_delay: Seconds to wait before the first attempt. The
                default of 3 matches the previous hard-coded delay; pass 0
                (or None) to skip the wait.

        Returns:
            True when the health query succeeded and every critical service
            was *attempted* (individual deregistration failures are logged
            as warnings but do not change the result). False when the
            health query returned a non-200 status or all attempts failed.
        """
        if initial_delay and initial_delay > 0:
            time.sleep(initial_delay)

        for attempt in range(max_retries):
            try:
                # The lock is held only for the duration of one pass, not
                # while backing off between attempts.
                with self.lock:
                    return self._clean_once()
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a 200 response whose body is not valid
                # JSON on requests versions where the decode error is not a
                # RequestException subclass.
                self.logger.error("Attempt %d failed: %s", attempt + 1, e)
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff

        return False

    def _clean_once(self):
        """Run a single query-and-deregister pass; return its success flag."""
        health_url = f"{self.consul_url}/v1/health/state/critical"
        response = requests.get(health_url, timeout=10)
        if response.status_code != 200:
            self.logger.error(
                "Consul API returned %s: %s", response.status_code, response.text
            )
            return False

        instances = response.json()
        if not instances:
            self.logger.info("No critical instances found")
            return True

        for service_id in self._critical_service_ids(instances):
            # Percent-encode the ID so characters such as '/', '?', '#' or
            # spaces cannot alter the request path.
            dereg_url = (
                f"{self.consul_url}/v1/agent/service/deregister/"
                f"{quote(service_id, safe='')}"
            )
            dereg_response = requests.put(dereg_url, timeout=5)
            if dereg_response.status_code == 200:
                self.logger.info(
                    "Successfully deregistered failed instance: %s", service_id
                )
            else:
                self.logger.warning(
                    "Failed to deregister %s. Status: %s",
                    service_id, dereg_response.status_code
                )
        return True

    @staticmethod
    def _critical_service_ids(checks):
        """Return the distinct, non-empty service IDs of critical checks.

        The health endpoint yields one entry per check, so a service with
        several critical checks appears more than once; each ID is returned
        only once, in first-seen order. Entries that are not dicts, are not
        critical, or carry no ``ServiceID`` are skipped.
        """
        seen = set()
        ordered = []
        for check in checks:
            if not isinstance(check, dict) or check.get('Status') != 'critical':
                continue
            service_id = check.get('ServiceID')
            if service_id and service_id not in seen:
                seen.add(service_id)
                ordered.append(service_id)
        return ordered
if __name__ == "__main__":
    # Run one cleanup pass against the default Consul endpoint and report
    # the outcome on stdout.
    succeeded = ConsulCleaner().clean_failed_instances()
    if not succeeded:
        print("Cleanup encountered errors")
    else:
        print("Cleanup completed successfully")