mirror of
https://github.com/telegramdesktop/tdesktop
synced 2025-09-04 16:35:44 +00:00
Use explicit fields for sent container ids wrap.
This commit is contained in:
@@ -85,11 +85,15 @@ using namespace details;
|
||||
: TemporaryKeyType::Regular;
|
||||
}
|
||||
|
||||
void wrapInvokeAfter(SecureRequest &to, const SecureRequest &from, const RequestMap &haveSent, int32 skipBeforeRequest = 0) {
|
||||
void WrapInvokeAfter(
|
||||
SerializedRequest &to,
|
||||
const SerializedRequest &from,
|
||||
const base::flat_map<mtpMsgId, SerializedRequest> &haveSent,
|
||||
int32 skipBeforeRequest = 0) {
|
||||
const auto afterId = *(mtpMsgId*)(from->after->data() + 4);
|
||||
const auto i = afterId ? haveSent.constFind(afterId) : haveSent.cend();
|
||||
const auto i = afterId ? haveSent.find(afterId) : haveSent.end();
|
||||
int32 size = to->size(), lenInInts = (tl::count_length(from) >> 2), headlen = 4, fulllen = headlen + lenInInts;
|
||||
if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently
|
||||
if (i == haveSent.end()) { // no invoke after or such msg was not sent or was completed recently
|
||||
to->resize(size + fulllen + skipBeforeRequest);
|
||||
if (skipBeforeRequest) {
|
||||
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
|
||||
@@ -229,41 +233,41 @@ int16 ConnectionPrivate::getProtocolDcId() const {
|
||||
}
|
||||
|
||||
void ConnectionPrivate::checkSentRequests() {
|
||||
QVector<mtpMsgId> removingIds; // remove very old (10 minutes) containers and resend requests
|
||||
// Remove very old (10 minutes) containers and resend requests.
|
||||
auto removingIds = std::vector<mtpMsgId>();
|
||||
auto requesting = false;
|
||||
{
|
||||
QReadLocker locker(_sessionData->haveSentMutex());
|
||||
auto &haveSent = _sessionData->haveSentMap();
|
||||
const auto haveSentCount = haveSent.size();
|
||||
auto ms = crl::now();
|
||||
for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) {
|
||||
auto &req = i.value();
|
||||
if (req->msDate > 0) {
|
||||
if (req->msDate + kCheckSentRequestTimeout < ms) {
|
||||
// Need to check state.
|
||||
req->msDate = ms;
|
||||
if (_stateRequestData.emplace(i.key()).second) {
|
||||
requesting = true;
|
||||
}
|
||||
auto now = crl::now();
|
||||
for (const auto &[msgId, request] : haveSent) {
|
||||
if (request.isStateRequest()) {
|
||||
continue;
|
||||
} else if (request.isSentContainer()) {
|
||||
if (base::unixtime::now()
|
||||
> int32(msgId >> 32) + kContainerLives) {
|
||||
removingIds.push_back(msgId);
|
||||
}
|
||||
} else if (request->lastSentTime + kCheckSentRequestTimeout
|
||||
< now) {
|
||||
// Need to check state.
|
||||
request->lastSentTime = now;
|
||||
if (_stateRequestData.emplace(msgId).second) {
|
||||
requesting = true;
|
||||
}
|
||||
} else if (base::unixtime::now()
|
||||
> int32(i.key() >> 32) + kContainerLives) {
|
||||
removingIds.reserve(haveSentCount);
|
||||
removingIds.push_back(i.key());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (requesting) {
|
||||
_sessionData->queueSendAnything(kSendStateRequestWaiting);
|
||||
}
|
||||
if (!removingIds.isEmpty()) {
|
||||
if (!removingIds.empty()) {
|
||||
QWriteLocker locker(_sessionData->haveSentMutex());
|
||||
auto &haveSent = _sessionData->haveSentMap();
|
||||
for (uint32 i = 0, l = removingIds.size(); i < l; ++i) {
|
||||
auto j = haveSent.find(removingIds[i]);
|
||||
if (j != haveSent.cend()) {
|
||||
Assert(!j.value()->requestId);
|
||||
haveSent.erase(j);
|
||||
for (const auto msgId : removingIds) {
|
||||
if (const auto removed = haveSent.take(msgId)) {
|
||||
Assert(!(*removed)->requestId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -436,7 +440,7 @@ bool ConnectionPrivate::markSessionAsStarted() {
|
||||
}
|
||||
|
||||
mtpMsgId ConnectionPrivate::prepareToSend(
|
||||
SecureRequest &request,
|
||||
SerializedRequest &request,
|
||||
mtpMsgId currentLastId,
|
||||
bool forceNewMsgId) {
|
||||
Expects(request->size() > 8);
|
||||
@@ -460,7 +464,7 @@ mtpMsgId ConnectionPrivate::prepareToSend(
|
||||
return currentLastId;
|
||||
}
|
||||
|
||||
mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) {
|
||||
mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
|
||||
Expects(request->size() > 8);
|
||||
|
||||
const auto oldMsgId = request.getMsgId();
|
||||
@@ -472,11 +476,14 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId)
|
||||
|
||||
while (_resendingIds.contains(newId)
|
||||
|| _ackedIds.contains(newId)
|
||||
|| haveSent.constFind(newId) != haveSent.cend()) {
|
||||
|| haveSent.contains(newId)) {
|
||||
newId = base::unixtime::mtproto_msg_id();
|
||||
}
|
||||
|
||||
MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId));
|
||||
MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3"
|
||||
).arg(request->requestId
|
||||
).arg(oldMsgId
|
||||
).arg(newId));
|
||||
|
||||
const auto i = _resendingIds.find(oldMsgId);
|
||||
if (i != _resendingIds.end()) {
|
||||
@@ -486,24 +493,23 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId)
|
||||
}
|
||||
|
||||
const auto j = _ackedIds.find(oldMsgId);
|
||||
if (j != _ackedIds.cend()) {
|
||||
if (j != _ackedIds.end()) {
|
||||
const auto requestId = j->second;
|
||||
_ackedIds.erase(j);
|
||||
_ackedIds.emplace(newId, requestId);
|
||||
}
|
||||
|
||||
const auto k = haveSent.find(oldMsgId);
|
||||
if (k != haveSent.cend()) {
|
||||
const auto req = k.value();
|
||||
if (k != haveSent.end()) {
|
||||
const auto request = k->second;
|
||||
haveSent.erase(k);
|
||||
haveSent.insert(newId, req);
|
||||
haveSent.emplace(newId, request);
|
||||
}
|
||||
|
||||
for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) {
|
||||
const auto req = l.value();
|
||||
if (req.isSentContainer()) {
|
||||
const auto ids = (mtpMsgId *)(req->data() + 8);
|
||||
for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) {
|
||||
for (const auto &[requestId, sent] : haveSent) {
|
||||
if (sent.isSentContainer()) {
|
||||
const auto ids = (mtpMsgId *)(sent->data() + 8);
|
||||
for (uint32 i = 0, l = (sent->size() - 8) >> 1; i < l; ++i) {
|
||||
if (ids[i] == oldMsgId) {
|
||||
ids[i] = newId;
|
||||
}
|
||||
@@ -517,11 +523,11 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId)
|
||||
}
|
||||
|
||||
mtpMsgId ConnectionPrivate::placeToContainer(
|
||||
SecureRequest &toSendRequest,
|
||||
SerializedRequest &toSendRequest,
|
||||
mtpMsgId &bigMsgId,
|
||||
bool forceNewMsgId,
|
||||
mtpMsgId *&haveSentArr,
|
||||
SecureRequest &req) {
|
||||
SerializedRequest &req) {
|
||||
const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId);
|
||||
if (msgId >= bigMsgId) {
|
||||
bigMsgId = base::unixtime::mtproto_msg_id();
|
||||
@@ -560,28 +566,28 @@ void ConnectionPrivate::tryToSend() {
|
||||
_keyCreator->restartBinder();
|
||||
}
|
||||
|
||||
auto pingRequest = SecureRequest();
|
||||
auto ackRequest = SecureRequest();
|
||||
auto resendRequest = SecureRequest();
|
||||
auto stateRequest = SecureRequest();
|
||||
auto httpWaitRequest = SecureRequest();
|
||||
auto bindDcKeyRequest = SecureRequest();
|
||||
auto pingRequest = SerializedRequest();
|
||||
auto ackRequest = SerializedRequest();
|
||||
auto resendRequest = SerializedRequest();
|
||||
auto stateRequest = SerializedRequest();
|
||||
auto httpWaitRequest = SerializedRequest();
|
||||
auto bindDcKeyRequest = SerializedRequest();
|
||||
if (_pingIdToSend) {
|
||||
if (sendOnlyFirstPing || !isMainSession) {
|
||||
DEBUG_LOG(("MTP Info: sending ping, ping_id: %1"
|
||||
).arg(_pingIdToSend));
|
||||
pingRequest = SecureRequest::Serialize(MTPPing(
|
||||
pingRequest = SerializedRequest::Serialize(MTPPing(
|
||||
MTP_long(_pingIdToSend)
|
||||
));
|
||||
} else {
|
||||
DEBUG_LOG(("MTP Info: sending ping_delay_disconnect, "
|
||||
"ping_id: %1").arg(_pingIdToSend));
|
||||
pingRequest = SecureRequest::Serialize(MTPPing_delay_disconnect(
|
||||
pingRequest = SerializedRequest::Serialize(MTPPing_delay_disconnect(
|
||||
MTP_long(_pingIdToSend),
|
||||
MTP_int(kPingDelayDisconnect)));
|
||||
_pingSender.callOnce(kPingSendAfterForce);
|
||||
}
|
||||
_pingSendAt = pingRequest->msDate + kPingSendAfter;
|
||||
_pingSendAt = pingRequest->lastSentTime + kPingSendAfter;
|
||||
_pingId = base::take(_pingIdToSend);
|
||||
} else if (!sendAll) {
|
||||
DEBUG_LOG(("MTP Info: dc %1 sending only service or bind."
|
||||
@@ -594,12 +600,12 @@ void ConnectionPrivate::tryToSend() {
|
||||
|
||||
if (!sendOnlyFirstPing) {
|
||||
if (!_ackRequestData.isEmpty()) {
|
||||
ackRequest = SecureRequest::Serialize(MTPMsgsAck(
|
||||
ackRequest = SerializedRequest::Serialize(MTPMsgsAck(
|
||||
MTP_msgs_ack(MTP_vector<MTPlong>(
|
||||
base::take(_ackRequestData)))));
|
||||
}
|
||||
if (!_resendRequestData.isEmpty()) {
|
||||
resendRequest = SecureRequest::Serialize(MTPMsgResendReq(
|
||||
resendRequest = SerializedRequest::Serialize(MTPMsgResendReq(
|
||||
MTP_msg_resend_req(MTP_vector<MTPlong>(
|
||||
base::take(_resendRequestData)))));
|
||||
}
|
||||
@@ -609,13 +615,13 @@ void ConnectionPrivate::tryToSend() {
|
||||
for (const auto id : base::take(_stateRequestData)) {
|
||||
ids.push_back(MTP_long(id));
|
||||
}
|
||||
stateRequest = SecureRequest::Serialize(MTPMsgsStateReq(
|
||||
stateRequest = SerializedRequest::Serialize(MTPMsgsStateReq(
|
||||
MTP_msgs_state_req(MTP_vector<MTPlong>(ids))));
|
||||
// Add to haveSent / _ackedIds, but don't add to requestMap.
|
||||
stateRequest->requestId = GetNextRequestId();
|
||||
}
|
||||
if (_connection->usingHttpWait()) {
|
||||
httpWaitRequest = SecureRequest::Serialize(MTPHttpWait(
|
||||
httpWaitRequest = SerializedRequest::Serialize(MTPHttpWait(
|
||||
MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000))));
|
||||
}
|
||||
if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) {
|
||||
@@ -631,7 +637,7 @@ void ConnectionPrivate::tryToSend() {
|
||||
}
|
||||
}
|
||||
|
||||
MTPInitConnection<SecureRequest> initWrapper;
|
||||
MTPInitConnection<SerializedRequest> initWrapper;
|
||||
int32 initSize = 0, initSizeInInts = 0;
|
||||
if (needsLayer) {
|
||||
Assert(_connectionOptions != nullptr);
|
||||
@@ -660,8 +666,8 @@ void ConnectionPrivate::tryToSend() {
|
||||
MTP_string(_connectionOptions->proxy.host),
|
||||
MTP_int(_connectionOptions->proxy.port))
|
||||
: MTPInputClientProxy();
|
||||
using Flag = MTPInitConnection<SecureRequest>::Flag;
|
||||
initWrapper = MTPInitConnection<SecureRequest>(
|
||||
using Flag = MTPInitConnection<SerializedRequest>::Flag;
|
||||
initWrapper = MTPInitConnection<SerializedRequest>(
|
||||
MTP_flags(mtprotoProxy ? Flag::f_proxy : Flag(0)),
|
||||
MTP_int(ApiId),
|
||||
MTP_string(deviceModel),
|
||||
@@ -671,25 +677,22 @@ void ConnectionPrivate::tryToSend() {
|
||||
MTP_string(langPackName),
|
||||
MTP_string(cloudLangCode),
|
||||
clientProxyFields,
|
||||
SecureRequest());
|
||||
SerializedRequest());
|
||||
initSizeInInts = (tl::count_length(initWrapper) >> 2) + 2;
|
||||
initSize = initSizeInInts * sizeof(mtpPrime);
|
||||
}
|
||||
|
||||
bool needAnyResponse = false;
|
||||
SecureRequest toSendRequest;
|
||||
SerializedRequest toSendRequest;
|
||||
{
|
||||
QWriteLocker locker1(_sessionData->toSendMutex());
|
||||
|
||||
auto toSendDummy = PreRequestMap();
|
||||
auto toSendDummy = base::flat_map<mtpRequestId, SerializedRequest>();
|
||||
auto &toSend = sendAll
|
||||
? _sessionData->toSendMap()
|
||||
: toSendDummy;
|
||||
if (!sendAll) {
|
||||
locker1.unlock();
|
||||
} else {
|
||||
int time = crl::now();
|
||||
int now = crl::now();
|
||||
}
|
||||
|
||||
uint32 toSendCount = toSend.size();
|
||||
@@ -716,8 +719,8 @@ void ConnectionPrivate::tryToSend() {
|
||||
? httpWaitRequest
|
||||
: bindDcKeyRequest
|
||||
? bindDcKeyRequest
|
||||
: toSend.cbegin().value();
|
||||
if (toSendCount == 1 && first->msDate > 0) { // if can send without container
|
||||
: toSend.begin()->second;
|
||||
if (toSendCount == 1 && !first->forceSendInContainer) {
|
||||
toSendRequest = first;
|
||||
if (sendAll) {
|
||||
toSend.clear();
|
||||
@@ -740,27 +743,27 @@ void ConnectionPrivate::tryToSend() {
|
||||
|
||||
if (toSendRequest->requestId) {
|
||||
if (toSendRequest.needAck()) {
|
||||
toSendRequest->msDate = toSendRequest.isStateRequest() ? 0 : crl::now();
|
||||
toSendRequest->lastSentTime = crl::now();
|
||||
|
||||
QWriteLocker locker2(_sessionData->haveSentMutex());
|
||||
auto &haveSent = _sessionData->haveSentMap();
|
||||
haveSent.insert(msgId, toSendRequest);
|
||||
haveSent.emplace(msgId, toSendRequest);
|
||||
|
||||
const auto wrapLayer = needsLayer && toSendRequest->needsLayer;
|
||||
if (toSendRequest->after) {
|
||||
const auto toSendSize = tl::count_length(toSendRequest) >> 2;
|
||||
auto wrappedRequest = SecureRequest::Prepare(
|
||||
auto wrappedRequest = SerializedRequest::Prepare(
|
||||
toSendSize,
|
||||
toSendSize + 3);
|
||||
wrappedRequest->resize(4);
|
||||
memcpy(wrappedRequest->data(), toSendRequest->constData(), 4 * sizeof(mtpPrime));
|
||||
wrapInvokeAfter(wrappedRequest, toSendRequest, haveSent);
|
||||
WrapInvokeAfter(wrappedRequest, toSendRequest, haveSent);
|
||||
toSendRequest = std::move(wrappedRequest);
|
||||
}
|
||||
if (wrapLayer) {
|
||||
const auto noWrapSize = (tl::count_length(toSendRequest) >> 2);
|
||||
const auto toSendSize = noWrapSize + initSizeInInts;
|
||||
auto wrappedRequest = SecureRequest::Prepare(toSendSize);
|
||||
auto wrappedRequest = SerializedRequest::Prepare(toSendSize);
|
||||
memcpy(wrappedRequest->data(), toSendRequest->constData(), 7 * sizeof(mtpPrime)); // all except length
|
||||
wrappedRequest->push_back(mtpc_invokeWithLayer);
|
||||
wrappedRequest->push_back(internal::CurrentLayer);
|
||||
@@ -784,9 +787,9 @@ void ConnectionPrivate::tryToSend() {
|
||||
if (stateRequest) containerSize += stateRequest.messageSize();
|
||||
if (httpWaitRequest) containerSize += httpWaitRequest.messageSize();
|
||||
if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize();
|
||||
for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) {
|
||||
containerSize += i.value().messageSize();
|
||||
if (needsLayer && i.value()->needsLayer) {
|
||||
for (const auto &[requestId, request] : toSend) {
|
||||
containerSize += request.messageSize();
|
||||
if (needsLayer && request->needsLayer) {
|
||||
containerSize += initSizeInInts;
|
||||
willNeedInit = true;
|
||||
}
|
||||
@@ -799,7 +802,7 @@ void ConnectionPrivate::tryToSend() {
|
||||
initWrapper.write<mtpBuffer>(initSerialized);
|
||||
}
|
||||
// prepare container + each in invoke after
|
||||
toSendRequest = SecureRequest::Prepare(
|
||||
toSendRequest = SerializedRequest::Prepare(
|
||||
containerSize,
|
||||
containerSize + 3 * toSend.size());
|
||||
toSendRequest->push_back(mtpc_msg_container);
|
||||
@@ -813,9 +816,8 @@ void ConnectionPrivate::tryToSend() {
|
||||
auto &haveSent = _sessionData->haveSentMap();
|
||||
|
||||
// prepare "request-like" wrap for msgId vector
|
||||
auto haveSentIdsWrap = SecureRequest::Prepare(idsWrapSize);
|
||||
haveSentIdsWrap->msDate = 0; // Container: msDate = 0, seqNo = 0.
|
||||
haveSentIdsWrap->requestId = 0;
|
||||
auto haveSentIdsWrap = SerializedRequest::Prepare(idsWrapSize);
|
||||
haveSentIdsWrap->isContainerIdsWrap = true;
|
||||
haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize);
|
||||
auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8);
|
||||
|
||||
@@ -840,10 +842,9 @@ void ConnectionPrivate::tryToSend() {
|
||||
if (resendRequest || stateRequest) {
|
||||
needAnyResponse = true;
|
||||
}
|
||||
for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) {
|
||||
auto &req = i.value();
|
||||
for (auto &[requestId, request] : toSend) {
|
||||
const auto msgId = prepareToSend(
|
||||
req,
|
||||
request,
|
||||
bigMsgId,
|
||||
forceNewMsgId);
|
||||
if (msgId >= bigMsgId) {
|
||||
@@ -851,44 +852,43 @@ void ConnectionPrivate::tryToSend() {
|
||||
}
|
||||
*(haveSentArr++) = msgId;
|
||||
bool added = false;
|
||||
if (req->requestId) {
|
||||
if (req.needAck()) {
|
||||
req->msDate = req.isStateRequest() ? 0 : crl::now();
|
||||
int32 reqNeedsLayer = (needsLayer && req->needsLayer) ? toSendRequest->size() : 0;
|
||||
if (req->after) {
|
||||
wrapInvokeAfter(toSendRequest, req, haveSent, reqNeedsLayer ? initSizeInInts : 0);
|
||||
if (request->requestId) {
|
||||
if (request.needAck()) {
|
||||
request->lastSentTime = crl::now();
|
||||
int32 reqNeedsLayer = (needsLayer && request->needsLayer) ? toSendRequest->size() : 0;
|
||||
if (request->after) {
|
||||
WrapInvokeAfter(toSendRequest, request, haveSent, reqNeedsLayer ? initSizeInInts : 0);
|
||||
if (reqNeedsLayer) {
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
|
||||
*(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
|
||||
}
|
||||
added = true;
|
||||
} else if (reqNeedsLayer) {
|
||||
toSendRequest->resize(reqNeedsLayer + initSizeInInts + req.messageSize());
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer, req->constData() + 4, 4 * sizeof(mtpPrime));
|
||||
toSendRequest->resize(reqNeedsLayer + initSizeInInts + request.messageSize());
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer, request->constData() + 4, 4 * sizeof(mtpPrime));
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, req->constData() + 8, tl::count_length(req));
|
||||
memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, request->constData() + 8, tl::count_length(request));
|
||||
*(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
|
||||
added = true;
|
||||
}
|
||||
Assert(!haveSent.contains(msgId));
|
||||
haveSent.insert(msgId, req);
|
||||
haveSent.emplace(msgId, request);
|
||||
|
||||
needAnyResponse = true;
|
||||
} else {
|
||||
_ackedIds.emplace(msgId, req->requestId);
|
||||
_ackedIds.emplace(msgId, request->requestId);
|
||||
}
|
||||
}
|
||||
if (!added) {
|
||||
uint32 from = toSendRequest->size(), len = req.messageSize();
|
||||
uint32 from = toSendRequest->size(), len = request.messageSize();
|
||||
toSendRequest->resize(from + len);
|
||||
memcpy(toSendRequest->data() + from, req->constData() + 4, len * sizeof(mtpPrime));
|
||||
memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime));
|
||||
}
|
||||
}
|
||||
if (stateRequest) {
|
||||
mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest);
|
||||
stateRequest->msDate = 0; // 0 for state request, do not request state of it
|
||||
const auto msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest);
|
||||
Assert(!haveSent.contains(msgId));
|
||||
haveSent.insert(msgId, stateRequest);
|
||||
haveSent.emplace(msgId, stateRequest);
|
||||
}
|
||||
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest);
|
||||
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest);
|
||||
@@ -899,9 +899,8 @@ void ConnectionPrivate::tryToSend() {
|
||||
bigMsgId,
|
||||
forceNewMsgId);
|
||||
*(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId;
|
||||
(*haveSentIdsWrap)[6] = 0; // for container, msDate = 0, seqNo = 0
|
||||
Assert(!haveSent.contains(containerMsgId));
|
||||
haveSent.insert(containerMsgId, haveSentIdsWrap);
|
||||
haveSent.emplace(containerMsgId, haveSentIdsWrap);
|
||||
toSend.clear();
|
||||
}
|
||||
}
|
||||
@@ -1370,7 +1369,8 @@ void ConnectionPrivate::handleReceived() {
|
||||
}
|
||||
|
||||
auto lock = QReadLocker(_sessionData->haveReceivedMutex());
|
||||
const auto tryToReceive = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty();
|
||||
const auto tryToReceive = !_sessionData->haveReceivedResponses().empty()
|
||||
|| !_sessionData->haveReceivedUpdates().empty();
|
||||
lock.unlock();
|
||||
|
||||
if (tryToReceive) {
|
||||
@@ -1521,16 +1521,16 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
|| (errorCode == 64); // bad container
|
||||
if (errorCode == 64) { // bad container!
|
||||
if (Logs::DebugEnabled()) {
|
||||
SecureRequest request;
|
||||
SerializedRequest request;
|
||||
{
|
||||
QWriteLocker locker(_sessionData->haveSentMutex());
|
||||
auto &haveSent = _sessionData->haveSentMap();
|
||||
|
||||
const auto i = haveSent.constFind(resendId);
|
||||
if (i == haveSent.cend()) {
|
||||
const auto i = haveSent.find(resendId);
|
||||
if (i == haveSent.end()) {
|
||||
LOG(("Message Error: Container not found!"));
|
||||
} else {
|
||||
request = i.value();
|
||||
request = i->second;
|
||||
}
|
||||
}
|
||||
if (request) {
|
||||
@@ -1626,53 +1626,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
resend(resendId);
|
||||
} return HandleResult::Success;
|
||||
|
||||
case mtpc_msgs_state_req: {
|
||||
if (badTime) {
|
||||
DEBUG_LOG(("Message Info: skipping with bad time..."));
|
||||
return HandleResult::Ignored;
|
||||
}
|
||||
MTPMsgsStateReq msg;
|
||||
if (!msg.read(from, end)) {
|
||||
return HandleResult::ParseError;
|
||||
}
|
||||
auto &ids = msg.c_msgs_state_req().vmsg_ids().v;
|
||||
auto idsCount = ids.size();
|
||||
DEBUG_LOG(("Message Info: msgs_state_req received, ids: %1").arg(LogIdsVector(ids)));
|
||||
if (!idsCount) return HandleResult::Success;
|
||||
|
||||
QByteArray info(idsCount, Qt::Uninitialized);
|
||||
{
|
||||
const auto minRecv = _receivedMessageIds.min();
|
||||
const auto maxRecv = _receivedMessageIds.max();
|
||||
for (uint32 i = 0, l = idsCount; i < l; ++i) {
|
||||
char state = 0;
|
||||
uint64 reqMsgId = ids[i].v;
|
||||
if (reqMsgId < minRecv) {
|
||||
state |= 0x01;
|
||||
} else if (reqMsgId > maxRecv) {
|
||||
state |= 0x03;
|
||||
} else {
|
||||
auto msgIdState = _receivedMessageIds.lookup(reqMsgId);
|
||||
if (msgIdState == ReceivedIdsManager::State::NotFound) {
|
||||
state |= 0x02;
|
||||
} else {
|
||||
state |= 0x04;
|
||||
if (_ackedIds.contains(reqMsgId)) {
|
||||
state |= 0x80; // we know, that server knows, that we received request
|
||||
}
|
||||
if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack
|
||||
state |= 0x08;
|
||||
} else {
|
||||
state |= 0x10;
|
||||
}
|
||||
}
|
||||
}
|
||||
info[i] = state;
|
||||
}
|
||||
}
|
||||
_sessionData->queueSendMsgsStateInfo(msgId, info);
|
||||
} return HandleResult::Success;
|
||||
|
||||
case mtpc_msgs_state_info: {
|
||||
MTPMsgsStateInfo msg;
|
||||
if (!msg.read(from, end)) {
|
||||
@@ -1684,12 +1637,12 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
auto &states = data.vinfo().v;
|
||||
|
||||
DEBUG_LOG(("Message Info: msg state received, msgId %1, reqMsgId: %2, HEX states %3").arg(msgId).arg(reqMsgId).arg(Logs::mb(states.data(), states.length()).str()));
|
||||
SecureRequest requestBuffer;
|
||||
SerializedRequest requestBuffer;
|
||||
{ // find this request in session-shared sent requests map
|
||||
QReadLocker locker(_sessionData->haveSentMutex());
|
||||
const auto &haveSent = _sessionData->haveSentMap();
|
||||
const auto replyTo = haveSent.constFind(reqMsgId);
|
||||
if (replyTo == haveSent.cend()) { // do not look in toResend, because we do not resend msgs_state_req requests
|
||||
const auto replyTo = haveSent.find(reqMsgId);
|
||||
if (replyTo == haveSent.end()) { // do not look in toResend, because we do not resend msgs_state_req requests
|
||||
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(reqMsgId));
|
||||
return (badTime ? HandleResult::Ignored : HandleResult::Success);
|
||||
}
|
||||
@@ -1703,7 +1656,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
|
||||
badTime = false;
|
||||
}
|
||||
requestBuffer = replyTo.value();
|
||||
requestBuffer = replyTo->second;
|
||||
}
|
||||
QVector<MTPlong> toAckReq(1, MTP_long(reqMsgId)), toAck;
|
||||
requestsAcked(toAck, true);
|
||||
@@ -1809,7 +1762,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
if (from + 3 > end) {
|
||||
return HandleResult::ParseError;
|
||||
}
|
||||
auto response = SerializedMessage();
|
||||
auto response = mtpBuffer();
|
||||
|
||||
MTPlong reqMsgId;
|
||||
if (!reqMsgId.read(++from, end)) {
|
||||
@@ -1861,7 +1814,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) {
|
||||
// Save rpc_result for processing in the main thread.
|
||||
QWriteLocker locker(_sessionData->haveReceivedMutex());
|
||||
_sessionData->haveReceivedResponses().insert(requestId, response);
|
||||
_sessionData->haveReceivedResponses().emplace(requestId, response);
|
||||
} else {
|
||||
DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId));
|
||||
}
|
||||
@@ -1893,11 +1846,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
QReadLocker locker(_sessionData->haveSentMutex());
|
||||
const auto &haveSent = _sessionData->haveSentMap();
|
||||
toResend.reserve(haveSent.size());
|
||||
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
|
||||
if (i.key() >= firstMsgId) {
|
||||
for (const auto &[msgId, request] : haveSent) {
|
||||
if (msgId >= firstMsgId) {
|
||||
break;
|
||||
} else if (i.value()->requestId) {
|
||||
toResend.push_back(i.key());
|
||||
} else if (request->requestId) {
|
||||
toResend.push_back(msgId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1910,7 +1863,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
|
||||
// Notify main process about new session - need to get difference.
|
||||
QWriteLocker locker(_sessionData->haveReceivedMutex());
|
||||
_sessionData->haveReceivedUpdates().push_back(SerializedMessage(update));
|
||||
_sessionData->haveReceivedUpdates().push_back(mtpBuffer(update));
|
||||
} return HandleResult::Success;
|
||||
|
||||
case mtpc_pong: {
|
||||
@@ -1957,7 +1910,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
|
||||
|
||||
// Notify main process about the new updates.
|
||||
QWriteLocker locker(_sessionData->haveReceivedMutex());
|
||||
_sessionData->haveReceivedUpdates().push_back(SerializedMessage(update));
|
||||
_sessionData->haveReceivedUpdates().push_back(mtpBuffer(update));
|
||||
} else {
|
||||
LOG(("Message Error: unexpected updates in dcType: %1"
|
||||
).arg(static_cast<int>(_currentDcType)));
|
||||
@@ -2075,27 +2028,27 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
|
||||
for (uint32 i = 0; i < idsCount; ++i) {
|
||||
mtpMsgId msgId = ids[i].v;
|
||||
const auto req = haveSent.find(msgId);
|
||||
if (req != haveSent.cend()) {
|
||||
if (!req.value()->msDate) {
|
||||
if (req != haveSent.end()) {
|
||||
if (req->second.isSentContainer()) {
|
||||
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v));
|
||||
uint32 inContCount = ((*req)->size() - 8) / 2;
|
||||
const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8);
|
||||
uint32 inContCount = (req->second->size() - 8) / 2;
|
||||
const mtpMsgId *inContId = (const mtpMsgId *)(req->second->constData() + 8);
|
||||
toAckMore.reserve(toAckMore.size() + inContCount);
|
||||
for (uint32 j = 0; j < inContCount; ++j) {
|
||||
toAckMore.push_back(MTP_long(*(inContId++)));
|
||||
}
|
||||
haveSent.erase(req);
|
||||
} else {
|
||||
mtpRequestId reqId = req.value()->requestId;
|
||||
const auto requestId = req->second->requestId;
|
||||
bool moveToAcked = byResponse;
|
||||
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
|
||||
moveToAcked = !_instance->hasCallbacks(reqId);
|
||||
moveToAcked = !_instance->hasCallbacks(requestId);
|
||||
}
|
||||
if (moveToAcked) {
|
||||
_ackedIds.emplace(msgId, reqId);
|
||||
_ackedIds.emplace(msgId, requestId);
|
||||
haveSent.erase(req);
|
||||
} else {
|
||||
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
|
||||
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -2112,9 +2065,9 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
|
||||
auto &toSend = _sessionData->toSendMap();
|
||||
const auto req = toSend.find(reqId);
|
||||
if (req != toSend.cend()) {
|
||||
_ackedIds.emplace(msgId, req.value()->requestId);
|
||||
if (req.value()->requestId != reqId) {
|
||||
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId));
|
||||
_ackedIds.emplace(msgId, req->second->requestId);
|
||||
if (req->second->requestId != reqId) {
|
||||
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req->second->requestId));
|
||||
} else {
|
||||
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId));
|
||||
}
|
||||
@@ -2172,9 +2125,7 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt
|
||||
uint64 requestMsgId = ids[i].v;
|
||||
{
|
||||
QReadLocker locker(_sessionData->haveSentMutex());
|
||||
const auto &haveSent = _sessionData->haveSentMap();
|
||||
const auto haveSentEnd = haveSent.cend();
|
||||
if (haveSent.find(requestMsgId) == haveSentEnd) {
|
||||
if (!_sessionData->haveSentMap().contains(requestMsgId)) {
|
||||
DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId));
|
||||
const auto reqIt = _resendingIds.find(requestMsgId);
|
||||
if (reqIt != _resendingIds.cend()) {
|
||||
@@ -2226,7 +2177,7 @@ void ConnectionPrivate::resend(
|
||||
if (i == haveSent.end()) {
|
||||
return;
|
||||
}
|
||||
auto request = i.value();
|
||||
auto request = i->second;
|
||||
haveSent.erase(i);
|
||||
lock.unlock();
|
||||
|
||||
@@ -2238,11 +2189,12 @@ void ConnectionPrivate::resend(
|
||||
resend(ids[i], -1, true);
|
||||
}
|
||||
} else if (!request.isStateRequest()) {
|
||||
request->msDate = forceContainer ? 0 : crl::now();
|
||||
request->lastSentTime = crl::now();
|
||||
request->forceSendInContainer = forceContainer;
|
||||
_resendingIds.emplace(msgId, request->requestId);
|
||||
{
|
||||
QWriteLocker locker(_sessionData->toSendMutex());
|
||||
_sessionData->toSendMap().insert(request->requestId, request);
|
||||
_sessionData->toSendMap().emplace(request->requestId, request);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2253,9 +2205,9 @@ void ConnectionPrivate::resendAll() {
|
||||
auto lock = QReadLocker(_sessionData->haveSentMutex());
|
||||
const auto &haveSent = _sessionData->haveSentMap();
|
||||
toResend.reserve(haveSent.size());
|
||||
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
|
||||
if (!i.value().isSentContainer()) {
|
||||
toResend.push_back(i.key());
|
||||
for (const auto &[msgId, request] : haveSent) {
|
||||
if (!request.isSentContainer() && !request.isStateRequest()) {
|
||||
toResend.push_back(msgId);
|
||||
}
|
||||
}
|
||||
lock.unlock();
|
||||
@@ -2581,7 +2533,7 @@ void ConnectionPrivate::destroyTemporaryKey() {
|
||||
}
|
||||
|
||||
bool ConnectionPrivate::sendSecureRequest(
|
||||
SecureRequest &&request,
|
||||
SerializedRequest &&request,
|
||||
bool needAnyResponse) {
|
||||
#ifdef TDESKTOP_MTPROTO_OLD
|
||||
const auto oldPadding = true;
|
||||
@@ -2671,10 +2623,10 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
|
||||
{
|
||||
QReadLocker locker(_sessionData->haveSentMutex());
|
||||
const auto &haveSent = _sessionData->haveSentMap();
|
||||
const auto i = haveSent.constFind(msgId);
|
||||
if (i != haveSent.cend()) {
|
||||
return i.value()->requestId
|
||||
? i.value()->requestId
|
||||
const auto i = haveSent.find(msgId);
|
||||
if (i != haveSent.end()) {
|
||||
return i->second->requestId
|
||||
? i->second->requestId
|
||||
: mtpRequestId(0xFFFFFFFF);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user