MongoDB ScaleOut
This section is a proof of concept that shows how you can create a custom ScaleOut. In the previous section we took a look at what you need to do when creating a custom ScaleOut, and now we will implement one.
The most popular ScaleOut in XSockets is the one using Azure Service Bus. However, this sample will show you that we (thanks to modularity) can ScaleOut any way we want to.
Why MongoDB?
When doing ScaleOut it is convenient to avoid polling a data source for new messages. Polling is pretty much the opposite of what XSockets stands for.
MongoDB has something called Capped Collections. A capped collection is like a predefined buffer that you limit by size in bytes and, optionally, by number of documents. When the buffer is full, MongoDB will remove the oldest document to be able to add new ones.
MongoDB allows you to query capped collections with tailable cursors. This means that we will continuously get documents as they are inserted into the capped collection.
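As a rough illustration of the tailable cursor idea, the sketch below tails a capped collection with the MongoDB.Driver package. The connection string, the database name scaleout and the collection name messages are assumptions made for this example, and the collection is assumed to already exist as a capped collection. It is not the code from the sample repository.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class TailExample
{
    // Options that turn a normal find into a tailable cursor that waits for new documents.
    public static FindOptions<BsonDocument> TailableOptions()
    {
        return new FindOptions<BsonDocument>
        {
            CursorType = CursorType.TailableAwait,
            NoCursorTimeout = true
        };
    }

    // Calls onDocument for every document the cursor yields, until the cursor
    // is closed by the server or the token is cancelled.
    public static async Task TailAsync(
        IMongoCollection<BsonDocument> collection,
        Action<BsonDocument> onDocument,
        CancellationToken token)
    {
        var everything = FilterDefinition<BsonDocument>.Empty;
        using (var cursor = await collection.FindAsync(everything, TailableOptions(), token))
        {
            while (await cursor.MoveNextAsync(token))
            {
                foreach (var document in cursor.Current)
                {
                    onDocument(document);
                }
            }
        }
    }

    public static async Task Main()
    {
        // Assumed local server, database and collection names.
        var client = new MongoClient("mongodb://localhost:27017");
        var collection = client.GetDatabase("scaleout").GetCollection<BsonDocument>("messages");

        using (var cts = new CancellationTokenSource())
        {
            Console.CancelKeyPress += (sender, e) => { e.Cancel = true; cts.Cancel(); };
            try
            {
                await TailAsync(collection, d => Console.WriteLine(d.ToJson()), cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Ctrl+C was pressed; stop tailing.
            }
        }
    }
}
```

Note that the server can close a tailable cursor, for example when the capped collection is still empty, so a longer-running listener would typically re-open the cursor in a loop.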
Source Code & Sample
The full code for this sample is available on GitHub. There is also a video with some explanation about how this works.
Code: https://github.com/uffebjorklund/MongoDbScaleOut
Demo: https://www.youtube.com/watch?v=eve2jx28uSA
Prerequisites
Software
You need to install MongoDB on your machine (it only takes a few minutes). Remember to start the database after installing; those steps are covered in the installation tutorial.
NuGet Packages
Besides XSockets you will also need the MongoDB.Driver package.
Implementation
As you could see in the previous section about Custom ScaleOut, you have to implement three methods in the ScaleOut module.
Init
This will be called when the server uses the ScaleOut for the first time. In here we will make sure that we can connect to MongoDB and that we have a capped collection to use.
We will also make sure that we have some information about the server. The server information is an id for the server as well as a document id, so that we know the id of the last document we got from MongoDB. This way we do not need to get old messages when the server restarts.
public override void Init()
{
    try
    {
        SetupServerInfo();
        SetupDatabase();
        EnsureCappedCollection();
        InitSuccess = true;
    }
    catch (Exception ex)
    {
        Composable.GetExport<IXLogger>().Error(ex, "Failed to initialize MongoDB ScaleOut");
    }
}
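The helper methods called from Init are not shown in this section. As a hedged sketch of what a step like EnsureCappedCollection could look like with MongoDB.Driver, the code below creates the capped collection only if it is missing. The class name, the method names and the limits (1 MB, 1000 documents) are hypothetical choices for illustration; the actual implementation is in the GitHub repository.

```csharp
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class CappedCollectionSetup
{
    // Builds the options for a capped collection.
    public static CreateCollectionOptions CappedOptions(long maxBytes, long maxDocuments)
    {
        return new CreateCollectionOptions
        {
            Capped = true,
            MaxSize = maxBytes,          // size in bytes, required for capped collections
            MaxDocuments = maxDocuments  // optional upper bound on the number of documents
        };
    }

    // Returns the collection, creating it as a capped collection when it does not exist yet.
    public static async Task<IMongoCollection<BsonDocument>> EnsureCappedCollectionAsync(
        IMongoDatabase database, string name)
    {
        var options = new ListCollectionsOptions { Filter = new BsonDocument("name", name) };
        using (var cursor = await database.ListCollectionsAsync(options))
        {
            if (!await cursor.AnyAsync())
            {
                // Example limits only: 1 MB or 1000 documents, whichever is hit first.
                await database.CreateCollectionAsync(name, CappedOptions(1024 * 1024, 1000));
            }
        }
        return database.GetCollection<BsonDocument>(name);
    }
}
```

A call such as `await CappedCollectionSetup.EnsureCappedCollectionAsync(database, "messages")` would then hand back a collection that is ready to be tailed. If several servers start at the same moment, the create call may fail on all but one of them, so real code would need to handle that case.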
Subscribe
The subscribe method will create a tailable cursor so that we can get messages/documents continuously from the capped collection.
public override async Task Subscribe()
{
    if (!InitSuccess) return;
    //Setup listener for new docs from other servers
    await Watch<BsonDocument>(collection);
}
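The Watch helper itself lives in the sample repository and is not reproduced here. To give an idea of the kind of query it might run, the sketch below builds a filter that skips documents written by the current server and documents at or before the last document id this server has seen, then opens a tailable cursor with it. The class and method names are hypothetical, and the sketch assumes that the server id is a string stored under "sid" and that "_id" is a driver-generated ObjectId.

```csharp
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

public static class WatchQuery
{
    // Matches documents from other servers that are newer than the last one we processed.
    public static FilterDefinition<BsonDocument> OtherServersSince(string serverId, ObjectId lastSeenId)
    {
        var builder = Builders<BsonDocument>.Filter;
        return builder.Ne("sid", serverId) & builder.Gt("_id", lastSeenId);
    }

    // Opens a tailable cursor that only yields documents matching the filter above.
    public static Task<IAsyncCursor<BsonDocument>> OpenAsync(
        IMongoCollection<BsonDocument> collection, string serverId, ObjectId lastSeenId)
    {
        var options = new FindOptions<BsonDocument> { CursorType = CursorType.TailableAwait };
        return collection.FindAsync(OtherServersSince(serverId, lastSeenId), options);
    }
}
```

Comparing on "_id" relies on ObjectId values increasing roughly with time, which is only approximate when several servers generate ids within the same second, so treat this as an illustration of the idea rather than a guaranteed ordering.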
Publish
When a message is published we create a new BsonDocument and attach the IMessage as well as the server-id. We attach the server-id since the query used to get a tailable cursor has a filter excluding messages that have the same server-id.
public override async Task Publish(MessageDirection direction, IMessage message, ScaleOutOrigin scaleOutOrigin)
{
    if (!InitSuccess) return;
    try
    {
        await ScaleToMongoDb(message);
    }
    catch (Exception ex)
    {
        Composable.GetExport<IXLogger>().Error(ex, "Failed to publish in MongoDB ScaleOut");
    }
}

private async Task ScaleToMongoDb(IMessage message)
{
    //Create doc with server-id so that we only send this to other servers
    var document = new BsonDocument
    {
        { "sid", this.SI.ServerId },
        { "data", new BsonDocument()
            {
                { "t", message.Topic },
                { "d", message.Data },
                { "c", message.Controller }
            }
        }
    };
    //Insert into mongo capped collection
    await collection.InsertOneAsync(document);
}
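On the receiving side, a server has to turn such a document back into the parts of a message. The sketch below shows one way to read the fields written above using only MongoDB.Bson. It assumes that the topic, data and controller are all stored as strings; the class name, the Unpack method and the sample values are made up for illustration.

```csharp
using System;
using MongoDB.Bson;

public static class ScaleOutDocument
{
    // Reads the "t", "d" and "c" fields from the nested "data" document.
    public static (string Topic, string Data, string Controller) Unpack(BsonDocument document)
    {
        var data = document["data"].AsBsonDocument;
        return (data["t"].AsString, data["d"].AsString, data["c"].AsString);
    }

    public static void Main()
    {
        // A document shaped like the one created when publishing, with sample values.
        var document = new BsonDocument
        {
            { "sid", "server-a" },
            { "data", new BsonDocument
                {
                    { "t", "say" },
                    { "d", "{\"text\":\"hi\"}" },
                    { "c", "chat" }
                }
            }
        };

        var (topic, data, controller) = Unpack(document);
        Console.WriteLine($"{controller}.{topic}: {data}");
    }
}
```

Keeping the field names short ("t", "d", "c") keeps each document small, which matters in a capped collection because its size limit is counted in bytes.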
Summary
ScaleOut over MongoDB was very easy to implement, but do know that this is a proof of concept and not recommended for production. Take a look at the source code and video (links at the beginning of the section).