// 20240929-autumn-rewrite---prod-migration.mjs (16 KB)
  1. // THIS FILE IS TAILORED TO REVOLT PRODUCTION
  2. // MIGRATING FROM A BACKUP & EXISTING CDN NODE
  3. // INTO BACKBLAZE B2
  4. //
  5. // THIS IS ONLY INCLUDED FOR REFERENCE PURPOSES
  6. // NODE_EXTRA_CA_CERTS=~/projects/revolt-admin-panel/revolt.crt node index.mjs
  7. // NODE_EXTRA_CA_CERTS=/cwd/revolt.crt node /cwd/index.mjs
  8. import { readdir, readFile, writeFile } from "node:fs/promises";
  9. import { createCipheriv, createHash, randomBytes } from "node:crypto";
  10. import { resolve } from "node:path";
  11. import { MongoClient } from "mongodb";
  12. import { config } from "dotenv";
  13. import assert from "node:assert";
  14. import bfj from "bfj";
  15. config();
  16. config({ path: "/cwd/.env" });
  17. import BackBlazeB2 from "backblaze-b2";
  18. import axiosRetry from "axios-retry";
  19. import { decodeTime } from "ulid";
  20. // .env:
  21. // ENCRYPTION_KEY=
  22. // MONGODB=
  23. // B2_APP_KEYID=
  24. // B2_APP_KEY=
  25. /**
  26. * @type {string | null}
  27. */
  28. const USE_CACHE = "/cwd/cache.json";
  29. let processed_ids = new Set();
  30. async function dumpCache() {
  31. if (USE_CACHE) await bfj.write(USE_CACHE, [...processed_ids]);
  32. }
  33. if (USE_CACHE) {
  34. try {
  35. processed_ids = new Set(await bfj.read(USE_CACHE));
  36. } catch (err) {
  37. console.error(err);
  38. }
  39. }
  40. const b2 = new BackBlazeB2({
  41. applicationKeyId: process.env.B2_APP_KEYID,
  42. applicationKey: process.env.B2_APP_KEY,
  43. retry: {
  44. retryDelay: axiosRetry.exponentialDelay,
  45. },
  46. });
  47. await b2.authorize();
  48. //const encKey = Buffer.from(randomBytes(32), "utf8");
  49. //console.info(encKey.toString("base64"));
  50. const encKey = Buffer.from(process.env.ENCRYPTION_KEY, "base64");
  51. const mongo = new MongoClient(process.env.MONGODB);
  52. await mongo.connect();
  53. // TODO: set all existing files to current timestamp
  54. const dirs = [
  55. // "banners",
  56. // "emojis", // TODO: timestamps
  57. // "avatars",
  58. // "backgrounds",
  59. // "icons",
  60. "attachments", // https://stackoverflow.com/a/18777877
  61. ];
  62. async function encryptFile(data) {
  63. const iv = Buffer.from(randomBytes(12), "utf8");
  64. const cipher = createCipheriv("aes-256-gcm", encKey, iv);
  65. let enc = cipher.update(data, "utf8", "base64");
  66. enc += cipher.final("base64");
  67. // enc += cipher.getAuthTag();
  68. enc = Buffer.from(enc, "base64");
  69. return {
  70. iv,
  71. data: Buffer.concat([enc, cipher.getAuthTag()]),
  72. };
  73. }
  74. const cache = {};
  75. const objectLookup = {};
  76. /**
  77. * aaa
  78. */
  79. async function determineUploaderIdAndUse(f, v, i) {
  80. if (f.tag === "attachments" && v === "attachments") {
  81. if (typeof f.message_id !== "string") {
  82. console.warn(i, "No message id specified.");
  83. return null;
  84. }
  85. if (!objectLookup[f.message_id]) {
  86. objectLookup[f.message_id] = await mongo
  87. .db("revolt")
  88. .collection("messages")
  89. .findOne({
  90. _id: f.message_id,
  91. });
  92. }
  93. if (!objectLookup[f.message_id]) {
  94. console.warn(i, "Message", f.message_id, "doesn't exist anymore!");
  95. return null;
  96. }
  97. return {
  98. uploaded_at: new Date(decodeTime(f.message_id)),
  99. uploader_id: objectLookup[f.message_id].author,
  100. used_for: {
  101. type: "message",
  102. id: f.message_id,
  103. },
  104. };
  105. } else if (f.tag === "banners" && v === "banners") {
  106. if (typeof f.server_id !== "string") {
  107. console.warn(i, "No server id specified.");
  108. return null;
  109. }
  110. if (!objectLookup[f.server_id]) {
  111. objectLookup[f.server_id] = await mongo
  112. .db("revolt")
  113. .collection("servers")
  114. .findOne({
  115. _id: f.server_id,
  116. });
  117. }
  118. if (!objectLookup[f.server_id]) {
  119. console.warn(i, "Server", f.server_id, "doesn't exist anymore!");
  120. return null;
  121. }
  122. return {
  123. uploaded_at: new Date(),
  124. uploader_id: objectLookup[f.server_id].owner,
  125. used_for: {
  126. type: "serverBanner",
  127. id: f.server_id,
  128. },
  129. };
  130. } else if (f.tag === "emojis" && v === "emojis") {
  131. if (typeof f.object_id !== "string") {
  132. return null;
  133. }
  134. if (!objectLookup[f.object_id]) {
  135. objectLookup[f.object_id] = await mongo
  136. .db("revolt")
  137. .collection("emojis")
  138. .findOne({
  139. _id: f.object_id,
  140. });
  141. }
  142. if (!objectLookup[f.object_id]) {
  143. console.warn(i, "Emoji", f.object_id, "doesn't exist anymore!");
  144. return null;
  145. }
  146. return {
  147. uploaded_at: new Date(decodeTime(f.object_id)),
  148. uploader_id: objectLookup[f.object_id].creator_id,
  149. used_for: {
  150. type: "emoji",
  151. id: f.object_id,
  152. },
  153. };
  154. } else if (f.tag === "avatars" && v === "avatars") {
  155. if (typeof f.user_id !== "string") {
  156. return null;
  157. }
  158. if (!objectLookup[f.user_id]) {
  159. objectLookup[f.user_id] = await mongo
  160. .db("revolt")
  161. .collection("users")
  162. .findOne({
  163. _id: f.user_id,
  164. });
  165. }
  166. if (!objectLookup[f.user_id]) {
  167. console.warn(i, "User", f.user_id, "doesn't exist anymore!");
  168. return null;
  169. }
  170. if (objectLookup[f.user_id].avatar?._id !== f._id) {
  171. console.warn(
  172. i,
  173. "Attachment no longer in use.",
  174. f._id,
  175. "for",
  176. f.user_id,
  177. "current:",
  178. objectLookup[f.user_id].avatar?._id
  179. );
  180. return null;
  181. }
  182. return {
  183. uploaded_at: new Date(),
  184. uploader_id: f.user_id,
  185. used_for: {
  186. type: "userAvatar",
  187. id: f.user_id,
  188. },
  189. };
  190. } else if (f.tag === "backgrounds" && v === "backgrounds") {
  191. if (typeof f.user_id !== "string") {
  192. return null;
  193. }
  194. if (!objectLookup[f.user_id]) {
  195. objectLookup[f.user_id] = await mongo
  196. .db("revolt")
  197. .collection("users")
  198. .findOne({
  199. _id: f.user_id,
  200. });
  201. }
  202. if (!objectLookup[f.user_id]) {
  203. console.warn(i, "User", f.user_id, "doesn't exist anymore!");
  204. return null;
  205. }
  206. if (objectLookup[f.user_id].profile?.background?._id !== f._id) {
  207. console.warn(
  208. i,
  209. "Attachment no longer in use.",
  210. f._id,
  211. "for",
  212. f.user_id,
  213. "current:",
  214. objectLookup[f.user_id].profile?.background?._id
  215. );
  216. return null;
  217. }
  218. return {
  219. uploaded_at: new Date(),
  220. uploader_id: f.user_id,
  221. used_for: {
  222. type: "userProfileBackground",
  223. id: f.user_id,
  224. },
  225. };
  226. } else if (f.tag === "icons" && v === "icons") {
  227. if (typeof f.object_id !== "string") {
  228. return null;
  229. }
  230. // some bugged files at start
  231. // ... expensive to compute at worst case =(
  232. // so instead we can just disable it until everything is processed
  233. // then re-run on these!
  234. if (false) {
  235. objectLookup[f.object_id] = await mongo
  236. .db("revolt")
  237. .collection("users")
  238. .findOne({
  239. _id: f.object_id,
  240. });
  241. if (!objectLookup[f.object_id]) {
  242. console.warn(i, "No legacy match!");
  243. return null;
  244. }
  245. return {
  246. uploaded_at: new Date(),
  247. uploader_id: f.object_id,
  248. used_for: {
  249. type: "legacyGroupIcon",
  250. id: f.object_id,
  251. },
  252. };
  253. }
  254. if (!objectLookup[f.object_id]) {
  255. objectLookup[f.object_id] = await mongo
  256. .db("revolt")
  257. .collection("servers")
  258. .findOne({
  259. _id: f.object_id,
  260. });
  261. }
  262. if (
  263. !objectLookup[f.object_id] ||
  264. // heuristic for not server
  265. !objectLookup[f.object_id].channels
  266. ) {
  267. console.warn(i, "Server", f.object_id, "doesn't exist!");
  268. if (!objectLookup[f.object_id]) {
  269. objectLookup[f.object_id] = await mongo
  270. .db("revolt")
  271. .collection("channels")
  272. .findOne({
  273. _id: f.object_id,
  274. });
  275. }
  276. if (!objectLookup[f.object_id]) {
  277. console.warn(i, "Channel", f.object_id, "doesn't exist!");
  278. return null;
  279. }
  280. let server;
  281. const serverId = objectLookup[f.object_id].server;
  282. if (serverId) {
  283. server = objectLookup[serverId];
  284. if (!server) {
  285. server = await mongo.db("revolt").collection("servers").findOne({
  286. _id: serverId,
  287. });
  288. console.info(
  289. i,
  290. "Couldn't find matching server for channel " + f.object_id + "!"
  291. );
  292. if (!server) return null;
  293. objectLookup[serverId] = server;
  294. }
  295. }
  296. return {
  297. uploaded_at: new Date(),
  298. uploader_id: (server ?? objectLookup[f.object_id]).owner,
  299. used_for: {
  300. type: "channelIcon",
  301. id: f.object_id,
  302. },
  303. };
  304. }
  305. return {
  306. uploaded_at: new Date(),
  307. uploader_id: objectLookup[f.object_id].owner,
  308. used_for: {
  309. type: "serverIcon",
  310. id: f.object_id,
  311. },
  312. };
  313. } else {
  314. throw (
  315. "couldn't find uploader id for " +
  316. f._id +
  317. " expected " +
  318. v +
  319. " but got " +
  320. f.tag
  321. );
  322. }
  323. }
  324. const workerCount = 8;
  325. let workingOnHashes = [];
  326. for (const dir of dirs) {
  327. console.info(dir);
  328. // const RESUME = 869000 + 283000 + 772000;
  329. // UPLOAD FROM LOCAL FILE LISTING:
  330. // const RESUME = 0;
  331. // const files = (await readdir(dir)).slice(RESUME);
  332. // const total = files.length;
  333. // UPLOAD FROM DATABASE FILE LISTING:
  334. const files = await mongo
  335. .db("revolt")
  336. .collection("attachments")
  337. .find(
  338. {
  339. tag: dir,
  340. // don't upload delete files
  341. deleted: {
  342. $ne: true,
  343. },
  344. // don't upload already processed files
  345. hash: {
  346. $exists: false,
  347. },
  348. },
  349. {
  350. projection: { _id: 1 },
  351. }
  352. )
  353. .toArray()
  354. .then((arr) => arr.map((x) => x._id));
  355. const total = files.length;
  356. let i = 0;
  357. let skipsA = 0,
  358. skipsB = 0;
  359. await Promise.all(
  360. new Array(workerCount).fill(0).map(async (_) => {
  361. while (true) {
  362. const file = files.shift();
  363. if (!file) return;
  364. i++;
  365. console.info(i, files.length, file);
  366. // if (i < 869000) continue; // TODO
  367. // if (i > 3000) break;
  368. if (USE_CACHE) {
  369. if (processed_ids.has(file)) {
  370. console.info(i, "Skip, known file.");
  371. continue;
  372. }
  373. }
  374. const doc = await mongo
  375. .db("revolt")
  376. .collection("attachments")
  377. .findOne({
  378. _id: file,
  379. // don't upload delete files
  380. deleted: {
  381. $ne: true,
  382. },
  383. // don't upload already processed files
  384. hash: {
  385. $exists: false,
  386. },
  387. });
  388. if (!doc) {
  389. console.info(
  390. i,
  391. "Skipping as it does not exist in DB, is queued for deletion, or has already been processed!"
  392. );
  393. skipsA += 1;
  394. continue;
  395. }
  396. const metaUseInfo = await determineUploaderIdAndUse(doc, dir, i);
  397. if (!metaUseInfo) {
  398. if (USE_CACHE) {
  399. processed_ids.add(file);
  400. }
  401. console.info(i, "Skipping as it hasn't been attached to anything!");
  402. skipsB += 1;
  403. continue;
  404. }
  405. const start = +new Date();
  406. let buff;
  407. try {
  408. buff = await readFile(resolve(dir, file));
  409. } catch (err) {
  410. if (err.code === "ENOENT") {
  411. if (USE_CACHE) {
  412. processed_ids.add(file);
  413. }
  414. console.log(i, "File not found!");
  415. await mongo.db("revolt").collection("logs").insertOne({
  416. type: "missingFile",
  417. desc: "File doesn't exist!",
  418. file,
  419. });
  420. continue;
  421. } else {
  422. throw err;
  423. }
  424. }
  425. const hash = createHash("sha256").update(buff).digest("hex");
  426. while (workingOnHashes.includes(hash)) {
  427. console.log(
  428. "Waiting to avoid race condition... hash is already being processed..."
  429. );
  430. await new Promise((r) => setTimeout(r, 1000));
  431. }
  432. workingOnHashes.push(hash);
  433. // merge existing
  434. const existingHash = await mongo
  435. .db("revolt")
  436. .collection("attachment_hashes")
  437. .findOne({
  438. _id: hash,
  439. });
  440. if (existingHash) {
  441. console.info(i, "Hash already uploaded, merging!");
  442. await mongo
  443. .db("revolt")
  444. .collection("attachments")
  445. .updateOne(
  446. {
  447. _id: file,
  448. },
  449. {
  450. $set: {
  451. size: existingHash.size,
  452. hash,
  453. ...metaUseInfo,
  454. },
  455. }
  456. );
  457. await mongo.db("revolt").collection("logs").insertOne({
  458. type: "mergeHash",
  459. desc: "Merged an existing file!",
  460. hash: existingHash._id,
  461. size: existingHash.size,
  462. });
  463. workingOnHashes = workingOnHashes.filter((x) => x !== hash);
  464. continue;
  465. }
  466. // encrypt
  467. const { iv, data } = await encryptFile(buff);
  468. const end = +new Date();
  469. console.info(metaUseInfo); // + write hash
  470. console.info(
  471. file,
  472. hash,
  473. iv,
  474. `${end - start}ms`,
  475. buff.byteLength,
  476. "bytes"
  477. );
  478. let retry = true;
  479. while (retry) {
  480. try {
  481. const urlResp = await b2.getUploadUrl({
  482. bucketId: "---", // revolt-uploads
  483. });
  484. await b2.uploadFile({
  485. uploadUrl: urlResp.data.uploadUrl,
  486. uploadAuthToken: urlResp.data.authorizationToken,
  487. fileName: hash,
  488. data,
  489. onUploadProgress: (event) => console.info(event),
  490. });
  491. await mongo
  492. .db("revolt")
  493. .collection("attachment_hashes")
  494. .insertOne({
  495. _id: hash,
  496. processed_hash: hash,
  497. created_at: new Date(), // TODO on all
  498. bucket_id: "revolt-uploads",
  499. path: hash,
  500. iv: iv.toString("base64"),
  501. metadata: doc.metadata,
  502. content_type: doc.content_type,
  503. size: data.byteLength,
  504. });
  505. await mongo
  506. .db("revolt")
  507. .collection("attachments")
  508. .updateOne(
  509. {
  510. _id: file,
  511. },
  512. {
  513. $set: {
  514. size: data.byteLength,
  515. hash,
  516. ...metaUseInfo,
  517. },
  518. }
  519. );
  520. retry = false;
  521. } catch (err) {
  522. if (
  523. (err.isAxiosError &&
  524. (err.response?.status === 503 ||
  525. err.response?.status === 500)) ||
  526. (err?.code === "ENOTFOUND" && err?.syscall === "getaddrinfo") ||
  527. (err?.code === "ETIMEDOUT" && err?.syscall === "connect") ||
  528. (err?.code === "ECONNREFUSED" && err?.syscall === "connect")
  529. ) {
  530. console.error(i, err.response.status, "ERROR RETRYING");
  531. await mongo
  532. .db("revolt")
  533. .collection("logs")
  534. .insertOne({
  535. type: "upload503",
  536. desc:
  537. "Hit status " +
  538. (err?.code === "ETIMEDOUT" && err?.syscall === "connect"
  539. ? "Network issue (ETIMEDOUT connect)"
  540. : err?.code === "ECONNREFUSED" &&
  541. err?.syscall === "connect"
  542. ? "Network issue (ECONNREFUSED connect)"
  543. : err?.code === "ENOTFOUND" &&
  544. err?.syscall === "getaddrinfo"
  545. ? "DNS issue (ENOTFOUND getaddrinfo)"
  546. : err.response?.status) +
  547. ", trying a new URL!",
  548. hash,
  549. });
  550. await new Promise((r) => setTimeout(() => r(), 1500));
  551. } else {
  552. await dumpCache().catch(console.error);
  553. throw err;
  554. }
  555. }
  556. }
  557. console.info(i, "Successfully uploaded", file, "to S3!");
  558. console.info(
  559. "*** ➡️ Processed",
  560. i,
  561. "out of",
  562. total,
  563. "files",
  564. ((i / total) * 100).toFixed(2),
  565. "%"
  566. );
  567. workingOnHashes = workingOnHashes.filter((x) => x !== hash);
  568. }
  569. })
  570. );
  571. console.info("Skips (A):", skipsA, "(B):", skipsB);
  572. break;
  573. }
  574. await dumpCache().catch(console.error);
  575. process.exit(0);