Using Pulsar from Java

Reader submission, 2022-10-08



Introduction

Use pulsar-flink-connector to read the metadata of the namespaces and topics in Apache Pulsar.

pulsar-flink-connector on GitHub: https://github.com/streamnative/pulsar-flink

Maven

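<dependency>
    <groupId>io.streamnative.connectors</groupId>
    <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
    <version>2.7.3</version>
</dependency>

<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray-streamnative-maven</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>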

CODE

Fetching metadata with PulsarMetadataReader

package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Test.
 *
 * @author levi
 * @version 1.0
 **/
public class Test {

    public static void main(String[] args) {
        final ClientConfigurationData configurationData = new ClientConfigurationData();
        configurationData.setServiceUrl("pulsar://127.0.0.1:6650");

        // Your Pulsar token
        final AuthenticationToken token =
                new AuthenticationToken(
                        "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
        configurationData.setAuthentication(token);

        try (final PulsarMetadataReader reader =
                     new PulsarMetadataReader("http://127.0.0.1:8443",
                             configurationData,
                             "",
                             new HashMap<>(),
                             -1,
                             -1)) {
            // List the namespaces
            final List<String> namespaces = reader.listNamespaces();
            System.out.println("namespaces: " + namespaces.toString());

            for (final String namespace : namespaces) {
                // List the topics in this namespace
                final List<String> topics = reader.getTopics(namespace);
                System.out.println("topic: " + topics.toString());

                for (String topic : topics) {
                    // Fetch the SchemaInfo that describes the topic's fields
                    final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);

                    final String name = schemaInfo.getName();
                    System.out.println("SchemaName:" + name); // topicName

                    final SchemaType type = schemaInfo.getType();
                    System.out.println("SchemaType:" + type.toString()); // "JSON"...

                    final Map<String, String> properties = schemaInfo.getProperties();
                    System.out.println(properties);

                    final String schemaDefinition = schemaInfo.getSchemaDefinition();
                    System.out.println(schemaDefinition); // Field info.
                }
            }
        } catch (IOException | PulsarAdminException e) {
            e.printStackTrace();
        }
    }
}
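
The topic names printed by the loop above are usually fully qualified, in the form persistent://tenant/namespace/topic. To group or filter them, it can help to split a name into its parts. The following is a minimal sketch using only the JDK; TopicNameSketch and its parse method are hypothetical helpers written for this illustration, not part of pulsar-flink or the Pulsar client, and the sketch assumes names in the domain://tenant/namespace/topic form without a cluster segment.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not a Pulsar API): splits a fully qualified topic name. */
final class TopicNameSketch {
    final String domain;    // "persistent" or "non-persistent"
    final String tenant;    // first path segment
    final String namespace; // "tenant/namespace"
    final String localName; // topic name within the namespace

    private TopicNameSketch(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    /** Assumes the form domain://tenant/namespace/topic. */
    static TopicNameSketch parse(String fullName) {
        int sep = fullName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + fullName);
        }
        String domain = fullName.substring(0, sep);
        // Limit of 3 keeps any further '/' characters inside the local name.
        String[] parts = fullName.substring(sep + 3).split("/", 3);
        if (parts.length < 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in: " + fullName);
        }
        return new TopicNameSketch(domain, parts[0], parts[0] + "/" + parts[1], parts[2]);
    }

    public static void main(String[] args) {
        // Sample names made up for the illustration.
        List<String> topics = Arrays.asList(
                "persistent://public/default/orders",
                "non-persistent://acme/dev/clicks");
        for (String topic : topics) {
            TopicNameSketch n = parse(topic);
            System.out.println(n.domain + " | " + n.namespace + " | " + n.localName);
        }
    }
}
```

Inside the inner loop of the example above, one could call TopicNameSketch.parse(topic) before fetching the schema, for instance to skip non-persistent topics.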

