百家乐必知技巧
大數據

遲來的干貨 | Kafka權限管理實戰

廣告
廣告

今年有很多小伙伴在公眾號或者微信留言問能不能整一篇有關Kafka權限管理的文章,迫于工作關系,這個需求一直拖后。后來知道我的好友兼Kafka高玩 —— 【一歲小寶 】他一直在弄這一塊的東西,所以我就厚著face皮去催他整一篇,也就是你們接下來看到的內容。本文原文地址:https://www.jianshu.com/p/09129c9f4c80。

有關Kafka權限管理的資料可謂是少之又少,當你遇到這塊內容的時候,如果靠自己摸索的話會耗費很長的時間。如果現在還沒有接觸到這一塊,也建議收藏本文,以備不時之需。

本片篇幅很大,在編輯本文的時候,右下角字數統計18000字(算上了英文字符,tx的字數統計有點怪異),建議先馬后看。

一、概述

1、Kafka的權限分類

1)、身份認證(Authentication):對client 與服務器的連接進行身份認證,brokers和zookeeper之間的連接進行Authentication(producer 和 consumer)、其他 brokers、tools與 brokers 之間連接的認證。

2)、權限控制(Authorization):實現對于消息級別的權限控制,clients的讀寫操作進行Authorization:(生產/消費/group)數據權限。

2、實現方式

自0.9.0.0版本開始Kafka社區添加了許多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL兩種安全策略。SSL方式主要是通過CA令牌實現,此文主要介紹SASL方式。

1)SASL驗證:

驗證方式Kafka版本特點
SASL/PLAIN0.10.0.0不能動態增加用戶
SASL/SCRAM0.10.2.0可以動態增加用戶
SASL/Kerberos0.9.0.0需要獨立部署驗證服務
SASL/OAUTHBEARER2.0.0需自己實現接口實現token的創建和驗證,需要額外Oauth服務

2)SSL加密: 使用SSL加密在代理和客戶端之間,代理之間或代理和工具之間傳輸的數據。

二、使用SASL進行身份驗證

1、SASL/PLAIN驗證

1.1 版本

相關包版本下載地址
Kafka0.11.0.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

1.2 Zookeeper配置

1)修改zoo.cfg增加兩行配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

2)配置JAAS文件:

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
};

說明:定義了需要鏈接到Zookeeper服務器的用戶名和密碼

3)加入需要的包:

kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar

如果沒有引入對應的kafka包,啟動kafka時會報找不到org.apache.kafka.common.security.plain.PlainLoginModule包的錯誤。

Zookeeper引入加載包需改Zookeeper腳本zkEnv.sh,在最后加入:

for i in "$ZOOBINDIR"/../for_sasl/*.jar; do
  CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

將for_sasl目錄下的所有jar文件追加到CLASSPATH變量,再設置一個JVM參數給SERVER_JVMFLAGS變量,這兩個變量都會在Zookeeper啟動時傳給JVM。

4)啟動Zookeeper

bin/zkServer.sh start

1.3 Kafka服務端配置

1)kafka增加認證信息:

創建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};

user_XXX 為自定義的用戶,所有可以使用的用戶必須在此定義,不能再之后新增。

  • producer用于開放生產權限。
  • consumer用于開放消費權限。

JAAS文件定義了鏈接Kafka Broker時所需要的用戶名密碼及broker各個節點之間相互通信的用戶名密碼:

  • username/ password:broker之間通信使用的用戶名密碼。
  • user_admin/user_producer/user_consumer:客戶端(管理員、生產者、消費者)鏈接broker時所使用到的用戶名密碼。

2)配置server.properties

listeners=SASL_PLAINTEXT://主機名稱:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true     //當沒有找到ACL配置時,允許所有的訪問操作。

3)修改啟動腳本

exec $base_dir/kafka-run-class.sh 
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "[email protected]"

1.4 Kafka客戶端端配置

1)創建JAAS文件:消費者:kafka_client_consumer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

生產者:kafka_client_producer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

2)修改客戶端配置信息:分別在producer.properties和consumer.properties添加認證機制

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

consumer.properties中額外加入分組配置

group.id=test-group

3)修改客戶端腳本指定JAAS文件加載:

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

1.5 授權

此時已經完成了基本的配置,但是如果測試可以發現無論是生產還是消費,都不被允許。最后一步,我們需要為設置的賬號授權。

1)創建主題:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test

2)增加生產權限:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
 --authorizer-properties zookeeper.connect=localhost:2181 
--add --allow-principal User:producer --operation Write --topic test

3)配置消費權限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:consumer --operation Read --topic test

4)配置消費分組權限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer --operation Read --group test-group

5)生產數據:

./kafka-console-producer-acl.sh --broker-list 127.0.0.1:9092 --topic test

6)消費數據:

bin/kafka-console-consumer-acl.sh --bootstrap-server  127.0.0.1:9092 
--topic test --from-beginning --consumer.config ./config/consumer.properties

我們可以查看配置的權限信息:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

Current ACLs for resource Topic:test: User:producer has Allow permission for operations: Write from hosts: * User:consumer has Allow permission for operations: Read from hosts: * Current ACLs for resource Group:test-group: User:consumer has Allow permission for operations: Read from hosts: *

1.6 JAVA客戶端

在消費者/生產者初始化屬性中引入JAAS文件:

static{
System.setProperty("java.security.auth.login.config","D://demoPeoject//JpaTest//src//main//resources//kafka_client_scram_consumer_jaas.conf");
}
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256");

2、SASL/SCRAM驗證

上一節,我們通過配置SASL/PLAIN驗證,實現了對Kafka的權限控制。但SASL/PLAIN驗證有一個問題:只能在JAAS文件KafkaServer中配置用戶,一但Kafka啟動,無法動態新增用戶。

SASL/SCRAM驗證可以動態新增用戶并分配權限。

2.1 版本

同上文

2.2 啟動Zookeeper和Kafka

此方法是把憑證(credential)存儲在Zookeeper,可以使用kafka-configs.sh在Zookeeper中創建憑據。對于每個SCRAM機制,必須添加具有機制名稱的配置來創建憑證,在啟動Kafka broker之前創建代理間通信的憑據。

所以第一步,在沒有設置任何權限的配置下啟動Kafka和Zookeeper。

2.3 創建SCRAM證書

1)創建broker建通信用戶:admin(在使用sasl之前必須先創建,否則啟動報錯)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)創建生產用戶:producer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

3)創建消費用戶:consumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consume

SCRAM-SHA-256/SCRAM-SHA-512是對密碼加密的算法,二者有其一即可。

2.4 查看SCRAM證書

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

2.5 刪除SCRAM證書

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' 
--delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

2.6 服務端配置

在用戶證書創建完畢之后開始Kafka服務端的配置:

1)創建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)將JAAS配置文件位置作為JVM參數傳遞給每個Kafka Broker

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "[email protected]"

3)配置server.properties:

認證配置
listeners=SASL_PLAINTEXT://主機名稱:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

可以根據自己的需求選擇SASL_SSL或SASL_PLAINTEXT, PLAINTEXT為不加密明文傳輸,性能好與SSL

4)重啟Kafka和Zookeeper

2.7 客戶端配置

1)為我們創建的三個用戶分別創建三個JAAS文件:

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};

2)修改啟動腳本引入JAAS文件:

以生產者為例:

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_client_scram_producer_jaas.conf

3)ACL權限配置:此時如果我們生產數據則會發生如下錯誤:

上文2.3節中我們創建了三個用戶,但是還未對其賦予操作權限,接下來我們為其增加權限。

生產者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:producer --operation Write --topic test --allow-host 192.168.2.*

消費者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer--operation Read --topic test --allow-host 192.168.2.*

為生產者增加分組權限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:producer --operation Read --group test-group --allow-host 192.168.2.*

分配權限之后就可以進行生產,消費操作了。(其他顆粒度的權限設置請參考官方文檔)

4)查看權限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

2.8 小結

SASL/SCRAM驗證方法可以在Kafka服務啟動之后,動態的新增用戶分并配權限,在業務變動頻繁,開發人員多的情況下比SASL/PLAIN方法更加靈活。

3. SASL/OAUTHBEARER

SASL/OAUTHBEARER驗證是Kafka2.0版中新增的驗證方式:

KIP-255 adds a framework for authenticating to Kafka brokers using OAuth2 bearer tokens. The SASL/ OAuthBurer implementation is customizable using callbacks for token retrieval and validation.

KIP-255增加了一個使用OAuth2承載令牌對KafkaBroker進行認證的框架。SASL/OAuthBurer實現可使用回調進行令牌檢索和驗證。

3.1 Oauth2.0驗證

OAuth(開放授權)是一個開放標準,允許用戶授權第三方移動應用訪問他們存儲在另外的服務提供者上的信息,而不需要將用戶名和密碼提供給第三方移動應用或分享他們數據的所有內容,OAuth2.0是OAuth協議的延續版本。

Oauth2.0的工作流程如下圖:

(A)用戶打開客戶端以后,客戶端要求用戶給予授權。

(B)用戶同意給予客戶端授權。

(C)客戶端使用上一步獲得的授權,向認證服務器申請令牌。

(D)認證服務器對客戶端進行認證以后,確認無誤,同意發放令牌。

(E)客戶端使用令牌,向資源服務器申請獲取資源。

(F)資源服務器確認令牌無誤,同意向客戶端開放資源。

3.2 JWT規范

Oauth2.0返回驗證令牌使用了JWT格式:JSON Web Token(縮寫 JWT)是目前最流行的跨域認證解決方案,JWT 的原理是,服務器認證以后,生成一個 JSON 對象,發回給用戶。

JWT由三個部分組成,分別是:

  • Header(頭部)
  • Claims(載荷)
  • Signature(簽名)

1)載荷

{
"sub": "1",
"iss": "http://localhost:8000/auth/login",
"iat": 1451888119,
"exp": 1454516119,
"jti": "37c107e4609ddbcc9c096ea5ee76c667"
"nbf": 1451888119
}

sub:該JWT所面向的用戶 iss:該JWT的簽發者 iat(issued at):在什么時候簽發的token exp(expires):token什么時候過期 nbf(not before):token在此時間之前不能被接收處理 jti:JWT ID為web token提供唯一標識

2)頭部

{
"typ": "JWT",
"alg": "HS256"
}

typ:指明數據格式 alg:指明加密算法

3)簽名 以上兩個部分base64編碼之后鏈接成一個字符串,用HS256算法進行加密。在加密的時候,我們還需要提供一個密鑰(secret):

HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret
)

算出簽名以后,把 Header、Payload、Signature 三個部分用”點”(.)分割拼成一個字符串,就是最后的結果。

完整JWT = 載荷 . 頭部 . 簽名

3.3 版本

相關包版本下載地址
Kafka2.12-2.2.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

Kafka2.0以上版本才支持此種驗證

3.4 默認驗證方式(非安全驗證)

1)服務端配置:

(A)創建用于broker端通信的JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
unsecuredValidatorRequiredScope="LOGIN_TO_KAFKA"
unsecuredValidatorAllowableClockSkewMs="3000";
};

參數說明:

選項說明
unsecuredValidatorPrincipalClaimName=”value”默認為“sub”,如果需要修改在此定義
unsecuredValidatorScopeClaimName=”value”數值類型
unsecuredValidatorRequiredScope=”value”scope范圍值,可配置字符串或者List
unsecuredValidatorAllowableClockSkewMs=”value”允許的時間偏差。單位:秒,默認0

舉例:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="admin";
};

admin用戶作為sub的主題用戶用于kafka的broker之間通訊。

(B)在啟動jvm參數中加入該文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 

(C)配置server.properties

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)客戶端配置:

(A)配置producer.properties/consumer.properties

security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER

(B)創建客戶端通信的JAAS文件(或配置文件中增加配置項):

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="alice";

JAAS文件樣本:

KafkaClient {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope="|scopeValue1|scopeValue2"
unsecuredLoginLifetimeSeconds="60";
};

參數說明:

選項說明
unsecuredLoginStringClaim_<claimname>=”value”創建一個值為“value”的claim(不能為iat/exp)
unsecuredLoginNumberClaim_<claimname>=”value”數值類型
unsecuredLoginListClaim_<claimname>=”value”List類型
unsecuredLoginPrincipalClaimName默認為“sub”,如果需要修改在此定義
unsecuredLoginLifetimeSeconds超時時間單位:秒,默認3600
unsecuredLoginScopeClaimName默認為“scope”,如果需要修改在此定義

優先級:配置文件 > JAAS文件

(C)驗證過程:

在3.4中我們列出了Kafka提供的一套非安全的驗證用于非正式環境,通過創建和驗證JWT實現驗證。只要配置對應JAAS文件即可,本節我們來看一下驗證原理。

通過閱讀官方文檔了解到Kafka提供了一個接口,通過實現接口,創建OAuthBearer token和驗證OAuthBearer token。

此類一共有兩個方法,查看此接口的繼承關系:

通過名字不難猜到這兩個就是Kafka提供的默認的非安全的驗證類

OAuthBearerUnsecuredValidatorCallbackHandler
OAuthBearerUnsecuredLoginCallbackHandler

先看OAuthBearerUnsecuredLoginCallbackHandler的configure方法:在saslMechanism是OAUTHBEARER并且jaasConfigEntries存在的情況下為moduleOptions賦值。moduleOptions保存的是JAAS里面的各種配置項。

handleCallback方法:取出配置的選項值,并組成claimsJson和headerJson,封裝成OAuthBearerUnsecuredJws對象,把值賦給OAuthBearerTokenCallback。

接下來看OAuthBearerUnsecuredValidatorCallbackHandler:

在handleCallback里對callback里面的值進行校驗,如果校驗通過則返回OAuthBearerUnsecuredJws,驗證成功,否則拋出異常。

3.5 安全驗證

Kafka官方文檔中說明:

Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol). Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.

分別需要編寫兩個實現類處理OAuthBearerTokenCallback并分別在:

listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class或者sasl.login.callback.handler.class

3.6 配置

上一節我們討論了Kafka默認提供的兩個實現類,分別實現了AuthenticateCallbackHandler接口,并驗證了基于JWT格式的token。受此啟發我們可以編寫自己的實現類。

1)服務端配置

(A)創建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
LoginStringClaim_sub="admin";
};

(B)在啟動jvm參數中加入該文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

(C)配置server.properties:

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)實現接口:此處只給出思路,具體驗證過程根據業務需求定制,我們定義兩個類:

Oauth2AuthenticateLoginCallbackHandler
Oauth2AuthenticateValidatorCallbackHandler
public class Oauth2AuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {

 ... 此處省略無關代碼...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerValidatorCallback)
                try {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerValidatorCallback callback) {
        String accessToken = callback.tokenValue();
        if (accessToken == null)
            throw new IllegalArgumentException("Callback missing required token value");

        log.info("Trying to introspect Token!");
        OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);
        log.info("Trying to introspected");

        // Implement Check Expire Token..
        long now = time.milliseconds();
        if (now > token.expirationTime()) {
            OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!");
        }

        log.info("Validated! token..");
        callback.token(token);
    }
}

public class Oauth2AuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {

 ... 此處省略無關代碼...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback)
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerTokenCallback callback) {
        if (callback.token() != null)
            throw new IllegalArgumentException("Callback had a toke" +
                    "n already");

        log.info("Try to acquire token!");
        OauthBearerTokenJwt token = OauthHttpClient.login(null);
        log.info("Retrieved token..");
        if (token == null) {
            throw new IllegalArgumentException("Null token returned from server");
        }
        callback.token(token);
    }
}
OauthBearerTokenJwt token = OauthHttpClient.login(null);
OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);

區別在于這兩個方法:

  • login方法主要是客戶端用自己的信息(可是是用戶名/密碼或者token)創建http或者https請求去Oauth服務器申請token,并封裝OauthBearerTokenJwt返回。
  • introspectBearer方法利用自己的accessToken去Oauth服務器做驗證(查詢數據庫,驗證失效時間等等),驗證成功后同樣返回OauthBearerTokenJwt。

本人用springboot搭建一個簡易的后端用來模擬Oauth服務。

小結

本節介紹了自Kafka2.0版本新增的SASL/OAUTHBEARER驗證以及相關Oauth和JWT技術。分別介紹了Kafka默認的非安全驗證方法和正式環境的驗證實現方法。

SASL/OAUTHBEARER可以加密傳輸驗證信息,自定義實現類處理創建/驗證token。在此過程中可以對接數據庫,便于持久化用戶權限信息。

尚未解決的問題:權限細粒度不知如何控制,對生產數據,消費數據,分組信息的控制暫時沒有找到方法。

我還沒有學會寫個人說明!

mysql5.7 General tablespace使用說明

上一篇

華為:一個數通老兵的擇決

下一篇

你也可能喜歡

遲來的干貨 | Kafka權限管理實戰

長按儲存圖像,分享給朋友

ITPUB 每周精要將以郵件的形式發放至您的郵箱


微信掃一掃

微信掃一掃
百家乐必知技巧 网球比分直播郑玄 澳洲幸运10计划 足彩进球彩推荐 青海快3开奖结果走势图 北单比分直播 山西快乐10分钟开奖结果 cba赛程2014 dd彩票首页 试机号759历史记录查询 福建十一选五手机版