MongoDB
 sql >> Base de données >  >> NoSQL >> MongoDB

Le taux de demande CosmosDb est élevé avec insertMany

Le pilote Mongo vous indique quels enregistrements ont eu des erreurs et lesquels n'ont pas été traités du tout. Si toutes les erreurs (généralement une) ont le code 16500, votre problème est la limitation et réessayez sur les erreurs et les enregistrements restants sont sûrs. Sinon, vos erreurs sont causées par autre chose et vous devriez faire une analyse et décider si vous souhaitez continuer avec les tentatives.

Le pilote Mongo ne renvoie pas l'en-tête HTTP où Cosmos DB suggère un délai avant une nouvelle tentative, mais ce n'est pas grave. De toute façon, le délai ne garantit pas le succès, car d'autres requêtes atteignant la même base de données peuvent utiliser des RU. Vous feriez mieux d'expérimenter et de déterminer vos propres règles de nouvelle tentative. Vous trouverez ci-dessous une solution récursive simple qui continue de réessayer jusqu'à ce que tout aille bien ou que la limite de tentatives soit atteinte.

    private async Task InsertManyWithRetry(IMongoCollection<BsonDocument> collection, 
        IEnumerable<BsonDocument> batch, int retries = 10, int delay = 300)
    {
        var batchArray = batch.ToArray();

        try
        {
            await collection.InsertManyAsync(batchArray);
        }
        catch (MongoBulkWriteException<BsonDocument> e)
        {
            if (retries <= 0)
                throw;

            //Check if there were any errors other than throttling.
            var realErrors = e.WriteErrors.Where(we => we.Code != 16500).ToArray();
            //Re-throw original exception for now.
            //TODO: We can make it more sophisticated by continuing with unprocessed records and collecting all errors from all retries.
            if (realErrors.Any())
                throw;

            //Take all records that had errors.
            var errors = e.WriteErrors.Select(we => batchArray[we.Index]);
            //Take all unprocessed records.
            var unprocessed = e.UnprocessedRequests
                .Where(ur => ur.ModelType == WriteModelType.InsertOne)
                .OfType<InsertOneModel<BsonDocument>>() 
                .Select(ur => ur.Document);

            var retryBatchArray = errors.Union(unprocessed).ToArray();

            _logger($"Retry {retryBatchArray.Length} records after {delay} ms");

            await Task.Delay(delay);

            await InsertManyWithRetry(collection, retryBatchArray, retries - 1, delay);
        }
    }