concurrent_operations_test.go 12 KB


  1. package fuse_test
  2. import (
  3. "bytes"
  4. "crypto/rand"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "testing"
  10. "time"
  11. "github.com/stretchr/testify/assert"
  12. "github.com/stretchr/testify/require"
  13. )
  14. // TestConcurrentFileOperations tests concurrent file operations
  15. func TestConcurrentFileOperations(t *testing.T) {
  16. framework := NewFuseTestFramework(t, DefaultTestConfig())
  17. defer framework.Cleanup()
  18. require.NoError(t, framework.Setup(DefaultTestConfig()))
  19. t.Run("ConcurrentFileWrites", func(t *testing.T) {
  20. testConcurrentFileWrites(t, framework)
  21. })
  22. t.Run("ConcurrentFileReads", func(t *testing.T) {
  23. testConcurrentFileReads(t, framework)
  24. })
  25. t.Run("ConcurrentReadWrite", func(t *testing.T) {
  26. testConcurrentReadWrite(t, framework)
  27. })
  28. t.Run("ConcurrentDirectoryOperations", func(t *testing.T) {
  29. testConcurrentDirectoryOperations(t, framework)
  30. })
  31. t.Run("ConcurrentFileCreation", func(t *testing.T) {
  32. testConcurrentFileCreation(t, framework)
  33. })
  34. }
  35. // testConcurrentFileWrites tests multiple goroutines writing to different files
  36. func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) {
  37. numWorkers := 10
  38. filesPerWorker := 5
  39. var wg sync.WaitGroup
  40. var mutex sync.Mutex
  41. errors := make([]error, 0)
  42. // Function to collect errors safely
  43. addError := func(err error) {
  44. mutex.Lock()
  45. defer mutex.Unlock()
  46. errors = append(errors, err)
  47. }
  48. // Start concurrent workers
  49. for worker := 0; worker < numWorkers; worker++ {
  50. wg.Add(1)
  51. go func(workerID int) {
  52. defer wg.Done()
  53. for file := 0; file < filesPerWorker; file++ {
  54. filename := fmt.Sprintf("worker_%d_file_%d.txt", workerID, file)
  55. content := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, file, time.Now().String()))
  56. mountPath := filepath.Join(framework.GetMountPoint(), filename)
  57. if err := os.WriteFile(mountPath, content, 0644); err != nil {
  58. addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
  59. return
  60. }
  61. // Verify file was written correctly
  62. readContent, err := os.ReadFile(mountPath)
  63. if err != nil {
  64. addError(fmt.Errorf("worker %d file %d read: %v", workerID, file, err))
  65. return
  66. }
  67. if !bytes.Equal(content, readContent) {
  68. addError(fmt.Errorf("worker %d file %d: content mismatch", workerID, file))
  69. return
  70. }
  71. }
  72. }(worker)
  73. }
  74. wg.Wait()
  75. // Check for errors
  76. require.Empty(t, errors, "Concurrent writes failed: %v", errors)
  77. // Verify all files exist and have correct content
  78. for worker := 0; worker < numWorkers; worker++ {
  79. for file := 0; file < filesPerWorker; file++ {
  80. filename := fmt.Sprintf("worker_%d_file_%d.txt", worker, file)
  81. framework.AssertFileExists(filename)
  82. }
  83. }
  84. }
  85. // testConcurrentFileReads tests multiple goroutines reading from the same file
  86. func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) {
  87. // Create a test file
  88. filename := "concurrent_read_test.txt"
  89. testData := make([]byte, 1024*1024) // 1MB
  90. _, err := rand.Read(testData)
  91. require.NoError(t, err)
  92. framework.CreateTestFile(filename, testData)
  93. numReaders := 20
  94. var wg sync.WaitGroup
  95. var mutex sync.Mutex
  96. errors := make([]error, 0)
  97. addError := func(err error) {
  98. mutex.Lock()
  99. defer mutex.Unlock()
  100. errors = append(errors, err)
  101. }
  102. // Start concurrent readers
  103. for reader := 0; reader < numReaders; reader++ {
  104. wg.Add(1)
  105. go func(readerID int) {
  106. defer wg.Done()
  107. mountPath := filepath.Join(framework.GetMountPoint(), filename)
  108. // Read multiple times
  109. for i := 0; i < 3; i++ {
  110. readData, err := os.ReadFile(mountPath)
  111. if err != nil {
  112. addError(fmt.Errorf("reader %d iteration %d: %v", readerID, i, err))
  113. return
  114. }
  115. if !bytes.Equal(testData, readData) {
  116. addError(fmt.Errorf("reader %d iteration %d: data mismatch", readerID, i))
  117. return
  118. }
  119. }
  120. }(reader)
  121. }
  122. wg.Wait()
  123. require.Empty(t, errors, "Concurrent reads failed: %v", errors)
  124. }
  125. // testConcurrentReadWrite tests simultaneous read and write operations
  126. func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) {
  127. filename := "concurrent_rw_test.txt"
  128. initialData := bytes.Repeat([]byte("INITIAL"), 1000)
  129. framework.CreateTestFile(filename, initialData)
  130. var wg sync.WaitGroup
  131. var mutex sync.Mutex
  132. errors := make([]error, 0)
  133. addError := func(err error) {
  134. mutex.Lock()
  135. defer mutex.Unlock()
  136. errors = append(errors, err)
  137. }
  138. mountPath := filepath.Join(framework.GetMountPoint(), filename)
  139. // Start readers
  140. numReaders := 5
  141. for i := 0; i < numReaders; i++ {
  142. wg.Add(1)
  143. go func(readerID int) {
  144. defer wg.Done()
  145. for j := 0; j < 10; j++ {
  146. _, err := os.ReadFile(mountPath)
  147. if err != nil {
  148. addError(fmt.Errorf("reader %d: %v", readerID, err))
  149. return
  150. }
  151. time.Sleep(10 * time.Millisecond)
  152. }
  153. }(i)
  154. }
  155. // Start writers
  156. numWriters := 2
  157. for i := 0; i < numWriters; i++ {
  158. wg.Add(1)
  159. go func(writerID int) {
  160. defer wg.Done()
  161. for j := 0; j < 5; j++ {
  162. newData := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000)
  163. err := os.WriteFile(mountPath, newData, 0644)
  164. if err != nil {
  165. addError(fmt.Errorf("writer %d: %v", writerID, err))
  166. return
  167. }
  168. time.Sleep(50 * time.Millisecond)
  169. }
  170. }(i)
  171. }
  172. wg.Wait()
  173. require.Empty(t, errors, "Concurrent read/write failed: %v", errors)
  174. // Verify file still exists and is readable
  175. framework.AssertFileExists(filename)
  176. }
  177. // testConcurrentDirectoryOperations tests concurrent directory operations
  178. func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) {
  179. numWorkers := 8
  180. var wg sync.WaitGroup
  181. var mutex sync.Mutex
  182. errors := make([]error, 0)
  183. addError := func(err error) {
  184. mutex.Lock()
  185. defer mutex.Unlock()
  186. errors = append(errors, err)
  187. }
  188. // Each worker creates a directory tree
  189. for worker := 0; worker < numWorkers; worker++ {
  190. wg.Add(1)
  191. go func(workerID int) {
  192. defer wg.Done()
  193. // Create worker directory
  194. workerDir := fmt.Sprintf("worker_%d", workerID)
  195. mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
  196. if err := os.Mkdir(mountPath, 0755); err != nil {
  197. addError(fmt.Errorf("worker %d mkdir: %v", workerID, err))
  198. return
  199. }
  200. // Create subdirectories and files
  201. for i := 0; i < 5; i++ {
  202. subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
  203. if err := os.Mkdir(subDir, 0755); err != nil {
  204. addError(fmt.Errorf("worker %d subdir %d: %v", workerID, i, err))
  205. return
  206. }
  207. // Create file in subdirectory
  208. testFile := filepath.Join(subDir, "test.txt")
  209. content := []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i))
  210. if err := os.WriteFile(testFile, content, 0644); err != nil {
  211. addError(fmt.Errorf("worker %d file %d: %v", workerID, i, err))
  212. return
  213. }
  214. }
  215. }(worker)
  216. }
  217. wg.Wait()
  218. require.Empty(t, errors, "Concurrent directory operations failed: %v", errors)
  219. // Verify all structures were created
  220. for worker := 0; worker < numWorkers; worker++ {
  221. workerDir := fmt.Sprintf("worker_%d", worker)
  222. mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
  223. info, err := os.Stat(mountPath)
  224. require.NoError(t, err)
  225. assert.True(t, info.IsDir())
  226. // Check subdirectories
  227. for i := 0; i < 5; i++ {
  228. subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
  229. info, err := os.Stat(subDir)
  230. require.NoError(t, err)
  231. assert.True(t, info.IsDir())
  232. testFile := filepath.Join(subDir, "test.txt")
  233. expectedContent := []byte(fmt.Sprintf("Worker %d, Subdir %d", worker, i))
  234. actualContent, err := os.ReadFile(testFile)
  235. require.NoError(t, err)
  236. assert.Equal(t, expectedContent, actualContent)
  237. }
  238. }
  239. }
  240. // testConcurrentFileCreation tests concurrent creation of files in same directory
  241. func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) {
  242. // Create test directory
  243. testDir := "concurrent_creation"
  244. framework.CreateTestDir(testDir)
  245. numWorkers := 15
  246. filesPerWorker := 10
  247. var wg sync.WaitGroup
  248. var mutex sync.Mutex
  249. errors := make([]error, 0)
  250. createdFiles := make(map[string]bool)
  251. addError := func(err error) {
  252. mutex.Lock()
  253. defer mutex.Unlock()
  254. errors = append(errors, err)
  255. }
  256. addFile := func(filename string) {
  257. mutex.Lock()
  258. defer mutex.Unlock()
  259. createdFiles[filename] = true
  260. }
  261. // Create files concurrently
  262. for worker := 0; worker < numWorkers; worker++ {
  263. wg.Add(1)
  264. go func(workerID int) {
  265. defer wg.Done()
  266. for file := 0; file < filesPerWorker; file++ {
  267. filename := fmt.Sprintf("file_%d_%d.txt", workerID, file)
  268. relativePath := filepath.Join(testDir, filename)
  269. mountPath := filepath.Join(framework.GetMountPoint(), relativePath)
  270. content := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s",
  271. workerID, file, time.Now().Format(time.RFC3339Nano)))
  272. if err := os.WriteFile(mountPath, content, 0644); err != nil {
  273. addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
  274. return
  275. }
  276. addFile(filename)
  277. }
  278. }(worker)
  279. }
  280. wg.Wait()
  281. require.Empty(t, errors, "Concurrent file creation failed: %v", errors)
  282. // Verify all files were created
  283. expectedCount := numWorkers * filesPerWorker
  284. assert.Equal(t, expectedCount, len(createdFiles))
  285. // Read directory and verify count
  286. mountPath := filepath.Join(framework.GetMountPoint(), testDir)
  287. entries, err := os.ReadDir(mountPath)
  288. require.NoError(t, err)
  289. assert.Equal(t, expectedCount, len(entries))
  290. // Verify each file exists and has content
  291. for filename := range createdFiles {
  292. relativePath := filepath.Join(testDir, filename)
  293. framework.AssertFileExists(relativePath)
  294. }
  295. }
  296. // TestStressOperations tests high-load scenarios
  297. func TestStressOperations(t *testing.T) {
  298. framework := NewFuseTestFramework(t, DefaultTestConfig())
  299. defer framework.Cleanup()
  300. require.NoError(t, framework.Setup(DefaultTestConfig()))
  301. t.Run("HighFrequencySmallWrites", func(t *testing.T) {
  302. testHighFrequencySmallWrites(t, framework)
  303. })
  304. t.Run("ManySmallFiles", func(t *testing.T) {
  305. testManySmallFiles(t, framework)
  306. })
  307. }
  308. // testHighFrequencySmallWrites tests many small writes to the same file
  309. func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
  310. filename := "high_freq_writes.txt"
  311. mountPath := filepath.Join(framework.GetMountPoint(), filename)
  312. // Open file for writing
  313. file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
  314. require.NoError(t, err)
  315. defer file.Close()
  316. // Perform many small writes
  317. numWrites := 1000
  318. writeSize := 100
  319. for i := 0; i < numWrites; i++ {
  320. data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20)))
  321. _, err := file.Write(data)
  322. require.NoError(t, err)
  323. }
  324. file.Close()
  325. // Verify file size
  326. info, err := os.Stat(mountPath)
  327. require.NoError(t, err)
  328. assert.Equal(t, totalSize, info.Size())
  329. }
  330. // testManySmallFiles tests creating many small files
  331. func testManySmallFiles(t *testing.T, framework *FuseTestFramework) {
  332. testDir := "many_small_files"
  333. framework.CreateTestDir(testDir)
  334. numFiles := 500
  335. var wg sync.WaitGroup
  336. var mutex sync.Mutex
  337. errors := make([]error, 0)
  338. addError := func(err error) {
  339. mutex.Lock()
  340. defer mutex.Unlock()
  341. errors = append(errors, err)
  342. }
  343. // Create files in batches
  344. batchSize := 50
  345. for batch := 0; batch < numFiles/batchSize; batch++ {
  346. wg.Add(1)
  347. go func(batchID int) {
  348. defer wg.Done()
  349. for i := 0; i < batchSize; i++ {
  350. fileNum := batchID*batchSize + i
  351. filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum))
  352. content := []byte(fmt.Sprintf("File %d content", fileNum))
  353. mountPath := filepath.Join(framework.GetMountPoint(), filename)
  354. if err := os.WriteFile(mountPath, content, 0644); err != nil {
  355. addError(fmt.Errorf("file %d: %v", fileNum, err))
  356. return
  357. }
  358. }
  359. }(batch)
  360. }
  361. wg.Wait()
  362. require.Empty(t, errors, "Many small files creation failed: %v", errors)
  363. // Verify directory listing
  364. mountPath := filepath.Join(framework.GetMountPoint(), testDir)
  365. entries, err := os.ReadDir(mountPath)
  366. require.NoError(t, err)
  367. assert.Equal(t, numFiles, len(entries))
  368. }