Kobo.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. from Globals import Globals
  2. from KoboDrmRemover import KoboDrmRemover
  3. import requests
  4. from typing import Dict, Tuple
  5. import base64
  6. import html
  7. import os
  8. import re
  9. import urllib
  10. import uuid
  11. class KoboException( Exception ):
  12. pass
  13. # The hook's workflow is based on this:
  14. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  15. def ReauthenticationHook( r, *args, **kwargs ):
  16. if r.status_code != requests.codes.unauthorized: # 401
  17. return
  18. Globals.Logger.debug( "Refreshing expired authentication token" )
  19. # Consume content and release the original connection to allow our new request to reuse the same one.
  20. r.content
  21. r.close()
  22. prep = r.request.copy()
  23. # Refresh the authentication token and use it.
  24. Globals.Kobo.RefreshAuthentication()
  25. headers = Kobo.GetHeaderWithAccessToken()
  26. prep.headers[ "Authorization" ] = headers[ "Authorization" ]
  27. # Don't retry to reauthenticate this request again.
  28. prep.deregister_hook( "response", ReauthenticationHook )
  29. # Resend the failed request.
  30. _r = r.connection.send( prep, **kwargs )
  31. _r.history.append( r )
  32. _r.request = prep
  33. return _r
  34. class SessionWithTimeOut( requests.Session ):
  35. def request( self, method, url, **kwargs ):
  36. if "timeout" not in kwargs:
  37. kwargs[ "timeout" ] = 30 # 30 seconds
  38. return super().request( method, url, **kwargs )
  39. class Kobo:
  40. Affiliate = "Kobo"
  41. ApplicationVersion = "8.11.24971"
  42. DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
  43. DisplayProfile = "Android"
  44. def __init__( self ):
  45. # Use the user agent of the Kobo Android app, otherwise the login request hangs forever.
  46. headers = {
  47. "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Google Nexus 7 2013 Build/MRA58K; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/74.0.3729.186 Safari/537.36 KoboApp/8.40.2.29861 KoboPlatform Id/00000000-0000-0000-0000-000000004000 KoboAffiliate/Kobo KoboBuildFlavor/global",
  48. }
  49. self.InitializationSettings = {}
  50. self.Session = SessionWithTimeOut()
  51. self.Session.headers.update( headers )
  52. # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
  53. # functions that doesn't need authorization.
  54. @staticmethod
  55. def GetHeaderWithAccessToken() -> dict:
  56. authorization = "Bearer " + Globals.Settings.AccessToken
  57. headers = { "Authorization": authorization }
  58. return headers
  59. # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
  60. @staticmethod
  61. def __GetReauthenticationHook() -> dict:
  62. return { "response": ReauthenticationHook }
  63. # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
  64. # user key can't be used for anything.
  65. def AuthenticateDevice( self, userKey: str = "" ) -> None:
  66. Globals.Logger.debug( "Kobo.AuthenticateDevice" )
  67. if len( Globals.Settings.DeviceId ) == 0:
  68. Globals.Settings.DeviceId = str( uuid.uuid4() )
  69. Globals.Settings.AccessToken = ""
  70. Globals.Settings.RefreshToken = ""
  71. postData = {
  72. "AffiliateName": Kobo.Affiliate,
  73. "AppVersion": Kobo.ApplicationVersion,
  74. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  75. "DeviceId": Globals.Settings.DeviceId,
  76. "PlatformId": Kobo.DefaultPlatformId
  77. }
  78. if len( userKey ) > 0:
  79. postData[ "UserKey" ] = userKey
  80. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
  81. response.raise_for_status()
  82. jsonResponse = response.json()
  83. if jsonResponse[ "TokenType" ] != "Bearer":
  84. raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  85. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  86. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  87. if not Globals.Settings.AreAuthenticationSettingsSet():
  88. raise KoboException( "Authentication settings are not set after device authentication." )
  89. if len( userKey ) > 0:
  90. Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
  91. Globals.Settings.Save()
  92. def RefreshAuthentication( self ) -> None:
  93. Globals.Logger.debug( "Kobo.RefreshAuthentication" )
  94. headers = Kobo.GetHeaderWithAccessToken()
  95. postData = {
  96. "AppVersion": Kobo.ApplicationVersion,
  97. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  98. "PlatformId": Kobo.DefaultPlatformId,
  99. "RefreshToken": Globals.Settings.RefreshToken
  100. }
  101. # The reauthentication hook is intentionally not set.
  102. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
  103. response.raise_for_status()
  104. jsonResponse = response.json()
  105. if jsonResponse[ "TokenType" ] != "Bearer":
  106. raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  107. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  108. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  109. if not Globals.Settings.AreAuthenticationSettingsSet():
  110. raise KoboException( "Authentication settings are not set after authentication refresh." )
  111. Globals.Settings.Save()
  112. def LoadInitializationSettings( self ) -> None:
  113. Globals.Logger.debug( "Kobo.LoadInitializationSettings" )
  114. headers = Kobo.GetHeaderWithAccessToken()
  115. hooks = Kobo.__GetReauthenticationHook()
  116. response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
  117. response.raise_for_status()
  118. jsonResponse = response.json()
  119. self.InitializationSettings = jsonResponse[ "Resources" ]
  120. def __GetExtraLoginParameters( self ) -> Tuple[ str, str, str ]:
  121. Globals.Logger.debug( "Kobo.__GetExtraLoginParameters" )
  122. signInUrl = self.InitializationSettings[ "sign_in_page" ]
  123. params = {
  124. "wsa": Kobo.Affiliate,
  125. "pwsav": Kobo.ApplicationVersion,
  126. "pwspid": Kobo.DefaultPlatformId,
  127. "pwsdid": Globals.Settings.DeviceId
  128. }
  129. response = self.Session.get( signInUrl, params = params )
  130. response.raise_for_status()
  131. htmlResponse = response.text
  132. # The link can be found in the response ('<a class="kobo-link partner-option kobo"') but the Android app does not use the entire path.
  133. # (The entire path looks like this: "/ww/en/signin/signin/kobo?workflowId=01234567-0123-0123-0123-0123456789ab".)
  134. parsed = urllib.parse.urlparse( signInUrl )
  135. koboSignInUrl = parsed._replace( query = None, path = "/ww/en/signin/signin" ).geturl()
  136. match = re.search( r""" name="LogInModel.WorkflowId" type="hidden" value="([^"]+)" />""", htmlResponse )
  137. if match is None:
  138. raise KoboException( "Can't find the workflow ID in the login form. The page format might have changed." )
  139. workflowId = html.unescape( match.group( 1 ) )
  140. match = re.search( r"""<input name="__RequestVerificationToken" type="hidden" value="([^"]+)" />""", htmlResponse )
  141. if match is None:
  142. raise KoboException( "Can't find the request verification token in the login form. The page format might have changed." )
  143. requestVerificationToken = html.unescape( match.group( 1 ) )
  144. return koboSignInUrl, workflowId, requestVerificationToken
  145. def Login( self, email: str, password: str, captcha: str ) -> None:
  146. Globals.Logger.debug( "Kobo.Login" )
  147. signInUrl, workflowId, requestVerificationToken = self.__GetExtraLoginParameters()
  148. postData = {
  149. "LogInModel.WorkflowId": workflowId,
  150. "LogInModel.Provider": Kobo.Affiliate,
  151. "ReturnUrl": "",
  152. "__RequestVerificationToken": requestVerificationToken,
  153. "LogInModel.UserName": email,
  154. "LogInModel.Password": password,
  155. "g-recaptcha-response": captcha,
  156. "h-captcha-response": captcha
  157. }
  158. response = self.Session.post( signInUrl, data = postData )
  159. response.raise_for_status()
  160. htmlResponse = response.text
  161. match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
  162. if match is None:
  163. raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )
  164. url = match.group( 1 )
  165. parsed = urllib.parse.urlparse( url )
  166. parsedQueries = urllib.parse.parse_qs( parsed.query )
  167. Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
  168. userKey = parsedQueries[ "userKey" ][ 0 ]
  169. self.AuthenticateDevice( userKey )
  170. def GetBookInfo( self, productId: str ) -> dict:
  171. Globals.Logger.debug( "Kobo.GetBookInfo" )
  172. url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
  173. headers = Kobo.GetHeaderWithAccessToken()
  174. hooks = Kobo.__GetReauthenticationHook()
  175. response = self.Session.get( url, headers = headers, hooks = hooks )
  176. response.raise_for_status()
  177. jsonResponse = response.json()
  178. return jsonResponse
  179. def __GetMyBookListPage( self, syncToken: str ) -> Tuple[ list, str ]:
  180. Globals.Logger.debug( "Kobo.__GetMyBookListPage" )
  181. url = self.InitializationSettings[ "library_sync" ]
  182. headers = Kobo.GetHeaderWithAccessToken()
  183. hooks = Kobo.__GetReauthenticationHook()
  184. if len( syncToken ) > 0:
  185. headers[ "x-kobo-synctoken" ] = syncToken
  186. response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
  187. response.raise_for_status()
  188. bookList = response.json()
  189. syncToken = ""
  190. syncResult = response.headers.get( "x-kobo-sync" )
  191. if syncResult == "continue":
  192. syncToken = response.headers.get( "x-kobo-synctoken", "" )
  193. return bookList, syncToken
  194. def GetMyBookList( self ) -> list:
  195. # The "library_sync" name and the synchronization tokens make it somewhat suspicious that we should use
  196. # "library_items" instead to get the My Books list, but "library_items" gives back less info (even with the
  197. # embed=ProductMetadata query parameter set).
  198. fullBookList = []
  199. syncToken = ""
  200. while True:
  201. bookList, syncToken = self.__GetMyBookListPage( syncToken )
  202. fullBookList += bookList
  203. if len( syncToken ) == 0:
  204. break
  205. return fullBookList
  206. def GetMyWishList( self ) -> list:
  207. Globals.Logger.debug( "Kobo.GetMyWishList" )
  208. items = []
  209. currentPageIndex = 0
  210. while True:
  211. url = self.InitializationSettings[ "user_wishlist" ]
  212. headers = Kobo.GetHeaderWithAccessToken()
  213. hooks = Kobo.__GetReauthenticationHook()
  214. params = {
  215. "PageIndex": currentPageIndex,
  216. "PageSize": 100, # 100 is the default if PageSize is not specified.
  217. }
  218. response = Globals.Kobo.Session.get( url, params = params, headers = headers, hooks = hooks )
  219. response.raise_for_status()
  220. wishList = response.json()
  221. items.extend( wishList[ "Items" ] )
  222. currentPageIndex += 1
  223. if currentPageIndex >= wishList[ "TotalPageCount" ]:
  224. break
  225. return items
  226. def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
  227. Globals.Logger.debug( "Kobo.__GetContentAccessBook" )
  228. url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
  229. params = { "DisplayProfile": displayProfile }
  230. headers = Kobo.GetHeaderWithAccessToken()
  231. hooks = Kobo.__GetReauthenticationHook()
  232. response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
  233. response.raise_for_status()
  234. jsonResponse = response.json()
  235. return jsonResponse
  236. @staticmethod
  237. def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
  238. jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
  239. if jsonContentKeys is None:
  240. return {}
  241. contentKeys = {}
  242. for contentKey in jsonContentKeys:
  243. contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
  244. return contentKeys
  245. @staticmethod
  246. def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
  247. jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
  248. if jsonContentUrls is None:
  249. raise KoboException( "Download URL can't be found for product '%s'." % productId )
  250. if len( jsonContentUrls ) == 0:
  251. raise KoboException( "Download URL list is empty for product '%s'. If this is an archived book then it must be unarchived first on the Kobo website (https://www.kobo.com/help/en-US/article/1799/restoring-deleted-books-or-magazines)." % productId )
  252. for jsonContentUrl in jsonContentUrls:
  253. if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
  254. ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
  255. hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
  256. return jsonContentUrl[ "DownloadUrl" ], hasDrm
  257. message = "Download URL for supported formats can't be found for product '%s'.\n" % productId
  258. message += "Available formats:"
  259. for jsonContentUrl in jsonContentUrls:
  260. message += "\nDRMType: '%s', UrlFormat: '%s'" % ( jsonContentUrl[ "DRMType" ], jsonContentUrl[ "UrlFormat" ] )
  261. raise KoboException( message )
  262. def __DownloadToFile( self, url, outputPath: str ) -> None:
  263. Globals.Logger.debug( "Kobo.__DownloadToFile" )
  264. response = self.Session.get( url, stream = True )
  265. response.raise_for_status()
  266. with open( outputPath, "wb" ) as f:
  267. for chunk in response.iter_content( chunk_size = 1024 * 256 ):
  268. f.write( chunk )
  269. # Downloading archived books is not possible, the "content_access_book" API endpoint returns with empty ContentKeys
  270. # and ContentUrls for them.
  271. def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
  272. Globals.Logger.debug( "Kobo.Download" )
  273. jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
  274. contentKeys = Kobo.__GetContentKeys( jsonResponse )
  275. downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
  276. temporaryOutputPath = outputPath + ".downloading"
  277. try:
  278. self.__DownloadToFile( downloadUrl, temporaryOutputPath )
  279. if hasDrm:
  280. drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
  281. drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
  282. os.remove( temporaryOutputPath )
  283. else:
  284. os.rename( temporaryOutputPath, outputPath )
  285. except:
  286. if os.path.isfile( temporaryOutputPath ):
  287. os.remove( temporaryOutputPath )
  288. if os.path.isfile( outputPath ):
  289. os.remove( outputPath )
  290. raise