mirror of
https://github.com/VinylDNS/vinyldns
synced 2025-08-30 05:47:56 +00:00
retry some failed record changes (#907)
This commit is contained in:
parent
31b86f9733
commit
f461a62ee5
@ -4080,7 +4080,7 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context):
|
||||
get_change_A_AAAA_json(a_delete_record_and_record_set_fqdn, address="1.1.1.1", change_type="DeleteRecordSet"),
|
||||
get_change_A_AAAA_json(a_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet"),
|
||||
get_change_TXT_json(txt_delete_record_and_record_set_fqdn, text="hello", change_type="DeleteRecordSet"),
|
||||
get_change_TXT_json(txt_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet")
|
||||
get_change_TXT_json(txt_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet"),
|
||||
]
|
||||
}
|
||||
|
||||
@ -4091,8 +4091,10 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context):
|
||||
create_rs = client.create_recordset(rs, status=202)
|
||||
to_delete.append(client.wait_until_recordset_change_status(create_rs, 'Complete'))
|
||||
|
||||
result = client.create_batch_change(batch_change_input, status=202)
|
||||
client.wait_until_batch_change_completed(result)
|
||||
initial_result = client.create_batch_change(batch_change_input, status=202)
|
||||
result = client.wait_until_batch_change_completed(initial_result)
|
||||
|
||||
assert_that(result['status'], is_('Complete'))
|
||||
|
||||
# Check batch change response
|
||||
assert_change_success_response_values(result['changes'], zone=ok_zone, index=0, input_name=a_update_record_set_fqdn, record_name=a_update_record_set_name, record_data=None, change_type="DeleteRecordSet")
|
||||
@ -4171,6 +4173,60 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context):
|
||||
finally:
|
||||
clear_recordset_list(to_delete, client)
|
||||
|
||||
@pytest.mark.skip_production
|
||||
def test_create_batch_update_record_type_succeeds(shared_zone_test_context):
|
||||
"""
|
||||
Test existing record sets can be updated to a different type in batch (relies on skip-prod)
|
||||
"""
|
||||
client = shared_zone_test_context.ok_vinyldns_client
|
||||
ok_zone = shared_zone_test_context.ok_zone
|
||||
|
||||
# record sets to setup
|
||||
a_update_to_cname_and_record_set_name = generate_record_name()
|
||||
a_update_to_cname_and_record_set_fqdn = a_update_to_cname_and_record_set_name + ".ok."
|
||||
a_update_to_cname_and_record_set = get_recordset_json(ok_zone, a_update_to_cname_and_record_set_name, "A", [{"address": "1.1.1.1"}], 200)
|
||||
|
||||
cname_update_from_a_and_record_set_fqdn = a_update_to_cname_and_record_set_name + ".ok."
|
||||
|
||||
batch_change_input = {
|
||||
"comments": "this is optional",
|
||||
"changes": [
|
||||
# Update A record to CNAME
|
||||
get_change_A_AAAA_json(a_update_to_cname_and_record_set_fqdn, change_type="DeleteRecordSet"),
|
||||
get_change_CNAME_json(cname_update_from_a_and_record_set_fqdn, cname="example.com.")
|
||||
]
|
||||
}
|
||||
|
||||
to_delete = []
|
||||
try:
|
||||
create_rs = client.create_recordset(a_update_to_cname_and_record_set, status=202)
|
||||
to_delete.append(client.wait_until_recordset_change_status(create_rs, 'Complete'))
|
||||
|
||||
initial_result = client.create_batch_change(batch_change_input, status=202)
|
||||
result = client.wait_until_batch_change_completed(initial_result)
|
||||
|
||||
assert_that(result['status'], is_('Complete'))
|
||||
|
||||
# Check batch change response
|
||||
assert_change_success_response_values(result['changes'], zone=ok_zone, index=0, input_name=a_update_to_cname_and_record_set_fqdn, record_name=a_update_to_cname_and_record_set_name, record_type="A", record_data=None, change_type="DeleteRecordSet")
|
||||
assert_change_success_response_values(result['changes'], zone=ok_zone, index=1, input_name=cname_update_from_a_and_record_set_fqdn, record_name=a_update_to_cname_and_record_set_name, record_type="CNAME", record_data="example.com.")
|
||||
|
||||
# Perform look up to verify record set data
|
||||
for rs in to_delete:
|
||||
rs_id = rs['recordSet']['id']
|
||||
zone_id = rs['zone']['id']
|
||||
|
||||
client.get_recordset(zone_id, rs_id, status=404)
|
||||
|
||||
new_cname_result_rs = result['changes'][1]
|
||||
new_cname_rs = client.get_recordset(new_cname_result_rs['zoneId'], new_cname_result_rs['recordSetId'], status=200)['recordSet']
|
||||
new_cname_rdata = new_cname_rs['records']
|
||||
assert_that(new_cname_rdata, contains({"cname": "example.com."}))
|
||||
client.delete_recordset(new_cname_rs['zoneId'], new_cname_rs['id'], status=202)
|
||||
|
||||
finally:
|
||||
clear_recordset_list(to_delete, client)
|
||||
|
||||
def test_create_batch_deletes_succeeds(shared_zone_test_context):
|
||||
"""
|
||||
Test creating batch change with DeleteRecordSet with valid record data succeeds
|
||||
|
@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
__all__ = [u'VinylDNSClient', u'MAX_RETRIES', u'RETRY_WAIT']
|
||||
|
||||
MAX_RETRIES = 40
|
||||
MAX_RETRIES = 75
|
||||
RETRY_WAIT = 0.05
|
||||
|
||||
class VinylDNSClient(object):
|
||||
|
@ -20,21 +20,19 @@ import cats.effect.{ContextShift, IO, Timer}
|
||||
import cats.implicits._
|
||||
import org.slf4j.LoggerFactory
|
||||
import vinyldns.api.domain.dns.DnsConnection
|
||||
import vinyldns.api.domain.dns.DnsProtocol.NoError
|
||||
import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused, TryAgain}
|
||||
import vinyldns.api.domain.record.RecordSetHelpers._
|
||||
import vinyldns.core.domain.batch.{BatchChangeRepository, SingleChange}
|
||||
import vinyldns.core.domain.record._
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object RecordSetChangeHandler {
|
||||
|
||||
private val logger = LoggerFactory.getLogger("vinyldns.api.engine.RecordSetChangeHandler")
|
||||
private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
|
||||
private implicit val cs: ContextShift[IO] =
|
||||
IO.contextShift(scala.concurrent.ExecutionContext.global)
|
||||
|
||||
final case class Requeue(change: RecordSetChange) extends Throwable
|
||||
|
||||
def apply(
|
||||
recordSetRepository: RecordSetRepository,
|
||||
recordChangeRepository: RecordChangeRepository,
|
||||
@ -96,6 +94,8 @@ object RecordSetChangeHandler {
|
||||
|
||||
private final case class Completed(change: RecordSetChange) extends ProcessorState
|
||||
|
||||
private final case class Retrying(change: RecordSetChange) extends ProcessorState
|
||||
|
||||
private case class ProcessingError(message: String)
|
||||
|
||||
sealed trait ProcessingStatus
|
||||
@ -111,6 +111,9 @@ object RecordSetChangeHandler {
|
||||
// at which point the response will be returned.
|
||||
final case class ReadyToApply(change: RecordSetChange) extends ProcessingStatus
|
||||
|
||||
// Failure to process change. Permitted to retry.
|
||||
final case class Retry(change: RecordSetChange) extends ProcessingStatus
|
||||
|
||||
def getProcessingStatus(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessingStatus] = {
|
||||
def isDnsMatch(dnsResult: List[RecordSet], recordSet: RecordSet, zoneName: String): Boolean =
|
||||
dnsResult.exists(matches(_, recordSet, zoneName))
|
||||
@ -145,6 +148,7 @@ object RecordSetChangeHandler {
|
||||
if (existingRecords.nonEmpty) ReadyToApply(change) // we have a record set, move forward
|
||||
else AlreadyApplied(change) // we did not find the record set, so already applied
|
||||
}
|
||||
case Left(_: TryAgain) => Retry(change)
|
||||
case Left(error) => Failure(change, error.getMessage)
|
||||
}
|
||||
}
|
||||
@ -196,6 +200,10 @@ object RecordSetChangeHandler {
|
||||
case done: Completed =>
|
||||
logger.info(s"CHANGE COMPLETED; ${getChangeLog(done.change)}")
|
||||
IO.pure(done)
|
||||
|
||||
case Retrying(change) =>
|
||||
logger.info(s"CHANGE RETRYING; ${getChangeLog(change)}")
|
||||
IO.raiseError(Requeue(change))
|
||||
}
|
||||
}
|
||||
|
||||
@ -210,6 +218,7 @@ object RecordSetChangeHandler {
|
||||
s"Failed validating update to DNS for change ${change.id}:${change.recordSet.name}: " + message
|
||||
)
|
||||
)
|
||||
case Retry(_) => Retrying(change)
|
||||
}
|
||||
|
||||
/* Step 2: Apply the change to the dns backend */
|
||||
@ -217,6 +226,8 @@ object RecordSetChangeHandler {
|
||||
dnsConn.applyChange(change).value.map {
|
||||
case Right(_: NoError) =>
|
||||
Applied(change)
|
||||
case Left(_: Refused) =>
|
||||
Retrying(change)
|
||||
case Left(error) =>
|
||||
Completed(
|
||||
change.failed(
|
||||
@ -225,32 +236,17 @@ object RecordSetChangeHandler {
|
||||
)
|
||||
}
|
||||
|
||||
/* Step 3: Verify the record was created. We attempt 12 times over 6 seconds */
|
||||
private def verify(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessorState] = {
|
||||
def loop(retries: Int = 11): IO[ProcessorState] =
|
||||
getProcessingStatus(change, dnsConn).flatMap {
|
||||
case AlreadyApplied(_) => IO.pure(Completed(change.successful))
|
||||
case ReadyToApply(_) if retries <= 0 =>
|
||||
IO.pure(
|
||||
Completed(
|
||||
change.failed(s"""Failed verifying update to DNS for
|
||||
|change ${change.id}:${change.recordSet.name}: Verify out of retries.""".stripMargin)
|
||||
/* Step 3: Verify the record was created. If the ProcessorState is applied or failed we requeue the record.*/
|
||||
private def verify(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessorState] =
|
||||
getProcessingStatus(change, dnsConn).map {
|
||||
case AlreadyApplied(_) => Completed(change.successful)
|
||||
case Failure(_, message) => Completed(
|
||||
change.failed(
|
||||
s"Failed verifying update to DNS for change ${change.id}:${change.recordSet.name}: $message"
|
||||
)
|
||||
)
|
||||
case ReadyToApply(_) =>
|
||||
IO.sleep(500.milliseconds) *> loop(retries - 1)
|
||||
case Failure(_, message) =>
|
||||
IO.pure(
|
||||
Completed(
|
||||
change.failed(
|
||||
s"Failed verifying update to DNS for change ${change.id}:${change.recordSet.name}: $message"
|
||||
)
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
loop()
|
||||
}
|
||||
case _ => Retrying(change)
|
||||
}
|
||||
|
||||
private def getChangeLog(change: RecordSetChange): String = {
|
||||
val sb = new StringBuilder
|
||||
|
@ -25,8 +25,8 @@ import org.scalatest.mockito.MockitoSugar
|
||||
import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
|
||||
import org.xbill.DNS
|
||||
import vinyldns.api.domain.dns.DnsConnection
|
||||
import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused}
|
||||
import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply}
|
||||
import vinyldns.api.domain.dns.DnsProtocol.{NoError, NotAuthorized, Refused, TryAgain}
|
||||
import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply, Requeue}
|
||||
import vinyldns.api.repository.InMemoryBatchChangeRepository
|
||||
import vinyldns.api.{CatsHelpers, Interfaces}
|
||||
import vinyldns.core.domain.batch.{
|
||||
@ -206,7 +206,9 @@ class RecordSetChangeHandlerSpec
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
doReturn(Interfaces.result(Left(Refused("dns failure")))).when(mockConn).applyChange(rsChange)
|
||||
doReturn(Interfaces.result(Left(NotAuthorized("dns failure"))))
|
||||
.when(mockConn)
|
||||
.applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
@ -244,98 +246,10 @@ class RecordSetChangeHandlerSpec
|
||||
batchChangeUpdates.get.changes shouldBe scExpected
|
||||
}
|
||||
|
||||
"retry multiple times in verify if verify finds record does not exist" in {
|
||||
// All returns after first are for verify. Retry 2 times and succeed
|
||||
doReturn(Interfaces.result(List()))
|
||||
.doReturn(Interfaces.result(List()))
|
||||
.doReturn(Interfaces.result(List()))
|
||||
.doReturn(Interfaces.result(List(rs)))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
val test = underTest.apply(mockConn, rsChange)
|
||||
test.unsafeRunSync()
|
||||
|
||||
verify(mockRsRepo).apply(rsRepoCaptor.capture())
|
||||
verify(mockChangeRepo).save(changeRepoCaptor.capture())
|
||||
|
||||
val appliedCs = rsRepoCaptor.getValue
|
||||
appliedCs.status shouldBe ChangeSetStatus.Complete
|
||||
appliedCs.changes.head.status shouldBe RecordSetChangeStatus.Complete
|
||||
appliedCs.changes.head.recordSet.status shouldBe RecordSetStatus.Active
|
||||
|
||||
val savedCs = changeRepoCaptor.getValue
|
||||
savedCs.status shouldBe ChangeSetStatus.Complete
|
||||
savedCs.changes.head.status shouldBe RecordSetChangeStatus.Complete
|
||||
|
||||
// make sure the record was applied and then verified
|
||||
verify(mockConn).applyChange(rsChange)
|
||||
|
||||
// we will retry the verify 3 times based on the mock setup
|
||||
verify(mockConn, times(4)).resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
val batchChangeUpdates = await(batchRepo.getBatchChange(batchChange.id))
|
||||
val updatedSingleChanges = completeCreateAAAASingleChanges.map { ch =>
|
||||
ch.copy(
|
||||
status = SingleChangeStatus.Complete,
|
||||
recordChangeId = Some(rsChange.id),
|
||||
recordSetId = Some(rsChange.recordSet.id)
|
||||
)
|
||||
}
|
||||
val scExpected = notUpdatedChange :: updatedSingleChanges
|
||||
batchChangeUpdates.get.changes shouldBe scExpected
|
||||
}
|
||||
|
||||
"fail the change if retry expires" in {
|
||||
doReturn(Interfaces.result(List()))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
val test = underTest.apply(mockConn, rsChange)
|
||||
test.unsafeRunSync()
|
||||
|
||||
verify(mockRsRepo).apply(rsRepoCaptor.capture())
|
||||
verify(mockChangeRepo).save(changeRepoCaptor.capture())
|
||||
|
||||
val appliedCs = rsRepoCaptor.getValue
|
||||
appliedCs.status shouldBe ChangeSetStatus.Complete
|
||||
appliedCs.changes.head.status shouldBe RecordSetChangeStatus.Failed
|
||||
appliedCs.changes.head.recordSet.status shouldBe RecordSetStatus.Inactive
|
||||
|
||||
val savedCs = changeRepoCaptor.getValue
|
||||
savedCs.status shouldBe ChangeSetStatus.Complete
|
||||
savedCs.changes.head.status shouldBe RecordSetChangeStatus.Failed
|
||||
|
||||
// make sure the record was applied and then verified
|
||||
verify(mockConn).applyChange(rsChange)
|
||||
|
||||
// resolve called once when validating, 12x for retries
|
||||
verify(mockConn, times(13)).resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
val batchChangeUpdates = batchRepo.getBatchChange(batchChange.id).unsafeRunSync()
|
||||
val updatedSingleChanges = completeCreateAAAASingleChanges.map { ch =>
|
||||
ch.copy(
|
||||
status = SingleChangeStatus.Failed,
|
||||
recordChangeId = Some(rsChange.id),
|
||||
systemMessage = savedCs.changes.head.systemMessage
|
||||
)
|
||||
}
|
||||
val scExpected = notUpdatedChange :: updatedSingleChanges
|
||||
batchChangeUpdates.get.changes shouldBe scExpected
|
||||
}
|
||||
|
||||
"fail the change in verify if verify errors" in {
|
||||
// All returns after first are for verify. Retry 2 times and succeed
|
||||
doReturn(Interfaces.result(List()))
|
||||
.doReturn(Interfaces.result(Left(Refused("dns-fail"))))
|
||||
.doReturn(Interfaces.result(Left(NotAuthorized("dns-fail"))))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
@ -376,9 +290,23 @@ class RecordSetChangeHandlerSpec
|
||||
batchChangeUpdates.get.changes shouldBe scExpected
|
||||
}
|
||||
|
||||
"requeue the change in verify if permissible errors" in {
|
||||
doReturn(Interfaces.result(List()))
|
||||
.doReturn(Interfaces.result(Left(TryAgain("dns-fail"))))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
val test = underTest.apply(mockConn, rsChange)
|
||||
a[Requeue] shouldBe thrownBy(test.unsafeRunSync())
|
||||
}
|
||||
|
||||
"fail the change if validating fails with an error" in {
|
||||
// Stage an error on the first resolve, which will cause validate to fail
|
||||
doReturn(Interfaces.result(Left(Refused("dns-failure"))))
|
||||
doReturn(Interfaces.result(Left(NotAuthorized("dns-failure"))))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
|
||||
@ -420,7 +348,9 @@ class RecordSetChangeHandlerSpec
|
||||
doReturn(Interfaces.result(List()))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
doReturn(Interfaces.result(Left(Refused("dns-fail")))).when(mockConn).applyChange(rsChange)
|
||||
doReturn(Interfaces.result(Left(NotAuthorized("dns-fail"))))
|
||||
.when(mockConn)
|
||||
.applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
@ -455,6 +385,20 @@ class RecordSetChangeHandlerSpec
|
||||
batchChangeUpdates.get.changes shouldBe scExpected
|
||||
}
|
||||
|
||||
"requeue the change in apply if permissible errors" in {
|
||||
doReturn(Interfaces.result(List()))
|
||||
.when(mockConn)
|
||||
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||
doReturn(Interfaces.result(Left(Refused("dns-fail"))))
|
||||
.when(mockConn)
|
||||
.applyChange(rsChange)
|
||||
doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet])
|
||||
doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet])
|
||||
|
||||
val test = underTest.apply(mockConn, rsChange)
|
||||
a[Requeue] shouldBe thrownBy(test.unsafeRunSync())
|
||||
}
|
||||
|
||||
"bypass the validate and verify steps if a wildcard record exists" in {
|
||||
// Return a wildcard record
|
||||
doReturn(IO.pure(List(rsChange.recordSet)))
|
||||
|
Loading…
x
Reference in New Issue
Block a user