NotionCommotion Posted June 7, 2019

A browser HTTP client makes a request to an HTTP web server, which makes an HTTP cURL request to an HTTP REST API, which initiates a ReactPHP socket client to make a request to a socket server. The socket server script eventually executes the following method:

    public function executeSpecificRequestCommand(array $data): bool
    {
        $status = $this->doSomething($data);
        return $status; // {success: $status} will be returned to the socket client
    }

All is good until doSomething() takes a long time and results in a cURL error between the HTTP web server and the HTTP REST API. For this particular case, the task isn't meant to provide immediate feedback to the user, but to do some work and update the database. As such, my desire is to return a true status and then perform the work, instead of extending the cURL timeout.

One option is to run some process in the background and return the status, but I don't think doing so is really right. Using a queue seems excessive as I am already decoupled via the socket. As such, I will probably just add some logic between the initial $string->on('data') and this executeSpecificRequestCommand() method to determine whether the success message should be returned before or after the method is complete.

Before doing so, however, I would like to know if there is a more appropriate approach to this scenario. It appears that maybe a child process or a deferred might be appropriate, but I am not sure whether I am going down the wrong path.
requinix Posted June 8, 2019

Pick a point in the process where you decide that it is potentially long-running. Have the server end return immediate information (e.g., HTTP has a 201 Created response), possibly with information that allows the client to poll the server for the final result. Then end the connection and run in the background.
NotionCommotion Posted June 10, 2019

Thanks requinix,

Any reason this logic shouldn't be located at the socket server instead of the HTTP server? Benefits seem to be: 1) the HTTP side shouldn't need to know it is a long process, and 2) it provides more information, specifically that the socket server (SS) has been reached, without a decrease in UX.

Currently, I haven't injected or passed the connection to executeSpecificRequestCommand(), but if I did, I could do something like the following:

    public function executeSpecificRequestCommand(array $data): void
    {
        // Returned to the HTTP server's socket client, which will then result in an HTTP 201 response
        $this->connection->send(['success' => true]);
        $result = $this->doSomething($data); // takes a long time
        $this->saveInDB($result);
    }

But what I was trying to get at (granted, my subject title was really crappy) is that "I think" XMLHttpRequest kind of does what a promise/deferred does (still got to read up).

    var xhttp = new XMLHttpRequest();
    xhttp.onreadystatechange = function() {
        if (this.readyState == 4 && this.status == 200) {
            // response from really long process (i.e. HTTP request between client and server)
        }
    };
    xhttp.open("GET", "bla.php", true);
    xhttp.send();

... and I shouldn't need/want to pass the connection, but instead do something like...

    public function executeSpecificRequestCommand(array $data): bool
    {
        $thing = new XMLHttpRequestLikePromise();
        $thing->onComplete = function($result) {
            $this->saveInDB($result);
        };
        $thing->open('doSomething', $data);
        return $thing->startAkaSend();
    }

Granted, the above script is nonsense, but hopefully communicates what I am trying to convey.
requinix Posted June 10, 2019

2 hours ago, NotionCommotion said:
> Any reason this logic shouldn't be located at the socket server instead of the HTTP server? Benefits seem to be: 1) the HTTP side shouldn't need to know it is a long process and 2) it provides more information specifically that the SS has been reached without decrease in UX.

Sounds like you picked a good point in the process.

2 hours ago, NotionCommotion said:
> But what I was trying to get at (granted, my subject title was really crappy) is "I think" XMLHttpRequest kind of does what a promise/deferred does (still got to read up).

Not... really? A Promise is a design pattern for dealing with asynchronous operations. XMLHttpRequest doesn't do that - it's just pure functionality.

If I could, I would have the socket server (the place you chose) try to determine whether the operation will be long-running or not. Would be nice.

Anyways, the server should use one response if the request was completed (e.g., 200) and another distinctly different response if it was accepted but is not yet completed (e.g., 201). The client recognizes each for what it is and, perhaps, passes that information upstream back to the HTTP portion. Whether the client can then "saveInDB", I don't know; I would imagine that depends on whether "accepted but not yet completed" counts as success. Perhaps not.
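The "one response for completed, a distinctly different one for accepted" idea can be sketched in plain PHP. This is only an illustration under assumptions: the function names buildSocketReply() and httpStatusForReply(), and the 'state' and 'jobId' fields, are hypothetical and do not appear in the thread.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical reply envelope sent by the socket server.
 * 'completed' means the work is done; 'accepted' means it has only been started.
 * The optional job ID is what a client could later use to poll for the result.
 */
function buildSocketReply(bool $finished, ?string $jobId = null): array
{
    return $finished
        ? ['success' => true, 'state' => 'completed']
        : ['success' => true, 'state' => 'accepted', 'jobId' => $jobId];
}

/** Map the socket reply to the HTTP status the REST API would pass upstream. */
function httpStatusForReply(array $reply): int
{
    if (empty($reply['success'])) {
        return 500;
    }
    // 200 versus 201 follows the codes suggested in the thread.
    return ($reply['state'] ?? '') === 'completed' ? 200 : 201;
}

$reply = buildSocketReply(false, 'job-42');
echo httpStatusForReply($reply), PHP_EOL;
```

HTTP also defines 202 Accepted for requests that have been accepted but not yet processed, which may be a closer fit than 201 Created if no resource is actually created.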
NotionCommotion Posted June 11, 2019

On 6/10/2019 at 7:08 AM, requinix said:
> Not... really? A Promise is a design pattern for dealing with asynchronous operations. XMLHttpRequest doesn't do that - it's just pure functionality.

I am confusing promises with callbacks. But are not promises and callbacks both design patterns for dealing with asynchronous operations, and while XMLHttpRequest might not directly provide asynchronous functionality, does it not exhibit such behavior through the use of promises or callbacks? It seems to me that the main difference is that if multiple callbacks are used, they must be nested and each must have its own catch(), whereas promises are chained and have a single common catch(). No?

My previous post showed something like the following. I would have thought that ['success'=>true] would have been returned to the caller stream before the blocking sleep call, but I am observing differently. Do you think I have some error elsewhere and shouldn't be observing differently?

    function executeSpecificRequestCommand(\React\Socket\ConnectionInterface $connection) {
        syslog(LOG_INFO, 'start');
        $msg = json_encode(['success' => true]);
        $connection->write(pack("V", strlen($msg)) . $msg);
        exec('sleep 60');
        syslog(LOG_INFO, 'save results');
    }

    $loop = \React\EventLoop\Factory::create();
    (new \React\Socket\TcpServer('127.0.0.1:1337', $loop))
        ->on('connection', function (\React\Socket\ConnectionInterface $connection) {
            // Register a callback; calling the function directly here would run it immediately
            $connection->on('data', function ($data) use ($connection) {
                executeSpecificRequestCommand($connection);
            });
        });
    $loop->run();

Assuming that I am observing the expected behavior, I will need to utilize either a callback or a promise, or do the command in the background. I am not really sure about the best way to utilize callbacks, and am thinking about maybe adding a background task to a GearmanClient. For using promises, maybe something like the following (non-working)?

    function executeSpecificRequestCommand(\React\Socket\ConnectionInterface $connection, array $data) {
        syslog(LOG_INFO, 'start');
        someAsyncOperation($data) // sleep 60
            ->then(function ($result) {
                syslog(LOG_INFO, 'save results');
            })
            ->catch(function ($error) {
                // Handle error
            });
        $msg = json_encode(['success' => true]);
        $connection->write(pack("V", strlen($msg)) . $msg);
        syslog(LOG_INFO, 'end');
    }

On 6/10/2019 at 7:08 AM, requinix said:
> ... Anyways, the server should use one response if the request was completed (eg, 200) and another distinctly different response if it was accepted but is not yet completed (eg, 201). The client recognizes each for what it is and, perhaps, passes that information upstream back to the HTTP portion...

I like your use of the 200 versus 201 responses. Thanks!
kicken Posted June 11, 2019

1 hour ago, NotionCommotion said:
> My previous post showed something like the following. I would have thought that ['success'=>true] would have been first returned to the caller stream before the sleep blocking function, but I am observing differently. Do you think I have some error elsewhere and shouldn't be observing differently?

The reason for your observations is that calling the write method doesn't immediately send the data over the wire. Instead, it just adds it to the write buffer, which will then be flushed to the socket at some point in the near future, probably on the next loop tick.

So what you would need to do is find a way to wait until you know the write has gone through before you start your long operation. Unfortunately, I don't think React has a nice way to do this. The only way would probably be to just set a timeout on the loop to run your operation after a second's delay or similar.
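The ordering problem described above can be shown with a toy model in plain PHP. This is not ReactPHP's implementation: ToyLoop, ToyConnection, defer() and both handlers are hypothetical names invented for this sketch, and the "long work" is just a log entry standing in for exec('sleep 60'). It only illustrates why a buffered write followed by blocking work leaves the process after the work, and why moving the work to a later point in the loop changes the order.

```php
<?php
declare(strict_types=1);

/** Toy single-threaded loop: queued callbacks run one at a time, in FIFO order. */
final class ToyLoop
{
    /** @var callable[] */
    private $queue = [];

    public function defer(callable $task): void
    {
        $this->queue[] = $task;
    }

    public function run(): void
    {
        while ($this->queue) {
            $task = array_shift($this->queue);
            $task();
        }
    }
}

/** Toy connection: write() only buffers; the flush is queued on the loop. */
final class ToyConnection
{
    private $loop;
    private $log;
    private $buffer = '';

    public function __construct(ToyLoop $loop, ArrayObject $log)
    {
        $this->loop = $loop;
        $this->log = $log;
    }

    public function write(string $data): void
    {
        $needsFlush = ($this->buffer === '');
        $this->buffer .= $data;
        if ($needsFlush) {
            // The data "leaves" only when the loop gets control back.
            $this->loop->defer(function (): void {
                $this->log[] = 'flushed: ' . $this->buffer;
                $this->buffer = '';
            });
        }
    }
}

/** Writes the reply, then blocks: the flush cannot run until the work is done. */
function blockingHandler(ToyLoop $loop, ToyConnection $conn, ArrayObject $log): void
{
    $conn->write('{"success":true}');
    $log[] = 'long work done'; // stands in for exec('sleep 60')
}

/** Writes the reply and queues the work behind the pending flush. */
function deferredHandler(ToyLoop $loop, ToyConnection $conn, ArrayObject $log): void
{
    $conn->write('{"success":true}');
    $loop->defer(function () use ($log): void {
        $log[] = 'long work done';
    });
}

/** Run one handler on a fresh loop and return the order of events. */
function runHandler(callable $handler): array
{
    $log = new ArrayObject();
    $loop = new ToyLoop();
    $conn = new ToyConnection($loop, $log);
    $loop->defer(function () use ($handler, $loop, $conn, $log): void {
        $handler($loop, $conn, $log);
    });
    $loop->run();
    return $log->getArrayCopy();
}

print_r(runHandler('blockingHandler'));
print_r(runHandler('deferredHandler'));
```

In real ReactPHP the flush happens when the loop finds the socket writable, so the ordering guarantees differ from this toy queue; a timer registered with the loop's addTimer(), as suggested above, is the closer real-world equivalent of deferredHandler(). Either way, the long operation still blocks the loop for every other client while it runs.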
NotionCommotion Posted June 11, 2019

6 minutes ago, kicken said:
> The reason for your observations is that calling the write method doesn't immediately send the data over the wire. Instead it just adds it to the write buffer which will then be flushed to the socket at some point in the near future, probably on the next loop tick.

Thanks kicken,

When executing sleep 60, I would see a delay of 60 seconds, which definitely exceeds the next loop tick.

I just stumbled upon a possible culprit. At the end of the following script (which actually has timeouts, etc., but they were removed in this example; if you think it's applicable, I can post the whole thing), you will see me performing back-to-back sends, first to register and second to do the real request.

    $loop = \React\EventLoop\Factory::create();
    (new \React\Socket\TimeoutConnector(new \React\Socket\Connector($loop), 5, $loop))
        ->connect('127.0.0.1:1337')
        ->then(function (\React\Socket\ConnectionInterface $stream) {
            $lengthPrefixStream = new LengthPrefixStream($stream);
            $lengthPrefixStream->on('data', function($data) use ($stream){
                syslog(LOG_INFO, 'on data: '.json_encode($data));
                if($data['id']===1) {
                    //response that registration was accepted
                }
                else {
                    //real response with data
                }
            });
            $lengthPrefixStream
                ->send($this->makeJsonRpcRequest('register',$this->logon, 1)->toArray())
                ->send($jsonRpcRequest->toArray());
        });

Under this scenario, I only see the "on data" syslog entries when both requests are complete. I've since changed it to first perform the registration request and only perform the real request after receiving the registration response, and now seem to get the results I wanted/expected.

While I think my problem is solved, I still question why I wasn't getting the intermediate responses. Any ideas? The LengthPrefixStream class, which you basically wrote, is below.

On a related note, if you have any recommendations for the class below, or know why I might previously have had it implement EventEmitterInterface, please let me know. Thanks

    <?php
    namespace NotionCommotion\SocketServer;

    use Evenement\EventEmitterInterface;
    use Evenement\EventEmitterTrait;
    use React\Stream\DuplexStreamInterface;

    /* Parses stream based on length prefix, and emits data or error if bad JSON is received. */
    class LengthPrefixStream // why => implements EventEmitterInterface
    {
        use EventEmitterTrait;

        private $socket=false,
            $debug,             //If null, do not debug. If zero, don't crop message. Else log and crop.
            $buffer='',
            $encryptCBOR=null,
            $timestamp,
            $client;            //Will be set after object is created

        public function __construct(DuplexStreamInterface $socket, ?int $debug=null){
            $this->timestamp=time();
            $this->socket = $socket;
            $this->socket->on('data', function($data){
                $this->parseBuffer($data);
            });
            $this->debug=$debug;
            $this->log('on connect with '.$socket->getRemoteAddress());
            //$this->socket->on('error', function($error, $stream){$this->log($error, LOG_ERR);});
        }

        public function setClient(AbstractClient $client):self{
            $this->client=$client;
            return $this;
        }

        public function getClient():AbstractClient{
            return $this->client;
        }

        public function getRemoteAddress():string{
            return $this->socket->getRemoteAddress();
        }

        public function getLocalAddress():string{
            return $this->socket->getLocalAddress();
        }

        public function send(array $msg, string $debug=null):self{
            if($this->isConnected()) {
                $this->log('onSend'.($debug?" ($debug): ":': ').json_encode($msg));
                $msg = $this->encode($msg);
                if(!$this->socket->write(pack("V", strlen($msg)).$msg)) {
                    $this->log("LengthPrefixStream::send() buffer data: $msg", LOG_ERR);
                }
            }
            else {
                $this->log('LengthPrefixStream::send() not connected (should never happen). '.json_encode($msg), LOG_ERR);
            }
            return $this;
        }

        private function parseBuffer(string $data):void{
            $this->buffer .= $data;
            do {
                $checkAgain = false;
                if(($bufferLength = strlen($this->buffer)) > 3 ){
                    $length = unpack('Vlen', substr($this->buffer, 0, 4))['len'];
                    if(is_int($length)) {
                        if($bufferLength >= $length + 4){
                            try {
                                $msg=$this->decode(substr($this->buffer, 4, $length));
                                $this->log('onData: '.json_encode($msg));
                                $this->emit('data', [$msg]);
                            }
                            catch(\InvalidArgumentException $e) {
                                $this->log('onError: '.$e->getMessage(), LOG_ERR);
                                $this->emit('errorJson', [$e->getMessage()]);
                            }
                            $this->buffer = substr($this->buffer, $length+4);
                            $checkAgain = strlen($this->buffer)>=4;
                        }
                        else {
                            $this->log('Buffer holds an incomplete message. Skip.');
                        }
                    }
                    else {
                        $this->log('LengthPrefixStream - Invalid Prefix', LOG_ERR);
                        $this->emit('errorPrefix', ["Invalid length prefix provided by client: ".substr($this->buffer, 0, 3)]);
                    }
                }
                else {
                    $this->log('Buffer less than 4 bytes. Skip.');
                }
            } while ($checkAgain);
        }

        public function isConnected():bool {
            return boolval($this->socket);
        }

        public function close():void {
            $this->log('LengthPrefixStream::close()');
            $this->socket->close();
        }

        public function getTimestamp():int {
            return $this->timestamp;
        }

        private function log(string $msg, int $format=LOG_INFO):void {
            if(!is_null($this->debug) || $format!==LOG_INFO) {
                $name = $this->client?$this->client->getClientType():'No Client';
                $msg=$this->debug?substr($msg, 0, $this->debug):$msg;
                syslog($format, "Debug ($name): $msg");
            }
        }

        private function encode(array $msg):string {
            return $this->encryptCBOR===true?\CBOR\CBOREncoder::encode($msg):json_encode($msg);
        }

        private function decode(string $msg):array {
            // Error messages report $msg itself; $length is not defined in this scope.
            if($this->encryptCBOR===true) {
                if(($rs=\CBOR\CBOREncoder::decode($msg))===false){
                    throw new \InvalidArgumentException('Invalid CBOR: '.$msg);
                }
            }
            elseif($this->encryptCBOR===false) {
                $rs=json_decode($msg, true);
                if (json_last_error() !== JSON_ERROR_NONE){
                    throw new \InvalidArgumentException('Invalid JSON: '.json_last_error_msg()." (".json_last_error().'): '.$msg);
                }
            }
            else {
                //First time communication
                $rs=json_decode($msg, true);
                if (json_last_error() === JSON_ERROR_NONE){
                    $this->encryptCBOR=false;
                }
                elseif(($rs=\CBOR\CBOREncoder::decode($msg))===false){
                    throw new \InvalidArgumentException('Invalid JSON: '.json_last_error_msg()." (".json_last_error().'): '.$msg);
                }
                else {
                    $this->encryptCBOR=true;
                }
            }
            return $rs;
        }
    }
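The length-prefix framing used by the class above can be tried in isolation, without ReactPHP. The sketch below is a minimal stand-alone version, assuming JSON-only payloads and the same 4-byte little-endian prefix (pack("V")); the function names frameMessage() and extractMessages() are hypothetical and not part of the posted class.

```php
<?php
declare(strict_types=1);

/** Prefix a JSON-encoded message with its byte length (32-bit little-endian). */
function frameMessage(array $msg): string
{
    $json = json_encode($msg);
    return pack('V', strlen($json)) . $json;
}

/**
 * Extract every complete frame from $buffer and return the decoded messages.
 * Incomplete trailing bytes stay in $buffer for the next call.
 */
function extractMessages(string &$buffer): array
{
    $messages = [];
    while (strlen($buffer) >= 4) {
        $length = unpack('Vlen', substr($buffer, 0, 4))['len'];
        if (strlen($buffer) < 4 + $length) {
            break; // the rest of this frame has not arrived yet
        }
        // Casts guard against substr() returning false on older PHP versions.
        $messages[] = json_decode((string) substr($buffer, 4, $length), true);
        $buffer = (string) substr($buffer, 4 + $length);
    }
    return $messages;
}

// Two back-to-back frames, delivered in arbitrary 7-byte chunks.
$wire = frameMessage(['id' => 1, 'result' => 'registered'])
      . frameMessage(['id' => 2, 'result' => 'done']);

$buffer = '';
$received = [];
foreach (str_split($wire, 7) as $chunk) {
    $buffer .= $chunk;
    foreach (extractMessages($buffer) as $msg) {
        $received[] = $msg;
    }
}
echo json_encode($received), PHP_EOL;
```

Because chunk boundaries do not line up with frame boundaries, the receiver has to keep the leftover bytes between reads, which is the job of $this->buffer in the posted class.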