抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

公司apollo未开启开放授权平台,无法使用官方推荐的openapi方式获取配置,故封装一层方法,获取指定环境指定app的所有items,通过指定key返回value。

使用方法

  • apopplo_settings中配置地址,登录用户名,密码
  • 调用ApolloClient传入指定key
1
2
apolloinstance = ApolloClient(app_id=, env="")
value = apolloinstance.get_value_by_key(key="")

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# -*- coding: utf-8 -*-
# @Author : ryan
# @Time : 2021/8/24 上午11:30

import requests
from apollo_settings import APOLLO_BASE_URL_LIST, APOLLO_USERNAME, APOLLO_PASSWORD
# apollo_settings文件单独找个文件存不同环境apollo地址和用户名密码

class ApolloSession(object):
"""
请求方法,完成登录操作,提供基本的get,post方法
"""
def __init__(self, env, timeout=60):
self._timeout = timeout
self.base_url = APOLLO_BASE_URL_LIST.get(env)
self._session = self.get_session()

def get_session(self):
"""
完成登录,返回登录后的session
:return:
"""
login_url = "{base_url}/signin".format(base_url=self.base_url)
session = requests.session()
session.post(url=login_url, data={"username": APOLLO_USERNAME, "password": APOLLO_PASSWORD, "login-submit": "登录"})
return session

def _request_get(self, url, params=None):
"""
get请求
:param url:
:param params:
:return:
"""
return self._session.get(url=url, params=params, timeout=self._timeout)

def _request_post(self, url, json_data=None, form_data=None):
"""
post请求,post可能存在data和json两种传参,要区分一下
:param url:
:param json_data:
:param form_data:
:return:
"""
if json_data:
return self._session.post(url=url, json=json_data, timeout=self._timeout)
if form_data:
return self._session.post(url=url, data=form_data, timeout=self._timeout)
return self._session.post(url=url, timeout=self._timeout)


class ApolloClient(ApolloSession):
"""
继承ApolloSession,无需手动登录,可直接调用方法
"""
def __init__(self, app_id, env='sst', clusterName="default", timeout=60):
"""
:param app_id: 所管理的配置AppId
:param env: 所管理的配置环境
:param clusterName: 所管理的配置集群名, 一般情况下传入 default 即可。如果是特殊集群,传入相应集群的名称即可
:param timeout: 接口超时时间
"""
ApolloSession.__init__(self, env=env, timeout=timeout)
self._appid = app_id
self._env = env
self._clusterName = clusterName

def get_value_by_key(self, key, namespaceName='application'):
"""
返回配置value
:param key: 配置对应的key名称
:param namespaceName: 所管理的Namespace的名称,如果是非properties格式,需要加上后缀名,如sample.yml
:return:
"""
__url = "{base_url}/configfiles/json/{app_id}/default/{namespaceName}".format(base_url=self.base_url, app_id=self._appid, namespaceName=namespaceName)
try:
all_items = self._request_get(url=__url).json()
return all_items.get(key)
except BaseException as e:
return e

评论