Can anyone point me in the right direction to implement compression with sockets and ReactPHP?  I will be sending JSON-RPC messages and using a length prefix so that they can be parsed.

 

Is it as simple as just compressing the string before sending it:

$msg=json_encode($msg);
$compressedMsg=gzcompress($msg, 9);
$this->socket->write(pack("V", strlen($compressedMsg)).$compressedMsg);

And then uncompressing it before adding it to the buffer?

$this->socket->on('data', function($data){
    $this->buffer .= gzuncompress($data);
    $this->parseBuffer();
});

I've found this class https://github.com/clue/php-zlib-react which is used for compressing, but if it is as easy as I showed, what is the point?

 

Thanks

https://forums.phpfreaks.com/topic/304163-gzip-or-zlib-with-sockets/

Note that gzip and zlib are not compression algorithms. This StackOverflow answer does a good job of explaining zip (a program and format), gzip (a program and format), zlib (mostly a library but also a format), compress (a program), deflate (a compression algorithm), and even mentions LZW (a compression algorithm).

 

- gzcompress() uses the zlib format with the deflate algorithm and gives compression, a header, and error detection (Adler-32)

- gzencode() uses the gzip format with the deflate algorithm and gives compression, a header, and error detection (CRC32)

- gzdeflate() is the deflate algorithm and gives compression
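To make the difference between the three concrete, here is a small sketch that compresses the same string with each function and prints the leading bytes. The sample payload is made up for illustration, and the overhead figures in the comments come from the format definitions rather than from anything this snippet checks.

```php
<?php
// Hypothetical sample payload, for illustration only.
$json = json_encode(['jsonrpc' => '2.0', 'method' => 'ping', 'id' => 1]);

$zlib = gzcompress($json, 6); // zlib format: 2-byte header, deflate data, Adler-32
$gzip = gzencode($json, 6);   // gzip format: 10-byte minimum header, deflate data, CRC32 + length
$raw  = gzdeflate($json, 6);  // raw deflate data: no header, no checksum

foreach (['zlib' => $zlib, 'gzip' => $gzip, 'raw' => $raw] as $name => $bytes) {
    printf("%-4s %3d bytes, starts with %s\n", $name, strlen($bytes), bin2hex(substr($bytes, 0, 2)));
}

// Each format has its own matching decoder.
$roundTrip = [gzuncompress($zlib), gzdecode($gzip), gzinflate($raw)];
```

The decoders are not interchangeable: data produced by gzencode() has to go through gzdecode(), not gzuncompress(), and so on.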

 

Your first decision is whether you want the error detection or not. Over TCP you probably don't need it, but unless we're talking about sending tons of tiny messages, I would still use it.

 

If you do then you can choose zlib/gzcompress or gzip/gzencode. It doesn't really matter which you use. I'd look at portability but most software uses zlib (the library), which can handle everything, so that doesn't matter much either.

 

If I had to choose? zlib, because gzip is geared more towards files.

Yes, that is a good answer.  I actually skimmed it earlier today, but have since done so in more detail.

 

I was planning on going with zlib as well, as I have since read that it is more often used with streams.

 

Is my described implementation approach correct?  What is the point of using the above referenced class do you think?

You don't need compression level 9. Remember that higher compression means more processing time and has diminishing returns with compressed length. 6 is pretty decent. Otherwise it seems fine, though you should check for errors from gzuncompress (returns false).
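A minimal sketch of both points, assuming each call receives one complete compressed message. The helper name decompressMessage() and the sample payload are hypothetical; the loop only prints sizes so you can compare levels on your own data.

```php
<?php
/**
 * Decompress one complete zlib-format message.
 * Returns null on failure so callers can use a strict null check.
 */
function decompressMessage(string $payload): ?string
{
    // gzuncompress() raises a warning and returns false on bad input;
    // the @ silences the warning because the failure is handled here.
    $result = @gzuncompress($payload);
    return $result === false ? null : $result;
}

// Hypothetical payload, repeated so the levels have something to work on.
$json = json_encode(['jsonrpc' => '2.0', 'method' => 'ping', 'id' => 1]);
$body = str_repeat($json, 50);

foreach ([1, 6, 9] as $level) {
    printf("level %d: %d bytes\n", $level, strlen(gzcompress($body, $level)));
}

$ok  = decompressMessage(gzcompress($json, 6)); // the original JSON string
$bad = decompressMessage('not zlib data');      // null, because the input is not a zlib stream
```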

 

As for using the class, it seems convenient for most uses but in your case I think doing it manually would be easier.

You still don't seem to understand the concept of streams.

 

Your $data is not a message. It's not the string you've sent. It's a portion of the original string which has been turned into a byte stream. If I send you “Hello”, you might receive that as “H” – “e” – “l” – “l” – “o” or “He” – “ll” – “o” or “Hel” – “lo” or any other variation. There's no guarantee whatsoever that you'll receive the full message all at once. That's the difference between message-oriented protocols (like UDP) and stream-oriented protocols (like TCP).

 

You cannot gzuncompress() a fraction of a zlib stream or json_decode() a fraction of a JSON document, because those functions have no idea what to do with the partial data. They expect the whole message, which makes them unsuitable for a streaming context. You could of course wait until all data has been received and then start the processing, but this kinda misses the point of streaming.
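This is easy to reproduce without a socket. The sketch below cuts a compressed message in half to stand in for a partial 'data' event; the payload and the split point are arbitrary choices for illustration.

```php
<?php
$json       = json_encode(['jsonrpc' => '2.0', 'method' => 'ping', 'id' => 1]);
$compressed = gzcompress($json, 6);

// Pretend the first 'data' event delivered only the first half of the bytes.
$fragment = substr($compressed, 0, intdiv(strlen($compressed), 2));

$fromFragment = @gzuncompress($fragment);         // false: incomplete compressed structure
$fromWhole    = gzuncompress($compressed);        // the original JSON string
$jsonFragment = json_decode(substr($json, 0, 10)); // null: incomplete JSON document
```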

 

That's why people have come up with the idea of decompressing streams rather than full messages. And lo and behold, that's what the class does.
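To show the idea rather than the class itself, here is a sketch that uses PHP's built-in incremental functions inflate_init() and inflate_add() (available since PHP 7). It does not reproduce the API of clue/php-zlib-react; the 7-byte chunks merely stand in for whatever the socket happens to deliver.

```php
<?php
// Hypothetical stream content: twenty newline-terminated JSON documents.
$original   = str_repeat('{"jsonrpc":"2.0","method":"ping"}' . "\r\n", 20);
$compressed = gzcompress($original, 6);

// ZLIB_ENCODING_DEFLATE is PHP's name for the zlib format gzcompress() produces.
$inflator = inflate_init(ZLIB_ENCODING_DEFLATE);

$received = '';
foreach (str_split($compressed, 7) as $chunk) {
    // Each call returns whatever could be decoded from the bytes seen so far.
    $received .= inflate_add($inflator, $chunk);
}
```

The decompressor keeps its state between calls, so it does not matter where the chunk boundaries fall.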

 

 

I thought TCP does guarantee that the order will be right.  It pretty much needs to be to make kicken's JSONStream.php parsing class work (https://forums.phpfreaks.com/topic/302840-http-server-with-two-hosts-and-same-port/), doesn't it?  My understanding is that I would be compressing a stream, not a message.

<?php
namespace Kicken\RLGL;


use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\Stream\DuplexStreamInterface;


class JSONStream implements EventEmitterInterface {
    use EventEmitterTrait;


    private $socket;
    private $buffer;


    public function __construct(DuplexStreamInterface $socket){
        $this->socket = $socket;
        $this->buffer = '';


        $this->socket->on('data', function($data){
            $this->buffer .= $data;
            $this->parseBuffer();
        });
    }


    public function send($structure){
        $json = json_encode($structure);
        $this->socket->write($json."\r\n");
    }


    private function parseBuffer(){
        $eol = strpos($this->buffer, "\r\n");
        while ($eol !== false){
            $json = substr($this->buffer, 0, $eol);
            $this->buffer = substr($this->buffer, $eol + 2);
            $data = json_decode($json, true);


            if (json_last_error() == JSON_ERROR_NONE){
                $this->emit('data', [$data]);
            }


            $eol = strpos($this->buffer, "\r\n");
        }
    }
}

TCP guarantees the order of the segments. It does not guarantee that a segment contains a complete message.

 

You're trying to gzuncompress() a TCP segment. This is nonsensical, because you have no idea what that segment contains. It might be a fragment of the header or a small portion of the data stream. gzuncompress() has no idea what to do with that. If you want to decompress the stream, you need a stream decompressor -- which is what that clue library is for.

 

You asked why it is so complicated. Well, why is kicken's class so complicated? Why not just json_decode() the $data and be done with it? Because $data isn't a full message, just a fragment of a stream.

Okay, good, the order is what matters.  If it doesn't contain a complete message, it eventually will.  I will just keep on blindly adding the stream to the buffer, and when the buffer reaches the indicated length of the message, I will have a complete message and can finally json_decode() it.

 

Unless....

 

By compressing it, is it somehow not fed to my stream the same way?

I'm still not sure if you understand the problem.

 

Right now, you have a JSON document which is sent over TCP. Since TCP splits the document into arbitrary segments rather than sending it as a whole message, you've implemented/copied a class which collects the segments until a newline character is encountered, at which point a complete document may be parsed. Cool.

 

Now you want to compress the JSON documents and send the compressed data (which is a complex format with a header followed by the payload). This again leads to the problem that you receive arbitrary fragments of the compressed structure rather than the whole structure. You cannot simply decompress a fragment like you tried in your first post.

 

You have two options:

  • You throw away pretty much all current code and switch to delimited messages which are first collected, then decompressed as a whole and then JSON-decoded as a whole. Again: This is fundamentally incompatible with all the code you've shown so far.
  • You keep the current code and simply add a stream compressor/decompressor (like the class in the original post) on top of it. This class transparently compresses the data as you send it and decompresses it as you receive it. There's no need for length prefixes, there's no need to throw away kicken's JSON class. You simply do what you currently do, except that you receive the JSON fragments from the stream decompressor rather than directly from TCP.
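The second option can be sketched with PHP's built-in deflate_add() and inflate_add() in place of the library class, which is a swapped-in stand-in and not the class's actual API. The wire is simulated by a string cut into arbitrary 5-byte pieces, and the two messages are made up.

```php
<?php
// Sender side: one compression context for the whole connection.
$deflator = deflate_init(ZLIB_ENCODING_DEFLATE, ['level' => 6]);
$wire = '';
foreach ([['method' => 'ping'], ['method' => 'status', 'id' => 2]] as $message) {
    // ZLIB_SYNC_FLUSH emits the message now instead of waiting for more input.
    $wire .= deflate_add($deflator, json_encode($message) . "\r\n", ZLIB_SYNC_FLUSH);
}

// Receiver side: decompress whatever arrives, then split on "\r\n" as before.
$inflator = inflate_init(ZLIB_ENCODING_DEFLATE);
$buffer   = '';
$decoded  = [];
foreach (str_split($wire, 5) as $chunk) {
    $buffer .= inflate_add($inflator, $chunk);
    while (($eol = strpos($buffer, "\r\n")) !== false) {
        $decoded[] = json_decode(substr($buffer, 0, $eol), true);
        $buffer    = substr($buffer, $eol + 2);
    }
}
```

The newline-splitting loop is the same logic as parseBuffer() in the JSON class; the only change is that its input comes from the decompressor instead of straight from the socket.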

Well, now I have to apologize.  When I posted the kicken class, I forgot that I am no longer using this approach and am instead doing the following (still, I think, strongly inspired by kicken).

 

I did indicate in my original post that I am using length prefixes (which was originally inspired by you); however, I don't expect you or anyone else to comb through the details to work that out.

 

Does this change things?

 

Thanks for your help.

 

PS.  Not just copied :)

<?php
namespace DataApp\Server;


use Evenement\EventEmitterInterface;
use Evenement\EventEmitterTrait;
use React\Stream\DuplexStreamInterface;


/*
Parses stream, and emits data, or error bad JSON received.
*/
class LengthPrefixStream implements EventEmitterInterface {


    use EventEmitterTrait;


    public $app; //Will be set by server after the fact.


    private $socket=false,
    $buffer='',
    $parseJson; //0=>no, 1=>yes and return object, 2=>yes and return array


    public function __construct(DuplexStreamInterface $socket, $log, $parseJson=1){
        $this->log = $log;
        $this->parseJson = $parseJson;
        $this->socket = $socket;
        $this->socket->on('data', function($data){
            $this->buffer .= $data;
            $this->parseBuffer();
        });
    }


    public function send($msg){
        //Do I need to parse???  Still need to implement drain
        if($this->isConnected()) {
            if($this->parseJson) {
                $msg=json_encode($msg);
            }
            if(!$this->socket->write(pack("V", strlen($msg)).$msg)) {
                $this->log->error("LengthPrefixStream::send() didn't write: $msg");
            }
            return true;
        }
        else {
            $this->log->error("LengthPrefixStream::send() not connected (should never happen). ".$msg);
            return false;
        }
    }
    public function test($msg, $prefixLengthError){
        $this->log->debug("Test: $msg $prefixLengthError (".(strlen($msg)+$prefixLengthError).")");
        if(!$this->socket->write(pack("V", strlen($msg)+$prefixLengthError).$msg)) {
            $this->log->error("TEST LengthPrefixStream::send() didn't write");
        }
        return true;
    }


    private function parseBuffer(){
        do {
            $checkAgain = false;
            $bufferLength = strlen($this->buffer);
            $length = unpack('Vlen', substr($this->buffer, 0, 4))['len'];
            if(is_int($length)) {
                if($bufferLength >= $length + 4){
                    $msg=substr($this->buffer, 4, $length);
                    if($this->parseJson) {
                        $msg=json_decode($msg,$this->parseJson-1);
                        if (json_last_error() == JSON_ERROR_NONE){
                            $this->emit('data', [$msg]);
                        }
                        else {
                            $this->emit('error', ['Invalid JSON: '.json_last_error_msg()." (".json_last_error().'): '.substr($this->buffer, 4, $length)]);
                        }
                    }
                    else {
                        $this->emit('data', [$msg]);
                    }
                    $this->buffer = substr($this->buffer, $length+4);
                    $checkAgain = strlen($this->buffer)>=4;
                }
            }
            else {
                $this->emit('error', ["Invalid length prefix provided by client: ".substr($this->buffer, 0, 3)]);
            }
        } while ($checkAgain);
    }


    public function isConnected()
    {
        return $this->socket?true:false;
    }


    public function close()
    {
        $this->socket->close();
    }
}

 

I did indicate in my original post that I am using length prefixes

 

You did, but the code didn't even make sense on a conceptual level, so this looked more like a work in progress rather than a fully thought-out project.

 

Anyway, I hope it's clear now why the class is relatively complex and why a simple gzuncompress() doesn't work, at least not in the way you've shown.

 

Length-prefixed messages do work as long as you actually wait for the full message before you decompress and then decode it.

 

The new code looks more reasonable. You should change the prefix check, though. Right now, a message shorter than 4 bytes will trigger a notice due to a failing unpack(). You should first check the length, then unpack the data. Or suppress the notice, but that's not exactly pretty.
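Put together, the length-prefix approach with compression could look roughly like the sketch below. The function names encodeFrame() and extractFrames() are hypothetical and not part of the class above; the point is the order of operations: check the buffer length, read the prefix, wait for the full frame, then decompress, then decode.

```php
<?php
/** Build one frame: 4-byte little-endian length, then the zlib-compressed JSON. */
function encodeFrame($message, int $level = 6): string
{
    $payload = gzcompress(json_encode($message), $level);
    return pack('V', strlen($payload)) . $payload;
}

/**
 * Remove every complete frame from $buffer (passed by reference) and
 * return the decoded messages. Incomplete data stays in the buffer.
 */
function extractFrames(string &$buffer): array
{
    $messages = [];
    // Check the length first, so unpack() never sees fewer than 4 bytes.
    while (strlen($buffer) >= 4) {
        $length = unpack('Vlen', substr($buffer, 0, 4))['len'];
        if (strlen($buffer) < 4 + $length) {
            break; // the rest of this frame has not arrived yet
        }
        $json   = @gzuncompress(substr($buffer, 4, $length));
        $buffer = substr($buffer, 4 + $length);
        if ($json === false) {
            throw new RuntimeException('Frame could not be decompressed');
        }
        $messages[] = json_decode($json, true);
    }
    return $messages;
}

// Simulate two frames arriving in arbitrary 3-byte pieces.
$wire     = encodeFrame(['method' => 'ping']) . encodeFrame(['method' => 'status', 'id' => 2]);
$buffer   = '';
$received = [];
foreach (str_split($wire, 3) as $chunk) {
    $buffer  .= $chunk;
    $received = array_merge($received, extractFrames($buffer));
}
```

In a real server you would probably also reject prefixes above some maximum frame size, so that a bogus length cannot make the buffer grow without bound.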

Thanks Jacques1,

 

Appreciate the advice about the prefix check for messages under 4 bytes.  I don't know if I have experienced this issue yet, but I eventually will, and probably already have unwittingly.

 

Yes, I now see how my uncompressing before sending to the buffer isn't right as the lengths will be for different strings of bytes.

 

My intent is not to use the class I earlier referenced.  Providing I understand what is going on, do you think this is a bad decision?

 

If you are a father, wishing you a happy Father's Day!  If not, wishing the same to your father!

Yes, I now see how my uncompressing before sending to the buffer isn't right as the lengths will be for different strings of bytes.

 

No, it's because gzuncompress() has no idea what to do with fragments of a zlib structure. It expects a complete structure which is only available after the entire message has arrived.

 

 

 

My intent is not to use the class I earlier referenced.  Providing I understand what is going on, do you think this is a bad decision?

 

The class is incompatible with your prefix approach, so if you've already chosen the latter, you cannot have the other.

 

Like I said, length prefixes work (which is why I suggested them) and have the benefit of being very, very simple. Actual stream decompressors which process the data on the fly are much more sophisticated, but they have other benefits like not requiring large message buffers to aggregate the TCP segments (each processed segment can simply be thrown away).

 

In your case, I don't think any of this matters. Even compression may be total overkill.
