redis_store.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package redis2
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "net"
  6. "os"
  7. "github.com/redis/go-redis/v9"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. )
  12. func init() {
  13. filer.Stores = append(filer.Stores, &Redis2Store{})
  14. }
  15. type Redis2Store struct {
  16. UniversalRedis2Store
  17. }
  18. func (store *Redis2Store) GetName() string {
  19. return "redis2"
  20. }
  21. func (store *Redis2Store) Initialize(configuration util.Configuration, prefix string) (err error) {
  22. return store.initialize(
  23. configuration.GetString(prefix+"address"),
  24. configuration.GetString(prefix+"password"),
  25. configuration.GetInt(prefix+"database"),
  26. configuration.GetStringSlice(prefix+"superLargeDirectories"),
  27. configuration.GetBool(prefix+"enable_mtls"),
  28. configuration.GetString(prefix+"ca_cert_path"),
  29. configuration.GetString(prefix+"client_cert_path"),
  30. configuration.GetString(prefix+"client_key_path"),
  31. )
  32. }
  33. func (store *Redis2Store) initialize(hostPort string, password string, database int, superLargeDirectories []string, enableMtls bool, caCertPath string, clientCertPath string, clientKeyPath string) (err error) {
  34. if enableMtls {
  35. clientCert, err := tls.LoadX509KeyPair(clientCertPath, clientKeyPath)
  36. if err != nil {
  37. glog.Fatalf("Error loading client certificate and key pair: %v", err)
  38. }
  39. caCertBytes, err := os.ReadFile(caCertPath)
  40. if err != nil {
  41. glog.Fatalf("Error reading CA certificate file: %v", err)
  42. }
  43. caCertPool := x509.NewCertPool()
  44. if ok := caCertPool.AppendCertsFromPEM(caCertBytes); !ok {
  45. glog.Fatalf("Error appending CA certificate to pool")
  46. }
  47. redisHost, _, err := net.SplitHostPort(hostPort)
  48. if err != nil {
  49. glog.Fatalf("Error parsing redis host and port from %s: %v", hostPort, err)
  50. }
  51. tlsConfig := &tls.Config{
  52. Certificates: []tls.Certificate{clientCert},
  53. RootCAs: caCertPool,
  54. ServerName: redisHost,
  55. MinVersion: tls.VersionTLS12,
  56. }
  57. store.Client = redis.NewClient(&redis.Options{
  58. Addr: hostPort,
  59. Password: password,
  60. DB: database,
  61. TLSConfig: tlsConfig,
  62. })
  63. } else {
  64. store.Client = redis.NewClient(&redis.Options{
  65. Addr: hostPort,
  66. Password: password,
  67. DB: database,
  68. })
  69. }
  70. store.loadSuperLargeDirectories(superLargeDirectories)
  71. return
  72. }