first commit

This commit is contained in:
2021-11-30 21:24:49 +08:00
commit 08da50a0b7
13 changed files with 1433 additions and 0 deletions

View File

@ -0,0 +1,460 @@
<?php
declare(strict_types=1);
namespace StarPoolCloud\Handles\Apollo;
use Dotenv\Dotenv;
use Exception;
use GuzzleHttp\Promise\PromiseInterface;
use GuzzleHttp\Client as GuzzleHttpClient;
use Psr\Http\Message\ResponseInterface;
use GuzzleHttp\Exception\RequestException;
class Client
{
protected mixed $configServerUrl = '';
protected string $clientIp = '';
protected string $appId = '';
protected string $namespaceName = '';
protected string $clusterName = '';
protected string $secret = '';
protected array $httpInfo;
protected array $errorInfo;
protected array $onRejectedTimeList;
protected GuzzleHttpClient $guzzleHttpClient;
protected PromiseInterface $promise;
protected bool $isMultiGet = false;//是否批量获取配置
protected bool $promiseWait = true;//是否启用promise wait
protected mixed $asyncGetResult = null;//通过回调函数异步获取返回结果
protected bool $setHttpInfo = true;//附带http信息
protected bool $setErrorInfo = true;//附带错误信息
/**
* 创建配置
* @throws Exception
*/
public function __construct(array $config)
{
foreach ($config as $key => $item) {
if (blank($item)) {
throw new Exception("必须配置$key", -1);
}
}
$this->configServerUrl = $config['server_addr'];
$this->clientIp = $config['client_ip'];
$this->clusterName = $config['cluster'];
$this->namespaceName = $config['namespace'];
$this->secret = $config['secret'];
$this->appId = $config['appid'];
$this->guzzleHttpClient = new GuzzleHttpClient(['http_errors' => false]);
}
/**
* 从阿波罗服务器读取配置
* @param bool $useCacheApi 是否通过带缓存的Http接口从Apollo读取配置设置为false可以使用不带缓存的Http接口从Apollo读取配置
* @param string $releaseKey 上一次的releaseKey
* @return array
*/
public function getConfig(bool $useCacheApi = false, string $releaseKey = ''): array
{
$this->isMultiGet = false;
$res = $this->multiGetConfig([$this->appId => [$this->namespaceName => $releaseKey]], $useCacheApi);
//当前应用指定namespace的http请求信息
if (isset($this->httpInfo[$this->appId][$this->namespaceName])) {
$this->httpInfo = $this->httpInfo[$this->appId][$this->namespaceName];
} else {
$this->httpInfo = [];
}
//当前应用指定namespace的错误信息
if (isset($this->errorInfo[$this->appId][$this->namespaceName])) {
$this->errorInfo = $this->errorInfo[$this->appId][$this->namespaceName];
} else {
$this->errorInfo = [];
}
if (!empty($this->errorInfo)) {
return [];
}
return $res[$this->appId][$this->namespaceName] ?? [];
}
/**
* 更新.env文件
* @throws Exception
*/
public function updateEnv()
{
$config = $this->getConfig();
if (blank($config)) {
throw new Exception('未获取Apollo配置', -1);
}
$envFile = base_path(app()->environmentFile());
$envFileContent = file_get_contents($envFile);
if ($envFileContent === false) {
throw new Exception('无法读取.env文件', -1);
}
$envArr = Dotenv::parse($envFileContent);
$toArray = collect($envArr)->merge($config)->sortKeys()->toArray();
$newEnvFileContent = '';
$group = [];
$i = 0;
foreach ($toArray as $name => $value) {
$prefix = '';
$explode = explode('_', $name);
$group[$i] = $explode[0];
if ($i > 0 && $group[$i - 1] !== $group[$i]) {
$prefix = "\n";
}
$newEnvFileContent .= sprintf("%s%s=%s%s", $prefix,$name, $value, "\n");
$i++;
}
file_put_contents($envFile, $newEnvFileContent);
}
/**
* 批量读取配置
* @param array $appNamespaceData 应用id及其Namespace列表信息格式例子
* Array(
* 'app_id_1' => [
* 'application' => '',
* 'FX.apollo' => ''
* ],
* 'app_id_2' => [
* 'application' => ''
* ]
* )
* @param bool $useCacheApi 是否通过带缓存的Http接口从Apollo读取配置设置为false可以使用不带缓存的Http接口从Apollo读取配置
* @return array
*/
public function multiGetConfig(array $appNamespaceData, bool $useCacheApi = true): array
{
$this->httpInfo = [];
$res = [];
if (empty($appNamespaceData)) {
return $res;
}
$isMultiGet = $this->isMultiGet;//是否批量获取配置
$asyncGetResult = $this->asyncGetResult;//通过回调函数异步获取返回结果
$setHttpInfo = $this->setHttpInfo;//附带http信息
$setErrorInfo = $this->setHttpInfo;//附带错误信息
foreach ($appNamespaceData as $appId => &$namespaceData) {
foreach ($namespaceData as $namespaceName => &$releaseKey) {
//带缓存接口置空releaseKey
$useCacheApi === true && $releaseKey = '';
//初始化返回结果
!isset($res[$appId][$namespaceName]) && $res[$appId][$namespaceName] = [];
$this->promise = $this->requestAsync(
$this->buildGetConfigRequestUrl($appId, $namespaceName, $useCacheApi, $releaseKey),
$appId,
['timeout' => 10]//默认10秒超时
);
$this->promise->then(
function (ResponseInterface $response) use (
&$res,
$appId,
$namespaceName,
$useCacheApi,
$asyncGetResult,
$isMultiGet,
$setHttpInfo
) {
$responseCode = $response->getStatusCode();
$responseBody = (string)$response->getBody();
if ($setHttpInfo === true) {
$this->httpInfo[$appId][$namespaceName] = [
'response_code' => $responseCode,
'response_body' => $responseBody
];
}
switch ($responseCode) {
case 200:
$responseBody = json_decode($responseBody, true);
empty($responseBody) && $responseBody = [];
break;
case 304:
$responseBody = [];
break;
default:
$responseBody = false;
}
if ($useCacheApi === false) {//不带缓存的接口配置项在configurations里面
$res[$appId][$namespaceName] = $responseBody['configurations'];
} else {//带缓存的接口responseBody就是配置项
$res[$appId][$namespaceName] = $responseBody;
}
//把结果通过$asyncGetResult回调函数交给上层
if (is_callable($asyncGetResult)) {
call_user_func($asyncGetResult, $isMultiGet === true ? $res : $res[$appId][$namespaceName]);
}
},
function (RequestException $exception) use (&$res, $appId, $namespaceName, $setErrorInfo) {
if ($setErrorInfo === true) {
$this->errorInfo[$appId][$namespaceName] = [
'code' => $exception->getCode(),
'message' => $exception->getMessage()
];
}
$res[$appId][$namespaceName] = false;//存在异常则设置结果为false
}
);
}
}
$this->promiseWait === true && $this->promiseWait();
return $res;
}
/**
* 多个应用感知配置更新
* @param array $appNotificationsData 应用id及notifications信息格式例子
* Array(
* 'app_id_1' => [
* 'application' => 100,
* 'FX.apollo' => 200
* ],
* 'app_id_2' => [
* 'application' => 100
* ]
* )
* @param mixed|null $onConfigUpdate 当存在配置更新时触发的回调函数
* @param null $onResponse
* @return void
*/
public function listenMultiAppConfigUpdate(array $appNotificationsData, mixed $onConfigUpdate = null, $onResponse = null): void
{
if (empty($appNotificationsData)) {
return;
}
$this->httpInfo = [];
//以下是执行流程
//发起http长轮询监听指定应用的配置更新请求会被服务器hold住
//如果被监听namespace发生配置变更服务器会立刻响应当前请求返回新的notificationId
//本地拿到新的notificationId更新本地的映射表然后再次发起http长轮询监听指定应用的配置更新
$loopForConfigUpdate = function ($appId, $namespaceNotificationMapping) use (
&$onConfigUpdate, &$loopForConfigUpdate, &$onResponse
) {
//生成notifications
$notifications = [];
foreach ($namespaceNotificationMapping as $namespaceName => $notificationId) {
$notifications[] = ['namespaceName' => $namespaceName, 'notificationId' => $notificationId];
}
$this->promise = $this->requestAsync(
$this->buildAwareConfigUpdateUrl($appId, $notifications), $appId, ['timeout' => 63]
);
unset($notifications);
$this->promise->then(
function (ResponseInterface $response) use (
$appId, &$loopForConfigUpdate, &$namespaceNotificationMapping, &$onConfigUpdate, &$onResponse
) {
//触发响应函数
if (is_callable($onResponse)) {
call_user_func_array($onResponse, [$appId, $response]);
}
$responseCode = $response->getStatusCode();
if ($responseCode === 200) {
$body = $response->getBody();
$body = json_decode($body, true);
if (!empty($body) && is_array($body)) {
foreach ($body as &$value) {
if (
!isset($value['namespaceName']) ||
!isset($value['notificationId'])
) {
continue;
}
$namespaceName = &$value['namespaceName'];
$notificationId = &$value['notificationId'];
if (
isset($namespaceNotificationMapping[$namespaceName]) &&
$namespaceNotificationMapping[$namespaceName] != $notificationId
) {//配置发生变更了
//更新映射表
$namespaceNotificationMapping[$namespaceName] = $notificationId;
//触发配置变更回调函数
if (is_callable($onConfigUpdate)) {
$this->promiseWait = false;//关闭getConfig方法内的promise wait
$this->setHttpInfo = false;//关闭getConfig方法内的保存http信息的逻辑
$this->setErrorInfo = false;//关闭getConfig方法内的保存错误信息的逻辑
//由于接管了getConfig方法的promise wait通过回调函数获取返回结果
$this->asyncGetResult = function ($newConfig) use (
$appId,
$namespaceName,
&$onConfigUpdate,
$notificationId,
&$namespaceNotificationMapping
) {
if ($newConfig !== false) {
call_user_func_array(
$onConfigUpdate,
[
$appId,
$namespaceName,
$newConfig,
$notificationId,
&$namespaceNotificationMapping
]
);
}
};
//以下方法返回结果为空数组
$this->getConfig(false);
}
}
}
}
}
//再次发起http长轮询监听指定应用的配置更新
$loopForConfigUpdate($appId, $namespaceNotificationMapping);
},
function (RequestException $exception) use ($appId, &$loopForConfigUpdate, &$namespaceNotificationMapping) {//偶尔有些超时请求会从此处产生
//防止因为阿波罗服务器异常而导致进入无限死循环
$nowTime = time();
$errorTimeLimit = 5;
if (!empty($this->onRejectedTimeList[$appId])) {
if (
count($this->onRejectedTimeList[$appId]) === $errorTimeLimit &&
count(array_unique($this->onRejectedTimeList[$appId])) <= 2
) {//瞬间产生过多错误退出event loop
die('错误码:' . $exception->getCode() . ',错误信息:' . $exception->getMessage() . PHP_EOL);
}
}
$this->onRejectedTimeList[$appId][] = $nowTime;
if (count($this->onRejectedTimeList[$appId]) > $errorTimeLimit) {
array_shift($this->onRejectedTimeList[$appId]);
}
//再次发起http长轮询监听指定应用的配置更新
$loopForConfigUpdate($appId, $namespaceNotificationMapping);
}
);
};
foreach ($appNotificationsData as $appId => $namespaceNotificationMapping) {
if (empty($namespaceNotificationMapping)) {
continue;
}
$loopForConfigUpdate($appId, $namespaceNotificationMapping);
}
$this->promiseWait();
}
/**
* 获取http请求信息
* @return array
*/
public function getHttpInfo(): array
{
return $this->httpInfo;
}
/**
* 获取错误信息
* @return array
*/
public function getErrorInfo(): array
{
return $this->errorInfo;
}
/**
* 构建用于请求的阿波罗接口链接
* @param string $appId 应用的appId
* @param string $namespaceName Namespace的名字
* @param bool $useCacheApi 是否通过带缓存的Http接口从Apollo读取配置
* @param string $releaseKey 上一次的releaseKey
* @return string
*/
private function buildGetConfigRequestUrl(string $appId, string $namespaceName, bool $useCacheApi = true, string $releaseKey = ''): string
{
if (empty($appId) || empty($namespaceName)) {
return '';
}
if ($useCacheApi === true) {
$url = "$this->configServerUrl/configfiles/json/$appId/$this->clusterName/$namespaceName";
} else {
$url = "$this->configServerUrl/configs/$appId/$this->clusterName/$namespaceName";
}
$params = [];
if (!empty($this->clientIp)) {
$params['ip'] = $this->clientIp;
}
if (!empty($releaseKey)) {
$params['releaseKey'] = $releaseKey;
}
if (!empty($params)) {
$url .= '?' . http_build_query($params);
}
return $url;
}
/**
* 构建用于请求的阿波罗接口链接
* @param string $appId 应用的appId
* @param array $notifications notifications信息格式为二维数组格式例子
* Array(
* ['namespaceName' => 'application', 'notificationId' => 100],
* ['namespaceName' => 'FX.apollo', 'notificationId' => 200]
* )
* @return string
*/
private function buildAwareConfigUpdateUrl(string $appId, array $notifications = []): string
{
if (empty($appId) || empty($notifications)) {
return '';
}
$notifications = urlencode(json_encode($notifications));
return "$this->configServerUrl/notifications/v2?appId=$appId&cluster=$this->clusterName&notifications=$notifications";
}
/**
* 发起异步请求
* @param string $url 请求链接
* @param string $appId 应用的appId
* @param array $options 请求配置参考guzzlehttp文档
* @return PromiseInterface
*/
private function requestAsync(string $url, string $appId = '', array $options = []): PromiseInterface
{
if (
!empty($this->secret) &&
!empty($appId)
) {//追加访问密钥
$timestamp = time() * 1000;
$urlInfo = parse_url($url);
if (!empty($urlInfo['path'])) {
$pathWithQuery = $urlInfo['path'];
if (!empty($urlInfo['query'])) {
$pathWithQuery .= '?' . $urlInfo['query'];
}
$options['headers'][Signature::HTTP_HEADER_AUTHORIZATION] = Signature::getAuthorizationString(
$appId, $timestamp, $pathWithQuery, $this->secret
);
$options['headers'][Signature::HTTP_HEADER_TIMESTAMP] = $timestamp;
}
unset($urlInfo);
}
return $this->guzzleHttpClient->requestAsync('GET', $url, $options);
}
/**
* 发起异步请求
* @return void
*/
private function promiseWait(): void
{
if (!is_null($this->promise)) {
try {
$this->promise->wait();
} catch (Exception) {
//屏蔽promise wait的错误因为错误信息已经在promise then的onRejected回调函数中返回
}
}
}
}