thanh_nguyen

Building Kafka Consumer & Producer in .NET

Introduction

Apache Kafka is a distributed event streaming platform built for high-throughput, low-latency data pipelines. In modern .NET microservices, Kafka often serves as the backbone for asynchronous messaging. In this post, we walk through how to build a robust Kafka consumer in .NET using the Confluent.Kafka library, with detailed explanations and a real-world implementation; the producer side is covered in a follow-up post.


What You’ll Learn

  • Setting up the Kafka configuration
  • Building a reusable Kafka consumer base class
  • Implementing parallel processing with throttling
  • Handling errors and retries gracefully
  • Health checking and metrics
  • Best practices with IServiceProvider and BackgroundService

Setting Up Kafka in .NET

Install the NuGet package:

dotnet add package Confluent.Kafka

Ensure you have access to a Kafka cluster (e.g., Confluent Cloud or local KRaft setup). Add the required settings in your appsettings.json or configuration provider:

"Kafka": {
  "BootstrapServers": "your-cluster:9092",
  "SaslUsername": "your-username",
  "SaslPassword": "your-password"
}

Abstract Kafka Consumer with BackgroundService

Let’s break down the KafkaConsumerBase class (full listing near the end of this post) step by step:

Step 1: Constructor Setup

protected KafkaConsumerBase(...)

Here we:

  • Read Kafka configuration
  • Build the consumer with ConsumerBuilder and register an error handler
  • Store the topic name (the actual Subscribe call happens in ExecuteAsync)
  • Configure throttling for max concurrent message processing

Step 2: ExecuteAsync

This method runs continuously while the service is active. It:

  • Subscribes to the topic
  • Polls for messages
  • Manages active processing tasks
  • Throttles concurrent workers

💡 Real-world case: Your document service publishes a message whenever a user creates a new document. Your analytics service consumes that message to track user behavior.

Step 3: Controlled Parallelism

_throttler = new SemaphoreSlim(maxConcurrentProcessing);

You control how many messages can be processed at once. This avoids overloading the system.

Step 4: Retry Logic

await ProcessMessageWithRetryAsync(...)

Each message is retried up to three times with exponential backoff (waits of 2, 4, then 8 seconds) before being counted as failed. This improves resilience when handling transient errors.

Step 5: Metrics and Health

You track:

  • Total messages processed
  • Failed messages
  • Health via heartbeat (_lastSuccessfulConsume)

Useful for dashboards, Prometheus exporters, etc.


Code Example (Simplified)

public class MyKafkaConsumer : KafkaConsumerBase<MyEvent>
{
    public MyKafkaConsumer(...) : base(...) { }

    protected override async Task ProcessMessage(string message, IServiceProvider serviceProvider)
    {
        var myService = serviceProvider.GetRequiredService<IMyService>();
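        // Deserialize returns null for a JSON "null" payload, so guard before use
        var data = JsonSerializer.Deserialize<MyEvent>(message)
            ?? throw new JsonException("Message payload deserialized to null.");
        await myService.HandleAsync(data);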
    }
}

🧠 Health Check

private volatile bool _isHealthy = true;
private DateTime _lastSuccessfulConsume = DateTime.UtcNow;

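Two signals feed the health status: _isHealthy flips to false when the Kafka client reports a fatal error, and _lastSuccessfulConsume is refreshed every time a message is consumed. Note that a topic with no traffic for five minutes therefore also reports as unhealthy. We define a method that combines both signals: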

public bool IsHealthy() =>
    _isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);

✅ Purpose: This allows external services (e.g., monitoring tools like Prometheus) to determine whether the consumer is healthy.
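To make the rule concrete, here is a minimal sketch of the same check written as a pure function. The fixed clock and the standalone `IsHealthy` helper with explicit parameters are assumptions for the demo; the real class reads its own fields and `DateTime.UtcNow`.

```csharp
using System;

// Pure version of the IsHealthy() rule: no fatal error has been seen, and the
// last successful consume is newer than the allowed idle window.
static bool IsHealthy(bool noFatalError, DateTime lastConsumeUtc, DateTime nowUtc, TimeSpan maxIdle) =>
    noFatalError && nowUtc - lastConsumeUtc < maxIdle;

var maxIdle = TimeSpan.FromMinutes(5);                            // same window as the post
var now = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);   // fixed clock for the demo

bool fresh = IsHealthy(true, now.AddMinutes(-1), now, maxIdle);   // consumed 1 minute ago
bool stale = IsHealthy(true, now.AddMinutes(-6), now, maxIdle);   // nothing consumed for 6 minutes
bool fatal = IsHealthy(false, now, now, maxIdle);                 // fatal error flag is set

Console.WriteLine($"fresh={fresh}, stale={stale}, fatal={fatal}");
// prints "fresh=True, stale=False, fatal=False"
```

The `stale` case is worth keeping in mind: a quiet topic looks the same as a stuck consumer, so you may want a longer window for low-traffic topics.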


⛵ Controlled Concurrency with Semaphore

private readonly SemaphoreSlim _throttler;

The throttler limits how many messages can be processed in parallel:

_throttler = new SemaphoreSlim(maxConcurrentProcessing);

We wait up to five seconds to acquire a slot before processing a message:

bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);

After processing, the slot is released in a finally block so it is returned even when processing fails:

_throttler.Release();

🎯 Why? Without a semaphore, you risk consuming more messages than your system can handle simultaneously.
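The throttling pattern can be tried in isolation with a small sketch like the one below. The limit of 3, the 50 ms simulated work, and the `ProcessAsync` helper are assumptions for the demo, not part of the consumer class.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int maxConcurrentProcessing = 3;    // assumed limit for the demo
var throttler = new SemaphoreSlim(maxConcurrentProcessing);
var gate = new object();
int inFlight = 0, peak = 0;

// Stand-in for message processing: take a slot, record concurrency, release the slot.
async Task ProcessAsync(int messageId)
{
    await throttler.WaitAsync();
    try
    {
        lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
        await Task.Delay(50);             // simulated work
    }
    finally
    {
        lock (gate) { inFlight--; }
        throttler.Release();              // always release, even when processing throws
    }
}

// Ten "messages" arrive at once, but the semaphore caps how many run together.
await Task.WhenAll(Enumerable.Range(1, 10).Select(ProcessAsync));
Console.WriteLine($"peak concurrency: {peak} (limit {maxConcurrentProcessing})");
```

The observed peak never exceeds the limit, because each task must hold one of the semaphore's slots while it works.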


🔁 Retry Logic for Processing Failures

private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)

Implements exponential backoff for failed message processing attempts:

var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));

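If the initial attempt and all three retries fail (four attempts in total, with waits of 2, 4, and 8 seconds in between), it logs an error and rethrows the exception.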

📌 Note: This is critical in distributed systems where transient failures are common.
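Here is a small, self-contained sketch of the same backoff schedule. `RunWithRetryAsync`, the injectable `delayAsync` parameter, and the `FlakyAsync` operation are hypothetical names introduced for this demo, so it can run without really sleeping.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Retries `action` up to maxRetries times, waiting 2^retry seconds between attempts.
// Returns the number of attempts used; rethrows once the retries are exhausted.
async Task<int> RunWithRetryAsync(Func<Task> action, int maxRetries, Func<TimeSpan, Task> delayAsync)
{
    int attempts = 0;
    while (true)
    {
        attempts++;
        try
        {
            await action();
            return attempts;
        }
        catch (Exception) when (attempts <= maxRetries)
        {
            // attempts == 1 waits 2s, 2 waits 4s, 3 waits 8s
            await delayAsync(TimeSpan.FromSeconds(Math.Pow(2, attempts)));
        }
    }
}

var delays = new List<double>();
int calls = 0;

// Fails twice with a simulated transient error, then succeeds.
Task FlakyAsync()
{
    calls++;
    return calls < 3
        ? Task.FromException(new TimeoutException("transient"))
        : Task.CompletedTask;
}

int attemptsUsed = await RunWithRetryAsync(
    FlakyAsync,
    maxRetries: 3,
    delayAsync: d => { delays.Add(d.TotalSeconds); return Task.CompletedTask; });

Console.WriteLine($"attempts={attemptsUsed}, delays(s)=[{string.Join(", ", delays)}]");
// prints "attempts=3, delays(s)=[2, 4]"
```

In production you would pass `Task.Delay` as the delay function; injecting it here only keeps the demo fast.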


📊 Metrics Tracking

private long _totalMessagesProcessed = 0;
private long _failedMessages = 0;

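These counters help monitor throughput and stability, and can feed dashboards or alerts. They are updated with Interlocked.Increment because messages are processed on multiple threads. A progress line is logged every 100 messages: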

_logger.LogInformation($"Processed {_totalMessagesProcessed} messages total...");
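As a quick illustration of why the counters go through Interlocked, here is a minimal sketch that updates them from many threads at once. The 1,000 simulated messages, the "every 10th fails" rule, and the `progressLogs` counter are assumptions for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

long totalMessagesProcessed = 0;
long failedMessages = 0;
int progressLogs = 0;   // stand-in for the "every 100 messages" log line

// Simulate 1,000 messages handled on many threads; every 10th one "fails".
Parallel.For(0, 1000, i =>
{
    if (i % 10 == 0)
    {
        Interlocked.Increment(ref failedMessages);
    }
    else
    {
        // Use the value returned by the atomic increment, so each count is observed exactly once.
        long n = Interlocked.Increment(ref totalMessagesProcessed);
        if (n % 100 == 0) Interlocked.Increment(ref progressLogs);
    }
});

// Interlocked.Read gives an atomic 64-bit read, which matters in 32-bit processes.
long processed = Interlocked.Read(ref totalMessagesProcessed);
long failed = Interlocked.Read(ref failedMessages);
Console.WriteLine($"processed={processed}, failed={failed}, progress logs={progressLogs}");
// prints "processed=900, failed=100, progress logs=9"
```

With plain `++` instead of Interlocked, concurrent updates can be lost and the totals would drift under load.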

🧼 Graceful Shutdown and Cleanup

The consumer waits for all background tasks to complete before shutting down:

await Task.WhenAll(tasksToWait);
_consumer.Close();

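This gives in-flight messages the chance to finish and commit their offsets before the consumer leaves the group. Because offsets are committed per message from parallel tasks, a failed message can still be skipped once a later offset on the same partition is committed, so failed messages are not guaranteed to be redelivered.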
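The drain-then-close idea can be sketched without Kafka at all. In the example below, the loop that starts five tasks and then cancels is a stand-in for the polling loop and the host's shutdown signal; both are assumptions for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var runningTasks = new List<Task>();
int completed = 0;
int started = 0;

// Stand-in for the polling loop: keep starting work until shutdown is requested.
while (!cts.IsCancellationRequested)
{
    runningTasks.Add(Task.Run(async () =>
    {
        await Task.Delay(100);                 // simulated processing
        Interlocked.Increment(ref completed);
    }));

    if (++started == 5) cts.Cancel();          // simulate the host requesting shutdown
}

// Drain: wait for everything already in flight before closing the consumer.
await Task.WhenAll(runningTasks);
Console.WriteLine($"started={started}, completed before close={completed}");
// prints "started=5, completed before close=5"
// In the real class, _consumer.Close() would run at this point.
```

Keep in mind that the .NET host only waits for its configured shutdown timeout (HostOptions.ShutdownTimeout), so long-running handlers may still be cut off.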


🧪 Summary

This advanced Kafka consumer pattern in .NET offers:

  • ✅ Health checks for monitoring
  • 🚥 Concurrency control with SemaphoreSlim
  • 🔁 Automatic retry logic
  • 📊 Metrics for observability
  • 🧹 Graceful shutdown handling

It’s designed for scalable, production-grade systems.

In the next blog, we’ll explore how to implement the producer side in .NET.


Full Consumer Example

The complete KafkaConsumerBase class:

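using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;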

namespace Application.Common.Kafka
{
	public abstract class KafkaConsumerBase<T> : BackgroundService
	{
		private readonly IConsumer<string, string> _consumer;
		private readonly IServiceProvider _serviceProvider;
		private readonly ILogger<KafkaConsumerBase<T>> _logger;
		private readonly string _topicName;
		private readonly SemaphoreSlim _throttler;

		// Metrics tracking
		private long _totalMessagesProcessed = 0;
		private long _failedMessages = 0;
		private readonly Stopwatch _processingStopwatch = new Stopwatch();

		// Health monitoring
		private volatile bool _isHealthy = true;
		private DateTime _lastSuccessfulConsume = DateTime.UtcNow;

		protected KafkaConsumerBase(
			IConfiguration configuration,
			ILogger<KafkaConsumerBase<T>> logger,
			IServiceProvider serviceProvider,
			string topicName,
			string groupId,
			int maxConcurrentProcessing = 10)
		{
			_logger = logger;
			_serviceProvider = serviceProvider;
			_topicName = topicName;
			_throttler = new SemaphoreSlim(maxConcurrentProcessing);

			var consumerConfig = new ConsumerConfig
			{
				BootstrapServers = configuration["Kafka:BootstrapServers"],
				SaslUsername = configuration["Kafka:SaslUsername"],
				SaslPassword = configuration["Kafka:SaslPassword"],
				SecurityProtocol = SecurityProtocol.SaslSsl,
				SaslMechanism = SaslMechanism.Plain,
				GroupId = groupId,
				AutoOffsetReset = AutoOffsetReset.Earliest,
				EnableAutoCommit = false,

				// Performance optimizations
				MaxPollIntervalMs = 300000,     // 5 minutes
				SessionTimeoutMs = 30000,       // 30 seconds
				FetchMaxBytes = 1048576,        // 1MB
				FetchMinBytes = 10240,          // 10KB
				FetchWaitMaxMs = 500,           // 500ms
				QueuedMaxMessagesKbytes = 1048576, // unit is KB: 1,048,576 KB = 1 GB of pre-fetch buffer
				StatisticsIntervalMs = 5000     // Enable statistics for monitoring
			};

			_consumer = new ConsumerBuilder<string, string>(consumerConfig)
				.SetErrorHandler((_, error) =>
				{
					_logger.LogError($"Kafka error: {error.Reason}");
					if (error.IsFatal)
					{
						_isHealthy = false;
					}
				})
				.Build();
		}
		protected override async Task ExecuteAsync(CancellationToken stoppingToken)
		{
			await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
			_consumer.Subscribe(_topicName);
			_logger.LogInformation($"Subscribed to topic: {_topicName}");

			// Track currently running tasks
			var runningTasks = new List<Task>();

			// Periodically clean up completed tasks
			using var cleanupTimer = new Timer(_ =>
			{
				lock (runningTasks)
				{
					runningTasks.RemoveAll(t => t.IsCompleted);
				}
			}, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));

			while (!stoppingToken.IsCancellationRequested)
			{
				try
				{
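					// Drop finished tasks from the tracking list, then pause polling while every slot is busy.
					// (CurrentCount is the number of free slots, so it cannot serve as a capacity limit.)
					lock (runningTasks)
					{
						runningTasks.RemoveAll(t => t.IsCompleted);
					}

					if (_throttler.CurrentCount == 0)
					{
						_logger.LogWarning($"All processing slots are busy ({runningTasks.Count} running tasks). Pausing consumption.");
						await Task.Delay(TimeSpan.FromMilliseconds(500), stoppingToken);
						continue;
					}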

					var consumeResult = _consumer.Consume(TimeSpan.FromSeconds(2));
					if (consumeResult != null)
					{
						_lastSuccessfulConsume = DateTime.UtcNow;

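						// Wait for a processing slot using the throttler with a timeout
						bool acquired = await _throttler.WaitAsync(TimeSpan.FromSeconds(5), stoppingToken);

						if (!acquired)
						{
							// Rewind to this message so the next Consume call returns it again;
							// a bare `continue` would silently skip it
							_logger.LogWarning("Failed to acquire processing slot after timeout. Requeuing message.");
							_consumer.Seek(consumeResult.TopicPartitionOffset);
							continue;
						}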

						// Process the message with controlled parallelism
						var processTask = Task.Run(async () =>
						{
							try
							{
								using var scope = _serviceProvider.CreateScope();
								// Per-task stopwatch: a shared Stopwatch is not safe to restart from parallel tasks
								var stopwatch = Stopwatch.StartNew();

								// Process with retry logic
								await ProcessMessageWithRetryAsync(consumeResult.Message.Value, scope.ServiceProvider);

								stopwatch.Stop();
								// Use the value returned by the atomic increment so each count is seen exactly once
								long processed = Interlocked.Increment(ref _totalMessagesProcessed);

								if (processed % 100 == 0)
								{
									_logger.LogInformation(
										$"Processed {processed} messages total, " +
										$"last message processed in {stopwatch.ElapsedMilliseconds}ms, " +
										$"active tasks: {runningTasks.Count}, " +
										$"available slots: {_throttler.CurrentCount}");
								}

								try
								{
									_consumer.Commit(consumeResult);
								}
								catch (Exception ex)
								{
									_logger.LogError($"Error committing offset: {ex.Message}");
								}
							}
							catch (Exception ex)
							{
								Interlocked.Increment(ref _failedMessages);
								_logger.LogError($"Error processing message: {ex.Message}");
							}
							finally
							{
								_throttler.Release();
							}
						// CancellationToken.None: a task canceled before it starts would never release its slot
						}, CancellationToken.None);

						// Track the task
						lock (runningTasks)
						{
							runningTasks.Add(processTask);
						}
					}
				}
				catch (ConsumeException e)
				{
					_logger.LogError($"Consume error: {e.Error.Reason}");
				}
				catch (OperationCanceledException)
				{
					_logger.LogInformation("Kafka consumption was canceled.");
					break;
				}
				catch (Exception ex)
				{
					_logger.LogError($"Error in consumer loop: {ex.Message}");
					await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
				}
			}

			// Wait for all tasks to complete on shutdown
			_logger.LogInformation("Waiting for all processing tasks to complete...");
			try
			{
				Task[] tasksToWait;
				lock (runningTasks)
				{
					tasksToWait = runningTasks.ToArray();
				}

				await Task.WhenAll(tasksToWait);
			}
			catch (Exception ex)
			{
				_logger.LogError($"Error waiting for tasks to complete: {ex.Message}");
			}

			try
			{
				_consumer.Unsubscribe();
				_consumer.Close();
				_logger.LogInformation("Kafka consumer closed gracefully.");
			}
			catch (Exception ex)
			{
				_logger.LogError($"Error during consumer shutdown: {ex.Message}");
			}
		}
		
		protected abstract Task ProcessMessage(string message, IServiceProvider serviceProvider);

		private async Task ProcessMessageWithRetryAsync(string message, IServiceProvider serviceProvider)
		{
			int retryCount = 0;
			const int maxRetries = 3;

			while (true)
			{
				try
				{
					// Create a new scope for each retry attempt
					using var retryScope = serviceProvider.CreateScope();
					await ProcessMessage(message, retryScope.ServiceProvider);
					return; // Success
				}
				catch (Exception ex)
				{
					if (++retryCount > maxRetries)
					{
						_logger.LogError($"Failed to process message after {maxRetries + 1} attempts. Error: {ex.Message}");
						throw; // Rethrow after max retries
					}

					var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount)); // Exponential backoff
					_logger.LogWarning($"Retry {retryCount}/{maxRetries} after {delay.TotalSeconds}s. Error: {ex.Message}");
					await Task.Delay(delay);
				}
			}
		}

		public bool IsHealthy() =>
			_isHealthy && DateTime.UtcNow - _lastSuccessfulConsume < TimeSpan.FromMinutes(5);

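		public ConsumerMetrics GetMetrics() => new ConsumerMetrics
		{
			// Interlocked.Read keeps the 64-bit reads atomic in 32-bit processes
			MessagesProcessed = Interlocked.Read(ref _totalMessagesProcessed),
			FailedMessages = Interlocked.Read(ref _failedMessages),
			IsHealthy = IsHealthy()
		};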

		public class ConsumerMetrics
		{
			public long MessagesProcessed { get; set; }
			public long FailedMessages { get; set; }
			public bool IsHealthy { get; set; }
		}
	}
}

Real-World Use Case: Analytics Event Tracker

Imagine a web app where users interact with learning materials. You want to track every document view:

  • The Document Service sends an event to Kafka when a user views a document.
  • The Analytics Service consumes that event and saves the view to MongoDB for analysis.

This decouples your services, improving scalability and fault tolerance.


Conclusion

Using Confluent.Kafka, you can build powerful consumer and producer services in .NET. By abstracting consumers and adding throttling, retries, and health checks, you get a solid foundation for production use.


Next Steps

  • Add Prometheus metrics exporter
  • Add Data Contracts for typed messages
  • Integrate with OpenTelemetry
  • Support Avro or Protobuf for typed messages
  • Implement partition-specific consumers

Have questions or want to see the full GitHub repo? Let me know!