Browse Source

Fix issue with glued messages

- Add iteration on incomming stream buffer,
  to read all messages glued in single http2
  packet
Alexey Edelev 5 years ago
parent
commit
a7128b6450
1 changed files with 20 additions and 7 deletions
  1. 20 7
      src/grpc/qgrpchttp2channel.cpp

+ 20 - 7
src/grpc/qgrpchttp2channel.cpp

@@ -89,6 +89,7 @@ const char *AcceptEncodingHeader = "accept-encoding";
 const char *TEHeader = "te";
 const char *GrpcStatusHeader = "grpc-status";
 const char *GrpcStatusMessage = "grpc-message";
+const int GRpcMessageSizeHeaderSize = 5;
 }
 
 namespace QtProtobuf {
@@ -123,7 +124,7 @@ struct QGrpcHttp2ChannelPrivate {
 
         request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
 
-        QByteArray msg(5, '\0');
+        QByteArray msg(GRpcMessageSizeHeaderSize, '\0');
         *(int *)(msg.data() + 1) = qToBigEndian(args.size());
         msg += args;
         qProtoDebug() << "SEND: " << msg.size();
@@ -166,7 +167,7 @@ struct QGrpcHttp2ChannelPrivate {
         }
 
         //Message size doesn't matter for now
-        return networkReply->readAll().mid(5);
+        return networkReply->readAll().mid(GRpcMessageSizeHeaderSize);
     }
 
     QGrpcHttp2ChannelPrivate(const QUrl &_url, const AbstractCredentials &_credentials)
@@ -183,6 +184,10 @@ struct QGrpcHttp2ChannelPrivate {
         }
     }
 
+    static int getExpectedDataSize(const QByteArray &container) {
+        return qFromBigEndian(*(int *)(container.data() + 1)) + GRpcMessageSizeHeaderSize;
+    }
+
     QObject lambdaContext;
 };
 
@@ -252,7 +257,7 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
 
         if (replyIt == d_ptr->activeStreamReplies.end()) {
             qProtoDebug() << data.toHex();
-            int expectedDataSize = qFromBigEndian(*(int *)(data.data() + 1)) + 5;
+            int expectedDataSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(data);
             qProtoDebug() << "First chunk received: " << data.size() << " expectedDataSize: " << expectedDataSize;
 
             if (expectedDataSize == 0) {
@@ -261,17 +266,25 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
             }
 
             QGrpcHttp2ChannelPrivate::ExpectedData dataContainer{expectedDataSize, QByteArray{}};
-            d_ptr->activeStreamReplies.insert({networkReply, dataContainer});
-            replyIt = d_ptr->activeStreamReplies.find(networkReply);
+            replyIt = d_ptr->activeStreamReplies.insert({networkReply, dataContainer}).first;
         }
 
         QGrpcHttp2ChannelPrivate::ExpectedData &dataContainer = replyIt->second;
         dataContainer.container.append(data);
 
         qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
-        if (dataContainer.container.size() == dataContainer.expectedSize) {
+        while (dataContainer.container.size() >= dataContainer.expectedSize) {
             qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
-            handler(dataContainer.container.mid(5));
+            handler(dataContainer.container.mid(GRpcMessageSizeHeaderSize, dataContainer.expectedSize - GRpcMessageSizeHeaderSize));
+            dataContainer.container.remove(0, dataContainer.expectedSize);
+            if (dataContainer.container.size() > GRpcMessageSizeHeaderSize) {
+                dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(dataContainer.container);
+            } else {
+                qProtoWarning() << "Invalid container size received, size header is less than 5 bytes";
+            }
+        }
+
+        if (dataContainer.container.size() < GRpcMessageSizeHeaderSize) {
             d_ptr->activeStreamReplies.erase(replyIt);
         }
     });