Kobo.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. from Globals import Globals
  2. from KoboDrmRemover import KoboDrmRemover
  3. import requests
  4. from typing import Dict, Tuple
  5. import base64
  6. import html
  7. import os
  8. import re
  9. import urllib
  10. import uuid
  11. # It was not possible to enter the entire captcha response on MacOS.
  12. # Importing readline changes the implementation of input() and solves the issue.
  13. # See https://stackoverflow.com/q/65735885 and https://stackoverflow.com/q/7357007.
  14. import readline
  class KoboException( Exception ):
      """Error raised by this module when a Kobo response or sign-in page is not in the expected form."""
  17. # The hook's workflow is based on this:
  18. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  def ReauthenticationHook( r, *args, **kwargs ):
      """Response hook that retries a request once after refreshing the access token.

      Returns None for any response that is not a 401 (leaving it untouched); otherwise
      returns the response of the resent request, with the failed response appended to its history.
      """
      if r.status_code == requests.codes.unauthorized: # 401
          Globals.Logger.debug( "Refreshing expired authentication token" )

          # Drain the body and release the original connection so the resent request can reuse it.
          r.content
          r.close()
          retryRequest = r.request.copy()

          # Obtain a fresh token and put it on the copied request.
          Globals.Kobo.RefreshAuthentication()
          retryRequest.headers[ "Authorization" ] = Kobo.GetHeaderWithAccessToken()[ "Authorization" ]

          # The retried request must not trigger another reauthentication attempt.
          retryRequest.deregister_hook( "response", ReauthenticationHook )

          # Send the request again and link it to the failed attempt.
          retryResponse = r.connection.send( retryRequest, **kwargs )
          retryResponse.history.append( r )
          retryResponse.request = retryRequest
          return retryResponse

      return None
  class SessionWithTimeOut( requests.Session ):
      """requests.Session that applies a 30 second timeout to requests that do not specify one."""

      def request( self, method, url, **kwargs ):
          """Forward to requests.Session.request, filling in the default timeout when the caller gave none."""
          kwargs.setdefault( "timeout", 30 ) # 30 seconds
          return super().request( method, url, **kwargs )
  class Kobo:
      """Client for the Kobo store API.

      Covers device authentication, sign-in, library and wish list retrieval, and book download
      (removing DRM when the download is marked as KDRM protected). Tokens and ids are read from and
      written to Globals.Settings; all HTTP traffic goes through self.Session.
      """

      Affiliate = "Kobo"
      ApplicationVersion = "8.11.24971"
      DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
      DisplayProfile = "Android"

      def __init__( self ):
          """Create the HTTP session and an empty initialization settings dictionary."""
          # Use the user agent of the Kobo Android app, otherwise the login request hangs forever.
          headers = {
              "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Google Nexus 7 2013 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/74.0.3729.186 Safari/537.36 KoboApp/8.40.2.29861 KoboPlatform Id/00000000-0000-0000-0000-000000004000 KoboAffiliate/Kobo KoboBuildFlavor/global",
          }

          self.InitializationSettings = {}
          self.Session = SessionWithTimeOut()
          self.Session.headers.update( headers )

      # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
      # functions that don't need authorization.
      @staticmethod
      def GetHeaderWithAccessToken() -> dict:
          """Return the Authorization header built from the access token stored in Globals.Settings."""
          return { "Authorization": "Bearer " + Globals.Settings.AccessToken }

      # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
      @staticmethod
      def __GetReauthenticationHook() -> dict:
          """Return the requests hooks dictionary that installs ReauthenticationHook on a request."""
          return { "response": ReauthenticationHook }

      # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
      # user key can't be used for anything.
      def AuthenticateDevice( self, userKey: str = "" ) -> None:
          """Authenticate this device and store the returned tokens in Globals.Settings.

          A device id is generated (and the stored tokens are cleared) when none is set yet.
          When userKey is non-empty it is sent along and the user key from the response is stored too.

          Raises:
              KoboException: the token type is not "Bearer", or the settings are incomplete afterwards.
              requests.HTTPError: the server answered with an error status.
          """
          Globals.Logger.debug( "Kobo.AuthenticateDevice" )

          if not Globals.Settings.DeviceId:
              Globals.Settings.DeviceId = str( uuid.uuid4() )
              Globals.Settings.AccessToken = ""
              Globals.Settings.RefreshToken = ""

          postData = {
              "AffiliateName": Kobo.Affiliate,
              "AppVersion": Kobo.ApplicationVersion,
              "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
              "DeviceId": Globals.Settings.DeviceId,
              "PlatformId": Kobo.DefaultPlatformId
          }

          if userKey:
              postData[ "UserKey" ] = userKey

          response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
          response.raise_for_status()
          jsonResponse = response.json()

          if jsonResponse[ "TokenType" ] != "Bearer":
              raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )

          Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
          Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
          if not Globals.Settings.AreAuthenticationSettingsSet():
              raise KoboException( "Authentication settings are not set after device authentication." )

          if userKey:
              Globals.Settings.UserKey = jsonResponse[ "UserKey" ]

          Globals.Settings.Save()

      def RefreshAuthentication( self ) -> None:
          """Exchange the stored refresh token for new tokens and save them in Globals.Settings.

          Raises:
              KoboException: the token type is not "Bearer", or the settings are incomplete afterwards.
              requests.HTTPError: the server answered with an error status.
          """
          Globals.Logger.debug( "Kobo.RefreshAuthentication" )

          headers = Kobo.GetHeaderWithAccessToken()

          postData = {
              "AppVersion": Kobo.ApplicationVersion,
              "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
              "PlatformId": Kobo.DefaultPlatformId,
              "RefreshToken": Globals.Settings.RefreshToken
          }

          # The reauthentication hook is intentionally not set.
          response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
          response.raise_for_status()
          jsonResponse = response.json()

          if jsonResponse[ "TokenType" ] != "Bearer":
              raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )

          Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
          Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
          if not Globals.Settings.AreAuthenticationSettingsSet():
              raise KoboException( "Authentication settings are not set after authentication refresh." )

          Globals.Settings.Save()

      def LoadInitializationSettings( self ) -> None:
          """Fetch the "Resources" section of the initialization endpoint into self.InitializationSettings."""
          Globals.Logger.debug( "Kobo.LoadInitializationSettings" )

          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()
          response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
          response.raise_for_status()
          self.InitializationSettings = response.json()[ "Resources" ]

      def __GetExtraLoginParameters( self ) -> Tuple[ str, str, str ]:
          """Load the sign-in page and extract what the login form needs.

          Returns:
              ( sign-in URL to post to, workflow id, request verification token ).

          Raises:
              KoboException: the workflow id or the verification token can't be found in the page.
          """
          # "import urllib" alone does not guarantee that the "parse" submodule is loaded.
          import urllib.parse

          Globals.Logger.debug( "Kobo.__GetExtraLoginParameters" )

          signInUrl = self.InitializationSettings[ "sign_in_page" ]

          params = {
              "wsa": Kobo.Affiliate,
              "pwsav": Kobo.ApplicationVersion,
              "pwspid": Kobo.DefaultPlatformId,
              "pwsdid": Globals.Settings.DeviceId
          }

          response = self.Session.get( signInUrl, params = params )
          response.raise_for_status()
          htmlResponse = response.text

          # The link can be found in the response ('<a class="kobo-link partner-option kobo"') but the Android app does not use the entire path.
          # (The entire path looks like this: "/ww/en/signin/signin/kobo?workflowId=01234567-0123-0123-0123-0123456789ab".)
          parsed = urllib.parse.urlparse( signInUrl )
          # An empty string drops the query just like None did, but keeps every URL component a str.
          koboSignInUrl = parsed._replace( query = "", path = "/ww/en/signin/signin" ).geturl()

          match = re.search( r"""/signin/kobo\?workflowId=([0-9a-f\-]+)""", htmlResponse )
          if match is None:
              raise KoboException( "Can't find the workflow ID. The page format might have changed." )
          workflowId = html.unescape( match.group( 1 ) )

          match = re.search( r"""<input name="__RequestVerificationToken" type="hidden" value="([^"]+)" />""", htmlResponse )
          if match is None:
              raise KoboException( "Can't find the request verification token in the login form. The page format might have changed." )
          requestVerificationToken = html.unescape( match.group( 1 ) )

          return koboSignInUrl, workflowId, requestVerificationToken

      def Login( self, email: str, password: str, captcha: str ) -> None:
          """Sign in with the given credentials and captcha response, then authenticate the device with the user key.

          Raises:
              KoboException: the "kobo://UserAuthenticated" URL can't be found in the response.
              requests.HTTPError: the server answered with an error status.
          """
          # "import urllib" alone does not guarantee that the "parse" submodule is loaded.
          import urllib.parse

          Globals.Logger.debug( "Kobo.Login" )

          signInUrl, workflowId, requestVerificationToken = self.__GetExtraLoginParameters()

          # The same captcha answer is sent under both field names.
          postData = {
              "LogInModel.WorkflowId": workflowId,
              "LogInModel.Provider": Kobo.Affiliate,
              "ReturnUrl": "",
              "__RequestVerificationToken": requestVerificationToken,
              "LogInModel.UserName": email,
              "LogInModel.Password": password,
              "g-recaptcha-response": captcha,
              "h-captcha-response": captcha
          }

          response = self.Session.post( signInUrl, data = postData )
          response.raise_for_status()
          htmlResponse = response.text

          match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
          if match is None:
              raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )

          parsed = urllib.parse.urlparse( match.group( 1 ) )
          parsedQueries = urllib.parse.parse_qs( parsed.query )
          Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
          userKey = parsedQueries[ "userKey" ][ 0 ]

          self.AuthenticateDevice( userKey )

      def GetBookInfo( self, productId: str ) -> dict:
          """Return the decoded JSON response of the "book" endpoint for productId."""
          Globals.Logger.debug( "Kobo.GetBookInfo" )

          url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()

          response = self.Session.get( url, headers = headers, hooks = hooks )
          response.raise_for_status()
          return response.json()

      def __GetMyBookListPage( self, syncToken: str ) -> Tuple[ list, str ]:
          """Fetch one page of the library.

          Args:
              syncToken: token returned with the previous page, or "" for the first page.

          Returns:
              ( decoded JSON of the page, sync token for the next page or "" when the server
              did not answer with "x-kobo-sync: continue" ).
          """
          Globals.Logger.debug( "Kobo.__GetMyBookListPage" )

          url = self.InitializationSettings[ "library_sync" ]
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()

          if syncToken:
              headers[ "x-kobo-synctoken" ] = syncToken

          # Use this instance's session, like every other request in the class.
          response = self.Session.get( url, headers = headers, hooks = hooks )
          response.raise_for_status()
          bookList = response.json()

          nextSyncToken = ""
          if response.headers.get( "x-kobo-sync" ) == "continue":
              nextSyncToken = response.headers.get( "x-kobo-synctoken", "" )

          return bookList, nextSyncToken

      def GetMyBookList( self ) -> list:
          """Return the concatenation of all library pages."""
          # The "library_sync" name and the synchronization tokens make it somewhat suspicious that we should use
          # "library_items" instead to get the My Books list, but "library_items" gives back less info (even with the
          # embed=ProductMetadata query parameter set).

          fullBookList = []
          syncToken = ""
          while True:
              bookList, syncToken = self.__GetMyBookListPage( syncToken )
              fullBookList += bookList
              if not syncToken:
                  break

          return fullBookList

      def GetMyWishList( self ) -> list:
          """Return the "Items" of every wish list page, in page order."""
          Globals.Logger.debug( "Kobo.GetMyWishList" )

          url = self.InitializationSettings[ "user_wishlist" ]
          hooks = Kobo.__GetReauthenticationHook()

          items = []
          currentPageIndex = 0

          while True:
              # Rebuilt for every page: the access token may have been refreshed by the previous request.
              headers = Kobo.GetHeaderWithAccessToken()

              params = {
                  "PageIndex": currentPageIndex,
                  "PageSize": 100, # 100 is the default if PageSize is not specified.
              }

              # Use this instance's session, like every other request in the class.
              response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
              response.raise_for_status()
              wishList = response.json()

              items.extend( wishList[ "Items" ] )

              currentPageIndex += 1
              if currentPageIndex >= wishList[ "TotalPageCount" ]:
                  break

          return items

      def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
          """Return the decoded JSON response of the "content_access_book" endpoint for productId."""
          Globals.Logger.debug( "Kobo.__GetContentAccessBook" )

          url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
          params = { "DisplayProfile": displayProfile }
          headers = Kobo.GetHeaderWithAccessToken()
          hooks = Kobo.__GetReauthenticationHook()

          response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
          response.raise_for_status()
          return response.json()

      @staticmethod
      def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
          """Map each "Name" to its "Value" from the response's "ContentKeys"; empty when the entry is missing."""
          jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
          if jsonContentKeys is None:
              return {}

          return { contentKey[ "Name" ]: contentKey[ "Value" ] for contentKey in jsonContentKeys }

      @staticmethod
      def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
          """Pick the first supported download from the response's "ContentUrls".

          Supported entries have DRMType "KDRM" or "SignedNoDrm" and UrlFormat "EPUB3" or "KEPUB".

          Returns:
              ( download URL, True when the entry's DRMType is "KDRM" ).

          Raises:
              KoboException: "ContentUrls" is missing, empty, or has no supported entry.
          """
          jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
          if jsonContentUrls is None:
              raise KoboException( "Download URL can't be found for product '%s'." % productId )

          if len( jsonContentUrls ) == 0:
              raise KoboException( "Download URL list is empty for product '%s'. If this is an archived book then it must be unarchived first on the Kobo website (https://www.kobo.com/help/en-US/article/1799/restoring-deleted-books-or-magazines)." % productId )

          for jsonContentUrl in jsonContentUrls:
              drmType = jsonContentUrl[ "DRMType" ]
              if drmType in ( "KDRM", "SignedNoDrm" ) and jsonContentUrl[ "UrlFormat" ] in ( "EPUB3", "KEPUB" ):
                  return jsonContentUrl[ "DownloadUrl" ], drmType == "KDRM"

          message = "Download URL for supported formats can't be found for product '%s'.\n" % productId
          message += "Available formats:"
          for jsonContentUrl in jsonContentUrls:
              message += "\nDRMType: '%s', UrlFormat: '%s'" % ( jsonContentUrl[ "DRMType" ], jsonContentUrl[ "UrlFormat" ] )

          raise KoboException( message )

      def __DownloadToFile( self, url, outputPath: str ) -> None:
          """Stream the content at url into the file at outputPath in 256 KiB chunks."""
          Globals.Logger.debug( "Kobo.__DownloadToFile" )

          # The context manager closes the streamed response even when the download or the write fails,
          # so the connection is not left checked out of the pool.
          with self.Session.get( url, stream = True ) as response:
              response.raise_for_status()
              with open( outputPath, "wb" ) as f:
                  for chunk in response.iter_content( chunk_size = 1024 * 256 ):
                      f.write( chunk )

      # Downloading archived books is not possible, the "content_access_book" API endpoint returns with empty ContentKeys
      # and ContentUrls for them.
      def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
          """Download the book to outputPath, removing DRM when the download is KDRM protected.

          The content is first written to outputPath + ".downloading". On any failure both the
          temporary file and outputPath are removed (if present) and the error is re-raised.

          Raises:
              KoboException: no supported download URL is available for the product.
          """
          Globals.Logger.debug( "Kobo.Download" )

          jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
          contentKeys = Kobo.__GetContentKeys( jsonResponse )
          downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )

          temporaryOutputPath = outputPath + ".downloading"

          try:
              self.__DownloadToFile( downloadUrl, temporaryOutputPath )

              if hasDrm:
                  drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
                  drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
                  os.remove( temporaryOutputPath )
              else:
                  # os.replace overwrites an existing target on every platform (os.rename fails on Windows).
                  os.replace( temporaryOutputPath, outputPath )
          except BaseException:
              # Clean up on every kind of failure, including KeyboardInterrupt, then propagate it.
              if os.path.isfile( temporaryOutputPath ):
                  os.remove( temporaryOutputPath )
              if os.path.isfile( outputPath ):
                  os.remove( outputPath )

              raise