kubernetes中python api的二次封装
admin
2023-03-08 00:41:53
0

k8s python api二次封装

pip install kubernetes pyyaml
import urllib3
from pprint import pprint
from kubernetes import client
from os import path
import yaml

class K8sApi(object):
    """Convenience wrapper around the Kubernetes Python client.

    Groups list/read/create/replace/delete helpers for deployments,
    namespaces, nodes, pods, services, ingresses and stateful sets behind a
    single object. Manifest file names passed to the create/replace helpers
    are resolved relative to the directory that contains this module.
    """

    def __init__(self):
        """Build the API clients from the hard-coded host and bearer token.

        TLS verification is switched off, so the matching urllib3 warning is
        silenced as well. Loading a kubeconfig through ``kubernetes.config``
        is the alternative to the inline credentials used here.
        """
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.configuration = client.Configuration()
        self.configuration.host = "https://192.168.100.111:6443"
        # Placeholder credential: replace "token" with a real token and keep
        # a single space between the scheme and the token.
        self.configuration.api_key['authorization'] = 'Bearer token'
        self.configuration.verify_ssl = False

        # One ApiClient is enough; every API group object can share it.
        api_client = client.ApiClient(self.configuration)
        self.k8s_apps_v1 = client.AppsV1Api(api_client)
        self.Api_Instance = client.CoreV1Api(api_client)
        # Use ExtensionsV1beta1Api when the installed client still provides
        # it; otherwise fall back to NetworkingV1Api, which exposes the same
        # ingress method names, so construction no longer fails outright.
        ingress_api = getattr(client, "ExtensionsV1beta1Api", None) or client.NetworkingV1Api
        self.Api_Instance_Extensions = ingress_api(api_client)

    @staticmethod
    def _load_manifest(file):
        """Parse the YAML manifest *file* located next to this module."""
        manifest_path = path.join(path.dirname(__file__), file)
        # Binary mode lets the YAML parser pick the encoding itself instead
        # of depending on the platform's default text encoding.
        with open(manifest_path, "rb") as stream:
            return yaml.safe_load(stream)

    @staticmethod
    def _node_status(conditions):
        """Return "Ready" if the node's Ready condition is "True", else "NotReady"."""
        for condition in conditions or ():
            if condition.type == "Ready":
                return "Ready" if condition.status == "True" else "NotReady"
        # No Ready condition reported at all.
        return "NotReady"

    @staticmethod
    def _node_ip(addresses):
        """Return the InternalIP address, else the first address, else None."""
        if not addresses:
            return None
        for entry in addresses:
            if entry.type == "InternalIP":
                return entry.address
        return addresses[0].address

    ####################################################################################################################
    # Deployments

    def list_deployment(self, namespace="default"):
        """Return the client's response listing the deployments in *namespace*."""
        return self.k8s_apps_v1.list_namespaced_deployment(namespace)

    def read_deployment(self, name="nginx-deployment", namespace="default"):
        """Return the deployment *name* read from *namespace*."""
        return self.k8s_apps_v1.read_namespaced_deployment(name, namespace)

    def create_deployment(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a deployment in *namespace* from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_deployment(
            body=manifest, namespace=namespace)

    def replace_deployment(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace deployment *name* in *namespace* with the manifest in *file*."""
        manifest = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_deployment(name, namespace, body=manifest)

    def delete_deployment(self, name="nginx-deployment", namespace="default"):
        """Delete deployment *name* from *namespace*."""
        return self.k8s_apps_v1.delete_namespaced_deployment(name, namespace)

    ####################################################################################################################
    # Namespaces

    def list_namespace(self):
        """Return the client's response listing all namespaces."""
        return self.Api_Instance.list_namespace()

    def read_namespace(self, name="default"):
        """Return the namespace called *name*."""
        return self.Api_Instance.read_namespace(name)

    def create_namespace(self, file="pod-nginx.yaml"):
        """Create a namespace from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance.create_namespace(body=manifest)

    def replace_namespace(self, file="pod-nginx.yaml", name="default"):
        """Replace namespace *name* with the manifest in *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance.replace_namespace(name, body=manifest)

    def delete_namespace(self, name="default", namespace="default"):
        """Delete the namespace called *name*.

        The *namespace* parameter is not used; it is kept only so existing
        callers that pass it keep working.
        """
        return self.Api_Instance.delete_namespace(name)

    ####################################################################################################################
    # Nodes and pods

    def list_node(self):
        """Return a summary dict for every node, keyed by node name.

        Each value holds ``name``, ``status`` ("Ready" or "NotReady", taken
        from the node's Ready condition), ``ip`` (InternalIP when present,
        otherwise the first reported address, or None), ``kubelet_version``
        and ``os_image``.
        """
        summary = {}
        for node in self.Api_Instance.list_node().items:
            node_name = node.metadata.name
            summary[node_name] = {
                "name": node_name,
                "status": self._node_status(node.status.conditions),
                "ip": self._node_ip(node.status.addresses),
                "kubelet_version": node.status.node_info.kubelet_version,
                "os_image": node.status.node_info.os_image,
            }
        return summary

    def list_pod(self):
        """Return ``{pod name: {"ip", "namespace"}}`` for pods in all namespaces.

        The key is the bare pod name, so pods that share a name in different
        namespaces overwrite one another; the later item in the response wins.
        """
        pods = self.Api_Instance.list_pod_for_all_namespaces().items
        return {
            pod.metadata.name: {"ip": pod.status.pod_ip, "namespace": pod.metadata.namespace}
            for pod in pods
        }

    def read_pod(self, name="nginx-pod", namespace="default"):
        """Return the pod *name* read from *namespace*."""
        return self.Api_Instance.read_namespaced_pod(name, namespace)

    def create_pod(self, file="pod-nginx.yaml", namespace="default"):
        """Create a pod in *namespace* from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_pod(namespace, body=manifest)

    def replace_pod(self, file="pod-nginx.yaml", name="nginx-pod", namespace="default"):
        """Replace pod *name* in *namespace* with the manifest in *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_pod(name, namespace, body=manifest)

    def delete_pod(self, name="nginx-pod", namespace="default"):
        """Delete pod *name* from *namespace*."""
        return self.Api_Instance.delete_namespaced_pod(name, namespace)

    ####################################################################################################################
    # Services

    def list_service(self):
        """Return the client's response listing services in all namespaces."""
        return self.Api_Instance.list_service_for_all_namespaces()

    def read_service(self, name="", namespace="default"):
        """Return the service *name* read from *namespace*."""
        return self.Api_Instance.read_namespaced_service(name, namespace)

    def create_service(self, file="service-nginx.yaml", namespace="default"):
        """Create a service in *namespace* from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance.create_namespaced_service(namespace, body=manifest)

    def replace_service(self, file="pod-nginx.yaml", name="hequan", namespace="default"):
        """Replace service *name* in *namespace* with the manifest in *file*."""
        # NOTE(review): the default manifest is a pod file; callers should
        # pass a service manifest explicitly.
        manifest = self._load_manifest(file)
        return self.Api_Instance.replace_namespaced_service(name, namespace, body=manifest)

    def delete_service(self, name="hequan", namespace="default"):
        """Delete service *name* from *namespace*."""
        return self.Api_Instance.delete_namespaced_service(name, namespace)

    ####################################################################################################################
    # Ingresses

    def list_ingress(self):
        """Return the client's response listing ingresses in all namespaces."""
        return self.Api_Instance_Extensions.list_ingress_for_all_namespaces()

    def read_ingress(self, name="", namespace="default"):
        """Return the ingress *name* read from *namespace*."""
        return self.Api_Instance_Extensions.read_namespaced_ingress(name, namespace)

    def create_ingress(self, file="ingress-nginx.yaml", namespace="default"):
        """Create an ingress in *namespace* from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance_Extensions.create_namespaced_ingress(namespace, body=manifest)

    def replace_ingress(self, name="", file="ingress-nginx.yaml", namespace="default"):
        """Replace ingress *name* in *namespace* with the manifest in *file*."""
        manifest = self._load_manifest(file)
        return self.Api_Instance_Extensions.replace_namespaced_ingress(
            name=name, namespace=namespace, body=manifest)

    def delete_ingress(self, name="hequan", namespace="default"):
        """Delete ingress *name* from *namespace*."""
        return self.Api_Instance_Extensions.delete_namespaced_ingress(name, namespace)

    #####################################################################################################################
    # Stateful sets

    def list_stateful(self):
        """Return the client's response listing stateful sets in all namespaces."""
        return self.k8s_apps_v1.list_stateful_set_for_all_namespaces()

    def read_stateful(self, name="nginx-deployment", namespace="default"):
        """Return the stateful set *name* read from *namespace*."""
        return self.k8s_apps_v1.read_namespaced_stateful_set(name, namespace)

    def create_stateful(self, file="deploy-nginx.yaml", namespace="default"):
        """Create a stateful set in *namespace* from the YAML manifest *file*."""
        manifest = self._load_manifest(file)
        return self.k8s_apps_v1.create_namespaced_stateful_set(
            body=manifest, namespace=namespace)

    def replace_stateful(self, file="deploy-nginx.yaml", name="nginx-deployment", namespace="default"):
        """Replace stateful set *name* in *namespace* with the manifest in *file*."""
        manifest = self._load_manifest(file)
        return self.k8s_apps_v1.replace_namespaced_stateful_set(name, namespace, body=manifest)

    def delete_stateful(self, name="nginx-deployment", namespace="default"):
        """Delete stateful set *name* from *namespace*."""
        return self.k8s_apps_v1.delete_namespaced_stateful_set(name, namespace)

    ####################################################################################################################

if __name__ == '__main__':
    def test():
        """Smoke test: pretty-print the deployments of the default namespace."""
        pprint(K8sApi().list_deployment())

    # Only runs when the module is executed as a script.
    test()

相关内容

热门资讯

黄土塬藏“算力密码”!中国电信... 在陇东黄土塬上,数字浪潮奔涌不息。作为国家 “东数西算” 工程的核心承载地,中国电信庆阳智算产业园正...
科沃斯加入清洁电器行业反虚假测... 【CNMO科技消息】5月8日,科沃斯官方微博宣布,科沃斯加入由中国家用电器协会指导、中国家电网发起成...
东华科技获得发明专利授权:“一... 证券之星消息,根据天眼查APP数据显示东华科技(002140)新获得一项发明专利授权,专利名为“一种...
头部企业扎堆超百场无人机赛事,... 文/陈聆听 2026 年 5 月 4 日,山东诸城,山东天鸟航空负责人丁总与当地多位中小学校长会面,...
重磅签约!华为、中国银联战略合... 快科技5月8日消息,华为和中国银联在深圳正式签约,进一步深化双方的战略合作,还一起发布了自主创新和人...
胡锡进:日本已成中国“头号潜在... 日本高市早苗政府代表了日本政治思维,特别是对华策略最阴暗的一面,这个女人绝对是“政治女巫”级别的。中...
“190元榴莲遭仅退款”商家:... 近日,河南濮阳一冷冻榴莲果肉商家“程大叔”反映,自己遭遇一名买家恶意“仅退款”。为讨回公道,4月28...
新价格项目落地,让医疗真创新更... “让瘫痪的人能动、心衰末期的人能活、万里之外能手术”,这些看似奇迹的变化,背后藏着一个关键支撑—近三...
苹果宣布在印度扩大三项环保举措... 【CNMO科技消息】近日,苹果宣布扩大在印度的环保举措,共推出三个相关项目,覆盖清洁能源、塑料污染治...
华为在数字中国建设峰会:只有根... 作者:王聪彬 “根技术、落地”是华为在第九届数字中国建设峰会想要传递出的两个关键字。 “根技术”顾名...