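"""Behave step definitions for the llama.cpp server test suite.

These steps configure and launch a local `server` binary, then exercise its
HTTP API: completions, OAI-compatible chat completions and embeddings,
tokenize/detokenize, slot save/restore, /health, /slots and /metrics.

Illustrative scenario only (the step strings all exist in this file, but the
model file name is a placeholder):

    Scenario: basic completion
      Given a server listening on localhost:8080
      And   a model file stories260K.gguf
      And   42 as server seed
      And   32 max tokens to predict
      Then  the server is starting
      Then  the server is healthy
      Given a prompt Hello
      And   a completion request with no api error
      Then  32 tokens are predicted
"""
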
import asyncio
import collections
import json
import os
import re
import socket
import subprocess
import sys
import threading
import time
from contextlib import closing
from re import RegexFlag

import aiohttp
import numpy as np
import openai
from behave import step
from behave.api.async_step import async_run_until_complete
from prometheus_client import parser


@step("a server listening on {server_fqdn}:{server_port}")
def step_server_config(context, server_fqdn, server_port):
    context.server_fqdn = server_fqdn
    context.server_port = int(server_port)
    context.n_gpu_layer = None
    if 'PORT' in os.environ:
        context.server_port = int(os.environ['PORT'])
        print(f"$PORT set, overriding server port to {context.server_port}")
    if 'FQDN' in os.environ:
        context.server_fqdn = os.environ['FQDN']
        print(f"$FQDN set, overriding server fqdn to {context.server_fqdn}")
    if 'N_GPU_LAYERS' in os.environ:
        context.n_gpu_layer = int(os.environ['N_GPU_LAYERS'])
        print(f"$N_GPU_LAYERS set, overriding n_gpu_layer to {context.n_gpu_layer}")

    context.base_url = f'http://{context.server_fqdn}:{context.server_port}'

    context.model_alias = None
    context.model_file = None
    context.model_hf_repo = None
    context.model_hf_file = None
    context.model_url = None
    context.n_batch = None
    context.n_ubatch = None
    context.n_ctx = None
    context.n_ga = None
    context.n_ga_w = None
    context.n_predict = None
    context.n_prompts = 0
    context.n_server_predict = None
    context.slot_save_path = None
    context.id_slot = None
    context.cache_prompt = None
    context.n_slots = None
    context.prompt_prefix = None
    context.prompt_suffix = None
    context.server_api_key = None
    context.server_continuous_batching = False
    context.server_embeddings = False
    context.server_metrics = False
    context.server_process = None
    context.seed = None
    context.draft = None
    context.server_seed = None
    context.user_api_key = None
    context.response_format = None
    context.temperature = None

    context.tasks_result = []
    context.concurrent_tasks = []
    context.prompts = []


@step('a model file {hf_file} from HF repo {hf_repo}')
def step_download_hf_model(context, hf_file, hf_repo):
    context.model_hf_repo = hf_repo
    context.model_hf_file = hf_file
    context.model_file = os.path.basename(hf_file)


@step('a model file {model_file}')
def step_model_file(context, model_file):
    context.model_file = model_file


@step('a model url {model_url}')
def step_model_url(context, model_url):
    context.model_url = model_url


@step('a model alias {model_alias}')
def step_model_alias(context, model_alias):
    context.model_alias = model_alias


@step('{seed:d} as server seed')
def step_seed(context, seed):
    context.server_seed = seed


@step('{ngl:d} GPU offloaded layers')
def step_n_gpu_layer(context, ngl):
    if 'N_GPU_LAYERS' in os.environ:
        new_ngl = int(os.environ['N_GPU_LAYERS'])
        if context.debug:
            print(f"-ngl upgraded from {ngl} to {new_ngl}")
        ngl = new_ngl
    context.n_gpu_layer = ngl


@step('{draft:d} as draft')
def step_draft(context, draft):
    context.draft = draft


@step('{n_ctx:d} KV cache size')
def step_n_ctx(context, n_ctx):
    context.n_ctx = n_ctx


@step('{n_slots:d} slots')
def step_n_slots(context, n_slots):
    context.n_slots = n_slots


@step('{n_predict:d} server max tokens to predict')
def step_server_n_predict(context, n_predict):
    context.n_server_predict = n_predict


@step('{slot_save_path} as slot save path')
def step_slot_save_path(context, slot_save_path):
    context.slot_save_path = slot_save_path


@step('using slot id {id_slot:d}')
def step_id_slot(context, id_slot):
    context.id_slot = id_slot


@step('prompt caching is enabled')
def step_enable_prompt_cache(context):
    context.cache_prompt = True


@step('continuous batching')
def step_server_continuous_batching(context):
    context.server_continuous_batching = True


@step('embeddings extraction')
def step_server_embeddings(context):
    context.server_embeddings = True


@step('prometheus compatible metrics exposed')
def step_server_metrics(context):
    context.server_metrics = True


@step("the server is starting")
def step_start_server(context):
    start_server_background(context)
    attempts = 0
    max_attempts = 20
    if 'GITHUB_ACTIONS' in os.environ:
        max_attempts *= 2

    addrs = socket.getaddrinfo(context.server_fqdn, context.server_port, type=socket.SOCK_STREAM)
    family, typ, proto, _, sockaddr = addrs[0]

    while True:
        with closing(socket.socket(family, typ, proto)) as sock:
            result = sock.connect_ex(sockaddr)
            if result == 0:
                print("\x1b[33;46mserver started!\x1b[0m")
                return
            attempts += 1
            if attempts > max_attempts:
                assert False, "server not started"
            print(f"waiting for server to start, connect error code = {result}...")
            time.sleep(0.1)


@step("the server is {expecting_status}")
@async_run_until_complete
async def step_wait_for_the_server_to_be_started(context, expecting_status):
    match expecting_status:
        case 'healthy':
            await wait_for_health_status(context, context.base_url, 200, 'ok',
                                         timeout=30)
        case 'ready' | 'idle':
            await wait_for_health_status(context, context.base_url, 200, 'ok',
                                         timeout=10,
                                         params={'fail_on_no_slot': 0, 'include_slots': 0},
                                         slots_idle=context.n_slots,
                                         slots_processing=0,
                                         expected_slots=[{'id': slot_id, 'state': 0}
                                                         for slot_id in
                                                         range(context.n_slots if context.n_slots else 1)])
        case 'busy':
            await wait_for_health_status(context, context.base_url, 503,
                                         'no slot available',
                                         params={'fail_on_no_slot': 0, 'include_slots': 0},
                                         slots_idle=0,
                                         slots_processing=context.n_slots,
                                         expected_slots=[{'id': slot_id, 'state': 1}
                                                         for slot_id in
                                                         range(context.n_slots if context.n_slots else 1)])
        case _:
            assert False, "unknown status"


@step('all slots are {expected_slot_status_string}')
@async_run_until_complete
async def step_all_slots_status(context, expected_slot_status_string):
    match expected_slot_status_string:
        case 'idle':
            expected_slot_status = 0
        case 'busy':
            expected_slot_status = 1
        case _:
            assert False, "unknown status"

    expected_slots = [{'id': slot_id, 'state': expected_slot_status}
                      for slot_id in range(context.n_slots)]
    await request_slots_status(context, expected_slots)


@step('a completion request with {api_error} api error')
@async_run_until_complete
async def step_request_completion(context, api_error):
    expect_api_error = api_error == 'raised'
    seeds = await completions_seed(context, num_seeds=1)
    completion = await request_completion(context.prompts.pop(),
                                          seeds[0] if seeds is not None else seeds,
                                          context.base_url,
                                          debug=context.debug,
                                          n_predict=context.n_predict,
                                          cache_prompt=context.cache_prompt,
                                          id_slot=context.id_slot,
                                          expect_api_error=expect_api_error,
                                          user_api_key=context.user_api_key,
                                          temperature=context.temperature)
    context.tasks_result.append(completion)
    if context.debug:
        print(f"Completion response: {completion}")
    if expect_api_error:
        assert completion == 401, f"completion must be a 401 status code: {completion}"


@step('{predicted_n:d} tokens are predicted matching {re_content}')
def step_n_tokens_predicted_with_content(context, predicted_n, re_content):
    context.completion = context.tasks_result.pop()
    assert_n_tokens_predicted(context.completion, predicted_n, re_content)


@step('{predicted_n:d} tokens are predicted')
def step_n_tokens_predicted(context, predicted_n):
    context.completion = context.tasks_result.pop()
    assert_n_tokens_predicted(context.completion, predicted_n)


@step('all predictions are equal')
@async_run_until_complete
async def step_predictions_equal(context):
    n_completions = await gather_tasks_results(context)
    assert n_completions >= 2, "need at least 2 completions"
    assert_all_predictions_equal(context.tasks_result)
    context.tasks_result = []


@step('all predictions are different')
@async_run_until_complete
async def step_predictions_different(context):
    n_completions = await gather_tasks_results(context)
    assert n_completions >= 2, "need at least 2 completions"
    assert_all_predictions_different(context.tasks_result)
    context.tasks_result = []


@step('the completion is truncated')
def step_assert_completion_truncated(context):
    # resolves at call time to the two-argument variant defined just below
    step_assert_completion_truncated(context, '')


@step('the completion is {truncated} truncated')
def step_assert_completion_truncated(context, truncated):
    truncated = truncated != "not"
    assert context.completion['truncated'] == truncated, f'{context.completion}'


@step('{n_prompt:d} prompt tokens are processed')
def step_impl(context, n_prompt):
    assert n_prompt < 0 or n_prompt == context.completion['timings']['prompt_n'], f"n_prompt={context.completion['timings']['prompt_n']}"


@step('a user prompt {user_prompt}')
def step_user_prompt(context, user_prompt):
    context.prompts.append(user_prompt)
    context.n_prompts = len(context.prompts)


@step('a system prompt {system_prompt}')
def step_system_prompt(context, system_prompt):
    context.system_prompt = system_prompt


@step('a model {model}')
def step_model(context, model):
    context.model = model


@step('{max_tokens:d} max tokens to predict')
def step_max_tokens(context, max_tokens):
    context.n_predict = max_tokens


@step('a response format {response_format}')
def step_response_format(context, response_format):
    context.response_format = json.loads(response_format)


@step('{temperature:f} temperature')
def step_temperature(context, temperature):
    context.temperature = temperature


@step('streaming is {enable_streaming}')
def step_streaming(context, enable_streaming):
    context.enable_streaming = enable_streaming == 'enabled'


@step('a user api key {user_api_key}')
def step_user_api_key(context, user_api_key):
    context.user_api_key = user_api_key


@step('no user api key')
def step_no_user_api_key(context):
    context.user_api_key = None


@step('a user api key ')
def step_no_user_api_key_space(context):
    context.user_api_key = None


@step('a server api key {server_api_key}')
def step_server_api_key(context, server_api_key):
    context.server_api_key = server_api_key


@step('{n_junk:d} as number of junk')
def step_n_junk(context, n_junk):
    context.n_junk = n_junk


@step('{n_batch:d} as batch size')
def step_n_batch(context, n_batch):
    context.n_batch = n_batch


@step('{n_ubatch:d} as ubatch size')
def step_n_ubatch(context, n_ubatch):
    context.n_ubatch = n_ubatch


@step('{seed:d} as seed')
def step_seed(context, seed):
    if context.seed is None:
        context.seed = [seed]
    else:
        context.seed.append(seed)


@step('a prefix prompt')
def step_prompt_prefix(context):
    context.prompt_prefix = context_text(context)


@step('a junk suffix prompt')
def step_prompt_junk_suffix(context):
    context.prompt_junk_suffix = context_text(context)


@step('a suffix prompt')
def step_prompt_suffix(context):
    context.prompt_suffix = context_text(context)


@step('{n_ga:d} group attention factor'
      ' to extend context size through self-extend')
def step_impl(context, n_ga):
    context.n_ga = n_ga


@step('{n_ga_w:d} group attention width to extend context size through self-extend')
def step_impl(context, n_ga_w):
    context.n_ga_w = n_ga_w


@step('a passkey prompt template')
def step_prompt_passkey(context):
    context.prompt_passkey = context_text(context)


@step('{n_prompts:d} fixed prompts')
def step_fixed_prompts(context, n_prompts):
    context.prompts.extend([str(0) * (context.n_batch if context.n_batch is not None else 512) for i in range(n_prompts)])
    context.n_prompts = n_prompts


@step('a "{passkey}" passkey challenge prompt with the passkey inserted every {i_pos:d} junk')
def step_prompt_passkey(context, passkey, i_pos):
    prompt = ""
    for i in range(context.n_junk):
        if i % context.n_junk == i_pos:
            prompt += context.prompt_passkey  # the passkey is already substituted
        prompt += context.prompt_junk_suffix
    if context.debug:
        passkey_highlight = "\x1b[33m" + passkey + "\x1b[0m"
        print(f"Passkey challenge:\n```{prompt.replace(passkey, passkey_highlight)}```")
    context.prompts.append(context.prompt_prefix + prompt + context.prompt_suffix)
    context.n_prompts = len(context.prompts)


@step('an OAI compatible chat completions request with {api_error} api error')
@async_run_until_complete
async def step_oai_chat_completions(context, api_error):
    if context.debug:
        print("Submitting OAI compatible completions request...")
    expect_api_error = api_error == 'raised'
    seeds = await completions_seed(context, num_seeds=1)
    completion = await oai_chat_completions(context.prompts.pop(),
                                            seeds[0] if seeds is not None else seeds,
                                            context.system_prompt,
                                            context.base_url,
                                            '/v1/chat',
                                            False,
                                            model=context.model if hasattr(context, 'model') else None,
                                            n_predict=context.n_predict
                                            if hasattr(context, 'n_predict') else None,
                                            enable_streaming=context.enable_streaming
                                            if hasattr(context, 'enable_streaming') else None,
                                            response_format=context.response_format
                                            if hasattr(context, 'response_format') else None,
                                            user_api_key=context.user_api_key
                                            if hasattr(context, 'user_api_key') else None,
                                            expect_api_error=expect_api_error)
    context.tasks_result.append(completion)
    if context.debug:
        print(f"Completion response: {completion}")
    if expect_api_error:
        assert completion == 401, f"completion must be a 401 status code: {completion}"


@step('a prompt')
def step_a_prompt(context):
    context.prompts.append(context_text(context))
    context.n_prompts = len(context.prompts)


@step('a prompt {prompt}')
def step_a_prompt_prompt(context, prompt):
    context.prompts.append(prompt)
    context.n_prompts = len(context.prompts)


@step('{num_prompts:d} prompts {prompt} with seed {seed:d}')
def step_many_prompts(context, num_prompts, prompt, seed):
    if context.seed is None:
        context.seed = []
    for _ in range(num_prompts):
        context.seed.append(seed)
        context.prompts.append(prompt)
    context.n_prompts = len(context.prompts)


@step('concurrent completion requests')
@async_run_until_complete()
async def step_concurrent_completion_requests(context):
    await concurrent_requests(
        context,
        request_completion,
        # prompt is inserted automatically
        context.base_url,
        debug=context.debug,
        prompt_prefix=context.prompt_prefix,
        prompt_suffix=context.prompt_suffix,
        n_predict=context.n_predict if hasattr(context, 'n_predict') else None,
        user_api_key=context.user_api_key if hasattr(context, 'user_api_key') else None,
        temperature=context.temperature,
    )


@step('concurrent OAI completions requests')
@async_run_until_complete
async def step_oai_chat_completions(context):
    await concurrent_requests(context, oai_chat_completions,
                              # user_prompt is inserted automatically
                              context.system_prompt,
                              context.base_url,
                              '/v1/chat/completions',
                              True,  # async_client
                              model=context.model
                              if hasattr(context, 'model') else None,
                              n_predict=context.n_predict
                              if hasattr(context, 'n_predict') else None,
                              enable_streaming=context.enable_streaming
                              if hasattr(context, 'enable_streaming') else None,
                              response_format=context.response_format
                              if hasattr(context, 'response_format') else None,
                              user_api_key=context.user_api_key
                              if hasattr(context, 'user_api_key') else None)


@step('concurrent OAI completions requests no v1')
@async_run_until_complete
async def step_oai_chat_completions(context):
    await concurrent_requests(context, oai_chat_completions,
                              # user_prompt is inserted automatically
                              context.system_prompt,
                              context.base_url,
                              '/chat/completions',
                              True,  # async_client
                              model=context.model
                              if hasattr(context, 'model') else None,
                              n_predict=context.n_predict
                              if hasattr(context, 'n_predict') else None,
                              enable_streaming=context.enable_streaming
                              if hasattr(context, 'enable_streaming') else None,
                              response_format=context.response_format
                              if hasattr(context, 'response_format') else None,
                              user_api_key=context.user_api_key
                              if hasattr(context, 'user_api_key') else None)


@step('all prompts are predicted')
@async_run_until_complete
async def step_all_prompts_are_predicted(context):
    await all_prompts_are_predicted(context)


@step('all prompts are predicted with {n_expected_predicted:d} tokens')
@async_run_until_complete
async def step_all_prompts_are_predicted_with_n_tokens(context, n_expected_predicted):
    await all_prompts_are_predicted(context, n_expected_predicted)


async def all_prompts_are_predicted(context, expected_predicted_n=None):
    n_completions = await gather_tasks_results(context)
    assert n_completions > 0
    for i in range(n_completions):
        assert_n_tokens_predicted(context.tasks_result.pop(), expected_predicted_n=expected_predicted_n)
    assert len(context.concurrent_tasks) == 0, f"{len(context.concurrent_tasks)} pending requests"


@step('embeddings are computed for')
@async_run_until_complete
async def step_compute_embedding(context):
    context.n_prompts = 1
    context.embeddings = await request_embedding(context_text(context), None, base_url=context.base_url)


@step('all embeddings are the same')
@async_run_until_complete
async def step_all_embeddings_are_the_same(context):
    n_embedding_requests = await gather_tasks_results(context)
    assert n_embedding_requests > 0
    embeddings = []
    for i in range(n_embedding_requests):
        embedding = context.tasks_result.pop().pop()
        embeddings.append(embedding)
        assert_embeddings(embedding)
    n = len(embeddings)
    for i in range(n - 1):
        for j in range(i + 1, n):
            embedding1 = np.array(embeddings[i])
            embedding2 = np.array(embeddings[j])
            if context.debug:
                print(f"embedding1: {embedding1[-8:]}")
                print(f"embedding2: {embedding2[-8:]}")
            similarity = np.dot(embedding1, embedding2) / (np.linalg.norm(embedding1) * np.linalg.norm(embedding2))
            msg = f"Similarity between {i} and {j}: {similarity:.10f}"
            if context.debug:
                print(f"{msg}")
            assert np.isclose(similarity, 1.0, rtol=1e-05, atol=1e-08, equal_nan=False), msg


@step('embeddings are generated')
def step_assert_embeddings(context):
    assert context.n_prompts == len(context.embeddings), (f"unexpected response:\n"
                                                          f"context.n_prompts={context.n_prompts}\n"
                                                          f"context.embeddings={context.embeddings}")
    for embedding in context.embeddings:
        assert_embeddings(embedding)


@step('an OAI compatible embeddings computation request for')
@async_run_until_complete
async def step_oai_compute_embeddings(context):
    context.n_prompts = 1
    context.embeddings = await request_oai_embeddings(context_text(context), None,
                                                      base_url=context.base_url,
                                                      user_api_key=context.user_api_key,
                                                      model=context.model)


@step('an OAI compatible embeddings computation request for multiple inputs')
@async_run_until_complete
async def step_oai_compute_embeddings_multiple_inputs(context):
    context.embeddings = await request_oai_embeddings(context.prompts, None,
                                                      base_url=context.base_url,
                                                      user_api_key=context.user_api_key,
                                                      model=context.model)
    context.prompts.clear()


@step('concurrent embedding requests')
@async_run_until_complete()
async def step_concurrent_embedding_requests(context):
    await concurrent_requests(context,
                              request_embedding,
                              # prompt is inserted automatically
                              base_url=context.base_url)


@step('concurrent OAI embedding requests')
@async_run_until_complete()
async def step_concurrent_oai_embedding_requests(context):
    await concurrent_requests(context,
                              request_oai_embeddings,
                              # prompt is inserted automatically
                              base_url=context.base_url,
                              async_client=True,
                              model=context.model)


@step('all embeddings are generated')
@async_run_until_complete()
async def all_embeddings_are_generated(context):
    n_embedding_requests = await gather_tasks_results(context)
    assert n_embedding_requests == context.n_prompts
    for i in range(n_embedding_requests):
        assert_embeddings(context.tasks_result.pop().pop())


@step('tokenizing')
@async_run_until_complete
async def step_tokenize(context):
    context.tokenized_text = context_text(context)
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/tokenize',
                                json={
                                    "content": context.tokenized_text,
                                }) as response:
            assert response.status == 200
            tokenize_json = await response.json()
            context.tokens = tokenize_json['tokens']


@step('tokens can be detokenize')
@async_run_until_complete
async def step_detokenize(context):
    assert len(context.tokens) > 0
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/detokenize',
                                json={
                                    "tokens": context.tokens,
                                }) as response:
            assert response.status == 200
            detokenize_json = await response.json()
            # SPM tokenizer adds a whitespace prefix: https://github.com/google/sentencepiece/issues/15
            assert context.tokenized_text == detokenize_json['content'].strip()


@step('an OPTIONS request is sent from {origin}')
@async_run_until_complete
async def step_options_request(context, origin):
    async with aiohttp.ClientSession() as session:
        headers = {'Authorization': f'Bearer {context.user_api_key}', 'Origin': origin}
        async with session.options(f'{context.base_url}/v1/chat/completions',
                                   headers=headers) as response:
            assert response.status == 200
            context.options_response = response


@step('CORS header {cors_header} is set to {cors_header_value}')
def step_check_options_header_value(context, cors_header, cors_header_value):
    assert context.options_response.headers[cors_header] == cors_header_value


@step('prometheus metrics are exposed')
@async_run_until_complete
async def step_prometheus_metrics_exported(context):
    async with aiohttp.ClientSession() as session:
        async with await session.get(f'{context.base_url}/metrics') as metrics_response:
            assert metrics_response.status == 200
            assert metrics_response.headers['Content-Type'] == "text/plain; version=0.0.4"
            metrics_raw = await metrics_response.text()
            metric_exported = False
            if context.debug:
                print(f"/metrics answer:\n{metrics_raw}")
            context.metrics = {}
            for metric in parser.text_string_to_metric_families(metrics_raw):
                match metric.name:
                    case "llamacpp:kv_cache_usage_ratio":
                        assert len(metric.samples) > 0
                        metric_exported = True
                context.metrics[metric.name] = metric
            assert int(metrics_response.headers["Process-Start-Time-Unix"]) > 0, "no header process start time"
            assert metric_exported, "No metrics exported"


@step('metric {metric_name} is {metric_value:d}')
def step_assert_metric_value(context, metric_name, metric_value):
    if metric_name not in context.metrics:
        assert False, f"no metric {metric_name} in {context.metrics.keys()}"
    assert context.metrics[metric_name].samples[0].value == metric_value, f"metric: {context.metrics[metric_name]}"


@step('available models')
def step_available_models(context):
    # openai client always expects an api_key
    openai.api_key = context.user_api_key if context.user_api_key is not None else 'nope'
    openai.api_base = f'{context.base_url}/v1'
    context.models = openai.Model.list().data


@step('{n_model:d} models are supported')
def step_supported_models(context, n_model):
    if context.debug:
        print("server models available:", context.models)
    assert len(context.models) == n_model


@step('model {i_model:d} is {param} {preposition} {param_value}')
def step_supported_models(context, i_model, param, preposition, param_value):
    assert i_model < len(context.models)
    model = context.models[i_model]

    param_value = param_value.split(' ', 1)[0]
    match param:
        case 'identified':
            value = model.id
        case 'trained':
            value = str(model.meta.n_ctx_train)
        case _:
            assert False, f"param {param} not supported"
    assert param_value == value, f"model param {param} {value} != {param_value}"


async def concurrent_requests(context, f_completion, *args, **kwargs):
    context.n_prompts = len(context.prompts)
    if context.debug:
        print(f"starting {context.n_prompts} concurrent completion requests...")
    assert context.n_prompts > 0
    seeds = await completions_seed(context)
    for prompt_no in range(context.n_prompts):
        shifted_args = [context.prompts.pop(), seeds[prompt_no], *args]
        context.concurrent_tasks.append(asyncio.create_task(f_completion(*shifted_args, **kwargs)))
    await asyncio.sleep(0.1)


@step('the slot {slot_id:d} is saved with filename "{filename}"')
@async_run_until_complete
async def step_save_slot(context, slot_id, filename):
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/slots/{slot_id}?action=save',
                                json={"filename": filename},
                                headers={"Content-Type": "application/json"}) as response:
            context.response = response


@step('the slot {slot_id:d} is restored with filename "{filename}"')
@async_run_until_complete
async def step_restore_slot(context, slot_id, filename):
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/slots/{slot_id}?action=restore',
                                json={"filename": filename},
                                headers={"Content-Type": "application/json"}) as response:
            context.response = response


@step('the slot {slot_id:d} is erased')
@async_run_until_complete
async def step_erase_slot(context, slot_id):
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{context.base_url}/slots/{slot_id}?action=erase',
                                headers={"Content-Type": "application/json"}) as response:
            context.response = response


@step('the server responds with status code {status_code:d}')
def step_server_responds_with_status_code(context, status_code):
    assert context.response.status == status_code


async def request_completion(prompt,
                             seed,
                             base_url,
                             debug=False,
                             prompt_prefix=None,
                             prompt_suffix=None,
                             n_predict=None,
                             cache_prompt=False,
                             id_slot=None,
                             expect_api_error=None,
                             user_api_key=None,
                             temperature=None):
    if debug:
        print(f"Sending completion request: {prompt}")
    origin = "my.super.domain"
    headers = {
        'Origin': origin
    }
    if user_api_key is not None:
        if debug:
            print(f"Set user_api_key: {user_api_key}")
        headers['Authorization'] = f'Bearer {user_api_key}'

    async with aiohttp.ClientSession() as session:
        async with session.post(f'{base_url}/completion',
                                json={
                                    "input_prefix": prompt_prefix,
                                    "prompt": prompt,
                                    "input_suffix": prompt_suffix,
                                    "n_predict": n_predict if n_predict is not None else -1,
                                    "cache_prompt": cache_prompt,
                                    "id_slot": id_slot,
                                    "seed": seed if seed is not None else 42,
                                    "temperature": temperature if temperature is not None else 0.8,
                                },
                                headers=headers,
                                timeout=3600) as response:
            if expect_api_error is None or not expect_api_error:
                assert response.status == 200
                assert response.headers['Access-Control-Allow-Origin'] == origin
                return await response.json()
            else:
                return response.status


async def oai_chat_completions(user_prompt,
                               seed,
                               system_prompt,
                               base_url,
                               base_path,
                               async_client,
                               debug=False,
                               model=None,
                               n_predict=None,
                               enable_streaming=None,
                               response_format=None,
                               user_api_key=None,
                               expect_api_error=None):
    if debug:
        print(f"Sending OAI Chat completions request: {user_prompt}")
    # openai client always expects an api key
    user_api_key = user_api_key if user_api_key is not None else 'nope'
    seed = seed if seed is not None else 42
    enable_streaming = enable_streaming if enable_streaming is not None else False
    payload = {
        "messages": [
            {
                "role": "system",
                "content": system_prompt,
            },
            {
                "role": "user",
                "content": user_prompt,
            }
        ],
        "model": model,
        "max_tokens": n_predict,
        "stream": enable_streaming,
        "seed": seed
    }
    if response_format is not None:
        payload['response_format'] = response_format
    completion_response = {
        'content': '',
        'timings': {
            'predicted_n': 0,
            'prompt_n': 0
        }
    }
    if async_client:
        origin = 'llama.cpp'
        headers = {'Authorization': f'Bearer {user_api_key}', 'Origin': origin}
        async with aiohttp.ClientSession() as session:
            async with session.post(f'{base_url}{base_path}',
                                    json=payload,
                                    headers=headers) as response:
                if enable_streaming:
                    assert response.status == 200
                    assert response.headers['Access-Control-Allow-Origin'] == origin
                    assert response.headers['Content-Type'] == "text/event-stream"
                    event_received = True
                    while event_received:
                        event_received = False
                        async for line_in_bytes in response.content:
                            line = line_in_bytes.decode('utf8')
                            line = line.rstrip('\n').rstrip('\r')
                            if line == '':
                                continue
                            event_data = line.split(': ', 1)
                            assert event_data[0] == 'data', f'Bad event code received: ```{event_data}```'
                            chunk_raw = event_data[1]

                            chunk = json.loads(chunk_raw)
                            assert len(chunk['choices']) == 1, f"no choices provided, line ```{line}```"
                            delta = chunk['choices'][0]['delta']
                            if 'content' in delta:
                                completion_response['content'] += delta['content']
                                completion_response['timings']['predicted_n'] += 1
                else:
                    if expect_api_error is None or not expect_api_error:
                        assert response.status == 200
                        assert response.headers['Access-Control-Allow-Origin'] == origin
                        assert response.headers['Content-Type'] == "application/json; charset=utf-8"
                        chat_completion_raw = await response.json()
                        completion_response = {
                            'content': chat_completion_raw['choices'][0]['message'],
                            'timings': {
                                'predicted_n': chat_completion_raw['usage']['completion_tokens'],
                                'prompt_n': chat_completion_raw['usage']['prompt_tokens']
                            }
                        }
                    else:
                        return response.status
    else:
        try:
            openai.api_key = user_api_key
            openai.api_base = f'{base_url}{base_path}'
            chat_completion = openai.Completion.create(
                messages=payload['messages'],
                model=model,
                max_tokens=n_predict,
                stream=enable_streaming,
                response_format=payload.get('response_format'),
                seed=seed
            )
        except openai.error.AuthenticationError as e:
            if expect_api_error is not None and expect_api_error:
                return 401
            else:
                assert False, f'error raised: {e}'

        if enable_streaming:
            for chunk in chat_completion:
                assert len(chunk.choices) == 1
                delta = chunk.choices[0].delta
                if 'content' in delta:
                    completion_response['content'] += delta['content']
                    completion_response['timings']['predicted_n'] += 1
                completion_response['truncated'] = chunk.choices[0].finish_reason != 'stop'
        else:
            assert len(chat_completion.choices) == 1
            completion_response = {
                'content': chat_completion.choices[0].message.content,
                'timings': {
                    'predicted_n': chat_completion.usage.completion_tokens,
                    'prompt_n': chat_completion.usage.prompt_tokens
                },
                'truncated': chat_completion.choices[0].finish_reason != 'stop'
            }
    if debug:
        print("OAI response formatted to llama.cpp:", completion_response)
    return completion_response


async def request_embedding(content, seed, base_url=None):
    async with aiohttp.ClientSession() as session:
        async with session.post(f'{base_url}/embedding',
                                json={
                                    "content": content,
                                }) as response:
            assert response.status == 200
            response_json = await response.json()
            return [response_json['embedding']]


async def request_oai_embeddings(input, seed,
                                 base_url=None, user_api_key=None,
                                 model=None, async_client=False):
    # openai client always expects an api_key
    user_api_key = user_api_key if user_api_key is not None else 'nope'
    if async_client:
        origin = 'llama.cpp'
        headers = []
        if user_api_key is not None:
            headers = {'Authorization': f'Bearer {user_api_key}', 'Origin': origin}
        async with aiohttp.ClientSession() as session:
            async with session.post(f'{base_url}/v1/embeddings',
                                    json={
                                        "input": input,
                                        "model": model,
                                    },
                                    headers=headers,
                                    timeout=3600) as response:
                assert response.status == 200, f"received status code not expected: {response.status}"
                assert response.headers['Access-Control-Allow-Origin'] == origin
                assert response.headers['Content-Type'] == "application/json; charset=utf-8"
                response_json = await response.json()
                assert response_json['model'] == model, f"invalid model received: {response_json['model']}"
                assert response_json['object'] == 'list'
                if isinstance(input, collections.abc.Sequence):
                    embeddings = []
                    for an_oai_embeddings in response_json['data']:
                        embeddings.append(an_oai_embeddings['embedding'])
                else:
                    embeddings = [response_json['data']['embedding']]
                return embeddings
    else:
        openai.api_key = user_api_key
        openai.api_base = f'{base_url}/v1'
        oai_embeddings = openai.Embedding.create(
            model=model,
            input=input,
        )

        if isinstance(input, collections.abc.Sequence):
            embeddings = []
            for an_oai_embeddings in oai_embeddings.data:
                embeddings.append(an_oai_embeddings.embedding)
        else:
            embeddings = [oai_embeddings.data.embedding]
        return embeddings


def assert_n_tokens_predicted(completion_response, expected_predicted_n=None, re_content=None):
    content = completion_response['content']
    n_predicted = completion_response['timings']['predicted_n']
    assert len(content) > 0, "no token predicted"
    if re_content is not None:
        p = re.compile(re_content, flags=RegexFlag.IGNORECASE | RegexFlag.MULTILINE | RegexFlag.DOTALL)
        matches = p.finditer(content)
        last_match = 0
        highlighted = ''
        for match in matches:
            start, end = match.span()
            highlighted += content[last_match: start]
            highlighted += '\x1b[33m'
            highlighted += content[start: end]
            highlighted += '\x1b[0m'
            last_match = end
        highlighted += content[last_match:]
        if 'DEBUG' in os.environ and os.environ['DEBUG'] == 'ON':
            print(f"Checking completion response: {highlighted}")
        assert last_match > 0, f'/{re_content}/ must match ```{highlighted}```'
    if expected_predicted_n and expected_predicted_n > 0:
        assert n_predicted == expected_predicted_n, (f'invalid number of tokens predicted:'
                                                     f' {n_predicted} <> {expected_predicted_n}')


def assert_all_predictions_equal(completion_responses):
    if 'DEBUG' in os.environ and os.environ['DEBUG'] == 'ON':
        for i, response_i in enumerate(completion_responses):
            content_i = response_i['content']
            print(f"content {i}: {content_i}")
    for i, response_i in enumerate(completion_responses):
        content_i = response_i['content']
        for j, response_j in enumerate(completion_responses):
            if i == j:
                continue
            content_j = response_j['content']
            assert content_i == content_j, "contents not equal"


def assert_all_predictions_different(completion_responses):
    if 'DEBUG' in os.environ and os.environ['DEBUG'] == 'ON':
        for i, response_i in enumerate(completion_responses):
            content_i = response_i['content']
            print(f"content {i}: {content_i}")
    for i, response_i in enumerate(completion_responses):
        content_i = response_i['content']
        for j, response_j in enumerate(completion_responses):
            if i == j:
                continue
            content_j = response_j['content']
            assert content_i != content_j, "contents not different"


async def gather_tasks_results(context):
    n_tasks = len(context.concurrent_tasks)
    if context.debug:
        print(f"Waiting for all {n_tasks} tasks results...")
    for task_no in range(n_tasks):
        context.tasks_result.append(await context.concurrent_tasks.pop())
    n_completions = len(context.tasks_result)
    return n_completions


async def wait_for_health_status(context,
                                 base_url,
                                 expected_http_status_code,
                                 expected_health_status,
                                 timeout=3,
                                 params=None,
                                 slots_idle=None,
                                 slots_processing=None,
                                 expected_slots=None):
    if context.debug:
        print(f"Starting checking for health for expected_health_status={expected_health_status}")
    interval = 0.5
    counter = 0
    if 'GITHUB_ACTIONS' in os.environ:
        timeout *= 2

    async with aiohttp.ClientSession() as session:
        while True:
            async with await session.get(f'{base_url}/health', params=params) as health_response:
                status_code = health_response.status
                health = await health_response.json()
                if context.debug:
                    print(f"HEALTH - response for expected health status='{expected_health_status}' on "
                          f"'{base_url}/health'?{params} is {health}\n")
                if (status_code == expected_http_status_code
                        and health['status'] == expected_health_status
                        and (slots_idle is None or health['slots_idle'] == slots_idle)
                        and (slots_processing is None or health['slots_processing'] == slots_processing)):
                    if expected_slots is not None:
                        assert_slots_status(health['slots'], expected_slots)
                    return
            await asyncio.sleep(interval)

            counter += interval
            if counter >= timeout:
                # Sometimes health requests are triggered after completions are predicted
                if expected_http_status_code == 503:
                    if len(context.tasks_result) == 0:
                        print("\x1b[5;37;43mWARNING: forcing concurrent tasks,"
                              " busy health check missed, probably too fast inference\x1b[0m\n")
                        n_completions = await gather_tasks_results(context)
                        if n_completions > 0:
                            return

                assert False, f'{expected_health_status} timeout exceeded {counter}s>={timeout}'


def assert_embeddings(embeddings):
    assert len(embeddings) > 0
    embeddings_computed = False
    for emb in embeddings:
        if not isinstance(emb, float):
            assert False, f"Bad embeddings: {embeddings}"
        if emb != 0:
            embeddings_computed = True
    assert embeddings_computed, f"Embeddings: {embeddings}"


async def request_slots_status(context, expected_slots):
    async with aiohttp.ClientSession() as session:
        async with await session.get(f'{context.base_url}/slots') as slots_response:
            assert slots_response.status == 200
            slots = await slots_response.json()
            assert_slots_status(slots, expected_slots)


def assert_slots_status(slots, expected_slots):
    assert len(slots) == len(expected_slots)
    for slot_id, (expected, slot) in enumerate(zip(expected_slots, slots)):
        for key in expected:
            assert expected[key] == slot[key], (f"invalid slot {slot_id}"
                                                f" expected[{key}] != slot[{key}]"
                                                f" = {expected[key]} != {slot[key]}")


async def completions_seed(context, num_seeds=None):
    if hasattr(context, "seed") and context.seed is not None:
        assert len(context.seed) == context.n_prompts
        if num_seeds is None:
            num_seeds = context.n_prompts
        assert num_seeds <= context.n_prompts
        seeds = context.seed[:num_seeds]
        context.seed = context.seed[num_seeds:] if num_seeds < context.n_prompts else None
        return seeds

    if hasattr(context, "server_seed") and context.server_seed is not None:
        if num_seeds is None:
            return [context.server_seed] * context.n_prompts
        else:
            return [context.server_seed] * num_seeds
    return None


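# Note on completions_seed() above (illustrative example, not from the original
# source): with context.seed == [1, 2, 3] and context.n_prompts == 3, a call to
# completions_seed(context, num_seeds=1) returns [1] and leaves [2, 3] for later
# requests; without per-prompt seeds it falls back to context.server_seed, and it
# returns None when no seed was configured at all.

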
def context_text(context):
    return context.text.replace('\r', '')


def start_server_background(context):
    if os.name == 'nt':
        context.server_path = '../../../build/bin/Release/server.exe'
    else:
        context.server_path = '../../../build/bin/server'
    if 'LLAMA_SERVER_BIN_PATH' in os.environ:
        context.server_path = os.environ['LLAMA_SERVER_BIN_PATH']
    server_listen_addr = context.server_fqdn
    server_args = [
        '--host', server_listen_addr,
        '--port', context.server_port,
    ]
    if context.model_file:
        server_args.extend(['--model', context.model_file])
    if context.model_url:
        server_args.extend(['--model-url', context.model_url])
    if context.model_hf_repo:
        server_args.extend(['--hf-repo', context.model_hf_repo])
    if context.model_hf_file:
        server_args.extend(['--hf-file', context.model_hf_file])
    if context.n_batch:
        server_args.extend(['--batch-size', context.n_batch])
    if context.n_ubatch:
        server_args.extend(['--ubatch-size', context.n_ubatch])
    if context.n_gpu_layer:
        server_args.extend(['--n-gpu-layers', context.n_gpu_layer])
    if context.draft is not None:
        server_args.extend(['--draft', context.draft])
    if context.server_continuous_batching:
        server_args.append('--cont-batching')
    if context.server_embeddings:
        server_args.append('--embedding')
    if context.server_metrics:
        server_args.append('--metrics')
    if context.model_alias:
        server_args.extend(['--alias', context.model_alias])
    if context.n_ctx:
        server_args.extend(['--ctx-size', context.n_ctx])
    if context.n_slots:
        server_args.extend(['--parallel', context.n_slots])
    if context.n_server_predict:
        server_args.extend(['--n-predict', context.n_server_predict])
    if context.slot_save_path:
        server_args.extend(['--slot-save-path', context.slot_save_path])
    if context.server_api_key:
        server_args.extend(['--api-key', context.server_api_key])
    if context.n_ga:
        server_args.extend(['--grp-attn-n', context.n_ga])
    if context.n_ga_w:
        server_args.extend(['--grp-attn-w', context.n_ga_w])
    if context.debug:
        server_args.append('--verbose')
    if 'SERVER_LOG_FORMAT_JSON' not in os.environ:
        server_args.extend(['--log-format', "text"])

    args = [str(arg) for arg in [context.server_path, *server_args]]
    print(f"bench: starting server with: {' '.join(args)}")

    flags = 0
    if 'nt' == os.name:
        flags |= subprocess.DETACHED_PROCESS
        flags |= subprocess.CREATE_NEW_PROCESS_GROUP
        flags |= subprocess.CREATE_NO_WINDOW

    pkwargs = {
        'creationflags': flags,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE
    }
    context.server_process = subprocess.Popen(
        args,
        **pkwargs)

    def server_log(in_stream, out_stream):
        for line in iter(in_stream.readline, b''):
            print(line.decode('utf-8'), end='', file=out_stream)

    thread_stdout = threading.Thread(target=server_log, args=(context.server_process.stdout, sys.stdout))
    thread_stdout.start()

    thread_stderr = threading.Thread(target=server_log, args=(context.server_process.stderr, sys.stderr))
    thread_stderr.start()

    print(f"server pid={context.server_process.pid}, behave pid={os.getpid()}")