Selaa lähdekoodia

Apply thread safety to grpc calls and subscription

- Replace return value of gRPC call functionaly by shared pointers
- Restrict channel and client created in same thread
- Add tests

Fixes: #43
Alexey Edelev 4 vuotta sitten
vanhempi
commit
b95a949a36

+ 1 - 1
examples/addressbook/addressbookengine.cpp

@@ -57,7 +57,7 @@ AddressBookEngine::AddressBookEngine() : QObject()
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>("authorizedUser", QCryptographicHash::hash("test", QCryptographicHash::Md5).toHex())));
     m_client->attachChannel(channel);
     auto subscription = m_client->subscribeContactsUpdates(ListFrame());
-    connect(subscription, &QtProtobuf::QGrpcSubscription::updated, this, [this, subscription]() {
+    connect(subscription.get(), &QtProtobuf::QGrpcSubscription::updated, this, [this, subscription]() {
         m_contacts->reset(subscription->read<Contacts>().list());
     });
     m_client->subscribeCallStatusUpdates(qtprotobuf::examples::None(), QPointer<CallStatus>(&m_callStatus));

+ 3 - 3
examples/simplechat/simplechatengine.cpp

@@ -73,12 +73,12 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
                                                                                       QtProtobuf::QGrpcUserPasswordCredentials<>(name, QCryptographicHash::hash(password.toUtf8(), QCryptographicHash::Md5).toHex())));
 
     m_client->attachChannel(channel);
-    QtProtobuf::QGrpcSubscription *subscription = m_client->subscribeMessageListUpdates(None());
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::error, this, [subscription] {
+    QtProtobuf::QGrpcSubscriptionShared subscription = m_client->subscribeMessageListUpdates(None());
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::error, this, [subscription] {
         qCritical() << "Subscription error, cancel";
         subscription->cancel();
     });
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::updated, this, [this, name, subscription]() {
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::updated, this, [this, name, subscription]() {
         if (m_userName != name) {
             m_userName = name;
             userNameChanged();

+ 13 - 13
src/generator/templates.cpp

@@ -284,8 +284,8 @@ const char *Templates::ClassDefinitionTemplate = "\nclass $classname$ : public $
                                                  "{\n";
 const char *Templates::QObjectMacro = "Q_OBJECT";
 const char *Templates::ClientMethodDeclarationSyncTemplate = "QtProtobuf::QGrpcStatus $method_name$(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
-const char *Templates::ClientMethodDeclarationAsyncTemplate = "QtProtobuf::QGrpcAsyncReply *$method_name$(const $param_type$ &$param_name$);\n";
-const char *Templates::ClientMethodDeclarationAsync2Template = "Q_INVOKABLE void $method_name$(const $param_type$ &$param_name$, const QObject *context, const std::function<void(QtProtobuf::QGrpcAsyncReply *)> &callback);\n";
+const char *Templates::ClientMethodDeclarationAsyncTemplate = "QtProtobuf::QGrpcAsyncReplyShared $method_name$(const $param_type$ &$param_name$);\n";
+const char *Templates::ClientMethodDeclarationAsync2Template = "Q_INVOKABLE void $method_name$(const $param_type$ &$param_name$, const QObject *context, const std::function<void(QtProtobuf::QGrpcAsyncReplyShared)> &callback);\n";
 const char *Templates::ClientMethodDeclarationQmlTemplate = "Q_INVOKABLE void $method_name$($param_type$ *$param_name$, const QJSValue &callback, const QJSValue &errorCallback);\n";
 
 const char *Templates::ServerMethodDeclarationTemplate = "Q_INVOKABLE virtual $return_type$ $method_name$(const $param_type$ &$param_name$) = 0;\n";
@@ -298,14 +298,14 @@ const char *Templates::ClientMethodDefinitionSyncTemplate = "\nQtProtobuf::QGrpc
                                                             "{\n"
                                                             "    return call(\"$method_name$\", $param_name$, $return_name$);\n"
                                                             "}\n";
-const char *Templates::ClientMethodDefinitionAsyncTemplate = "\nQtProtobuf::QGrpcAsyncReply *$classname$::$method_name$(const $param_type$ &$param_name$)\n"
+const char *Templates::ClientMethodDefinitionAsyncTemplate = "\nQtProtobuf::QGrpcAsyncReplyShared $classname$::$method_name$(const $param_type$ &$param_name$)\n"
                                                              "{\n"
                                                              "    return call(\"$method_name$\", $param_name$);\n"
                                                              "}\n";
-const char *Templates::ClientMethodDefinitionAsync2Template = "\nvoid $classname$::$method_name$(const $param_type$ &$param_name$, const QObject *context, const std::function<void(QGrpcAsyncReply *)> &callback)\n"
+const char *Templates::ClientMethodDefinitionAsync2Template = "\nvoid $classname$::$method_name$(const $param_type$ &$param_name$, const QObject *context, const std::function<void(QGrpcAsyncReplyShared)> &callback)\n"
                                                               "{\n"
-                                                              "    QtProtobuf::QGrpcAsyncReply *reply = call(\"$method_name$\", $param_name$);\n"
-                                                              "    QObject::connect(reply, &QtProtobuf::QGrpcAsyncReply::finished, context, [reply, callback]() {\n"
+                                                              "    QtProtobuf::QGrpcAsyncReplyShared reply = call(\"$method_name$\", $param_name$);\n"
+                                                              "    QObject::connect(reply.get(), &QtProtobuf::QGrpcAsyncReply::finished, context, [reply, callback]() {\n"
                                                               "        callback(reply);\n"
                                                               "    });\n"
                                                               "}\n";
@@ -325,7 +325,7 @@ const char *Templates::ClientMethodDefinitionQmlTemplate = "\nvoid $classname$::
                                                            "        qProtoWarning() << \"Unable to call $classname$::$method_name$, it's only callable from JS engine context\";\n"
                                                            "        return;\n"
                                                            "    }\n\n"
-                                                           "    QtProtobuf::QGrpcAsyncReply *reply = call(\"$method_name$\", *$param_name$);\n"
+                                                           "    QtProtobuf::QGrpcAsyncReplyShared reply = call(\"$method_name$\", *$param_name$);\n"
                                                            "    reply->subscribe(jsEngine, [this, reply, callback, jsEngine]() {\n"
                                                            "        auto result = new $return_type$(reply->read<$return_type$>());\n"
                                                            "        qmlEngine(this)->setObjectOwnership(result, QQmlEngine::JavaScriptOwnership);\n"
@@ -344,19 +344,19 @@ const char *Templates::QmlRegisterEnumTypeTemplate = "qmlRegisterUncreatableType
 
 
 const char *Templates::ClientMethodSignalDeclarationTemplate = "Q_SIGNAL void $method_name$Updated(const $return_type$ &);\n";
-const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf::QGrpcSubscription *subscribe$method_name_upper$Updates(const $param_type$ &$param_name$);\n";
-const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcSubscription *subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
-const char *Templates::ClientMethodServerStreamQmlDeclarationTemplate = "Q_INVOKABLE QtProtobuf::QGrpcSubscription *qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$);\n";
+const char *Templates::ClientMethodServerStreamDeclarationTemplate = "QtProtobuf::QGrpcSubscriptionShared subscribe$method_name_upper$Updates(const $param_type$ &$param_name$);\n";
+const char *Templates::ClientMethodServerStream2DeclarationTemplate = "QtProtobuf::QGrpcSubscriptionShared subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$);\n";
+const char *Templates::ClientMethodServerStreamQmlDeclarationTemplate = "Q_INVOKABLE QtProtobuf::QGrpcSubscriptionShared qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$);\n";
 
-const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
+const char *Templates::ClientMethodServerStreamDefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$)\n"
                                                                     "{\n"
                                                                     "    return subscribe(\"$method_name$\", $param_name$);\n"
                                                                     "}\n";
-const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
+const char *Templates::ClientMethodServerStream2DefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::subscribe$method_name_upper$Updates(const $param_type$ &$param_name$, const QPointer<$return_type$> &$return_name$)\n"
                                                                      "{\n"
                                                                      "    return subscribe(\"$method_name$\", $param_name$, $return_name$);\n"
                                                                      "}\n";
-const char *Templates::ClientMethodServerStreamQmlDefinitionTemplate = "QtProtobuf::QGrpcSubscription *$classname$::qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$)\n"
+const char *Templates::ClientMethodServerStreamQmlDefinitionTemplate = "QtProtobuf::QGrpcSubscriptionShared $classname$::qmlSubscribe$method_name_upper$Updates_p($param_type$ *$param_name$, $return_type$ *$return_name$)\n"
                                                                        "{\n"
                                                                        "    return subscribe(\"$method_name$\", *$param_name$, QPointer<$return_type$>($return_name$));\n"
                                                                        "}\n";

+ 23 - 1
src/grpc/qabstractgrpcchannel.cpp

@@ -27,8 +27,23 @@
 
 #include "qgrpcasyncreply.h"
 #include "qgrpcsubscription.h"
+#include <QThread>
 
-using namespace QtProtobuf;
+namespace QtProtobuf {
+
+struct QAbstractGrpcChannelPrivate {
+    QAbstractGrpcChannelPrivate() : thread(QThread::currentThread()) {
+        assert(thread != nullptr && "QAbstractGrpcChannel has to be created in QApplication context");
+    }
+    const QThread *thread;
+};
+
+QAbstractGrpcChannel::QAbstractGrpcChannel() : dPtr(new QAbstractGrpcChannelPrivate)
+{
+
+}
+
+QAbstractGrpcChannel::~QAbstractGrpcChannel() = default;
 
 void QAbstractGrpcChannel::abort(QGrpcAsyncReply *reply)
 {
@@ -42,3 +57,10 @@ void QAbstractGrpcChannel::cancel(QGrpcSubscription *subscription)
     assert(subscription != nullptr);
     subscription->finished();
 }
+
+const QThread *QAbstractGrpcChannel::thread() const
+{
+    return dPtr->thread;
+}
+
+}

+ 9 - 2
src/grpc/qabstractgrpcchannel.h

@@ -34,12 +34,15 @@
 #include "qgrpccredentials.h"
 #include "qtgrpcglobal.h"
 
+class QThread;
+
 namespace QtProtobuf {
 
 class QGrpcAsyncReply;
 class QGrpcSubscription;
 class QAbstractGrpcClient;
 class QAbstractProtobufSerializer;
+struct QAbstractGrpcChannelPrivate;
 /*!
  * \ingroup QtGrpc
  * \brief The QAbstractGrpcChannel class is interface that represents common gRPC channel functionality.
@@ -85,11 +88,14 @@ public:
     virtual void subscribe(QGrpcSubscription *subscription, const QString &service, QAbstractGrpcClient *client) = 0;
 
     virtual std::shared_ptr<QAbstractProtobufSerializer> serializer() const = 0;
+
+    const QThread *thread() const;
+
 protected:
     //! \private
-    QAbstractGrpcChannel() = default;
+    QAbstractGrpcChannel();
     //! \private
-    virtual ~QAbstractGrpcChannel() = default;
+    virtual ~QAbstractGrpcChannel();
 
     /*!
      * \private
@@ -109,5 +115,6 @@ protected:
     friend class QGrpcSubscription;
 private:
     Q_DISABLE_COPY(QAbstractGrpcChannel)
+    std::unique_ptr<QAbstractGrpcChannelPrivate> dPtr;
 };
 }

+ 56 - 22
src/grpc/qabstractgrpcclient.cpp

@@ -30,6 +30,7 @@
 #include "qprotobufserializerregistry_p.h"
 
 #include <QTimer>
+#include <QThread>
 
 namespace QtProtobuf {
 
@@ -43,7 +44,7 @@ public:
     std::shared_ptr<QAbstractGrpcChannel> channel;
     const QString service;
     std::shared_ptr<QAbstractProtobufSerializer> serializer;
-    std::vector<QGrpcSubscription *> activeSubscriptions;
+    std::vector<QGrpcSubscriptionShared> activeSubscriptions;
 };
 }
 
@@ -59,6 +60,12 @@ QAbstractGrpcClient::~QAbstractGrpcClient()
 
 void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChannel> &channel)
 {
+    if (channel->thread() != QThread::currentThread()) {
+        qProtoCritical() << "QAbstractGrpcClient::attachChannel is called from different thread.\n"
+                           "QtGrpc doesn't guarantie thread safety on channel level.\n"
+                           "You have to be confident that channel routines are working in the same thread as QAbstractGrpcClient";
+        assert(channel->thread() == QThread::currentThread());
+    }
     dPtr->channel = channel;
     dPtr->serializer = channel->serializer();
 }
@@ -66,6 +73,14 @@ void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChann
 QGrpcStatus QAbstractGrpcClient::call(const QString &method, const QByteArray &arg, QByteArray &ret)
 {
     QGrpcStatus callStatus{QGrpcStatus::Unknown};
+    if (thread() != QThread::currentThread()) {
+        QMetaObject::invokeMethod(this, [&]()->QGrpcStatus {
+                                                qProtoDebug() << "Method: " << dPtr->service << method << " called from different thread";
+                                                return call(method, arg, ret);
+                                            }, Qt::BlockingQueuedConnection, &callStatus);
+        return callStatus;
+    }
+
     if (dPtr->channel) {
         callStatus = dPtr->channel->call(method, dPtr->service, arg, ret);
     } else {
@@ -79,22 +94,33 @@ QGrpcStatus QAbstractGrpcClient::call(const QString &method, const QByteArray &a
     return callStatus;
 }
 
-QGrpcAsyncReply *QAbstractGrpcClient::call(const QString &method, const QByteArray &arg)
+QGrpcAsyncReplyShared QAbstractGrpcClient::call(const QString &method, const QByteArray &arg)
 {
-    QGrpcAsyncReply *reply = nullptr;
-    if (dPtr->channel) {
-        reply = new QGrpcAsyncReply(dPtr->channel, this);
-
-        connect(reply, &QGrpcAsyncReply::error, this, [this, reply](const QGrpcStatus &status) {
+    QGrpcAsyncReplyShared reply;
+    if (thread() != QThread::currentThread()) {
+        QMetaObject::invokeMethod(this, [&]()->QGrpcAsyncReplyShared {
+                                      qProtoDebug() << "Method: " << dPtr->service << method << " called from different thread";
+                                      return call(method, arg);
+                                  }, Qt::BlockingQueuedConnection, &reply);
+    } else if (dPtr->channel) {
+        reply.reset(new QGrpcAsyncReply(dPtr->channel, this), [](QGrpcAsyncReply *reply) { reply->deleteLater(); });
+
+        auto errorConnection = std::make_shared<QMetaObject::Connection>();
+        auto finishedConnection = std::make_shared<QMetaObject::Connection>();
+        *errorConnection = connect(reply.get(), &QGrpcAsyncReply::error, this, [this, reply, errorConnection, finishedConnection](const QGrpcStatus &status) mutable {
             error(status);
-            reply->deleteLater();
+            QObject::disconnect(*finishedConnection);
+            QObject::disconnect(*errorConnection);
+            reply.reset();
         });
 
-        connect(reply, &QGrpcAsyncReply::finished, this, [reply] {
-            reply->deleteLater();
+        *finishedConnection = connect(reply.get(), &QGrpcAsyncReply::finished, [reply, errorConnection, finishedConnection]() mutable {
+            QObject::disconnect(*finishedConnection);
+            QObject::disconnect(*errorConnection);
+            reply.reset();
         });
 
-        dPtr->channel->call(method, dPtr->service, arg, reply);
+        dPtr->channel->call(method, dPtr->service, arg, reply.get());
     } else {
         error({QGrpcStatus::Unknown, QLatin1String("No channel(s) attached.")});
     }
@@ -102,13 +128,19 @@ QGrpcAsyncReply *QAbstractGrpcClient::call(const QString &method, const QByteArr
     return reply;
 }
 
-QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler)
+QGrpcSubscriptionShared QAbstractGrpcClient::subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler)
 {
-    QGrpcSubscription *subscription = nullptr;
-    if (dPtr->channel) {
-        subscription = new QGrpcSubscription(dPtr->channel, method, arg, handler, this);
+    QGrpcSubscriptionShared subscription;
+
+    if (thread() != QThread::currentThread()) {
+        QMetaObject::invokeMethod(this, [&]()->QGrpcSubscriptionShared {
+                                      qProtoDebug() << "Subscription: " << dPtr->service << method << " called from different thread";
+                                      return subscribe(method, arg, handler);
+                                  }, Qt::BlockingQueuedConnection, &subscription);
+    } else if (dPtr->channel) {
+        subscription.reset(new QGrpcSubscription(dPtr->channel, method, arg, handler, this), [](QGrpcSubscription *subscription) { subscription->deleteLater(); });
 
-        auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscription *activeSubscription) {
+        auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](const QGrpcSubscriptionShared &activeSubscription) {
            return *activeSubscription == *subscription;
         });
 
@@ -117,25 +149,27 @@ QGrpcSubscription *QAbstractGrpcClient::subscribe(const QString &method, const Q
             return *it; //If subscription already exists return it for handling
         }
 
-        connect(subscription, &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
+        connect(subscription.get(), &QGrpcSubscription::error, this, [this, subscription](const QGrpcStatus &status) {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription error: " << status.message();
             error(status);
-            dPtr->channel->subscribe(subscription, dPtr->service, this);
+            dPtr->channel->subscribe(subscription.get(), dPtr->service, this);
         });
 
-        connect(subscription, &QGrpcSubscription::finished, this, [this, subscription] {
+        auto finishedConnection = std::make_shared<QMetaObject::Connection>();
+        *finishedConnection = connect(subscription.get(), &QGrpcSubscription::finished, this, [this, subscription, finishedConnection]() mutable {
             qProtoWarning() << subscription->method() << "call" << dPtr->service << "subscription finished";
-            auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscription *activeSubscription) {
+            auto it = std::find_if(std::begin(dPtr->activeSubscriptions), std::end(dPtr->activeSubscriptions), [subscription](QGrpcSubscriptionShared activeSubscription) {
                return *activeSubscription == *subscription;
             });
 
             if (it != std::end(dPtr->activeSubscriptions)) {
                 dPtr->activeSubscriptions.erase(it);
             }
-            subscription->deleteLater();
+            QObject::disconnect(*finishedConnection);
+            subscription.reset();
         });
 
-        dPtr->channel->subscribe(subscription, dPtr->service, this);
+        dPtr->channel->subscribe(subscription.get(), dPtr->service, this);
         dPtr->activeSubscriptions.push_back(subscription);
     } else {
         error({QGrpcStatus::Unknown, QLatin1String("No channel(s) attached.")});

+ 8 - 5
src/grpc/qabstractgrpcclient.h

@@ -61,6 +61,7 @@ using SubscriptionHandler = std::function<void(const QByteArray&)>;
  * \ingroup QtGrpc
  * \brief The QAbstractGrpcClient class is bridge between gRPC clients and channels. QAbstractGrpcClient provides set of
  *        bridge functions for client classes generated out of protobuf services.
+ * \details QAbstractGrpcClient provides threads safety for subscribe and call methods of generated clients.
  */
 class Q_GRPC_EXPORT QAbstractGrpcClient : public QObject
 {
@@ -69,6 +70,8 @@ public:
     /*!
      * \brief Attaches \a channel to client as transport layer for gRPC. Parameters and return values will be serialized
      *        to supported by channel format.
+     * \note \b Warning: QtGrpc doesn't guarantie thread safety on channel level.
+     *       You have to be confident that channel routines are working in the same thread as QAbstractGrpcClient.
      * \see QAbstractGrcpChannel
      * \param channel Shared pointer to channel will be used as transport layer for gRPC
      */
@@ -119,7 +122,7 @@ protected:
      * \param[in] arg Protobuf message argument for \p method
      */
     template<typename A>
-    QGrpcAsyncReply *call(const QString &method, const A &arg) {
+    QGrpcAsyncReplyShared call(const QString &method, const A &arg) {
         return call(method, arg.serialize(serializer()));
     }
 
@@ -132,7 +135,7 @@ protected:
      *             update recevied from server-stream
      */
     template<typename A>
-    QGrpcSubscription *subscribe(const QString &method, const A &arg) {
+    QGrpcSubscriptionShared subscribe(const QString &method, const A &arg) {
         return subscribe(method, arg.serialize(serializer()));
     }
 
@@ -147,7 +150,7 @@ protected:
      *       updated message recevied from server-stream
      */
     template<typename A, typename R>
-    QGrpcSubscription *subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
+    QGrpcSubscriptionShared subscribe(const QString &method, const A &arg, const QPointer<R> &ret) {
         if (ret.isNull()) {
             static const QString nullPointerError("Unable to subscribe method: %1. Pointer to return data is null");
             error({QGrpcStatus::InvalidArgument, nullPointerError.arg(method)});
@@ -184,10 +187,10 @@ private:
     QGrpcStatus call(const QString &method, const QByteArray &arg, QByteArray &ret);
 
     //!\private
-    QGrpcAsyncReply *call(const QString &method, const QByteArray &arg);
+    QGrpcAsyncReplyShared call(const QString &method, const QByteArray &arg);
 
     //!\private
-    QGrpcSubscription *subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler = {});
+    QGrpcSubscriptionShared subscribe(const QString &method, const QByteArray &arg, const QtProtobuf::SubscriptionHandler &handler = {});
 
     /*!
      * \private

+ 11 - 0
src/grpc/qgrpcasyncreply.cpp

@@ -25,4 +25,15 @@
 
 #include "qgrpcasyncreply.h"
 
+#include <QThread>
+
 using namespace QtProtobuf;
+
+void QGrpcAsyncReply::abort()
+{
+    if (thread() != QThread::currentThread()) {
+        QMetaObject::invokeMethod(this, [this](){m_channel->abort(this);}, Qt::BlockingQueuedConnection);
+    } else {
+        m_channel->abort(this);
+    }
+}

+ 1 - 3
src/grpc/qgrpcasyncreply.h

@@ -50,9 +50,7 @@ public:
     /*!
      * \brief Aborts this reply and try to abort call in channel
      */
-    void abort() {
-        m_channel->abort(this);
-    }
+    void abort();
 
     /*!
      * \brief Subscribe to QGrpcAsyncReply signals

+ 10 - 0
src/grpc/qgrpcsubscription.cpp

@@ -26,6 +26,7 @@
 #include "qgrpcsubscription.h"
 
 #include <qtprotobuflogging.h>
+#include <QThread>
 
 using namespace QtProtobuf;
 
@@ -45,3 +46,12 @@ void QGrpcSubscription::addHandler(const SubscriptionHandler &handler)
         m_handlers.push_back(handler);
     }
 }
+
+void QGrpcSubscription::cancel()
+{
+    if (thread() != QThread::currentThread()) {
+        QMetaObject::invokeMethod(this, [this](){m_channel->cancel(this);}, Qt::BlockingQueuedConnection);
+    } else {
+        m_channel->cancel(this);
+    }
+}

+ 11 - 9
src/grpc/qgrpcsubscription.h

@@ -50,9 +50,7 @@ public:
     /*!
      * \brief Cancels this subscription and try to abort call in channel
      */
-    void cancel() {
-        m_channel->cancel(this);
-    }
+    void cancel();
 
     /*!
      * \brief Returns method for this subscription
@@ -69,7 +67,10 @@ public:
     }
 
     /*!
-     * \brief Invokes handler method assigned to this subscription
+     * \brief Invokes handler method assigned to this subscription.
+     * \param data updated subscription data buffer
+     * \details Should be used by QAbstractGrpcChannel implementations,
+     *          to update data in subscription and notify clients about subscription updates.
      */
     void handler(const QByteArray& data) {
         setData(data);
@@ -79,11 +80,6 @@ public:
         updated();
     }
 
-    bool operator ==(const QGrpcSubscription &other) const {
-        return other.method() == this->method() &&
-                other.arg() == this->arg();
-    }
-
 signals:
     /*!
      * \brief The signal is emitted when subscription received updated value from server
@@ -99,6 +95,12 @@ protected:
 
     //! \private
     void addHandler(const SubscriptionHandler &handler);
+
+    bool operator ==(const QGrpcSubscription &other) const {
+        return other.method() == this->method() &&
+                other.arg() == this->arg();
+    }
+
 private:
     friend class QAbstractGrpcClient;
     QString m_method;

+ 9 - 0
src/grpc/qtgrpcglobal.h

@@ -62,3 +62,12 @@
         #define Q_GRPC_IMPORT_QUICK_PLUGIN()
     #endif //QT_PROTOBUF_STATIC
 #endif //Q_PROTOBUF_IMPORT_QUICK_PLUGIN
+
+#include <memory>
+
+namespace QtProtobuf {
+class QGrpcAsyncReply;
+class QGrpcSubscription;
+using QGrpcAsyncReplyShared = std::shared_ptr<QGrpcAsyncReply>;
+using QGrpcSubscriptionShared = std::shared_ptr<QGrpcSubscription>;
+}

+ 9 - 9
src/grpc/quick/qquickgrpcsubscription.cpp

@@ -40,7 +40,7 @@ QQuickGrpcSubscription::QQuickGrpcSubscription(QObject *parent) : QObject(parent
 
 QQuickGrpcSubscription::~QQuickGrpcSubscription()
 {
-    if (!m_subscription.isNull()) {
+    if (m_subscription) {
         m_subscription->cancel();
     }
     delete m_returnValue;
@@ -49,9 +49,9 @@ QQuickGrpcSubscription::~QQuickGrpcSubscription()
 
 void QQuickGrpcSubscription::updateSubscription()
 {
-    if (!m_subscription.isNull()) {
+    if (m_subscription) {
         m_subscription->cancel();
-        m_subscription = nullptr;
+        m_subscription.reset();
     }
 
     if (m_returnValue != nullptr) {
@@ -150,13 +150,13 @@ bool QQuickGrpcSubscription::subscribe()
         return false;
     }
 
-    QGrpcSubscription *subscription = nullptr;
+    QGrpcSubscriptionShared subscription = nullptr;
     bool ok = method.invoke(m_client, Qt::DirectConnection,
-                                      QGenericReturnArgument("QtProtobuf::QGrpcSubscription*", static_cast<void *>(&subscription)),
+                                      QGenericReturnArgument("QtProtobuf::QGrpcSubscriptionShared", static_cast<void *>(&subscription)),
                                       QGenericArgument(method.parameterTypes().at(0).data(), static_cast<const void *>(&argument)),
                                       QGenericArgument(method.parameterTypes().at(1).data(), static_cast<const void *>(&m_returnValue)));
     if (!ok || subscription == nullptr) {
-        errorString = QString("Unable to call ") + m_method + "invalidate subscription.";
+        errorString = QString("Unable to call ") + m_method + " invalidate subscription.";
         qProtoWarning() << errorString;
         error({QGrpcStatus::Unknown, errorString});
         return false;
@@ -164,13 +164,13 @@ bool QQuickGrpcSubscription::subscribe()
 
     m_subscription = subscription;
 
-    connect(m_subscription, &QGrpcSubscription::updated, this, [this](){
+    connect(m_subscription.get(), &QGrpcSubscription::updated, this, [this](){
         updated(qjsEngine(this)->toScriptValue(m_returnValue));
     });
 
-    connect(m_subscription, &QGrpcSubscription::error, this, &QQuickGrpcSubscription::error);//TODO: Probably it's good idea to disable subscription here
+    connect(m_subscription.get(), &QGrpcSubscription::error, this, &QQuickGrpcSubscription::error);//TODO: Probably it's good idea to disable subscription here
 
-    connect(m_subscription, &QGrpcSubscription::finished, this, [this](){ setEnabled(false); });
+    connect(m_subscription.get(), &QGrpcSubscription::finished, this, [this](){ m_subscription.reset(); setEnabled(false); });
 
     return true;
 }

+ 1 - 1
src/grpc/quick/qquickgrpcsubscription_p.h

@@ -183,7 +183,7 @@ private:
     bool m_enabled;
     QString m_method;
     QPointer<QObject> m_argument;
-    QPointer<QGrpcSubscription> m_subscription;
+    std::shared_ptr<QGrpcSubscription> m_subscription;
     QObject *m_returnValue;
 };
 

+ 158 - 28
tests/test_grpc/clienttest.cpp

@@ -31,6 +31,7 @@
 #include <QTimer>
 #include <QFile>
 #include <QCryptographicHash>
+#include <QThread>
 
 #include <QCoreApplication>
 #include <gtest/gtest.h>
@@ -64,7 +65,7 @@ TEST_F(ClientTest, CheckMethodsGeneration)
     QPointer<SimpleStringMessage> result(new SimpleStringMessage);
     testClient.testMethod(request, result);
     testClient.testMethod(request);
-    testClient.testMethod(request, &testClient, [](QGrpcAsyncReply *) {});
+    testClient.testMethod(request, &testClient, [](QGrpcAsyncReplyShared) {});
     delete result;
 }
 
@@ -89,9 +90,10 @@ TEST_F(ClientTest, StringEchoAsyncTest)
     request.setTestFieldString("Hello beach!");
     QEventLoop waiter;
 
-    QGrpcAsyncReply *reply = testClient.testMethod(request);
-    QObject::connect(reply, &QGrpcAsyncReply::finished, &m_app, [reply, &result, &waiter]() {
+    QGrpcAsyncReplyShared reply = testClient.testMethod(request);
+    QObject::connect(reply.get(), &QGrpcAsyncReply::finished, &m_app, [reply, &result, &waiter]() {
         result = reply->read<SimpleStringMessage>();
+        reply->deleteLater();
         waiter.quit();
     });
 
@@ -107,7 +109,7 @@ TEST_F(ClientTest, StringEchoAsync2Test)
     SimpleStringMessage request;
     request.setTestFieldString("Hello beach!");
     QEventLoop waiter;
-    testClient.testMethod(request, &m_app, [&result, &waiter](QGrpcAsyncReply *reply) {
+    testClient.testMethod(request, &m_app, [&result, &waiter](QGrpcAsyncReplyShared reply) {
         result = reply->read<SimpleStringMessage>();
         waiter.quit();
     });
@@ -124,21 +126,22 @@ TEST_F(ClientTest, StringEchoImmediateAsyncAbortTest)
     SimpleStringMessage request;
     request.setTestFieldString("sleep");
     QEventLoop waiter;
-    QGrpcAsyncReply *reply = testClient.testMethod(request);
+    QGrpcAsyncReplyShared reply = testClient.testMethod(request);
 
     result.setTestFieldString("Result not changed by echo");
-    QObject::connect(reply, &QGrpcAsyncReply::finished, &m_app, [&waiter, &result, reply]() {
+    QObject::connect(reply.get(), &QGrpcAsyncReply::finished, &m_app, [&waiter, &result, reply]() {
         result = reply->read<SimpleStringMessage>();
+        reply->deleteLater();
         waiter.quit();
     });
 
     QGrpcStatus::StatusCode asyncStatus = QGrpcStatus::StatusCode::Ok;
-    QObject::connect(reply, &QGrpcAsyncReply::error, reply, [&asyncStatus](const QGrpcStatus &status) {
+    QObject::connect(reply.get(), &QGrpcAsyncReply::error, [&asyncStatus](const QGrpcStatus &status) {
         asyncStatus = status.code();
     });
 
     QGrpcStatus::StatusCode clientStatus = QGrpcStatus::StatusCode::Ok;
-    QObject::connect(&testClient, &TestServiceClient::error, reply, [&clientStatus](const QGrpcStatus &status) {
+    QObject::connect(&testClient, &TestServiceClient::error, [&clientStatus](const QGrpcStatus &status) {
         clientStatus = status.code();
         std::cerr << status.code() << ":" << status.message().toStdString();
     });
@@ -160,20 +163,20 @@ TEST_F(ClientTest, StringEchoDeferredAsyncAbortTest)
     SimpleStringMessage request;
     request.setTestFieldString("sleep");
     QEventLoop waiter;
-    QGrpcAsyncReply *reply = testClient.testMethod(request);
+    QGrpcAsyncReplyShared reply = testClient.testMethod(request);
 
     result.setTestFieldString("Result not changed by echo");
     bool errorCalled = false;
     reply = testClient.testMethod(request);
-    QObject::connect(reply, &QGrpcAsyncReply::finished, &m_app, [reply, &result, &waiter]() {
+    QObject::connect(reply.get(), &QGrpcAsyncReply::finished, &m_app, [reply, &result, &waiter]() {
         result = reply->read<SimpleStringMessage>();
         waiter.quit();
     });
-    QObject::connect(reply, &QGrpcAsyncReply::error, reply, [&errorCalled]() {
+    QObject::connect(reply.get(), &QGrpcAsyncReply::error, [&errorCalled]() {
         errorCalled = true;
     });
 
-    QTimer::singleShot(500, reply, &QGrpcAsyncReply::abort);
+    QTimer::singleShot(500, reply.get(), &QGrpcAsyncReply::abort);
     QTimer::singleShot(5000, &waiter, &QEventLoop::quit);
 
     waiter.exec();
@@ -194,7 +197,7 @@ TEST_F(ClientTest, StringEchoStreamTest)
 
     int i = 0;
     auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
+    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
         SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
 
         ++i;
@@ -225,8 +228,8 @@ TEST_F(ClientTest, StringEchoStreamAbortTest)
     QEventLoop waiter;
 
     int i = 0;
-    QtProtobuf::QGrpcSubscription *subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
-    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
+    auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, &waiter, subscription]() {
         SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
         ++i;
 
@@ -257,22 +260,22 @@ TEST_F(ClientTest, StringEchoStreamAbortByTimerTest)
 
 
     int i = 0;
-    QtProtobuf::QGrpcSubscription *subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
-    QTimer::singleShot(3500, subscription, [subscription]() {
+    auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+    QTimer::singleShot(3500, subscription.get(), [subscription]() {
         subscription->cancel();
     });
 
     bool isFinished = false;
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
         isFinished = true;
     });
 
     bool isError = false;
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::error, [&isError]() {
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::error, [&isError]() {
         isError = true;
     });
 
-    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
+    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
         SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
         ++i;
 
@@ -330,7 +333,7 @@ TEST_F(ClientTest, HugeBlobEchoStreamTest)
 
     auto subscription = testClient.subscribeTestMethodBlobServerStreamUpdates(request);
 
-    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &waiter, subscription]() {
+    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &waiter, subscription]() {
         BlobMessage ret = subscription->read<BlobMessage>();
         result.setTestBytes(ret.testBytes());
         waiter.quit();
@@ -353,8 +356,8 @@ TEST_F(ClientTest, StatusMessageAsyncTest)
     QEventLoop waiter;
     QString statusMessage;
 
-    QGrpcAsyncReply* reply = testClient.testMethodStatusMessage(request);
-    QObject::connect(reply, &QGrpcAsyncReply::error, reply, [&asyncStatus, &waiter, &statusMessage](const QGrpcStatus &status) {
+    QGrpcAsyncReplyShared reply = testClient.testMethodStatusMessage(request);
+    QObject::connect(reply.get(), &QGrpcAsyncReply::error, [&asyncStatus, &waiter, &statusMessage](const QGrpcStatus &status) {
         asyncStatus = status.code();
         statusMessage = status.message();
         waiter.quit();
@@ -540,7 +543,7 @@ TEST_F(ClientTest, MultipleSubscriptionsTest)
     ASSERT_EQ(subscription, subscriptionNext);
 
     int i = 0;
-    QObject::connect(subscription, &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
+    QObject::connect(subscription.get(), &QGrpcSubscription::updated, &m_app, [&result, &i, subscription]() {
         SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
         ++i;
 
@@ -568,12 +571,12 @@ TEST_F(ClientTest, MultipleSubscriptionsCancelTest)
     ASSERT_EQ(subscription, subscriptionNext);
 
     bool isFinished = false;
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
         isFinished = true;
     });
 
     bool isFinishedNext = false;
-    QObject::connect(subscriptionNext, &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
+    QObject::connect(subscriptionNext.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
         isFinishedNext = true;
     });
 
@@ -590,12 +593,12 @@ TEST_F(ClientTest, MultipleSubscriptionsCancelTest)
     ASSERT_EQ(subscription, subscriptionNext);
 
     isFinished = false;
-    QObject::connect(subscription, &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
+    QObject::connect(subscription.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinished]() {
         isFinished = true;
     });
 
     isFinishedNext = false;
-    QObject::connect(subscriptionNext, &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
+    QObject::connect(subscriptionNext.get(), &QtProtobuf::QGrpcSubscription::finished, [&isFinishedNext]() {
         isFinishedNext = true;
     });
 
@@ -615,3 +618,130 @@ TEST_F(ClientTest, NonCompatibleArgRetTest)
     ASSERT_STREQ(result->testFieldString().toStdString().c_str(), "2048");
     delete result;
 }
+
+TEST_F(ClientTest, StringEchoThreadTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureChannelCredentials() | QGrpcInsecureCallCredentials()));
+    SimpleStringMessage request;
+    QPointer<SimpleStringMessage> result(new SimpleStringMessage);
+    request.setTestFieldString("Hello beach from thread!");
+    bool ok = false;
+    std::shared_ptr<QThread> thread(QThread::create([&](){
+        ok = testClient.testMethod(request, result) == QGrpcStatus::Ok;
+    }));
+
+    thread->start();
+    QEventLoop wait;
+    QTimer::singleShot(2000, &wait, &QEventLoop::quit);
+    wait.exec();
+
+    ASSERT_TRUE(ok);
+    ASSERT_STREQ(result->testFieldString().toStdString().c_str(), "Hello beach from thread!");
+    delete result;
+
+    //Delete result pointer in between call operations
+    result = new SimpleStringMessage();
+    ok = false;
+    thread.reset(QThread::create([&](){
+        ok = testClient.testMethod(request, result) == QGrpcStatus::Ok;
+    }));
+
+    thread->start();
+    delete result;
+    QTimer::singleShot(2000, &wait, &QEventLoop::quit);
+    wait.exec();
+
+    ASSERT_TRUE(!ok);
+}
+
+
+TEST_F(ClientTest, StringEchoAsyncThreadTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureChannelCredentials() | QGrpcInsecureCallCredentials()));
+    SimpleStringMessage request;
+    SimpleStringMessage result;
+    request.setTestFieldString("Hello beach from thread!");
+
+    bool threadsOk = true;
+    bool replyDestroyed = true;
+    std::shared_ptr<QThread> thread(QThread::create([&](){
+        QEventLoop waiter;
+        QThread *validThread = QThread::currentThread();
+        QGrpcAsyncReplyShared reply = testClient.testMethod(request);
+        QObject::connect(reply.get(), &QObject::destroyed, [&replyDestroyed]{replyDestroyed = true;});
+        QObject::connect(reply.get(), &QGrpcAsyncReply::finished, &waiter, [reply, &result, &waiter, &threadsOk, validThread]() {
+            threadsOk &= reply->thread() != QThread::currentThread();
+            threadsOk &= validThread == QThread::currentThread();
+            result = reply->read<SimpleStringMessage>();
+            waiter.quit();
+        });
+        threadsOk &= reply->thread() != QThread::currentThread();
+        waiter.exec();
+    }));
+
+    thread->start();
+    QEventLoop wait;
+    QTimer::singleShot(2000, &wait, &QEventLoop::quit);
+    wait.exec();
+    ASSERT_TRUE(replyDestroyed);
+    ASSERT_TRUE(threadsOk);
+    ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Hello beach from thread!");
+}
+
+TEST_F(ClientTest, StringEchoStreamThreadTest)
+{
+    TestServiceClient testClient;
+    testClient.attachChannel(std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials()));
+    SimpleStringMessage result;
+    SimpleStringMessage request;
+    request.setTestFieldString("Stream");
+
+    int i = 0;
+    bool threadsOk = true;
+    std::shared_ptr<QThread> thread(QThread::create([&](){
+        QEventLoop waiter;
+        QThread *validThread = QThread::currentThread();
+        auto subscription = testClient.subscribeTestMethodServerStreamUpdates(request);
+        QObject::connect(subscription.get(), &QGrpcSubscription::updated, &waiter, [&result, &i, &waiter, subscription, &threadsOk, validThread]() {
+            SimpleStringMessage ret = subscription->read<SimpleStringMessage>();
+            result.setTestFieldString(result.testFieldString() + ret.testFieldString());
+            ++i;
+            if (i == 4) {
+                waiter.quit();
+            }
+            threadsOk &= subscription->thread() != QThread::currentThread();
+            threadsOk &= validThread == QThread::currentThread();
+        });
+
+        threadsOk &= subscription->thread() != QThread::currentThread();
+        QTimer::singleShot(20000, &waiter, &QEventLoop::quit);
+        waiter.exec();
+    }));
+
+    thread->start();
+    QEventLoop wait;
+    QObject::connect(thread.get(), &QThread::finished, &wait, [&wait]{ wait.quit(); });
+    QTimer::singleShot(20000, &wait, &QEventLoop::quit);
+    wait.exec();
+
+    ASSERT_TRUE(threadsOk);
+    ASSERT_EQ(i, 4);
+    ASSERT_STREQ(result.testFieldString().toStdString().c_str(), "Stream1Stream2Stream3Stream4");
+}
+
+TEST_F(ClientTest, AttachChannelThreadTest)
+{
+    std::shared_ptr<QGrpcHttp2Channel> channel;
+    std::shared_ptr<QThread> thread(QThread::create([&](){
+        channel = std::make_shared<QGrpcHttp2Channel>(m_echoServerAddress, QGrpcInsecureCallCredentials() | QGrpcInsecureChannelCredentials());
+    }));
+    thread->start();
+    QThread::msleep(1000);
+    TestServiceClient testClient;
+    EXPECT_DEATH({
+                     testClient.attachChannel(channel);
+                 }, ".*");
+}
+