1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| import yaml import os import time from pathlib import Path
NEW_CONFIG_FILENAME = "config-all-new.yaml"
def read_kube_config(filename): with open(filename) as f: return yaml.full_load(f)
def output_kube_config(filename, config): with open(filename, "w") as f: f.write(config) print(f"导出配置文件【{filename}】成功")
def merge_kubeconfig(file_list): default_config = { "apiVersion": "v1", "kind": "Config", "preferences": {}, "clusters": [], "contexts": [], "users": [], "current-context": "", }
for file in file_list: config = read_kube_config(file) for cluster in config["clusters"]: default_config["clusters"].append(cluster) for user in config["users"]: default_config["users"].append(user) for context in config["contexts"]: default_config["contexts"].append(context)
output_kube_config(NEW_CONFIG_FILENAME, yaml.dump(default_config, allow_unicode=True))
def remove_cluster_context(config_filepath, context_name): kube_config = read_kube_config(config_filepath) for context_idx, context in enumerate(kube_config["contexts"]): if context["name"] == context_name: for cluster_idx, cluster in enumerate(kube_config["clusters"]): if cluster["name"] == context["context"]["cluster"]: print(f'删除cluster【{cluster["name"]}】') del kube_config["clusters"][cluster_idx] for user_idx, user in enumerate(kube_config["users"]): if user["name"] == context["context"]["user"]: print(f'删除user【{user["name"]}】') del kube_config["users"][user_idx] print(f'删除context【{context["name"]}】') del kube_config["contexts"][context_idx]
current_date = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime()) os.system(f"cp ~/.kube/config ~/.kube/backup/config_backup_{current_date}_reason_del_context") output_kube_config(config_filepath, yaml.dump(kube_config, allow_unicode=True))
return print(f"未找到context 【{context_name}】,删除失败")
def replace_default_kube_config(): current_date = time.strftime("%Y-%m-%d-%H:%M:%S", time.localtime())
print("备份旧配置...") os.system(f"cp ~/.kube/config ~/.kube/backup/config_backup_{current_date}_reason_merge_context") print("替换新配置...") os.system(f"cp ./{NEW_CONFIG_FILENAME} ~/.kube/config") print("删除配置文件...") os.system(f"rm -f ./{NEW_CONFIG_FILENAME}") os.system("unset KUBECONFIG") print("✅完成替换")
if __name__ == "__main__": remove_cluster_context(f"{Path.home()}/.kube/config", "【本地测试环境】docker-desktop") pass
|