Jump to content

Implementing a drain when writing sockets


NotionCommotion

Recommended Posts

The following seems to send big messages, but provides a false positive because the write does not complete in one packet.  Any recommendations how to rectify?  Thanks

 

[michael@centos7 testing]$ php server.php
Connect
onConnect.
onData message length: 9
onSend message length: 22
onData message length: 10933382
onSend message length: 10933395
LengthPrefixStream::send() didn't write message with length 10933395
[michael@centos7 testing]$ php client.php
onConnect.
onSend message length: 9
onSend message length: 10933382
LengthPrefixStream::send() didn't write message with length 10933382
onData message length: 22
onData message length: 10933395
<?php
#Testing Server


error_reporting(E_ALL);
ini_set('display_startup_errors', 1);
ini_set('display_errors', 1);


require '../../vendor/autoload.php';
require 'LengthPrefixStream.php';
$c=
[
    'privateKey' => '/etc/pki/tls/private/sockets_ss_key.pem',
    'publicCert' => '/etc/pki/tls/certs/sockets_ss_crt.pem',
    'url'=>'0.0.0.0',
    'server'=>'12.345.678.90',
    'port'=>'1500',
    'peer_name'=>'example.com'
];


openlog('server', LOG_NDELAY, LOG_LOCAL2);


$loop = React\EventLoop\Factory::create();
$server = new React\Socket\TcpServer($c['url'].':'.$c['port'], $loop);
$server = new React\Socket\SecureServer($server, $loop, ['local_cert' => $c['publicCert'],'local_pk' => $c['privateKey']] );


$server->on('connection', function (React\Socket\ConnectionInterface $conn) {
    echo("onConnect.".PHP_EOL);
    $connStream = new LengthPrefixStream($conn);
    $connStream->on('data', function($data) use ($connStream){
        echo("onData message length: ".strlen(json_encode($data)).PHP_EOL);
        $connStream->send(['received'=>$data]);
    });


    $conn->on('error', function($error, $conn) {
        echo('OnError: '.json_encode($error).PHP_EOL);
    });


    $connStream->on('error', function($error){
        echo('OnConnError: '.json_encode($error).PHP_EOL);
    });


    $conn->on('close', function() {
        echo("onClose.".PHP_EOL);
    });


});
$server->on('error', function($error) use ($loop){
    echo('onServerError: '.json_encode($error).PHP_EOL);
});


echo('Connect'.PHP_EOL);
$loop->run();
<?php
#Testing Client


error_reporting(E_ALL);
ini_set('display_startup_errors', 1);
ini_set('display_errors', 1);


require '../../vendor/autoload.php';
require 'LengthPrefixStream.php';
$c=
[
    'privateKey' => '/etc/pki/tls/private/sockets_ss_key.pem',
    'publicCert' => '/etc/pki/tls/certs/sockets_ss_crt.pem',
    'url'=>'0.0.0.0',
    'server'=>'12.345.678.90',
    'port'=>'1500',
    'peer_name'=>'example.com'
];


openlog('server', LOG_NDELAY, LOG_LOCAL2);


$loop = React\EventLoop\Factory::create();
$connector = new React\Socket\Connector($loop);
$connector = new React\Socket\SecureConnector($connector, $loop, ['cafile' => $c['publicCert'],'peer_name' => $c['peer_name'],'allow_self_signed'=>true,]);
$connector->connect($c['server'].':'.$c['port'])
->then(function (React\Socket\ConnectionInterface $connection)  {
    echo("onConnect.".PHP_EOL);
    $connStream = new LengthPrefixStream($connection);
    $connStream->on('data', function($data) {
        echo("onData message length: ".strlen(json_encode($data)).PHP_EOL);
    });
    $connection->on('error', function($error, $connection){
        echo('OnError: '.json_encode($error).PHP_EOL);
    });
    $connStream->on('error', function($error) {
        echo('OnConnError: '.json_encode($error).PHP_EOL);
    });
    $connStream->send(['a'=>123]);
    $connStream->send(json_decode(file_get_contents('bigData.log')));
});


$loop->run();
 
<?php
use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\Stream\DuplexStreamInterface;


/*
Parses stream, and emits data, or error bad JSON received.
*/
class LengthPrefixStream implements EventEmitterInterface {


    use EventEmitterTrait;


    private $socket=false,
    $buffer='',
    $parseJson; //0=>no, 1=>yes and return object, 2=>yes and return array


    public function __construct(DuplexStreamInterface $socket, $parseJson=1){
        $this->parseJson = $parseJson;
        $this->socket = $socket;
        $this->socket->on('data', function($data){
            $this->buffer .= $data;
            $this->parseBuffer();
        });
    }


    public function send($msg){
        //Do I need to parse???  Still need to implement drain
        if($this->isConnected()) {
            if($this->parseJson) {
                $msg=json_encode($msg);
            }
            echo("onSend message length: ".strlen($msg).PHP_EOL);
            if(!$this->socket->write(pack("V", strlen($msg)).$msg)) {
                echo("LengthPrefixStream::send() didn't write message with length ".strlen($msg).PHP_EOL);
            }
            return true;
        }
        else {
            echo("$this->type LengthPrefixStream::send() not connected".PHP_EOL);
            return false;
        }
    }


    private function parseBuffer(){
        do {
            $checkAgain = false;
            if(($bufferLength = strlen($this->buffer)) > 3 ){
                $length = unpack('Vlen', substr($this->buffer, 0, 4))['len'];
                if(is_int($length)) {
                    if($bufferLength >= $length + 4){
                        $msg=substr($this->buffer, 4, $length);
                        if($this->parseJson) {
                            $msg=json_decode($msg,$this->parseJson-1);
                            if (json_last_error() == JSON_ERROR_NONE){
                                $this->emit('data', [$msg]);
                            }
                            else {
                                $this->emit('error', ['Invalid JSON: '.json_last_error_msg()." (".json_last_error().'): '.substr($this->buffer, 4, $length)]);
                            }
                        }
                        else {
                            $this->emit('data', [$msg]);
                        }
                        $this->buffer = substr($this->buffer, $length+4);
                        $checkAgain = strlen($this->buffer)>=4;
                    }
                }
                else {
                    $this->emit('error', ["Invalid length prefix provided by client: ".substr($this->buffer, 0, 3)]);
                }
            }
        } while ($checkAgain);
    }


    public function isConnected()
    {
        return $this->socket?true:false;
    }


    public function close()
    {
        $this->socket->close();
    }
}

 

Link to comment
Share on other sites

Assuming the data size is the problem, break it into smaller pieces. 10MB is a bit much.

 

For the drain, there's a "drain" event to listen for if the write() returns false (and remains connected). And just like how you have a buffer for reading, you can set up a buffer for writing.

Link to comment
Share on other sites

Yes, size is the problem, but hopefully not much of a problem.

 

I swore I searched for the drain event.  I evidently did not do a good job of doing so.

 

I hoped that write() would return true if accepted fully (via direct write or reactphp's buffer), false if something bad happened, or an integer representing the number of bytes accepted so I could remove from them the buffer.  But I don't think it will return the number of bytes accepted.  Am I mistaken?  How is this dealt with?

 

 Thank you for your help.

Link to comment
Share on other sites

React's interface is a bit poor IMO when it comes to writing to a stream. They return false on write to indicate the buffer is either full or closed, so just checking the return value of write is not enough to determine if the data was written.

 

Adding additional complexity to the return value, it will return false if the write causes the socket buffer to become full, but everything you wrote will have been accepted. It doesn't do partial writes.

 

It seems their intent is that you do something like this:

$data = "...";

if ($this->socket->isWritable()){
    $this->socket->write($data); //You can assume everything was written
} else {
    //Buffer $data and try again later. 
    //How is up to you to figure out.
}
If the socket is not writable due to being full then you have to wait until it's writable before trying again or the data would be lost. This is where their drain event comes into play. It's emitted once the socket has flushed out enough data that it's buffer is no longer full.

 

    public function __construct(){
        //...
        $this->socket->on('drain', function(){
            $this->socket->write($this->sendBuffer);
            $this->sendBuffer = '';
        });
    }
    
    public function write($data){
        if ($this->socket->isWritable()){
            $this->socket->write($data);
        } else {
            $this->sendBuffer .= $data;
        }
    }
Link to comment
Share on other sites

Looking at the code a bit closer it seems they only actually disable writing if the stream is closed to writes and it will continue buffering data even if the buffer is "full", so that simplifies things. You still can't use the return value of write by itself to determine if you data was written, but there's no real need to listen for the drain event and buffer data yourself unless you want to. Just check if the socket is writable prior to trying to write to it and if not handle it appropriately.

Link to comment
Share on other sites

Looking at the code a bit closer it seems they only actually disable writing if the stream is closed to writes and it will continue buffering data even if the buffer is "full", so that simplifies things. You still can't use the return value of write by itself to determine if you data was written, but there's no real need to listen for the drain event and buffer data yourself unless you want to. Just check if the socket is writable prior to trying to write to it and if not handle it appropriately.

 

Yeah, it is a little misleading.  According to https://github.com/reactphp/stream, the drain event is mostly used internally by the write() method.  But then the write() method says to use the drain event.

 

It seems to me and I understand that you agree that there is no need to implement any write buffering after all.

 

drain event
The drain event will be emitted whenever the write buffer became full previously and is now ready to accept more data.
...
This event is mostly used internally, see also write() for more details.


write()
The write(mixed $data): bool method can be used to write some data into the stream.


A successful write MUST be confirmed with a boolean true, which means that either the data was written (flushed) immediately or is buffered and scheduled for a future write. Note that this interface gives you no control over explicitly flushing the buffered data, as finding the appropriate time for this is beyond the scope of this interface and left up to the implementation of this interface.
...
If a stream cannot handle writing (or flushing) the data, it SHOULD emit an error event and MAY close() the stream if it can not recover from this error.


If the internal buffer is full after adding $data, then write() SHOULD return false, indicating that the caller should stop sending data until the buffer drains. The stream SHOULD send a drain event once the buffer is ready to accept more data.


Similarly, if the the stream is not writable (already in a closed state) it MUST NOT process the given $data and SHOULD return false, indicating that the caller should stop sending data.
...

 

EDIT.  After testing a little more, it is possible to break the application by not explicitly buffering and using the drain.  I will come up with an implementation and post it.  Thanks

PHP Fatal error:  Allowed memory size of 134217728 bytes exhausted (tried to allocate 10489856 bytes) in /var/www/public/testing/LengthPrefixStream.php on line 33
Link to comment
Share on other sites

Archived

This topic is now archived and is closed to further replies.

×
×
  • Create New...

Important Information

We have placed cookies on your device to help make this website better. You can adjust your cookie settings, otherwise we'll assume you're okay to continue.