Browse Source

Fix issue with streaming when data is sent partially #117

- Implement data chunks download for streaming
- Add and update tests
Alexey Edelev 6 years ago
parent
commit
933e22e6a1

+ 1 - 1
CMakeLists.txt

@@ -117,7 +117,7 @@ if(MAKE_TESTS)
         #Set  path to GTest build libraries
         set(GTEST_BOTH_LIBRARIES "")
 
-        #Set  path to GTest include directory
+        #Set path to GTest include directory
         include_directories(${GTEST_INCLUDE_PATHS} "/")
         link_directories(${GTEST_BOTH_LIBRARIES})
     endif()

+ 52 - 14
src/grpc/http2channel.cpp

@@ -90,10 +90,17 @@ const char *GrpcStatusHeader = "grpc-status";
 namespace qtprotobuf {
 
 struct Http2ChannelPrivate {
+    struct ExpectedData {
+        int expectedSize;
+        QByteArray container;
+    };
+
     QUrl url;
     QNetworkAccessManager nm;
     AbstractCredentials credentials;
     QSslConfiguration sslConfig;
+    std::unordered_map<QNetworkReply*, ExpectedData> activeStreamReplies;
+
     QNetworkReply* post(const QString &method, const QString &service, const QByteArray &args, bool stream = false) {
         QUrl callUrl = url;
         callUrl.setPath("/" + service + "/" + method);
@@ -113,15 +120,15 @@ struct Http2ChannelPrivate {
         request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
 
         QByteArray msg(5, '\0');
-        *(unsigned int*)(msg.data() + 1) = qToBigEndian(args.size());
+        *(int*)(msg.data() + 1) = qToBigEndian(args.size());
         msg += args;
-        qProtoDebug() << "SEND: " << msg.toHex();
+        qProtoDebug() << "SEND: " << msg.size();
 
         QNetworkReply* networkReply = nm.post(request, msg);
 
         if (!stream) {
             //TODO: Add configurable timeout logic
-            QTimer::singleShot(6000, networkReply, [networkReply]() {
+            QTimer::singleShot(6000, networkReply, [networkReply, this]() {
                 Http2ChannelPrivate::abortNetworkReply(networkReply);
             });
         }
@@ -164,6 +171,7 @@ struct Http2ChannelPrivate {
             url.setScheme("https");
         }
     }
+
 };
 
 }
@@ -187,11 +195,9 @@ AbstractChannel::StatusCodes Http2Channel::call(const QString &method, const QSt
     QNetworkReply *networkReply = d->post(method, service, args);
     QObject::connect(networkReply, &QNetworkReply::finished, &loop, &QEventLoop::quit);
 
-    //If reply was finished in same stack it doesn't matter to start event loop
+    //If reply was finished in same stack it doesn't make sense to start event loop
     if (!networkReply->isFinished()) {
         loop.exec();
-    } else {
-        return AbstractChannel::DeadlineExceeded;
     }
 
     StatusCodes grpcStatus = StatusCodes::Unknown;
@@ -229,19 +235,51 @@ void Http2Channel::subscribe(const QString &method, const QString &service, cons
 {
     QNetworkReply *networkReply = d->post(method, service, args, true);
 
-    auto connection = QObject::connect(networkReply, &QNetworkReply::readyRead, client, [networkReply, handler]() {
-        handler(networkReply->readAll().mid(5));
+    auto connection = QObject::connect(networkReply, &QNetworkReply::readyRead, client, [networkReply, handler, this]() {
+        auto replyIt = d->activeStreamReplies.find(networkReply);
+
+        QByteArray data = networkReply->readAll();
+        qProtoDebug() << "RECV" << data.size();
+
+        if (replyIt == d->activeStreamReplies.end()) {
+            qProtoDebug() << data.toHex();
+            int expectedDataSize = qFromBigEndian(*(int*)(data.data() + 1)) + 5;
+            qProtoDebug() << "First chunk received: " << data.size() << " expectedDataSize: " << expectedDataSize;
+
+            if (expectedDataSize == 0) {
+                handler(QByteArray());
+                return;
+            }
+
+            Http2ChannelPrivate::ExpectedData dataContainer{expectedDataSize, QByteArray{}};
+            d->activeStreamReplies.insert({networkReply, dataContainer});
+            replyIt = d->activeStreamReplies.find(networkReply);
+        }
+
+        Http2ChannelPrivate::ExpectedData &dataContainer = replyIt->second;
+        dataContainer.container.append(data);
+
+        qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
+        if (dataContainer.container.size() == dataContainer.expectedSize) {
+            qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
+            handler(dataContainer.container.mid(5));
+            d->activeStreamReplies.erase(replyIt);
+        }
     });
 
-    QObject::connect(client, &AbstractClient::destroyed, networkReply, [client, networkReply, connection](){
+    QObject::connect(client, &AbstractClient::destroyed, networkReply, [client, networkReply, connection, this](){
+        d->activeStreamReplies.erase(networkReply);
         QObject::disconnect(connection);
         Http2ChannelPrivate::abortNetworkReply(networkReply);
     });
-//TODO: implement error handling
-//    QObject::connect(networkReply, &QNetworkReply::error, networkReply, [networkReply, connection](QNetworkReply::NetworkError) {
-//        QObject::disconnect(connection);
-//        Http2ChannelPrivate::abortNetworkReply(networkReply);
-//    });
+
+    //TODO: seems this connection might be invalid in case if this destroyed.
+    //Think about correct handling of this situation
+    QObject::connect(networkReply, &QNetworkReply::finished, [networkReply, this]() {
+        d->activeStreamReplies.erase(networkReply);
+        //TODO: implement error handling and subscription recovery here
+        Http2ChannelPrivate::abortNetworkReply(networkReply);
+    });
 }
 
 void Http2Channel::abort(AsyncReply *reply)

+ 3 - 0
tests/test_grpc/CMakeLists.txt

@@ -1,9 +1,12 @@
 configure_file(test_driver.sh.in test_driver.sh @ONLY)
 
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/testfile ${CMAKE_CURRENT_BINARY_DIR}/testfile COPYONLY)
+
 set(TARGET qtgrpc_test)
 set(GENERATED_HEADERS
     simplestringmessage.h
     testserviceclient.h
+    blobmessage.h
 #    testserviceserver.h
 )
 

+ 33 - 0
tests/test_grpc/clienttest.cpp

@@ -27,8 +27,11 @@
 #include "http2channel.h"
 #include "qtprotobuf.h"
 #include "insecurecredentials.h"
+#include "blobmessage.h"
 
 #include <QTimer>
+#include <QFile>
+#include <QCryptographicHash>
 
 #include <QCoreApplication>
 #include <gtest/gtest.h>
@@ -227,3 +230,33 @@ TEST_F(ClientTest, StringEchoStreamTestRetUpdates)
     ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream4");
     ASSERT_EQ(testClient.lastError(), AbstractChannel::StatusCodes::Ok);
 }
+
+TEST_F(ClientTest, HugeBlobEchoStreamTest)
+{
+    int argc = 0;
+    QCoreApplication app(argc, nullptr);
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<Http2Channel>("localhost", 50051, InsecureCredentials()));
+    BlobMessage result;
+    BlobMessage request;
+    QFile testFile("testfile");
+    ASSERT_TRUE(testFile.open(QFile::ReadOnly));
+
+    request.setTestBytes(testFile.readAll());
+    QByteArray dataHash = QCryptographicHash::hash(request.testBytes(), QCryptographicHash::Sha256);
+    QEventLoop waiter;
+
+    QObject::connect(&testClient, &TestServiceClient::testMethodBlobServerStreamUpdated, &app, [&result, &waiter](const BlobMessage& ret) {
+        result.setTestBytes(ret.testBytes());
+        waiter.quit();
+    });
+
+    testClient.subscribeTestMethodBlobServerStreamUpdates(request);
+
+    QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
+    waiter.exec();
+
+    QByteArray returnDataHash = QCryptographicHash::hash(result.testBytes(), QCryptographicHash::Sha256);
+    ASSERT_TRUE(returnDataHash == dataHash);
+    ASSERT_EQ(testClient.lastError(), AbstractChannel::StatusCodes::Ok);
+}

+ 10 - 0
tests/test_grpc/echoserver/main.cpp

@@ -46,6 +46,16 @@ public:
 
         return ::grpc::Status();
     }
+
+    ::grpc::Status testMethodBlobServerStream(grpc::ServerContext *, const qtprotobufnamespace::tests::BlobMessage *request,
+                                          ::grpc::ServerWriter<qtprotobufnamespace::tests::BlobMessage> *writer) override
+    {
+        std::cerr << "testMethodBlobServerStream called" << std::endl;
+        qtprotobufnamespace::tests::BlobMessage msg;
+        msg.set_allocated_testbytes(new std::string(request->testbytes()));
+        writer->Write(msg);
+        return ::grpc::Status();
+    }
 };
 
 int main(int, char *[])

+ 3 - 0
tests/test_grpc/proto/simpletest.proto

@@ -6,3 +6,6 @@ message SimpleStringMessage {
     string testFieldString = 6;
 }
 
+message BlobMessage {
+    bytes testBytes = 1;
+}

+ 1 - 0
tests/test_grpc/proto/testservice.proto

@@ -9,4 +9,5 @@ service TestService {
   rpc testMethodServerStream(SimpleStringMessage) returns (stream SimpleStringMessage) {}
   rpc testMethodClientStream(stream SimpleStringMessage) returns (SimpleStringMessage) {}
   rpc testMethodBiStream(stream SimpleStringMessage) returns (stream SimpleStringMessage) {}
+  rpc testMethodBlobServerStream(BlobMessage) returns (stream BlobMessage) {}
 } 

BIN
tests/test_grpc/testfile