Skip to content

datastax/reactive-pulsar

Repository files navigation

Reactive Pulsar adapter

Reactive Streams adapter for Apache Pulsar Java Client. This uses Project Reactor as the Reactive Streams implementation.

Update: This project is deprecated and replaced by official Reactive client for Apache Pulsar and Spring Pulsar

Please migrate to use Reactive client for Apache Pulsar and Spring Pulsar. Spring Pulsar contains reactive support.

Library status: API is evolving

The API is evolving and the documentation and examples might not match the released version available in Maven central. Please keep this in mind when using the library and the applying the examples.

Presentations about the library

Getting it

This library requires Java 8 or + to run.

With Gradle:

repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId> 
        <version>0.2.1</version>
    </dependency>
</dependencies>

Spring Boot starter

There's a Spring Boot example at https://github.com/lhotari/reactive-pulsar-showcase . Another Spring Boot example is available at https://github.com/lhotari/reactive-iot-backend-ApacheCon2021 .

Getting it with Gradle:

repositories {
    mavenCentral()
}

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-spring-boot-starter:0.2.1"
    testImplementation "com.github.lhotari:reactive-pulsar-spring-test-support:0.2.1"
}

Getting it with Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-spring-boot-starter</artifactId> 
        <version>0.2.1</version>
    </dependency>
  <dependency>
    <groupId>com.github.lhotari</groupId>
    <artifactId>reactive-pulsar-spring-test-support</artifactId>
    <version>0.2.1</version>
    <scope>test</scope>
  </dependency>
</dependencies>

Usage

Initializing the library

In standalone application

Using an existing PulsarClient instance:

ReactivePulsarClient reactivePulsarClient = ReactivePulsarClient.create(pulsarClient);

In Spring Boot application using reactive-pulsar-spring-boot-starter

Configure pulsar.client.serviceUrl property in application properties. Any additional properties under pulsar.client. prefix will be used to configure the Pulsar Client. The Spring Boot starter will configure a ReactivePulsarClient bean which will be available for autowiring.

Sending messages

ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);

Sending messages with cached producer

Add require dependency for cache implementation. This step isn't required when using reactive-pulsar-spring-boot-starter. A ReactiveProducerCache instance will be made available as a Spring bean in that case. However, it is necessary to set the cache on the ReactiveMessageSenderFactory.

With Gradle:

dependencies {
    implementation "com.github.lhotari:reactive-pulsar-adapter:0.2.1"
    implementation "com.github.lhotari:reactive-pulsar-caffeine-producer-cache:0.2.1"
}

With Maven:

<dependencies>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-adapter</artifactId> 
        <version>0.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.github.lhotari</groupId>
        <artifactId>reactive-pulsar-caffeine-producer-cache</artifactId>
        <version>0.2.1</version>
    </dependency>
</dependencies>
CaffeineReactiveProducerCache producerCache = new CaffeineReactiveProducerCache();
ReactiveMessageSender<String> messageSender = reactivePulsarClient
        .messageSender(Schema.STRING)
        .cache(producerCache)
        .topic(topicName)
        .maxInflight(100)
        .build();
Mono<MessageId> messageId = messageSender
        .sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);

It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won't have to be created and closed before and after sending a message.

The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The maxInflight setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.

Reading messages

Reading all messages for a topic:

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .build();
    messageReader.readMessages()
            .map(Message::getValue)
            // for demonstration
            .subscribe(System.out::println);

By default, the stream will complete when end of the topic is reached. The end of the topic is detected with Pulsar Reader's hasMessageAvailableAsync method.

The ReactiveMessageReader doesn't support partitioned topics. It's possible to read the content of indidual partitions. Topic names for individual partitions can be discovered using the PulsarClient's getPartitionsForTopic method. The adapter library doesn't currently wrap that method.

Example: poll for up to 5 new messages and stop polling when a timeout occurs

With .endOfStreamAction(EndOfStreamAction.POLL) the Reader will poll for new messages when the reader reaches the end of the topic.

    ReactiveMessageReader<String> messageReader =
            reactivePulsarClient.messageReader(Schema.STRING)
                    .topic(topicName)
                    .startAtSpec(StartAtSpec.LATEST)
                    .endOfStreamAction(EndOfStreamAction.POLL)
                    .build();
    messageReader.readMessages()
            .take(Duration.ofSeconds(5))
            .take(5)
            // for demonstration
            .subscribe(System.out::println);

Consuming messages

    ReactiveMessageConsumer<String> messageConsumer=
        reactivePulsarClient.messageConsumer(Schema.STRING)
        .topic(topicName)
        .consumerConfigurer(consumerBuilder->consumerBuilder.subscriptionName("sub"))
        .build();
    messageConsumer.consumeMessages(messageFlux ->
                    messageFlux.map(message ->
                            MessageResult.acknowledge(message.getMessageId(), message.getValue())))
        .take(Duration.ofSeconds(2))
        // for demonstration
        .subscribe(System.out::println);

Consuming messages using a message handler component with auto-acknowledgements

ReactiveMessageHandler reactiveMessageHandler=
    ReactiveMessageHandlerBuilder
        .builder(reactivePulsarClient
           .messageConsumer(Schema.STRING)
           .consumerConfigurer(consumerBuilder->
             consumerBuilder.subscriptionName("sub")
            .topic(topicName))
            .build())
        .messageHandler(message -> Mono.fromRunnable(()->{
            System.out.println(message.getValue());
        }))
        .build()
        .start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessageHandler.stop();

License

Reactive Pulsar adapter library is Open Source Software released under the Apache Software License 2.0.

How to Contribute

The library is Apache 2.0 licensed.

Contributions are welcome. Contributors will be asked to sign a CLA before the contributions are merged since there's a desire to be able to move the Reactive Pulsar project under Apache in the future. Without CLAs that process comes complicated.

Bugs and Feature Requests

If you detect a bug or have a feature request or a good idea for Reactive Pulsar adapter, please open a GitHub issue or ping one of the contributors on Twitter or on Pulsar Slack.

Questions

Please use [reactive-pulsar] tag on Stackoverflow. Ask a question now.