
Executing time consuming functions in an EventLoop


NotionCommotion
Go to solution Solved by requinix,


Occasionally bla::doWork() under test case 0 will send out a group of emails and take a little time, and while ReactPHP provides async I/O, doWork() is blocking and prevents the other periodic timers from triggering.

I came up with two other possible solutions under my test cases 1 and 2. As a third option, I am thinking of using a Redis (or similar) queue: this script would dump the content into the queue, and another script would run continuously and process the data when it becomes available.

Any recommendations on which of my three solutions is best, or whether I should do something altogether different?  Thanks

<?php
ini_set('display_errors', 1);
require __DIR__.'/../../vendor/autoload.php';
class bla {
    private $loop, $time, $onesecondCounter=0, $fivesecondCounter=0, $processCounter=0;
    public function __construct($test) {
        $this->time = microtime(true);
        $this->loop = \React\EventLoop\Factory::create();

        $phpScript = 'php -r "echo(\"start work\n\");sleep(2);echo(\"end work\n\");"';
        $this->loop->addPeriodicTimer(5, function () use($test, $phpScript) {
            $this->fivesecondCounter++;
            $this->test('5 second loop', 5*$this->fivesecondCounter);
            switch($test) {
                case 0:
                    $this->doWork();
                    break;
                case 1:
                    $process = new \React\ChildProcess\Process($phpScript);
                    $process->start($this->loop);
                    $process->stdout->on('data', function ($chunk) {
                        echo '$process->stdout->on: '.$chunk. PHP_EOL;
                    });

                    $process->on('exit', function($exitCode, $termSignal) use ($process) {
                        $this->processCounter++;
                        echo 'Process exited with code ' . $exitCode .' and termSignal ' . $termSignal. PHP_EOL;
                        $process->terminate();
                    });
                    break;
                case 2:
                    echo 'start exec'.PHP_EOL;
                    shell_exec("/usr/bin/nohup ".$phpScript." >/dev/null 2>&1 &");
                    echo 'end exec'.PHP_EOL;
                    break;
                case 3:
                    echo 'Or use another approach such as a redis queue?'.PHP_EOL;
                    break;
                default: exit("invalid $test");
            }
        });

        $this->loop->addPeriodicTimer(1, function () {
            $this->onesecondCounter++;
            $this->test('1 second loop', $this->onesecondCounter);
        });

    }

    public function run()
    {
        $this->loop->run();   
    }

    private function doWork()
    {
        $t=microtime(true);
        $c=0;
        for($i=0; $i<=10000000; $i++ ) {
            $c++;
        }
        printf('Just did %.2f seconds of work'.PHP_EOL, microtime(true)-$t);
    }

    private function test($msg, $c)
    {
        $time =  microtime(true) - $this->time;
        // elapsed time is in seconds, so multiply by 1000 to report milliseconds
        printf('msg: %s ms: %.2f Error: %.2f onesecondCounter: %d, fivesecondCounter %d, processCounter %d'.PHP_EOL, $msg, 1000*$time, $time - $c, $this->onesecondCounter, $this->fivesecondCounter, $this->processCounter);
    }
}

$bla = new bla($argv[1]??0);
$bla->run();

 


Doesn't look like ReactPHP has something to make use of generators... Those would have been pretty much the ideal answer to this.

Can't you split doWork's work up? If it needs to send a group of emails, don't try to do them all at once: have it do whatever emails it can fit within one second or something, then stop and let the event loop take over.
That's the basic approach to how some sorts of cronjob-type tasks run. It's not about getting everything done at once but about doing a chunk of them every time you get "processor" time.


Not sure if a generator would help.  It might save memory but didn't think they had anything to do with not blocking.  Am I wrong?

I am sure I can split up doWork's work, but how much is enough?  Would be nice to not have to worry about it.  Any thoughts on my new process or queue idea?  For the queue, is it typical to also include a loop which checks it periodically?  Or maybe something really simple like the following?

while(true) {
    if($workToBeDone = $redis->rpop('workToBeDone')) {
        // process the work
    }
    else {
        sleep(1);
    }
}

 


  • Solution
5 minutes ago, NotionCommotion said:

Not sure if a generator would help.  It might save memory but didn't think they had anything to do with not blocking.  Am I wrong?

In this case it's about using the generator as a coroutine. Generators can suspend execution and return to some caller, then that caller can have the generator resume.

The use case here is that your doWork becomes a generator, and it yields (suspends) every email or so. When it suspends, ReactPHP would have a chance to execute as it needs (i.e., run through the event loop), and when it's ready it has doWork continue. Rinse and repeat until hair is sufficiently washed.

https://3v4l.org/qlXCj
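
The linked snippet is not reproduced here. As a rough, self-contained sketch of the same idea (not the code behind that link), the example below turns the email work into a generator that suspends after every message, and uses a plain while loop as a stand-in for the event loop. The function name sendEmails, the addresses, and the "other events" placeholder are all made up for illustration; ReactPHP itself is not involved.

```php
<?php
// Hypothetical stand-in for the real email sender: a generator that
// suspends after every message so the caller can do other things.
function sendEmails(array $recipients): Generator
{
    foreach ($recipients as $to) {
        // ... the actual sending of the email to $to would go here ...
        yield "sent to $to";   // suspend and hand control back to the caller
    }
}

$log  = [];
$task = sendEmails(['a@example.com', 'b@example.com', 'c@example.com']);

// Toy driver standing in for an event loop: one slice of email work,
// then one slice of "everything else", until the generator is finished.
while ($task->valid()) {
    $log[] = $task->current();  // value yielded by the slice that just ran
    $log[] = 'other events';    // timers, sockets, etc. would run here
    $task->next();              // resume the generator until its next yield
}

echo implode(PHP_EOL, $log), PHP_EOL;
```

The point of the sketch is only the control flow: the generator never runs for longer than one email before the driver gets control back.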

 

5 minutes ago, NotionCommotion said:

I am sure I can split up doWork's work, but how much is enough?  Would be nice to not have to worry about it.  Any thoughts on my new process or queue idea?  For the queue, is it typical to also include a loop which checks it periodically?  Or maybe something really simple like the following?


while(true) {
    if($workToBeDone = $redis->rpop('workToBeDone')) {
        // process the work
    }
    else {
        sleep(1);
    }
}

 

As long as doWork is running, ReactPHP's loop will not execute, because it's not truly asynchronous - it just looks that way.

The basic problem you have is that doWork takes too long. The basic solution is to split the work according to whatever is not "too long". Since your metric is time, I suggested running work until some suitable amount of time has passed. Like one second.

doWork is probably self-contained, right? It determines what work needs to be done and does it. You'd refactor that so that determining the work to be done happens before doWork, then doWork goes through that set until either too much time passes or it runs out of stuff.

class DoesWork {
	private $work = [];

	public function __construct(array $work = []) {
		// initial set of work items, however you determine them
		$this->work = $work;
	}

	public function addWork($item) {
		$this->work[] = $item;
	}

	public function doWork() {
		// exact code varies, but basically...
		$now = microtime(true);
		$end = $now + 1;
		while ($this->work && $now < $end) {
			$item = array_shift($this->work);

			// whatever

			$now = microtime(true);
		}
	}

	public function hasWork() {
		return !empty($this->work);
	}
}
// setup
$worker = new DoesWork();

// inside the event loop,
if ($worker->hasWork()) {
	$worker->doWork();
}

If you look closely, you'll notice the code structure looks similar to what was in that 3v4l link...
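
To make the time-slicing concrete, here is a minimal sketch of the same pattern that runs on its own. It is an assumption-laden illustration, not the code above: the class name TimeBoxedWorker, the 5 ms budget, and the usleep() that fakes the cost of one item are all invented, and the while loop at the bottom only stands in for an event loop. In ReactPHP the hasWork()/doWork() check would presumably sit inside a periodic timer callback instead.

```php
<?php
// Compact variant of the DoesWork idea with a configurable time budget.
final class TimeBoxedWorker
{
    private $work;
    private $budget;      // seconds of work allowed per doWork() call
    public $done = [];    // items processed so far, in order

    public function __construct(array $work, float $budget)
    {
        $this->work   = $work;
        $this->budget = $budget;
    }

    public function hasWork(): bool
    {
        return !empty($this->work);
    }

    public function doWork(): void
    {
        $end = microtime(true) + $this->budget;
        // Keep going until the queue is empty or the budget is used up.
        while ($this->work && microtime(true) < $end) {
            $item = array_shift($this->work);
            usleep(2000);             // pretend each item takes about 2 ms
            $this->done[] = $item;
        }
    }
}

$worker = new TimeBoxedWorker(range(1, 10), 0.005);
$slices = 0;

// Stand-in for the event loop: each pass represents one timer tick.
while ($worker->hasWork()) {
    $worker->doWork();
    $slices++;
    // other timers and socket callbacks would get their turn here
}

printf('%d items in %d slices' . PHP_EOL, count($worker->done), $slices);
```

The budget is the knob: a smaller value means other callbacks wait less per slice, at the cost of more slices to drain the same amount of work.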


Thanks requinix,

Didn't know that about generators.  They've made me rethink things a few times.

Yes, I now know for sure that React is not truly asynchronous, and had to create the initial script I posted just to prove it to myself.  The last script I showed was a totally different process which checks a queue, and my reactphp script would lpush() work into a queue, and thus not be blocked.

Ah, I get your idea.  Specify the amount of time to dedicate to it and do no more.  Seems simpler than a queue.

 


It is quite a bit simpler than having to deal with Redis queues, yeah. And unless the work needs to come from an external source, the approach I showed also allows you to add tasks over time. So you could have one "process" (whatever ReactPHP calls it) that has the emailer and doWork, and another process could just as easily add emails to it. And you can go even further than that...


Sorry, questions.  Inside what event loop?  Some periodic timer I create using addPeriodicTimer(), and not the core reactphp loop, right?    I don't believe I could (or should) create multiple loops, and I am also using the same loop for a socket connection.  Your solution would potentially block for up to 1 second, true?  Is that likely too long?  Are there good rules of thumb for the loop period to maximum block time ratio?

 

// setup
$worker = new DoesWork();

// inside the event loop,
if ($worker->hasWork()) {
	$worker->doWork();
}

 

<?php
$loop = \React\EventLoop\Factory::create();

$server = new \React\Socket\TcpServer("$ip:$port", $loop);

$loop->addPeriodicTimer(5, function () {
    // 5 second loop
});
$loop->addPeriodicTimer(1, function () {
    // 1 second loop
});
$loop->run();   

 


1 hour ago, NotionCommotion said:

Your solution would potentially block for up to 1 second, true?  Is that likely too long?  Are there good rules of thumb for the loop period to maximum block time ratio?

Ideally you don't ever block if you have work to do.  In general, as long as you can do work you should be doing work.  It's when you cannot do any work that you want to yield control to the event loop.  That's usually when you're waiting for I/O from somewhere.

That said, if the work you can do is going to take a long time without having any sort of natural blocking point (I/O), but you don't want to block your other tasks for that long, then you need to start trying to find ways you can break up that task so you can yield control occasionally.  That's generally known as cooperative multitasking.  In the case of sending a bunch of emails, an obvious way to break that up would be to yield control after each email.  If other tasks are pending in the queue then they will get a chance to do their work.  If not, control will return back to your original task immediately.  The only time the process should be blocked is when either there are no tasks at all or all tasks are waiting on I/O.

I haven't done anything with react in a long time but as far as I can tell what you'd want to do is use the futureTick function to schedule more work and break up your task.  For example:

<?php
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;

require __DIR__.'/vendor/autoload.php'; // adjust to wherever Composer's autoloader lives

class EmailQueue {
    private $loop;
    private $emailList;
    private $isTicking;

    public function __construct(LoopInterface $loop){
        $this->loop = $loop;
        $this->emailList = new SplQueue();
        $this->isTicking = false;
    }

    // Add one email and make sure the queue is being worked on.
    public function queueEmail(string $to, string $message){
        $this->emailList->enqueue([$to, $message]);
        if (!$this->isTicking){
            $this->tick();
        }
    }

    // Schedule one email for the next loop cycle, or stop when the queue is empty.
    private function tick(){
        if ($this->emailList->isEmpty()){
            $this->isTicking = false;
        } else {
            $this->isTicking = true;
            $this->loop->futureTick(function(){
                $this->processMessage();
                $this->tick();
            });
        }
    }

    // Send exactly one email per loop cycle.
    private function processMessage(){
        [$to, $message] = $this->emailList->dequeue();
        //Send message.
    }
}

$loop = Factory::create();
$queue = new EmailQueue($loop);
//load up the queue with a bunch of emails.
$loop->run();

 

When you first queue up an email, it will call the tick() function.  That function will check if there are any messages in the queue.  If there are it registers a callback to be executed on the next cycle of the event loop.  When that callback is executed it will dequeue one message and send it, then call tick() again which causes the cycle to repeat if necessary.  As long as there is email in the queue this will constantly work through the queue while still allowing the loop to trigger other work items.

 

Edited by kicken

4 hours ago, NotionCommotion said:

Your solution would potentially block for up to 1 second, true?  Is that likely too long?

Yes, because the expectation was that you have "a lot" of emails to send, and instead of doing one email, yielding, then another, and yielding, and another, it runs multiple emails in series.
There may not actually be a problem with doing that. Probably the only thing that would really be relevant is how frequently you have more emails to send - theoretically, the thing sending emails could be doing one at a time and something could be generating emails to send faster than the sender can keep up.

Anyway, read what kicken said.


Thanks kicken,  Will definitely give this a try.  Looks like on every iteration of the fast internal loop, everything on the future queue will be executed in its entirety, and care must still be taken not to put too much on it at once.  Thanks again for pointing me in this direction.

<?php
final class StreamSelectLoop implements LoopInterface
{
    public function __construct()
    {
        $this->futureTickQueue = new FutureTickQueue();
        //...
    }

    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->futureTickQueue->tick();

            $this->timers->tick();

            // Future-tick queue has pending callbacks ...
            if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                $timeout = 0;

            // There is a pending timer, only block until it is due ...
            } elseif ($scheduledAt = $this->timers->getFirst()) {
                //...
            }
            //...
        }
    }
    //...
}

 

<?php
final class FutureTickQueue
{
    // ...
    
    /**
     * Flush the callback queue.
     */
    public function tick()
    {
        // Only invoke as many callbacks as were on the queue when tick() was called.
        $count = $this->queue->count();

        while ($count--) {
            \call_user_func(
                $this->queue->dequeue()
            );
        }
    }
}

 


1 hour ago, NotionCommotion said:

everything on the future queue will be executed in its entirety, and care must still be taken not to put too much on it at once

Indeed. This is why you don't just load up the queue with a simple foreach loop.  Instead you have your callback re-register itself as long as there is stuff to do.  This works out because the queue will only run whatever callbacks were registered at the time it started running.  Any newly added callbacks will be delayed until the next event loop cycle.

If there are any callbacks in the future tick queue, you can see that the event loop will set its timeout to zero so that it won't block.  It does still test for stream activity, so it will still be able to dispatch and process incoming I/O as needed; it just won't wait around for it.
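
The re-registering behaviour can be tried out without ReactPHP. The sketch below is a simplified model of the tick() logic quoted earlier, not ReactPHP's actual class: MiniTickQueue, the $sendNext closure, and the "timers and stream checks" log line are invented for illustration. It shows that a callback which re-queues itself handles one item per cycle, leaving room for other work between items.

```php
<?php
// Simplified model of a future-tick queue; not ReactPHP's real implementation.
final class MiniTickQueue
{
    private $queue;

    public function __construct()
    {
        $this->queue = new SplQueue();
    }

    public function add(callable $callback): void
    {
        $this->queue->enqueue($callback);
    }

    public function isEmpty(): bool
    {
        return $this->queue->isEmpty();
    }

    public function tick(): void
    {
        // Snapshot the size first: callbacks added during this pass
        // have to wait for the next pass.
        $count = $this->queue->count();
        while ($count--) {
            ($this->queue->dequeue())();
        }
    }
}

$queue  = new MiniTickQueue();
$emails = new SplQueue();
foreach (['a', 'b', 'c'] as $to) {
    $emails->enqueue($to);
}
$log = [];

// Self-rescheduling callback: handle one email, then queue itself again if needed.
$sendNext = function () use (&$sendNext, $queue, $emails, &$log) {
    $log[] = 'email ' . $emails->dequeue();
    if (!$emails->isEmpty()) {
        $queue->add($sendNext);
    }
};
$queue->add($sendNext);

$cycle = 0;
while (!$queue->isEmpty()) {
    $cycle++;
    $queue->tick();
    $log[] = "cycle $cycle: timers and stream checks";  // where other work would run
}

echo implode(PHP_EOL, $log), PHP_EOL;
```

Had all three emails been added as separate callbacks up front, the first tick() would have run all of them back to back before anything else got a turn.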
