Kas čia? Šio puslapio pagalba gali išsaugoti įrašą tolimesniam naudojimui, arba parodyti savo draugams per socialinius tinklus. Pranešimą apie įrašą galima nusiųsti ir el. paštu.

Kur norite publikuoti?

Nusiųsk draugui el. paštu

E-mail It
2010-10-19

Pingy - #5 Windows Azure Queue ir PingTaskRepository

Publikuota: Cloud Computing

Primenu, Pingy – tai mano bandymas sukurti paslaugą, kuri kas tam tikrą laiką kreiptųsi į vartotojo nurodytą URL, taip neduodant IIS užmigti. Tuo pačiu metu bus renkama statistika apie serverio būseną, atsakos laikas ir pan. Pingy yra kuriamas specialiai Windows Azure mokymosi tikslais.

Turinys

Pingy - #1 Architektūra
Pingy - #2 Windows Azure Table ir NoSQL mąstymas
Pingy - #3 PingItem ir PingItemRepository
Pingy - #4 PingItem repozitorijos atnaujinimas metodu GetItemsByPeriod
Pingy - #5 Windows Azure Queue ir PingTaskRepository
Pingy - #6 Windows Azure Queue korektiškas žinučių apdorojimas
Pingy - #7 Pinger serviso kūrimas
Pingy - #8 PingReportItem ir PingReportRepository

Praeitoje dalyje mes užbaigėme PingItemRepository, o šitoje – aptarsime Windows Azure Queue bei jos panaudojimą aprašant PingTaskRepository.

Windows Azure Queue apžvalga

Eilės – labai galingas mechanizmas. Jų pagalba galima ne tik padalinti apkrovą tarp nurodytų serverių, bet ir logiškai atskirti servisus-gamintojus nuo apdorojimo servisų. Taip, N servisų-gamintojų gali siųsti pranešimus (uždavinius) į eilę, o M servisų imti tuos uždavinius paeiliui ir apdoroti. Windows Azure turi nuosavą eilių realizaciją – Windows Azure Queue. Iš plečiamumo pusės, mes galime keisti servisų-gamintojų ir apdorojimo servisų skaičių pagal poreikį nepriklausomai vienas nuo kito. Jeigu ateis momentas, kad lėčiausia vieta taps Windows Azure Queue – reikės naudoti kelias eiles skirtingoms užduotims atlikti (taip vadinamas eilių padalijimas, angl. Queue partitioning).

Windows Azure Queue pranešimų apdorojimui naudoja taip vadinamą dviejų fazių patvirtinimą:

  • Pasiima pranešimą (užduotį) iš eilės ir pažymį jį kaip nematomą
  • Jeigu pranešimo apdorojimo metu kils klaida, po tam tikro laiko pranešimas vėl taps matomu ir bus grąžintas į eilę
  • Jeigu pranešimas buvo sėkmingai apdorotas – jis turi būti išreikštinai pašalintas iš eilės

PingTask ir susijusios klasės

Prisiminkime pirmoje dalyje aprašytą architektūrą:

alt

Kaip matyti, kreipimasis į serverius yra inicijuojamas Ping Scheduler serviso, patalpinant atitinkamą pranešimą (užduotį) į eilę. Tada kiekvienas iš Pinger servisų iš eilės pasiima po užduotį ir pradeda jas vykdyti. Užduoties esybę mes modeliuosime naudojant klasę PingTask:

public class PingTask
{
    public string Id { get; set; }
    public string PopReceipt { get; set; }
    public int DequeueCount { get; set; }

    public PingItem PingItem { get; set; }
}

Čia PingItem – prieš tai aprašyta esybė, kuri gaunama ir Azure Table saugyklos. Kitos savybės yra nustatomos Windows Azure Queue pusėje ir reikalauja detalesnio aprašymo:

  • Id – unikalus pranešimo identifikatorius (automatiškai generuojamas)
  • PopReceipt – gavėjas, kuris išėmė pranešimą iš eilės
  • DequeueCount – nusako kiek kartų pranešimas buvo išimtas iš eilės (dažnai reiškia, kiek kartų jau buvo bandoma nesėkmingai apdoroti pranešimą, po ko nulūždavo apdorojimo servisas ir pranešimas buvo grąžinamas atgal į eilę)

Kadangi Windows Azure Queue turi nuosavą pranešimo klasę CloudQueueMessage, mums reikalingas labai paprastas esybių konverteris:

public static class PingTaskConverter
{
    public static PingTask ToPingTask(this CloudQueueMessage message)
    {
        return new PingTask
        {
            Id = message.Id,
            PopReceipt = message.PopReceipt,
            DequeueCount = message.DequeueCount,
            PingItem = message.AsString.FromJson<PingItem>()
        };
    }
}

CloudQueueMessage pranešimo turinys pasiekiamas dviem būdais – savybės AsByte arba AsString pagalba, todėl papildysime mūsų Helper klasę iš 3 dalies dviem metodais: ToJson ir FromJson:

internal static class Helper
{
    public static string SanitizeUrl(this string s)
    {
        var str = s.ToLower();
        str = Regex.Replace(str, @"(http|https)://", String.Empty);
        str = Regex.Replace(str, @"[^a-z0-9_.]", "-");

        return str;
    }

    public static string ToJson<T>(this T obj)
    {
        string json;

        var serializer = new DataContractJsonSerializer(typeof(T));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, obj);
            json = Encoding.Default.GetString(stream.ToArray());
        }

        return json;
    }

    public static T FromJson<T>(this string json)
    {
        T obj;

        var serializer = new DataContractJsonSerializer(typeof(T));
        using (var stream = new MemoryStream(Encoding.Default.GetBytes(json)))
        {
            obj = (T)serializer.ReadObject(stream);
        }

        return obj;
    }
}

PingTask repozitorija

Darbui su eile, aprašysime labai paprastą repozitorijos interfeisą IPingTaskRepository:

public interface IPingTaskRepository
{
    PingTask GetTask();
    void AddItem(PingItem item);
    void DeleteTask(PingTask task);
    void ClearTasks();
}

Šio interfeiso realizacija pateikiama žemiau:

public class PingTaskRepository : IPingTaskRepository
{
    private readonly CloudQueue queue;
    private readonly string queueName = typeof(PingTask).Name.ToLowerInvariant();
    private readonly TimeSpan visibilityTimeout = TimeSpan.FromMinutes(3);

    public PingTaskRepository(CloudStorageAccount account)
    {
        var client = account.CreateCloudQueueClient();
        queue = client.GetQueueReference(queueName);
        queue.CreateIfNotExist();
    }

    public void AddItem(PingItem item)
    {
        var message = new CloudQueueMessage(item.ToJson());
        queue.AddMessage(message);
    }

    public PingTask GetTask()
    {
        PingTask task = null;

        var message = queue.GetMessage(visibilityTimeout);
        if (message != null)
        {
            task = message.ToPingTask();
        }

        return task;
    }

    public void DeleteTask(PingTask task)
    {
        queue.DeleteMessage(task.Id, task.PopReceipt);
    }

    public void ClearTasks()
    {
        queue.Clear();
    }
}

Kodas yra pakankamai paprastas, bet verta paminėti du dalykus:

  • nematomumo periodas nustatomas trims minutėms po kurio jeigu pranešimas nėra ištrintas, jis bus grąžintas atgal į eilę
  • eilės pavadinimas privalo būti iš mažųjų raidžių, todėl išreikštinai darome ToLowerInvariant

Kitoje dalyje…

Pamatę PingTaskRepository realizaciją, gali kilti klausimas dėl jos panaudojimo (tiksliau kaip teisingai apdoroti skirtingas išskirtines situacijas). Kitoje dalyje pabandysiu aprašyti visas įmanomas problemas, kurios gali kilti apdorojant Windows Azure Queue pranešimus bei pateiksiu teisingo apdorojimo šabloną.

Patiko (0)


Atgal į: Pingy - #5 Windows Azure Queue ir PingTaskRepository