|
@@ -41,6 +41,8 @@
|
|
#include "qprotobufserializerregistry_p.h"
|
|
#include "qprotobufserializerregistry_p.h"
|
|
#include "qtprotobuflogging.h"
|
|
#include "qtprotobuflogging.h"
|
|
|
|
|
|
|
|
+#include <qglobal.h>
|
|
|
|
+
|
|
using namespace QtProtobuf;
|
|
using namespace QtProtobuf;
|
|
|
|
|
|
namespace {
|
|
namespace {
|
|
@@ -89,7 +91,7 @@ const char *AcceptEncodingHeader = "accept-encoding";
|
|
const char *TEHeader = "te";
|
|
const char *TEHeader = "te";
|
|
const char *GrpcStatusHeader = "grpc-status";
|
|
const char *GrpcStatusHeader = "grpc-status";
|
|
const char *GrpcStatusMessage = "grpc-message";
|
|
const char *GrpcStatusMessage = "grpc-message";
|
|
-const int GRpcMessageSizeHeaderSize = 5;
|
|
|
|
|
|
+const int GrpcMessageSizeHeaderSize = 5;
|
|
}
|
|
}
|
|
|
|
|
|
namespace QtProtobuf {
|
|
namespace QtProtobuf {
|
|
@@ -124,7 +126,7 @@ struct QGrpcHttp2ChannelPrivate {
|
|
|
|
|
|
request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
|
|
request.setAttribute(QNetworkRequest::Http2DirectAttribute, true);
|
|
|
|
|
|
- QByteArray msg(GRpcMessageSizeHeaderSize, '\0');
|
|
|
|
|
|
+ QByteArray msg(GrpcMessageSizeHeaderSize, '\0');
|
|
*(int *)(msg.data() + 1) = qToBigEndian(args.size());
|
|
*(int *)(msg.data() + 1) = qToBigEndian(args.size());
|
|
msg += args;
|
|
msg += args;
|
|
qProtoDebug() << "SEND: " << msg.size();
|
|
qProtoDebug() << "SEND: " << msg.size();
|
|
@@ -167,7 +169,7 @@ struct QGrpcHttp2ChannelPrivate {
|
|
}
|
|
}
|
|
|
|
|
|
//Message size doesn't matter for now
|
|
//Message size doesn't matter for now
|
|
- return networkReply->readAll().mid(GRpcMessageSizeHeaderSize);
|
|
|
|
|
|
+ return networkReply->readAll().mid(GrpcMessageSizeHeaderSize);
|
|
}
|
|
}
|
|
|
|
|
|
QGrpcHttp2ChannelPrivate(const QUrl &_url, const AbstractCredentials &_credentials)
|
|
QGrpcHttp2ChannelPrivate(const QUrl &_url, const AbstractCredentials &_credentials)
|
|
@@ -185,7 +187,7 @@ struct QGrpcHttp2ChannelPrivate {
|
|
}
|
|
}
|
|
|
|
|
|
static int getExpectedDataSize(const QByteArray &container) {
|
|
static int getExpectedDataSize(const QByteArray &container) {
|
|
- return qFromBigEndian(*(int *)(container.data() + 1)) + GRpcMessageSizeHeaderSize;
|
|
|
|
|
|
+ return qFromBigEndian(*(int *)(container.data() + 1)) + GrpcMessageSizeHeaderSize;
|
|
}
|
|
}
|
|
|
|
|
|
QObject lambdaContext;
|
|
QObject lambdaContext;
|
|
@@ -275,16 +277,16 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
|
|
qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
|
|
qProtoDebug() << "Proceed chunk: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
|
|
while (dataContainer.container.size() >= dataContainer.expectedSize) {
|
|
while (dataContainer.container.size() >= dataContainer.expectedSize) {
|
|
qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
|
|
qProtoDebug() << "Full data received: " << data.size() << " dataContainer: " << dataContainer.container.size() << " capacity: " << dataContainer.expectedSize;
|
|
- handler(dataContainer.container.mid(GRpcMessageSizeHeaderSize, dataContainer.expectedSize - GRpcMessageSizeHeaderSize));
|
|
|
|
|
|
+ handler(dataContainer.container.mid(GrpcMessageSizeHeaderSize, dataContainer.expectedSize - GrpcMessageSizeHeaderSize));
|
|
dataContainer.container.remove(0, dataContainer.expectedSize);
|
|
dataContainer.container.remove(0, dataContainer.expectedSize);
|
|
- if (dataContainer.container.size() > GRpcMessageSizeHeaderSize) {
|
|
|
|
|
|
+ if (dataContainer.container.size() > GrpcMessageSizeHeaderSize) {
|
|
dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(dataContainer.container);
|
|
dataContainer.expectedSize = QGrpcHttp2ChannelPrivate::getExpectedDataSize(dataContainer.container);
|
|
- } else {
|
|
|
|
|
|
+ } else if (dataContainer.container.size() > 0) {
|
|
qProtoWarning() << "Invalid container size received, size header is less than 5 bytes";
|
|
qProtoWarning() << "Invalid container size received, size header is less than 5 bytes";
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (dataContainer.container.size() < GRpcMessageSizeHeaderSize) {
|
|
|
|
|
|
+ if (dataContainer.container.size() < GrpcMessageSizeHeaderSize) {
|
|
d_ptr->activeStreamReplies.erase(replyIt);
|
|
d_ptr->activeStreamReplies.erase(replyIt);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
@@ -295,11 +297,26 @@ void QGrpcHttp2Channel::subscribe(const QString &method, const QString &service,
|
|
QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
|
|
QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
|
|
});
|
|
});
|
|
|
|
|
|
- QObject::connect(networkReply, &QNetworkReply::finished, &(d_ptr->lambdaContext), [networkReply, connection, this]() {
|
|
|
|
|
|
+ QObject::connect(networkReply, &QNetworkReply::finished, &(d_ptr->lambdaContext), [method, service, args, client, handler, networkReply, connection, this]() {
|
|
|
|
+ QString errorString = networkReply->errorString();
|
|
|
|
+ QNetworkReply::NetworkError networkError = networkReply->error();
|
|
|
|
+
|
|
d_ptr->activeStreamReplies.erase(networkReply);
|
|
d_ptr->activeStreamReplies.erase(networkReply);
|
|
- //TODO: implement error handling and subscription recovery here
|
|
|
|
QObject::disconnect(connection);
|
|
QObject::disconnect(connection);
|
|
QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
|
|
QGrpcHttp2ChannelPrivate::abortNetworkReply(networkReply);
|
|
|
|
+
|
|
|
|
+ qProtoWarning() << method << "call" << service << "subscription finished: " << errorString;
|
|
|
|
+ switch(networkError) {
|
|
|
|
+ case QNetworkReply::RemoteHostClosedError:
|
|
|
|
+ subscribe(method, service, args, client, handler);
|
|
|
|
+ break;
|
|
|
|
+ case QNetworkReply::NoError:
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ client->error(QGrpcStatus{StatusCodeMap.at(networkError), QString("%1 call %2 subscription failed: %3").arg(service).arg(method).arg(errorString)});
|
|
|
|
+ subscribe(method, service, args, client, handler);
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|