full_container_scheme/kubernets_api/get-deploy-v2.py

89 lines
2.9 KiB
Python

'''
Author: Logan.Li
Gitee: https://gitee.com/attacker
email: admin@attacker.club
Date: 2025-01-05 12:25:52
LastEditTime: 2025-03-03 00:06:37
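Description: Export every Deployment that has at least one matching Service
    (same namespace, Service selector contained in the pod template labels) to
    <namespace>/<deployment-name>-combined.yaml under the current working directory.
Requirements: pip install kubernetes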
'''
from kubernetes import client, config
import os
import yaml
import urllib3
# Silence urllib3's InsecureRequestWarning so it does not clutter the output.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Load the cluster connection settings from the kubeconfig (runs at import time).
config.load_kube_config()
# Shared API client: used by sanitize() for serialization and by main() for API calls.
api_client = client.ApiClient()
def sanitize(obj):
    """Turn a Deployment or Service API object into a manifest dict for export.

    The object is serialized with the shared ``api_client`` and then stripped
    of fields that are assigned by the cluster rather than by the author.

    Args:
        obj: A ``client.V1Service`` or a Deployment object. Anything that is
            not a ``V1Service`` is labelled as an ``apps/v1`` Deployment when
            it does not carry its own ``api_version`` / ``kind``.

    Returns:
        dict: The serialized object with ``apiVersion`` and ``kind`` filled
        in, server-populated metadata removed, and ``annotations`` /
        ``labels`` guaranteed to exist under ``metadata``.
    """
    api_version = getattr(obj, 'api_version', None)
    kind = getattr(obj, 'kind', None)
    is_service = isinstance(obj, client.V1Service)

    obj_yaml = api_client.sanitize_for_serialization(obj)

    # Fall back to type-based defaults when the object does not carry its own
    # apiVersion / kind.
    obj_yaml['apiVersion'] = api_version or ('v1' if is_service else 'apps/v1')
    obj_yaml['kind'] = kind or ('Service' if is_service else 'Deployment')

    spec = obj_yaml.get('spec') or {}

    # Cluster-assigned virtual IPs must not be baked into the export, but the
    # literal "None" is not an address: it is what marks a headless Service.
    # Dropping it would turn the Service into a regular ClusterIP Service on
    # re-apply, so headless Services keep both fields.
    if is_service and spec.get('clusterIP') != 'None':
        spec.pop('clusterIPs', None)
        spec.pop('clusterIP', None)

    metadata = obj_yaml.setdefault('metadata', {})

    # Server-populated bookkeeping fields.
    for field in ('selfLink', 'generation', 'creationTimestamp',
                  'resourceVersion', 'uid', 'managedFields'):
        metadata.pop(field, None)

    annotations = metadata.setdefault('annotations', {})
    metadata.setdefault('labels', {})

    # Annotations written by the deployment controller / kubectl.
    for ann in ('deployment.kubernetes.io/revision',
                'kubectl.kubernetes.io/restartedAt',
                'kubectl.kubernetes.io/last-applied-configuration'):
        annotations.pop(ann, None)

    # The rollout strategy is dropped from the exported spec as well.
    spec.pop('strategy', None)

    return obj_yaml
def merge_and_export(deployment, services, namespace):
    """Write a deployment and its services into one multi-document YAML file.

    The file is created at ``<cwd>/<namespace>/<deployment name>-combined.yaml``;
    the namespace directory is created when it does not exist yet. The
    deployment is always the first document, followed by the services in the
    order given.

    Args:
        deployment: Deployment object; its ``metadata.name`` names the file.
        services: Service objects to append after the deployment.
        namespace: Name of the sub-directory (under the current working
            directory) that receives the file.
    """
    documents = [sanitize(item) for item in (deployment, *services)]

    target_dir = os.path.join(os.getcwd(), namespace)
    os.makedirs(target_dir, exist_ok=True)

    filename = f"{deployment.metadata.name}-combined.yaml"
    target_path = os.path.join(target_dir, filename)
    with open(target_path, 'w') as stream:
        yaml.dump_all(documents, stream, default_flow_style=False)

    print(f"Exported {len(services)} services with deployment to {filename}")
def main():
    """Export every deployment that is targeted by at least one service.

    Lists all deployments and all services across namespaces, then pairs each
    deployment with the services in its namespace whose (non-empty) selector
    is fully satisfied by the deployment's pod template labels. Deployments
    without any matching service are skipped.
    """
    apps_api = client.AppsV1Api(api_client)
    core_api = client.CoreV1Api(api_client)
    deployments = apps_api.list_deployment_for_all_namespaces().items
    services = core_api.list_service_for_all_namespaces().items

    for deployment in deployments:
        ns = deployment.metadata.namespace
        template_labels = deployment.spec.template.metadata.labels

        selected = []
        for service in services:
            if service.metadata.namespace != ns:
                continue
            selector = service.spec.selector
            if not selector:
                # Services without a selector never match a deployment.
                continue
            # Every selector key/value must appear in the pod template labels.
            if all(template_labels.get(key) == value
                   for key, value in selector.items()):
                selected.append(service)

        if not selected:
            continue
        merge_and_export(deployment, selected, ns)


if __name__ == "__main__":
    main()