Kobo.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. from Globals import Globals
  2. from KoboDrmRemover import KoboDrmRemover
  3. import requests
  4. from typing import Dict, Tuple
  5. import base64
  6. import html
  7. import os
  8. import re
  9. import urllib
  10. import uuid
  11. # It was not possible to enter the entire captcha response on MacOS.
  12. # Importing readline changes the implementation of input() and solves the issue.
  13. # See https://stackoverflow.com/q/65735885 and https://stackoverflow.com/q/7357007.
  14. import readline
  class KoboException( Exception ):
      """Error raised by this module when a Kobo response or page is not in the expected form."""
  17. # The hook's workflow is based on this:
  18. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  def ReauthenticationHook( r, *args, **kwargs ):
      """Response hook that refreshes the access token and resends a request that came back with 401.

      r is the response that triggered the hook; kwargs are forwarded unchanged to the resend.
      Returns None for any status other than 401 (the original response is kept), otherwise
      the response of the resent request with the failed response appended to its history.
      """
      tokenExpired = r.status_code == requests.codes.unauthorized # 401
      if not tokenExpired:
          return None

      Globals.Logger.debug( "Refreshing expired authentication token" )

      # Consume content and release the original connection to allow our new request to reuse the same one.
      r.content
      r.close()

      retryRequest = r.request.copy()

      # Refresh the authentication token and put the new one on the copied request.
      Globals.Kobo.RefreshAuthentication()
      retryRequest.headers[ "Authorization" ] = Kobo.GetHeaderWithAccessToken()[ "Authorization" ]

      # Don't retry to reauthenticate this request again.
      retryRequest.deregister_hook( "response", ReauthenticationHook )

      # Resend the failed request and link the failed response to the new one.
      retryResponse = r.connection.send( retryRequest, **kwargs )
      retryResponse.history.append( r )
      retryResponse.request = retryRequest
      return retryResponse
  class SessionWithTimeOut( requests.Session ):
      """requests.Session that applies a 30 second timeout to every request that doesn't specify one."""

      def request( self, method, url, **kwargs ):
          # An explicitly passed timeout (even None) is left untouched.
          kwargs.setdefault( "timeout", 30 ) # 30 seconds
          return super().request( method, url, **kwargs )
  class Kobo:
      """Client for the Kobo store API that identifies itself as the Kobo Android application.

      It handles device authentication, login, listing the user's books and wish list, and
      downloading books (removing the DRM when the download is protected).
      Tokens and IDs are read from and written to Globals.Settings.
      """

      Affiliate = "Kobo"
      ApplicationVersion = "10.1.2.39807"
      DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
      DisplayProfile = "Android"
      CarrierName = "310270"
      DeviceModel = "Pixel"
      DeviceOsVersion = "33"

      # Use the user agent of the Kobo Android app, otherwise the login request hangs forever.
      # It is built from the constants above so the version, platform ID and affiliate can't drift apart
      # from the "x-kobo-*" headers.
      UserAgent = (
          "Mozilla/5.0 (Linux; Android 13; Pixel Build/TQ2B.230505.005.A1; wv) AppleWebKit/537.36 (KHTML, like Gecko) "
          "Version/4.0 Chrome/101.0.4951.61 Safari/537.36 KoboApp/%s KoboPlatform Id/%s KoboAffiliate/%s KoboBuildFlavor/global"
          % ( ApplicationVersion, DefaultPlatformId, Affiliate )
      )

      def __init__( self ):
          """Create the HTTP session and set the application's default headers on it."""
          headers = {
              "User-Agent": Kobo.UserAgent,
              "x-kobo-affiliatename": Kobo.Affiliate,
              "x-kobo-appversion": Kobo.ApplicationVersion,
              "x-kobo-platformid": Kobo.DefaultPlatformId,
              "x-kobo-carriername": Kobo.CarrierName,
              "x-kobo-devicemodel": Kobo.DeviceModel,
              "x-kobo-deviceos": "Android",
              "x-kobo-deviceosversion": Kobo.DeviceOsVersion,
              "X-Requested-With": "com.kobobooks.android",
          }

          # Filled by LoadInitializationSettings with the "Resources" of the initialization response.
          self.InitializationSettings = {}
          self.Session = SessionWithTimeOut()
          self.Session.headers.update( headers )

      # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
      # functions that don't need authorization.
      @staticmethod
      def GetHeaderWithAccessToken() -> dict:
          """Return the Authorization header built from the currently stored access token."""
          return { "Authorization": "Bearer " + Globals.Settings.AccessToken }

      # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
      @staticmethod
      def __GetReauthenticationHook() -> dict:
          """Return the hooks dictionary that installs ReauthenticationHook as the response hook."""
          return { "response": ReauthenticationHook }

      # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
      # user key can't be used for anything.
      def AuthenticateDevice( self, userKey: str = "" ) -> None:
          """Authenticate the device, then store and save the returned tokens.

          A device ID is generated first if none is stored. The stored tokens are cleared before the request.
          When userKey is not empty it is sent with the request and the returned user key is stored too.
          Raises KoboException on an unsupported token type or if the authentication settings are still
          incomplete afterwards; HTTP errors are raised by raise_for_status.
          """
          Globals.Logger.debug( "Kobo.AuthenticateDevice" )

          if not Globals.Settings.DeviceId:
              Globals.Settings.DeviceId = str( uuid.uuid4() )
          Globals.Settings.AccessToken = ""
          Globals.Settings.RefreshToken = ""

          postData = {
              "AffiliateName": Kobo.Affiliate,
              "AppVersion": Kobo.ApplicationVersion,
              "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
              "DeviceId": Globals.Settings.DeviceId,
              "PlatformId": Kobo.DefaultPlatformId
          }
          if userKey:
              postData[ "UserKey" ] = userKey

          response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
          response.raise_for_status()
          jsonResponse = response.json()

          if jsonResponse[ "TokenType" ] != "Bearer":
              raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )

          Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
          Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
          if not Globals.Settings.AreAuthenticationSettingsSet():
              raise KoboException( "Authentication settings are not set after device authentication." )

          if userKey:
              Globals.Settings.UserKey = jsonResponse[ "UserKey" ]

          Globals.Settings.Save()

      def RefreshAuthentication( self ) -> None:
          """Exchange the stored refresh token for new tokens, then store and save them.

          Raises KoboException on an unsupported token type or if the authentication settings are
          incomplete afterwards; HTTP errors are raised by raise_for_status.
          """
          Globals.Logger.debug( "Kobo.RefreshAuthentication" )

          headers = Kobo.GetHeaderWithAccessToken()
          postData = {
              "AppVersion": Kobo.ApplicationVersion,
              "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
              "PlatformId": Kobo.DefaultPlatformId,
              "RefreshToken": Globals.Settings.RefreshToken
          }

          # The reauthentication hook is intentionally not set.
          response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
          response.raise_for_status()
          jsonResponse = response.json()

          if jsonResponse[ "TokenType" ] != "Bearer":
              raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )

          Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
          Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
          if not Globals.Settings.AreAuthenticationSettingsSet():
              raise KoboException( "Authentication settings are not set after authentication refresh." )

          Globals.Settings.Save()

      def LoadInitializationSettings( self ) -> None:
          """Download the initialization resources and keep them in self.InitializationSettings."""
          Globals.Logger.debug( "Kobo.LoadInitializationSettings" )

          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()
          response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
          response.raise_for_status()
          self.InitializationSettings = response.json()[ "Resources" ]

      def __GetExtraLoginParameters( self ) -> Tuple[ str, str, str ]:
          """Load the sign-in page and return ( sign-in URL to post to, workflow ID, request verification token ).

          Raises KoboException if the workflow ID or the verification token can't be found in the page.
          """
          Globals.Logger.debug( "Kobo.__GetExtraLoginParameters" )

          signInUrl = self.InitializationSettings[ "sign_in_page" ]
          params = {
              "wsa": Kobo.Affiliate,
              "pwsav": Kobo.ApplicationVersion,
              "pwspid": Kobo.DefaultPlatformId,
              "pwsdid": Globals.Settings.DeviceId,
              "wscfv": "1.5",
              "wscf": "kepub",
              "wsmc": Kobo.CarrierName,
              "pwspov": Kobo.DeviceOsVersion,
              "pwspt": "Mobile",
              "pwsdm": Kobo.DeviceModel,
          }

          response = self.Session.get( signInUrl, params = params )
          response.raise_for_status()
          htmlResponse = response.text

          # The link can be found in the response ('<a class="kobo-link partner-option kobo"') but the Android app does not use the entire path.
          # (The entire path looks like this: "/ww/en/signin/signin/kobo?workflowId=01234567-0123-0123-0123-0123456789ab".)
          parsed = urllib.parse.urlparse( signInUrl )
          koboSignInUrl = parsed._replace( query = None, path = "/ww/en/signin/signin" ).geturl()

          match = re.search( r"""/signin/kobo\?workflowId=([0-9a-f\-]+)""", htmlResponse )
          if match is None:
              raise KoboException( "Can't find the workflow ID. The page format might have changed." )
          workflowId = html.unescape( match.group( 1 ) )

          match = re.search( r"""<input name="__RequestVerificationToken" type="hidden" value="([^"]+)" />""", htmlResponse )
          if match is None:
              raise KoboException( "Can't find the request verification token in the login form. The page format might have changed." )
          requestVerificationToken = html.unescape( match.group( 1 ) )

          return koboSignInUrl, workflowId, requestVerificationToken

      def Login( self, email: str, password: str, captcha: str ) -> None:
          """Log in with the given credentials and captcha response, then authenticate the device with the user key.

          The same captcha value is sent as both the reCAPTCHA and the hCaptcha response field.
          Raises KoboException if the authenticated user URL (or the user ID / user key in it) can't be found.
          """
          Globals.Logger.debug( "Kobo.Login" )

          signInUrl, workflowId, requestVerificationToken = self.__GetExtraLoginParameters()

          postData = {
              "LogInModel.WorkflowId": workflowId,
              "LogInModel.Provider": Kobo.Affiliate,
              "ReturnUrl": "",
              "__RequestVerificationToken": requestVerificationToken,
              "LogInModel.UserName": email,
              "LogInModel.Password": password,
              "g-recaptcha-response": captcha,
              "h-captcha-response": captcha
          }

          response = self.Session.post( signInUrl, data = postData )
          response.raise_for_status()
          htmlResponse = response.text

          match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
          if match is None:
              raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )

          parsed = urllib.parse.urlparse( match.group( 1 ) )
          parsedQueries = urllib.parse.parse_qs( parsed.query )
          try:
              userId = parsedQueries[ "userId" ][ 0 ]
              userKey = parsedQueries[ "userKey" ][ 0 ]
          except KeyError as e:
              # Report a missing parameter like the other page format problems instead of a bare KeyError.
              raise KoboException( "Authenticated user URL doesn't contain the user ID or the user key. The page format might have changed." ) from e

          Globals.Settings.UserId = userId # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
          self.AuthenticateDevice( userKey )

      def GetBookInfo( self, productId: str ) -> dict:
          """Return the JSON response of the "book" resource for the given product ID."""
          Globals.Logger.debug( "Kobo.GetBookInfo" )

          url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()

          response = self.Session.get( url, headers = headers, hooks = hooks )
          response.raise_for_status()
          return response.json()

      def __GetMyBookListPage( self, syncToken: str ) -> Tuple[ list, str ]:
          """Return one page of the library synchronization and the token of the next page.

          syncToken is sent in the "x-kobo-synctoken" header when it isn't empty. The returned token is
          empty unless the response's "x-kobo-sync" header is "continue".
          """
          Globals.Logger.debug( "Kobo.__GetMyBookListPage" )

          url = self.InitializationSettings[ "library_sync" ]
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()
          if syncToken:
              headers[ "x-kobo-synctoken" ] = syncToken

          # Use this instance's session like every other method does.
          response = self.Session.get( url, headers = headers, hooks = hooks )
          response.raise_for_status()
          bookList = response.json()

          nextSyncToken = ""
          if response.headers.get( "x-kobo-sync" ) == "continue":
              nextSyncToken = response.headers.get( "x-kobo-synctoken", "" )

          return bookList, nextSyncToken

      def GetMyBookList( self ) -> list:
          """Return the items of all library synchronization pages in one list."""
          # The "library_sync" name and the synchronization tokens make it somewhat suspicious that we should use
          # "library_items" instead to get the My Books list, but "library_items" gives back less info (even with the
          # embed=ProductMetadata query parameter set).

          fullBookList = []
          syncToken = ""
          while True:
              bookList, syncToken = self.__GetMyBookListPage( syncToken )
              fullBookList.extend( bookList )
              if not syncToken:
                  break

          return fullBookList

      def GetMyWishList( self ) -> list:
          """Return the "Items" of every page of the user's wish list in one list."""
          Globals.Logger.debug( "Kobo.GetMyWishList" )

          url = self.InitializationSettings[ "user_wishlist" ]
          items = []
          currentPageIndex = 0

          while True:
              # The header is rebuilt for every page so a token refreshed by the hook is picked up.
              headers = Kobo.GetHeaderWithAccessToken()
              hooks = Kobo.__GetReauthenticationHook()
              params = {
                  "PageIndex": currentPageIndex,
                  "PageSize": 100, # 100 is the default if PageSize is not specified.
              }

              # Use this instance's session like every other method does.
              response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
              response.raise_for_status()
              wishList = response.json()

              items.extend( wishList[ "Items" ] )

              currentPageIndex += 1
              if currentPageIndex >= wishList[ "TotalPageCount" ]:
                  break

          return items

      def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
          """Return the JSON response of the "content_access_book" resource for the product and display profile."""
          Globals.Logger.debug( "Kobo.__GetContentAccessBook" )

          url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
          params = { "DisplayProfile": displayProfile }
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()

          response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
          response.raise_for_status()
          return response.json()

      @staticmethod
      def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
          """Map each content key's "Name" to its "Value"; empty if the response has no "ContentKeys"."""
          jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
          if jsonContentKeys is None:
              return {}

          return { contentKey[ "Name" ]: contentKey[ "Value" ] for contentKey in jsonContentKeys }

      @staticmethod
      def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
          """Return ( download URL, has DRM ) of the first supported entry of "ContentUrls".

          Supported entries have the DRM type "KDRM" or "SignedNoDrm" and the format "EPUB3" or "KEPUB".
          productId is only used in the error messages.
          Raises KoboException if "ContentUrls" is missing, empty or has no supported entry.
          """
          jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
          if jsonContentUrls is None:
              raise KoboException( "Download URL can't be found for product '%s'." % productId )

          if len( jsonContentUrls ) == 0:
              raise KoboException( "Download URL list is empty for product '%s'. If this is an archived book then it must be unarchived first on the Kobo website (https://www.kobo.com/help/en-US/article/1799/restoring-deleted-books-or-magazines)." % productId )

          for jsonContentUrl in jsonContentUrls:
              drmType = jsonContentUrl[ "DRMType" ]
              if drmType in ( "KDRM", "SignedNoDrm" ) and jsonContentUrl[ "UrlFormat" ] in ( "EPUB3", "KEPUB" ):
                  # Remove the mysterious "b" query parameter that causes forbidden downloads.
                  # Blank values are kept so that only "b" is removed from the query.
                  parsed = urllib.parse.urlparse( jsonContentUrl[ "DownloadUrl" ] )
                  parsedQueries = urllib.parse.parse_qs( parsed.query, keep_blank_values = True )
                  parsedQueries.pop( "b", None )
                  url = parsed._replace( query = urllib.parse.urlencode( parsedQueries, doseq = True ) ).geturl()

                  return url, drmType == "KDRM"

          message = "Download URL for supported formats can't be found for product '%s'.\n" % productId
          message += "Available formats:"
          for jsonContentUrl in jsonContentUrls:
              message += "\nDRMType: '%s', UrlFormat: '%s'" % ( jsonContentUrl[ "DRMType" ], jsonContentUrl[ "UrlFormat" ] )

          raise KoboException( message )

      def __DownloadToFile( self, url, outputPath: str ) -> None:
          """Stream the content of url into the file at outputPath in 256 KiB chunks."""
          Globals.Logger.debug( "Kobo.__DownloadToFile" )

          # The streamed response is closed even if the status check or the file writing fails.
          with self.Session.get( url, stream = True ) as response:
              response.raise_for_status()
              with open( outputPath, "wb" ) as f:
                  for chunk in response.iter_content( chunk_size = 1024 * 256 ):
                      f.write( chunk )

      # Downloading archived books is not possible, the "content_access_book" API endpoint returns with empty ContentKeys
      # and ContentUrls for them.
      def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
          """Download the book to outputPath, removing the DRM if the download has it.

          The book is first downloaded to outputPath + ".downloading". On any failure both the temporary
          and the output file are removed and the error is raised again.
          """
          Globals.Logger.debug( "Kobo.Download" )

          jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
          contentKeys = Kobo.__GetContentKeys( jsonResponse )
          downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )

          temporaryOutputPath = outputPath + ".downloading"

          try:
              self.__DownloadToFile( downloadUrl, temporaryOutputPath )

              if hasDrm:
                  drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
                  drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
                  os.remove( temporaryOutputPath )
              else:
                  # os.replace overwrites an existing file on every platform, os.rename fails for that on Windows.
                  os.replace( temporaryOutputPath, outputPath )
          except BaseException:
              # Clean up on every kind of failure (including KeyboardInterrupt), then let the error through.
              if os.path.isfile( temporaryOutputPath ):
                  os.remove( temporaryOutputPath )
              if os.path.isfile( outputPath ):
                  os.remove( outputPath )

              raise