NotionCommotion Posted February 17, 2017 Share Posted February 17, 2017 The following script seems to work, however, might have a couple of issues. First, LengthPrefixStream::send()seems to work, however, the receiving client doesn't respond correctly to it. See any issues with it? LengthPrefixStream::parseBuffer() works by making sure that $this->buffer always starts with a new packet/message (don't know the right word to use for this), and then knows that the first four bytes indicate the length of the message. See any issues? One concern I have is "if" it is sent content with more or less bytes than the length prefix indicates, $this->buffer might not always start with a new message, and I will lose the ability to find out the length prefix. Should this be a concern? Thanks <?php require 'vendor/autoload.php'; $port = isset($argv[1])?$argv[1]:1337; $host = isset($argv[2])?$argv[2]:'0.0.0.0'; $server=new Server($port,$host); $server->start(); class Server { private $port,$host,$client; public function __construct($port,$host) { $this->port=$port; $this->host=$host; } public function start() { $loop = React\EventLoop\Factory::create(); $socket = new React\Socket\Server($loop); $socket->on('connection', function (\React\Socket\ConnectionInterface $stream){ $client = new DataLogger\Server\LengthPrefixStream($stream,'o'); $this->client=$client; echo("New connection accepted.".PHP_EOL); $client->on('data', function($rsp) use ($client){ echo('received: '.json_encode($rsp).PHP_EOL); }); }); $loop->addPeriodicTimer(15, function() { $this->client->send(["method"=>"discovery.Start"]); }); $socket->listen($this->port,$this->host); echo("TCP Socket Server Started: {$this->host}:{$this->port} \r\n"); $loop->run(); } } <?php namespace DataLogger\Server; use Evenement\EventEmitterInterface; use Evenement\EventEmitterTrait; use React\Stream\DuplexStreamInterface; class LengthPrefixStream implements EventEmitterInterface { use EventEmitterTrait; private $socket, $buffer='', $messageLength=false, $type; //Type of data provided and returned. Can be s for string, a for array, or o for object. If array is associated, will be sent as an object. public function __construct(DuplexStreamInterface $socket, $type='s'){ if (!in_array($type,['s','a','o'])){ trigger_error("Invalid LengthPrefixStream type.", E_USER_ERROR); } $this->socket = $socket; $this->type=$type; $this->socket->on('data', function($data){ //echo("LengthPrefixStream on data: $data".PHP_EOL); $this->buffer .= $data; $this->parseBuffer(); }); } public function send($message){ //How should drain be implemented? if($this->type!='s') { $message = json_encode($message); } echo("send: $message".PHP_EOL); $lng=pack("V", strlen($message)); //Is this correct? $this->socket->write($lng.$message); } private function parseBuffer(){ //Question. What happens if something goes wrong and I lose $this->messageLength so I can't find the length prefix? if(!$this->messageLength) { //Save the first time data is received or if not enough stream was provided to determine the length $this->messageLength=$this->getLength($this->buffer); } while (strlen($this->buffer)>=($this->messageLength+4)){ $message = substr($this->buffer, 4, $this->messageLength); if($this->type=='s') { $this->emit('data', [$message]); } else { //emit either an array or object $message = json_decode($message,$this->type=='a'); if (json_last_error() == JSON_ERROR_NONE){ $this->emit('data', [$message]); } } $this->buffer = substr($this->buffer, $this->messageLength+4); $this->messageLength=strlen($this->buffer)>=4?$this->getLength($this->buffer):false; } } private function getLength($string){ // Appears length prefix is given as unsigned long (always 32 bit, little endian byte order) /* L unsigned long (always 32 bit, machine byte order) N unsigned long (always 32 bit, big endian byte order) V unsigned long (always 32 bit, little endian byte order) */ return unpack('Vlen', substr($string,0,4))['len']; } } Quote Link to comment Share on other sites More sharing options...
requinix Posted February 17, 2017 Share Posted February 17, 2017 I would expect parseBuffer to look more like do { if have message length { if buffer is >=length long { get message as substr(buffer, 0, length) deal with message adjust buffer to substr(buffer, length) reset length } else { quit loop // wait for rest of message } } else { if buffer is >=4 long { get length as substr(buffer, 0, 4) and unpack adjust buffer to substr(buffer, 4) } else { quit loop // wait for rest of length } } } indefinitely // will keep going until buffer runs out Quote Link to comment Share on other sites More sharing options...
NotionCommotion Posted February 17, 2017 Author Share Posted February 17, 2017 (edited) I would expect parseBuffer to look more like Thanks, My send() method look okay? As far parseBuffer(), I believe it performs the same task, however, your implementation is easier to follow. private function parseBuffer(){ //How can I validate that the received message length prefix matchs the length of the message so it doesn't get out of sync? do { if($this->messageLength) { if(strlen($this->buffer)>=$this->messageLength){ $message=substr($this->buffer, 0, $this->messageLength); if($this->type=='s') { $this->emit('data', [$message]); } else { //emit either an array or object $message = json_decode($message,$this->type=='a'); if (json_last_error() == JSON_ERROR_NONE){ $this->emit('data', [$message]); } } $this->buffer = substr($this->buffer, $this->messageLength); $this->messageLength=false; } else { break; } } else { if(strlen($this->buffer)>=4){ $this->messageLength=unpack('Vlen', substr($this->buffer,0,4))['len']; $this->buffer = substr($this->buffer, 4); } else { break; } } } while (true); } Edited February 17, 2017 by NotionCommotion Quote Link to comment Share on other sites More sharing options...
requinix Posted February 17, 2017 Share Posted February 17, 2017 //How can I validate that the received message length prefix matchs the length of the message so it doesn't get out of sync?...you can't. Because you're using that length to decide how long the message is. If the other end sends the wrong length then it's their fault, and you'll probably discover it when the json_decode() fails. My send() method look okay?It's pretty simple so yeah. Quote Link to comment Share on other sites More sharing options...
kicken Posted February 17, 2017 Share Posted February 17, 2017 I generally prefer to avoid changing the buffer unless I know I can extract a complete message. That way you don't have to keep track of state or partial information. For example: function parseBuffer(){ do { $checkAgain = false; $bufferLength = strlen($this->buffer); $length = $this->getLength(substr($this->buffer, 0, 4)); if ($bufferLength >= $length + 4){ $message = substr($this->buffer, 4, $length); $this->buffer = substr($this->buffer, $length+4); $this->emit('data', [$message]); $checkAgain = true; } } while ($checkAgain); } The buffer should always begin with a 4-byte length value assuming no transmission issues. Assume the first four bytes are the length and extract them then check if there's enough left in the buffer to satisfy the length. If not, do nothing and just return to let more data arrive. If so, extract the complete message, modify the buffer then check again for a second message. If you want to try and ensure that you don't try and parse bad data then you can do things like add checksums or require every packet to begin with a Magic number/code that you check against. If the magic ever doesn't match consider the connection failed and close it. Quote Link to comment Share on other sites More sharing options...
NotionCommotion Posted February 18, 2017 Author Share Posted February 18, 2017 ...you can't. Because you're using that length to decide how long the message is. If the other end sends the wrong length then it's their fault, and you'll probably discover it when the json_decode() fails. Started thinking about it, and can came to the same "then it's their fault" conclusion. Not saying it is everything, but this is one benefit of using deliminator over length prefixes. Quote Link to comment Share on other sites More sharing options...
NotionCommotion Posted February 18, 2017 Author Share Posted February 18, 2017 I generally prefer to avoid changing the buffer unless I know I can extract a complete message. That way you don't have to keep track of state or partial information. requinix's sudo code was simpler than mine, but yours takes that cake! If you want to try and ensure that you don't try and parse bad data then you can do things like add checksums or require every packet to begin with a Magic number/code that you check against. I would rather not. How important do you think this is? ... consider the connection failed and close it. Interesting. Would invalid JSON be a condition to close the connection? What about X number of invalid JSON transmissions within a given timeframe? If so, do you have any rules of thumb? Quote Link to comment Share on other sites More sharing options...
kicken Posted February 18, 2017 Share Posted February 18, 2017 I would rather not. How important do you think this is? If you're using a TCP stream for your connection and are comfortable assuming your clients/servers don't make mistakes then you'd be fine without any additional stuff probably. TCP will already ensure that all the data arrives in order and without accidental corruption so the only thing that might cause a problem would be a client sending bad data to begin with. So long as all your clients send packets in the form of "$length$data" and $length is always accurate you'll be fine. Interesting. Would invalid JSON be a condition to close the connection? What about X number of invalid JSON transmissions within a given timeframe? If so, do you have any rules of thumb? Closing the connection is just the easiest thing to do if your connection is somehow out of sync/receiving bad data. You could try and implement some kind of recovery that gets things back in sync but it's extra work. Just close the connection and let the client re-connect if they want. That'll reset things back to a known state with relatively little effort (you'll need reconnect capability anyway for network failures). Quote Link to comment Share on other sites More sharing options...
Recommended Posts
Join the conversation
You can post now and register later. If you have an account, sign in now to post with your account.