| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- from Globals import Globals
- from KoboDrmRemover import KoboDrmRemover
- import requests
- from typing import Dict, Tuple
- import base64
- import html
- import os
- import re
- import urllib
- import uuid
- class KoboException( Exception ):
- pass
- # The hook's workflow is based on this:
- # https://github.com/requests/toolbelt/blob/master/requests_toolbelt/auth/http_proxy_digest.py
- def ReauthenticationHook( r, *args, **kwargs ):
- if r.status_code != requests.codes.unauthorized: # 401
- return
- print( "Refreshing expired authentication token" )
- # Consume content and release the original connection to allow our new request to reuse the same one.
- r.content
- r.close()
- prep = r.request.copy()
- # Refresh the authentication token and use it.
- Globals.Kobo.RefreshAuthentication()
- headers = Kobo.GetHeaderWithAccessToken()
- prep.headers[ "Authorization" ] = headers[ "Authorization" ]
- # Don't retry to reauthenticate this request again.
- prep.deregister_hook( "response", ReauthenticationHook )
- # Resend the failed request.
- _r = r.connection.send( prep, **kwargs )
- _r.history.append( r )
- _r.request = prep
- return _r
- class Kobo:
- Affiliate = "Kobo"
- ApplicationVersion = "7.1.21543"
- DefaultPlatformId = "00000000-0000-0000-0000-000000004000"
- DisplayProfile = "Android"
- def __init__( self ):
- self.InitializationSettings = {}
- self.Session = requests.session()
- # This could be added to the session but then we would need to add { "Authorization": None } headers to all other
- # functions that doesn't need authorization.
- @staticmethod
- def GetHeaderWithAccessToken() -> dict:
- authorization = "Bearer " + Globals.Settings.AccessToken
- headers = { "Authorization": authorization }
- return headers
- # This could be added to the session too. See the comment at GetHeaderWithAccessToken.
- @staticmethod
- def __GetReauthenticationHook() -> dict:
- return { "response": ReauthenticationHook }
- # The initial device authentication request for a non-logged in user doesn't require a user key, and the returned
- # user key can't be used for anything.
- def AuthenticateDevice( self, userKey: str = "" ) -> None:
- if len( Globals.Settings.DeviceId ) == 0:
- Globals.Settings.DeviceId = str( uuid.uuid4() )
- Globals.Settings.AccessToken = ""
- Globals.Settings.RefreshToken = ""
- postData = {
- "AffiliateName": Kobo.Affiliate,
- "AppVersion": Kobo.ApplicationVersion,
- "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
- "DeviceId": Globals.Settings.DeviceId,
- "PlatformId": Kobo.DefaultPlatformId
- }
- if len( userKey ) > 0:
- postData[ "UserKey" ] = userKey
- response = self.Session.post( "https://storeapi.kobo.com/v1/auth/device", json = postData )
- response.raise_for_status()
- jsonResponse = response.json()
- if jsonResponse[ "TokenType" ] != "Bearer":
- raise KoboException( "Device authentication returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
- Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
- Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
- if not Globals.Settings.AreAuthenticationSettingsSet():
- raise KoboException( "Authentication settings are not set after device authentication." )
- if len( userKey ) > 0:
- Globals.Settings.UserKey = jsonResponse[ "UserKey" ]
- Globals.Settings.Save()
- def RefreshAuthentication( self ) -> None:
- headers = Kobo.GetHeaderWithAccessToken()
- postData = {
- "AppVersion": Kobo.ApplicationVersion,
- "ClientKey": base64.b64encode( Kobo.DefaultPlatformId.encode() ).decode(),
- "PlatformId": Kobo.DefaultPlatformId,
- "RefreshToken": Globals.Settings.RefreshToken
- }
- # The reauthentication hook is intentionally not set.
- response = self.Session.post( "https://storeapi.kobo.com/v1/auth/refresh", json = postData, headers = headers )
- response.raise_for_status()
- jsonResponse = response.json()
- if jsonResponse[ "TokenType" ] != "Bearer":
- raise KoboException( "Authentication refresh returned with an unsupported token type: '%s'" % jsonResponse[ "TokenType" ] )
- Globals.Settings.AccessToken = jsonResponse[ "AccessToken" ]
- Globals.Settings.RefreshToken = jsonResponse[ "RefreshToken" ]
- if not Globals.Settings.AreAuthenticationSettingsSet():
- raise KoboException( "Authentication settings are not set after authentication refresh." )
- Globals.Settings.Save()
- def LoadInitializationSettings( self ) -> None:
- headers = Kobo.GetHeaderWithAccessToken()
- hooks = Kobo.__GetReauthenticationHook()
- response = self.Session.get( "https://storeapi.kobo.com/v1/initialization", headers = headers, hooks = hooks )
- response.raise_for_status()
- jsonResponse = response.json()
- self.InitializationSettings = jsonResponse[ "Resources" ]
- def __GetAuthenticationUrlFromLoginPage( self ) -> str:
- signInPageUrl = self.InitializationSettings[ "sign_in_page" ]
- params = {
- "wsa": Kobo.Affiliate,
- "wscf": "kepub",
- "pwsav": Kobo.ApplicationVersion,
- "pwspid": Kobo.DefaultPlatformId,
- "pwsdid": Globals.Settings.DeviceId
- }
- response = self.Session.get( signInPageUrl, params = params )
- response.raise_for_status()
- htmlResponse = response.text
- match = re.search( r"""<form action="(/[^"]+/Authenticate\?[^"]+)" method="post">""", htmlResponse )
- if match is None:
- raise KoboException( "Can't find login form. The page format might have changed." )
- authenticationUrl = match.group( 1 )
- authenticationUrl = html.unescape( authenticationUrl )
- authenticationUrl = urllib.parse.urljoin( signInPageUrl, authenticationUrl )
- return authenticationUrl
- def Login( self, email: str, password: str ) -> None:
- authenticationUrl = self.__GetAuthenticationUrlFromLoginPage()
- postData = {
- "EditModel.Email": email,
- "EditModel.Password": password,
- "EditModel.AuthenticationAction": "Authenticate",
- "AffiliateObject.Name": Kobo.Affiliate,
- "IsFTE": "False",
- "RequireSharedToken": "False"
- }
- response = self.Session.post( authenticationUrl, data = postData )
- response.raise_for_status()
- htmlResponse = response.text
- match = re.search( r"'(kobo://UserAuthenticated\?[^']+)';", htmlResponse )
- if match is None:
- raise KoboException( "Authenticated user URL can't be found. The page format might have changed." )
- url = match.group( 1 )
- parsed = urllib.parse.urlparse( url )
- parsedQueries = urllib.parse.parse_qs( parsed.query )
- Globals.Settings.UserId = parsedQueries[ "userId" ][ 0 ] # We don't call Settings.Save here, AuthenticateDevice will do that if it succeeds.
- userKey = parsedQueries[ "userKey" ][ 0 ]
- self.AuthenticateDevice( userKey )
- def GetBookInfo( self, productId: str ) -> dict:
- url = self.InitializationSettings[ "book" ].replace( "{ProductId}", productId )
- headers = Kobo.GetHeaderWithAccessToken()
- hooks = Kobo.__GetReauthenticationHook()
- response = self.Session.get( url, headers = headers, hooks = hooks )
- response.raise_for_status()
- jsonResponse = response.json()
- return jsonResponse
- def GetMyBookList( self ) -> dict:
- # There is also "library_items" but that is paged and gives back less info (even with the embed=ProductMetadata
- # query parameter set).
- url = self.InitializationSettings[ "library_sync" ]
- headers = Kobo.GetHeaderWithAccessToken()
- hooks = Kobo.__GetReauthenticationHook()
- response = Globals.Kobo.Session.get( url, headers = headers, hooks = hooks )
- response.raise_for_status()
- bookList = response.json()
- return bookList
- def __GetContentAccessBook( self, productId: str, displayProfile: str ) -> dict:
- url = self.InitializationSettings[ "content_access_book" ].replace( "{ProductId}", productId )
- params = { "DisplayProfile": displayProfile }
- headers = Kobo.GetHeaderWithAccessToken()
- hooks = Kobo.__GetReauthenticationHook()
- response = self.Session.get( url, params = params, headers = headers, hooks = hooks )
- response.raise_for_status()
- jsonResponse = response.json()
- return jsonResponse
- @staticmethod
- def __GetContentKeys( contentAccessBookResponse: dict ) -> Dict[ str, str ]:
- jsonContentKeys = contentAccessBookResponse.get( "ContentKeys" )
- if jsonContentKeys is None:
- return {}
- contentKeys = {}
- for contentKey in jsonContentKeys:
- contentKeys[ contentKey[ "Name" ] ] = contentKey[ "Value" ]
- return contentKeys
- @staticmethod
- def __GetDownloadInfo( productId: str, contentAccessBookResponse: dict ) -> Tuple[ str, bool ]:
- jsonContentUrls = contentAccessBookResponse.get( "ContentUrls" )
- if jsonContentUrls is None:
- raise KoboException( "Content URL can't be found for product '%s'." % productId )
- for jsonContentUrl in jsonContentUrls:
- if ( jsonContentUrl[ "DRMType" ] == "KDRM" or jsonContentUrl[ "DRMType" ] == "SignedNoDrm" ) and \
- ( jsonContentUrl[ "UrlFormat" ] == "EPUB3" or jsonContentUrl[ "UrlFormat" ] == "KEPUB" ):
- hasDrm = jsonContentUrl[ "DRMType" ] == "KDRM"
- return jsonContentUrl[ "DownloadUrl" ], hasDrm
- raise KoboException( "Download URL for supported formats can't be found for product '%s'." % productId )
- def __DownloadToFile( self, url, outputPath: str ) -> None:
- response = self.Session.get( url, stream = True )
- response.raise_for_status()
- with open( outputPath, "wb" ) as f:
- for chunk in response.iter_content( chunk_size = 1024 * 256 ):
- f.write( chunk )
- def Download( self, productId: str, displayProfile: str, outputPath: str ) -> None:
- jsonResponse = self.__GetContentAccessBook( productId, displayProfile )
- contentKeys = Kobo.__GetContentKeys( jsonResponse )
- downloadUrl, hasDrm = Kobo.__GetDownloadInfo( productId, jsonResponse )
- temporaryOutputPath = outputPath + ".downloading"
- try:
- self.__DownloadToFile( downloadUrl, temporaryOutputPath )
- if hasDrm:
- drmRemover = KoboDrmRemover( Globals.Settings.DeviceId, Globals.Settings.UserId )
- drmRemover.RemoveDrm( temporaryOutputPath, outputPath, contentKeys )
- os.remove( temporaryOutputPath )
- else:
- os.rename( temporaryOutputPath, outputPath )
- except:
- if os.path.isfile( temporaryOutputPath ):
- os.remove( temporaryOutputPath )
- if os.path.isfile( outputPath ):
- os.remove( outputPath )
- raise
|