Back
Featured image of post .NET Core Hosted Service ile Azure Service Bus Kullanımı

.NET Core Hosted Service ile Azure Service Bus Kullanımı

Bu yazımızda .NET Core içerisinde gelen HostedServices ile Azure Service Bus ürünü nasıl konuşur ve Producer & Consumer konularını konuşacağız.

Bu yazımızda .NET Core içerisinde gelen HostedServices ile Azure Service Bus ürünü nasıl konuşur ve Producer & Consumer konularını konuşacağız.

Azure Service Bus

Microsoft Azure üzerinde PaaS olarak sunulan scalable bir messaging ürünüdür. Event Driven Architecture gibi konularda sık tercih edilen bu messaging ürünlerinden Cloud Vendor üzerinde sunulan bir servistir. Azure üzerinde RabbitMQ ve Apache Kafka da sunulmaktadır fakat bu diğer iki ürün arasındaki en büyük fark, infrastructure maliyetleri, maintenance gibi operasyonel süreçlerden kurtulup sadece scale ettiğiniz ve infrastructure ı düşünmeden çalışabilmenizi sağlamaktadır.

Photo from Azure Service Bus Pricing
Photo from Azure Service Bus Pricing

En temel fiyatlandırma yukarıdaki gördüğünüz tablo ile açıklanabilir. Bu fiyatlar West Europe fiyatlarıdır. Bu fiyatlandırmaları daha detaylandırmak ve incelemk isterseniz Azure Calculator üzerinden fiyatlandırmalarınızı hesaplayabilirsiniz

Azure Service Bus Oluşturma

Azure Service Bus oluşturmanız için en temel ihtiyaçlardan bir tanesi Azure üzerinde geçerli bir aboneliğinizin olması gerekmektedir.

Azure Portal üzerinden Create a Resource > Integration > Service Bus seçerek oluşturma ekranına girelim.

Buradaki en çok dikkat etmemiz gereken nokta Pricing tarafıdır. Azure, öneri olarak Premium sunmaktadır fakat buradaki ihtiyacınızı belirleyerek ilgili tier ı seçmeniz daha doğru olacaktır.

Photo from Azure Service Bus Pricing Tier Selection
Photo from Azure Service Bus Pricing Tier Selection

Buradaki Standard ve Premium tier ları arasında en önemli farklardan birisi resource un dedicated veya shared ortamlarda sunulmasıdır. Dedicated ortam daha performanslı ama maliyeti yüksek olacaktır. Yüksek seviyede yük sahibi iseniz Premium mantıklıdır. Standard ile Basic arasında ise en büyük farklılıklardan birisi topic kavramı Standard ve sonraki tierlar üzerinde gelmektedir. Bu yüzden orta ölçekli uygulamalar için Standard paketini seçmekteyim.

Standard to Premium migration için safe bir geçiş sağlamaktadır. Post-Migration ile tüm trafiğinizi Premium Service Bus üzerine aldığınızda artık Standard paketi kullanmadan devam edebilirsiniz. Aslında migration sürecinde iki farklı resource oluşturup aralarında post-migration sağlamaktadır.

Photo from Azure Service Bus Setup Page
Photo from Azure Service Bus Setup Page

Yukarıdaki ekran görüntüsündeki gibi resource ve location seçimlerimi yapıyorum ve oluşturuyorum.

Photo from Azure Service Bus Entities
Photo from Azure Service Bus Entities

Service Bus resource unu oluşturduğumuza göre artık Queue ve Topics arasındaki farkı konuşabiliriz. Bu yazımda çok darlamak istemiyorum fakat en temel farkını çok basitçe ve örnekle aktaracağım

Queue üzerinde bir mesajı bir consumer (tüketen) kullanabilirken, topics üzerinden geçen bir mesajı birden fazla consumer kullanabilmektedir.

Örneğin; ürün oluşturduğuna dair bir mesaj gönderiyorsunuz ve bu mesaj atıldıktan sonra SMS, Email, Teslimat paketi oluşturma işlemlerini yapıyorsunuz ve bunu paralel olarak yapmak istiyorsunuz diyelim. Bu noktada Topics kullanmanız gerekmektedir. Siz mesajı verdiğiniz zaman bu mesaj, SMS için ayrı, Email için ayrı ve Teslimat paketi için ayrı kullanılacak ve paralel olarak çalışabilecektir.

.NET Core & ServiceBus ile Consumer Projesi oluşturma

Boş console application oluşturalım ve içerisine birkaç package kurarak hazırlayalım.

dotnet new console -n ServiceBusExample

Ardından proje içerisine girelim ve aşağıdaki iki önemli paketi kuralım

dotnet add package Microsoft.Extensions.Hosting

Bu paket HostedServices ve Dependency Injection’ı configure edebilmenizi sağlaması açısından hazırlanmış bir pakettir.

dotnet add package Microsoft.Azure.ServiceBus

Bu paket ise Azure ServiceBus integration sağlamanız için gereken Client library package ıdır.

Geliştireceğimiz Consumer projesinde configuration larımızı CommandLine Arguments veya Environment Variables olarak alacağım. İsterseniz JsonFileConfiguration ekleyebilirsiniz. Configuration için Program.cs içerisinde basit bir configure method u yazalım

Burada Arguments olarak belirttiğimiz aslında Main methodunda gelen args parametresidir. Bu parametreyi class içerisine taşıyalım

Böylelikle Configuration methodu artık Arguments a erişebilir. Host umuzu basitçe ayağa kaldıralmanın vakti geldi. DefaultHostBuilder ile basit bir host build edeceğiz ve onu çalıştıracağız. Host kapanana kadar uygulamamız çalışacaktır. Böylelikle Kubernetes üzerinde rahatlıkla çalıştırabilir ve consumer projelerinizi ayaklandırabilirsiniz.

Host async olarak run edilebilidiği için burada Main methodumuzu async olarak çalıştırmaya başladık. Yazdığımzı Configure methodunu Host Configuration ı olarak verdik ve artık Command-line ve Environment Variables olarak configleri alabiliriz.

Artık HostedService oluşturmamızın vakti geldi 🎉🎉 Bir host içerisine birden fazla HostedService yerleştirebilirsiniz ve bu HostedServices lar üzerinde Dependency Injection uygulayabilirsiniz. ConsumerHostedService adında bir class oluşturalım ve bunu HostedService olarak initialize etmeye başlayalım.

Disposable bir hosted service yarattığımız zaman en temelde implement etmemiz gereken methodlar bu şekildedir. Start ve Stop bölümünde connection kurduğumuz external service leri open ve close edebiliriz. Dispose methodunda ise artık bu objeleri GC’ye teslim edebiliriz.

StartAsync ve StopAsync ile gelen CancellationToken lar önemlidir. Buradaki cancellationToken ları açacağınız connection veya thread lere vermenizi tavsiye ederim. HostedService durdurulduğunda böylelikle o anda gerçekleşen process i de CancellationToken sayesinde durdurabilirsiniz. Böylelikle uygulamanızı sağlıklı bir şekilde devam ettirmiş olur veya HostedService kapatıldığını anladığınzıda yapmanız gereken son işlemleri gerçekleştirip, uygulamanızın kapanmasını sağlayabilirsiniz.

Oluşturduğumuz HostedService i Program.cs içerisinde ConfigureService methodu oluşturarak Dependency Injection a eklemek adına hazırlayalım. Ardından bu methodumuzu HostBuilder içerisine verelim ve Host’u bu service ler ile birlikte oluşturmaya başlasın.

Uygulamamızı çalıştırdığımızda gördüğümüz gibi HostedService imiz çalışmaktadır.

Console App Output
Console App Output

Durdurmak istediğimizde ise gördüğünüz gibi kill etme süreci de bu şekilde işlemektedir.

Console App Output
Console App Output

Bunları tamamladığımıza göre artık ServiceBus Integration işlemlerimize geçebiliriz. Bu aşamalarda ServiceBus üzerinde Queue oluşturduğumuzu varsayarak geliştireceğiz. Son aşamada ServiceBus üzerinde Queue oluşturup aldığımız ConnectionString i uygulamamıza verip çalıştığını göreceğiz

Service Bus Integration

ConfigureServices methodumuzda ServiceBus için QueueClient initialize edelim ve IoC içerisine register edelim. Ardından da HostedService içerisine inject ederek consumer ımızın son aşamasını hazırlayalım.

Bu method içerisinde configuration üzerinden gelen ServiceBusConnectionString ve ServiceBusQueueName adında iki tane config alacağız ve bunlarla QueueClient ı initialize edeceğiz

Burada configler boş gelirse uygulamamızı direkt patlatıyoruz ve devam etmesini engelliyoruz. Bu configler bizim için required seviyesindedir. Configler düzgün geldiğinde artık QueueClient ı initialize edip, IQueueClient ile IoC mize register edebiliriz. Aynı zamanda Logging içinde AddLogging diyerek IoC içerisine register edelim. Console içerisine loglama yapabilmemiz için ihtiyacımız olacaktır.

ConsumerHostedService içerisinde QueueClient ve Logger ı inject edelim. Ardından Queue binding operasyonlarımızı yapalım

StartAsync içerisinde Queue üzerinde gelecek Message ları handle edebilmemiz için OnProcess adında bir methodumuzu register ediyoruz. Queue içerisine eklenen her mesajda OnProcess methodu tetiklenecektir. MessageHnadlerOptions üzerinde Concurrent işlenebilecek mesaj sayısını, işlenen mesajların otomatik silinmesi gibi ayarları yapabiliyorsunuz. Ben AutoComplete özelliğini kapatıyorum. Bunun sebebi, olası aldığım hatalarda tekrarlayabilmek adınadır. Eğer böyle bir ihtiyacınız yok ise AutoComplete i true olarak ayarlayabilir ve mesajlarınız exception almadığı sürece Queue üzerinden düşmeyecektir.

Hata alma durumunda aldığı exception ı OnError adında bir method a gönderelim. Ardından bu method basit bir loglama işlemi gerçekleştirsin. Burada custom logicler yazabilirsiniz. Örneğin bir mesajı 3 kez işlemeyi denesin, eğer 3 den fazla hata alırsa bu mesajı queue dan silsin gibi retry mekanizmaları yaratabilirsiniz

StopAsync içerisinde QueueClient ı stop ederek artık consume etmesini kapatabiliriz.

OnProcess içerisinde gelen message byte array olarak gelmektedir. Bu gelen byte array i JSON string e çevirip ardından Deserialize edebilirsiniz. Buradaki kritik nokta ise eğer yüksek boyutlarda mesajlar geliyorsa OutOfMemory yaşamamak için MemoryStream ile Stream Deserializer yapabilirsiniz. Böylelikle büyük JSON verilerini OutOfMemory hatası almadan deserialize edebilirsiniz. Async bir method çağırdığınız takdirde retun Task.CompletedTask dönmenize de gerek kalmayacaktır.

Service Bus Queue Oluşturma

Azure Portal üzerinden Queue oluşturalım ve ConnectionString alalım. Böylelikle uygulamamızı çalışması için gereken son aşamayı tamamlamış olacağız.

Create Azure Service Bus Queue
Create Azure Service Bus Queue

Burada ServiceBus üzerinde built-in gelen birtakım özellikler bulunmaktadır. Benim sevdiğim en güzel özelliklerden birisi duplication detection özelliğidir. Bu özelliği enable ettiğinizde belirli bir aralıktaki tekrarlanan mesajları engelleyebilirsiniz. Böylelikle sisteminizin daha stabil olmasını sağlarsınız. Bir diğer özelliği ise session özelliğidir. Burada ise queue içerisindeki mesajların atıldığı sırada consume edilmesinin garantisini vermektedir. First In First Out (FIFO) olarak çalışan ve sizin attığınız sırada çalışmasını garanti etmektedir. Böylelikle session oluşturarak farklı bir transactional yapı kullanabilirsiniz. Bu özellik distributed sistemler için tehlikelidir. Bu yüzden mimarinizi bunun üzerine kurmamanızı tavsiye edebilirim. Expire olan message ları otomatik olarak dead letter a atabilir ve message kaybetmenizi minimum a düşürebilirsiniz. Buradaki dikkat edebileceğiniz bir diğer özellik ise Lock Duration olacaktır. Ack göndermediğiniz mesajlar için belirlediğiniz bir lock time yaratıyor ve bu süre içerisinde başka consumer bu mesajı consume edemiyor. Bu süreyi aştığında ve Ack almadıysa başka consumerlar tarafından consume edilebilmektedir.

Add Azure Service Bus SAS Policy
Add Azure Service Bus SAS Policy

Oluşturduğumuz Queue üzerinden Shared access policies bölümünden Policy yaratalım. Burada oluşturacağımız Policy, tam olarak ne yapacağını belirtmekteyiz. Consumer için Listen Policy, Producer için ise Send Policy tanımlayarak iki farklı ConnectionString yaratabilir, böylelikle Security açısından ortamlarınızın güvenliğini sağlayabilirsiniz. Bu örneğimizde hem Consumer hem de producer olarak yapacağımız için Manage i tercih ediyoruz.

Azure Service Bus SAS Policy
Azure Service Bus SAS Policy

ConnectionString içerisindeki EntityPath i silebilirsiniz. Uygulamamız içerisinde zaten QueueName olarak vereceğimiz için ConnectionString e eklememize gerek yok.

Consumer Uygulamasını Test Edelim

Gerekli olan tüm aşamaları tamamladığımıza göre artık çalıştırabiliriz. Uygulamamıza Environment Variable ya da Command-line üzerinden ConnectionString ve QueueName i iletebiliriz. Ben Command-line üzerinden tercih edeceğim

dotnet run /"ServiceBusConnectionString={ServiceBusConnectionString}" /"ServiceBusQueueName={ServiceBusQueueName}"

Yukarıdaki komut içerisinde {ServiceBusConnectionString} ve {ServiceBusQueueName} kendi oluşturduğunuz ConnectionString ve QueueName ile değiştiriniz.

Azure Service Bus Send Message
Azure Service Bus Send Message

Ardından test için Azure Portal üzerinden oluşturduğunuz Queue içerisinde Service Bus Explorer ile mesaj gönderebilirsiniz.

Console App Output
Console App Output

Gördüğünüz gibi gönderdiğimiz message uygulamamıza iletildi ve log olarak yazdı. Consumer bölümümüz başarılı bir şekilde çalıştı.

Producer Uygulaması Geliştirme

Consumer ile Producer arasındaki en büyük farklardan birisi, RegisterMessageHandler ı çağırmanıza gerek yoktur. Yapmanız gereken IQueueClient ı mesajı göndermek istediğiniz class içerisine inject ettikten sonra aşağıdaki gibi örnek bir kod çalıştırmanız yeterli olacaktır. Örnek olarak consume ettiğim mesajı tekrar aynı queue ya bırakalım.

Gelen mesaj boş değilse producerData adında bir obje oluşturuluyor ve bu JSON string olarak serialize ediliyor ve ardından byte array e çevirilerek Message class ına verilerek gönderiliyor. Bu message class ı içerisinde custom properties ler de gönderebilirsiniz. Örneğin kaç kez retry ettiği, veya birtakım metadata lar olabilir (CorrelationId, Sender Agent Name vs.) SendAsync ile birden fazla mesajı aynı zamanda da gönderebilirsiniz. List olarak da kabul etmektedir. Burada da OutOfMemory yaşamamak adına Serialize işlemini de Stream olarak uygulayabilirsiniz

Sonlandıralım

Bu yazımızda temel olarak Azure Service Bus kullanımını ve bunu .NET Core uygulamalarında en ideal yöntem olan HostedService ile nasıl Consumer veya Producer olarak sağladığımızı, Dependency Injection uygulayarak temiz bir yapıda kurduk. Unit Test yazacağınız ve QueueClient ı kolayca Mocklayabileceksiniz. Böylelikle 3rd party integration larınızı da kolayca Mock layarak business logic testlerinizi yazabilirsiniz.

Twitter ve Github üzerinden takip etmeyi unutmuyoruzdur umarım 😇😇😇

Hugo ile oluşturuldu.
Stack teması Jimmy tarafından tasarlandı