下一代MQ中间件,不来了解下?( 二 )

配置environments:
这里需要保证Pulsar Manager应用服务能够访问到Pulsar应用,由于都是通过Docker部署,配置Service URL需要使用网络IP,不要用localhost 。

下一代MQ中间件,不来了解下?

文章插图
管理界面:
下一代MQ中间件,不来了解下?

文章插图
Pulsar与SpringBoot集成
  • springboot version : 2.3.7.RELEASE
  • pulsar client: 2.10.2
  1. 通过Properties简单定义一些Broker相关的属性
@Data@ConfigurationProperties(prefix = "pulsar")public class PulsarProperties {private String cluster;private String namespace;private String serverUrl;private String token;}
  1. 通过配置定义了一些常用的组件,比如生产、消费工厂
@Configuration@EnableConfigurationProperties({PulsarProperties.class})public class PulsarBootstrapConfiguration {private final PulsarProperties properties;public PulsarBootstrapConfiguration(PulsarProperties properties) {this.properties = properties;}@Bean(destroyMethod = "close")public PulsarClient pulsarClient() throws PulsarClientException {ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(properties.getServerUrl());return clientBuilder.build();}@Beanpublic PulsarProducerFactory pulsarProducerFactory() throws PulsarClientException {return new PulsarProducerFactory(pulsarClient(), properties);}@Beanpublic PulsarConsumerFactory pulsarConsumerFactory() throws PulsarClientException {return new PulsarConsumerFactory(pulsarClient(), properties);}}
  1. 启动服务,在服务启动后,通过实现SmartInitializingSingleton接口,完成容器基本启动(不包含Lazy的Bean)后,开始对消费者Consumer监听
【下一代MQ中间件,不来了解下?】@Slf4j@SpringBootApplicationpublic class PulsarApplication implements SmartInitializingSingleton {@Autowiredprivate PulsarConsumerFactory consumerFactory;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class,args);}@Overridepublic void afterSingletonsInstantiated() {startConsumerListener();}private void startConsumerListener(){Consumer<String> consumer = createConsumer();if( consumer != null ){while (!Thread.currentThread().isInterrupted()){CompletableFuture<? extends Message<?>> completableFuture = consumer.receiveAsync();Message<?> message = null;try {message = completableFuture.get();} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("错误",e);} catch (ExecutionException e) {log.error("错误",e);}if( message!=null ){try {log.info(" 接收消息:{} ", message.getValue() );consumer.acknowledge(message);} catch (PulsarClientException e) {consumer.negativeAcknowledge(message);throw new RuntimeException(e);}}}}}private Consumer<String> createConsumer() {try {return consumerFactory.getConsumer(Constants.TOPIC_DEMO);} catch (PulsarClientException e) {log.error("创建consumer出错:{}", e.getMessage(),e);}return null;}}
  1. 消息发送测试
@Slf4j@RunWith(SpringRunner.class)@SpringBootTestpublic class PulsarBootTests {@Autowiredprivate PulsarProducerFactory producerFactory;@Testpublic void sendMessage() throws PulsarClientException {Producer producer = producerFactory.getProducer(Constants.TOPIC_DEMO);producer.send(" 测试消息: " + new Date());producer.close();}}
  1. 检查消息接收情况
 2023-02-05 12:05:14.043INFO 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.ConsumerStatsRecorderImpl: [TOPIC_DEMO] [sub-TOPIC_DEMO] [7c2b2] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 02023-02-05 12:06:16.425INFO 23472 --- [main] com.sucl.pulsar.PulsarApplication:接收消息: 测试消息: Sun Feb 05 12:06:16 CST 2023结束语该篇主要通过官网对Apache Pulsar做了简单的了解与尝试,同时基于SpringBoot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据Pulsar的特性以及官方API使其具有扩展性与易用性 。




推荐阅读