import org.apache.beam.sdk.util.BackOff; //导入依赖的package包/类
/**
* Writes a batch of mutations to Cloud Datastore.
*
*
If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All
* mutations in the batch will be committed again, even if the commit was partially
* successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be
* thrown.
*
* @throws DatastoreException if the commit fails or IOException or InterruptedException if
* backing off between retries fails.
*/
private void flushBatch) throws DatastoreException, IOException, InterruptedException {
LOG.debug“Writing batch of {} mutations”, mutations.size));
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff);
while true) {
// Batch upsert entities.
CommitRequest.Builder commitRequest = CommitRequest.newBuilder);
commitRequest.addAllMutationsmutations);
commitRequest.setModeCommitRequest.Mode.NON_TRANSACTIONAL);
long startTime = System.currentTimeMillis), endTime;
if throttler.throttleRequeststartTime)) {
LOG.info“Delaying request due to previous failures”);
throttledSeconds.incWriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
sleeper.sleepWriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
continue;
}
try {
datastore.commitcommitRequest.build));
endTime = System.currentTimeMillis);
writeBatcher.addRequestLatencyendTime, endTime – startTime, mutations.size));
throttler.successfulRequeststartTime);
rpcSuccesses.inc);
// Break if the commit threw no exception.
break;
} catch DatastoreException exception) {
if exception.getCode) == Code.DEADLINE_EXCEEDED) {
/* Most errors are not related to request size, and should not change our expectation of
* the latency of successful requests. DEADLINE_EXCEEDED can be taken into
* consideration, though. */
endTime = System.currentTimeMillis);
writeBatcher.addRequestLatencyendTime, endTime – startTime, mutations.size));
}
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error“Error writing batch of {} mutations to Datastore {}): {}”, mutations.size),
exception.getCode), exception.getMessage));
rpcErrors.inc);
if NON_RETRYABLE_ERRORS.containsexception.getCode))) {
throw exception;
}
if !BackOffUtils.nextsleeper, backoff)) {
LOG.error“Aborting after {} retries.”, MAX_RETRIES);
throw exception;
}
}
}
LOG.debug“Successfully wrote {} mutations”, mutations.size));
mutations.clear);
mutationsSize = 0;
}