diff --git a/.gitignore b/.gitignore index 1f53fe894..4ed2231c7 100644 --- a/.gitignore +++ b/.gitignore @@ -34,7 +34,6 @@ project/metals.sbt quickstart/data **/.virtualenv **/.venv* -**/*cache* **/artifacts/ **/.env.overrides **/node_modules diff --git a/build/assemble_artifacts.sh b/build/assemble_artifacts.sh index 3a8b8e668..4b377f5be 100755 --- a/build/assemble_artifacts.sh +++ b/build/assemble_artifacts.sh @@ -17,11 +17,11 @@ usage() { clean(){ echo "Cleaning artifacts" - if [ -d "${DIR}/../artifacts/" ] && [ -f "${DIR}/../artifacts/*.jar" ]; then - rm "${DIR}/../artifacts/*.jar" + if [ -d "${DIR}/../artifacts/" ] && [ -f "${DIR}/../artifacts/vinyldns-api.jar" ]; then + rm "${DIR}/../artifacts/vinyldns-api.jar" fi - if [ -d "${DIR}/../artifacts/" ] && [ -f "${DIR}/../artifacts/*.zip" ]; then - rm "${DIR}/../artifacts/*.zip" + if [ -d "${DIR}/../artifacts/" ] && [ -f "${DIR}/../artifacts/vinyldns-portal.zip" ]; then + rm "${DIR}/../artifacts/vinyldns-portal.zip" fi } diff --git a/modules/api/src/main/scala/vinyldns/api/domain/record/RecordSetCacheService.scala b/modules/api/src/main/scala/vinyldns/api/domain/record/RecordSetCacheService.scala new file mode 100644 index 000000000..6c90aaa1a --- /dev/null +++ b/modules/api/src/main/scala/vinyldns/api/domain/record/RecordSetCacheService.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vinyldns.api.domain.record + +import cats.effect.IO +import org.slf4j.LoggerFactory +import scalikejdbc.DB +import vinyldns.core.domain.record.{NameSort, ListRecordSetResults, RecordSetCacheRepository, RecordSetRepository} +import vinyldns.mysql.TransactionProvider + + +class RecordSetCacheService(recordSetRepository: RecordSetRepository, + recordSetCacheRepository: RecordSetCacheRepository) extends TransactionProvider { + private val logger = LoggerFactory.getLogger(classOf[RecordSetCacheService]) + + final def populateRecordSetCache(nextId: Option[String] = None): IO[ListRecordSetResults] = { + logger.info(s"Populating recordset data. Starting at $nextId") + for { + result <- recordSetRepository.listRecordSets(None, nextId, Some(1000), None, None, None, NameSort.ASC) + + _ <- executeWithinTransaction { db: DB => + IO { + result.recordSets.par.foreach(recordSet => { + recordSetCacheRepository.updateRecordDataList(db, recordSet.id, recordSet.records, recordSet.typ, recordSet.zoneId, recordSet.fqdn.get) + }) + } + } + _ <- populateRecordSetCache(result.nextId) + } yield { + result + } + } +} diff --git a/modules/core/src/main/scala/vinyldns/core/domain/record/RecordSetCacheRepository.scala b/modules/core/src/main/scala/vinyldns/core/domain/record/RecordSetCacheRepository.scala new file mode 100644 index 000000000..de4f9745c --- /dev/null +++ b/modules/core/src/main/scala/vinyldns/core/domain/record/RecordSetCacheRepository.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vinyldns.core.domain.record + +import cats.effect._ +import scalikejdbc.DB +import vinyldns.core.domain.record.NameSort.NameSort +import vinyldns.core.domain.record.RecordType.RecordType +import vinyldns.core.repository.Repository + +trait RecordSetCacheRepository extends Repository { + + def save(db: DB, changeSet: ChangeSet): IO[ChangeSet] + + def deleteRecordSetDataInZone(db: DB, zoneId: String, zoneName: String): IO[Unit] + + def listRecordSetData( + zoneId: Option[String], + startFrom: Option[String], + maxItems: Option[Int], + recordNameFilter: Option[String], + recordTypeFilter: Option[Set[RecordType]], + recordOwnerGroupFilter: Option[String], + nameSort: NameSort + ): IO[ListRecordSetResults] + + /** + * Saves the recordset data to the database + * + * @param db The database + * @param recordID The record identifier + * @param recordData The list of record data + * @param recordType The record type + * @param zoneId The zone identifier + * @param FQDN The fully qualified domain name + */ + def updateRecordDataList(db: DB, + recordID: String, + recordData: List[RecordData], + recordType: RecordType, + zoneId: String, + FQDN: String + ): Unit + +} diff --git a/modules/core/src/main/scala/vinyldns/core/repository/DataStore.scala b/modules/core/src/main/scala/vinyldns/core/repository/DataStore.scala index cfb562406..8118a4048 100755 --- a/modules/core/src/main/scala/vinyldns/core/repository/DataStore.scala +++ b/modules/core/src/main/scala/vinyldns/core/repository/DataStore.scala @@ -19,7 +19,7 @@ package vinyldns.core.repository import cats.effect.IO import vinyldns.core.domain.batch.BatchChangeRepository import vinyldns.core.domain.membership._ -import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetRepository, RecordSetCacheRepository} +import vinyldns.core.domain.record.{RecordChangeRepository, RecordSetCacheRepository, RecordSetRepository} import vinyldns.core.domain.zone.{ZoneRepository, ZoneChangeRepository} import vinyldns.core.repository.RepositoryName.RepositoryName import vinyldns.core.health.HealthCheck.HealthCheck diff --git a/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepositoryIntegrationSpec.scala b/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepositoryIntegrationSpec.scala new file mode 100644 index 000000000..9027b2dac --- /dev/null +++ b/modules/mysql/src/it/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepositoryIntegrationSpec.scala @@ -0,0 +1,486 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vinyldns.mysql.repository +import java.util.UUID +import cats.scalatest.EitherMatchers +import org.joda.time.DateTime +import org.scalatest._ +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec +import scalikejdbc.DB +import vinyldns.core.domain.record._ +import vinyldns.core.domain.record.RecordType._ +import vinyldns.core.domain.zone.Zone +import vinyldns.mysql.TestMySqlInstance +import vinyldns.mysql.repository.MySqlRecordSetRepository.PagingKey +import vinyldns.mysql.TransactionProvider + +class MySqlRecordSetCacheRepositoryIntegrationSpec + extends AnyWordSpec + with BeforeAndAfterEach + with BeforeAndAfterAll + with Matchers + with EitherMatchers + with TransactionProvider { + + import vinyldns.core.TestRecordSetData._ + import vinyldns.core.TestZoneData._ + private val recordSetCacheRepo = TestMySqlInstance.recordSetCacheRepository.asInstanceOf[MySqlRecordSetCacheRepository] + + private val recordSetRepo = TestMySqlInstance.recordSetRepository.asInstanceOf[MySqlRecordSetRepository] + + override protected def beforeEach(): Unit = clear() + + override protected def afterAll(): Unit = clear() + + def clear(): Unit = + DB.localTx { s => + s.executeUpdate("DELETE FROM recordset_data") + s.executeUpdate("DELETE FROM recordset") + } + + def generateInserts(zone: Zone, count: Int, word: String = "insert"): List[RecordSetChange] = { + val newRecordSetsData = + for { + i <- 1 to count + } yield aaaa.copy( + zoneId = zone.id, + name = s"$i-${word}-apply-test", + id = UUID.randomUUID().toString + ) + + newRecordSetsData.map(makeTestAddChange(_, zone)).toList + } + + def insert(zone: Zone, count: Int, word: String = "insert"): List[RecordSetChange] = { + val pendingChanges = generateInserts(zone, count, word) + val bigPendingChangeSet = ChangeSet(pendingChanges) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, bigPendingChangeSet) + recordSetRepo.apply(db, bigPendingChangeSet) + + }.unsafeRunSync() + pendingChanges + } + + def insert(changes: List[RecordSetChange]): Unit = { + val bigPendingChangeSet = ChangeSet(changes) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, bigPendingChangeSet) + recordSetRepo.apply(db, bigPendingChangeSet) + + }.unsafeRunSync() + () + } + + def recordSetDataWithFQDN(recordSet: RecordSet, zone: Zone): RecordSet = + recordSet.copy(fqdn = Some(s"""${recordSet.name}.${zone.name}""")) + + "apply" should { + "properly revert changes that fail processing" in { + val existing = insert(okZone, 2).map(_.recordSet) + + val addChange = makeTestAddChange(rsOk.copy(id = UUID.randomUUID().toString)) + .copy(status = RecordSetChangeStatus.Failed) + val updateChange = + makePendingTestUpdateChange(existing.head, existing.head.copy(name = "updated-name")) + .copy(status = RecordSetChangeStatus.Failed) + val deleteChange = makePendingTestDeleteChange(existing(1)) + .copy(status = RecordSetChangeStatus.Failed) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, ChangeSet(Seq(addChange, updateChange, deleteChange))) + recordSetRepo.apply(db, ChangeSet(Seq(addChange, updateChange, deleteChange))) + } + recordSetCacheRepo.getRecordSetData(rsOk.id).unsafeRunSync() shouldBe None + recordSetCacheRepo.getRecordSetData(existing.head.id).unsafeRunSync() shouldBe Some( + recordSetDataWithFQDN(existing.head, okZone) + ) + recordSetCacheRepo.getRecordSetData(existing(1).id).unsafeRunSync() shouldBe Some( + recordSetDataWithFQDN(existing(1), okZone) + ) + } + + "apply successful and pending creates, and delete failed creates" in { + val zone = okZone + val recordForSuccess = RecordSet( + "test-create-converter", + "createSuccess", + RecordType.A, + 123, + RecordSetStatus.Active, + DateTime.now, + records = List(AData("127.0.0.1")) + ) + val recordForPending = RecordSet( + "test-create-converter", + "createPending", + RecordType.A, + 123, + RecordSetStatus.Pending, + DateTime.now, + records = List(AData("127.0.0.1")) + ) + val recordForFailed = RecordSet( + "test-create-converter", + "failed", + RecordType.A, + 123, + RecordSetStatus.Inactive, + DateTime.now, + records = List(AData("127.0.0.1")) + ) + + val successfulChange = + RecordSetChange( + zone, + recordForSuccess, + "abc", + RecordSetChangeType.Create, + RecordSetChangeStatus.Complete + ) + + val pendingChange = + successfulChange.copy(recordSet = recordForPending, status = RecordSetChangeStatus.Pending) + val failedChange = + successfulChange.copy(recordSet = recordForFailed, status = RecordSetChangeStatus.Failed) + + // to be deleted - assume this was already saved as pending + val existingPending = failedChange.copy( + recordSet = recordForFailed.copy(status = RecordSetStatus.Pending), + status = RecordSetChangeStatus.Pending + ) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, ChangeSet(existingPending)) + recordSetRepo.apply(db, ChangeSet(existingPending)) + }.attempt.unsafeRunSync() + recordSetCacheRepo.getRecordSetData(failedChange.recordSet.id).unsafeRunSync() shouldBe + Some( + existingPending.recordSet + .copy(fqdn = Some(s"""${failedChange.recordSet.name}.${okZone.name}""")) + ) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, ChangeSet(Seq(successfulChange, pendingChange, failedChange))) + recordSetRepo.apply(db, ChangeSet(Seq(successfulChange, pendingChange, failedChange))) + }.attempt.unsafeRunSync() + + // success and pending changes have records saved + recordSetCacheRepo + .getRecordSetData(successfulChange.recordSet.id) + .unsafeRunSync() shouldBe + Some(recordSetDataWithFQDN(successfulChange.recordSet, okZone)) + recordSetCacheRepo + .getRecordSetData(pendingChange.recordSet.id) + .unsafeRunSync() shouldBe + Some(recordSetDataWithFQDN(pendingChange.recordSet, okZone)) + + // check that the pending record was deleted because of failed record change + recordSetCacheRepo + .getRecordSetData(failedChange.recordSet.id) + .unsafeRunSync() shouldBe None + } + } + "inserting record sets" should { + "be idempotent for inserts" in { + val pendingChanges = generateInserts(okZone, 1000) + val bigPendingChangeSet = ChangeSet(pendingChanges) + val saveRecSets = executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, bigPendingChangeSet) + recordSetCacheRepo.save(db, bigPendingChangeSet) + recordSetRepo.apply(db, bigPendingChangeSet) + recordSetRepo.apply(db, bigPendingChangeSet) + } + saveRecSets.attempt.unsafeRunSync() shouldBe right + } + "work for multiple inserts" in { + val pendingChanges = generateInserts(okZone, 20) + + val bigPendingChangeSet = ChangeSet(pendingChanges) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, bigPendingChangeSet) + recordSetRepo.apply(db, bigPendingChangeSet) + + }.attempt.unsafeRunSync() + // let's make sure we have all 1000 records + val recordCount = recordSetCacheRepo.getRecordSetDataCount(okZone.id).unsafeRunSync() + recordCount shouldBe 20 + } + "work for deletes, updates, and inserts" in { + // create some record sets to be updated + val existing = insert(okZone, 10).map(_.recordSet) + + // update a few, delete a few + val deletes = existing + .take(2) + .map(makePendingTestDeleteChange(_, okZone).copy(status = RecordSetChangeStatus.Complete)) + + // updates we will just add the letter u to + val updates = existing.slice(3, 5).map { rs => + val update = rs.copy(name = "u" + rs.name) + makeCompleteTestUpdateChange(rs, update, okZone) + } + + // insert a few more + val inserts = generateInserts(okZone, 2, "more-inserts") + + // exercise the entire change set + val cs = ChangeSet(deletes ++ updates ++ inserts) + executeWithinTransaction { db: DB => + recordSetCacheRepo.save(db, cs) + recordSetRepo.apply(db, cs) + + }.attempt.unsafeRunSync() + // make sure the deletes are gone + recordSetCacheRepo.getRecordSetData(deletes(0).recordSet.id).unsafeRunSync() shouldBe None + recordSetCacheRepo.getRecordSetData(deletes(1).recordSet.id).unsafeRunSync() shouldBe None + + // make sure the updates are updated + recordSetCacheRepo.getRecordSetData(updates(0).recordSet.id).unsafeRunSync().map(_.name) shouldBe + Some(updates(0).recordSet.name) + recordSetCacheRepo.getRecordSetData(updates(1).recordSet.id).unsafeRunSync().map(_.name) shouldBe + Some(updates(1).recordSet.name) + + // make sure the new ones are there + recordSetCacheRepo.getRecordSetData(inserts(0).recordSet.id).unsafeRunSync().map(_.name) shouldBe + Some(inserts(0).recordSet.name) + recordSetCacheRepo.getRecordSetData(inserts(1).recordSet.id).unsafeRunSync().map(_.name) shouldBe + Some(inserts(1).recordSet.name) + } + } + "list record sets" should { + "return all record sets in a zone when optional params are not set" in { + val existing = insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, None, None, None, None, NameSort.ASC) + .unsafeRunSync() + found.recordSets should contain theSameElementsAs existing.map( + r => recordSetDataWithFQDN(r, okZone) + ) + } + "return record sets after the startFrom when set" in { + // load 5, start after the 3rd, we should get back the last two + val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name) + val startFrom = Some(PagingKey.toNextId(existing(2), true)) + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), startFrom, None, None, None, None, NameSort.ASC) + .unsafeRunSync() + + (found.recordSets should contain).theSameElementsInOrderAs( + existing + .drop(3) + .map(r => recordSetDataWithFQDN(r, okZone)) + ) + } + "return the record sets after the startFrom respecting maxItems" in { + // load 5, start after the 2nd, take 2, we should get back the 3rd and 4th + val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name) + val startFrom = Some(PagingKey.toNextId(existing(1), true)) + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), startFrom, Some(2), None, None, None, NameSort.ASC) + .unsafeRunSync() + + (found.recordSets should contain).theSameElementsInOrderAs( + existing + .slice(2, 4) + .map(r => recordSetDataWithFQDN(r, okZone)) + ) + } + "return the record sets after startFrom respecting maxItems and filter" in { + val recordNames = + List("aaa", "bbb", "ccc", "ddd", "eeez", "fffz", "ggg", "hhhz", "iii", "jjj") + val expectedNames = recordNames.filter(_.contains("z")) + + val newRecordSets = + for { + n <- recordNames + } yield aaaa.copy(zoneId = okZone.id, name = n, id = UUID.randomUUID().toString) + + val changes = newRecordSets.map(makeTestAddChange(_, okZone)) + insert(changes) + + val startFrom = Some(PagingKey.toNextId(newRecordSets(1), true)) + val found = recordSetCacheRepo + .listRecordSetData( + Some(okZone.id), + startFrom, + Some(3), + Some("*z*"), + None, + None, + NameSort.ASC + ) + .unsafeRunSync() + (found.recordSets.map(_.name) should contain).theSameElementsInOrderAs(expectedNames) + } + "return record sets using starts with wildcard" in { + val recordNames = List("aaa", "aab", "ccc") + val expectedNames = recordNames.filter(_.startsWith("aa")) + + val newRecordSets = + for { + n <- recordNames + } yield aaaa.copy(zoneId = okZone.id, name = n, id = UUID.randomUUID().toString) + + val changes = newRecordSets.map(makeTestAddChange(_, okZone)) + insert(changes) + + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, Some(3), Some("aa*"), None, None, NameSort.ASC) + .unsafeRunSync() + (found.recordSets.map(_.name) should contain).theSameElementsInOrderAs(expectedNames) + } + "return record sets using ends with wildcard" in { + val recordNames = List("aaa", "aab", "ccb") + val expectedNames = recordNames.filter(_.endsWith("b")) + + val newRecordSets = + for { + n <- recordNames + } yield aaaa.copy(zoneId = okZone.id, name = n, id = UUID.randomUUID().toString) + + val changes = newRecordSets.map(makeTestAddChange(_, okZone)) + insert(changes) + + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, Some(3), Some("*b"), None, None, NameSort.ASC) + .unsafeRunSync() + (found.recordSets.map(_.name) should contain).theSameElementsInOrderAs(expectedNames) + } + "return record sets exact match with no wildcards" in { + // load some deterministic names so we can filter and respect max items + val recordNames = List("aaa", "aab", "ccb") + val expectedNames = List("aaa") + + val newRecordSets = + for { + n <- recordNames + } yield aaaa.copy(zoneId = okZone.id, name = n, id = UUID.randomUUID().toString) + + val changes = newRecordSets.map(makeTestAddChange(_, okZone)) + insert(changes) + + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, Some(3), Some("aaa"), None, None, NameSort.ASC) + .unsafeRunSync() + (found.recordSets.map(_.name) should contain).theSameElementsInOrderAs(expectedNames) + } + "return select types of recordsets in a zone" in { + insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, None, None, Some(Set(CNAME)), None, NameSort.ASC) + .unsafeRunSync() + found.recordSets shouldBe List() + found.recordTypeFilter shouldBe Some(Set(CNAME)) + } + "return all recordsets in a zone in descending order" in { + val existing = insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, None, None, None, None, NameSort.DESC) + .unsafeRunSync() + found.recordSets should contain theSameElementsAs existing.map( + r => recordSetDataWithFQDN(r, okZone) + ) + found.nameSort shouldBe NameSort.DESC + } + "pages through the list properly" in { + // load 5 records, pages of 2, last page should have 1 result and no next id + val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name) + val page1 = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), None, Some(2), None, None, None, NameSort.ASC) + .unsafeRunSync() + (page1.recordSets should contain).theSameElementsInOrderAs( + existing + .slice(0, 2) + .map(r => recordSetDataWithFQDN(r, okZone)) + ) + page1.nextId shouldBe Some(PagingKey.toNextId(page1.recordSets(1), true)) + + val page2 = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), page1.nextId, Some(2), None, None, None, NameSort.ASC) + .unsafeRunSync() + (page2.recordSets should contain).theSameElementsInOrderAs( + existing + .slice(2, 4) + .map(r => recordSetDataWithFQDN(r, okZone)) + ) + page2.nextId shouldBe Some(PagingKey.toNextId(page2.recordSets(1), true)) + + val page3 = recordSetCacheRepo + .listRecordSetData(Some(okZone.id), page2.nextId, Some(2), None, None, None, NameSort.ASC) + .unsafeRunSync() + (page3.recordSets should contain).theSameElementsInOrderAs( + existing + .slice(4, 5) + .map(r => recordSetDataWithFQDN(r, okZone)) + ) + page3.nextId shouldBe None + } + + "return applicable recordsets in ascending order when recordNameFilter is given" in { + val existing = insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(None, None, None, Some("*.ok*"), None, None, NameSort.ASC) + .unsafeRunSync() + found.recordSets should contain theSameElementsAs existing.map( + r => recordSetDataWithFQDN(r, okZone) + ) + } + "return applicable recordsets in descending order when recordNameFilter is given and name sort is descending" in { + val existing = insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(None, None, None, Some("*.ok*"), None, None, NameSort.DESC) + .unsafeRunSync() + found.recordSets should contain theSameElementsAs existing + .map(r => recordSetDataWithFQDN(r, okZone)) + .reverse + } + "return applicable recordsets in descending order when recordNameFilter supports full-text" in { + val existing = insert(okZone, 10).map(_.recordSet) + val found = recordSetCacheRepo + .listRecordSetData(None, None, None, Some("apply*test"), None, None, NameSort.DESC) + .unsafeRunSync() + found.recordSets should contain theSameElementsAs existing + .map(r => recordSetDataWithFQDN(r, okZone)) + .reverse + } + "return no recordsets when no zoneId or recordNameFilter are given" in { + val found = + recordSetCacheRepo.listRecordSetData(None, None, None, None, None, None, NameSort.ASC).unsafeRunSync() + found.recordSets shouldBe empty + } + } + + "deleteRecordSetsInZone" should { + "delete recordsets from table with matching zone id" in { + insert(okZone, 20) + insert(abcZone, 10) + + recordSetCacheRepo.getRecordSetDataCount(okZone.id).unsafeRunSync() shouldBe 20 + recordSetCacheRepo.getRecordSetDataCount(abcZone.id).unsafeRunSync() shouldBe 10 + executeWithinTransaction { db: DB => + recordSetCacheRepo.deleteRecordSetDataInZone(db, okZone.id, okZone.name)}.unsafeRunSync() should not be a[Throwable] + + recordSetCacheRepo.getRecordSetDataCount(okZone.id).unsafeRunSync() shouldBe 0 + recordSetCacheRepo.getRecordSetDataCount(abcZone.id).unsafeRunSync() shouldBe 10 + } + + "not fail if there is nothing to delete" in { + recordSetCacheRepo.getRecordSetDataCount(okZone.id).unsafeRunSync() shouldBe 0 + executeWithinTransaction { db: DB => + recordSetCacheRepo.deleteRecordSetDataInZone(db,okZone.id, okZone.name)}.unsafeRunSync() should not be a[Throwable] + } + } +} diff --git a/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepository.scala b/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepository.scala new file mode 100644 index 000000000..6d5e95895 --- /dev/null +++ b/modules/mysql/src/main/scala/vinyldns/mysql/repository/MySqlRecordSetCacheRepository.scala @@ -0,0 +1,439 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package vinyldns.mysql.repository + +import cats.implicits._ +import org.slf4j.LoggerFactory +import scalikejdbc._ +import vinyldns.core.domain.record._ +import vinyldns.core.protobuf.ProtobufConversions +import vinyldns.core.route.Monitored +import cats.effect.IO +import inet.ipaddr.IPAddressString +import vinyldns.core.domain.record.NameSort.{ASC, NameSort} +import vinyldns.core.domain.record.RecordType.RecordType +import vinyldns.mysql.repository.MySqlRecordSetRepository.{PagingKey, fromRecordType, toFQDN} +import vinyldns.proto.VinylDNSProto + +import scala.util.{Try, Success, Failure} + +class MySqlRecordSetCacheRepository + extends RecordSetCacheRepository + with Monitored + with ProtobufConversions { + + private val INSERT_RECORDSETDATA = + sql"INSERT INTO recordset_data(recordset_id, zone_id, fqdn, reverse_fqdn, type, record_data, ip) VALUES ({recordset_id}, {zone_id}, {fqdn}, {reverse_fqdn}, {type}, {record_data}, INET6_ATON({ip}))" + + private val DELETE_RECORDSETDATA = + sql"DELETE FROM recordset_data WHERE recordset_id = ?" + + private val DELETE_RECORDSETDATA_IN_ZONE = + sql"DELETE FROM recordset_data WHERE zone_id = ?" + + private val COUNT_RECORDSETDATA_IN_ZONE = + sql""" + |SELECT count(*) + | FROM recordset_data + | WHERE zone_id = {zone_id} + """.stripMargin + + private val FIND_BY_ID = + sql""" + |SELECT recordset.data, recordset_data.fqdn + | FROM recordset_data + | INNER JOIN recordset ON recordset.id=recordset_data.recordset_id + | WHERE recordset_data.recordset_id = {recordset_id} + """.stripMargin + + private final val logger = LoggerFactory.getLogger(classOf[MySqlRecordSetRepository]) + + def save(db: DB, changeSet: ChangeSet): IO[ChangeSet] = { + val byStatus = changeSet.changes.groupBy(_.status) + val failedChanges = byStatus.getOrElse(RecordSetChangeStatus.Failed, Seq()) + val (failedCreates, failedUpdatesOrDeletes) = + failedChanges.partition(_.changeType == RecordSetChangeType.Create) + + val reversionDeletes = failedCreates.map(d => Seq[Any](d.recordSet.id)) + failedUpdatesOrDeletes.flatMap { change => + change.updates.map { oldRs => + Seq[Any]( + updateRecordDataList( + db, + oldRs.id, + oldRs.records, + oldRs.typ, + oldRs.zoneId, + toFQDN(change.zone.name, oldRs.name) + ) + ) + } + } + // address successful and pending changes + val completeChanges = byStatus.getOrElse(RecordSetChangeStatus.Complete, Seq()) + val completeChangesByType = completeChanges.groupBy(_.changeType) + val completeCreates = completeChangesByType.getOrElse(RecordSetChangeType.Create, Seq()) + val completeUpdates = completeChangesByType.getOrElse(RecordSetChangeType.Update, Seq()) + val completeDeletes = completeChangesByType.getOrElse(RecordSetChangeType.Delete, Seq()) + + val pendingChanges = byStatus.getOrElse(RecordSetChangeStatus.Pending, Seq()) + + // all pending changes are saved as if they are creates + (completeCreates ++ pendingChanges).map { i => + Seq[Any]( + insertRecordDataList( + db, + i.recordSet.id, + i.recordSet.records, + i.recordSet.typ, + i.recordSet.zoneId, + toFQDN(i.zone.name, i.recordSet.name) + )) + } + completeUpdates.map { u => + Seq[Any]( + updateRecordDataList( + db, + u.recordSet.id, + u.recordSet.records, + u.recordSet.typ, + u.recordSet.zoneId, + toFQDN(u.zone.name, u.recordSet.name), + ) + ) + } + + val deletes: Seq[Seq[Any]] = completeDeletes.map(d => Seq[Any](d.recordSet.id)) + IO { + db.withinTx { implicit session => + (deletes ++ reversionDeletes).grouped(1000).foreach { group => + DELETE_RECORDSETDATA.batch(group: _*).apply() + } + } + }.as(changeSet) + + } + + def deleteRecordSetDataInZone(db: DB, zone_id: String, zoneName: String): IO[Unit] = + monitor("repo.RecordSet.deleteRecordSetDataInZone") { + IO { + val numDeleted = db.withinTx { implicit session => + DELETE_RECORDSETDATA_IN_ZONE + .bind(zone_id) + .update() + .apply() + } + logger.info(s"Deleted $numDeleted records from zone $zoneName (zone id: $zone_id)") + }.handleErrorWith { error => + logger.error(s"Failed deleting records from zone $zoneName (zone id: $zone_id)", error) + IO.raiseError(error) + } + } + + def insertRecordDataList(db: DB, + recordID: String, + recordData: List[RecordData], + recordType: RecordType, + zoneId: String, + fqdn: String): Unit = storeRecordDataList(db, recordID, recordData, recordType, zoneId, fqdn) + + def updateRecordDataList(db: DB, + recordID: String, + recordData: List[RecordData], + recordType: RecordType, + zoneId: String, + fqdn: String): Unit = { + db.withinTx { implicit session => + DELETE_RECORDSETDATA + .bind(recordID) + .update() + .apply() + storeRecordDataList(db, recordID, recordData, recordType, zoneId, fqdn) + } + } + + private def storeRecordDataList(db: DB, + recordId: String, + recordData: List[RecordData], + recordType: RecordType, + zoneId: String, + fqdn: String): Unit = { + recordData.foreach(record => saveRecordSetData(db, recordId, zoneId, fqdn, recordType, record)) + } + + /** + * Inserts data into the RecordSet Data table + * + * @param db The database connection + * @param recordId The record identifier + * @param zoneId The zone identifier + * @param fqdn The fully qualified domain name + * @param recordType The record type + * @param recordData The record data + */ + private def saveRecordSetData(db: DB, + recordId: String, + zoneId: String, + fqdn: String, + recordType: RecordType, + recordData: RecordData, + ): Unit = { + // We want to get the protobuf string format of the record data. This provides + // slightly more information when doing RData searches. + // Example: + // An SOA record may contain the following + // mname:"auth.vinyldns.io." rname:"admin.vinyldns.io." serial:14 refresh:7200 retry:3600 expire:1209600 minimum:900 + // This allows us to potentially search for SOA records with "refresh:7200" + val recordDataString = raw"""([a-z]+): ("|\d)""".r.replaceAllIn(recordDataToPB(recordData).toString.trim, "$1:$2") + + // Extract the IP address from the forward or reverse record + val parsedIp = recordType match { + case RecordType.PTR => parseIP(fqdn) + case RecordType.A | RecordType.AAAA => parseIP(recordDataString) + case _ => None + } + + db.withinTx { implicit session => + INSERT_RECORDSETDATA + .bindByName( + 'recordset_id -> recordId, + 'zone_id -> zoneId, + 'fqdn -> fqdn, + 'reverse_fqdn -> fqdn.reverse, + 'type -> recordType.toString, + 'record_data -> recordDataString, + 'ip -> parsedIp.orNull + ) + .update() + .apply() + }} + + /** + * Retrieves recordset data for records with the given {@code recordId} in the recordset + * In MySql we do not need the zone id, since can hit the key directly + * + * @param recordSetId The identifier for the recordset + * @return A list of {@link RecordSet} matching the criteria + */ + def getRecordSetData(recordSetId: String): IO[Option[RecordSet]] = + monitor("repo.RecordSet.getRecordSetData") { + IO { + DB.readOnly { implicit s => + FIND_BY_ID.bindByName('recordset_id -> recordSetId).map(toRecordSetData).single().apply() + } + } + } + + /** + * Retrieves no. of recordset data counts for records with the given {@code zoneId} in the recordset + * + * @param zoneId The identifier for the zone + * @return A no. of recordset data counts of {@link RecordSet} matching the criteria + */ + def getRecordSetDataCount(zoneId: String): IO[Int] = + monitor("repo.RecordSet.getRecordSetDataCount") { + IO { + DB.readOnly { implicit s => + // this is a count query, so should always return a value. However, scalikejdbc doesn't have this, + // so we have to default to 0. it is literally impossible to not return a value + COUNT_RECORDSETDATA_IN_ZONE + .bindByName('zone_id -> zoneId) + .map(_.int(1)) + .single + .apply() + .getOrElse(0) + } + } + } + + /** + * Retrieves recordset data for records with the given {@code recordNameFilter} in the zone given by {@code zoneId} of the + * record type given by {@code recordTypeFilter} given by {@code recordOwnerGroupFilter} + * + * @param zoneId The identifier for the zone + * @param startFrom The Limits for Start from records + * @param maxItems The Limits for maximum records(INT) + * @param recordNameFilter The name of the record + * @param recordTypeFilter The {@link RecordType} to include in the results + * @param recordOwnerGroupFilter The OwnerGroup of the record + * @param nameSort The Sort of record Name + * @return A list of {@link RecordSet} matching the criteria + */ + def listRecordSetData( + zoneId: Option[String], + startFrom: Option[String], + maxItems: Option[Int], + recordNameFilter: Option[String], + recordTypeFilter: Option[Set[RecordType]], + recordOwnerGroupFilter: Option[String], + nameSort: NameSort + ): IO[ListRecordSetResults] = + monitor("repo.RecordSet.listRecordSetData") { + IO { + val maxPlusOne = maxItems.map(_ + 1) + val wildcardStart = raw"^\s*[*%](.+[^*%])\s*$$".r + + // setup optional filters + val zoneAndNameFilters = (zoneId, recordNameFilter) match { + case (Some(zId), Some(rName)) => + Some(sqls"recordset.zone_id = $zId AND recordset.name LIKE ${rName.replace('*', '%')} ") + case (None, Some(fqdn)) => fqdn match { + case fqdn if wildcardStart.pattern.matcher(fqdn).matches() => + // If we have a wildcard at the beginning only, then use the reverse_fqdn DB index + Some(sqls"recordset_data.reverse_fqdn LIKE ${wildcardStart.replaceAllIn(fqdn, "$1").reverse.replace('*', '%') + "%"} ") + case _ => + // By default, just use a LIKE query + Some(sqls"recordset.fqdn LIKE ${fqdn.replace('*', '%')} ") + } + case (Some(zId), None) => Some(sqls"recordset.zone_id = $zId ") + case _ => None + } + + val searchByZone = zoneId.fold[Boolean](false)(_ => true) + val pagingKey = PagingKey(startFrom) + + // sort by name or fqdn in given order + val sortBy = (searchByZone, nameSort) match { + case (true, NameSort.DESC) => + pagingKey.as( + sqls"((recordset.name <= ${pagingKey.map(pk => pk.recordName)} AND recordset.type > ${pagingKey.map(pk => pk.recordType)}) OR recordset.name < ${pagingKey.map(pk => pk.recordName)})" + ) + case (false, NameSort.ASC) => + pagingKey.as( + sqls"((recordset_data.fqdn >= ${pagingKey.map(pk => pk.recordName)} AND recordset.type > ${pagingKey.map(pk => pk.recordType)}) OR recordset_data.fqdn > ${pagingKey.map(pk => pk.recordName)})" + ) + case (false, NameSort.DESC) => + pagingKey.as( + sqls"((recordset_data.fqdn <= ${pagingKey.map(pk => pk.recordName)} AND recordset.type > ${pagingKey.map(pk => pk.recordType)}) OR recordset_data.fqdn < ${pagingKey.map(pk => pk.recordName)})" + ) + case _ => + pagingKey.as( + sqls"((recordset.name >= ${pagingKey.map(pk => pk.recordName)} AND recordset.type > ${pagingKey.map(pk => pk.recordType)}) OR recordset.name > ${pagingKey.map(pk => pk.recordName)})" + ) + } + + val typeFilter = recordTypeFilter.map { t => + val list = t.map(fromRecordType) + sqls"recordset.type IN ($list)" + } + + val ownerGroupFilter = + recordOwnerGroupFilter.map(owner => sqls"recordset.owner_group_id = $owner ") + + val opts = + (zoneAndNameFilters ++ sortBy ++ typeFilter ++ ownerGroupFilter).toList + + val qualifiers = if (nameSort == ASC) { + sqls"ORDER BY recordset_data.fqdn ASC, recordset.type ASC " + } + else { + sqls"ORDER BY recordset_data.fqdn DESC, recordset.type ASC " + } + + val recordLimit = maxPlusOne match { + case Some(limit) => sqls"LIMIT $limit" + case None => sqls"" + } + + val finalQualifiers = qualifiers.append(recordLimit) + + // Construct query. We include the MySQL MAX_EXECUTION_TIME directive here to limit the maximum amount of time + // this query can execute. The API should timeout before we reach 20s - this is just to avoid user-generated + // long-running queries leading to performance degradation + val initialQuery = sqls"SELECT /*+ MAX_EXECUTION_TIME(20000) */ recordset.data, recordset.fqdn FROM recordset_data " + + // Join query for data column from recordset table + val recordsetDataJoin = sqls"RIGHT JOIN recordset ON recordset.id=recordset_data.recordset_id " + + val recordsetDataJoinQuery = initialQuery.append(recordsetDataJoin) + + val appendOpts = if (opts.nonEmpty) { + val setDelimiter = SQLSyntax.join(opts, sqls"AND") + val addWhere = sqls"WHERE" + addWhere.append(setDelimiter) + } else sqls"" + + val appendQueries = recordsetDataJoinQuery.append(appendOpts) + + val finalQuery = appendQueries.append(finalQualifiers) + DB.readOnly { implicit s => + val results = sql"$finalQuery" + .map(toRecordSetData) + .list() + .apply() + + val newResults = if (maxPlusOne.contains(results.size)) { + results.dropRight(1) + } else { + results + } + + // if size of results is less than the maxItems plus one, we don't have a next id + // if maxItems is None, we don't have a next id + val nextId = maxPlusOne + .filter(_ == results.size) + .flatMap(_ => newResults.lastOption.map(PagingKey.toNextId(_, searchByZone))) + + ListRecordSetResults( + recordSets = newResults, + nextId = nextId, + startFrom = startFrom, + maxItems = maxItems, + recordNameFilter = recordNameFilter, + recordTypeFilter = recordTypeFilter, + nameSort = nameSort) + } + } + } + + + private val IPV4_ARPA = ".in-addr.arpa." + private val IPV6_ARPA = ".ip6.arpa." + private val innerRecordRegex = "(?i).*?\"((?:[0-9a-f]+[:.]+)+[0-9a-f]+)\".*".r + + /** + * parseIP address from given record data. + * + * @param ipAsString The IP address to parse + * @return The parsed IP address + */ + def parseIP(ipAsString: String): Option[String] = { + + def reverse4(ipv4: String) = ipv4.split('.').reverse.mkString(".") + + def reverse6(ipv6: String) = ipv6.split('.').reverse.grouped(4).toArray.map(ip => ip.mkString("")).mkString(":") + + val extractedIp = ipAsString match { + case addr if addr.endsWith(IPV4_ARPA) => reverse4(addr.replace(IPV4_ARPA, "")) + case addr if addr.endsWith(IPV6_ARPA) => reverse6(addr.replace(IPV6_ARPA, "")) + case addr => innerRecordRegex.replaceAllIn(addr, "$1") + } + + // Create a canonical address + Try { + new IPAddressString(extractedIp).toAddress + } match { + case Success(v) => Some(v.toCanonicalString) + case Failure(e) => + logger.warn(s"error parsing IP address $extractedIp", e) + None + } + } + + def toRecordSetData(rs: WrappedResultSet): RecordSet = + fromPB(VinylDNSProto.RecordSet.parseFrom(rs.bytes(1))).copy(fqdn = rs.stringOpt(2)) +}