|
|
@ -78,10 +78,24 @@ class JobQueue : JobDelegate {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
hasResumedPendingJobs = true
|
|
|
|
hasResumedPendingJobs = true
|
|
|
|
val allJobTypes = listOf(AttachmentDownloadJob.KEY, AttachmentDownloadJob.KEY, MessageReceiveJob.KEY, MessageSendJob.KEY, NotifyPNServerJob.KEY)
|
|
|
|
val allJobTypes = listOf(AttachmentUploadJob.KEY,
|
|
|
|
|
|
|
|
AttachmentDownloadJob.KEY,
|
|
|
|
|
|
|
|
MessageReceiveJob.KEY,
|
|
|
|
|
|
|
|
MessageSendJob.KEY,
|
|
|
|
|
|
|
|
NotifyPNServerJob.KEY
|
|
|
|
|
|
|
|
)
|
|
|
|
allJobTypes.forEach { type ->
|
|
|
|
allJobTypes.forEach { type ->
|
|
|
|
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
|
|
|
|
val allPendingJobs = MessagingModuleConfiguration.shared.storage.getAllPendingJobs(type)
|
|
|
|
allPendingJobs.sortedBy { it.id }.forEach { job ->
|
|
|
|
val pendingJobs = mutableListOf<Job>()
|
|
|
|
|
|
|
|
for ((id, job) in allPendingJobs) {
|
|
|
|
|
|
|
|
if (job == null) {
|
|
|
|
|
|
|
|
// job failed to serialize, remove it from the DB
|
|
|
|
|
|
|
|
handleJobFailedPermanently(id)
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
pendingJobs.add(job)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
pendingJobs.sortedBy { it.id }.forEach { job ->
|
|
|
|
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
|
|
|
|
Log.i("Jobs", "Resuming pending job of type: ${job::class.simpleName}.")
|
|
|
|
queue.offer(job) // Offer always called on unlimited capacity
|
|
|
|
queue.offer(job) // Offer always called on unlimited capacity
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -89,17 +103,18 @@ class JobQueue : JobDelegate {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
override fun handleJobSucceeded(job: Job) {
|
|
|
|
override fun handleJobSucceeded(job: Job) {
|
|
|
|
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(job)
|
|
|
|
val jobId = job.id ?: return
|
|
|
|
|
|
|
|
MessagingModuleConfiguration.shared.storage.markJobAsSucceeded(jobId)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
override fun handleJobFailed(job: Job, error: Exception) {
|
|
|
|
override fun handleJobFailed(job: Job, error: Exception) {
|
|
|
|
job.failureCount += 1
|
|
|
|
job.failureCount += 1
|
|
|
|
val storage = MessagingModuleConfiguration.shared.storage
|
|
|
|
val storage = MessagingModuleConfiguration.shared.storage
|
|
|
|
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
|
|
|
|
if (storage.isJobCanceled(job)) { return Log.i("Jobs", "${job::class.simpleName} canceled.")}
|
|
|
|
storage.persistJob(job)
|
|
|
|
|
|
|
|
if (job.failureCount == job.maxFailureCount) {
|
|
|
|
if (job.failureCount == job.maxFailureCount) {
|
|
|
|
storage.markJobAsFailed(job)
|
|
|
|
handleJobFailedPermanently(job, error)
|
|
|
|
} else {
|
|
|
|
} else {
|
|
|
|
|
|
|
|
storage.persistJob(job)
|
|
|
|
val retryInterval = getRetryInterval(job)
|
|
|
|
val retryInterval = getRetryInterval(job)
|
|
|
|
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
|
|
|
|
Log.i("Jobs", "${job::class.simpleName} failed; scheduling retry (failure count is ${job.failureCount}).")
|
|
|
|
timer.schedule(delay = retryInterval) {
|
|
|
|
timer.schedule(delay = retryInterval) {
|
|
|
@ -110,10 +125,13 @@ class JobQueue : JobDelegate {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
override fun handleJobFailedPermanently(job: Job, error: Exception) {
|
|
|
|
override fun handleJobFailedPermanently(job: Job, error: Exception) {
|
|
|
|
job.failureCount += 1
|
|
|
|
val jobId = job.id ?: return
|
|
|
|
|
|
|
|
handleJobFailedPermanently(jobId)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private fun handleJobFailedPermanently(jobId: String) {
|
|
|
|
val storage = MessagingModuleConfiguration.shared.storage
|
|
|
|
val storage = MessagingModuleConfiguration.shared.storage
|
|
|
|
storage.persistJob(job)
|
|
|
|
storage.markJobAsFailed(jobId)
|
|
|
|
storage.markJobAsFailed(job)
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private fun getRetryInterval(job: Job): Long {
|
|
|
|
private fun getRetryInterval(job: Job): Long {
|
|
|
|