mirror of
https://github.com/VinylDNS/vinyldns
synced 2025-08-22 02:02:14 +00:00
Merge pull request #1389 from Aravindh-Raju/aravindhr/fix-update-logic
Fix batch update logic
This commit is contained in:
commit
2e81e05104
@ -34,10 +34,6 @@ import vinyldns.core.queue.MessageQueue
|
|||||||
class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue: MessageQueue)
|
class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue: MessageQueue)
|
||||||
extends BatchChangeConverterAlgebra {
|
extends BatchChangeConverterAlgebra {
|
||||||
|
|
||||||
private val nonExistentRecordDeleteMessage: String = "This record does not exist. " +
|
|
||||||
"No further action is required."
|
|
||||||
private val nonExistentRecordDataDeleteMessage: String = "Record data entered does not exist. " +
|
|
||||||
"No further action is required."
|
|
||||||
private val failedMessage: String = "Error queueing RecordSetChange for processing"
|
private val failedMessage: String = "Error queueing RecordSetChange for processing"
|
||||||
private val logger = LoggerFactory.getLogger(classOf[BatchChangeConverter])
|
private val logger = LoggerFactory.getLogger(classOf[BatchChangeConverter])
|
||||||
|
|
||||||
@ -140,7 +136,7 @@ class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue:
|
|||||||
def storeQueuingFailures(batchChange: BatchChange): BatchResult[Unit] = {
|
def storeQueuingFailures(batchChange: BatchChange): BatchResult[Unit] = {
|
||||||
// Update if Single change is Failed or if a record that does not exist is deleted
|
// Update if Single change is Failed or if a record that does not exist is deleted
|
||||||
val failedAndNotExistsChanges = batchChange.changes.collect {
|
val failedAndNotExistsChanges = batchChange.changes.collect {
|
||||||
case change if change.status == SingleChangeStatus.Failed || change.systemMessage.contains(nonExistentRecordDeleteMessage) || change.systemMessage.contains(nonExistentRecordDataDeleteMessage) => change
|
case change if change.status == SingleChangeStatus.Failed => change
|
||||||
}
|
}
|
||||||
val storeChanges = batchChangeRepo.updateSingleChanges(failedAndNotExistsChanges).as(())
|
val storeChanges = batchChangeRepo.updateSingleChanges(failedAndNotExistsChanges).as(())
|
||||||
storeChanges
|
storeChanges
|
||||||
@ -218,7 +214,7 @@ class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New record set for add/update or single delete
|
// New record set for add/update/full deletes
|
||||||
lazy val newRecordSet = {
|
lazy val newRecordSet = {
|
||||||
val firstAddChange = singleChangeNel.collect {
|
val firstAddChange = singleChangeNel.collect {
|
||||||
case sac: SingleAddChange => sac
|
case sac: SingleAddChange => sac
|
||||||
@ -246,6 +242,32 @@ class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// New record set for single delete which exists in dns backend but not in vinyl
|
||||||
|
lazy val newDeleteRecordSet = {
|
||||||
|
val firstDeleteChange = singleChangeNel.collect {
|
||||||
|
case sad: SingleDeleteRRSetChange => sad
|
||||||
|
}.headOption
|
||||||
|
|
||||||
|
val newTtlRecordNameTuple = firstDeleteChange
|
||||||
|
.map(del => del.recordName)
|
||||||
|
.orElse(existingRecordSet.map(rs => Some(rs.name)))
|
||||||
|
|
||||||
|
newTtlRecordNameTuple.collect{
|
||||||
|
case Some(recordName) =>
|
||||||
|
RecordSet(
|
||||||
|
zone.id,
|
||||||
|
recordName,
|
||||||
|
recordType,
|
||||||
|
7200L,
|
||||||
|
RecordSetStatus.Pending,
|
||||||
|
Instant.now.truncatedTo(ChronoUnit.MILLIS),
|
||||||
|
None,
|
||||||
|
proposedRecordData.toList,
|
||||||
|
ownerGroupId = setOwnerGroupId
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Generate RecordSetChange based on logical type
|
// Generate RecordSetChange based on logical type
|
||||||
logicalChangeType match {
|
logicalChangeType match {
|
||||||
case Add =>
|
case Add =>
|
||||||
@ -261,7 +283,12 @@ class BatchChangeConverter(batchChangeRepo: BatchChangeRepository, messageQueue:
|
|||||||
existingRs <- existingRecordSet
|
existingRs <- existingRecordSet
|
||||||
newRs <- newRecordSet
|
newRs <- newRecordSet
|
||||||
} yield RecordSetChangeGenerator.forUpdate(existingRs, newRs, zone, userId, singleChangeIds)
|
} yield RecordSetChangeGenerator.forUpdate(existingRs, newRs, zone, userId, singleChangeIds)
|
||||||
case _ => None // This case should never happen
|
case OutOfSync =>
|
||||||
|
newDeleteRecordSet.map { newDelRs =>
|
||||||
|
RecordSetChangeGenerator.forOutOfSync(newDelRs, zone, userId, singleChangeIds)
|
||||||
|
}
|
||||||
|
case _ =>
|
||||||
|
None // This case should never happen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -236,19 +236,36 @@ object BatchTransformations {
|
|||||||
// New proposed record data (assuming all validations pass)
|
// New proposed record data (assuming all validations pass)
|
||||||
val proposedRecordData = existingRecords -- deleteChangeSet ++ addChangeRecordDataSet
|
val proposedRecordData = existingRecords -- deleteChangeSet ++ addChangeRecordDataSet
|
||||||
|
|
||||||
// Note: "Add" where an Add and DeleteRecordSet is provided for a DNS record that does not exist.
|
|
||||||
// Adds the record if it doesn't exist and ignores the delete.
|
|
||||||
val logicalChangeType = (addChangeRecordDataSet.nonEmpty, deleteChangeSet.nonEmpty) match {
|
val logicalChangeType = (addChangeRecordDataSet.nonEmpty, deleteChangeSet.nonEmpty) match {
|
||||||
case (true, true) =>
|
case (true, true) =>
|
||||||
if((deleteChangeSet -- existingRecords).nonEmpty) LogicalChangeType.Add else LogicalChangeType.Update
|
if (existingRecords.isEmpty) {
|
||||||
case (true, false) => LogicalChangeType.Add
|
// Note: "Add" where an Add and DeleteRecordSet is provided for a DNS record that does not exist.
|
||||||
case (false, true) =>
|
// Adds the record if it doesn't exist and ignores the delete.
|
||||||
if ((existingRecords -- deleteChangeSet).isEmpty) {
|
LogicalChangeType.Add
|
||||||
LogicalChangeType.FullDelete
|
|
||||||
} else {
|
} else {
|
||||||
|
// Note: "Update" where an Add and DeleteRecordSet is provided for a DNS record that exist, but record data for DeleteRecordSet does not exist.
|
||||||
|
// Updates the record and ignores the delete.
|
||||||
LogicalChangeType.Update
|
LogicalChangeType.Update
|
||||||
}
|
}
|
||||||
case (false, false) => LogicalChangeType.NotEditedInBatch
|
case (true, false) => LogicalChangeType.Add
|
||||||
|
case (false, true) =>
|
||||||
|
if (existingRecords == deleteChangeSet) {
|
||||||
|
LogicalChangeType.FullDelete
|
||||||
|
} else if (deleteChangeSet.exists(existingRecords.contains)) {
|
||||||
|
LogicalChangeType.Update
|
||||||
|
} else {
|
||||||
|
LogicalChangeType.OutOfSync
|
||||||
|
}
|
||||||
|
case (false, false) =>
|
||||||
|
if(changes.exists {
|
||||||
|
case _: DeleteRRSetChangeForValidation => true
|
||||||
|
case _ => false
|
||||||
|
}
|
||||||
|
){
|
||||||
|
LogicalChangeType.OutOfSync
|
||||||
|
} else {
|
||||||
|
LogicalChangeType.NotEditedInBatch
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
new ValidationChanges(addChangeRecordDataSet, deleteChangeSet, proposedRecordData, logicalChangeType)
|
new ValidationChanges(addChangeRecordDataSet, deleteChangeSet, proposedRecordData, logicalChangeType)
|
||||||
@ -270,6 +287,6 @@ object BatchTransformations {
|
|||||||
|
|
||||||
object LogicalChangeType extends Enumeration {
|
object LogicalChangeType extends Enumeration {
|
||||||
type LogicalChangeType = Value
|
type LogicalChangeType = Value
|
||||||
val Add, FullDelete, Update, NotEditedInBatch = Value
|
val Add, FullDelete, Update, NotEditedInBatch, OutOfSync = Value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,6 +113,25 @@ object RecordSetChangeGenerator extends DnsConversions {
|
|||||||
singleBatchChangeIds = singleBatchChangeIds
|
singleBatchChangeIds = singleBatchChangeIds
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def forOutOfSync(
|
||||||
|
recordSet: RecordSet,
|
||||||
|
zone: Zone,
|
||||||
|
userId: String,
|
||||||
|
singleBatchChangeIds: List[String]
|
||||||
|
): RecordSetChange =
|
||||||
|
RecordSetChange(
|
||||||
|
zone = zone,
|
||||||
|
recordSet = recordSet.copy(
|
||||||
|
name = relativize(recordSet.name, zone.name),
|
||||||
|
status = RecordSetStatus.PendingDelete,
|
||||||
|
updated = Some(Instant.now.truncatedTo(ChronoUnit.MILLIS))
|
||||||
|
),
|
||||||
|
userId = userId,
|
||||||
|
changeType = RecordSetChangeType.Sync,
|
||||||
|
updates = Some(recordSet),
|
||||||
|
singleBatchChangeIds = singleBatchChangeIds
|
||||||
|
)
|
||||||
|
|
||||||
def forDelete(
|
def forDelete(
|
||||||
recordSet: RecordSet,
|
recordSet: RecordSet,
|
||||||
zone: Zone,
|
zone: Zone,
|
||||||
|
@ -38,7 +38,6 @@ object RecordSetChangeHandler extends TransactionProvider {
|
|||||||
private val outOfSyncFailureMessage: String = "This record set is out of sync with the DNS backend; sync this zone before attempting to update this record set."
|
private val outOfSyncFailureMessage: String = "This record set is out of sync with the DNS backend; sync this zone before attempting to update this record set."
|
||||||
private val incompatibleRecordFailureMessage: String = "Incompatible record in DNS."
|
private val incompatibleRecordFailureMessage: String = "Incompatible record in DNS."
|
||||||
private val syncZoneMessage: String = "This record set is out of sync with the DNS backend. Sync this zone before attempting to update this record set."
|
private val syncZoneMessage: String = "This record set is out of sync with the DNS backend. Sync this zone before attempting to update this record set."
|
||||||
private val wrongRecordDataMessage: String = "The record data entered doesn't exist. Please enter the correct record data or leave the field empty if it's a delete operation."
|
|
||||||
private val recordConflictMessage: String = "Conflict due to the record having the same name as an NS record in the same zone. Please create the record using the DNS service the NS record has been delegated to (ex. AWS r53), or use a different record name."
|
private val recordConflictMessage: String = "Conflict due to the record having the same name as an NS record in the same zone. Please create the record using the DNS service the NS record has been delegated to (ex. AWS r53), or use a different record name."
|
||||||
|
|
||||||
final case class Requeue(change: RecordSetChange) extends Throwable
|
final case class Requeue(change: RecordSetChange) extends Throwable
|
||||||
@ -194,6 +193,16 @@ object RecordSetChangeHandler extends TransactionProvider {
|
|||||||
case RecordSetChangeType.Delete =>
|
case RecordSetChangeType.Delete =>
|
||||||
if (existingRecords.nonEmpty) ReadyToApply(change) // we have a record set, move forward
|
if (existingRecords.nonEmpty) ReadyToApply(change) // we have a record set, move forward
|
||||||
else AlreadyApplied(change) // we did not find the record set, so already applied
|
else AlreadyApplied(change) // we did not find the record set, so already applied
|
||||||
|
|
||||||
|
case RecordSetChangeType.Sync =>
|
||||||
|
if (existingRecords.nonEmpty) {
|
||||||
|
Failure(
|
||||||
|
change,
|
||||||
|
outOfSyncFailureMessage
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
AlreadyApplied(change)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,18 +402,12 @@ object RecordSetChangeHandler extends TransactionProvider {
|
|||||||
case AlreadyApplied(_) => Completed(change.successful)
|
case AlreadyApplied(_) => Completed(change.successful)
|
||||||
case ReadyToApply(_) => Validated(change)
|
case ReadyToApply(_) => Validated(change)
|
||||||
case Failure(_, message) =>
|
case Failure(_, message) =>
|
||||||
if(message == outOfSyncFailureMessage){
|
if(message == outOfSyncFailureMessage || message == incompatibleRecordFailureMessage){
|
||||||
Completed(
|
Completed(
|
||||||
change.failed(
|
change.failed(
|
||||||
syncZoneMessage
|
syncZoneMessage
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
} else if (message == incompatibleRecordFailureMessage) {
|
|
||||||
Completed(
|
|
||||||
change.failed(
|
|
||||||
wrongRecordDataMessage
|
|
||||||
)
|
|
||||||
)
|
|
||||||
} else if (message == "referral") {
|
} else if (message == "referral") {
|
||||||
Completed(
|
Completed(
|
||||||
change.failed(
|
change.failed(
|
||||||
|
@ -862,12 +862,6 @@ class VinylDNSClient(object):
|
|||||||
if type(latest_change) != str:
|
if type(latest_change) != str:
|
||||||
change = latest_change
|
change = latest_change
|
||||||
|
|
||||||
if change["status"] != expected_status:
|
|
||||||
print("Failed waiting for record change status")
|
|
||||||
print(json.dumps(change, indent=3))
|
|
||||||
if "systemMessage" in change:
|
|
||||||
print("systemMessage is " + change["systemMessage"])
|
|
||||||
|
|
||||||
assert_that(change["status"], is_(expected_status))
|
assert_that(change["status"], is_(expected_status))
|
||||||
return change
|
return change
|
||||||
|
|
||||||
@ -892,10 +886,6 @@ class VinylDNSClient(object):
|
|||||||
else:
|
else:
|
||||||
change = latest_change
|
change = latest_change
|
||||||
|
|
||||||
if not self.batch_is_completed(change):
|
|
||||||
print("Failed waiting for record change status")
|
|
||||||
print(change)
|
|
||||||
|
|
||||||
assert_that(self.batch_is_completed(change), is_(True))
|
assert_that(self.batch_is_completed(change), is_(True))
|
||||||
return change
|
return change
|
||||||
|
|
||||||
|
@ -546,7 +546,7 @@ class BatchChangeConverterSpec extends AnyWordSpec with Matchers {
|
|||||||
savedBatch shouldBe Some(returnedBatch)
|
savedBatch shouldBe Some(returnedBatch)
|
||||||
}
|
}
|
||||||
|
|
||||||
"set status to complete when deleting a record that does not exist" in {
|
"set status to pending when deleting a record that does not exist" in {
|
||||||
val batchWithBadChange =
|
val batchWithBadChange =
|
||||||
BatchChange(
|
BatchChange(
|
||||||
okUser.id,
|
okUser.id,
|
||||||
@ -570,10 +570,10 @@ class BatchChangeConverterSpec extends AnyWordSpec with Matchers {
|
|||||||
|
|
||||||
// validate completed status returned
|
// validate completed status returned
|
||||||
val receivedChange = returnedBatch.changes(0)
|
val receivedChange = returnedBatch.changes(0)
|
||||||
receivedChange.status shouldBe SingleChangeStatus.Complete
|
receivedChange.status shouldBe SingleChangeStatus.Pending
|
||||||
receivedChange.recordChangeId shouldBe None
|
receivedChange.recordChangeId shouldBe None
|
||||||
receivedChange.systemMessage shouldBe Some(nonExistentRecordDeleteMessage)
|
receivedChange.systemMessage shouldBe Some(nonExistentRecordDeleteMessage)
|
||||||
returnedBatch.changes(0) shouldBe singleChangesOneDelete(0).copy(systemMessage = Some(nonExistentRecordDeleteMessage), status = SingleChangeStatus.Complete)
|
returnedBatch.changes(0) shouldBe singleChangesOneDelete(0).copy(systemMessage = Some(nonExistentRecordDeleteMessage), status = SingleChangeStatus.Pending)
|
||||||
|
|
||||||
// check the update has been made in the DB
|
// check the update has been made in the DB
|
||||||
val savedBatch: Option[BatchChange] =
|
val savedBatch: Option[BatchChange] =
|
||||||
|
@ -27,7 +27,7 @@ import org.scalatest.matchers.should.Matchers
|
|||||||
import org.scalatest.wordspec.AnyWordSpec
|
import org.scalatest.wordspec.AnyWordSpec
|
||||||
import org.scalatest.{BeforeAndAfterEach, EitherValues}
|
import org.scalatest.{BeforeAndAfterEach, EitherValues}
|
||||||
import vinyldns.api.backend.dns.DnsProtocol.{NotAuthorized, TryAgain}
|
import vinyldns.api.backend.dns.DnsProtocol.{NotAuthorized, TryAgain}
|
||||||
import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, ReadyToApply, Requeue}
|
import vinyldns.api.engine.RecordSetChangeHandler.{AlreadyApplied, ReadyToApply, Requeue, Failure}
|
||||||
import vinyldns.api.repository.InMemoryBatchChangeRepository
|
import vinyldns.api.repository.InMemoryBatchChangeRepository
|
||||||
import vinyldns.core.domain.batch.{BatchChange, BatchChangeApprovalStatus, SingleAddChange, SingleChangeStatus}
|
import vinyldns.core.domain.batch.{BatchChange, BatchChangeApprovalStatus, SingleAddChange, SingleChangeStatus}
|
||||||
import vinyldns.core.domain.record.RecordType.RecordType
|
import vinyldns.core.domain.record.RecordType.RecordType
|
||||||
@ -867,6 +867,25 @@ class RecordSetChangeHandlerSpec
|
|||||||
processorStatus shouldBe a[AlreadyApplied]
|
processorStatus shouldBe a[AlreadyApplied]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"return Failed if there is a record in the DNS backend but not in vinyldns, and we try to delete it " in {
|
||||||
|
doReturn(IO.pure(List(rs)))
|
||||||
|
.when(mockBackend)
|
||||||
|
.resolve(rs.name, rsChange.zone.name, rs.typ)
|
||||||
|
doReturn(IO.pure(List(rs))).when(mockRsRepo).getRecordSetsByName(cs.zoneId, rs.name)
|
||||||
|
|
||||||
|
val processorStatus = RecordSetChangeHandler
|
||||||
|
.syncAndGetProcessingStatusFromDnsBackend(
|
||||||
|
rsChange.copy(changeType = RecordSetChangeType.Sync),
|
||||||
|
mockBackend,
|
||||||
|
mockRsRepo,
|
||||||
|
mockChangeRepo,
|
||||||
|
mockRecordSetDataRepo,
|
||||||
|
true
|
||||||
|
)
|
||||||
|
.unsafeRunSync()
|
||||||
|
processorStatus shouldBe a[Failure]
|
||||||
|
}
|
||||||
|
|
||||||
"sync in the DNS backend for Delete change if record exists" in {
|
"sync in the DNS backend for Delete change if record exists" in {
|
||||||
doReturn(IO.pure(List(rs)))
|
doReturn(IO.pure(List(rs)))
|
||||||
.when(mockBackend)
|
.when(mockBackend)
|
||||||
|
@ -202,6 +202,7 @@ object MySqlRecordChangeRepository extends ProtobufConversions {
|
|||||||
case RecordSetChangeType.Create => 1
|
case RecordSetChangeType.Create => 1
|
||||||
case RecordSetChangeType.Delete => 2
|
case RecordSetChangeType.Delete => 2
|
||||||
case RecordSetChangeType.Update => 3
|
case RecordSetChangeType.Update => 3
|
||||||
|
case RecordSetChangeType.Sync => 4
|
||||||
}
|
}
|
||||||
|
|
||||||
def toRecordSetChange(ws: WrappedResultSet): RecordSetChange =
|
def toRecordSetChange(ws: WrappedResultSet): RecordSetChange =
|
||||||
|
Loading…
x
Reference in New Issue
Block a user