Pomoc - Szukaj - Użytkownicy - Kalendarz
Pełna wersja: Kolejka w PHP,
Forum PHP.pl > Forum > PHP
Prph
Dawno mnie tu nie było smile.gif

Forumowicze, problem z kolejką w PHP. System, który tworzę przetwarza dane co minutę (z CRONa). Niestety uruchomienie pojedynczego procesu np, przetwarzaj.php nie kończy się w 1 minucie, ponieważ przetwarzanie danych może trwać np. 2 mninuty dla pojedynczego rekordu bazy danych, a rekordów jest np. 20. Teoretycznie szeregowe uruchomienie przetwarzania zajmie więc 40 min. Każdy przetwarzaj.pho bierze z bazy zatem 1 rekord i tylko taki mieli.

Moje pytanie: w jaki sposób napisać kolejkę, która poprawnie przetworzy takie dane? Wywołać z crona 20x ten sam przetwarzaj.php? Zanim proces zacznie wykonywać swoją pracę, ustawia na rekordzie bazy, że jest on przetwarzany, więc inny proces weźmie kolejne dane.

Spotkaliście się z podobnym problemem, macie pomysły?
Crozin
Podstawowe pytanie: czy przetworzenie kilku, np. maksymalnie pięciu, elementów na raz działa szybciej od przetwarzania kolejnych elementów po kolei?
Prph
Tak, przecież zostaną wykonane równolegle. Wykonają się zatem w ciągu 2 minut, zamiast 40.
!*!
Cytat(Prph @ 23.05.2012, 07:39:57 ) *
Tak, przecież zostaną wykonane równolegle. Wykonają się zatem w ciągu 2 minut, zamiast 40.


Chciałbyś. Przecież "moc" obliczeń jest identyczna,więc jak zaginasz czasoprzestrzeń?
hind
Zły tok rozumowania... równoległe uruchomienie spwoduje że wszystkie procesy będą działać wolniej... więc zamiast upragnionych 2 minut będzie to 20 (lub i więcej)...

jak zrobić?
napisać skrypt który w pętli odpali te 20 zadań np
  1. #!/usr/bin/env php
  2. <?php
  3. foreach(range(1,20) as $v) {
  4. echo `/skrypt/do/eykonania.php --id=$v &`; // & <- b. ważny znak
  5. }
Prph
No bez przesady. 16 rdzeni przetworzy szybciej 16 równoległych procesów, niż 1 ciągły. Proces nie może wykonać się szeregowo, bo już pierwsze przetwarzanie danych może zająć dużo czasu, a przetwarzanie musi odbywać się co minutę. Pomysł hind wnosi coś do dyskusji.
Crozin
To może inaczej, co jest powodem tak długiego przetwarzania pojedynczego rekordu? Bo jeżeli są to jakieś ciężkie operacje wykorzystujące procesor albo dysk, wykonanie równoległych przetwarzań wcale nie musi przyspieszyć ich ogólnego czasu wykonywania. Zaś jeżeli jest to spowodowane przykładowo oczekiwaniem na pobranie danych z sieci to już jak najbardziej miałoby to sens.

1. Zamiast odpalać co minutę zadanie w Cronie, utwórz sobie daemona(y) (System_Daemon).
2. Generalnie utworzenie w bazie danych kolumny status (awaiting, processing, processed) będzie niewystarczające. Dwa (lub nawet więcej) procesy mogłyby pobrać ten sam wiersz nim którykolwiek z nich zdąży zmienić jego status z awaiting na processing. Tutaj trzeba by było albo zablokować na chwilę tabelę dla odczytu:
  1. LOCK TABLE tbl_name READ;
  2. SELECT ... WHERE STATUS = 'awaiting' LIMIT 1;
  3. UPDATE ... SET STATUS = 'processing' WHERE id = ...;
  4. UNLOCK TABLE tbl_name;

3. Jeżeli masz możliwość nie pisz tego w PHP, tylko w czymś co wspiera współbieżność (ot, chociażby Java) - znacznie uprości Ci to życie.

EDIT:
Ewentualnie zamiast ręcznego bawienia się w blokowanie tabeli skorzystaj z transakcji, i tak zawsze trzeba, na odpowiednim poziomie izolacji - tutaj potrzebny byłby chyba aż poziom serializable.
Prph
No to jest odpowiedź na wysokim poziomie smile.gif

Proces nie obciąża procesora. Długo wykonuje się, ponieważ przesyła dane i też niedużo, ale klient odbierający dane przetwarza je i to zajmuje czas.

Nawet gdyby 2 procesy dobrały się do tych samych danych nic się nie stanie, klient otrzyma je drugi raz co będzie skutkowało najwyżej ponownym przemieleniem danych.

Jeśli chodzi o Javę, to faktycznie mógłbym jej użyć, ale to banalny kod w PHP, jeśli udałoby się tylko wywołać 20 równoległych procesów to myślę, że byłoby OK.

Co z forkowaniem? Jak ten Demon w PHP? Ktoś to wdrażał w profesjonalnych zastosowaniach?
Crozin
Cytat
Proces nie obciąża procesora. Długo wykonuje się, ponieważ przesyła dane i też niedużo, ale klient odbierający dane przetwarza je i to zajmuje czas.
Tak więc tutaj może to mieć rzeczywiście sens. Musisz tylko pamiętać, że jednoczesne podesłanie znaczącej ilości żądań do drugiej strony może ją "zamulić" i w efekcie jeszcze bardziej wydłużyć ogólny czas działania. Prawdopodobnie dobrym pomysłem będzie wysyłanie jakiejś ograniczonej ilości żądań jednocześnie, np. 5 albo 20.
Cytat
Jeśli chodzi o Javę, to faktycznie mógłbym jej użyć, ale to banalny kod w PHP, jeśli udałoby się tylko wywołać 20 równoległych procesów to myślę, że byłoby OK.
Tak na pierwszy rzut oka, wykonanie tego np. w Javie będzie znacznie prostsze niż w PHP. Jeden wątek pobierający rekordy do obrobienia (w ilości np. 100), który będzie je wrzucać na stos oraz 5-10-20 wątków pobierających po jednym elemencie ze stosu i przetwarzających go. W momencie gdy ilość rekordów na stosie się kończy będzie on pobierać sobie następną porcję.
Zero problemów z wielokrotnym obrabianiem tego samego rekordu, "prosta" baza danych nie wymagająca żadnego dbania o współbieżność (brak LOCKów), jeden proces w systemie zamiast n+1 (czyli łatwe okiełznanie tego). Całość pewnie w 50 linijkach kodu by się zmieściła, z czego połowa to pewnie by były klamerki.
Cytat
Co z forkowaniem? Jak ten Demon w PHP? Ktoś to wdrażał w profesjonalnych zastosowaniach?
W forkowanie nigdy się w PHP nie bawiłem - ono się po prostu do tego kompletnie nie nadaje. Daemon w PHP? Jak najbardziej, ale pojedynczy.

EDIT: Oczywiście strukturą danych przechowującą rekordy do przetworzenia powinna być tytułowa kolejka, nie stos.
Niktoś
Php z natury nie jest wielowątkowe(czyt.threading), pozostaje kolejkowanie, albo "emulacja" wielowątkowości poprzez użycie pamięci wspóldzielonej.
Taka implementacja wielowątkowości w PHP:
http://www.php.blogle.info/2011/nietypowe-...atkowosc-w-php/
Prph
Zrobione za pomocą forkowania. Aż miło patrzeć jak to działa smile.gif

  1.  
  2. $sql ='tutaj selekt pobierający listę do wysłania...';
  3.  
  4. $res = db_query($sql);
  5. $queue = db_fetch_all($res);
  6.  
  7. if(!count($queue)) {
  8. echo "Nothing to send\n";
  9. }
  10.  
  11. $numOfPids = 0;
  12. $processes = array();
  13. foreach($queue as $entry) {
  14.  
  15. $processes[$numOfPids] = pcntl_fork();
  16. if($processes[$numOfPids]) { // this is parent process
  17. $numOfPids++;
  18. }
  19. else {
  20.  
  21. echo "In $numOfPids process, sending {$entry['id']}\n";
  22.  
  23. // ok we are in the child
  24. // send data
  25.  
  26. // have to connect again
  27. db_connect('...');
  28.  
  29. $update = array('status' => 'in_process');
  30. db_update('...'));
  31. send_data($entry); // funkcja wysyłająca
  32.  
  33. echo "Process $numOfPids sent data\n";
  34.  
  35. exit();
  36. }
  37. }


Wynik:

  1. In 0 process, sending 5790
  2. In 1 process, sending 5791
  3. In 2 process, sending 5797
  4. Process 0 sent data
  5. Process 2 sent data
  6. Process 1 sent data
  7. It's done!
Crozin
Aż mnie coś wzięło, żeby zobaczyć jak można zrobić to "normalnie" (czyt: bez forkowania procesów, a z użyciem normalnych wątków):
  1. package com.crozin.concurrent;
  2.  
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.LinkedBlockingQueue;
  5.  
  6. public class RecordsProcessor implements Runnable {
  7. private static final int NUMBER_OF_PROCESSORS = 8;
  8.  
  9. private final BlockingQueue<Integer> processingQueue = new LinkedBlockingQueue<Integer>();
  10.  
  11. private class ProcessorThread extends Thread {
  12. public ProcessorThread(String name) {
  13. super(name);
  14. }
  15.  
  16. @Override
  17. public void run() {
  18. try {
  19. while (true) {
  20. Integer record = processingQueue.take();
  21.  
  22. log("Processing record #" + record);
  23.  
  24. Thread.sleep(new java.util.Random().nextInt(1000) + 1000);
  25. }
  26. } catch (InterruptedException e) {
  27. return;
  28. }
  29. }
  30. }
  31.  
  32. @Override
  33. public void run() {
  34. for (int i = 0; i < NUMBER_OF_PROCESSORS; i++) {
  35. new ProcessorThread("Processor #" + i).start();
  36. }
  37.  
  38. int c = 1;
  39.  
  40. while (true) {
  41. if (processingQueue.size() <= 10) {
  42. log("Refilling the queue");
  43.  
  44. for (int i = 0; i < 100; i++) {
  45. processingQueue.add(c++);
  46. }
  47. }
  48.  
  49. try {
  50. Thread.sleep(2000);
  51. } catch (InterruptedException e) {
  52. return;
  53. }
  54. }
  55. }
  56.  
  57. public static void main(String[] args) throws Exception {
  58. new RecordsProcessor().run();
  59. }
  60.  
  61. public static void log(String msg) {
  62. System.out.println(Thread.currentThread().getName() + ": " + msg);
  63. }
  64. }
Minusy:
- konieczność zrezygnowania z PHP dla tego elementu systemu.

Plusy:
- będzie działać na każdej platformie,
- gwarantuje, że rekordy zostaną przetworzone w odpowiedniej kolejności oraz tylko i wyłącznie jeden raz,
- brak niepotrzebnych przerw w działaniu,
- kontrola nad ilością przetwarzanych rekordów w danym momencie,
- łatwiej to wszystko ogarnąć, wprowadzać zmiany itp.
cezet
Witajcie,

Pewnie mnie teraz zjedziecie, bo jak śmiem... smile.gif
Ale zaryzykuję. Uważam, że metody które podaliście, to zabijanie muchy przy użyciu czołgu.

Ja swojego czasu miałem podobny problem. Potrzebowałem mechanizmu częstego uruchamiania procesu, który może trwać chwilę, lub pół godziny, i warunek - procesy nie mogły działać równolegle.

I rozwiązałem to na zasadznie sprawdzania pliku typu lock.
czyli coś w stylu:

jeden plik odpalany np. co minutę z crona (np cron.php)
  1. <?php
  2. if(!file_exists('lock.lck')) {
  3. if(touch('lock.lck')) {
  4. include('zrob_cos.php');
  5. unlink('lock.lck');
  6. }
  7. }
  8. ?>


oraz drugi plik zrob_cos.php który wykonywał nasze zadania.

To taki ogólny zarys, ale.. proste, banalne i skuteczne smile.gif
Crozin
Cytat
[...] procesy nie mogły działać równolegle.
A zauważyłeś, że cały wątek tyczy się właśnie równoległego odpalenia wielu procesów/wątków?
greycoffey
Nie zjedziemy, acz wytkniemy błąd.
Ten skrypt to tylko semafor zapobiegający uruchomieniu dwóch instancji tego skryptu. Autor natomiast chciałby równloegle przetwarzać te rekordy, rozłożyć je na kilka wątków (serwer jak autor pisał 16-rdzeniowy), dzięki czemu zadania będą się wykonywały równolegle.
cezet
Zwracam honor wink.gif
To jest wersja lo-fi głównej zawartości. Aby zobaczyć pełną wersję z większą zawartością, obrazkami i formatowaniem proszę kliknij tutaj.
Invision Power Board © 2001-2025 Invision Power Services, Inc.