mysql_store.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package mysql
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "database/sql"
  6. "fmt"
  7. "github.com/go-sql-driver/mysql"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/filer"
  12. _ "github.com/go-sql-driver/mysql"
  13. "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. )
  // DSN templates used when no explicit dsn is configured. The verbs are
  // filled, in order, with user, password, hostname, port and database.
  const (
  	// CONNECTION_URL_PATTERN is the plain (non-TLS) DSN template.
  	CONNECTION_URL_PATTERN = "%s:%s@tcp(%s:%d)/%s?collation=utf8mb4_bin"
  	// CONNECTION_TLS_URL_PATTERN is the same template with the "mysql-tls"
  	// TLS profile selected.
  	CONNECTION_TLS_URL_PATTERN = CONNECTION_URL_PATTERN + "&tls=mysql-tls"
  )
  20. func init() {
  21. filer.Stores = append(filer.Stores, &MysqlStore{})
  22. }
  23. type MysqlStore struct {
  24. abstract_sql.AbstractSqlStore
  25. }
  26. func (store *MysqlStore) GetName() string {
  27. return "mysql"
  28. }
  29. func (store *MysqlStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  30. return store.initialize(
  31. configuration.GetString(prefix+"dsn"),
  32. configuration.GetString(prefix+"upsertQuery"),
  33. configuration.GetBool(prefix+"enableUpsert"),
  34. configuration.GetString(prefix+"username"),
  35. configuration.GetString(prefix+"password"),
  36. configuration.GetString(prefix+"hostname"),
  37. configuration.GetInt(prefix+"port"),
  38. configuration.GetString(prefix+"database"),
  39. configuration.GetInt(prefix+"connection_max_idle"),
  40. configuration.GetInt(prefix+"connection_max_open"),
  41. configuration.GetInt(prefix+"connection_max_lifetime_seconds"),
  42. configuration.GetBool(prefix+"interpolateParams"),
  43. configuration.GetBool(prefix+"enable_tls"),
  44. configuration.GetString(prefix+"ca_crt"),
  45. configuration.GetString(prefix+"client_crt"),
  46. configuration.GetString(prefix+"client_key"),
  47. )
  48. }
  49. func (store *MysqlStore) initialize(dsn string, upsertQuery string, enableUpsert bool, user, password, hostname string, port int, database string, maxIdle, maxOpen,
  50. maxLifetimeSeconds int, interpolateParams bool, enableTls bool, caCrtDir string, clientCrtDir string, clientKeyDir string) (err error) {
  51. store.SupportBucketTable = false
  52. if !enableUpsert {
  53. upsertQuery = ""
  54. }
  55. store.SqlGenerator = &SqlGenMysql{
  56. CreateTableSqlTemplate: "",
  57. DropTableSqlTemplate: "DROP TABLE `%s`",
  58. UpsertQueryTemplate: upsertQuery,
  59. }
  60. if enableTls {
  61. rootCertPool := x509.NewCertPool()
  62. pem, err := os.ReadFile(caCrtDir)
  63. if err != nil {
  64. return err
  65. }
  66. if ok := rootCertPool.AppendCertsFromPEM(pem); !ok {
  67. return fmt.Errorf("failed to append root certificate")
  68. }
  69. clientCert := make([]tls.Certificate, 0)
  70. if cert, err := tls.LoadX509KeyPair(clientCrtDir, clientKeyDir); err == nil {
  71. clientCert = append(clientCert, cert)
  72. }
  73. tlsConfig := &tls.Config{
  74. RootCAs: rootCertPool,
  75. Certificates: clientCert,
  76. MinVersion: tls.VersionTLS12,
  77. }
  78. err = mysql.RegisterTLSConfig("mysql-tls", tlsConfig)
  79. if err != nil {
  80. return err
  81. }
  82. }
  83. if dsn == "" {
  84. pattern := CONNECTION_URL_PATTERN
  85. if enableTls {
  86. pattern = CONNECTION_TLS_URL_PATTERN
  87. }
  88. dsn = fmt.Sprintf(pattern, user, password, hostname, port, database)
  89. if interpolateParams {
  90. dsn += "&interpolateParams=true"
  91. }
  92. }
  93. cfg, err := mysql.ParseDSN(dsn)
  94. if err != nil {
  95. return fmt.Errorf("can not parse DSN error:%w", err)
  96. }
  97. var dbErr error
  98. store.DB, dbErr = sql.Open("mysql", dsn)
  99. if dbErr != nil {
  100. store.DB.Close()
  101. store.DB = nil
  102. return fmt.Errorf("can not connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err)
  103. }
  104. store.DB.SetMaxIdleConns(maxIdle)
  105. store.DB.SetMaxOpenConns(maxOpen)
  106. store.DB.SetConnMaxLifetime(time.Duration(maxLifetimeSeconds) * time.Second)
  107. if err = store.DB.Ping(); err != nil {
  108. return fmt.Errorf("connect to %s error:%v", strings.ReplaceAll(dsn, cfg.Passwd, "<ADAPTED>"), err)
  109. }
  110. return nil
  111. }