benchmark.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util/version"
  8. "io"
  9. "math"
  10. "math/rand"
  11. "os"
  12. "runtime"
  13. "runtime/pprof"
  14. "sort"
  15. "sync"
  16. "time"
  17. "google.golang.org/grpc"
  18. "github.com/seaweedfs/seaweedfs/weed/glog"
  19. "github.com/seaweedfs/seaweedfs/weed/operation"
  20. "github.com/seaweedfs/seaweedfs/weed/security"
  21. "github.com/seaweedfs/seaweedfs/weed/util"
  22. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  23. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  24. )
// BenchmarkOptions holds the command-line flags and shared clients for the
// benchmark command. All flag fields are pointers populated in init().
type BenchmarkOptions struct {
	masters          *string // comma-separated master server locations
	concurrency      *int    // number of concurrent write or read workers
	numberOfFiles    *int    // total number of files to write / read
	fileSize         *int    // base simulated file size in bytes (0-63 random bytes are added)
	idListFile       *string // file that stores the uploaded file ids
	write            *bool   // enable the write phase
	deletePercentage *int    // percent of writes that are followed by a delayed delete
	read             *bool   // enable the read phase
	sequentialRead   *bool   // read ids in file order instead of randomly
	collection       *string // collection to write benchmark data into
	replication      *string // replication type for assigned volumes
	diskType         *string // [hdd|ssd|<tag>] disk type or tag for volume assignment
	cpuprofile       *string // cpu profile output file, empty disables profiling
	maxCpu           *int    // GOMAXPROCS limit; 0 means all available CPUs
	grpcDialOption   grpc.DialOption
	masterClient     *wdclient.MasterClient
	fsync            *bool // ask volume servers to fsync after each write
}
var (
	// b holds the parsed benchmark options, shared by all workers.
	b BenchmarkOptions
	// sharedBytes is a zero-filled scratch buffer used by FakeReader.WriteTo.
	sharedBytes []byte
	// isSecure is flipped on once any volume assignment returns a JWT,
	// so subsequent deletes attach an authorization token.
	isSecure bool
)
// init wires up the benchmark command and registers all of its flags on b.
func init() {
	cmdBenchmark.Run = runBenchmark // break init cycle
	cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
	b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
	b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
	b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
	b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
	b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
	b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
	b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
	b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
	b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
	b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
	b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
	b.diskType = cmdBenchmark.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
	b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
	b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
	b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
	// Zero-filled 1KB scratch buffer shared by all FakeReader.WriteTo calls.
	sharedBytes = make([]byte, 1024)
}
// cmdBenchmark describes the "weed benchmark" subcommand and its help text.
var cmdBenchmark = &Command{
	UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
	Short:     "benchmark by writing millions of files and reading them out",
	Long: `benchmark on an empty SeaweedFS file system.
Two tests during benchmark:
1) write lots of small files to the system
2) read the files out
The file content is mostly zeros, but no compression is done.
You can choose to only benchmark read or write.
During write, the list of uploaded file ids is stored in "-list" specified file.
You can also use your own list of file ids to run read test.
Write speed and read speed will be collected.
The numbers are used to get a sense of the system.
Usually your network or the hard drive is the real bottleneck.
Another thing to watch is whether the volumes are evenly distributed
to each volume server. Because the 7 more benchmark volumes are randomly distributed
to servers with free slots, it's highly possible some servers have uneven amount of
benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
before starting the benchmark command:
http://localhost:9333/vol/grow?collection=benchmark&count=5
After benchmarking, you can clean up the written data by deleting the benchmark collection
http://localhost:9333/col/delete?collection=benchmark
`,
}
var (
	// wait synchronizes benchmark workers and the progress/id-writer
	// goroutines within benchWrite and benchRead.
	wait sync.WaitGroup
	// writeStats and readStats collect latency histograms and per-worker
	// counters for the write and read phases respectively.
	writeStats *stats
	readStats  *stats
)
  98. func runBenchmark(cmd *Command, args []string) bool {
  99. util.LoadSecurityConfiguration()
  100. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  101. fmt.Printf("This is SeaweedFS version %s %s %s\n", version.Version(), runtime.GOOS, runtime.GOARCH)
  102. if *b.maxCpu < 1 {
  103. *b.maxCpu = runtime.NumCPU()
  104. }
  105. runtime.GOMAXPROCS(*b.maxCpu)
  106. if *b.cpuprofile != "" {
  107. f, err := os.Create(*b.cpuprofile)
  108. if err != nil {
  109. glog.Fatal(err)
  110. }
  111. pprof.StartCPUProfile(f)
  112. defer pprof.StopCPUProfile()
  113. }
  114. b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
  115. ctx := context.Background()
  116. go b.masterClient.KeepConnectedToMaster(ctx)
  117. b.masterClient.WaitUntilConnected(ctx)
  118. if *b.write {
  119. benchWrite()
  120. }
  121. if *b.read {
  122. benchRead()
  123. }
  124. return true
  125. }
// benchWrite runs the write phase: it spawns *b.concurrency writeFiles
// workers fed by idChan, one writeFileIds goroutine persisting fids, and
// one checkProgress reporter, then waits for everything to drain.
func benchWrite() {
	fileIdLineChan := make(chan string)
	finishChan := make(chan bool)
	writeStats = newStats(*b.concurrency)
	idChan := make(chan int)
	go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
	for i := 0; i < *b.concurrency; i++ {
		wait.Add(1)
		go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
	}
	writeStats.start = time.Now()
	writeStats.total = *b.numberOfFiles
	go writeStats.checkProgress("Writing Benchmark", finishChan)
	// Feed every file id to the workers, then close to let them exit.
	for i := 0; i < *b.numberOfFiles; i++ {
		idChan <- i
	}
	close(idChan)
	wait.Wait() // all writeFiles workers done
	writeStats.end = time.Now()
	// Two finish signals: one is consumed by writeFileIds, the other by
	// checkProgress; each calls wait.Done() before returning.
	wait.Add(2)
	finishChan <- true
	finishChan <- true
	wait.Wait()
	close(finishChan)
	writeStats.printStats()
}
// benchRead runs the read phase: readFileIds streams fids (sequentially or
// randomly) into fileIdLineChan, *b.concurrency readFiles workers consume
// them, and checkProgress reports throughput once per second.
func benchRead() {
	fileIdLineChan := make(chan string)
	finishChan := make(chan bool)
	readStats = newStats(*b.concurrency)
	go readFileIds(*b.idListFile, fileIdLineChan)
	readStats.start = time.Now()
	readStats.total = *b.numberOfFiles
	go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
	for i := 0; i < *b.concurrency; i++ {
		wait.Add(1)
		go readFiles(fileIdLineChan, &readStats.localStats[i])
	}
	wait.Wait() // all readFiles workers done (channel closed by readFileIds)
	// One finish signal stops the checkProgress goroutine, which calls
	// wait.Done() before returning.
	wait.Add(1)
	finishChan <- true
	wait.Wait()
	close(finishChan)
	readStats.end = time.Now()
	readStats.printStats()
}
// delayedFile is a file queued for deletion, not to be deleted before
// enterTime (set to one second after upload by writeFiles).
type delayedFile struct {
	enterTime time.Time
	fp        *operation.FilePart
}
  176. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  177. defer wait.Done()
  178. delayedDeleteChan := make(chan *delayedFile, 100)
  179. var waitForDeletions sync.WaitGroup
  180. for i := 0; i < 7; i++ {
  181. waitForDeletions.Add(1)
  182. go func() {
  183. defer waitForDeletions.Done()
  184. for df := range delayedDeleteChan {
  185. if df.enterTime.After(time.Now()) {
  186. time.Sleep(df.enterTime.Sub(time.Now()))
  187. }
  188. var jwtAuthorization security.EncodedJwt
  189. if isSecure {
  190. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
  191. }
  192. if e := util_http.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  193. s.completed++
  194. } else {
  195. s.failed++
  196. }
  197. }
  198. }()
  199. }
  200. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  201. for id := range idChan {
  202. start := time.Now()
  203. fileSize := int64(*b.fileSize + random.Intn(64))
  204. fp := &operation.FilePart{
  205. Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
  206. FileSize: fileSize,
  207. MimeType: "image/bench", // prevent gzip benchmark content
  208. Fsync: *b.fsync,
  209. }
  210. ar := &operation.VolumeAssignRequest{
  211. Count: 1,
  212. Collection: *b.collection,
  213. Replication: *b.replication,
  214. DiskType: *b.diskType,
  215. }
  216. if assignResult, err := operation.Assign(context.Background(), b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
  217. fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
  218. if !isSecure && assignResult.Auth != "" {
  219. isSecure = true
  220. }
  221. if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
  222. if random.Intn(100) < *b.deletePercentage {
  223. s.total++
  224. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  225. } else {
  226. fileIdLineChan <- fp.Fid
  227. }
  228. s.completed++
  229. s.transferred += fileSize
  230. } else {
  231. s.failed++
  232. fmt.Printf("Failed to write with error:%v\n", err)
  233. }
  234. writeStats.addSample(time.Now().Sub(start))
  235. if *cmdBenchmark.IsDebug {
  236. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  237. }
  238. } else {
  239. s.failed++
  240. println("writing file error:", err.Error())
  241. }
  242. }
  243. close(delayedDeleteChan)
  244. waitForDeletions.Wait()
  245. }
  246. func readFiles(fileIdLineChan chan string, s *stat) {
  247. defer wait.Done()
  248. for fid := range fileIdLineChan {
  249. if len(fid) == 0 {
  250. continue
  251. }
  252. if fid[0] == '#' {
  253. continue
  254. }
  255. if *cmdBenchmark.IsDebug {
  256. fmt.Printf("reading file %s\n", fid)
  257. }
  258. start := time.Now()
  259. var bytesRead int
  260. var err error
  261. urls, err := b.masterClient.LookupFileId(context.Background(), fid)
  262. if err != nil {
  263. s.failed++
  264. println("!!!! ", fid, " location not found!!!!!")
  265. continue
  266. }
  267. var bytes []byte
  268. for _, url := range urls {
  269. bytes, _, err = util_http.Get(url)
  270. if err == nil {
  271. break
  272. }
  273. }
  274. bytesRead = len(bytes)
  275. if err == nil {
  276. s.completed++
  277. s.transferred += int64(bytesRead)
  278. readStats.addSample(time.Now().Sub(start))
  279. } else {
  280. s.failed++
  281. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  282. }
  283. }
  284. }
  285. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  286. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  287. if err != nil {
  288. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  289. }
  290. defer file.Close()
  291. for {
  292. select {
  293. case <-finishChan:
  294. wait.Done()
  295. return
  296. case line := <-fileIdLineChan:
  297. file.Write([]byte(line))
  298. file.Write([]byte("\n"))
  299. }
  300. }
  301. }
  302. func readFileIds(fileName string, fileIdLineChan chan string) {
  303. file, err := os.Open(fileName) // For read access.
  304. if err != nil {
  305. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  306. }
  307. defer file.Close()
  308. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  309. r := bufio.NewReader(file)
  310. if *b.sequentialRead {
  311. for {
  312. if line, err := Readln(r); err == nil {
  313. fileIdLineChan <- string(line)
  314. } else {
  315. break
  316. }
  317. }
  318. } else {
  319. lines := make([]string, 0, readStats.total)
  320. for {
  321. if line, err := Readln(r); err == nil {
  322. lines = append(lines, string(line))
  323. } else {
  324. break
  325. }
  326. }
  327. if len(lines) > 0 {
  328. for i := 0; i < readStats.total; i++ {
  329. fileIdLineChan <- lines[random.Intn(len(lines))]
  330. }
  331. }
  332. }
  333. close(fileIdLineChan)
  334. }
const (
	// benchResolution is the number of latency histogram buckets.
	benchResolution = 10000
	// benchBucket is the width of one bucket in nanoseconds
	// (1e9/10000 = 100000 ns = 0.1 ms), so a bucket index divided by 10
	// yields milliseconds, as used in printStats.
	benchBucket = 1000000000 / benchResolution
)
// An efficient statistics collector and renderer for one benchmark phase.
type stats struct {
	data       []int     // latency histogram; index = duration/benchBucket
	overflow   []int     // bucket indexes that exceeded len(data)
	localStats []stat    // per-worker counters, one slot per goroutine
	start      time.Time // phase start time
	end        time.Time // phase end time
	total      int       // expected number of requests for this phase
}
// stat holds one worker's counters; each worker owns exactly one slot of
// stats.localStats, so no locking is needed.
type stat struct {
	completed   int   // successful requests
	failed      int   // failed requests
	total       int   // extra requests generated (e.g. delayed deletes)
	transferred int64 // bytes transferred
}
// percentages lists the percentile cut-offs reported by printStats.
var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  355. func newStats(n int) *stats {
  356. return &stats{
  357. data: make([]int, benchResolution),
  358. overflow: make([]int, 0),
  359. localStats: make([]stat, n),
  360. }
  361. }
  362. func (s *stats) addSample(d time.Duration) {
  363. index := int(d / benchBucket)
  364. if index < 0 {
  365. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  366. } else if index < len(s.data) {
  367. s.data[int(d/benchBucket)]++
  368. } else {
  369. s.overflow = append(s.overflow, index)
  370. }
  371. }
  372. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  373. fmt.Printf("\n------------ %s ----------\n", testName)
  374. ticker := time.Tick(time.Second)
  375. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  376. for {
  377. select {
  378. case <-finishChan:
  379. wait.Done()
  380. return
  381. case t := <-ticker:
  382. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  383. for _, localStat := range s.localStats {
  384. completed += localStat.completed
  385. transferred += localStat.transferred
  386. total += localStat.total
  387. }
  388. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  389. completed, total, float64(completed)*100/float64(total),
  390. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  391. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  392. )
  393. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  394. }
  395. }
  396. }
// printStats renders the final report: totals, throughput, latency
// min/avg/max/std, and percentiles. Histogram indexes are in benchBucket
// (0.1 ms) units, hence the repeated division by 10 to print milliseconds.
func (s *stats) printStats() {
	completed, failed, transferred, total := 0, 0, int64(0), s.total
	for _, localStat := range s.localStats {
		completed += localStat.completed
		failed += localStat.failed
		transferred += localStat.transferred
		total += localStat.total
	}
	// nanoseconds -> seconds
	timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
	fmt.Printf("\nConcurrency Level:      %d\n", *b.concurrency)
	fmt.Printf("Time taken for tests:   %.3f seconds\n", timeTaken)
	fmt.Printf("Completed requests:      %d\n", completed)
	fmt.Printf("Failed requests:        %d\n", failed)
	fmt.Printf("Total transferred:      %d bytes\n", transferred)
	fmt.Printf("Requests per second:    %.2f [#/sec]\n", float64(completed)/timeTaken)
	fmt.Printf("Transfer rate:          %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
	// n: sample count; sum: sum of bucket indexes (0.1 ms units).
	n, sum := 0, 0
	min, max := 10000000, 0
	for i := 0; i < len(s.data); i++ {
		n += s.data[i]
		sum += s.data[i] * i
		if s.data[i] > 0 {
			if min > i {
				min = i
			}
			if max < i {
				max = i
			}
		}
	}
	// Overflow entries store the raw bucket index, one sample each.
	n += len(s.overflow)
	for i := 0; i < len(s.overflow); i++ {
		sum += s.overflow[i]
		if min > s.overflow[i] {
			min = s.overflow[i]
		}
		if max < s.overflow[i] {
			max = s.overflow[i]
		}
	}
	avg := float64(sum) / float64(n)
	varianceSum := 0.0
	for i := 0; i < len(s.data); i++ {
		if s.data[i] > 0 {
			d := float64(i) - avg
			varianceSum += d * d * float64(s.data[i])
		}
	}
	for i := 0; i < len(s.overflow); i++ {
		d := float64(s.overflow[i]) - avg
		varianceSum += d * d
	}
	std := math.Sqrt(varianceSum / float64(n))
	fmt.Printf("\nConnection Times (ms)\n")
	fmt.Printf("              min      avg        max      std\n")
	// Divide by 10 to convert 0.1 ms bucket units to milliseconds.
	fmt.Printf("Total:        %2.1f      %3.1f       %3.1f      %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
	// printing percentiles
	fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
	percentiles := make([]int, len(percentages))
	for i := 0; i < len(percentages); i++ {
		percentiles[i] = n * percentages[i] / 100
	}
	percentiles[len(percentiles)-1] = n
	percentileIndex := 0
	currentSum := 0
	// Walk the histogram, emitting each percentile the running count crosses.
	for i := 0; i < len(s.data); i++ {
		currentSum += s.data[i]
		if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
			fmt.Printf("  %3d%%    %5.1f  ms\n", percentages[percentileIndex], float32(i)/10.0)
			percentileIndex++
			// Skip percentiles already satisfied at this same bucket.
			for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
				percentileIndex++
			}
		}
	}
	// Overflow samples are sorted so remaining percentiles come out in order.
	sort.Ints(s.overflow)
	for i := 0; i < len(s.overflow); i++ {
		currentSum++
		if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
			fmt.Printf("  %3d%%    %5.1f  ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
			percentileIndex++
			for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
				percentileIndex++
			}
		}
	}
}
  484. // a fake reader to generate content to upload
  485. type FakeReader struct {
  486. id uint64 // an id number
  487. size int64 // max bytes
  488. random *rand.Rand
  489. }
  490. func (l *FakeReader) Read(p []byte) (n int, err error) {
  491. if l.size <= 0 {
  492. return 0, io.EOF
  493. }
  494. if int64(len(p)) > l.size {
  495. n = int(l.size)
  496. } else {
  497. n = len(p)
  498. }
  499. if n >= 8 {
  500. for i := 0; i < 8; i++ {
  501. p[i] = byte(l.id >> uint(i*8))
  502. }
  503. l.random.Read(p[8:])
  504. }
  505. l.size -= int64(n)
  506. return
  507. }
// WriteTo streams l.size bytes to w from the shared zero-filled buffer.
// NOTE(review): unlike Read, this path writes zeros only (no id stamp, no
// random padding) and does not decrement l.size. On success it returns the
// original l.size; on a write error it returns the bytes *remaining*, not
// the bytes written — callers appear to tolerate this, but it deviates from
// the io.WriterTo contract; confirm before relying on the returned count.
func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
	size := int(l.size)
	bufferSize := len(sharedBytes)
	for size > 0 {
		tempBuffer := sharedBytes
		if size < bufferSize {
			tempBuffer = sharedBytes[0:size]
		}
		count, e := w.Write(tempBuffer)
		if e != nil {
			return int64(size), e
		}
		size -= count
	}
	return l.size, nil
}
  524. func Readln(r *bufio.Reader) ([]byte, error) {
  525. var (
  526. isPrefix = true
  527. err error
  528. line, ln []byte
  529. )
  530. for isPrefix && err == nil {
  531. line, isPrefix, err = r.ReadLine()
  532. ln = append(ln, line...)
  533. }
  534. return ln, err
  535. }