2 Commits 51615ca6bb ... 33f7e19db8

Author SHA1 Message Date
  Alexey Edelev 33f7e19db8 Remove the linking between QGrpcAsyncOperationBase and QAbstractGrpcChannel 2 years ago
  Alexey Edelev 503c126dbc Keep serializer ownership inside the grpc channel 2 years ago

+ 1 - 1
examples/simplechat/simplechatengine.cpp

@@ -76,7 +76,7 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
     QtProtobuf::QGrpcStreamShared stream = m_client->streamMessageList(None());
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::error, this, [stream] {
         qCritical() << "Stream error, cancel";
-        stream->cancel();
+        stream->abort();
     });
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::messageReceived, this, [this, name, stream]() {
         if (m_userName != name) {

+ 1 - 18
src/grpc/qabstractgrpcchannel.cpp

@@ -38,26 +38,9 @@ struct QAbstractGrpcChannelPrivate {
     const QThread *thread;
 };
 
-QAbstractGrpcChannel::QAbstractGrpcChannel() : dPtr(new QAbstractGrpcChannelPrivate)
-{
-
-}
-
+QAbstractGrpcChannel::QAbstractGrpcChannel() : dPtr(new QAbstractGrpcChannelPrivate) {}
 QAbstractGrpcChannel::~QAbstractGrpcChannel() = default;
 
-void QAbstractGrpcChannel::abort(QGrpcCallReply *reply)
-{
-    assert(reply != nullptr);
-    reply->setData({});
-    reply->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
-}
-
-void QAbstractGrpcChannel::cancel(QGrpcStream *stream)
-{
-    assert(stream != nullptr);
-    stream->finished();
-}
-
 const QThread *QAbstractGrpcChannel::thread() const
 {
     return dPtr->thread;

+ 0 - 19
src/grpc/qabstractgrpcchannel.h

@@ -38,8 +38,6 @@ class QThread;
 
 namespace QtProtobuf {
 
-class QGrpcCallReply;
-class QGrpcStream;
 class QAbstractGrpcClient;
 class QAbstractProtobufSerializer;
 struct QAbstractGrpcChannelPrivate;
@@ -96,23 +94,6 @@ protected:
     QAbstractGrpcChannel();
     //! \private
     virtual ~QAbstractGrpcChannel();
-
-    /*!
-     * \private
-     * \brief Aborts async call for given \p reply
-     * \param[in] reply returned by asynchronous QAbstractGrpcChannel::call() method
-     */
-    virtual void abort(QGrpcCallReply *reply);
-
-    /*!
-     * \private
-     * \brief Cancels \p stream
-     * \param[in] stream returned by QAbstractGrpcChannel::stream() method
-     */
-    virtual void cancel(QGrpcStream *stream);
-
-    friend class QGrpcCallReply;
-    friend class QGrpcStream;
 private:
     Q_DISABLE_COPY(QAbstractGrpcChannel)
     std::unique_ptr<QAbstractGrpcChannelPrivate> dPtr;

+ 10 - 11
src/grpc/qabstractgrpcclient.cpp

@@ -37,13 +37,10 @@ namespace QtProtobuf {
 //! \private
 class QAbstractGrpcClientPrivate final {
 public:
-    QAbstractGrpcClientPrivate(const QString &service) : service(service) {
-        serializer = QProtobufSerializerRegistry::instance().getSerializer("protobuf");
-    }
+    QAbstractGrpcClientPrivate(const QString &service) : service(service) {}
 
     std::shared_ptr<QAbstractGrpcChannel> channel;
     const QString service;
-    std::shared_ptr<QAbstractProtobufSerializer> serializer;
     std::vector<QGrpcStreamShared> activeStreams;
 };
 }
@@ -67,12 +64,11 @@ void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChann
         throw std::runtime_error("Call from another thread");
     }
     for (auto stream : dPtr->activeStreams) {
-        stream->cancel();
+        stream->abort();
     }
     dPtr->channel = channel;
-    dPtr->serializer = channel->serializer();
     for (auto stream : dPtr->activeStreams) {
-        stream->cancel();
+        stream->abort();
     }
 }
 
@@ -109,7 +105,7 @@ QGrpcCallReplyShared QAbstractGrpcClient::call(const QString &method, const QByt
                                       return call(method, arg);
                                   }, Qt::BlockingQueuedConnection, &reply);
     } else if (dPtr->channel) {
-        reply.reset(new QGrpcCallReply(dPtr->channel, this), [](QGrpcCallReply *reply) { reply->deleteLater(); });
+        reply.reset(new QGrpcCallReply(this), [](QGrpcCallReply *reply) { reply->deleteLater(); });
 
         auto errorConnection = std::make_shared<QMetaObject::Connection>();
         auto finishedConnection = std::make_shared<QMetaObject::Connection>();
@@ -144,7 +140,7 @@ QGrpcStreamShared QAbstractGrpcClient::stream(const QString &method, const QByte
                                       return stream(method, arg, handler);
                                   }, Qt::BlockingQueuedConnection, &grpcStream);
     } else if (dPtr->channel) {
-        grpcStream.reset(new QGrpcStream(dPtr->channel, method, arg, handler, this), [](QGrpcStream *stream) { stream->deleteLater(); });
+        grpcStream.reset(new QGrpcStream(method, arg, handler, this), [](QGrpcStream *stream) { stream->deleteLater(); });
 
         auto it = std::find_if(std::begin(dPtr->activeStreams), std::end(dPtr->activeStreams), [grpcStream](const QGrpcStreamShared &activeStream) {
            return *activeStream == *grpcStream;
@@ -194,7 +190,10 @@ QGrpcStreamShared QAbstractGrpcClient::stream(const QString &method, const QByte
     return grpcStream;
 }
 
-QAbstractProtobufSerializer *QAbstractGrpcClient::serializer() const
+std::shared_ptr<QAbstractProtobufSerializer> QAbstractGrpcClient::serializer() const
 {
-    return dPtr->serializer.get();
+    if (dPtr->channel == nullptr || dPtr->channel->serializer() == nullptr) {
+        return nullptr;
+    }
+    return dPtr->channel->serializer();
 }

+ 69 - 35
src/grpc/qabstractgrpcclient.h

@@ -46,17 +46,10 @@
  */
 namespace QtProtobuf {
 
-class QGrpcCallReply;
-class QGrpcStream;
 class QGrpcAsyncOperationBase;
 class QAbstractGrpcChannel;
 class QAbstractGrpcClientPrivate;
 
-/*!
- * \private
- */
-using StreamHandler = std::function<void(const QByteArray&)>;
-
 /*!
  * \ingroup QtGrpc
  * \brief The QAbstractGrpcClient class is bridge between gRPC clients and channels. QAbstractGrpcClient provides set of
@@ -83,7 +76,7 @@ signals:
      * \param[out] code gRPC channel StatusCode
      * \param[out] errorText Error description from channel or from QGrpc
      */
-    void error(const QGrpcStatus &status);
+    void error(const QGrpcStatus &status) const;
 
 protected:
     QAbstractGrpcClient(const QString &service, QObject *parent = nullptr);
@@ -108,9 +101,15 @@ protected:
         }
 
         QByteArray retData;
-        status = call(method, arg.serialize(serializer()), retData);
-        if (status == QGrpcStatus::StatusCode::Ok) {
-            return tryDeserialize(*ret, retData);
+        bool ok = false;
+        QByteArray argData = trySerialize(arg, ok);
+        if (ok) {
+            status = call(method, argData, retData);
+            if (status == QGrpcStatus::StatusCode::Ok) {
+                status = tryDeserialize(*ret, retData);
+            }
+        } else {
+            status = QGrpcStatus({QGrpcStatus::Unknown, QLatin1String("Serializing failed. Serializer is not ready")});
         }
         return status;
     }
@@ -123,7 +122,12 @@ protected:
      */
     template<typename A>
     QGrpcCallReplyShared call(const QString &method, const A &arg) {
-        return call(method, arg.serialize(serializer()));
+        bool ok = false;
+        QByteArray argData = trySerialize(arg, ok);
+        if (!ok) {
+            return QGrpcCallReplyShared();
+        }
+        return call(method, argData);
     }
 
     /*!
@@ -136,7 +140,12 @@ protected:
      */
     template<typename A>
     QGrpcStreamShared stream(const QString &method, const A &arg) {
-        return stream(method, arg.serialize(serializer()));
+        bool ok = false;
+        QByteArray argData = trySerialize(arg, ok);
+        if (!ok) {
+            return QGrpcStreamShared();
+        }
+        return stream(method, argData);
     }
 
     /*!
@@ -157,8 +166,12 @@ protected:
             qProtoCritical() << nullPointerError.arg(method);
             return nullptr;
         }
-
-        return stream(method, arg.serialize(serializer()), [ret, this](const QByteArray &data) {
+        bool ok = false;
+        QByteArray argData = trySerialize(arg, ok);
+        if (!ok) {
+            return QGrpcStreamShared();
+        }
+        return stream(method, argData, [ret, this](const QByteArray &data) {
             if (!ret.isNull()) {
                 tryDeserialize(*ret, data);
             } else {
@@ -175,12 +188,6 @@ protected:
      */
     void cancel(const QString &method);
 
-    /*!
-     * \brief serializer provides assigned to client serializer
-     * \return pointer to serializer. Serializer is owned by QtProtobuf::QProtobufSerializerRegistry.
-     */
-    QAbstractProtobufSerializer *serializer() const;
-
     friend class QGrpcAsyncOperationBase;
 private:
     //!\private
@@ -199,25 +206,52 @@ private:
     template<typename R>
     QGrpcStatus tryDeserialize(R &ret, const QByteArray &retData) {
         QGrpcStatus status{QGrpcStatus::Ok};
-        try {
-            ret.deserialize(serializer(), retData);
-        } catch (std::invalid_argument &) {
-            static const QLatin1String invalidArgumentErrorMessage("Response deserialization failed invalid field found");
-            status = {QGrpcStatus::InvalidArgument, invalidArgumentErrorMessage};
-            error(status);
-            qProtoCritical() << invalidArgumentErrorMessage;
-        } catch (std::out_of_range &) {
-            static const QLatin1String outOfRangeErrorMessage("Invalid size of received buffer");
-            status = {QGrpcStatus::OutOfRange, outOfRangeErrorMessage};
-            error(status);
-            qProtoCritical() << outOfRangeErrorMessage;
-        } catch (...) {
-            status = {QGrpcStatus::Internal, QLatin1String("Unknown exception caught during deserialization")};
+        auto _serializer = serializer();
+        if (_serializer != nullptr) {
+            try {
+                ret.deserialize(_serializer.get(), retData);
+            } catch (std::invalid_argument &) {
+                static const QLatin1String invalidArgumentErrorMessage("Response deserialization failed invalid field found");
+                status = {QGrpcStatus::InvalidArgument, invalidArgumentErrorMessage};
+                error(status);
+                qProtoCritical() << invalidArgumentErrorMessage;
+            } catch (std::out_of_range &) {
+                static const QLatin1String outOfRangeErrorMessage("Invalid size of received buffer");
+                status = {QGrpcStatus::OutOfRange, outOfRangeErrorMessage};
+                error(status);
+                qProtoCritical() << outOfRangeErrorMessage;
+            } catch (...) {
+                status = {QGrpcStatus::Internal, QLatin1String("Unknown exception caught during deserialization")};
+                error(status);
+            }
+        } else {
+            status = {QGrpcStatus::Unknown, QLatin1String("Deserializing failed. Serializer is not ready")};
             error(status);
         }
         return status;
     }
 
+
+    template<typename R>
+    QByteArray trySerialize(const R &arg, bool &ok) {
+        ok = false;
+        QGrpcStatus status{QGrpcStatus::Ok};
+        auto _serializer = serializer();
+        if (_serializer == nullptr) {
+            error({QGrpcStatus::Unknown, QLatin1String("Serializing failed. Serializer is not ready")});
+            return QByteArray();
+        }
+        ok = true;
+        return arg.serialize(_serializer.get());
+    }
+
+    /*!
+     * \private
+     * \brief serializer provides assigned to client serializer
+     * \return pointer to serializer. Serializer is owned by QtProtobuf::QProtobufSerializerRegistry.
+     */
+    std::shared_ptr<QAbstractProtobufSerializer> serializer() const;
+
     Q_DISABLE_COPY_MOVE(QAbstractGrpcClient)
 
     std::unique_ptr<QAbstractGrpcClientPrivate> dPtr;

+ 10 - 17
src/grpc/qgrpcasyncoperationbase_p.h

@@ -31,7 +31,6 @@
 #include <functional>
 #include <memory>
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 
 #include "qtgrpcglobal.h"
@@ -41,7 +40,8 @@ namespace QtProtobuf {
 /*!
  * \ingroup QtGrpc
  * \private
- * \brief The QGrpcAsyncOperationBase class implements stream logic
+ * \brief The QGrpcAsyncOperationBase class implements common logic to
+ *        handle communication in Grpc channel.
  */
 class Q_GRPC_EXPORT QGrpcAsyncOperationBase : public QObject
 {
@@ -55,16 +55,9 @@ public:
     T read() {
         QMutexLocker locker(&m_asyncLock);
         T value;
-        try {
-            value.deserialize(static_cast<QAbstractGrpcClient*>(parent())->serializer(), m_data);
-        } catch (std::invalid_argument &) {
-            static const QLatin1String invalidArgumentErrorMessage("Response deserialization failed invalid field found");
-            error({QGrpcStatus::InvalidArgument, invalidArgumentErrorMessage});
-        } catch (std::out_of_range &) {
-            static const QLatin1String outOfRangeErrorMessage("Invalid size of received buffer");
-            error({QGrpcStatus::OutOfRange, outOfRangeErrorMessage});
-        } catch (...) {
-            error({QGrpcStatus::Internal, QLatin1String("Unknown exception caught during deserialization")});
+        auto client = static_cast<QAbstractGrpcClient*>(parent());
+        if (client) {
+            client->tryDeserialize(value, m_data);
         }
         return value;
     }
@@ -80,10 +73,12 @@ public:
         m_data = data;
     }
 
+    virtual void abort() = 0;
+
 signals:
     /*!
-     * \brief The signal is emitted when reply is ready for read. Usualy called by channel when all chunks of data
-     *        recevied
+     * \brief The signal indicates the end of communication for this call. If signal emitted by
+     *        stream this means that stream is succesfully closed either by client or server.
      */
     void finished();
 
@@ -95,12 +90,10 @@ signals:
 
 protected:
     //! \private
-    QGrpcAsyncOperationBase(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QObject(parent)
-      , m_channel(channel) {}
+    QGrpcAsyncOperationBase(QAbstractGrpcClient *parent) : QObject(parent) {}
     //! \private
     virtual ~QGrpcAsyncOperationBase();
 
-    std::shared_ptr<QAbstractGrpcChannel> m_channel;
 private:
     QGrpcAsyncOperationBase();
     Q_DISABLE_COPY_MOVE(QGrpcAsyncOperationBase)

+ 6 - 2
src/grpc/qgrpccallreply.cpp

@@ -31,9 +31,13 @@ using namespace QtProtobuf;
 
 void QGrpcCallReply::abort()
 {
+    auto abortFunc = [this]() {
+        this->setData({});
+        this->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
+    };
     if (thread() != QThread::currentThread()) {
-        QMetaObject::invokeMethod(this, [this](){m_channel->abort(this);}, Qt::BlockingQueuedConnection);
+        QMetaObject::invokeMethod(this, abortFunc, Qt::BlockingQueuedConnection);
     } else {
-        m_channel->abort(this);
+        abortFunc();
     }
 }

+ 2 - 3
src/grpc/qgrpccallreply.h

@@ -29,7 +29,6 @@
 #include <QMutex>
 #include <memory>
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpcasyncoperationbase_p.h"
 
@@ -50,7 +49,7 @@ public:
     /*!
      * \brief Aborts this reply and try to abort call in channel
      */
-    void abort();
+    void abort() override;
 
     /*!
      * \brief Subscribe to QGrpcCallReply signals
@@ -76,7 +75,7 @@ public:
 
 protected:
     //! \private
-    QGrpcCallReply(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+    QGrpcCallReply(QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(parent)
     {}
     //! \private
     ~QGrpcCallReply() = default;

+ 4 - 5
src/grpc/qgrpcstream.cpp

@@ -30,8 +30,7 @@
 
 using namespace QtProtobuf;
 
-QGrpcStream::QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                                     const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+QGrpcStream::QGrpcStream(const QString &method, const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(parent)
   , m_method(method)
   , m_arg(arg)
 {
@@ -47,11 +46,11 @@ void QGrpcStream::addHandler(const StreamHandler &handler)
     }
 }
 
-void QGrpcStream::cancel()
+void QGrpcStream::abort()
 {
     if (thread() != QThread::currentThread()) {
-        QMetaObject::invokeMethod(this, [this](){m_channel->cancel(this);}, Qt::BlockingQueuedConnection);
+        QMetaObject::invokeMethod(this, &QGrpcStream::finished, Qt::BlockingQueuedConnection);
     } else {
-        m_channel->cancel(this);
+        finished();
     }
 }

+ 3 - 4
src/grpc/qgrpcstream.h

@@ -29,7 +29,6 @@
 #include <QMutex>
 #include <memory>
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpcasyncoperationbase_p.h"
 
@@ -50,7 +49,7 @@ public:
     /*!
      * \brief Cancels this stream and try to abort call in channel
      */
-    void cancel();
+    void abort() override;
 
     /*!
      * \brief Returns method for this stream
@@ -88,8 +87,8 @@ signals:
 
 protected:
     //! \private
-    QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                      const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent);
+    QGrpcStream(const QString &method, const QByteArray &arg,
+                const StreamHandler &handler, QAbstractGrpcClient *parent);
     //! \private
     virtual ~QGrpcStream() = default;
 

+ 2 - 0
src/grpc/qtgrpcglobal.h

@@ -64,10 +64,12 @@
 #endif //Q_PROTOBUF_IMPORT_QUICK_PLUGIN
 
 #include <memory>
+#include <functional>
 
 namespace QtProtobuf {
 class QGrpcCallReply;
 class QGrpcStream;
 using QGrpcCallReplyShared = std::shared_ptr<QGrpcCallReply>;
 using QGrpcStreamShared = std::shared_ptr<QGrpcStream>;
+using StreamHandler = std::function<void(const QByteArray&)>;
 }

+ 2 - 2
src/grpc/quick/qquickgrpcstream.cpp

@@ -41,7 +41,7 @@ QQuickGrpcStream::QQuickGrpcStream(QObject *parent) : QObject(parent)
 QQuickGrpcStream::~QQuickGrpcStream()
 {
     if (m_stream) {
-        m_stream->cancel();
+        m_stream->abort();
     }
     delete m_returnValue;
 }
@@ -50,7 +50,7 @@ QQuickGrpcStream::~QQuickGrpcStream()
 void QQuickGrpcStream::updateStream()
 {
     if (m_stream) {
-        m_stream->cancel();
+        m_stream->abort();
         m_stream.reset();
     }
 

+ 3 - 3
src/protobuf/qprotobufserializerregistry.cpp

@@ -109,9 +109,9 @@ struct QProtobufSerializerRegistryPrivateRecord final
 
     QObject* loadPluginImpl()
     {
-        if (loader == nullptr || !loader->load())
-        {
-            qProtoWarning() << "Can't load plugin from" << libPath << loader->errorString();
+        if (loader == nullptr || !loader->load()) {
+            qProtoWarning() << "Can't load plugin from" << libPath
+                            << "loader error" << (loader != nullptr ? loader->errorString() : "");
             return nullptr;
         }
         return loader->instance();

+ 7 - 7
tests/test_grpc/clienttest.cpp

@@ -277,7 +277,7 @@ TEST_P(ClientTest, StringEchoStreamAbortTest)
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
 
         if (i == 3) {
-            stream->cancel();
+            stream->abort();
             QTimer::singleShot(4000, &waiter, &QEventLoop::quit);
         }
     });
@@ -303,7 +303,7 @@ TEST_P(ClientTest, StringEchoStreamAbortByTimerTest)
     int i = 0;
     auto stream = testClient->streamTestMethodServerStream(request);
     QTimer::singleShot(3500, stream.get(), [stream]() {
-        stream->cancel();
+        stream->abort();
     });
 
     bool isFinished = false;
@@ -484,7 +484,7 @@ TEST_F(ClientTest, ClientSyncTestUnattachedChannel)
     QGrpcStatus status = testClient.testMethodStatusMessage(request, ret);
 
     ASSERT_EQ(status.code(), QGrpcStatus::Unknown);
-    ASSERT_STREQ("No channel(s) attached.", status.message().toStdString().c_str());
+    ASSERT_STREQ("Serializing failed. Serializer is not ready", status.message().toStdString().c_str());
     delete ret;
 }
 
@@ -506,7 +506,7 @@ TEST_F(ClientTest, ClientSyncTestUnattachedChannelSignal)
     waiter.exec();
 
     ASSERT_EQ(asyncStatus, QGrpcStatus::Unknown);
-    ASSERT_STREQ("No channel(s) attached.", asyncStatus.message().toStdString().c_str());
+    ASSERT_STREQ("Serializing failed. Serializer is not ready", asyncStatus.message().toStdString().c_str());
     delete ret;
 }
 
@@ -621,7 +621,7 @@ TEST_P(ClientTest, MultipleStreamsCancelTest)
         isFinishedNext = true;
     });
 
-    streamNext->cancel();
+    streamNext->abort();
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
@@ -643,7 +643,7 @@ TEST_P(ClientTest, MultipleStreamsCancelTest)
         isFinishedNext = true;
     });
 
-    stream->cancel();
+    stream->abort();
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
@@ -801,7 +801,7 @@ TEST_P(ClientTest, StreamCancelWhileErrorTimeoutTest)
         ok = true;
         waiter.quit();
     });
-    stream->cancel();
+    stream->abort();
     stream.reset();
 
     QTimer::singleShot(5000, &waiter, &QEventLoop::quit);