test_embedding.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import base64
  2. import struct
  3. import pytest
  4. from openai import OpenAI
  5. from utils import *
# Module-level server handle shared by every test in this file; the
# create_server fixture rebinds it to ServerPreset.bert_bge_small().
server = ServerPreset.bert_bge_small()
# Tolerance used by the tests when comparing floating-point embedding values.
EPSILON = 1e-3
@pytest.fixture(scope="module", autouse=True)
def create_server():
    """Rebind the module-level ``server`` to ``ServerPreset.bert_bge_small()``.

    The fixture is autouse with module scope, so it runs without being
    requested explicitly by the tests.
    """
    # NOTE(review): with scope="module" the same object is presumably reused by
    # all tests, so attributes such as server.pooling set by one test may carry
    # over into later tests that do not set them -- confirm this is intended.
    global server
    server = ServerPreset.bert_bge_small()
  12. def test_embedding_single():
  13. global server
  14. server.pooling = 'last'
  15. server.start()
  16. res = server.make_request("POST", "/v1/embeddings", data={
  17. "input": "I believe the meaning of life is",
  18. })
  19. assert res.status_code == 200
  20. assert len(res.body['data']) == 1
  21. assert 'embedding' in res.body['data'][0]
  22. assert len(res.body['data'][0]['embedding']) > 1
  23. # make sure embedding vector is normalized
  24. assert abs(sum([x ** 2 for x in res.body['data'][0]['embedding']]) - 1) < EPSILON
  25. def test_embedding_multiple():
  26. global server
  27. server.pooling = 'last'
  28. server.start()
  29. res = server.make_request("POST", "/v1/embeddings", data={
  30. "input": [
  31. "I believe the meaning of life is",
  32. "Write a joke about AI from a very long prompt which will not be truncated",
  33. "This is a test",
  34. "This is another test",
  35. ],
  36. })
  37. assert res.status_code == 200
  38. assert len(res.body['data']) == 4
  39. for d in res.body['data']:
  40. assert 'embedding' in d
  41. assert len(d['embedding']) > 1
  42. @pytest.mark.parametrize(
  43. "input,is_multi_prompt",
  44. [
  45. # do not crash on empty input
  46. ("", False),
  47. # single prompt
  48. ("string", False),
  49. ([12, 34, 56], False),
  50. ([12, 34, "string", 56, 78], False),
  51. # multiple prompts
  52. (["string1", "string2"], True),
  53. (["string1", [12, 34, 56]], True),
  54. ([[12, 34, 56], [12, 34, 56]], True),
  55. ([[12, 34, 56], [12, "string", 34, 56]], True),
  56. ]
  57. )
  58. def test_embedding_mixed_input(input, is_multi_prompt: bool):
  59. global server
  60. server.start()
  61. res = server.make_request("POST", "/v1/embeddings", data={"input": input})
  62. assert res.status_code == 200
  63. data = res.body['data']
  64. if is_multi_prompt:
  65. assert len(data) == len(input)
  66. for d in data:
  67. assert 'embedding' in d
  68. assert len(d['embedding']) > 1
  69. else:
  70. assert 'embedding' in data[0]
  71. assert len(data[0]['embedding']) > 1
  72. def test_embedding_pooling_none():
  73. global server
  74. server.pooling = 'none'
  75. server.start()
  76. res = server.make_request("POST", "/embeddings", data={
  77. "input": "hello hello hello",
  78. })
  79. assert res.status_code == 200
  80. assert 'embedding' in res.body[0]
  81. assert len(res.body[0]['embedding']) == 5 # 3 text tokens + 2 special
  82. # make sure embedding vector is not normalized
  83. for x in res.body[0]['embedding']:
  84. assert abs(sum([x ** 2 for x in x]) - 1) > EPSILON
  85. def test_embedding_pooling_none_oai():
  86. global server
  87. server.pooling = 'none'
  88. server.start()
  89. res = server.make_request("POST", "/v1/embeddings", data={
  90. "input": "hello hello hello",
  91. })
  92. # /v1/embeddings does not support pooling type 'none'
  93. assert res.status_code == 400
  94. assert "error" in res.body
  95. def test_embedding_openai_library_single():
  96. global server
  97. server.pooling = 'last'
  98. server.start()
  99. client = OpenAI(api_key="dummy", base_url=f"http://{server.server_host}:{server.server_port}/v1")
  100. res = client.embeddings.create(model="text-embedding-3-small", input="I believe the meaning of life is")
  101. assert len(res.data) == 1
  102. assert len(res.data[0].embedding) > 1
  103. def test_embedding_openai_library_multiple():
  104. global server
  105. server.pooling = 'last'
  106. server.start()
  107. client = OpenAI(api_key="dummy", base_url=f"http://{server.server_host}:{server.server_port}/v1")
  108. res = client.embeddings.create(model="text-embedding-3-small", input=[
  109. "I believe the meaning of life is",
  110. "Write a joke about AI from a very long prompt which will not be truncated",
  111. "This is a test",
  112. "This is another test",
  113. ])
  114. assert len(res.data) == 4
  115. for d in res.data:
  116. assert len(d.embedding) > 1
  117. def test_embedding_error_prompt_too_long():
  118. global server
  119. server.pooling = 'last'
  120. server.start()
  121. res = server.make_request("POST", "/v1/embeddings", data={
  122. "input": "This is a test " * 512,
  123. })
  124. assert res.status_code != 200
  125. assert "too large" in res.body["error"]["message"]
  126. def test_same_prompt_give_same_result():
  127. server.pooling = 'last'
  128. server.start()
  129. res = server.make_request("POST", "/v1/embeddings", data={
  130. "input": [
  131. "I believe the meaning of life is",
  132. "I believe the meaning of life is",
  133. "I believe the meaning of life is",
  134. "I believe the meaning of life is",
  135. "I believe the meaning of life is",
  136. ],
  137. })
  138. assert res.status_code == 200
  139. assert len(res.body['data']) == 5
  140. for i in range(1, len(res.body['data'])):
  141. v0 = res.body['data'][0]['embedding']
  142. vi = res.body['data'][i]['embedding']
  143. for x, y in zip(v0, vi):
  144. assert abs(x - y) < EPSILON
  145. @pytest.mark.parametrize(
  146. "content,n_tokens",
  147. [
  148. ("I believe the meaning of life is", 9),
  149. ("This is a test", 6),
  150. ]
  151. )
  152. def test_embedding_usage_single(content, n_tokens):
  153. global server
  154. server.start()
  155. res = server.make_request("POST", "/v1/embeddings", data={"input": content})
  156. assert res.status_code == 200
  157. assert res.body['usage']['prompt_tokens'] == res.body['usage']['total_tokens']
  158. assert res.body['usage']['prompt_tokens'] == n_tokens
  159. def test_embedding_usage_multiple():
  160. global server
  161. server.start()
  162. res = server.make_request("POST", "/v1/embeddings", data={
  163. "input": [
  164. "I believe the meaning of life is",
  165. "I believe the meaning of life is",
  166. ],
  167. })
  168. assert res.status_code == 200
  169. assert res.body['usage']['prompt_tokens'] == res.body['usage']['total_tokens']
  170. assert res.body['usage']['prompt_tokens'] == 2 * 9
  171. def test_embedding_openai_library_base64():
  172. server.start()
  173. test_input = "Test base64 embedding output"
  174. # get embedding in default format
  175. res = server.make_request("POST", "/v1/embeddings", data={
  176. "input": test_input
  177. })
  178. assert res.status_code == 200
  179. vec0 = res.body["data"][0]["embedding"]
  180. # get embedding in base64 format
  181. res = server.make_request("POST", "/v1/embeddings", data={
  182. "input": test_input,
  183. "encoding_format": "base64"
  184. })
  185. assert res.status_code == 200
  186. assert "data" in res.body
  187. assert len(res.body["data"]) == 1
  188. embedding_data = res.body["data"][0]
  189. assert "embedding" in embedding_data
  190. assert isinstance(embedding_data["embedding"], str)
  191. # Verify embedding is valid base64
  192. decoded = base64.b64decode(embedding_data["embedding"])
  193. # Verify decoded data can be converted back to float array
  194. float_count = len(decoded) // 4 # 4 bytes per float
  195. floats = struct.unpack(f'{float_count}f', decoded)
  196. assert len(floats) > 0
  197. assert all(isinstance(x, float) for x in floats)
  198. assert len(floats) == len(vec0)
  199. # make sure the decoded data is the same as the original
  200. for x, y in zip(floats, vec0):
  201. assert abs(x - y) < EPSILON