Browse Source

Remove the linking between QGrpcAsyncOperationBase and QAbstractGrpcChannel

- Remove redundant logic in QAbstractGrpcChannel related to Grpc
  call/stream handling. This allows to remove redundand linking
  between QGrpcAsyncOperationBase and QAbstractGrpcChannel and
  simplify the implementation.
- Unify the method that cancels call/stream, the name is abort and
  QGrpcAsyncOperationBase specifies it as pure virtual.
- Adjust tests.
Alexey Edelev 3 years ago
parent
commit
bc0319636d

+ 1 - 1
examples/simplechat/simplechatengine.cpp

@@ -76,7 +76,7 @@ void SimpleChatEngine::login(const QString &name, const QString &password)
     QtProtobuf::QGrpcStreamShared stream = m_client->streamMessageList(None());
     QtProtobuf::QGrpcStreamShared stream = m_client->streamMessageList(None());
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::error, this, [stream] {
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::error, this, [stream] {
         qCritical() << "Stream error, cancel";
         qCritical() << "Stream error, cancel";
-        stream->cancel();
+        stream->abort();
     });
     });
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::messageReceived, this, [this, name, stream]() {
     QObject::connect(stream.get(), &QtProtobuf::QGrpcStream::messageReceived, this, [this, name, stream]() {
         if (m_userName != name) {
         if (m_userName != name) {

+ 1 - 18
src/grpc/qabstractgrpcchannel.cpp

@@ -38,26 +38,9 @@ struct QAbstractGrpcChannelPrivate {
     const QThread *thread;
     const QThread *thread;
 };
 };
 
 
-QAbstractGrpcChannel::QAbstractGrpcChannel() : dPtr(new QAbstractGrpcChannelPrivate)
-{
-
-}
-
+QAbstractGrpcChannel::QAbstractGrpcChannel() : dPtr(new QAbstractGrpcChannelPrivate) {}
 QAbstractGrpcChannel::~QAbstractGrpcChannel() = default;
 QAbstractGrpcChannel::~QAbstractGrpcChannel() = default;
 
 
-void QAbstractGrpcChannel::abort(QGrpcCallReply *reply)
-{
-    assert(reply != nullptr);
-    reply->setData({});
-    reply->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
-}
-
-void QAbstractGrpcChannel::cancel(QGrpcStream *stream)
-{
-    assert(stream != nullptr);
-    stream->finished();
-}
-
 const QThread *QAbstractGrpcChannel::thread() const
 const QThread *QAbstractGrpcChannel::thread() const
 {
 {
     return dPtr->thread;
     return dPtr->thread;

+ 0 - 19
src/grpc/qabstractgrpcchannel.h

@@ -38,8 +38,6 @@ class QThread;
 
 
 namespace QtProtobuf {
 namespace QtProtobuf {
 
 
-class QGrpcCallReply;
-class QGrpcStream;
 class QAbstractGrpcClient;
 class QAbstractGrpcClient;
 class QAbstractProtobufSerializer;
 class QAbstractProtobufSerializer;
 struct QAbstractGrpcChannelPrivate;
 struct QAbstractGrpcChannelPrivate;
@@ -96,23 +94,6 @@ protected:
     QAbstractGrpcChannel();
     QAbstractGrpcChannel();
     //! \private
     //! \private
     virtual ~QAbstractGrpcChannel();
     virtual ~QAbstractGrpcChannel();
-
-    /*!
-     * \private
-     * \brief Aborts async call for given \p reply
-     * \param[in] reply returned by asynchronous QAbstractGrpcChannel::call() method
-     */
-    virtual void abort(QGrpcCallReply *reply);
-
-    /*!
-     * \private
-     * \brief Cancels \p stream
-     * \param[in] stream returned by QAbstractGrpcChannel::stream() method
-     */
-    virtual void cancel(QGrpcStream *stream);
-
-    friend class QGrpcCallReply;
-    friend class QGrpcStream;
 private:
 private:
     Q_DISABLE_COPY(QAbstractGrpcChannel)
     Q_DISABLE_COPY(QAbstractGrpcChannel)
     std::unique_ptr<QAbstractGrpcChannelPrivate> dPtr;
     std::unique_ptr<QAbstractGrpcChannelPrivate> dPtr;

+ 4 - 4
src/grpc/qabstractgrpcclient.cpp

@@ -64,11 +64,11 @@ void QAbstractGrpcClient::attachChannel(const std::shared_ptr<QAbstractGrpcChann
         throw std::runtime_error("Call from another thread");
         throw std::runtime_error("Call from another thread");
     }
     }
     for (auto stream : dPtr->activeStreams) {
     for (auto stream : dPtr->activeStreams) {
-        stream->cancel();
+        stream->abort();
     }
     }
     dPtr->channel = channel;
     dPtr->channel = channel;
     for (auto stream : dPtr->activeStreams) {
     for (auto stream : dPtr->activeStreams) {
-        stream->cancel();
+        stream->abort();
     }
     }
 }
 }
 
 
@@ -105,7 +105,7 @@ QGrpcCallReplyShared QAbstractGrpcClient::call(const QString &method, const QByt
                                       return call(method, arg);
                                       return call(method, arg);
                                   }, Qt::BlockingQueuedConnection, &reply);
                                   }, Qt::BlockingQueuedConnection, &reply);
     } else if (dPtr->channel) {
     } else if (dPtr->channel) {
-        reply.reset(new QGrpcCallReply(dPtr->channel, this), [](QGrpcCallReply *reply) { reply->deleteLater(); });
+        reply.reset(new QGrpcCallReply(this), [](QGrpcCallReply *reply) { reply->deleteLater(); });
 
 
         auto errorConnection = std::make_shared<QMetaObject::Connection>();
         auto errorConnection = std::make_shared<QMetaObject::Connection>();
         auto finishedConnection = std::make_shared<QMetaObject::Connection>();
         auto finishedConnection = std::make_shared<QMetaObject::Connection>();
@@ -140,7 +140,7 @@ QGrpcStreamShared QAbstractGrpcClient::stream(const QString &method, const QByte
                                       return stream(method, arg, handler);
                                       return stream(method, arg, handler);
                                   }, Qt::BlockingQueuedConnection, &grpcStream);
                                   }, Qt::BlockingQueuedConnection, &grpcStream);
     } else if (dPtr->channel) {
     } else if (dPtr->channel) {
-        grpcStream.reset(new QGrpcStream(dPtr->channel, method, arg, handler, this), [](QGrpcStream *stream) { stream->deleteLater(); });
+        grpcStream.reset(new QGrpcStream(method, arg, handler, this), [](QGrpcStream *stream) { stream->deleteLater(); });
 
 
         auto it = std::find_if(std::begin(dPtr->activeStreams), std::end(dPtr->activeStreams), [grpcStream](const QGrpcStreamShared &activeStream) {
         auto it = std::find_if(std::begin(dPtr->activeStreams), std::end(dPtr->activeStreams), [grpcStream](const QGrpcStreamShared &activeStream) {
            return *activeStream == *grpcStream;
            return *activeStream == *grpcStream;

+ 0 - 7
src/grpc/qabstractgrpcclient.h

@@ -46,17 +46,10 @@
  */
  */
 namespace QtProtobuf {
 namespace QtProtobuf {
 
 
-class QGrpcCallReply;
-class QGrpcStream;
 class QGrpcAsyncOperationBase;
 class QGrpcAsyncOperationBase;
 class QAbstractGrpcChannel;
 class QAbstractGrpcChannel;
 class QAbstractGrpcClientPrivate;
 class QAbstractGrpcClientPrivate;
 
 
-/*!
- * \private
- */
-using StreamHandler = std::function<void(const QByteArray&)>;
-
 /*!
 /*!
  * \ingroup QtGrpc
  * \ingroup QtGrpc
  * \brief The QAbstractGrpcClient class is bridge between gRPC clients and channels. QAbstractGrpcClient provides set of
  * \brief The QAbstractGrpcClient class is bridge between gRPC clients and channels. QAbstractGrpcClient provides set of

+ 5 - 6
src/grpc/qgrpcasyncoperationbase_p.h

@@ -31,7 +31,6 @@
 #include <functional>
 #include <functional>
 #include <memory>
 #include <memory>
 
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 #include "qabstractgrpcclient.h"
 
 
 #include "qtgrpcglobal.h"
 #include "qtgrpcglobal.h"
@@ -74,10 +73,12 @@ public:
         m_data = data;
         m_data = data;
     }
     }
 
 
+    virtual void abort() = 0;
+
 signals:
 signals:
     /*!
     /*!
-     * \brief The signal is emitted when reply is ready for read. Usualy called by channel when all chunks of data
-     *        recevied
+     * \brief The signal indicates the end of communication for this call. If signal emitted by
+     *        stream this means that stream is succesfully closed either by client or server.
      */
      */
     void finished();
     void finished();
 
 
@@ -89,12 +90,10 @@ signals:
 
 
 protected:
 protected:
     //! \private
     //! \private
-    QGrpcAsyncOperationBase(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QObject(parent)
-      , m_channel(channel) {}
+    QGrpcAsyncOperationBase(QAbstractGrpcClient *parent) : QObject(parent) {}
     //! \private
     //! \private
     virtual ~QGrpcAsyncOperationBase();
     virtual ~QGrpcAsyncOperationBase();
 
 
-    std::shared_ptr<QAbstractGrpcChannel> m_channel;
 private:
 private:
     QGrpcAsyncOperationBase();
     QGrpcAsyncOperationBase();
     Q_DISABLE_COPY_MOVE(QGrpcAsyncOperationBase)
     Q_DISABLE_COPY_MOVE(QGrpcAsyncOperationBase)

+ 6 - 2
src/grpc/qgrpccallreply.cpp

@@ -31,9 +31,13 @@ using namespace QtProtobuf;
 
 
 void QGrpcCallReply::abort()
 void QGrpcCallReply::abort()
 {
 {
+    auto abortFunc = [this]() {
+        this->setData({});
+        this->error({QGrpcStatus::StatusCode::Aborted, QLatin1String("Call aborted by user or timeout")});
+    };
     if (thread() != QThread::currentThread()) {
     if (thread() != QThread::currentThread()) {
-        QMetaObject::invokeMethod(this, [this](){m_channel->abort(this);}, Qt::BlockingQueuedConnection);
+        QMetaObject::invokeMethod(this, abortFunc, Qt::BlockingQueuedConnection);
     } else {
     } else {
-        m_channel->abort(this);
+        abortFunc();
     }
     }
 }
 }

+ 2 - 3
src/grpc/qgrpccallreply.h

@@ -29,7 +29,6 @@
 #include <QMutex>
 #include <QMutex>
 #include <memory>
 #include <memory>
 
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpcasyncoperationbase_p.h"
 #include "qgrpcasyncoperationbase_p.h"
 
 
@@ -50,7 +49,7 @@ public:
     /*!
     /*!
      * \brief Aborts this reply and try to abort call in channel
      * \brief Aborts this reply and try to abort call in channel
      */
      */
-    void abort();
+    void abort() override;
 
 
     /*!
     /*!
      * \brief Subscribe to QGrpcCallReply signals
      * \brief Subscribe to QGrpcCallReply signals
@@ -76,7 +75,7 @@ public:
 
 
 protected:
 protected:
     //! \private
     //! \private
-    QGrpcCallReply(const std::shared_ptr<QAbstractGrpcChannel> &channel, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+    QGrpcCallReply(QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(parent)
     {}
     {}
     //! \private
     //! \private
     ~QGrpcCallReply() = default;
     ~QGrpcCallReply() = default;

+ 4 - 5
src/grpc/qgrpcstream.cpp

@@ -30,8 +30,7 @@
 
 
 using namespace QtProtobuf;
 using namespace QtProtobuf;
 
 
-QGrpcStream::QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                                     const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(channel, parent)
+QGrpcStream::QGrpcStream(const QString &method, const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent) : QGrpcAsyncOperationBase(parent)
   , m_method(method)
   , m_method(method)
   , m_arg(arg)
   , m_arg(arg)
 {
 {
@@ -47,11 +46,11 @@ void QGrpcStream::addHandler(const StreamHandler &handler)
     }
     }
 }
 }
 
 
-void QGrpcStream::cancel()
+void QGrpcStream::abort()
 {
 {
     if (thread() != QThread::currentThread()) {
     if (thread() != QThread::currentThread()) {
-        QMetaObject::invokeMethod(this, [this](){m_channel->cancel(this);}, Qt::BlockingQueuedConnection);
+        QMetaObject::invokeMethod(this, &QGrpcStream::finished, Qt::BlockingQueuedConnection);
     } else {
     } else {
-        m_channel->cancel(this);
+        finished();
     }
     }
 }
 }

+ 3 - 4
src/grpc/qgrpcstream.h

@@ -29,7 +29,6 @@
 #include <QMutex>
 #include <QMutex>
 #include <memory>
 #include <memory>
 
 
-#include "qabstractgrpcchannel.h"
 #include "qabstractgrpcclient.h"
 #include "qabstractgrpcclient.h"
 #include "qgrpcasyncoperationbase_p.h"
 #include "qgrpcasyncoperationbase_p.h"
 
 
@@ -50,7 +49,7 @@ public:
     /*!
     /*!
      * \brief Cancels this stream and try to abort call in channel
      * \brief Cancels this stream and try to abort call in channel
      */
      */
-    void cancel();
+    void abort() override;
 
 
     /*!
     /*!
      * \brief Returns method for this stream
      * \brief Returns method for this stream
@@ -88,8 +87,8 @@ signals:
 
 
 protected:
 protected:
     //! \private
     //! \private
-    QGrpcStream(const std::shared_ptr<QAbstractGrpcChannel> &channel, const QString &method,
-                      const QByteArray &arg, const StreamHandler &handler, QAbstractGrpcClient *parent);
+    QGrpcStream(const QString &method, const QByteArray &arg,
+                const StreamHandler &handler, QAbstractGrpcClient *parent);
     //! \private
     //! \private
     virtual ~QGrpcStream() = default;
     virtual ~QGrpcStream() = default;
 
 

+ 2 - 0
src/grpc/qtgrpcglobal.h

@@ -64,10 +64,12 @@
 #endif //Q_PROTOBUF_IMPORT_QUICK_PLUGIN
 #endif //Q_PROTOBUF_IMPORT_QUICK_PLUGIN
 
 
 #include <memory>
 #include <memory>
+#include <functional>
 
 
 namespace QtProtobuf {
 namespace QtProtobuf {
 class QGrpcCallReply;
 class QGrpcCallReply;
 class QGrpcStream;
 class QGrpcStream;
 using QGrpcCallReplyShared = std::shared_ptr<QGrpcCallReply>;
 using QGrpcCallReplyShared = std::shared_ptr<QGrpcCallReply>;
 using QGrpcStreamShared = std::shared_ptr<QGrpcStream>;
 using QGrpcStreamShared = std::shared_ptr<QGrpcStream>;
+using StreamHandler = std::function<void(const QByteArray&)>;
 }
 }

+ 2 - 2
src/grpc/quick/qquickgrpcstream.cpp

@@ -41,7 +41,7 @@ QQuickGrpcStream::QQuickGrpcStream(QObject *parent) : QObject(parent)
 QQuickGrpcStream::~QQuickGrpcStream()
 QQuickGrpcStream::~QQuickGrpcStream()
 {
 {
     if (m_stream) {
     if (m_stream) {
-        m_stream->cancel();
+        m_stream->abort();
     }
     }
     delete m_returnValue;
     delete m_returnValue;
 }
 }
@@ -50,7 +50,7 @@ QQuickGrpcStream::~QQuickGrpcStream()
 void QQuickGrpcStream::updateStream()
 void QQuickGrpcStream::updateStream()
 {
 {
     if (m_stream) {
     if (m_stream) {
-        m_stream->cancel();
+        m_stream->abort();
         m_stream.reset();
         m_stream.reset();
     }
     }
 
 

+ 5 - 5
tests/test_grpc/clienttest.cpp

@@ -277,7 +277,7 @@ TEST_P(ClientTest, StringEchoStreamAbortTest)
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
         result.setTestFieldString(result.testFieldString() + ret.testFieldString());
 
 
         if (i == 3) {
         if (i == 3) {
-            stream->cancel();
+            stream->abort();
             QTimer::singleShot(4000, &waiter, &QEventLoop::quit);
             QTimer::singleShot(4000, &waiter, &QEventLoop::quit);
         }
         }
     });
     });
@@ -303,7 +303,7 @@ TEST_P(ClientTest, StringEchoStreamAbortByTimerTest)
     int i = 0;
     int i = 0;
     auto stream = testClient->streamTestMethodServerStream(request);
     auto stream = testClient->streamTestMethodServerStream(request);
     QTimer::singleShot(3500, stream.get(), [stream]() {
     QTimer::singleShot(3500, stream.get(), [stream]() {
-        stream->cancel();
+        stream->abort();
     });
     });
 
 
     bool isFinished = false;
     bool isFinished = false;
@@ -621,7 +621,7 @@ TEST_P(ClientTest, MultipleStreamsCancelTest)
         isFinishedNext = true;
         isFinishedNext = true;
     });
     });
 
 
-    streamNext->cancel();
+    streamNext->abort();
 
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
     ASSERT_TRUE(isFinishedNext);
@@ -643,7 +643,7 @@ TEST_P(ClientTest, MultipleStreamsCancelTest)
         isFinishedNext = true;
         isFinishedNext = true;
     });
     });
 
 
-    stream->cancel();
+    stream->abort();
 
 
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinished);
     ASSERT_TRUE(isFinishedNext);
     ASSERT_TRUE(isFinishedNext);
@@ -801,7 +801,7 @@ TEST_P(ClientTest, StreamCancelWhileErrorTimeoutTest)
         ok = true;
         ok = true;
         waiter.quit();
         waiter.quit();
     });
     });
-    stream->cancel();
+    stream->abort();
     stream.reset();
     stream.reset();
 
 
     QTimer::singleShot(5000, &waiter, &QEventLoop::quit);
     QTimer::singleShot(5000, &waiter, &QEventLoop::quit);