How Localmetry Works
本文将详细介绍如何用这个库来定时采集 worldserver 游戏服务器的性能数据.
底层实现
acore_server_monitoring_measurement/localmetry.py 模块中的 WorldServerStatusMeasurement.measure_on_worldserver_ec2 方法实现了对 worldserver 游戏服务器的性能数据进行一次采集.
如果你有需要, 你完全可以将这个方法作为一个库函数在其他项目中使用.
acore_server_monitoring_measurement/localmetry.py
1# -*- coding: utf-8 -*-
2
3"""
4todo: docstring
5"""
6
7import psutil
8from acore_server_metadata.api import Server
9from acore_soap.api import gm
10import acore_server_monitoring_core.api as acore_server_monitoring_core
11
12from .utils import get_create_at_expire_at, get_server_status
13
14
class WorldServerStatusMeasurement(
    acore_server_monitoring_core.WorldServerStatusMeasurement,
):
    """
    Worldserver status measurement that can collect itself on the local host.

    Extends the core ``WorldServerStatusMeasurement`` with a class method that
    gathers one sample from inside the worldserver EC2 instance.
    """

    @classmethod
    def measure_on_worldserver_ec2(
        cls,
        save: bool = True,
    ):
        """
        Take a single worldserver status measurement and return it.

        The sample combines three sources:

        - EC2 / RDS existence, running flags and status strings, as returned
          by ``get_server_status`` for the server resolved from inside EC2.
        - Game statistics (connected players, characters in world, uptime)
          from a SOAP ``ServerInfoRequest``. If anything in that step raises,
          all three values are recorded as ``None``.
        - Host CPU and memory figures from ``psutil``.

        :param save: when true (the default), call ``save()`` on the new
            measurement before returning it.
        :return: the newly built measurement instance.
        """
        create_at, expire_at = get_create_at_expire_at()
        server = Server.from_ec2_inside()
        (
            ec2_exists,
            rds_exists,
            ec2_running,
            rds_running,
            ec2_state,
            rds_state,
        ) = get_server_status(server)

        # Game statistics are best effort: start from None and only overwrite
        # all three together when the whole SOAP round trip succeeds.
        connected_players = characters_in_world = server_uptime = None
        try:
            info = gm.ServerInfoResponse.from_soap_response(
                gm.ServerInfoRequest().send()
            )
            connected_players, characters_in_world, server_uptime = (
                info.connected_players,
                info.characters_in_world,
                info.server_uptime,
            )
        except Exception:
            # Keep the None defaults; a failed SOAP call must not abort the
            # rest of the measurement.
            pass

        # interval=1 makes psutil sample CPU usage over a blocking one-second
        # window.
        cpu_usage = psutil.cpu_percent(interval=1)
        mem = psutil.virtual_memory()
        memory_usage = mem.percent
        # Memory sizes are divided by 10^6 and truncated to int (psutil
        # reports these values in bytes).
        total_memory = int(mem.total / 1000000)
        available_memory = int(mem.available / 1000000)

        use_case = acore_server_monitoring_core.UseCaseEnum.worldserver_status.value
        fields = dict(
            series_id=f"{server.id}-{use_case}",
            create_at=create_at,
            expire_at=expire_at,
            is_ec2_exists=ec2_exists,
            is_rds_exists=rds_exists,
            is_ec2_running=ec2_running,
            is_rds_running=rds_running,
            ec2_status=ec2_state,
            rds_status=rds_state,
            connected_players=connected_players,
            characters_in_world=characters_in_world,
            server_uptime=server_uptime,
            cpu_usage=cpu_usage,
            memory_usage=memory_usage,
            total_memory=total_memory,
            available_memory=available_memory,
        )
        measurement = cls(**fields)
        if save:
            measurement.save()
        return measurement
中层封装
acore_server_monitoring_measurement/cron_job.py 实现了两个重要函数:
run_measure_worldserver_cron_job(): 采集 worldserver 的统计数据并发送到 DynamoDB table 中. 这个 cron job 主要是为了采集一段时间内的历史数据.
run_log_to_ec2_tag_cron_job(): 采集 worldserver 的统计数据并将其写入到 EC2 AWS Tag. 这个 cron job 主要是为了采集实时数据并给人类看的.
Important
这两个函数只能在 worldserver 所在的 EC2 环境中运行. 它们能自动检测本机的 server_id, 并据此定位到对应的 DynamoDB table 或 EC2 AWS Tag.
acore_server_monitoring_measurement/cron_job.py
1# -*- coding: utf-8 -*-
2
3"""
4Implement Cron job running on worldserver EC2 instance.
5"""
6
7from pathlib import Path
8
9import pynamodb_mate.api as pm
10from simple_aws_ec2.api import EC2MetadataCache
11from acore_constants.api import TagKey
12from acore_server_metadata.api import Server
13
14from .utils import every
15from .paths import path_env_name_cache
16from .localmetry import WorldServerStatusMeasurement as Base
17
18
def ensure_ec2_environment():  # pragma: no cover
    """
    Raise unless this code appears to be running on the worldserver EC2 host.

    The check is a filesystem probe: both ``/home/ubuntu`` and the expected
    checkout path of this module underneath it must exist.

    :raises EnvironmentError: if either path is missing.
    """
    required_paths = (
        Path("/home/ubuntu"),
        Path(
            "/home/ubuntu/git_repos/acore_server_monitoring_measurement-project/acore_server_monitoring_measurement/cron_job.py"
        ),
    )
    # all() stops at the first missing path, like the original `and` chain.
    if not all(path.exists() for path in required_paths):
        raise EnvironmentError("You cannot run this outside of EC2 environment")
32
33
def get_env_name() -> str:
    """
    Return the environment name of this server, using a file cache.

    The cached value is read from ``path_env_name_cache``. When that file
    does not exist, the name is taken from ``Server.from_ec2_inside()`` and
    written to the cache file before being returned.
    """
    try:
        return path_env_name_cache.read_text()
    except FileNotFoundError:  # pragma: no cover
        # Cache miss: resolve the server from inside EC2 and remember the
        # result for later calls.
        env_name = Server.from_ec2_inside().env_name
        path_env_name_cache.write_text(env_name)
        return env_name
42
43
def run_measure_worldserver_cron_job(
    delay: int = 300,
    verbose: bool = True,
):
    """
    Run the loop that measures the worldserver status and stores each sample.

    On every iteration produced by ``every(seconds=delay, verbose=verbose)``
    this calls ``measure_on_worldserver_ec2()`` with its default arguments on
    a measurement model bound to the table
    ``wserver_infra-{env_name}-server_monitoring`` in ``us-east-1``.

    :param delay: value passed to ``every`` as ``seconds``; must be a
        multiple of 60. Defaults to 300 (5 minutes).
    :param verbose: forwarded unchanged to ``every``.
    :raises ValueError: if ``delay`` is not a multiple of 60.
    :raises EnvironmentError: if the EC2 environment check fails.
    """
    if delay % 60:
        raise ValueError("delay must be a multiple of 60")

    ensure_ec2_environment()

    # The table is per environment, so the model class is built at call time
    # once the environment name is known.
    dynamodb_table_name = f"wserver_infra-{get_env_name()}-server_monitoring"

    class WorldServerStatusMeasurement(Base):
        class Meta:
            table_name = dynamodb_table_name
            region = "us-east-1"
            billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE

    for _tick in every(seconds=delay, verbose=verbose):
        WorldServerStatusMeasurement.measure_on_worldserver_ec2()
66
67
def run_log_to_ec2_tag_cron_job(
    delay: int = 60,
    verbose: bool = True,
):
    """
    Run the loop that publishes the latest worldserver status as EC2 tags.

    On every iteration produced by ``every(seconds=delay, verbose=verbose)``
    this takes one measurement without saving it, converts the RDS, player,
    uptime, CPU and memory fields to strings, and writes them as tags on the
    current EC2 instance via ``create_tags``.

    :param delay: value passed to ``every`` as ``seconds``; must be a
        multiple of 60. Defaults to 60 (1 minute).
    :param verbose: forwarded unchanged to ``every``.
    :raises ValueError: if ``delay`` is not a multiple of 60.
    :raises EnvironmentError: if the EC2 environment check fails.
    """
    if delay % 60:
        raise ValueError("delay must be a multiple of 60")

    ensure_ec2_environment()

    for _tick in every(seconds=delay, verbose=verbose):
        snapshot = Base.measure_on_worldserver_ec2(save=False)

        # The timestamp is already formatted by isoformat(); every other
        # field is passed through str() because tag values must be strings.
        tag_values = {
            TagKey.WORLDSERVER_MEASURE_TIME: snapshot.create_at.isoformat(),
        }
        stringified_fields = (
            (TagKey.WORLDSERVER_IS_RDS_EXISTS, snapshot.is_rds_exists),
            (TagKey.WORLDSERVER_IS_RDS_RUNNING, snapshot.is_rds_running),
            (TagKey.WORLDSERVER_RDS_STATUS, snapshot.rds_status),
            (TagKey.WORLDSERVER_CONNECTED_PLAYERS, snapshot.connected_players),
            (TagKey.WORLDSERVER_CHARACTERS_IN_WORLD, snapshot.characters_in_world),
            (TagKey.WORLDSERVER_SERVER_UPTIME, snapshot.server_uptime),
            (TagKey.WORLDSERVER_CPU_USAGE, snapshot.cpu_usage),
            (TagKey.WORLDSERVER_MEMORY_USAGE, snapshot.memory_usage),
            (TagKey.WORLDSERVER_TOTAL_MEMORY, snapshot.total_memory),
            (TagKey.WORLDSERVER_AVAILABLE_MEMORY, snapshot.available_memory),
        )
        for tag_key, raw_value in stringified_fields:
            tag_values[tag_key] = str(raw_value)

        # Metadata, session and client are obtained on every iteration, as in
        # the original implementation.
        metadata = EC2MetadataCache.load()
        instance_id = metadata.get_instance_id()
        ec2_client = metadata.get_boto_ses_from_ec2_inside().client("ec2")
        ec2_client.create_tags(
            Resources=[instance_id],
            Tags=[
                {"Key": tag_key, "Value": tag_text}
                for tag_key, tag_text in tag_values.items()
            ],
        )
采集脚本
cron_job/run_log_to_ec2_tag_cron_job.py 和 cron_job/run_measure_worldserver_cron_job.py 是用于在 GNU Screen 中后台运行的脚本. 它们会每隔一段时间就采集一次数据. 这两个脚本分别对应中层封装中的两个函数. 这两个脚本会在 acore_server_bootstrap 项目中的 Check server status 功能中被调用.
EC2 Init
最终, 把定时任务放在 GNU Screen session 中运行的这个动作, 也需要由 EC2 启动时执行的 cloud-init 脚本来触发. 详细原理请参考 setup_ec2_run_on_restart_script 中的文档.