| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- package fuse_test
- import (
- "bytes"
- "crypto/rand"
- "fmt"
- "os"
- "path/filepath"
- "sync"
- "testing"
- "time"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- // TestConcurrentFileOperations tests concurrent file operations
- func TestConcurrentFileOperations(t *testing.T) {
- framework := NewFuseTestFramework(t, DefaultTestConfig())
- defer framework.Cleanup()
- require.NoError(t, framework.Setup(DefaultTestConfig()))
- t.Run("ConcurrentFileWrites", func(t *testing.T) {
- testConcurrentFileWrites(t, framework)
- })
- t.Run("ConcurrentFileReads", func(t *testing.T) {
- testConcurrentFileReads(t, framework)
- })
- t.Run("ConcurrentReadWrite", func(t *testing.T) {
- testConcurrentReadWrite(t, framework)
- })
- t.Run("ConcurrentDirectoryOperations", func(t *testing.T) {
- testConcurrentDirectoryOperations(t, framework)
- })
- t.Run("ConcurrentFileCreation", func(t *testing.T) {
- testConcurrentFileCreation(t, framework)
- })
- }
- // testConcurrentFileWrites tests multiple goroutines writing to different files
- func testConcurrentFileWrites(t *testing.T, framework *FuseTestFramework) {
- numWorkers := 10
- filesPerWorker := 5
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- // Function to collect errors safely
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- // Start concurrent workers
- for worker := 0; worker < numWorkers; worker++ {
- wg.Add(1)
- go func(workerID int) {
- defer wg.Done()
- for file := 0; file < filesPerWorker; file++ {
- filename := fmt.Sprintf("worker_%d_file_%d.txt", workerID, file)
- content := []byte(fmt.Sprintf("Worker %d, File %d - %s", workerID, file, time.Now().String()))
- mountPath := filepath.Join(framework.GetMountPoint(), filename)
- if err := os.WriteFile(mountPath, content, 0644); err != nil {
- addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
- return
- }
- // Verify file was written correctly
- readContent, err := os.ReadFile(mountPath)
- if err != nil {
- addError(fmt.Errorf("worker %d file %d read: %v", workerID, file, err))
- return
- }
- if !bytes.Equal(content, readContent) {
- addError(fmt.Errorf("worker %d file %d: content mismatch", workerID, file))
- return
- }
- }
- }(worker)
- }
- wg.Wait()
- // Check for errors
- require.Empty(t, errors, "Concurrent writes failed: %v", errors)
- // Verify all files exist and have correct content
- for worker := 0; worker < numWorkers; worker++ {
- for file := 0; file < filesPerWorker; file++ {
- filename := fmt.Sprintf("worker_%d_file_%d.txt", worker, file)
- framework.AssertFileExists(filename)
- }
- }
- }
- // testConcurrentFileReads tests multiple goroutines reading from the same file
- func testConcurrentFileReads(t *testing.T, framework *FuseTestFramework) {
- // Create a test file
- filename := "concurrent_read_test.txt"
- testData := make([]byte, 1024*1024) // 1MB
- _, err := rand.Read(testData)
- require.NoError(t, err)
- framework.CreateTestFile(filename, testData)
- numReaders := 20
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- // Start concurrent readers
- for reader := 0; reader < numReaders; reader++ {
- wg.Add(1)
- go func(readerID int) {
- defer wg.Done()
- mountPath := filepath.Join(framework.GetMountPoint(), filename)
- // Read multiple times
- for i := 0; i < 3; i++ {
- readData, err := os.ReadFile(mountPath)
- if err != nil {
- addError(fmt.Errorf("reader %d iteration %d: %v", readerID, i, err))
- return
- }
- if !bytes.Equal(testData, readData) {
- addError(fmt.Errorf("reader %d iteration %d: data mismatch", readerID, i))
- return
- }
- }
- }(reader)
- }
- wg.Wait()
- require.Empty(t, errors, "Concurrent reads failed: %v", errors)
- }
- // testConcurrentReadWrite tests simultaneous read and write operations
- func testConcurrentReadWrite(t *testing.T, framework *FuseTestFramework) {
- filename := "concurrent_rw_test.txt"
- initialData := bytes.Repeat([]byte("INITIAL"), 1000)
- framework.CreateTestFile(filename, initialData)
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- mountPath := filepath.Join(framework.GetMountPoint(), filename)
- // Start readers
- numReaders := 5
- for i := 0; i < numReaders; i++ {
- wg.Add(1)
- go func(readerID int) {
- defer wg.Done()
- for j := 0; j < 10; j++ {
- _, err := os.ReadFile(mountPath)
- if err != nil {
- addError(fmt.Errorf("reader %d: %v", readerID, err))
- return
- }
- time.Sleep(10 * time.Millisecond)
- }
- }(i)
- }
- // Start writers
- numWriters := 2
- for i := 0; i < numWriters; i++ {
- wg.Add(1)
- go func(writerID int) {
- defer wg.Done()
- for j := 0; j < 5; j++ {
- newData := bytes.Repeat([]byte(fmt.Sprintf("WRITER%d", writerID)), 1000)
- err := os.WriteFile(mountPath, newData, 0644)
- if err != nil {
- addError(fmt.Errorf("writer %d: %v", writerID, err))
- return
- }
- time.Sleep(50 * time.Millisecond)
- }
- }(i)
- }
- wg.Wait()
- require.Empty(t, errors, "Concurrent read/write failed: %v", errors)
- // Verify file still exists and is readable
- framework.AssertFileExists(filename)
- }
- // testConcurrentDirectoryOperations tests concurrent directory operations
- func testConcurrentDirectoryOperations(t *testing.T, framework *FuseTestFramework) {
- numWorkers := 8
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- // Each worker creates a directory tree
- for worker := 0; worker < numWorkers; worker++ {
- wg.Add(1)
- go func(workerID int) {
- defer wg.Done()
- // Create worker directory
- workerDir := fmt.Sprintf("worker_%d", workerID)
- mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
- if err := os.Mkdir(mountPath, 0755); err != nil {
- addError(fmt.Errorf("worker %d mkdir: %v", workerID, err))
- return
- }
- // Create subdirectories and files
- for i := 0; i < 5; i++ {
- subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
- if err := os.Mkdir(subDir, 0755); err != nil {
- addError(fmt.Errorf("worker %d subdir %d: %v", workerID, i, err))
- return
- }
- // Create file in subdirectory
- testFile := filepath.Join(subDir, "test.txt")
- content := []byte(fmt.Sprintf("Worker %d, Subdir %d", workerID, i))
- if err := os.WriteFile(testFile, content, 0644); err != nil {
- addError(fmt.Errorf("worker %d file %d: %v", workerID, i, err))
- return
- }
- }
- }(worker)
- }
- wg.Wait()
- require.Empty(t, errors, "Concurrent directory operations failed: %v", errors)
- // Verify all structures were created
- for worker := 0; worker < numWorkers; worker++ {
- workerDir := fmt.Sprintf("worker_%d", worker)
- mountPath := filepath.Join(framework.GetMountPoint(), workerDir)
- info, err := os.Stat(mountPath)
- require.NoError(t, err)
- assert.True(t, info.IsDir())
- // Check subdirectories
- for i := 0; i < 5; i++ {
- subDir := filepath.Join(mountPath, fmt.Sprintf("subdir_%d", i))
- info, err := os.Stat(subDir)
- require.NoError(t, err)
- assert.True(t, info.IsDir())
- testFile := filepath.Join(subDir, "test.txt")
- expectedContent := []byte(fmt.Sprintf("Worker %d, Subdir %d", worker, i))
- actualContent, err := os.ReadFile(testFile)
- require.NoError(t, err)
- assert.Equal(t, expectedContent, actualContent)
- }
- }
- }
- // testConcurrentFileCreation tests concurrent creation of files in same directory
- func testConcurrentFileCreation(t *testing.T, framework *FuseTestFramework) {
- // Create test directory
- testDir := "concurrent_creation"
- framework.CreateTestDir(testDir)
- numWorkers := 15
- filesPerWorker := 10
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- createdFiles := make(map[string]bool)
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- addFile := func(filename string) {
- mutex.Lock()
- defer mutex.Unlock()
- createdFiles[filename] = true
- }
- // Create files concurrently
- for worker := 0; worker < numWorkers; worker++ {
- wg.Add(1)
- go func(workerID int) {
- defer wg.Done()
- for file := 0; file < filesPerWorker; file++ {
- filename := fmt.Sprintf("file_%d_%d.txt", workerID, file)
- relativePath := filepath.Join(testDir, filename)
- mountPath := filepath.Join(framework.GetMountPoint(), relativePath)
- content := []byte(fmt.Sprintf("Worker %d, File %d, Time: %s",
- workerID, file, time.Now().Format(time.RFC3339Nano)))
- if err := os.WriteFile(mountPath, content, 0644); err != nil {
- addError(fmt.Errorf("worker %d file %d: %v", workerID, file, err))
- return
- }
- addFile(filename)
- }
- }(worker)
- }
- wg.Wait()
- require.Empty(t, errors, "Concurrent file creation failed: %v", errors)
- // Verify all files were created
- expectedCount := numWorkers * filesPerWorker
- assert.Equal(t, expectedCount, len(createdFiles))
- // Read directory and verify count
- mountPath := filepath.Join(framework.GetMountPoint(), testDir)
- entries, err := os.ReadDir(mountPath)
- require.NoError(t, err)
- assert.Equal(t, expectedCount, len(entries))
- // Verify each file exists and has content
- for filename := range createdFiles {
- relativePath := filepath.Join(testDir, filename)
- framework.AssertFileExists(relativePath)
- }
- }
- // TestStressOperations tests high-load scenarios
- func TestStressOperations(t *testing.T) {
- framework := NewFuseTestFramework(t, DefaultTestConfig())
- defer framework.Cleanup()
- require.NoError(t, framework.Setup(DefaultTestConfig()))
- t.Run("HighFrequencySmallWrites", func(t *testing.T) {
- testHighFrequencySmallWrites(t, framework)
- })
- t.Run("ManySmallFiles", func(t *testing.T) {
- testManySmallFiles(t, framework)
- })
- }
- // testHighFrequencySmallWrites tests many small writes to the same file
- func testHighFrequencySmallWrites(t *testing.T, framework *FuseTestFramework) {
- filename := "high_freq_writes.txt"
- mountPath := filepath.Join(framework.GetMountPoint(), filename)
- // Open file for writing
- file, err := os.OpenFile(mountPath, os.O_CREATE|os.O_WRONLY, 0644)
- require.NoError(t, err)
- defer file.Close()
- // Perform many small writes
- numWrites := 1000
- writeSize := 100
- for i := 0; i < numWrites; i++ {
- data := []byte(fmt.Sprintf("Write %04d: %s\n", i, bytes.Repeat([]byte("x"), writeSize-20)))
- _, err := file.Write(data)
- require.NoError(t, err)
- }
- file.Close()
- // Verify file size
- info, err := os.Stat(mountPath)
- require.NoError(t, err)
- assert.Equal(t, totalSize, info.Size())
- }
- // testManySmallFiles tests creating many small files
- func testManySmallFiles(t *testing.T, framework *FuseTestFramework) {
- testDir := "many_small_files"
- framework.CreateTestDir(testDir)
- numFiles := 500
- var wg sync.WaitGroup
- var mutex sync.Mutex
- errors := make([]error, 0)
- addError := func(err error) {
- mutex.Lock()
- defer mutex.Unlock()
- errors = append(errors, err)
- }
- // Create files in batches
- batchSize := 50
- for batch := 0; batch < numFiles/batchSize; batch++ {
- wg.Add(1)
- go func(batchID int) {
- defer wg.Done()
- for i := 0; i < batchSize; i++ {
- fileNum := batchID*batchSize + i
- filename := filepath.Join(testDir, fmt.Sprintf("small_file_%04d.txt", fileNum))
- content := []byte(fmt.Sprintf("File %d content", fileNum))
- mountPath := filepath.Join(framework.GetMountPoint(), filename)
- if err := os.WriteFile(mountPath, content, 0644); err != nil {
- addError(fmt.Errorf("file %d: %v", fileNum, err))
- return
- }
- }
- }(batch)
- }
- wg.Wait()
- require.Empty(t, errors, "Many small files creation failed: %v", errors)
- // Verify directory listing
- mountPath := filepath.Join(framework.GetMountPoint(), testDir)
- entries, err := os.ReadDir(mountPath)
- require.NoError(t, err)
- assert.Equal(t, numFiles, len(entries))
- }
|