# Kobo.py
import base64
import html
import os
import re
import urllib
import urllib.parse
import uuid
from typing import Dict, Tuple

import requests

from Globals import Globals
from KoboDrmRemover import KoboDrmRemover
  11. class KoboException( Exception ):
  12. pass
  13. # The hook's workflow is based on this:
  14. # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
  15. def ReauthenticationHook( r, *args, **kwargs ):
  16. if r.status_code != requests.codes.unauthorized: # 401
  17. return
  18. print( "Refreshing expired authentication token" )
  19. # Consume content and release the original connection to allow our new request to reuse the same one.
  20. r.content
  21. r.close()
  22. prep = r.request.copy()
  23. # Refresh the authentication token and use it.
  24. Globals.Kobo.RefreshAuthentication()
  25. headers = Kobo.GetHeaderWithAccessToken()
  26. prep.headers[ "Authorization" ] = headers[ "Authorization" ]
  27. # Don't retry to reauthenticate this request again.
  28. prep.deregister_hook( "response", ReauthenticationHook )
  29. # Resend the failed request.
  30. _r = r.connection.send( prep, **kwargs )
  31. _r.history.append( r )
  32. _r.request = prep
  33. return _r
  34. class Kobo:
  35. Affiliate = "Kobo"
  36. ApplicationVersion = "7.1.21543"
  37. DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
  38. DisplayProfile = "Android"
  39. def __init__( self ):
  40. self.InitializationSettings = {}
  41. self.Session = requests.session()
  42. # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
  43. # functions that doesn't need authorization.
  44. @staticmethod
  45. def GetHeaderWithAccessToken() -> dict:
  46. authorization = "Bearer " + Globals.Settings.AccessToken
  47. headers = { "Authorization": authorization }
  48. return headers
  49. # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
  50. @staticmethod
  51. def __GetReauthenticationHook() -> dict:
  52. return { "response": ReauthenticationHook }
  53. # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
  54. # user key can't be used for anything.
  55. def AuthenticateDevice( self, userKey: str = "" ) -> None:
  56. if len( Globals.Settings.DeviceId ) == 0:
  57. Globals.Settings.DeviceId = str( uuid.uuid4() )
  58. Globals.Settings.AccessToken = ""
  59. Globals.Settings.RefreshToken = ""
  60. postData = {
  61. "AffiliateName": Kobo.Affiliate,
  62. "AppVersion": Kobo.ApplicationVersion,
  63. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  64. "DeviceId": Globals.Settings.DeviceId,
  65. "PlatformId": Kobo.DefaultPlatformId
  66. }
  67. if len( userKey ) > 0:
  68. postData[ "UserKey" ] = userKey
  69. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
  70. response.raise_for_status()
  71. jsonResponse = response.json()
  72. if jsonResponse[ "TokenType" ] != "Bearer":
  73. raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  74. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  75. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  76. if not Globals.Settings.AreAuthenticationSettingsSet():
  77. raise KoboException( "Authentication settings are not set after device authentication." )
  78. if len( userKey ) > 0:
  79. Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
  80. Globals.Settings.Save()
  81. def RefreshAuthentication( self ) -> None:
  82. headers = Kobo.GetHeaderWithAccessToken()
  83. postData = {
  84. "AppVersion": Kobo.ApplicationVersion,
  85. "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
  86. "PlatformId": Kobo.DefaultPlatformId,
  87. "RefreshToken": Globals.Settings.RefreshToken
  88. }
  89. # The reauthentication hook is intentionally not set.
  90. response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
  91. response.raise_for_status()
  92. jsonResponse = response.json()
  93. if jsonResponse[ "TokenType" ] != "Bearer":
  94. raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
  95. Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
  96. Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
  97. if not Globals.Settings.AreAuthenticationSettingsSet():
  98. raise KoboException( "Authentication settings are not set after authentication refresh." )
  99. Globals.Settings.Save()
  100. def LoadInitializationSettings( self ) -> None:
  101. headers = Kobo.GetHeaderWithAccessToken()
  102. hooks = Kobo.__GetReauthenticationHook()
  103. response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
  104. response.raise_for_status()
  105. jsonResponse = response.json()
  106. self.InitializationSettings = jsonResponse[ "Resources" ]
  107. def __GetAuthenticationUrlFromLoginPage( self ) -> str:
  108. signInPageUrl = self.InitializationSettings[ "sign_in_page" ]
  109. params = {
  110. "wsa": Kobo.Affiliate,
  111. "wscf": "kepub",
  112. "pwsav": Kobo.ApplicationVersion,
  113. "pwspid": Kobo.DefaultPlatformId,
  114. "pwsdid": Globals.Settings.DeviceId
  115. }
  116. response = self.Session.get( signInPageUrl, params = params )
  117. response.raise_for_status()
  118. htmlResponse = response.text
  119. match = re.search( r"""<form action="(/[^"]+/Authenticate\?[^"]+)" method="post">""", htmlResponse )
  120. if match is None:
  121. raise KoboException( "Can't find login form. The page format might have changed." )
  122. authenticationUrl = match.group( 1 )
  123. authenticationUrl = html.unescape( authenticationUrl )
  124. authenticationUrl = urllib.parse.urljoin( signInPageUrl, authenticationUrl )
  125. return authenticationUrl
  126. def Login( self, email: str, password: str ) -> None:
  127. authenticationUrl = self.__GetAuthenticationUrlFromLoginPage()
  128. postData = {
  129. "EditModel.Email": email,
  130. "EditModel.Password": password,
  131. "EditModel.AuthenticationAction": "Authenticate",
  132. "AffiliateObject.Name": Kobo.Affiliate,
  133. "IsFTE": "False",
  134. "RequireSharedToken": "False"
  135. }
  136. response = self.Session.post( authenticationUrl, data = postData )
  137. response.raise_for_status()
  138. htmlResponse = response.text
  139. match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
  140. if match is None:
  141. raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )
  142. url = match.group( 1 )
  143. parsed = urllib.parse.urlparse( url )
  144. parsedQueries = urllib.parse.parse_qs( parsed.query )
  145. Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
  146. userKey = parsedQueries[ "userKey" ][ 0 ]
  147. self.AuthenticateDevice( userKey )
  148. def GetBookInfo( self, productId: str ) -> dict:
  149. url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
  150. headers = Kobo.GetHeaderWithAccessToken()
  151. hooks = Kobo.__GetReauthenticationHook()
  152. response = self.Session.get( url, headers = headers, hooks = hooks )
  153. response.raise_for_status()
  154. jsonResponse = response.json()
  155. return jsonResponse
  156. def GetMyBookList( self ) -> dict:
  157. # There is also "library_items" but that is paged and gives back less info (even with the embed=ProductMetadata
  158. # query parameter set).
  159. url = self.InitializationSettings[ "library_sync" ]
  160. headers = Kobo.GetHeaderWithAccessToken()
  161. hooks = Kobo.__GetReauthenticationHook()
  162. response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
  163. response.raise_for_status()
  164. bookList = response.json()
  165. return bookList
  166. def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
  167. url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
  168. params = { "DisplayProfile": displayProfile }
  169. headers = Kobo.GetHeaderWithAccessToken()
  170. hooks = Kobo.__GetReauthenticationHook()
  171. response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
  172. response.raise_for_status()
  173. jsonResponse = response.json()
  174. return jsonResponse
  175. @staticmethod
  176. def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
  177. jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
  178. if jsonContentKeys is None:
  179. return {}
  180. contentKeys = {}
  181. for contentKey in jsonContentKeys:
  182. contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
  183. return contentKeys
  184. @staticmethod
  185. def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
  186. jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
  187. if jsonContentUrls is None:
  188. raise KoboException( "Content URL can't be found for product '%s'." % productId )
  189. for jsonContentUrl in jsonContentUrls:
  190. if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
  191. ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
  192. hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
  193. return jsonContentUrl[ "DownloadUrl" ], hasDrm
  194. raise KoboException( "Download URL for supported formats can't be found for product '%s'." % productId )
  195. def __DownloadToFile( self, url, outputPath: str ) -> None:
  196. response = self.Session.get( url, stream = True )
  197. response.raise_for_status()
  198. with open( outputPath, "wb" ) as f:
  199. for chunk in response.iter_content( chunk_size = 1024 * 256 ):
  200. f.write( chunk )
  201. def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
  202. jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
  203. contentKeys = Kobo.__GetContentKeys( jsonResponse )
  204. downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
  205. temporaryOutputPath = outputPath + ".downloading"
  206. try:
  207. self.__DownloadToFile( downloadUrl, temporaryOutputPath )
  208. if hasDrm:
  209. drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
  210. drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
  211. os.remove( temporaryOutputPath )
  212. else:
  213. os.rename( temporaryOutputPath, outputPath )
  214. except:
  215. if os.path.isfile( temporaryOutputPath ):
  216. os.remove( temporaryOutputPath )
  217. if os.path.isfile( outputPath ):
  218. os.remove( outputPath )
  219. raise