NotionCommotion Posted June 25, 2017

The following seems to send big messages, but send() reports a failure even though the message arrives, because the write does not complete in one packet. Any recommendations on how to rectify this? Thanks

    [michael@centos7 testing]$ php server.php
    Connect
    onConnect.
    onData message length: 9
    onSend message length: 22
    onData message length: 10933382
    onSend message length: 10933395
    LengthPrefixStream::send() didn't write message with length 10933395

    [michael@centos7 testing]$ php client.php
    onConnect.
    onSend message length: 9
    onSend message length: 10933382
    LengthPrefixStream::send() didn't write message with length 10933382
    onData message length: 22
    onData message length: 10933395

    <?php
    #Testing Server
    error_reporting(E_ALL);
    ini_set('display_startup_errors', 1);
    ini_set('display_errors', 1);

    require '../../vendor/autoload.php';
    require 'LengthPrefixStream.php';

    $c = [
        'privateKey' => '/etc/pki/tls/private/sockets_ss_key.pem',
        'publicCert' => '/etc/pki/tls/certs/sockets_ss_crt.pem',
        'url'        => '0.0.0.0',
        'server'     => '12.345.678.90',
        'port'       => '1500',
        'peer_name'  => 'example.com'
    ];
    openlog('server', LOG_NDELAY, LOG_LOCAL2);

    $loop = React\EventLoop\Factory::create();
    $server = new React\Socket\TcpServer($c['url'].':'.$c['port'], $loop);
    $server = new React\Socket\SecureServer($server, $loop, ['local_cert' => $c['publicCert'], 'local_pk' => $c['privateKey']]);

    $server->on('connection', function (React\Socket\ConnectionInterface $conn) {
        echo("onConnect.".PHP_EOL);
        $connStream = new LengthPrefixStream($conn);
        // Echo every decoded message back to the sender.
        $connStream->on('data', function($data) use ($connStream){
            echo("onData message length: ".strlen(json_encode($data)).PHP_EOL);
            $connStream->send(['received'=>$data]);
        });
        // The stream's 'error' event passes a single Exception argument.
        $conn->on('error', function($error) {
            echo('OnError: '.$error->getMessage().PHP_EOL);
        });
        // LengthPrefixStream emits its errors as plain strings.
        $connStream->on('error', function($error){
            echo('OnConnError: '.json_encode($error).PHP_EOL);
        });
        $conn->on('close', function() {
            echo("onClose.".PHP_EOL);
        });
    });

    $server->on('error', function($error) use ($loop){
        echo('onServerError: '.json_encode($error).PHP_EOL);
    });

    echo('Connect'.PHP_EOL);
    $loop->run();

    <?php
    #Testing Client
    error_reporting(E_ALL);
    ini_set('display_startup_errors', 1);
    ini_set('display_errors', 1);

    require '../../vendor/autoload.php';
    require 'LengthPrefixStream.php';

    $c = [
        'privateKey' => '/etc/pki/tls/private/sockets_ss_key.pem',
        'publicCert' => '/etc/pki/tls/certs/sockets_ss_crt.pem',
        'url'        => '0.0.0.0',
        'server'     => '12.345.678.90',
        'port'       => '1500',
        'peer_name'  => 'example.com'
    ];
    openlog('server', LOG_NDELAY, LOG_LOCAL2);

    $loop = React\EventLoop\Factory::create();
    $connector = new React\Socket\Connector($loop);
    $connector = new React\Socket\SecureConnector($connector, $loop, ['cafile' => $c['publicCert'], 'peer_name' => $c['peer_name'], 'allow_self_signed'=>true,]);

    $connector->connect($c['server'].':'.$c['port'])
    ->then(function (React\Socket\ConnectionInterface $connection) {
        echo("onConnect.".PHP_EOL);
        $connStream = new LengthPrefixStream($connection);
        $connStream->on('data', function($data) {
            echo("onData message length: ".strlen(json_encode($data)).PHP_EOL);
        });
        // The stream's 'error' event passes a single Exception argument.
        $connection->on('error', function($error){
            echo('OnError: '.$error->getMessage().PHP_EOL);
        });
        $connStream->on('error', function($error) {
            echo('OnConnError: '.json_encode($error).PHP_EOL);
        });
        // One small message, then one of roughly 10 MB.
        $connStream->send(['a'=>123]);
        $connStream->send(json_decode(file_get_contents('bigData.log')));
    });

    $loop->run();

    <?php
    use Evenement\EventEmitterInterface;
    use Evenement\EventEmitterTrait;
    use React\Stream\DuplexStreamInterface;

    /*
    Parses stream, and emits data, or error bad JSON received.
    */
    class LengthPrefixStream implements EventEmitterInterface
    {
        use EventEmitterTrait;

        private $socket=false,
        $buffer='',
        $parseJson; //0=>no, 1=>yes and return object, 2=>yes and return array

        public function __construct(DuplexStreamInterface $socket, $parseJson=1){
            $this->parseJson = $parseJson;
            $this->socket = $socket;
            // Accumulate raw bytes and extract complete frames as they arrive.
            $this->socket->on('data', function($data){
                $this->buffer .= $data;
                $this->parseBuffer();
            });
        }

        public function send($msg){
            //Do I need to parse??? Still need to implement drain
            if($this->isConnected()) {
                if($this->parseJson) {
                    $msg=json_encode($msg);
                }
                echo("onSend message length: ".strlen($msg).PHP_EOL);
                // Frame = 4-byte little-endian length prefix followed by the payload.
                if(!$this->socket->write(pack("V", strlen($msg)).$msg)) {
                    echo("LengthPrefixStream::send() didn't write message with length ".strlen($msg).PHP_EOL);
                }
                return true;
            }
            else {
                echo("LengthPrefixStream::send() not connected".PHP_EOL);
                return false;
            }
        }

        private function parseBuffer(){
            do {
                $checkAgain = false;
                if(($bufferLength = strlen($this->buffer)) > 3 ){
                    $length = unpack('Vlen', substr($this->buffer, 0, 4))['len'];
                    if(is_int($length)) {
                        // Only proceed once the whole frame has been received.
                        if($bufferLength >= $length + 4){
                            $msg=substr($this->buffer, 4, $length);
                            if($this->parseJson) {
                                $msg=json_decode($msg,$this->parseJson-1);
                                if (json_last_error() == JSON_ERROR_NONE){
                                    $this->emit('data', [$msg]);
                                }
                                else {
                                    $this->emit('error', ['Invalid JSON: '.json_last_error_msg()." (".json_last_error().'): '.substr($this->buffer, 4, $length)]);
                                }
                            }
                            else {
                                $this->emit('data', [$msg]);
                            }
                            $this->buffer = substr($this->buffer, $length+4);
                            $checkAgain = strlen($this->buffer)>=4;
                        }
                    }
                    else {
                        $this->emit('error', ["Invalid length prefix provided by client: ".substr($this->buffer, 0, 3)]);
                    }
                }
            } while ($checkAgain);
        }

        public function isConnected() {
            return $this->socket?true:false;
        }

        public function close() {
            $this->socket->close();
        }
    }
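For reference, the framing that LengthPrefixStream relies on (a 4-byte little-endian length followed by the payload) can be exercised without ReactPHP at all. The following is only a minimal sketch of that idea; encodeFrame() and decodeFrames() are hypothetical helper names that do not appear in the code above, and JSON handling is left out.

```php
<?php
// Hypothetical helpers illustrating the length-prefix framing only.

// Prefix the payload with its byte length as a 32-bit little-endian integer.
function encodeFrame(string $payload): string
{
    return pack('V', strlen($payload)) . $payload;
}

// Remove every complete frame from the front of $buffer and return the payloads.
// Any incomplete trailing frame stays in $buffer until more bytes arrive.
function decodeFrames(string &$buffer): array
{
    $messages = [];
    while (strlen($buffer) >= 4) {
        $length = unpack('Vlen', substr($buffer, 0, 4))['len'];
        if (strlen($buffer) < 4 + $length) {
            break; // the rest of this frame has not arrived yet
        }
        $messages[] = substr($buffer, 4, $length);
        // The cast guards against substr() returning false on older PHP versions.
        $buffer = (string) substr($buffer, 4 + $length);
    }
    return $messages;
}
```

Because a large frame arrives in many 'data' events, decodeFrames() would simply return an empty array until the last piece of the frame is appended to the buffer.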
requinix Posted June 26, 2017

Assuming the data size is the problem, break it into smaller pieces. 10MB is a bit much. For the drain, there's a "drain" event to listen for if write() returns false (and the stream remains connected). And just like how you have a buffer for reading, you can set up a buffer for writing.
NotionCommotion Posted June 26, 2017 (Author)

Yes, size is the problem, but hopefully not much of a problem. I swore I searched for the drain event. I evidently did not do a good job of doing so.

I hoped that write() would return true if the data was accepted fully (via direct write or reactphp's buffer), false if something bad happened, or an integer representing the number of bytes accepted so I could remove them from the buffer. But I don't think it will return the number of bytes accepted. Am I mistaken? How is this dealt with?

Thank you for your help.
kicken Posted June 26, 2017

React's interface is a bit poor IMO when it comes to writing to a stream. They return false on write to indicate the buffer is either full or closed, so just checking the return value of write is not enough to determine if the data was written. Adding additional complexity to the return value, it will return false if the write causes the socket buffer to become full, but everything you wrote will have been accepted. It doesn't do partial writes.

It seems their intent is that you do something like this:

    $data = "...";
    if ($this->socket->isWritable()){
        $this->socket->write($data);
        //You can assume everything was written
    } else {
        //Buffer $data and try again later.
        //How is up to you to figure out.
    }

If the socket is not writable due to being full then you have to wait until it's writable before trying again or the data would be lost. This is where their drain event comes into play. It's emitted once the socket has flushed out enough data that its buffer is no longer full.

    public function __construct(){
        //...
        $this->socket->on('drain', function(){
            $this->socket->write($this->sendBuffer);
            $this->sendBuffer = '';
        });
    }

    public function write($data){
        if ($this->socket->isWritable()){
            $this->socket->write($data);
        } else {
            $this->sendBuffer .= $data;
        }
    }
kicken Posted June 26, 2017

Looking at the code a bit closer, it seems they only actually disable writing if the stream is closed to writes, and it will continue buffering data even if the buffer is "full", so that simplifies things. You still can't use the return value of write by itself to determine if your data was written, but there's no real need to listen for the drain event and buffer data yourself unless you want to. Just check if the socket is writable prior to trying to write to it and if not handle it appropriately.
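To make that distinction concrete, here is a minimal sketch of a send helper that reports "closed" separately from "buffer full". It assumes the stream behaves as described in this thread (write() returning false while the stream is still writable means the data was accepted but the buffer is now full); sendFrame() and its status strings are hypothetical, not part of ReactPHP.

```php
<?php
/**
 * Hypothetical helper: writes one length-prefixed frame and says what happened.
 *
 * $stream is expected to behave like React\Stream\WritableStreamInterface,
 * i.e. to provide isWritable() and write(). It is left untyped here so the
 * sketch stays self-contained.
 *
 * Returns 'closed'       if nothing was written because the stream is not writable,
 *         'queued'       if the data was accepted and more may be written,
 *         'backpressure' if the data was accepted but the buffer is now full.
 */
function sendFrame($stream, string $payload): string
{
    if (!$stream->isWritable()) {
        return 'closed';
    }
    // A false return here signals a full buffer, not a lost message.
    $accepted = $stream->write(pack('V', strlen($payload)) . $payload);
    return $accepted ? 'queued' : 'backpressure';
}
```

With something like this, the "didn't write message" line in the original send() would only be printed for the 'closed' case, and 'backpressure' could be treated as a hint to slow down.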
NotionCommotion Posted June 26, 2017 (Author, edited)

Quoting kicken:

"Looking at the code a bit closer, it seems they only actually disable writing if the stream is closed to writes, and it will continue buffering data even if the buffer is "full", so that simplifies things. You still can't use the return value of write by itself to determine if your data was written, but there's no real need to listen for the drain event and buffer data yourself unless you want to. Just check if the socket is writable prior to trying to write to it and if not handle it appropriately."

Yeah, it is a little misleading. According to https://github.com/reactphp/stream, the drain event is mostly used internally by the write() method. But then the write() method says to use the drain event. It seems to me, and I understand you agree, that there is no need to implement any write buffering after all.

drain event

The drain event will be emitted whenever the write buffer became full previously and is now ready to accept more data. ... This event is mostly used internally, see also write() for more details.

write()

The write(mixed $data): bool method can be used to write some data into the stream.

A successful write MUST be confirmed with a boolean true, which means that either the data was written (flushed) immediately or is buffered and scheduled for a future write. Note that this interface gives you no control over explicitly flushing the buffered data, as finding the appropriate time for this is beyond the scope of this interface and left up to the implementation of this interface. ...

If a stream cannot handle writing (or flushing) the data, it SHOULD emit an error event and MAY close() the stream if it can not recover from this error.

If the internal buffer is full after adding $data, then write() SHOULD return false, indicating that the caller should stop sending data until the buffer drains. The stream SHOULD send a drain event once the buffer is ready to accept more data.

Similarly, if the stream is not writable (already in a closed state) it MUST NOT process the given $data and SHOULD return false, indicating that the caller should stop sending data. ...

EDIT. After testing a little more, I found it is possible to break the application when not explicitly buffering and not using the drain event. I will come up with an implementation and post it. Thanks

    PHP Fatal error: Allowed memory size of 134217728 bytes exhausted (tried to allocate 10489856 bytes) in /var/www/public/testing/LengthPrefixStream.php on line 33

Edited June 26, 2017 by NotionCommotion
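One possible shape for such explicit buffering is sketched below. It is not the implementation promised above, only an illustration under stated assumptions: the stream is expected to provide write(), isWritable() and a 'drain' event as in the README excerpt, and QueuedSender with its pending() method is a hypothetical name. Frames are held in a queue, and after write() returns false nothing more is written until 'drain' fires.

```php
<?php
// Hypothetical sketch: hold frames back while the stream reports a full buffer.
final class QueuedSender
{
    private $stream;           // expected to behave like React\Stream\WritableStreamInterface
    private $queue = [];       // frames not yet handed to the stream
    private $blocked = false;  // true between a false write() and the next 'drain'

    public function __construct($stream)
    {
        $this->stream = $stream;
        $this->stream->on('drain', function () {
            $this->blocked = false;
            $this->flush();
        });
    }

    // Queue one already-framed message and write it as soon as the stream allows.
    public function send(string $frame): void
    {
        $this->queue[] = $frame;
        $this->flush();
    }

    // Number of frames still waiting; a producer can use this to throttle itself.
    public function pending(): int
    {
        return count($this->queue);
    }

    private function flush(): void
    {
        while (!$this->blocked && $this->queue) {
            if (!$this->stream->isWritable()) {
                $this->queue = []; // stream is closed, nothing can be delivered
                return;
            }
            $frame = array_shift($this->queue);
            if ($this->stream->write($frame) === false) {
                // The frame was accepted, but the buffer is full: wait for 'drain'.
                $this->blocked = true;
            }
        }
    }
}
```

A queue like this only moves the waiting data from the stream's buffer into the application, so by itself it would not prevent the memory error. The useful part is pending(): the code that produces messages can stop producing (for example by calling pause() on the readable side of the connection) while frames are waiting, and resume once the queue is empty.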