乐趣区

关于消息队列:Pulsar学习笔记之-Authentication认证机制与插件开发

Pulsar 自带的 Authentication 认证形式有很多种:TLS/Basic/JWT Token/Athenz/Sasl,然而均存在安全性或复杂性的一些问题,且有时候咱们须要和已有的账户零碎做集成,以保持一致的产品体验,此时须要自行开发认证插件。这里介绍一个应用签名机制加强安全性的认证插件开发计划。

Pulsar 认证的扩大机制

Pulsar 认证提供了比拟好的扩大机制,通过实现几个预约义的接口类,就能够不便的插入本人开发的认证实现。这些接口次要包含以下 4 个:

# 客户端包装认证参数
org.apache.pulsar.client.api.Authentication
org.apache.pulsar.client.api.AuthenticationDataProvider

#服务端验证认证参数
org.apache.pulsar.broker.authentication.AuthenticationProvider
org.apache.pulsar.broker.authentication.AuthenticationState

此外默认的多种认证形式的代码能够提供十分丰盛的认证实现参考。

待集成的认证服务

假如咱们曾经有了一个账号零碎,外面存储有账号名(accessKey)、加盐 hash 过的明码(accessSecret)等认证须要的信息;咱们须要再开发一个专用的接口,供咱们实现的 Pulsar 认证插件调用。为防止认证接口被别人滥用,通过 Header 中的 Auth 参数做简略比对校验。

接口参数

  • 接管 POST 参数:accessKey,timeStr,paramStr,signature
  • 接管 Header 参数:Auth

认证大抵过程

  • 对收到的 accessKey 做格局校验
  • 申请账号零碎接口或查询数据库获取 accessSecret
  • 用跟客户端雷同的形式从新计算签名
  • sginStr:accessKey+timeStr+paramStr
  • signature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))
  • 比对签名,失败报错
  • 结构 userRole

认证通过后响应后果

{
    "uid":"123456",
    "userRole": "ur-123456", #命名规定能够自行感觉定
}

Pulsar 认证插件的实现要点

客户端包装认证参数

  • 传入参数:accessKey,accessSecret
  • 生成参数

    • paramStr:method=akauth&client=$UUID&rand=Math.rand()
    • timeStr:String.valueOf(System.currentTimeMillis()/1000)
    • sginStr:accessKey+timeStr+paramStr
  • 签名计算:signature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))
  • 传递参数:accessKey,timeStr,paramStr,signature

服务端转发认证参数

  • 接管参数:accessKey,timeStr,paramStr,signature
  • 申请 认证服务接口

服务端签名存活期校验

  • AuthenticationProviderTabaltAK.authenticate 判断是否过期
// check auth params survival time
long currentTimeSeconds = System.currentTimeMillis() / 1000;
long authTimeSeconds = Long.parseLong(authParams.getTimeStr());
if ((authTimeSeconds + this.akSignatureSurvivalSeconds) < currentTimeSeconds) {throw new AuthenticationException("auth out of survival time");
}
  • TokenAuthenticationState.isExpired 判断是否过期

    • 具体判断逻辑与下面雷同

Pulsar 认证插件的应用配置

Pulsar 服务端配置

服务端通常在 Broker 和 Proxy 的配置文件中配置

# 配置 Broker 的认证插件和参数
authenticationEnabled=true
authenticationProviders=net.tabalt.pulsar.broker.authentication.AuthenticationProviderTabaltAK
tabaltAKServerUrl=http://127.0.0.1/ak  #AK 服务地址
tabaltAKServerAuth=test-auth  #AK 服务接口认证 Token,通过 header 传递给认证服务
tabaltAKSignatureSurvivalSeconds=3600 #签名存活秒数

# 配置 Broker 作为客户端申请其余 Broker 的认证插件和参数,此处须要配置一个超级账号
brokerClientAuthenticationPlugin=net.tabalt.pulsar.client.auth.AuthenticationTabaltAK
brokerClientAuthenticationParameters={"accessKey":"test_access_key","accessSecret":"test_access_secret"}

Pulsar 客户端配置

AuthenticationTabaltAK authAK = new AuthenticationTabaltAK("test_access_key", "test_access_secret");
PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .authentication(authAK).build();
Producer<byte[]> producer = client.newProducer().topic("my-topic").create();
...
退出移动版