mirror of
https://github.com/VinylDNS/vinyldns
synced 2025-08-22 10:10:12 +00:00
Add MySqlRecordSetRepository (#309)
* Add MySqlRecordSetRepository * Updated docker for mysql to use general_log for fun sql debug times * Made sure to use rewriteBatchStatements to acheive new hights for bulk inserts * `MySqlDataStoreProvider` support for the record set repo
This commit is contained in:
parent
fdea04a536
commit
5832fcbb73
@ -55,7 +55,6 @@ lazy val sharedSettings = Seq(
|
|||||||
scalacOptions in (Compile, doc) += "-no-link-warnings",
|
scalacOptions in (Compile, doc) += "-no-link-warnings",
|
||||||
// Use wart remover to eliminate code badness
|
// Use wart remover to eliminate code badness
|
||||||
wartremoverErrors ++= Seq(
|
wartremoverErrors ++= Seq(
|
||||||
Wart.ArrayEquals,
|
|
||||||
Wart.EitherProjectionPartial,
|
Wart.EitherProjectionPartial,
|
||||||
Wart.IsInstanceOf,
|
Wart.IsInstanceOf,
|
||||||
Wart.JavaConversions,
|
Wart.JavaConversions,
|
||||||
@ -298,7 +297,7 @@ lazy val mysql = (project in file("modules/mysql"))
|
|||||||
.settings(libraryDependencies ++= mysqlDependencies ++ commonTestDependencies.map(_ % "test, it"))
|
.settings(libraryDependencies ++= mysqlDependencies ++ commonTestDependencies.map(_ % "test, it"))
|
||||||
.settings(scalaStyleCompile ++ scalaStyleTest)
|
.settings(scalaStyleCompile ++ scalaStyleTest)
|
||||||
.settings(
|
.settings(
|
||||||
organization := "io.vinyldns",
|
organization := "io.vinyldns"
|
||||||
).dependsOn(core % "compile->compile;test->test")
|
).dependsOn(core % "compile->compile;test->test")
|
||||||
.settings(name := "mysql")
|
.settings(name := "mysql")
|
||||||
|
|
||||||
|
@ -244,6 +244,7 @@ vinyldns {
|
|||||||
prepStmtCacheSqlLimit = 2048
|
prepStmtCacheSqlLimit = 2048
|
||||||
cachePrepStmts = true
|
cachePrepStmts = true
|
||||||
useServerPrepStmts = true
|
useServerPrepStmts = true
|
||||||
|
rewriteBatchedStatements = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -483,6 +484,7 @@ vinyldns {
|
|||||||
prepStmtCacheSqlLimit = 2048
|
prepStmtCacheSqlLimit = 2048
|
||||||
cachePrepStmts = true
|
cachePrepStmts = true
|
||||||
useServerPrepStmts = true
|
useServerPrepStmts = true
|
||||||
|
rewriteBatchedStatements = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
3
modules/mysql/docker/conf/config-file.cnf
Normal file
3
modules/mysql/docker/conf/config-file.cnf
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
[mysqld]
|
||||||
|
# Turn this on to record queries
|
||||||
|
general_log = 1
|
@ -7,3 +7,5 @@ services:
|
|||||||
- MYSQL_ROOT_HOST=% # this is required as mysql is currently locked down to localhost
|
- MYSQL_ROOT_HOST=% # this is required as mysql is currently locked down to localhost
|
||||||
ports:
|
ports:
|
||||||
- "19004:3306"
|
- "19004:3306"
|
||||||
|
volumes:
|
||||||
|
- ./conf:/etc/mysql/conf.d
|
||||||
|
@ -7,8 +7,8 @@ mysql {
|
|||||||
# assumes a docker or mysql instance running locally
|
# assumes a docker or mysql instance running locally
|
||||||
name = "vinyldns"
|
name = "vinyldns"
|
||||||
driver = "org.mariadb.jdbc.Driver"
|
driver = "org.mariadb.jdbc.Driver"
|
||||||
migration-url = "jdbc:mariadb://localhost:19004/?user=root&password=pass"
|
migration-url = "jdbc:mariadb://localhost:19004/"
|
||||||
url = "jdbc:mariadb://localhost:19004/vinyldns?user=root&password=pass"
|
url = "jdbc:mariadb://localhost:19004/vinyldns"
|
||||||
user = "root"
|
user = "root"
|
||||||
password = "pass"
|
password = "pass"
|
||||||
|
|
||||||
@ -23,12 +23,14 @@ mysql {
|
|||||||
cachePrepStmts=true
|
cachePrepStmts=true
|
||||||
prepStmtCacheSize=250
|
prepStmtCacheSize=250
|
||||||
prepStmtCacheSqlLimit=2048
|
prepStmtCacheSqlLimit=2048
|
||||||
|
rewriteBatchedStatements=true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
repositories {
|
repositories {
|
||||||
batch-change {}
|
batch-change {}
|
||||||
zone {}
|
zone {}
|
||||||
|
record-set {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,10 +17,11 @@
|
|||||||
package vinyldns.mysql
|
package vinyldns.mysql
|
||||||
|
|
||||||
import com.typesafe.config.{Config, ConfigFactory}
|
import com.typesafe.config.{Config, ConfigFactory}
|
||||||
import vinyldns.core.crypto.NoOpCrypto
|
|
||||||
import vinyldns.core.domain.batch.BatchChangeRepository
|
import vinyldns.core.domain.batch.BatchChangeRepository
|
||||||
import vinyldns.core.domain.membership.UserRepository
|
import vinyldns.core.domain.membership.UserRepository
|
||||||
import vinyldns.core.domain.zone.{ZoneChangeRepository, ZoneRepository}
|
import vinyldns.core.domain.zone.{ZoneChangeRepository, ZoneRepository}
|
||||||
|
import vinyldns.core.crypto.NoOpCrypto
|
||||||
|
import vinyldns.core.domain.record.RecordSetRepository
|
||||||
import vinyldns.core.repository.{DataStore, DataStoreConfig, RepositoryName}
|
import vinyldns.core.repository.{DataStore, DataStoreConfig, RepositoryName}
|
||||||
import vinyldns.mysql.repository.MySqlDataStoreProvider
|
import vinyldns.mysql.repository.MySqlDataStoreProvider
|
||||||
|
|
||||||
@ -41,6 +42,8 @@ trait MySqlIntegrationSpec {
|
|||||||
instance.get[ZoneChangeRepository](RepositoryName.zoneChange).get
|
instance.get[ZoneChangeRepository](RepositoryName.zoneChange).get
|
||||||
lazy val userRepository: UserRepository =
|
lazy val userRepository: UserRepository =
|
||||||
instance.get[UserRepository](RepositoryName.user).get
|
instance.get[UserRepository](RepositoryName.user).get
|
||||||
|
lazy val recordSetRepository: RecordSetRepository =
|
||||||
|
instance.get[RecordSetRepository](RepositoryName.recordSet).get
|
||||||
|
|
||||||
def shutdown(): Unit = provider.shutdown().unsafeRunSync()
|
def shutdown(): Unit = provider.shutdown().unsafeRunSync()
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,234 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 Comcast Cable Communications Management, LLC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package vinyldns.mysql.repository
|
||||||
|
import java.util.UUID
|
||||||
|
|
||||||
|
import cats.scalatest.EitherMatchers
|
||||||
|
import org.scalatest._
|
||||||
|
import scalikejdbc.DB
|
||||||
|
import vinyldns.core.domain.record.{ChangeSet, RecordSetChange}
|
||||||
|
import vinyldns.core.domain.zone.Zone
|
||||||
|
import vinyldns.mysql.TestMySqlInstance
|
||||||
|
|
||||||
|
class MySqlRecordSetRepositoryIntegrationSpec
|
||||||
|
extends WordSpec
|
||||||
|
with BeforeAndAfterEach
|
||||||
|
with BeforeAndAfterAll
|
||||||
|
with Matchers
|
||||||
|
with EitherMatchers {
|
||||||
|
|
||||||
|
import vinyldns.core.TestRecordSetData._
|
||||||
|
import vinyldns.core.TestZoneData._
|
||||||
|
private val repo = TestMySqlInstance.recordSetRepository.asInstanceOf[MySqlRecordSetRepository]
|
||||||
|
|
||||||
|
override protected def beforeEach(): Unit = clear()
|
||||||
|
|
||||||
|
override protected def afterAll(): Unit = clear()
|
||||||
|
|
||||||
|
def clear(): Unit =
|
||||||
|
DB.localTx { s =>
|
||||||
|
s.executeUpdate("DELETE FROM recordset")
|
||||||
|
}
|
||||||
|
|
||||||
|
def generateInserts(zone: Zone, count: Int): List[RecordSetChange] = {
|
||||||
|
val newRecordSets =
|
||||||
|
for {
|
||||||
|
i <- 1 to count
|
||||||
|
} yield
|
||||||
|
aaaa.copy(
|
||||||
|
zoneId = zone.id,
|
||||||
|
name = s"$i-apply-test",
|
||||||
|
id = UUID.randomUUID().toString)
|
||||||
|
|
||||||
|
newRecordSets.map(makeTestAddChange(_, zone)).toList
|
||||||
|
}
|
||||||
|
|
||||||
|
def insert(zone: Zone, count: Int): List[RecordSetChange] = {
|
||||||
|
val pendingChanges = generateInserts(zone, count)
|
||||||
|
val bigPendingChangeSet = ChangeSet(pendingChanges)
|
||||||
|
repo.apply(bigPendingChangeSet).unsafeRunSync()
|
||||||
|
pendingChanges
|
||||||
|
}
|
||||||
|
|
||||||
|
def insert(changes: List[RecordSetChange]): Unit = {
|
||||||
|
val bigPendingChangeSet = ChangeSet(changes)
|
||||||
|
repo.apply(bigPendingChangeSet).unsafeRunSync()
|
||||||
|
()
|
||||||
|
}
|
||||||
|
|
||||||
|
"inserting record sets" should {
|
||||||
|
"be idempotent for inserts" in {
|
||||||
|
val pendingChanges = generateInserts(okZone, 1000)
|
||||||
|
val bigPendingChangeSet = ChangeSet(pendingChanges)
|
||||||
|
repo.apply(bigPendingChangeSet).unsafeRunSync()
|
||||||
|
repo.apply(bigPendingChangeSet).attempt.unsafeRunSync() shouldBe right
|
||||||
|
}
|
||||||
|
"work for multiple inserts" in {
|
||||||
|
val pendingChanges = generateInserts(okZone, 20)
|
||||||
|
|
||||||
|
val bigPendingChangeSet = ChangeSet(pendingChanges)
|
||||||
|
repo.apply(bigPendingChangeSet).unsafeRunSync()
|
||||||
|
|
||||||
|
// let's make sure we have all 1000 records
|
||||||
|
val recordCount = repo.getRecordSetCount(okZone.id).unsafeRunSync()
|
||||||
|
recordCount shouldBe 20
|
||||||
|
}
|
||||||
|
"works for deletes, updates, and inserts" in {
|
||||||
|
// create some record sets to be updated
|
||||||
|
val existing = insert(okZone, 10).map(_.recordSet)
|
||||||
|
|
||||||
|
// update a few, delete a few
|
||||||
|
val deletes = existing.take(2).map(makeTestDeleteChange(_, okZone))
|
||||||
|
|
||||||
|
// updates we will just add the letter u to
|
||||||
|
val updates = existing.slice(3, 5).map { rs =>
|
||||||
|
val update = rs.copy(name = "u" + rs.name)
|
||||||
|
makeTestUpdateChange(rs, update, okZone)
|
||||||
|
}
|
||||||
|
|
||||||
|
// insert a few more
|
||||||
|
val inserts = generateInserts(okZone, 2)
|
||||||
|
|
||||||
|
// exercise the entire change set
|
||||||
|
val cs = ChangeSet(deletes ++ updates ++ inserts)
|
||||||
|
repo.apply(cs).unsafeRunSync()
|
||||||
|
|
||||||
|
// make sure the deletes are gone
|
||||||
|
repo.getRecordSet(okZone.id, deletes(0).recordSet.id).unsafeRunSync() shouldBe None
|
||||||
|
repo.getRecordSet(okZone.id, deletes(1).recordSet.id).unsafeRunSync() shouldBe None
|
||||||
|
|
||||||
|
// make sure the updates are updated
|
||||||
|
repo.getRecordSet(okZone.id, updates(0).recordSet.id).unsafeRunSync().map(_.name) shouldBe Some(updates(0).recordSet.name)
|
||||||
|
repo.getRecordSet(okZone.id, updates(1).recordSet.id).unsafeRunSync().map(_.name) shouldBe Some(updates(1).recordSet.name)
|
||||||
|
|
||||||
|
// make sure the new ones are there
|
||||||
|
repo.getRecordSet(okZone.id, inserts(0).recordSet.id).unsafeRunSync().map(_.name) shouldBe Some(inserts(0).recordSet.name)
|
||||||
|
repo.getRecordSet(okZone.id, inserts(1).recordSet.id).unsafeRunSync().map(_.name) shouldBe Some(inserts(1).recordSet.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"list record sets" should {
|
||||||
|
"return all record sets in a zone when optional params are not set" in {
|
||||||
|
val existing = insert(okZone, 10).map(_.recordSet)
|
||||||
|
val found = repo.listRecordSets(okZone.id, None, None, None).unsafeRunSync()
|
||||||
|
found.recordSets should contain theSameElementsAs existing
|
||||||
|
}
|
||||||
|
"return record sets after the startFrom when set" in {
|
||||||
|
// load 5, start after the 3rd, we should get back the last two
|
||||||
|
val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name)
|
||||||
|
val startFrom = Some(existing(2).name)
|
||||||
|
val found = repo.listRecordSets(okZone.id, startFrom, None, None).unsafeRunSync()
|
||||||
|
|
||||||
|
found.recordSets should contain theSameElementsInOrderAs existing.drop(3)
|
||||||
|
}
|
||||||
|
"return the record sets after the startFrom respecting maxItems" in {
|
||||||
|
// load 5, start after the 2nd, take 2, we should get back the 3rd and 4th
|
||||||
|
val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name)
|
||||||
|
val startFrom = Some(existing(1).name)
|
||||||
|
val found = repo.listRecordSets(okZone.id, startFrom, Some(2), None).unsafeRunSync()
|
||||||
|
|
||||||
|
found.recordSets should contain theSameElementsInOrderAs existing.slice(2, 4)
|
||||||
|
}
|
||||||
|
"return the record sets after startFrom respecting maxItems and filter" in {
|
||||||
|
// load some deterministic names so we can filter and respect max items
|
||||||
|
val recordNames = List("aaa", "bbb", "ccc", "ddd", "eeez", "fffz", "ggg", "hhhz", "iii", "jjj")
|
||||||
|
|
||||||
|
// our search will be filtered by records with "z"
|
||||||
|
val expectedNames = recordNames.filter(_.contains("z"))
|
||||||
|
|
||||||
|
val newRecordSets =
|
||||||
|
for {
|
||||||
|
n <- recordNames
|
||||||
|
} yield
|
||||||
|
aaaa.copy(
|
||||||
|
zoneId = okZone.id,
|
||||||
|
name = n,
|
||||||
|
id = UUID.randomUUID().toString)
|
||||||
|
|
||||||
|
val changes = newRecordSets.map(makeTestAddChange(_, okZone))
|
||||||
|
insert(changes)
|
||||||
|
|
||||||
|
// start after the second, pulling 3 records that have "z"
|
||||||
|
val startFrom = Some(newRecordSets(1).name)
|
||||||
|
val found = repo.listRecordSets(okZone.id, startFrom, Some(3), Some("z")).unsafeRunSync()
|
||||||
|
found.recordSets.map(_.name) should contain theSameElementsInOrderAs expectedNames
|
||||||
|
}
|
||||||
|
"pages through the list properly" in {
|
||||||
|
// load 5 records, pages of 2, last page should have 1 result and no next id
|
||||||
|
val existing = insert(okZone, 5).map(_.recordSet).sortBy(_.name)
|
||||||
|
val page1 = repo.listRecordSets(okZone.id, None, Some(2), None).unsafeRunSync()
|
||||||
|
page1.recordSets should contain theSameElementsInOrderAs existing.slice(0, 2)
|
||||||
|
page1.nextId shouldBe Some(page1.recordSets(1).name)
|
||||||
|
|
||||||
|
val page2 = repo.listRecordSets(okZone.id, page1.nextId, Some(2), None).unsafeRunSync()
|
||||||
|
page2.recordSets should contain theSameElementsInOrderAs existing.slice(2, 4)
|
||||||
|
page2.nextId shouldBe Some(page2.recordSets(1).name)
|
||||||
|
|
||||||
|
val page3 = repo.listRecordSets(okZone.id, page2.nextId, Some(2), None).unsafeRunSync()
|
||||||
|
page3.recordSets should contain theSameElementsInOrderAs existing.slice(4, 5)
|
||||||
|
page3.nextId shouldBe None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"get record sets by name and type" should {
|
||||||
|
"return a record set when there is a match" in {
|
||||||
|
val existing = insert(okZone, 1).map(_.recordSet)
|
||||||
|
val results = repo.getRecordSets(okZone.id, existing(0).name, existing(0).typ).unsafeRunSync()
|
||||||
|
results.headOption shouldBe existing.headOption
|
||||||
|
}
|
||||||
|
"return none when there is no match" in {
|
||||||
|
val existing = insert(okZone, 1).map(_.recordSet)
|
||||||
|
val results = repo.getRecordSets(okZone.id, "not-there", existing(0).typ).unsafeRunSync()
|
||||||
|
results shouldBe empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"get record set by id" should {
|
||||||
|
"return a record set when there is a match" in {
|
||||||
|
val existing = insert(okZone, 1).map(_.recordSet)
|
||||||
|
val result = repo.getRecordSet(okZone.id, existing(0).id).unsafeRunSync()
|
||||||
|
result shouldBe existing.headOption
|
||||||
|
}
|
||||||
|
"return none when there is no match" in {
|
||||||
|
insert(okZone, 1).map(_.recordSet)
|
||||||
|
val result = repo.getRecordSet(okZone.id, "not-there").unsafeRunSync()
|
||||||
|
result shouldBe None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"get record set count for zone" should {
|
||||||
|
"return the correct number of records in the zone" in {
|
||||||
|
insert(okZone, 10)
|
||||||
|
repo.getRecordSetCount(okZone.id).unsafeRunSync() shouldBe 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"get record sets by name" should {
|
||||||
|
"return a record set when there is a match" in {
|
||||||
|
val newRecordSets = List(
|
||||||
|
aaaa.copy(name = "foo"),
|
||||||
|
rsOk.copy(name = "foo")
|
||||||
|
)
|
||||||
|
val changes = newRecordSets.map(makeTestAddChange(_, okZone))
|
||||||
|
val expected = changes.map(_.recordSet)
|
||||||
|
repo.apply(ChangeSet(changes)).unsafeRunSync()
|
||||||
|
val results = repo.getRecordSetsByName(okZone.id, "foo").unsafeRunSync()
|
||||||
|
results should contain theSameElementsAs expected
|
||||||
|
}
|
||||||
|
"return none when there is no match" in {
|
||||||
|
insert(okZone, 1).map(_.recordSet)
|
||||||
|
val results = repo.getRecordSetsByName(okZone.id, "not-there").unsafeRunSync()
|
||||||
|
results shouldBe empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,13 @@
|
|||||||
|
CREATE SCHEMA IF NOT EXISTS ${dbName};
|
||||||
|
|
||||||
|
USE ${dbName};
|
||||||
|
|
||||||
|
CREATE TABLE recordset (
|
||||||
|
id CHAR(36) NOT NULL,
|
||||||
|
zone_id CHAR(36) NOT NULL,
|
||||||
|
name VARCHAR(256) NOT NULL,
|
||||||
|
type TINYINT NOT NULL,
|
||||||
|
data BLOB NOT NULL,
|
||||||
|
PRIMARY KEY (id),
|
||||||
|
INDEX zone_id_name_index (zone_id, name, type)
|
||||||
|
);
|
@ -36,7 +36,8 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
|||||||
RepositoryName.zone,
|
RepositoryName.zone,
|
||||||
RepositoryName.batchChange,
|
RepositoryName.batchChange,
|
||||||
RepositoryName.zoneChange,
|
RepositoryName.zoneChange,
|
||||||
RepositoryName.user)
|
RepositoryName.user,
|
||||||
|
RepositoryName.recordSet)
|
||||||
|
|
||||||
implicit val mySqlPropertiesReader: ConfigReader[Map[String, AnyRef]] =
|
implicit val mySqlPropertiesReader: ConfigReader[Map[String, AnyRef]] =
|
||||||
MySqlConnectionConfig.mySqlPropertiesReader
|
MySqlConnectionConfig.mySqlPropertiesReader
|
||||||
@ -66,11 +67,13 @@ class MySqlDataStoreProvider extends DataStoreProvider {
|
|||||||
val batchChanges = Some(new MySqlBatchChangeRepository())
|
val batchChanges = Some(new MySqlBatchChangeRepository())
|
||||||
val zoneChanges = Some(new MySqlZoneChangeRepository())
|
val zoneChanges = Some(new MySqlZoneChangeRepository())
|
||||||
val users = Some(new MySqlUserRepository(cryptoAlgebra))
|
val users = Some(new MySqlUserRepository(cryptoAlgebra))
|
||||||
|
val recordSets = Some(new MySqlRecordSetRepository())
|
||||||
DataStore(
|
DataStore(
|
||||||
zoneRepository = zones,
|
zoneRepository = zones,
|
||||||
batchChangeRepository = batchChanges,
|
batchChangeRepository = batchChanges,
|
||||||
zoneChangeRepository = zoneChanges,
|
zoneChangeRepository = zoneChanges,
|
||||||
userRepository = users)
|
userRepository = users,
|
||||||
|
recordSetRepository = recordSets)
|
||||||
}
|
}
|
||||||
|
|
||||||
def setupDBConnection(config: MySqlConnectionConfig): IO[Unit] = {
|
def setupDBConnection(config: MySqlConnectionConfig): IO[Unit] = {
|
||||||
|
@ -0,0 +1,238 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 Comcast Cable Communications Management, LLC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package vinyldns.mysql.repository
|
||||||
|
|
||||||
|
import cats.effect._
|
||||||
|
import cats.implicits._
|
||||||
|
import scalikejdbc._
|
||||||
|
import vinyldns.core.domain.record.RecordType.RecordType
|
||||||
|
import vinyldns.core.domain.record._
|
||||||
|
import vinyldns.core.protobuf.ProtobufConversions
|
||||||
|
import vinyldns.core.route.Monitored
|
||||||
|
import vinyldns.proto.VinylDNSProto
|
||||||
|
|
||||||
|
class MySqlRecordSetRepository extends RecordSetRepository with Monitored {
|
||||||
|
import MySqlRecordSetRepository._
|
||||||
|
|
||||||
|
private val FIND_BY_ZONEID_NAME_TYPE =
|
||||||
|
sql"""
|
||||||
|
|SELECT data
|
||||||
|
| FROM recordset
|
||||||
|
| WHERE zone_id = {zoneId} AND name = {name} AND type = {type}
|
||||||
|
""".stripMargin
|
||||||
|
|
||||||
|
private val FIND_BY_ZONEID_NAME =
|
||||||
|
sql"""
|
||||||
|
|SELECT data
|
||||||
|
| FROM recordset
|
||||||
|
| WHERE zone_id = {zoneId} AND name = {name}
|
||||||
|
""".stripMargin
|
||||||
|
|
||||||
|
private val FIND_BY_ID =
|
||||||
|
sql"""
|
||||||
|
|SELECT data
|
||||||
|
| FROM recordset
|
||||||
|
| WHERE id = {id}
|
||||||
|
""".stripMargin
|
||||||
|
|
||||||
|
private val COUNT_RECORDSETS_IN_ZONE =
|
||||||
|
sql"""
|
||||||
|
|SELECT count(*)
|
||||||
|
| FROM recordset
|
||||||
|
| WHERE zone_id = {zoneId}
|
||||||
|
""".stripMargin
|
||||||
|
|
||||||
|
private val INSERT_RECORDSET =
|
||||||
|
sql"INSERT IGNORE INTO recordset(id, zone_id, name, type, data) VALUES (?, ?, ?, ?, ?)"
|
||||||
|
|
||||||
|
private val UPDATE_RECORDSET =
|
||||||
|
sql"UPDATE recordset SET zone_id = ?, name = ?, type = ?, data = ? WHERE id = ?"
|
||||||
|
|
||||||
|
private val DELETE_RECORDSET =
|
||||||
|
sql"DELETE FROM recordset WHERE id = ?"
|
||||||
|
|
||||||
|
def apply(changeSet: ChangeSet): IO[ChangeSet] =
|
||||||
|
monitor("repo.RecordSet.apply") {
|
||||||
|
val byChangeType = changeSet.changes.groupBy(_.changeType)
|
||||||
|
val inserts: Seq[Seq[Any]] = byChangeType.getOrElse(RecordSetChangeType.Create, Nil).map {
|
||||||
|
i =>
|
||||||
|
Seq[Any](
|
||||||
|
i.recordSet.id,
|
||||||
|
i.recordSet.zoneId,
|
||||||
|
i.recordSet.name,
|
||||||
|
fromRecordType(i.recordSet.typ),
|
||||||
|
toPB(i.recordSet).toByteArray
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
val updates: Seq[Seq[Any]] = byChangeType.getOrElse(RecordSetChangeType.Update, Nil).map {
|
||||||
|
u =>
|
||||||
|
Seq[Any](
|
||||||
|
u.zoneId,
|
||||||
|
u.recordSet.name,
|
||||||
|
fromRecordType(u.recordSet.typ),
|
||||||
|
toPB(u.recordSet).toByteArray,
|
||||||
|
u.recordSet.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// deletes are just the record set id
|
||||||
|
val deletes: Seq[Seq[Any]] =
|
||||||
|
byChangeType.getOrElse(RecordSetChangeType.Delete, Nil).map(d => Seq[Any](d.recordSet.id))
|
||||||
|
|
||||||
|
IO {
|
||||||
|
DB.localTx { implicit s =>
|
||||||
|
// sql batch groups should preferably be smaller rather than larger for performance purposes
|
||||||
|
// to reduce contention on the table. 1000 worked well in performance tests
|
||||||
|
inserts.grouped(1000).foreach { group =>
|
||||||
|
INSERT_RECORDSET.batch(group: _*).apply()
|
||||||
|
}
|
||||||
|
updates.grouped(1000).foreach { group =>
|
||||||
|
UPDATE_RECORDSET.batch(group: _*).apply()
|
||||||
|
}
|
||||||
|
deletes.grouped(1000).foreach { group =>
|
||||||
|
DELETE_RECORDSET.batch(group: _*).apply()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.as(changeSet)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: There is a potential issue with the way we do this today. We load all record sets eagerly, potentially
|
||||||
|
* causing memory pressure for the app depending on the number of records in the zone.
|
||||||
|
*
|
||||||
|
* This needs to change in the future; however, for right now we just need to tune
|
||||||
|
* the JVM as we have the same issue in the DynamoDB repository. Until
|
||||||
|
* we create a better sync and load process that is better for memory, this should
|
||||||
|
* be the same as the other repo.
|
||||||
|
*/
|
||||||
|
def listRecordSets(
|
||||||
|
zoneId: String,
|
||||||
|
startFrom: Option[String],
|
||||||
|
maxItems: Option[Int],
|
||||||
|
recordNameFilter: Option[String]): IO[ListRecordSetResults] =
|
||||||
|
monitor("repo.RecordSet.listRecordSets") {
|
||||||
|
IO {
|
||||||
|
DB.readOnly { implicit s =>
|
||||||
|
// make sure we sort ascending, so we can do the correct comparison later
|
||||||
|
val opts = (startFrom.as("AND name > {startFrom}") ++
|
||||||
|
recordNameFilter.as("AND name LIKE {nameFilter}") ++
|
||||||
|
Some("ORDER BY name ASC") ++
|
||||||
|
maxItems.as("LIMIT {maxItems}")).toList.mkString(" ")
|
||||||
|
|
||||||
|
val params = (Some('zoneId -> zoneId) ++
|
||||||
|
startFrom.map(n => 'startFrom -> n) ++
|
||||||
|
recordNameFilter.map(f => 'nameFilter -> s"%$f%") ++
|
||||||
|
maxItems.map(m => 'maxItems -> m)).toSeq
|
||||||
|
|
||||||
|
val query = "SELECT data FROM recordset WHERE zone_id = {zoneId} " + opts
|
||||||
|
|
||||||
|
val results = SQL(query)
|
||||||
|
.bindByName(params: _*)
|
||||||
|
.map(toRecordSet)
|
||||||
|
.list()
|
||||||
|
.apply()
|
||||||
|
|
||||||
|
// if size of results is less than the number returned, we don't have a next id
|
||||||
|
// if maxItems is None, we don't have a next id
|
||||||
|
val nextId =
|
||||||
|
maxItems.filter(_ == results.size).flatMap(_ => results.lastOption.map(_.name))
|
||||||
|
|
||||||
|
ListRecordSetResults(
|
||||||
|
recordSets = results,
|
||||||
|
nextId = nextId,
|
||||||
|
startFrom = startFrom,
|
||||||
|
maxItems = maxItems,
|
||||||
|
recordNameFilter = recordNameFilter
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def getRecordSets(zoneId: String, name: String, typ: RecordType): IO[List[RecordSet]] =
|
||||||
|
monitor("repo.RecordSet.getRecordSets") {
|
||||||
|
IO {
|
||||||
|
DB.readOnly { implicit s =>
|
||||||
|
FIND_BY_ZONEID_NAME_TYPE
|
||||||
|
.bindByName('zoneId -> zoneId, 'name -> name, 'type -> fromRecordType(typ))
|
||||||
|
.map(toRecordSet)
|
||||||
|
.list()
|
||||||
|
.apply()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note: In MySql we do not need the zone id, since can hit the key directly
|
||||||
|
def getRecordSet(zoneId: String, recordSetId: String): IO[Option[RecordSet]] =
|
||||||
|
monitor("repo.RecordSet.getRecordSet") {
|
||||||
|
IO {
|
||||||
|
DB.readOnly { implicit s =>
|
||||||
|
FIND_BY_ID.bindByName('id -> recordSetId).map(toRecordSet).single().apply()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def getRecordSetCount(zoneId: String): IO[Int] =
|
||||||
|
monitor("repo.RecordSet.getRecordSetCount") {
|
||||||
|
IO {
|
||||||
|
DB.readOnly { implicit s =>
|
||||||
|
// this is a count query, so should always return a value. However, scalikejdbc doesn't have this,
|
||||||
|
// so we have to default to 0. it is literally impossible to not return a value
|
||||||
|
COUNT_RECORDSETS_IN_ZONE
|
||||||
|
.bindByName('zoneId -> zoneId)
|
||||||
|
.map(_.int(1))
|
||||||
|
.single
|
||||||
|
.apply()
|
||||||
|
.getOrElse(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def getRecordSetsByName(zoneId: String, name: String): IO[List[RecordSet]] =
|
||||||
|
monitor("repo.RecordSet.getRecordSetsByName") {
|
||||||
|
IO {
|
||||||
|
DB.readOnly { implicit s =>
|
||||||
|
FIND_BY_ZONEID_NAME
|
||||||
|
.bindByName('zoneId -> zoneId, 'name -> name)
|
||||||
|
.map(toRecordSet)
|
||||||
|
.list()
|
||||||
|
.apply()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object MySqlRecordSetRepository extends ProtobufConversions {
|
||||||
|
val unknownRecordType: Int = 100
|
||||||
|
|
||||||
|
def toRecordSet(rs: WrappedResultSet): RecordSet =
|
||||||
|
fromPB(VinylDNSProto.RecordSet.parseFrom(rs.bytes(1)))
|
||||||
|
|
||||||
|
def fromRecordType(typ: RecordType): Int =
|
||||||
|
typ match {
|
||||||
|
case RecordType.A => 1
|
||||||
|
case RecordType.AAAA => 2
|
||||||
|
case RecordType.CNAME => 3
|
||||||
|
case RecordType.MX => 4
|
||||||
|
case RecordType.NS => 5
|
||||||
|
case RecordType.PTR => 6
|
||||||
|
case RecordType.SPF => 7
|
||||||
|
case RecordType.SRV => 8
|
||||||
|
case RecordType.SSHFP => 9
|
||||||
|
case RecordType.TXT => 10
|
||||||
|
case RecordType.UNKNOWN => unknownRecordType
|
||||||
|
}
|
||||||
|
}
|
@ -23,6 +23,7 @@ mysql {
|
|||||||
cachePrepStmts=true
|
cachePrepStmts=true
|
||||||
prepStmtCacheSize=250
|
prepStmtCacheSize=250
|
||||||
prepStmtCacheSqlLimit=2048
|
prepStmtCacheSqlLimit=2048
|
||||||
|
rewriteBatchedStatements=true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,8 +35,10 @@ class MySqlConnectionConfigSpec extends WordSpec with Matchers {
|
|||||||
"include all specified properties" in {
|
"include all specified properties" in {
|
||||||
val configProperties = Map[String, AnyRef](
|
val configProperties = Map[String, AnyRef](
|
||||||
"cachePrepStmts" -> java.lang.Boolean.TRUE,
|
"cachePrepStmts" -> java.lang.Boolean.TRUE,
|
||||||
|
"rewriteBatchedStatements" -> java.lang.Boolean.TRUE,
|
||||||
"prepStmtCacheSize" -> Integer.valueOf(250),
|
"prepStmtCacheSize" -> Integer.valueOf(250),
|
||||||
"prepStmtCacheSqlLimit" -> Integer.valueOf(2048))
|
"prepStmtCacheSqlLimit" -> Integer.valueOf(2048)
|
||||||
|
)
|
||||||
|
|
||||||
val settingsConfig =
|
val settingsConfig =
|
||||||
loadConfigF[IO, MySqlConnectionConfig](dataStoreSettings.settings).unsafeRunSync()
|
loadConfigF[IO, MySqlConnectionConfig](dataStoreSettings.settings).unsafeRunSync()
|
||||||
|
@ -0,0 +1,38 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2018 Comcast Cable Communications Management, LLC
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package vinyldns.mysql.repository
|
||||||
|
import org.scalatest.{Matchers, WordSpec}
|
||||||
|
import vinyldns.core.domain.record.RecordType
|
||||||
|
|
||||||
|
class MySqlRecordSetRepositorySpec extends WordSpec with Matchers {
|
||||||
|
import MySqlRecordSetRepository._
|
||||||
|
"fromRecordType" should {
|
||||||
|
"support all record types" in {
|
||||||
|
fromRecordType(RecordType.A) shouldBe 1
|
||||||
|
fromRecordType(RecordType.AAAA) shouldBe 2
|
||||||
|
fromRecordType(RecordType.CNAME) shouldBe 3
|
||||||
|
fromRecordType(RecordType.MX) shouldBe 4
|
||||||
|
fromRecordType(RecordType.NS) shouldBe 5
|
||||||
|
fromRecordType(RecordType.PTR) shouldBe 6
|
||||||
|
fromRecordType(RecordType.SPF) shouldBe 7
|
||||||
|
fromRecordType(RecordType.SRV) shouldBe 8
|
||||||
|
fromRecordType(RecordType.SSHFP) shouldBe 9
|
||||||
|
fromRecordType(RecordType.TXT) shouldBe 10
|
||||||
|
fromRecordType(RecordType.UNKNOWN) shouldBe unknownRecordType
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -70,7 +70,7 @@ object Dependencies {
|
|||||||
|
|
||||||
lazy val mysqlDependencies = Seq(
|
lazy val mysqlDependencies = Seq(
|
||||||
"org.flywaydb" % "flyway-core" % "5.1.4",
|
"org.flywaydb" % "flyway-core" % "5.1.4",
|
||||||
"org.mariadb.jdbc" % "mariadb-java-client" % "2.2.6",
|
"org.mariadb.jdbc" % "mariadb-java-client" % "2.3.0",
|
||||||
"org.scalikejdbc" %% "scalikejdbc" % scalikejdbcV,
|
"org.scalikejdbc" %% "scalikejdbc" % scalikejdbcV,
|
||||||
"org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV,
|
"org.scalikejdbc" %% "scalikejdbc-config" % scalikejdbcV,
|
||||||
"com.zaxxer" % "HikariCP" % "3.2.0"
|
"com.zaxxer" % "HikariCP" % "3.2.0"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user