test_client.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #!/usr/bin/env python3
  2. """
  3. Test client for SeaweedFS PostgreSQL protocol support.
  4. This script demonstrates how to connect to SeaweedFS using standard PostgreSQL
  5. libraries and execute various types of queries.
  6. Requirements:
  7. pip install psycopg2-binary
  8. Usage:
  9. python test_client.py
  10. python test_client.py --host localhost --port 5432 --user seaweedfs --database default
  11. """
  12. import sys
  13. import argparse
  14. import time
  15. import traceback
  16. try:
  17. import psycopg2
  18. import psycopg2.extras
  19. except ImportError:
  20. print("Error: psycopg2 not found. Install with: pip install psycopg2-binary")
  21. sys.exit(1)
  22. def test_connection(host, port, user, database, password=None):
  23. """Test basic connection to SeaweedFS PostgreSQL server."""
  24. print(f"🔗 Testing connection to {host}:{port}/{database} as user '{user}'")
  25. try:
  26. conn_params = {
  27. 'host': host,
  28. 'port': port,
  29. 'user': user,
  30. 'database': database,
  31. 'connect_timeout': 10
  32. }
  33. if password:
  34. conn_params['password'] = password
  35. conn = psycopg2.connect(**conn_params)
  36. print("✅ Connection successful!")
  37. # Test basic query
  38. cursor = conn.cursor()
  39. cursor.execute("SELECT 1 as test")
  40. result = cursor.fetchone()
  41. print(f"✅ Basic query successful: {result}")
  42. cursor.close()
  43. conn.close()
  44. return True
  45. except Exception as e:
  46. print(f"❌ Connection failed: {e}")
  47. return False
  48. def test_system_queries(host, port, user, database, password=None):
  49. """Test PostgreSQL system queries."""
  50. print("\n🔧 Testing PostgreSQL system queries...")
  51. try:
  52. conn_params = {
  53. 'host': host,
  54. 'port': port,
  55. 'user': user,
  56. 'database': database
  57. }
  58. if password:
  59. conn_params['password'] = password
  60. conn = psycopg2.connect(**conn_params)
  61. cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  62. system_queries = [
  63. ("Version", "SELECT version()"),
  64. ("Current Database", "SELECT current_database()"),
  65. ("Current User", "SELECT current_user"),
  66. ("Server Encoding", "SELECT current_setting('server_encoding')"),
  67. ("Client Encoding", "SELECT current_setting('client_encoding')"),
  68. ]
  69. for name, query in system_queries:
  70. try:
  71. cursor.execute(query)
  72. result = cursor.fetchone()
  73. print(f" ✅ {name}: {result[0]}")
  74. except Exception as e:
  75. print(f" ❌ {name}: {e}")
  76. cursor.close()
  77. conn.close()
  78. except Exception as e:
  79. print(f"❌ System queries failed: {e}")
  80. def test_schema_queries(host, port, user, database, password=None):
  81. """Test schema and metadata queries."""
  82. print("\n📊 Testing schema queries...")
  83. try:
  84. conn_params = {
  85. 'host': host,
  86. 'port': port,
  87. 'user': user,
  88. 'database': database
  89. }
  90. if password:
  91. conn_params['password'] = password
  92. conn = psycopg2.connect(**conn_params)
  93. cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  94. schema_queries = [
  95. ("Show Databases", "SHOW DATABASES"),
  96. ("Show Tables", "SHOW TABLES"),
  97. ("List Schemas", "SELECT 'public' as schema_name"),
  98. ]
  99. for name, query in schema_queries:
  100. try:
  101. cursor.execute(query)
  102. results = cursor.fetchall()
  103. print(f" ✅ {name}: Found {len(results)} items")
  104. for row in results[:3]: # Show first 3 results
  105. print(f" - {dict(row)}")
  106. if len(results) > 3:
  107. print(f" ... and {len(results) - 3} more")
  108. except Exception as e:
  109. print(f" ❌ {name}: {e}")
  110. cursor.close()
  111. conn.close()
  112. except Exception as e:
  113. print(f"❌ Schema queries failed: {e}")
  114. def test_data_queries(host, port, user, database, password=None):
  115. """Test data queries on actual topics."""
  116. print("\n📝 Testing data queries...")
  117. try:
  118. conn_params = {
  119. 'host': host,
  120. 'port': port,
  121. 'user': user,
  122. 'database': database
  123. }
  124. if password:
  125. conn_params['password'] = password
  126. conn = psycopg2.connect(**conn_params)
  127. cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
  128. # First, try to get available tables/topics
  129. cursor.execute("SHOW TABLES")
  130. tables = cursor.fetchall()
  131. if not tables:
  132. print(" ℹ️ No tables/topics found for data testing")
  133. cursor.close()
  134. conn.close()
  135. return
  136. # Test with first available table
  137. table_name = tables[0][0] if tables[0] else 'test_topic'
  138. print(f" 📋 Testing with table: {table_name}")
  139. test_queries = [
  140. (f"Count records in {table_name}", f"SELECT COUNT(*) FROM \"{table_name}\""),
  141. (f"Sample data from {table_name}", f"SELECT * FROM \"{table_name}\" LIMIT 3"),
  142. (f"System columns from {table_name}", f"SELECT _timestamp_ns, _key, _source FROM \"{table_name}\" LIMIT 3"),
  143. (f"Describe {table_name}", f"DESCRIBE \"{table_name}\""),
  144. ]
  145. for name, query in test_queries:
  146. try:
  147. cursor.execute(query)
  148. results = cursor.fetchall()
  149. if "COUNT" in query.upper():
  150. count = results[0][0] if results else 0
  151. print(f" ✅ {name}: {count} records")
  152. elif "DESCRIBE" in query.upper():
  153. print(f" ✅ {name}: {len(results)} columns")
  154. for row in results[:5]: # Show first 5 columns
  155. print(f" - {dict(row)}")
  156. else:
  157. print(f" ✅ {name}: {len(results)} rows")
  158. for row in results:
  159. print(f" - {dict(row)}")
  160. except Exception as e:
  161. print(f" ❌ {name}: {e}")
  162. cursor.close()
  163. conn.close()
  164. except Exception as e:
  165. print(f"❌ Data queries failed: {e}")
  166. def test_prepared_statements(host, port, user, database, password=None):
  167. """Test prepared statements."""
  168. print("\n📝 Testing prepared statements...")
  169. try:
  170. conn_params = {
  171. 'host': host,
  172. 'port': port,
  173. 'user': user,
  174. 'database': database
  175. }
  176. if password:
  177. conn_params['password'] = password
  178. conn = psycopg2.connect(**conn_params)
  179. cursor = conn.cursor()
  180. # Test parameterized query
  181. try:
  182. cursor.execute("SELECT %s as param1, %s as param2", ("hello", 42))
  183. result = cursor.fetchone()
  184. print(f" ✅ Prepared statement: {result}")
  185. except Exception as e:
  186. print(f" ❌ Prepared statement: {e}")
  187. cursor.close()
  188. conn.close()
  189. except Exception as e:
  190. print(f"❌ Prepared statements test failed: {e}")
  191. def test_transaction_support(host, port, user, database, password=None):
  192. """Test transaction support (should be no-op for read-only)."""
  193. print("\n🔄 Testing transaction support...")
  194. try:
  195. conn_params = {
  196. 'host': host,
  197. 'port': port,
  198. 'user': user,
  199. 'database': database
  200. }
  201. if password:
  202. conn_params['password'] = password
  203. conn = psycopg2.connect(**conn_params)
  204. cursor = conn.cursor()
  205. transaction_commands = [
  206. "BEGIN",
  207. "SELECT 1 as in_transaction",
  208. "COMMIT",
  209. "SELECT 1 as after_commit",
  210. ]
  211. for cmd in transaction_commands:
  212. try:
  213. cursor.execute(cmd)
  214. if "SELECT" in cmd:
  215. result = cursor.fetchone()
  216. print(f" ✅ {cmd}: {result}")
  217. else:
  218. print(f" ✅ {cmd}: OK")
  219. except Exception as e:
  220. print(f" ❌ {cmd}: {e}")
  221. cursor.close()
  222. conn.close()
  223. except Exception as e:
  224. print(f"❌ Transaction test failed: {e}")
  225. def test_performance(host, port, user, database, password=None, iterations=10):
  226. """Test query performance."""
  227. print(f"\n⚡ Testing performance ({iterations} iterations)...")
  228. try:
  229. conn_params = {
  230. 'host': host,
  231. 'port': port,
  232. 'user': user,
  233. 'database': database
  234. }
  235. if password:
  236. conn_params['password'] = password
  237. times = []
  238. for i in range(iterations):
  239. start_time = time.time()
  240. conn = psycopg2.connect(**conn_params)
  241. cursor = conn.cursor()
  242. cursor.execute("SELECT 1")
  243. result = cursor.fetchone()
  244. cursor.close()
  245. conn.close()
  246. elapsed = time.time() - start_time
  247. times.append(elapsed)
  248. if i < 3: # Show first 3 iterations
  249. print(f" Iteration {i+1}: {elapsed:.3f}s")
  250. avg_time = sum(times) / len(times)
  251. min_time = min(times)
  252. max_time = max(times)
  253. print(f" ✅ Performance results:")
  254. print(f" - Average: {avg_time:.3f}s")
  255. print(f" - Min: {min_time:.3f}s")
  256. print(f" - Max: {max_time:.3f}s")
  257. except Exception as e:
  258. print(f"❌ Performance test failed: {e}")
  259. def main():
  260. parser = argparse.ArgumentParser(description="Test SeaweedFS PostgreSQL Protocol")
  261. parser.add_argument("--host", default="localhost", help="PostgreSQL server host")
  262. parser.add_argument("--port", type=int, default=5432, help="PostgreSQL server port")
  263. parser.add_argument("--user", default="seaweedfs", help="PostgreSQL username")
  264. parser.add_argument("--password", help="PostgreSQL password")
  265. parser.add_argument("--database", default="default", help="PostgreSQL database")
  266. parser.add_argument("--skip-performance", action="store_true", help="Skip performance tests")
  267. args = parser.parse_args()
  268. print("🧪 SeaweedFS PostgreSQL Protocol Test Client")
  269. print("=" * 50)
  270. # Test basic connection first
  271. if not test_connection(args.host, args.port, args.user, args.database, args.password):
  272. print("\n❌ Basic connection failed. Cannot continue with other tests.")
  273. sys.exit(1)
  274. # Run all tests
  275. try:
  276. test_system_queries(args.host, args.port, args.user, args.database, args.password)
  277. test_schema_queries(args.host, args.port, args.user, args.database, args.password)
  278. test_data_queries(args.host, args.port, args.user, args.database, args.password)
  279. test_prepared_statements(args.host, args.port, args.user, args.database, args.password)
  280. test_transaction_support(args.host, args.port, args.user, args.database, args.password)
  281. if not args.skip_performance:
  282. test_performance(args.host, args.port, args.user, args.database, args.password)
  283. except KeyboardInterrupt:
  284. print("\n\n⚠️ Tests interrupted by user")
  285. sys.exit(0)
  286. except Exception as e:
  287. print(f"\n❌ Unexpected error during testing: {e}")
  288. traceback.print_exc()
  289. sys.exit(1)
  290. print("\n🎉 All tests completed!")
  291. print("\nTo use SeaweedFS with PostgreSQL tools:")
  292. print(f" psql -h {args.host} -p {args.port} -U {args.user} -d {args.database}")
  293. print(f" Connection string: postgresql://{args.user}@{args.host}:{args.port}/{args.database}")
  294. if __name__ == "__main__":
  295. main()