Apache Pulsar
Apache Pulsar is a distributed messaging and event streaming platform designed for high-throughput, low-latency data processing across multiple topics.
Add the following dependency to your project by running:
dotnet add package Testcontainers.Pulsar
You can start an Apache Pulsar container instance from any .NET application. Here, we create different container instances and pass them to the base test class, which lets us run the same tests against different configurations.
[UsedImplicitly]
public sealed class PulsarDefaultConfiguration : PulsarContainerTest
{
    public PulsarDefaultConfiguration()
        : base(new PulsarBuilder().Build(), false)
    {
    }
}
This example uses xUnit.net's IAsyncLifetime interface to manage the lifecycle of the container. The container is started in the InitializeAsync method before the test method runs, ensuring that the environment is ready for testing. After the test completes, the container is removed in the DisposeAsync method.
public Task InitializeAsync()
{
    return _pulsarContainer.StartAsync();
}

public Task DisposeAsync()
{
    return _pulsarContainer.DisposeAsync().AsTask();
}

[Fact]
public async Task ConsumerReceivesSendMessage()
{
    // Given
    const string helloPulsar = "Hello, Pulsar!";

    var topic = $"persistent://public/default/{Guid.NewGuid():D}";

    var name = Guid.NewGuid().ToString("D");

    var clientBuilder = PulsarClient.Builder().ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress()));

    if (_authenticationEnabled)
    {
        var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(Timeout.InfiniteTimeSpan);
        _ = clientBuilder.Authentication(new TokenAuthentication(authToken));
    }

    var client = clientBuilder.Build();

    await using var producer = client.NewProducer(Schema.String)
        .Topic(topic)
        .Create();

    await using var consumer = client.NewConsumer(Schema.String)
        .Topic(topic)
        .SubscriptionName(name)
        .InitialPosition(SubscriptionInitialPosition.Earliest)
        .Create();

    // When
    _ = await producer.Send(helloPulsar)
        .ConfigureAwait(true);

    var message = await consumer.Receive()
        .ConfigureAwait(true);

    // Then
    Assert.Equal(helloPulsar, Encoding.Default.GetString(message.Data));
}
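The snippets above are members of the base test class, which is not shown in full. As a minimal sketch, the skeleton below shows how the constructor arguments passed by PulsarDefaultConfiguration could be stored in the _pulsarContainer and _authenticationEnabled fields that the test uses. The parameter names are assumptions, and the test method itself is omitted.

```csharp
using System.Threading.Tasks;
using Testcontainers.Pulsar;
using Xunit;

// Skeleton only: the [Fact] method shown earlier would live inside this class.
public abstract class PulsarContainerTest : IAsyncLifetime
{
    private readonly PulsarContainer _pulsarContainer;

    // Read by the test to decide whether to request an authentication token.
    private readonly bool _authenticationEnabled;

    // Parameter names are illustrative; each configuration class passes its own container.
    protected PulsarContainerTest(PulsarContainer pulsarContainer, bool authenticationEnabled)
    {
        _pulsarContainer = pulsarContainer;
        _authenticationEnabled = authenticationEnabled;
    }

    // Called by xUnit.net before the test runs.
    public Task InitializeAsync()
    {
        return _pulsarContainer.StartAsync();
    }

    // Called by xUnit.net after the test completes.
    public Task DisposeAsync()
    {
        return _pulsarContainer.DisposeAsync().AsTask();
    }
}
```

Keeping the container and the authentication flag in the base class means each configuration class only has to supply a differently built container.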
The test example uses the following NuGet dependencies:
<PackageReference Include="Microsoft.NET.Test.Sdk"/>
<PackageReference Include="coverlet.collector"/>
<PackageReference Include="xunit.runner.visualstudio"/>
<PackageReference Include="xunit"/>
<PackageReference Include="DotPulsar"/>
To execute the tests, use the command dotnet test from a terminal.
Tip
For the complete source code of this example and additional information, please refer to our test projects.
Access Apache Pulsar
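// Broker address (pulsar:// scheme) for Pulsar clients, as used in the test above.
string pulsarBrokerUrl = _pulsarContainer.GetBrokerAddress();
// HTTP address of the Pulsar web service.
string pulsarServiceUrl = _pulsarContainer.GetServiceAddress();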
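The HTTP service URL can be used to talk to Pulsar's admin REST API. As a rough sketch, the helper below takes that URL as a plain string and requests the admin/v2/clusters endpoint, which returns a JSON array of cluster names. PulsarAdminProbe and GetClustersJsonAsync are hypothetical names introduced for illustration and are not part of Testcontainers. The sketch assumes authentication is disabled; with token authentication enabled, the request would presumably need a bearer token header as well.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper, not part of Testcontainers or DotPulsar.
public static class PulsarAdminProbe
{
    // Returns the raw JSON body of GET {serviceUrl}/admin/v2/clusters.
    public static async Task<string> GetClustersJsonAsync(string serviceUrl)
    {
        using var httpClient = new HttpClient { BaseAddress = new Uri(serviceUrl) };

        using var response = await httpClient.GetAsync("admin/v2/clusters")
            .ConfigureAwait(false);

        // Throws HttpRequestException for non-success status codes.
        response.EnsureSuccessStatusCode();

        return await response.Content.ReadAsStringAsync()
            .ConfigureAwait(false);
    }
}
```

With a started container, the helper would be called as await PulsarAdminProbe.GetClustersJsonAsync(pulsarServiceUrl).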
Enable token authentication
To use token authentication, enable it in the builder configuration:
PulsarContainer _pulsarContainer = new PulsarBuilder().WithAuthentication().Build();
Start the container and obtain an authentication token with a specified expiration time:
var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(TimeSpan.FromHours(1))
    .ConfigureAwait(false);
Alternatively, set the token to never expire:
var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(Timeout.InfiniteTimeSpan)
    .ConfigureAwait(false);
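Once a token has been created, it has to be handed to the Pulsar client. The following sketch wraps the client setup from the test in a small factory that attaches the token only when one is supplied. PulsarClientFactory is a hypothetical name. The sketch uses DotPulsar's AuthenticationFactory.Token instead of constructing TokenAuthentication directly, on the assumption that the two are equivalent.

```csharp
#nullable enable
using System;
using DotPulsar;
using DotPulsar.Abstractions;

// Hypothetical helper, not part of Testcontainers or DotPulsar.
public static class PulsarClientFactory
{
    // brokerAddress: the pulsar:// address of the container.
    // authToken: a token from CreateAuthenticationTokenAsync, or null when authentication is disabled.
    public static IPulsarClient Create(string brokerAddress, string? authToken = null)
    {
        var builder = PulsarClient.Builder().ServiceUrl(new Uri(brokerAddress));

        if (!string.IsNullOrEmpty(authToken))
        {
            // The builder is configured in place, as in the test, so the return value is discarded.
            _ = builder.Authentication(AuthenticationFactory.Token(authToken));
        }

        return builder.Build();
    }
}
```

A test could then create its client with PulsarClientFactory.Create(_pulsarContainer.GetBrokerAddress(), authToken) and leave the token out for containers without authentication.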
Enable Pulsar Functions
To use Pulsar Functions, enable the functions worker in the builder configuration:
PulsarContainer _pulsarContainer = new PulsarBuilder().WithFunctionsWorker().Build();