Model aktorów – implementacja z użyciem biblioteki gevent

Cześć,
zgodnie z poprzednim wpisem – staram się pisać częściej, a skoro karp już zjedzony i popity kompotem z suszu (obu nie lubię ;) ) to można by coś napisać. Od jakiegoś czasu przymierzam się do napisania o gevencie – tj. implementacji pętli zdarzeń dla Pythona. I ten czas – nastał właśnie teraz.

Zacznijmy więc może od tego czym jest pętla zdarzeń. A może nie, zacznijmy od tego jak możemy wykorzystać wątki do zapewnienia przetwarzania wielowątkowego.
Działa więc to tak że jak przychodzi jakiś request to cała jego obsługa jest robiona w nowym wątku (lub w wątku leżącym w puli wątków) – każdy wiec taki request jest całkowicie niezależny od innego – brzmi to całkiem nieźle. Aczkolwiek może też powodować problemy – ponieważ dla każdego połączenia tworzony jest nowy wątek, co angażuje zasoby systemowe w jego stworzenie jak i w „administrację” nim – przełączanie itp. Co więcej, jeśli wątek natrafi na jakieś żądanie I/O tj. robienie selecta do bazy czy czytanie z pliku – wątek ten będzie leżał nieużywany – co powoduje marnowanie zasobów. Przy wątkach dochodzi jeszcze problem ich synchronizacji – tj takiego manewrowania nimi żeby nie zaszkodzić sobie nawzajem przy dostępnie do części wspólnych a więc – tworzenie sekcji krytycznych czy odpowiednie poziomy transakcji na bazie danych.

Jako alternatywę mamy właśnie pętle zdarzeń, której architektura wygląda tak:

wchodząc w głębie tego schematu – pętla zdarzeń pracuje na pojedynczym wątku, przez co w jednym czasie może robić tylko jedną rzecz. Żongluje ona zadaniami do wykonania, biorąc pod uwagę priorytety zadań. Jeśli zadanie zostanie odpalone przez pętle zdarzeń, ale zacznie np. czytać z bazy danych (mieć operację I/O) to takie zadanie przestanie blokować pętle i pętla zacznie wykonywać inne zadanie z kolejki – a zadanie z operacją I/O wróci do kolejki gdy ta operacja się zakończy. A więc jak widać, tutaj jednostką wykonawczą nie jest wątek. Wszystko dzieje się w jednym wątku, przez co nie ma problemów z synchronizacją, nie nakładamy na system operacyjny ciężaru obsługi kolejnych wątków a wszystko jest nadal „współbieżne”.

Modeli użycia pętli zdarzeń jest kilka – my zajmiemy się biblioteką gevent i modelem aktorów. Gevent w najprostszym użyciu wygląda tak:

1
2
3
4
5
6
7
>>> import gevent
>>> from gevent import socket
>>> urls = ['www.google.com', 'www.example.com', 'www.python.org']
>>> jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
>>> gevent.joinall(jobs, timeout=2)
>>> [job.value for job in jobs]
['74.125.79.106', '208.77.188.166', '82.94.164.162']

Odpalamy tu tyle greenletów (tak nazywa się jednostka wykonawcza w gevencie – tłumaczy się to jako „lekkie wątki”) ile jest elementów tablicy urls – i odpalamy wszystkie, czekając na wszystkie, max 2 sekundy. Każdy nasz greenlet odpala funkcję gethostbyname dla zadanego adresu url.

My się pobawimy troszkę inaczej, tak jak pisałem wcześniej – zaimplementujemy wzorzec aktorów.

Wzorzec ten tworzy abstrakcję – Aktora. Jest to jednostka wykonawcza której stan jest modyfikowany tylko i wyłączenie przez nią samą. Komunikuje się innymi aktorami poprzez wiadomości – a wiec aktor może odbierać wiadomości i wysyłać je. No i rzecz jasna – coś z nimi robić. Skrzynka odbiorcza aktora jest kolejką – a więc jeden aktor może w jednym momencie przetwarzać jedną wiadomość – reszta jest kolejkowana.

Zbudujemy system przetwarzania obrazów oparty o model aktorów. Będziemy mieli dwóch specjalnych aktorów, tj. aktora który stanowi wejście do systemu i taki który stanowi wyjście z systemu. Pomiędzy nimi będzie n aktorów którzy będą dokładać swoje cegiełki do przetwarzanego obrazu. Więc pomysł działania jest prosty:

Aktor Startowy (odbiera jaki plik przetworzyć i jakie efekty na niego nałożyć)
.. przetwarzanie
.. przetwarzanie
Aktor końcowy (zapisuje obraz z nałożonymi wcześniej efektami do pliku).

Zacznijmy więc od napisania klasy po której będą dziedziczyli aktorzy:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import gevent
from gevent.queue import Queue
from gevent.local import local
from exceptions import NotImplementedError
from manager.manager import manager

class AbstractActor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        print 'Creating %r ...' % self
        gevent.Greenlet.__init__(self)

    def process(self, args):
        raise NotImplementedError()

    def receive(self, message):  
        arguments = message['args']
        path = message['path']
        if len(path) > 1:
            path.pop(0)
            next_step = path[0]
        else:
            next_step = None

        print 'Processing in %r, next will be %r' % (self, next_step)
        args = self.process(arguments)
        if args:
            manager.go_next(next_step, {
                'args': args,
                'path': path
            })

    def _run(self):
        self.running = True
        print 'Waiting in %r' % self
        while self.running:
            message = self.inbox.get()
            self.receive(message)

Jak widać aktor będzie dziedziczył pośrednio po klasie Greenlet – dzięki czemu będzie mógł zostać dodany do pętli zdarzeń. Jego główna funkcja która odpali się od razu po jego uruchomieniu to funkcja _run() – przeciążamy ją i w pętli czekamy na wiadomości. Jeśli wiadomość nadejdzie – przekazujemy ją do funkcji receive() która podzieli wiadomość na części, wykona funkcję process(), którą nadpiszemy dla każdego konkretnego aktora i przekażemy info o zakończeniu przetwarzana do managera pętli.

Manager to nasza klasa która będzie trzymać pieczę nad tym by komunikaty były przesyłane do właściwych aktorów:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import random
import gevent

class Manager():

    def __init__(self):
        self.dictionary = dict()

    def add(self, obj):
        class_name = obj.__class__.__name__
        if self.dictionary.has_key(class_name):
            self.dictionary[class_name].append(obj)
        else:
            self.dictionary[class_name] = [obj]

    def get_next_actor(self, name):
        if not self.dictionary.has_key(name):
            print 'Actor %s not found!' % name
            name = 'FinishActor'
        return random.choice(self.dictionary[name])

    def go_next(self, next_path, args):
        actor_instance = self.get_next_actor(next_path)
        print 'Putting into  %r' % actor_instance
        actor_instance.inbox.put(args)
        gevent.idle()

manager = Manager()

jak widać – posiada ona słownik ze wszystkimi aktorami i po podanych argumentach wybiera kolejnego aktora, wrzucając mu do skrzynki wiadomość. Zauważ proszę że jeśli aktor nie zostanie znaleziony – przeskakujemy do aktora końcowego oraz jeśli mamy więcej niż jednego aktora o tej samej nazwie – wybrany zostanie losowy.

Jako że aktor startowy nie posiada własnej skrzynki odbiorczej, czyli w sumie to nie jest aktorem w rozumieniu tego wzorca – nie bedzie dziedziczył po klasie AbstractActor, zbudujemy dla niego jej minimalistyczną formę:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class StartActor(gevent.Greenlet):

    def __init__(self):
        gevent.Greenlet.__init__(self)
        # self.prepare_input()

    def get_path(self):
        """
        Define in your subclass.
        """

        raise NotImplementedError()

    def prepare_input(self):
        """
        Define in your subclass.
        """

        raise NotImplementedError()

    def _run(self):
        self.prepare_input()

I zaimplementujemy już naszego aktor startowego:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class RedisActor(StartActor):

    CHANNEL = 'tasks'
    SEPARATOR = '_'
    PATH_SEPARATOR = '=>'
    def __init__(self):
        self.connection = redis_connector.StrictRedis(host='localhost', port=6379, db=0)
        self.subscriber = self.connection.pubsub(ignore_subscribe_messages=True)
        self.path = ''
        StartActor.__init__(self)

    def get_path(self):
        return self.path.split(self.PATH_SEPARATOR)

    def handle(self, msg):
        arguments, path = msg.split(self.SEPARATOR)
        arguments = self.connection.get(arguments)
        path = self.connection.get(path)
        if not path or not arguments:
            return
        self.path = path
        parsed_path = self.get_path()
        im = Image.open(arguments)

        manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im})

    def prepare_input(self):
        self.subscriber.subscribe(self.CHANNEL)
        for msg in self.subscriber.listen():
            self.handle(msg['data'])

Jak widać, jako interfejsu wejściowego użyłem bazy noSql’owej – Redis. Nasłuchuje ona na kanał „tasks” gdzie dostaje stringa w formacie xxxx_yyyy gdzie xxxx to klucz pod który znajduje się ścieżka do pliku który ma być przetworzony a pod kluczem yyyy jest ścieżka przetwarzania w formacie AAA=>BBB. Całość tego dzieje się w funkcji

1
2
3
4
5
6
7
8
9
10
11
    def handle(self, msg):
        arguments, path = msg.split(self.SEPARATOR)
        arguments = self.connection.get(arguments)
        path = self.connection.get(path)
        if not path or not arguments:
            return
        self.path = path
        parsed_path = self.get_path()
        im = Image.open(arguments)

        manager.go_next(parsed_path[0], {'path': parsed_path, 'args':im})

zauważ proszę że do kolejnego aktora nie jest wysyłany obiekt z ścieżką pliku a obiekt z już uchwytem do otwartego pliku. Wyżej pokazany manager bierze tablice przetwarzania w formacie [‚AAA’, ‚BBB’] dla argumentu ‚AAA=>BBB’, wycina element z góry stosu, przekazuje pozostałą część do wyciętego Aktora, ten przetwarza i cykl się powtarza.

Stworzyłem sobie dwóch aktorów przetwarzajacych:

1
2
3
4
5
6
7
from ..abstract_actor import AbstractActor
from PIL import ImageFilter

class Blur(AbstractActor):
    def process(self, img):
        blurred = img.filter(ImageFilter.BLUR)
        return blurred
1
2
3
4
5
6
7
from ..abstract_actor import AbstractActor
from PIL import ImageFilter

class Detail(AbstractActor):
    def process(self, img):
        detailed = img.filter(ImageFilter.DETAIL)
        return detailed

odpowiednio – efekt rozmycia i wyostrzenia.

Na koniec jeszcze aktor kończący:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from ..abstract_actor import AbstractActor
import string, random


class FinishActor(AbstractActor):

    def random_generator(self, size=6, chars=string.ascii_uppercase + string.digits):
        return ''.join(random.choice(chars) for x in range(size))

    def process(self, img):
        print 'Saving in format %r .... ' % 'jpg'
        filename = '/home/mmazurek/' + self.random_generator() + '.jpg'
        img.save(filename, img.format)
        print 'Saved in %r' % filename

I zepnijmy wszystko do kupy :)

Plik main.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import gevent
from gevent.monkey import patch_all
from actors.abstract_actor import AbstractActor
from actors.finish_actors import *
from actors.initial_actors import *
from actors.processing_actors import *
ABSTRACT_ACTORS = ['StartActor']

def inheritors(klass):
    subclasses = []
    work = klass
    while work:
        parent = work.pop()
        for child in parent.__subclasses__():
            if child not in subclasses:
                if child.__name__ not in ABSTRACT_ACTORS:
                    subclasses.append(child)
                work.append(child)
    return subclasses

def setup():
    all_actors = inheritors([AbstractActor, StartActor])
    all_actors_instances = [actor() for actor in all_actors]
    to_join = []
    for actor in all_actors_instances:
        actor.start()
        manager.add(actor)
        to_join.append(actor)

    print 'Created %r' % to_join

    gevent.joinall(to_join)


patch_all()
setup()

Funkcja inheritors znajdzie nam wszystkie klasy dziedziczące po „AbstractActor” i „StartActor”. Taki prosty autodiscovery. Dla każdej znalezionej klasy tworzymy jej instancję, każdą startujemy (dodajemy do pętli zdarzeń), rejestrujemy do managera tak żeby była przez niego widoczna i odpalamy „joinall” – czekając na wszystkie. Zobaczmy jak to działa.

Obrazek wejściowy:

i efekt dla „Blur=>Detail”:

Logi tak wyglądają:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Creating <FinishActor at 0x7ff071118190> ...
Creating <Blur at 0x7ff071118230> ...
Creating <Detail at 0x7ff0711182d0> ...
Created [<RedisActor at 0x7ff0724dc690>, <FinishActor at 0x7ff071118190>, <Blur at 0x7ff071118230>, <Detail at 0x7
ff0711182d0>]
Putting into  <Blur at 0x7ff071118230>
Waiting in <FinishActor at 0x7ff071118190>
Waiting in <Blur at 0x7ff071118230>
Processing in <Blur at 0x7ff071118230>, next will be 'Detail'
Putting into  <Detail at 0x7ff0711182d0>
Waiting in <Detail at 0x7ff0711182d0>
Processing in <Detail at 0x7ff0711182d0>, next will be None
Actor None not found!
Putting into  <FinishActor at 0x7ff071118190>
Processing in <FinishActor at 0x7ff071118190>, next will be None
Saving in format 'jpg' ....
Saved in '/home/mmazurek/V5K02J.jpg'

Dla Blur=>Blur=>Blur=>Blur:

I dla np. Detail=>Blur=>Detail=>Detail=>Detail

Oczywiście w trakcie używania takiego systemu możemy dojść do wniosku że któraś z warstw (aktorów) ma większe obciążenie, w sensie przetwarzanie w niej trwa dłużej. Można wtedy dołożyć kolejną instancję już istniejącego Aktora:

1
all_actors_instances.append(Blur())

w funkcji setup() przed pętlą odpalającą greenlety.

Spowoduje to lekką zmianę w logach powitalnych:

1
2
3
4
5
6
7
8
9
Creating <FinishActor at 0x7f17aee32d70> ...
Creating <Blur at 0x7f17aee32cd0> ...
Creating <Detail at 0x7f17aee32eb0> ...
Creating <Blur at 0x7f17aee32e10> ...
Created [<RedisActor at 0x7f17aee32c30>, <FinishActor at 0x7f17aee32d70>, <Blur at 0x7f17aee32cd0>, <Detail at 0x7f17aee32eb0>, <Blur at 0x7f17aee32e10>]
Waiting in <FinishActor at 0x7f17aee32d70>
Waiting in <Blur at 0x7f17aee32cd0>
Waiting in <Detail at 0x7f17aee32eb0>
Waiting in <Blur at 0x7f17aee32e10>

Widać po prostu że dodatkowy greenlet został uruchomiony. Rozróżnić można je po adresie pamięci. Spróbujmy odpalić system dla danych Blur=>Blur=>Blur:

1
2
3
4
5
6
7
8
9
10
11
Putting into  <Blur at 0x7f17aee32cd0>
Processing in <Blur at 0x7f17aee32cd0>, next will be 'Blur'
Putting into  <Blur at 0x7f17aee32e10>
Processing in <Blur at 0x7f17aee32e10>, next will be 'Blur'
Putting into  <Blur at 0x7f17aee32e10>
Processing in <Blur at 0x7f17aee32e10>, next will be None
Actor None not found!
Putting into  <FinishActor at 0x7f17aee32d70>
Processing in <FinishActor at 0x7f17aee32d70>, next will be None
Saving in format 'jpg' ....
Saved in '/home/mmazurek/V14113.jpg'

I można zauważyć że raz obrazek przetwarza greenlet o adresie z końcówką cd0 a raz e10. Oczywiście nie jest to jakiś bardzo dobry sposób na wybór docelowego Aktora – można by pokusić się tutaj o np. jakiś loadballancing albo chociaż rotację liniową.

Warto dostrzec też że zbudowaliśmy architekturę potokową. Tzn podzieliliśmy jakiś proces na etapy i poszczególne etapy, mimo iż są zależne od siebie, to mogą przetwarzać inne zadania, tzn że jeśli zadanie #1 dojdzie do 3 etapu przetwarzania to drugi etap może już działać na zadaniu #2 a pierwszy na #3.

Oczywiście ten system jest pokazowy – nie do zastosowań produkcyjnych, brakuje mu np obsługi wyjątków, brak podnoszenia greenletów np. bo błędzie czy jakiejkolwiek odporności na awarie – w tym momencie wyjątek w którymkolwiek etapie blokuje pętle zdarzeń.

Może warto odpalić wiele pętli zdarzeń, każda na osobnym procesie?

Kombinować zawsze warto :)

Dzięki za wizytę,
Mateusz Mazurek
Podziel się na:
    Facebook email PDF Wykop Twitter

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *

This site uses Akismet to reduce spam. Learn how your comment data is processed.