mirror of
https://github.com/VinylDNS/vinyldns
synced 2025-09-02 07:15:24 +00:00
Merge branch 'master' into fix_change_failure_metrics
This commit is contained in:
@@ -22,10 +22,12 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
import scalikejdbc._
|
||||
import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetChange, RecordSetChangeStatus, RecordSetChangeType}
|
||||
import vinyldns.core.domain.record.{ChangeSet, RecordChangeRepository, RecordSetChange, RecordSetChangeStatus, RecordSetChangeType, RecordType}
|
||||
import vinyldns.core.domain.zone.Zone
|
||||
import vinyldns.mysql.TestMySqlInstance
|
||||
import vinyldns.mysql.TransactionProvider
|
||||
import java.time.Instant
|
||||
|
||||
class MySqlRecordChangeRepositoryIntegrationSpec
|
||||
extends AnyWordSpec
|
||||
with Matchers
|
||||
@@ -60,6 +62,15 @@ class MySqlRecordChangeRepositoryIntegrationSpec
|
||||
newRecordSets.map(makeTestAddChange(_, zone)).toList
|
||||
}
|
||||
|
||||
def generateSameInserts(zone: Zone, count: Int): List[RecordSetChange] = {
|
||||
val newRecordSets =
|
||||
for {
|
||||
i <- 1 to count
|
||||
} yield aaaa.copy(zoneId = zone.id, name = s"apply-test", id = UUID.randomUUID().toString, created = Instant.now.plusSeconds(i))
|
||||
|
||||
newRecordSets.map(makeTestAddChange(_, zone)).toList
|
||||
}
|
||||
|
||||
def generateFailedInserts(zone: Zone, count: Int): List[RecordSetChange] = {
|
||||
val newRecordSets =
|
||||
for {
|
||||
@@ -102,7 +113,7 @@ class MySqlRecordChangeRepositoryIntegrationSpec
|
||||
repo.save(db, ChangeSet(inserts))
|
||||
}
|
||||
saveRecChange.attempt.unsafeRunSync() shouldBe right
|
||||
val result = repo.listRecordSetChanges(okZone.id, None, 5).unsafeRunSync()
|
||||
val result = repo.listRecordSetChanges(Some(okZone.id), None, 5).unsafeRunSync()
|
||||
result.nextId shouldBe defined
|
||||
result.maxItems shouldBe 5
|
||||
(result.items should have).length(5)
|
||||
@@ -121,21 +132,48 @@ class MySqlRecordChangeRepositoryIntegrationSpec
|
||||
repo.save(db, ChangeSet(timeSpaced))
|
||||
}
|
||||
saveRecChange.attempt.unsafeRunSync() shouldBe right
|
||||
val page1 = repo.listRecordSetChanges(okZone.id, None, 2).unsafeRunSync()
|
||||
val page1 = repo.listRecordSetChanges(Some(okZone.id), None, 2).unsafeRunSync()
|
||||
page1.nextId shouldBe Some(2)
|
||||
page1.maxItems shouldBe 2
|
||||
(page1.items should contain).theSameElementsInOrderAs(expectedOrder.take(2))
|
||||
|
||||
val page2 = repo.listRecordSetChanges(okZone.id, page1.nextId, 2).unsafeRunSync()
|
||||
val page2 = repo.listRecordSetChanges(Some(okZone.id), page1.nextId, 2).unsafeRunSync()
|
||||
page2.nextId shouldBe Some(4)
|
||||
page2.maxItems shouldBe 2
|
||||
(page2.items should contain).theSameElementsInOrderAs(expectedOrder.slice(2, 4))
|
||||
|
||||
val page3 = repo.listRecordSetChanges(okZone.id, page2.nextId, 2).unsafeRunSync()
|
||||
val page3 = repo.listRecordSetChanges(Some(okZone.id), page2.nextId, 2).unsafeRunSync()
|
||||
page3.nextId shouldBe None
|
||||
page3.maxItems shouldBe 2
|
||||
page3.items should contain theSameElementsAs expectedOrder.slice(4, 5)
|
||||
}
|
||||
"list a particular recordset's changes by fqdn and record type" in {
|
||||
val inserts = generateInserts(okZone, 10)
|
||||
val saveRecChange = executeWithinTransaction { db: DB =>
|
||||
repo.save(db, ChangeSet(inserts))
|
||||
}
|
||||
saveRecChange.attempt.unsafeRunSync() shouldBe right
|
||||
val result = repo.listRecordSetChanges(None, None, 5, Some("1-apply-test.ok.zone.recordsets."), Some(RecordType.AAAA)).unsafeRunSync()
|
||||
result.nextId shouldBe None
|
||||
result.maxItems shouldBe 5
|
||||
(result.items should have).length(1)
|
||||
}
|
||||
"page through a particular recordset's changes by fqdn and record type" in {
|
||||
val inserts = generateSameInserts(okZone, 8)
|
||||
val saveRecChange = executeWithinTransaction { db: DB =>
|
||||
repo.save(db, ChangeSet(inserts))
|
||||
}
|
||||
saveRecChange.attempt.unsafeRunSync() shouldBe right
|
||||
val page1 = repo.listRecordSetChanges(None, None, 5, Some("apply-test.ok.zone.recordsets."), Some(RecordType.AAAA)).unsafeRunSync()
|
||||
page1.nextId shouldBe defined
|
||||
page1.maxItems shouldBe 5
|
||||
(page1.items should have).length(5)
|
||||
|
||||
val page2 = repo.listRecordSetChanges(None, page1.nextId, 5, Some("apply-test.ok.zone.recordsets."), Some(RecordType.AAAA)).unsafeRunSync()
|
||||
page2.nextId shouldBe None
|
||||
page2.maxItems shouldBe 5
|
||||
(page2.items should have).length(3)
|
||||
}
|
||||
}
|
||||
|
||||
"list failed record changes" should {
|
||||
|
@@ -0,0 +1,8 @@
|
||||
CREATE SCHEMA IF NOT EXISTS ${dbName};
|
||||
|
||||
USE ${dbName};
|
||||
|
||||
ALTER TABLE record_change ADD COLUMN fqdn VARCHAR(255) NOT NULL;
|
||||
ALTER TABLE record_change ADD COLUMN record_type VARCHAR(255) NOT NULL;
|
||||
CREATE INDEX fqdn_index ON record_change (fqdn);
|
||||
CREATE INDEX record_type_index ON record_change (record_type);
|
@@ -19,9 +19,11 @@ package vinyldns.mysql.repository
|
||||
import cats.effect._
|
||||
import scalikejdbc._
|
||||
import vinyldns.core.domain.record.RecordSetChangeType.RecordSetChangeType
|
||||
import vinyldns.core.domain.record.RecordType.RecordType
|
||||
import vinyldns.core.domain.record._
|
||||
import vinyldns.core.protobuf.ProtobufConversions
|
||||
import vinyldns.core.route.Monitored
|
||||
import vinyldns.mysql.repository.MySqlRecordSetRepository.fromRecordType
|
||||
import vinyldns.proto.VinylDNSProto
|
||||
|
||||
class MySqlRecordChangeRepository
|
||||
@@ -39,6 +41,24 @@ class MySqlRecordChangeRepository
|
||||
| LIMIT {limit} OFFSET {startFrom}
|
||||
""".stripMargin
|
||||
|
||||
private val LIST_CHANGES_WITH_START_FQDN_TYPE =
|
||||
sql"""
|
||||
|SELECT data
|
||||
| FROM record_change
|
||||
| WHERE fqdn = {fqdn} AND record_type = {type}
|
||||
| ORDER BY created DESC
|
||||
| LIMIT {limit} OFFSET {startFrom}
|
||||
""".stripMargin
|
||||
|
||||
private val LIST_CHANGES_WITHOUT_START_FQDN_TYPE =
|
||||
sql"""
|
||||
|SELECT data
|
||||
| FROM record_change
|
||||
| WHERE fqdn = {fqdn} AND record_type = {type}
|
||||
| ORDER BY created DESC
|
||||
| LIMIT {limit}
|
||||
""".stripMargin
|
||||
|
||||
private val LIST_RECORD_CHANGES =
|
||||
sql"""
|
||||
|SELECT data
|
||||
@@ -63,7 +83,7 @@ class MySqlRecordChangeRepository
|
||||
""".stripMargin
|
||||
|
||||
private val INSERT_CHANGES =
|
||||
sql"INSERT IGNORE INTO record_change (id, zone_id, created, type, data) VALUES (?, ?, ?, ?, ?)"
|
||||
sql"INSERT IGNORE INTO record_change (id, zone_id, created, type, fqdn, record_type, data) VALUES (?, ?, ?, ?, ?, ?, ?)"
|
||||
|
||||
/**
|
||||
* We have the same issue with changes as record sets, namely we may have to save millions of them
|
||||
@@ -82,7 +102,9 @@ class MySqlRecordChangeRepository
|
||||
change.zoneId,
|
||||
change.created.toEpochMilli,
|
||||
fromChangeType(change.changeType),
|
||||
toPB(change).toByteArray
|
||||
if(change.recordSet.name == change.zone.name) change.zone.name else change.recordSet.name + "." + change.zone.name,
|
||||
fromRecordType(change.recordSet.typ),
|
||||
toPB(change).toByteArray,
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -94,33 +116,52 @@ class MySqlRecordChangeRepository
|
||||
}
|
||||
|
||||
def listRecordSetChanges(
|
||||
zoneId: String,
|
||||
zoneId: Option[String],
|
||||
startFrom: Option[Int],
|
||||
maxItems: Int
|
||||
maxItems: Int,
|
||||
fqdn: Option[String],
|
||||
recordType: Option[RecordType]
|
||||
): IO[ListRecordSetChangesResults] =
|
||||
monitor("repo.RecordChange.listRecordSetChanges") {
|
||||
IO {
|
||||
DB.readOnly { implicit s =>
|
||||
val changes = startFrom match {
|
||||
case Some(start) =>
|
||||
LIST_CHANGES_WITH_START
|
||||
.bindByName('zoneId -> zoneId, 'startFrom -> start, 'limit -> maxItems)
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
case None =>
|
||||
LIST_CHANGES_NO_START
|
||||
.bindByName('zoneId -> zoneId, 'limit -> maxItems)
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
val changes = if(startFrom.isDefined && fqdn.isDefined && recordType.isDefined){
|
||||
LIST_CHANGES_WITH_START_FQDN_TYPE
|
||||
.bindByName('fqdn -> fqdn.get, 'type -> fromRecordType(recordType.get), 'startFrom -> startFrom.get, 'limit -> (maxItems + 1))
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
} else if(fqdn.isDefined && recordType.isDefined){
|
||||
LIST_CHANGES_WITHOUT_START_FQDN_TYPE
|
||||
.bindByName('fqdn -> fqdn.get, 'type -> fromRecordType(recordType.get), 'limit -> (maxItems + 1))
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
} else if(startFrom.isDefined){
|
||||
LIST_CHANGES_WITH_START
|
||||
.bindByName('zoneId -> zoneId.get, 'startFrom -> startFrom.get, 'limit -> (maxItems + 1))
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
} else {
|
||||
LIST_CHANGES_NO_START
|
||||
.bindByName('zoneId -> zoneId.get, 'limit -> (maxItems + 1))
|
||||
.map(toRecordSetChange)
|
||||
.list()
|
||||
.apply()
|
||||
}
|
||||
|
||||
val maxQueries = changes.take(maxItems)
|
||||
val startValue = startFrom.getOrElse(0)
|
||||
val nextId = if (changes.size < maxItems) None else Some(startValue + maxItems)
|
||||
|
||||
// earlier maxItems was incremented, if the (maxItems + 1) size is not reached then pages are exhausted
|
||||
val nextId = changes match {
|
||||
case _ if changes.size <= maxItems | changes.isEmpty => None
|
||||
case _ => Some(startValue + maxItems)
|
||||
}
|
||||
|
||||
ListRecordSetChangesResults(
|
||||
changes,
|
||||
maxQueries,
|
||||
nextId,
|
||||
startFrom,
|
||||
maxItems
|
||||
|
Reference in New Issue
Block a user