Building Kafka Consumer & Producer in .NET

Table of Contents
- Introduction
- What You’ll Learn
- Setting Up Kafka in .NET
- Abstract Kafka Consumer with BackgroundService
- Code Example (Simplified)
- 🧠 Health Check
- ⛵ Controlled Concurrency with Semaphore
- 🔁 Retry Logic for Processing Failures
- 📊 Metrics Tracking
- 🧼 Graceful Shutdown and Cleanup
- 🧪 Summary
- Full Consumer Implementation
- Producer Example
- Real-World Use Case: Analytics Event Tracker
- Conclusion
- Next Steps
Introduction
Apache Kafka is a powerful event streaming platform used to handle high-throughput and low-latency data pipelines. In modern .NET microservices, Kafka acts as the backbone for asynchronous messaging. In this post, we will walk through how to build a robust Kafka consumer and producer in .NET using the Confluent.Kafka library — with detailed explanations and real-world implementation.
What You’ll Learn
- Setting up the Kafka configuration
- Building a reusable Kafka consumer base class
- Implementing parallel processing with throttling
- Handling errors and retries gracefully
- Health checking and metrics
- Best practices with IServiceProvider and BackgroundService
Setting Up Kafka in .NET
Install the NuGet package:
dotnet add package Confluent.Kafka
Ensure you have access to a Kafka cluster (e.g., Confluent Cloud or a local KRaft setup). Add the required settings in your appsettings.json or configuration provider:
"Kafka": {
"BootstrapServers": "your-cluster:9092",
"SaslUsername": "your-username",
"SaslPassword": "your-password"
}
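These settings can be read directly via configuration["Kafka:BootstrapServers"], which is what the consumer below does. If you prefer strongly typed access, here is a minimal sketch of binding the section to an options class (the KafkaOptions name is ours, not part of the library):

public class KafkaOptions
{
    public string BootstrapServers { get; set; } = string.Empty;
    public string SaslUsername { get; set; } = string.Empty;
    public string SaslPassword { get; set; } = string.Empty;
}

// Program.cs: bind the "Kafka" section once, then inject IOptions<KafkaOptions> anywhere
builder.Services.Configure<KafkaOptions>(builder.Configuration.GetSection("Kafka"));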
Abstract Kafka Consumer with BackgroundService
Let’s break down the consumer base class (shown in full later in this post) step by step:
Step 1: Constructor Setup
protected KafkaConsumerBase(...)
Here we:
- Read the Kafka configuration
- Initialize the consumer via ConsumerBuilder
- Store the topic name (the actual Subscribe call happens in ExecuteAsync)
- Configure throttling for max concurrent message processing
Step 2: ExecuteAsync
This method runs continuously while the service is active. It:
- Subscribes to Kafka
- Polls for messages
- Manages active processing tasks
- Throttles concurrent workers
💡 Real-world case: Your document service publishes a message whenever a user creates a new document. Your analytics service consumes that message to track user behavior.
Step 3: Controlled Parallelism
_throttler = new SemaphoreSlim(maxConcurrentProcessing);
You control how many messages can be processed at once. This avoids overloading the system.
Step 4: Retry Logic
await ProcessMessageWithRetryAsync(...)
Each message is retried with exponential backoff before being marked as failed. This improves resilience when handling transient errors.
Step 5: Metrics and Health
You track:
- Total messages processed
- Failed messages
- Health via heartbeat (_lastSuccessfulConsume)
Useful for dashboards, Prometheus exporters, etc.
Code Example (Simplified)
public class MyKafkaConsumer : KafkaConsumerBase<MyEvent>
{
    public MyKafkaConsumer(...) : base(...) { }

    protected override async Task ProcessMessage(string message, IServiceProvider serviceProvider)
    {
        var myService = serviceProvider.GetRequiredService<IMyService>();
        // Guard against malformed payloads; Deserialize can return null
        var data = JsonSerializer.Deserialize<MyEvent>(message)
                   ?? throw new InvalidOperationException("Could not deserialize event payload.");
        await myService.HandleAsync(data);
    }
}
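Because the base class derives from BackgroundService, a concrete consumer is wired up as a hosted service. A minimal registration sketch (assuming the derived constructor supplies its own topicName and groupId, so DI can construct it):

// Program.cs
builder.Services.AddSingleton<MyKafkaConsumer>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<MyKafkaConsumer>());

Registering the consumer as a singleton first lets other components (health checks, metrics endpoints) resolve the same instance later.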
🧠 Health Check
private volatile bool _isHealthy = true;
private DateTime _lastSuccessfulConsume = DateTime.UtcNow;
Health status is updated when errors or timeouts occur. We define a method to check if the consumer is healthy:
public bool IsHealthy() =>
    _isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);
✅ Purpose: This allows external services (e.g., monitoring tools like Prometheus) to determine whether the consumer is healthy.
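For ASP.NET Core's health check middleware, IsHealthy() can be wrapped in an IHealthCheck. A sketch, assuming the consumer was registered as a resolvable singleton as shown earlier (the KafkaConsumerHealthCheck name is ours):

using Microsoft.Extensions.Diagnostics.HealthChecks;

public class KafkaConsumerHealthCheck : IHealthCheck
{
    private readonly MyKafkaConsumer _consumer;
    public KafkaConsumerHealthCheck(MyKafkaConsumer consumer) => _consumer = consumer;

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default) =>
        Task.FromResult(_consumer.IsHealthy()
            ? HealthCheckResult.Healthy("Consumer is polling normally.")
            : HealthCheckResult.Unhealthy("No recent consume or a fatal Kafka error occurred."));
}

// Registration:
builder.Services.AddHealthChecks().AddCheck<KafkaConsumerHealthCheck>("kafka-consumer");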
⛵ Controlled Concurrency with Semaphore
private readonly SemaphoreSlim _throttler;
The throttler limits how many messages can be processed in parallel:
_throttler = new SemaphoreSlim(maxConcurrentProcessing);
We wait to acquire a slot before processing a message:
bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);
After processing:
_throttler.Release();
🎯 Why? Without a semaphore, you risk consuming more messages than your system can handle simultaneously.
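The acquire/release pair belongs in try/finally so a slot is always returned, even when processing throws. The pattern in isolation:

await _throttler.WaitAsync(stoppingToken);
try
{
    await ProcessMessageWithRetryAsync(message, scope.ServiceProvider);
}
finally
{
    _throttler.Release(); // always return the slot, even on failure
}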
🔁 Retry Logic for Processing Failures
private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)
Implements exponential backoff for failed message processing attempts:
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
After 3 attempts, it logs an error and rethrows the exception.
📌 Note: This is critical in distributed systems where transient failures are common.
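If you already use Polly, the same hand-rolled loop can be expressed declaratively. A sketch with Polly v7's retry API (an alternative, not part of this post's code):

var retryPolicy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(
        retryCount: 3,
        sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
        onRetry: (ex, delay, attempt, _) =>
            _logger.LogWarning($"Retry {attempt}/3 after {delay.TotalSeconds}s. Error: {ex.Message}"));

await retryPolicy.ExecuteAsync(() => ProcessMessage(message, serviceProvider));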
📊 Metrics Tracking
private long _totalMessagesProcessed = 0;
private long _failedMessages = 0;
Helps monitor performance and stability. This is useful for dashboards or alerts.
_logger.LogInformation($"Processed {_totalMessagesProcessed} messages total...");
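With the consumer registered as a resolvable singleton (see the registration sketch earlier), these counters are easy to expose, for example through a minimal API endpoint (the route name is illustrative):

app.MapGet("/internal/consumer-metrics", (MyKafkaConsumer consumer) => consumer.GetMetrics());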
🧼 Graceful Shutdown and Cleanup
The consumer waits for all background tasks to complete before shutting down:
await Task.WhenAll(tasksToWait);
_consumer.Close();
This ensures no messages are lost and offsets are properly committed.
🧪 Summary
This advanced Kafka consumer pattern in .NET offers:
- ✅ Health checks for monitoring
- 🚥 Concurrency control with SemaphoreSlim
- 🔁 Automatic retry logic
- 📊 Metrics for observability
- 🧹 Graceful shutdown handling
It’s designed for scalable, production-grade systems.
Next, let’s look at the full consumer implementation, followed by a producer example.
Full Consumer Implementation
The complete consumer base class, in full:
using System.Diagnostics;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Application.Common.Kafka
{
    public abstract class KafkaConsumerBase<T> : BackgroundService
    {
        private readonly IConsumer<string, string> _consumer;
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<KafkaConsumerBase<T>> _logger;
        private readonly string _topicName;
        private readonly SemaphoreSlim _throttler;

        // Metrics tracking
        private long _totalMessagesProcessed = 0;
        private long _failedMessages = 0;

        // Health monitoring
        private volatile bool _isHealthy = true;
        private DateTime _lastSuccessfulConsume = DateTime.UtcNow;

        protected KafkaConsumerBase(
            IConfiguration configuration,
            ILogger<KafkaConsumerBase<T>> logger,
            IServiceProvider serviceProvider,
            string topicName,
            string groupId,
            int maxConcurrentProcessing = 10)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
            _topicName = topicName;
            _throttler = new SemaphoreSlim(maxConcurrentProcessing);

            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                SaslUsername = configuration["Kafka:SaslUsername"],
                SaslPassword = configuration["Kafka:SaslPassword"],
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                GroupId = groupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,
                // Performance optimizations
                MaxPollIntervalMs = 300000,        // 5 minutes
                SessionTimeoutMs = 30000,          // 30 seconds
                FetchMaxBytes = 1048576,           // 1MB
                FetchMinBytes = 10240,             // 10KB
                FetchWaitMaxMs = 500,              // 500ms
                QueuedMaxMessagesKbytes = 1048576, // 1MB
                StatisticsIntervalMs = 5000        // Enable statistics for monitoring
            };

            _consumer = new ConsumerBuilder<string, string>(consumerConfig)
                .SetErrorHandler((_, error) =>
                {
                    _logger.LogError($"Kafka error: {error.Reason}");
                    if (error.IsFatal)
                    {
                        _isHealthy = false;
                    }
                })
                .Build();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Give the host a moment to finish starting before we begin polling
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            _consumer.Subscribe(_topicName);
            _logger.LogInformation($"Subscribed to topic: {_topicName}");

            // Track currently running tasks
            var runningTasks = new List<Task>();

            // Periodically clean up completed tasks
            using var cleanupTimer = new Timer(_ =>
            {
                lock (runningTasks)
                {
                    runningTasks.RemoveAll(t => t.IsCompleted);
                }
            }, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Crude backpressure: pause polling if too many tasks are still running
                    if (runningTasks.Count >= _throttler.CurrentCount * 2)
                    {
                        _logger.LogWarning($"Task queue is full ({runningTasks.Count} running tasks). Pausing consumption.");
                        await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
                        continue;
                    }

                    var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(2));
                    if (consumeResult != null)
                    {
                        _lastSuccessfulConsume = DateTime.UtcNow;

                        // Wait for a processing slot using the throttler with a timeout
                        bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);
                        if (!acquired)
                        {
                            _logger.LogWarning("Failed to acquire processing slot after timeout. Requeuing message.");
                            continue;
                        }

                        // Process the message with controlled parallelism
                        var processTask = Task.Run(async () =>
                        {
                            try
                            {
                                using var scope = _serviceProvider.CreateScope();
                                // Local stopwatch: a shared field would race across parallel tasks
                                var stopwatch = Stopwatch.StartNew();

                                // Process with retry logic
                                await ProcessMessageWithRetryAsync(consumeResult.Message.Value, scope.ServiceProvider);
                                stopwatch.Stop();

                                var processed = Interlocked.Increment(ref _totalMessagesProcessed);
                                if (processed % 100 == 0)
                                {
                                    _logger.LogInformation(
                                        $"Processed {processed} messages total, " +
                                        $"last message processed in {stopwatch.ElapsedMilliseconds}ms, " +
                                        $"active tasks: {runningTasks.Count}, " +
                                        $"available slots: {_throttler.CurrentCount}");
                                }

                                try
                                {
                                    _consumer.Commit(consumeResult);
                                }
                                catch (Exception ex)
                                {
                                    _logger.LogError($"Error committing offset: {ex.Message}");
                                }
                            }
                            catch (Exception ex)
                            {
                                Interlocked.Increment(ref _failedMessages);
                                _logger.LogError($"Error processing message: {ex.Message}");
                            }
                            finally
                            {
                                _throttler.Release();
                            }
                        }, stoppingToken);

                        // Track the task
                        lock (runningTasks)
                        {
                            runningTasks.Add(processTask);
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    _logger.LogError($"Consume error: {e.Error.Reason}");
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("Kafka consumption was canceled.");
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError($"Error in consumer loop: {ex.Message}");
                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
                }
            }

            // Wait for all tasks to complete on shutdown
            _logger.LogInformation("Waiting for all processing tasks to complete...");
            try
            {
                Task[] tasksToWait;
                lock (runningTasks)
                {
                    tasksToWait = runningTasks.ToArray();
                }
                await Task.WhenAll(tasksToWait);
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error waiting for tasks to complete: {ex.Message}");
            }

            try
            {
                _consumer.Unsubscribe();
                _consumer.Close();
                _logger.LogInformation("Kafka consumer closed gracefully.");
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error during consumer shutdown: {ex.Message}");
            }
        }

        protected abstract Task ProcessMessage(string message, IServiceProvider serviceProvider);

        private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)
        {
            int retryCount = 0;
            const int maxRetries = 3;
            while (true)
            {
                try
                {
                    // Create a new scope for each retry attempt
                    using var retryScope = serviceProvider.CreateScope();
                    await ProcessMessage(message, retryScope.ServiceProvider);
                    return; // Success
                }
                catch (Exception ex)
                {
                    if (++retryCount > maxRetries)
                    {
                        _logger.LogError($"Failed to process message after {maxRetries} attempts. Error: {ex.Message}");
                        throw; // Rethrow after max retries
                    }
                    var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount)); // Exponential backoff
                    _logger.LogWarning($"Retry {retryCount}/{maxRetries} after {delay.TotalSeconds}s. Error: {ex.Message}");
                    await Task.Delay(delay);
                }
            }
        }

        public bool IsHealthy() =>
            _isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);

        public ConsumerMetrics GetMetrics() => new ConsumerMetrics
        {
            MessagesProcessed = _totalMessagesProcessed,
            FailedMessages = _failedMessages,
            IsHealthy = IsHealthy()
        };

        public class ConsumerMetrics
        {
            public long MessagesProcessed { get; set; }
            public long FailedMessages { get; set; }
            public bool IsHealthy { get; set; }
        }
    }
}
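Producer Example
The consumer above needs a counterpart that publishes events. Here is a minimal producer sketch that mirrors the consumer's configuration (the KafkaProducerService and PublishAsync names are ours, not from the library):

using System.Text.Json;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

public class KafkaProducerService : IDisposable
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaProducerService> _logger;

    public KafkaProducerService(IConfiguration configuration, ILogger<KafkaProducerService> logger)
    {
        _logger = logger;
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            SaslUsername = configuration["Kafka:SaslUsername"],
            SaslPassword = configuration["Kafka:SaslPassword"],
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            Acks = Acks.All,         // wait for all in-sync replicas
            EnableIdempotence = true // avoid duplicates on broker-side retries
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task PublishAsync<TEvent>(string topic, string key, TEvent @event)
    {
        var message = new Message<string, string>
        {
            Key = key,
            Value = JsonSerializer.Serialize(@event)
        };
        var result = await _producer.ProduceAsync(topic, message);
        _logger.LogInformation($"Delivered to {result.TopicPartitionOffset}");
    }

    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10)); // drain in-flight messages before disposing
        _producer.Dispose();
    }
}

Register it as a singleton; the Confluent producer is thread-safe and designed to be long-lived:

builder.Services.AddSingleton<KafkaProducerService>();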
Real-World Use Case: Analytics Event Tracker
Imagine a web app where users interact with learning materials. You want to track every document view:
- The Document Service sends an event to Kafka when a user views a document.
- The Analytics Service consumes that event and saves the view to MongoDB for analysis.
This decouples your services, improving scalability and fault tolerance.
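To make this concrete, the analytics consumer could look like the sketch below (the DocumentViewedEvent shape, topic, and MongoDB wiring are assumptions for illustration; IMongoCollection<T> comes from the MongoDB.Driver package):

public record DocumentViewedEvent(string UserId, string DocumentId, DateTime ViewedAt);

public class DocumentViewedConsumer : KafkaConsumerBase<DocumentViewedEvent>
{
    public DocumentViewedConsumer(
        IConfiguration configuration,
        ILogger<KafkaConsumerBase<DocumentViewedEvent>> logger,
        IServiceProvider serviceProvider)
        : base(configuration, logger, serviceProvider,
               topicName: "document-viewed", groupId: "analytics-service") { }

    protected override async Task ProcessMessage(string message, IServiceProvider serviceProvider)
    {
        var viewedEvent = JsonSerializer.Deserialize<DocumentViewedEvent>(message)
                          ?? throw new InvalidOperationException("Invalid event payload.");
        var collection = serviceProvider.GetRequiredService<IMongoCollection<DocumentViewedEvent>>();
        await collection.InsertOneAsync(viewedEvent);
    }
}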
Conclusion
Using Confluent.Kafka, you can build powerful consumer and producer services in .NET. By abstracting the consumer and adding throttling, retries, and health checks, you get production-ready reliability.
Next Steps
- Add a Prometheus metrics exporter
- Add data contracts for typed messages
- Integrate with OpenTelemetry
- Support Avro or Protobuf serialization
- Implement partition-specific consumers
Have questions or want to see the full GitHub repo? Let me know!