|
|
@@ -0,0 +1,279 @@
|
|
|
+from Globals import Globals
|
|
|
+from KoboDrmRemover import KoboDrmRemover
|
|
|
+
|
|
|
+import requests
|
|
|
+
|
|
|
+from typing import Dict, Tuple
|
|
|
+import base64
|
|
|
+import html
|
|
|
+import os
|
|
|
+import re
|
|
|
+import urllib
|
|
|
+import uuid
|
|
|
+
|
|
|
+class KoboException( Exception ):
|
|
|
+ pass
|
|
|
+
|
|
|
+# The hook's workflow is based on this:
|
|
|
+# https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
|
|
|
+def ReauthenticationHook( r, *args, **kwargs ):
|
|
|
+ if r.status_code != requests.codes.unauthorized: # 401
|
|
|
+ return
|
|
|
+
|
|
|
+ print( "Refreshing expired authentication token" )
|
|
|
+
|
|
|
+ # Consume content and release the original connection to allow our new request to reuse the same one.
|
|
|
+ r.content
|
|
|
+ r.close()
|
|
|
+
|
|
|
+ prep = r.request.copy()
|
|
|
+
|
|
|
+ # Refresh the authentication token and use it.
|
|
|
+ Globals.Kobo.RefreshAuthentication()
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+ prep.headers[ "Authorization" ] = headers[ "Authorization" ]
|
|
|
+
|
|
|
+ # Don't retry to reauthenticate this request again.
|
|
|
+ prep.deregister_hook( "response", ReauthenticationHook )
|
|
|
+
|
|
|
+ # Resend the failed request.
|
|
|
+ _r = r.connection.send( prep, **kwargs )
|
|
|
+ _r.history.append( r )
|
|
|
+ _r.request = prep
|
|
|
+
|
|
|
+ return _r
|
|
|
+
|
|
|
+class Kobo:
|
|
|
+ Affiliate = "Kobo"
|
|
|
+ ApplicationVersion = "7.1.21543"
|
|
|
+ DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
|
|
|
+ DisplayProfile = "Android"
|
|
|
+
|
|
|
+ def __init__( self ):
|
|
|
+ self.InitializationSettings = {}
|
|
|
+ self.Session = requests.session()
|
|
|
+
|
|
|
+ # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
|
|
|
+ # functions that doesn't need authorization.
|
|
|
+ @staticmethod
|
|
|
+ def GetHeaderWithAccessToken() -> dict:
|
|
|
+ authorization = "Bearer " + Globals.Settings.AccessToken
|
|
|
+ headers = { "Authorization": authorization }
|
|
|
+ return headers
|
|
|
+
|
|
|
+ # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
|
|
|
+ @staticmethod
|
|
|
+ def __GetReauthenticationHook() -> dict:
|
|
|
+ return { "response": ReauthenticationHook }
|
|
|
+
|
|
|
+ # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
|
|
|
+ # user key can't be used for anything.
|
|
|
+ def AuthenticateDevice( self, userKey: str = "" ) -> None:
|
|
|
+ if len( Globals.Settings.DeviceId ) == 0:
|
|
|
+ Globals.Settings.DeviceId = str( uuid.uuid4() )
|
|
|
+ Globals.Settings.AccessToken = ""
|
|
|
+ Globals.Settings.RefreshToken = ""
|
|
|
+
|
|
|
+ postData = {
|
|
|
+ "AffiliateName": Kobo.Affiliate,
|
|
|
+ "AppVersion": Kobo.ApplicationVersion,
|
|
|
+ "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
|
|
|
+ "DeviceId": Globals.Settings.DeviceId,
|
|
|
+ "PlatformId": Kobo.DefaultPlatformId
|
|
|
+ }
|
|
|
+
|
|
|
+ if len( userKey ) > 0:
|
|
|
+ postData[ "UserKey" ] = userKey
|
|
|
+
|
|
|
+ response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
|
|
|
+ response.raise_for_status()
|
|
|
+ jsonResponse = response.json()
|
|
|
+
|
|
|
+ if jsonResponse[ "TokenType" ] != "Bearer":
|
|
|
+ raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
|
|
|
+
|
|
|
+ Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
|
|
|
+ Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
|
|
|
+ if not Globals.Settings.AreAuthenticationSettingsSet():
|
|
|
+ raise KoboException( "Authentication settings are not set after device authentication." )
|
|
|
+
|
|
|
+ if len( userKey ) > 0:
|
|
|
+ Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
|
|
|
+
|
|
|
+ Globals.Settings.Save()
|
|
|
+
|
|
|
+ def RefreshAuthentication( self ) -> None:
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+
|
|
|
+ postData = {
|
|
|
+ "AppVersion": Kobo.ApplicationVersion,
|
|
|
+ "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
|
|
|
+ "PlatformId": Kobo.DefaultPlatformId,
|
|
|
+ "RefreshToken": Globals.Settings.RefreshToken
|
|
|
+ }
|
|
|
+
|
|
|
+ # The reauthentication hook is intentionally not set.
|
|
|
+ response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
|
|
|
+ response.raise_for_status()
|
|
|
+ jsonResponse = response.json()
|
|
|
+
|
|
|
+ if jsonResponse[ "TokenType" ] != "Bearer":
|
|
|
+ raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
|
|
|
+
|
|
|
+ Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
|
|
|
+ Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
|
|
|
+ if not Globals.Settings.AreAuthenticationSettingsSet():
|
|
|
+ raise KoboException( "Authentication settings are not set after authentication refresh." )
|
|
|
+
|
|
|
+ Globals.Settings.Save()
|
|
|
+
|
|
|
+ def LoadInitializationSettings( self ) -> None:
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+ hooks = Kobo.__GetReauthenticationHook()
|
|
|
+ response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
|
|
|
+ response.raise_for_status()
|
|
|
+ jsonResponse = response.json()
|
|
|
+ self.InitializationSettings = jsonResponse[ "Resources" ]
|
|
|
+
|
|
|
+ def __GetAuthenticationUrlFromLoginPage( self ) -> str:
|
|
|
+ signInPageUrl = self.InitializationSettings[ "sign_in_page" ]
|
|
|
+
|
|
|
+ params = {
|
|
|
+ "wsa": Kobo.Affiliate,
|
|
|
+ "wscf": "kepub",
|
|
|
+ "pwsav": Kobo.ApplicationVersion,
|
|
|
+ "pwspid": Kobo.DefaultPlatformId,
|
|
|
+ "pwsdid": Globals.Settings.DeviceId
|
|
|
+ }
|
|
|
+
|
|
|
+ response = self.Session.get( signInPageUrl, params = params )
|
|
|
+ response.raise_for_status()
|
|
|
+ htmlResponse = response.text
|
|
|
+
|
|
|
+ match = re.search( r"""<form action="(/[^"]+/Authenticate\?[^"]+)" method="post">""", htmlResponse )
|
|
|
+ if match is None:
|
|
|
+ raise KoboException( "Can't find login form. The page format might have changed." )
|
|
|
+
|
|
|
+ authenticationUrl = match.group( 1 )
|
|
|
+ authenticationUrl = html.unescape( authenticationUrl )
|
|
|
+ authenticationUrl = urllib.parse.urljoin( signInPageUrl, authenticationUrl )
|
|
|
+ return authenticationUrl
|
|
|
+
|
|
|
+ def Login( self, email: str, password: str ) -> None:
|
|
|
+ authenticationUrl = self.__GetAuthenticationUrlFromLoginPage()
|
|
|
+
|
|
|
+ postData = {
|
|
|
+ "EditModel.Email": email,
|
|
|
+ "EditModel.Password": password,
|
|
|
+ "EditModel.AuthenticationAction": "Authenticate",
|
|
|
+ "AffiliateObject.Name": Kobo.Affiliate,
|
|
|
+ "IsFTE": "False",
|
|
|
+ "RequireSharedToken": "False"
|
|
|
+ }
|
|
|
+
|
|
|
+ response = self.Session.post( authenticationUrl, data = postData )
|
|
|
+ response.raise_for_status()
|
|
|
+ htmlResponse = response.text
|
|
|
+
|
|
|
+ match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
|
|
|
+ if match is None:
|
|
|
+ raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )
|
|
|
+
|
|
|
+ url = match.group( 1 )
|
|
|
+ parsed = urllib.parse.urlparse( url )
|
|
|
+ parsedQueries = urllib.parse.parse_qs( parsed.query )
|
|
|
+ Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
|
|
|
+ userKey = parsedQueries[ "userKey" ][ 0 ]
|
|
|
+
|
|
|
+ self.AuthenticateDevice( userKey )
|
|
|
+
|
|
|
+ def GetBookInfo( self, productId: str ) -> dict:
|
|
|
+ url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+ hooks = Kobo.__GetReauthenticationHook()
|
|
|
+
|
|
|
+ response = self.Session.get( url, headers = headers, hooks = hooks )
|
|
|
+ response.raise_for_status()
|
|
|
+ jsonResponse = response.json()
|
|
|
+ return jsonResponse
|
|
|
+
|
|
|
+ def GetMyBookList( self ) -> dict:
|
|
|
+ # There is also "library_items" but that is paged and gives back less info (even with the embed=ProductMetadata
|
|
|
+ # query parameter set).
|
|
|
+
|
|
|
+ url = self.InitializationSettings[ "library_sync" ]
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+ hooks = Kobo.__GetReauthenticationHook()
|
|
|
+
|
|
|
+ response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
|
|
|
+ response.raise_for_status()
|
|
|
+ bookList = response.json()
|
|
|
+
|
|
|
+ return bookList
|
|
|
+
|
|
|
+ def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
|
|
|
+ url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
|
|
|
+ params = { "DisplayProfile": displayProfile }
|
|
|
+ headers = Kobo.GetHeaderWithAccessToken()
|
|
|
+ hooks = Kobo.__GetReauthenticationHook()
|
|
|
+
|
|
|
+ response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
|
|
|
+ response.raise_for_status()
|
|
|
+ jsonResponse = response.json()
|
|
|
+ return jsonResponse
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
|
|
|
+ jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
|
|
|
+ if jsonContentKeys is None:
|
|
|
+ return {}
|
|
|
+
|
|
|
+ contentKeys = {}
|
|
|
+ for contentKey in jsonContentKeys:
|
|
|
+ contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
|
|
|
+ return contentKeys
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
|
|
|
+ jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
|
|
|
+ if jsonContentUrls is None:
|
|
|
+ raise KoboException( "Content URL can't be found for product '%s'." % productId )
|
|
|
+
|
|
|
+ for jsonContentUrl in jsonContentUrls:
|
|
|
+ if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
|
|
|
+ ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
|
|
|
+ hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
|
|
|
+ return jsonContentUrl[ "DownloadUrl" ], hasDrm
|
|
|
+
|
|
|
+ raise KoboException( "Download URL for supported formats can't be found for product '%s'." % productId )
|
|
|
+
|
|
|
+ def __DownloadToFile( self, url, outputPath: str ) -> None:
|
|
|
+ response = self.Session.get( url, stream = True )
|
|
|
+ response.raise_for_status()
|
|
|
+ with open( outputPath, "wb" ) as f:
|
|
|
+ for chunk in response.iter_content( chunk_size = 1024 * 256 ):
|
|
|
+ f.write( chunk )
|
|
|
+
|
|
|
+ def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
|
|
|
+ jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
|
|
|
+ contentKeys = Kobo.__GetContentKeys( jsonResponse )
|
|
|
+ downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
|
|
|
+
|
|
|
+ temporaryOutputPath = outputPath + ".downloading"
|
|
|
+
|
|
|
+ try:
|
|
|
+ self.__DownloadToFile( downloadUrl, temporaryOutputPath )
|
|
|
+
|
|
|
+ if hasDrm:
|
|
|
+ drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
|
|
|
+ drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
|
|
|
+ os.remove( temporaryOutputPath )
|
|
|
+ else:
|
|
|
+ os.rename( temporaryOutputPath, outputPath )
|
|
|
+ except:
|
|
|
+ if os.path.isfile( temporaryOutputPath ):
|
|
|
+ os.remove( temporaryOutputPath )
|
|
|
+ if os.path.isfile( outputPath ):
|
|
|
+ os.remove( outputPath )
|
|
|
+
|
|
|
+ raise
|