Browse Source

Add async call support to client

- Implement basics of AsyncReply
- Settled down async calls interfaces of client
- Add basic AsyncReply abort functionality support
- Add abort functionality to reply
TODO: Tests are not implemented
Alexey Edelev 5 years ago
parent
commit
b55fb0c64f

+ 6 - 8
examples/addressbook/addressbookengine.cpp

@@ -35,22 +35,20 @@ AddressBookEngine::AddressBookEngine() : QObject()
   , m_client(new AddressBookClient)
   , m_contacts(new ContactsListModel({}, this))
 {
-    Contacts tmp;
     std::shared_ptr<qtprotobuf::AbstractChannel> channel(new qtprotobuf::Http2Channel("localhost", 65001));
     m_client->attachChannel(channel);
-    m_client->getContacts(ListFrame(), tmp);
-    m_contacts->reset(tmp.list());
+    m_client->getContacts(ListFrame(), this, [this](qtprotobuf::AsyncReply *reply) {
+        m_contacts->reset(reply->read<Contacts>().list());
+    });
 }
 
 void AddressBookEngine::addContact(qtprotobuf::examples::Contact *contact)
 {
-    Contacts tmp;
-    m_client->addContact(*contact, tmp);
-    qDebug() << "tmp count:" << tmp.list().count();
-    m_contacts->reset(tmp.list());
+    m_client->addContact(*contact, this, [this](qtprotobuf::AsyncReply *reply) {
+        m_contacts->reset(reply->read<Contacts>().list());
+    });
 }
 
-
 AddressBookEngine::~AddressBookEngine()
 {
     delete m_client;

+ 0 - 2
examples/addressbook/main.cpp

@@ -41,7 +41,6 @@ using namespace qtprotobuf::examples;
 
 int main(int argc, char *argv[])
 {
-
     QCoreApplication::setAttribute(Qt::AA_EnableHighDpiScaling);
     qtprotobuf::QtProtobuf::init();
     Contact::registerTypes();
@@ -51,7 +50,6 @@ int main(int argc, char *argv[])
     PhoneNumber::registerTypes();
 
     qmlRegisterType<ContactsListModel>("examples.addressbook", 1, 0, "ContactsListModel");
-
     QGuiApplication app(argc, argv);
     AddressBookEngine abEngine;
     QQmlApplicationEngine engine;

+ 1 - 1
src/generator/clientgenerator.h

@@ -52,7 +52,7 @@ public:
         printClientClass();
         printPublic();
         printConstructor();
-        printMethodsDeclaration(Templates::ClientMethodDeclarationSyncTemplate, Templates::ClientMethodDeclarationAsyncTemplate);
+        printMethodsDeclaration(Templates::ClientMethodDeclarationSyncTemplate, Templates::ClientMethodDeclarationAsyncTemplate, Templates::ClientMethodDeclarationAsync2Template);
         encloseClass();
         encloseNamespaces();
     }

+ 1 - 0
src/generator/clientsourcegenerator.cpp

@@ -59,6 +59,7 @@ void ClientSourceGenerator::printMethods()
                                                         };
         mPrinter.Print(parameters, Templates::ClientMethodDefinitionSyncTemplate);
         mPrinter.Print(parameters, Templates::ClientMethodDefinitionAsyncTemplate);
+        mPrinter.Print(parameters, Templates::ClientMethodDefinitionAsync2Template);
     }
 }
 

+ 2 - 1
src/generator/servicegeneratorbase.cpp

@@ -72,7 +72,7 @@ void ServiceGeneratorBase::printClassName()
     mPrinter.Print({{"classname", mClassName}}, Templates::NonProtoClassDefinitionTemplate);
 }
 
-void ServiceGeneratorBase::printMethodsDeclaration(const char* methodTemplate, const char* methodAsyncTemplate)
+void ServiceGeneratorBase::printMethodsDeclaration(const char* methodTemplate, const char* methodAsyncTemplate, const char* methodAsync2Template)
 {
     Indent();
     for (int i = 0; i < mService->method_count(); i++) {
@@ -89,6 +89,7 @@ void ServiceGeneratorBase::printMethodsDeclaration(const char* methodTemplate, c
                                                         };
         mPrinter.Print(parameters, methodTemplate);
         mPrinter.Print(parameters, methodAsyncTemplate);
+        mPrinter.Print(parameters, methodAsync2Template);
     }
     Outdent();
 }

+ 1 - 1
src/generator/servicegeneratorbase.h

@@ -50,7 +50,7 @@ public:
 
     void printIncludes();
     void printClassName();
-    void printMethodsDeclaration(const char* methodTemplate, const char* methodAsyncTemplate = "");
+    void printMethodsDeclaration(const char* methodTemplate, const char* methodAsyncTemplate = "", const char *methodAsync2Template = "");
 };
 
 } //namespace generator

+ 12 - 4
src/generator/templates.cpp

@@ -186,7 +186,8 @@ const char *Templates::MapSerializationRegisterTemplate = "qtprotobuf::ProtobufO
 const char *Templates::ClassDefinitionTemplate = "\nclass $classname$ : public $parent_class$\n"
                                                  "{\n";
 const char *Templates::ClientMethodDeclarationSyncTemplate = "Q_INVOKABLE bool $method_name$(const $param_type$ &$param_name$, $return_type$ &$return_name$);\n";
-const char *Templates::ClientMethodDeclarationAsyncTemplate = "Q_INVOKABLE bool $method_name$(const $param_type$ &$param_name$, const qtprotobuf::AsyncReply<$return_type$> &reply);\n";
+const char *Templates::ClientMethodDeclarationAsyncTemplate = "Q_INVOKABLE qtprotobuf::AsyncReply *$method_name$(const $param_type$ &$param_name$);\n";
+const char *Templates::ClientMethodDeclarationAsync2Template = "Q_INVOKABLE void $method_name$(const $param_type$ &$param_name$, const QObject* context, const std::function<void(qtprotobuf::AsyncReply*)> &callback);\n";
 const char *Templates::ServerMethodDeclarationTemplate = "Q_INVOKABLE virtual $return_type$ $method_name$(const $param_type$ &$param_name$) = 0;\n";
 
 
@@ -197,11 +198,18 @@ const char *Templates::ClientMethodDefinitionSyncTemplate = "\nbool $classname$:
                                                             "{\n"
                                                             "    return call(\"$method_name$\", $param_name$, $return_name$);\n"
                                                             "}\n";
-const char *Templates::ClientMethodDefinitionAsyncTemplate = "\nbool $classname$::$method_name$(const $param_type$ &$param_name$, const qtprotobuf::AsyncReply<$return_type$> &reply)\n"
+const char *Templates::ClientMethodDefinitionAsyncTemplate = "\nqtprotobuf::AsyncReply *$classname$::$method_name$(const $param_type$ &$param_name$)\n"
                                                              "{\n"
-                                                             "    //TODO: call transport method to serialize this method\n"
-                                                             "    return false;\n"
+                                                             "    return call(\"$method_name$\", $param_name$);\n"
                                                              "}\n";
+const char *Templates::ClientMethodDefinitionAsync2Template = "\nvoid $classname$::$method_name$(const $param_type$ &$param_name$, const QObject* context, const std::function<void(AsyncReply*)> &callback)\n"
+                                                              "{\n"
+                                                              "    qtprotobuf::AsyncReply* reply = call(\"$method_name$\", $param_name$);\n"
+                                                              "    QObject::connect(reply, &qtprotobuf::AsyncReply::finished, context, [reply, callback](){\n"
+                                                              "        callback(reply);\n"
+                                                              "    });\n"
+                                                              "}\n";
+
 const char *Templates::SerializersTemplate = "Q_DECLARE_PROTOBUF_SERIALIZERS($classname$)\n";
 const char *Templates::RegisterSerializersTemplate = "qtprotobuf::ProtobufObjectPrivate::registerSerializers<$classname$>();\n";
 const char *Templates::QmlRegisterTypeTemplate = "qmlRegisterType<$namespaces$::$classname$>(\"$package$\", 1, 0, \"$classname$\");\n";

+ 2 - 0
src/generator/templates.h

@@ -116,10 +116,12 @@ public:
 
     static const char *ClientMethodDeclarationSyncTemplate;
     static const char *ClientMethodDeclarationAsyncTemplate;
+    static const char *ClientMethodDeclarationAsync2Template;
     static const char *ServerMethodDeclarationTemplate;
 
     static const char *ClientMethodDefinitionSyncTemplate;
     static const char *ClientMethodDefinitionAsyncTemplate;
+    static const char *ClientMethodDefinitionAsync2Template;
 
     static const std::unordered_map<::google::protobuf::FieldDescriptor::Type, std::string> TypeReflection;
 };

+ 0 - 7
src/grpc/abstractchannel.cpp

@@ -24,10 +24,3 @@
  */
 
 #include "abstractchannel.h"
-
-using namespace qtprotobuf;
-
-AbstractChannel::AbstractChannel()
-{
-
-}

+ 17 - 1
src/grpc/abstractchannel.h

@@ -30,6 +30,8 @@
 
 namespace qtprotobuf {
 
+class AsyncReply;
+
 class AbstractChannel
 {
 public:
@@ -57,11 +59,25 @@ public:
     };
 
     virtual StatusCodes call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret) = 0;
+    /*!
+     * \brief call
+     * \param method
+     * \param service
+     * \param args
+     * \param ret AsyncReply that will be returned to end-point user to read data once ready. AsyncReply owned by AbstractClient only.
+     * \return
+     */
+    virtual void call(const QString &method, const QString &service, const QByteArray &args, qtprotobuf::AsyncReply *ret) = 0;
 
 protected:
-    AbstractChannel();
+    AbstractChannel() = default;
     virtual ~AbstractChannel() = default;
 
+    virtual void abort(AsyncReply *) {
+        assert("Abort is not supported by used channel");
+    }
+
+    friend class AsyncReply;
 private:
     Q_DISABLE_COPY(AbstractChannel)
 };

+ 41 - 0
src/grpc/abstractclient.cpp

@@ -25,6 +25,8 @@
 
 #include "abstractclient.h"
 
+#include <QTimer>
+
 namespace qtprotobuf {
 struct AbstractClientPrivate final {
     AbstractClientPrivate(const QString &service) : service(service) {}
@@ -32,6 +34,7 @@ struct AbstractClientPrivate final {
     std::shared_ptr<AbstractChannel> channel;
     const QString service;
     AbstractChannel::StatusCodes lastError;
+    QString lastErrorString;
 };
 }
 
@@ -63,3 +66,41 @@ bool AbstractClient::call(const QString &method, const QByteArray& arg, QByteArr
     d->lastError = d->channel->call(method, d->service, arg, ret);
     return d->lastError == AbstractChannel::Ok;
 }
+
+AsyncReply *AbstractClient::call(const QString &method, const QByteArray& arg)
+{
+    AsyncReply *reply = new AsyncReply(d->channel);
+
+    if (!d->channel) {
+        d->lastError = AbstractChannel::Unknown;
+        d->lastErrorString = "No channel attached";
+        QTimer::singleShot(0, this, [reply]() {
+            reply->error(AbstractChannel::Unknown);
+            reply->deleteLater();
+        });
+        return reply;
+    }
+
+    connect(reply, &AsyncReply::error, this, [this, reply](AbstractChannel::StatusCodes statusCode){
+        d->lastError = statusCode;
+        reply->deleteLater();
+    });
+
+    connect(reply, &AsyncReply::finished, this, [this, reply](){
+        d->lastError = AbstractChannel::Ok;
+        reply->deleteLater();
+    });
+
+    d->channel->call(method, d->service, arg, reply);
+    return reply;
+}
+
+AbstractChannel::StatusCodes AbstractClient::lastError() const
+{
+    return d->lastError;
+}
+
+QString AbstractClient::lastErrorString() const
+{
+    return d->lastErrorString;
+}

+ 11 - 1
src/grpc/abstractclient.h

@@ -31,17 +31,21 @@
 #include <qtprotobuflogging.h>
 
 #include "abstractchannel.h"
+#include "asyncreply.h"
 
 namespace qtprotobuf {
 
 class AbstractChannel;
 class AbstractClientPrivate;
 
-class AbstractClient : public QObject //TODO: QObject is not really required yet
+class AbstractClient : public QObject
 {
 public:
     void attachChannel(std::shared_ptr<AbstractChannel> channel);
 
+    AbstractChannel::StatusCodes lastError() const;
+    QString lastErrorString() const;
+
 protected:
     AbstractClient(const QString &service, QObject *parent = nullptr);
     virtual ~AbstractClient();
@@ -66,8 +70,14 @@ protected:
         return false;
     }
 
+    template<typename A>
+    AsyncReply *call(const QString &method, const A &arg) {
+        return call(method, arg.serialize());
+    }
+
 private:
     bool call(const QString &method, const QByteArray& arg, QByteArray& ret);
+    AsyncReply *call(const QString &method, const QByteArray& arg);
 
     Q_DISABLE_COPY(AbstractClient)
 

+ 10 - 0
src/grpc/asyncreply.cpp

@@ -24,4 +24,14 @@
  */
 
 #include "asyncreply.h"
+#include <qtprotobuflogging.h>
 
+using namespace qtprotobuf;
+
+AsyncReply::~AsyncReply()
+{
+    qProtoDebug() << "Trying ~AsyncReply" << this;
+    QMutexLocker locker(&m_asyncLock);
+    qProtoDebug() << "~AsyncReply" << this;
+    (void)locker;
+}

+ 31 - 24
src/grpc/asyncreply.h

@@ -26,43 +26,50 @@
 #pragma once
 
 #include <functional>
+#include <QPointer>
+#include <QMutex>
+#include <memory>
+
+#include "abstractchannel.h"
 
 namespace qtprotobuf {
 
-template <typename M>
-class AsyncReply final
+class AsyncReply final : public QObject
 {
-    AsyncReply();
+    Q_OBJECT
 public:
-    AsyncReply(const std::function<void(const M&)> &callback) : m_callback(callback) {}
-    AsyncReply(AsyncReply &&other) {
-        m_callback = std::move(other.m_callback);
+    void abort() {
+        m_channel->abort(this);
     }
 
-    AsyncReply(const AsyncReply &other) {
-        m_callback = other.m_callback;
+    template <typename T>
+    T read() {
+        QMutexLocker locker(&m_asyncLock);
+        T value;
+        value.deserialize(m_data);
+        return value;
     }
 
-    AsyncReply& operator=(AsyncReply &&other) {
-        m_callback = std::move(other.m_callback);
-        return *this;
-    }
-
-    AsyncReply& operator=(const AsyncReply &other) {
-        m_callback = other.m_callback;
-        return *this;
-    }
+    void setData(const QByteArray &data) { m_data = data; }
 
-    ~AsyncReply() {}
+signals:
+    void finished();
+    void error(AbstractChannel::StatusCodes);
 
-    void operator()(const M& value) {
-        if (m_callback) {
-            m_callback(value);
-        }
-    }
+protected:
+    AsyncReply(const std::shared_ptr<AbstractChannel> &channel, QObject* parent = nullptr) : QObject(parent) {}
+    ~AsyncReply();
 
 private:
-    std::function<void(const M&)> m_callback;
+    AsyncReply();
+    Q_DISABLE_COPY(AsyncReply)
+
+    friend class AbstractClient;
+
+    std::shared_ptr<AbstractChannel> m_channel;
+    QByteArray m_data;
+
+    QMutex m_asyncLock;
 };
 
 }

+ 89 - 52
src/grpc/http2channel.cpp

@@ -35,6 +35,7 @@
 #include <QEventLoop>
 #include <QTimer>
 #include <QtEndian>
+#include "asyncreply.h"
 
 #include <unordered_map>
 
@@ -42,15 +43,6 @@
 
 using namespace qtprotobuf;
 
-namespace qtprotobuf {
-
-struct Http2ChannelPrivate {
-    QUrl url;
-    QNetworkAccessManager nm;
-};
-
-}
-
 namespace  {
 const static std::unordered_map<QNetworkReply::NetworkError, AbstractChannel::StatusCodes> StatusCodeMap = { { QNetworkReply::ConnectionRefusedError, AbstractChannel::Unavailable },
                                                                 { QNetworkReply::RemoteHostClosedError, AbstractChannel::Unavailable },
@@ -92,6 +84,59 @@ const char *TEHeader = "te";
 const char *GrpcStatusHeader = "grpc-status";
 }
 
+namespace qtprotobuf {
+
+struct Http2ChannelPrivate {
+    QUrl url;
+    QNetworkAccessManager nm;
+    QNetworkReply* post(const QString &method, const QString &service, const QByteArray &args) {
+        QUrl callUrl = url;
+        callUrl.setPath("/" + service + "/" + method);
+
+        qProtoDebug() << "Service call url: " << callUrl;
+
+        QNetworkRequest request(callUrl);
+        request.setHeader(QNetworkRequest::ContentTypeHeader, "application/grpc");
+        request.setRawHeader(GrpcAcceptEncodingHeader, "identity,deflate,gzip");
+        request.setRawHeader(AcceptEncodingHeader, "identity,gzip");
+        request.setRawHeader(TEHeader, "trailers");
+
+        request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
+
+        QByteArray msg(5, '\0');
+        *(unsigned int*)(msg.data() + 1) = qToBigEndian(args.size());
+        msg += args;
+        qProtoDebug() << "SEND: " << msg.toHex();
+
+        QNetworkReply* networkReply = nm.post(request, msg);
+
+        //TODO: Add configurable timeout logic
+        QTimer::singleShot(6000, networkReply, &QNetworkReply::abort);
+        return networkReply;
+    }
+
+    static QByteArray processReply(QNetworkReply* networkReply, AbstractChannel::StatusCodes& statusCode) {
+        //Check if no network error occured
+        if (networkReply->error() != QNetworkReply::NoError) {
+            qProtoWarning() << "Network error occured" << networkReply->errorString();
+            statusCode = StatusCodeMap.at(networkReply->error());
+            return {};
+        }
+
+        //Check if server answer with error
+        statusCode = static_cast<AbstractChannel::StatusCodes>(networkReply->rawHeader(GrpcStatusHeader).toInt());
+        if (statusCode != AbstractChannel::StatusCodes::Ok) {
+            qProtoWarning() << "Protobuf server error occured" << networkReply->errorString();
+            return {};
+        }
+
+        //Message size doesn't matter for now
+        return networkReply->readAll().mid(5);
+    }
+};
+
+}
+
 Http2Channel::Http2Channel(const QString &addr, quint16 port) : AbstractChannel()
   , d(new Http2ChannelPrivate)
 {
@@ -107,56 +152,48 @@ Http2Channel::~Http2Channel()
 
 AbstractChannel::StatusCodes Http2Channel::call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret)
 {
-    QUrl callUrl = d->url;
-    callUrl.setPath("/" + service + "/" + method);
-
-    qProtoDebug() << "Service call url: " << callUrl;
-    QNetworkRequest request(callUrl);
-    request.setHeader(QNetworkRequest::ContentTypeHeader, "application/grpc");
-    request.setRawHeader(GrpcAcceptEncodingHeader, "identity,deflate,gzip");
-    request.setRawHeader(AcceptEncodingHeader, "identity,gzip");
-    request.setRawHeader(TEHeader, "trailers");
-
-    request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
-
-    QByteArray msg(5, '\0');
-    *(unsigned int*)(msg.data() + 1) = qToBigEndian(args.size());
-    msg += args;
-    qProtoDebug() << "SEND: " << msg.toHex();
-    QNetworkReply *reply = d->nm.post(request, msg);
     QEventLoop loop;
-    QTimer timer;
-    loop.connect(&timer, &QTimer::timeout, &loop, &QEventLoop::quit);
-    loop.connect(reply, &QNetworkReply::finished, &loop, &QEventLoop::quit);
 
-    //TODO: Add configurable timeout logic
-    timer.setInterval(1000);
-    timer.start();
+    QNetworkReply *networkReply = d->post(method, service, args);
+    QObject::connect(networkReply, &QNetworkReply::finished, &loop, &QEventLoop::quit);
 
-    if (!reply->isFinished()) {
+    //If reply was finished in same stack it doesn't matter to start event loop
+    if (!networkReply->isFinished()) {
         loop.exec();
+    } else {
+        return AbstractChannel::DeadlineExceeded;
     }
 
-    timer.stop();
+    StatusCodes grpcStatus = StatusCodes::Unknown;
+    ret = d->processReply(networkReply, grpcStatus);
 
-    //Check if no network error occured
-    if (reply->error() != QNetworkReply::NoError) {
-        return StatusCodeMap.at(reply->error());
-    }
-
-    //Check if response timeout triggered
-    if (!reply->isFinished()) {
-        reply->abort();
-        return AbstractChannel::DeadlineExceeded;
-    }
+    qProtoDebug() << __func__ << "RECV: " << ret.toHex();
+    return grpcStatus;
+}
 
-    //Check if server answer with error
-    StatusCodes grpcStatus = static_cast<StatusCodes>(reply->rawHeader(GrpcStatusHeader).toInt());
-    if (grpcStatus != StatusCodes::Ok) {
-        return grpcStatus;
-    }
+void Http2Channel::call(const QString &method, const QString &service, const QByteArray &args, qtprotobuf::AsyncReply *reply)
+{
+    QNetworkReply *networkReply = d->post(method, service, args);
+
+    QObject::connect(networkReply, &QNetworkReply::finished, reply, [this, reply, networkReply]() {
+        StatusCodes grpcStatus = StatusCodes::Unknown;
+        QByteArray data = Http2ChannelPrivate::processReply(networkReply, grpcStatus);
+        qProtoDebug() << "RECV: " << data;
+        if (grpcStatus != StatusCodes::Ok) {
+            reply->setData({});
+            reply->error(grpcStatus);
+            reply->finished();
+            return;
+        }
+        reply->setData(data);
+        reply->finished();
+    });
+}
 
-    ret = reply->readAll();
-    qProtoDebug() << "RECV: " << ret.toHex();
-    return StatusCodes::Ok;
+void Http2Channel::abort(AsyncReply *reply)
+{
+    assert(reply != nullptr);
+    reply->setData({});
+    reply->error(StatusCodes::Aborted);
+    reply->finished();
 }

+ 4 - 0
src/grpc/http2channel.h

@@ -41,6 +41,10 @@ public:
     ~Http2Channel();
 
     StatusCodes call(const QString &method, const QString &service, const QByteArray &args, QByteArray &ret) override;
+    void call(const QString &method, const QString &service, const QByteArray &args, qtprotobuf::AsyncReply *reply) override;
+
+protected:
+    void abort(AsyncReply *reply) override;
 
 private:
     Q_DISABLE_COPY(Http2Channel)

+ 2 - 2
tests/test_grpc/clienttest.cpp

@@ -44,12 +44,12 @@ protected:
 
 TEST_F(ClientTest, CheckMethodsGeneration)
 {
-    //Dummy compile time check of functions generation
+    //Dummy compile time check of functions generation and interface compatibility
     TestServiceClient testClient;
     SimpleStringMessage result;
     SimpleStringMessage request;
     testClient.testMethod(result, request);
-    testClient.testMethod(result, std::function<void(const SimpleStringMessage&)>([](const SimpleStringMessage&){}));
+    testClient.testMethod(result);
 }
 
 TEST_F(ClientTest, StringEchoTest)