
Multiple connections to a socket server


NotionCommotion


I have the following workflow:

  1. A browser client first makes an HTTP request to a webserver.
  2. Webserver makes a cURL request to another REST API HTTP server.
  3. REST server spawns a socket client, connects to a local socket server, initiates startTaskA, and returns confirmation status to the webserver's cURL request.
  4. Webserver responds to the browser that startTaskA is started.

Later, but before startTaskA is complete, the browser client makes a second HTTP request with the intention to cancel startTaskA.

How can this be accomplished?

$loop = \React\EventLoop\Factory::create();
$server = new \React\Socket\TcpServer('0.0.0.0:1337', $loop);
$server->on('connection', function (\React\Socket\ConnectionInterface $conn) {
    $conn = new LengthPrefixStream($conn);
    $this->clientController->addClient($conn);
    $conn->on('data', function($data) use ($conn){
        switch($data['method']) {
            case 'startTaskA':
                for ($i = 1; $i <= 1000; $i++) {
                    doSomethingThatTakesAboutOnceSecond();
                }
                break;
            case 'cancelTaskA':
                //How can this be done?
                break;
            case 'otherTasks':break;
        }
    });
});
$loop->run();

 


startTaskA has to be based on something identifiable so that code can reference it. That same identifier should be passed back through to the browser so it can specifically request that task be aborted instead of some other one.

As a very basic example,

class RunningTasks {
    public static $running = array();
    public static $n = 0;

    public static function abort($n) {
        if (isset(self::$running[$n])) {
            self::$running[$n] = false;
        }
    }
    public static function &next(&$n) {
        $n = self::$n++;
        self::$running[$n] = true;
        return self::$running[$n];
    }
    public static function stop($n) {
        unset(self::$running[$n]);
    }
}

$loop = \React\EventLoop\Factory::create();
$server = new \React\Socket\TcpServer('0.0.0.0:1337', $loop);
$server->on('connection', function (\React\Socket\ConnectionInterface $conn) {
    $conn = new LengthPrefixStream($conn);
    $this->clientController->addClient($conn);
    $conn->on('data', function($data) use ($conn){
        switch($data['method']) {
            case 'startTaskA':
                $r =& RunningTasks::next($n);
                /* send $n to the client... */

                for ($i = 1; $r && $i <= 1000; $i++) {
                    doSomethingThatTakesAboutOnceSecond();
                }
                RunningTasks::stop($n);
                break;
            case 'cancelTaskA':
                RunningTasks::abort(/* $n from the initial request */);
                break;
            case 'otherTasks':break;
        }
    });
});
$loop->run();

 


You'll also need to run your task in a way that lets it be interrupted.  If you just run a time-consuming function in a simple loop, your socket server will be unresponsive for the whole run: the 'data' handler never returns control to the event loop, so the cancel message is never read.

What you can do instead is, between doSomethingThatTakesAboutOnceSecond() calls, let your server process other tasks and check for an abort signal before starting the next call.

In ReactPHP, this can be done by replacing your loop with calls to the event loop's futureTick() method.

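use React\EventLoop\LoopInterface;

class Task {
    private $loop;
    private $abort = false;
    private $loopCounter = 0;
    private $loopLimit = 1000;

    public function __construct(LoopInterface $loop){
        $this->loop = $loop;
    }

    public function start(){
        $this->loopCounter = 0;
        $this->tick();
    }

    // Request cancellation; no further units of work will be started.
    public function abort(){
        $this->abort = true;
    }

    // Queue one unit of work, then return so the loop can handle socket I/O.
    private function tick(){
        if (!$this->abort && $this->loopCounter < $this->loopLimit){
            $this->loop->futureTick(function(){
                // abort() may have been called while this callback was queued.
                if ($this->abort){
                    return;
                }
                $this->doSomethingThatTakesAboutASecond();
                $this->tick();
            });
            $this->loopCounter++;
        }
    }

    private function doSomethingThatTakesAboutASecond(){
        sleep(1); // placeholder for the real work; blocks the loop while it runs
    }
}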

With the above, after every execution of doSomethingThatTakesAboutASecond() the event loop gets a chance to accept new data/connections, allowing your abort call to be received.  Then you'd just call the abort() method of the task to prevent it from registering any future tick operations.
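To see why chunking the work helps, here is a minimal sketch that does not use ReactPHP at all. An SplQueue stands in for the loop's tick queue (it is not how ReactPHP is implemented), and the "cancel message arrives after the third unit" condition is an assumption made up for the illustration.

```php
<?php
declare(strict_types=1);

// Stand-in for an event loop's tick queue; NOT ReactPHP's implementation.
$queue = new SplQueue();
$aborted = false; // would be set by the 'cancelTaskA' handler
$done = 0;        // units of work completed so far

// One unit of work that re-queues itself, like Task::tick() above.
$step = function () use (&$step, &$aborted, &$done, $queue): void {
    if ($aborted || $done >= 1000) {
        return; // cancelled or finished: schedule nothing further
    }
    $done++;                // the "work"
    $queue->enqueue($step); // hand control back, continue on a later tick
};

$queue->enqueue($step);

// Toy loop: between callbacks is where a real loop would read sockets.
while (!$queue->isEmpty()) {
    $callback = $queue->dequeue();
    $callback();
    if ($done === 3) {
        $aborted = true; // pretend the cancel request was just received
    }
}

echo "units completed: {$done}\n";
```

Because each unit returns before the next one starts, the flag set between callbacks is seen on the very next step, which a plain for loop inside the 'data' handler would never allow.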

Combine that with what requinix said about generating and tracking unique identifiers so you can identify which task to abort and you should be good to go.
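One way the two ideas might fit together is a small registry that hands out an id per task and remembers how to abort it. This is only a sketch: TaskRegistry and its register(), abort() and finish() methods are hypothetical names, not part of ReactPHP or of the code posted above, and it assumes PHP 7.4+ for typed properties.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: maps task ids to "abort" callbacks.
final class TaskRegistry
{
    /** @var array<int, callable> abort callbacks keyed by task id */
    private array $tasks = [];
    private int $nextId = 1;

    // Store the abort callback and return the id to send to the client.
    public function register(callable $abort): int
    {
        $id = $this->nextId++;
        $this->tasks[$id] = $abort;
        return $id;
    }

    // Run the abort callback for $id; false if the id is unknown or finished.
    public function abort(int $id): bool
    {
        if (!isset($this->tasks[$id])) {
            return false;
        }
        ($this->tasks[$id])();
        unset($this->tasks[$id]);
        return true;
    }

    // Forget a task that completed on its own.
    public function finish(int $id): void
    {
        unset($this->tasks[$id]);
    }
}

// Usage with a plain closure standing in for [$task, 'abort'].
$registry = new TaskRegistry();
$cancelled = false;
$id = $registry->register(function () use (&$cancelled): void {
    $cancelled = true;
});
$first = $registry->abort($id);  // runs the callback
$second = $registry->abort($id); // id was already removed
```

In the 'startTaskA' case you would register [$task, 'abort'] and send the returned id back up the chain to the browser; in 'cancelTaskA' you would pass the id from the request to abort(). How the id travels over LengthPrefixStream is not shown in this thread, so that part is left out.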

 

Edited by kicken
