
Implementing length prefix with stream


NotionCommotion


The following script seems to work, but it might have a couple of issues.

First, LengthPrefixStream::send() seems to work, but the receiving client doesn't respond correctly to it. Do you see any issues with it?

LengthPrefixStream::parseBuffer() works by making sure that $this->buffer always starts with a new packet/message (I don't know the right word for this), so it knows that the first four bytes indicate the length of the message. Do you see any issues? One concern I have: if it is sent content with more or fewer bytes than the length prefix indicates, $this->buffer might no longer start with a new message, and I will lose the ability to find the length prefix. Should this be a concern?

Thanks

<?php
require 'vendor/autoload.php';

$port = isset($argv[1])?$argv[1]:1337;
$host = isset($argv[2])?$argv[2]:'0.0.0.0';

$server=new Server($port,$host);
$server->start();

class Server
{
    private $port,$host,$client;

    public function __construct($port,$host)
    {
        $this->port=$port;
        $this->host=$host;
    }

    public function start() {
        $loop = React\EventLoop\Factory::create();
        $socket = new React\Socket\Server($loop);
        $socket->on('connection', function (\React\Socket\ConnectionInterface $stream){
            $client = new DataLogger\Server\LengthPrefixStream($stream,'o');
            $this->client=$client;
            echo("New connection accepted.".PHP_EOL);
            $client->on('data', function($rsp) use ($client){
                echo('received: '.json_encode($rsp).PHP_EOL);
            });

        });

        $loop->addPeriodicTimer(15, function() {
            if ($this->client) { //Skip until a client has connected
                $this->client->send(["method"=>"discovery.Start"]);
            }
        });

        $socket->listen($this->port,$this->host);
        echo("TCP Socket Server Started: {$this->host}:{$this->port} \r\n");

        $loop->run();
    }
}
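<?php
//Separate file, loaded through vendor/autoload.php (a namespace declaration must come first in its file)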
namespace DataLogger\Server;

use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\Stream\DuplexStreamInterface;

class LengthPrefixStream implements EventEmitterInterface {
    use EventEmitterTrait;

    private $socket,
    $buffer='',
    $messageLength=false,
    $type;  //Type of data provided and returned.  Can be s for string, a for array, or o for object.  If the array is associative, it will be sent as an object.

    public function __construct(DuplexStreamInterface $socket, $type='s'){
        if (!in_array($type,['s','a','o'])){
            trigger_error("Invalid LengthPrefixStream type.", E_USER_ERROR);
        }
        $this->socket = $socket;
        $this->type=$type;

        $this->socket->on('data', function($data){
            //echo("LengthPrefixStream on data: $data".PHP_EOL);
            $this->buffer .= $data;
            $this->parseBuffer();
        });
    }

    public function send($message){
        //How should drain be implemented?
        if($this->type!='s') {
            $message = json_encode($message);
        }
        echo("send: $message".PHP_EOL);
        $lng=pack("V", strlen($message));  //Is this correct?
        $this->socket->write($lng.$message);
    }

    private function parseBuffer(){
        //Question.  What happens if something goes wrong and I lose $this->messageLength so I can't find the length prefix?
        if($this->messageLength===false && strlen($this->buffer)>=4) {
            //Save the first time data is received, or after an earlier chunk was too short to hold the 4-byte length
            $this->messageLength=$this->getLength($this->buffer);
        }
        while (strlen($this->buffer)>=($this->messageLength+4)){
            $message = substr($this->buffer, 4, $this->messageLength);
            if($this->type=='s') {
                $this->emit('data', [$message]);
            }
            else {
                //emit either an array or object
                $message = json_decode($message,$this->type=='a');                
                if (json_last_error() == JSON_ERROR_NONE){
                    $this->emit('data', [$message]);
                }
            }
            $this->buffer = substr($this->buffer, $this->messageLength+4);
            $this->messageLength=strlen($this->buffer)>=4?$this->getLength($this->buffer):false;
        }
    }

    private function getLength($string){
        // Appears length prefix is given as unsigned long (always 32 bit, little endian byte order)
        /*
        L     unsigned long (always 32 bit, machine byte order)
        N     unsigned long (always 32 bit, big endian byte order)
        V     unsigned long (always 32 bit, little endian byte order)
        */
        return unpack('Vlen', substr($string,0,4))['len'];
    }
}

I would expect parseBuffer to look more like

do {
	if have message length {
		if buffer is >=length long {
			get message as substr(buffer, 0, length)
			deal with message
			adjust buffer to substr(buffer, length)
			reset length
		} else {
			quit loop // wait for rest of message
		}
	} else {
		if buffer is >=4 long {
			get length as substr(buffer, 0, 4) and unpack
			adjust buffer to substr(buffer, 4)
		} else {
			quit loop // wait for rest of length
		}
	}
} indefinitely // will keep going until buffer runs out

> I would expect parseBuffer to look more like

Thanks,

Does my send() method look okay?

As for parseBuffer(), I believe it performs the same task, but your implementation is easier to follow.

    private function parseBuffer(){
        //How can I validate that the received message length prefix matches the length of the message so it doesn't get out of sync?
        do {
            if($this->messageLength) {
                if(strlen($this->buffer)>=$this->messageLength){
                    $message=substr($this->buffer, 0, $this->messageLength);
                    if($this->type=='s') {
                        $this->emit('data', [$message]);
                    }
                    else {
                        //emit either an array or object
                        $message = json_decode($message,$this->type=='a');                
                        if (json_last_error() == JSON_ERROR_NONE){
                            $this->emit('data', [$message]);
                        }
                    }
                    $this->buffer = substr($this->buffer, $this->messageLength);
                    $this->messageLength=false;
                } else {
                    break;
                }
            } else {
                if(strlen($this->buffer)>=4){
                    $this->messageLength=unpack('Vlen', substr($this->buffer,0,4))['len'];
                    $this->buffer = substr($this->buffer, 4);
                } else {
                    break;
                }
            }
        } while (true);
    }
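To check that this state-machine version copes with TCP splitting a frame across several 'data' events, the same logic can be pulled out of React into a small standalone class and fed one byte at a time. This is a sketch under assumptions: the class name FrameDecoder and its feed() method are hypothetical, messages are returned instead of emitted, the prefix is the same little-endian 'V' used in send(), and typed properties need PHP 7.4 or later. It uses null rather than false for "no length yet", so a zero-length frame yields an empty message instead of being silently skipped.

```php
<?php
// Hypothetical standalone version of the state-machine parseBuffer() above:
// same logic, but feed() returns complete messages instead of emitting events.
final class FrameDecoder
{
    private string $buffer = '';
    private ?int $messageLength = null; // null = still waiting for a 4-byte length prefix

    /** @return string[] messages completed by this chunk of data */
    public function feed(string $data): array
    {
        $this->buffer .= $data;
        $messages = [];
        while (true) {
            if ($this->messageLength === null) {
                if (strlen($this->buffer) < 4) {
                    break; // wait for the rest of the length prefix
                }
                $this->messageLength = unpack('Vlen', substr($this->buffer, 0, 4))['len'];
                $this->buffer = substr($this->buffer, 4);
            } elseif (strlen($this->buffer) >= $this->messageLength) {
                $messages[] = substr($this->buffer, 0, $this->messageLength);
                $this->buffer = substr($this->buffer, $this->messageLength);
                $this->messageLength = null; // the next bytes start a new length prefix
            } else {
                break; // wait for the rest of the message
            }
        }
        return $messages;
    }
}

// Usage: deliver two frames one byte at a time, a worst case for TCP splitting.
$wire = pack('V', 5) . 'hello' . pack('V', 2) . '{}';
$decoder = new FrameDecoder();
$received = [];
foreach (str_split($wire) as $byte) {
    $received = array_merge($received, $decoder->feed($byte));
}
echo implode(', ', $received), PHP_EOL; // hello, {}
```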

> //How can I validate that the received message length prefix matches the length of the message so it doesn't get out of sync?

...you can't, because you're using that length to decide how long the message is. If the other end sends the wrong length then it's their fault, and you'll probably discover it when the json_decode() fails.
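To illustrate why a wrong prefix usually surfaces as a JSON error, the sketch below builds a frame whose prefix claims one byte too many, so the payload swallows the first byte of the next frame's prefix. decodeFirstFrame() is a hypothetical helper. Detection is not guaranteed: a payload such as 123 cut short to 12 by a too-small prefix still decodes as valid JSON, which is why "probably" is the right word.

```php
<?php
// Hypothetical helper: decode the first length-prefixed JSON frame in $buffer.
// Returns [decoded value, json_last_error() code].
function decodeFirstFrame(string $buffer): array
{
    $length = unpack('Vlen', substr($buffer, 0, 4))['len'];
    $value = json_decode(substr($buffer, 4, $length), true);
    return [$value, json_last_error()];
}

$json = '{"a":1}';            // 7 bytes
$next = pack('V', 2) . '[]';  // the frame that follows on the wire

// Correct prefix: decodes cleanly.
[$value, $error] = decodeFirstFrame(pack('V', strlen($json)) . $json . $next);
var_dump($error === JSON_ERROR_NONE); // bool(true)

// Prefix one byte too long: the payload swallows the "\x02" that starts $next.
[$value, $error] = decodeFirstFrame(pack('V', strlen($json) + 1) . $json . $next);
var_dump($error === JSON_ERROR_NONE); // bool(false)
```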

 

> Does my send() method look okay?

It's pretty simple, so yeah.
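One thing worth checking when the receiving client doesn't respond: pack("V", ...) writes the length as 32-bit little-endian, while many protocols use big-endian "network order", pack("N", ...). The sketch below (frame() is a hypothetical helper mirroring send()) shows both encodings and what a peer expecting the other byte order would read. It also shows that strlen() counts bytes rather than characters, which is what a byte-length prefix needs; the example assumes the file is saved as UTF-8.

```php
<?php
// Hypothetical helper mirroring send(): 4-byte length prefix followed by the payload.
// 'V' = unsigned 32-bit little-endian, 'N' = unsigned 32-bit big-endian ("network order").
// Whichever is used, both ends must agree on it.
function frame(string $payload, string $format = 'V'): string
{
    return pack($format, strlen($payload)) . $payload; // strlen() counts bytes, not characters
}

$payload = 'héllo'; // 6 bytes in UTF-8: "é" takes two
echo bin2hex(substr(frame($payload, 'V'), 0, 4)), PHP_EOL; // 06000000
echo bin2hex(substr(frame($payload, 'N'), 0, 4)), PHP_EOL; // 00000006

// A peer reading a little-endian prefix as big-endian sees a very different length,
// and would wait for roughly 100 MB that never arrives.
echo unpack('Nlen', pack('V', 6))['len'], PHP_EOL; // 100663296
```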

I generally prefer to avoid changing the buffer unless I know I can extract a complete message. That way you don't have to keep track of state or partial information. For example:

private function parseBuffer(){
    do {
        $checkAgain = false;
        $bufferLength = strlen($this->buffer);
        if ($bufferLength < 4){
            break; // not even a complete length prefix yet; wait for more data
        }
        $length = $this->getLength(substr($this->buffer, 0, 4));
        if ($bufferLength >= $length + 4){
            $message = substr($this->buffer, 4, $length);
            $this->buffer = substr($this->buffer, $length+4);

            $this->emit('data', [$message]);
            $checkAgain = true;
        }
    } while ($checkAgain);
}
The buffer should always begin with a 4-byte length value, assuming no transmission issues. Assume the first four bytes are the length, extract them, then check whether there's enough left in the buffer to satisfy that length. If not, do nothing and return to let more data arrive. If so, extract the complete message, modify the buffer, then check again for a second message.
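The same "leave the buffer alone until a whole message is there" idea can be written as a standalone function, which makes it easy to check the case where one TCP chunk carries several frames plus part of another. extractMessages() is a hypothetical name; it takes the buffer by reference, returns the complete payloads, and assumes the little-endian 'V' prefix used elsewhere in this thread.

```php
<?php
// Hypothetical helper following the "only touch the buffer when a whole
// message is available" approach: no saved length, no partial state.
function extractMessages(string &$buffer): array
{
    $messages = [];
    while (strlen($buffer) >= 4) {
        $length = unpack('Vlen', substr($buffer, 0, 4))['len'];
        if (strlen($buffer) < $length + 4) {
            break; // incomplete: leave the buffer untouched and wait for more data
        }
        $messages[] = substr($buffer, 4, $length);
        $buffer = substr($buffer, $length + 4);
    }
    return $messages;
}

// Usage: one chunk carrying two complete frames plus the start of a third.
$buffer = pack('V', 3) . 'one' . pack('V', 3) . 'two' . pack('V', 5) . 'thr';
$messages = extractMessages($buffer);
echo implode(', ', $messages), PHP_EOL; // one, two
echo strlen($buffer), PHP_EOL;          // 7 (4-byte prefix + "thr")
```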

 

If you want to try to ensure that you don't parse bad data, you can do things like add checksums or require every packet to begin with a magic number/code that you check against. If the magic number ever doesn't match, consider the connection failed and close it.
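A sketch of the magic number plus checksum idea, under an invented frame layout (not something either side of this protocol currently uses): a 4-byte magic string "DLG1", the 'V' length, then the payload's CRC32 as 4 raw bytes from hash('crc32b', ..., true). encodeFrame() and decodeFrames() are hypothetical names. decodeFrames() throws UnexpectedValueException on a mismatch, at which point the caller would close the connection rather than try to resync.

```php
<?php
// Hypothetical frame layout (not part of the original protocol):
//   4 bytes magic "DLG1" | 4 bytes payload length (V) | 4 bytes CRC32 of payload | payload
const FRAME_MAGIC = 'DLG1';
const HEADER_SIZE = 12;

function encodeFrame(string $payload): string
{
    return FRAME_MAGIC . pack('V', strlen($payload)) . hash('crc32b', $payload, true) . $payload;
}

/**
 * Extracts complete frames from $buffer. Throws on a bad magic number or
 * checksum; the caller should then close the connection rather than resync.
 * @return string[]
 */
function decodeFrames(string &$buffer): array
{
    $messages = [];
    while (strlen($buffer) >= HEADER_SIZE) {
        if (substr($buffer, 0, 4) !== FRAME_MAGIC) {
            throw new UnexpectedValueException('Bad magic number: stream out of sync');
        }
        $length = unpack('Vlen', substr($buffer, 4, 4))['len'];
        if (strlen($buffer) < HEADER_SIZE + $length) {
            break; // wait for the rest of the payload
        }
        $payload = substr($buffer, HEADER_SIZE, $length);
        if (hash('crc32b', $payload, true) !== substr($buffer, 8, 4)) {
            throw new UnexpectedValueException('Checksum mismatch');
        }
        $messages[] = $payload;
        $buffer = substr($buffer, HEADER_SIZE + $length);
    }
    return $messages;
}

$buffer = encodeFrame('{"method":"discovery.Start"}') . encodeFrame('[]');
echo implode(' | ', decodeFrames($buffer)), PHP_EOL; // {"method":"discovery.Start"} | []

$bad = 'XXXX' . substr(encodeFrame('{}'), 4); // corrupted magic number
try {
    decodeFrames($bad);
} catch (UnexpectedValueException $e) {
    echo 'closing connection: ', $e->getMessage(), PHP_EOL;
}
```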


> ...you can't, because you're using that length to decide how long the message is. If the other end sends the wrong length then it's their fault, and you'll probably discover it when the json_decode() fails.

I started thinking about it and came to the same "then it's their fault" conclusion. It's not everything, but this is one benefit of using delimiters over length prefixes.
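For comparison, a sketch of newline-delimited JSON framing (a swapped-in technique, not what the code above uses): each message ends with "\n", so a corrupt message costs only its own line and parsing picks up again at the next newline. It assumes payloads contain no raw newlines, which holds for json_encode() output without JSON_PRETTY_PRINT because newlines inside strings are escaped. extractLines() is a hypothetical name.

```php
<?php
// Hypothetical newline-delimited alternative: each JSON message ends with "\n".
// A corrupt message costs only that one line; the next "\n" resynchronizes.
function extractLines(string &$buffer): array
{
    $messages = [];
    while (($pos = strpos($buffer, "\n")) !== false) {
        $line = substr($buffer, 0, $pos);
        $buffer = substr($buffer, $pos + 1);
        $decoded = json_decode($line, true);
        if (json_last_error() === JSON_ERROR_NONE) {
            $messages[] = $decoded;
        } // else: drop the bad line and carry on with the next one
    }
    return $messages;
}

$buffer = "{\"a\":1}\n{\"b\":\n{\"c\":3}\n{\"d\"";
$messages = extractLines($buffer);
echo json_encode($messages), PHP_EOL; // [{"a":1},{"c":3}]
echo $buffer, PHP_EOL;                // {"d"  (incomplete, kept for the next chunk)
```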


> I generally prefer to avoid changing the buffer unless I know I can extract a complete message. That way you don't have to keep track of state or partial information.

requinix's pseudocode was simpler than mine, but yours takes the cake!

> If you want to try to ensure that you don't parse bad data, you can do things like add checksums or require every packet to begin with a magic number/code that you check against.

I would rather not. How important do you think this is?

> ... consider the connection failed and close it.

Interesting. Would invalid JSON be a condition to close the connection? What about X number of invalid JSON transmissions within a given timeframe? If so, do you have any rules of thumb?


> I would rather not. How important do you think this is?

If you're using a TCP stream for your connection and are comfortable assuming your clients/servers don't make mistakes, then you'd probably be fine without anything additional. TCP already ensures that all the data arrives in order and without accidental corruption, so the only thing that might cause a problem would be a client sending bad data to begin with. So long as all your clients send packets in the form of "$length$data" and $length is always accurate, you'll be fine.

> Interesting. Would invalid JSON be a condition to close the connection? What about X number of invalid JSON transmissions within a given timeframe? If so, do you have any rules of thumb?

Closing the connection is just the easiest thing to do if your connection is somehow out of sync or receiving bad data. You could try to implement some kind of recovery that gets things back in sync, but it's extra work. Just close the connection and let the client reconnect if they want. That resets things to a known state with relatively little effort (you'll need reconnect capability anyway for network failures).
