Šiame straipsnyje norėčiau aptarti mano pamąstymus, kaip galima efektyviau apdoroti konvejerio tipo užduotis .NET platformoje.
Teorija
Pradėsime nuo to, kas yra konvejeris programavime. Konvejeris – tai nuoseklių žingsnių rinkinys, kur kiekvienas žingsnis žino tik prieš tai buvusį bei sekantį žingsnius. Dažnai dar tokia architektūra vadinama vamzdžiu (angl. pipeline). Žemiau pateiktas tokio konvejerio pavyzdys:

Kaip matyti, čia nuosekliai atliekami 3 žingsniai:
- duomenų nuskaitymas;
- nuskaitytų duomenų įrašymas;
- įrašytų duomenų suspaudimas.
Paprasčiausias būdas įgyvendinti konvejerį – viską daryti nuosekliai ir sinchroniškai. Be abejo, tai nėra pats efektyviausias įgyvendinimo būdas, nes einamuoju laiko momentu tik vienas iš žingsnių bus vykdomas. Efektyviau būtų jeigu visi trys konvejerio žingsniai būtų vykdomi lygiagrečiai, pvz.:
- nuskaitoma pirma duomenų porcija
- nuskaitoma antra duomenų porcija
- įrašoma pirma duomenų porcija
- nuskaitoma trečia duomenų porcija
- įrašoma antra duomenų porcija
- suspaudžiama pirma duomenų porcija
Tokiu būdu, po tam tikro laiko momento visi trys žingsniai bus vykdomi, o tai reiškia, kad ir pats konvejeris dirbs greičiau. Komunikaciją tarp žingsnių galima įgyvendinti įvykių pagalba: jeigu pirmas žingsnis atliko savo darbą – jis signalizuoja sekančiam žingsniui, kad jam atsirado darbo. Jeigu darbo nėra – žingsnis turėtų būti „užmigdomas“ ir laukti signalo.
Komunikacija tarp žingsnių yra efektyvi, bet jeigu vienas žingsnis dėl kažkokių priežasčių užstrigo, jo lauks visi kiti žingsniai, kas praktiškai reiškia konvejeris „sustoja“. Tam, kad žingsniai kuo mažiau įtakotų vienas kitą, šalia įvykių mechanizmo atsiranda eilės, kurios atlieka buferio vaidmenį:

Tokia architektūra vadinama SEDA (Staged event-driven architecture). .NET platformoje tokią architektūrą įgyvendinti tapo kaip niekada paprasta BlockingCollection klasės pagalba, apie kurią jau esu rašęs.
Praktika
Pradėsime nuo dviejų paprasčiausių interfeisų:
public interface IPipe
{
void Process();
}
public interface IStep
{
void Execute();
}
IPipe interfeisas tai ir bus mūsų konvejeris, o IStep – atitinkamai konvejerio žingsnis. Aprašykime aukščiau išvardintus žingsnius (su minimaliu funkcionalumu):
public class ReadStep : IStep
{
private readonly BlockingCollection<string> outputQueue;
public ReadStep(BlockingCollection<string> outputQueue)
{
this.outputQueue = outputQueue;
}
public void Execute()
{
Console.WriteLine("Reading");
Thread.Sleep(TimeSpan.FromSeconds(1));
outputQueue.Add("some text here");
}
}
public class WriteStep : IStep
{
private readonly BlockingCollection<string> inputQueue;
private readonly BlockingCollection<FileInfo> outputQueue;
public WriteStep(BlockingCollection<string> inputQueue, BlockingCollection<FileInfo> outputQueue)
{
this.inputQueue = inputQueue;
this.outputQueue = outputQueue;
}
public void Execute()
{
var text = inputQueue.Take();
Console.WriteLine("Writing " + text);
Thread.Sleep(TimeSpan.FromSeconds(2));
outputQueue.Add(new FileInfo("c:\\test"));
}
}
public class ZipStep : IStep
{
private readonly BlockingCollection<FileInfo> inputQueue;
public ZipStep(BlockingCollection<FileInfo> inputQueue)
{
this.inputQueue = inputQueue;
}
public void Execute()
{
var file = inputQueue.Take();
Console.WriteLine("Zipping " + file);
Thread.Sleep(TimeSpan.FromSeconds(5));
}
}
Atkreipkite dėmesį, kad žingsnių vykdymo trukmė specialiai padaryta skirtinga. Turint visus reikalingus žingsnius, beliko aprašyti patį konvejerį:
public class Pipe : IPipe
{
public void Process()
{
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var writeQueue = new BlockingCollection<string>();
var zipQueue = new BlockingCollection<FileInfo>();
var readStep = new ReadStep(writeQueue);
var writeStep = new WriteStep(writeQueue, zipQueue);
var zipStep = new ZipStep(zipQueue);
var steps = new List<Task>
{
taskFactory.StartNew(() => DoStep(readStep)),
taskFactory.StartNew(() => DoStep(writeStep)),
taskFactory.StartNew(() => DoStep(zipStep)),
};
Task.WaitAll(steps.ToArray());
}
private static void DoStep(IStep step)
{
while (true)
{
try
{
step.Execute();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
}
}
Pateiktoje konvejerio realizacijoje metodas DoStep yra gerokai supaprastintas, realybėje jis turėtų mokėti pagal poreikį nutraukti žingsnio vykdymą, stabdant visą konvejerį.
Apibendrinimas
SEDA labai gerai tinka apkrautoms sistemoms, t. y. kurios apdoroja kelis šimtus veiksmų per sekundę. Šios architektūros pagalba galima ne tik paspartinti konvejerį, bet ir pagerinti bendrą sistemos patikimumą. Ką manote?
Rodyk draugams
Naujausi komentarai