How Localmetry Works

本文将详细介绍如何用这个库来定时采集 worldserver 游戏服务器的性能数据.

底层实现

acore_server_monitoring_measurement/localmetry.py 模块中的 WorldServerStatusMeasurement.measure_on_worldserver_ec2 方法实现了对 worldserver 游戏服务器的性能数据进行一次采集.

如果你有需要, 你完全可以将这个方法作为一个库函数在其他项目中使用.

acore_server_monitoring_measurement/localmetry.py
 1# -*- coding: utf-8 -*-
 2
 3"""
 4todo: docstring
 5"""
 6
 7import psutil
 8from acore_server_metadata.api import Server
 9from acore_soap.api import gm
10import acore_server_monitoring_core.api as acore_server_monitoring_core
11
12from .utils import get_create_at_expire_at, get_server_status
13
14
15class WorldServerStatusMeasurement(
16    acore_server_monitoring_core.WorldServerStatusMeasurement,
17):
18    """
19    todo: docstring
20    """
21
22    @classmethod
23    def measure_on_worldserver_ec2(
24        cls,
25        save: bool = True,
26    ):
27        """
28        todo: docstring
29        """
30        create_at, expire_at = get_create_at_expire_at()
31        server = Server.from_ec2_inside()
32        (
33            is_ec2_exists,
34            is_rds_exists,
35            is_ec2_running,
36            is_rds_running,
37            ec2_status,
38            rds_status,
39        ) = get_server_status(server)
40
41        try:
42            soap_response = gm.ServerInfoRequest().send()
43            server_info_response = gm.ServerInfoResponse.from_soap_response(
44                soap_response
45            )
46            connected_players = server_info_response.connected_players
47            characters_in_world = server_info_response.characters_in_world
48            server_uptime = server_info_response.server_uptime
49        except Exception:
50            connected_players = None
51            characters_in_world = None
52            server_uptime = None
53
54        cpu_usage = psutil.cpu_percent(interval=1)
55        virtual_memory = psutil.virtual_memory()
56        memory_usage = virtual_memory.percent
57        total_memory = int(virtual_memory.total / 1000000)
58        available_memory = int(virtual_memory.available / 1000000)
59
60        measurement = cls(
61            series_id=f"{server.id}-{acore_server_monitoring_core.UseCaseEnum.worldserver_status.value}",
62            create_at=create_at,
63            expire_at=expire_at,
64            is_ec2_exists=is_ec2_exists,
65            is_rds_exists=is_rds_exists,
66            is_ec2_running=is_ec2_running,
67            is_rds_running=is_rds_running,
68            ec2_status=ec2_status,
69            rds_status=rds_status,
70            connected_players=connected_players,
71            characters_in_world=characters_in_world,
72            server_uptime=server_uptime,
73            cpu_usage=cpu_usage,
74            memory_usage=memory_usage,
75            total_memory=total_memory,
76            available_memory=available_memory,
77        )
78        if save:
79            measurement.save()
80        return measurement

中层封装

acore_server_monitoring_measurement/cron_job.py 实现了两个重要函数:

  • run_measure_worldserver_cron_job(): 采集 worldserver 的统计数据并发送到 DynamoDB table 中. 这个 cron job 主要是为了采集一段时间内的历史数据.

  • run_log_to_ec2_tag_cron_job(): 采集 worldserver 的统计数据并将其写入到 EC2 AWS Tag. 这个 cron job 主要是为了采集实时数据并给人类看的.

Important

这两个函数只能在 worldserver 所在的 EC2 的环境中运行, 它们能自动检测本机的 server_id, 并定位到对应的 DynamoDB table 或 EC2 AWS Tag.

acore_server_monitoring_measurement/cron_job.py
  1# -*- coding: utf-8 -*-
  2
  3"""
  4Implement Cron job running on worldserver EC2 instance.
  5"""
  6
  7from pathlib import Path
  8
  9import pynamodb_mate.api as pm
 10from simple_aws_ec2.api import EC2MetadataCache
 11from acore_constants.api import TagKey
 12from acore_server_metadata.api import Server
 13
 14from .utils import every
 15from .paths import path_env_name_cache
 16from .localmetry import WorldServerStatusMeasurement as Base
 17
 18
 19def ensure_ec2_environment():  # pragma: no cover
 20    """
 21    Ensure all functions in this module is running inside EC2 environment.
 22    """
 23    if (
 24        Path("/home/ubuntu").exists()
 25        and Path(
 26            "/home/ubuntu/git_repos/acore_server_monitoring_measurement-project/acore_server_monitoring_measurement/cron_job.py"
 27        ).exists()
 28    ):
 29        pass
 30    else:
 31        raise EnvironmentError("You cannot run this outside of EC2 environment")
 32
 33
 34def get_env_name() -> str:
 35    try:
 36        env_name = path_env_name_cache.read_text()
 37    except FileNotFoundError:  # pragma: no cover
 38        server = Server.from_ec2_inside()
 39        env_name = server.env_name
 40        path_env_name_cache.write_text(env_name)
 41    return env_name
 42
 43
 44def run_measure_worldserver_cron_job(
 45    delay: int = 300,
 46    verbose: bool = True,
 47):
 48    """
 49    Measure the worldserver status every 5 minutes.
 50    """
 51    if delay % 60 != 0:
 52        raise ValueError("delay must be a multiple of 60")
 53
 54    ensure_ec2_environment()
 55
 56    env_name = get_env_name()
 57
 58    class WorldServerStatusMeasurement(Base):
 59        class Meta:
 60            table_name = f"wserver_infra-{env_name}-server_monitoring"
 61            region = "us-east-1"
 62            billing_mode = pm.constants.PAY_PER_REQUEST_BILLING_MODE
 63
 64    for _ in every(seconds=delay, verbose=verbose):
 65        WorldServerStatusMeasurement.measure_on_worldserver_ec2()
 66
 67
 68def run_log_to_ec2_tag_cron_job(
 69    delay: int = 60,
 70    verbose: bool = True,
 71):
 72    """
 73    Put worldserver status measurement to EC2 tags every 1 minutes.
 74    """
 75    if delay % 60 != 0:
 76        raise ValueError("delay must be a multiple of 60")
 77
 78    ensure_ec2_environment()
 79
 80    WorldServerStatusMeasurement = Base
 81
 82    for _ in every(seconds=delay, verbose=verbose):
 83        measurement = WorldServerStatusMeasurement.measure_on_worldserver_ec2(
 84            save=False
 85        )
 86        tags = {
 87            TagKey.WORLDSERVER_MEASURE_TIME: measurement.create_at.isoformat(),
 88            TagKey.WORLDSERVER_IS_RDS_EXISTS: str(measurement.is_rds_exists),
 89            TagKey.WORLDSERVER_IS_RDS_RUNNING: str(measurement.is_rds_running),
 90            TagKey.WORLDSERVER_RDS_STATUS: str(measurement.rds_status),
 91            TagKey.WORLDSERVER_CONNECTED_PLAYERS: str(measurement.connected_players),
 92            TagKey.WORLDSERVER_CHARACTERS_IN_WORLD: str(
 93                measurement.characters_in_world
 94            ),
 95            TagKey.WORLDSERVER_SERVER_UPTIME: str(measurement.server_uptime),
 96            TagKey.WORLDSERVER_CPU_USAGE: str(measurement.cpu_usage),
 97            TagKey.WORLDSERVER_MEMORY_USAGE: str(measurement.memory_usage),
 98            TagKey.WORLDSERVER_TOTAL_MEMORY: str(measurement.total_memory),
 99            TagKey.WORLDSERVER_AVAILABLE_MEMORY: str(measurement.available_memory),
100        }
101        ec2_metadata_cache = EC2MetadataCache.load()
102        instance_id = ec2_metadata_cache.get_instance_id()
103        boto_ses = ec2_metadata_cache.get_boto_ses_from_ec2_inside()
104        ec2_client = boto_ses.client("ec2")
105        ec2_client.create_tags(
106            Resources=[instance_id],
107            Tags=[dict(Key=k, Value=v) for k, v in tags.items()],
108        )

采集脚本

cron_job/run_log_to_ec2_tag_cron_job.py 和 cron_job/run_measure_worldserver_cron_job.py 是用于在 GNU Screen 中后台运行的脚本. 它们会每隔一段时间就采集一次数据. 这两个脚本分别对应中层封装中的两个函数. 这两个脚本会在 acore_server_bootstrap 项目中的 Check server status 功能中被调用.

EC2 Init

最终, 把定时任务放在 GNU Screen session 中运行这一动作, 也需要由 EC2 启动时执行的 cloud-init 脚本来触发. 详细原理请参考 setup_ec2_run_on_restart_script 中的文档.