# steps.py

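# Behave step definitions for the llama.cpp server test scenarios: they
# configure a test context, start the server binary as a subprocess and
# exercise its HTTP API (completions, OAI-compatible chat completions,
# embeddings, tokenization, health, slots and Prometheus metrics).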
import asyncio
import collections
import json
import os
import re
import socket
import subprocess
import time
from contextlib import closing
from re import RegexFlag

import aiohttp
import openai
from behave import step
from behave.api.async_step import async_run_until_complete
from prometheus_client import parser

@step(u"a server listening on {server_fqdn}:{server_port}")
def step_server_config(context, server_fqdn, server_port):
    context.server_fqdn = server_fqdn
    context.server_port = int(server_port)
    if 'PORT' in os.environ:
        context.server_port = int(os.environ['PORT'])
        print(f"$PORT set, overriding server port with {context.server_port}")
    context.base_url = f'http://{context.server_fqdn}:{context.server_port}'
    context.debug = 'DEBUG' in os.environ and os.environ['DEBUG'] == 'ON'
    context.model_alias = None
    context.n_ctx = None
    context.n_predict = None
    context.n_server_predict = None
    context.n_slots = None
    context.server_api_key = None
    context.server_continuous_batching = False
    context.server_embeddings = False
    context.server_metrics = False
    context.server_process = None
    context.server_seed = None
    context.user_api_key = None
    context.tasks_result = []
    context.concurrent_tasks = []
    context.prompts = []
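
# The scenario configuration steps below only record settings on the behave
# context; nothing is sent to the server until the "the server is starting" step.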

@step(u'a model file {model_file}')
def step_model_file(context, model_file):
    context.model_file = model_file

@step(u'a model alias {model_alias}')
def step_model_alias(context, model_alias):
    context.model_alias = model_alias

@step(u'{seed} as server seed')
def step_seed(context, seed):
    context.server_seed = int(seed)

@step(u'{n_ctx} KV cache size')
def step_n_ctx(context, n_ctx):
    context.n_ctx = int(n_ctx)

@step(u'{n_slots} slots')
def step_n_slots(context, n_slots):
    context.n_slots = int(n_slots)

@step(u'{n_predict} server max tokens to predict')
def step_server_n_predict(context, n_predict):
    context.n_server_predict = int(n_predict)

@step(u'continuous batching')
def step_server_continuous_batching(context):
    context.server_continuous_batching = True

@step(u'embeddings extraction')
def step_server_embeddings(context):
    context.server_embeddings = True

@step(u'prometheus compatible metrics exposed')
def step_server_metrics(context):
    context.server_metrics = True

@step(u"the server is starting")
def step_start_server(context):
    start_server_background(context)
    attempts = 0
    while True:
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            result = sock.connect_ex((context.server_fqdn, context.server_port))
            if result == 0:
                print("\x1b[33;46mserver started!\x1b[0m")
                return
            attempts += 1
            if attempts > 20:
                assert False, "server not started"
            print(f"waiting for server to start, connect error code = {result}...")
            time.sleep(0.1)

@step(u"the server is {expecting_status}")
@async_run_until_complete
async def step_wait_for_the_server_to_be_started(context, expecting_status):
    match expecting_status:
        case 'healthy':
            await wait_for_health_status(context, context.base_url, 200, 'ok')
        case 'ready' | 'idle':
            await wait_for_health_status(context, context.base_url, 200, 'ok',
                                         params={'fail_on_no_slot': 0, 'include_slots': 0},
                                         slots_idle=context.n_slots,
                                         slots_processing=0,
                                         expected_slots=[{'id': slot_id, 'state': 0}
                                                         for slot_id in range(context.n_slots)])
        case 'busy':
            await wait_for_health_status(context, context.base_url, 503,
                                         'no slot available',
                                         params={'fail_on_no_slot': 0, 'include_slots': 0},
                                         slots_idle=0,
                                         slots_processing=context.n_slots,
                                         expected_slots=[{'id': slot_id, 'state': 1}
                                                         for slot_id in range(context.n_slots)])
        case _:
            assert False, "unknown status"

@step(u'all slots are {expected_slot_status_string}')
@async_run_until_complete
async def step_all_slots_status(context, expected_slot_status_string):
    match expected_slot_status_string:
        case 'idle':
            expected_slot_status = 0
        case 'busy':
            expected_slot_status = 1
        case _:
            assert False, "unknown status"
    expected_slots = [{'id': slot_id, 'state': expected_slot_status}
                      for slot_id in range(context.n_slots)]
    await request_slots_status(context, expected_slots)

@step(u'a completion request with {api_error} api error')
@async_run_until_complete
async def step_request_completion(context, api_error):
    expect_api_error = api_error == 'raised'
    completion = await request_completion(context.prompts.pop(),
                                          context.base_url,
                                          debug=context.debug,
                                          n_predict=context.n_predict,
                                          server_seed=context.server_seed,
                                          expect_api_error=expect_api_error,
                                          user_api_key=context.user_api_key)
    context.tasks_result.append(completion)
    if context.debug:
        print(f"Completion response: {completion}")
    if expect_api_error:
        assert completion == 401, f"completion must be a 401 status code: {completion}"

@step(u'{predicted_n} tokens are predicted matching {re_content}')
def step_n_tokens_predicted_with_content(context, predicted_n, re_content):
    assert_n_tokens_predicted(context.tasks_result.pop(), int(predicted_n), re_content)

@step(u'{predicted_n} tokens are predicted')
def step_n_tokens_predicted(context, predicted_n):
    assert_n_tokens_predicted(context.tasks_result.pop(), int(predicted_n))

@step(u'a user prompt {user_prompt}')
def step_user_prompt(context, user_prompt):
    context.prompts.append(user_prompt)

@step(u'a system prompt {system_prompt}')
def step_system_prompt(context, system_prompt):
    context.system_prompt = system_prompt

@step(u'a model {model}')
def step_model(context, model):
    context.model = model

@step(u'{max_tokens} max tokens to predict')
def step_max_tokens(context, max_tokens):
    context.n_predict = int(max_tokens)

@step(u'streaming is {enable_streaming}')
def step_streaming(context, enable_streaming):
    context.enable_streaming = enable_streaming == 'enabled'

@step(u'a user api key {user_api_key}')
def step_user_api_key(context, user_api_key):
    context.user_api_key = user_api_key

@step(u'no user api key')
def step_no_user_api_key(context):
    context.user_api_key = None

@step(u'a user api key ')
def step_no_user_api_key_space(context):
    context.user_api_key = None

@step(u'a server api key {server_api_key}')
def step_server_api_key(context, server_api_key):
    context.server_api_key = server_api_key

@step(u'an OAI compatible chat completions request with {api_error} api error')
@async_run_until_complete
async def step_oai_chat_completions(context, api_error):
    if context.debug:
        print("Submitting OAI compatible completions request...")
    expect_api_error = api_error == 'raised'
    completion = await oai_chat_completions(context.prompts.pop(),
                                            context.system_prompt,
                                            context.base_url,
                                            '/v1/chat',
                                            False,
                                            model=context.model if hasattr(context, 'model') else None,
                                            n_predict=context.n_predict
                                            if hasattr(context, 'n_predict') else None,
                                            enable_streaming=context.enable_streaming
                                            if hasattr(context, 'enable_streaming') else None,
                                            server_seed=context.server_seed
                                            if hasattr(context, 'server_seed') else None,
                                            user_api_key=context.user_api_key
                                            if hasattr(context, 'user_api_key') else None,
                                            expect_api_error=expect_api_error)
    context.tasks_result.append(completion)
    if context.debug:
        print(f"Completion response: {completion}")
    if expect_api_error:
        assert completion == 401, f"completion must be a 401 status code: {completion}"

@step(u'a prompt')
def step_a_prompt(context):
    context.prompts.append(context.text)

@step(u'a prompt {prompt}')
def step_a_prompt_prompt(context, prompt):
    context.prompts.append(prompt)

@step(u'concurrent completion requests')
@async_run_until_complete()
async def step_concurrent_completion_requests(context):
    await concurrent_requests(context,
                              request_completion,
                              # prompt is inserted automatically
                              context.base_url,
                              debug=context.debug,
                              n_predict=context.n_predict if hasattr(context, 'n_predict') else None,
                              server_seed=context.server_seed if hasattr(context, 'server_seed') else None,
                              user_api_key=context.user_api_key if hasattr(context,
                                                                           'user_api_key') else None)

@step(u'concurrent OAI completions requests')
@async_run_until_complete
async def step_concurrent_oai_chat_completions(context):
    await concurrent_requests(context, oai_chat_completions,
                              # user_prompt is inserted automatically
                              context.system_prompt,
                              context.base_url,
                              '/v1/chat/completions',
                              True,  # async_client
                              model=context.model
                              if hasattr(context, 'model') else None,
                              n_predict=context.n_predict
                              if hasattr(context, 'n_predict') else None,
                              enable_streaming=context.enable_streaming
                              if hasattr(context, 'enable_streaming') else None,
                              server_seed=context.server_seed
                              if hasattr(context, 'server_seed') else None,
                              user_api_key=context.user_api_key
                              if hasattr(context, 'user_api_key') else None)

@step(u'concurrent OAI completions requests no v1')
@async_run_until_complete
async def step_concurrent_oai_chat_completions_no_v1(context):
    await concurrent_requests(context, oai_chat_completions,
                              # user_prompt is inserted automatically
                              context.system_prompt,
                              context.base_url,
                              '/chat/completions',
                              True,  # async_client
                              model=context.model
                              if hasattr(context, 'model') else None,
                              n_predict=context.n_predict
                              if hasattr(context, 'n_predict') else None,
                              enable_streaming=context.enable_streaming
                              if hasattr(context, 'enable_streaming') else None,
                              server_seed=context.server_seed
                              if hasattr(context, 'server_seed') else None,
                              user_api_key=context.user_api_key
                              if hasattr(context, 'user_api_key') else None)

@step(u'all prompts are predicted')
@async_run_until_complete
async def step_all_prompts_are_predicted(context):
    await all_prompts_are_predicted(context)

@step(u'all prompts are predicted with {n_predict} tokens')
@async_run_until_complete
async def step_all_prompts_are_predicted_with_n_tokens(context, n_predict):
    expected_predicted_n = int(n_predict)
    await all_prompts_are_predicted(context, expected_predicted_n)

async def all_prompts_are_predicted(context, expected_predicted_n=None):
    n_completions = await gather_tasks_results(context)
    assert n_completions > 0
    for i in range(n_completions):
        assert_n_tokens_predicted(context.tasks_result.pop(), expected_predicted_n=expected_predicted_n)
    assert len(context.concurrent_tasks) == 0, f"{len(context.concurrent_tasks)} pending requests"

@step(u'embeddings are computed for')
@async_run_until_complete
async def step_compute_embedding(context):
    context.embeddings = await request_embedding(context.text, base_url=context.base_url)

@step(u'embeddings are generated')
def step_assert_embeddings(context):
    if len(context.prompts) == 0:
        assert_embeddings(context.embeddings)
    else:
        assert len(context.embeddings) == len(context.prompts), (f"unexpected response:\n"
                                                                 f"context.prompts={context.prompts}\n"
                                                                 f"context.embeddings={context.embeddings}")
        for embedding in context.embeddings:
            context.prompts.pop()
            assert_embeddings(embedding)

@step(u'an OAI compatible embeddings computation request for')
@async_run_until_complete
async def step_oai_compute_embeddings(context):
    context.embeddings = await request_oai_embeddings(context.text,
                                                      base_url=context.base_url,
                                                      user_api_key=context.user_api_key,
                                                      model=context.model)

@step(u'an OAI compatible embeddings computation request for multiple inputs')
@async_run_until_complete
async def step_oai_compute_embeddings_multiple_inputs(context):
    context.embeddings = await request_oai_embeddings(context.prompts,
                                                      base_url=context.base_url,
                                                      user_api_key=context.user_api_key,
                                                      model=context.model)

@step(u'concurrent embedding requests')
@async_run_until_complete()
async def step_concurrent_embedding_requests(context):
    await concurrent_requests(context,
                              request_embedding,
                              # prompt is inserted automatically
                              base_url=context.base_url)

@step(u'concurrent OAI embedding requests')
@async_run_until_complete()
async def step_concurrent_oai_embedding_requests(context):
    await concurrent_requests(context,
                              request_oai_embeddings,
                              # prompt is inserted automatically
                              base_url=context.base_url,
                              async_client=True,
                              model=context.model)

@step(u'all embeddings are generated')
@async_run_until_complete()
async def all_embeddings_are_generated(context):
    n_embedding_requests = await gather_tasks_results(context)
    assert n_embedding_requests > 0
    for i in range(n_embedding_requests):
        assert_embeddings(context.tasks_result.pop())

@step(u'tokenizing')
@async_run_until_complete
async def step_tokenize(context):
    context.tokenized_text = context.text
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/tokenize',
                                json={
                                    "content": context.tokenized_text,
                                }) as response:
            assert response.status == 200
            tokenize_json = await response.json()
            context.tokens = tokenize_json['tokens']

@step(u'tokens can be detokenize')
@async_run_until_complete
async def step_detokenize(context):
    assert len(context.tokens) > 0
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/detokenize',
                                json={
                                    "tokens": context.tokens,
                                }) as response:
            assert response.status == 200
            detokenize_json = await response.json()
            # SPM tokenizer adds a whitespace prefix: https://github.com/google/sentencepiece/issues/15
            assert context.tokenized_text == detokenize_json['content'].strip()

@step(u'an OPTIONS request is sent from {origin}')
@async_run_until_complete
async def step_options_request(context, origin):
    async with aiohttp.ClientSession() as session:
        async with session.options(f'{context.base_url}/v1/chat/completions',
                                   headers={"Origin": origin}) as response:
            assert response.status == 200
            context.options_response = response

@step(u'CORS header {cors_header} is set to {cors_header_value}')
def step_check_options_header_value(context, cors_header, cors_header_value):
    assert context.options_response.headers[cors_header] == cors_header_value

@step(u'prometheus metrics are exposed')
@async_run_until_complete
async def step_prometheus_metrics_exported(context):
    async with aiohttp.ClientSession() as session:
        async with await session.get(f'{context.base_url}/metrics') as metrics_response:
            assert metrics_response.status == 200
            assert metrics_response.headers['Content-Type'] == "text/plain; version=0.0.4"
            metrics_raw = await metrics_response.text()
            metric_exported = False
            for metric in parser.text_string_to_metric_families(metrics_raw):
                match metric.name:
                    case "llamacpp:kv_cache_usage_ratio":
                        assert len(metric.samples) > 0
                        metric_exported = True
            assert metric_exported, "No metrics exported"
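
# Helper coroutines used by the step definitions above.
# concurrent_requests() pops every pending prompt from the context and spawns
# one request task per prompt; gather_tasks_results() collects them later.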

async def concurrent_requests(context, f_completion, *args, **kwargs):
    n_prompts = len(context.prompts)
    if context.debug:
        print(f"starting {n_prompts} concurrent completion requests...")
    assert n_prompts > 0
    for prompt_no in range(n_prompts):
        shifted_args = [context.prompts.pop(), *args]
        context.concurrent_tasks.append(asyncio.create_task(f_completion(*shifted_args, **kwargs)))
    await asyncio.sleep(0.1)

async def request_completion(prompt,
                             base_url,
                             debug=False,
                             n_predict=None,
                             server_seed=None,
                             expect_api_error=None,
                             user_api_key=None):
    if debug:
        print(f"Sending completion request: {prompt}")
    origin = "my.super.domain"
    headers = {
        'Origin': origin
    }
    if user_api_key is not None:
        if debug:
            print(f"Set user_api_key: {user_api_key}")
        headers['Authorization'] = f'Bearer {user_api_key}'
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{base_url}/completion',
                                json={
                                    "prompt": prompt,
                                    "n_predict": int(n_predict) if n_predict is not None else -1,
                                    "seed": server_seed if server_seed is not None else 42
                                },
                                headers=headers) as response:
            if expect_api_error is None or not expect_api_error:
                assert response.status == 200
                assert response.headers['Access-Control-Allow-Origin'] == origin
                return await response.json()
            else:
                return response.status
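
# oai_chat_completions() targets the OAI-compatible endpoint either through a raw
# aiohttp session (async_client=True, used by the concurrent scenarios) or through
# the synchronous openai client, and normalizes the reply to the same
# content/timings shape returned by the /completion endpoint.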

async def oai_chat_completions(user_prompt,
                               system_prompt,
                               base_url,
                               base_path,
                               async_client,
                               debug=False,
                               model=None,
                               n_predict=None,
                               enable_streaming=None,
                               server_seed=None,
                               user_api_key=None,
                               expect_api_error=None):
    if debug:
        print(f"Sending OAI Chat completions request: {user_prompt}")
    # openai client always expects an api key
    user_api_key = user_api_key if user_api_key is not None else 'nope'
    seed = server_seed if server_seed is not None else 42
    enable_streaming = enable_streaming if enable_streaming is not None else False
    payload = {
        "messages": [
            {
                "role": "system",
                "content": system_prompt,
            },
            {
                "role": "user",
                "content": user_prompt,
            }
        ],
        "model": model,
        "max_tokens": n_predict,
        "stream": enable_streaming,
        "seed": seed
    }
    completion_response = {
        'content': '',
        'timings': {
            'predicted_n': 0
        }
    }
    if async_client:
        origin = 'llama.cpp'
        headers = {'Authorization': f'Bearer {user_api_key}', 'Origin': origin}
        async with aiohttp.ClientSession() as session:
            async with session.post(f'{base_url}{base_path}',
                                    json=payload,
                                    headers=headers) as response:
                if enable_streaming:
                    assert response.status == 200
                    assert response.headers['Access-Control-Allow-Origin'] == origin
                    assert response.headers['Content-Type'] == "text/event-stream"
                    event_received = True
                    while event_received:
                        event_received = False
                        async for line_in_bytes in response.content:
                            line = line_in_bytes.decode('utf8')
                            line = line.rstrip('\n').rstrip('\r')
                            if line == '':
                                continue
                            event_data = line.split(': ', 1)
                            assert event_data[0] == 'data', f'Bad event code received: ```{event_data}```'
                            chunk_raw = event_data[1]
                            chunk = json.loads(chunk_raw)
                            assert len(chunk['choices']) == 1, f"no choices provided, line ```{line}```"
                            delta = chunk['choices'][0]['delta']
                            if 'content' in delta:
                                completion_response['content'] += delta['content']
                                completion_response['timings']['predicted_n'] += 1
                else:
                    if expect_api_error is None or not expect_api_error:
                        assert response.status == 200
                        assert response.headers['Access-Control-Allow-Origin'] == origin
                        assert response.headers['Content-Type'] == "application/json; charset=utf-8"
                        chat_completion_raw = await response.json()
                        completion_response = {
                            'content': chat_completion_raw['choices'][0]['message'],
                            'timings': {
                                'predicted_n': chat_completion_raw['usage']['completion_tokens']
                            }
                        }
                    else:
                        return response.status
    else:
        try:
            openai.api_key = user_api_key
            openai.api_base = f'{base_url}{base_path}'
            chat_completion = openai.Completion.create(
                messages=payload['messages'],
                model=model,
                max_tokens=n_predict,
                stream=enable_streaming,
                seed=seed
            )
        except openai.error.APIError as e:
            if expect_api_error is not None and expect_api_error:
                return 401
            else:
                assert False, f'error raised: {e}'
        if enable_streaming:
            for chunk in chat_completion:
                assert len(chunk.choices) == 1
                delta = chunk.choices[0].delta
                if 'content' in delta:
                    completion_response['content'] += delta['content']
                    completion_response['timings']['predicted_n'] += 1
        else:
            assert len(chat_completion.choices) == 1
            completion_response = {
                'content': chat_completion.choices[0].message.content,
                'timings': {
                    'predicted_n': chat_completion.usage.completion_tokens
                }
            }
    if debug:
        print("OAI response formatted to llama.cpp:", completion_response)
    return completion_response

async def request_embedding(content, base_url=None):
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{base_url}/embedding',
                                json={
                                    "content": content,
                                }) as response:
            assert response.status == 200
            response_json = await response.json()
            return response_json['embedding']

async def request_oai_embeddings(input,
                                 base_url=None, user_api_key=None,
                                 model=None, async_client=False):
    # openai client always expects an api_key
    user_api_key = user_api_key if user_api_key is not None else 'nope'
    if async_client:
        origin = 'llama.cpp'
        if user_api_key is not None:
            headers = {'Authorization': f'Bearer {user_api_key}', 'Origin': origin}
        async with aiohttp.ClientSession() as session:
            async with session.post(f'{base_url}/v1/embeddings',
                                    json={
                                        "input": input,
                                        "model": model,
                                    },
                                    headers=headers) as response:
                assert response.status == 200, f"received status code not expected: {response.status}"
                assert response.headers['Access-Control-Allow-Origin'] == origin
                assert response.headers['Content-Type'] == "application/json; charset=utf-8"
                response_json = await response.json()
                assert response_json['model'] == model, f"invalid model received: {response_json['model']}"
                assert response_json['object'] == 'list'
                return response_json['data']
    else:
        openai.api_key = user_api_key
        openai.api_base = f'{base_url}/v1'
        oai_embeddings = openai.Embedding.create(
            model=model,
            input=input,
        )
        if isinstance(input, collections.abc.Sequence):
            embeddings = []
            for an_oai_embeddings in oai_embeddings.data:
                embeddings.append(an_oai_embeddings.embedding)
        else:
            embeddings = oai_embeddings.data.embedding
        return embeddings

def assert_n_tokens_predicted(completion_response, expected_predicted_n=None, re_content=None):
    content = completion_response['content']
    n_predicted = completion_response['timings']['predicted_n']
    assert len(content) > 0, "no token predicted"
    if expected_predicted_n is not None:
        assert n_predicted == expected_predicted_n, (f'invalid number of tokens predicted:'
                                                     f' {n_predicted} <> {expected_predicted_n}')
    if re_content is not None:
        re_content = '^.*' + re_content.replace('<or>', '|') + '.*$'
        assert re.match(re_content, content, flags=RegexFlag.IGNORECASE | RegexFlag.MULTILINE | RegexFlag.DOTALL), (
            f'invalid tokens predicted:'
            f' ```\n{content}\n``` do not match /{re_content}/')

async def gather_tasks_results(context):
    n_tasks = len(context.concurrent_tasks)
    if context.debug:
        print(f"Waiting for all {n_tasks} tasks results...")
    for task_no in range(n_tasks):
        context.tasks_result.append(await context.concurrent_tasks.pop())
    n_completions = len(context.tasks_result)
    return n_completions
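
# wait_for_health_status() polls GET /health every `interval` seconds until the
# expected HTTP status code, health status and slot counts are observed, or
# fails once the timeout is exceeded.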

async def wait_for_health_status(context,
                                 base_url,
                                 expected_http_status_code,
                                 expected_health_status,
                                 params=None,
                                 slots_idle=None,
                                 slots_processing=None,
                                 expected_slots=None):
    if context.debug:
        print(f"Starting checking for health for expected_health_status={expected_health_status}")
    timeout = 3  # seconds
    if expected_health_status == 'ok':
        timeout = 10  # CI slow inference
    interval = 0.5
    counter = 0
    async with aiohttp.ClientSession() as session:
        while True:
            async with await session.get(f'{base_url}/health', params=params) as health_response:
                status_code = health_response.status
                health = await health_response.json()
                if context.debug:
                    print(f"HEALTH - response for expected health status='{expected_health_status}' on "
                          f"'{base_url}/health'?{params} is {health}")
                if (status_code == expected_http_status_code
                        and health['status'] == expected_health_status
                        and (slots_idle is None or health['slots_idle'] == slots_idle)
                        and (slots_processing is None or health['slots_processing'] == slots_processing)):
                    if expected_slots is not None:
                        assert_slots_status(health['slots'], expected_slots)
                    return
            await asyncio.sleep(interval)
            counter += interval
            if counter >= timeout:
                # Sometimes health requests are triggered after completions are predicted
                if expected_http_status_code == 503:
                    if len(context.tasks_result) == 0:
                        print("\x1b[5;37;43mWARNING: forcing concurrent tasks,"
                              " busy health check missed, probably too fast inference\x1b[0m")
                        n_completions = await gather_tasks_results(context)
                        if n_completions > 0:
                            return
                assert False, f'{expected_health_status} timeout exceeded {counter}s>={timeout}'

def assert_embeddings(embeddings):
    assert len(embeddings) > 0
    embeddings_computed = False
    for emb in embeddings:
        if emb != 0:
            embeddings_computed = True
    assert embeddings_computed, f"Embeddings: {embeddings}"

async def request_slots_status(context, expected_slots):
    async with aiohttp.ClientSession() as session:
        async with await session.get(f'{context.base_url}/slots') as slots_response:
            assert slots_response.status == 200
            slots = await slots_response.json()
            assert_slots_status(slots, expected_slots)

def assert_slots_status(slots, expected_slots):
    assert len(slots) == len(expected_slots)
    for slot_id, (expected, slot) in enumerate(zip(expected_slots, slots)):
        for key in expected:
            assert expected[key] == slot[key], (f"invalid slot {slot_id}"
                                                f" expected[{key}] != slot[{key}]"
                                                f" = {expected[key]} != {slot[key]}")

def start_server_background(context):
    context.server_path = '../../../build/bin/server'
    if 'LLAMA_SERVER_BIN_PATH' in os.environ:
        context.server_path = os.environ['LLAMA_SERVER_BIN_PATH']
    server_args = [
        '--host', context.server_fqdn,
        '--port', context.server_port,
        '--model', context.model_file
    ]
    if context.server_continuous_batching:
        server_args.append('--cont-batching')
    if context.server_embeddings:
        server_args.append('--embedding')
    if context.server_metrics:
        server_args.append('--metrics')
    if context.model_alias is not None:
        server_args.extend(['--alias', context.model_alias])
    if context.n_ctx is not None:
        server_args.extend(['--ctx-size', context.n_ctx])
    if context.n_slots is not None:
        server_args.extend(['--parallel', context.n_slots])
    if context.n_server_predict is not None:
        server_args.extend(['--n-predict', context.n_server_predict])
    if context.server_api_key is not None:
        server_args.extend(['--api-key', context.server_api_key])
    if context.debug:
        server_args.append('--verbose')
    if 'SERVER_LOG_FORMAT_JSON' not in os.environ:
        server_args.extend(['--log-format', "text"])
    print(f"starting server with: {context.server_path}", *server_args)
    context.server_process = subprocess.Popen(
        [str(arg) for arg in [context.server_path, *server_args]],
        close_fds=True)
    print(f"server pid={context.server_process.pid}")