diff --git a/modules/api/functional_test/live_tests/batch/create_batch_change_test.py b/modules/api/functional_test/live_tests/batch/create_batch_change_test.py index 48ed0a77d..3147babc9 100644 --- a/modules/api/functional_test/live_tests/batch/create_batch_change_test.py +++ b/modules/api/functional_test/live_tests/batch/create_batch_change_test.py @@ -4080,7 +4080,7 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context): get_change_A_AAAA_json(a_delete_record_and_record_set_fqdn, address="1.1.1.1", change_type="DeleteRecordSet"), get_change_A_AAAA_json(a_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet"), get_change_TXT_json(txt_delete_record_and_record_set_fqdn, text="hello", change_type="DeleteRecordSet"), - get_change_TXT_json(txt_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet") + get_change_TXT_json(txt_delete_record_and_record_set_fqdn, change_type="DeleteRecordSet"), ] } @@ -4091,8 +4091,10 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context): create_rs = client.create_recordset(rs, status=202) to_delete.append(client.wait_until_recordset_change_status(create_rs, 'Complete')) - result = client.create_batch_change(batch_change_input, status=202) - client.wait_until_batch_change_completed(result) + initial_result = client.create_batch_change(batch_change_input, status=202) + result = client.wait_until_batch_change_completed(initial_result) + + assert_that(result['status'], is_('Complete')) # Check batch change response assert_change_success_response_values(result['changes'], zone=ok_zone, index=0, input_name=a_update_record_set_fqdn, record_name=a_update_record_set_name, record_data=None, change_type="DeleteRecordSet") @@ -4171,6 +4173,60 @@ def test_create_batch_multi_record_update_succeeds(shared_zone_test_context): finally: clear_recordset_list(to_delete, client) +@pytest.mark.skip_production +def test_create_batch_update_record_type_succeeds(shared_zone_test_context): + """ + Test existing record sets can be updated to a different type in batch (relies on skip-prod) + """ + client = shared_zone_test_context.ok_vinyldns_client + ok_zone = shared_zone_test_context.ok_zone + + # record sets to setup + a_update_to_cname_and_record_set_name = generate_record_name() + a_update_to_cname_and_record_set_fqdn = a_update_to_cname_and_record_set_name + ".ok." + a_update_to_cname_and_record_set = get_recordset_json(ok_zone, a_update_to_cname_and_record_set_name, "A", [{"address": "1.1.1.1"}], 200) + + cname_update_from_a_and_record_set_fqdn = a_update_to_cname_and_record_set_name + ".ok." + + batch_change_input = { + "comments": "this is optional", + "changes": [ + # Update A record to CNAME + get_change_A_AAAA_json(a_update_to_cname_and_record_set_fqdn, change_type="DeleteRecordSet"), + get_change_CNAME_json(cname_update_from_a_and_record_set_fqdn, cname="example.com.") + ] + } + + to_delete = [] + try: + create_rs = client.create_recordset(a_update_to_cname_and_record_set, status=202) + to_delete.append(client.wait_until_recordset_change_status(create_rs, 'Complete')) + + initial_result = client.create_batch_change(batch_change_input, status=202) + result = client.wait_until_batch_change_completed(initial_result) + + assert_that(result['status'], is_('Complete')) + + # Check batch change response + assert_change_success_response_values(result['changes'], zone=ok_zone, index=0, input_name=a_update_to_cname_and_record_set_fqdn, record_name=a_update_to_cname_and_record_set_name, record_type="A", record_data=None, change_type="DeleteRecordSet") + assert_change_success_response_values(result['changes'], zone=ok_zone, index=1, input_name=cname_update_from_a_and_record_set_fqdn, record_name=a_update_to_cname_and_record_set_name, record_type="CNAME", record_data="example.com.") + + # Perform look up to verify record set data + for rs in to_delete: + rs_id = rs['recordSet']['id'] + zone_id = rs['zone']['id'] + + client.get_recordset(zone_id, rs_id, status=404) + + new_cname_result_rs = result['changes'][1] + new_cname_rs = client.get_recordset(new_cname_result_rs['zoneId'], new_cname_result_rs['recordSetId'], status=200)['recordSet'] + new_cname_rdata = new_cname_rs['records'] + assert_that(new_cname_rdata, contains({"cname": "example.com."})) + client.delete_recordset(new_cname_rs['zoneId'], new_cname_rs['id'], status=202) + + finally: + clear_recordset_list(to_delete, client) + def test_create_batch_deletes_succeeds(shared_zone_test_context): """ Test creating batch change with DeleteRecordSet with valid record data succeeds diff --git a/modules/api/functional_test/vinyldns_python.py b/modules/api/functional_test/vinyldns_python.py index 799a7ecf5..e77a7a83f 100644 --- a/modules/api/functional_test/vinyldns_python.py +++ b/modules/api/functional_test/vinyldns_python.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) __all__ = [u'VinylDNSClient', u'MAX_RETRIES', u'RETRY_WAIT'] -MAX_RETRIES = 40 +MAX_RETRIES = 75 RETRY_WAIT = 0.05 class VinylDNSClient(object): diff --git a/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala b/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala index 0c8bec642..ae6fe3bd5 100644 --- a/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala +++ b/modules/api/src/main/scala/vinyldns/api/engine/RecordSetChangeHandler.scala @@ -20,21 +20,19 @@ import cats.effect.{ContextShift, IO, Timer} import cats.implicits._ import org.slf4j.LoggerFactory import vinyldns.api.domain.dns.DnsConnection -import vinyldns.api.domain.dns.DnsProtocol.NoError +import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused, TryAgain} import vinyldns.api.domain.record.RecordSetHelpers._ import vinyldns.core.domain.batch.{BatchChangeRepository, SingleChange} import vinyldns.core.domain.record._ -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ - object RecordSetChangeHandler { private val logger = LoggerFactory.getLogger("vinyldns.api.engine.RecordSetChangeHandler") - private implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) private implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global) + final case class Requeue(change: RecordSetChange) extends Throwable + def apply( recordSetRepository: RecordSetRepository, recordChangeRepository: RecordChangeRepository, @@ -96,6 +94,8 @@ object RecordSetChangeHandler { private final case class Completed(change: RecordSetChange) extends ProcessorState + private final case class Retrying(change: RecordSetChange) extends ProcessorState + private case class ProcessingError(message: String) sealed trait ProcessingStatus @@ -111,6 +111,9 @@ object RecordSetChangeHandler { // at which point the response will be returned. final case class ReadyToApply(change: RecordSetChange) extends ProcessingStatus + // Failure to process change. Permitted to retry. + final case class Retry(change: RecordSetChange) extends ProcessingStatus + def getProcessingStatus(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessingStatus] = { def isDnsMatch(dnsResult: List[RecordSet], recordSet: RecordSet, zoneName: String): Boolean = dnsResult.exists(matches(_, recordSet, zoneName)) @@ -145,6 +148,7 @@ object RecordSetChangeHandler { if (existingRecords.nonEmpty) ReadyToApply(change) // we have a record set, move forward else AlreadyApplied(change) // we did not find the record set, so already applied } + case Left(_: TryAgain) => Retry(change) case Left(error) => Failure(change, error.getMessage) } } @@ -196,6 +200,10 @@ object RecordSetChangeHandler { case done: Completed => logger.info(s"CHANGE COMPLETED; ${getChangeLog(done.change)}") IO.pure(done) + + case Retrying(change) => + logger.info(s"CHANGE RETRYING; ${getChangeLog(change)}") + IO.raiseError(Requeue(change)) } } @@ -210,6 +218,7 @@ object RecordSetChangeHandler { s"Failed validating update to DNS for change ${change.id}:${change.recordSet.name}: " + message ) ) + case Retry(_) => Retrying(change) } /* Step 2: Apply the change to the dns backend */ @@ -217,6 +226,8 @@ object RecordSetChangeHandler { dnsConn.applyChange(change).value.map { case Right(_: NoError) => Applied(change) + case Left(_: Refused) => + Retrying(change) case Left(error) => Completed( change.failed( @@ -225,32 +236,17 @@ object RecordSetChangeHandler { ) } - /* Step 3: Verify the record was created. We attempt 12 times over 6 seconds */ - private def verify(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessorState] = { - def loop(retries: Int = 11): IO[ProcessorState] = - getProcessingStatus(change, dnsConn).flatMap { - case AlreadyApplied(_) => IO.pure(Completed(change.successful)) - case ReadyToApply(_) if retries <= 0 => - IO.pure( - Completed( - change.failed(s"""Failed verifying update to DNS for - |change ${change.id}:${change.recordSet.name}: Verify out of retries.""".stripMargin) + /* Step 3: Verify the record was created. If the ProcessorState is applied or failed we requeue the record.*/ + private def verify(change: RecordSetChange, dnsConn: DnsConnection): IO[ProcessorState] = + getProcessingStatus(change, dnsConn).map { + case AlreadyApplied(_) => Completed(change.successful) + case Failure(_, message) => Completed( + change.failed( + s"Failed verifying update to DNS for change ${change.id}:${change.recordSet.name}: $message" ) ) - case ReadyToApply(_) => - IO.sleep(500.milliseconds) *> loop(retries - 1) - case Failure(_, message) => - IO.pure( - Completed( - change.failed( - s"Failed verifying update to DNS for change ${change.id}:${change.recordSet.name}: $message" - ) - ) - ) - } - - loop() - } + case _ => Retrying(change) + } private def getChangeLog(change: RecordSetChange): String = { val sb = new StringBuilder diff --git a/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala b/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala index 3cf272fc4..15683a215 100644 --- a/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala +++ b/modules/api/src/test/scala/vinyldns/api/engine/RecordSetChangeHandlerSpec.scala @@ -25,8 +25,8 @@ import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} import org.xbill.DNS import vinyldns.api.domain.dns.DnsConnection -import vinyldns.api.domain.dns.DnsProtocol.{NoError, Refused} -import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply} +import vinyldns.api.domain.dns.DnsProtocol.{NoError, NotAuthorized, Refused, TryAgain} +import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, Failure, ReadyToApply, Requeue} import vinyldns.api.repository.InMemoryBatchChangeRepository import vinyldns.api.{CatsHelpers, Interfaces} import vinyldns.core.domain.batch.{ @@ -206,7 +206,9 @@ class RecordSetChangeHandlerSpec .when(mockConn) .resolve(rs.name, rsChange.zone.name, rs.typ) - doReturn(Interfaces.result(Left(Refused("dns failure")))).when(mockConn).applyChange(rsChange) + doReturn(Interfaces.result(Left(NotAuthorized("dns failure")))) + .when(mockConn) + .applyChange(rsChange) doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) @@ -244,98 +246,10 @@ class RecordSetChangeHandlerSpec batchChangeUpdates.get.changes shouldBe scExpected } - "retry multiple times in verify if verify finds record does not exist" in { - // All returns after first are for verify. Retry 2 times and succeed - doReturn(Interfaces.result(List())) - .doReturn(Interfaces.result(List())) - .doReturn(Interfaces.result(List())) - .doReturn(Interfaces.result(List(rs))) - .when(mockConn) - .resolve(rs.name, rsChange.zone.name, rs.typ) - - doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange) - doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) - doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) - - val test = underTest.apply(mockConn, rsChange) - test.unsafeRunSync() - - verify(mockRsRepo).apply(rsRepoCaptor.capture()) - verify(mockChangeRepo).save(changeRepoCaptor.capture()) - - val appliedCs = rsRepoCaptor.getValue - appliedCs.status shouldBe ChangeSetStatus.Complete - appliedCs.changes.head.status shouldBe RecordSetChangeStatus.Complete - appliedCs.changes.head.recordSet.status shouldBe RecordSetStatus.Active - - val savedCs = changeRepoCaptor.getValue - savedCs.status shouldBe ChangeSetStatus.Complete - savedCs.changes.head.status shouldBe RecordSetChangeStatus.Complete - - // make sure the record was applied and then verified - verify(mockConn).applyChange(rsChange) - - // we will retry the verify 3 times based on the mock setup - verify(mockConn, times(4)).resolve(rs.name, rsChange.zone.name, rs.typ) - - val batchChangeUpdates = await(batchRepo.getBatchChange(batchChange.id)) - val updatedSingleChanges = completeCreateAAAASingleChanges.map { ch => - ch.copy( - status = SingleChangeStatus.Complete, - recordChangeId = Some(rsChange.id), - recordSetId = Some(rsChange.recordSet.id) - ) - } - val scExpected = notUpdatedChange :: updatedSingleChanges - batchChangeUpdates.get.changes shouldBe scExpected - } - - "fail the change if retry expires" in { - doReturn(Interfaces.result(List())) - .when(mockConn) - .resolve(rs.name, rsChange.zone.name, rs.typ) - - doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange) - doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) - doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) - - val test = underTest.apply(mockConn, rsChange) - test.unsafeRunSync() - - verify(mockRsRepo).apply(rsRepoCaptor.capture()) - verify(mockChangeRepo).save(changeRepoCaptor.capture()) - - val appliedCs = rsRepoCaptor.getValue - appliedCs.status shouldBe ChangeSetStatus.Complete - appliedCs.changes.head.status shouldBe RecordSetChangeStatus.Failed - appliedCs.changes.head.recordSet.status shouldBe RecordSetStatus.Inactive - - val savedCs = changeRepoCaptor.getValue - savedCs.status shouldBe ChangeSetStatus.Complete - savedCs.changes.head.status shouldBe RecordSetChangeStatus.Failed - - // make sure the record was applied and then verified - verify(mockConn).applyChange(rsChange) - - // resolve called once when validating, 12x for retries - verify(mockConn, times(13)).resolve(rs.name, rsChange.zone.name, rs.typ) - - val batchChangeUpdates = batchRepo.getBatchChange(batchChange.id).unsafeRunSync() - val updatedSingleChanges = completeCreateAAAASingleChanges.map { ch => - ch.copy( - status = SingleChangeStatus.Failed, - recordChangeId = Some(rsChange.id), - systemMessage = savedCs.changes.head.systemMessage - ) - } - val scExpected = notUpdatedChange :: updatedSingleChanges - batchChangeUpdates.get.changes shouldBe scExpected - } - "fail the change in verify if verify errors" in { // All returns after first are for verify. Retry 2 times and succeed doReturn(Interfaces.result(List())) - .doReturn(Interfaces.result(Left(Refused("dns-fail")))) + .doReturn(Interfaces.result(Left(NotAuthorized("dns-fail")))) .when(mockConn) .resolve(rs.name, rsChange.zone.name, rs.typ) @@ -376,9 +290,23 @@ class RecordSetChangeHandlerSpec batchChangeUpdates.get.changes shouldBe scExpected } + "requeue the change in verify if permissible errors" in { + doReturn(Interfaces.result(List())) + .doReturn(Interfaces.result(Left(TryAgain("dns-fail")))) + .when(mockConn) + .resolve(rs.name, rsChange.zone.name, rs.typ) + + doReturn(Interfaces.result(NoError(mockDnsMessage))).when(mockConn).applyChange(rsChange) + doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) + doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) + + val test = underTest.apply(mockConn, rsChange) + a[Requeue] shouldBe thrownBy(test.unsafeRunSync()) + } + "fail the change if validating fails with an error" in { // Stage an error on the first resolve, which will cause validate to fail - doReturn(Interfaces.result(Left(Refused("dns-failure")))) + doReturn(Interfaces.result(Left(NotAuthorized("dns-failure")))) .when(mockConn) .resolve(rs.name, rsChange.zone.name, rs.typ) @@ -420,7 +348,9 @@ class RecordSetChangeHandlerSpec doReturn(Interfaces.result(List())) .when(mockConn) .resolve(rs.name, rsChange.zone.name, rs.typ) - doReturn(Interfaces.result(Left(Refused("dns-fail")))).when(mockConn).applyChange(rsChange) + doReturn(Interfaces.result(Left(NotAuthorized("dns-fail")))) + .when(mockConn) + .applyChange(rsChange) doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) @@ -455,6 +385,20 @@ class RecordSetChangeHandlerSpec batchChangeUpdates.get.changes shouldBe scExpected } + "requeue the change in apply if permissible errors" in { + doReturn(Interfaces.result(List())) + .when(mockConn) + .resolve(rs.name, rsChange.zone.name, rs.typ) + doReturn(Interfaces.result(Left(Refused("dns-fail")))) + .when(mockConn) + .applyChange(rsChange) + doReturn(IO.pure(cs)).when(mockChangeRepo).save(any[ChangeSet]) + doReturn(IO.pure(cs)).when(mockRsRepo).apply(any[ChangeSet]) + + val test = underTest.apply(mockConn, rsChange) + a[Requeue] shouldBe thrownBy(test.unsafeRunSync()) + } + "bypass the validate and verify steps if a wildcard record exists" in { // Return a wildcard record doReturn(IO.pure(List(rsChange.recordSet)))