Team XSockets.NET

Azure Service Bus ScaleOut

ScaleOut using Azure Service Bus is a very popular alternative! It is smooth and super easy to set up. You do not have to write a single line of code!

In this sample we will extend the sample from the default ScaleOut. In case you do not remember, that was a solution with 2 Console Applications. We will re-use it and scale over Azure instead of using the P2P socket.

When using the Azure Service Bus ScaleOut it is common to also host XSockets on Azure in a Worker Role. That is covered in the Hosting section. A great benefit of hosting XSockets in a Worker Role together with this ScaleOut is that the Worker Role can auto-scale while Azure Service Bus handles the ScaleOut between the instances. The only thing you have to take care of is that each instance gets a unique server id (see the note in The Code section).

Prerequisites

  • Azure Account
  • Azure SDK

Service Bus Account

Create a new Service Bus namespace, if needed, at https://manage.windowsazure.com. Then select your existing (or new) Service Bus namespace and click Connection Information at the bottom. This shows the connection string and lets you copy it.

[Image: Azure connection string]

NuGet packages

Since you (obviously) will be using Azure you will need to install an extra package.

Install-Package WindowsAzure.ServiceBus

ScaleOut Module for Azure Service Bus

Adding/Configuring the ScaleOut module for Azure Service Bus is really easy. It comes down to 2 simple steps.

  1. Add the Item-Template
  2. Add a unique server Id and the connection-string to your Azure Service Bus

1. Add the Item-Template

Under Add -> New Item you will find XSockets.NET 5 and the Azure Service Bus ScaleOut template.

[Image: Azure Service Bus ScaleOut item template]

This template will provide you with a module that overrides the default ScaleOut.

2. Add the ConnectionString & Server Id

This new ScaleOut module needs 2 AppSettings: one for the Azure Service Bus connection string and another that identifies each server with a unique id. I use GUIDs for the ServerId, but as you can see below any unique string is OK.

<add key="XSockets.Scaleout.ServerId" value="serverY"/>
<add key="Microsoft.ServiceBus.ConnectionString" value="MySecretAzureServiceBusConnString"/>

These 2 steps have to be repeated for both projects since both need to scale over our Azure Service Bus. Remember to set a unique server id in each App.config. Two servers cannot share the same id. The scaling will fail if the id is not unique!
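As a sketch, the appSettings of the two Console Applications could look like the fragment below. The key names are the ones the template reads. The value serverX for the first project and the connection string value are placeholders chosen for illustration, so replace them with your own.

```xml
<!-- App.config of the first Console Application (serverX is a placeholder id) -->
<appSettings>
  <add key="XSockets.Scaleout.ServerId" value="serverX"/>
  <add key="Microsoft.ServiceBus.ConnectionString" value="YourAzureServiceBusConnectionString"/>
</appSettings>

<!-- App.config of the second Console Application: same connection string, different id -->
<appSettings>
  <add key="XSockets.Scaleout.ServerId" value="serverY"/>
  <add key="Microsoft.ServiceBus.ConnectionString" value="YourAzureServiceBusConnectionString"/>
</appSettings>
```

Both servers point at the same Service Bus, and only the server id differs.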

Test the Azure Service Bus Scaleout

Just as we did in the Default ScaleOut, we use PuTTY to test the scaling over Azure Service Bus.

We open two instances of PuTTY and connect one to localhost:8080 and the other to localhost:8181.

I configured the solution to run both Console Applications on start, so this is what we got after starting the debugger.

[Image: the two ScaleOut servers running]

Then we start 2 instances of PuTTY and connect one to each server.

[Image: PuTTY connected to each server]

After sending the handshake (PuttyProtocol), we open a controller instance by sending generic|1|. If you have no idea what this means, read The Basics section as well as the Controllers section.

[Image: handshake and open controller]

Now the client to the right sends a message on the generic controller: generic|foo|bar. The message arrives at both clients because the server that receives it scales the message out to the Azure Service Bus, and the Service Bus then forwards it to the other servers connected to the same Service Bus.

[Image: scale out test]
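The message flow in this test can be sketched without Azure at all. The snippet below is an in-memory stand-in, not the Azure Service Bus API: InMemoryTopic, ScaleOutDemo and their members are hypothetical names made up for illustration. It only mimics the idea the template relies on, namely that every server has a subscription named after its server id and that a filter keeps a server from receiving its own messages.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a Service Bus topic (hypothetical, for illustration only).
public class InMemoryTopic
{
    // One handler per subscription, keyed by the subscribing server's id.
    private readonly Dictionary<string, Action<string>> _subscriptions =
        new Dictionary<string, Action<string>>();

    // Plays the role of creating a subscription named after the server id.
    public void Subscribe(string serverId, Action<string> onMessage)
    {
        _subscriptions[serverId] = onMessage;
    }

    // Plays the role of sending a message tagged with the sender's id.
    public void Publish(string senderId, string payload)
    {
        foreach (var subscription in _subscriptions)
        {
            // The filter: a subscription never gets messages from its own server.
            if (subscription.Key != senderId)
                subscription.Value(payload);
        }
    }
}

public static class ScaleOutDemo
{
    // Returns a log of which server received which message.
    public static List<string> Run()
    {
        var log = new List<string>();
        var topic = new InMemoryTopic();
        topic.Subscribe("serverX", payload => log.Add("serverX got " + payload));
        topic.Subscribe("serverY", payload => log.Add("serverY got " + payload));

        // A client connected to serverY sent the message, so serverY publishes it.
        topic.Publish("serverY", "generic|foo|bar");
        return log;
    }
}
```

Calling ScaleOutDemo.Run() yields a single entry for serverX. The publishing server already delivered the message to its own clients locally, which is why the filter excludes it.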

The Code

Note: If you host an Azure Worker Role and scale the Worker Role you will have to make sure that you get unique IDs for each server. Feel free to contact us if you need some guidance!

If you are curious about the code without actually testing this, the item-template that we added looks like this.

using System;
using System.Configuration;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using XSockets.Protocol;
using XSockets.Core.Common.Enterprise;
using XSockets.Core.Common.Socket;
using XSockets.Core.Common.Socket.Event.Interface;
using XSockets.Core.Common.Utility.Logging;
using XSockets.Core.XSocket.Model;
using XSockets.Enterprise.Scaling;
using XSockets.Plugin.Framework;
using XSockets.Core.Common.Protocol;
using XSockets.Plugin.Framework.Attributes;

/// <summary>
/// To use this scaleout you should install the NuGet package for Azure Service Bus
/// 
/// Install-Package WindowsAzure.ServiceBus
/// 
/// Then get your connection string from http://manage.windowsazure.com/ and add it to 
/// the app.config under the key "Microsoft.ServiceBus.ConnectionString"
/// 
/// Also add a unique id (for example a GUID) to the app.config under the "XSockets.Scaleout.ServerId"
/// appsetting; every server in the scaleout must have a unique id.
/// If you host on Azure and scale instances you have to make sure that each instance has a unique id;
/// appsettings cannot be used for that.
/// </summary>
[Export(typeof(IXSocketsScaleOut), Rewritable = Rewritable.No, InstancePolicy = InstancePolicy.Shared)]
public class AzureServiceBusScaleOut : BaseScaleOut
{
    /// <summary>
    /// Server Identifier, to filter away messages from the sending server
    /// </summary>
    private string SID;

    /// <summary>
    /// Azure Service Bus Connection String
    /// </summary>
    private string _connString;

    /// <summary>
    /// Topic for scaled data
    /// </summary>
    private const string TopicName = "X";

    /// <summary>
    /// Publisher - for sending messages to Azure Service Bus
    /// </summary>
    private TopicClient _topicClient;

    /// <summary>
    /// Subscriber - for getting messages from Azure Service Bus
    /// </summary>
    private SubscriptionClient _subscriptionClient;

    /// <summary>
    /// Called at startup, setup/prepare your scaleout
    /// </summary>
    public override void Init()
    {
        try
        {
            this._connString = ConfigurationManager.AppSettings.Get("Microsoft.ServiceBus.ConnectionString");
            this.SID = ConfigurationManager.AppSettings.Get("XSockets.Scaleout.ServerId");
            SetupAzureServiceBus();
        }
        catch (Exception ex)
        {
            Composable.GetExport<IXLogger>().Error(ex, "Could not initialize Azure Service Bus ScaleOut");
        }
    }

    private void SetupAzureServiceBus()
    {
        Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - INIT");
        NamespaceManager namespaceManager = NamespaceManager.CreateFromConnectionString(_connString);
        try
        {
            TopicDescription myTopic = null;

            if (!namespaceManager.TopicExists(TopicName))
            {
                Composable.GetExport<IXLogger>().Debug("Creating Topic for Azure Service Bus");
                myTopic = namespaceManager.CreateTopic(TopicName);
            }
            else
            {
                Composable.GetExport<IXLogger>().Debug("Getting Topic for Azure Service Bus");
                myTopic = namespaceManager.GetTopic(TopicName);
            }

            if (namespaceManager.SubscriptionExists(myTopic.Path, SID))
            {
                Composable.GetExport<IXLogger>().Debug("Delete old subscription for Azure Service Bus");
                namespaceManager.DeleteSubscription(myTopic.Path, SID);
            }
            Composable.GetExport<IXLogger>().Debug("Creating Subscription for Azure Service Bus");
            var filter = new SqlFilter(string.Format("SID != '{0}'", SID));
            namespaceManager.CreateSubscription(TopicName, SID, filter);

            this._topicClient = TopicClient.CreateFromConnectionString(_connString, myTopic.Path);
            this._subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connString, myTopic.Path, SID);
        }
        catch (MessagingException e)
        {
            Composable.GetExport<IXLogger>().Error("Failed to setup scaling with Azure Service Bus: {e}", e.Message);
        }
    }

    public override async Task Publish(MessageDirection direction, IMessage message, ScaleOutOrigin scaleOutOrigin)
    {
        Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - PUBLISH {@m}", message);
        await _topicClient.SendAsync(GetBrokerMessage(message));
    }

    private BrokeredMessage GetBrokerMessage(IMessage message)
    {
        // Create an empty message; the serialized payload and the sender id travel as properties
        var m = new BrokeredMessage();
        m.Properties["JSON"] = this.Serializer.SerializeToString(message);
        m.Properties["SID"] = SID;
        return m;
    }

    public override async Task Subscribe()
    {
        Composable.GetExport<IXLogger>().Debug("Azure ServiceBus Scaling - SUBSCRIBE");

        var options = new OnMessageOptions { AutoComplete = false, AutoRenewTimeout = TimeSpan.FromMinutes(30) };
        await Task.Run(() => _subscriptionClient.OnMessage(OnBrokerMessage, options)).ConfigureAwait(false);
    }

    private void OnBrokerMessage(BrokeredMessage message)
    {
        try
        {
            Composable.GetExport<IXLogger>().Debug("Message Arrived {@m}", message);
            var m = this.Serializer.DeserializeFromString<Message>(message.Properties["JSON"].ToString());
            var pipe = Composable.GetExport<IXSocketPipeline>();
            var ctrl = Composable.GetExports<IXSocketController>().First(p => p.Alias == m.Controller);
            ctrl.ProtocolInstance = new XSocketInternalProtocol();
            pipe.OnIncomingMessage(ctrl, m);
            message.Complete();
        }
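        catch (Exception ex)
        {
            Composable.GetExport<IXLogger>().Error(ex, "Failed to process message from Azure Service Bus");
            // Processing failed: dead-letter after repeated attempts, otherwise
            // abandon so the message is unlocked and can be delivered again
            if (message.DeliveryCount > 3)
                message.DeadLetter();
            else
                message.Abandon();
        }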
    }
}
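
For the Worker Role case mentioned in the note above, one possible approach is to resolve the server id in code instead of reading it only from the appsetting. The helper below is a minimal sketch and not part of XSockets: ServerIdResolver and its parameters are hypothetical. It assumes the host can supply a per-instance value, for example RoleEnvironment.CurrentRoleInstance.Id from the Azure service runtime. Whether that value is acceptable as a subscription name is something you would need to verify.

```csharp
using System;

// Hypothetical helper (not part of XSockets) for picking a unique server id.
public static class ServerIdResolver
{
    // instanceId:   per-instance value supplied by the host, may be null
    //               (assumption: e.g. RoleEnvironment.CurrentRoleInstance.Id).
    // configuredId: the "XSockets.Scaleout.ServerId" appsetting, may be null.
    public static string Resolve(string instanceId, string configuredId)
    {
        // Prefer the id of the running instance, since it differs per instance.
        if (!string.IsNullOrWhiteSpace(instanceId))
            return instanceId;

        // Otherwise fall back to the value from App.config.
        if (!string.IsNullOrWhiteSpace(configuredId))
            return configuredId;

        // Last resort: a fresh GUID, unique for every process start.
        return Guid.NewGuid().ToString("N");
    }
}
```

In the template this would replace the assignment of SID in Init(). Keep in mind that the template only deletes an old subscription that has the same name as the current SID, so a fresh GUID on every start leaves earlier subscriptions behind on the topic. A stable per-instance id avoids that.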
