Kobo.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
import base64
import html
import os
import re
import secrets
import string
import time
import urllib
import urllib.parse
from typing import Dict, Tuple

# It was not possible to enter the entire captcha response on MacOS.
# Importing readline changes the implementation of input() and solves the issue.
# See https://stackoverflow.com/q/65735885 and https://stackoverflow.com/q/7357007.
import readline

import requests

from Globals import Globals
from KoboDrmRemover import KoboDrmRemover
  17. class KoboException( Exception ):
  18. pass
  19. # The hook's workflow is based on this:
  20. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  21. def ReauthenticationHook( r, *args, **kwargs ):
  22. if r.status_code != requests.codes.unauthorized: # 401
  23. return
  24. Globals.Logger.debug( "Refreshing expired authentication token" )
  25. # Consume content and release the original connection to allow our new request to reuse the same one.
  26. r.content
  27. r.close()
  28. prep = r.request.copy()
  29. # Refresh the authentication token and use it.
  30. Globals.Kobo.RefreshAuthentication()
  31. headers = Kobo.GetHeaderWithAccessToken()
  32. prep.headers[ "Authorization" ] = headers[ "Authorization" ]
  33. # Don't retry to reauthenticate this request again.
  34. prep.deregister_hook( "response", ReauthenticationHook )
  35. # Resend the failed request.
  36. _r = r.connection.send( prep, **kwargs )
  37. _r.history.append( r )
  38. _r.request = prep
  39. return _r
  40. class SessionWithTimeOut( requests.Session ):
  41. def request( self, method, url, **kwargs ):
  42. if "timeout" not in kwargs:
  43. kwargs[ "timeout" ] = 30 # 30 seconds
  44. return super().request( method, url, **kwargs )
  45. class Kobo:
  46. Affiliate = "Kobo"
  47. ApplicationVersion = "4.38.23171"
  48. DefaultPlatformId = "00000000-0000-0000-0000-000000000373"
  49. DisplayProfile = "Android"
  50. DeviceModel = "Kobo Aura ONE"
  51. DeviceOs = "3.0.35+"
  52. DeviceOsVersion = "NA"
  53. def __init__( self ):
  54. headers = {
  55. # Use the user agent of the Kobo e-readers.
  56. "User-Agent": "Mozilla/5.0 (Linux; U; Android 2.0; en-us;) AppleWebKit/538.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/538.1 (Kobo Touch 0373/4.38.23171)",
  57. }
  58. self.InitializationSettings = {}
  59. self.Session = SessionWithTimeOut()
  60. self.Session.headers.update( headers )
  61. # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
  62. # functions that doesn't need authorization.
  63. @staticmethod
  64. def GetHeaderWithAccessToken() -> dict:
  65. authorization = "Bearer " + Globals.Settings.AccessToken
  66. headers = { "Authorization": authorization }
  67. return headers
  68. # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
  69. @staticmethod
  70. def __GetReauthenticationHook() -> dict:
  71. return { "response": ReauthenticationHook }
  72. @staticmethod
  73. def __GenerateRandomHexDigitString( length: int ) -> str:
  74. id = "".join( secrets.choice( string.hexdigits ) for _ in range( length ) )
  75. return id.lower()
  76. # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
  77. # user key can't be used for anything.
  78. def AuthenticateDevice( self, userKey: str = "" ) -> None:
  79. Globals.Logger.debug( "Kobo.AuthenticateDevice" )
  80. if len( Globals.Settings.DeviceId ) == 0:
  81. Globals.Settings.DeviceId = Kobo.__GenerateRandomHexDigitString( 64 )
  82. Globals.Settings.SerialNumber = Kobo.__GenerateRandomHexDigitString( 32 )
  83. Globals.Settings.AccessToken = ""
  84. Globals.Settings.RefreshToken = ""
  85. postData = {
  86. "AffiliateName": Kobo.Affiliate,
  87. "AppVersion": Kobo.ApplicationVersion,
  88. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  89. "DeviceId": Globals.Settings.DeviceId,
  90. "PlatformId": Kobo.DefaultPlatformId,
  91. "SerialNumber": Globals.Settings.SerialNumber,
  92. }
  93. if len( userKey ) > 0:
  94. postData[ "UserKey" ] = userKey
  95. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
  96. response.raise_for_status()
  97. jsonResponse = response.json()
  98. if jsonResponse[ "TokenType" ] != "Bearer":
  99. raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  100. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  101. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  102. if not Globals.Settings.AreAuthenticationSettingsSet():
  103. raise KoboException( "Authentication settings are not set after device authentication." )
  104. if len( userKey ) > 0:
  105. Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
  106. Globals.Settings.Save()
  107. def RefreshAuthentication( self ) -> None:
  108. Globals.Logger.debug( "Kobo.RefreshAuthentication" )
  109. headers = Kobo.GetHeaderWithAccessToken()
  110. postData = {
  111. "AppVersion": Kobo.ApplicationVersion,
  112. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  113. "PlatformId": Kobo.DefaultPlatformId,
  114. "RefreshToken": Globals.Settings.RefreshToken
  115. }
  116. # The reauthentication hook is intentionally not set.
  117. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
  118. response.raise_for_status()
  119. jsonResponse = response.json()
  120. if jsonResponse[ "TokenType" ] != "Bearer":
  121. raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  122. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  123. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  124. if not Globals.Settings.AreAuthenticationSettingsSet():
  125. raise KoboException( "Authentication settings are not set after authentication refresh." )
  126. Globals.Settings.Save()
  127. def LoadInitializationSettings( self ) -> None:
  128. Globals.Logger.debug( "Kobo.LoadInitializationSettings" )
  129. headers = Kobo.GetHeaderWithAccessToken()
  130. hooks = Kobo.__GetReauthenticationHook()
  131. response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
  132. response.raise_for_status()
  133. jsonResponse = response.json()
  134. self.InitializationSettings = jsonResponse[ "Resources" ]
  135. def WaitTillActivation( self, activationCheckUrl: str ) -> Tuple[ str, str ]:
  136. while True:
  137. print( "Waiting for you to finish the activation..." )
  138. time.sleep( 5 )
  139. response = self.Session.post( activationCheckUrl )
  140. response.raise_for_status()
  141. jsonResponse = None
  142. try:
  143. jsonResponse = response.json()
  144. except Exception:
  145. Globals.Logger.debug( f"Activation check's response:\n{response.text}" )
  146. raise KoboException( "Error checking the activation's status. The response is not JSON." )
  147. if jsonResponse[ "Status" ] == "Complete":
  148. # RedirectUrl looks like this:
  149. # kobo://UserAuthenticated?returnUrl=https%3A%2F%2Fwww.kobo.com%2Fww%2Fen%2F&userKey=...&userId=...&email=...
  150. redirectUrl = jsonResponse[ "RedirectUrl" ]
  151. parsed = urllib.parse.urlparse( redirectUrl )
  152. parsedQueries = urllib.parse.parse_qs( parsed.query )
  153. userId = parsedQueries[ "userId" ][ 0 ]
  154. userKey = parsedQueries[ "userKey" ][ 0 ]
  155. return userId, userKey
  156. def ActivateOnWeb( self ) -> Tuple[ str, str ]:
  157. print( "Initiating web-based activation" )
  158. params = {
  159. "pwspid": Kobo.DefaultPlatformId,
  160. "wsa": Kobo.Affiliate,
  161. "pwsdid": Globals.Settings.DeviceId,
  162. "pwsav": Kobo.ApplicationVersion,
  163. "pwsdm": Kobo.DefaultPlatformId, # In the Android app this is the device model but Nickel sends the platform ID...
  164. "pwspos": Kobo.DeviceOs,
  165. "pwspov": Kobo.DeviceOsVersion,
  166. }
  167. response = self.Session.get( "https://auth.kobobooks.com/ActivateOnWeb", params = params )
  168. response.raise_for_status()
  169. htmlResponse = response.text
  170. match = re.search( 'data-poll-endpoint="([^"]+)"', htmlResponse )
  171. if match is None:
  172. raise KoboException( "Can't find the activation poll endpoint in the response. The page format might have changed." )
  173. activationCheckUrl = "https://auth.kobobooks.com" + html.unescape( match.group( 1 ) )
  174. match = re.search( r"""qrcodegenerator/generate.+?%26code%3D(\d+)""", htmlResponse )
  175. if match is None:
  176. raise KoboException( "Can't find the activation code in the response. The page format might have changed." )
  177. activationCode = match.group( 1 )
  178. return activationCheckUrl, activationCode
  179. def Login( self ) -> None:
  180. Globals.Logger.debug( "Kobo.Login" )
  181. activationCheckUrl, activationCode = self.ActivateOnWeb()
  182. print( "" )
  183. print( "kobo-book-downloader uses the same web-based activation method to log in as the Kobo e-readers." )
  184. print( "You will have to open the link below in your browser and enter the code, then you might need to login too if kobo.com asks you to." )
  185. print( "kobo-book-downloader will wait now and periodically check for the activation to complete." )
  186. print( "" )
  187. print( f"Open https://www.kobo.com/activate and enter {activationCode}." )
  188. print( "" )
  189. userId, userKey = self.WaitTillActivation( activationCheckUrl )
  190. print( "" )
  191. # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
  192. Globals.Settings.UserId = userId
  193. self.AuthenticateDevice( userKey )
  194. def GetBookInfo( self, productId: str ) -> dict:
  195. Globals.Logger.debug( "Kobo.GetBookInfo" )
  196. url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
  197. headers = Kobo.GetHeaderWithAccessToken()
  198. hooks = Kobo.__GetReauthenticationHook()
  199. response = self.Session.get( url, headers = headers, hooks = hooks )
  200. response.raise_for_status()
  201. jsonResponse = response.json()
  202. return jsonResponse
  203. def __GetMyBookListPage( self, syncToken: str ) -> Tuple[ list, str ]:
  204. Globals.Logger.debug( "Kobo.__GetMyBookListPage" )
  205. url = self.InitializationSettings[ "library_sync" ]
  206. headers = Kobo.GetHeaderWithAccessToken()
  207. hooks = Kobo.__GetReauthenticationHook()
  208. if len( syncToken ) > 0:
  209. headers[ "x-kobo-synctoken" ] = syncToken
  210. response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
  211. response.raise_for_status()
  212. bookList = response.json()
  213. syncToken = ""
  214. syncResult = response.headers.get( "x-kobo-sync" )
  215. if syncResult == "continue":
  216. syncToken = response.headers.get( "x-kobo-synctoken", "" )
  217. return bookList, syncToken
  218. def GetMyBookList( self ) -> list:
  219. # The "library_sync" name and the synchronization tokens make it somewhat suspicious that we should use
  220. # "library_items" instead to get the My Books list, but "library_items" gives back less info (even with the
  221. # embed=ProductMetadata query parameter set).
  222. fullBookList = []
  223. syncToken = ""
  224. while True:
  225. bookList, syncToken = self.__GetMyBookListPage( syncToken )
  226. fullBookList += bookList
  227. if len( syncToken ) == 0:
  228. break
  229. return fullBookList
  230. def GetMyWishList( self ) -> list:
  231. Globals.Logger.debug( "Kobo.GetMyWishList" )
  232. items = []
  233. currentPageIndex = 0
  234. while True:
  235. url = self.InitializationSettings[ "user_wishlist" ]
  236. headers = Kobo.GetHeaderWithAccessToken()
  237. hooks = Kobo.__GetReauthenticationHook()
  238. params = {
  239. "PageIndex": currentPageIndex,
  240. "PageSize": 100, # 100 is the default if PageSize is not specified.
  241. }
  242. response = Globals.Kobo.Session.get( url, params = params, headers = headers, hooks = hooks )
  243. response.raise_for_status()
  244. wishList = response.json()
  245. items.extend( wishList[ "Items" ] )
  246. currentPageIndex += 1
  247. if currentPageIndex >= wishList[ "TotalPageCount" ]:
  248. break
  249. return items
  250. def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
  251. Globals.Logger.debug( "Kobo.__GetContentAccessBook" )
  252. url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
  253. params = { "DisplayProfile": displayProfile }
  254. headers = Kobo.GetHeaderWithAccessToken()
  255. hooks = Kobo.__GetReauthenticationHook()
  256. response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
  257. response.raise_for_status()
  258. jsonResponse = response.json()
  259. return jsonResponse
  260. @staticmethod
  261. def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
  262. jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
  263. if jsonContentKeys is None:
  264. return {}
  265. contentKeys = {}
  266. for contentKey in jsonContentKeys:
  267. contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
  268. return contentKeys
  269. @staticmethod
  270. def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
  271. jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
  272. if jsonContentUrls is None:
  273. raise KoboException( "Download URL can't be found for product '%s'." % productId )
  274. if len( jsonContentUrls ) == 0:
  275. raise KoboException( "Download URL list is empty for product '%s'. If this is an archived book then it must be unarchived first on the Kobo website (https://www.kobo.com/help/en-US/article/1799/restoring-deleted-books-or-magazines)." % productId )
  276. for jsonContentUrl in jsonContentUrls:
  277. if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
  278. ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
  279. # Remove the mysterious "b" query parameter that causes forbidden downloads.
  280. url = jsonContentUrl[ "DownloadUrl" ]
  281. parsed = urllib.parse.urlparse( url )
  282. parsedQueries = urllib.parse.parse_qs( parsed.query )
  283. parsedQueries.pop( "b", None )
  284. url = parsed._replace( query = urllib.parse.urlencode( parsedQueries, doseq = True ) ).geturl()
  285. hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
  286. return url, hasDrm
  287. message = "Download URL for supported formats can't be found for product '%s'.\n" % productId
  288. message += "Available formats:"
  289. for jsonContentUrl in jsonContentUrls:
  290. message += "\nDRMType: '%s', UrlFormat: '%s'" % ( jsonContentUrl[ "DRMType" ], jsonContentUrl[ "UrlFormat" ] )
  291. raise KoboException( message )
  292. def __DownloadToFile( self, url, outputPath: str ) -> None:
  293. Globals.Logger.debug( "Kobo.__DownloadToFile" )
  294. response = self.Session.get( url, stream = True )
  295. response.raise_for_status()
  296. with open( outputPath, "wb" ) as f:
  297. for chunk in response.iter_content( chunk_size = 1024 * 256 ):
  298. f.write( chunk )
  299. # Downloading archived books is not possible, the "content_access_book" API endpoint returns with empty ContentKeys
  300. # and ContentUrls for them.
  301. def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
  302. Globals.Logger.debug( "Kobo.Download" )
  303. jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
  304. contentKeys = Kobo.__GetContentKeys( jsonResponse )
  305. downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
  306. temporaryOutputPath = outputPath + ".downloading"
  307. try:
  308. self.__DownloadToFile( downloadUrl, temporaryOutputPath )
  309. if hasDrm:
  310. drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
  311. drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
  312. os.remove( temporaryOutputPath )
  313. else:
  314. os.rename( temporaryOutputPath, outputPath )
  315. except:
  316. if os.path.isfile( temporaryOutputPath ):
  317. os.remove( temporaryOutputPath )
  318. if os.path.isfile( outputPath ):
  319. os.remove( outputPath )
  320. raise