ydb_store.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. //go:build ydb
  2. // +build ydb
  3. package ydb
  4. import (
  5. "context"
  6. "fmt"
  7. "os"
  8. "path"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/ydb-platform/ydb-go-sdk/v3/query"
  13. "github.com/ydb-platform/ydb-go-sdk/v3/table/options"
  14. "github.com/seaweedfs/seaweedfs/weed/filer"
  15. "github.com/seaweedfs/seaweedfs/weed/filer/abstract_sql"
  16. "github.com/seaweedfs/seaweedfs/weed/glog"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. environ "github.com/ydb-platform/ydb-go-sdk-auth-environ"
  20. "github.com/ydb-platform/ydb-go-sdk/v3"
  21. "github.com/ydb-platform/ydb-go-sdk/v3/table"
  22. "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
  23. )
// Defaults applied in Initialize when the corresponding configuration
// key is left unset (dialTimeOut additionally falls back when zero).
const (
	defaultDialTimeOut            = 10   // seconds, used when dialTimeOut is 0
	defaultPartitionBySizeEnabled = true // auto-partition tables by size
	defaultPartitionSizeMb        = 200  // partition split threshold, MB
	defaultPartitionByLoadEnabled = true // auto-partition tables by load
	defaultMinPartitionsCount     = 5
	defaultMaxPartitionsCount     = 1000
	defaultMaxListChunk           = 2000 // max rows fetched per listing query page
)
var (
	// roQC runs a query under online read-only transaction control (used for lookups/listings).
	roQC = query.WithTxControl(query.OnlineReadOnlyTxControl())
	// rwQC runs a query under the SDK default (serializable read-write) transaction control.
	rwQC = query.WithTxControl(query.DefaultTxControl())
)
// YdbStore is a filer store backed by YDB. Entry metadata lives in one
// table under tablePathPrefix; when SupportBucketTable is enabled each
// bucket gets its own table under tablePathPrefix/<bucket>.
type YdbStore struct {
	DB                     *ydb.Driver
	dirBuckets             string              // filer buckets folder (filer.options.buckets_folder)
	tablePathPrefix        string              // database name joined with the configured prefix
	SupportBucketTable     bool                // true => one table per bucket
	partitionBySizeEnabled options.FeatureFlag // table auto-partitioning by size
	partitionSizeMb        uint64
	partitionByLoadEnabled options.FeatureFlag // table auto-partitioning by load
	minPartitionsCount     uint64
	maxPartitionsCount     uint64
	maxListChunk           int             // rows per listing page
	dbs                    map[string]bool // cache of bucket tables known to exist
	dbsLock                sync.Mutex      // guards dbs
}
// init registers the YDB store implementation with the filer store registry.
func init() {
	filer.Stores = append(filer.Stores, &YdbStore{})
}
// GetName returns the configuration name of this store ("ydb").
func (store *YdbStore) GetName() string {
	return "ydb"
}
  57. func (store *YdbStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  58. configuration.SetDefault(prefix+"partitionBySizeEnabled", defaultPartitionBySizeEnabled)
  59. configuration.SetDefault(prefix+"partitionSizeMb", defaultPartitionSizeMb)
  60. configuration.SetDefault(prefix+"partitionByLoadEnabled", defaultPartitionByLoadEnabled)
  61. configuration.SetDefault(prefix+"minPartitionsCount", defaultMinPartitionsCount)
  62. configuration.SetDefault(prefix+"maxPartitionsCount", defaultMaxPartitionsCount)
  63. configuration.SetDefault(prefix+"maxListChunk", defaultMaxListChunk)
  64. return store.initialize(
  65. configuration.GetString("filer.options.buckets_folder"),
  66. configuration.GetString(prefix+"dsn"),
  67. configuration.GetString(prefix+"prefix"),
  68. configuration.GetBool(prefix+"useBucketPrefix"),
  69. configuration.GetInt(prefix+"dialTimeOut"),
  70. configuration.GetInt(prefix+"poolSizeLimit"),
  71. configuration.GetBool(prefix+"partitionBySizeEnabled"),
  72. uint64(configuration.GetInt(prefix+"partitionSizeMb")),
  73. configuration.GetBool(prefix+"partitionByLoadEnabled"),
  74. uint64(configuration.GetInt(prefix+"minPartitionsCount")),
  75. uint64(configuration.GetInt(prefix+"maxPartitionsCount")),
  76. configuration.GetInt(prefix+"maxListChunk"),
  77. )
  78. }
  79. func (store *YdbStore) initialize(dirBuckets string, dsn string, tablePathPrefix string, useBucketPrefix bool, dialTimeOut int, poolSizeLimit int, partitionBySizeEnabled bool, partitionSizeMb uint64, partitionByLoadEnabled bool, minPartitionsCount uint64, maxPartitionsCount uint64, maxListChunk int) (err error) {
  80. store.dirBuckets = dirBuckets
  81. store.SupportBucketTable = useBucketPrefix
  82. if partitionBySizeEnabled {
  83. store.partitionBySizeEnabled = options.FeatureEnabled
  84. } else {
  85. store.partitionBySizeEnabled = options.FeatureDisabled
  86. }
  87. if partitionByLoadEnabled {
  88. store.partitionByLoadEnabled = options.FeatureEnabled
  89. } else {
  90. store.partitionByLoadEnabled = options.FeatureDisabled
  91. }
  92. store.partitionSizeMb = partitionSizeMb
  93. store.minPartitionsCount = minPartitionsCount
  94. store.maxPartitionsCount = maxPartitionsCount
  95. store.maxListChunk = maxListChunk
  96. if store.SupportBucketTable {
  97. glog.V(0).Infof("enabled BucketPrefix")
  98. }
  99. store.dbs = make(map[string]bool)
  100. ctx := context.Background()
  101. if dialTimeOut == 0 {
  102. dialTimeOut = defaultDialTimeOut
  103. }
  104. opts := []ydb.Option{
  105. ydb.WithDialTimeout(time.Duration(dialTimeOut) * time.Second),
  106. environ.WithEnvironCredentials(),
  107. }
  108. if poolSizeLimit > 0 {
  109. opts = append(opts, ydb.WithSessionPoolSizeLimit(poolSizeLimit))
  110. }
  111. if dsn == "" {
  112. dsn = os.Getenv("YDB_CONNECTION_STRING")
  113. }
  114. store.DB, err = ydb.Open(ctx, dsn, opts...)
  115. if err != nil {
  116. return fmt.Errorf("can not connect to %s: %w", dsn, err)
  117. }
  118. store.tablePathPrefix = path.Join(store.DB.Name(), tablePathPrefix)
  119. if err := store.ensureTables(ctx); err != nil {
  120. return err
  121. }
  122. return err
  123. }
// doTxOrDB executes query q either on the transaction carried in ctx
// (under the string key "tx") or, when none is present, as a standalone
// query with tx-control option ts, retried via Do with the idempotent
// flag. processResultFunc, when non-nil, consumes the result; the result
// is always closed before returning.
//
// NOTE(review): BeginTransaction stores a table.Transaction under "tx",
// while this assertion expects query.Transaction — the transactional
// branch likely never triggers; confirm against ydb-go-sdk types.
// NOTE(review): ts is only applied on the non-transactional path.
func (store *YdbStore) doTxOrDB(ctx context.Context, q *string, params *table.QueryParameters, ts query.ExecuteOption, processResultFunc func(res query.Result) error) (err error) {
	var res query.Result
	if tx, ok := ctx.Value("tx").(query.Transaction); ok {
		// Transactional path: run on the caller's transaction.
		res, err = tx.Query(ctx, *q, query.WithParameters(params))
		if err != nil {
			return fmt.Errorf("execute transaction: %w", err)
		}
	} else {
		// Retry path: Do re-invokes the closure on retryable errors.
		// NOTE(review): res escapes the closure and is read after Do
		// returns — assumes the result remains valid past the session
		// callback; verify against the query-service client contract.
		err = store.DB.Query().Do(ctx, func(ctx context.Context, s query.Session) (err error) {
			res, err = s.Query(ctx, *q, query.WithParameters(params), ts)
			if err != nil {
				return fmt.Errorf("execute statement: %w", err)
			}
			return nil
		}, query.WithIdempotent())
	}
	if err != nil {
		return err
	}
	if res != nil {
		// Always release the result, even if processing fails.
		defer func() { _ = res.Close(ctx) }()
		if processResultFunc != nil {
			if err = processResultFunc(res); err != nil {
				return fmt.Errorf("process result: %w", err)
			}
		}
	}
	return err
}
  153. func (store *YdbStore) insertOrUpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  154. dir, name := entry.FullPath.DirAndName()
  155. meta, err := entry.EncodeAttributesAndChunks()
  156. if err != nil {
  157. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  158. }
  159. if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
  160. meta = util.MaybeGzipData(meta)
  161. }
  162. tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
  163. fileMeta := FileMeta{util.HashStringToLong(dir), name, *shortDir, meta}
  164. return store.doTxOrDB(ctx, withPragma(tablePathPrefix, upsertQuery), fileMeta.queryParameters(entry.TtlSec), rwQC, nil)
  165. }
// InsertEntry stores a new entry. The underlying query is an UPSERT,
// so insert and update share one implementation.
func (store *YdbStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
	return store.insertOrUpdateEntry(ctx, entry)
}

// UpdateEntry overwrites an existing entry via the same upsert path.
func (store *YdbStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
	return store.insertOrUpdateEntry(ctx, entry)
}
  172. func (store *YdbStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  173. dir, name := fullpath.DirAndName()
  174. var data []byte
  175. entryFound := false
  176. tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
  177. q := withPragma(tablePathPrefix, findQuery)
  178. queryParams := table.NewQueryParameters(
  179. table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
  180. table.ValueParam("$directory", types.UTF8Value(*shortDir)),
  181. table.ValueParam("$name", types.UTF8Value(name)))
  182. err = store.doTxOrDB(ctx, q, queryParams, roQC, func(res query.Result) error {
  183. for rs, err := range res.ResultSets(ctx) {
  184. if err != nil {
  185. return err
  186. }
  187. for row, err := range rs.Rows(ctx) {
  188. if err != nil {
  189. return err
  190. }
  191. if scanErr := row.Scan(&data); scanErr != nil {
  192. return fmt.Errorf("scan %s: %v", fullpath, scanErr)
  193. }
  194. entryFound = true
  195. return nil
  196. }
  197. }
  198. return nil
  199. })
  200. if err != nil {
  201. return nil, err
  202. }
  203. if !entryFound {
  204. return nil, filer_pb.ErrNotFound
  205. }
  206. entry = &filer.Entry{FullPath: fullpath}
  207. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
  208. return nil, fmt.Errorf("decode %s: %v", fullpath, decodeErr)
  209. }
  210. return entry, nil
  211. }
  212. func (store *YdbStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
  213. dir, name := fullpath.DirAndName()
  214. tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
  215. q := withPragma(tablePathPrefix, deleteQuery)
  216. glog.V(4).InfofCtx(ctx, "DeleteEntry %s, tablePathPrefix %s, shortDir %s", fullpath, *tablePathPrefix, *shortDir)
  217. queryParams := table.NewQueryParameters(
  218. table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
  219. table.ValueParam("$directory", types.UTF8Value(*shortDir)),
  220. table.ValueParam("$name", types.UTF8Value(name)))
  221. return store.doTxOrDB(ctx, q, queryParams, rwQC, nil)
  222. }
  223. func (store *YdbStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
  224. dir := string(fullpath)
  225. tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
  226. q := withPragma(tablePathPrefix, deleteFolderChildrenQuery)
  227. queryParams := table.NewQueryParameters(
  228. table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
  229. table.ValueParam("$directory", types.UTF8Value(*shortDir)))
  230. return store.doTxOrDB(ctx, q, queryParams, rwQC, nil)
  231. }
// ListDirectoryEntries lists entries under dirPath with no name-prefix
// filter; it delegates to ListDirectoryPrefixedEntries with an empty prefix.
func (store *YdbStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
// ListDirectoryPrefixedEntries streams up to limit entries of dirPath whose
// names match prefix, starting at startFileName, invoking eachEntryFunc for
// each decoded entry. Rows are fetched in pages of at most store.maxListChunk;
// paging advances by re-running the query from the last name seen. Returns
// the name of the last entry delivered.
func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	dir := string(dirPath)
	tablePathPrefix, shortDir := store.getPrefix(ctx, &dir)
	// Two query texts: the inclusive variant is used only on the first page
	// when the caller wants startFileName itself included.
	baseInclusive := withPragma(tablePathPrefix, listInclusiveDirectoryQuery)
	baseExclusive := withPragma(tablePathPrefix, listDirectoryQuery)
	var entryCount int64
	var prevFetchedLessThanChunk bool // a short page means there are no more rows
	for entryCount < limit {
		if prevFetchedLessThanChunk {
			break
		}
		var q *string
		if entryCount == 0 && includeStartFile {
			q = baseInclusive
		} else {
			q = baseExclusive
		}
		// Fetch at most maxListChunk rows, but never more than still needed.
		rest := limit - entryCount
		chunkLimit := rest
		if chunkLimit > int64(store.maxListChunk) {
			chunkLimit = int64(store.maxListChunk)
		}
		var rowCount int64
		params := table.NewQueryParameters(
			table.ValueParam("$dir_hash", types.Int64Value(util.HashStringToLong(*shortDir))),
			table.ValueParam("$directory", types.UTF8Value(*shortDir)),
			table.ValueParam("$start_name", types.UTF8Value(startFileName)),
			table.ValueParam("$prefix", types.UTF8Value(prefix+"%")), // LIKE pattern
			table.ValueParam("$limit", types.Uint64Value(uint64(chunkLimit))),
		)
		err := store.doTxOrDB(ctx, q, params, roQC, func(res query.Result) error {
			for rs, err := range res.ResultSets(ctx) {
				if err != nil {
					return err
				}
				for row, err := range rs.Rows(ctx) {
					if err != nil {
						return err
					}
					var name string
					var data []byte
					if scanErr := row.Scan(&name, &data); scanErr != nil {
						return fmt.Errorf("scan %s: %w", dir, scanErr)
					}
					lastFileName = name
					entry := &filer.Entry{FullPath: util.NewFullPath(dir, name)}
					if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
						return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr)
					}
					// Caller asked to stop early; remaining rows are skipped
					// and the short rowCount below terminates the outer loop.
					if !eachEntryFunc(entry) {
						return nil
					}
					rowCount++
					entryCount++
					// Next page resumes after the last delivered name.
					startFileName = lastFileName
					if entryCount >= limit {
						return nil
					}
				}
			}
			return nil
		})
		if err != nil {
			return lastFileName, err
		}
		if rowCount < chunkLimit {
			prevFetchedLessThanChunk = true
		}
	}
	return lastFileName, nil
}
// BeginTransaction opens a serializable read-write transaction on a fresh
// table session and returns a context carrying it under the string key
// "tx", where CommitTransaction/RollbackTransaction retrieve it.
//
// NOTE(review): the created session is never explicitly closed — assumes
// the SDK reclaims it; confirm. The string context key also trips go vet
// (SA1029). And doTxOrDB asserts query.Transaction while this stores a
// table.Transaction, so queries likely never join this transaction —
// verify the intended transaction plumbing.
func (store *YdbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	session, err := store.DB.Table().CreateSession(ctx)
	if err != nil {
		return ctx, err
	}
	tx, err := session.BeginTransaction(ctx, table.TxSettings(table.WithSerializableReadWrite()))
	if err != nil {
		return ctx, err
	}
	return context.WithValue(ctx, "tx", tx), nil
}
  317. func (store *YdbStore) CommitTransaction(ctx context.Context) error {
  318. if tx, ok := ctx.Value("tx").(table.Transaction); ok {
  319. _, err := tx.CommitTx(ctx)
  320. return err
  321. }
  322. return nil
  323. }
  324. func (store *YdbStore) RollbackTransaction(ctx context.Context) error {
  325. if tx, ok := ctx.Value("tx").(table.Transaction); ok {
  326. return tx.Rollback(ctx)
  327. }
  328. return nil
  329. }
// Shutdown closes the YDB driver and releases its sessions; the close
// error is deliberately ignored since the process is going away.
func (store *YdbStore) Shutdown() {
	_ = store.DB.Close(context.Background())
}
// Compile-time check that YdbStore implements filer.BucketAware.
var _ filer.BucketAware = (*YdbStore)(nil)

// CanDropWholeBucket reports whether a bucket can be deleted by dropping
// its dedicated table, which is only possible with per-bucket tables.
func (store *YdbStore) CanDropWholeBucket() bool {
	return store.SupportBucketTable
}
  337. func (store *YdbStore) OnBucketCreation(bucket string) {
  338. if !store.SupportBucketTable {
  339. return
  340. }
  341. prefix := path.Join(store.tablePathPrefix, bucket)
  342. store.dbsLock.Lock()
  343. defer store.dbsLock.Unlock()
  344. if err := store.createTable(context.Background(), prefix); err != nil {
  345. glog.Errorf("createTable %s: %v", prefix, err)
  346. }
  347. if store.dbs == nil {
  348. return
  349. }
  350. store.dbs[bucket] = true
  351. }
  352. func (store *YdbStore) OnBucketDeletion(bucket string) {
  353. if !store.SupportBucketTable {
  354. return
  355. }
  356. store.dbsLock.Lock()
  357. defer store.dbsLock.Unlock()
  358. prefix := path.Join(store.tablePathPrefix, bucket)
  359. glog.V(4).Infof("deleting table %s", prefix)
  360. if err := store.deleteTable(context.Background(), prefix); err != nil {
  361. glog.Errorf("deleteTable %s: %v", prefix, err)
  362. }
  363. if err := store.DB.Scheme().RemoveDirectory(context.Background(), prefix); err != nil {
  364. glog.Errorf("remove directory %s: %v", prefix, err)
  365. }
  366. if store.dbs == nil {
  367. return
  368. }
  369. delete(store.dbs, bucket)
  370. }
  371. func (store *YdbStore) createTable(ctx context.Context, prefix string) error {
  372. return store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
  373. return s.CreateTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE), store.createTableOptions()...)
  374. })
  375. }
  376. func (store *YdbStore) deleteTable(ctx context.Context, prefix string) error {
  377. if !store.SupportBucketTable {
  378. return nil
  379. }
  380. if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
  381. return s.DropTable(ctx, path.Join(prefix, abstract_sql.DEFAULT_TABLE))
  382. }); err != nil {
  383. return err
  384. }
  385. glog.V(4).InfofCtx(ctx, "deleted table %s", prefix)
  386. return nil
  387. }
// getPrefix resolves which table path a directory maps to. Outside the
// buckets folder (or when per-bucket tables are disabled) it returns the
// store-wide tablePathPrefix and the directory unchanged. Inside the
// buckets folder it returns the bucket's table path — but only if the
// bucket's table is known (cached in store.dbs) or can be confirmed via
// DescribeTable; otherwise it falls back to the store-wide prefix.
//
// NOTE(review): shortDir always aliases dir unmodified here; only the
// table path prefix varies.
func (store *YdbStore) getPrefix(ctx context.Context, dir *string) (tablePathPrefix *string, shortDir *string) {
	tablePathPrefix = &store.tablePathPrefix
	shortDir = dir
	if !store.SupportBucketTable {
		return
	}
	prefixBuckets := store.dirBuckets + "/"
	glog.V(4).InfofCtx(ctx, "dir: %s, prefixBuckets: %s", *dir, prefixBuckets)
	if strings.HasPrefix(*dir, prefixBuckets) {
		// detect bucket
		bucketAndDir := (*dir)[len(prefixBuckets):]
		glog.V(4).InfofCtx(ctx, "bucketAndDir: %s", bucketAndDir)
		var bucket string
		// First path segment is the bucket name; t == 0 (leading slash)
		// leaves bucket empty and falls back below.
		if t := strings.Index(bucketAndDir, "/"); t > 0 {
			bucket = bucketAndDir[:t]
		} else if t < 0 {
			bucket = bucketAndDir
		}
		if bucket == "" {
			return
		}
		// Lock guards the dbs cache; held across the DescribeTable probe.
		store.dbsLock.Lock()
		defer store.dbsLock.Unlock()
		if _, found := store.dbs[bucket]; !found {
			glog.V(4).InfofCtx(ctx, "bucket %q not in cache, verifying existence via DescribeTable", bucket)
			tablePath := path.Join(store.tablePathPrefix, bucket, abstract_sql.DEFAULT_TABLE)
			err2 := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
				_, err3 := s.DescribeTable(ctx, tablePath)
				return err3
			})
			if err2 != nil {
				// Table missing (or probe failed): use the shared table.
				glog.V(4).InfofCtx(ctx, "bucket %q not found (DescribeTable %s failed)", bucket, tablePath)
				return
			}
			glog.V(4).InfofCtx(ctx, "bucket %q exists, adding to cache", bucket)
			store.dbs[bucket] = true
		}
		bucketPrefix := path.Join(store.tablePathPrefix, bucket)
		tablePathPrefix = &bucketPrefix
	}
	return
}
  430. func (store *YdbStore) ensureTables(ctx context.Context) error {
  431. prefixFull := store.tablePathPrefix
  432. glog.V(4).InfofCtx(ctx, "creating base table %s", prefixFull)
  433. baseTable := path.Join(prefixFull, abstract_sql.DEFAULT_TABLE)
  434. if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
  435. return s.CreateTable(ctx, baseTable, store.createTableOptions()...)
  436. }); err != nil {
  437. return fmt.Errorf("failed to create base table %s: %v", baseTable, err)
  438. }
  439. glog.V(4).InfofCtx(ctx, "creating bucket tables")
  440. if store.SupportBucketTable {
  441. store.dbsLock.Lock()
  442. defer store.dbsLock.Unlock()
  443. for bucket := range store.dbs {
  444. glog.V(4).InfofCtx(ctx, "creating bucket table %s", bucket)
  445. bucketTable := path.Join(prefixFull, bucket, abstract_sql.DEFAULT_TABLE)
  446. if err := store.DB.Table().Do(ctx, func(ctx context.Context, s table.Session) error {
  447. return s.CreateTable(ctx, bucketTable, store.createTableOptions()...)
  448. }); err != nil {
  449. glog.ErrorfCtx(ctx, "failed to create bucket table %s: %v", bucketTable, err)
  450. }
  451. }
  452. }
  453. return nil
  454. }