2
0
mirror of https://github.com/VinylDNS/vinyldns synced 2025-08-30 22:05:21 +00:00

move zone change processing into zone sync handler (#398)

* move zone change processing into zone sync handler
This commit is contained in:
Britney Wright
2018-12-20 12:20:16 -05:00
committed by GitHub
parent 48c7f9fb0b
commit 03b37366a5
5 changed files with 257 additions and 147 deletions

View File

@@ -87,17 +87,6 @@ records_post_update = [
'records': [{u'address': u'6.7.8.9'}]}]
def wait_for_zone_sync_to_complete(client, zone_id, latest_sync):
retries = MAX_RETRIES
zone_request = client.get_zone(zone_id)
while zone_request[u'zone'][u'latestSync'] == latest_sync and retries > 0:
zone_request = client.get_zone(zone_id)
time.sleep(RETRY_WAIT)
retries -= 1
assert_that(zone_request[u'zone'][u'latestSync'], is_not(latest_sync))
@pytest.mark.skip_production
def test_sync_zone_success(shared_zone_test_context):
"""
@@ -163,8 +152,8 @@ def test_sync_zone_success(shared_zone_test_context):
time.sleep(10)
# sync again
client.sync_zone(zone['id'], status=202)
wait_for_zone_sync_to_complete(client, zone['id'], latest_sync)
change = client.sync_zone(zone['id'], status=202)
client.wait_until_zone_change_status_synced(change)
# confirm cannot again sync without waiting
client.sync_zone(zone['id'], status=403)

View File

@@ -134,14 +134,7 @@ object CommandHandler {
message.command match {
case sync: ZoneChange
if sync.changeType == ZoneChangeType.Sync || sync.changeType == ZoneChangeType.Create =>
val doSync =
for {
_ <- zoneChangeProcessor(sync) // make sure zone is updated to a syncing status
syncChange <- zoneSyncProcessor(sync)
_ <- zoneChangeProcessor(syncChange) // update zone to Active
} yield syncChange
outcomeOf(message)(doSync)
outcomeOf(message)(zoneSyncProcessor(sync))
case zoneChange: ZoneChange =>
outcomeOf(message)(zoneChangeProcessor(zoneChange))
@@ -196,7 +189,7 @@ object CommandHandler {
val recordChangeHandler =
RecordSetChangeHandler(recordSetRepo, recordChangeRepo, batchChangeRepo)
val zoneSyncHandler =
ZoneSyncHandler(recordSetRepo, recordChangeRepo)
ZoneSyncHandler(recordSetRepo, recordChangeRepo, zoneChangeRepo, zoneRepo)
CommandHandler
.mainFlow(

View File

@@ -23,8 +23,14 @@ import org.slf4j.LoggerFactory
import vinyldns.api.domain.dns.DnsConversions
import vinyldns.api.domain.zone.{DnsZoneViewLoader, VinylDNSZoneViewLoader}
import vinyldns.core.domain.record._
import vinyldns.core.domain.zone.{Zone, ZoneChange, ZoneChangeStatus, ZoneStatus}
import vinyldns.core.domain.zone.{Zone, ZoneStatus}
import vinyldns.core.route.Monitored
import vinyldns.core.domain.zone.{
ZoneChange,
ZoneChangeRepository,
ZoneChangeStatus,
ZoneRepository
}
object ZoneSyncHandler extends DnsConversions with Monitored {
@@ -35,71 +41,109 @@ object ZoneSyncHandler extends DnsConversions with Monitored {
def apply(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
zoneChangeRepository: ZoneChangeRepository,
zoneRepository: ZoneRepository,
dnsLoader: Zone => DnsZoneViewLoader = DnsZoneViewLoader.apply,
vinyldnsLoader: (Zone, RecordSetRepository) => VinylDNSZoneViewLoader =
VinylDNSZoneViewLoader.apply): ZoneChange => IO[ZoneChange] =
zoneChange =>
monitor("zone.sync") {
time(s"zone.sync(${zoneChange.zoneId})") {
val zone = zoneChange.zone
for {
_ <- saveZoneAndChange(zoneRepository, zoneChangeRepository, zoneChange) // initial save to store zone status
// as Syncing
syncChange <- runSync(
recordSetRepository,
recordChangeRepository,
zoneChange,
dnsLoader,
vinyldnsLoader)
_ <- saveZoneAndChange(zoneRepository, zoneChangeRepository, syncChange) // final save to store zone status
// as Active
} yield syncChange
val dnsView = time(s"zone.sync.loadDnsView(${zoneChange.id})")(dnsLoader(zone).load())
val vinyldnsView = time(s"zone.sync.loadVinylDNSView(${zoneChange.id})")(
vinyldnsLoader(zone, recordSetRepository).load())
val recordSetChanges = (dnsView, vinyldnsView).parTupled.map {
case (dnsZoneView, vinylDnsZoneView) => vinylDnsZoneView.diff(dnsZoneView)
def saveZoneAndChange(
zoneRepository: ZoneRepository,
zoneChangeRepository: ZoneChangeRepository,
zoneChange: ZoneChange): IO[ZoneChange] =
zoneRepository.save(zoneChange.zone).flatMap {
case Left(duplicateZoneError) =>
zoneChangeRepository.save(
zoneChange.copy(
status = ZoneChangeStatus.Failed,
systemMessage = Some(duplicateZoneError.message))
)
case Right(_) =>
zoneChangeRepository.save(zoneChange)
}
def runSync(
recordSetRepository: RecordSetRepository,
recordChangeRepository: RecordChangeRepository,
zoneChange: ZoneChange,
dnsLoader: Zone => DnsZoneViewLoader = DnsZoneViewLoader.apply,
vinyldnsLoader: (Zone, RecordSetRepository) => VinylDNSZoneViewLoader =
VinylDNSZoneViewLoader.apply): IO[ZoneChange] =
monitor("zone.sync") {
time(s"zone.sync(${zoneChange.zoneId})") {
val zone = zoneChange.zone
val dnsView = time(s"zone.sync.loadDnsView(${zoneChange.id})")(dnsLoader(zone).load())
val vinyldnsView = time(s"zone.sync.loadVinylDNSView(${zoneChange.id})")(
vinyldnsLoader(zone, recordSetRepository).load())
val recordSetChanges = (dnsView, vinyldnsView).parTupled.map {
case (dnsZoneView, vinylDnsZoneView) => vinylDnsZoneView.diff(dnsZoneView)
}
recordSetChanges.flatMap { allChanges =>
val changesWithUserIds = allChanges.map(_.withUserId(zoneChange.userId))
// not accepting unknown record types or dotted hosts
val (changes, dropped) = changesWithUserIds.partition { rs =>
rs.recordSet.typ != RecordType.UNKNOWN
}
recordSetChanges.flatMap { allChanges =>
val changesWithUserIds = allChanges.map(_.withUserId(zoneChange.userId))
// not accepting unknown record types or dotted hosts
val (changes, dropped) = changesWithUserIds.partition { rs =>
rs.recordSet.typ != RecordType.UNKNOWN
}
if (dropped.nonEmpty) {
val droppedInfo = dropped
.map(chg => chg.recordSet.name + " " + chg.recordSet.typ)
.mkString(", ")
logger.warn(
s"Zone sync for change $zoneChange dropped ${dropped.size} recordsets: $droppedInfo")
}
if (dropped.nonEmpty) {
val droppedInfo = dropped
.map(chg => chg.recordSet.name + " " + chg.recordSet.typ)
.mkString(", ")
logger.warn(
s"Zone sync for change $zoneChange dropped ${dropped.size} recordsets: $droppedInfo")
}
if (changes.isEmpty) {
logger.info(s"Zone sync for change $zoneChange had no records to sync")
IO.pure(
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now))))
} else {
logger.info(
s"Zone sync for change $zoneChange found ${changes.size} changes to be saved")
val changeSet = ChangeSet(changes).copy(status = ChangeSetStatus.Applied)
if (changes.isEmpty) {
logger.info(s"Zone sync for change $zoneChange had no records to sync")
IO.pure(
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now))))
} else {
logger.info(
s"Zone sync for change $zoneChange found ${changes.size} changes to be saved")
val changeSet = ChangeSet(changes).copy(status = ChangeSetStatus.Applied)
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
val saveRecordChanges = time(s"zone.sync.saveChanges(${zoneChange.zoneId})")(
recordChangeRepository.save(changeSet))
val saveRecordSets = time(s"zone.sync.saveRecordSets(${zoneChange.zoneId})")(
recordSetRepository.apply(changeSet))
// we want to make sure we write to both the change repo and record set repo
// at the same time as this can take a while
val saveRecordChanges = time(s"zone.sync.saveChanges(${zoneChange.zoneId})")(
recordChangeRepository.save(changeSet))
val saveRecordSets = time(s"zone.sync.saveRecordSets(${zoneChange.zoneId})")(
recordSetRepository.apply(changeSet))
// join together the results of saving both the record changes as well as the record sets
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)))
}
// join together the results of saving both the record changes as well as the record sets
for {
_ <- saveRecordChanges
_ <- saveRecordSets
} yield
zoneChange.copy(
zone.copy(status = ZoneStatus.Active, latestSync = Some(DateTime.now)),
status = ZoneChangeStatus.Synced)
}
}
}.attempt
.map {
case Left(e: Throwable) =>
logger.error(s"Encountered error syncing zone ${zoneChange.zone.name}", e)
// We want to just move back to an active status, do not update latest sync
zoneChange.copy(
zone = zoneChange.zone.copy(status = ZoneStatus.Active),
status = ZoneChangeStatus.Failed)
case Right(ok) => ok
}
}.attempt
.map {
case Left(e: Throwable) =>
logger.error(s"Encountered error syncing zone ${zoneChange.zone.name}", e)
// We want to just move back to an active status, do not update latest sync
zoneChange.copy(
zone = zoneChange.zone.copy(status = ZoneStatus.Active),
status = ZoneChangeStatus.Failed)
case Right(ok) => ok
}
}

View File

@@ -221,7 +221,6 @@ class CommandHandlerSpec
.apply(zoneCreate)
doReturn(IO.pure(zoneCreate)).when(mockZoneSyncProcessor).apply(zoneCreate)
Stream.emit(change).covary[IO].through(processor).compile.drain.unsafeRunSync()
verify(mockZoneChangeProcessor, times(2)).apply(zoneCreate)
verify(mockZoneSyncProcessor).apply(zoneCreate)
verifyZeroInteractions(mockRecordChangeProcessor)
}
@@ -231,7 +230,6 @@ class CommandHandlerSpec
doReturn(IO.pure(sync)).doReturn(IO.pure(change)).when(mockZoneChangeProcessor).apply(sync)
doReturn(IO.pure(sync)).when(mockZoneSyncProcessor).apply(sync)
Stream.emit(change).covary[IO].through(processor).compile.drain.unsafeRunSync()
verify(mockZoneChangeProcessor, times(2)).apply(sync)
verify(mockZoneSyncProcessor).apply(sync)
verifyZeroInteractions(mockRecordChangeProcessor)
}

View File

@@ -27,6 +27,7 @@ import vinyldns.api.VinylDNSTestData
import vinyldns.api.domain.record.RecordSetChangeGenerator
import vinyldns.api.domain.zone.{DnsZoneViewLoader, VinylDNSZoneViewLoader, ZoneView}
import vinyldns.core.domain.record._
import vinyldns.core.domain.zone.ZoneRepository.DuplicateZoneError
import vinyldns.core.domain.zone._
class ZoneSyncHandlerSpec
@@ -140,10 +141,26 @@ class ZoneSyncHandlerSpec
private val testRecordSetChange = RecordSetChangeGenerator.forSyncAdd(testRecord2, testZone)
private val testChangeSet =
ChangeSet.apply(testRecordSetChange).copy(status = ChangeSetStatus.Applied)
private val testZoneChange = ZoneChange(testZone, testZone.account, ZoneChangeType.Sync)
private val testZoneChange =
ZoneChange(testZone.copy(status = ZoneStatus.Syncing), testZone.account, ZoneChangeType.Sync)
private val testDnsView = ZoneView(testZone, List(testRecord1, testRecord2))
private val testVinylDNSView = ZoneView(testZone, List(testRecord1))
private val zoneSync = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
zoneChangeRepo,
zoneRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
private val runSync = ZoneSyncHandler.runSync(
recordSetRepo,
recordChangeRepo,
testZoneChange,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
override def beforeEach(): Unit = {
reset(recordSetRepo)
reset(recordChangeRepo)
@@ -164,31 +181,130 @@ class ZoneSyncHandlerSpec
doReturn(() => IO(testVinylDNSView)).when(mockVinylDNSLoader).load
}
"ZoneSyncer" should {
"Send the correct zone to the DNSZoneViewLoader" in {
"ZoneSyncHandler" should {
"process successful zone sync" in {
doReturn(IO.pure(Right(testZoneChange)))
.when(zoneRepo)
.save(any[Zone])
val result = zoneSync(testZoneChange).unsafeRunSync()
val changeCaptor = ArgumentCaptor.forClass(classOf[ZoneChange])
verify(zoneChangeRepo, times(2)).save(changeCaptor.capture())
val savedChange = changeCaptor.getAllValues
// first saveZoneAndChange
savedChange.get(0).status shouldBe ZoneChangeStatus.Pending
savedChange.get(0).zone.status shouldBe ZoneStatus.Syncing
savedChange.get(0).zone.latestSync should not be defined
// second saveZoneAndChange
savedChange.get(1).status shouldBe ZoneChangeStatus.Synced
savedChange.get(1).zone.status shouldBe ZoneStatus.Active
savedChange.get(1).zone.latestSync shouldBe defined
// returned result
result.status shouldBe ZoneChangeStatus.Synced
result.zone.status shouldBe ZoneStatus.Active
result.zone.latestSync shouldBe defined
}
"handle failed zone sync" in {
doReturn(() => IO.raiseError(new RuntimeException("Dns Failed")))
.when(mockVinylDNSLoader)
.load
doReturn(IO.pure(Right(testZoneChange)))
.when(zoneRepo)
.save(any[Zone])
val result = zoneSync(testZoneChange).unsafeRunSync()
val changeCaptor = ArgumentCaptor.forClass(classOf[ZoneChange])
verify(zoneChangeRepo, times(2)).save(changeCaptor.capture())
val savedChange = changeCaptor.getAllValues
// first saveZoneAndChange
savedChange.get(0).status shouldBe ZoneChangeStatus.Pending
savedChange.get(0).zone.status shouldBe ZoneStatus.Syncing
savedChange.get(0).zone.latestSync should not be defined
// second saveZoneAndChange
savedChange.get(1).status shouldBe ZoneChangeStatus.Failed
savedChange.get(1).zone.status shouldBe ZoneStatus.Active
savedChange.get(1).zone.latestSync should not be defined
// final result
result.status shouldBe ZoneChangeStatus.Failed
result.zone.status shouldBe ZoneStatus.Active
result.zone.latestSync should not be defined
}
}
"saveZoneAndChange" should {
"save zone and zoneChange with given statuses" in {
doReturn(IO.pure(Right(testZoneChange))).when(zoneRepo).save(testZoneChange.zone)
ZoneSyncHandler.saveZoneAndChange(zoneRepo, zoneChangeRepo, testZoneChange).unsafeRunSync()
val changeCaptor = ArgumentCaptor.forClass(classOf[ZoneChange])
verify(zoneChangeRepo).save(changeCaptor.capture())
val savedChange = changeCaptor.getValue
savedChange.status shouldBe ZoneChangeStatus.Pending
savedChange.zone.status shouldBe ZoneStatus.Syncing
savedChange.zone.latestSync shouldBe testZoneChange.zone.latestSync
}
"handle duplicateZoneError" in {
doReturn(IO.pure(Left(DuplicateZoneError("error")))).when(zoneRepo).save(testZoneChange.zone)
ZoneSyncHandler.saveZoneAndChange(zoneRepo, zoneChangeRepo, testZoneChange).unsafeRunSync()
val changeCaptor = ArgumentCaptor.forClass(classOf[ZoneChange])
verify(zoneChangeRepo).save(changeCaptor.capture())
val savedChange = changeCaptor.getValue
savedChange.status shouldBe ZoneChangeStatus.Failed
savedChange.zone.status shouldBe ZoneStatus.Syncing
savedChange.systemMessage shouldBe Some("Zone with name \"error\" already exists.")
}
}
"runSync" should {
"send the correct zone to the DNSZoneViewLoader" in {
val captor = ArgumentCaptor.forClass(classOf[Zone])
val dnsLoader = mock[Zone => DnsZoneViewLoader]
doReturn(mockDNSLoader).when(dnsLoader).apply(any[Zone])
val syncer =
ZoneSyncHandler(recordSetRepo, recordChangeRepo, dnsLoader, (_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
ZoneSyncHandler
.runSync(
recordSetRepo,
recordChangeRepo,
testZoneChange,
dnsLoader,
(_, _) => mockVinylDNSLoader)
.unsafeRunSync()
verify(dnsLoader).apply(captor.capture())
val req = captor.getValue
req shouldBe testZone
req shouldBe testZone.copy(status = ZoneStatus.Syncing)
}
"load the dns zone from DNSZoneViewLoader" in {
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
ZoneSyncHandler
.runSync(
recordSetRepo,
recordChangeRepo,
testZoneChange,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
.unsafeRunSync()
verify(mockDNSLoader, times(1)).load
}
@@ -200,22 +316,22 @@ class ZoneSyncHandlerSpec
val vinyldnsLoader = mock[(Zone, RecordSetRepository) => VinylDNSZoneViewLoader]
doReturn(mockVinylDNSLoader).when(vinyldnsLoader).apply(any[Zone], any[RecordSetRepository])
val syncer =
ZoneSyncHandler(recordSetRepo, recordChangeRepo, _ => mockDNSLoader, vinyldnsLoader)
syncer(testZoneChange).unsafeRunSync()
ZoneSyncHandler
.runSync(
recordSetRepo,
recordChangeRepo,
testZoneChange,
_ => mockDNSLoader,
vinyldnsLoader)
.unsafeRunSync()
verify(vinyldnsLoader).apply(zoneCaptor.capture(), repoCaptor.capture())
val req = zoneCaptor.getValue
req shouldBe testZone
req shouldBe testZone.copy(status = ZoneStatus.Syncing)
}
"load the dns zone from VinylDNSZoneViewLoader" in {
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
runSync.unsafeRunSync()
verify(mockVinylDNSLoader, times(1)).load
}
@@ -227,12 +343,7 @@ class ZoneSyncHandlerSpec
doReturn(List(testRecordSetChange)).when(testVinylDNSView).diff(any[ZoneView])
doReturn(() => IO(testVinylDNSView)).when(mockVinylDNSLoader).load
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
runSync.unsafeRunSync()
verify(testVinylDNSView).diff(captor.capture())
val req = captor.getValue
@@ -241,13 +352,7 @@ class ZoneSyncHandlerSpec
"save the record changes to the recordChangeRepo" in {
val captor = ArgumentCaptor.forClass(classOf[ChangeSet])
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
runSync.unsafeRunSync()
verify(recordChangeRepo).save(captor.capture())
val req = captor.getValue
@@ -256,12 +361,7 @@ class ZoneSyncHandlerSpec
"save the record sets to the recordSetRepo" in {
val captor = ArgumentCaptor.forClass(classOf[ChangeSet])
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
runSync.unsafeRunSync()
verify(recordSetRepo).apply(captor.capture())
val req = captor.getValue
@@ -271,13 +371,7 @@ class ZoneSyncHandlerSpec
"returns the zone as active and sets the latest sync" in {
val testVinylDNSView = ZoneView(testZone, List(testRecord1, testRecord2))
doReturn(() => IO(testVinylDNSView)).when(mockVinylDNSLoader).load
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
val result = syncer(testZoneChange).unsafeRunSync()
val result = runSync.unsafeRunSync()
result.zone.status shouldBe ZoneStatus.Active
result.zone.latestSync shouldBe defined
@@ -300,12 +394,7 @@ class ZoneSyncHandlerSpec
doReturn(IO(correctChangeSet)).when(recordSetRepo).apply(captor.capture())
doReturn(IO(correctChangeSet)).when(recordChangeRepo).save(any[ChangeSet])
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(testZoneChange).unsafeRunSync()
runSync.unsafeRunSync()
captor.getValue.changes should contain theSameElementsAs expectedChanges
}
@@ -328,12 +417,14 @@ class ZoneSyncHandlerSpec
val zoneChange = ZoneChange(testReverseZone, testReverseZone.account, ZoneChangeType.Sync)
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
syncer(zoneChange).unsafeRunSync()
ZoneSyncHandler
.runSync(
recordSetRepo,
recordChangeRepo,
zoneChange,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
.unsafeRunSync()
captor.getValue.changes should contain theSameElementsAs expectedChanges
}
@@ -342,12 +433,7 @@ class ZoneSyncHandlerSpec
doReturn(() => IO.raiseError(new RuntimeException("Dns Failed")))
.when(mockVinylDNSLoader)
.load
val syncer = ZoneSyncHandler(
recordSetRepo,
recordChangeRepo,
_ => mockDNSLoader,
(_, _) => mockVinylDNSLoader)
val result = syncer(testZoneChange).unsafeRunSync()
val result = runSync.unsafeRunSync()
result.status shouldBe ZoneChangeStatus.Failed
result.zone.status shouldBe ZoneStatus.Active