BLOGas.lt
Sukurk savo BLOGą Kitas atsitiktinis BLOGas

ZeroMQ įrenginiai – standartinių šablonų apibendrinimas (5 dalis)

Parašė Sergejus | 2012-03-18 20:00

Praeitoje (4-oje) dalyje aš parodžiau kaip ZeroMQ pagalba įgyvendinti Publish / Subscribe šabloną bei aprašiau asinchroninį pranešimų siuntimą / gavimą. Šioje, paskutinėje, dalyje trumpai aptarsiu ZeroMQ įrenginius, kurie apibendrina prieš tai išnagrinėtus Request / Reply, Push / Pull i Publish / Subscribe šablonus. Pagrindinis šių šablonų trūkumas, klientai turi žinoti serverių adresus. ZeroMQ įrenginiai leidžia atrišti klientus nuo serverių.

Queue

Supaprastintai, Queue įrenginys veikia taip:

image

Paprasčiausia kliento realizacija:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.REQ))
{
    socket.Connect("tcp://localhost:9001");

    for (var i = 0; ; i++)
    {
        var clientMessage = "CLIENT> Request #" + i;
        socket.Send(clientMessage, Encoding.Unicode);

        var serverMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(serverMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Paprasčiausia kliento realizacija:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.REP))
{
    socket.Connect("tcp://localhost:9002");

    while (true)
    {
        var clientMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        var serverMessage = "SERVER> Response" + Regex.Match(clientMessage, "#\\d+").Value;
        socket.Send(serverMessage, Encoding.Unicode);

        Console.WriteLine("--------------------------------");
    }
}

Paprasčiausia įrenginio realizacija:

using (var context = ZmqContext.Create())
{
    var queue = new QueueDevice(context, "tcp://*:9001", "tcp://*:9002", DeviceMode.Blocking);
    queue.Start();
}

Kaip matyti, visi klientai jungiasi prie įrenginio 9001 portu, o visi serveriai – 9002. Kadangi kiti įrenginiai apsirašo panašiai, toliau išvardinsiu tik pačius įrenginius.

Streamer

image

Forwarder

image

Pabaiga

Štai ir viskas ką norėjau papasakot apie ZeroMQ. Tikiuosi šis straipsnių ciklas buvo jums naudingas ir ZeroMQ užims garbingą vietą jūsų įrankių sąraše. Kaip visada, atsiliepimus ir klausimus galite palikti komentaruose.

Rodyk draugams

ZeroMQ – Publish / Subscribe šablono įgyvendinimas (4 dalis)

Parašė Sergejus | 2012-03-15 20:00

Praeitoje (3-oje) dalyje aš parodžiau kaip ZeroMQ pagalba įgyvendinti Push / Pull šabloną bei aprašiau bibliotekos siūlomą apkrovos paskirstymo (angl. load balancing) galimybę su automatiniu ryšio problemų aptikimu. Šį kartą įgyvendinsiu kitą svarbų šabloną – Publish / Subscribe, kuris įgalina asinchroninį komunikavimą tarp serverio ir klientų. Taipogi parodysiu kaip sinchroninius Send / Receive kvietimus galima pakeisti į asinchroninius.

Publish / Subscribe šablonas

Supaprastinai, Publish / Subscribe šablonas atrodo taip:

image

Paprasčiausia serverio realizacija atrodytų taip:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.PUB))
{
    socket.Bind("tcp://*:9001");

    for (var i = 0; ; i++)
    {
        var clientMessage = "SERVER> Notification #" + i;
        socket.Send(clientMessage, Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Kliento realizacija šį kartą kiek įdomesnė, nes norint gauti pranešimus, reikia pirma prisinumeruoti:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.SUB))
{
    socket.Connect("tcp://localhost:9001");

    socket.SubscribeAll();

    while (true)
    {
        var clientMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
    }
}

Atkreipkite dėmesį į metodą SubscribeAll. Juo mes pasakome, kad norime gauti absoliučiai visus siunčiamus pranešimus. ZeroMQ palaiko pranešimų filtravimą pagal prefiksą, tad norint gauti tik pranešimus prasidedančius „SERVER“, užtenka SubscribeAll pakeisti į Subscribe:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.SUB))
{
    socket.Connect("tcp://localhost:9001");

    socket.Subscribe(Encoding.Unicode.GetBytes("SERVER"));

    while (true)
    {
        var clientMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
    }
}

Asinchroninis pranešimų siuntimas / gavimas

Kaip žinia, metodai Send ir Receive blokuoja programos vykdymą. Kartais gali praversti asinchroninis žinučių apdorojimas, tam ZeroMQ turi įvykius SendReady bei ReceiveReady ir klasę Poller. Asinchroninis klientas galėtų atrodyti taip:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.SUB))
using (var poller = new Poller())
{
    socket.Connect("tcp://localhost:9001");

    socket.Subscribe(Encoding.Unicode.GetBytes("SERVER"));

    socket.ReceiveReady += (sender, args) =>
    {
        var clientMessage = args.Socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
    };

    poller.AddSocket(socket);

    while (true)
    {
        poller.Poll();

        // do more work here
    }
}

Toks asinchroninis klientas praverčia kai norima klausytis kelių soketų vienu metu. Užtenka į poller objektą pridėti norimus soketus ir ReceiveReady suveiks atėjus pranešimui į bet kurį iš klausomų soketų.

Kitoje dalyje

Nepaisant fakto, kad aukščiau pateiktas asinchroninio pranešimų apdorojimo pavyzdys yra Publish / Subscribe kontekste, tas pats principas galioja ir kitiems komunikavimo šablonams. Kitoje, paskutinėje, dalyje aptarsime taip vadinamus ZeroMQ įrenginius, kurie apibendrina 3 išnagrinėtus komunikavimo šablonus.

Rodyk draugams

ZeroMQ – Push / Pull šablono įgyvendinimas (3 dalis)

Parašė Sergejus | 2012-03-08 23:59

Praeitoje (2-oje) dalyje aš parodžiau kaip įgyvendinti Request / Reply šabloną ZeroMQ pagalba. Šį kartą įgyvendinsiu Push / Pull šabloną, kuris yra vienkrypčio komunikavimo pagrindas.

Push / Pull šablonas

Supaprastinai, Push / Pull šablonas atrodo taip:

image

Paprasčiausia serverio realizacija atrodytų taip:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.PULL))
{
    socket.Bind("tcp://*:9001");

    while (true)
    {
        var clientMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
    }
}

Tikriausiai, jau įsivaizduojate kaip atrodo kliento dalis:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.PUSH))
{
    socket.Connect("tcp://localhost:9001");

    for (var i = 0; ; i++)
    {
        var clientMessage = "CLIENT> Request #" + i;
        socket.Send(clientMessage, Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Jeigu palygintumėte Pull / Push serverio / kliento realizaciją su Request / Reply, pastebėtume, kad jos yra praktiškai vienodos, skiriasi tik du dalykai:

  • soketų tipai
  • komunikavimas tarp kliento ir serverio yra vienkryptis

ZeroMQ pagalba su minimaliais pakeitimais mes iš esmės pakeitėme komunikavimo šabloną!

ZeroMQ apkrovos paskirstymo galimybė

ZeroMQ pasižymi ne tik palaikomų komunikavimo šablonų bei protokolų įvairove, bet ir apkrovos paskirstymo galimybe (angl. load balancing) su automatiniu ryšio problemų aptikimu. Tarkime, visą apkrovą mes norime paskirstyti tarp 3 serverių, šiuo atveju aukščiau pateiktas kliento kodas atrodytų taip:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.PUSH))
{
    socket.Connect("tcp://localhost:9001");
    socket.Connect("tcp://localhost:9002");
    socket.Connect("tcp://localhost:9003");

    for (var i = 0; ; i++)
    {
        var clientMessage = "CLIENT> Request #" + i;
        socket.Send(clientMessage, Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Kaip matote, užtenka tiesiog pridėti dar 2 Connect iškvietimus, ir ZeroMQ yra pakankamai sumanus, kad automatiškai pradėti balansuoti siunčiamus pranešimus tarp trijų serverių (round robin principu).

Kas atsitinka, kaip vienas iš serverių nepasiekiamas? Kaip jau rašiau pirmoje dalyje, ZeroMQ visus siunčiamus bei gaunamus pranešimus buferizuoja eilėse atmintyje. Soketui galima nustatyti SendHighWatermark ir ReceiveHighWatermark parametrus, kurie ir nusako siuntimo / gavimo buferių dydį. Pagal nutylėjimą į buferius telpa 1000 pranešimų, kuriems užsipildžius, tolimesnis siuntimas / gavimas tiesiog užsiblokuoja. ZeroMQ automatinis ryšio problemų aptikimas yra paremtas buferio dydžiais, tad kai siuntimo buferis užsipildo, serveris išimamas iš apkrovos balansavimo. Kadangi 1000 neišsiųsti žinučių gali būti pakankamai daug, greitesniam problemų aptikimui galima sumažinti SendHighWatermark iki 10:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.PUSH))
{
    socket.SendHighWatermark = 10;

    socket.Connect("tcp://localhost:9001");
    socket.Connect("tcp://localhost:9002");
    socket.Connect("tcp://localhost:9003");

    for (var i = 0; ; i++)
    {
        var clientMessage = "CLIENT> Request #" + i;
        socket.Send(clientMessage, Encoding.Unicode);
        Console.WriteLine(clientMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Kitoje dalyje

Tiek šiam kartui, tikiuosi man pavyko parodyti koks lankstus ir galingas ZeroMQ yra. Kitoje dalyje įgyvendinsime kitą labai svarbų komunikavimo šabloną – Publish / Subscribe.

Rodyk draugams

ZeroMQ – Request / Reply šablono įgyvendinimas (2 dalis)

Parašė Sergejus | 2012-03-06 20:33

Praeitoje dalyje aš aprašiau pagrindines ZeroMQ galimybes bei apribojimus, o šioje dalyje pateiksiu paprasčiausią Request / Reply principu veikiantį kliento / serverio pavyzdį.

Prieš pradedant

Norėdami pridėti ZeroMQ į sprendimą, Package Manager konsolėje įvykdykite komandą Install-Package clrzmq –Pre.

Request / Reply šablonas

Supaprastintai, Request / Reply šablonas atrodo taip:

Untitled

ZeroMQ dėka paprasčiausia serverio realizacija telpa į 10 eilučių:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.REP))
{
    socket.Bind("tcp://*:9001");

    while (true)
    {
        var clientMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(clientMessage);

        var serverMessage = "SERVER> Response" + Regex.Match(clientMessage, "#\\d+").Value;
        socket.Send(serverMessage, Encoding.Unicode);

        Console.WriteLine("--------------------------------");
    }
}

Kaip matyti, serverio pusėje mes sukuriame Reply tipo soketą, kuriam nurodome naudoti 9001 portą bei TCP protokolą. Kliento realizacija yra labai panaši į serverio:

using (var context = ZmqContext.Create())
using (var socket = context.CreateSocket(SocketType.REQ))
{
    socket.Connect("tcp://localhost:9001");

    for (var i = 0; ; i++)
    {
        var clientMessage = "CLIENT> Request #" + i;
        socket.Send(clientMessage, Encoding.Unicode);

        var serverMessage = socket.Receive(Encoding.Unicode);
        Console.WriteLine(serverMessage);

        Console.WriteLine("--------------------------------");
        Thread.Sleep(TimeSpan.FromSeconds(1));
    }
}

Kliento pusėje sukuriamas Request tipo soketas iš kurio bandoma prisijungti prie anksčiau aprašyto serverio. Pažymėtina, kad mes galime paleisti iš karto kelis klientus, kurie lygiagrečiai dirbs su tuo pačiu serveriu.

Kitoje dalyje

Tiek šiam kartui, kitoje dalyje ZeroMQ pagrindu įgyvendinsime Push / Pull šabloną bei išnagrinėsime ZeroMQ apkrovos paskirstymo (angl. load balancing) galimybę.

Rodyk draugams

ZeroMQ – Įvadas (1 dalis)

Parašė Sergejus | 2012-02-19 18:46

Šiandien pradedu straipsnių ciklą apie neįtikėtiną biblioteką ZeroMQ (ØMQ), kuri leidžia ženkliai supaprastinti paskirstytų programų kūrimą. Pradėsiu nuo to, kas nėra ZeroMQ. Nepaisant MQ santrumpos pavadinime, tai nėra eilių (angl. Message Queue) sprendimas kaip RabbitMQ, ActiveMQ ar MSMQ. Taipogi, tai nėra WCF ar Thrift RPC atitikmuo. Aš apibrėžčiau ZeroMQ kaip ypatingai greitą ir labai paprastą panaudojime buferizuotą soketų komunikavimo biblioteką.

Kam teko programuoti soketus (nesvarbu kokia programavimo kalba), tikriausiai pritars, kad tai nėra labai paprastas užsiėmimas reikalaujantis pakankamai gilių tinklo žinių. Reikia galvoti apie prisijungimus, atsijungimus, ryšio palaikymą, buferizavimą, tinklo problemų aptikimą, siuntimo pakartojimą ir dar daug kitų dalykų. ZeroMQ – tai labai patogi abstrakcija virš soketų. Ši biblioteka leidžia pamiršti apie visas su žinučių perdavimu susijusias problemas išlaikant soketų greitaveiką.

ZeroMQ galimybių apžvalga

ZeroMQ leidžia nesukti galvos dėl prisijungimo eiliškumo – klientai ir serveriai gali būti paleidžiami bet kokia eilės tvarka. Jeigu klientas pasileis pirmas ir reikalingo serverio nebus – visos žinutės bus kaupiamos kliento pusėje ir išsiųstos kai tik serveris taps prieinamas.

Dėl bibliotekos paprastumo (leidžiama siųsti ir priimti tik baitų masyvus), nereikia prisirišti prie konkretaus komunikavimo protokolo ir galima naudoti bet kurį duomenų formatą: JSON, BSON, Protocol Buffers (ProtoBuf) ir t.t.

Kadangi ZeroMQ yra žemo lygio abstrakcija virš soketų parašyta su C ir viduje veikia pilnai asinchroniškai, ji pasižymi ypatinga greitaveika. Lokaliame kompiuteryje man pavyko pasiekti 30 tūkstančių žinučių per sekundę perdavimo greitį, įskaitant duomenų serializavimą į ir iš ProtoBuf.

Kita nuostabi bibliotekos savybė – ji palaiko visus pagrindinius komunikavimo šablonus:

Papildomai, ZeroMQ palaiko tokius transporto protokolus kaip in-process, IPC, multicast ir TCP. Norimas protokolas nurodomas tiesiai prisijungimo eilutėje, tad jo keitimas yra trivialus konfigūracijos atnaujinimas.

Ir pabaigai, kas labiausiai mane sužavėjo – ZeroMQ automatiškai palaiko žinučių balansavimą (round-robin principu) bei turi lokalius buferius (eiles). Visos įeinamos ir išeinamos žinutės pirma dedamos į eiles, iš kurių žinutė po žinutės yra išimamos ir apdorojamos.

ZeroMQ apribojimai

Apart visų aukščiau išvardintų privalumų, manau būtina žinoti ir tam tikrus bibliotekos apribojimus. Skirtingai negu pilnaverčiai eilių sprendimai, ZeroMQ neturi centrinio brokerio, kas gali būti tiek privalumas, tiek ir trūkumas:

  • Brokerio nebuvimas leidžia pasiekti žymiai geresnių greitaveikos rezultatų negu bet koks kitas eilių sprendimas.
  • Brokeris eilių sprendimuose yra labai svarbus ir jam nulūžus – visos eilės sustoja. ZeroMQ tokios problemos neturi, nes centrinio komunikavimo taško nėra.
  • Architektūriškai, ZeroMQ negarantuoja žinučių eiliškumo išlaikymo, tad vėliau siųsta žinutė gali būti apdorota pirmiau už anksčiau siųstą.
  • Kadangi žinučių eilės saugomos atmintyje, nulūžus programai pradings ir visos neapdorotos / neišsiųstos žinutės.

Kitoje dalyje

Tiek šiam kartui, tikiuosi pavyko jus sudominti ZeroMQ galimybėmis. Kitoje dalyje sukursime kliento-serverio programas, kurios komunikuos Request / Reply principu su automatiniu žinučių balansavimu.

Rodyk draugams