mongodb_store.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package mongodb
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "fmt"
  7. "os"
  8. "regexp"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. "go.mongodb.org/mongo-driver/bson"
  15. "go.mongodb.org/mongo-driver/mongo"
  16. "go.mongodb.org/mongo-driver/mongo/options"
  17. )
  18. func init() {
  19. filer.Stores = append(filer.Stores, &MongodbStore{})
  20. }
  21. type MongodbStore struct {
  22. connect *mongo.Client
  23. database string
  24. collectionName string
  25. }
  26. type Model struct {
  27. Directory string `bson:"directory"`
  28. Name string `bson:"name"`
  29. Meta []byte `bson:"meta"`
  30. }
  31. func (store *MongodbStore) GetName() string {
  32. return "mongodb"
  33. }
  34. func (store *MongodbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  35. store.database = configuration.GetString(prefix + "database")
  36. store.collectionName = "filemeta"
  37. poolSize := configuration.GetInt(prefix + "option_pool_size")
  38. uri := configuration.GetString(prefix + "uri")
  39. ssl := configuration.GetBool(prefix + "ssl")
  40. sslCAFile := configuration.GetString(prefix + "ssl_ca_file")
  41. sslCertFile := configuration.GetString(prefix + "ssl_cert_file")
  42. sslKeyFile := configuration.GetString(prefix + "ssl_key_file")
  43. username := configuration.GetString(prefix + "username")
  44. password := configuration.GetString(prefix + "password")
  45. insecure_skip_verify := configuration.GetBool(prefix + "insecure_skip_verify")
  46. return store.connection(uri, uint64(poolSize), ssl, sslCAFile, sslCertFile, sslKeyFile, username, password, insecure_skip_verify)
  47. }
  48. func (store *MongodbStore) connection(uri string, poolSize uint64, ssl bool, sslCAFile, sslCertFile, sslKeyFile string, username, password string, insecure bool) (err error) {
  49. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  50. defer cancel()
  51. opts := options.Client().ApplyURI(uri)
  52. if poolSize > 0 {
  53. opts.SetMaxPoolSize(poolSize)
  54. }
  55. if ssl {
  56. tlsConfig, err := configureTLS(sslCAFile, sslCertFile, sslKeyFile, insecure)
  57. if err != nil {
  58. return err
  59. }
  60. opts.SetTLSConfig(tlsConfig)
  61. }
  62. if username != "" && password != "" {
  63. creds := options.Credential{
  64. Username: username,
  65. Password: password,
  66. }
  67. opts.SetAuth(creds)
  68. }
  69. client, err := mongo.Connect(ctx, opts)
  70. if err != nil {
  71. return err
  72. }
  73. c := client.Database(store.database).Collection(store.collectionName)
  74. err = store.indexUnique(c)
  75. store.connect = client
  76. return err
  77. }
  78. func configureTLS(caFile, certFile, keyFile string, insecure bool) (*tls.Config, error) {
  79. cert, err := tls.LoadX509KeyPair(certFile, keyFile)
  80. if err != nil {
  81. return nil, fmt.Errorf("could not load client key pair: %s", err)
  82. }
  83. caCert, err := os.ReadFile(caFile)
  84. if err != nil {
  85. return nil, fmt.Errorf("could not read CA certificate: %s", err)
  86. }
  87. caCertPool := x509.NewCertPool()
  88. if !caCertPool.AppendCertsFromPEM(caCert) {
  89. return nil, fmt.Errorf("failed to append CA certificate")
  90. }
  91. tlsConfig := &tls.Config{
  92. Certificates: []tls.Certificate{cert},
  93. RootCAs: caCertPool,
  94. InsecureSkipVerify: insecure,
  95. }
  96. return tlsConfig, nil
  97. }
  98. func (store *MongodbStore) createIndex(c *mongo.Collection, index mongo.IndexModel, opts *options.CreateIndexesOptions) error {
  99. _, err := c.Indexes().CreateOne(context.Background(), index, opts)
  100. return err
  101. }
  102. func (store *MongodbStore) indexUnique(c *mongo.Collection) error {
  103. opts := options.CreateIndexes().SetMaxTime(10 * time.Second)
  104. unique := new(bool)
  105. *unique = true
  106. index := mongo.IndexModel{
  107. Keys: bson.D{{Key: "directory", Value: int32(1)}, {Key: "name", Value: int32(1)}},
  108. Options: &options.IndexOptions{
  109. Unique: unique,
  110. },
  111. }
  112. return store.createIndex(c, index, opts)
  113. }
  114. func (store *MongodbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  115. return ctx, nil
  116. }
  117. func (store *MongodbStore) CommitTransaction(ctx context.Context) error {
  118. return nil
  119. }
  120. func (store *MongodbStore) RollbackTransaction(ctx context.Context) error {
  121. return nil
  122. }
  123. func (store *MongodbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  124. return store.UpdateEntry(ctx, entry)
  125. }
  126. func (store *MongodbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  127. dir, name := entry.FullPath.DirAndName()
  128. meta, err := entry.EncodeAttributesAndChunks()
  129. if err != nil {
  130. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  131. }
  132. if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
  133. meta = util.MaybeGzipData(meta)
  134. }
  135. c := store.connect.Database(store.database).Collection(store.collectionName)
  136. opts := options.Update().SetUpsert(true)
  137. filter := bson.D{{"directory", dir}, {"name", name}}
  138. update := bson.D{{"$set", bson.D{{"meta", meta}}}}
  139. _, err = c.UpdateOne(ctx, filter, update, opts)
  140. if err != nil {
  141. return fmt.Errorf("UpdateEntry %s: %v", entry.FullPath, err)
  142. }
  143. return nil
  144. }
  145. func (store *MongodbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  146. dir, name := fullpath.DirAndName()
  147. var data Model
  148. var where = bson.M{"directory": dir, "name": name}
  149. err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
  150. if err != mongo.ErrNoDocuments && err != nil {
  151. glog.ErrorfCtx(ctx, "find %s: %v", fullpath, err)
  152. return nil, filer_pb.ErrNotFound
  153. }
  154. if len(data.Meta) == 0 {
  155. return nil, filer_pb.ErrNotFound
  156. }
  157. entry = &filer.Entry{
  158. FullPath: fullpath,
  159. }
  160. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta))
  161. if err != nil {
  162. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  163. }
  164. return entry, nil
  165. }
  166. func (store *MongodbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  167. dir, name := fullpath.DirAndName()
  168. where := bson.M{"directory": dir, "name": name}
  169. _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
  170. if err != nil {
  171. return fmt.Errorf("delete %s : %v", fullpath, err)
  172. }
  173. return nil
  174. }
  175. func (store *MongodbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  176. where := bson.M{"directory": fullpath}
  177. _, err := store.connect.Database(store.database).Collection(store.collectionName).DeleteMany(ctx, where)
  178. if err != nil {
  179. return fmt.Errorf("delete %s : %v", fullpath, err)
  180. }
  181. return nil
  182. }
  183. func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  184. where := bson.M{
  185. "directory": string(dirPath),
  186. }
  187. nameQuery := bson.M{}
  188. if len(prefix) > 0 {
  189. nameQuery["$regex"] = "^" + regexp.QuoteMeta(prefix)
  190. }
  191. if len(startFileName) > 0 {
  192. if includeStartFile {
  193. nameQuery["$gte"] = startFileName
  194. } else {
  195. nameQuery["$gt"] = startFileName
  196. }
  197. }
  198. if len(nameQuery) > 0 {
  199. where["name"] = nameQuery
  200. }
  201. optLimit := int64(limit)
  202. opts := &options.FindOptions{Limit: &optLimit, Sort: bson.M{"name": 1}}
  203. cur, err := store.connect.Database(store.database).Collection(store.collectionName).Find(ctx, where, opts)
  204. if err != nil {
  205. return lastFileName, fmt.Errorf("failed to list directory entries: find error: %w", err)
  206. }
  207. for cur.Next(ctx) {
  208. var data Model
  209. err = cur.Decode(&data)
  210. if err != nil {
  211. break
  212. }
  213. entry := &filer.Entry{
  214. FullPath: util.NewFullPath(string(dirPath), data.Name),
  215. }
  216. lastFileName = data.Name
  217. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data.Meta)); decodeErr != nil {
  218. err = decodeErr
  219. glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
  220. break
  221. }
  222. if !eachEntryFunc(entry) {
  223. break
  224. }
  225. }
  226. if err := cur.Close(ctx); err != nil {
  227. glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
  228. }
  229. return lastFileName, err
  230. }
  231. func (store *MongodbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  232. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  233. }
  234. func (store *MongodbStore) Shutdown() {
  235. ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
  236. store.connect.Disconnect(ctx)
  237. }