filer_server_handlers_proxy.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package weed_server
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/security"
  5. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  6. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  7. "github.com/seaweedfs/seaweedfs/weed/util/request_id"
  8. "io"
  9. "math/rand/v2"
  10. "net/http"
  11. )
  12. func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
  13. encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
  14. if encodedJwt == "" {
  15. return
  16. }
  17. r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
  18. }
  19. func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string {
  20. var encodedJwt security.EncodedJwt
  21. if isWrite {
  22. encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId)
  23. } else {
  24. encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)
  25. }
  26. return string(encodedJwt)
  27. }
  28. func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
  29. ctx := r.Context()
  30. urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(ctx, fileId)
  31. if err != nil {
  32. glog.ErrorfCtx(ctx, "locate %s: %v", fileId, err)
  33. w.WriteHeader(http.StatusInternalServerError)
  34. return
  35. }
  36. if len(urlStrings) == 0 {
  37. w.WriteHeader(http.StatusNotFound)
  38. return
  39. }
  40. proxyReq, err := http.NewRequest(r.Method, urlStrings[rand.IntN(len(urlStrings))], r.Body)
  41. if err != nil {
  42. glog.ErrorfCtx(ctx, "NewRequest %s: %v", urlStrings[0], err)
  43. w.WriteHeader(http.StatusInternalServerError)
  44. return
  45. }
  46. proxyReq.Header.Set("Host", r.Host)
  47. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  48. request_id.InjectToRequest(ctx, proxyReq)
  49. for header, values := range r.Header {
  50. for _, value := range values {
  51. proxyReq.Header.Add(header, value)
  52. }
  53. }
  54. proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq)
  55. if postErr != nil {
  56. glog.ErrorfCtx(ctx, "post to filer: %v", postErr)
  57. w.WriteHeader(http.StatusInternalServerError)
  58. return
  59. }
  60. defer util_http.CloseResponse(proxyResponse)
  61. for k, v := range proxyResponse.Header {
  62. w.Header()[k] = v
  63. }
  64. w.WriteHeader(proxyResponse.StatusCode)
  65. buf := mem.Allocate(128 * 1024)
  66. defer mem.Free(buf)
  67. io.CopyBuffer(w, proxyResponse.Body, buf)
  68. }