百家乐必知技巧
大数据

迟来的干货 | Kafka权限管理实战

今年有很多小伙伴在公众号或者微信留言问能不能整一篇有关Kafka权限管理的文章,迫于工作关系,这个需求一直拖后。后来知道我的好友兼Kafka高玩 —— 【一岁小宝 】他一直在弄这一块的东西,所以我就厚着face皮去催他整一篇,也就是你们接下来看到的内容。本文原文地址:https://www.jianshu.com/p/09129c9f4c80。

有关Kafka权限管理的资料可谓是少之又少,当你遇到这块内容的时候,如果靠自?#22909;?#32034;的话会耗费很长的时间。如果现在还没有接触到这一块,也建议收藏本文,以备不时之需。

本片篇幅很大,在编辑本文的时候,右下角字数统计18000字(算上了英文字符,tx的字数统计有点怪异),建议先马后看。

一、概述

1、Kafka的权限分类

1)、身份?#29616;ぃˋuthentication):对client 与服务器的连接进行身份?#29616;ぃ琤rokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的?#29616;ぁ?/p>

2)、权限控制(Authorization):实现对于消息级别的权限控制,clients的读写操作进行Authorization?#28023;?#29983;产/消费/group)数据权限。

2、实?#22336;?#24335;

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此文主要介绍SASL方式。

1)SASL验证:

验证方式Kafka版本特点
SASL/PLAIN0.10.0.0不能动态增加?#27809;?/td>
SASL/SCRAM0.10.2.0可以动态增加?#27809;?/td>
SASL/Kerberos0.9.0.0需要独立部署验证服务
SASL/OAUTHBEARER2.0.0需自己实现接口实现token的创建和验证,需要额外Oauth服务

2)SSL加密: 使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据。

二、使用SASL进行身份验证

1、SASL/PLAIN验证

1.1 版本

相关包版本下载地址
Kafka0.11.0.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

1.2 Zookeeper配置

1)修改zoo.cfg增加两行配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

2)配置JAAS文件:

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
};

说明:定义了需要链接到Zookeeper服务器的?#27809;?#21517;和密码

3)加入需要的包:

kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar

如果没有引入对应的kafka包,启动kafka时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。

Zookeeper引入加载包需改Zookeeper脚本zkEnv.sh,在最后加入:

for i in "$ZOOBINDIR"/../for_sasl/*.jar; do
  CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

将for_sasl目录下的所有jar文件追加到CLASSPATH变量,再设置一个JVM参数给SERVER_JVMFLAGS变量,这两个变量都会在Zookeeper启动时传给JVM。

4)启动Zookeeper

bin/zkServer.sh start

1.3 Kafka服务端配置

1)kafka增加?#29616;?#20449;息:

创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};

user_XXX 为自定义的?#27809;В?#25152;有可以使用的?#27809;?#24517;须在此定义,不能再之后?#30053;觥?/p>

  • producer用于开放生产权限。
  • consumer用于开放消费权限。

JAAS文件定义了链接Kafka Broker时所需要的?#27809;?#21517;密码及broker各个节点之间相互通信的?#27809;?#21517;密码:

  • username/ password:broker之间通信使用的?#27809;?#21517;密码。
  • user_admin/user_producer/user_consumer:客户端(管理员、生产者、消费者)链接broker时所使用到的?#27809;?#21517;密码。

2)配置server.properties

listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true     //当没有找到ACL配置时,允许所有的访问操作。

3)修改启动脚本

exec $base_dir/kafka-run-class.sh 
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "[email protected]"

1.4 Kafka客户端端配置

1)创建JAAS文件:消费者:kafka_client_consumer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

生产者:kafka_client_producer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

2)修改客户端配置信息:分别在producer.properties和consumer.properties添加?#29616;?#26426;制

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

consumer.properties中额外加入分组配置

group.id=test-group

3)修改客户端脚本指定JAAS文件加载:

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

1.5 授权

此时已经完成了基本的配置,但是如果测试可以发现无论是生产还是消费,都不被允许。最后一步,我们需要为设置的账号授权。

1)创建主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test

2)增加生产权限:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
 --authorizer-properties zookeeper.connect=localhost:2181 
--add --allow-principal User:producer --operation Write --topic test

3)配置消费权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:consumer --operation Read --topic test

4)配置消费分组权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer --operation Read --group test-group

5)生产数据:

./kafka-console-producer-acl.sh --broker-list 127.0.0.1:9092 --topic test

6)消费数据:

bin/kafka-console-consumer-acl.sh --bootstrap-server  127.0.0.1:9092 
--topic test --from-beginning --consumer.config ./config/consumer.properties

我们可以查看配置的权限信息:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

Current ACLs for resource Topic:test: User:producer has Allow permission for operations: Write from hosts: * User:consumer has Allow permission for operations: Read from hosts: * Current ACLs for resource Group:test-group: User:consumer has Allow permission for operations: Read from hosts: *

1.6 JAVA客户端

在消费者/生产者初始化属性中引入JAAS文件:

static{
System.setProperty("java.security.auth.login.config","D://demoPeoject//JpaTest//src//main//resources//kafka_client_scram_consumer_jaas.conf");
}
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256");

2、SASL/SCRAM验证

上一节,我们通过配置SASL/PLAIN验证,实现了对Kafka的权限控制。但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer?#20449;?#32622;?#27809;В?#19968;但Kafka启动,无法动态?#30053;鲇没А?/p>

SASL/SCRAM验证可以动态?#30053;鲇没?#24182;分配权限。

2.1 版本

同上文

2.2 启动Zookeeper和Kafka

此方法是把凭证(credential)存储在Zookeeper,可以使用kafka-configs.sh在Zookeeper?#20889;?#24314;凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,在启动Kafka broker之前创建代理间通信的凭据。

所以第一步,在没有设置任何权限的配置下启动Kafka和Zookeeper。

2.3 创建SCRAM证书

1)创建broker建通信?#27809;В篴dmin(在使用sasl之前必须先创建,否则启动报错)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)创建生产?#27809;В簆roducer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

3)创建消费?#27809;В篶onsumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consume

SCRAM-SHA-256/SCRAM-SHA-512是对密码加密的算法,二者有其一即可。

2.4 查看SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

2.5 删除SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' 
--delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

2.6 服务端配置

在?#27809;?#35777;书创建完毕之后开始Kafka服务端的配置:

1)创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)将JAAS配置文件位置作为JVM参数传递给每个Kafka Broker

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "[email protected]"

3)配置server.properties:

?#29616;?#37197;置
listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

可以根据自己的需求选择SASL_SSL或SASL_PLAINTEXT, PLAINTEXT为不加密明文传输,性能好与SSL

4)重启Kafka和Zookeeper

2.7 客户端配置

1)为我们创建的三个?#27809;?#20998;别创建三个JAAS文件:

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};

2)修改启动脚本引入JAAS文件:

以生产者为例:

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_client_scram_producer_jaas.conf

3)ACL权限配置:此时如果我们生产数据则会发生如下错误:

上文2.3节中我们创建了三个?#27809;В?#20294;是还未?#20113;?#36171;予操作权限,接下来我们为其增加权限。

生产者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:producer --operation Write --topic test --allow-host 192.168.2.*

消费者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer--operation Read --topic test --allow-host 192.168.2.*

为生产者增加分组权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:producer --operation Read --group test-group --allow-host 192.168.2.*

分配权限之后就可以进行生产,消费操作了。(其他颗粒度的权限设?#20204;?#21442;考官方文档)

4)查看权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

2.8 小结

SASL/SCRAM验证方法可以在Kafka服务启动之后,动态的?#30053;鲇没?#20998;并配权限,在业务变动频繁,开发人员多的情况下比SASL/PLAIN方法更加灵活。

3. SASL/OAUTHBEARER

SASL/OAUTHBEARER验证是Kafka2.0版?#34892;略?#30340;验证方式:

KIP-255 adds a framework for authenticating to Kafka brokers using OAuth2 bearer tokens. The SASL/ OAuthBurer implementation is customizable using callbacks for token retrieval and validation.

KIP-255增加了一个使用OAuth2?#24615;?#20196;牌对KafkaBroker进行?#29616;?#30340;框架。SASL/OAuthBurer实现可使?#27809;?#35843;进行令牌检索和验证。

3.1 Oauth2.0验证

OAuth(开放授权)是一个开放标准,允许?#27809;?#25480;权第三方移动应用访问他们存储在另外的服务提供者上的信息,而不需要将?#27809;?#21517;和密码提供给第三方移动应用或分享他们数据的所有内容,OAuth2.0是OAuth协议的?#26377;?#29256;本。

Oauth2.0的工作流程如下图:

(A)?#27809;?#25171;开客户端?#38498;螅?#23458;户端要求?#27809;?#32473;予授权。

(B)?#27809;?#21516;意给予客户端授权。

(C)客户端使用上一步获得的授权,向?#29616;?#26381;务器申请令牌。

(D)?#29616;?#26381;务器对客户端进行?#29616;ひ院螅?#30830;认无误,同意发放令牌。

(E)客户端使用令牌,向资源服务器申请获取资源。

(F)资源服务器确认令牌无误,同意向客户端开放资源。

3.2 JWT规范

Oauth2.0返回验证令牌使用了JWT格式:JSON Web Token(缩写 JWT)是目前最流行的跨域?#29616;?#35299;决方案,JWT 的原理是,服务器?#29616;ひ院螅?#29983;成一个 JSON 对象,发回给?#27809;А?/p>

JWT由三个部分组成,分别是:

  • Header(头部)
  • Claims(载荷)
  • Signature(签名)

1)载荷

{
"sub": "1",
"iss": "http://localhost:8000/auth/login",
"iat": 1451888119,
"exp": 1454516119,
"jti": "37c107e4609ddbcc9c096ea5ee76c667"
"nbf": 1451888119
}

sub:该JWT所面向的?#27809;?iss:该JWT的签发者 iat(issued at):在什?#35789;?#20505;签发的token exp(expires):token什?#35789;?#20505;过期 nbf(not before):token在此时间之?#23433;?#33021;被接收处理 jti:JWT ID为web token提供唯一标识

2)头部

{
"typ": "JWT",
"alg": "HS256"
}

typ:指明数据格式 alg:指明加密算法

3)签名 以上两个部分base64编码之后链接成一个字符串,用HS256算法进行加密。在加密的时候,我们还需要提供一个密钥(secret):

HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret
)

算出签名?#38498;螅?#25226; Header、Payload、Signature 三个部分用”点”(.)分割拼成一个字符串,就是最后的结果。

完整JWT = 载荷 . 头部 . 签名

3.3 版本

相关包版本下载地址
Kafka2.12-2.2.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

Kafka2.0以上版本才支持此种验证

3.4 默认验证方式(非安全验证)

1)服务端配置:

(A)创建用于broker端通信的JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
unsecuredValidatorRequiredScope="LOGIN_TO_KAFKA"
unsecuredValidatorAllowableClockSkewMs="3000";
};

参数说明:

选项说明
unsecuredValidatorPrincipalClaimName=”value”默认为“sub”,如果需要修改在此定义
unsecuredValidatorScopeClaimName=”value”数?#36947;?#22411;
unsecuredValidatorRequiredScope=”value”scope范围值,可配置字符串或者List
unsecuredValidatorAllowableClockSkewMs=”value”允许的时间偏差。单位?#22909;耄?#40664;认0

举例:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="admin";
};

admin?#27809;?#20316;为sub的主题?#27809;?#29992;于kafka的broker之间通讯。

(B)在启动jvm参数中加入该文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 

(C)配置server.properties

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)客户端配置:

(A)配置producer.properties/consumer.properties

security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER

(B)创建客户端通信的JAAS文件(或配置文件?#24615;?#21152;配置项):

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="alice";

JAAS文件样本:

KafkaClient {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope="|scopeValue1|scopeValue2"
unsecuredLoginLifetimeSeconds="60";
};

参数说明:

选项说明
unsecuredLoginStringClaim_<claimname>=”value”创建一个值为“value”的claim(不能为iat/exp)
unsecuredLoginNumberClaim_<claimname>=”value”数?#36947;?#22411;
unsecuredLoginListClaim_<claimname>=”value”List类型
unsecuredLoginPrincipalClaimName默认为“sub”,如果需要修改在此定义
unsecuredLoginLifetimeSeconds超时时间单位?#22909;耄?#40664;认3600
unsecuredLoginScopeClaimName默认为“scope”,如果需要修改在此定义

优先级?#21495;?#32622;文件 > JAAS文件

(C)验证过程:

在3.4中我们列出了Kafka提供的一?#36861;?#23433;全的验证用于非正式环境,通过创建和验证JWT实现验证。只要配置对应JAAS文件即可,本节我们来看一下验证原理。

通过阅读官方文档了解到Kafka提供了一个接口,通过实现接口,创建OAuthBearer token和验证OAuthBearer token。

此类一共有两个方法,查看此接口的继承关系:

通过名字不难猜到这两个就是Kafka提供的默认的非安全的验证类

OAuthBearerUnsecuredValidatorCallbackHandler
OAuthBearerUnsecuredLoginCallbackHandler

先看OAuthBearerUnsecuredLoginCallbackHandler的configure方法:在saslMechanism是OAUTHBEARER并且jaasConfigEntries存在的情况下为moduleOptions赋值。moduleOptions保存的是JAAS里面的各种配置项。

handleCallback方法:取出配置的选项值,并组成claimsJson和headerJson,封装成OAuthBearerUnsecuredJws对象,把值赋给OAuthBearerTokenCallback。

接下来看OAuthBearerUnsecuredValidatorCallbackHandler:

在handleCallback里对callback里面的值进行校验,如果校验通过则返回OAuthBearerUnsecuredJws,验证成功,否则抛出异常。

3.5 安全验证

Kafka官方文档中说明:

Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol). Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.

分别需要编写两个实现类处理OAuthBearerTokenCallback并分别在:

listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class或者sasl.login.callback.handler.class

3.6 配置

上一节我们讨论了Kafka默认提供的两个实现类,分别实现了AuthenticateCallbackHandler接口,并验证了基于JWT格式的token。受此启发我们可以编写自己的实现类。

1)服务端配置

(A)创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
LoginStringClaim_sub="admin";
};

(B)在启动jvm参数中加入该文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

(C)配置server.properties:

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)实现接口:此处只给出思路,具体验证过程根据业务需求定制,我们定义两个类:

Oauth2AuthenticateLoginCallbackHandler
Oauth2AuthenticateValidatorCallbackHandler
public class Oauth2AuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {

 ... 此处省略无关代码...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerValidatorCallback)
                try {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerValidatorCallback callback) {
        String accessToken = callback.tokenValue();
        if (accessToken == null)
            throw new IllegalArgumentException("Callback missing required token value");

        log.info("Trying to introspect Token!");
        OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);
        log.info("Trying to introspected");

        // Implement Check Expire Token..
        long now = time.milliseconds();
        if (now > token.expirationTime()) {
            OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!");
        }

        log.info("Validated! token..");
        callback.token(token);
    }
}

public class Oauth2AuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {

 ... 此处省略无关代码...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback)
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerTokenCallback callback) {
        if (callback.token() != null)
            throw new IllegalArgumentException("Callback had a toke" +
                    "n already");

        log.info("Try to acquire token!");
        OauthBearerTokenJwt token = OauthHttpClient.login(null);
        log.info("Retrieved token..");
        if (token == null) {
            throw new IllegalArgumentException("Null token returned from server");
        }
        callback.token(token);
    }
}
OauthBearerTokenJwt token = OauthHttpClient.login(null);
OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);

区别在于这两个方法:

  • login方法主要是客户端用自己的信息(可是是?#27809;?#21517;/密码或者token)创建http或者https请求去Oauth服务器申请token,并封装OauthBearerTokenJwt返回。
  • introspectBearer方法利用自己的accessToken去Oauth服务器做验证(查询数据库,验证失效时间等等),验证成功后同样返回OauthBearerTokenJwt。

本人用springboot搭建一个简易的后端用来模拟Oauth服务。

小结

本节介绍了自Kafka2.0版本?#30053;?#30340;SASL/OAUTHBEARER验证以及相关Oauth和JWT技术。分别介绍了Kafka默认的非安全验证方法和正式环境的验证实?#22336;?#27861;。

SASL/OAUTHBEARER可以加密传输验证信息,自定义实现类处理创建/验证token。在此过程中可以对接数据库,便于持久化?#27809;?#26435;限信息。

尚未解决的问题:权限细粒度不知如何控制,对生产数据,消费数据,分组信息的控制暂时没有找到方法。

?#19968;?#27809;有学会写个人说明!

mysql5.7 General tablespace使用说明

上一篇

华为:一个数通老兵的择决

下一篇

你也可能?#19981;?/h4>

迟来的干货 | Kafka权限管理实战

长按储存图像,分享给朋友

ITPUB 每周精要将以?#22987;?#30340;?#38382;?#21457;放至您的邮箱


微信扫一扫

微信扫一扫
百家乐必知技巧 云南快乐十分前3直预测 江西快三最基本走势图 彩之星大发快三 辽宁快乐l2走势图奖 快乐赛计划下载 j2赛马直击 山东十一选五走势图表爱彩乐彩乐 重庆时时杀2码100准 今天广西快乐十分开奖结果查询 欧足联联赛排名系统