【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

网友投稿 277 2022-10-05

【Azure 事件中心】使用Azure AD认证方式创建Event Hub Consume Client + 自定义Event Position

问题描述

当使用SDK连接到Azure Event Hub时,最常规的方式为使用连接字符串。这种做法参考官网文档就可成功完成代码:​​AD认证方式进行访问,代码需要如何修改呢? 如何来使用AAD的 TokenCredential呢?

分析问题

在使用Connection String的时候,EventProcessorClientBuilder使用connectionString方法配置连接字符串。

如果使用Azure AD认证,则需要先根据AAD中注册应用获取到Client ID, Tenant ID, Client Secret,然后把这些内容设置为系统环境变量

AZURE_TENANT_ID :对应AAD 注册应用页面的 Tenant IDAZURE_CLIENT_ID :对应AAD 注册应用页面的 Application (Client) IDAZURE_CLIENT_SECRET :对应AAD 注册应用的 Certificates & secrets 中创建的Client Secrets

然后使用 credential 初始化 EventProcessorClientBuilder 对象

注意点:

1) DefaultAzureCredentialBuilder 需要指定 Authority Host为 Azure China

2) EventProcessorClientBuilder . Credential 方法需要指定Event Hub Namespce 的域名

操作实现

第一步:为AAD注册应用赋予操作Event Hub Data的权限

Azure 提供了以下 Azure 内置角色,用于通过 Azure AD 和 OAuth 授予对事件中心数据的访问权限:

Azure 事件中心数据所有者 (Azure Event Hubs Data Owner): 使用此角色可以授予对事件中心资源的完全访问权限。Azure 事件中心数据发送者 (Azure Event Hubs Data Sender) : 使用此角色可以授予对事件中心资源的发送访问权限。Azure 事件中心数据接收者 (Azure Event Hubs Data Receiver): 使用此角色可以授予对事件中心资源的使用/接收访问权限。

本实例中,只需要接收数据,所以只赋予了 Azure Event Hubs Data Receiver权限。

第二步:在Java 项目中添加SDK依赖

添加在pom.xml文件中 dependencies 部分的内容为:azure-identity , azure-messaging-eventhubs , azure-messaging-eventhubs-checkpointstore-blob,最好都是用最新版本,避免出现运行时出现类型冲突或找不到

"; // private static final String eventHubName = ""; // private static final String storageConnectionString = ""; // private static final String storageContainerName = ""; public static void main(String[] args) throws IOException { System.out.println("Hello World!"); // String connectionString =""; // The fully qualified namespace for the Event Hubs instance. This is likely to // be similar to: // {your-namespace}.servicebus.windows.net // String fullyQualifiedNamespace =".servicebus.chinacloudapi.cn"; // String eventHubName = ""; String storageConnectionString = System.getenv("storageConnectionString"); String storageContainerName = System.getenv("storageContainerName"); String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace"); String eventHubName = System.getenv("eventHubName"); TokenCredential credential = new DefaultAzureCredentialBuilder().authorityHost(AzureAuthorityHosts.AZURE_CHINA) .build(); // Create a blob container client that you use later to build an event processor // client to receive and process events BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder() .connectionString(storageConnectionString) .containerName(storageContainerName) .buildAsyncClient(); // EventHubProducerClient // EventHubProducerClient client = new EventHubClientBuilder() // .credential(fullyQualifiedNamespace, eventHubName, credential) // .buildProducerClient(); Map initialPartitionEventPosition = new HashMap<>(); initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000)); // EventProcessorClientBuilder // Create a builder object that you will use later to build an event processor // client to receive and process events and errors. EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder() // .connectionString(connectionString, eventHubName) .credential(fullyQualifiedNamespace, eventHubName, credential) .initialPartitionEventPosition(initialPartitionEventPosition) .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME) .processEvent(PARTITION_PROCESSOR) .processError(ERROR_HANDLER) .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient)); // EventPosition.f // Use the builder object to create an event processor client EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient(); System.out.println("Starting event processor"); eventProcessorClient.start(); System.out.println("Press enter to stop."); System.in.read(); System.out.println("Stopping event processor"); eventProcessorClient.stop(); System.out.println("Event processor stopped."); System.out.println("Exiting process"); } public static final Consumer PARTITION_PROCESSOR = eventContext -> { PartitionContext partitionContext = eventContext.getPartitionContext(); EventData eventData = eventContext.getEventData(); System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n", partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString()); // Every 10 events received, it will update the checkpoint stored in Azure Blob // Storage. if (eventData.getSequenceNumber() % 10 == 0) { eventContext.updateCheckpoint(); } }; public static final Consumer ERROR_HANDLER = errorContext -> { System.out.printf("Error occurred in partition processor for partition %s, %s.%n", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable()); };}

附录:自定义设置 Event Position,当程序运行时,指定从Event Hub中获取消息的 Sequence Number

使用EventPosition对象中的fromSequenceNumber方法,可以指定一个序列号,Consume端会根据这个号码获取之后的消息。其他的方法还有 fromOffset(指定游标) / fromEnqueuedTime(指定一个时间点,获取之后的消息) / earliest(从最早开始) / latest(从最后开始获取新的数据,旧数据不获取)

Map initialPartitionEventPosition = new HashMap<>();initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));

注意:

Map 中的String 对象为Event Hub的分区ID,如果Event Hub有2个分区,则它的值分别时0,1.

EventPosition 设置的值,只在Storage Account所保存在CheckPoint Store中的值没有,或者小于此处设定的值时,才会起效果。否则,Consume 会根据从Checkpoint中获取的SequenceNumber为准。

参考资料

授予对 Azure 事件中心的访问权限 :​​Azure Active Directory 访问事件中心资源的应用程序进行身份验证 : ​​Java 向/从 Azure 事件中心 (azure-messaging-eventhubs) 发送/接收事件 : ​​云中,恰是如此!

分类: ​​【Azure 事件中心】​​

标签: ​​Azure Developer​​, ​​事件中心 Azure Event Hub​​, ​​JAVA Event Hub SDK​​, ​​credential(..., .., credential)​​, ​​EventPosition.fromSequenceNumber(3000)​​

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java中 ? extends T 和 ? super T的理解
下一篇:云图说丨初识华为云微服务引擎CSE
相关文章

 发表评论

暂时没有评论,来抢沙发吧~