Building Kafka Consumer & Producer in .NET

Table of Contents
- Introduction
- What You’ll Learn
- Setting Up Kafka in .NET
- Abstract Kafka Consumer with BackgroundService
- Code Example (Simplified)
- 🧠 Health Check
- ⛵ Controlled Concurrency with Semaphore
- 🔁 Retry Logic for Processing Failures
- 📊 Metrics Tracking
- 🧼 Graceful Shutdown and Cleanup
- 🧪 Summary
- Full Consumer Implementation
- Producer Example
- Real-World Use Case: Analytics Event Tracker
- Conclusion
- Next Steps
Introduction
Apache Kafka is a powerful event streaming platform used to handle high-throughput and low-latency data pipelines. In modern .NET microservices, Kafka acts as the backbone for asynchronous messaging. In this post, we will walk through how to build a robust Kafka consumer and producer in .NET using the Confluent.Kafka library — with detailed explanations and real-world implementation.
What You’ll Learn
- Setting up the Kafka configuration
- Building a reusable Kafka consumer base class
- Implementing parallel processing with throttling
- Handling errors and retries gracefully
- Health checking and metrics
- Best practices with IServiceProvider and BackgroundService
Setting Up Kafka in .NET
Install the NuGet package:
```bash
dotnet add package Confluent.Kafka
```
Ensure you have access to a Kafka cluster (e.g., Confluent Cloud or a local KRaft setup). Add the required settings in your appsettings.json or configuration provider:
"Kafka": {
"BootstrapServers": "your-cluster:9092",
"SaslUsername": "your-username",
"SaslPassword": "your-password"
}Abstract Kafka Consumer with BackgroundService
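With the settings in place, the consumer built in this post is registered as a hosted service. A minimal sketch, assuming a concrete subclass named MyKafkaConsumer like the one shown later:
```csharp
// Program.cs — run the consumer as a long-lived background worker
builder.Services.AddHostedService<MyKafkaConsumer>();
```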
Abstract Kafka Consumer with BackgroundService
Let’s break down this example class step by step:
Step 1: Constructor Setup
```csharp
public KafkaConsumerBase(...)
```
Here we:
- Read the Kafka configuration
- Initialize the ConsumerBuilder
- Subscribe to the topic
- Configure throttling for max concurrent message processing
Step 2: ExecuteAsync
This method runs continuously while the service is active. It:
- Subscribes to Kafka
- Polls for messages
- Manages active processing tasks
- Throttles concurrent workers
💡 Real-world case: Your document service publishes a message whenever a user creates a new document. Your analytics service consumes that message to track user behavior.
Step 3: Controlled Parallelism
```csharp
_throttler = new SemaphoreSlim(maxConcurrentProcessing);
```
You control how many messages can be processed at once. This avoids overloading the system.
Step 4: Retry Logic
```csharp
await ProcessMessageWithRetryAsync(...)
```
Each message is retried with exponential backoff before being marked as failed. This improves resilience when handling transient errors.
Step 5: Metrics and Health
You track:
- Total messages processed
- Failed messages
- Health via heartbeat (_lastSuccessfulConsume)
Useful for dashboards, Prometheus exporters, etc.
Code Example (Simplified)
```csharp
public class MyKafkaConsumer : KafkaConsumerBase<MyEvent>
{
    public MyKafkaConsumer(...) : base(...) { }

    protected override async Task ProcessMessage(string message, IServiceProvider serviceProvider)
    {
        var myService = serviceProvider.GetRequiredService<IMyService>();
        var data = JsonSerializer.Deserialize<MyEvent>(message)
            ?? throw new InvalidOperationException("Message could not be deserialized.");
        await myService.HandleAsync(data);
    }
}
```
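The base constructor expects the topic name, consumer group id, and concurrency limit. A possible concrete constructor (the topic "my-events" and group "analytics-service" are placeholder values):
```csharp
public MyKafkaConsumer(
    IConfiguration configuration,
    ILogger<KafkaConsumerBase<MyEvent>> logger,
    IServiceProvider serviceProvider)
    : base(configuration, logger, serviceProvider,
        topicName: "my-events",
        groupId: "analytics-service",
        maxConcurrentProcessing: 10)
{
}
```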
🧠 Health Check
```csharp
private volatile bool _isHealthy = true;
private DateTime _lastSuccessfulConsume = DateTime.UtcNow;
```
Health status is updated when errors or timeouts occur. We define a method to check if the consumer is healthy:
```csharp
public bool IsHealthy() =>
    _isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);
```
✅ Purpose: This allows external services (e.g., monitoring tools like Prometheus) to determine whether the consumer is healthy.
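To expose this over HTTP, the flag can be adapted to ASP.NET Core's health check system. A minimal sketch, assuming the hypothetical MyKafkaConsumer is also registered as a resolvable singleton:
```csharp
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Bridges the consumer's IsHealthy() flag into ASP.NET Core health checks.
public class KafkaConsumerHealthCheck : IHealthCheck
{
    private readonly MyKafkaConsumer _consumer;

    public KafkaConsumerHealthCheck(MyKafkaConsumer consumer) => _consumer = consumer;

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken cancellationToken = default) =>
        Task.FromResult(_consumer.IsHealthy()
            ? HealthCheckResult.Healthy("Consumer is receiving messages.")
            : HealthCheckResult.Unhealthy("No successful consume in the last 5 minutes."));
}

// Registration (Program.cs):
// builder.Services.AddHealthChecks().AddCheck<KafkaConsumerHealthCheck>("kafka-consumer");
// app.MapHealthChecks("/health");
```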
⛵ Controlled Concurrency with Semaphore
```csharp
private readonly SemaphoreSlim _throttler;
```
The throttler limits how many messages can be processed in parallel:
```csharp
_throttler = new SemaphoreSlim(maxConcurrentProcessing);
```
We wait to acquire a slot before processing a message:
```csharp
bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);
```
After processing:
```csharp
_throttler.Release();
```
🎯 Why? Without a semaphore, you risk consuming more messages than your system can handle simultaneously.
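Put together, the acquire/process/release cycle looks like this (a condensed sketch of the pattern used in the full class below):
```csharp
bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);
if (acquired)
{
    try
    {
        // Processing happens while the slot is held
        await ProcessMessageWithRetryAsync(consumeResult.Message.Value, scope.ServiceProvider);
    }
    finally
    {
        _throttler.Release(); // always free the slot, even if processing throws
    }
}
```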
🔁 Retry Logic for Processing Failures
```csharp
private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)
```
Implements exponential backoff for failed message processing attempts:
```csharp
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
```
After 3 attempts, it logs an error and rethrows the exception.
📌 Note: This is critical in distributed systems where transient failures are common.
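If you would rather not hand-roll the loop, the same backoff can be expressed with the Polly library. A sketch of an equivalent policy (not what the class below uses):
```csharp
using Polly;

// Retry up to 3 times with 2s, 4s, 8s delays, mirroring the hand-rolled backoff.
var retryPolicy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(
        retryCount: 3,
        sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

await retryPolicy.ExecuteAsync(() => ProcessMessage(message, serviceProvider));
```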
📊 Metrics Tracking
```csharp
private long _totalMessagesProcessed = 0;
private long _failedMessages = 0;
```
Helps monitor performance and stability. This is useful for dashboards or alerts.
```csharp
_logger.LogInformation($"Processed {_totalMessagesProcessed} messages total...");
```
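The GetMetrics() method in the full class below can also be exposed over HTTP for scraping. A minimal sketch, again assuming the consumer is resolvable from DI:
```csharp
// Minimal metrics endpoint for dashboards or a Prometheus exporter to scrape
app.MapGet("/metrics/kafka-consumer", (MyKafkaConsumer consumer) =>
{
    var metrics = consumer.GetMetrics();
    return Results.Ok(new
    {
        metrics.MessagesProcessed,
        metrics.FailedMessages,
        metrics.IsHealthy
    });
});
```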
🧼 Graceful Shutdown and Cleanup
The consumer waits for all background tasks to complete before shutting down:
```csharp
await Task.WhenAll(tasksToWait);
_consumer.Close();
```
This ensures no messages are lost and offsets are properly committed.
🧪 Summary
This advanced Kafka consumer pattern in .NET offers:
- ✅ Health checks for monitoring
- 🚥 Concurrency control with SemaphoreSlim
- 🔁 Automatic retry logic
- 📊 Metrics for observability
- 🧹 Graceful shutdown handling
It’s designed for scalable, production-grade systems.
Next, let’s look at the full implementation, followed by the producer side.
Full Consumer Implementation
The complete consumer base class, tying together all of the pieces above:
```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Diagnostics;

namespace Application.Common.Kafka
{
    public abstract class KafkaConsumerBase<T> : BackgroundService
    {
        private readonly IConsumer<string, string> _consumer;
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<KafkaConsumerBase<T>> _logger;
        private readonly string _topicName;
        private readonly SemaphoreSlim _throttler;

        // Metrics tracking
        private long _totalMessagesProcessed = 0;
        private long _failedMessages = 0;
        private readonly Stopwatch _processingStopwatch = new Stopwatch();

        // Health monitoring
        private volatile bool _isHealthy = true;
        private DateTime _lastSuccessfulConsume = DateTime.UtcNow;

        protected KafkaConsumerBase(
            IConfiguration configuration,
            ILogger<KafkaConsumerBase<T>> logger,
            IServiceProvider serviceProvider,
            string topicName,
            string groupId,
            int maxConcurrentProcessing = 10)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
            _topicName = topicName;
            _throttler = new SemaphoreSlim(maxConcurrentProcessing);

            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = configuration["Kafka:BootstrapServers"],
                SaslUsername = configuration["Kafka:SaslUsername"],
                SaslPassword = configuration["Kafka:SaslPassword"],
                SecurityProtocol = SecurityProtocol.SaslSsl,
                SaslMechanism = SaslMechanism.Plain,
                GroupId = groupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,
                // Performance optimizations
                MaxPollIntervalMs = 300000,        // 5 minutes
                SessionTimeoutMs = 30000,          // 30 seconds
                FetchMaxBytes = 1048576,           // 1MB
                FetchMinBytes = 10240,             // 10KB
                FetchWaitMaxMs = 500,              // 500ms
                QueuedMaxMessagesKbytes = 1048576, // 1MB
                StatisticsIntervalMs = 5000        // Enable statistics for monitoring
            };

            _consumer = new ConsumerBuilder<string, string>(consumerConfig)
                .SetErrorHandler((_, error) =>
                {
                    _logger.LogError($"Kafka error: {error.Reason}");
                    if (error.IsFatal)
                    {
                        _isHealthy = false;
                    }
                })
                .Build();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            _consumer.Subscribe(_topicName);
            _logger.LogInformation($"Subscribed to topic: {_topicName}");

            // Track currently running tasks
            var runningTasks = new List<Task>();

            // Periodically clean up completed tasks
            using var cleanupTimer = new Timer(_ =>
            {
                lock (runningTasks)
                {
                    runningTasks.RemoveAll(t => t.IsCompleted);
                }
            }, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Check if we're at capacity
                    if (runningTasks.Count >= _throttler.CurrentCount * 2)
                    {
                        _logger.LogWarning($"Task queue is full ({runningTasks.Count} running tasks). Pausing consumption.");
                        await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
                        continue;
                    }

                    var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(2));
                    if (consumeResult != null)
                    {
                        _lastSuccessfulConsume = DateTime.UtcNow;

                        // Wait for a processing slot using the throttler with a timeout
                        bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);
                        if (!acquired)
                        {
                            _logger.LogWarning("Failed to acquire processing slot after timeout. Requeuing message.");
                            continue;
                        }

                        // Process the message with controlled parallelism
                        var processTask = Task.Run(async () =>
                        {
                            try
                            {
                                using var scope = _serviceProvider.CreateScope();
                                _processingStopwatch.Restart();

                                // Process with retry logic
                                await ProcessMessageWithRetryAsync(consumeResult.Message.Value, scope.ServiceProvider);

                                _processingStopwatch.Stop();
                                Interlocked.Increment(ref _totalMessagesProcessed);
                                if (_totalMessagesProcessed % 100 == 0)
                                {
                                    _logger.LogInformation(
                                        $"Processed {_totalMessagesProcessed} messages total, " +
                                        $"last message processed in {_processingStopwatch.ElapsedMilliseconds}ms, " +
                                        $"active tasks: {runningTasks.Count}, " +
                                        $"available slots: {_throttler.CurrentCount}");
                                }

                                try
                                {
                                    _consumer.Commit(consumeResult);
                                }
                                catch (Exception ex)
                                {
                                    _logger.LogError($"Error committing offset: {ex.Message}");
                                }
                            }
                            catch (Exception ex)
                            {
                                Interlocked.Increment(ref _failedMessages);
                                _logger.LogError($"Error processing message: {ex.Message}");
                            }
                            finally
                            {
                                _throttler.Release();
                            }
                        }, stoppingToken);

                        // Track the task
                        lock (runningTasks)
                        {
                            runningTasks.Add(processTask);
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    _logger.LogError($"Consume error: {e.Error.Reason}");
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("Kafka consumption was canceled.");
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError($"Error in consumer loop: {ex.Message}");
                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
                }
            }

            // Wait for all tasks to complete on shutdown
            _logger.LogInformation("Waiting for all processing tasks to complete...");
            try
            {
                Task[] tasksToWait;
                lock (runningTasks)
                {
                    tasksToWait = runningTasks.ToArray();
                }
                await Task.WhenAll(tasksToWait);
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error waiting for tasks to complete: {ex.Message}");
            }

            try
            {
                _consumer.Unsubscribe();
                _consumer.Close();
                _logger.LogInformation("Kafka consumer closed gracefully.");
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error during consumer shutdown: {ex.Message}");
            }
        }

        protected abstract Task ProcessMessage(string message, IServiceProvider serviceProvider);

        private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)
        {
            int retryCount = 0;
            const int maxRetries = 3;
            while (true)
            {
                try
                {
                    // Create a new scope for each retry attempt
                    using var retryScope = serviceProvider.CreateScope();
                    await ProcessMessage(message, retryScope.ServiceProvider);
                    return; // Success
                }
                catch (Exception ex)
                {
                    if (++retryCount > maxRetries)
                    {
                        _logger.LogError($"Failed to process message after {maxRetries} attempts. Error: {ex.Message}");
                        throw; // Rethrow after max retries
                    }
                    var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount)); // Exponential backoff
                    _logger.LogWarning($"Retry {retryCount}/{maxRetries} after {delay.TotalSeconds}s. Error: {ex.Message}");
                    await Task.Delay(delay);
                }
            }
        }

        public bool IsHealthy() =>
            _isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);

        public ConsumerMetrics GetMetrics() => new ConsumerMetrics
        {
            MessagesProcessed = _totalMessagesProcessed,
            FailedMessages = _failedMessages,
            IsHealthy = IsHealthy()
        };

        public class ConsumerMetrics
        {
            public long MessagesProcessed { get; set; }
            public long FailedMessages { get; set; }
            public bool IsHealthy { get; set; }
        }
    }
}
```
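Producer Example
With the consumer in place, here is the producer side: a minimal sketch built on Confluent.Kafka's IProducer, reusing the configuration keys from earlier. KafkaProducerService and PublishAsync are illustrative names, not library APIs:
```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System.Text.Json;

public class KafkaProducerService : IDisposable
{
    private readonly IProducer<string, string> _producer;

    public KafkaProducerService(IConfiguration configuration)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"],
            SaslUsername = configuration["Kafka:SaslUsername"],
            SaslPassword = configuration["Kafka:SaslPassword"],
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslMechanism = SaslMechanism.Plain,
            Acks = Acks.All,          // wait for full acknowledgment for durability
            EnableIdempotence = true  // prevent duplicates when the client retries
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    // Serializes the payload as JSON and publishes it with the given key.
    public async Task<DeliveryResult<string, string>> PublishAsync<TMessage>(
        string topic, string key, TMessage message, CancellationToken ct = default)
    {
        var value = JsonSerializer.Serialize(message);
        return await _producer.ProduceAsync(
            topic, new Message<string, string> { Key = key, Value = value }, ct);
    }

    public void Dispose()
    {
        _producer.Flush(TimeSpan.FromSeconds(10)); // deliver any queued messages
        _producer.Dispose();
    }
}
```
The message key determines partition assignment, so events with the same key (for example, the same user id) keep their relative order.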
Real-World Use Case: Analytics Event Tracker
Imagine a web app where users interact with learning materials. You want to track every document view:
- The Document Service sends an event to Kafka when a user views a document.
- The Analytics Service consumes that event and saves the view to MongoDB for analysis.
This decouples your services, improving scalability and fault tolerance.
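As a sketch of that flow, the Document Service could publish a hypothetical DocumentViewedEvent through the producer above:
```csharp
// Hypothetical event contract shared between the two services
public record DocumentViewedEvent(Guid DocumentId, Guid UserId, DateTime ViewedAtUtc);

// In the Document Service, after a view is recorded:
await producer.PublishAsync(
    topic: "document-views",
    key: userId.ToString(), // same user -> same partition -> ordered events
    message: new DocumentViewedEvent(documentId, userId, DateTime.UtcNow));
```
The Analytics Service then consumes "document-views" with a KafkaConsumerBase&lt;DocumentViewedEvent&gt; subclass and writes each event to MongoDB.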
Conclusion
Using Confluent.Kafka, you can build powerful consumer and producer services in .NET. By abstracting consumers, adding throttling, retries, and health checks, you ensure production-ready reliability.
Next Steps
- Add Prometheus metrics exporter
- Add Data Contracts for typed messages
- Integrate with OpenTelemetry
- Support Avro or Protobuf serialization
- Implement partition-specific consumers
Have questions or want to see the full GitHub repo? Let me know!
