Kobo.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. from Globals import Globals
  2. from KoboDrmRemover import KoboDrmRemover
  3. import requests
  4. from typing import Dict, Tuple
  5. import base64
  6. import html
  7. import os
  8. import re
  9. import urllib
  10. import uuid
  11. class KoboException( Exception ):
  12. pass
  13. # The hook's workflow is based on this:
  14. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  15. def ReauthenticationHook( r, *args, **kwargs ):
  16. if r.status_code != requests.codes.unauthorized: # 401
  17. return
  18. print( "Refreshing expired authentication token" )
  19. # Consume content and release the original connection to allow our new request to reuse the same one.
  20. r.content
  21. r.close()
  22. prep = r.request.copy()
  23. # Refresh the authentication token and use it.
  24. Globals.Kobo.RefreshAuthentication()
  25. headers = Kobo.GetHeaderWithAccessToken()
  26. prep.headers[ "Authorization" ] = headers[ "Authorization" ]
  27. # Don't retry to reauthenticate this request again.
  28. prep.deregister_hook( "response", ReauthenticationHook )
  29. # Resend the failed request.
  30. _r = r.connection.send( prep, **kwargs )
  31. _r.history.append( r )
  32. _r.request = prep
  33. return _r
  34. class SessionWithTimeOut( requests.Session ):
  35. def request( self, method, url, **kwargs ):
  36. if "timeout" not in kwargs:
  37. kwargs[ "timeout" ] = 30 # 30 seconds
  38. return super().request( method, url, **kwargs )
  39. class Kobo:
  40. Affiliate = "Kobo"
  41. ApplicationVersion = "8.11.24971"
  42. DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
  43. DisplayProfile = "Android"
  44. def __init__( self ):
  45. self.InitializationSettings = {}
  46. self.Session = SessionWithTimeOut()
  47. # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
  48. # functions that doesn't need authorization.
  49. @staticmethod
  50. def GetHeaderWithAccessToken() -> dict:
  51. authorization = "Bearer " + Globals.Settings.AccessToken
  52. headers = { "Authorization": authorization }
  53. return headers
  54. # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
  55. @staticmethod
  56. def __GetReauthenticationHook() -> dict:
  57. return { "response": ReauthenticationHook }
  58. # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
  59. # user key can't be used for anything.
  60. def AuthenticateDevice( self, userKey: str = "" ) -> None:
  61. if len( Globals.Settings.DeviceId ) == 0:
  62. Globals.Settings.DeviceId = str( uuid.uuid4() )
  63. Globals.Settings.AccessToken = ""
  64. Globals.Settings.RefreshToken = ""
  65. postData = {
  66. "AffiliateName": Kobo.Affiliate,
  67. "AppVersion": Kobo.ApplicationVersion,
  68. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  69. "DeviceId": Globals.Settings.DeviceId,
  70. "PlatformId": Kobo.DefaultPlatformId
  71. }
  72. if len( userKey ) > 0:
  73. postData[ "UserKey" ] = userKey
  74. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
  75. response.raise_for_status()
  76. jsonResponse = response.json()
  77. if jsonResponse[ "TokenType" ] != "Bearer":
  78. raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  79. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  80. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  81. if not Globals.Settings.AreAuthenticationSettingsSet():
  82. raise KoboException( "Authentication settings are not set after device authentication." )
  83. if len( userKey ) > 0:
  84. Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
  85. Globals.Settings.Save()
  86. def RefreshAuthentication( self ) -> None:
  87. headers = Kobo.GetHeaderWithAccessToken()
  88. postData = {
  89. "AppVersion": Kobo.ApplicationVersion,
  90. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  91. "PlatformId": Kobo.DefaultPlatformId,
  92. "RefreshToken": Globals.Settings.RefreshToken
  93. }
  94. # The reauthentication hook is intentionally not set.
  95. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
  96. response.raise_for_status()
  97. jsonResponse = response.json()
  98. if jsonResponse[ "TokenType" ] != "Bearer":
  99. raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  100. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  101. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  102. if not Globals.Settings.AreAuthenticationSettingsSet():
  103. raise KoboException( "Authentication settings are not set after authentication refresh." )
  104. Globals.Settings.Save()
  105. def LoadInitializationSettings( self ) -> None:
  106. headers = Kobo.GetHeaderWithAccessToken()
  107. hooks = Kobo.__GetReauthenticationHook()
  108. response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
  109. response.raise_for_status()
  110. jsonResponse = response.json()
  111. self.InitializationSettings = jsonResponse[ "Resources" ]
  112. def __GetExtraLoginParameters( self ) -> Tuple[ str, str, str ]:
  113. signInUrl = self.InitializationSettings[ "sign_in_page" ]
  114. params = {
  115. "wsa": Kobo.Affiliate,
  116. "pwsav": Kobo.ApplicationVersion,
  117. "pwspid": Kobo.DefaultPlatformId,
  118. "pwsdid": Globals.Settings.DeviceId
  119. }
  120. response = self.Session.get( signInUrl, params = params )
  121. response.raise_for_status()
  122. htmlResponse = response.text
  123. # The link can be found in the response ('<a class="kobo-link partner-option kobo"') but this will do for now.
  124. parsed = urllib.parse.urlparse( signInUrl )
  125. koboSignInUrl = parsed._replace( query = None, path = "/ww/en/signin/signin/kobo" ).geturl()
  126. match = re.search( r""" name="LogInModel.WorkflowId" type="hidden" value="([^"]+)" />""", htmlResponse )
  127. if match is None:
  128. raise KoboException( "Can't find the workflow ID in the login form. The page format might have changed." )
  129. workflowId = html.unescape( match.group( 1 ) )
  130. match = re.search( r"""<input name="__RequestVerificationToken" type="hidden" value="([^"]+)" />""", htmlResponse )
  131. if match is None:
  132. raise KoboException( "Can't find the request verification token in the login form. The page format might have changed." )
  133. requestVerificationToken = html.unescape( match.group( 1 ) )
  134. return koboSignInUrl, workflowId, requestVerificationToken
  135. def Login( self, email: str, password: str, captcha: str ) -> None:
  136. signInUrl, workflowId, requestVerificationToken = self.__GetExtraLoginParameters()
  137. postData = {
  138. "LogInModel.WorkflowId": workflowId,
  139. "LogInModel.Provider": Kobo.Affiliate,
  140. "ReturnUrl": "",
  141. "__RequestVerificationToken": requestVerificationToken,
  142. "LogInModel.UserName": email,
  143. "LogInModel.Password": password,
  144. "g-recaptcha-response": captcha
  145. }
  146. response = self.Session.post( signInUrl, data = postData )
  147. response.raise_for_status()
  148. htmlResponse = response.text
  149. match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
  150. if match is None:
  151. raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )
  152. url = match.group( 1 )
  153. parsed = urllib.parse.urlparse( url )
  154. parsedQueries = urllib.parse.parse_qs( parsed.query )
  155. Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
  156. userKey = parsedQueries[ "userKey" ][ 0 ]
  157. self.AuthenticateDevice( userKey )
  158. def GetBookInfo( self, productId: str ) -> dict:
  159. url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
  160. headers = Kobo.GetHeaderWithAccessToken()
  161. hooks = Kobo.__GetReauthenticationHook()
  162. response = self.Session.get( url, headers = headers, hooks = hooks )
  163. response.raise_for_status()
  164. jsonResponse = response.json()
  165. return jsonResponse
  166. def __GetMyBookListPage( self, syncToken: str ) -> Tuple[ list, str ]:
  167. url = self.InitializationSettings[ "library_sync" ]
  168. headers = Kobo.GetHeaderWithAccessToken()
  169. hooks = Kobo.__GetReauthenticationHook()
  170. if len( syncToken ) > 0:
  171. headers[ "x-kobo-synctoken" ] = syncToken
  172. response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
  173. response.raise_for_status()
  174. bookList = response.json()
  175. syncToken = ""
  176. syncResult = response.headers.get( "x-kobo-sync" )
  177. if syncResult == "continue":
  178. syncToken = response.headers.get( "x-kobo-synctoken", "" )
  179. return bookList, syncToken
  180. def GetMyBookList( self ) -> list:
  181. # The "library_sync" name and the synchronization tokens make it somewhat suspicious that we should use
  182. # "library_items" instead to get the My Books list, but "library_items" gives back less info (even with the
  183. # embed=ProductMetadata query parameter set).
  184. fullBookList = []
  185. syncToken = ""
  186. while True:
  187. bookList, syncToken = self.__GetMyBookListPage( syncToken )
  188. fullBookList += bookList
  189. if len( syncToken ) == 0:
  190. break
  191. return fullBookList
  192. def GetMyWishList( self ) -> list:
  193. items = []
  194. currentPageIndex = 0
  195. while True:
  196. url = self.InitializationSettings[ "user_wishlist" ]
  197. headers = Kobo.GetHeaderWithAccessToken()
  198. hooks = Kobo.__GetReauthenticationHook()
  199. params = {
  200. "PageIndex": currentPageIndex,
  201. "PageSize": 100, # 100 is the default if PageSize is not specified.
  202. }
  203. response = Globals.Kobo.Session.get( url, params = params, headers = headers, hooks = hooks )
  204. response.raise_for_status()
  205. wishList = response.json()
  206. items.extend( wishList[ "Items" ] )
  207. currentPageIndex += 1
  208. if currentPageIndex >= wishList[ "TotalPageCount" ]:
  209. break
  210. return items
  211. def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
  212. url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
  213. params = { "DisplayProfile": displayProfile }
  214. headers = Kobo.GetHeaderWithAccessToken()
  215. hooks = Kobo.__GetReauthenticationHook()
  216. response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
  217. response.raise_for_status()
  218. jsonResponse = response.json()
  219. return jsonResponse
  220. @staticmethod
  221. def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
  222. jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
  223. if jsonContentKeys is None:
  224. return {}
  225. contentKeys = {}
  226. for contentKey in jsonContentKeys:
  227. contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
  228. return contentKeys
  229. @staticmethod
  230. def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
  231. jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
  232. if jsonContentUrls is None:
  233. raise KoboException( "Download URL can't be found for product '%s'." % productId )
  234. if len( jsonContentUrls ) == 0:
  235. raise KoboException( "Download URL list is empty for product '%s'. If this is an archived book then it must be unarchived first on the Kobo website (https://www.kobo.com/help/en-US/article/1799/restoring-deleted-books-or-magazines)." % productId )
  236. for jsonContentUrl in jsonContentUrls:
  237. if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
  238. ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
  239. hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
  240. return jsonContentUrl[ "DownloadUrl" ], hasDrm
  241. message = "Download URL for supported formats can't be found for product '%s'.\n" % productId
  242. message += "Available formats:"
  243. for jsonContentUrl in jsonContentUrls:
  244. message += "\nDRMType: '%s', UrlFormat: '%s'" % ( jsonContentUrl[ "DRMType" ], jsonContentUrl[ "UrlFormat" ] )
  245. raise KoboException( message )
  246. def __DownloadToFile( self, url, outputPath: str ) -> None:
  247. response = self.Session.get( url, stream = True )
  248. response.raise_for_status()
  249. with open( outputPath, "wb" ) as f:
  250. for chunk in response.iter_content( chunk_size = 1024 * 256 ):
  251. f.write( chunk )
  252. # Downloading archived books is not possible, the "content_access_book" API endpoint returns with empty ContentKeys
  253. # and ContentUrls for them.
  254. def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
  255. jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
  256. contentKeys = Kobo.__GetContentKeys( jsonResponse )
  257. downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
  258. temporaryOutputPath = outputPath + ".downloading"
  259. try:
  260. self.__DownloadToFile( downloadUrl, temporaryOutputPath )
  261. if hasDrm:
  262. drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
  263. drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
  264. os.remove( temporaryOutputPath )
  265. else:
  266. os.rename( temporaryOutputPath, outputPath )
  267. except:
  268. if os.path.isfile( temporaryOutputPath ):
  269. os.remove( temporaryOutputPath )
  270. if os.path.isfile( outputPath ):
  271. os.remove( outputPath )
  272. raise